diff --git a/packages/moxxmpp/lib/src/awaiter.dart b/packages/moxxmpp/lib/src/awaiter.dart index c730038..13722b1 100644 --- a/packages/moxxmpp/lib/src/awaiter.dart +++ b/packages/moxxmpp/lib/src/awaiter.dart @@ -1,6 +1,5 @@ import 'dart:async'; import 'package:meta/meta.dart'; -import 'package:moxxmpp/src/jid.dart'; import 'package:moxxmpp/src/stringxml.dart'; import 'package:synchronized/synchronized.dart'; @@ -11,7 +10,7 @@ class _StanzaSurrogateKey { /// The JID the original stanza was sent to. We expect the result to come from the /// same JID. - final String sentTo; + final String? sentTo; /// The ID of the original stanza. We expect the result to have the same ID. final String id; @@ -52,7 +51,7 @@ class StanzaAwaiter { /// [tag] is the stanza's tag name. /// /// Returns a future that might resolve to the response to the stanza. - Future> addPending(String to, String id, String tag) async { + Future> addPending(String? to, String id, String tag) async { final completer = await _lock.synchronized(() { final completer = Completer(); _pending[_StanzaSurrogateKey(to, id, tag)] = completer; @@ -62,20 +61,15 @@ class StanzaAwaiter { return completer.future; } - /// Checks if the stanza [stanza] is being awaited. [bareJid] is the bare JID of - /// the connection. + /// Checks if the stanza [stanza] is being awaited. /// If [stanza] is awaited, resolves the future and returns true. If not, returns /// false. - Future onData(XMLNode stanza, JID bareJid) async { - assert(bareJid.isBare(), 'bareJid must be bare'); - + Future onData(XMLNode stanza) async { final id = stanza.attributes['id'] as String?; if (id == null) return false; final key = _StanzaSurrogateKey( - // Section 8.1.2.1 ยง 3 of RFC 6120 says that an empty "from" indicates that the - // attribute is implicitly from our own bare JID. - stanza.attributes['from'] as String? ?? bareJid.toString(), + stanza.attributes['from'] as String?, id, stanza.tag, ); @@ -91,4 +85,19 @@ class StanzaAwaiter { return false; }); } + + /// Checks if [stanza] represents a stanza that is awaited. Returns true, if [stanza] + /// is awaited. False, if not. + Future isAwaited(XMLNode stanza) async { + final id = stanza.attributes['id'] as String?; + if (id == null) return false; + + final key = _StanzaSurrogateKey( + stanza.attributes['from'] as String?, + id, + stanza.tag, + ); + + return _lock.synchronized(() => _pending.containsKey(key)); + } } diff --git a/packages/moxxmpp/lib/src/connection.dart b/packages/moxxmpp/lib/src/connection.dart index 1d44579..34d537b 100644 --- a/packages/moxxmpp/lib/src/connection.dart +++ b/packages/moxxmpp/lib/src/connection.dart @@ -90,7 +90,7 @@ class XmppConnection { }, ); - _incomingStanzaQueue = IncomingStanzaQueue(handleXmlStream); + _incomingStanzaQueue = IncomingStanzaQueue(handleXmlStream, _stanzaAwaiter); _socketStream = _socket.getDataStream(); // TODO(Unknown): Handle on done _socketStream @@ -531,12 +531,15 @@ class XmppConnection { _log.finest('==> $prefix${newStanza.toXml()}'); if (details.awaitable) { + final isOwnJid = + data.stanza.to == connectionSettings.jid.toBare().toString(); + await _stanzaAwaiter .addPending( // A stanza with no to attribute is for direct processing by the server. As such, // we can correlate it by just *assuming* we have that attribute // (RFC 6120 Section 8.1.1.1) - data.stanza.to ?? connectionSettings.jid.toBare().toString(), + isOwnJid ? null : data.stanza.to, data.stanza.id!, data.stanza.tag, ) @@ -773,9 +776,15 @@ class XmppConnection { return; } + // In case the stanza came from our own bare Jid, remove it so that the stanza + // awaiter works correctly. + final isOwnJid = incomingPreHandlers.stanza.from == + connectionSettings.jid.toBare().toString(); + final ownJidStanza = isOwnJid + ? incomingPreHandlers.stanza.copyWith(from: null) + : incomingPreHandlers.stanza; final awaited = await _stanzaAwaiter.onData( - incomingPreHandlers.stanza, - connectionSettings.jid.toBare(), + ownJidStanza, ); if (awaited) { return; diff --git a/packages/moxxmpp/lib/src/stanza.dart b/packages/moxxmpp/lib/src/stanza.dart index 561eef4..b90c1e6 100644 --- a/packages/moxxmpp/lib/src/stanza.dart +++ b/packages/moxxmpp/lib/src/stanza.dart @@ -102,6 +102,8 @@ class RemoteServerTimeoutError extends StanzaError { /// An unknown error. class UnknownStanzaError extends StanzaError {} +const _stanzaNotDefined = Object(); + class Stanza extends XMLNode { // ignore: use_super_parameters Stanza({ @@ -216,7 +218,7 @@ class Stanza extends XMLNode { Stanza copyWith({ String? id, - String? from, + Object? from = _stanzaNotDefined, String? to, String? type, List? children, @@ -225,7 +227,7 @@ class Stanza extends XMLNode { return Stanza( tag: tag, to: to ?? this.to, - from: from ?? this.from, + from: from != _stanzaNotDefined ? from as String? : this.from, id: id ?? this.id, type: type ?? this.type, children: children ?? this.children, diff --git a/packages/moxxmpp/lib/src/util/incoming_queue.dart b/packages/moxxmpp/lib/src/util/incoming_queue.dart index 36a77cd..202b1f4 100644 --- a/packages/moxxmpp/lib/src/util/incoming_queue.dart +++ b/packages/moxxmpp/lib/src/util/incoming_queue.dart @@ -1,24 +1,37 @@ import 'dart:async'; import 'dart:collection'; import 'package:logging/logging.dart'; +import 'package:moxxmpp/src/awaiter.dart'; import 'package:moxxmpp/src/parser.dart'; import 'package:synchronized/synchronized.dart'; -typedef LockResult = (Completer?, XMPPStreamObject); - class IncomingStanzaQueue { - IncomingStanzaQueue(this._callback); + IncomingStanzaQueue(this._callback, this._stanzaAwaiter); + /// The queue for storing the completer of each + /// incoming stanza (or stream object to be precise). + /// Only access while holding the lock [_lock]. final Queue> _queue = Queue(); - final Future Function(XMPPStreamObject) _callback; + /// Flag indicating whether a callback is already running (true) + /// or not. "a callback" and not "the callback" because awaited stanzas + /// are allowed to bypass the queue. + /// Only access while holding the lock [_lock]. bool _isRunning = false; + /// The function to call to process an incoming stream object. + final Future Function(XMPPStreamObject) _callback; + + /// Lock guarding both [_queue] and [_isRunning]. final Lock _lock = Lock(); + // TODO: Remove once we can await stanzas (or can we). + bool negotiationsDone = false; + + /// Logger. final Logger _log = Logger('IncomingStanzaQueue'); - bool negotiationsDone = false; + final StanzaAwaiter _stanzaAwaiter; Future _processStreamObject( Future? future, @@ -37,37 +50,22 @@ class IncomingStanzaQueue { await future; // Run the callback. - if (object is XMPPStreamElement) { - _log.finest('Running callback for ${object.node.toXml()}'); - } await _callback(object); - if (object is XMPPStreamElement) { - _log.finest( - 'Callback for ${object.node.tag} (${object.node.attributes["id"]}) done', - ); - } // Run the next entry. - _log.finest('Entering second lock...'); await _lock.synchronized(() { - _log.finest('Second lock entered...'); if (_queue.isNotEmpty) { - _log.finest('New queue size: ${_queue.length - 1}'); _queue.removeFirst().complete(); } else { _isRunning = false; - _log.finest('New queue size: 0'); } }); } Future addStanza(List objects) async { - _log.finest('Entering initial lock...'); - await _lock.synchronized(() { - _log.finest('Lock entered...'); - + await _lock.synchronized(() async { for (final object in objects) { - if (canBypassQueue(object)) { + if (await canBypassQueue(object)) { unawaited( _processStreamObject(null, object), ); @@ -89,11 +87,12 @@ class IncomingStanzaQueue { }); } - bool canBypassQueue(XMPPStreamObject object) { - // TODO: Ask the StanzaAwaiter if the stanza is awaited - return object is XMPPStreamElement && - negotiationsDone && - object.node.tag == 'iq' && - ['result', 'error'].contains(object.node.attributes['type'] as String?); + Future canBypassQueue(XMPPStreamObject object) async { + if (object is XMPPStreamHeader) { + return false; + } + + object as XMPPStreamElement; + return _stanzaAwaiter.isAwaited(object.node); } } diff --git a/packages/moxxmpp/lib/src/xeps/xep_0045/xep_0045.dart b/packages/moxxmpp/lib/src/xeps/xep_0045/xep_0045.dart index e929ca5..08b08cb 100644 --- a/packages/moxxmpp/lib/src/xeps/xep_0045/xep_0045.dart +++ b/packages/moxxmpp/lib/src/xeps/xep_0045/xep_0045.dart @@ -16,7 +16,6 @@ import 'package:moxxmpp/src/xeps/xep_0045/errors.dart'; import 'package:moxxmpp/src/xeps/xep_0045/events.dart'; import 'package:moxxmpp/src/xeps/xep_0045/types.dart'; import 'package:moxxmpp/src/xeps/xep_0359.dart'; -import 'package:synchronized/extension.dart'; import 'package:synchronized/synchronized.dart'; /// (Room JID, nickname) diff --git a/packages/moxxmpp/lib/src/xeps/xep_0115.dart b/packages/moxxmpp/lib/src/xeps/xep_0115.dart index ff8890d..749011b 100644 --- a/packages/moxxmpp/lib/src/xeps/xep_0115.dart +++ b/packages/moxxmpp/lib/src/xeps/xep_0115.dart @@ -173,8 +173,13 @@ class EntityCapabilitiesManager extends XmppManagerBase { }); } - Future _performQuery(Stanza presence, String ver, - String hashFunctionName, String capabilityNode, JID from) async { + Future _performQuery( + Stanza presence, + String ver, + String hashFunctionName, + String capabilityNode, + JID from, + ) async { final dm = getAttributes().getManagerById(discoManager)!; final discoRequest = await dm.discoInfoQuery( from, diff --git a/packages/moxxmpp/test/awaiter_test.dart b/packages/moxxmpp/test/awaiter_test.dart index acde248..8367305 100644 --- a/packages/moxxmpp/test/awaiter_test.dart +++ b/packages/moxxmpp/test/awaiter_test.dart @@ -3,8 +3,6 @@ import 'package:moxxmpp/src/awaiter.dart'; import 'package:test/test.dart'; void main() { - const bareJid = JID('moxxmpp', 'server3.example', ''); - test('Test awaiting an awaited stanza with a from attribute', () async { final awaiter = StanzaAwaiter(); @@ -20,14 +18,12 @@ void main() { XMLNode.fromString( '', ), - bareJid, ); expect(result1, false); final result2 = await awaiter.onData( XMLNode.fromString( '', ), - bareJid, ); expect(result2, false); @@ -37,7 +33,6 @@ void main() { ); final result3 = await awaiter.onData( stanza, - bareJid, ); expect(result3, true); expect(await future, stanza); @@ -47,12 +42,11 @@ void main() { final awaiter = StanzaAwaiter(); // "Send" a stanza - final future = await awaiter.addPending(bareJid.toString(), 'abc123', 'iq'); + final future = await awaiter.addPending(null, 'abc123', 'iq'); // Receive the wrong answer final result1 = await awaiter.onData( XMLNode.fromString(''), - bareJid, ); expect(result1, false); @@ -60,7 +54,6 @@ void main() { final stanza = XMLNode.fromString(''); final result2 = await awaiter.onData( stanza, - bareJid, ); expect(result2, true); expect(await future, stanza); @@ -70,13 +63,12 @@ void main() { final awaiter = StanzaAwaiter(); // "Send" a stanza - final future = await awaiter.addPending(bareJid.toString(), 'abc123', 'iq'); + final future = await awaiter.addPending(null, 'abc123', 'iq'); // Receive the correct answer final stanza = XMLNode.fromString(''); final result1 = await awaiter.onData( stanza, - bareJid, ); expect(result1, true); expect(await future, stanza); @@ -84,7 +76,6 @@ void main() { // Receive it again final result2 = await awaiter.onData( stanza, - bareJid, ); expect(result2, false); }); @@ -93,20 +84,18 @@ void main() { final awaiter = StanzaAwaiter(); // "Send" a stanza - final future = await awaiter.addPending(bareJid.toString(), 'abc123', 'iq'); + final future = await awaiter.addPending(null, 'abc123', 'iq'); // Receive the wrong answer final stanza = XMLNode.fromString(''); final result1 = await awaiter.onData( XMLNode.fromString(''), - bareJid, ); expect(result1, false); // Receive the correct answer final result2 = await awaiter.onData( stanza, - bareJid, ); expect(result2, true); expect(await future, stanza); diff --git a/packages/moxxmpp/test/xeps/xep_0115_test.dart b/packages/moxxmpp/test/xeps/xep_0115_test.dart index 47837c8..ff90977 100644 --- a/packages/moxxmpp/test/xeps/xep_0115_test.dart +++ b/packages/moxxmpp/test/xeps/xep_0115_test.dart @@ -343,6 +343,7 @@ void main() { stanza, StanzaHandlerData(false, false, stanza, TypedMap()), ); + await Future.delayed(const Duration(seconds: 2)); expect( await manager.getCachedDiscoInfoFromJid(aliceJid) != null, @@ -513,6 +514,7 @@ void main() { stanza, StanzaHandlerData(false, false, stanza, TypedMap()), ); + await Future.delayed(const Duration(seconds: 2)); final cachedItem = await manager.getCachedDiscoInfoFromJid(aliceJid); expect( @@ -549,6 +551,7 @@ void main() { stanza, StanzaHandlerData(false, false, stanza, TypedMap()), ); + await Future.delayed(const Duration(seconds: 2)); final cachedItem = await manager.getCachedDiscoInfoFromJid(aliceJid); expect(