diff --git a/.gitlint b/.gitlint index 99d3cfb..b4ea8e9 100644 --- a/.gitlint +++ b/.gitlint @@ -7,7 +7,7 @@ line-length=72 [title-trailing-punctuation] [title-hard-tab] [title-match-regex] -regex=^((feat|fix|chore|refactor|docs|release|test)\((meta|tests|style|docs|xep|core|example)+(,(meta|tests|style|docs|xep|core|example))*\)|release): [A-Z0-9].*$ +regex=^((feat|fix|chore|refactor|docs|release|test)\((meta|tests|style|docs|xep|core|example|all)+(,(meta|tests|style|docs|xep|core|example|all))*\)|release): [A-Z0-9].*$ [body-trailing-whitespace] diff --git a/packages/moxxmpp/CHANGELOG.md b/packages/moxxmpp/CHANGELOG.md index e34e692..40ab55c 100644 --- a/packages/moxxmpp/CHANGELOG.md +++ b/packages/moxxmpp/CHANGELOG.md @@ -9,6 +9,8 @@ - **BREAKING**: Remove `DiscoManager.discoInfoCapHashQuery`. - **BREAKING**: The entity argument of `DiscoManager.discoInfoQuery` and `DiscoManager.discoItemsQuery` are now `JID` instead of `String`. - **BREAKING**: `PubSubManager` and `UserAvatarManager` now use `JID` instead of `String`. +- **BREAKING**: `XmppConnection.sendStanza` not only takes a `StanzaDetails` argument. +- Sent stanzas are not kept in a queue until sent. ## 0.3.1 diff --git a/packages/moxxmpp/lib/src/connection.dart b/packages/moxxmpp/lib/src/connection.dart index d880a54..872a985 100644 --- a/packages/moxxmpp/lib/src/connection.dart +++ b/packages/moxxmpp/lib/src/connection.dart @@ -26,6 +26,7 @@ import 'package:moxxmpp/src/socket.dart'; import 'package:moxxmpp/src/stanza.dart'; import 'package:moxxmpp/src/stringxml.dart'; import 'package:moxxmpp/src/types/result.dart'; +import 'package:moxxmpp/src/util/queue.dart'; import 'package:moxxmpp/src/xeps/xep_0030/xep_0030.dart'; import 'package:moxxmpp/src/xeps/xep_0198/xep_0198.dart'; import 'package:moxxmpp/src/xeps/xep_0352.dart'; @@ -48,18 +49,6 @@ enum XmppConnectionState { error } -/// Metadata for [XmppConnection.sendStanza]. -enum StanzaFromType { - /// Add the full JID to the stanza as the from attribute - full, - - /// Add the bare JID to the stanza as the from attribute - bare, - - /// Add no JID as the from attribute - none, -} - /// This class is a connection to the server. class XmppConnection { XmppConnection( @@ -91,7 +80,12 @@ class XmppConnection { _socketStream = _socket.getDataStream(); // TODO(Unknown): Handle on done _socketStream.transform(_streamParser).forEach(handleXmlStream); - _socket.getEventStream().listen(_handleSocketEvent); + _socket.getEventStream().listen(handleSocketEvent); + + _stanzaQueue = AsyncStanzaQueue( + _sendStanzaImpl, + _canSendData, + ); } /// The state that the connection is currently in @@ -175,6 +169,8 @@ class XmppConnection { bool get isAuthenticated => _isAuthenticated; + late final AsyncStanzaQueue _stanzaQueue; + /// Returns the JID we authenticate with and add the resource that we have bound. JID _getJidWithResource() { assert(_resource.isNotEmpty, 'The resource must not be empty'); @@ -366,7 +362,8 @@ class XmppConnection { } /// Called whenever the socket creates an event - Future _handleSocketEvent(XmppSocketEvent event) async { + @visibleForTesting + Future handleSocketEvent(XmppSocketEvent event) async { if (event is XmppSocketErrorEvent) { await handleError(SocketError(event)); } else if (event is XmppSocketClosureEvent) { @@ -412,133 +409,135 @@ class XmppConnection { .contains(await getConnectionState()); } - /// Sends a [stanza] to the server. If stream management is enabled, then keeping track - /// of the stanza is taken care of. Returns a Future that resolves when we receive a - /// response to the stanza. - /// - /// If addFrom is true, then a 'from' attribute will be added to the stanza if - /// [stanza] has none. - /// If addId is true, then an 'id' attribute will be added to the stanza if [stanza] has - /// none. + /// Sends a stanza described by [details] to the server. Until sent, the stanza is + /// kept in a queue, that is flushed after going online again. If Stream Management + /// is active, stanza's acknowledgement is tracked. // TODO(Unknown): if addId = false, the function crashes. - Future sendStanza( - Stanza stanza, { - StanzaFromType addFrom = StanzaFromType.full, - bool addId = true, - bool awaitable = true, - bool encrypted = false, - bool forceEncryption = false, - }) async { + Future sendStanza(StanzaDetails details) async { assert( - implies(addId == false && stanza.id == null, !awaitable), - 'Cannot await a stanza with no id', + implies( + details.awaitable, + details.stanza.id != null && details.stanza.id!.isNotEmpty || + details.addId, + ), + 'An awaitable stanza must have an id', ); - // Add extra data in case it was not set - var stanza_ = stanza; - if (addId && (stanza_.id == null || stanza_.id == '')) { - stanza_ = stanza.copyWith(id: generateId()); + final completer = details.awaitable ? Completer() : null; + await _stanzaQueue.enqueueStanza( + StanzaQueueEntry( + details, + completer, + ), + ); + + return completer?.future; + } + + Future _sendStanzaImpl(StanzaQueueEntry entry) async { + final details = entry.details; + var newStanza = details.stanza; + + // Generate an id, if requested + if (details.addId && (newStanza.id == null || newStanza.id == '')) { + newStanza = newStanza.copyWith(id: generateId()); } - if (addFrom != StanzaFromType.none && - (stanza_.from == null || stanza_.from == '')) { - switch (addFrom) { - case StanzaFromType.full: - { - stanza_ = stanza_.copyWith( - from: _getJidWithResource().toString(), - ); - } - break; - case StanzaFromType.bare: - { - stanza_ = stanza_.copyWith( - from: connectionSettings.jid.toBare().toString(), - ); - } - break; - case StanzaFromType.none: - break; - } - } - stanza_ = stanza_.copyWith( + + // NOTE: Originally, we handled adding a "from" attribute to the stanza here. + // However, this is not neccessary as RFC 6120 states: + // + // > When a server receives an XML stanza from a connected client, the + // > server MUST add a 'from' attribute to the stanza or override the + // > 'from' attribute specified by the client, where the value of the + // > 'from' attribute MUST be the full JID + // > () determined by the server for + // > the connected resource that generated the stanza (see + // > Section 4.3.6), or the bare JID () in the + // > case of subscription-related presence stanzas (see [XMPP-IM]). + // + // This means that even if we add a "from" attribute, the server will discard + // it. If we don't specify it, then the server will add the correct value + // itself. + + // Add the correct stanza namespace + newStanza = newStanza.copyWith( xmlns: _negotiationsHandler.getStanzaNamespace(), ); + // Run pre-send handlers _log.fine('Running pre stanza handlers..'); final data = await _runOutgoingPreStanzaHandlers( - stanza_, + newStanza, initial: StanzaHandlerData( false, false, null, - stanza_, - encrypted: encrypted, - forceEncryption: forceEncryption, + newStanza, + encrypted: details.encrypted, + forceEncryption: details.forceEncryption, ), ); _log.fine('Done'); + // Cancel sending, if the pre-send handlers indicated it. if (data.cancel) { _log.fine('A stanza handler indicated that it wants to cancel sending.'); await _sendEvent(StanzaSendingCancelledEvent(data)); - return Stanza( - tag: data.stanza.tag, - to: data.stanza.from, - from: data.stanza.to, - attributes: { - 'type': 'error', - ...data.stanza.id != null - ? { - 'id': data.stanza.id!, - } - : {}, - }, - ); + + // Resolve the future, if one was given. + if (details.awaitable) { + entry.completer!.complete( + Stanza( + tag: data.stanza.tag, + to: data.stanza.from, + from: data.stanza.to, + attributes: { + 'type': 'error', + if (data.stanza.id != null) 'id': data.stanza.id!, + }, + ), + ); + } + return; } + // Log the (raw) stanza final prefix = data.encrypted ? '(Encrypted) ' : ''; - _log.finest('==> $prefix${stanza_.toXml()}'); + _log.finest('==> $prefix${newStanza.toXml()}'); - final stanzaString = data.stanza.toXml(); - - // ignore: cascade_invocations - _log.fine('Attempting to acquire lock for ${data.stanza.id}...'); - // TODO(PapaTutuWawa): Handle this much more graceful - var future = Future.value(XMLNode(tag: 'not-used')); - if (awaitable) { - future = await _stanzaAwaiter.addPending( + if (details.awaitable) { + await _stanzaAwaiter + .addPending( // A stanza with no to attribute is for direct processing by the server. As such, // we can correlate it by just *assuming* we have that attribute // (RFC 6120 Section 8.1.1.1) data.stanza.to ?? connectionSettings.jid.toBare().toString(), data.stanza.id!, data.stanza.tag, - ); + ) + .then((result) { + entry.completer!.complete(result); + }); } - // This uses the StreamManager to behave like a send queue if (await _canSendData()) { - _socket.write(stanzaString); - - // Try to ack every stanza - // NOTE: Here we have send an Ack request nonza. This is now done by StreamManagementManager when receiving the StanzaSentEvent + _socket.write(data.stanza.toXml()); } else { - _log.fine('_canSendData() returned false.'); + _log.fine('Not sending dat as _canSendData() returned false.'); } + // Run post-send handlers _log.fine('Running post stanza handlers..'); await _runOutgoingPostStanzaHandlers( - stanza_, + newStanza, initial: StanzaHandlerData( false, false, null, - stanza_, + newStanza, ), ); _log.fine('Done'); - - return future; } /// Called when we timeout during connecting @@ -562,18 +561,11 @@ class XmppConnection { // Set the new routing state _updateRoutingState(RoutingState.handleStanzas); - // Set the connection state - await _setConnectionState(XmppConnectionState.connected); - // Enable reconnections if (_enableReconnectOnSuccess) { await _reconnectionPolicy.setShouldReconnect(true); } - // Resolve the connection completion future - _connectionCompleter?.complete(const Result(true)); - _connectionCompleter = null; - // Tell consumers of the event stream that we're done with stream feature // negotiations await _sendEvent( @@ -582,6 +574,16 @@ class XmppConnection { false, ), ); + + // Set the connection state + await _setConnectionState(XmppConnectionState.connected); + + // Resolve the connection completion future + _connectionCompleter?.complete(const Result(true)); + _connectionCompleter = null; + + // Flush the stanza send queue + await _stanzaQueue.restart(); } /// Sets the connection state to [state] and triggers an event of type diff --git a/packages/moxxmpp/lib/src/iq.dart b/packages/moxxmpp/lib/src/iq.dart index c20b6fe..9d3ab18 100644 --- a/packages/moxxmpp/lib/src/iq.dart +++ b/packages/moxxmpp/lib/src/iq.dart @@ -23,9 +23,11 @@ Future handleUnhandledStanza( ); await conn.sendStanza( - stanza, - awaitable: false, - forceEncryption: data.encrypted, + StanzaDetails( + stanza, + awaitable: false, + forceEncryption: data.encrypted, + ), ); } } diff --git a/packages/moxxmpp/lib/src/managers/attributes.dart b/packages/moxxmpp/lib/src/managers/attributes.dart index 508c7a8..4b55467 100644 --- a/packages/moxxmpp/lib/src/managers/attributes.dart +++ b/packages/moxxmpp/lib/src/managers/attributes.dart @@ -23,14 +23,7 @@ class XmppManagerAttributes { }); /// Send a stanza whose response can be awaited. - final Future Function( - Stanza stanza, { - StanzaFromType addFrom, - bool addId, - bool awaitable, - bool encrypted, - bool forceEncryption, - }) sendStanza; + final Future Function(StanzaDetails) sendStanza; /// Send a nonza. final void Function(XMLNode) sendNonza; diff --git a/packages/moxxmpp/lib/src/managers/base.dart b/packages/moxxmpp/lib/src/managers/base.dart index 351919d..dca035b 100644 --- a/packages/moxxmpp/lib/src/managers/base.dart +++ b/packages/moxxmpp/lib/src/managers/base.dart @@ -5,6 +5,7 @@ import 'package:moxxmpp/src/managers/attributes.dart'; import 'package:moxxmpp/src/managers/data.dart'; import 'package:moxxmpp/src/managers/handlers.dart'; import 'package:moxxmpp/src/managers/namespaces.dart'; +import 'package:moxxmpp/src/stanza.dart'; import 'package:moxxmpp/src/stringxml.dart'; import 'package:moxxmpp/src/xeps/xep_0030/errors.dart'; import 'package:moxxmpp/src/xeps/xep_0030/types.dart'; @@ -165,9 +166,11 @@ abstract class XmppManagerBase { ); await getAttributes().sendStanza( - stanza, - awaitable: false, - forceEncryption: data.encrypted, + StanzaDetails( + stanza, + awaitable: false, + forceEncryption: data.encrypted, + ), ); } } diff --git a/packages/moxxmpp/lib/src/message.dart b/packages/moxxmpp/lib/src/message.dart index 075ece0..7b78b28 100644 --- a/packages/moxxmpp/lib/src/message.dart +++ b/packages/moxxmpp/lib/src/message.dart @@ -320,6 +320,11 @@ class MessageManager extends XmppManagerBase { ); } - getAttributes().sendStanza(stanza, awaitable: false); + getAttributes().sendStanza( + StanzaDetails( + stanza, + awaitable: false, + ), + ); } } diff --git a/packages/moxxmpp/lib/src/presence.dart b/packages/moxxmpp/lib/src/presence.dart index 0a73171..2abf680 100644 --- a/packages/moxxmpp/lib/src/presence.dart +++ b/packages/moxxmpp/lib/src/presence.dart @@ -1,5 +1,4 @@ import 'dart:async'; -import 'package:moxxmpp/src/connection.dart'; import 'package:moxxmpp/src/events.dart'; import 'package:moxxmpp/src/jid.dart'; import 'package:moxxmpp/src/managers/base.dart'; @@ -7,10 +6,8 @@ import 'package:moxxmpp/src/managers/data.dart'; import 'package:moxxmpp/src/managers/handlers.dart'; import 'package:moxxmpp/src/managers/namespaces.dart'; import 'package:moxxmpp/src/namespaces.dart'; -import 'package:moxxmpp/src/negotiators/namespaces.dart'; import 'package:moxxmpp/src/stanza.dart'; import 'package:moxxmpp/src/stringxml.dart'; -import 'package:moxxmpp/src/xeps/xep_0198/negotiator.dart'; /// A function that will be called when presence, outside of subscription request /// management, will be sent. Useful for managers that want to add [XMLNode]s to said @@ -49,12 +46,8 @@ class PresenceManager extends XmppManagerBase { Future onXmppEvent(XmppEvent event) async { if (event is StreamNegotiationsDoneEvent) { // Send initial presence only when we have not resumed the stream - final sm = getAttributes().getNegotiatorById( - streamManagementNegotiator, - ); - final isResumed = sm?.isResumed ?? false; - if (!isResumed) { - unawaited(sendInitialPresence()); + if (!event.resumed) { + await sendInitialPresence(); } } } @@ -108,66 +101,77 @@ class PresenceManager extends XmppManagerBase { final attrs = getAttributes(); await attrs.sendStanza( - Stanza.presence( - from: attrs.getFullJID().toString(), - children: children, + StanzaDetails( + Stanza.presence( + children: children, + ), + awaitable: false, + addId: false, ), - awaitable: false, - addId: false, ); } /// Send an unavailable presence with no 'to' attribute. void sendUnavailablePresence() { getAttributes().sendStanza( - Stanza.presence( - type: 'unavailable', + StanzaDetails( + Stanza.presence( + type: 'unavailable', + ), + awaitable: false, ), - addFrom: StanzaFromType.full, ); } /// Sends a subscription request to [to]. void sendSubscriptionRequest(String to) { getAttributes().sendStanza( - Stanza.presence( - type: 'subscribe', - to: to, + StanzaDetails( + Stanza.presence( + type: 'subscribe', + to: to, + ), + awaitable: false, ), - addFrom: StanzaFromType.none, ); } /// Sends an unsubscription request to [to]. void sendUnsubscriptionRequest(String to) { getAttributes().sendStanza( - Stanza.presence( - type: 'unsubscribe', - to: to, + StanzaDetails( + Stanza.presence( + type: 'unsubscribe', + to: to, + ), + awaitable: false, ), - addFrom: StanzaFromType.none, ); } /// Accept a presence subscription request for [to]. void sendSubscriptionRequestApproval(String to) { getAttributes().sendStanza( - Stanza.presence( - type: 'subscribed', - to: to, + StanzaDetails( + Stanza.presence( + type: 'subscribed', + to: to, + ), + awaitable: false, ), - addFrom: StanzaFromType.none, ); } /// Reject a presence subscription request for [to]. void sendSubscriptionRequestRejection(String to) { getAttributes().sendStanza( - Stanza.presence( - type: 'unsubscribed', - to: to, + StanzaDetails( + Stanza.presence( + type: 'unsubscribed', + to: to, + ), + awaitable: false, ), - addFrom: StanzaFromType.none, ); } } diff --git a/packages/moxxmpp/lib/src/roster/roster.dart b/packages/moxxmpp/lib/src/roster/roster.dart index d8c3f9d..93ae392 100644 --- a/packages/moxxmpp/lib/src/roster/roster.dart +++ b/packages/moxxmpp/lib/src/roster/roster.dart @@ -235,14 +235,16 @@ class RosterManager extends XmppManagerBase { query.attributes['ver'] = rosterVersion; } - final response = await attrs.sendStanza( - Stanza.iq( - type: 'get', - children: [ - query, - ], + final response = (await attrs.sendStanza( + StanzaDetails( + Stanza.iq( + type: 'get', + children: [ + query, + ], + ), ), - ); + ))!; if (response.attributes['type'] != 'result') { logger.warning('Error requesting roster: ${response.toXml()}'); @@ -258,20 +260,22 @@ class RosterManager extends XmppManagerBase { Future> requestRosterPushes() async { final attrs = getAttributes(); - final result = await attrs.sendStanza( - Stanza.iq( - type: 'get', - children: [ - XMLNode.xmlns( - tag: 'query', - xmlns: rosterXmlns, - attributes: { - 'ver': await _stateManager.getRosterVersion() ?? '', - }, - ) - ], + final result = (await attrs.sendStanza( + StanzaDetails( + Stanza.iq( + type: 'get', + children: [ + XMLNode.xmlns( + tag: 'query', + xmlns: rosterXmlns, + attributes: { + 'ver': await _stateManager.getRosterVersion() ?? '', + }, + ) + ], + ), ), - ); + ))!; if (result.attributes['type'] != 'result') { logger.warning('Requesting roster pushes failed: ${result.toXml()}'); @@ -296,31 +300,33 @@ class RosterManager extends XmppManagerBase { List? groups, }) async { final attrs = getAttributes(); - final response = await attrs.sendStanza( - Stanza.iq( - type: 'set', - children: [ - XMLNode.xmlns( - tag: 'query', - xmlns: rosterXmlns, - children: [ - XMLNode( - tag: 'item', - attributes: { - 'jid': jid, - ...title == jid.split('@')[0] - ? {} - : {'name': title} - }, - children: (groups ?? []) - .map((group) => XMLNode(tag: 'group', text: group)) - .toList(), - ) - ], - ) - ], + final response = (await attrs.sendStanza( + StanzaDetails( + Stanza.iq( + type: 'set', + children: [ + XMLNode.xmlns( + tag: 'query', + xmlns: rosterXmlns, + children: [ + XMLNode( + tag: 'item', + attributes: { + 'jid': jid, + ...title == jid.split('@')[0] + ? {} + : {'name': title} + }, + children: (groups ?? []) + .map((group) => XMLNode(tag: 'group', text: group)) + .toList(), + ) + ], + ), + ], + ), ), - ); + ))!; if (response.attributes['type'] != 'result') { logger.severe('Error adding $jid to roster: $response'); @@ -334,26 +340,28 @@ class RosterManager extends XmppManagerBase { /// false otherwise. Future removeFromRoster(String jid) async { final attrs = getAttributes(); - final response = await attrs.sendStanza( - Stanza.iq( - type: 'set', - children: [ - XMLNode.xmlns( - tag: 'query', - xmlns: rosterXmlns, - children: [ - XMLNode( - tag: 'item', - attributes: { - 'jid': jid, - 'subscription': 'remove' - }, - ) - ], - ) - ], + final response = (await attrs.sendStanza( + StanzaDetails( + Stanza.iq( + type: 'set', + children: [ + XMLNode.xmlns( + tag: 'query', + xmlns: rosterXmlns, + children: [ + XMLNode( + tag: 'item', + attributes: { + 'jid': jid, + 'subscription': 'remove' + }, + ) + ], + ) + ], + ), ), - ); + ))!; if (response.attributes['type'] != 'result') { logger.severe('Failed to remove roster item: ${response.toXml()}'); diff --git a/packages/moxxmpp/lib/src/stanza.dart b/packages/moxxmpp/lib/src/stanza.dart index 47a06fe..b06267a 100644 --- a/packages/moxxmpp/lib/src/stanza.dart +++ b/packages/moxxmpp/lib/src/stanza.dart @@ -1,6 +1,30 @@ import 'package:moxxmpp/src/namespaces.dart'; import 'package:moxxmpp/src/stringxml.dart'; +/// A description of a stanza to send. +class StanzaDetails { + const StanzaDetails( + this.stanza, { + this.addId = true, + this.awaitable = true, + this.encrypted = false, + this.forceEncryption = false, + }); + + /// The stanza to send. + final Stanza stanza; + + /// Flag indicating whether a stanza id should be added before sending. + final bool addId; + + /// Track the stanza to allow awaiting its response. + final bool awaitable; + + final bool encrypted; + + final bool forceEncryption; +} + /// A simple description of the element that may be inside a stanza class StanzaError { StanzaError(this.type, this.error); diff --git a/packages/moxxmpp/lib/src/util/queue.dart b/packages/moxxmpp/lib/src/util/queue.dart index db0456d..b209bc4 100644 --- a/packages/moxxmpp/lib/src/util/queue.dart +++ b/packages/moxxmpp/lib/src/util/queue.dart @@ -1,37 +1,68 @@ import 'dart:async'; import 'dart:collection'; import 'package:meta/meta.dart'; +import 'package:moxxmpp/src/stanza.dart'; +import 'package:moxxmpp/src/stringxml.dart'; import 'package:synchronized/synchronized.dart'; -/// A job to be submitted to an [AsyncQueue]. -typedef AsyncQueueJob = Future Function(); +class StanzaQueueEntry { + const StanzaQueueEntry( + this.details, + this.completer, + ); + + /// The actual data to send. + final StanzaDetails details; + + /// The [Completer] to resolve when the response is received. + final Completer? completer; +} + +/// A function that is executed when a job is popped from the queue. +typedef SendStanzaFunction = Future Function(StanzaQueueEntry); + +/// A function that is called before popping a queue item. Should return true when +/// the [SendStanzaFunction] can be executed. +typedef CanSendCallback = Future Function(); /// A (hopefully) async-safe queue that attempts to force /// in-order execution of its jobs. -class AsyncQueue { - /// The lock for accessing [AsyncQueue._lock] and [AsyncQueue._running]. +class AsyncStanzaQueue { + AsyncStanzaQueue( + this._sendStanzaFunction, + this._canSendCallback, + ); + + /// The lock for accessing [AsyncStanzaQueue._lock] and [AsyncStanzaQueue._running]. final Lock _lock = Lock(); /// The actual job queue. - final Queue _queue = Queue(); + final Queue _queue = Queue(); + + /// Sends the stanza when we can pop from the queue. + final SendStanzaFunction _sendStanzaFunction; + + final CanSendCallback _canSendCallback; /// Indicates whether we are currently executing a job. bool _running = false; @visibleForTesting - Queue get queue => _queue; + Queue get queue => _queue; @visibleForTesting bool get isRunning => _running; - /// Adds a job [job] to the queue. - Future addJob(AsyncQueueJob job) async { - await _lock.synchronized(() { - _queue.add(job); + /// Adds a job [entry] to the queue. + Future enqueueStanza(StanzaQueueEntry entry) async { + await _lock.synchronized(() async { + _queue.add(entry); - if (!_running && _queue.isNotEmpty) { + if (!_running && _queue.isNotEmpty && await _canSendCallback()) { _running = true; - unawaited(_popJob()); + unawaited( + _runJob(_queue.removeFirst()), + ); } }); } @@ -40,17 +71,30 @@ class AsyncQueue { await _lock.synchronized(_queue.clear); } - Future _popJob() async { - final job = _queue.removeFirst(); - final future = job(); - await future; + Future _runJob(StanzaQueueEntry details) async { + await _sendStanzaFunction(details); - await _lock.synchronized(() { - if (_queue.isNotEmpty) { - unawaited(_popJob()); + await _lock.synchronized(() async { + if (_queue.isNotEmpty && await _canSendCallback()) { + unawaited( + _runJob(_queue.removeFirst()), + ); } else { _running = false; } }); } + + Future restart() async { + if (!(await _canSendCallback())) return; + + await _lock.synchronized(() { + if (_queue.isNotEmpty) { + _running = true; + unawaited( + _runJob(_queue.removeFirst()), + ); + } + }); + } } diff --git a/packages/moxxmpp/lib/src/xeps/xep_0030/xep_0030.dart b/packages/moxxmpp/lib/src/xeps/xep_0030/xep_0030.dart index 8558da3..263cdce 100644 --- a/packages/moxxmpp/lib/src/xeps/xep_0030/xep_0030.dart +++ b/packages/moxxmpp/lib/src/xeps/xep_0030/xep_0030.dart @@ -291,10 +291,12 @@ class DiscoManager extends XmppManagerBase { } } - final stanza = await getAttributes().sendStanza( - buildDiscoInfoQueryStanza(entity, node), - encrypted: !shouldEncrypt, - ); + final stanza = (await getAttributes().sendStanza( + StanzaDetails( + buildDiscoInfoQueryStanza(entity, node), + encrypted: !shouldEncrypt, + ), + ))!; final query = stanza.firstTag('query'); if (query == null) { final result = Result(InvalidResponseDiscoError()); @@ -331,10 +333,12 @@ class DiscoManager extends XmppManagerBase { return future; } - final stanza = await getAttributes().sendStanza( - buildDiscoItemsQueryStanza(entity, node: node), - encrypted: !shouldEncrypt, - ) as Stanza; + final stanza = (await getAttributes().sendStanza( + StanzaDetails( + buildDiscoItemsQueryStanza(entity, node: node), + encrypted: !shouldEncrypt, + ), + ))!; final query = stanza.firstTag('query'); if (query == null) { @@ -344,7 +348,7 @@ class DiscoManager extends XmppManagerBase { return result; } - if (stanza.type == 'error') { + if (stanza.attributes['type'] == 'error') { //final error = stanza.firstTag('error'); //print("Disco Items error: " + error.toXml()); final result = diff --git a/packages/moxxmpp/lib/src/xeps/xep_0054.dart b/packages/moxxmpp/lib/src/xeps/xep_0054.dart index c57c9bf..0245044 100644 --- a/packages/moxxmpp/lib/src/xeps/xep_0054.dart +++ b/packages/moxxmpp/lib/src/xeps/xep_0054.dart @@ -103,19 +103,21 @@ class VCardManager extends XmppManagerBase { } Future> requestVCard(String jid) async { - final result = await getAttributes().sendStanza( - Stanza.iq( - to: jid, - type: 'get', - children: [ - XMLNode.xmlns( - tag: 'vCard', - xmlns: vCardTempXmlns, - ) - ], + final result = (await getAttributes().sendStanza( + StanzaDetails( + Stanza.iq( + to: jid, + type: 'get', + children: [ + XMLNode.xmlns( + tag: 'vCard', + xmlns: vCardTempXmlns, + ) + ], + ), + encrypted: true, ), - encrypted: true, - ); + ))!; if (result.attributes['type'] != 'result') { return Result(UnknownVCardError()); diff --git a/packages/moxxmpp/lib/src/xeps/xep_0060/xep_0060.dart b/packages/moxxmpp/lib/src/xeps/xep_0060/xep_0060.dart index bc66404..bc3a32c 100644 --- a/packages/moxxmpp/lib/src/xeps/xep_0060/xep_0060.dart +++ b/packages/moxxmpp/lib/src/xeps/xep_0060/xep_0060.dart @@ -181,27 +181,29 @@ class PubSubManager extends XmppManagerBase { Future> subscribe(String jid, String node) async { final attrs = getAttributes(); - final result = await attrs.sendStanza( - Stanza.iq( - type: 'set', - to: jid, - children: [ - XMLNode.xmlns( - tag: 'pubsub', - xmlns: pubsubXmlns, - children: [ - XMLNode( - tag: 'subscribe', - attributes: { - 'node': node, - 'jid': attrs.getFullJID().toBare().toString(), - }, - ), - ], - ), - ], + final result = (await attrs.sendStanza( + StanzaDetails( + Stanza.iq( + type: 'set', + to: jid, + children: [ + XMLNode.xmlns( + tag: 'pubsub', + xmlns: pubsubXmlns, + children: [ + XMLNode( + tag: 'subscribe', + attributes: { + 'node': node, + 'jid': attrs.getFullJID().toBare().toString(), + }, + ), + ], + ), + ], + ), ), - ); + ))!; if (result.attributes['type'] != 'result') { return Result(UnknownPubSubError()); @@ -222,27 +224,29 @@ class PubSubManager extends XmppManagerBase { Future> unsubscribe(String jid, String node) async { final attrs = getAttributes(); - final result = await attrs.sendStanza( - Stanza.iq( - type: 'set', - to: jid, - children: [ - XMLNode.xmlns( - tag: 'pubsub', - xmlns: pubsubXmlns, - children: [ - XMLNode( - tag: 'unsubscribe', - attributes: { - 'node': node, - 'jid': attrs.getFullJID().toBare().toString(), - }, - ), - ], - ), - ], + final result = (await attrs.sendStanza( + StanzaDetails( + Stanza.iq( + type: 'set', + to: jid, + children: [ + XMLNode.xmlns( + tag: 'pubsub', + xmlns: pubsubXmlns, + children: [ + XMLNode( + tag: 'unsubscribe', + attributes: { + 'node': node, + 'jid': attrs.getFullJID().toBare().toString(), + }, + ), + ], + ), + ], + ), ), - ); + ))!; if (result.attributes['type'] != 'result') { return Result(UnknownPubSubError()); @@ -293,38 +297,40 @@ class PubSubManager extends XmppManagerBase { pubOptions = await preprocessPublishOptions(jid, node, options); } - final result = await getAttributes().sendStanza( - Stanza.iq( - type: 'set', - to: jid.toString(), - children: [ - XMLNode.xmlns( - tag: 'pubsub', - xmlns: pubsubXmlns, - children: [ - XMLNode( - tag: 'publish', - attributes: {'node': node}, - children: [ - XMLNode( - tag: 'item', - attributes: id != null - ? {'id': id} - : {}, - children: [payload], - ) - ], - ), - if (pubOptions != null) + final result = (await getAttributes().sendStanza( + StanzaDetails( + Stanza.iq( + type: 'set', + to: jid.toString(), + children: [ + XMLNode.xmlns( + tag: 'pubsub', + xmlns: pubsubXmlns, + children: [ XMLNode( - tag: 'publish-options', - children: [pubOptions.toXml()], + tag: 'publish', + attributes: {'node': node}, + children: [ + XMLNode( + tag: 'item', + attributes: id != null + ? {'id': id} + : {}, + children: [payload], + ) + ], ), - ], - ) - ], + if (pubOptions != null) + XMLNode( + tag: 'publish-options', + children: [pubOptions.toXml()], + ), + ], + ) + ], + ), ), - ); + ))!; if (result.attributes['type'] != 'result') { final error = getPubSubError(result); @@ -395,21 +401,26 @@ class PubSubManager extends XmppManagerBase { String jid, String node, ) async { - final result = await getAttributes().sendStanza( - Stanza.iq( - type: 'get', - to: jid, - children: [ - XMLNode.xmlns( - tag: 'pubsub', - xmlns: pubsubXmlns, - children: [ - XMLNode(tag: 'items', attributes: {'node': node}), - ], - ) - ], + final result = (await getAttributes().sendStanza( + StanzaDetails( + Stanza.iq( + type: 'get', + to: jid, + children: [ + XMLNode.xmlns( + tag: 'pubsub', + xmlns: pubsubXmlns, + children: [ + XMLNode( + tag: 'items', + attributes: {'node': node}, + ), + ], + ) + ], + ), ), - ); + ))!; if (result.attributes['type'] != 'result') { return Result(getPubSubError(result)); @@ -436,30 +447,32 @@ class PubSubManager extends XmppManagerBase { String node, String id, ) async { - final result = await getAttributes().sendStanza( - Stanza.iq( - type: 'get', - to: jid, - children: [ - XMLNode.xmlns( - tag: 'pubsub', - xmlns: pubsubXmlns, - children: [ - XMLNode( - tag: 'items', - attributes: {'node': node}, - children: [ - XMLNode( - tag: 'item', - attributes: {'id': id}, - ), - ], - ), - ], - ), - ], + final result = (await getAttributes().sendStanza( + StanzaDetails( + Stanza.iq( + type: 'get', + to: jid, + children: [ + XMLNode.xmlns( + tag: 'pubsub', + xmlns: pubsubXmlns, + children: [ + XMLNode( + tag: 'items', + attributes: {'node': node}, + children: [ + XMLNode( + tag: 'item', + attributes: {'id': id}, + ), + ], + ), + ], + ), + ], + ), ), - ); + ))!; if (result.attributes['type'] != 'result') { return Result(getPubSubError(result)); @@ -488,53 +501,57 @@ class PubSubManager extends XmppManagerBase { final attrs = getAttributes(); // Request the form - final form = await attrs.sendStanza( - Stanza.iq( - type: 'get', - to: jid.toString(), - children: [ - XMLNode.xmlns( - tag: 'pubsub', - xmlns: pubsubOwnerXmlns, - children: [ - XMLNode( - tag: 'configure', - attributes: { - 'node': node, - }, - ), - ], - ), - ], + final form = (await attrs.sendStanza( + StanzaDetails( + Stanza.iq( + type: 'get', + to: jid.toString(), + children: [ + XMLNode.xmlns( + tag: 'pubsub', + xmlns: pubsubOwnerXmlns, + children: [ + XMLNode( + tag: 'configure', + attributes: { + 'node': node, + }, + ), + ], + ), + ], + ), ), - ); + ))!; if (form.attributes['type'] != 'result') { return Result(getPubSubError(form)); } - final submit = await attrs.sendStanza( - Stanza.iq( - type: 'set', - to: jid.toString(), - children: [ - XMLNode.xmlns( - tag: 'pubsub', - xmlns: pubsubOwnerXmlns, - children: [ - XMLNode( - tag: 'configure', - attributes: { - 'node': node, - }, - children: [ - options.toXml(), - ], - ), - ], - ), - ], + final submit = (await attrs.sendStanza( + StanzaDetails( + Stanza.iq( + type: 'set', + to: jid.toString(), + children: [ + XMLNode.xmlns( + tag: 'pubsub', + xmlns: pubsubOwnerXmlns, + children: [ + XMLNode( + tag: 'configure', + attributes: { + 'node': node, + }, + children: [ + options.toXml(), + ], + ), + ], + ), + ], + ), ), - ); + ))!; if (submit.attributes['type'] != 'result') { return Result(getPubSubError(form)); } @@ -543,28 +560,30 @@ class PubSubManager extends XmppManagerBase { } Future> delete(JID host, String node) async { - final request = await getAttributes().sendStanza( - Stanza.iq( - type: 'set', - to: host.toString(), - children: [ - XMLNode.xmlns( - tag: 'pubsub', - xmlns: pubsubOwnerXmlns, - children: [ - XMLNode( - tag: 'delete', - attributes: { - 'node': node, - }, - ), - ], - ), - ], + final request = (await getAttributes().sendStanza( + StanzaDetails( + Stanza.iq( + type: 'set', + to: host.toString(), + children: [ + XMLNode.xmlns( + tag: 'pubsub', + xmlns: pubsubOwnerXmlns, + children: [ + XMLNode( + tag: 'delete', + attributes: { + 'node': node, + }, + ), + ], + ), + ], + ), ), - ) as Stanza; + ))!; - if (request.type != 'result') { + if (request.attributes['type'] != 'result') { // TODO(Unknown): Be more specific return Result(UnknownPubSubError()); } @@ -577,36 +596,38 @@ class PubSubManager extends XmppManagerBase { String node, String itemId, ) async { - final request = await getAttributes().sendStanza( - Stanza.iq( - type: 'set', - to: host.toString(), - children: [ - XMLNode.xmlns( - tag: 'pubsub', - xmlns: pubsubXmlns, - children: [ - XMLNode( - tag: 'retract', - attributes: { - 'node': node, - }, - children: [ - XMLNode( - tag: 'item', - attributes: { - 'id': itemId, - }, - ), - ], - ), - ], - ), - ], + final request = (await getAttributes().sendStanza( + StanzaDetails( + Stanza.iq( + type: 'set', + to: host.toString(), + children: [ + XMLNode.xmlns( + tag: 'pubsub', + xmlns: pubsubXmlns, + children: [ + XMLNode( + tag: 'retract', + attributes: { + 'node': node, + }, + children: [ + XMLNode( + tag: 'item', + attributes: { + 'id': itemId, + }, + ), + ], + ), + ], + ), + ], + ), ), - ) as Stanza; + ))!; - if (request.type != 'result') { + if (request.attributes['type'] != 'result') { // TODO(Unknown): Be more specific return Result(UnknownPubSubError()); } diff --git a/packages/moxxmpp/lib/src/xeps/xep_0085.dart b/packages/moxxmpp/lib/src/xeps/xep_0085.dart index b3d8ea3..ab28ab1 100644 --- a/packages/moxxmpp/lib/src/xeps/xep_0085.dart +++ b/packages/moxxmpp/lib/src/xeps/xep_0085.dart @@ -111,10 +111,14 @@ class ChatStateManager extends XmppManagerBase { final tagName = state.toString().split('.').last; getAttributes().sendStanza( - Stanza.message( - to: to, - type: messageType, - children: [XMLNode.xmlns(tag: tagName, xmlns: chatStateXmlns)], + StanzaDetails( + Stanza.message( + to: to, + type: messageType, + children: [ + XMLNode.xmlns(tag: tagName, xmlns: chatStateXmlns), + ], + ), ), ); } diff --git a/packages/moxxmpp/lib/src/xeps/xep_0191.dart b/packages/moxxmpp/lib/src/xeps/xep_0191.dart index 3c310b7..c443d0d 100644 --- a/packages/moxxmpp/lib/src/xeps/xep_0191.dart +++ b/packages/moxxmpp/lib/src/xeps/xep_0191.dart @@ -96,39 +96,43 @@ class BlockingManager extends XmppManagerBase { } Future block(List items) async { - final result = await getAttributes().sendStanza( - Stanza.iq( - type: 'set', - children: [ - XMLNode.xmlns( - tag: 'block', - xmlns: blockingXmlns, - children: items.map((item) { - return XMLNode( - tag: 'item', - attributes: {'jid': item}, - ); - }).toList(), - ) - ], + final result = (await getAttributes().sendStanza( + StanzaDetails( + Stanza.iq( + type: 'set', + children: [ + XMLNode.xmlns( + tag: 'block', + xmlns: blockingXmlns, + children: items.map((item) { + return XMLNode( + tag: 'item', + attributes: {'jid': item}, + ); + }).toList(), + ) + ], + ), ), - ); + ))!; return result.attributes['type'] == 'result'; } Future unblockAll() async { - final result = await getAttributes().sendStanza( - Stanza.iq( - type: 'set', - children: [ - XMLNode.xmlns( - tag: 'unblock', - xmlns: blockingXmlns, - ) - ], + final result = (await getAttributes().sendStanza( + StanzaDetails( + Stanza.iq( + type: 'set', + children: [ + XMLNode.xmlns( + tag: 'unblock', + xmlns: blockingXmlns, + ) + ], + ), ), - ); + ))!; return result.attributes['type'] == 'result'; } @@ -136,41 +140,45 @@ class BlockingManager extends XmppManagerBase { Future unblock(List items) async { assert(items.isNotEmpty, 'The list of items to unblock must be non-empty'); - final result = await getAttributes().sendStanza( - Stanza.iq( - type: 'set', - children: [ - XMLNode.xmlns( - tag: 'unblock', - xmlns: blockingXmlns, - children: items - .map( - (item) => XMLNode( - tag: 'item', - attributes: {'jid': item}, - ), - ) - .toList(), - ) - ], + final result = (await getAttributes().sendStanza( + StanzaDetails( + Stanza.iq( + type: 'set', + children: [ + XMLNode.xmlns( + tag: 'unblock', + xmlns: blockingXmlns, + children: items + .map( + (item) => XMLNode( + tag: 'item', + attributes: {'jid': item}, + ), + ) + .toList(), + ) + ], + ), ), - ); + ))!; return result.attributes['type'] == 'result'; } Future> getBlocklist() async { - final result = await getAttributes().sendStanza( - Stanza.iq( - type: 'get', - children: [ - XMLNode.xmlns( - tag: 'blocklist', - xmlns: blockingXmlns, - ) - ], + final result = (await getAttributes().sendStanza( + StanzaDetails( + Stanza.iq( + type: 'get', + children: [ + XMLNode.xmlns( + tag: 'blocklist', + xmlns: blockingXmlns, + ) + ], + ), ), - ); + ))!; final blocklist = result.firstTag('blocklist', xmlns: blockingXmlns)!; return blocklist diff --git a/packages/moxxmpp/lib/src/xeps/xep_0198/xep_0198.dart b/packages/moxxmpp/lib/src/xeps/xep_0198/xep_0198.dart index 38a566b..fd3a741 100644 --- a/packages/moxxmpp/lib/src/xeps/xep_0198/xep_0198.dart +++ b/packages/moxxmpp/lib/src/xeps/xep_0198/xep_0198.dart @@ -414,7 +414,12 @@ class StreamManagementManager extends XmppManagerBase { _unackedStanzas.clear(); for (final stanza in stanzas) { - await getAttributes().sendStanza(stanza, awaitable: false); + await getAttributes().sendStanza( + StanzaDetails( + stanza, + awaitable: false, + ), + ); } } diff --git a/packages/moxxmpp/lib/src/xeps/xep_0280.dart b/packages/moxxmpp/lib/src/xeps/xep_0280.dart index f767f8b..d24e8ca 100644 --- a/packages/moxxmpp/lib/src/xeps/xep_0280.dart +++ b/packages/moxxmpp/lib/src/xeps/xep_0280.dart @@ -1,6 +1,5 @@ import 'package:logging/logging.dart'; import 'package:meta/meta.dart'; -import 'package:moxxmpp/src/connection.dart'; import 'package:moxxmpp/src/events.dart'; import 'package:moxxmpp/src/jid.dart'; import 'package:moxxmpp/src/managers/base.dart'; @@ -111,20 +110,20 @@ class CarbonsManager extends XmppManagerBase { /// Returns true if carbons were enabled. False, if not. Future enableCarbons() async { final attrs = getAttributes(); - final result = await attrs.sendStanza( - Stanza.iq( - to: attrs.getFullJID().toBare().toString(), - type: 'set', - children: [ - XMLNode.xmlns( - tag: 'enable', - xmlns: carbonsXmlns, - ) - ], + final result = (await attrs.sendStanza( + StanzaDetails( + Stanza.iq( + to: attrs.getFullJID().toBare().toString(), + type: 'set', + children: [ + XMLNode.xmlns( + tag: 'enable', + xmlns: carbonsXmlns, + ) + ], + ), ), - addFrom: StanzaFromType.full, - addId: true, - ); + ))!; if (result.attributes['type'] != 'result') { logger.warning('Failed to enable message carbons'); @@ -142,19 +141,19 @@ class CarbonsManager extends XmppManagerBase { /// /// Returns true if carbons were disabled. False, if not. Future disableCarbons() async { - final result = await getAttributes().sendStanza( - Stanza.iq( - type: 'set', - children: [ - XMLNode.xmlns( - tag: 'disable', - xmlns: carbonsXmlns, - ) - ], + final result = (await getAttributes().sendStanza( + StanzaDetails( + Stanza.iq( + type: 'set', + children: [ + XMLNode.xmlns( + tag: 'disable', + xmlns: carbonsXmlns, + ) + ], + ), ), - addFrom: StanzaFromType.full, - addId: true, - ); + ))!; if (result.attributes['type'] != 'result') { logger.warning('Failed to disable message carbons'); diff --git a/packages/moxxmpp/lib/src/xeps/xep_0363/xep_0363.dart b/packages/moxxmpp/lib/src/xeps/xep_0363/xep_0363.dart index 541b924..b63ffab 100644 --- a/packages/moxxmpp/lib/src/xeps/xep_0363/xep_0363.dart +++ b/packages/moxxmpp/lib/src/xeps/xep_0363/xep_0363.dart @@ -149,23 +149,25 @@ class HttpFileUploadManager extends XmppManagerBase { } final attrs = getAttributes(); - final response = await attrs.sendStanza( - Stanza.iq( - to: _entityJid.toString(), - type: 'get', - children: [ - XMLNode.xmlns( - tag: 'request', - xmlns: httpFileUploadXmlns, - attributes: { - 'filename': filename, - 'size': filesize.toString(), - ...contentType != null ? {'content-type': contentType} : {} - }, - ) - ], + final response = (await attrs.sendStanza( + StanzaDetails( + Stanza.iq( + to: _entityJid.toString(), + type: 'get', + children: [ + XMLNode.xmlns( + tag: 'request', + xmlns: httpFileUploadXmlns, + attributes: { + 'filename': filename, + 'size': filesize.toString(), + ...contentType != null ? {'content-type': contentType} : {} + }, + ) + ], + ), ), - ); + ))!; if (response.attributes['type']! != 'result') { logger.severe('Failed to request HTTP File Upload slot.'); diff --git a/packages/moxxmpp/lib/src/xeps/xep_0384/xep_0384.dart b/packages/moxxmpp/lib/src/xeps/xep_0384/xep_0384.dart index 658b69d..a7fdf54 100644 --- a/packages/moxxmpp/lib/src/xeps/xep_0384/xep_0384.dart +++ b/packages/moxxmpp/lib/src/xeps/xep_0384/xep_0384.dart @@ -262,24 +262,26 @@ abstract class BaseOmemoManager extends XmppManagerBase { String toJid, ) async { await getAttributes().sendStanza( - Stanza.message( - to: toJid, - type: 'chat', - children: [ - _buildEncryptedElement( - result, - toJid, - await _getDeviceId(), - ), + StanzaDetails( + Stanza.message( + to: toJid, + type: 'chat', + children: [ + _buildEncryptedElement( + result, + toJid, + await _getDeviceId(), + ), - // Add a storage hint in case this is a message - // Taken from the example at - // https://xmpp.org/extensions/xep-0384.html#message-structure-description. - MessageProcessingHint.store.toXml(), - ], + // Add a storage hint in case this is a message + // Taken from the example at + // https://xmpp.org/extensions/xep-0384.html#message-structure-description. + MessageProcessingHint.store.toXml(), + ], + ), + awaitable: false, + encrypted: true, ), - awaitable: false, - encrypted: true, ); } diff --git a/packages/moxxmpp/test/async_queue_test.dart b/packages/moxxmpp/test/async_queue_test.dart index 9fc645a..301172b 100644 --- a/packages/moxxmpp/test/async_queue_test.dart +++ b/packages/moxxmpp/test/async_queue_test.dart @@ -1,58 +1,100 @@ +import 'package:moxxmpp/moxxmpp.dart'; import 'package:moxxmpp/src/util/queue.dart'; import 'package:test/test.dart'; void main() { - test('Test the async queue', () async { - final queue = AsyncQueue(); - var future1Finish = 0; - var future2Finish = 0; - var future3Finish = 0; + test('Test not sending', () async { + final queue = AsyncStanzaQueue( + (entry) async { + assert(false, 'No stanza should be sent'); + }, + () async => false, + ); - await queue.addJob( - () => Future.delayed( - const Duration(seconds: 3), - () => future1Finish = DateTime.now().millisecondsSinceEpoch, + await queue.enqueueStanza( + StanzaQueueEntry( + StanzaDetails( + Stanza.message(), + ), + null, ), ); - await queue.addJob( - () => Future.delayed( - const Duration(seconds: 3), - () => future2Finish = DateTime.now().millisecondsSinceEpoch, - ), - ); - await queue.addJob( - () => Future.delayed( - const Duration(seconds: 3), - () => future3Finish = DateTime.now().millisecondsSinceEpoch, + await queue.enqueueStanza( + StanzaQueueEntry( + StanzaDetails( + Stanza.message(), + ), + null, ), ); - await Future.delayed(const Duration(seconds: 12)); + await Future.delayed(const Duration(seconds: 1)); + expect(queue.queue.length, 2); + expect(queue.isRunning, false); + }); - // The three futures must be done - expect(future1Finish != 0, true); - expect(future2Finish != 0, true); - expect(future3Finish != 0, true); - - // The end times of the futures must be ordered (on a timeline) - // |-- future1Finish -- future2Finish -- future3Finish --| - expect( - future1Finish < future2Finish && future1Finish < future3Finish, - true, - ); - expect( - future2Finish < future3Finish && future2Finish > future1Finish, - true, - ); - expect( - future3Finish > future1Finish && future3Finish > future2Finish, - true, + test('Test sending', () async { + final queue = AsyncStanzaQueue( + (entry) async {}, + () async => true, ); - // The queue must be empty at the end - expect(queue.queue.isEmpty, true); + await queue.enqueueStanza( + StanzaQueueEntry( + StanzaDetails( + Stanza.message(), + ), + null, + ), + ); + await queue.enqueueStanza( + StanzaQueueEntry( + StanzaDetails( + Stanza.message(), + ), + null, + ), + ); - // The queue must not be executing anything at the end + await Future.delayed(const Duration(seconds: 1)); + expect(queue.queue.length, 0); + expect(queue.isRunning, false); + }); + + test('Test partial sending and resuming', () async { + var canRun = true; + final queue = AsyncStanzaQueue( + (entry) async { + canRun = false; + }, + () async => canRun, + ); + + await queue.enqueueStanza( + StanzaQueueEntry( + StanzaDetails( + Stanza.message(), + ), + null, + ), + ); + await queue.enqueueStanza( + StanzaQueueEntry( + StanzaDetails( + Stanza.message(), + ), + null, + ), + ); + + await Future.delayed(const Duration(seconds: 1)); + expect(queue.queue.length, 1); + expect(queue.isRunning, false); + + canRun = true; + await queue.restart(); + await Future.delayed(const Duration(seconds: 1)); + expect(queue.queue.length, 0); expect(queue.isRunning, false); }); } diff --git a/packages/moxxmpp/test/helpers/manager.dart b/packages/moxxmpp/test/helpers/manager.dart index 6813271..af64e56 100644 --- a/packages/moxxmpp/test/helpers/manager.dart +++ b/packages/moxxmpp/test/helpers/manager.dart @@ -33,7 +33,6 @@ class TestingManagerHolder { Future _sendStanza( stanza, { - StanzaFromType addFrom = StanzaFromType.full, bool addId = true, bool awaitable = true, bool encrypted = false, diff --git a/packages/moxxmpp/test/helpers/xmpp.dart b/packages/moxxmpp/test/helpers/xmpp.dart index b6686d1..4ad38fd 100644 --- a/packages/moxxmpp/test/helpers/xmpp.dart +++ b/packages/moxxmpp/test/helpers/xmpp.dart @@ -98,7 +98,7 @@ List buildAuthenticatedPlay(ConnectionSettings settings) { ignoreId: true, ), StanzaExpectation( - "chat", + "chat", '', ), ]; diff --git a/packages/moxxmpp/test/xeps/xep_0030_test.dart b/packages/moxxmpp/test/xeps/xep_0030_test.dart index f38255f..134d9e6 100644 --- a/packages/moxxmpp/test/xeps/xep_0030_test.dart +++ b/packages/moxxmpp/test/xeps/xep_0030_test.dart @@ -58,11 +58,11 @@ void main() { ignoreId: true, ), StanzaExpectation( - "chat", + "chat", '', ), StanzaExpectation( - "", + "", '', ignoreId: true, ), diff --git a/packages/moxxmpp/test/xeps/xep_0060_test.dart b/packages/moxxmpp/test/xeps/xep_0060_test.dart index e559dbc..0f1795b 100644 --- a/packages/moxxmpp/test/xeps/xep_0060_test.dart +++ b/packages/moxxmpp/test/xeps/xep_0060_test.dart @@ -92,7 +92,7 @@ void main() { [ StanzaExpectation( ''' - + ''', @@ -110,7 +110,7 @@ void main() { ), StanzaExpectation( ''' - + ''', @@ -124,7 +124,7 @@ void main() { ), StanzaExpectation( ''' - + diff --git a/packages/moxxmpp/test/xeps/xep_0198_test.dart b/packages/moxxmpp/test/xeps/xep_0198_test.dart index d1f1058..1ba8b78 100644 --- a/packages/moxxmpp/test/xeps/xep_0198_test.dart +++ b/packages/moxxmpp/test/xeps/xep_0198_test.dart @@ -44,15 +44,8 @@ Future runOutgoingStanzaHandlers( XmppManagerAttributes mkAttributes(void Function(Stanza) callback) { return XmppManagerAttributes( - sendStanza: ( - stanza, { - StanzaFromType addFrom = StanzaFromType.full, - bool addId = true, - bool awaitable = true, - bool encrypted = false, - bool forceEncryption = false, - }) async { - callback(stanza); + sendStanza: (StanzaDetails details) async { + callback(details.stanza); return Stanza.message(); }, @@ -290,12 +283,8 @@ void main() { ); final sm = StreamManagementManager(); await conn.registerManagers([ - PresenceManager(), - RosterManager(TestingRosterStateManager('', [])), - DiscoManager([]), sm, CarbonsManager()..forceEnable(), - EntityCapabilitiesManager('http://moxxmpp.example'), ]); await conn.registerFeatureNegotiators([ SaslPlainNegotiator(), @@ -391,7 +380,7 @@ void main() { '', ), StanzaExpectation( - "chat", + "chat", '', ), StringExpectation( @@ -681,7 +670,7 @@ void main() { "", ), StanzaExpectation( - "", + "", '', ignoreId: true, ), @@ -734,7 +723,7 @@ void main() { "", ), StanzaExpectation( - "", + "", '', ignoreId: true, ), @@ -776,7 +765,11 @@ void main() { // Send a bogus stanza unawaited( - conn.sendStanza(Stanza.iq(to: 'localhost', type: 'get')), + conn.sendStanza( + StanzaDetails( + Stanza.iq(to: 'localhost', type: 'get'), + ), + ), ); await Future.delayed(const Duration(seconds: 5)); diff --git a/packages/moxxmpp/test/xeps/xep_0280_test.dart b/packages/moxxmpp/test/xeps/xep_0280_test.dart index 464c1f9..c663d27 100644 --- a/packages/moxxmpp/test/xeps/xep_0280_test.dart +++ b/packages/moxxmpp/test/xeps/xep_0280_test.dart @@ -9,17 +9,9 @@ void main() { test("Test if we're vulnerable against CVE-2020-26547 style vulnerabilities", () async { final attributes = XmppManagerAttributes( - sendStanza: ( - stanza, { - StanzaFromType addFrom = StanzaFromType.full, - bool addId = true, - bool retransmitted = false, - bool awaitable = true, - bool encrypted = false, - bool forceEncryption = false, - }) async { + sendStanza: (StanzaDetails details) async { // ignore: avoid_print - print('==> ${stanza.toXml()}'); + print('==> ${details.stanza.toXml()}'); return XMLNode(tag: 'iq', attributes: {'type': 'result'}); }, sendNonza: (nonza) {}, diff --git a/packages/moxxmpp/test/xeps/xep_0352_test.dart b/packages/moxxmpp/test/xeps/xep_0352_test.dart index fb252b4..a823faa 100644 --- a/packages/moxxmpp/test/xeps/xep_0352_test.dart +++ b/packages/moxxmpp/test/xeps/xep_0352_test.dart @@ -39,7 +39,6 @@ void main() { XmppManagerAttributes( sendStanza: ( _, { - StanzaFromType addFrom = StanzaFromType.full, bool addId = true, bool retransmitted = false, bool awaitable = true, @@ -78,7 +77,6 @@ void main() { XmppManagerAttributes( sendStanza: ( _, { - StanzaFromType addFrom = StanzaFromType.full, bool addId = true, bool retransmitted = false, bool awaitable = true, diff --git a/packages/moxxmpp/test/xmpp_test.dart b/packages/moxxmpp/test/xmpp_test.dart index 6a438e2..50675f4 100644 --- a/packages/moxxmpp/test/xmpp_test.dart +++ b/packages/moxxmpp/test/xmpp_test.dart @@ -4,6 +4,32 @@ import 'package:test/test.dart'; import 'helpers/logging.dart'; import 'helpers/xmpp.dart'; +class StubConnectivityManager extends ConnectivityManager { + bool _hasConnection = true; + + Completer _goingOnlineCompleter = Completer(); + + @override + Future hasConnection() async => _hasConnection; + + @override + Future waitForConnection() async { + if (!_hasConnection) { + await _goingOnlineCompleter.future; + } + } + + void goOffline() { + _hasConnection = false; + } + + void goOnline() { + _hasConnection = true; + _goingOnlineCompleter.complete(); + _goingOnlineCompleter = Completer(); + } +} + /// Returns true if the roster manager triggeres an event for a given stanza Future testRosterManager( String bareJid, @@ -16,7 +42,6 @@ Future testRosterManager( XmppManagerAttributes( sendStanza: ( _, { - StanzaFromType addFrom = StanzaFromType.full, bool addId = true, bool retransmitted = false, bool awaitable = true, @@ -131,11 +156,7 @@ void main() { password: 'aaaa', ); await conn.registerManagers([ - PresenceManager(), - RosterManager(TestingRosterStateManager('', [])), - DiscoManager([]), StreamManagementManager(), - EntityCapabilitiesManager('http://moxxmpp.example'), ]); await conn.registerFeatureNegotiators([ SaslPlainNegotiator(), @@ -271,7 +292,6 @@ void main() { XmppManagerAttributes( sendStanza: ( _, { - StanzaFromType addFrom = StanzaFromType.full, bool addId = true, bool retransmitted = false, bool awaitable = true, @@ -625,4 +645,152 @@ void main() { true, ); }); + + test('Test sending stanzas while offline', () async { + final fakeSocket = StubTCPSocket( + [ + StringExpectation( + "", + ''' + + + + PLAIN + + ''', + ), + StringExpectation( + "AHBvbHlub21kaXZpc2lvbgBhYWFh", + '', + ), + StringExpectation( + "", + ''' + + + + + + + + + + + +''', + ), + StanzaExpectation( + '', + 'polynomdivision@test.server/MU29eEZn', + ignoreId: true, + ), + StringExpectation( + "", + ''' + + + + PLAIN + + ''', + ), + StringExpectation( + "AHBvbHlub21kaXZpc2lvbgBhYWFh", + '', + ), + StringExpectation( + "", + ''' + + + + + + + + + + + +''', + ), + StanzaExpectation( + '', + 'polynomdivision@test.server/MU29eEZn', + ignoreId: true, + ), + StanzaExpectation( + '', + '', + ignoreId: true, + ), + ], + ); + final connectivity = StubConnectivityManager(); + final conn = XmppConnection( + TestingReconnectionPolicy(), + connectivity, + ClientToServerNegotiator(), + fakeSocket, + )..connectionSettings = ConnectionSettings( + jid: JID.fromString('polynomdivision@test.server'), + password: 'aaaa', + ); + await conn.registerFeatureNegotiators([ + SaslPlainNegotiator(), + SaslScramNegotiator(10, '', '', ScramHashType.sha512), + ResourceBindingNegotiator(), + ]); + + await conn.connect( + waitUntilLogin: true, + ); + expect(fakeSocket.getState(), 4); + + // Fake going offline + connectivity.goOffline(); + await conn.handleSocketEvent( + XmppSocketClosureEvent(false), + ); + + // Send a stanza while offline + final stanzaFuture = conn.sendStanza( + StanzaDetails( + Stanza.iq( + id: 'abc123', + type: 'get', + ), + ), + ); + + // Come online again + connectivity.goOnline(); + await conn.connect( + waitUntilLogin: true, + ); + await Future.delayed(const Duration(seconds: 6)); + + expect(fakeSocket.getState(), 9); + expect(await stanzaFuture != null, true); + }); }