From c1ad646905eaddaaa74b342fe736b6703d7cab70 Mon Sep 17 00:00:00 2001 From: "Alexander \"PapaTutuWawa" Date: Fri, 7 Apr 2023 15:25:50 +0200 Subject: [PATCH] fix(core,xep): Fix multiple issues - Fix a deadlock in the RandomBackoffReconnectionPolicy on failure - Fix enablement of stream management happening too early --- packages/moxxmpp/lib/src/connection.dart | 37 +++++++++-------- packages/moxxmpp/lib/src/reconnect.dart | 40 ++++++++++++++----- .../lib/src/xeps/xep_0198/negotiator.dart | 33 ++++++++------- .../lib/src/xeps/xep_0384/xep_0384.dart | 1 - packages/moxxmpp/test/reconnection_test.dart | 11 ++++- packages/moxxmpp/test/xeps/xep_0198_test.dart | 18 ++++++--- 6 files changed, 88 insertions(+), 52 deletions(-) diff --git a/packages/moxxmpp/lib/src/connection.dart b/packages/moxxmpp/lib/src/connection.dart index e3e9ec8..0bc0cec 100644 --- a/packages/moxxmpp/lib/src/connection.dart +++ b/packages/moxxmpp/lib/src/connection.dart @@ -10,6 +10,7 @@ import 'package:moxxmpp/src/errors.dart'; import 'package:moxxmpp/src/events.dart'; import 'package:moxxmpp/src/handlers/base.dart'; import 'package:moxxmpp/src/iq.dart'; +import 'package:moxxmpp/src/jid.dart'; import 'package:moxxmpp/src/managers/attributes.dart'; import 'package:moxxmpp/src/managers/base.dart'; import 'package:moxxmpp/src/managers/data.dart'; @@ -140,6 +141,8 @@ class XmppConnection { RoutingState _routingState = RoutingState.preConnection; /// The currently bound resource or '' if none has been bound yet. + /// NOTE: A Using the empty string is okay since RFC7622 says that + /// the resource MUST NOT be zero octets. String _resource = ''; String get resource => _resource; @@ -170,6 +173,13 @@ class XmppConnection { bool get isAuthenticated => _isAuthenticated; + /// Returns the JID we authenticate with and add the resource that we have bound. + JID _getJidWithResource() { + assert(_resource.isNotEmpty, 'The resource must not be empty'); + + return connectionSettings.jid.withResource(_resource); + } + /// Registers a list of [XmppManagerBase] sub-classes as managers on this connection. Future registerManagers(List managers) async { for (final manager in managers) { @@ -181,7 +191,7 @@ class XmppConnection { sendEvent: _sendEvent, getConnectionSettings: () => connectionSettings, getManagerById: getManagerById, - getFullJID: () => connectionSettings.jid.withResource(_resource), + getFullJID: _getJidWithResource, getSocket: () => _socket, getConnection: () => this, getNegotiatorById: _negotiationsHandler.getNegotiatorById, @@ -232,7 +242,7 @@ class XmppConnection { _sendEvent, _negotiationsHandler.getNegotiatorById, getManagerById, - () => connectionSettings.jid.withResource(_resource), + _getJidWithResource, () => _socket, () => _isAuthenticated, _setAuthenticated, @@ -295,8 +305,11 @@ class XmppConnection { // Connect again // ignore: cascade_invocations _log.finest('Calling _connectImpl() from _attemptReconnection'); - await _connectImpl( - waitForConnection: true, + unawaited( + _connectImpl( + waitForConnection: true, + shouldReconnect: false, + ), ); } @@ -431,7 +444,7 @@ class XmppConnection { case StanzaFromType.full: { stanza_ = stanza_.copyWith( - from: connectionSettings.jid.withResource(_resource).toString(), + from: _getJidWithResource().toString(), ); } break; @@ -840,7 +853,6 @@ class XmppConnection { /// The private implementation for [XmppConnection.connect]. The parameters have /// the same meaning as with [XmppConnection.connect]. Future> _connectImpl({ - String? lastResource, bool waitForConnection = false, bool shouldReconnect = true, bool waitUntilLogin = false, @@ -863,11 +875,9 @@ class XmppConnection { _connectionCompleter = Completer(); } - if (lastResource != null) { - setResource(lastResource, triggerEvent: false); - } else { - setResource('', triggerEvent: false); - } + // Reset the resource. If we use stream resumption from XEP-0198, then the + // manager will set it on successful resumption. + setResource('', triggerEvent: false); // If requested, wait until we have a network connection if (waitForConnection) { @@ -914,9 +924,6 @@ class XmppConnection { /// Start the connection process using the provided connection settings. /// - /// If [lastResource] is set, then its value is used as the connection's resource. - /// Useful for stream resumption. - /// /// [shouldReconnect] indicates whether the reconnection attempts should be /// automatically performed after a fatal failure of any kind occurs. /// @@ -931,14 +938,12 @@ class XmppConnection { /// [enableReconnectOnSuccess] indicates that automatic reconnection is to be /// enabled once the connection has been successfully established. Future> connect({ - String? lastResource, bool? shouldReconnect, bool waitForConnection = false, bool waitUntilLogin = false, bool enableReconnectOnSuccess = true, }) async { final result = _connectImpl( - lastResource: lastResource, shouldReconnect: shouldReconnect ?? !waitUntilLogin, waitForConnection: waitForConnection, waitUntilLogin: waitUntilLogin, diff --git a/packages/moxxmpp/lib/src/reconnect.dart b/packages/moxxmpp/lib/src/reconnect.dart index 0841175..94c29ce 100644 --- a/packages/moxxmpp/lib/src/reconnect.dart +++ b/packages/moxxmpp/lib/src/reconnect.dart @@ -111,31 +111,47 @@ class RandomBackoffReconnectionPolicy extends ReconnectionPolicy { /// Called when the backoff expired @visibleForTesting Future onTimerElapsed() async { - _log.fine('Timer elapsed. Waiting for lock...'); - await _timerLock.synchronized(() async { + _log.finest('Timer elapsed. Waiting for lock...'); + final shouldContinue = await _timerLock.synchronized(() async { + _log.finest('Timer lock aquired'); + if (_timer == null) { + _log.finest( + 'The timer is already set to null. Doing nothing.', + ); + return false; + } + if (!(await getIsReconnecting())) { - return; + return false; } if (!(await getShouldReconnect())) { - _log.fine( + _log.finest( 'Should not reconnect. Stopping here.', ); - return; + return false; } - _log.fine('Triggering reconnect'); _timer?.cancel(); _timer = null; - await performReconnect!(); + return true; }); + + if (!shouldContinue) { + return; + } + + _log.fine('Triggering reconnect'); + await performReconnect!(); } @override Future reset() async { _log.finest('Resetting internal state'); - _timer?.cancel(); - _timer = null; + await _timerLock.synchronized(() { + _timer?.cancel(); + _timer = null; + }); await super.reset(); } @@ -144,9 +160,11 @@ class RandomBackoffReconnectionPolicy extends ReconnectionPolicy { final seconds = Random().nextInt(_maxBackoffTime - _minBackoffTime) + _minBackoffTime; _log.finest('Failure occured. Starting random backoff with ${seconds}s'); - _timer?.cancel(); - _timer = Timer(Duration(seconds: seconds), onTimerElapsed); + await _timerLock.synchronized(() { + _timer?.cancel(); + _timer = Timer(Duration(seconds: seconds), onTimerElapsed); + }); } @override diff --git a/packages/moxxmpp/lib/src/xeps/xep_0198/negotiator.dart b/packages/moxxmpp/lib/src/xeps/xep_0198/negotiator.dart index 3008370..0f3176e 100644 --- a/packages/moxxmpp/lib/src/xeps/xep_0198/negotiator.dart +++ b/packages/moxxmpp/lib/src/xeps/xep_0198/negotiator.dart @@ -1,6 +1,5 @@ import 'package:collection/collection.dart'; import 'package:logging/logging.dart'; -import 'package:meta/meta.dart'; import 'package:moxxmpp/src/events.dart'; import 'package:moxxmpp/src/managers/namespaces.dart'; import 'package:moxxmpp/src/namespaces.dart'; @@ -60,11 +59,7 @@ class StreamManagementNegotiator extends Sasl2FeatureNegotiator bool _inlineStreamEnablementRequested = false; /// Cached resource for stream resumption - String _resource = ''; - @visibleForTesting - void setResource(String resource) { - _resource = resource; - } + String resource = ''; @override bool canInlineFeature(List features) { @@ -90,7 +85,7 @@ class StreamManagementNegotiator extends Sasl2FeatureNegotiator @override Future onXmppEvent(XmppEvent event) async { if (event is ResourceBoundEvent) { - _resource = event.resource; + resource = event.resource; } } @@ -100,7 +95,9 @@ class StreamManagementNegotiator extends Sasl2FeatureNegotiator if (sm.state.streamResumptionId != null && !_resumeFailed) { // We could do Stream resumption - return super.matchesFeature(features) && attributes.isAuthenticated(); + return super.matchesFeature(features) && + attributes.isAuthenticated() && + resource.isNotEmpty; } else { // We cannot do a stream resumption return super.matchesFeature(features) && @@ -122,23 +119,23 @@ class StreamManagementNegotiator extends Sasl2FeatureNegotiator _resumeFailed = true; _isResumed = false; _state = _StreamManagementNegotiatorState.ready; + resource = ''; + attributes.setResource('', triggerEvent: false); } Future _onStreamResumptionSuccessful(XMLNode resumed) async { assert(resumed.tag == 'resumed', 'The correct element must be passed'); + assert( + resource.isNotEmpty, + 'The Stream Management Negotiator must know of the previous resource', + ); final h = int.parse(resumed.attributes['h']! as String); await attributes.sendEvent(StreamResumedEvent(h: h)); _resumeFailed = false; _isResumed = true; - - if (attributes.getConnection().resource.isEmpty && _resource.isNotEmpty) { - attributes.setResource(_resource); - } else if (attributes.getConnection().resource.isNotEmpty && - _resource.isEmpty) { - _resource = attributes.getConnection().resource; - } + attributes.setResource(resource); } Future _onStreamEnablementSuccessful(XMLNode enabled) async { @@ -152,7 +149,7 @@ class StreamManagementNegotiator extends Sasl2FeatureNegotiator await attributes.sendEvent( StreamManagementEnabledEvent( - resource: attributes.getFullJID().resource, + resource: resource, id: id, location: enabled.attributes['location'] as String?, ), @@ -197,10 +194,12 @@ class StreamManagementNegotiator extends Sasl2FeatureNegotiator _log.finest('Stream Management resumption successful'); assert( - attributes.getFullJID().resource != '', + resource.isNotEmpty, 'Resume only works when we already have a resource bound and know about it', ); + // TODO(Unknown): Don't do this here. We trigger an event that the CSIManager + // can consume. final csi = attributes.getManagerById(csiManager) as CSIManager?; if (csi != null) { csi.restoreCSIState(); diff --git a/packages/moxxmpp/lib/src/xeps/xep_0384/xep_0384.dart b/packages/moxxmpp/lib/src/xeps/xep_0384/xep_0384.dart index 75eec1e..b3d3491 100644 --- a/packages/moxxmpp/lib/src/xeps/xep_0384/xep_0384.dart +++ b/packages/moxxmpp/lib/src/xeps/xep_0384/xep_0384.dart @@ -42,7 +42,6 @@ const _doNotEncryptList = [ DoNotEncrypt('stanza-id', stableIdXmlns), ]; -@mustCallSuper abstract class BaseOmemoManager extends XmppManagerBase { BaseOmemoManager() : super(omemoManager); diff --git a/packages/moxxmpp/test/reconnection_test.dart b/packages/moxxmpp/test/reconnection_test.dart index 436b2af..992f912 100644 --- a/packages/moxxmpp/test/reconnection_test.dart +++ b/packages/moxxmpp/test/reconnection_test.dart @@ -1,3 +1,4 @@ +import 'dart:async'; import 'package:moxxmpp/moxxmpp.dart'; import 'package:test/test.dart'; import 'helpers/logging.dart'; @@ -63,6 +64,8 @@ void main() { counter++; }); await policy.setShouldReconnect(true); + // ignore: avoid_print + print('policy.setShouldReconnect(true) done'); // We have a failure expect( @@ -70,9 +73,13 @@ void main() { true, ); await policy.onFailure(); + // ignore: avoid_print + print('policy.onFailure() done'); - await policy.onTimerElapsed(); - await policy.onTimerElapsed(); + unawaited(policy.onTimerElapsed()); + unawaited(policy.onTimerElapsed()); + + await Future.delayed(const Duration(seconds: 3)); expect(counter, 1); }); } diff --git a/packages/moxxmpp/test/xeps/xep_0198_test.dart b/packages/moxxmpp/test/xeps/xep_0198_test.dart index f8745c8..100d159 100644 --- a/packages/moxxmpp/test/xeps/xep_0198_test.dart +++ b/packages/moxxmpp/test/xeps/xep_0198_test.dart @@ -514,7 +514,7 @@ void main() { await conn.registerFeatureNegotiators([ SaslPlainNegotiator(), ResourceBindingNegotiator(), - StreamManagementNegotiator(), + StreamManagementNegotiator()..resource = 'test-resource', ]); await conn.getManagerById(smManager)!.setState( StreamManagementState( @@ -536,6 +536,15 @@ void main() { .isStreamManagementEnabled(), true, ); + expect(conn.resource, 'MU29eEZn'); + expect( + conn + .getNegotiatorById( + streamManagementNegotiator, + )! + .resource, + 'MU29eEZn', + ); }); test('Test a successful stream resumption', () async { @@ -604,7 +613,7 @@ void main() { await conn.registerFeatureNegotiators([ SaslPlainNegotiator(), ResourceBindingNegotiator(), - StreamManagementNegotiator(), + StreamManagementNegotiator()..resource = 'abc123', ]); await conn.getManagerById(smManager)!.setState( StreamManagementState( @@ -615,7 +624,6 @@ void main() { ); await conn.connect( - lastResource: 'abc123', waitUntilLogin: true, ); expect(fakeSocket.getState(), 4); @@ -690,7 +698,7 @@ void main() { await conn.registerFeatureNegotiators([ SaslPlainNegotiator(), ResourceBindingNegotiator(), - StreamManagementNegotiator()..setResource('test-resource'), + StreamManagementNegotiator()..resource = 'test-resource', Sasl2Negotiator( userAgent: const UserAgent( id: 'd4565fa7-4d72-4749-b3d3-740edbf87770', @@ -787,7 +795,7 @@ void main() { await conn.registerFeatureNegotiators([ SaslPlainNegotiator(), ResourceBindingNegotiator(), - StreamManagementNegotiator()..setResource('test-resource'), + StreamManagementNegotiator()..resource = 'test-resource', Bind2Negotiator(), Sasl2Negotiator( userAgent: const UserAgent(