fix(core): Fix stream parsing breaking after some time

This commit is contained in:
PapaTutuWawa 2023-05-07 00:09:35 +02:00
parent f73daf4d1c
commit 483cb0d7f1
9 changed files with 65 additions and 41 deletions

View File

@ -84,6 +84,10 @@ class XmppConnection {
() => _isAuthenticated, () => _isAuthenticated,
sendRawXML, sendRawXML,
() => connectionSettings, () => connectionSettings,
() {
_log.finest('Resetting stream parser');
_streamParser.reset();
},
); );
_socketStream = _socket.getDataStream(); _socketStream = _socket.getDataStream();
@ -699,7 +703,7 @@ class XmppConnection {
// Process nonzas separately // Process nonzas separately
if (!['message', 'iq', 'presence'].contains(nonza.tag)) { if (!['message', 'iq', 'presence'].contains(nonza.tag)) {
_log.finest('<== ${nonza.toXml()}'); _log.finest('<== ${nonza.toXml()}');
var nonzaHandled = false; var nonzaHandled = false;
await Future.forEach(_xmppManagers.values, await Future.forEach(_xmppManagers.values,
(XmppManagerBase manager) async { (XmppManagerBase manager) async {
@ -746,7 +750,7 @@ class XmppConnection {
), ),
); );
if (!incomingHandlers.done) { if (!incomingHandlers.done) {
_log.warning('Returning error for unhandled stanza'); _log.warning('Returning error for unhandled stanza ${incomingPreHandlers.stanza.tag}');
await handleUnhandledStanza(this, incomingPreHandlers); await handleUnhandledStanza(this, incomingPreHandlers);
} }
} }
@ -809,8 +813,6 @@ class XmppConnection {
/// Sends an event to the connection's event stream. /// Sends an event to the connection's event stream.
Future<void> _sendEvent(XmppEvent event) async { Future<void> _sendEvent(XmppEvent event) async {
_log.finest('Event: ${event.toString()}');
for (final manager in _xmppManagers.values) { for (final manager in _xmppManagers.values) {
await manager.onXmppEvent(event); await manager.onXmppEvent(event);
} }

View File

@ -22,6 +22,9 @@ typedef SendNonzaFunction = void Function(XMLNode);
/// Returns the connection settings. /// Returns the connection settings.
typedef GetConnectionSettingsFunction = ConnectionSettings Function(); typedef GetConnectionSettingsFunction = ConnectionSettings Function();
/// Resets the stream parser's state.
typedef ResetStreamParserFunction = void Function();
/// This class implements the stream feature negotiation for XmppConnection. /// This class implements the stream feature negotiation for XmppConnection.
abstract class NegotiationsHandler { abstract class NegotiationsHandler {
@protected @protected
@ -51,6 +54,9 @@ abstract class NegotiationsHandler {
@protected @protected
late final GetConnectionSettingsFunction getConnectionSettings; late final GetConnectionSettingsFunction getConnectionSettings;
@protected
late final ResetStreamParserFunction resetStreamParser;
/// The id included in the last stream header. /// The id included in the last stream header.
@protected @protected
String? streamId; String? streamId;
@ -72,12 +78,14 @@ abstract class NegotiationsHandler {
IsAuthenticatedFunction isAuthenticated, IsAuthenticatedFunction isAuthenticated,
SendNonzaFunction sendNonza, SendNonzaFunction sendNonza,
GetConnectionSettingsFunction getConnectionSettings, GetConnectionSettingsFunction getConnectionSettings,
ResetStreamParserFunction resetStreamParser,
) { ) {
this.onNegotiationsDone = onNegotiationsDone; this.onNegotiationsDone = onNegotiationsDone;
this.handleError = handleError; this.handleError = handleError;
this.isAuthenticated = isAuthenticated; this.isAuthenticated = isAuthenticated;
this.sendNonza = sendNonza; this.sendNonza = sendNonza;
this.getConnectionSettings = getConnectionSettings; this.getConnectionSettings = getConnectionSettings;
this.resetStreamParser = resetStreamParser;
log = Logger(toString()); log = Logger(toString());
} }

View File

@ -60,6 +60,7 @@ class ClientToServerNegotiator extends NegotiationsHandler {
@override @override
void sendStreamHeader() { void sendStreamHeader() {
resetStreamParser();
sendNonza( sendNonza(
XMLNode( XMLNode(
tag: 'xml', tag: 'xml',

View File

@ -49,6 +49,7 @@ class ComponentToServerNegotiator extends NegotiationsHandler {
@override @override
void sendStreamHeader() { void sendStreamHeader() {
resetStreamParser();
sendNonza( sendNonza(
XMLNode( XMLNode(
tag: 'xml', tag: 'xml',

View File

@ -1,6 +1,5 @@
import 'package:moxlib/moxlib.dart'; import 'package:moxlib/moxlib.dart';
import 'package:moxxmpp/src/managers/data.dart'; import 'package:moxxmpp/src/managers/data.dart';
import 'package:moxxmpp/src/namespaces.dart';
import 'package:moxxmpp/src/stanza.dart'; import 'package:moxxmpp/src/stanza.dart';
import 'package:moxxmpp/src/stringxml.dart'; import 'package:moxxmpp/src/stringxml.dart';
@ -56,9 +55,8 @@ class StanzaHandler extends Handler {
this.tagName, this.tagName,
this.priority = 0, this.priority = 0,
this.stanzaTag, this.stanzaTag,
this.xmlns = stanzaXmlns,
}); });
/// If specified, then the stanza must contain a direct child with a tag equal to /// If specified, then the stanza must contain a direct child with a tag equal to
/// [tagName]. /// [tagName].
final String? tagName; final String? tagName;
@ -71,11 +69,6 @@ class StanzaHandler extends Handler {
/// If specified, the matching stanza must have a tag equal to [stanzaTag]. /// If specified, the matching stanza must have a tag equal to [stanzaTag].
final String? stanzaTag; final String? stanzaTag;
/// If specified, then the stanza must have a xmlns attribute equal to [xmlns].
/// This defaults to [stanzaXmlns], but can be set to any other value or null. This
/// is useful, for example, for components.
final String? xmlns;
/// The priority after which [StanzaHandler]s are sorted. /// The priority after which [StanzaHandler]s are sorted.
final int priority; final int priority;
@ -88,9 +81,11 @@ class StanzaHandler extends Handler {
if (stanzaTag != null) { if (stanzaTag != null) {
matches &= node.tag == stanzaTag; matches &= node.tag == stanzaTag;
} }
if (xmlns != null) { // if (xmlns != null) {
matches &= node.xmlns == xmlns; // matches &= node.xmlns == xmlns;
} // if (flag != null)
// print('${node.xmlns} == $xmlns');
// }
if (tagName != null) { if (tagName != null) {
final firstTag = node.firstTag(tagName!, xmlns: tagXmlns); final firstTag = node.firstTag(tagName!, xmlns: tagXmlns);

View File

@ -69,6 +69,12 @@ class XMPPStreamParser extends StreamTransformerBase<String, XMPPStreamObject> {
_ChunkedConversionBuffer<List<XmlEvent>, XmlNode> _childBuffer = _ChunkedConversionBuffer<List<XmlEvent>, XmlNode> _childBuffer =
_ChunkedConversionBuffer<List<XmlEvent>, XmlNode>(const XmlNodeDecoder()); _ChunkedConversionBuffer<List<XmlEvent>, XmlNode>(const XmlNodeDecoder());
/// The selectors.
_ChunkedConversionBuffer<List<XmlEvent>, XmlEvent> _childSelector =
_ChunkedConversionBuffer<List<XmlEvent>, XmlEvent>(XmlSubtreeSelector((event) => event.qualifiedName != 'stream:stream'));
_ChunkedConversionBuffer<List<XmlEvent>, XmlEvent> _streamHeaderSelector =
_ChunkedConversionBuffer<List<XmlEvent>, XmlEvent>(XmlSubtreeSelector((event) => event.qualifiedName == 'stream:stream'));
void reset() { void reset() {
try { try {
_eventBuffer.close(); _eventBuffer.close();
@ -81,6 +87,16 @@ class XMPPStreamParser extends StreamTransformerBase<String, XMPPStreamObject> {
} catch (_) { } catch (_) {
// Do nothing. // Do nothing.
} }
try {
_childSelector.close();
} catch (_) {
// Do nothing.
}
try {
_streamHeaderSelector.close();
} catch (_) {
// Do nothing.
}
// Recreate the buffers. // Recreate the buffers.
_eventBuffer = _eventBuffer =
@ -88,6 +104,9 @@ class XMPPStreamParser extends StreamTransformerBase<String, XMPPStreamObject> {
_childBuffer = _ChunkedConversionBuffer<List<XmlEvent>, XmlNode>( _childBuffer = _ChunkedConversionBuffer<List<XmlEvent>, XmlNode>(
const XmlNodeDecoder(), const XmlNodeDecoder(),
); );
_childSelector = _ChunkedConversionBuffer<List<XmlEvent>, XmlEvent>(XmlSubtreeSelector((event) => event.qualifiedName != 'stream:stream'));
_streamHeaderSelector =
_ChunkedConversionBuffer<List<XmlEvent>, XmlEvent>(XmlSubtreeSelector((event) => event.qualifiedName == 'stream:stream'));
} }
@override @override
@ -95,14 +114,9 @@ class XMPPStreamParser extends StreamTransformerBase<String, XMPPStreamObject> {
// We do not want to use xml's toXmlEvents and toSubtreeEvents methods as they // We do not want to use xml's toXmlEvents and toSubtreeEvents methods as they
// create streams we cannot close. We need to be able to destroy and recreate an // create streams we cannot close. We need to be able to destroy and recreate an
// XML parser whenever we start a new connection. // XML parser whenever we start a new connection.
final childSelector =
XmlSubtreeSelector((event) => event.qualifiedName != 'stream:stream');
final streamHeaderSelector =
XmlSubtreeSelector((event) => event.qualifiedName == 'stream:stream');
stream.listen((input) { stream.listen((input) {
final events = _eventBuffer.convert(input); final events = _eventBuffer.convert(input);
final streamHeaderEvents = streamHeaderSelector.convert(events); final streamHeaderEvents = _streamHeaderSelector.convert(events);
// Process the stream header separately. // Process the stream header separately.
for (final event in streamHeaderEvents) { for (final event in streamHeaderEvents) {
@ -126,7 +140,7 @@ class XMPPStreamParser extends StreamTransformerBase<String, XMPPStreamObject> {
} }
// Process the children of the <stream:stream> element. // Process the children of the <stream:stream> element.
final childEvents = childSelector.convert(events); final childEvents = _childSelector.convert(events);
final children = _childBuffer.convert(childEvents); final children = _childBuffer.convert(childEvents);
for (final node in children) { for (final node in children) {
if (node.nodeType == XmlNodeType.ELEMENT) { if (node.nodeType == XmlNodeType.ELEMENT) {

View File

@ -255,7 +255,7 @@ class StreamManagementManager extends XmppManagerBase {
_pendingAcks++; _pendingAcks++;
_startAckTimer(); _startAckTimer();
logger.fine('_pendingAcks is now at $_pendingAcks'); logger.fine('_pendingAcks is now at $_pendingAcks (caused by <r/>)');
getAttributes().sendNonza(StreamManagementRequestNonza()); getAttributes().sendNonza(StreamManagementRequestNonza());
@ -294,6 +294,7 @@ class StreamManagementManager extends XmppManagerBase {
/// Called when we receive an <a /> nonza from the server. /// Called when we receive an <a /> nonza from the server.
/// This is a response to the question "How many of my stanzas have you handled". /// This is a response to the question "How many of my stanzas have you handled".
Future<bool> _handleAckResponse(XMLNode nonza) async { Future<bool> _handleAckResponse(XMLNode nonza) async {
logger.finest('Received ack');
final h = int.parse(nonza.attributes['h']! as String); final h = int.parse(nonza.attributes['h']! as String);
_lastAckTimestamp = DateTime.now().millisecondsSinceEpoch; _lastAckTimestamp = DateTime.now().millisecondsSinceEpoch;
@ -306,6 +307,7 @@ class StreamManagementManager extends XmppManagerBase {
// Reset the timer // Reset the timer
if (_pendingAcks > 0) { if (_pendingAcks > 0) {
_stopAckTimer();
_startAckTimer(); _startAckTimer();
} }
} }
@ -314,7 +316,7 @@ class StreamManagementManager extends XmppManagerBase {
_stopAckTimer(); _stopAckTimer();
} }
logger.fine('_pendingAcks is now at $_pendingAcks'); logger.fine('_pendingAcks is now at $_pendingAcks (caused by <a/>)');
}); });
}); });

View File

@ -59,6 +59,7 @@ void main() {
jid: JID.fromString('test'), jid: JID.fromString('test'),
password: 'abc123', password: 'abc123',
), ),
() {},
) )
..registerNegotiator(StubNegotiator1()) ..registerNegotiator(StubNegotiator1())
..registerNegotiator(StubNegotiator2()); ..registerNegotiator(StubNegotiator2());
@ -77,6 +78,7 @@ void main() {
jid: JID.fromString('test'), jid: JID.fromString('test'),
password: 'abc123', password: 'abc123',
), ),
() {},
) )
..registerNegotiator(StubNegotiator1()) ..registerNegotiator(StubNegotiator1())
..registerNegotiator(StubNegotiator2()); ..registerNegotiator(StubNegotiator2());

View File

@ -122,24 +122,23 @@ void main() {
expect(handler.matches(stanza2), false); expect(handler.matches(stanza2), false);
}); });
test('Test matching stanzas with a different xmlns', () { // test('Test matching stanzas with a different xmlns', () {
final handler = StanzaHandler( // final handler = StanzaHandler(
callback: (stanza, _) async => StanzaHandlerData( // callback: (stanza, _) async => StanzaHandlerData(
true, // true,
false, // false,
null, // null,
stanza, // stanza,
), // ),
xmlns: componentAcceptXmlns, // );
);
expect(handler.matches(Stanza.iq(xmlns: stanzaXmlns)), false); // expect(handler.matches(Stanza.iq(xmlns: stanzaXmlns)), false);
expect(handler.matches(Stanza.message(xmlns: stanzaXmlns)), false); // expect(handler.matches(Stanza.message(xmlns: stanzaXmlns)), false);
expect(handler.matches(Stanza.presence(xmlns: stanzaXmlns)), false); // expect(handler.matches(Stanza.presence(xmlns: stanzaXmlns)), false);
expect(handler.matches(Stanza.iq(xmlns: componentAcceptXmlns)), true); // expect(handler.matches(Stanza.iq(xmlns: componentAcceptXmlns)), true);
expect(handler.matches(stanza1), false); // expect(handler.matches(stanza1), false);
expect(handler.matches(stanza2), false); // expect(handler.matches(stanza2), false);
}); // });
test('sorting', () { test('sorting', () {
final handlerList = [ final handlerList = [