fix(xep,core): Ensure in-order processing of incoming stanzas

This commit is contained in:
PapaTutuWawa 2023-09-29 19:58:43 +02:00
parent aba90f2e90
commit d9fbb9e102
8 changed files with 245 additions and 83 deletions

View File

@ -25,12 +25,12 @@ import 'package:moxxmpp/src/settings.dart';
import 'package:moxxmpp/src/socket.dart'; import 'package:moxxmpp/src/socket.dart';
import 'package:moxxmpp/src/stanza.dart'; import 'package:moxxmpp/src/stanza.dart';
import 'package:moxxmpp/src/stringxml.dart'; import 'package:moxxmpp/src/stringxml.dart';
import 'package:moxxmpp/src/util/incoming_queue.dart';
import 'package:moxxmpp/src/util/queue.dart'; import 'package:moxxmpp/src/util/queue.dart';
import 'package:moxxmpp/src/util/typed_map.dart'; import 'package:moxxmpp/src/util/typed_map.dart';
import 'package:moxxmpp/src/xeps/xep_0030/xep_0030.dart'; import 'package:moxxmpp/src/xeps/xep_0030/xep_0030.dart';
import 'package:moxxmpp/src/xeps/xep_0198/xep_0198.dart'; import 'package:moxxmpp/src/xeps/xep_0198/xep_0198.dart';
import 'package:moxxmpp/src/xeps/xep_0352.dart'; import 'package:moxxmpp/src/xeps/xep_0352.dart';
import 'package:synchronized/synchronized.dart';
import 'package:uuid/uuid.dart'; import 'package:uuid/uuid.dart';
/// The states the XmppConnection can be in /// The states the XmppConnection can be in
@ -90,9 +90,12 @@ class XmppConnection {
}, },
); );
_incomingStanzaQueue = IncomingStanzaQueue(handleXmlStream);
_socketStream = _socket.getDataStream(); _socketStream = _socket.getDataStream();
// TODO(Unknown): Handle on done // TODO(Unknown): Handle on done
_socketStream.transform(_streamParser).forEach(handleXmlStream); _socketStream
.transform(_streamParser)
.forEach(_incomingStanzaQueue.addStanza);
_socket.getEventStream().listen(handleSocketEvent); _socket.getEventStream().listen(handleSocketEvent);
_stanzaQueue = AsyncStanzaQueue( _stanzaQueue = AsyncStanzaQueue(
@ -170,10 +173,6 @@ class XmppConnection {
T? getNegotiatorById<T extends XmppFeatureNegotiatorBase>(String id) => T? getNegotiatorById<T extends XmppFeatureNegotiatorBase>(String id) =>
_negotiationsHandler.getNegotiatorById<T>(id); _negotiationsHandler.getNegotiatorById<T>(id);
/// Prevent data from being passed to _currentNegotiator.negotiator while the negotiator
/// is still running.
final Lock _negotiationLock = Lock();
/// The logger for the class /// The logger for the class
final Logger _log = Logger('XmppConnection'); final Logger _log = Logger('XmppConnection');
@ -182,6 +181,8 @@ class XmppConnection {
bool get isAuthenticated => _isAuthenticated; bool get isAuthenticated => _isAuthenticated;
late final IncomingStanzaQueue _incomingStanzaQueue;
late final AsyncStanzaQueue _stanzaQueue; late final AsyncStanzaQueue _stanzaQueue;
/// Returns the JID we authenticate with and add the resource that we have bound. /// Returns the JID we authenticate with and add the resource that we have bound.
@ -591,6 +592,8 @@ class XmppConnection {
await _reconnectionPolicy.setShouldReconnect(true); await _reconnectionPolicy.setShouldReconnect(true);
} }
_incomingStanzaQueue.negotiationsDone = true;
// Tell consumers of the event stream that we're done with stream feature // Tell consumers of the event stream that we're done with stream feature
// negotiations // negotiations
await _sendEvent( await _sendEvent(
@ -828,17 +831,17 @@ class XmppConnection {
// causing (a) the negotiator to become confused and (b) the stanzas/nonzas to be // causing (a) the negotiator to become confused and (b) the stanzas/nonzas to be
// missed. This causes the data to wait while the negotiator is running and thus // missed. This causes the data to wait while the negotiator is running and thus
// prevent this issue. // prevent this issue.
await _negotiationLock.synchronized(() async { if (_routingState != RoutingState.negotiating) {
if (_routingState != RoutingState.negotiating) { unawaited(handleXmlStream(event));
unawaited(handleXmlStream(event)); return;
return; }
}
await _negotiationsHandler.negotiate(event); await _negotiationsHandler.negotiate(event);
});
break; break;
case RoutingState.handleStanzas: case RoutingState.handleStanzas:
_log.finest('Handling ${node.tag} (${node.attributes["id"]})');
await _handleStanza(node); await _handleStanza(node);
_log.finest('Handling ${node.tag} (${node.attributes["id"]}) done');
break; break;
case RoutingState.preConnection: case RoutingState.preConnection:
case RoutingState.error: case RoutingState.error:
@ -903,6 +906,7 @@ class XmppConnection {
// Kill a possibly existing connection // Kill a possibly existing connection
_socket.close(); _socket.close();
_incomingStanzaQueue.negotiationsDone = false;
await _reconnectionPolicy.reset(); await _reconnectionPolicy.reset();
_enableReconnectOnSuccess = enableReconnectOnSuccess; _enableReconnectOnSuccess = enableReconnectOnSuccess;
if (shouldReconnect) { if (shouldReconnect) {

View File

@ -57,9 +57,10 @@ class _ChunkedConversionBuffer<S, T> {
} }
/// A buffer to put between a socket's input and a full XML stream. /// A buffer to put between a socket's input and a full XML stream.
class XMPPStreamParser extends StreamTransformerBase<String, XMPPStreamObject> { class XMPPStreamParser
final StreamController<XMPPStreamObject> _streamController = extends StreamTransformerBase<String, List<XMPPStreamObject>> {
StreamController<XMPPStreamObject>(); final StreamController<List<XMPPStreamObject>> _streamController =
StreamController<List<XMPPStreamObject>>();
/// Turns a String into a list of [XmlEvent]s in a chunked fashion. /// Turns a String into a list of [XmlEvent]s in a chunked fashion.
_ChunkedConversionBuffer<String, XmlEvent> _eventBuffer = _ChunkedConversionBuffer<String, XmlEvent> _eventBuffer =
@ -117,13 +118,14 @@ class XMPPStreamParser extends StreamTransformerBase<String, XMPPStreamObject> {
} }
@override @override
Stream<XMPPStreamObject> bind(Stream<String> stream) { Stream<List<XMPPStreamObject>> bind(Stream<String> stream) {
// We do not want to use xml's toXmlEvents and toSubtreeEvents methods as they // We do not want to use xml's toXmlEvents and toSubtreeEvents methods as they
// create streams we cannot close. We need to be able to destroy and recreate an // create streams we cannot close. We need to be able to destroy and recreate an
// XML parser whenever we start a new connection. // XML parser whenever we start a new connection.
stream.listen((input) { stream.listen((input) {
final events = _eventBuffer.convert(input); final events = _eventBuffer.convert(input);
final streamHeaderEvents = _streamHeaderSelector.convert(events); final streamHeaderEvents = _streamHeaderSelector.convert(events);
final objects = List<XMPPStreamObject>.empty(growable: true);
// Process the stream header separately. // Process the stream header separately.
for (final event in streamHeaderEvents) { for (final event in streamHeaderEvents) {
@ -135,7 +137,7 @@ class XMPPStreamParser extends StreamTransformerBase<String, XMPPStreamObject> {
continue; continue;
} }
_streamController.add( objects.add(
XMPPStreamHeader( XMPPStreamHeader(
Map<String, String>.fromEntries( Map<String, String>.fromEntries(
event.attributes.map((attr) { event.attributes.map((attr) {
@ -151,13 +153,15 @@ class XMPPStreamParser extends StreamTransformerBase<String, XMPPStreamObject> {
final children = _childBuffer.convert(childEvents); final children = _childBuffer.convert(childEvents);
for (final node in children) { for (final node in children) {
if (node.nodeType == XmlNodeType.ELEMENT) { if (node.nodeType == XmlNodeType.ELEMENT) {
_streamController.add( objects.add(
XMPPStreamElement( XMPPStreamElement(
XMLNode.fromXmlElement(node as XmlElement), XMLNode.fromXmlElement(node as XmlElement),
), ),
); );
} }
} }
_streamController.add(objects);
}); });
return _streamController.stream; return _streamController.stream;

View File

@ -0,0 +1,99 @@
import 'dart:async';
import 'dart:collection';
import 'package:logging/logging.dart';
import 'package:moxxmpp/src/parser.dart';
import 'package:synchronized/synchronized.dart';
typedef LockResult = (Completer<void>?, XMPPStreamObject);
class IncomingStanzaQueue {
IncomingStanzaQueue(this._callback);
final Queue<Completer<void>> _queue = Queue();
final Future<void> Function(XMPPStreamObject) _callback;
bool _isRunning = false;
final Lock _lock = Lock();
final Logger _log = Logger('IncomingStanzaQueue');
bool negotiationsDone = false;
Future<void> _processStreamObject(
Future<void>? future,
XMPPStreamObject object,
) async {
if (future == null) {
if (object is XMPPStreamElement) {
_log.finest(
'Bypassing queue for ${object.node.tag} (${object.node.attributes["id"]})',
);
}
return _callback(object);
}
// Wait for our turn.
await future;
// Run the callback.
if (object is XMPPStreamElement) {
_log.finest('Running callback for ${object.node.toXml()}');
}
await _callback(object);
if (object is XMPPStreamElement) {
_log.finest(
'Callback for ${object.node.tag} (${object.node.attributes["id"]}) done',
);
}
// Run the next entry.
_log.finest('Entering second lock...');
await _lock.synchronized(() {
_log.finest('Second lock entered...');
if (_queue.isNotEmpty) {
_log.finest('New queue size: ${_queue.length - 1}');
_queue.removeFirst().complete();
} else {
_isRunning = false;
_log.finest('New queue size: 0');
}
});
}
Future<void> addStanza(List<XMPPStreamObject> objects) async {
_log.finest('Entering initial lock...');
await _lock.synchronized(() {
_log.finest('Lock entered...');
for (final object in objects) {
if (canBypassQueue(object)) {
unawaited(
_processStreamObject(null, object),
);
continue;
}
final completer = Completer<void>();
if (_isRunning) {
_queue.add(completer);
} else {
_isRunning = true;
completer.complete();
}
unawaited(
_processStreamObject(completer.future, object),
);
}
});
}
bool canBypassQueue(XMPPStreamObject object) {
// TODO: Ask the StanzaAwaiter if the stanza is awaited
return object is XMPPStreamElement &&
negotiationsDone &&
object.node.tag == 'iq' &&
['result', 'error'].contains(object.node.attributes['type'] as String?);
}
}

View File

@ -251,19 +251,23 @@ class MUCManager extends XmppManagerBase {
StanzaHandlerData state, StanzaHandlerData state,
) async { ) async {
if (presence.from == null) { if (presence.from == null) {
logger.finest('Ignoring presence as it has no from attribute');
return state; return state;
} }
final from = JID.fromString(presence.from!); final from = JID.fromString(presence.from!);
final bareFrom = from.toBare(); final bareFrom = from.toBare();
return _cacheLock.synchronized(() { return _cacheLock.synchronized(() {
logger.finest('Lock aquired for presence from ${presence.from}');
final room = _mucRoomCache[bareFrom]; final room = _mucRoomCache[bareFrom];
if (room == null) { if (room == null) {
logger.finest('Ignoring presence as it does not belong to a room');
return state; return state;
} }
if (from.resource.isEmpty) { if (from.resource.isEmpty) {
// TODO(Unknown): Handle presence from the room itself. // TODO(Unknown): Handle presence from the room itself.
logger.finest('Ignoring presence as it has no resource');
return state; return state;
} }
@ -311,6 +315,7 @@ class MUCManager extends XmppManagerBase {
// Set the nick to make sure we're in sync with the MUC. // Set the nick to make sure we're in sync with the MUC.
room.nick = from.resource; room.nick = from.resource;
logger.finest('Self-presence handled');
return StanzaHandlerData( return StanzaHandlerData(
true, true,
false, false,
@ -360,8 +365,10 @@ class MUCManager extends XmppManagerBase {
} }
room.members[from.resource] = member; room.members[from.resource] = member;
logger.finest('${from.resource} added to the member list');
} }
logger.finest('Ran through');
return StanzaHandlerData( return StanzaHandlerData(
true, true,
false, false,
@ -398,7 +405,8 @@ class MUCManager extends XmppManagerBase {
) async { ) async {
final fromJid = JID.fromString(message.from!); final fromJid = JID.fromString(message.from!);
final roomJid = fromJid.toBare(); final roomJid = fromJid.toBare();
return _mucRoomCache.synchronized(() { return _cacheLock.synchronized(() {
logger.finest('Lock aquired for message from ${message.from}');
final roomState = _mucRoomCache[roomJid]; final roomState = _mucRoomCache[roomJid];
if (roomState == null) { if (roomState == null) {
return state; return state;

View File

@ -173,39 +173,15 @@ class EntityCapabilitiesManager extends XmppManagerBase {
}); });
} }
@visibleForTesting Future<void> _performQuery(Stanza presence, String ver,
Future<StanzaHandlerData> onPresence( String hashFunctionName, String capabilityNode, JID from) async {
Stanza stanza,
StanzaHandlerData state,
) async {
if (stanza.from == null) {
return state;
}
final from = JID.fromString(stanza.from!);
final c = stanza.firstTag('c', xmlns: capsXmlns)!;
final hashFunctionName = c.attributes['hash'] as String?;
final capabilityNode = c.attributes['node'] as String?;
final ver = c.attributes['ver'] as String?;
if (hashFunctionName == null || capabilityNode == null || ver == null) {
return state;
}
// Check if we know of the hash
final isCached =
await _cacheLock.synchronized(() => _capHashCache.containsKey(ver));
if (isCached) {
return state;
}
final dm = getAttributes().getManagerById<DiscoManager>(discoManager)!; final dm = getAttributes().getManagerById<DiscoManager>(discoManager)!;
final discoRequest = await dm.discoInfoQuery( final discoRequest = await dm.discoInfoQuery(
from, from,
node: capabilityNode, node: capabilityNode,
); );
if (discoRequest.isType<StanzaError>()) { if (discoRequest.isType<StanzaError>()) {
return state; return;
} }
final discoInfo = discoRequest.get<DiscoInfo>(); final discoInfo = discoRequest.get<DiscoInfo>();
@ -220,7 +196,7 @@ class EntityCapabilitiesManager extends XmppManagerBase {
discoInfo, discoInfo,
), ),
); );
return state; return;
} }
// Validate the disco#info result according to XEP-0115 § 5.4 // Validate the disco#info result according to XEP-0115 § 5.4
@ -234,7 +210,7 @@ class EntityCapabilitiesManager extends XmppManagerBase {
logger.warning( logger.warning(
'Malformed disco#info response: More than one equal identity', 'Malformed disco#info response: More than one equal identity',
); );
return state; return;
} }
} }
@ -245,7 +221,7 @@ class EntityCapabilitiesManager extends XmppManagerBase {
logger.warning( logger.warning(
'Malformed disco#info response: More than one equal feature', 'Malformed disco#info response: More than one equal feature',
); );
return state; return;
} }
} }
@ -273,7 +249,7 @@ class EntityCapabilitiesManager extends XmppManagerBase {
logger.warning( logger.warning(
'Malformed disco#info response: Extended Info FORM_TYPE contains more than one value(s) of different value.', 'Malformed disco#info response: Extended Info FORM_TYPE contains more than one value(s) of different value.',
); );
return state; return;
} }
} }
@ -288,7 +264,7 @@ class EntityCapabilitiesManager extends XmppManagerBase {
logger.warning( logger.warning(
'Malformed disco#info response: More than one Extended Disco Info forms with the same FORM_TYPE value', 'Malformed disco#info response: More than one Extended Disco Info forms with the same FORM_TYPE value',
); );
return state; return;
} }
// Check if the field type is hidden // Check if the field type is hidden
@ -325,7 +301,43 @@ class EntityCapabilitiesManager extends XmppManagerBase {
'Capability hash mismatch from $from: Received "$ver", expected "$computedCapabilityHash".', 'Capability hash mismatch from $from: Received "$ver", expected "$computedCapabilityHash".',
); );
} }
}
@visibleForTesting
Future<StanzaHandlerData> onPresence(
Stanza stanza,
StanzaHandlerData state,
) async {
if (stanza.from == null) {
return state;
}
final from = JID.fromString(stanza.from!);
final c = stanza.firstTag('c', xmlns: capsXmlns)!;
final hashFunctionName = c.attributes['hash'] as String?;
final capabilityNode = c.attributes['node'] as String?;
final ver = c.attributes['ver'] as String?;
if (hashFunctionName == null || capabilityNode == null || ver == null) {
return state;
}
// Check if we know of the hash
final isCached =
await _cacheLock.synchronized(() => _capHashCache.containsKey(ver));
if (isCached) {
return state;
}
unawaited(
_performQuery(
stanza,
ver,
hashFunctionName,
capabilityNode,
from,
),
);
return state; return state;
} }

View File

@ -8,6 +8,7 @@ environment:
sdk: '>=3.0.0 <4.0.0' sdk: '>=3.0.0 <4.0.0'
dependencies: dependencies:
async_queue: ^1.3.0
collection: ^1.16.0 collection: ^1.16.0
cryptography: ^2.0.5 cryptography: ^2.0.5
hex: ^0.2.0 hex: ^0.2.0

View File

@ -11,14 +11,16 @@ void main() {
final controller = StreamController<String>(); final controller = StreamController<String>();
unawaited( unawaited(
controller.stream.transform(parser).forEach((event) { controller.stream.transform(parser).forEach((events) {
if (event is! XMPPStreamElement) return; for (final event in events) {
final node = event.node; if (event is! XMPPStreamElement) continue;
final node = event.node;
if (node.tag == 'childa') { if (node.tag == 'childa') {
childa = true; childa = true;
} else if (node.tag == 'childb') { } else if (node.tag == 'childb') {
childb = true; childb = true;
}
} }
}), }),
); );
@ -36,14 +38,16 @@ void main() {
final controller = StreamController<String>(); final controller = StreamController<String>();
unawaited( unawaited(
controller.stream.transform(parser).forEach((event) { controller.stream.transform(parser).forEach((events) {
if (event is! XMPPStreamElement) return; for (final event in events) {
final node = event.node; if (event is! XMPPStreamElement) continue;
final node = event.node;
if (node.tag == 'childa') { if (node.tag == 'childa') {
childa = true; childa = true;
} else if (node.tag == 'childb') { } else if (node.tag == 'childb') {
childb = true; childb = true;
}
} }
}), }),
); );
@ -64,14 +68,16 @@ void main() {
final controller = StreamController<String>(); final controller = StreamController<String>();
unawaited( unawaited(
controller.stream.transform(parser).forEach((event) { controller.stream.transform(parser).forEach((events) {
if (event is! XMPPStreamElement) return; for (final event in events) {
final node = event.node; if (event is! XMPPStreamElement) continue;
final node = event.node;
if (node.tag == 'childa') { if (node.tag == 'childa') {
childa = true; childa = true;
} else if (node.tag == 'childb') { } else if (node.tag == 'childb') {
childb = true; childb = true;
}
} }
}), }),
); );
@ -93,13 +99,15 @@ void main() {
final controller = StreamController<String>(); final controller = StreamController<String>();
unawaited( unawaited(
controller.stream.transform(parser).forEach((node) { controller.stream.transform(parser).forEach((events) {
if (node is XMPPStreamElement) { for (final event in events) {
if (node.node.tag == 'childa') { if (event is XMPPStreamElement) {
childa = true; if (event.node.tag == 'childa') {
childa = true;
}
} else if (event is XMPPStreamHeader) {
attrs = event.attributes;
} }
} else if (node is XMPPStreamHeader) {
attrs = node.attributes;
} }
}), }),
); );
@ -118,11 +126,13 @@ void main() {
var gotFeatures = false; var gotFeatures = false;
unawaited( unawaited(
controller.stream.transform(parser).forEach( controller.stream.transform(parser).forEach(
(event) { (events) {
if (event is! XMPPStreamElement) return; for (final event in events) {
if (event is! XMPPStreamElement) continue;
if (event.node.tag == 'stream:features') { if (event.node.tag == 'stream:features') {
gotFeatures = true; gotFeatures = true;
}
} }
}, },
), ),
@ -157,4 +167,27 @@ void main() {
await Future<void>.delayed(const Duration(seconds: 1)); await Future<void>.delayed(const Duration(seconds: 1));
expect(gotFeatures, true); expect(gotFeatures, true);
}); });
test('Test the order of concatenated stanzas', () async {
// NOTE: This seems weird, but it turns out that not keeping this order leads to
// MUC joins (on Moxxy) not catching every bit of presence before marking the
// MUC as joined.
final parser = XMPPStreamParser();
final controller = StreamController<String>();
var called = false;
unawaited(
controller.stream.transform(parser).forEach((events) {
expect(events.isNotEmpty, true);
expect((events[0] as XMPPStreamElement).node.tag, 'childa');
expect((events[1] as XMPPStreamElement).node.tag, 'childb');
expect((events[2] as XMPPStreamElement).node.tag, 'childc');
called = true;
}),
);
controller.add('<childa /><childb /><childc />');
await Future<void>.delayed(const Duration(seconds: 2));
expect(called, true);
});
} }