feat: Track stanza responses as a tuple of (to, id)

Also fixes an invalid test case in the XEP-0198 tests, where
the IQ reply sets the "to" instead of the "from".
This commit is contained in:
PapaTutuWawa 2023-01-10 12:50:07 +01:00
parent 9223a7d403
commit 6517065a1a
2 changed files with 61 additions and 37 deletions

View File

@ -1,6 +1,7 @@
import 'dart:async'; import 'dart:async';
import 'package:logging/logging.dart'; import 'package:logging/logging.dart';
import 'package:meta/meta.dart'; import 'package:meta/meta.dart';
import 'package:moxlib/moxlib.dart';
import 'package:moxxmpp/src/buffer.dart'; import 'package:moxxmpp/src/buffer.dart';
import 'package:moxxmpp/src/errors.dart'; import 'package:moxxmpp/src/errors.dart';
import 'package:moxxmpp/src/events.dart'; import 'package:moxxmpp/src/events.dart';
@ -72,6 +73,23 @@ class XmppConnectionResult {
final XmppError? error; final XmppError? error;
} }
@immutable
class _StanzaAwaitableData {
const _StanzaAwaitableData(this.sentTo, this.id);
final String sentTo;
final String id;
@override
int get hashCode => sentTo.hashCode ^ id.hashCode;
@override
bool operator==(Object other) {
return other is _StanzaAwaitableData &&
other.sentTo == sentTo &&
other.id == id;
}
}
class XmppConnection { class XmppConnection {
/// [_socket] is for debugging purposes. /// [_socket] is for debugging purposes.
/// [connectionPingDuration] is the duration after which a ping will be sent to keep /// [connectionPingDuration] is the duration after which a ping will be sent to keep
@ -128,7 +146,7 @@ class XmppConnection {
/// A policy on how to reconnect /// A policy on how to reconnect
final ReconnectionPolicy _reconnectionPolicy; final ReconnectionPolicy _reconnectionPolicy;
/// A list of stanzas we are tracking with its corresponding critical section /// A list of stanzas we are tracking with its corresponding critical section
final Map<String, Completer<XMLNode>> _awaitingResponse; final Map<_StanzaAwaitableData, Completer<XMLNode>> _awaitingResponse;
final Lock _awaitingResponseLock; final Lock _awaitingResponseLock;
/// Helpers /// Helpers
@ -428,9 +446,10 @@ class XmppConnection {
/// none. /// none.
// TODO(Unknown): if addId = false, the function crashes. // TODO(Unknown): if addId = false, the function crashes.
Future<XMLNode> sendStanza(Stanza stanza, { StanzaFromType addFrom = StanzaFromType.full, bool addId = true, bool awaitable = true, bool encrypted = false }) async { Future<XMLNode> sendStanza(Stanza stanza, { StanzaFromType addFrom = StanzaFromType.full, bool addId = true, bool awaitable = true, bool encrypted = false }) async {
var stanza_ = stanza; assert(implies(addId == false && stanza.id == null, !awaitable), 'Cannot await a stanza with no id');
// Add extra data in case it was not set // Add extra data in case it was not set
var stanza_ = stanza;
if (addId && (stanza_.id == null || stanza_.id == '')) { if (addId && (stanza_.id == null || stanza_.id == '')) {
stanza_ = stanza.copyWith(id: generateId()); stanza_ = stanza.copyWith(id: generateId());
} }
@ -448,8 +467,6 @@ class XmppConnection {
} }
} }
final id = stanza_.id!;
_log.fine('Running pre stanza handlers..'); _log.fine('Running pre stanza handlers..');
final data = await _runOutgoingPreStanzaHandlers( final data = await _runOutgoingPreStanzaHandlers(
stanza_, stanza_,
@ -487,13 +504,16 @@ class XmppConnection {
final stanzaString = data.stanza.toXml(); final stanzaString = data.stanza.toXml();
// ignore: cascade_invocations // ignore: cascade_invocations
_log.fine('Attempting to acquire lock for $id...'); _log.fine('Attempting to acquire lock for ${data.stanza.id}...');
// TODO(PapaTutuWawa): Handle this much more graceful // TODO(PapaTutuWawa): Handle this much more graceful
var future = Future.value(XMLNode(tag: 'not-used')); var future = Future.value(XMLNode(tag: 'not-used'));
await _awaitingResponseLock.synchronized(() async { await _awaitingResponseLock.synchronized(() async {
_log.fine('Lock acquired for $id'); _log.fine('Lock acquired for ${data.stanza.id}');
_StanzaAwaitableData? key;
if (awaitable) { if (awaitable) {
_awaitingResponse[id] = Completer(); key = _StanzaAwaitableData(data.stanza.to!, data.stanza.id!);
_awaitingResponse[key] = Completer();
} }
// This uses the StreamManager to behave like a send queue // This uses the StreamManager to behave like a send queue
@ -519,10 +539,10 @@ class XmppConnection {
_log.fine('Done'); _log.fine('Done');
if (awaitable) { if (awaitable) {
future = _awaitingResponse[id]!.future; future = _awaitingResponse[key]!.future;
} }
_log.fine('Releasing lock for $id'); _log.fine('Releasing lock for ${data.stanza.id}');
}); });
return future; return future;
@ -691,11 +711,15 @@ class XmppConnection {
final id = incomingPreHandlers.stanza.attributes['id'] as String?; final id = incomingPreHandlers.stanza.attributes['id'] as String?;
var awaited = false; var awaited = false;
await _awaitingResponseLock.synchronized(() async { await _awaitingResponseLock.synchronized(() async {
if (id != null && _awaitingResponse.containsKey(id)) { if (id != null && incomingPreHandlers.stanza.from != null) {
_awaitingResponse[id]!.complete(incomingPreHandlers.stanza); final key = _StanzaAwaitableData(incomingPreHandlers.stanza.from!, id);
_awaitingResponse.remove(id); final comp = _awaitingResponse[key];
if (comp != null) {
comp.complete(incomingPreHandlers.stanza);
_awaitingResponse.remove(key);
awaited = true; awaited = true;
} }
}
}); });
if (awaited) { if (awaited) {

View File

@ -348,7 +348,7 @@ void main() {
), ),
StanzaExpectation( StanzaExpectation(
"<iq to='user@example.com' type='get' id='a' xmlns='jabber:client' />", "<iq to='user@example.com' type='get' id='a' xmlns='jabber:client' />",
"<iq to='user@example.com' type='result' id='a' />", "<iq from='user@example.com' type='result' id='a' />",
ignoreId: true, ignoreId: true,
adjustId: true, adjustId: true,
), ),