feat: Improve the stanza await system
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
This fixes an issue where publishing an avatar would fail, if returned an error where the "from" attribute is missing from the stanza.
This commit is contained in:
parent
b2c54ae8c0
commit
1aa50699ad
94
packages/moxxmpp/lib/src/awaiter.dart
Normal file
94
packages/moxxmpp/lib/src/awaiter.dart
Normal file
@ -0,0 +1,94 @@
|
||||
import 'dart:async';
|
||||
import 'package:meta/meta.dart';
|
||||
import 'package:moxxmpp/src/jid.dart';
|
||||
import 'package:moxxmpp/src/stringxml.dart';
|
||||
import 'package:synchronized/synchronized.dart';
|
||||
|
||||
/// A surrogate key for awaiting stanzas.
|
||||
@immutable
|
||||
class _StanzaSurrogateKey {
|
||||
const _StanzaSurrogateKey(this.sentTo, this.id, this.tag);
|
||||
|
||||
/// The JID the original stanza was sent to. We expect the result to come from the
|
||||
/// same JID.
|
||||
final String sentTo;
|
||||
|
||||
/// The ID of the original stanza. We expect the result to have the same ID.
|
||||
final String id;
|
||||
|
||||
/// The tag name of the stanza.
|
||||
final String tag;
|
||||
|
||||
@override
|
||||
int get hashCode => sentTo.hashCode ^ id.hashCode ^ tag.hashCode;
|
||||
|
||||
@override
|
||||
bool operator==(Object other) {
|
||||
return other is _StanzaSurrogateKey &&
|
||||
other.sentTo == sentTo &&
|
||||
other.id == id &&
|
||||
other.tag == tag;
|
||||
}
|
||||
}
|
||||
|
||||
/// This class handles the await semantics for stanzas. Stanzas are given a "unique"
|
||||
/// key equal to the tuple (to, id, tag) with which their response is identified.
|
||||
///
|
||||
/// That means that when sending ```<iq to="example@some.server.example" id="abc123" />```,
|
||||
/// the response stanza must be from "example@some.server.example", have id "abc123" and
|
||||
/// be an iq stanza.
|
||||
///
|
||||
/// This class also handles some "edge cases" of RFC 6120, like an empty "from" attribute.
|
||||
class StanzaAwaiter {
|
||||
/// The pending stanzas, identified by their surrogate key.
|
||||
final Map<_StanzaSurrogateKey, Completer<XMLNode>> _pending = {};
|
||||
|
||||
/// The critical section for accessing [StanzaAwaiter._pending].
|
||||
final Lock _lock = Lock();
|
||||
|
||||
/// Register a stanza as pending.
|
||||
/// [to] is the value of the stanza's "to" attribute.
|
||||
/// [id] is the value of the stanza's "id" attribute.
|
||||
/// [tag] is the stanza's tag name.
|
||||
///
|
||||
/// Returns a future that might resolve to the response to the stanza.
|
||||
Future<Future<XMLNode>> addPending(String to, String id, String tag) async {
|
||||
final completer = await _lock.synchronized(() {
|
||||
final completer = Completer<XMLNode>();
|
||||
_pending[_StanzaSurrogateKey(to, id, tag)] = completer;
|
||||
return completer;
|
||||
});
|
||||
|
||||
return completer.future;
|
||||
}
|
||||
|
||||
/// Checks if the stanza [stanza] is being awaited. [bareJid] is the bare JID of
|
||||
/// the connection.
|
||||
/// If [stanza] is awaited, resolves the future and returns true. If not, returns
|
||||
/// false.
|
||||
Future<bool> onData(XMLNode stanza, JID bareJid) async {
|
||||
assert(bareJid.isBare(), 'bareJid must be bare');
|
||||
|
||||
final id = stanza.attributes['id'] as String?;
|
||||
if (id == null) return false;
|
||||
|
||||
final key = _StanzaSurrogateKey(
|
||||
// Section 8.1.2.1 § 3 of RFC 6120 says that an empty "from" indicates that the
|
||||
// attribute is implicitly from our own bare JID.
|
||||
stanza.attributes['from'] as String? ?? bareJid.toString(),
|
||||
id,
|
||||
stanza.tag,
|
||||
);
|
||||
|
||||
return _lock.synchronized(() {
|
||||
final completer = _pending[key];
|
||||
if (completer != null) {
|
||||
_pending.remove(key);
|
||||
completer.complete(stanza);
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
});
|
||||
}
|
||||
}
|
@ -2,6 +2,7 @@ import 'dart:async';
|
||||
import 'package:logging/logging.dart';
|
||||
import 'package:meta/meta.dart';
|
||||
import 'package:moxlib/moxlib.dart';
|
||||
import 'package:moxxmpp/src/awaiter.dart';
|
||||
import 'package:moxxmpp/src/buffer.dart';
|
||||
import 'package:moxxmpp/src/errors.dart';
|
||||
import 'package:moxxmpp/src/events.dart';
|
||||
@ -89,29 +90,6 @@ class XmppConnectionResult {
|
||||
final XmppError? error;
|
||||
}
|
||||
|
||||
/// A surrogate key for awaiting stanzas.
|
||||
@immutable
|
||||
class _StanzaAwaitableData {
|
||||
const _StanzaAwaitableData(this.sentTo, this.id);
|
||||
|
||||
/// The JID the original stanza was sent to. We expect the result to come from the
|
||||
/// same JID.
|
||||
final String sentTo;
|
||||
|
||||
/// The ID of the original stanza. We expect the result to have the same ID.
|
||||
final String id;
|
||||
|
||||
@override
|
||||
int get hashCode => sentTo.hashCode ^ id.hashCode;
|
||||
|
||||
@override
|
||||
bool operator==(Object other) {
|
||||
return other is _StanzaAwaitableData &&
|
||||
other.sentTo == sentTo &&
|
||||
other.id == id;
|
||||
}
|
||||
}
|
||||
|
||||
/// This class is a connection to the server.
|
||||
class XmppConnection {
|
||||
XmppConnection(
|
||||
@ -151,9 +129,8 @@ class XmppConnection {
|
||||
/// A policy on how to reconnect
|
||||
final ReconnectionPolicy _reconnectionPolicy;
|
||||
|
||||
/// A list of stanzas we are tracking with its corresponding critical section lock
|
||||
final Map<_StanzaAwaitableData, Completer<XMLNode>> _awaitingResponse = {};
|
||||
final Lock _awaitingResponseLock = Lock();
|
||||
/// A helper for handling await semantics with stanzas
|
||||
final StanzaAwaiter _stanzaAwaiter = StanzaAwaiter();
|
||||
|
||||
/// Sorted list of handlers that we call or incoming and outgoing stanzas
|
||||
final List<StanzaHandler> _incomingStanzaHandlers = List.empty(growable: true);
|
||||
@ -544,43 +521,35 @@ class XmppConnection {
|
||||
_log.fine('Attempting to acquire lock for ${data.stanza.id}...');
|
||||
// TODO(PapaTutuWawa): Handle this much more graceful
|
||||
var future = Future.value(XMLNode(tag: 'not-used'));
|
||||
await _awaitingResponseLock.synchronized(() async {
|
||||
_log.fine('Lock acquired for ${data.stanza.id}');
|
||||
|
||||
_StanzaAwaitableData? key;
|
||||
if (awaitable) {
|
||||
key = _StanzaAwaitableData(data.stanza.to!, data.stanza.id!);
|
||||
_awaitingResponse[key] = Completer();
|
||||
}
|
||||
|
||||
// This uses the StreamManager to behave like a send queue
|
||||
if (await _canSendData()) {
|
||||
_socket.write(stanzaString);
|
||||
|
||||
// Try to ack every stanza
|
||||
// NOTE: Here we have send an Ack request nonza. This is now done by StreamManagementManager when receiving the StanzaSentEvent
|
||||
} else {
|
||||
_log.fine('_canSendData() returned false.');
|
||||
}
|
||||
|
||||
_log.fine('Running post stanza handlers..');
|
||||
await _runOutgoingPostStanzaHandlers(
|
||||
stanza_,
|
||||
initial: StanzaHandlerData(
|
||||
false,
|
||||
false,
|
||||
null,
|
||||
stanza_,
|
||||
),
|
||||
if (awaitable) {
|
||||
future = await _stanzaAwaiter.addPending(
|
||||
data.stanza.to!,
|
||||
data.stanza.id!,
|
||||
data.stanza.tag,
|
||||
);
|
||||
_log.fine('Done');
|
||||
}
|
||||
|
||||
if (awaitable) {
|
||||
future = _awaitingResponse[key]!.future;
|
||||
}
|
||||
// This uses the StreamManager to behave like a send queue
|
||||
if (await _canSendData()) {
|
||||
_socket.write(stanzaString);
|
||||
|
||||
_log.fine('Releasing lock for ${data.stanza.id}');
|
||||
});
|
||||
// Try to ack every stanza
|
||||
// NOTE: Here we have send an Ack request nonza. This is now done by StreamManagementManager when receiving the StanzaSentEvent
|
||||
} else {
|
||||
_log.fine('_canSendData() returned false.');
|
||||
}
|
||||
|
||||
_log.fine('Running post stanza handlers..');
|
||||
await _runOutgoingPostStanzaHandlers(
|
||||
stanza_,
|
||||
initial: StanzaHandlerData(
|
||||
false,
|
||||
false,
|
||||
null,
|
||||
stanza_,
|
||||
),
|
||||
);
|
||||
_log.fine('Done');
|
||||
|
||||
return future;
|
||||
}
|
||||
@ -744,21 +713,10 @@ class XmppConnection {
|
||||
'';
|
||||
_log.finest('<== $prefix${incomingPreHandlers.stanza.toXml()}');
|
||||
|
||||
// See if we are waiting for this stanza
|
||||
final id = incomingPreHandlers.stanza.attributes['id'] as String?;
|
||||
var awaited = false;
|
||||
await _awaitingResponseLock.synchronized(() async {
|
||||
if (id != null && incomingPreHandlers.stanza.from != null) {
|
||||
final key = _StanzaAwaitableData(incomingPreHandlers.stanza.from!, id);
|
||||
final comp = _awaitingResponse[key];
|
||||
if (comp != null) {
|
||||
comp.complete(incomingPreHandlers.stanza);
|
||||
_awaitingResponse.remove(key);
|
||||
awaited = true;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
final awaited = await _stanzaAwaiter.onData(
|
||||
incomingPreHandlers.stanza,
|
||||
_connectionSettings.jid.toBare(),
|
||||
);
|
||||
if (awaited) {
|
||||
return;
|
||||
}
|
||||
|
@ -51,7 +51,11 @@ class JID {
|
||||
bool isFull() => resource.isNotEmpty;
|
||||
|
||||
/// Converts the JID into a bare JID.
|
||||
JID toBare() => JID(local, domain, '');
|
||||
JID toBare() {
|
||||
if (isBare()) return this;
|
||||
|
||||
return JID(local, domain, '');
|
||||
}
|
||||
|
||||
/// Converts the JID into one with a resource part of [resource].
|
||||
JID withResource(String resource) => JID(local, domain, resource);
|
||||
|
@ -1,7 +1,6 @@
|
||||
import 'package:xml/xml.dart';
|
||||
|
||||
class XMLNode {
|
||||
|
||||
XMLNode({
|
||||
required this.tag,
|
||||
this.attributes = const <String, dynamic>{},
|
||||
|
104
packages/moxxmpp/test/awaiter_test.dart
Normal file
104
packages/moxxmpp/test/awaiter_test.dart
Normal file
@ -0,0 +1,104 @@
|
||||
import 'package:moxxmpp/moxxmpp.dart';
|
||||
import 'package:moxxmpp/src/awaiter.dart';
|
||||
import 'package:test/test.dart';
|
||||
|
||||
void main() {
|
||||
final bareJid = JID('moxxmpp', 'server3.example', '');
|
||||
|
||||
test('Test awaiting an awaited stanza with a from attribute', () async {
|
||||
final awaiter = StanzaAwaiter();
|
||||
|
||||
// "Send" a stanza
|
||||
final future = await awaiter.addPending('user1@server.example', 'abc123', 'iq');
|
||||
|
||||
// Receive the wrong answer
|
||||
final result1 = await awaiter.onData(
|
||||
XMLNode.fromString('<iq from="user3@server.example" id="abc123" type="result" />'),
|
||||
bareJid,
|
||||
);
|
||||
expect(result1, false);
|
||||
final result2 = await awaiter.onData(
|
||||
XMLNode.fromString('<iq from="user1@server.example" id="lol" type="result" />'),
|
||||
bareJid,
|
||||
);
|
||||
expect(result2, false);
|
||||
|
||||
// Receive the correct answer
|
||||
final stanza = XMLNode.fromString('<iq from="user1@server.example" id="abc123" type="result" />');
|
||||
final result3 = await awaiter.onData(
|
||||
stanza,
|
||||
bareJid,
|
||||
);
|
||||
expect(result3, true);
|
||||
expect(await future, stanza);
|
||||
});
|
||||
|
||||
test('Test awaiting an awaited stanza without a from attribute', () async {
|
||||
final awaiter = StanzaAwaiter();
|
||||
|
||||
// "Send" a stanza
|
||||
final future = await awaiter.addPending(bareJid.toString(), 'abc123', 'iq');
|
||||
|
||||
// Receive the wrong answer
|
||||
final result1 = await awaiter.onData(
|
||||
XMLNode.fromString('<iq id="lol" type="result" />'),
|
||||
bareJid,
|
||||
);
|
||||
expect(result1, false);
|
||||
|
||||
// Receive the correct answer
|
||||
final stanza = XMLNode.fromString('<iq id="abc123" type="result" />');
|
||||
final result2 = await awaiter.onData(
|
||||
stanza,
|
||||
bareJid,
|
||||
);
|
||||
expect(result2, true);
|
||||
expect(await future, stanza);
|
||||
});
|
||||
|
||||
test('Test awaiting a stanza that was already awaited', () async {
|
||||
final awaiter = StanzaAwaiter();
|
||||
|
||||
// "Send" a stanza
|
||||
final future = await awaiter.addPending(bareJid.toString(), 'abc123', 'iq');
|
||||
|
||||
// Receive the correct answer
|
||||
final stanza = XMLNode.fromString('<iq id="abc123" type="result" />');
|
||||
final result1 = await awaiter.onData(
|
||||
stanza,
|
||||
bareJid,
|
||||
);
|
||||
expect(result1, true);
|
||||
expect(await future, stanza);
|
||||
|
||||
// Receive it again
|
||||
final result2 = await awaiter.onData(
|
||||
stanza,
|
||||
bareJid,
|
||||
);
|
||||
expect(result2, false);
|
||||
});
|
||||
|
||||
test('Test ignoring a stanza that has the wrong tag', () async {
|
||||
final awaiter = StanzaAwaiter();
|
||||
|
||||
// "Send" a stanza
|
||||
final future = await awaiter.addPending(bareJid.toString(), 'abc123', 'iq');
|
||||
|
||||
// Receive the wrong answer
|
||||
final stanza = XMLNode.fromString('<iq id="abc123" type="result" />');
|
||||
final result1 = await awaiter.onData(
|
||||
XMLNode.fromString('<message id="abc123" type="result" />'),
|
||||
bareJid,
|
||||
);
|
||||
expect(result1, false);
|
||||
|
||||
// Receive the correct answer
|
||||
final result2 = await awaiter.onData(
|
||||
stanza,
|
||||
bareJid,
|
||||
);
|
||||
expect(result2, true);
|
||||
expect(await future, stanza);
|
||||
});
|
||||
}
|
Loading…
Reference in New Issue
Block a user