diff --git a/examples_dart/bin/component.dart b/examples_dart/bin/component.dart index 41d60ec..4208198 100644 --- a/examples_dart/bin/component.dart +++ b/examples_dart/bin/component.dart @@ -3,6 +3,8 @@ import 'package:moxxmpp/moxxmpp.dart'; import 'package:moxxmpp_socket_tcp/moxxmpp_socket_tcp.dart'; class TestingTCPSocketWrapper extends TCPSocketWrapper { + TestingTCPSocketWrapper() : super(true); + @override bool onBadCertificate(dynamic certificate, String domain) { return true; diff --git a/examples_dart/bin/muc_client.dart b/examples_dart/bin/muc_client.dart index 4e88c84..1eeb11e 100644 --- a/examples_dart/bin/muc_client.dart +++ b/examples_dart/bin/muc_client.dart @@ -75,11 +75,16 @@ void main(List args) async { }); // Join room - await connection.getManagerById(mucManager)!.joinRoom( - muc, - nick, - maxHistoryStanzas: 0, - ); + final mm = connection.getManagerById(mucManager)!; + await mm.joinRoom( + muc, + nick, + maxHistoryStanzas: 0, + ); + final state = (await mm.getRoomState(muc))!; + + print('=====> ${state.members.length} users in room'); + print('=====> ${state.members.values.map((m) => m.nick).join(", ")}'); final repl = Repl(prompt: '> '); await for (final line in repl.runAsync()) { diff --git a/examples_dart/bin/omemo_client.dart b/examples_dart/bin/omemo_client.dart index 5241d03..191c3e4 100644 --- a/examples_dart/bin/omemo_client.dart +++ b/examples_dart/bin/omemo_client.dart @@ -30,7 +30,7 @@ void main(List args) async { TestingReconnectionPolicy(), AlwaysConnectedConnectivityManager(), ClientToServerNegotiator(), - ExampleTCPSocketWrapper(parser.srvRecord), + ExampleTCPSocketWrapper(parser.srvRecord, true), )..connectionSettings = parser.connectionSettings; // Generate OMEMO data diff --git a/examples_dart/bin/simple_client.dart b/examples_dart/bin/simple_client.dart new file mode 100644 index 0000000..00070f7 --- /dev/null +++ b/examples_dart/bin/simple_client.dart @@ -0,0 +1,112 @@ +import 'package:logging/logging.dart'; +import 'package:moxxmpp/moxxmpp.dart'; +import 'package:moxxmpp_socket_tcp/moxxmpp_socket_tcp.dart'; + +/// The JID we want to authenticate as. +final xmppUser = JID.fromString('jane@example.com'); + +/// The password to authenticate with. +const xmppPass = 'secret'; + +/// The [xmppHost]:[xmppPort] server address to connect to. +/// In a real application, one might prefer to use [TCPSocketWrapper] +/// with a custom DNS implementation to let moxxmpp resolve the XMPP +/// server's address automatically. However, if we just provide a host +/// and a port, then [TCPSocketWrapper] will just skip the resolution and +/// immediately use the provided connection details. +const xmppHost = 'localhost'; +const xmppPort = 5222; + +void main(List args) async { + Logger.root.level = Level.ALL; + Logger.root.onRecord.listen((record) { + print('${record.level.name}|${record.time}: ${record.message}'); + }); + + // This class manages every aspect of handling the XMPP stream. + final connection = XmppConnection( + // A reconnection policy tells the connection how to handle an error + // while or after connecting to the server. The [TestingReconnectionPolicy] + // immediately triggers a reconnection. In a real implementation, one might + // prefer to use a smarter strategy, like using an exponential backoff. + TestingReconnectionPolicy(), + + // A connectivity manager tells the connection when it can connect. This is to + // ensure that we're not constantly trying to reconnect because we have no + // Internet connection. [AlwaysConnectedConnectivityManager] always says that + // we're connected. In a real application, one might prefer to use a smarter + // strategy, like using connectivity_plus to query the system's network connectivity + // state. + AlwaysConnectedConnectivityManager(), + + // This kind of negotiator tells the connection how to handle the stream + // negotiations. The [ClientToServerNegotiator] allows to connect to the server + // as a regular client. Another negotiator would be the [ComponentToServerNegotiator] that + // allows for connections to the server where we're acting as a component. + ClientToServerNegotiator(), + + // A wrapper around any kind of connection. In this case, we use the [TCPSocketWrapper], which + // uses a dart:io Socket/SecureSocket to connect to the server. If you want, you can also + // provide your own socket to use, for example, WebSockets or any other connection + // mechanism. + TCPSocketWrapper(false), + )..connectionSettings = ConnectionSettings( + jid: xmppUser, + password: xmppPass, + host: xmppHost, + port: xmppPort, + ); + + // Register a set of "managers" that provide you with implementations of various + // XEPs. Some have interdependencies, which need to be met. However, this example keeps + // it simple and just registers a [MessageManager], which has no required dependencies. + await connection.registerManagers([ + // The [MessageManager] handles receiving and sending stanzas. + MessageManager(), + ]); + + // Feature negotiators are objects that tell the connection negotiator what stream features + // we can negotiate and enable. moxxmpp negotiators always try to enable their features. + await connection.registerFeatureNegotiators([ + // This negotiator authenticates to the server using SASL PLAIN with the provided + // credentials. + SaslPlainNegotiator(), + // This negotiator attempts to bind a resource. By default, it's always a random one. + ResourceBindingNegotiator(), + // This negotiator attempts to do StartTLS before authenticating. + StartTlsNegotiator(), + ]); + + // Set up a stream handler for the connection's event stream. Managers and negotiators + // may trigger certain events. The [MessageManager], for example, triggers a [MessageEvent] + // whenever a message is received. If other managers are registered that parse a message's + // contents, then they can add their data to the event. + connection.asBroadcastStream().listen((event) { + if (event is! MessageEvent) { + return; + } + + // The text body (contents of the element) are returned as a + // [MessageBodyData] object. However, a message does not have to contain a + // body, so it is nullable. + final body = event.extensions.get()?.body; + print('[<-- ${event.from}] $body'); + }); + + // Connect to the server. + final result = await connection.connect( + // This flag indicates that we want to reconnect in case something happens. + shouldReconnect: true, + // This flag indicates that we want the returned Future to only resolve + // once the stream negotiations are done and no negotiator has any feature left + // to negotiate. + waitUntilLogin: true, + ); + + // Check if the connection was successful. [connection.connect] can return a boolean + // to indicate success or a [XmppError] in case the connection attempt failed. + if (!result.isType()) { + print('Failed to connect to server'); + return; + } +} diff --git a/examples_dart/lib/socket.dart b/examples_dart/lib/socket.dart index 3031050..8f217df 100644 --- a/examples_dart/lib/socket.dart +++ b/examples_dart/lib/socket.dart @@ -3,7 +3,7 @@ import 'package:moxxmpp_socket_tcp/moxxmpp_socket_tcp.dart'; /// A simple socket for examples that allows injection of SRV records (since /// we cannot use moxdns here). class ExampleTCPSocketWrapper extends TCPSocketWrapper { - ExampleTCPSocketWrapper(this.srvRecord); + ExampleTCPSocketWrapper(this.srvRecord, bool logData) : super(logData); /// A potential SRV record to inject for testing. final MoxSrvRecord? srvRecord; diff --git a/examples_dart/pubspec.yaml b/examples_dart/pubspec.yaml index b2cd194..3c92787 100644 --- a/examples_dart/pubspec.yaml +++ b/examples_dart/pubspec.yaml @@ -12,10 +12,10 @@ dependencies: logging: ^1.0.2 moxxmpp: hosted: https://git.polynom.me/api/packages/Moxxy/pub - version: 0.3.1 + version: 0.4.0 moxxmpp_socket_tcp: hosted: https://git.polynom.me/api/packages/Moxxy/pub - version: 0.3.1 + version: 0.4.0 omemo_dart: hosted: https://git.polynom.me/api/packages/PapaTutuWawa/pub version: ^0.5.1 diff --git a/packages/moxxmpp/CHANGELOG.md b/packages/moxxmpp/CHANGELOG.md index a8c57f6..ee3c3d7 100644 --- a/packages/moxxmpp/CHANGELOG.md +++ b/packages/moxxmpp/CHANGELOG.md @@ -23,6 +23,9 @@ - *BREAKING*: `UserAvatarManager`'s `getAvatarId` with `getLatestMetadata`. - The `PubSubManager` now supports PubSub's `max_items` in `getItems`. - *BREAKING*: `vCardManager`'s `VCardAvatarUpdatedEvent` no longer automatically requests the newest VCard avatar. +- *BREAKING*: `XmppConnection` now tries to ensure that incoming data is processed in-order. The only exception are awaited stanzas as they are allowed to bypass the queue. +- *BREAKING*: If a stanza handler causes an exception, the handler is simply skipped while processing. +- Add better logging around what stanza handler is running and if they end processing early. ## 0.3.1 diff --git a/packages/moxxmpp/lib/moxxmpp.dart b/packages/moxxmpp/lib/moxxmpp.dart index 00c1c43..6560fe4 100644 --- a/packages/moxxmpp/lib/moxxmpp.dart +++ b/packages/moxxmpp/lib/moxxmpp.dart @@ -47,6 +47,7 @@ export 'package:moxxmpp/src/xeps/xep_0030/helpers.dart'; export 'package:moxxmpp/src/xeps/xep_0030/types.dart'; export 'package:moxxmpp/src/xeps/xep_0030/xep_0030.dart'; export 'package:moxxmpp/src/xeps/xep_0045/errors.dart'; +export 'package:moxxmpp/src/xeps/xep_0045/events.dart'; export 'package:moxxmpp/src/xeps/xep_0045/types.dart'; export 'package:moxxmpp/src/xeps/xep_0045/xep_0045.dart'; export 'package:moxxmpp/src/xeps/xep_0054.dart'; diff --git a/packages/moxxmpp/lib/src/awaiter.dart b/packages/moxxmpp/lib/src/awaiter.dart index c730038..6ffcf16 100644 --- a/packages/moxxmpp/lib/src/awaiter.dart +++ b/packages/moxxmpp/lib/src/awaiter.dart @@ -1,35 +1,13 @@ import 'dart:async'; -import 'package:meta/meta.dart'; -import 'package:moxxmpp/src/jid.dart'; import 'package:moxxmpp/src/stringxml.dart'; import 'package:synchronized/synchronized.dart'; -/// A surrogate key for awaiting stanzas. -@immutable -class _StanzaSurrogateKey { - const _StanzaSurrogateKey(this.sentTo, this.id, this.tag); +/// (JID we sent a stanza to, the id of the sent stanza, the tag of the sent stanza). +// ignore: avoid_private_typedef_functions +typedef _StanzaCompositeKey = (String?, String, String); - /// The JID the original stanza was sent to. We expect the result to come from the - /// same JID. - final String sentTo; - - /// The ID of the original stanza. We expect the result to have the same ID. - final String id; - - /// The tag name of the stanza. - final String tag; - - @override - int get hashCode => sentTo.hashCode ^ id.hashCode ^ tag.hashCode; - - @override - bool operator ==(Object other) { - return other is _StanzaSurrogateKey && - other.sentTo == sentTo && - other.id == id && - other.tag == tag; - } -} +/// Callback function that returns the bare JID of the connection as a String. +typedef GetBareJidCallback = String Function(); /// This class handles the await semantics for stanzas. Stanzas are given a "unique" /// key equal to the tuple (to, id, tag) with which their response is identified. @@ -40,8 +18,12 @@ class _StanzaSurrogateKey { /// /// This class also handles some "edge cases" of RFC 6120, like an empty "from" attribute. class StanzaAwaiter { + StanzaAwaiter(this._bareJidCallback); + + final GetBareJidCallback _bareJidCallback; + /// The pending stanzas, identified by their surrogate key. - final Map<_StanzaSurrogateKey, Completer> _pending = {}; + final Map<_StanzaCompositeKey, Completer> _pending = {}; /// The critical section for accessing [StanzaAwaiter._pending]. final Lock _lock = Lock(); @@ -52,30 +34,33 @@ class StanzaAwaiter { /// [tag] is the stanza's tag name. /// /// Returns a future that might resolve to the response to the stanza. - Future> addPending(String to, String id, String tag) async { + Future> addPending(String? to, String id, String tag) async { + // Check if we want to send a stanza to our bare JID and replace it with null. + final processedTo = to != null && to == _bareJidCallback() ? null : to; + final completer = await _lock.synchronized(() { final completer = Completer(); - _pending[_StanzaSurrogateKey(to, id, tag)] = completer; + _pending[(processedTo, id, tag)] = completer; return completer; }); return completer.future; } - /// Checks if the stanza [stanza] is being awaited. [bareJid] is the bare JID of - /// the connection. + /// Checks if the stanza [stanza] is being awaited. /// If [stanza] is awaited, resolves the future and returns true. If not, returns /// false. - Future onData(XMLNode stanza, JID bareJid) async { - assert(bareJid.isBare(), 'bareJid must be bare'); - + Future onData(XMLNode stanza) async { final id = stanza.attributes['id'] as String?; if (id == null) return false; - final key = _StanzaSurrogateKey( - // Section 8.1.2.1 § 3 of RFC 6120 says that an empty "from" indicates that the - // attribute is implicitly from our own bare JID. - stanza.attributes['from'] as String? ?? bareJid.toString(), + // Check if we want to send a stanza to our bare JID and replace it with null. + final from = stanza.attributes['from'] as String?; + final processedFrom = + from != null && from == _bareJidCallback() ? null : from; + + final key = ( + processedFrom, id, stanza.tag, ); @@ -91,4 +76,19 @@ class StanzaAwaiter { return false; }); } + + /// Checks if [stanza] represents a stanza that is awaited. Returns true, if [stanza] + /// is awaited. False, if not. + Future isAwaited(XMLNode stanza) async { + final id = stanza.attributes['id'] as String?; + if (id == null) return false; + + final key = ( + stanza.attributes['from'] as String?, + id, + stanza.tag, + ); + + return _lock.synchronized(() => _pending.containsKey(key)); + } } diff --git a/packages/moxxmpp/lib/src/connection.dart b/packages/moxxmpp/lib/src/connection.dart index ee1e9c1..d65eaed 100644 --- a/packages/moxxmpp/lib/src/connection.dart +++ b/packages/moxxmpp/lib/src/connection.dart @@ -25,12 +25,12 @@ import 'package:moxxmpp/src/settings.dart'; import 'package:moxxmpp/src/socket.dart'; import 'package:moxxmpp/src/stanza.dart'; import 'package:moxxmpp/src/stringxml.dart'; +import 'package:moxxmpp/src/util/incoming_queue.dart'; import 'package:moxxmpp/src/util/queue.dart'; import 'package:moxxmpp/src/util/typed_map.dart'; import 'package:moxxmpp/src/xeps/xep_0030/xep_0030.dart'; import 'package:moxxmpp/src/xeps/xep_0198/xep_0198.dart'; import 'package:moxxmpp/src/xeps/xep_0352.dart'; -import 'package:synchronized/synchronized.dart'; import 'package:uuid/uuid.dart'; /// The states the XmppConnection can be in @@ -49,6 +49,19 @@ enum XmppConnectionState { error } +/// (The actual stanza handler, Name of the owning manager). +typedef _StanzaHandlerWrapper = (StanzaHandler, String); + +/// Wrapper around [stanzaHandlerSortComparator] for [_StanzaHandlerWrapper]. +int _stanzaHandlerWrapperSortComparator( + _StanzaHandlerWrapper a, + _StanzaHandlerWrapper b, +) { + final (ha, _) = a; + final (hb, _) = b; + return stanzaHandlerSortComparator(ha, hb); +} + /// This class is a connection to the server. class XmppConnection { XmppConnection( @@ -58,7 +71,11 @@ class XmppConnection { this._socket, { this.connectingTimeout = const Duration(minutes: 2), }) : _reconnectionPolicy = reconnectionPolicy, - _connectivityManager = connectivityManager { + _connectivityManager = connectivityManager, + assert( + _socket.getDataStream().isBroadcast, + "The socket's data stream must be a broadcast stream", + ) { // Allow the reconnection policy to perform reconnections by itself _reconnectionPolicy.register( _attemptReconnection, @@ -77,9 +94,15 @@ class XmppConnection { }, ); + _stanzaAwaiter = StanzaAwaiter( + () => connectionSettings.jid.toBare().toString(), + ); + _incomingStanzaQueue = IncomingStanzaQueue(handleXmlStream, _stanzaAwaiter); _socketStream = _socket.getDataStream(); - // TODO(Unknown): Handle on done - _socketStream.transform(_streamParser).forEach(handleXmlStream); + _socketStream + .transform(_streamParser) + .forEach(_incomingStanzaQueue.addStanza); + _socketStream.listen(_handleOnDataCallbacks); _socket.getEventStream().listen(handleSocketEvent); _stanzaQueue = AsyncStanzaQueue( @@ -109,16 +132,16 @@ class XmppConnection { final ConnectivityManager _connectivityManager; /// A helper for handling await semantics with stanzas - final StanzaAwaiter _stanzaAwaiter = StanzaAwaiter(); + late final StanzaAwaiter _stanzaAwaiter; /// Sorted list of handlers that we call or incoming and outgoing stanzas - final List _incomingStanzaHandlers = + final List<_StanzaHandlerWrapper> _incomingStanzaHandlers = List.empty(growable: true); - final List _incomingPreStanzaHandlers = + final List<_StanzaHandlerWrapper> _incomingPreStanzaHandlers = List.empty(growable: true); - final List _outgoingPreStanzaHandlers = + final List<_StanzaHandlerWrapper> _outgoingPreStanzaHandlers = List.empty(growable: true); - final List _outgoingPostStanzaHandlers = + final List<_StanzaHandlerWrapper> _outgoingPostStanzaHandlers = List.empty(growable: true); final StreamController _eventStreamController = StreamController.broadcast(); @@ -157,10 +180,6 @@ class XmppConnection { T? getNegotiatorById(String id) => _negotiationsHandler.getNegotiatorById(id); - /// Prevent data from being passed to _currentNegotiator.negotiator while the negotiator - /// is still running. - final Lock _negotiationLock = Lock(); - /// The logger for the class final Logger _log = Logger('XmppConnection'); @@ -169,6 +188,8 @@ class XmppConnection { bool get isAuthenticated => _isAuthenticated; + late final IncomingStanzaQueue _incomingStanzaQueue; + late final AsyncStanzaQueue _stanzaQueue; /// Returns the JID we authenticate with and add the resource that we have bound. @@ -198,18 +219,25 @@ class XmppConnection { _xmppManagers[manager.id] = manager; - _incomingStanzaHandlers.addAll(manager.getIncomingStanzaHandlers()); - _incomingPreStanzaHandlers.addAll(manager.getIncomingPreStanzaHandlers()); - _outgoingPreStanzaHandlers.addAll(manager.getOutgoingPreStanzaHandlers()); - _outgoingPostStanzaHandlers - .addAll(manager.getOutgoingPostStanzaHandlers()); + _incomingStanzaHandlers.addAll( + manager.getIncomingStanzaHandlers().map((h) => (h, manager.name)), + ); + _incomingPreStanzaHandlers.addAll( + manager.getIncomingPreStanzaHandlers().map((h) => (h, manager.name)), + ); + _outgoingPreStanzaHandlers.addAll( + manager.getOutgoingPreStanzaHandlers().map((h) => (h, manager.name)), + ); + _outgoingPostStanzaHandlers.addAll( + manager.getOutgoingPostStanzaHandlers().map((h) => (h, manager.name)), + ); } // Sort them - _incomingStanzaHandlers.sort(stanzaHandlerSortComparator); - _incomingPreStanzaHandlers.sort(stanzaHandlerSortComparator); - _outgoingPreStanzaHandlers.sort(stanzaHandlerSortComparator); - _outgoingPostStanzaHandlers.sort(stanzaHandlerSortComparator); + _incomingStanzaHandlers.sort(_stanzaHandlerWrapperSortComparator); + _incomingPreStanzaHandlers.sort(_stanzaHandlerWrapperSortComparator); + _outgoingPreStanzaHandlers.sort(_stanzaHandlerWrapperSortComparator); + _outgoingPostStanzaHandlers.sort(_stanzaHandlerWrapperSortComparator); // Run the post register callbacks for (final manager in _xmppManagers.values) { @@ -290,6 +318,13 @@ class XmppConnection { return getManagerById(csiManager); } + /// Called whenever we receive data on the socket. + Future _handleOnDataCallbacks(String _) async { + for (final manager in _xmppManagers.values) { + unawaited(manager.onData()); + } + } + /// Attempts to reconnect to the server by following an exponential backoff. Future _attemptReconnection() async { _log.finest('_attemptReconnection: Setting state to notConnected'); @@ -515,7 +550,7 @@ class XmppConnection { // A stanza with no to attribute is for direct processing by the server. As such, // we can correlate it by just *assuming* we have that attribute // (RFC 6120 Section 8.1.1.1) - data.stanza.to ?? connectionSettings.jid.toBare().toString(), + data.stanza.to, data.stanza.id!, data.stanza.tag, ) @@ -650,15 +685,30 @@ class XmppConnection { /// call its callback and end the processing if the callback returned true; continue /// if it returned false. Future _runStanzaHandlers( - List handlers, + List<_StanzaHandlerWrapper> handlers, Stanza stanza, { StanzaHandlerData? initial, }) async { var state = initial ?? StanzaHandlerData(false, false, stanza, TypedMap()); - for (final handler in handlers) { + for (final handlerRaw in handlers) { + final (handler, managerName) = handlerRaw; if (handler.matches(state.stanza)) { - state = await handler.callback(state.stanza, state); - if (state.done || state.cancel) return state; + _log.finest( + 'Running handler for ${stanza.tag} (${stanza.attributes["id"]}) of $managerName', + ); + try { + state = await handler.callback(state.stanza, state); + } catch (ex) { + _log.severe( + 'Handler from $managerName for ${stanza.tag} (${stanza.attributes["id"]}) failed with "$ex"', + ); + } + if (state.done || state.cancel) { + _log.finest( + 'Processing ended early for ${stanza.tag} (${stanza.attributes["id"]}) by $managerName', + ); + return state; + } } } @@ -743,7 +793,6 @@ class XmppConnection { final awaited = await _stanzaAwaiter.onData( incomingPreHandlers.stanza, - connectionSettings.jid.toBare(), ); if (awaited) { return; @@ -802,14 +851,12 @@ class XmppConnection { // causing (a) the negotiator to become confused and (b) the stanzas/nonzas to be // missed. This causes the data to wait while the negotiator is running and thus // prevent this issue. - await _negotiationLock.synchronized(() async { - if (_routingState != RoutingState.negotiating) { - unawaited(handleXmlStream(event)); - return; - } + if (_routingState != RoutingState.negotiating) { + unawaited(handleXmlStream(event)); + return; + } - await _negotiationsHandler.negotiate(event); - }); + await _negotiationsHandler.negotiate(event); break; case RoutingState.handleStanzas: await _handleStanza(node); diff --git a/packages/moxxmpp/lib/src/managers/base.dart b/packages/moxxmpp/lib/src/managers/base.dart index ab61ed3..ae14458 100644 --- a/packages/moxxmpp/lib/src/managers/base.dart +++ b/packages/moxxmpp/lib/src/managers/base.dart @@ -80,6 +80,9 @@ abstract class XmppManagerBase { /// handler's priority, the earlier it is run. List getNonzaHandlers() => []; + /// Whenever the socket receives data, this method is called, if it is non-null. + Future onData() async {} + /// Return a list of features that should be included in a disco response. List getDiscoFeatures() => []; diff --git a/packages/moxxmpp/lib/src/parser.dart b/packages/moxxmpp/lib/src/parser.dart index 1c9b458..6818cb6 100644 --- a/packages/moxxmpp/lib/src/parser.dart +++ b/packages/moxxmpp/lib/src/parser.dart @@ -57,9 +57,10 @@ class _ChunkedConversionBuffer { } /// A buffer to put between a socket's input and a full XML stream. -class XMPPStreamParser extends StreamTransformerBase { - final StreamController _streamController = - StreamController(); +class XMPPStreamParser + extends StreamTransformerBase> { + final StreamController> _streamController = + StreamController>(); /// Turns a String into a list of [XmlEvent]s in a chunked fashion. _ChunkedConversionBuffer _eventBuffer = @@ -117,13 +118,14 @@ class XMPPStreamParser extends StreamTransformerBase { } @override - Stream bind(Stream stream) { + Stream> bind(Stream stream) { // We do not want to use xml's toXmlEvents and toSubtreeEvents methods as they // create streams we cannot close. We need to be able to destroy and recreate an // XML parser whenever we start a new connection. stream.listen((input) { final events = _eventBuffer.convert(input); final streamHeaderEvents = _streamHeaderSelector.convert(events); + final objects = List.empty(growable: true); // Process the stream header separately. for (final event in streamHeaderEvents) { @@ -135,7 +137,7 @@ class XMPPStreamParser extends StreamTransformerBase { continue; } - _streamController.add( + objects.add( XMPPStreamHeader( Map.fromEntries( event.attributes.map((attr) { @@ -151,13 +153,15 @@ class XMPPStreamParser extends StreamTransformerBase { final children = _childBuffer.convert(childEvents); for (final node in children) { if (node.nodeType == XmlNodeType.ELEMENT) { - _streamController.add( + objects.add( XMPPStreamElement( XMLNode.fromXmlElement(node as XmlElement), ), ); } } + + _streamController.add(objects); }); return _streamController.stream; diff --git a/packages/moxxmpp/lib/src/stanza.dart b/packages/moxxmpp/lib/src/stanza.dart index 561eef4..b90c1e6 100644 --- a/packages/moxxmpp/lib/src/stanza.dart +++ b/packages/moxxmpp/lib/src/stanza.dart @@ -102,6 +102,8 @@ class RemoteServerTimeoutError extends StanzaError { /// An unknown error. class UnknownStanzaError extends StanzaError {} +const _stanzaNotDefined = Object(); + class Stanza extends XMLNode { // ignore: use_super_parameters Stanza({ @@ -216,7 +218,7 @@ class Stanza extends XMLNode { Stanza copyWith({ String? id, - String? from, + Object? from = _stanzaNotDefined, String? to, String? type, List? children, @@ -225,7 +227,7 @@ class Stanza extends XMLNode { return Stanza( tag: tag, to: to ?? this.to, - from: from ?? this.from, + from: from != _stanzaNotDefined ? from as String? : this.from, id: id ?? this.id, type: type ?? this.type, children: children ?? this.children, diff --git a/packages/moxxmpp/lib/src/util/incoming_queue.dart b/packages/moxxmpp/lib/src/util/incoming_queue.dart new file mode 100644 index 0000000..5b0c653 --- /dev/null +++ b/packages/moxxmpp/lib/src/util/incoming_queue.dart @@ -0,0 +1,97 @@ +import 'dart:async'; +import 'dart:collection'; +import 'package:logging/logging.dart'; +import 'package:moxxmpp/src/awaiter.dart'; +import 'package:moxxmpp/src/parser.dart'; +import 'package:synchronized/synchronized.dart'; + +/// A queue for incoming [XMPPStreamObject]s to ensure "in order" +/// processing (except for stanzas that are awaited). +class IncomingStanzaQueue { + IncomingStanzaQueue(this._callback, this._stanzaAwaiter); + + /// The queue for storing the completer of each + /// incoming stanza (or stream object to be precise). + /// Only access while holding the lock [_lock]. + final Queue> _queue = Queue(); + + /// Flag indicating whether a callback is already running (true) + /// or not. "a callback" and not "the callback" because awaited stanzas + /// are allowed to bypass the queue. + /// Only access while holding the lock [_lock]. + bool _isRunning = false; + + /// The function to call to process an incoming stream object. + final Future Function(XMPPStreamObject) _callback; + + /// Lock guarding both [_queue] and [_isRunning]. + final Lock _lock = Lock(); + + /// Logger. + final Logger _log = Logger('IncomingStanzaQueue'); + + final StanzaAwaiter _stanzaAwaiter; + + Future _processStreamObject( + Future? future, + XMPPStreamObject object, + ) async { + if (future == null) { + if (object is XMPPStreamElement) { + _log.finest( + 'Bypassing queue for ${object.node.tag} (${object.node.attributes["id"]})', + ); + } + return _callback(object); + } + + // Wait for our turn. + await future; + + // Run the callback. + await _callback(object); + + // Run the next entry. + await _lock.synchronized(() { + if (_queue.isNotEmpty) { + _queue.removeFirst().complete(); + } else { + _isRunning = false; + } + }); + } + + Future addStanza(List objects) async { + await _lock.synchronized(() async { + for (final object in objects) { + if (await canBypassQueue(object)) { + unawaited( + _processStreamObject(null, object), + ); + continue; + } + + final completer = Completer(); + if (_isRunning) { + _queue.add(completer); + } else { + _isRunning = true; + completer.complete(); + } + + unawaited( + _processStreamObject(completer.future, object), + ); + } + }); + } + + Future canBypassQueue(XMPPStreamObject object) async { + if (object is XMPPStreamHeader) { + return false; + } + + object as XMPPStreamElement; + return _stanzaAwaiter.isAwaited(object.node); + } +} diff --git a/packages/moxxmpp/lib/src/xeps/xep_0045/events.dart b/packages/moxxmpp/lib/src/xeps/xep_0045/events.dart index 8eec00f..f3f795d 100644 --- a/packages/moxxmpp/lib/src/xeps/xep_0045/events.dart +++ b/packages/moxxmpp/lib/src/xeps/xep_0045/events.dart @@ -3,14 +3,25 @@ import 'package:moxxmpp/src/jid.dart'; import 'package:moxxmpp/src/xeps/xep_0045/types.dart'; /// Triggered when the MUC changes our nickname. -class NickChangedByMUCEvent extends XmppEvent { - NickChangedByMUCEvent(this.roomJid, this.nick); +class OwnDataChangedEvent extends XmppEvent { + OwnDataChangedEvent( + this.roomJid, + this.nick, + this.affiliation, + this.role, + ); /// The JID of the room. final JID roomJid; - /// The new nickname. + /// Our nickname. final String nick; + + /// Our affiliation. + final Affiliation affiliation; + + /// Our role. + final Role role; } /// Triggered when an entity joins the MUC. @@ -45,3 +56,17 @@ class MemberLeftEvent extends XmppEvent { /// The nick of the user who left. final String nick; } + +/// Triggered when an entity changes their nick. +class MemberChangedNickEvent extends XmppEvent { + MemberChangedNickEvent(this.roomJid, this.oldNick, this.newNick); + + /// The JID of the room. + final JID roomJid; + + /// The original nick. + final String oldNick; + + /// The new nick. + final String newNick; +} diff --git a/packages/moxxmpp/lib/src/xeps/xep_0045/status_codes.dart b/packages/moxxmpp/lib/src/xeps/xep_0045/status_codes.dart new file mode 100644 index 0000000..734029a --- /dev/null +++ b/packages/moxxmpp/lib/src/xeps/xep_0045/status_codes.dart @@ -0,0 +1,2 @@ +const selfPresenceStatus = '110'; +const nicknameChangedStatus = '303'; diff --git a/packages/moxxmpp/lib/src/xeps/xep_0045/types.dart b/packages/moxxmpp/lib/src/xeps/xep_0045/types.dart index a58866c..58ba309 100644 --- a/packages/moxxmpp/lib/src/xeps/xep_0045/types.dart +++ b/packages/moxxmpp/lib/src/xeps/xep_0045/types.dart @@ -5,6 +5,7 @@ import 'package:moxxmpp/src/xeps/xep_0004.dart'; import 'package:moxxmpp/src/xeps/xep_0030/types.dart'; class InvalidAffiliationException implements Exception {} + class InvalidRoleException implements Exception {} enum Affiliation { @@ -148,6 +149,13 @@ class RoomState { /// Flag whether we're joined and can process messages bool joined; + /// Our own affiliation inside the MUC. + Affiliation? affiliation; + + /// Our own role inside the MUC. + Role? role; + + /// The list of messages that we sent and are waiting for their echo. late final List pendingMessages; /// "List" of entities inside the MUC. diff --git a/packages/moxxmpp/lib/src/xeps/xep_0045/xep_0045.dart b/packages/moxxmpp/lib/src/xeps/xep_0045/xep_0045.dart index 0042871..60b180b 100644 --- a/packages/moxxmpp/lib/src/xeps/xep_0045/xep_0045.dart +++ b/packages/moxxmpp/lib/src/xeps/xep_0045/xep_0045.dart @@ -14,9 +14,9 @@ import 'package:moxxmpp/src/xeps/xep_0030/types.dart'; import 'package:moxxmpp/src/xeps/xep_0030/xep_0030.dart'; import 'package:moxxmpp/src/xeps/xep_0045/errors.dart'; import 'package:moxxmpp/src/xeps/xep_0045/events.dart'; +import 'package:moxxmpp/src/xeps/xep_0045/status_codes.dart'; import 'package:moxxmpp/src/xeps/xep_0045/types.dart'; import 'package:moxxmpp/src/xeps/xep_0359.dart'; -import 'package:synchronized/extension.dart'; import 'package:synchronized/synchronized.dart'; /// (Room JID, nickname) @@ -251,19 +251,23 @@ class MUCManager extends XmppManagerBase { StanzaHandlerData state, ) async { if (presence.from == null) { + logger.finest('Ignoring presence as it has no from attribute'); return state; } final from = JID.fromString(presence.from!); final bareFrom = from.toBare(); return _cacheLock.synchronized(() { + logger.finest('Lock aquired for presence from ${presence.from}'); final room = _mucRoomCache[bareFrom]; if (room == null) { + logger.finest('Ignoring presence as it does not belong to a room'); return state; } if (from.resource.isEmpty) { // TODO(Unknown): Handle presence from the room itself. + logger.finest('Ignoring presence as it has no resource'); return state; } @@ -297,20 +301,33 @@ class MUCManager extends XmppManagerBase { final role = Role.fromString( item.attributes['role']! as String, ); + final affiliation = Affiliation.fromString( + item.attributes['affiliation']! as String, + ); - if (statuses.contains('110')) { - if (room.nick != from.resource) { - // Notify us of the changed nick. - getAttributes().sendEvent( - NickChangedByMUCEvent( - bareFrom, - from.resource, - ), - ); + if (statuses.contains(selfPresenceStatus)) { + if (room.joined) { + if (room.nick != from.resource || + room.affiliation != affiliation || + room.role != role) { + // Notify us of the changed data. + getAttributes().sendEvent( + OwnDataChangedEvent( + bareFrom, + from.resource, + affiliation, + role, + ), + ); + } } - // Set the nick to make sure we're in sync with the MUC. - room.nick = from.resource; + // Set the data to make sure we're in sync with the MUC. + room + ..nick = from.resource + ..affiliation = affiliation + ..role = role; + logger.finest('Self-presence handled'); return StanzaHandlerData( true, false, @@ -319,19 +336,49 @@ class MUCManager extends XmppManagerBase { ); } - if (presence.attributes['type'] == 'unavailable' && role == Role.none) { - // Cannot happen while joining, so we assume we are joined - assert( - room.joined, - 'Should not receive unavailable with role="none" while joining', - ); - room.members.remove(from.resource); - getAttributes().sendEvent( - MemberLeftEvent( - bareFrom, - from.resource, - ), - ); + if (presence.attributes['type'] == 'unavailable') { + if (role == Role.none) { + // Cannot happen while joining, so we assume we are joined + assert( + room.joined, + 'Should not receive unavailable with role="none" while joining', + ); + room.members.remove(from.resource); + getAttributes().sendEvent( + MemberLeftEvent( + bareFrom, + from.resource, + ), + ); + } else if (statuses.contains(nicknameChangedStatus)) { + assert( + room.joined, + 'Should not receive nick change while joining', + ); + final newNick = item.attributes['nick']! as String; + final member = RoomMember( + newNick, + Affiliation.fromString( + item.attributes['affiliation']! as String, + ), + role, + ); + + // Remove the old member. + room.members.remove(from.resource); + + // Add the "new" member". + room.members[newNick] = member; + + // Trigger an event. + getAttributes().sendEvent( + MemberChangedNickEvent( + bareFrom, + from.resource, + newNick, + ), + ); + } } else { final member = RoomMember( from.resource, @@ -344,14 +391,14 @@ class MUCManager extends XmppManagerBase { if (room.joined) { if (room.members.containsKey(from.resource)) { getAttributes().sendEvent( - MemberJoinedEvent( + MemberChangedEvent( bareFrom, member, ), ); } else { getAttributes().sendEvent( - MemberChangedEvent( + MemberJoinedEvent( bareFrom, member, ), @@ -360,8 +407,10 @@ class MUCManager extends XmppManagerBase { } room.members[from.resource] = member; + logger.finest('${from.resource} added to the member list'); } + logger.finest('Ran through'); return StanzaHandlerData( true, false, @@ -398,7 +447,8 @@ class MUCManager extends XmppManagerBase { ) async { final fromJid = JID.fromString(message.from!); final roomJid = fromJid.toBare(); - return _mucRoomCache.synchronized(() { + return _cacheLock.synchronized(() { + logger.finest('Lock aquired for message from ${message.from}'); final roomState = _mucRoomCache[roomJid]; if (roomState == null) { return state; diff --git a/packages/moxxmpp/lib/src/xeps/xep_0054.dart b/packages/moxxmpp/lib/src/xeps/xep_0054.dart index ac25ede..eb821f9 100644 --- a/packages/moxxmpp/lib/src/xeps/xep_0054.dart +++ b/packages/moxxmpp/lib/src/xeps/xep_0054.dart @@ -56,14 +56,13 @@ class VCardManager extends XmppManagerBase { final x = presence.firstTag('x', xmlns: vCardTempUpdate)!; final hash = x.firstTag('photo')!.innerText(); - // TODO(Unknown): Use the presence manager interface. getAttributes().sendEvent( VCardAvatarUpdatedEvent( JID.fromString(presence.from!), hash, ), ); - return state..done = true; + return state; } VCardPhoto? _parseVCardPhoto(XMLNode? node) { diff --git a/packages/moxxmpp/lib/src/xeps/xep_0115.dart b/packages/moxxmpp/lib/src/xeps/xep_0115.dart index 19ebdf6..749011b 100644 --- a/packages/moxxmpp/lib/src/xeps/xep_0115.dart +++ b/packages/moxxmpp/lib/src/xeps/xep_0115.dart @@ -173,39 +173,20 @@ class EntityCapabilitiesManager extends XmppManagerBase { }); } - @visibleForTesting - Future onPresence( - Stanza stanza, - StanzaHandlerData state, + Future _performQuery( + Stanza presence, + String ver, + String hashFunctionName, + String capabilityNode, + JID from, ) async { - if (stanza.from == null) { - return state; - } - - final from = JID.fromString(stanza.from!); - final c = stanza.firstTag('c', xmlns: capsXmlns)!; - - final hashFunctionName = c.attributes['hash'] as String?; - final capabilityNode = c.attributes['node'] as String?; - final ver = c.attributes['ver'] as String?; - if (hashFunctionName == null || capabilityNode == null || ver == null) { - return state; - } - - // Check if we know of the hash - final isCached = - await _cacheLock.synchronized(() => _capHashCache.containsKey(ver)); - if (isCached) { - return state; - } - final dm = getAttributes().getManagerById(discoManager)!; final discoRequest = await dm.discoInfoQuery( from, node: capabilityNode, ); if (discoRequest.isType()) { - return state; + return; } final discoInfo = discoRequest.get(); @@ -220,7 +201,7 @@ class EntityCapabilitiesManager extends XmppManagerBase { discoInfo, ), ); - return state; + return; } // Validate the disco#info result according to XEP-0115 § 5.4 @@ -234,7 +215,7 @@ class EntityCapabilitiesManager extends XmppManagerBase { logger.warning( 'Malformed disco#info response: More than one equal identity', ); - return state; + return; } } @@ -245,7 +226,7 @@ class EntityCapabilitiesManager extends XmppManagerBase { logger.warning( 'Malformed disco#info response: More than one equal feature', ); - return state; + return; } } @@ -273,7 +254,7 @@ class EntityCapabilitiesManager extends XmppManagerBase { logger.warning( 'Malformed disco#info response: Extended Info FORM_TYPE contains more than one value(s) of different value.', ); - return state; + return; } } @@ -288,7 +269,7 @@ class EntityCapabilitiesManager extends XmppManagerBase { logger.warning( 'Malformed disco#info response: More than one Extended Disco Info forms with the same FORM_TYPE value', ); - return state; + return; } // Check if the field type is hidden @@ -325,7 +306,43 @@ class EntityCapabilitiesManager extends XmppManagerBase { 'Capability hash mismatch from $from: Received "$ver", expected "$computedCapabilityHash".', ); } + } + @visibleForTesting + Future onPresence( + Stanza stanza, + StanzaHandlerData state, + ) async { + if (stanza.from == null) { + return state; + } + + final from = JID.fromString(stanza.from!); + final c = stanza.firstTag('c', xmlns: capsXmlns)!; + + final hashFunctionName = c.attributes['hash'] as String?; + final capabilityNode = c.attributes['node'] as String?; + final ver = c.attributes['ver'] as String?; + if (hashFunctionName == null || capabilityNode == null || ver == null) { + return state; + } + + // Check if we know of the hash + final isCached = + await _cacheLock.synchronized(() => _capHashCache.containsKey(ver)); + if (isCached) { + return state; + } + + unawaited( + _performQuery( + stanza, + ver, + hashFunctionName, + capabilityNode, + from, + ), + ); return state; } diff --git a/packages/moxxmpp/lib/src/xeps/xep_0198/xep_0198.dart b/packages/moxxmpp/lib/src/xeps/xep_0198/xep_0198.dart index 49082d9..df34553 100644 --- a/packages/moxxmpp/lib/src/xeps/xep_0198/xep_0198.dart +++ b/packages/moxxmpp/lib/src/xeps/xep_0198/xep_0198.dart @@ -75,6 +75,17 @@ class StreamManagementManager extends XmppManagerBase { return acks; } + @override + Future onData() async { + // The ack timer does not matter if we are currently in the middle of receiving + // data. + await _ackLock.synchronized(() { + if (_pendingAcks > 0) { + _resetAckTimer(); + } + }); + } + /// Called when a stanza has been acked to decide whether we should trigger a /// StanzaAckedEvent. /// @@ -225,6 +236,12 @@ class StreamManagementManager extends XmppManagerBase { _ackTimer = null; } + /// Resets the ack timer. + void _resetAckTimer() { + _stopAckTimer(); + _startAckTimer(); + } + @visibleForTesting Future handleAckTimeout() async { _stopAckTimer(); @@ -315,8 +332,7 @@ class StreamManagementManager extends XmppManagerBase { // Reset the timer if (_pendingAcks > 0) { - _stopAckTimer(); - _startAckTimer(); + _resetAckTimer(); } } diff --git a/packages/moxxmpp/test/awaiter_test.dart b/packages/moxxmpp/test/awaiter_test.dart index acde248..f9b1813 100644 --- a/packages/moxxmpp/test/awaiter_test.dart +++ b/packages/moxxmpp/test/awaiter_test.dart @@ -2,11 +2,12 @@ import 'package:moxxmpp/moxxmpp.dart'; import 'package:moxxmpp/src/awaiter.dart'; import 'package:test/test.dart'; -void main() { - const bareJid = JID('moxxmpp', 'server3.example', ''); +const bareJid = 'user4@example.org'; +String getBareJidCallback() => bareJid; +void main() { test('Test awaiting an awaited stanza with a from attribute', () async { - final awaiter = StanzaAwaiter(); + final awaiter = StanzaAwaiter(getBareJidCallback); // "Send" a stanza final future = await awaiter.addPending( @@ -20,14 +21,12 @@ void main() { XMLNode.fromString( '', ), - bareJid, ); expect(result1, false); final result2 = await awaiter.onData( XMLNode.fromString( '', ), - bareJid, ); expect(result2, false); @@ -37,22 +36,20 @@ void main() { ); final result3 = await awaiter.onData( stanza, - bareJid, ); expect(result3, true); expect(await future, stanza); }); test('Test awaiting an awaited stanza without a from attribute', () async { - final awaiter = StanzaAwaiter(); + final awaiter = StanzaAwaiter(getBareJidCallback); // "Send" a stanza - final future = await awaiter.addPending(bareJid.toString(), 'abc123', 'iq'); + final future = await awaiter.addPending(null, 'abc123', 'iq'); // Receive the wrong answer final result1 = await awaiter.onData( XMLNode.fromString(''), - bareJid, ); expect(result1, false); @@ -60,23 +57,21 @@ void main() { final stanza = XMLNode.fromString(''); final result2 = await awaiter.onData( stanza, - bareJid, ); expect(result2, true); expect(await future, stanza); }); test('Test awaiting a stanza that was already awaited', () async { - final awaiter = StanzaAwaiter(); + final awaiter = StanzaAwaiter(getBareJidCallback); // "Send" a stanza - final future = await awaiter.addPending(bareJid.toString(), 'abc123', 'iq'); + final future = await awaiter.addPending(null, 'abc123', 'iq'); // Receive the correct answer final stanza = XMLNode.fromString(''); final result1 = await awaiter.onData( stanza, - bareJid, ); expect(result1, true); expect(await future, stanza); @@ -84,31 +79,55 @@ void main() { // Receive it again final result2 = await awaiter.onData( stanza, - bareJid, ); expect(result2, false); }); test('Test ignoring a stanza that has the wrong tag', () async { - final awaiter = StanzaAwaiter(); + final awaiter = StanzaAwaiter(getBareJidCallback); // "Send" a stanza - final future = await awaiter.addPending(bareJid.toString(), 'abc123', 'iq'); + final future = await awaiter.addPending(null, 'abc123', 'iq'); // Receive the wrong answer final stanza = XMLNode.fromString(''); final result1 = await awaiter.onData( XMLNode.fromString(''), - bareJid, ); expect(result1, false); // Receive the correct answer final result2 = await awaiter.onData( stanza, - bareJid, ); expect(result2, true); expect(await future, stanza); }); + + test('Sending a stanza to our bare JID', () async { + final awaiter = StanzaAwaiter(getBareJidCallback); + + // "Send" a stanza + final future = await awaiter.addPending(bareJid, 'abc123', 'iq'); + + // Receive the response. + final stanza = XMLNode.fromString(''); + await awaiter.onData(stanza); + expect(await future, stanza); + }); + + test( + 'Sending a stanza to our bare JID and receiving stanza with a from attribute', + () async { + final awaiter = StanzaAwaiter(getBareJidCallback); + + // "Send" a stanza + final future = await awaiter.addPending(bareJid, 'abc123', 'iq'); + + // Receive the response. + final stanza = + XMLNode.fromString(''); + await awaiter.onData(stanza); + expect(await future, stanza); + }); } diff --git a/packages/moxxmpp/test/xeps/xep_0045_test.dart b/packages/moxxmpp/test/xeps/xep_0045_test.dart index 0f92aae..d857552 100644 --- a/packages/moxxmpp/test/xeps/xep_0045_test.dart +++ b/packages/moxxmpp/test/xeps/xep_0045_test.dart @@ -332,4 +332,543 @@ void main() { ); }, ); + + test( + 'Testing a user joining a room', + () async { + final fakeSocket = StubTCPSocket([ + StringExpectation( + "", + ''' + + + + PLAIN + + + + + ''', + ), + StringExpectation( + "AHBvbHlub21kaXZpc2lvbgBhYWFh", + '', + ), + StringExpectation( + "", + ''' + + + + + + + + + + + +''', + ), + StanzaExpectation( + '', + 'polynomdivision@test.server/MU29eEZn', + ignoreId: true, + ), + StanzaExpectation( + '', + '', + ignoreId: true, + ), + ]); + final conn = XmppConnection( + TestingSleepReconnectionPolicy(1), + AlwaysConnectedConnectivityManager(), + ClientToServerNegotiator(), + fakeSocket, + ) + ..connectionSettings = ConnectionSettings( + jid: JID.fromString('polynomdivision@test.server'), + password: 'aaaa', + ) + ..setResource('test-resource', triggerEvent: false); + await conn.registerManagers([ + DiscoManager([]), + MUCManager(), + ]); + + await conn.registerFeatureNegotiators([ + SaslPlainNegotiator(), + ResourceBindingNegotiator(), + ]); + + await conn.connect( + waitUntilLogin: true, + shouldReconnect: false, + ); + + // Join a groupchat + final roomJid = JID.fromString('channel@muc.example.org'); + final joinResult = conn.getManagerById(mucManager)!.joinRoom( + roomJid, + 'test', + maxHistoryStanzas: 0, + ); + await Future.delayed(const Duration(seconds: 1)); + + fakeSocket + ..injectRawXml( + ''' + + + + + + ''', + ) + ..injectRawXml( + ''' + + + + + + ''', + ) + ..injectRawXml( + ''' + + + + + + + ''', + ) + ..injectRawXml( + ''' + + + + ''', + ); + + await joinResult; + final room = (await conn + .getManagerById(mucManager)! + .getRoomState(roomJid))!; + expect(room.joined, true); + expect( + room.members.length, + 2, + ); + + // Now a new user joins the room. + MemberJoinedEvent? event; + conn.asBroadcastStream().listen((e) { + if (e is MemberJoinedEvent) { + event = e; + } + }); + + fakeSocket.injectRawXml( + ''' + + + + + + ''', + ); + + await Future.delayed(const Duration(seconds: 2)); + expect(event != null, true); + expect(event!.member.nick, 'papatutuwawa'); + expect(event!.member.affiliation, Affiliation.admin); + expect(event!.member.role, Role.participant); + + final roomAfterJoin = (await conn + .getManagerById(mucManager)! + .getRoomState(roomJid))!; + expect(roomAfterJoin.members.length, 3); + }, + ); + + test( + 'Testing a user leaving a room', + () async { + final fakeSocket = StubTCPSocket([ + StringExpectation( + "", + ''' + + + + PLAIN + + + + + ''', + ), + StringExpectation( + "AHBvbHlub21kaXZpc2lvbgBhYWFh", + '', + ), + StringExpectation( + "", + ''' + + + + + + + + + + + +''', + ), + StanzaExpectation( + '', + 'polynomdivision@test.server/MU29eEZn', + ignoreId: true, + ), + StanzaExpectation( + '', + '', + ignoreId: true, + ), + ]); + final conn = XmppConnection( + TestingSleepReconnectionPolicy(1), + AlwaysConnectedConnectivityManager(), + ClientToServerNegotiator(), + fakeSocket, + ) + ..connectionSettings = ConnectionSettings( + jid: JID.fromString('polynomdivision@test.server'), + password: 'aaaa', + ) + ..setResource('test-resource', triggerEvent: false); + await conn.registerManagers([ + DiscoManager([]), + MUCManager(), + ]); + + await conn.registerFeatureNegotiators([ + SaslPlainNegotiator(), + ResourceBindingNegotiator(), + ]); + + await conn.connect( + waitUntilLogin: true, + shouldReconnect: false, + ); + + // Join a groupchat + final roomJid = JID.fromString('channel@muc.example.org'); + final joinResult = conn.getManagerById(mucManager)!.joinRoom( + roomJid, + 'test', + maxHistoryStanzas: 0, + ); + await Future.delayed(const Duration(seconds: 1)); + + fakeSocket + ..injectRawXml( + ''' + + + + + + ''', + ) + ..injectRawXml( + ''' + + + + + + ''', + ) + ..injectRawXml( + ''' + + + + + + + ''', + ) + ..injectRawXml( + ''' + + + + ''', + ); + + await joinResult; + final room = (await conn + .getManagerById(mucManager)! + .getRoomState(roomJid))!; + expect(room.joined, true); + expect( + room.members.length, + 2, + ); + + // Now a user leaves the room. + MemberLeftEvent? event; + conn.asBroadcastStream().listen((e) { + if (e is MemberLeftEvent) { + event = e; + } + }); + + fakeSocket.injectRawXml( + ''' + + + + + + ''', + ); + + await Future.delayed(const Duration(seconds: 2)); + expect(event != null, true); + expect(event!.nick, 'secondwitch'); + + final roomAfterLeave = (await conn + .getManagerById(mucManager)! + .getRoomState(roomJid))!; + expect(roomAfterLeave.members.length, 1); + }, + ); + + test( + 'Test a user changing their nick name', + () async { + final fakeSocket = StubTCPSocket([ + StringExpectation( + "", + ''' + + + + PLAIN + + + + + ''', + ), + StringExpectation( + "AHBvbHlub21kaXZpc2lvbgBhYWFh", + '', + ), + StringExpectation( + "", + ''' + + + + + + + + + + + +''', + ), + StanzaExpectation( + '', + 'polynomdivision@test.server/MU29eEZn', + ignoreId: true, + ), + StanzaExpectation( + '', + '', + ignoreId: true, + ), + ]); + final conn = XmppConnection( + TestingSleepReconnectionPolicy(1), + AlwaysConnectedConnectivityManager(), + ClientToServerNegotiator(), + fakeSocket, + ) + ..connectionSettings = ConnectionSettings( + jid: JID.fromString('polynomdivision@test.server'), + password: 'aaaa', + ) + ..setResource('test-resource', triggerEvent: false); + await conn.registerManagers([ + DiscoManager([]), + MUCManager(), + ]); + + await conn.registerFeatureNegotiators([ + SaslPlainNegotiator(), + ResourceBindingNegotiator(), + ]); + + await conn.connect( + waitUntilLogin: true, + shouldReconnect: false, + ); + + // Join a groupchat + final roomJid = JID.fromString('channel@muc.example.org'); + final joinResult = conn.getManagerById(mucManager)!.joinRoom( + roomJid, + 'test', + maxHistoryStanzas: 0, + ); + await Future.delayed(const Duration(seconds: 1)); + + fakeSocket + ..injectRawXml( + ''' + + + + + + ''', + ) + ..injectRawXml( + ''' + + + + + + ''', + ) + ..injectRawXml( + ''' + + + + + + + ''', + ) + ..injectRawXml( + ''' + + + + ''', + ); + + await joinResult; + final room = (await conn + .getManagerById(mucManager)! + .getRoomState(roomJid))!; + expect(room.joined, true); + expect( + room.members.length, + 2, + ); + + // Now a new user changes their nick. + MemberChangedNickEvent? event; + conn.asBroadcastStream().listen((e) { + if (e is MemberChangedNickEvent) { + event = e; + } + }); + + fakeSocket.injectRawXml( + ''' + + + + + + + ''', + ); + + await Future.delayed(const Duration(seconds: 2)); + expect(event != null, true); + expect(event!.oldNick, 'firstwitch'); + expect(event!.newNick, 'papatutuwawa'); + + final roomAfterChange = (await conn + .getManagerById(mucManager)! + .getRoomState(roomJid))!; + expect(roomAfterChange.members.length, 2); + expect(roomAfterChange.members['firstwitch'], null); + expect(roomAfterChange.members['papatutuwawa'] != null, true); + }, + ); } diff --git a/packages/moxxmpp/test/xeps/xep_0115_test.dart b/packages/moxxmpp/test/xeps/xep_0115_test.dart index 47837c8..ff90977 100644 --- a/packages/moxxmpp/test/xeps/xep_0115_test.dart +++ b/packages/moxxmpp/test/xeps/xep_0115_test.dart @@ -343,6 +343,7 @@ void main() { stanza, StanzaHandlerData(false, false, stanza, TypedMap()), ); + await Future.delayed(const Duration(seconds: 2)); expect( await manager.getCachedDiscoInfoFromJid(aliceJid) != null, @@ -513,6 +514,7 @@ void main() { stanza, StanzaHandlerData(false, false, stanza, TypedMap()), ); + await Future.delayed(const Duration(seconds: 2)); final cachedItem = await manager.getCachedDiscoInfoFromJid(aliceJid); expect( @@ -549,6 +551,7 @@ void main() { stanza, StanzaHandlerData(false, false, stanza, TypedMap()), ); + await Future.delayed(const Duration(seconds: 2)); final cachedItem = await manager.getCachedDiscoInfoFromJid(aliceJid); expect( diff --git a/packages/moxxmpp/test/xmpp_parser_test.dart b/packages/moxxmpp/test/xmpp_parser_test.dart index 265f393..7098b79 100644 --- a/packages/moxxmpp/test/xmpp_parser_test.dart +++ b/packages/moxxmpp/test/xmpp_parser_test.dart @@ -11,14 +11,16 @@ void main() { final controller = StreamController(); unawaited( - controller.stream.transform(parser).forEach((event) { - if (event is! XMPPStreamElement) return; - final node = event.node; + controller.stream.transform(parser).forEach((events) { + for (final event in events) { + if (event is! XMPPStreamElement) continue; + final node = event.node; - if (node.tag == 'childa') { - childa = true; - } else if (node.tag == 'childb') { - childb = true; + if (node.tag == 'childa') { + childa = true; + } else if (node.tag == 'childb') { + childb = true; + } } }), ); @@ -36,14 +38,16 @@ void main() { final controller = StreamController(); unawaited( - controller.stream.transform(parser).forEach((event) { - if (event is! XMPPStreamElement) return; - final node = event.node; + controller.stream.transform(parser).forEach((events) { + for (final event in events) { + if (event is! XMPPStreamElement) continue; + final node = event.node; - if (node.tag == 'childa') { - childa = true; - } else if (node.tag == 'childb') { - childb = true; + if (node.tag == 'childa') { + childa = true; + } else if (node.tag == 'childb') { + childb = true; + } } }), ); @@ -64,14 +68,16 @@ void main() { final controller = StreamController(); unawaited( - controller.stream.transform(parser).forEach((event) { - if (event is! XMPPStreamElement) return; - final node = event.node; + controller.stream.transform(parser).forEach((events) { + for (final event in events) { + if (event is! XMPPStreamElement) continue; + final node = event.node; - if (node.tag == 'childa') { - childa = true; - } else if (node.tag == 'childb') { - childb = true; + if (node.tag == 'childa') { + childa = true; + } else if (node.tag == 'childb') { + childb = true; + } } }), ); @@ -93,13 +99,15 @@ void main() { final controller = StreamController(); unawaited( - controller.stream.transform(parser).forEach((node) { - if (node is XMPPStreamElement) { - if (node.node.tag == 'childa') { - childa = true; + controller.stream.transform(parser).forEach((events) { + for (final event in events) { + if (event is XMPPStreamElement) { + if (event.node.tag == 'childa') { + childa = true; + } + } else if (event is XMPPStreamHeader) { + attrs = event.attributes; } - } else if (node is XMPPStreamHeader) { - attrs = node.attributes; } }), ); @@ -118,11 +126,13 @@ void main() { var gotFeatures = false; unawaited( controller.stream.transform(parser).forEach( - (event) { - if (event is! XMPPStreamElement) return; + (events) { + for (final event in events) { + if (event is! XMPPStreamElement) continue; - if (event.node.tag == 'stream:features') { - gotFeatures = true; + if (event.node.tag == 'stream:features') { + gotFeatures = true; + } } }, ), @@ -157,4 +167,27 @@ void main() { await Future.delayed(const Duration(seconds: 1)); expect(gotFeatures, true); }); + + test('Test the order of concatenated stanzas', () async { + // NOTE: This seems weird, but it turns out that not keeping this order leads to + // MUC joins (on Moxxy) not catching every bit of presence before marking the + // MUC as joined. + final parser = XMPPStreamParser(); + final controller = StreamController(); + var called = false; + + unawaited( + controller.stream.transform(parser).forEach((events) { + expect(events.isNotEmpty, true); + expect((events[0] as XMPPStreamElement).node.tag, 'childa'); + expect((events[1] as XMPPStreamElement).node.tag, 'childb'); + expect((events[2] as XMPPStreamElement).node.tag, 'childc'); + called = true; + }), + ); + controller.add(''); + + await Future.delayed(const Duration(seconds: 2)); + expect(called, true); + }); } diff --git a/packages/moxxmpp_socket_tcp/CHANGELOG.md b/packages/moxxmpp_socket_tcp/CHANGELOG.md index d0a7586..5902472 100644 --- a/packages/moxxmpp_socket_tcp/CHANGELOG.md +++ b/packages/moxxmpp_socket_tcp/CHANGELOG.md @@ -1,3 +1,8 @@ +## 0.4.0 + +- Keep version in sync with moxxmpp +- *BREAKING*: `TCPSocketWrapper` now takes a boolean parameter that enables logging of all incoming and outgoing data. + ## 0.3.1 - Keep version in sync with moxxmpp diff --git a/packages/moxxmpp_socket_tcp/integration_test/badxmpp_certificate_test.dart b/packages/moxxmpp_socket_tcp/integration_test/badxmpp_certificate_test.dart index 25e7aa9..b4c5a7d 100644 --- a/packages/moxxmpp_socket_tcp/integration_test/badxmpp_certificate_test.dart +++ b/packages/moxxmpp_socket_tcp/integration_test/badxmpp_certificate_test.dart @@ -5,7 +5,7 @@ import 'package:test/test.dart'; Future _runTest(String domain) async { var gotTLSException = false; - final socket = TCPSocketWrapper(); + final socket = TCPSocketWrapper(true); final log = Logger('TestLogger'); socket.getEventStream().listen((event) { if (event is XmppSocketTLSFailedEvent) { diff --git a/packages/moxxmpp_socket_tcp/integration_test/failure_reconnection_test.dart b/packages/moxxmpp_socket_tcp/integration_test/failure_reconnection_test.dart index d9eb4d0..4d012c0 100644 --- a/packages/moxxmpp_socket_tcp/integration_test/failure_reconnection_test.dart +++ b/packages/moxxmpp_socket_tcp/integration_test/failure_reconnection_test.dart @@ -19,7 +19,7 @@ void main() { TestingSleepReconnectionPolicy(10), AlwaysConnectedConnectivityManager(), ClientToServerNegotiator(), - TCPSocketWrapper(), + TCPSocketWrapper(true), )..connectionSettings = ConnectionSettings( jid: JID.fromString('testuser@no-sasl.badxmpp.eu'), password: 'abc123', @@ -59,7 +59,7 @@ void main() { TestingReconnectionPolicy(), AlwaysConnectedConnectivityManager(), ClientToServerNegotiator(), - TCPSocketWrapper(), + TCPSocketWrapper(true), )..connectionSettings = ConnectionSettings( jid: JID.fromString('testuser@no-sasl.badxmpp.eu'), password: 'abc123', diff --git a/packages/moxxmpp_socket_tcp/lib/src/socket.dart b/packages/moxxmpp_socket_tcp/lib/src/socket.dart index 7922bdc..ce2e6c6 100644 --- a/packages/moxxmpp_socket_tcp/lib/src/socket.dart +++ b/packages/moxxmpp_socket_tcp/lib/src/socket.dart @@ -10,6 +10,11 @@ import 'package:moxxmpp_socket_tcp/src/rfc_2782.dart'; /// TCP socket implementation for XmppConnection class TCPSocketWrapper extends BaseSocketWrapper { + TCPSocketWrapper(this._logIncomingOutgoing); + + /// Flag controlling whether incoming/outgoing data is logged or not. + final bool _logIncomingOutgoing; + /// The underlying Socket/SecureSocket instance. Socket? _socket; @@ -212,7 +217,9 @@ class TCPSocketWrapper extends BaseSocketWrapper { _socketSubscription = _socket!.listen( (List event) { final data = utf8.decode(event); - _log.finest('<== $data'); + if (_logIncomingOutgoing) { + _log.finest('<== $data'); + } _dataStream.add(data); }, onError: (Object error) { @@ -297,7 +304,9 @@ class TCPSocketWrapper extends BaseSocketWrapper { return; } - _log.finest('==> $data'); + if (_logIncomingOutgoing) { + _log.finest('==> $data'); + } try { _socket!.write(data); diff --git a/packages/moxxmpp_socket_tcp/pubspec.yaml b/packages/moxxmpp_socket_tcp/pubspec.yaml index 380f343..3f887f6 100644 --- a/packages/moxxmpp_socket_tcp/pubspec.yaml +++ b/packages/moxxmpp_socket_tcp/pubspec.yaml @@ -1,6 +1,6 @@ name: moxxmpp_socket_tcp description: A socket for moxxmpp using TCP that implements the RFC6120 connection algorithm and XEP-0368 -version: 0.3.1 +version: 0.4.0 homepage: https://codeberg.org/moxxy/moxxmpp publish_to: https://git.polynom.me/api/packages/Moxxy/pub