From 2557a2fe5bebd952b205c4c718b965c70849a0c1 Mon Sep 17 00:00:00 2001 From: "Alexander \"PapaTutuWawa" Date: Wed, 29 Mar 2023 15:25:17 +0200 Subject: [PATCH] feat(core): Make the ping manager optional --- example/lib/main.dart | 4 +- packages/moxxmpp/lib/src/connection.dart | 61 ++-------- packages/moxxmpp/lib/src/events.dart | 6 + packages/moxxmpp/lib/src/ping.dart | 109 +++++++++++++----- packages/moxxmpp/test/negotiator_test.dart | 1 - packages/moxxmpp/test/xeps/xep_0030_test.dart | 1 - packages/moxxmpp/test/xeps/xep_0060_test.dart | 1 - packages/moxxmpp/test/xeps/xep_0198_test.dart | 5 - packages/moxxmpp/test/xmpp_test.dart | 4 - 9 files changed, 98 insertions(+), 94 deletions(-) diff --git a/example/lib/main.dart b/example/lib/main.dart index 9cf66a4..0494940 100644 --- a/example/lib/main.dart +++ b/example/lib/main.dart @@ -77,7 +77,9 @@ class _MyHomePageState extends State { StreamManagementManager(), DiscoManager([]), RosterManager(TestingRosterStateManager("", [])), - PingManager(), + PingManager( + const Duration(minutes: 3), + ), MessageManager(), PresenceManager(), ]) diff --git a/packages/moxxmpp/lib/src/connection.dart b/packages/moxxmpp/lib/src/connection.dart index 4e2b296..3d01536 100644 --- a/packages/moxxmpp/lib/src/connection.dart +++ b/packages/moxxmpp/lib/src/connection.dart @@ -83,7 +83,6 @@ class XmppConnection { ReconnectionPolicy reconnectionPolicy, ConnectivityManager connectivityManager, this._socket, { - this.connectionPingDuration = const Duration(minutes: 3), this.connectingTimeout = const Duration(minutes: 2), }) : _reconnectionPolicy = reconnectionPolicy, _connectivityManager = connectivityManager { @@ -143,10 +142,6 @@ class XmppConnection { /// UUID object to generate stanza and origin IDs final Uuid _uuid = const Uuid(); - /// The time between sending a ping to keep the connection open - // TODO(Unknown): Only start the timer if we did not send a stanza after n seconds - final Duration connectionPingDuration; - /// The time that we may spent in the "connecting" state final Duration connectingTimeout; @@ -159,9 +154,6 @@ class XmppConnection { /// True if we are authenticated. False if not. bool _isAuthenticated = false; - /// Timer for the keep-alive ping. - Timer? _connectionPingTimer; - /// Timer for the connecting timeout Timer? _connectingTimeoutTimer; @@ -627,22 +619,7 @@ class XmppConnection { final oldState = _connectionState; _connectionState = state; - final sm = getNegotiatorById( - streamManagementNegotiator, - ); - await _sendEvent( - ConnectionStateChangedEvent( - state, - oldState, - sm?.isResumed ?? false, - ), - ); - if (state == XmppConnectionState.connected) { - _log.finest('Starting _pingConnectionTimer'); - _connectionPingTimer = - Timer.periodic(connectionPingDuration, _pingConnectionOpen); - // We are connected, so the timer can stop. _destroyConnectingTimer(); } else if (state == XmppConnectionState.connecting) { @@ -655,14 +632,18 @@ class XmppConnection { } else { // Just make sure the connecting timeout timer is not running _destroyConnectingTimer(); - - // The ping timer makes no sense if we are not connected - if (_connectionPingTimer != null) { - _log.finest('Destroying _pingConnectionTimer'); - _connectionPingTimer!.cancel(); - _connectionPingTimer = null; - } } + + final sm = getNegotiatorById( + streamManagementNegotiator, + ); + await _sendEvent( + ConnectionStateChangedEvent( + state, + oldState, + sm?.isResumed ?? false, + ), + ); } /// Sets the routing state and logs the change @@ -682,22 +663,6 @@ class XmppConnection { return _eventStreamController.stream.asBroadcastStream(); } - /// Timer callback to prevent the connection from timing out. - Future _pingConnectionOpen(Timer timer) async { - // Follow the recommendation of XEP-0198 and just request an ack. If SM is not enabled, - // send a whitespace ping - _log.finest('_pingConnectionTimer: Callback called.'); - - if (_connectionState == XmppConnectionState.connected) { - _log.finest('_pingConnectionTimer: Connected. Triggering a ping event.'); - unawaited(_sendEvent(SendPingEvent())); - } else { - _log.finest( - '_pingConnectionTimer: Not connected. Not triggering an event.', - ); - } - } - /// Iterate over [handlers] and check if the handler matches [stanza]. If it does, /// call its callback and end the processing if the callback returned true; continue /// if it returned false. @@ -1130,10 +1095,6 @@ class XmppConnection { _xmppManagers.containsKey(discoManager), 'A DiscoManager is mandatory', ); - assert( - _xmppManagers.containsKey(pingManager), - 'A PingManager is mandatory', - ); } /// The private implementation for [XmppConnection.connect]. The parameters have diff --git a/packages/moxxmpp/lib/src/events.dart b/packages/moxxmpp/lib/src/events.dart index 54b4c30..a410995 100644 --- a/packages/moxxmpp/lib/src/events.dart +++ b/packages/moxxmpp/lib/src/events.dart @@ -26,6 +26,12 @@ class ConnectionStateChangedEvent extends XmppEvent { final XmppConnectionState before; final XmppConnectionState state; final bool resumed; + + /// Indicates whether the connection state switched from a not connected state to a + /// connected state. + bool get connectionEstablished => + before != XmppConnectionState.connected && + state == XmppConnectionState.connected; } /// Triggered when we encounter a stream error. diff --git a/packages/moxxmpp/lib/src/ping.dart b/packages/moxxmpp/lib/src/ping.dart index 422ae8c..a401295 100644 --- a/packages/moxxmpp/lib/src/ping.dart +++ b/packages/moxxmpp/lib/src/ping.dart @@ -1,10 +1,23 @@ +import 'dart:async'; +import 'package:meta/meta.dart'; import 'package:moxxmpp/src/events.dart'; import 'package:moxxmpp/src/managers/base.dart'; import 'package:moxxmpp/src/managers/namespaces.dart'; import 'package:moxxmpp/src/xeps/xep_0198/xep_0198.dart'; +import 'package:synchronized/synchronized.dart'; +/// This manager class is responsible to sending periodic pings, if required, using +/// either whitespaces or Stream Management. Keep in mind, that without +/// Stream Management, a stale connection cannot be detected. class PingManager extends XmppManagerBase { - PingManager() : super(pingManager); + PingManager(this._pingDuration) : super(pingManager); + + /// The time between pings, when connected. + final Duration _pingDuration; + + /// The actual timer. + Timer? _pingTimer; + final Lock _timerLock = Lock(); @override Future isSupported() async => true; @@ -15,39 +28,73 @@ class PingManager extends XmppManagerBase { ); } + /// Cancel a potentially scheduled ping timer. Can be overriden to cancel a custom timing mechanism. + /// By default, cancels a [Timer.periodic] that was set up prior. + @visibleForOverriding + Future cancelPing() async { + await _timerLock.synchronized(() { + logger.finest('Cancelling timer'); + _pingTimer?.cancel(); + _pingTimer = null; + }); + } + + /// Schedule a ping to be sent after a given amount of time. Can be overriden for custom timing mechanisms. + /// By default, uses a [Timer.periodic] timer to trigger a ping. + /// NOTE: This function is called whenever the connection is re-established. Custom + /// implementations should thus guard against multiple timers being started. + @visibleForOverriding + Future schedulePing() async { + await _timerLock.synchronized(() { + logger.finest('Scheduling new timer? ${_pingTimer != null}'); + + _pingTimer ??= Timer.periodic( + _pingDuration, + _sendPing, + ); + }); + } + + Future _sendPing(Timer _) async { + logger.finest('Attempting to send ping'); + final attrs = getAttributes(); + final socket = attrs.getSocket(); + + if (socket.managesKeepalives()) { + logger.finest('Not sending ping as the socket manages it.'); + return; + } + + final stream = attrs.getManagerById(smManager) as StreamManagementManager?; + if (stream != null) { + if (stream + .isStreamManagementEnabled() /*&& stream.getUnackedStanzaCount() > 0*/) { + logger.finest('Sending an ack ping as Stream Management is enabled'); + stream.sendAckRequestPing(); + } else if (attrs.getSocket().whitespacePingAllowed()) { + logger.finest( + 'Sending a whitespace ping as Stream Management is not enabled', + ); + attrs.getConnection().sendWhitespacePing(); + } else { + _logWarning(); + } + } else { + if (attrs.getSocket().whitespacePingAllowed()) { + attrs.getConnection().sendWhitespacePing(); + } else { + _logWarning(); + } + } + } + @override Future onXmppEvent(XmppEvent event) async { - if (event is SendPingEvent) { - logger.finest('Received ping event.'); - final attrs = getAttributes(); - final socket = attrs.getSocket(); - - if (socket.managesKeepalives()) { - logger.finest('Not sending ping as the socket manages it.'); - return; - } - - final stream = - attrs.getManagerById(smManager) as StreamManagementManager?; - if (stream != null) { - if (stream - .isStreamManagementEnabled() /*&& stream.getUnackedStanzaCount() > 0*/) { - logger.finest('Sending an ack ping as Stream Management is enabled'); - stream.sendAckRequestPing(); - } else if (attrs.getSocket().whitespacePingAllowed()) { - logger.finest( - 'Sending a whitespace ping as Stream Management is not enabled', - ); - attrs.getConnection().sendWhitespacePing(); - } else { - _logWarning(); - } + if (event is ConnectionStateChangedEvent) { + if (event.connectionEstablished) { + await schedulePing(); } else { - if (attrs.getSocket().whitespacePingAllowed()) { - attrs.getConnection().sendWhitespacePing(); - } else { - _logWarning(); - } + await cancelPing(); } } } diff --git a/packages/moxxmpp/test/negotiator_test.dart b/packages/moxxmpp/test/negotiator_test.dart index 39c477e..ecf14ee 100644 --- a/packages/moxxmpp/test/negotiator_test.dart +++ b/packages/moxxmpp/test/negotiator_test.dart @@ -75,7 +75,6 @@ void main() { PresenceManager(), RosterManager(TestingRosterStateManager('', [])), DiscoManager([]), - PingManager(), EntityCapabilitiesManager('http://moxxmpp.example'), ]) ..setConnectionSettings( diff --git a/packages/moxxmpp/test/xeps/xep_0030_test.dart b/packages/moxxmpp/test/xeps/xep_0030_test.dart index e0249eb..6f15953 100644 --- a/packages/moxxmpp/test/xeps/xep_0030_test.dart +++ b/packages/moxxmpp/test/xeps/xep_0030_test.dart @@ -82,7 +82,6 @@ void main() { PresenceManager(), RosterManager(TestingRosterStateManager(null, [])), DiscoManager([]), - PingManager(), EntityCapabilitiesManager('http://moxxmpp.example'), ]); conn.registerFeatureNegotiators([ diff --git a/packages/moxxmpp/test/xeps/xep_0060_test.dart b/packages/moxxmpp/test/xeps/xep_0060_test.dart index a0edbf7..c52a328 100644 --- a/packages/moxxmpp/test/xeps/xep_0060_test.dart +++ b/packages/moxxmpp/test/xeps/xep_0060_test.dart @@ -168,7 +168,6 @@ void main() { PresenceManager(), MessageManager(), RosterManager(TestingRosterStateManager(null, [])), - PingManager(), ]); connection ..registerFeatureNegotiators([ diff --git a/packages/moxxmpp/test/xeps/xep_0198_test.dart b/packages/moxxmpp/test/xeps/xep_0198_test.dart index d589344..66e9349 100644 --- a/packages/moxxmpp/test/xeps/xep_0198_test.dart +++ b/packages/moxxmpp/test/xeps/xep_0198_test.dart @@ -294,7 +294,6 @@ void main() { PresenceManager(), RosterManager(TestingRosterStateManager('', [])), DiscoManager([]), - PingManager(), sm, CarbonsManager()..forceEnable(), EntityCapabilitiesManager('http://moxxmpp.example'), @@ -420,7 +419,6 @@ void main() { PresenceManager(), RosterManager(TestingRosterStateManager('', [])), DiscoManager([]), - PingManager(), sm, CarbonsManager()..forceEnable(), //EntityCapabilitiesManager('http://moxxmpp.example'), @@ -580,7 +578,6 @@ void main() { PresenceManager(), RosterManager(TestingRosterStateManager('', [])), DiscoManager([]), - PingManager(), StreamManagementManager(), ]); conn.registerFeatureNegotiators([ @@ -675,7 +672,6 @@ void main() { PresenceManager(), RosterManager(TestingRosterStateManager('', [])), DiscoManager([]), - PingManager(), StreamManagementManager(), ]); conn.registerFeatureNegotiators([ @@ -767,7 +763,6 @@ void main() { PresenceManager(), RosterManager(TestingRosterStateManager('', [])), DiscoManager([]), - PingManager(), StreamManagementManager(), ]); conn.registerFeatureNegotiators([ diff --git a/packages/moxxmpp/test/xmpp_test.dart b/packages/moxxmpp/test/xmpp_test.dart index 9d4da42..a20b982 100644 --- a/packages/moxxmpp/test/xmpp_test.dart +++ b/packages/moxxmpp/test/xmpp_test.dart @@ -137,7 +137,6 @@ void main() { PresenceManager(), RosterManager(TestingRosterStateManager('', [])), DiscoManager([]), - PingManager(), StreamManagementManager(), EntityCapabilitiesManager('http://moxxmpp.example'), ]); @@ -194,7 +193,6 @@ void main() { PresenceManager(), RosterManager(TestingRosterStateManager('', [])), DiscoManager([]), - PingManager(), EntityCapabilitiesManager('http://moxxmpp.example'), ]); conn.registerFeatureNegotiators([ @@ -254,7 +252,6 @@ void main() { PresenceManager(), RosterManager(TestingRosterStateManager('', [])), DiscoManager([]), - PingManager(), EntityCapabilitiesManager('http://moxxmpp.example'), ]); conn.registerFeatureNegotiators([SaslPlainNegotiator()]); @@ -409,7 +406,6 @@ void main() { PresenceManager(), RosterManager(TestingRosterStateManager('', [])), DiscoManager([]), - PingManager(), ]); conn ..registerFeatureNegotiators([