diff --git a/packages/moxxmpp/lib/src/connection.dart b/packages/moxxmpp/lib/src/connection.dart index 6ffa533..4f68ec3 100644 --- a/packages/moxxmpp/lib/src/connection.dart +++ b/packages/moxxmpp/lib/src/connection.dart @@ -1,6 +1,7 @@ import 'dart:async'; import 'package:logging/logging.dart'; import 'package:meta/meta.dart'; +import 'package:moxlib/moxlib.dart'; import 'package:moxxmpp/src/buffer.dart'; import 'package:moxxmpp/src/errors.dart'; import 'package:moxxmpp/src/events.dart'; @@ -72,6 +73,23 @@ class XmppConnectionResult { final XmppError? error; } +@immutable +class _StanzaAwaitableData { + const _StanzaAwaitableData(this.sentTo, this.id); + final String sentTo; + final String id; + + @override + int get hashCode => sentTo.hashCode ^ id.hashCode; + + @override + bool operator==(Object other) { + return other is _StanzaAwaitableData && + other.sentTo == sentTo && + other.id == id; + } +} + class XmppConnection { /// [_socket] is for debugging purposes. /// [connectionPingDuration] is the duration after which a ping will be sent to keep @@ -128,7 +146,7 @@ class XmppConnection { /// A policy on how to reconnect final ReconnectionPolicy _reconnectionPolicy; /// A list of stanzas we are tracking with its corresponding critical section - final Map> _awaitingResponse; + final Map<_StanzaAwaitableData, Completer> _awaitingResponse; final Lock _awaitingResponseLock; /// Helpers @@ -428,9 +446,10 @@ class XmppConnection { /// none. // TODO(Unknown): if addId = false, the function crashes. Future sendStanza(Stanza stanza, { StanzaFromType addFrom = StanzaFromType.full, bool addId = true, bool awaitable = true, bool encrypted = false }) async { - var stanza_ = stanza; - + assert(implies(addId == false && stanza.id == null, !awaitable), 'Cannot await a stanza with no id'); + // Add extra data in case it was not set + var stanza_ = stanza; if (addId && (stanza_.id == null || stanza_.id == '')) { stanza_ = stanza.copyWith(id: generateId()); } @@ -448,8 +467,6 @@ class XmppConnection { } } - final id = stanza_.id!; - _log.fine('Running pre stanza handlers..'); final data = await _runOutgoingPreStanzaHandlers( stanza_, @@ -487,42 +504,45 @@ class XmppConnection { final stanzaString = data.stanza.toXml(); // ignore: cascade_invocations - _log.fine('Attempting to acquire lock for $id...'); + _log.fine('Attempting to acquire lock for ${data.stanza.id}...'); // TODO(PapaTutuWawa): Handle this much more graceful var future = Future.value(XMLNode(tag: 'not-used')); await _awaitingResponseLock.synchronized(() async { - _log.fine('Lock acquired for $id'); - if (awaitable) { - _awaitingResponse[id] = Completer(); - } + _log.fine('Lock acquired for ${data.stanza.id}'); - // This uses the StreamManager to behave like a send queue - if (await _canSendData()) { - _socket.write(stanzaString); + _StanzaAwaitableData? key; + if (awaitable) { + key = _StanzaAwaitableData(data.stanza.to!, data.stanza.id!); + _awaitingResponse[key] = Completer(); + } - // Try to ack every stanza - // NOTE: Here we have send an Ack request nonza. This is now done by StreamManagementManager when receiving the StanzaSentEvent - } else { - _log.fine('_canSendData() returned false.'); - } + // This uses the StreamManager to behave like a send queue + if (await _canSendData()) { + _socket.write(stanzaString); - _log.fine('Running post stanza handlers..'); - await _runOutgoingPostStanzaHandlers( + // Try to ack every stanza + // NOTE: Here we have send an Ack request nonza. This is now done by StreamManagementManager when receiving the StanzaSentEvent + } else { + _log.fine('_canSendData() returned false.'); + } + + _log.fine('Running post stanza handlers..'); + await _runOutgoingPostStanzaHandlers( + stanza_, + initial: StanzaHandlerData( + false, + false, + null, stanza_, - initial: StanzaHandlerData( - false, - false, - null, - stanza_, - ), - ); - _log.fine('Done'); + ), + ); + _log.fine('Done'); - if (awaitable) { - future = _awaitingResponse[id]!.future; - } + if (awaitable) { + future = _awaitingResponse[key]!.future; + } - _log.fine('Releasing lock for $id'); + _log.fine('Releasing lock for ${data.stanza.id}'); }); return future; @@ -691,10 +711,14 @@ class XmppConnection { final id = incomingPreHandlers.stanza.attributes['id'] as String?; var awaited = false; await _awaitingResponseLock.synchronized(() async { - if (id != null && _awaitingResponse.containsKey(id)) { - _awaitingResponse[id]!.complete(incomingPreHandlers.stanza); - _awaitingResponse.remove(id); - awaited = true; + if (id != null && incomingPreHandlers.stanza.from != null) { + final key = _StanzaAwaitableData(incomingPreHandlers.stanza.from!, id); + final comp = _awaitingResponse[key]; + if (comp != null) { + comp.complete(incomingPreHandlers.stanza); + _awaitingResponse.remove(key); + awaited = true; + } } }); diff --git a/packages/moxxmpp/test/xeps/xep_0198_test.dart b/packages/moxxmpp/test/xeps/xep_0198_test.dart index dc13c61..22ad652 100644 --- a/packages/moxxmpp/test/xeps/xep_0198_test.dart +++ b/packages/moxxmpp/test/xeps/xep_0198_test.dart @@ -348,7 +348,7 @@ void main() { ), StanzaExpectation( "", - "", + "", ignoreId: true, adjustId: true, ),