From 9cc735d854806426d5c452af026d1a8d79775dd5 Mon Sep 17 00:00:00 2001 From: "Alexander \"PapaTutuWawa" Date: Sat, 12 Nov 2022 21:49:13 +0100 Subject: [PATCH] fix: Fix reconnections when the connection is awaited --- .gitignore | 3 + packages/moxxmpp/.pubignore | 1 + .../failure_reconnection_test.dart | 55 +++++++++++++++ packages/moxxmpp/lib/src/connection.dart | 69 +++++++++++++------ packages/moxxmpp/lib/src/reconnect.dart | 22 +++++- packages/moxxmpp/pubspec.yaml | 3 + packages/moxxmpp_socket_tcp/.pubignore | 1 + 7 files changed, 131 insertions(+), 23 deletions(-) create mode 100644 packages/moxxmpp/.pubignore create mode 100644 packages/moxxmpp/integration_test/failure_reconnection_test.dart create mode 100644 packages/moxxmpp_socket_tcp/.pubignore diff --git a/.gitignore b/.gitignore index f4c3816..b674af8 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,6 @@ build/ # Omit committing pubspec.lock for library packages; see # https://dart.dev/guides/libraries/private-files#pubspeclock. pubspec.lock + +# Omit pubspec override files generated by melos +**/pubspec_overrides.yaml diff --git a/packages/moxxmpp/.pubignore b/packages/moxxmpp/.pubignore new file mode 100644 index 0000000..97774a3 --- /dev/null +++ b/packages/moxxmpp/.pubignore @@ -0,0 +1 @@ +pubspec_overrides.yaml \ No newline at end of file diff --git a/packages/moxxmpp/integration_test/failure_reconnection_test.dart b/packages/moxxmpp/integration_test/failure_reconnection_test.dart new file mode 100644 index 0000000..2f521a6 --- /dev/null +++ b/packages/moxxmpp/integration_test/failure_reconnection_test.dart @@ -0,0 +1,55 @@ +import 'package:logging/logging.dart'; +import 'package:moxxmpp/moxxmpp.dart'; +import 'package:moxxmpp_socket_tcp/moxxmpp_socket_tcp.dart'; +import 'package:test/test.dart'; + +void main() { + Logger.root.level = Level.ALL; + Logger.root.onRecord.listen((record) { + print('${record.level.name}: ${record.time}: ${record.message}'); + }); + final log = Logger('FailureReconnectionTest'); + + test('Failing an awaited connection', () async { + var errors = 0; + final connection = XmppConnection( + TestingSleepReconnectionPolicy(10), + TCPSocketWrapper(false), + ); + connection.registerFeatureNegotiators([ + StartTlsNegotiator(), + ]); + connection.registerManagers([ + DiscoManager(), + RosterManager(), + PingManager(), + MessageManager(), + PresenceManager('http://moxxmpp.example'), + ]); + connection.asBroadcastStream().listen((event) { + if (event is ConnectionStateChangedEvent) { + if (event.state == XmppConnectionState.error) { + errors++; + } + } + }); + + connection.setConnectionSettings( + ConnectionSettings( + jid: JID.fromString('testuser@no-sasl.badxmpp.eu'), + password: 'abc123', + useDirectTLS: true, + allowPlainAuth: true, + ), + ); + + final result = await connection.connectAwaitable(); + log.info('Connection failed as expected'); + expect(result.success, false); + expect(errors, 1); + + log.info('Waiting 20 seconds for unexpected reconnections'); + await Future.delayed(const Duration(seconds: 20)); + expect(errors, 1); + }, timeout: Timeout.factor(2)); +} diff --git a/packages/moxxmpp/lib/src/connection.dart b/packages/moxxmpp/lib/src/connection.dart index e8dc77e..1c97b28 100644 --- a/packages/moxxmpp/lib/src/connection.dart +++ b/packages/moxxmpp/lib/src/connection.dart @@ -163,6 +163,8 @@ class XmppConnection { /// Completers for certain actions // ignore: use_late_for_private_fields_and_variables Completer? _connectionCompleter; + /// Controls whether an XmppSocketClosureEvent triggers a reconnection. + bool _socketClosureTriggersReconnect = true; /// Negotiators final Map _featureNegotiators; @@ -350,8 +352,18 @@ class XmppConnection { _log.severe('handleError: Called with null'); } - // TODO(Unknown): This may be too harsh for every error - await _setConnectionState(XmppConnectionState.notConnected); + // Whenever we encounter an error that would trigger a reconnection attempt while + // the connection result is being awaited, don't attempt a reconnection but instead + // try to gracefully disconnect. + if (_connectionCompleter != null) { + _log.info('Not triggering reconnection since connection result is being awaited'); + await _disconnect(triggeredByUser: false, state: XmppConnectionState.error); + _connectionCompleter?.complete(const XmppConnectionResult(false)); + _connectionCompleter = null; + return; + } + + await _setConnectionState(XmppConnectionState.error); await _reconnectionPolicy.onFailure(); } @@ -360,8 +372,12 @@ class XmppConnection { if (event is XmppSocketErrorEvent) { await handleError(event.error); } else if (event is XmppSocketClosureEvent) { - _log.fine('Received XmppSocketClosureEvent. Reconnecting...'); - await _reconnectionPolicy.onFailure(); + if (_socketClosureTriggersReconnect) { + _log.fine('Received XmppSocketClosureEvent. Reconnecting...'); + await _reconnectionPolicy.onFailure(); + } else { + _log.fine('Received XmppSocketClosureEvent. No reconnection attempt since _socketClosureTriggersReconnect is false...'); + } } } @@ -798,16 +814,10 @@ class XmppConnection { await _onNegotiationsDone(); } else if (_currentNegotiator!.state == NegotiatorState.error) { _log.severe('Negotiator returned an error'); - - _updateRoutingState(RoutingState.error); - await _setConnectionState(XmppConnectionState.error); - _connectionCompleter?.complete(const XmppConnectionResult(false)); - _connectionCompleter = null; - - _closeSocket(); + await handleError(null); } } - + void _closeSocket() { _socket.close(); } @@ -965,17 +975,32 @@ class XmppConnection { /// Attempt to gracefully close the session Future disconnect() async { - _reconnectionPolicy.setShouldReconnect(false); - getPresenceManager().sendUnavailablePresence(); - _socket.prepareDisconnect(); - sendRawString(''); - await _setConnectionState(XmppConnectionState.notConnected); - _socket.close(); - - // Clear Stream Management state, if available - await getStreamManagementManager()?.resetState(); + await _disconnect(state: XmppConnectionState.notConnected); } + Future _disconnect({required XmppConnectionState state, bool triggeredByUser = true}) async { + _reconnectionPolicy.setShouldReconnect(false); + _socketClosureTriggersReconnect = false; + + if (triggeredByUser) { + getPresenceManager().sendUnavailablePresence(); + } + + _socket.prepareDisconnect(); + + if (triggeredByUser) { + sendRawString(''); + } + + await _setConnectionState(state); + _socket.close(); + + if (triggeredByUser) { + // Clear Stream Management state, if available + await getStreamManagementManager()?.resetState(); + } + } + /// Make sure that all required managers are registered void _runPreConnectionAssertions() { assert(_xmppManagers.containsKey(presenceManager), 'A PresenceManager is mandatory'); @@ -1009,7 +1034,7 @@ class XmppConnection { } await _reconnectionPolicy.reset(); - + _socketClosureTriggersReconnect = true; await _sendEvent(ConnectingEvent()); final smManager = getStreamManagementManager(); diff --git a/packages/moxxmpp/lib/src/reconnect.dart b/packages/moxxmpp/lib/src/reconnect.dart index 6eb3a4a..d2511a8 100644 --- a/packages/moxxmpp/lib/src/reconnect.dart +++ b/packages/moxxmpp/lib/src/reconnect.dart @@ -93,6 +93,7 @@ class ExponentialBackoffReconnectionPolicy extends ReconnectionPolicy { final isReconnecting = await isReconnectionRunning(); if (shouldReconnect) { if (!isReconnecting) { + await setIsReconnecting(true); await performReconnect!(); } else { // Should never happen. @@ -117,7 +118,6 @@ class ExponentialBackoffReconnectionPolicy extends ReconnectionPolicy { Future onFailure() async { _log.finest('Failure occured. Starting exponential backoff'); _counter++; - await setIsReconnecting(true); if (_timer != null) { _timer!.cancel(); @@ -148,3 +148,23 @@ class TestingReconnectionPolicy extends ReconnectionPolicy { @override Future reset() async {} } + +/// A reconnection policy for tests that waits a constant number of seconds before +/// attempting a reconnection. +@visibleForTesting +class TestingSleepReconnectionPolicy extends ReconnectionPolicy { + TestingSleepReconnectionPolicy(this._sleepAmount) : super(); + final int _sleepAmount; + + @override + Future onSuccess() async {} + + @override + Future onFailure() async { + await Future.delayed(Duration(seconds: _sleepAmount)); + await performReconnect!(); + } + + @override + Future reset() async {} +} diff --git a/packages/moxxmpp/pubspec.yaml b/packages/moxxmpp/pubspec.yaml index 0764ca5..97d6257 100644 --- a/packages/moxxmpp/pubspec.yaml +++ b/packages/moxxmpp/pubspec.yaml @@ -29,5 +29,8 @@ dependencies: dev_dependencies: build_runner: ^2.1.11 + moxxmpp_socket_tcp: + hosted: https://git.polynom.me/api/packages/Moxxy/pub + version: ^0.1.2+1 test: ^1.16.0 very_good_analysis: ^3.0.1 diff --git a/packages/moxxmpp_socket_tcp/.pubignore b/packages/moxxmpp_socket_tcp/.pubignore new file mode 100644 index 0000000..97774a3 --- /dev/null +++ b/packages/moxxmpp_socket_tcp/.pubignore @@ -0,0 +1 @@ +pubspec_overrides.yaml \ No newline at end of file