From c7d58c3d3f8598eaf36b5f1984c9ed9b52c89351 Mon Sep 17 00:00:00 2001 From: "Alexander \"PapaTutuWawa" Date: Wed, 27 Sep 2023 18:57:13 +0200 Subject: [PATCH 01/23] feat(core): Add logging for when a manager ends processing early --- packages/moxxmpp/lib/src/connection.dart | 58 +++++++++++++++++------- 1 file changed, 42 insertions(+), 16 deletions(-) diff --git a/packages/moxxmpp/lib/src/connection.dart b/packages/moxxmpp/lib/src/connection.dart index ee1e9c1..869d3a5 100644 --- a/packages/moxxmpp/lib/src/connection.dart +++ b/packages/moxxmpp/lib/src/connection.dart @@ -49,6 +49,19 @@ enum XmppConnectionState { error } +/// (The actual stanza handler, Name of the owning manager). +typedef _StanzaHandlerWrapper = (StanzaHandler, String); + +/// Wrapper around [stanzaHandlerSortComparator] for [_StanzaHandlerWrapper]. +int _stanzaHandlerWrapperSortComparator( + _StanzaHandlerWrapper a, + _StanzaHandlerWrapper b, +) { + final (ha, _) = a; + final (hb, _) = b; + return stanzaHandlerSortComparator(ha, hb); +} + /// This class is a connection to the server. class XmppConnection { XmppConnection( @@ -112,13 +125,13 @@ class XmppConnection { final StanzaAwaiter _stanzaAwaiter = StanzaAwaiter(); /// Sorted list of handlers that we call or incoming and outgoing stanzas - final List _incomingStanzaHandlers = + final List<_StanzaHandlerWrapper> _incomingStanzaHandlers = List.empty(growable: true); - final List _incomingPreStanzaHandlers = + final List<_StanzaHandlerWrapper> _incomingPreStanzaHandlers = List.empty(growable: true); - final List _outgoingPreStanzaHandlers = + final List<_StanzaHandlerWrapper> _outgoingPreStanzaHandlers = List.empty(growable: true); - final List _outgoingPostStanzaHandlers = + final List<_StanzaHandlerWrapper> _outgoingPostStanzaHandlers = List.empty(growable: true); final StreamController _eventStreamController = StreamController.broadcast(); @@ -198,18 +211,25 @@ class XmppConnection { _xmppManagers[manager.id] = manager; - _incomingStanzaHandlers.addAll(manager.getIncomingStanzaHandlers()); - _incomingPreStanzaHandlers.addAll(manager.getIncomingPreStanzaHandlers()); - _outgoingPreStanzaHandlers.addAll(manager.getOutgoingPreStanzaHandlers()); - _outgoingPostStanzaHandlers - .addAll(manager.getOutgoingPostStanzaHandlers()); + _incomingStanzaHandlers.addAll( + manager.getIncomingStanzaHandlers().map((h) => (h, manager.name)), + ); + _incomingPreStanzaHandlers.addAll( + manager.getIncomingPreStanzaHandlers().map((h) => (h, manager.name)), + ); + _outgoingPreStanzaHandlers.addAll( + manager.getOutgoingPreStanzaHandlers().map((h) => (h, manager.name)), + ); + _outgoingPostStanzaHandlers.addAll( + manager.getOutgoingPostStanzaHandlers().map((h) => (h, manager.name)), + ); } // Sort them - _incomingStanzaHandlers.sort(stanzaHandlerSortComparator); - _incomingPreStanzaHandlers.sort(stanzaHandlerSortComparator); - _outgoingPreStanzaHandlers.sort(stanzaHandlerSortComparator); - _outgoingPostStanzaHandlers.sort(stanzaHandlerSortComparator); + _incomingStanzaHandlers.sort(_stanzaHandlerWrapperSortComparator); + _incomingPreStanzaHandlers.sort(_stanzaHandlerWrapperSortComparator); + _outgoingPreStanzaHandlers.sort(_stanzaHandlerWrapperSortComparator); + _outgoingPostStanzaHandlers.sort(_stanzaHandlerWrapperSortComparator); // Run the post register callbacks for (final manager in _xmppManagers.values) { @@ -650,15 +670,21 @@ class XmppConnection { /// call its callback and end the processing if the callback returned true; continue /// if it returned false. Future _runStanzaHandlers( - List handlers, + List<_StanzaHandlerWrapper> handlers, Stanza stanza, { StanzaHandlerData? initial, }) async { var state = initial ?? StanzaHandlerData(false, false, stanza, TypedMap()); - for (final handler in handlers) { + for (final handlerRaw in handlers) { + final (handler, managerName) = handlerRaw; if (handler.matches(state.stanza)) { state = await handler.callback(state.stanza, state); - if (state.done || state.cancel) return state; + if (state.done || state.cancel) { + _log.finest( + 'Processing ended early for ${stanza.tag} by $managerName', + ); + return state; + } } } From 9211963390e327f2316c5021fda75f0c4a23a3de Mon Sep 17 00:00:00 2001 From: "Alexander \"PapaTutuWawa" Date: Wed, 27 Sep 2023 18:57:38 +0200 Subject: [PATCH 02/23] fix(xep): Fix ending presence processing too early if containing a photo --- packages/moxxmpp/lib/src/xeps/xep_0054.dart | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/moxxmpp/lib/src/xeps/xep_0054.dart b/packages/moxxmpp/lib/src/xeps/xep_0054.dart index ac25ede..eb821f9 100644 --- a/packages/moxxmpp/lib/src/xeps/xep_0054.dart +++ b/packages/moxxmpp/lib/src/xeps/xep_0054.dart @@ -56,14 +56,13 @@ class VCardManager extends XmppManagerBase { final x = presence.firstTag('x', xmlns: vCardTempUpdate)!; final hash = x.firstTag('photo')!.innerText(); - // TODO(Unknown): Use the presence manager interface. getAttributes().sendEvent( VCardAvatarUpdatedEvent( JID.fromString(presence.from!), hash, ), ); - return state..done = true; + return state; } VCardPhoto? _parseVCardPhoto(XMLNode? node) { From aba90f2e9047451a3b0807ee9f2f1568c43ff118 Mon Sep 17 00:00:00 2001 From: "Alexander \"PapaTutuWawa" Date: Wed, 27 Sep 2023 18:58:17 +0200 Subject: [PATCH 03/23] feat(example): Print the number of users in the MUC --- examples_dart/bin/muc_client.dart | 15 ++++++++++----- packages/moxxmpp/lib/src/xeps/xep_0045/types.dart | 1 + 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/examples_dart/bin/muc_client.dart b/examples_dart/bin/muc_client.dart index 4e88c84..1eeb11e 100644 --- a/examples_dart/bin/muc_client.dart +++ b/examples_dart/bin/muc_client.dart @@ -75,11 +75,16 @@ void main(List args) async { }); // Join room - await connection.getManagerById(mucManager)!.joinRoom( - muc, - nick, - maxHistoryStanzas: 0, - ); + final mm = connection.getManagerById(mucManager)!; + await mm.joinRoom( + muc, + nick, + maxHistoryStanzas: 0, + ); + final state = (await mm.getRoomState(muc))!; + + print('=====> ${state.members.length} users in room'); + print('=====> ${state.members.values.map((m) => m.nick).join(", ")}'); final repl = Repl(prompt: '> '); await for (final line in repl.runAsync()) { diff --git a/packages/moxxmpp/lib/src/xeps/xep_0045/types.dart b/packages/moxxmpp/lib/src/xeps/xep_0045/types.dart index a58866c..06bb806 100644 --- a/packages/moxxmpp/lib/src/xeps/xep_0045/types.dart +++ b/packages/moxxmpp/lib/src/xeps/xep_0045/types.dart @@ -5,6 +5,7 @@ import 'package:moxxmpp/src/xeps/xep_0004.dart'; import 'package:moxxmpp/src/xeps/xep_0030/types.dart'; class InvalidAffiliationException implements Exception {} + class InvalidRoleException implements Exception {} enum Affiliation { From d9fbb9e102398011c40db482f1a3e7828108b7f0 Mon Sep 17 00:00:00 2001 From: "Alexander \"PapaTutuWawa" Date: Fri, 29 Sep 2023 19:58:43 +0200 Subject: [PATCH 04/23] fix(xep,core): Ensure in-order processing of incoming stanzas --- packages/moxxmpp/lib/src/connection.dart | 30 +++--- packages/moxxmpp/lib/src/parser.dart | 16 +-- .../moxxmpp/lib/src/util/incoming_queue.dart | 99 +++++++++++++++++++ .../lib/src/xeps/xep_0045/xep_0045.dart | 10 +- packages/moxxmpp/lib/src/xeps/xep_0115.dart | 76 ++++++++------ .../moxxmpp/lib/src/util/incoming_queue.dart | 1 + packages/moxxmpp/pubspec.yaml | 1 + packages/moxxmpp/test/xmpp_parser_test.dart | 95 ++++++++++++------ 8 files changed, 245 insertions(+), 83 deletions(-) create mode 100644 packages/moxxmpp/lib/src/util/incoming_queue.dart create mode 100644 packages/moxxmpp/packages/moxxmpp/lib/src/util/incoming_queue.dart diff --git a/packages/moxxmpp/lib/src/connection.dart b/packages/moxxmpp/lib/src/connection.dart index 869d3a5..48f3d1d 100644 --- a/packages/moxxmpp/lib/src/connection.dart +++ b/packages/moxxmpp/lib/src/connection.dart @@ -25,12 +25,12 @@ import 'package:moxxmpp/src/settings.dart'; import 'package:moxxmpp/src/socket.dart'; import 'package:moxxmpp/src/stanza.dart'; import 'package:moxxmpp/src/stringxml.dart'; +import 'package:moxxmpp/src/util/incoming_queue.dart'; import 'package:moxxmpp/src/util/queue.dart'; import 'package:moxxmpp/src/util/typed_map.dart'; import 'package:moxxmpp/src/xeps/xep_0030/xep_0030.dart'; import 'package:moxxmpp/src/xeps/xep_0198/xep_0198.dart'; import 'package:moxxmpp/src/xeps/xep_0352.dart'; -import 'package:synchronized/synchronized.dart'; import 'package:uuid/uuid.dart'; /// The states the XmppConnection can be in @@ -90,9 +90,12 @@ class XmppConnection { }, ); + _incomingStanzaQueue = IncomingStanzaQueue(handleXmlStream); _socketStream = _socket.getDataStream(); // TODO(Unknown): Handle on done - _socketStream.transform(_streamParser).forEach(handleXmlStream); + _socketStream + .transform(_streamParser) + .forEach(_incomingStanzaQueue.addStanza); _socket.getEventStream().listen(handleSocketEvent); _stanzaQueue = AsyncStanzaQueue( @@ -170,10 +173,6 @@ class XmppConnection { T? getNegotiatorById(String id) => _negotiationsHandler.getNegotiatorById(id); - /// Prevent data from being passed to _currentNegotiator.negotiator while the negotiator - /// is still running. - final Lock _negotiationLock = Lock(); - /// The logger for the class final Logger _log = Logger('XmppConnection'); @@ -182,6 +181,8 @@ class XmppConnection { bool get isAuthenticated => _isAuthenticated; + late final IncomingStanzaQueue _incomingStanzaQueue; + late final AsyncStanzaQueue _stanzaQueue; /// Returns the JID we authenticate with and add the resource that we have bound. @@ -591,6 +592,8 @@ class XmppConnection { await _reconnectionPolicy.setShouldReconnect(true); } + _incomingStanzaQueue.negotiationsDone = true; + // Tell consumers of the event stream that we're done with stream feature // negotiations await _sendEvent( @@ -828,17 +831,17 @@ class XmppConnection { // causing (a) the negotiator to become confused and (b) the stanzas/nonzas to be // missed. This causes the data to wait while the negotiator is running and thus // prevent this issue. - await _negotiationLock.synchronized(() async { - if (_routingState != RoutingState.negotiating) { - unawaited(handleXmlStream(event)); - return; - } + if (_routingState != RoutingState.negotiating) { + unawaited(handleXmlStream(event)); + return; + } - await _negotiationsHandler.negotiate(event); - }); + await _negotiationsHandler.negotiate(event); break; case RoutingState.handleStanzas: + _log.finest('Handling ${node.tag} (${node.attributes["id"]})'); await _handleStanza(node); + _log.finest('Handling ${node.tag} (${node.attributes["id"]}) done'); break; case RoutingState.preConnection: case RoutingState.error: @@ -903,6 +906,7 @@ class XmppConnection { // Kill a possibly existing connection _socket.close(); + _incomingStanzaQueue.negotiationsDone = false; await _reconnectionPolicy.reset(); _enableReconnectOnSuccess = enableReconnectOnSuccess; if (shouldReconnect) { diff --git a/packages/moxxmpp/lib/src/parser.dart b/packages/moxxmpp/lib/src/parser.dart index 1c9b458..6818cb6 100644 --- a/packages/moxxmpp/lib/src/parser.dart +++ b/packages/moxxmpp/lib/src/parser.dart @@ -57,9 +57,10 @@ class _ChunkedConversionBuffer { } /// A buffer to put between a socket's input and a full XML stream. -class XMPPStreamParser extends StreamTransformerBase { - final StreamController _streamController = - StreamController(); +class XMPPStreamParser + extends StreamTransformerBase> { + final StreamController> _streamController = + StreamController>(); /// Turns a String into a list of [XmlEvent]s in a chunked fashion. _ChunkedConversionBuffer _eventBuffer = @@ -117,13 +118,14 @@ class XMPPStreamParser extends StreamTransformerBase { } @override - Stream bind(Stream stream) { + Stream> bind(Stream stream) { // We do not want to use xml's toXmlEvents and toSubtreeEvents methods as they // create streams we cannot close. We need to be able to destroy and recreate an // XML parser whenever we start a new connection. stream.listen((input) { final events = _eventBuffer.convert(input); final streamHeaderEvents = _streamHeaderSelector.convert(events); + final objects = List.empty(growable: true); // Process the stream header separately. for (final event in streamHeaderEvents) { @@ -135,7 +137,7 @@ class XMPPStreamParser extends StreamTransformerBase { continue; } - _streamController.add( + objects.add( XMPPStreamHeader( Map.fromEntries( event.attributes.map((attr) { @@ -151,13 +153,15 @@ class XMPPStreamParser extends StreamTransformerBase { final children = _childBuffer.convert(childEvents); for (final node in children) { if (node.nodeType == XmlNodeType.ELEMENT) { - _streamController.add( + objects.add( XMPPStreamElement( XMLNode.fromXmlElement(node as XmlElement), ), ); } } + + _streamController.add(objects); }); return _streamController.stream; diff --git a/packages/moxxmpp/lib/src/util/incoming_queue.dart b/packages/moxxmpp/lib/src/util/incoming_queue.dart new file mode 100644 index 0000000..36a77cd --- /dev/null +++ b/packages/moxxmpp/lib/src/util/incoming_queue.dart @@ -0,0 +1,99 @@ +import 'dart:async'; +import 'dart:collection'; +import 'package:logging/logging.dart'; +import 'package:moxxmpp/src/parser.dart'; +import 'package:synchronized/synchronized.dart'; + +typedef LockResult = (Completer?, XMPPStreamObject); + +class IncomingStanzaQueue { + IncomingStanzaQueue(this._callback); + + final Queue> _queue = Queue(); + + final Future Function(XMPPStreamObject) _callback; + bool _isRunning = false; + + final Lock _lock = Lock(); + + final Logger _log = Logger('IncomingStanzaQueue'); + + bool negotiationsDone = false; + + Future _processStreamObject( + Future? future, + XMPPStreamObject object, + ) async { + if (future == null) { + if (object is XMPPStreamElement) { + _log.finest( + 'Bypassing queue for ${object.node.tag} (${object.node.attributes["id"]})', + ); + } + return _callback(object); + } + + // Wait for our turn. + await future; + + // Run the callback. + if (object is XMPPStreamElement) { + _log.finest('Running callback for ${object.node.toXml()}'); + } + await _callback(object); + if (object is XMPPStreamElement) { + _log.finest( + 'Callback for ${object.node.tag} (${object.node.attributes["id"]}) done', + ); + } + + // Run the next entry. + _log.finest('Entering second lock...'); + await _lock.synchronized(() { + _log.finest('Second lock entered...'); + if (_queue.isNotEmpty) { + _log.finest('New queue size: ${_queue.length - 1}'); + _queue.removeFirst().complete(); + } else { + _isRunning = false; + _log.finest('New queue size: 0'); + } + }); + } + + Future addStanza(List objects) async { + _log.finest('Entering initial lock...'); + await _lock.synchronized(() { + _log.finest('Lock entered...'); + + for (final object in objects) { + if (canBypassQueue(object)) { + unawaited( + _processStreamObject(null, object), + ); + continue; + } + + final completer = Completer(); + if (_isRunning) { + _queue.add(completer); + } else { + _isRunning = true; + completer.complete(); + } + + unawaited( + _processStreamObject(completer.future, object), + ); + } + }); + } + + bool canBypassQueue(XMPPStreamObject object) { + // TODO: Ask the StanzaAwaiter if the stanza is awaited + return object is XMPPStreamElement && + negotiationsDone && + object.node.tag == 'iq' && + ['result', 'error'].contains(object.node.attributes['type'] as String?); + } +} diff --git a/packages/moxxmpp/lib/src/xeps/xep_0045/xep_0045.dart b/packages/moxxmpp/lib/src/xeps/xep_0045/xep_0045.dart index 0042871..e929ca5 100644 --- a/packages/moxxmpp/lib/src/xeps/xep_0045/xep_0045.dart +++ b/packages/moxxmpp/lib/src/xeps/xep_0045/xep_0045.dart @@ -251,19 +251,23 @@ class MUCManager extends XmppManagerBase { StanzaHandlerData state, ) async { if (presence.from == null) { + logger.finest('Ignoring presence as it has no from attribute'); return state; } final from = JID.fromString(presence.from!); final bareFrom = from.toBare(); return _cacheLock.synchronized(() { + logger.finest('Lock aquired for presence from ${presence.from}'); final room = _mucRoomCache[bareFrom]; if (room == null) { + logger.finest('Ignoring presence as it does not belong to a room'); return state; } if (from.resource.isEmpty) { // TODO(Unknown): Handle presence from the room itself. + logger.finest('Ignoring presence as it has no resource'); return state; } @@ -311,6 +315,7 @@ class MUCManager extends XmppManagerBase { // Set the nick to make sure we're in sync with the MUC. room.nick = from.resource; + logger.finest('Self-presence handled'); return StanzaHandlerData( true, false, @@ -360,8 +365,10 @@ class MUCManager extends XmppManagerBase { } room.members[from.resource] = member; + logger.finest('${from.resource} added to the member list'); } + logger.finest('Ran through'); return StanzaHandlerData( true, false, @@ -398,7 +405,8 @@ class MUCManager extends XmppManagerBase { ) async { final fromJid = JID.fromString(message.from!); final roomJid = fromJid.toBare(); - return _mucRoomCache.synchronized(() { + return _cacheLock.synchronized(() { + logger.finest('Lock aquired for message from ${message.from}'); final roomState = _mucRoomCache[roomJid]; if (roomState == null) { return state; diff --git a/packages/moxxmpp/lib/src/xeps/xep_0115.dart b/packages/moxxmpp/lib/src/xeps/xep_0115.dart index 19ebdf6..ff8890d 100644 --- a/packages/moxxmpp/lib/src/xeps/xep_0115.dart +++ b/packages/moxxmpp/lib/src/xeps/xep_0115.dart @@ -173,39 +173,15 @@ class EntityCapabilitiesManager extends XmppManagerBase { }); } - @visibleForTesting - Future onPresence( - Stanza stanza, - StanzaHandlerData state, - ) async { - if (stanza.from == null) { - return state; - } - - final from = JID.fromString(stanza.from!); - final c = stanza.firstTag('c', xmlns: capsXmlns)!; - - final hashFunctionName = c.attributes['hash'] as String?; - final capabilityNode = c.attributes['node'] as String?; - final ver = c.attributes['ver'] as String?; - if (hashFunctionName == null || capabilityNode == null || ver == null) { - return state; - } - - // Check if we know of the hash - final isCached = - await _cacheLock.synchronized(() => _capHashCache.containsKey(ver)); - if (isCached) { - return state; - } - + Future _performQuery(Stanza presence, String ver, + String hashFunctionName, String capabilityNode, JID from) async { final dm = getAttributes().getManagerById(discoManager)!; final discoRequest = await dm.discoInfoQuery( from, node: capabilityNode, ); if (discoRequest.isType()) { - return state; + return; } final discoInfo = discoRequest.get(); @@ -220,7 +196,7 @@ class EntityCapabilitiesManager extends XmppManagerBase { discoInfo, ), ); - return state; + return; } // Validate the disco#info result according to XEP-0115 § 5.4 @@ -234,7 +210,7 @@ class EntityCapabilitiesManager extends XmppManagerBase { logger.warning( 'Malformed disco#info response: More than one equal identity', ); - return state; + return; } } @@ -245,7 +221,7 @@ class EntityCapabilitiesManager extends XmppManagerBase { logger.warning( 'Malformed disco#info response: More than one equal feature', ); - return state; + return; } } @@ -273,7 +249,7 @@ class EntityCapabilitiesManager extends XmppManagerBase { logger.warning( 'Malformed disco#info response: Extended Info FORM_TYPE contains more than one value(s) of different value.', ); - return state; + return; } } @@ -288,7 +264,7 @@ class EntityCapabilitiesManager extends XmppManagerBase { logger.warning( 'Malformed disco#info response: More than one Extended Disco Info forms with the same FORM_TYPE value', ); - return state; + return; } // Check if the field type is hidden @@ -325,7 +301,43 @@ class EntityCapabilitiesManager extends XmppManagerBase { 'Capability hash mismatch from $from: Received "$ver", expected "$computedCapabilityHash".', ); } + } + @visibleForTesting + Future onPresence( + Stanza stanza, + StanzaHandlerData state, + ) async { + if (stanza.from == null) { + return state; + } + + final from = JID.fromString(stanza.from!); + final c = stanza.firstTag('c', xmlns: capsXmlns)!; + + final hashFunctionName = c.attributes['hash'] as String?; + final capabilityNode = c.attributes['node'] as String?; + final ver = c.attributes['ver'] as String?; + if (hashFunctionName == null || capabilityNode == null || ver == null) { + return state; + } + + // Check if we know of the hash + final isCached = + await _cacheLock.synchronized(() => _capHashCache.containsKey(ver)); + if (isCached) { + return state; + } + + unawaited( + _performQuery( + stanza, + ver, + hashFunctionName, + capabilityNode, + from, + ), + ); return state; } diff --git a/packages/moxxmpp/packages/moxxmpp/lib/src/util/incoming_queue.dart b/packages/moxxmpp/packages/moxxmpp/lib/src/util/incoming_queue.dart new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/packages/moxxmpp/packages/moxxmpp/lib/src/util/incoming_queue.dart @@ -0,0 +1 @@ + diff --git a/packages/moxxmpp/pubspec.yaml b/packages/moxxmpp/pubspec.yaml index 852dccb..281c3cb 100644 --- a/packages/moxxmpp/pubspec.yaml +++ b/packages/moxxmpp/pubspec.yaml @@ -8,6 +8,7 @@ environment: sdk: '>=3.0.0 <4.0.0' dependencies: + async_queue: ^1.3.0 collection: ^1.16.0 cryptography: ^2.0.5 hex: ^0.2.0 diff --git a/packages/moxxmpp/test/xmpp_parser_test.dart b/packages/moxxmpp/test/xmpp_parser_test.dart index 265f393..7098b79 100644 --- a/packages/moxxmpp/test/xmpp_parser_test.dart +++ b/packages/moxxmpp/test/xmpp_parser_test.dart @@ -11,14 +11,16 @@ void main() { final controller = StreamController(); unawaited( - controller.stream.transform(parser).forEach((event) { - if (event is! XMPPStreamElement) return; - final node = event.node; + controller.stream.transform(parser).forEach((events) { + for (final event in events) { + if (event is! XMPPStreamElement) continue; + final node = event.node; - if (node.tag == 'childa') { - childa = true; - } else if (node.tag == 'childb') { - childb = true; + if (node.tag == 'childa') { + childa = true; + } else if (node.tag == 'childb') { + childb = true; + } } }), ); @@ -36,14 +38,16 @@ void main() { final controller = StreamController(); unawaited( - controller.stream.transform(parser).forEach((event) { - if (event is! XMPPStreamElement) return; - final node = event.node; + controller.stream.transform(parser).forEach((events) { + for (final event in events) { + if (event is! XMPPStreamElement) continue; + final node = event.node; - if (node.tag == 'childa') { - childa = true; - } else if (node.tag == 'childb') { - childb = true; + if (node.tag == 'childa') { + childa = true; + } else if (node.tag == 'childb') { + childb = true; + } } }), ); @@ -64,14 +68,16 @@ void main() { final controller = StreamController(); unawaited( - controller.stream.transform(parser).forEach((event) { - if (event is! XMPPStreamElement) return; - final node = event.node; + controller.stream.transform(parser).forEach((events) { + for (final event in events) { + if (event is! XMPPStreamElement) continue; + final node = event.node; - if (node.tag == 'childa') { - childa = true; - } else if (node.tag == 'childb') { - childb = true; + if (node.tag == 'childa') { + childa = true; + } else if (node.tag == 'childb') { + childb = true; + } } }), ); @@ -93,13 +99,15 @@ void main() { final controller = StreamController(); unawaited( - controller.stream.transform(parser).forEach((node) { - if (node is XMPPStreamElement) { - if (node.node.tag == 'childa') { - childa = true; + controller.stream.transform(parser).forEach((events) { + for (final event in events) { + if (event is XMPPStreamElement) { + if (event.node.tag == 'childa') { + childa = true; + } + } else if (event is XMPPStreamHeader) { + attrs = event.attributes; } - } else if (node is XMPPStreamHeader) { - attrs = node.attributes; } }), ); @@ -118,11 +126,13 @@ void main() { var gotFeatures = false; unawaited( controller.stream.transform(parser).forEach( - (event) { - if (event is! XMPPStreamElement) return; + (events) { + for (final event in events) { + if (event is! XMPPStreamElement) continue; - if (event.node.tag == 'stream:features') { - gotFeatures = true; + if (event.node.tag == 'stream:features') { + gotFeatures = true; + } } }, ), @@ -157,4 +167,27 @@ void main() { await Future.delayed(const Duration(seconds: 1)); expect(gotFeatures, true); }); + + test('Test the order of concatenated stanzas', () async { + // NOTE: This seems weird, but it turns out that not keeping this order leads to + // MUC joins (on Moxxy) not catching every bit of presence before marking the + // MUC as joined. + final parser = XMPPStreamParser(); + final controller = StreamController(); + var called = false; + + unawaited( + controller.stream.transform(parser).forEach((events) { + expect(events.isNotEmpty, true); + expect((events[0] as XMPPStreamElement).node.tag, 'childa'); + expect((events[1] as XMPPStreamElement).node.tag, 'childb'); + expect((events[2] as XMPPStreamElement).node.tag, 'childc'); + called = true; + }), + ); + controller.add(''); + + await Future.delayed(const Duration(seconds: 2)); + expect(called, true); + }); } From fb4b4c71e2d03b0afff8dea505904f87938e6ff9 Mon Sep 17 00:00:00 2001 From: "Alexander \"PapaTutuWawa" Date: Fri, 29 Sep 2023 19:59:38 +0200 Subject: [PATCH 05/23] fix(core): Remove async_queue --- packages/moxxmpp/pubspec.yaml | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/moxxmpp/pubspec.yaml b/packages/moxxmpp/pubspec.yaml index 281c3cb..852dccb 100644 --- a/packages/moxxmpp/pubspec.yaml +++ b/packages/moxxmpp/pubspec.yaml @@ -8,7 +8,6 @@ environment: sdk: '>=3.0.0 <4.0.0' dependencies: - async_queue: ^1.3.0 collection: ^1.16.0 cryptography: ^2.0.5 hex: ^0.2.0 From 3a94dd9634b7a9b0f4ec13a000f9b683a4ad4c49 Mon Sep 17 00:00:00 2001 From: "Alexander \"PapaTutuWawa" Date: Fri, 29 Sep 2023 20:01:09 +0200 Subject: [PATCH 06/23] feat(core): Log handler executions --- packages/moxxmpp/lib/src/connection.dart | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/packages/moxxmpp/lib/src/connection.dart b/packages/moxxmpp/lib/src/connection.dart index 48f3d1d..1d44579 100644 --- a/packages/moxxmpp/lib/src/connection.dart +++ b/packages/moxxmpp/lib/src/connection.dart @@ -681,10 +681,13 @@ class XmppConnection { for (final handlerRaw in handlers) { final (handler, managerName) = handlerRaw; if (handler.matches(state.stanza)) { + _log.finest( + 'Running handler for ${stanza.tag} (${stanza.attributes["id"]}) of $managerName', + ); state = await handler.callback(state.stanza, state); if (state.done || state.cancel) { _log.finest( - 'Processing ended early for ${stanza.tag} by $managerName', + 'Processing ended early for ${stanza.tag} (${stanza.attributes["id"]}) by $managerName', ); return state; } From 49d3c6411b461b15f9b7bd49d826dc7475116746 Mon Sep 17 00:00:00 2001 From: "Alexander \"PapaTutuWawa" Date: Fri, 29 Sep 2023 20:24:58 +0200 Subject: [PATCH 07/23] fix(tests): Fix tests --- packages/moxxmpp/lib/src/awaiter.dart | 31 +++++++---- packages/moxxmpp/lib/src/connection.dart | 17 ++++-- packages/moxxmpp/lib/src/stanza.dart | 6 +- .../moxxmpp/lib/src/util/incoming_queue.dart | 55 +++++++++---------- .../lib/src/xeps/xep_0045/xep_0045.dart | 1 - packages/moxxmpp/lib/src/xeps/xep_0115.dart | 9 ++- packages/moxxmpp/test/awaiter_test.dart | 17 +----- packages/moxxmpp/test/xeps/xep_0115_test.dart | 3 + 8 files changed, 77 insertions(+), 62 deletions(-) diff --git a/packages/moxxmpp/lib/src/awaiter.dart b/packages/moxxmpp/lib/src/awaiter.dart index c730038..13722b1 100644 --- a/packages/moxxmpp/lib/src/awaiter.dart +++ b/packages/moxxmpp/lib/src/awaiter.dart @@ -1,6 +1,5 @@ import 'dart:async'; import 'package:meta/meta.dart'; -import 'package:moxxmpp/src/jid.dart'; import 'package:moxxmpp/src/stringxml.dart'; import 'package:synchronized/synchronized.dart'; @@ -11,7 +10,7 @@ class _StanzaSurrogateKey { /// The JID the original stanza was sent to. We expect the result to come from the /// same JID. - final String sentTo; + final String? sentTo; /// The ID of the original stanza. We expect the result to have the same ID. final String id; @@ -52,7 +51,7 @@ class StanzaAwaiter { /// [tag] is the stanza's tag name. /// /// Returns a future that might resolve to the response to the stanza. - Future> addPending(String to, String id, String tag) async { + Future> addPending(String? to, String id, String tag) async { final completer = await _lock.synchronized(() { final completer = Completer(); _pending[_StanzaSurrogateKey(to, id, tag)] = completer; @@ -62,20 +61,15 @@ class StanzaAwaiter { return completer.future; } - /// Checks if the stanza [stanza] is being awaited. [bareJid] is the bare JID of - /// the connection. + /// Checks if the stanza [stanza] is being awaited. /// If [stanza] is awaited, resolves the future and returns true. If not, returns /// false. - Future onData(XMLNode stanza, JID bareJid) async { - assert(bareJid.isBare(), 'bareJid must be bare'); - + Future onData(XMLNode stanza) async { final id = stanza.attributes['id'] as String?; if (id == null) return false; final key = _StanzaSurrogateKey( - // Section 8.1.2.1 § 3 of RFC 6120 says that an empty "from" indicates that the - // attribute is implicitly from our own bare JID. - stanza.attributes['from'] as String? ?? bareJid.toString(), + stanza.attributes['from'] as String?, id, stanza.tag, ); @@ -91,4 +85,19 @@ class StanzaAwaiter { return false; }); } + + /// Checks if [stanza] represents a stanza that is awaited. Returns true, if [stanza] + /// is awaited. False, if not. + Future isAwaited(XMLNode stanza) async { + final id = stanza.attributes['id'] as String?; + if (id == null) return false; + + final key = _StanzaSurrogateKey( + stanza.attributes['from'] as String?, + id, + stanza.tag, + ); + + return _lock.synchronized(() => _pending.containsKey(key)); + } } diff --git a/packages/moxxmpp/lib/src/connection.dart b/packages/moxxmpp/lib/src/connection.dart index 1d44579..34d537b 100644 --- a/packages/moxxmpp/lib/src/connection.dart +++ b/packages/moxxmpp/lib/src/connection.dart @@ -90,7 +90,7 @@ class XmppConnection { }, ); - _incomingStanzaQueue = IncomingStanzaQueue(handleXmlStream); + _incomingStanzaQueue = IncomingStanzaQueue(handleXmlStream, _stanzaAwaiter); _socketStream = _socket.getDataStream(); // TODO(Unknown): Handle on done _socketStream @@ -531,12 +531,15 @@ class XmppConnection { _log.finest('==> $prefix${newStanza.toXml()}'); if (details.awaitable) { + final isOwnJid = + data.stanza.to == connectionSettings.jid.toBare().toString(); + await _stanzaAwaiter .addPending( // A stanza with no to attribute is for direct processing by the server. As such, // we can correlate it by just *assuming* we have that attribute // (RFC 6120 Section 8.1.1.1) - data.stanza.to ?? connectionSettings.jid.toBare().toString(), + isOwnJid ? null : data.stanza.to, data.stanza.id!, data.stanza.tag, ) @@ -773,9 +776,15 @@ class XmppConnection { return; } + // In case the stanza came from our own bare Jid, remove it so that the stanza + // awaiter works correctly. + final isOwnJid = incomingPreHandlers.stanza.from == + connectionSettings.jid.toBare().toString(); + final ownJidStanza = isOwnJid + ? incomingPreHandlers.stanza.copyWith(from: null) + : incomingPreHandlers.stanza; final awaited = await _stanzaAwaiter.onData( - incomingPreHandlers.stanza, - connectionSettings.jid.toBare(), + ownJidStanza, ); if (awaited) { return; diff --git a/packages/moxxmpp/lib/src/stanza.dart b/packages/moxxmpp/lib/src/stanza.dart index 561eef4..b90c1e6 100644 --- a/packages/moxxmpp/lib/src/stanza.dart +++ b/packages/moxxmpp/lib/src/stanza.dart @@ -102,6 +102,8 @@ class RemoteServerTimeoutError extends StanzaError { /// An unknown error. class UnknownStanzaError extends StanzaError {} +const _stanzaNotDefined = Object(); + class Stanza extends XMLNode { // ignore: use_super_parameters Stanza({ @@ -216,7 +218,7 @@ class Stanza extends XMLNode { Stanza copyWith({ String? id, - String? from, + Object? from = _stanzaNotDefined, String? to, String? type, List? children, @@ -225,7 +227,7 @@ class Stanza extends XMLNode { return Stanza( tag: tag, to: to ?? this.to, - from: from ?? this.from, + from: from != _stanzaNotDefined ? from as String? : this.from, id: id ?? this.id, type: type ?? this.type, children: children ?? this.children, diff --git a/packages/moxxmpp/lib/src/util/incoming_queue.dart b/packages/moxxmpp/lib/src/util/incoming_queue.dart index 36a77cd..202b1f4 100644 --- a/packages/moxxmpp/lib/src/util/incoming_queue.dart +++ b/packages/moxxmpp/lib/src/util/incoming_queue.dart @@ -1,24 +1,37 @@ import 'dart:async'; import 'dart:collection'; import 'package:logging/logging.dart'; +import 'package:moxxmpp/src/awaiter.dart'; import 'package:moxxmpp/src/parser.dart'; import 'package:synchronized/synchronized.dart'; -typedef LockResult = (Completer?, XMPPStreamObject); - class IncomingStanzaQueue { - IncomingStanzaQueue(this._callback); + IncomingStanzaQueue(this._callback, this._stanzaAwaiter); + /// The queue for storing the completer of each + /// incoming stanza (or stream object to be precise). + /// Only access while holding the lock [_lock]. final Queue> _queue = Queue(); - final Future Function(XMPPStreamObject) _callback; + /// Flag indicating whether a callback is already running (true) + /// or not. "a callback" and not "the callback" because awaited stanzas + /// are allowed to bypass the queue. + /// Only access while holding the lock [_lock]. bool _isRunning = false; + /// The function to call to process an incoming stream object. + final Future Function(XMPPStreamObject) _callback; + + /// Lock guarding both [_queue] and [_isRunning]. final Lock _lock = Lock(); + // TODO: Remove once we can await stanzas (or can we). + bool negotiationsDone = false; + + /// Logger. final Logger _log = Logger('IncomingStanzaQueue'); - bool negotiationsDone = false; + final StanzaAwaiter _stanzaAwaiter; Future _processStreamObject( Future? future, @@ -37,37 +50,22 @@ class IncomingStanzaQueue { await future; // Run the callback. - if (object is XMPPStreamElement) { - _log.finest('Running callback for ${object.node.toXml()}'); - } await _callback(object); - if (object is XMPPStreamElement) { - _log.finest( - 'Callback for ${object.node.tag} (${object.node.attributes["id"]}) done', - ); - } // Run the next entry. - _log.finest('Entering second lock...'); await _lock.synchronized(() { - _log.finest('Second lock entered...'); if (_queue.isNotEmpty) { - _log.finest('New queue size: ${_queue.length - 1}'); _queue.removeFirst().complete(); } else { _isRunning = false; - _log.finest('New queue size: 0'); } }); } Future addStanza(List objects) async { - _log.finest('Entering initial lock...'); - await _lock.synchronized(() { - _log.finest('Lock entered...'); - + await _lock.synchronized(() async { for (final object in objects) { - if (canBypassQueue(object)) { + if (await canBypassQueue(object)) { unawaited( _processStreamObject(null, object), ); @@ -89,11 +87,12 @@ class IncomingStanzaQueue { }); } - bool canBypassQueue(XMPPStreamObject object) { - // TODO: Ask the StanzaAwaiter if the stanza is awaited - return object is XMPPStreamElement && - negotiationsDone && - object.node.tag == 'iq' && - ['result', 'error'].contains(object.node.attributes['type'] as String?); + Future canBypassQueue(XMPPStreamObject object) async { + if (object is XMPPStreamHeader) { + return false; + } + + object as XMPPStreamElement; + return _stanzaAwaiter.isAwaited(object.node); } } diff --git a/packages/moxxmpp/lib/src/xeps/xep_0045/xep_0045.dart b/packages/moxxmpp/lib/src/xeps/xep_0045/xep_0045.dart index e929ca5..08b08cb 100644 --- a/packages/moxxmpp/lib/src/xeps/xep_0045/xep_0045.dart +++ b/packages/moxxmpp/lib/src/xeps/xep_0045/xep_0045.dart @@ -16,7 +16,6 @@ import 'package:moxxmpp/src/xeps/xep_0045/errors.dart'; import 'package:moxxmpp/src/xeps/xep_0045/events.dart'; import 'package:moxxmpp/src/xeps/xep_0045/types.dart'; import 'package:moxxmpp/src/xeps/xep_0359.dart'; -import 'package:synchronized/extension.dart'; import 'package:synchronized/synchronized.dart'; /// (Room JID, nickname) diff --git a/packages/moxxmpp/lib/src/xeps/xep_0115.dart b/packages/moxxmpp/lib/src/xeps/xep_0115.dart index ff8890d..749011b 100644 --- a/packages/moxxmpp/lib/src/xeps/xep_0115.dart +++ b/packages/moxxmpp/lib/src/xeps/xep_0115.dart @@ -173,8 +173,13 @@ class EntityCapabilitiesManager extends XmppManagerBase { }); } - Future _performQuery(Stanza presence, String ver, - String hashFunctionName, String capabilityNode, JID from) async { + Future _performQuery( + Stanza presence, + String ver, + String hashFunctionName, + String capabilityNode, + JID from, + ) async { final dm = getAttributes().getManagerById(discoManager)!; final discoRequest = await dm.discoInfoQuery( from, diff --git a/packages/moxxmpp/test/awaiter_test.dart b/packages/moxxmpp/test/awaiter_test.dart index acde248..8367305 100644 --- a/packages/moxxmpp/test/awaiter_test.dart +++ b/packages/moxxmpp/test/awaiter_test.dart @@ -3,8 +3,6 @@ import 'package:moxxmpp/src/awaiter.dart'; import 'package:test/test.dart'; void main() { - const bareJid = JID('moxxmpp', 'server3.example', ''); - test('Test awaiting an awaited stanza with a from attribute', () async { final awaiter = StanzaAwaiter(); @@ -20,14 +18,12 @@ void main() { XMLNode.fromString( '', ), - bareJid, ); expect(result1, false); final result2 = await awaiter.onData( XMLNode.fromString( '', ), - bareJid, ); expect(result2, false); @@ -37,7 +33,6 @@ void main() { ); final result3 = await awaiter.onData( stanza, - bareJid, ); expect(result3, true); expect(await future, stanza); @@ -47,12 +42,11 @@ void main() { final awaiter = StanzaAwaiter(); // "Send" a stanza - final future = await awaiter.addPending(bareJid.toString(), 'abc123', 'iq'); + final future = await awaiter.addPending(null, 'abc123', 'iq'); // Receive the wrong answer final result1 = await awaiter.onData( XMLNode.fromString(''), - bareJid, ); expect(result1, false); @@ -60,7 +54,6 @@ void main() { final stanza = XMLNode.fromString(''); final result2 = await awaiter.onData( stanza, - bareJid, ); expect(result2, true); expect(await future, stanza); @@ -70,13 +63,12 @@ void main() { final awaiter = StanzaAwaiter(); // "Send" a stanza - final future = await awaiter.addPending(bareJid.toString(), 'abc123', 'iq'); + final future = await awaiter.addPending(null, 'abc123', 'iq'); // Receive the correct answer final stanza = XMLNode.fromString(''); final result1 = await awaiter.onData( stanza, - bareJid, ); expect(result1, true); expect(await future, stanza); @@ -84,7 +76,6 @@ void main() { // Receive it again final result2 = await awaiter.onData( stanza, - bareJid, ); expect(result2, false); }); @@ -93,20 +84,18 @@ void main() { final awaiter = StanzaAwaiter(); // "Send" a stanza - final future = await awaiter.addPending(bareJid.toString(), 'abc123', 'iq'); + final future = await awaiter.addPending(null, 'abc123', 'iq'); // Receive the wrong answer final stanza = XMLNode.fromString(''); final result1 = await awaiter.onData( XMLNode.fromString(''), - bareJid, ); expect(result1, false); // Receive the correct answer final result2 = await awaiter.onData( stanza, - bareJid, ); expect(result2, true); expect(await future, stanza); diff --git a/packages/moxxmpp/test/xeps/xep_0115_test.dart b/packages/moxxmpp/test/xeps/xep_0115_test.dart index 47837c8..ff90977 100644 --- a/packages/moxxmpp/test/xeps/xep_0115_test.dart +++ b/packages/moxxmpp/test/xeps/xep_0115_test.dart @@ -343,6 +343,7 @@ void main() { stanza, StanzaHandlerData(false, false, stanza, TypedMap()), ); + await Future.delayed(const Duration(seconds: 2)); expect( await manager.getCachedDiscoInfoFromJid(aliceJid) != null, @@ -513,6 +514,7 @@ void main() { stanza, StanzaHandlerData(false, false, stanza, TypedMap()), ); + await Future.delayed(const Duration(seconds: 2)); final cachedItem = await manager.getCachedDiscoInfoFromJid(aliceJid); expect( @@ -549,6 +551,7 @@ void main() { stanza, StanzaHandlerData(false, false, stanza, TypedMap()), ); + await Future.delayed(const Duration(seconds: 2)); final cachedItem = await manager.getCachedDiscoInfoFromJid(aliceJid); expect( From 59b90307c236ecfafadf655dac30cc397d36d583 Mon Sep 17 00:00:00 2001 From: "Alexander \"PapaTutuWawa" Date: Fri, 29 Sep 2023 20:29:25 +0200 Subject: [PATCH 08/23] fix(core): Remove the negotiation lock --- packages/moxxmpp/lib/src/connection.dart | 5 ----- packages/moxxmpp/lib/src/util/incoming_queue.dart | 3 --- 2 files changed, 8 deletions(-) diff --git a/packages/moxxmpp/lib/src/connection.dart b/packages/moxxmpp/lib/src/connection.dart index 34d537b..7584034 100644 --- a/packages/moxxmpp/lib/src/connection.dart +++ b/packages/moxxmpp/lib/src/connection.dart @@ -595,8 +595,6 @@ class XmppConnection { await _reconnectionPolicy.setShouldReconnect(true); } - _incomingStanzaQueue.negotiationsDone = true; - // Tell consumers of the event stream that we're done with stream feature // negotiations await _sendEvent( @@ -851,9 +849,7 @@ class XmppConnection { await _negotiationsHandler.negotiate(event); break; case RoutingState.handleStanzas: - _log.finest('Handling ${node.tag} (${node.attributes["id"]})'); await _handleStanza(node); - _log.finest('Handling ${node.tag} (${node.attributes["id"]}) done'); break; case RoutingState.preConnection: case RoutingState.error: @@ -918,7 +914,6 @@ class XmppConnection { // Kill a possibly existing connection _socket.close(); - _incomingStanzaQueue.negotiationsDone = false; await _reconnectionPolicy.reset(); _enableReconnectOnSuccess = enableReconnectOnSuccess; if (shouldReconnect) { diff --git a/packages/moxxmpp/lib/src/util/incoming_queue.dart b/packages/moxxmpp/lib/src/util/incoming_queue.dart index 202b1f4..152e2f8 100644 --- a/packages/moxxmpp/lib/src/util/incoming_queue.dart +++ b/packages/moxxmpp/lib/src/util/incoming_queue.dart @@ -25,9 +25,6 @@ class IncomingStanzaQueue { /// Lock guarding both [_queue] and [_isRunning]. final Lock _lock = Lock(); - // TODO: Remove once we can await stanzas (or can we). - bool negotiationsDone = false; - /// Logger. final Logger _log = Logger('IncomingStanzaQueue'); From edf1d0b25709085a1f3839ca0cb41161b6f3a8ca Mon Sep 17 00:00:00 2001 From: "Alexander \"PapaTutuWawa" Date: Fri, 29 Sep 2023 20:33:56 +0200 Subject: [PATCH 09/23] feat(core): Replace custom class with a record type --- packages/moxxmpp/lib/src/awaiter.dart | 37 +++---------------- .../moxxmpp/lib/src/util/incoming_queue.dart | 3 ++ 2 files changed, 9 insertions(+), 31 deletions(-) diff --git a/packages/moxxmpp/lib/src/awaiter.dart b/packages/moxxmpp/lib/src/awaiter.dart index 13722b1..d135cb8 100644 --- a/packages/moxxmpp/lib/src/awaiter.dart +++ b/packages/moxxmpp/lib/src/awaiter.dart @@ -1,34 +1,9 @@ import 'dart:async'; -import 'package:meta/meta.dart'; import 'package:moxxmpp/src/stringxml.dart'; import 'package:synchronized/synchronized.dart'; -/// A surrogate key for awaiting stanzas. -@immutable -class _StanzaSurrogateKey { - const _StanzaSurrogateKey(this.sentTo, this.id, this.tag); - - /// The JID the original stanza was sent to. We expect the result to come from the - /// same JID. - final String? sentTo; - - /// The ID of the original stanza. We expect the result to have the same ID. - final String id; - - /// The tag name of the stanza. - final String tag; - - @override - int get hashCode => sentTo.hashCode ^ id.hashCode ^ tag.hashCode; - - @override - bool operator ==(Object other) { - return other is _StanzaSurrogateKey && - other.sentTo == sentTo && - other.id == id && - other.tag == tag; - } -} +/// (JID we sent a stanza to, the id of the sent stanza, the tag of the sent stanza). +typedef _StanzaCompositeKey = (String?, String, String); /// This class handles the await semantics for stanzas. Stanzas are given a "unique" /// key equal to the tuple (to, id, tag) with which their response is identified. @@ -40,7 +15,7 @@ class _StanzaSurrogateKey { /// This class also handles some "edge cases" of RFC 6120, like an empty "from" attribute. class StanzaAwaiter { /// The pending stanzas, identified by their surrogate key. - final Map<_StanzaSurrogateKey, Completer> _pending = {}; + final Map<_StanzaCompositeKey, Completer> _pending = {}; /// The critical section for accessing [StanzaAwaiter._pending]. final Lock _lock = Lock(); @@ -54,7 +29,7 @@ class StanzaAwaiter { Future> addPending(String? to, String id, String tag) async { final completer = await _lock.synchronized(() { final completer = Completer(); - _pending[_StanzaSurrogateKey(to, id, tag)] = completer; + _pending[(to, id, tag)] = completer; return completer; }); @@ -68,7 +43,7 @@ class StanzaAwaiter { final id = stanza.attributes['id'] as String?; if (id == null) return false; - final key = _StanzaSurrogateKey( + final key = ( stanza.attributes['from'] as String?, id, stanza.tag, @@ -92,7 +67,7 @@ class StanzaAwaiter { final id = stanza.attributes['id'] as String?; if (id == null) return false; - final key = _StanzaSurrogateKey( + final key = ( stanza.attributes['from'] as String?, id, stanza.tag, diff --git a/packages/moxxmpp/lib/src/util/incoming_queue.dart b/packages/moxxmpp/lib/src/util/incoming_queue.dart index 152e2f8..abb9a4a 100644 --- a/packages/moxxmpp/lib/src/util/incoming_queue.dart +++ b/packages/moxxmpp/lib/src/util/incoming_queue.dart @@ -5,6 +5,8 @@ import 'package:moxxmpp/src/awaiter.dart'; import 'package:moxxmpp/src/parser.dart'; import 'package:synchronized/synchronized.dart'; +/// A queue for incoming [XMPPStreamObject]s to ensure "in order" +/// processing (except for stanzas that are awaited). class IncomingStanzaQueue { IncomingStanzaQueue(this._callback, this._stanzaAwaiter); @@ -90,6 +92,7 @@ class IncomingStanzaQueue { } object as XMPPStreamElement; + // TODO: Check the from attribute to ensure that it is matched correctly. return _stanzaAwaiter.isAwaited(object.node); } } From 0a68f09fb4f56a30dfa6eacb0df3aa09ec312683 Mon Sep 17 00:00:00 2001 From: "Alexander \"PapaTutuWawa" Date: Fri, 29 Sep 2023 20:46:14 +0200 Subject: [PATCH 10/23] fix(style): Fix style issues --- packages/moxxmpp/lib/src/awaiter.dart | 20 +++++++++- packages/moxxmpp/lib/src/connection.dart | 19 +++------- .../moxxmpp/lib/src/util/incoming_queue.dart | 1 - packages/moxxmpp/test/awaiter_test.dart | 38 +++++++++++++++++-- 4 files changed, 58 insertions(+), 20 deletions(-) diff --git a/packages/moxxmpp/lib/src/awaiter.dart b/packages/moxxmpp/lib/src/awaiter.dart index d135cb8..6ffcf16 100644 --- a/packages/moxxmpp/lib/src/awaiter.dart +++ b/packages/moxxmpp/lib/src/awaiter.dart @@ -3,8 +3,12 @@ import 'package:moxxmpp/src/stringxml.dart'; import 'package:synchronized/synchronized.dart'; /// (JID we sent a stanza to, the id of the sent stanza, the tag of the sent stanza). +// ignore: avoid_private_typedef_functions typedef _StanzaCompositeKey = (String?, String, String); +/// Callback function that returns the bare JID of the connection as a String. +typedef GetBareJidCallback = String Function(); + /// This class handles the await semantics for stanzas. Stanzas are given a "unique" /// key equal to the tuple (to, id, tag) with which their response is identified. /// @@ -14,6 +18,10 @@ typedef _StanzaCompositeKey = (String?, String, String); /// /// This class also handles some "edge cases" of RFC 6120, like an empty "from" attribute. class StanzaAwaiter { + StanzaAwaiter(this._bareJidCallback); + + final GetBareJidCallback _bareJidCallback; + /// The pending stanzas, identified by their surrogate key. final Map<_StanzaCompositeKey, Completer> _pending = {}; @@ -27,9 +35,12 @@ class StanzaAwaiter { /// /// Returns a future that might resolve to the response to the stanza. Future> addPending(String? to, String id, String tag) async { + // Check if we want to send a stanza to our bare JID and replace it with null. + final processedTo = to != null && to == _bareJidCallback() ? null : to; + final completer = await _lock.synchronized(() { final completer = Completer(); - _pending[(to, id, tag)] = completer; + _pending[(processedTo, id, tag)] = completer; return completer; }); @@ -43,8 +54,13 @@ class StanzaAwaiter { final id = stanza.attributes['id'] as String?; if (id == null) return false; + // Check if we want to send a stanza to our bare JID and replace it with null. + final from = stanza.attributes['from'] as String?; + final processedFrom = + from != null && from == _bareJidCallback() ? null : from; + final key = ( - stanza.attributes['from'] as String?, + processedFrom, id, stanza.tag, ); diff --git a/packages/moxxmpp/lib/src/connection.dart b/packages/moxxmpp/lib/src/connection.dart index 7584034..1cffafe 100644 --- a/packages/moxxmpp/lib/src/connection.dart +++ b/packages/moxxmpp/lib/src/connection.dart @@ -90,6 +90,9 @@ class XmppConnection { }, ); + _stanzaAwaiter = StanzaAwaiter( + () => connectionSettings.jid.toBare().toString(), + ); _incomingStanzaQueue = IncomingStanzaQueue(handleXmlStream, _stanzaAwaiter); _socketStream = _socket.getDataStream(); // TODO(Unknown): Handle on done @@ -125,7 +128,7 @@ class XmppConnection { final ConnectivityManager _connectivityManager; /// A helper for handling await semantics with stanzas - final StanzaAwaiter _stanzaAwaiter = StanzaAwaiter(); + late final StanzaAwaiter _stanzaAwaiter; /// Sorted list of handlers that we call or incoming and outgoing stanzas final List<_StanzaHandlerWrapper> _incomingStanzaHandlers = @@ -531,15 +534,12 @@ class XmppConnection { _log.finest('==> $prefix${newStanza.toXml()}'); if (details.awaitable) { - final isOwnJid = - data.stanza.to == connectionSettings.jid.toBare().toString(); - await _stanzaAwaiter .addPending( // A stanza with no to attribute is for direct processing by the server. As such, // we can correlate it by just *assuming* we have that attribute // (RFC 6120 Section 8.1.1.1) - isOwnJid ? null : data.stanza.to, + data.stanza.to, data.stanza.id!, data.stanza.tag, ) @@ -774,15 +774,8 @@ class XmppConnection { return; } - // In case the stanza came from our own bare Jid, remove it so that the stanza - // awaiter works correctly. - final isOwnJid = incomingPreHandlers.stanza.from == - connectionSettings.jid.toBare().toString(); - final ownJidStanza = isOwnJid - ? incomingPreHandlers.stanza.copyWith(from: null) - : incomingPreHandlers.stanza; final awaited = await _stanzaAwaiter.onData( - ownJidStanza, + incomingPreHandlers.stanza, ); if (awaited) { return; diff --git a/packages/moxxmpp/lib/src/util/incoming_queue.dart b/packages/moxxmpp/lib/src/util/incoming_queue.dart index abb9a4a..5b0c653 100644 --- a/packages/moxxmpp/lib/src/util/incoming_queue.dart +++ b/packages/moxxmpp/lib/src/util/incoming_queue.dart @@ -92,7 +92,6 @@ class IncomingStanzaQueue { } object as XMPPStreamElement; - // TODO: Check the from attribute to ensure that it is matched correctly. return _stanzaAwaiter.isAwaited(object.node); } } diff --git a/packages/moxxmpp/test/awaiter_test.dart b/packages/moxxmpp/test/awaiter_test.dart index 8367305..f9b1813 100644 --- a/packages/moxxmpp/test/awaiter_test.dart +++ b/packages/moxxmpp/test/awaiter_test.dart @@ -2,9 +2,12 @@ import 'package:moxxmpp/moxxmpp.dart'; import 'package:moxxmpp/src/awaiter.dart'; import 'package:test/test.dart'; +const bareJid = 'user4@example.org'; +String getBareJidCallback() => bareJid; + void main() { test('Test awaiting an awaited stanza with a from attribute', () async { - final awaiter = StanzaAwaiter(); + final awaiter = StanzaAwaiter(getBareJidCallback); // "Send" a stanza final future = await awaiter.addPending( @@ -39,7 +42,7 @@ void main() { }); test('Test awaiting an awaited stanza without a from attribute', () async { - final awaiter = StanzaAwaiter(); + final awaiter = StanzaAwaiter(getBareJidCallback); // "Send" a stanza final future = await awaiter.addPending(null, 'abc123', 'iq'); @@ -60,7 +63,7 @@ void main() { }); test('Test awaiting a stanza that was already awaited', () async { - final awaiter = StanzaAwaiter(); + final awaiter = StanzaAwaiter(getBareJidCallback); // "Send" a stanza final future = await awaiter.addPending(null, 'abc123', 'iq'); @@ -81,7 +84,7 @@ void main() { }); test('Test ignoring a stanza that has the wrong tag', () async { - final awaiter = StanzaAwaiter(); + final awaiter = StanzaAwaiter(getBareJidCallback); // "Send" a stanza final future = await awaiter.addPending(null, 'abc123', 'iq'); @@ -100,4 +103,31 @@ void main() { expect(result2, true); expect(await future, stanza); }); + + test('Sending a stanza to our bare JID', () async { + final awaiter = StanzaAwaiter(getBareJidCallback); + + // "Send" a stanza + final future = await awaiter.addPending(bareJid, 'abc123', 'iq'); + + // Receive the response. + final stanza = XMLNode.fromString(''); + await awaiter.onData(stanza); + expect(await future, stanza); + }); + + test( + 'Sending a stanza to our bare JID and receiving stanza with a from attribute', + () async { + final awaiter = StanzaAwaiter(getBareJidCallback); + + // "Send" a stanza + final future = await awaiter.addPending(bareJid, 'abc123', 'iq'); + + // Receive the response. + final stanza = + XMLNode.fromString(''); + await awaiter.onData(stanza); + expect(await future, stanza); + }); } From 41b789fa28d4b4aa2cf642197a08a07a967f8b6b Mon Sep 17 00:00:00 2001 From: "Alexander \"PapaTutuWawa" Date: Fri, 29 Sep 2023 20:50:03 +0200 Subject: [PATCH 11/23] feat(core): Stop an exception in a handler to deadlock the connection --- packages/moxxmpp/CHANGELOG.md | 3 +++ packages/moxxmpp/lib/src/connection.dart | 8 +++++++- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/packages/moxxmpp/CHANGELOG.md b/packages/moxxmpp/CHANGELOG.md index a8c57f6..ee3c3d7 100644 --- a/packages/moxxmpp/CHANGELOG.md +++ b/packages/moxxmpp/CHANGELOG.md @@ -23,6 +23,9 @@ - *BREAKING*: `UserAvatarManager`'s `getAvatarId` with `getLatestMetadata`. - The `PubSubManager` now supports PubSub's `max_items` in `getItems`. - *BREAKING*: `vCardManager`'s `VCardAvatarUpdatedEvent` no longer automatically requests the newest VCard avatar. +- *BREAKING*: `XmppConnection` now tries to ensure that incoming data is processed in-order. The only exception are awaited stanzas as they are allowed to bypass the queue. +- *BREAKING*: If a stanza handler causes an exception, the handler is simply skipped while processing. +- Add better logging around what stanza handler is running and if they end processing early. ## 0.3.1 diff --git a/packages/moxxmpp/lib/src/connection.dart b/packages/moxxmpp/lib/src/connection.dart index 1cffafe..bc8466d 100644 --- a/packages/moxxmpp/lib/src/connection.dart +++ b/packages/moxxmpp/lib/src/connection.dart @@ -685,7 +685,13 @@ class XmppConnection { _log.finest( 'Running handler for ${stanza.tag} (${stanza.attributes["id"]}) of $managerName', ); - state = await handler.callback(state.stanza, state); + try { + state = await handler.callback(state.stanza, state); + } catch (ex) { + _log.severe( + 'Handler from $managerName for ${stanza.tag} (${stanza.attributes["id"]}) failed with "$ex"', + ); + } if (state.done || state.cancel) { _log.finest( 'Processing ended early for ${stanza.tag} (${stanza.attributes["id"]}) by $managerName', From 87866bf3f5dd94194798fe41da17cfc3829b5bdd Mon Sep 17 00:00:00 2001 From: "Alexander \"PapaTutuWawa" Date: Fri, 29 Sep 2023 20:53:29 +0200 Subject: [PATCH 12/23] fix(core): Allow disabling logging of all incoming and outgoing data --- packages/moxxmpp_socket_tcp/CHANGELOG.md | 5 +++++ .../integration_test/badxmpp_certificate_test.dart | 2 +- .../integration_test/failure_reconnection_test.dart | 4 ++-- packages/moxxmpp_socket_tcp/lib/src/socket.dart | 13 +++++++++++-- 4 files changed, 19 insertions(+), 5 deletions(-) diff --git a/packages/moxxmpp_socket_tcp/CHANGELOG.md b/packages/moxxmpp_socket_tcp/CHANGELOG.md index d0a7586..5902472 100644 --- a/packages/moxxmpp_socket_tcp/CHANGELOG.md +++ b/packages/moxxmpp_socket_tcp/CHANGELOG.md @@ -1,3 +1,8 @@ +## 0.4.0 + +- Keep version in sync with moxxmpp +- *BREAKING*: `TCPSocketWrapper` now takes a boolean parameter that enables logging of all incoming and outgoing data. + ## 0.3.1 - Keep version in sync with moxxmpp diff --git a/packages/moxxmpp_socket_tcp/integration_test/badxmpp_certificate_test.dart b/packages/moxxmpp_socket_tcp/integration_test/badxmpp_certificate_test.dart index 25e7aa9..b4c5a7d 100644 --- a/packages/moxxmpp_socket_tcp/integration_test/badxmpp_certificate_test.dart +++ b/packages/moxxmpp_socket_tcp/integration_test/badxmpp_certificate_test.dart @@ -5,7 +5,7 @@ import 'package:test/test.dart'; Future _runTest(String domain) async { var gotTLSException = false; - final socket = TCPSocketWrapper(); + final socket = TCPSocketWrapper(true); final log = Logger('TestLogger'); socket.getEventStream().listen((event) { if (event is XmppSocketTLSFailedEvent) { diff --git a/packages/moxxmpp_socket_tcp/integration_test/failure_reconnection_test.dart b/packages/moxxmpp_socket_tcp/integration_test/failure_reconnection_test.dart index d9eb4d0..4d012c0 100644 --- a/packages/moxxmpp_socket_tcp/integration_test/failure_reconnection_test.dart +++ b/packages/moxxmpp_socket_tcp/integration_test/failure_reconnection_test.dart @@ -19,7 +19,7 @@ void main() { TestingSleepReconnectionPolicy(10), AlwaysConnectedConnectivityManager(), ClientToServerNegotiator(), - TCPSocketWrapper(), + TCPSocketWrapper(true), )..connectionSettings = ConnectionSettings( jid: JID.fromString('testuser@no-sasl.badxmpp.eu'), password: 'abc123', @@ -59,7 +59,7 @@ void main() { TestingReconnectionPolicy(), AlwaysConnectedConnectivityManager(), ClientToServerNegotiator(), - TCPSocketWrapper(), + TCPSocketWrapper(true), )..connectionSettings = ConnectionSettings( jid: JID.fromString('testuser@no-sasl.badxmpp.eu'), password: 'abc123', diff --git a/packages/moxxmpp_socket_tcp/lib/src/socket.dart b/packages/moxxmpp_socket_tcp/lib/src/socket.dart index 7922bdc..ce2e6c6 100644 --- a/packages/moxxmpp_socket_tcp/lib/src/socket.dart +++ b/packages/moxxmpp_socket_tcp/lib/src/socket.dart @@ -10,6 +10,11 @@ import 'package:moxxmpp_socket_tcp/src/rfc_2782.dart'; /// TCP socket implementation for XmppConnection class TCPSocketWrapper extends BaseSocketWrapper { + TCPSocketWrapper(this._logIncomingOutgoing); + + /// Flag controlling whether incoming/outgoing data is logged or not. + final bool _logIncomingOutgoing; + /// The underlying Socket/SecureSocket instance. Socket? _socket; @@ -212,7 +217,9 @@ class TCPSocketWrapper extends BaseSocketWrapper { _socketSubscription = _socket!.listen( (List event) { final data = utf8.decode(event); - _log.finest('<== $data'); + if (_logIncomingOutgoing) { + _log.finest('<== $data'); + } _dataStream.add(data); }, onError: (Object error) { @@ -297,7 +304,9 @@ class TCPSocketWrapper extends BaseSocketWrapper { return; } - _log.finest('==> $data'); + if (_logIncomingOutgoing) { + _log.finest('==> $data'); + } try { _socket!.write(data); From e7922668b1e3139b9208a76ad9bb4f4e66b5f304 Mon Sep 17 00:00:00 2001 From: "Alexander \"PapaTutuWawa" Date: Fri, 29 Sep 2023 21:13:57 +0200 Subject: [PATCH 13/23] feat(core): Add a callback for raw data events --- packages/moxxmpp/lib/src/connection.dart | 15 +++++++++++++-- packages/moxxmpp/lib/src/managers/base.dart | 3 +++ .../moxxmpp/lib/src/xeps/xep_0198/xep_0198.dart | 5 +++++ 3 files changed, 21 insertions(+), 2 deletions(-) diff --git a/packages/moxxmpp/lib/src/connection.dart b/packages/moxxmpp/lib/src/connection.dart index bc8466d..d65eaed 100644 --- a/packages/moxxmpp/lib/src/connection.dart +++ b/packages/moxxmpp/lib/src/connection.dart @@ -71,7 +71,11 @@ class XmppConnection { this._socket, { this.connectingTimeout = const Duration(minutes: 2), }) : _reconnectionPolicy = reconnectionPolicy, - _connectivityManager = connectivityManager { + _connectivityManager = connectivityManager, + assert( + _socket.getDataStream().isBroadcast, + "The socket's data stream must be a broadcast stream", + ) { // Allow the reconnection policy to perform reconnections by itself _reconnectionPolicy.register( _attemptReconnection, @@ -95,10 +99,10 @@ class XmppConnection { ); _incomingStanzaQueue = IncomingStanzaQueue(handleXmlStream, _stanzaAwaiter); _socketStream = _socket.getDataStream(); - // TODO(Unknown): Handle on done _socketStream .transform(_streamParser) .forEach(_incomingStanzaQueue.addStanza); + _socketStream.listen(_handleOnDataCallbacks); _socket.getEventStream().listen(handleSocketEvent); _stanzaQueue = AsyncStanzaQueue( @@ -314,6 +318,13 @@ class XmppConnection { return getManagerById(csiManager); } + /// Called whenever we receive data on the socket. + Future _handleOnDataCallbacks(String _) async { + for (final manager in _xmppManagers.values) { + unawaited(manager.onData()); + } + } + /// Attempts to reconnect to the server by following an exponential backoff. Future _attemptReconnection() async { _log.finest('_attemptReconnection: Setting state to notConnected'); diff --git a/packages/moxxmpp/lib/src/managers/base.dart b/packages/moxxmpp/lib/src/managers/base.dart index ab61ed3..ae14458 100644 --- a/packages/moxxmpp/lib/src/managers/base.dart +++ b/packages/moxxmpp/lib/src/managers/base.dart @@ -80,6 +80,9 @@ abstract class XmppManagerBase { /// handler's priority, the earlier it is run. List getNonzaHandlers() => []; + /// Whenever the socket receives data, this method is called, if it is non-null. + Future onData() async {} + /// Return a list of features that should be included in a disco response. List getDiscoFeatures() => []; diff --git a/packages/moxxmpp/lib/src/xeps/xep_0198/xep_0198.dart b/packages/moxxmpp/lib/src/xeps/xep_0198/xep_0198.dart index 49082d9..f89a996 100644 --- a/packages/moxxmpp/lib/src/xeps/xep_0198/xep_0198.dart +++ b/packages/moxxmpp/lib/src/xeps/xep_0198/xep_0198.dart @@ -75,6 +75,11 @@ class StreamManagementManager extends XmppManagerBase { return acks; } + @override + Future onData() async { + logger.finest('Got data!'); + } + /// Called when a stanza has been acked to decide whether we should trigger a /// StanzaAckedEvent. /// From 1f712151e442d00a4525bede5135af687d75b3b7 Mon Sep 17 00:00:00 2001 From: "Alexander \"PapaTutuWawa" Date: Fri, 29 Sep 2023 21:18:43 +0200 Subject: [PATCH 14/23] feat(xep): Ignore the ack timer if we are receiving data --- .../moxxmpp/lib/src/xeps/xep_0198/xep_0198.dart | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/packages/moxxmpp/lib/src/xeps/xep_0198/xep_0198.dart b/packages/moxxmpp/lib/src/xeps/xep_0198/xep_0198.dart index f89a996..df34553 100644 --- a/packages/moxxmpp/lib/src/xeps/xep_0198/xep_0198.dart +++ b/packages/moxxmpp/lib/src/xeps/xep_0198/xep_0198.dart @@ -77,7 +77,13 @@ class StreamManagementManager extends XmppManagerBase { @override Future onData() async { - logger.finest('Got data!'); + // The ack timer does not matter if we are currently in the middle of receiving + // data. + await _ackLock.synchronized(() { + if (_pendingAcks > 0) { + _resetAckTimer(); + } + }); } /// Called when a stanza has been acked to decide whether we should trigger a @@ -230,6 +236,12 @@ class StreamManagementManager extends XmppManagerBase { _ackTimer = null; } + /// Resets the ack timer. + void _resetAckTimer() { + _stopAckTimer(); + _startAckTimer(); + } + @visibleForTesting Future handleAckTimeout() async { _stopAckTimer(); @@ -320,8 +332,7 @@ class StreamManagementManager extends XmppManagerBase { // Reset the timer if (_pendingAcks > 0) { - _stopAckTimer(); - _startAckTimer(); + _resetAckTimer(); } } From 882d20dc7a8d3abca7fc045d4f99014e94cf77a5 Mon Sep 17 00:00:00 2001 From: "Alexander \"PapaTutuWawa" Date: Fri, 29 Sep 2023 21:19:28 +0200 Subject: [PATCH 15/23] feat(core): Bump moxxmpp_socket_tcp's version --- packages/moxxmpp_socket_tcp/pubspec.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/moxxmpp_socket_tcp/pubspec.yaml b/packages/moxxmpp_socket_tcp/pubspec.yaml index 380f343..3f887f6 100644 --- a/packages/moxxmpp_socket_tcp/pubspec.yaml +++ b/packages/moxxmpp_socket_tcp/pubspec.yaml @@ -1,6 +1,6 @@ name: moxxmpp_socket_tcp description: A socket for moxxmpp using TCP that implements the RFC6120 connection algorithm and XEP-0368 -version: 0.3.1 +version: 0.4.0 homepage: https://codeberg.org/moxxy/moxxmpp publish_to: https://git.polynom.me/api/packages/Moxxy/pub From e97d6e6517c756de830f550528c6006996cf8a7c Mon Sep 17 00:00:00 2001 From: "Alexander \"PapaTutuWawa" Date: Fri, 29 Sep 2023 22:45:56 +0200 Subject: [PATCH 16/23] feat(xep): Track our own affiliation and role --- .../moxxmpp/lib/src/xeps/xep_0045/events.dart | 17 ++++++++-- .../moxxmpp/lib/src/xeps/xep_0045/types.dart | 7 ++++ .../lib/src/xeps/xep_0045/xep_0045.dart | 32 +++++++++++++------ 3 files changed, 43 insertions(+), 13 deletions(-) diff --git a/packages/moxxmpp/lib/src/xeps/xep_0045/events.dart b/packages/moxxmpp/lib/src/xeps/xep_0045/events.dart index 8eec00f..2ce352f 100644 --- a/packages/moxxmpp/lib/src/xeps/xep_0045/events.dart +++ b/packages/moxxmpp/lib/src/xeps/xep_0045/events.dart @@ -3,14 +3,25 @@ import 'package:moxxmpp/src/jid.dart'; import 'package:moxxmpp/src/xeps/xep_0045/types.dart'; /// Triggered when the MUC changes our nickname. -class NickChangedByMUCEvent extends XmppEvent { - NickChangedByMUCEvent(this.roomJid, this.nick); +class OwnDataChangedEvent extends XmppEvent { + OwnDataChangedEvent( + this.roomJid, + this.nick, + this.affiliation, + this.role, + ); /// The JID of the room. final JID roomJid; - /// The new nickname. + /// Our nickname. final String nick; + + /// Our affiliation. + final Affiliation affiliation; + + /// Our role. + final Role role; } /// Triggered when an entity joins the MUC. diff --git a/packages/moxxmpp/lib/src/xeps/xep_0045/types.dart b/packages/moxxmpp/lib/src/xeps/xep_0045/types.dart index 06bb806..58ba309 100644 --- a/packages/moxxmpp/lib/src/xeps/xep_0045/types.dart +++ b/packages/moxxmpp/lib/src/xeps/xep_0045/types.dart @@ -149,6 +149,13 @@ class RoomState { /// Flag whether we're joined and can process messages bool joined; + /// Our own affiliation inside the MUC. + Affiliation? affiliation; + + /// Our own role inside the MUC. + Role? role; + + /// The list of messages that we sent and are waiting for their echo. late final List pendingMessages; /// "List" of entities inside the MUC. diff --git a/packages/moxxmpp/lib/src/xeps/xep_0045/xep_0045.dart b/packages/moxxmpp/lib/src/xeps/xep_0045/xep_0045.dart index 08b08cb..0b121c5 100644 --- a/packages/moxxmpp/lib/src/xeps/xep_0045/xep_0045.dart +++ b/packages/moxxmpp/lib/src/xeps/xep_0045/xep_0045.dart @@ -300,20 +300,32 @@ class MUCManager extends XmppManagerBase { final role = Role.fromString( item.attributes['role']! as String, ); + final affiliation = Affiliation.fromString( + item.attributes['affiliation']! as String, + ); if (statuses.contains('110')) { - if (room.nick != from.resource) { - // Notify us of the changed nick. - getAttributes().sendEvent( - NickChangedByMUCEvent( - bareFrom, - from.resource, - ), - ); + if (room.joined) { + if (room.nick != from.resource || + room.affiliation != affiliation || + room.role != role) { + // Notify us of the changed data. + getAttributes().sendEvent( + OwnDataChangedEvent( + bareFrom, + from.resource, + affiliation, + role, + ), + ); + } } - // Set the nick to make sure we're in sync with the MUC. - room.nick = from.resource; + // Set the data to make sure we're in sync with the MUC. + room + ..nick = from.resource + ..affiliation = affiliation + ..role = role; logger.finest('Self-presence handled'); return StanzaHandlerData( true, From 6d3a5e98de68f002f405006caf354496b7f7a09d Mon Sep 17 00:00:00 2001 From: "Alexander \"PapaTutuWawa" Date: Sun, 1 Oct 2023 13:33:58 +0200 Subject: [PATCH 17/23] fix(xep): Export XEP-0045 events --- packages/moxxmpp/lib/moxxmpp.dart | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/moxxmpp/lib/moxxmpp.dart b/packages/moxxmpp/lib/moxxmpp.dart index 00c1c43..6560fe4 100644 --- a/packages/moxxmpp/lib/moxxmpp.dart +++ b/packages/moxxmpp/lib/moxxmpp.dart @@ -47,6 +47,7 @@ export 'package:moxxmpp/src/xeps/xep_0030/helpers.dart'; export 'package:moxxmpp/src/xeps/xep_0030/types.dart'; export 'package:moxxmpp/src/xeps/xep_0030/xep_0030.dart'; export 'package:moxxmpp/src/xeps/xep_0045/errors.dart'; +export 'package:moxxmpp/src/xeps/xep_0045/events.dart'; export 'package:moxxmpp/src/xeps/xep_0045/types.dart'; export 'package:moxxmpp/src/xeps/xep_0045/xep_0045.dart'; export 'package:moxxmpp/src/xeps/xep_0054.dart'; From 007cdce53d99b218100fede7735ea367a161e44e Mon Sep 17 00:00:00 2001 From: "Alexander \"PapaTutuWawa" Date: Sun, 1 Oct 2023 13:34:13 +0200 Subject: [PATCH 18/23] fix(xep): Fix wrong event being triggered on join --- .../lib/src/xeps/xep_0045/xep_0045.dart | 4 +- packages/moxxmpp/test/xeps/xep_0045_test.dart | 357 ++++++++++++++++++ 2 files changed, 359 insertions(+), 2 deletions(-) diff --git a/packages/moxxmpp/lib/src/xeps/xep_0045/xep_0045.dart b/packages/moxxmpp/lib/src/xeps/xep_0045/xep_0045.dart index 0b121c5..dc68132 100644 --- a/packages/moxxmpp/lib/src/xeps/xep_0045/xep_0045.dart +++ b/packages/moxxmpp/lib/src/xeps/xep_0045/xep_0045.dart @@ -360,14 +360,14 @@ class MUCManager extends XmppManagerBase { if (room.joined) { if (room.members.containsKey(from.resource)) { getAttributes().sendEvent( - MemberJoinedEvent( + MemberChangedEvent( bareFrom, member, ), ); } else { getAttributes().sendEvent( - MemberChangedEvent( + MemberJoinedEvent( bareFrom, member, ), diff --git a/packages/moxxmpp/test/xeps/xep_0045_test.dart b/packages/moxxmpp/test/xeps/xep_0045_test.dart index 0f92aae..de83d51 100644 --- a/packages/moxxmpp/test/xeps/xep_0045_test.dart +++ b/packages/moxxmpp/test/xeps/xep_0045_test.dart @@ -332,4 +332,361 @@ void main() { ); }, ); + + test( + 'Testing a user joining a room', + () async { + final fakeSocket = StubTCPSocket([ + StringExpectation( + "", + ''' + + + + PLAIN + + + + + ''', + ), + StringExpectation( + "AHBvbHlub21kaXZpc2lvbgBhYWFh", + '', + ), + StringExpectation( + "", + ''' + + + + + + + + + + + +''', + ), + StanzaExpectation( + '', + 'polynomdivision@test.server/MU29eEZn', + ignoreId: true, + ), + StanzaExpectation( + '', + '', + ignoreId: true, + ), + ]); + final conn = XmppConnection( + TestingSleepReconnectionPolicy(1), + AlwaysConnectedConnectivityManager(), + ClientToServerNegotiator(), + fakeSocket, + ) + ..connectionSettings = ConnectionSettings( + jid: JID.fromString('polynomdivision@test.server'), + password: 'aaaa', + ) + ..setResource('test-resource', triggerEvent: false); + await conn.registerManagers([ + DiscoManager([]), + MUCManager(), + ]); + + await conn.registerFeatureNegotiators([ + SaslPlainNegotiator(), + ResourceBindingNegotiator(), + ]); + + await conn.connect( + waitUntilLogin: true, + shouldReconnect: false, + ); + + // Join a groupchat + final roomJid = JID.fromString('channel@muc.example.org'); + final joinResult = conn.getManagerById(mucManager)!.joinRoom( + roomJid, + 'test', + maxHistoryStanzas: 0, + ); + await Future.delayed(const Duration(seconds: 1)); + + fakeSocket + ..injectRawXml( + ''' + + + + + + ''', + ) + ..injectRawXml( + ''' + + + + + + ''', + ) + ..injectRawXml( + ''' + + + + + + + ''', + ) + ..injectRawXml( + ''' + + + + ''', + ); + + await joinResult; + final room = (await conn + .getManagerById(mucManager)! + .getRoomState(roomJid))!; + expect(room.joined, true); + expect( + room.members.length, + 2, + ); + + // Now a new user joins the room. + MemberJoinedEvent? event; + conn.asBroadcastStream().listen((e) { + if (e is MemberJoinedEvent) { + event = e; + } + }); + + fakeSocket.injectRawXml( + ''' + + + + + + ''', + ); + + await Future.delayed(const Duration(seconds: 2)); + expect(event != null, true); + expect(event!.member.nick, 'papatutuwawa'); + expect(event!.member.affiliation, Affiliation.admin); + expect(event!.member.role, Role.participant); + + final roomAfterJoin = (await conn + .getManagerById(mucManager)! + .getRoomState(roomJid))!; + expect(roomAfterJoin.members.length, 3); + }, + ); + + test( + 'Testing a user leaving a room', + () async { + final fakeSocket = StubTCPSocket([ + StringExpectation( + "", + ''' + + + + PLAIN + + + + + ''', + ), + StringExpectation( + "AHBvbHlub21kaXZpc2lvbgBhYWFh", + '', + ), + StringExpectation( + "", + ''' + + + + + + + + + + + +''', + ), + StanzaExpectation( + '', + 'polynomdivision@test.server/MU29eEZn', + ignoreId: true, + ), + StanzaExpectation( + '', + '', + ignoreId: true, + ), + ]); + final conn = XmppConnection( + TestingSleepReconnectionPolicy(1), + AlwaysConnectedConnectivityManager(), + ClientToServerNegotiator(), + fakeSocket, + ) + ..connectionSettings = ConnectionSettings( + jid: JID.fromString('polynomdivision@test.server'), + password: 'aaaa', + ) + ..setResource('test-resource', triggerEvent: false); + await conn.registerManagers([ + DiscoManager([]), + MUCManager(), + ]); + + await conn.registerFeatureNegotiators([ + SaslPlainNegotiator(), + ResourceBindingNegotiator(), + ]); + + await conn.connect( + waitUntilLogin: true, + shouldReconnect: false, + ); + + // Join a groupchat + final roomJid = JID.fromString('channel@muc.example.org'); + final joinResult = conn.getManagerById(mucManager)!.joinRoom( + roomJid, + 'test', + maxHistoryStanzas: 0, + ); + await Future.delayed(const Duration(seconds: 1)); + + fakeSocket + ..injectRawXml( + ''' + + + + + + ''', + ) + ..injectRawXml( + ''' + + + + + + ''', + ) + ..injectRawXml( + ''' + + + + + + + ''', + ) + ..injectRawXml( + ''' + + + + ''', + ); + + await joinResult; + final room = (await conn + .getManagerById(mucManager)! + .getRoomState(roomJid))!; + expect(room.joined, true); + expect( + room.members.length, + 2, + ); + + // Now a new user joins the room. + MemberLeftEvent? event; + conn.asBroadcastStream().listen((e) { + if (e is MemberLeftEvent) { + event = e; + } + }); + + fakeSocket.injectRawXml( + ''' + + + + + + ''', + ); + + await Future.delayed(const Duration(seconds: 2)); + expect(event != null, true); + expect(event!.nick, 'secondwitch'); + + final roomAfterLeave = (await conn + .getManagerById(mucManager)! + .getRoomState(roomJid))!; + expect(roomAfterLeave.members.length, 1); + }, + ); } From 93e9d6ca220b16934730559259f882bb2229e8f2 Mon Sep 17 00:00:00 2001 From: "Alexander \"PapaTutuWawa" Date: Sun, 1 Oct 2023 20:44:47 +0200 Subject: [PATCH 19/23] feat(xep): Handle a user changing their nickname --- .../moxxmpp/lib/src/xeps/xep_0045/events.dart | 14 ++ .../lib/src/xeps/xep_0045/status_codes.dart | 2 + .../lib/src/xeps/xep_0045/xep_0045.dart | 59 ++++-- packages/moxxmpp/test/xeps/xep_0045_test.dart | 184 +++++++++++++++++- 4 files changed, 244 insertions(+), 15 deletions(-) create mode 100644 packages/moxxmpp/lib/src/xeps/xep_0045/status_codes.dart diff --git a/packages/moxxmpp/lib/src/xeps/xep_0045/events.dart b/packages/moxxmpp/lib/src/xeps/xep_0045/events.dart index 2ce352f..f3f795d 100644 --- a/packages/moxxmpp/lib/src/xeps/xep_0045/events.dart +++ b/packages/moxxmpp/lib/src/xeps/xep_0045/events.dart @@ -56,3 +56,17 @@ class MemberLeftEvent extends XmppEvent { /// The nick of the user who left. final String nick; } + +/// Triggered when an entity changes their nick. +class MemberChangedNickEvent extends XmppEvent { + MemberChangedNickEvent(this.roomJid, this.oldNick, this.newNick); + + /// The JID of the room. + final JID roomJid; + + /// The original nick. + final String oldNick; + + /// The new nick. + final String newNick; +} diff --git a/packages/moxxmpp/lib/src/xeps/xep_0045/status_codes.dart b/packages/moxxmpp/lib/src/xeps/xep_0045/status_codes.dart new file mode 100644 index 0000000..734029a --- /dev/null +++ b/packages/moxxmpp/lib/src/xeps/xep_0045/status_codes.dart @@ -0,0 +1,2 @@ +const selfPresenceStatus = '110'; +const nicknameChangedStatus = '303'; diff --git a/packages/moxxmpp/lib/src/xeps/xep_0045/xep_0045.dart b/packages/moxxmpp/lib/src/xeps/xep_0045/xep_0045.dart index dc68132..60b180b 100644 --- a/packages/moxxmpp/lib/src/xeps/xep_0045/xep_0045.dart +++ b/packages/moxxmpp/lib/src/xeps/xep_0045/xep_0045.dart @@ -14,6 +14,7 @@ import 'package:moxxmpp/src/xeps/xep_0030/types.dart'; import 'package:moxxmpp/src/xeps/xep_0030/xep_0030.dart'; import 'package:moxxmpp/src/xeps/xep_0045/errors.dart'; import 'package:moxxmpp/src/xeps/xep_0045/events.dart'; +import 'package:moxxmpp/src/xeps/xep_0045/status_codes.dart'; import 'package:moxxmpp/src/xeps/xep_0045/types.dart'; import 'package:moxxmpp/src/xeps/xep_0359.dart'; import 'package:synchronized/synchronized.dart'; @@ -304,7 +305,7 @@ class MUCManager extends XmppManagerBase { item.attributes['affiliation']! as String, ); - if (statuses.contains('110')) { + if (statuses.contains(selfPresenceStatus)) { if (room.joined) { if (room.nick != from.resource || room.affiliation != affiliation || @@ -335,19 +336,49 @@ class MUCManager extends XmppManagerBase { ); } - if (presence.attributes['type'] == 'unavailable' && role == Role.none) { - // Cannot happen while joining, so we assume we are joined - assert( - room.joined, - 'Should not receive unavailable with role="none" while joining', - ); - room.members.remove(from.resource); - getAttributes().sendEvent( - MemberLeftEvent( - bareFrom, - from.resource, - ), - ); + if (presence.attributes['type'] == 'unavailable') { + if (role == Role.none) { + // Cannot happen while joining, so we assume we are joined + assert( + room.joined, + 'Should not receive unavailable with role="none" while joining', + ); + room.members.remove(from.resource); + getAttributes().sendEvent( + MemberLeftEvent( + bareFrom, + from.resource, + ), + ); + } else if (statuses.contains(nicknameChangedStatus)) { + assert( + room.joined, + 'Should not receive nick change while joining', + ); + final newNick = item.attributes['nick']! as String; + final member = RoomMember( + newNick, + Affiliation.fromString( + item.attributes['affiliation']! as String, + ), + role, + ); + + // Remove the old member. + room.members.remove(from.resource); + + // Add the "new" member". + room.members[newNick] = member; + + // Trigger an event. + getAttributes().sendEvent( + MemberChangedNickEvent( + bareFrom, + from.resource, + newNick, + ), + ); + } } else { final member = RoomMember( from.resource, diff --git a/packages/moxxmpp/test/xeps/xep_0045_test.dart b/packages/moxxmpp/test/xeps/xep_0045_test.dart index de83d51..d857552 100644 --- a/packages/moxxmpp/test/xeps/xep_0045_test.dart +++ b/packages/moxxmpp/test/xeps/xep_0045_test.dart @@ -658,7 +658,7 @@ void main() { 2, ); - // Now a new user joins the room. + // Now a user leaves the room. MemberLeftEvent? event; conn.asBroadcastStream().listen((e) { if (e is MemberLeftEvent) { @@ -689,4 +689,186 @@ void main() { expect(roomAfterLeave.members.length, 1); }, ); + + test( + 'Test a user changing their nick name', + () async { + final fakeSocket = StubTCPSocket([ + StringExpectation( + "", + ''' + + + + PLAIN + + + + + ''', + ), + StringExpectation( + "AHBvbHlub21kaXZpc2lvbgBhYWFh", + '', + ), + StringExpectation( + "", + ''' + + + + + + + + + + + +''', + ), + StanzaExpectation( + '', + 'polynomdivision@test.server/MU29eEZn', + ignoreId: true, + ), + StanzaExpectation( + '', + '', + ignoreId: true, + ), + ]); + final conn = XmppConnection( + TestingSleepReconnectionPolicy(1), + AlwaysConnectedConnectivityManager(), + ClientToServerNegotiator(), + fakeSocket, + ) + ..connectionSettings = ConnectionSettings( + jid: JID.fromString('polynomdivision@test.server'), + password: 'aaaa', + ) + ..setResource('test-resource', triggerEvent: false); + await conn.registerManagers([ + DiscoManager([]), + MUCManager(), + ]); + + await conn.registerFeatureNegotiators([ + SaslPlainNegotiator(), + ResourceBindingNegotiator(), + ]); + + await conn.connect( + waitUntilLogin: true, + shouldReconnect: false, + ); + + // Join a groupchat + final roomJid = JID.fromString('channel@muc.example.org'); + final joinResult = conn.getManagerById(mucManager)!.joinRoom( + roomJid, + 'test', + maxHistoryStanzas: 0, + ); + await Future.delayed(const Duration(seconds: 1)); + + fakeSocket + ..injectRawXml( + ''' + + + + + + ''', + ) + ..injectRawXml( + ''' + + + + + + ''', + ) + ..injectRawXml( + ''' + + + + + + + ''', + ) + ..injectRawXml( + ''' + + + + ''', + ); + + await joinResult; + final room = (await conn + .getManagerById(mucManager)! + .getRoomState(roomJid))!; + expect(room.joined, true); + expect( + room.members.length, + 2, + ); + + // Now a new user changes their nick. + MemberChangedNickEvent? event; + conn.asBroadcastStream().listen((e) { + if (e is MemberChangedNickEvent) { + event = e; + } + }); + + fakeSocket.injectRawXml( + ''' + + + + + + + ''', + ); + + await Future.delayed(const Duration(seconds: 2)); + expect(event != null, true); + expect(event!.oldNick, 'firstwitch'); + expect(event!.newNick, 'papatutuwawa'); + + final roomAfterChange = (await conn + .getManagerById(mucManager)! + .getRoomState(roomJid))!; + expect(roomAfterChange.members.length, 2); + expect(roomAfterChange.members['firstwitch'], null); + expect(roomAfterChange.members['papatutuwawa'] != null, true); + }, + ); } From f287d501ab861a7766184d85664c7d036deac439 Mon Sep 17 00:00:00 2001 From: "Alexander \"PapaTutuWawa" Date: Wed, 4 Oct 2023 22:15:24 +0200 Subject: [PATCH 20/23] fix(example): Comform examples to the new TCPSocketWrapper constructor --- examples_dart/bin/component.dart | 2 ++ examples_dart/bin/omemo_client.dart | 2 +- examples_dart/lib/socket.dart | 2 +- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/examples_dart/bin/component.dart b/examples_dart/bin/component.dart index 41d60ec..4208198 100644 --- a/examples_dart/bin/component.dart +++ b/examples_dart/bin/component.dart @@ -3,6 +3,8 @@ import 'package:moxxmpp/moxxmpp.dart'; import 'package:moxxmpp_socket_tcp/moxxmpp_socket_tcp.dart'; class TestingTCPSocketWrapper extends TCPSocketWrapper { + TestingTCPSocketWrapper() : super(true); + @override bool onBadCertificate(dynamic certificate, String domain) { return true; diff --git a/examples_dart/bin/omemo_client.dart b/examples_dart/bin/omemo_client.dart index 5241d03..191c3e4 100644 --- a/examples_dart/bin/omemo_client.dart +++ b/examples_dart/bin/omemo_client.dart @@ -30,7 +30,7 @@ void main(List args) async { TestingReconnectionPolicy(), AlwaysConnectedConnectivityManager(), ClientToServerNegotiator(), - ExampleTCPSocketWrapper(parser.srvRecord), + ExampleTCPSocketWrapper(parser.srvRecord, true), )..connectionSettings = parser.connectionSettings; // Generate OMEMO data diff --git a/examples_dart/lib/socket.dart b/examples_dart/lib/socket.dart index 3031050..8f217df 100644 --- a/examples_dart/lib/socket.dart +++ b/examples_dart/lib/socket.dart @@ -3,7 +3,7 @@ import 'package:moxxmpp_socket_tcp/moxxmpp_socket_tcp.dart'; /// A simple socket for examples that allows injection of SRV records (since /// we cannot use moxdns here). class ExampleTCPSocketWrapper extends TCPSocketWrapper { - ExampleTCPSocketWrapper(this.srvRecord); + ExampleTCPSocketWrapper(this.srvRecord, bool logData) : super(logData); /// A potential SRV record to inject for testing. final MoxSrvRecord? srvRecord; From 97f082b6f5018d4db4e3ae94b14768f6ca1f9fa8 Mon Sep 17 00:00:00 2001 From: "Alexander \"PapaTutuWawa" Date: Wed, 4 Oct 2023 22:15:59 +0200 Subject: [PATCH 21/23] feat(example): Add a very simple client example --- examples_dart/bin/simple_client.dart | 112 +++++++++++++++++++++++++++ 1 file changed, 112 insertions(+) create mode 100644 examples_dart/bin/simple_client.dart diff --git a/examples_dart/bin/simple_client.dart b/examples_dart/bin/simple_client.dart new file mode 100644 index 0000000..00070f7 --- /dev/null +++ b/examples_dart/bin/simple_client.dart @@ -0,0 +1,112 @@ +import 'package:logging/logging.dart'; +import 'package:moxxmpp/moxxmpp.dart'; +import 'package:moxxmpp_socket_tcp/moxxmpp_socket_tcp.dart'; + +/// The JID we want to authenticate as. +final xmppUser = JID.fromString('jane@example.com'); + +/// The password to authenticate with. +const xmppPass = 'secret'; + +/// The [xmppHost]:[xmppPort] server address to connect to. +/// In a real application, one might prefer to use [TCPSocketWrapper] +/// with a custom DNS implementation to let moxxmpp resolve the XMPP +/// server's address automatically. However, if we just provide a host +/// and a port, then [TCPSocketWrapper] will just skip the resolution and +/// immediately use the provided connection details. +const xmppHost = 'localhost'; +const xmppPort = 5222; + +void main(List args) async { + Logger.root.level = Level.ALL; + Logger.root.onRecord.listen((record) { + print('${record.level.name}|${record.time}: ${record.message}'); + }); + + // This class manages every aspect of handling the XMPP stream. + final connection = XmppConnection( + // A reconnection policy tells the connection how to handle an error + // while or after connecting to the server. The [TestingReconnectionPolicy] + // immediately triggers a reconnection. In a real implementation, one might + // prefer to use a smarter strategy, like using an exponential backoff. + TestingReconnectionPolicy(), + + // A connectivity manager tells the connection when it can connect. This is to + // ensure that we're not constantly trying to reconnect because we have no + // Internet connection. [AlwaysConnectedConnectivityManager] always says that + // we're connected. In a real application, one might prefer to use a smarter + // strategy, like using connectivity_plus to query the system's network connectivity + // state. + AlwaysConnectedConnectivityManager(), + + // This kind of negotiator tells the connection how to handle the stream + // negotiations. The [ClientToServerNegotiator] allows to connect to the server + // as a regular client. Another negotiator would be the [ComponentToServerNegotiator] that + // allows for connections to the server where we're acting as a component. + ClientToServerNegotiator(), + + // A wrapper around any kind of connection. In this case, we use the [TCPSocketWrapper], which + // uses a dart:io Socket/SecureSocket to connect to the server. If you want, you can also + // provide your own socket to use, for example, WebSockets or any other connection + // mechanism. + TCPSocketWrapper(false), + )..connectionSettings = ConnectionSettings( + jid: xmppUser, + password: xmppPass, + host: xmppHost, + port: xmppPort, + ); + + // Register a set of "managers" that provide you with implementations of various + // XEPs. Some have interdependencies, which need to be met. However, this example keeps + // it simple and just registers a [MessageManager], which has no required dependencies. + await connection.registerManagers([ + // The [MessageManager] handles receiving and sending stanzas. + MessageManager(), + ]); + + // Feature negotiators are objects that tell the connection negotiator what stream features + // we can negotiate and enable. moxxmpp negotiators always try to enable their features. + await connection.registerFeatureNegotiators([ + // This negotiator authenticates to the server using SASL PLAIN with the provided + // credentials. + SaslPlainNegotiator(), + // This negotiator attempts to bind a resource. By default, it's always a random one. + ResourceBindingNegotiator(), + // This negotiator attempts to do StartTLS before authenticating. + StartTlsNegotiator(), + ]); + + // Set up a stream handler for the connection's event stream. Managers and negotiators + // may trigger certain events. The [MessageManager], for example, triggers a [MessageEvent] + // whenever a message is received. If other managers are registered that parse a message's + // contents, then they can add their data to the event. + connection.asBroadcastStream().listen((event) { + if (event is! MessageEvent) { + return; + } + + // The text body (contents of the element) are returned as a + // [MessageBodyData] object. However, a message does not have to contain a + // body, so it is nullable. + final body = event.extensions.get()?.body; + print('[<-- ${event.from}] $body'); + }); + + // Connect to the server. + final result = await connection.connect( + // This flag indicates that we want to reconnect in case something happens. + shouldReconnect: true, + // This flag indicates that we want the returned Future to only resolve + // once the stream negotiations are done and no negotiator has any feature left + // to negotiate. + waitUntilLogin: true, + ); + + // Check if the connection was successful. [connection.connect] can return a boolean + // to indicate success or a [XmppError] in case the connection attempt failed. + if (!result.isType()) { + print('Failed to connect to server'); + return; + } +} From 8a2435e4add2469cb304fa41033da00c4dc803f2 Mon Sep 17 00:00:00 2001 From: "Alexander \"PapaTutuWawa" Date: Wed, 4 Oct 2023 22:35:00 +0200 Subject: [PATCH 22/23] fix(example): Bump moxxmpp* versions to 0.4.0 --- examples_dart/pubspec.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples_dart/pubspec.yaml b/examples_dart/pubspec.yaml index b2cd194..3c92787 100644 --- a/examples_dart/pubspec.yaml +++ b/examples_dart/pubspec.yaml @@ -12,10 +12,10 @@ dependencies: logging: ^1.0.2 moxxmpp: hosted: https://git.polynom.me/api/packages/Moxxy/pub - version: 0.3.1 + version: 0.4.0 moxxmpp_socket_tcp: hosted: https://git.polynom.me/api/packages/Moxxy/pub - version: 0.3.1 + version: 0.4.0 omemo_dart: hosted: https://git.polynom.me/api/packages/PapaTutuWawa/pub version: ^0.5.1 From be7581e841db9d89d9af2b4686c891b131a4e131 Mon Sep 17 00:00:00 2001 From: "Alexander \"PapaTutuWawa" Date: Wed, 4 Oct 2023 22:42:36 +0200 Subject: [PATCH 23/23] fix(core): Remove empty file --- .../moxxmpp/packages/moxxmpp/lib/src/util/incoming_queue.dart | 1 - 1 file changed, 1 deletion(-) delete mode 100644 packages/moxxmpp/packages/moxxmpp/lib/src/util/incoming_queue.dart diff --git a/packages/moxxmpp/packages/moxxmpp/lib/src/util/incoming_queue.dart b/packages/moxxmpp/packages/moxxmpp/lib/src/util/incoming_queue.dart deleted file mode 100644 index 8b13789..0000000 --- a/packages/moxxmpp/packages/moxxmpp/lib/src/util/incoming_queue.dart +++ /dev/null @@ -1 +0,0 @@ -