diff --git a/packages/moxxmpp/lib/src/connection.dart b/packages/moxxmpp/lib/src/connection.dart index c8f77ea..33aada3 100644 --- a/packages/moxxmpp/lib/src/connection.dart +++ b/packages/moxxmpp/lib/src/connection.dart @@ -91,7 +91,6 @@ class XmppConnection { // Allow the reconnection policy to perform reconnections by itself _reconnectionPolicy.register( _attemptReconnection, - _onNetworkConnectionLost, ); _socketStream = _socket.getDataStream(); @@ -398,7 +397,7 @@ class XmppConnection { // The error is recoverable await _setConnectionState(XmppConnectionState.notConnected); - if (await _reconnectionPolicy.getShouldReconnect()) { + if (await _reconnectionPolicy.canTriggerFailure()) { await _reconnectionPolicy.onFailure(); } else { _log.info( @@ -1009,12 +1008,6 @@ class XmppConnection { ); } - /// To be called when we lost the network connection. - Future _onNetworkConnectionLost() async { - _socket.close(); - await _setConnectionState(XmppConnectionState.notConnected); - } - /// Attempt to gracefully close the session Future disconnect() async { await _disconnect(state: XmppConnectionState.notConnected); @@ -1069,6 +1062,15 @@ class XmppConnection { // Kill a possibly existing connection _socket.close(); + await _reconnectionPolicy.reset(); + _enableReconnectOnSuccess = enableReconnectOnSuccess; + if (shouldReconnect) { + await _reconnectionPolicy.setShouldReconnect(true); + } else { + await _reconnectionPolicy.setShouldReconnect(false); + } + await _sendEvent(ConnectingEvent()); + if (waitUntilLogin) { _log.finest('Setting up completer for awaiting completed login'); _connectionCompleter = Completer(); @@ -1080,16 +1082,6 @@ class XmppConnection { setResource('', triggerEvent: false); } - _enableReconnectOnSuccess = enableReconnectOnSuccess; - if (shouldReconnect) { - await _reconnectionPolicy.setShouldReconnect(true); - } else { - await _reconnectionPolicy.setShouldReconnect(false); - } - - await _reconnectionPolicy.reset(); - await _sendEvent(ConnectingEvent()); - // If requested, wait until we have a network connection if (waitForConnection) { _log.info('Waiting for okay from connectivityManager'); diff --git a/packages/moxxmpp/lib/src/reconnect.dart b/packages/moxxmpp/lib/src/reconnect.dart index 9782f8b..9e3d6ff 100644 --- a/packages/moxxmpp/lib/src/reconnect.dart +++ b/packages/moxxmpp/lib/src/reconnect.dart @@ -2,7 +2,6 @@ import 'dart:async'; import 'dart:math'; import 'package:logging/logging.dart'; import 'package:meta/meta.dart'; -import 'package:moxxmpp/src/util/queue.dart'; import 'package:synchronized/synchronized.dart'; /// A callback function to be called when the connection to the server has been lost. @@ -17,70 +16,68 @@ abstract class ReconnectionPolicy { /// to perform a reconnection. PerformReconnectFunction? performReconnect; - /// Function provided by XmppConnection that allows the policy - /// to say that we lost the connection. - ConnectionLostCallback? triggerConnectionLost; + final Lock _lock = Lock(); + + /// Indicate if a reconnection attempt is currently running. + bool _isReconnecting = false; /// Indicate if should try to reconnect. bool _shouldAttemptReconnection = false; - /// Indicate if a reconnection attempt is currently running. @protected - bool isReconnecting = false; + Future canTryReconnecting() async => _lock.synchronized(() => !_isReconnecting); - /// And the corresponding lock @protected - final Lock lock = Lock(); - - /// The lock for accessing [_shouldAttemptReconnection] - @protected - final Lock shouldReconnectLock = Lock(); + Future getIsReconnecting() async => _lock.synchronized(() => _isReconnecting); + Future _resetIsReconnecting() async { + await _lock.synchronized(() { + _isReconnecting = false; + }); + } + /// Called by XmppConnection to register the policy. void register( PerformReconnectFunction performReconnect, - ConnectionLostCallback triggerConnectionLost, ) { this.performReconnect = performReconnect; - this.triggerConnectionLost = triggerConnectionLost; - - unawaited(reset()); } /// In case the policy depends on some internal state, this state must be reset /// to an initial state when reset is called. In case timers run, they must be /// terminated. - Future reset(); + @mustCallSuper + Future reset() async { + await _resetIsReconnecting(); + } + @mustCallSuper + Future canTriggerFailure() async { + return _lock.synchronized(() { + if (_shouldAttemptReconnection && !_isReconnecting) { + _isReconnecting = true; + return true; + } + + return false; + }); + } + /// Called by the XmppConnection when the reconnection failed. Future onFailure() async {} - + /// Caled by the XmppConnection when the reconnection was successful. Future onSuccess(); Future getShouldReconnect() async { - return shouldReconnectLock.synchronized(() => _shouldAttemptReconnection); + return _lock.synchronized(() => _shouldAttemptReconnection); } /// Set whether a reconnection attempt should be made. Future setShouldReconnect(bool value) async { - return shouldReconnectLock + return _lock .synchronized(() => _shouldAttemptReconnection = value); } - - /// Returns true if the manager is currently triggering a reconnection. If not, returns - /// false. - Future isReconnectionRunning() async { - return lock.synchronized(() => isReconnecting); - } - - /// Set the isReconnecting state to [value]. - @protected - Future setIsReconnecting(bool value) async { - await lock.synchronized(() async { - isReconnecting = value; - }); - } } /// A simple reconnection strategy: Make the reconnection delays exponentially longer @@ -105,90 +102,59 @@ class RandomBackoffReconnectionPolicy extends ReconnectionPolicy { /// Backoff timer. Timer? _timer; - final Lock _timerLock = Lock(); - /// Logger. final Logger _log = Logger('RandomBackoffReconnectionPolicy'); - /// Event queue - final AsyncQueue _eventQueue = AsyncQueue(); - + final Lock _timerLock = Lock(); + /// Called when the backoff expired - Future _onTimerElapsed() async { - _log.fine('Timer elapsed. Waiting for lock'); - await lock.synchronized(() async { - _log.fine('Lock aquired'); - if (!(await getShouldReconnect())) { - _log.fine( - 'Backoff timer expired but getShouldReconnect() returned false', - ); + @visibleForTesting + Future onTimerElapsed() async { + _log.fine('Timer elapsed. Waiting for lock...'); + await _timerLock.synchronized(() async { + if (!(await getIsReconnecting())) { return; } - if (isReconnecting) { + if (!(await getShouldReconnect())) { _log.fine( - 'Backoff timer expired but a reconnection is running, so doing nothing.', + 'Should not reconnect. Stopping here.', ); return; } _log.fine('Triggering reconnect'); - isReconnecting = true; + _timer?.cancel(); + _timer = null; await performReconnect!(); }); - - await _timerLock.synchronized(() { - _timer?.cancel(); - _timer = null; - }); - } - - Future _reset() async { - _log.finest('Resetting internal state'); - - await _timerLock.synchronized(() { - _timer?.cancel(); - _timer = null; - }); - - await setIsReconnecting(false); } @override Future reset() async { - // ignore: unnecessary_lambdas - await _eventQueue.addJob(() => _reset()); + _log.finest('Resetting internal state'); + _timer?.cancel(); + _timer = null; + await super.reset(); } - Future _onFailure() async { - final shouldContinue = await _timerLock.synchronized(() { - return _timer == null; - }); - if (!shouldContinue) { - _log.finest( - '_onFailure: Not backing off since _timer is already running', - ); - return; - } - + @override + Future onFailure() async { final seconds = Random().nextInt(_maxBackoffTime - _minBackoffTime) + _minBackoffTime; _log.finest('Failure occured. Starting random backoff with ${seconds}s'); _timer?.cancel(); - _timer = Timer(Duration(seconds: seconds), _onTimerElapsed); + _timer = Timer(Duration(seconds: seconds), onTimerElapsed); } - - @override - Future onFailure() async { - // ignore: unnecessary_lambdas - await _eventQueue.addJob(() => _onFailure()); - } - + @override Future onSuccess() async { await reset(); } + + @visibleForTesting + bool isTimerRunning() => _timer != null; } /// A stub reconnection policy for tests. @@ -203,7 +169,9 @@ class TestingReconnectionPolicy extends ReconnectionPolicy { Future onFailure() async {} @override - Future reset() async {} + Future reset() async { + await super.reset(); + } } /// A reconnection policy for tests that waits a constant number of seconds before @@ -223,5 +191,7 @@ class TestingSleepReconnectionPolicy extends ReconnectionPolicy { } @override - Future reset() async {} + Future reset() async { + await super.reset(); + } } diff --git a/packages/moxxmpp/test/reconnection_test.dart b/packages/moxxmpp/test/reconnection_test.dart new file mode 100644 index 0000000..c9a4c0f --- /dev/null +++ b/packages/moxxmpp/test/reconnection_test.dart @@ -0,0 +1,78 @@ +import 'package:moxxmpp/moxxmpp.dart'; +import 'package:test/test.dart'; +import 'helpers/logging.dart'; + +void main() { + initLogger(); + + test('Test triggering a reconnect multiple times', () async { + final policy = RandomBackoffReconnectionPolicy( + 9998, + 9999, + ); + await policy.setShouldReconnect(true); + + // We have a failure + expect( + await policy.canTriggerFailure(), + true, + ); + await policy.onFailure(); + + // Try to trigger another one + expect( + await policy.canTriggerFailure(), + false, + ); + }); + + test('Test resetting while reconnecting', () async { + final policy = RandomBackoffReconnectionPolicy( + 9998, + 9999, + )..register(() async => expect(true, false)); + await policy.setShouldReconnect(true); + + // We have a failure + expect( + await policy.canTriggerFailure(), + true, + ); + await policy.onFailure(); + expect(policy.isTimerRunning(), true); + + // We reset + await policy.reset(); + expect(policy.isTimerRunning(), false); + + // We have another failure + expect( + await policy.canTriggerFailure(), + true, + ); + }); + + test('Test triggering the timer callback twice', () async { + final policy = RandomBackoffReconnectionPolicy( + 9998, + 9999, + ); + var counter = 0; + policy.register(() async { + await policy.reset(); + counter++; + }); + await policy.setShouldReconnect(true); + + // We have a failure + expect( + await policy.canTriggerFailure(), + true, + ); + await policy.onFailure(); + + await policy.onTimerElapsed(); + await policy.onTimerElapsed(); + expect(counter, 1); + }); +}