From ad1242c47daabc9ef4a0bcf0aed25b03a4141ce0 Mon Sep 17 00:00:00 2001 From: "Alexander \"PapaTutuWawa" Date: Fri, 13 Jan 2023 15:16:51 +0100 Subject: [PATCH] feat: Try to lock reconnections behind a flag --- packages/moxxmpp/lib/src/connection.dart | 45 ++++++++++++++++++++---- packages/moxxmpp/lib/src/reconnect.dart | 28 +++++++++------ 2 files changed, 56 insertions(+), 17 deletions(-) diff --git a/packages/moxxmpp/lib/src/connection.dart b/packages/moxxmpp/lib/src/connection.dart index c6b7342..d6dc14a 100644 --- a/packages/moxxmpp/lib/src/connection.dart +++ b/packages/moxxmpp/lib/src/connection.dart @@ -49,10 +49,12 @@ enum XmppConnectionState { enum StanzaFromType { /// Add the full JID to the stanza as the from attribute full, + /// Add the bare JID to the stanza as the from attribute bare, + /// Add no JID as the from attribute - none + none, } /// Nonza describing the XMPP stream header. @@ -207,9 +209,30 @@ class XmppConnection { /// is still running. final Lock _negotiationLock = Lock(); - /// Misc + /// The logger for the class final Logger _log = Logger('XmppConnection'); + /// A value indicating whether a connection attempt is currently running or not + bool _isConnectionRunning = false; + final Lock _connectionRunningLock = Lock(); + + /// Enters the critical section for accessing [XmppConnection._isConnectionRunning] + /// and does the following: + /// - if _isConnectionRunning is false, set it to true and return false. + /// - if _isConnectionRunning is true, return true. + Future _testAndSetIsConnectionRunning() async => _connectionRunningLock.synchronized(() { + if (!_isConnectionRunning) { + _isConnectionRunning = true; + return false; + } + + return true; + }); + + /// Enters the critical section for accessing [XmppConnection._isConnectionRunning] + /// and sets it to false. + Future _resetIsConnectionRunning() async => _connectionRunningLock.synchronized(() => _isConnectionRunning = false); + ReconnectionPolicy get reconnectionPolicy => _reconnectionPolicy; List get serverFeatures => _serverFeatures; @@ -365,6 +388,11 @@ class XmppConnection { /// Attempts to reconnect to the server by following an exponential backoff. Future _attemptReconnection() async { + if (await _testAndSetIsConnectionRunning()) { + _log.warning('_attemptReconnection is called but connection attempt is already running. Ignoring...'); + return; + } + _log.finest('_attemptReconnection: Setting state to notConnected'); await _setConnectionState(XmppConnectionState.notConnected); _log.finest('_attemptReconnection: Done'); @@ -400,6 +428,7 @@ class XmppConnection { } await _setConnectionState(XmppConnectionState.error); + await _resetIsConnectionRunning(); await _reconnectionPolicy.onFailure(); } @@ -797,6 +826,7 @@ class XmppConnection { /// a disco sweep among other things. Future _onNegotiationsDone() async { // Set the connection state + await _resetIsConnectionRunning(); await _setConnectionState(XmppConnectionState.connected); // Resolve the connection completion future @@ -980,9 +1010,10 @@ class XmppConnection { } /// To be called when we lost the network connection. - void _onNetworkConnectionLost() { + Future _onNetworkConnectionLost() async { _socket.close(); - _setConnectionState(XmppConnectionState.notConnected); + await _resetIsConnectionRunning(); + await _setConnectionState(XmppConnectionState.notConnected); } /// Attempt to gracefully close the session @@ -1023,11 +1054,12 @@ class XmppConnection { /// Like [connect] but the Future resolves when the resource binding is either done or /// SASL has failed. - Future connectAwaitable({ String? lastResource }) { + Future connectAwaitable({ String? lastResource }) async { _runPreConnectionAssertions(); + await _resetIsConnectionRunning(); _connectionCompleter = Completer(); _log.finest('Calling connect() from connectAwaitable'); - connect(lastResource: lastResource); + await connect(lastResource: lastResource); return _connectionCompleter!.future; } @@ -1039,6 +1071,7 @@ class XmppConnection { } _runPreConnectionAssertions(); + await _resetIsConnectionRunning(); _reconnectionPolicy.setShouldReconnect(true); if (lastResource != null) { diff --git a/packages/moxxmpp/lib/src/reconnect.dart b/packages/moxxmpp/lib/src/reconnect.dart index 1c3ae9b..b0fb236 100644 --- a/packages/moxxmpp/lib/src/reconnect.dart +++ b/packages/moxxmpp/lib/src/reconnect.dart @@ -4,27 +4,33 @@ import 'package:logging/logging.dart'; import 'package:meta/meta.dart'; import 'package:synchronized/synchronized.dart'; -abstract class ReconnectionPolicy { +/// A callback function to be called when the connection to the server has been lost. +typedef ConnectionLostCallback = Future Function(); - ReconnectionPolicy() - : _shouldAttemptReconnection = false, - _isReconnecting = false, - _isReconnectingLock = Lock(); +/// A function that, when called, causes the XmppConnection to connect to the server, if +/// another reconnection is not already running. +typedef PerformReconnectFunction = Future Function(); + +abstract class ReconnectionPolicy { /// Function provided by XmppConnection that allows the policy /// to perform a reconnection. - Future Function()? performReconnect; + PerformReconnectFunction? performReconnect; + /// Function provided by XmppConnection that allows the policy /// to say that we lost the connection. - void Function()? triggerConnectionLost; + ConnectionLostCallback? triggerConnectionLost; + /// Indicate if should try to reconnect. - bool _shouldAttemptReconnection; + bool _shouldAttemptReconnection = false; + /// Indicate if a reconnection attempt is currently running. - bool _isReconnecting; + bool _isReconnecting = false; + /// And the corresponding lock - final Lock _isReconnectingLock; + final Lock _isReconnectingLock = Lock(); /// Called by XmppConnection to register the policy. - void register(Future Function() performReconnect, void Function() triggerConnectionLost) { + void register(PerformReconnectFunction performReconnect, ConnectionLostCallback triggerConnectionLost) { this.performReconnect = performReconnect; this.triggerConnectionLost = triggerConnectionLost;