From d9fbb9e102398011c40db482f1a3e7828108b7f0 Mon Sep 17 00:00:00 2001 From: "Alexander \"PapaTutuWawa" Date: Fri, 29 Sep 2023 19:58:43 +0200 Subject: [PATCH] fix(xep,core): Ensure in-order processing of incoming stanzas --- packages/moxxmpp/lib/src/connection.dart | 30 +++--- packages/moxxmpp/lib/src/parser.dart | 16 +-- .../moxxmpp/lib/src/util/incoming_queue.dart | 99 +++++++++++++++++++ .../lib/src/xeps/xep_0045/xep_0045.dart | 10 +- packages/moxxmpp/lib/src/xeps/xep_0115.dart | 76 ++++++++------ .../moxxmpp/lib/src/util/incoming_queue.dart | 1 + packages/moxxmpp/pubspec.yaml | 1 + packages/moxxmpp/test/xmpp_parser_test.dart | 95 ++++++++++++------ 8 files changed, 245 insertions(+), 83 deletions(-) create mode 100644 packages/moxxmpp/lib/src/util/incoming_queue.dart create mode 100644 packages/moxxmpp/packages/moxxmpp/lib/src/util/incoming_queue.dart diff --git a/packages/moxxmpp/lib/src/connection.dart b/packages/moxxmpp/lib/src/connection.dart index 869d3a5..48f3d1d 100644 --- a/packages/moxxmpp/lib/src/connection.dart +++ b/packages/moxxmpp/lib/src/connection.dart @@ -25,12 +25,12 @@ import 'package:moxxmpp/src/settings.dart'; import 'package:moxxmpp/src/socket.dart'; import 'package:moxxmpp/src/stanza.dart'; import 'package:moxxmpp/src/stringxml.dart'; +import 'package:moxxmpp/src/util/incoming_queue.dart'; import 'package:moxxmpp/src/util/queue.dart'; import 'package:moxxmpp/src/util/typed_map.dart'; import 'package:moxxmpp/src/xeps/xep_0030/xep_0030.dart'; import 'package:moxxmpp/src/xeps/xep_0198/xep_0198.dart'; import 'package:moxxmpp/src/xeps/xep_0352.dart'; -import 'package:synchronized/synchronized.dart'; import 'package:uuid/uuid.dart'; /// The states the XmppConnection can be in @@ -90,9 +90,12 @@ class XmppConnection { }, ); + _incomingStanzaQueue = IncomingStanzaQueue(handleXmlStream); _socketStream = _socket.getDataStream(); // TODO(Unknown): Handle on done - _socketStream.transform(_streamParser).forEach(handleXmlStream); + _socketStream + .transform(_streamParser) + .forEach(_incomingStanzaQueue.addStanza); _socket.getEventStream().listen(handleSocketEvent); _stanzaQueue = AsyncStanzaQueue( @@ -170,10 +173,6 @@ class XmppConnection { T? getNegotiatorById(String id) => _negotiationsHandler.getNegotiatorById(id); - /// Prevent data from being passed to _currentNegotiator.negotiator while the negotiator - /// is still running. - final Lock _negotiationLock = Lock(); - /// The logger for the class final Logger _log = Logger('XmppConnection'); @@ -182,6 +181,8 @@ class XmppConnection { bool get isAuthenticated => _isAuthenticated; + late final IncomingStanzaQueue _incomingStanzaQueue; + late final AsyncStanzaQueue _stanzaQueue; /// Returns the JID we authenticate with and add the resource that we have bound. @@ -591,6 +592,8 @@ class XmppConnection { await _reconnectionPolicy.setShouldReconnect(true); } + _incomingStanzaQueue.negotiationsDone = true; + // Tell consumers of the event stream that we're done with stream feature // negotiations await _sendEvent( @@ -828,17 +831,17 @@ class XmppConnection { // causing (a) the negotiator to become confused and (b) the stanzas/nonzas to be // missed. This causes the data to wait while the negotiator is running and thus // prevent this issue. - await _negotiationLock.synchronized(() async { - if (_routingState != RoutingState.negotiating) { - unawaited(handleXmlStream(event)); - return; - } + if (_routingState != RoutingState.negotiating) { + unawaited(handleXmlStream(event)); + return; + } - await _negotiationsHandler.negotiate(event); - }); + await _negotiationsHandler.negotiate(event); break; case RoutingState.handleStanzas: + _log.finest('Handling ${node.tag} (${node.attributes["id"]})'); await _handleStanza(node); + _log.finest('Handling ${node.tag} (${node.attributes["id"]}) done'); break; case RoutingState.preConnection: case RoutingState.error: @@ -903,6 +906,7 @@ class XmppConnection { // Kill a possibly existing connection _socket.close(); + _incomingStanzaQueue.negotiationsDone = false; await _reconnectionPolicy.reset(); _enableReconnectOnSuccess = enableReconnectOnSuccess; if (shouldReconnect) { diff --git a/packages/moxxmpp/lib/src/parser.dart b/packages/moxxmpp/lib/src/parser.dart index 1c9b458..6818cb6 100644 --- a/packages/moxxmpp/lib/src/parser.dart +++ b/packages/moxxmpp/lib/src/parser.dart @@ -57,9 +57,10 @@ class _ChunkedConversionBuffer { } /// A buffer to put between a socket's input and a full XML stream. -class XMPPStreamParser extends StreamTransformerBase { - final StreamController _streamController = - StreamController(); +class XMPPStreamParser + extends StreamTransformerBase> { + final StreamController> _streamController = + StreamController>(); /// Turns a String into a list of [XmlEvent]s in a chunked fashion. _ChunkedConversionBuffer _eventBuffer = @@ -117,13 +118,14 @@ class XMPPStreamParser extends StreamTransformerBase { } @override - Stream bind(Stream stream) { + Stream> bind(Stream stream) { // We do not want to use xml's toXmlEvents and toSubtreeEvents methods as they // create streams we cannot close. We need to be able to destroy and recreate an // XML parser whenever we start a new connection. stream.listen((input) { final events = _eventBuffer.convert(input); final streamHeaderEvents = _streamHeaderSelector.convert(events); + final objects = List.empty(growable: true); // Process the stream header separately. for (final event in streamHeaderEvents) { @@ -135,7 +137,7 @@ class XMPPStreamParser extends StreamTransformerBase { continue; } - _streamController.add( + objects.add( XMPPStreamHeader( Map.fromEntries( event.attributes.map((attr) { @@ -151,13 +153,15 @@ class XMPPStreamParser extends StreamTransformerBase { final children = _childBuffer.convert(childEvents); for (final node in children) { if (node.nodeType == XmlNodeType.ELEMENT) { - _streamController.add( + objects.add( XMPPStreamElement( XMLNode.fromXmlElement(node as XmlElement), ), ); } } + + _streamController.add(objects); }); return _streamController.stream; diff --git a/packages/moxxmpp/lib/src/util/incoming_queue.dart b/packages/moxxmpp/lib/src/util/incoming_queue.dart new file mode 100644 index 0000000..36a77cd --- /dev/null +++ b/packages/moxxmpp/lib/src/util/incoming_queue.dart @@ -0,0 +1,99 @@ +import 'dart:async'; +import 'dart:collection'; +import 'package:logging/logging.dart'; +import 'package:moxxmpp/src/parser.dart'; +import 'package:synchronized/synchronized.dart'; + +typedef LockResult = (Completer?, XMPPStreamObject); + +class IncomingStanzaQueue { + IncomingStanzaQueue(this._callback); + + final Queue> _queue = Queue(); + + final Future Function(XMPPStreamObject) _callback; + bool _isRunning = false; + + final Lock _lock = Lock(); + + final Logger _log = Logger('IncomingStanzaQueue'); + + bool negotiationsDone = false; + + Future _processStreamObject( + Future? future, + XMPPStreamObject object, + ) async { + if (future == null) { + if (object is XMPPStreamElement) { + _log.finest( + 'Bypassing queue for ${object.node.tag} (${object.node.attributes["id"]})', + ); + } + return _callback(object); + } + + // Wait for our turn. + await future; + + // Run the callback. + if (object is XMPPStreamElement) { + _log.finest('Running callback for ${object.node.toXml()}'); + } + await _callback(object); + if (object is XMPPStreamElement) { + _log.finest( + 'Callback for ${object.node.tag} (${object.node.attributes["id"]}) done', + ); + } + + // Run the next entry. + _log.finest('Entering second lock...'); + await _lock.synchronized(() { + _log.finest('Second lock entered...'); + if (_queue.isNotEmpty) { + _log.finest('New queue size: ${_queue.length - 1}'); + _queue.removeFirst().complete(); + } else { + _isRunning = false; + _log.finest('New queue size: 0'); + } + }); + } + + Future addStanza(List objects) async { + _log.finest('Entering initial lock...'); + await _lock.synchronized(() { + _log.finest('Lock entered...'); + + for (final object in objects) { + if (canBypassQueue(object)) { + unawaited( + _processStreamObject(null, object), + ); + continue; + } + + final completer = Completer(); + if (_isRunning) { + _queue.add(completer); + } else { + _isRunning = true; + completer.complete(); + } + + unawaited( + _processStreamObject(completer.future, object), + ); + } + }); + } + + bool canBypassQueue(XMPPStreamObject object) { + // TODO: Ask the StanzaAwaiter if the stanza is awaited + return object is XMPPStreamElement && + negotiationsDone && + object.node.tag == 'iq' && + ['result', 'error'].contains(object.node.attributes['type'] as String?); + } +} diff --git a/packages/moxxmpp/lib/src/xeps/xep_0045/xep_0045.dart b/packages/moxxmpp/lib/src/xeps/xep_0045/xep_0045.dart index 0042871..e929ca5 100644 --- a/packages/moxxmpp/lib/src/xeps/xep_0045/xep_0045.dart +++ b/packages/moxxmpp/lib/src/xeps/xep_0045/xep_0045.dart @@ -251,19 +251,23 @@ class MUCManager extends XmppManagerBase { StanzaHandlerData state, ) async { if (presence.from == null) { + logger.finest('Ignoring presence as it has no from attribute'); return state; } final from = JID.fromString(presence.from!); final bareFrom = from.toBare(); return _cacheLock.synchronized(() { + logger.finest('Lock aquired for presence from ${presence.from}'); final room = _mucRoomCache[bareFrom]; if (room == null) { + logger.finest('Ignoring presence as it does not belong to a room'); return state; } if (from.resource.isEmpty) { // TODO(Unknown): Handle presence from the room itself. + logger.finest('Ignoring presence as it has no resource'); return state; } @@ -311,6 +315,7 @@ class MUCManager extends XmppManagerBase { // Set the nick to make sure we're in sync with the MUC. room.nick = from.resource; + logger.finest('Self-presence handled'); return StanzaHandlerData( true, false, @@ -360,8 +365,10 @@ class MUCManager extends XmppManagerBase { } room.members[from.resource] = member; + logger.finest('${from.resource} added to the member list'); } + logger.finest('Ran through'); return StanzaHandlerData( true, false, @@ -398,7 +405,8 @@ class MUCManager extends XmppManagerBase { ) async { final fromJid = JID.fromString(message.from!); final roomJid = fromJid.toBare(); - return _mucRoomCache.synchronized(() { + return _cacheLock.synchronized(() { + logger.finest('Lock aquired for message from ${message.from}'); final roomState = _mucRoomCache[roomJid]; if (roomState == null) { return state; diff --git a/packages/moxxmpp/lib/src/xeps/xep_0115.dart b/packages/moxxmpp/lib/src/xeps/xep_0115.dart index 19ebdf6..ff8890d 100644 --- a/packages/moxxmpp/lib/src/xeps/xep_0115.dart +++ b/packages/moxxmpp/lib/src/xeps/xep_0115.dart @@ -173,39 +173,15 @@ class EntityCapabilitiesManager extends XmppManagerBase { }); } - @visibleForTesting - Future onPresence( - Stanza stanza, - StanzaHandlerData state, - ) async { - if (stanza.from == null) { - return state; - } - - final from = JID.fromString(stanza.from!); - final c = stanza.firstTag('c', xmlns: capsXmlns)!; - - final hashFunctionName = c.attributes['hash'] as String?; - final capabilityNode = c.attributes['node'] as String?; - final ver = c.attributes['ver'] as String?; - if (hashFunctionName == null || capabilityNode == null || ver == null) { - return state; - } - - // Check if we know of the hash - final isCached = - await _cacheLock.synchronized(() => _capHashCache.containsKey(ver)); - if (isCached) { - return state; - } - + Future _performQuery(Stanza presence, String ver, + String hashFunctionName, String capabilityNode, JID from) async { final dm = getAttributes().getManagerById(discoManager)!; final discoRequest = await dm.discoInfoQuery( from, node: capabilityNode, ); if (discoRequest.isType()) { - return state; + return; } final discoInfo = discoRequest.get(); @@ -220,7 +196,7 @@ class EntityCapabilitiesManager extends XmppManagerBase { discoInfo, ), ); - return state; + return; } // Validate the disco#info result according to XEP-0115 ยง 5.4 @@ -234,7 +210,7 @@ class EntityCapabilitiesManager extends XmppManagerBase { logger.warning( 'Malformed disco#info response: More than one equal identity', ); - return state; + return; } } @@ -245,7 +221,7 @@ class EntityCapabilitiesManager extends XmppManagerBase { logger.warning( 'Malformed disco#info response: More than one equal feature', ); - return state; + return; } } @@ -273,7 +249,7 @@ class EntityCapabilitiesManager extends XmppManagerBase { logger.warning( 'Malformed disco#info response: Extended Info FORM_TYPE contains more than one value(s) of different value.', ); - return state; + return; } } @@ -288,7 +264,7 @@ class EntityCapabilitiesManager extends XmppManagerBase { logger.warning( 'Malformed disco#info response: More than one Extended Disco Info forms with the same FORM_TYPE value', ); - return state; + return; } // Check if the field type is hidden @@ -325,7 +301,43 @@ class EntityCapabilitiesManager extends XmppManagerBase { 'Capability hash mismatch from $from: Received "$ver", expected "$computedCapabilityHash".', ); } + } + @visibleForTesting + Future onPresence( + Stanza stanza, + StanzaHandlerData state, + ) async { + if (stanza.from == null) { + return state; + } + + final from = JID.fromString(stanza.from!); + final c = stanza.firstTag('c', xmlns: capsXmlns)!; + + final hashFunctionName = c.attributes['hash'] as String?; + final capabilityNode = c.attributes['node'] as String?; + final ver = c.attributes['ver'] as String?; + if (hashFunctionName == null || capabilityNode == null || ver == null) { + return state; + } + + // Check if we know of the hash + final isCached = + await _cacheLock.synchronized(() => _capHashCache.containsKey(ver)); + if (isCached) { + return state; + } + + unawaited( + _performQuery( + stanza, + ver, + hashFunctionName, + capabilityNode, + from, + ), + ); return state; } diff --git a/packages/moxxmpp/packages/moxxmpp/lib/src/util/incoming_queue.dart b/packages/moxxmpp/packages/moxxmpp/lib/src/util/incoming_queue.dart new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/packages/moxxmpp/packages/moxxmpp/lib/src/util/incoming_queue.dart @@ -0,0 +1 @@ + diff --git a/packages/moxxmpp/pubspec.yaml b/packages/moxxmpp/pubspec.yaml index 852dccb..281c3cb 100644 --- a/packages/moxxmpp/pubspec.yaml +++ b/packages/moxxmpp/pubspec.yaml @@ -8,6 +8,7 @@ environment: sdk: '>=3.0.0 <4.0.0' dependencies: + async_queue: ^1.3.0 collection: ^1.16.0 cryptography: ^2.0.5 hex: ^0.2.0 diff --git a/packages/moxxmpp/test/xmpp_parser_test.dart b/packages/moxxmpp/test/xmpp_parser_test.dart index 265f393..7098b79 100644 --- a/packages/moxxmpp/test/xmpp_parser_test.dart +++ b/packages/moxxmpp/test/xmpp_parser_test.dart @@ -11,14 +11,16 @@ void main() { final controller = StreamController(); unawaited( - controller.stream.transform(parser).forEach((event) { - if (event is! XMPPStreamElement) return; - final node = event.node; + controller.stream.transform(parser).forEach((events) { + for (final event in events) { + if (event is! XMPPStreamElement) continue; + final node = event.node; - if (node.tag == 'childa') { - childa = true; - } else if (node.tag == 'childb') { - childb = true; + if (node.tag == 'childa') { + childa = true; + } else if (node.tag == 'childb') { + childb = true; + } } }), ); @@ -36,14 +38,16 @@ void main() { final controller = StreamController(); unawaited( - controller.stream.transform(parser).forEach((event) { - if (event is! XMPPStreamElement) return; - final node = event.node; + controller.stream.transform(parser).forEach((events) { + for (final event in events) { + if (event is! XMPPStreamElement) continue; + final node = event.node; - if (node.tag == 'childa') { - childa = true; - } else if (node.tag == 'childb') { - childb = true; + if (node.tag == 'childa') { + childa = true; + } else if (node.tag == 'childb') { + childb = true; + } } }), ); @@ -64,14 +68,16 @@ void main() { final controller = StreamController(); unawaited( - controller.stream.transform(parser).forEach((event) { - if (event is! XMPPStreamElement) return; - final node = event.node; + controller.stream.transform(parser).forEach((events) { + for (final event in events) { + if (event is! XMPPStreamElement) continue; + final node = event.node; - if (node.tag == 'childa') { - childa = true; - } else if (node.tag == 'childb') { - childb = true; + if (node.tag == 'childa') { + childa = true; + } else if (node.tag == 'childb') { + childb = true; + } } }), ); @@ -93,13 +99,15 @@ void main() { final controller = StreamController(); unawaited( - controller.stream.transform(parser).forEach((node) { - if (node is XMPPStreamElement) { - if (node.node.tag == 'childa') { - childa = true; + controller.stream.transform(parser).forEach((events) { + for (final event in events) { + if (event is XMPPStreamElement) { + if (event.node.tag == 'childa') { + childa = true; + } + } else if (event is XMPPStreamHeader) { + attrs = event.attributes; } - } else if (node is XMPPStreamHeader) { - attrs = node.attributes; } }), ); @@ -118,11 +126,13 @@ void main() { var gotFeatures = false; unawaited( controller.stream.transform(parser).forEach( - (event) { - if (event is! XMPPStreamElement) return; + (events) { + for (final event in events) { + if (event is! XMPPStreamElement) continue; - if (event.node.tag == 'stream:features') { - gotFeatures = true; + if (event.node.tag == 'stream:features') { + gotFeatures = true; + } } }, ), @@ -157,4 +167,27 @@ void main() { await Future.delayed(const Duration(seconds: 1)); expect(gotFeatures, true); }); + + test('Test the order of concatenated stanzas', () async { + // NOTE: This seems weird, but it turns out that not keeping this order leads to + // MUC joins (on Moxxy) not catching every bit of presence before marking the + // MUC as joined. + final parser = XMPPStreamParser(); + final controller = StreamController(); + var called = false; + + unawaited( + controller.stream.transform(parser).forEach((events) { + expect(events.isNotEmpty, true); + expect((events[0] as XMPPStreamElement).node.tag, 'childa'); + expect((events[1] as XMPPStreamElement).node.tag, 'childb'); + expect((events[2] as XMPPStreamElement).node.tag, 'childc'); + called = true; + }), + ); + controller.add(''); + + await Future.delayed(const Duration(seconds: 2)); + expect(called, true); + }); }