diff --git a/packages/moxxmpp/lib/src/connection.dart b/packages/moxxmpp/lib/src/connection.dart index d880a54..3009050 100644 --- a/packages/moxxmpp/lib/src/connection.dart +++ b/packages/moxxmpp/lib/src/connection.dart @@ -26,6 +26,7 @@ import 'package:moxxmpp/src/socket.dart'; import 'package:moxxmpp/src/stanza.dart'; import 'package:moxxmpp/src/stringxml.dart'; import 'package:moxxmpp/src/types/result.dart'; +import 'package:moxxmpp/src/util/queue.dart'; import 'package:moxxmpp/src/xeps/xep_0030/xep_0030.dart'; import 'package:moxxmpp/src/xeps/xep_0198/xep_0198.dart'; import 'package:moxxmpp/src/xeps/xep_0352.dart'; @@ -92,6 +93,11 @@ class XmppConnection { // TODO(Unknown): Handle on done _socketStream.transform(_streamParser).forEach(handleXmlStream); _socket.getEventStream().listen(_handleSocketEvent); + + _stanzaQueue = AsyncStanzaQueue( + _sendStanzaImpl, + _canSendData, + ); } /// The state that the connection is currently in @@ -175,6 +181,8 @@ class XmppConnection { bool get isAuthenticated => _isAuthenticated; + late final AsyncStanzaQueue _stanzaQueue; + /// Returns the JID we authenticate with and add the resource that we have bound. JID _getJidWithResource() { assert(_resource.isNotEmpty, 'The resource must not be empty'); @@ -412,6 +420,136 @@ class XmppConnection { .contains(await getConnectionState()); } + Future?> sendStanza2(StanzaDetails details) async { + assert( + implies( + details.awaitable, + details.stanza.id != null && details.stanza.id!.isNotEmpty || + details.addId), + 'An awaitable stanza must have an id', + ); + + final completer = details.awaitable ? Completer() : null; + await _stanzaQueue.enqueueStanza( + StanzaQueueEntry( + details, + completer, + ), + ); + + return completer?.future; + } + + Future _sendStanzaImpl(StanzaQueueEntry entry) async { + final details = entry.details; + var newStanza = details.stanza; + + // Generate an id, if requested + if (details.addId && (newStanza.id == null || newStanza.id == '')) { + newStanza = newStanza.copyWith(id: generateId()); + } + + // Add a from type, if requested + if (details.addFrom != StanzaFromType.none && + (newStanza.from == null || newStanza.from == '')) { + switch (details.addFrom) { + case StanzaFromType.full: + newStanza = newStanza.copyWith( + from: _getJidWithResource().toString(), + ); + break; + case StanzaFromType.bare: + newStanza = newStanza.copyWith( + from: connectionSettings.jid.toBare().toString(), + ); + break; + case StanzaFromType.none: + // NOOP + break; + } + } + + // Add the correct stanza namespace + newStanza = newStanza.copyWith( + xmlns: _negotiationsHandler.getStanzaNamespace(), + ); + + // Run pre-send handlers + _log.fine('Running pre stanza handlers..'); + final data = await _runOutgoingPreStanzaHandlers( + newStanza, + initial: StanzaHandlerData( + false, + false, + null, + newStanza, + encrypted: details.encrypted, + forceEncryption: details.forceEncryption, + ), + ); + _log.fine('Done'); + + // Cancel sending, if the pre-send handlers indicated it. + if (data.cancel) { + _log.fine('A stanza handler indicated that it wants to cancel sending.'); + await _sendEvent(StanzaSendingCancelledEvent(data)); + + // Resolve the future, if one was given. + if (details.awaitable) { + entry.completer!.complete( + Stanza( + tag: data.stanza.tag, + to: data.stanza.from, + from: data.stanza.to, + attributes: { + 'type': 'error', + if (data.stanza.id != null) 'id': data.stanza.id!, + }, + ), + ); + } + return; + } + + // Log the (raw) stanza + final prefix = data.encrypted ? '(Encrypted) ' : ''; + _log.finest('==> $prefix${newStanza.toXml()}'); + + if (details.awaitable) { + await _stanzaAwaiter + .addPending( + // A stanza with no to attribute is for direct processing by the server. As such, + // we can correlate it by just *assuming* we have that attribute + // (RFC 6120 Section 8.1.1.1) + data.stanza.to ?? connectionSettings.jid.toBare().toString(), + data.stanza.id!, + data.stanza.tag, + ) + .then((result) { + entry.completer!.complete(result); + }); + } + + if (await _canSendData()) { + _socket.write(data.stanza.toXml()); + } else { + _log.fine('Not sending dat as _canSendData() returned false.'); + } + + // Run post-send handlers + _log.fine('Running post stanza handlers..'); + await _runOutgoingPostStanzaHandlers( + newStanza, + initial: StanzaHandlerData( + false, + false, + null, + newStanza, + ), + ); + _log.fine('Done'); + } + /// Sends a [stanza] to the server. If stream management is enabled, then keeping track /// of the stanza is taken care of. Returns a Future that resolves when we receive a /// response to the stanza. @@ -487,11 +625,7 @@ class XmppConnection { from: data.stanza.to, attributes: { 'type': 'error', - ...data.stanza.id != null - ? { - 'id': data.stanza.id!, - } - : {}, + if (data.stanza.id != null) 'id': data.stanza.id!, }, ); } diff --git a/packages/moxxmpp/lib/src/util/queue.dart b/packages/moxxmpp/lib/src/util/queue.dart index db0456d..b483ae9 100644 --- a/packages/moxxmpp/lib/src/util/queue.dart +++ b/packages/moxxmpp/lib/src/util/queue.dart @@ -1,37 +1,96 @@ import 'dart:async'; import 'dart:collection'; import 'package:meta/meta.dart'; +import 'package:moxxmpp/src/connection.dart'; +import 'package:moxxmpp/src/stanza.dart'; +import 'package:moxxmpp/src/stringxml.dart'; import 'package:synchronized/synchronized.dart'; -/// A job to be submitted to an [AsyncQueue]. -typedef AsyncQueueJob = Future Function(); +class StanzaDetails { + const StanzaDetails( + this.stanza, { + this.addFrom = StanzaFromType.full, + this.addId = true, + this.awaitable = true, + this.encrypted = false, + this.forceEncryption = false, + }); + + /// The stanza to send. + final Stanza stanza; + + /// How to set the "from" attribute of the stanza. + final StanzaFromType addFrom; + + /// Flag indicating whether a stanza id should be added before sending. + final bool addId; + + /// Track the stanza to allow awaiting its response. + final bool awaitable; + + final bool encrypted; + + final bool forceEncryption; +} + +class StanzaQueueEntry { + const StanzaQueueEntry( + this.details, + this.completer, + ); + + /// The actual data to send. + final StanzaDetails details; + + /// The [Completer] to resolve when the response is received. + final Completer? completer; +} + +/// A function that is executed when a job is popped from the queue. +typedef SendStanzaFunction = Future Function(StanzaQueueEntry); + +/// A function that is called before popping a queue item. Should return true when +/// the [SendStanzaFunction] can be executed. +typedef CanSendCallback = Future Function(); /// A (hopefully) async-safe queue that attempts to force /// in-order execution of its jobs. -class AsyncQueue { - /// The lock for accessing [AsyncQueue._lock] and [AsyncQueue._running]. +class AsyncStanzaQueue { + AsyncStanzaQueue( + this._sendStanzaFunction, + this._canSendCallback, + ); + + /// The lock for accessing [AsyncStanzaQueue._lock] and [AsyncStanzaQueue._running]. final Lock _lock = Lock(); /// The actual job queue. - final Queue _queue = Queue(); + final Queue _queue = Queue(); + + /// Sends the stanza when we can pop from the queue. + final SendStanzaFunction _sendStanzaFunction; + + final CanSendCallback _canSendCallback; /// Indicates whether we are currently executing a job. bool _running = false; @visibleForTesting - Queue get queue => _queue; + Queue get queue => _queue; @visibleForTesting bool get isRunning => _running; - /// Adds a job [job] to the queue. - Future addJob(AsyncQueueJob job) async { - await _lock.synchronized(() { - _queue.add(job); + /// Adds a job [entry] to the queue. + Future enqueueStanza(StanzaQueueEntry entry) async { + await _lock.synchronized(() async { + _queue.add(entry); - if (!_running && _queue.isNotEmpty) { + if (!_running && _queue.isNotEmpty && await _canSendCallback()) { _running = true; - unawaited(_popJob()); + unawaited( + _runJob(_queue.removeFirst()), + ); } }); } @@ -40,17 +99,30 @@ class AsyncQueue { await _lock.synchronized(_queue.clear); } - Future _popJob() async { - final job = _queue.removeFirst(); - final future = job(); - await future; + Future _runJob(StanzaQueueEntry details) async { + await _sendStanzaFunction(details); - await _lock.synchronized(() { - if (_queue.isNotEmpty) { - unawaited(_popJob()); + await _lock.synchronized(() async { + if (_queue.isNotEmpty && await _canSendCallback()) { + unawaited( + _runJob(_queue.removeFirst()), + ); } else { _running = false; } }); } + + Future restart() async { + if (!(await _canSendCallback())) return; + + await _lock.synchronized(() { + if (_queue.isNotEmpty) { + _running = true; + unawaited( + _runJob(_queue.removeFirst()), + ); + } + }); + } } diff --git a/packages/moxxmpp/test/async_queue_test.dart b/packages/moxxmpp/test/async_queue_test.dart index 9fc645a..301172b 100644 --- a/packages/moxxmpp/test/async_queue_test.dart +++ b/packages/moxxmpp/test/async_queue_test.dart @@ -1,58 +1,100 @@ +import 'package:moxxmpp/moxxmpp.dart'; import 'package:moxxmpp/src/util/queue.dart'; import 'package:test/test.dart'; void main() { - test('Test the async queue', () async { - final queue = AsyncQueue(); - var future1Finish = 0; - var future2Finish = 0; - var future3Finish = 0; + test('Test not sending', () async { + final queue = AsyncStanzaQueue( + (entry) async { + assert(false, 'No stanza should be sent'); + }, + () async => false, + ); - await queue.addJob( - () => Future.delayed( - const Duration(seconds: 3), - () => future1Finish = DateTime.now().millisecondsSinceEpoch, + await queue.enqueueStanza( + StanzaQueueEntry( + StanzaDetails( + Stanza.message(), + ), + null, ), ); - await queue.addJob( - () => Future.delayed( - const Duration(seconds: 3), - () => future2Finish = DateTime.now().millisecondsSinceEpoch, - ), - ); - await queue.addJob( - () => Future.delayed( - const Duration(seconds: 3), - () => future3Finish = DateTime.now().millisecondsSinceEpoch, + await queue.enqueueStanza( + StanzaQueueEntry( + StanzaDetails( + Stanza.message(), + ), + null, ), ); - await Future.delayed(const Duration(seconds: 12)); + await Future.delayed(const Duration(seconds: 1)); + expect(queue.queue.length, 2); + expect(queue.isRunning, false); + }); - // The three futures must be done - expect(future1Finish != 0, true); - expect(future2Finish != 0, true); - expect(future3Finish != 0, true); - - // The end times of the futures must be ordered (on a timeline) - // |-- future1Finish -- future2Finish -- future3Finish --| - expect( - future1Finish < future2Finish && future1Finish < future3Finish, - true, - ); - expect( - future2Finish < future3Finish && future2Finish > future1Finish, - true, - ); - expect( - future3Finish > future1Finish && future3Finish > future2Finish, - true, + test('Test sending', () async { + final queue = AsyncStanzaQueue( + (entry) async {}, + () async => true, ); - // The queue must be empty at the end - expect(queue.queue.isEmpty, true); + await queue.enqueueStanza( + StanzaQueueEntry( + StanzaDetails( + Stanza.message(), + ), + null, + ), + ); + await queue.enqueueStanza( + StanzaQueueEntry( + StanzaDetails( + Stanza.message(), + ), + null, + ), + ); - // The queue must not be executing anything at the end + await Future.delayed(const Duration(seconds: 1)); + expect(queue.queue.length, 0); + expect(queue.isRunning, false); + }); + + test('Test partial sending and resuming', () async { + var canRun = true; + final queue = AsyncStanzaQueue( + (entry) async { + canRun = false; + }, + () async => canRun, + ); + + await queue.enqueueStanza( + StanzaQueueEntry( + StanzaDetails( + Stanza.message(), + ), + null, + ), + ); + await queue.enqueueStanza( + StanzaQueueEntry( + StanzaDetails( + Stanza.message(), + ), + null, + ), + ); + + await Future.delayed(const Duration(seconds: 1)); + expect(queue.queue.length, 1); + expect(queue.isRunning, false); + + canRun = true; + await queue.restart(); + await Future.delayed(const Duration(seconds: 1)); + expect(queue.queue.length, 0); expect(queue.isRunning, false); }); }