diff --git a/packages/moxxmpp/lib/src/buffer.dart b/packages/moxxmpp/lib/src/buffer.dart deleted file mode 100644 index c9904bb..0000000 --- a/packages/moxxmpp/lib/src/buffer.dart +++ /dev/null @@ -1,79 +0,0 @@ -import 'dart:async'; -import 'package:moxxmpp/src/stringxml.dart'; -import 'package:xml/xml.dart'; -import 'package:xml/xml_events.dart'; - -/// A result object for XmlStreamBuffer. -abstract class XmlStreamBufferObject {} - -/// A complete XML element returned by the stream buffer. -class XmlStreamBufferElement extends XmlStreamBufferObject { - XmlStreamBufferElement(this.node); - - /// The actual [XMLNode]. - final XMLNode node; -} - -/// Just the stream header of a new XML stream. -class XmlStreamBufferHeader extends XmlStreamBufferObject { - XmlStreamBufferHeader(this.attributes); - - /// The headers of the stream header. - final Map attributes; -} - -/// A buffer to put between a socket's input and a full XML stream. -class XmlStreamBuffer - extends StreamTransformerBase { - final StreamController _streamController = - StreamController(); - - @override - Stream bind(Stream stream) { - final events = stream.toXmlEvents().asBroadcastStream(); - events.transform( - StreamTransformer, XmlStartElementEvent>.fromHandlers( - handleData: (events, sink) { - for (final event in events) { - if (event is! XmlStartElementEvent) { - continue; - } - if (event.name != 'stream:stream') { - continue; - } - - sink.add(event); - } - }, - ), - ).listen((event) { - _streamController.add( - XmlStreamBufferHeader( - Map.fromEntries( - event.attributes.map((attr) { - return MapEntry(attr.name, attr.value); - }), - ), - ), - ); - }); - - events - .selectSubtreeEvents((event) { - return event.qualifiedName != 'stream:stream'; - }) - .transform(const XmlNodeDecoder()) - .listen((nodes) { - for (final node in nodes) { - if (node.nodeType == XmlNodeType.ELEMENT) { - _streamController.add( - XmlStreamBufferElement( - XMLNode.fromXmlElement(node as XmlElement), - ), - ); - } - } - }); - return _streamController.stream; - } -} diff --git a/packages/moxxmpp/lib/src/connection.dart b/packages/moxxmpp/lib/src/connection.dart index 4819e82..6c92e9f 100644 --- a/packages/moxxmpp/lib/src/connection.dart +++ b/packages/moxxmpp/lib/src/connection.dart @@ -3,7 +3,6 @@ import 'package:logging/logging.dart'; import 'package:meta/meta.dart'; import 'package:moxlib/moxlib.dart'; import 'package:moxxmpp/src/awaiter.dart'; -import 'package:moxxmpp/src/buffer.dart'; import 'package:moxxmpp/src/connection_errors.dart'; import 'package:moxxmpp/src/connectivity.dart'; import 'package:moxxmpp/src/errors.dart'; @@ -18,6 +17,7 @@ import 'package:moxxmpp/src/managers/handlers.dart'; import 'package:moxxmpp/src/managers/namespaces.dart'; import 'package:moxxmpp/src/negotiators/namespaces.dart'; import 'package:moxxmpp/src/negotiators/negotiator.dart'; +import 'package:moxxmpp/src/parser.dart'; import 'package:moxxmpp/src/presence.dart'; import 'package:moxxmpp/src/reconnect.dart'; import 'package:moxxmpp/src/roster/roster.dart'; @@ -88,7 +88,7 @@ class XmppConnection { _socketStream = _socket.getDataStream(); // TODO(Unknown): Handle on done - _socketStream.transform(_streamBuffer).forEach(handleXmlStream); + _socketStream.transform(_streamParser).forEach(handleXmlStream); _socket.getEventStream().listen(_handleSocketEvent); } @@ -128,8 +128,8 @@ class XmppConnection { StreamController.broadcast(); final Map _xmppManagers = {}; - /// The buffer object to keep split up stanzas together - final XmlStreamBuffer _streamBuffer = XmlStreamBuffer(); + /// The parser for the entire XMPP XML stream. + final XMPPStreamParser _streamParser = XMPPStreamParser(); /// UUID object to generate stanza and origin IDs final Uuid _uuid = const Uuid(); @@ -752,17 +752,17 @@ class XmppConnection { } /// Called whenever we receive data that has been parsed as XML. - Future handleXmlStream(XmlStreamBufferObject event) async { - if (event is XmlStreamBufferHeader) { + Future handleXmlStream(XMPPStreamObject event) async { + if (event is XMPPStreamHeader) { await _negotiationsHandler.negotiate(event); return; } assert( - event is XmlStreamBufferElement, - 'The event must be a XmlStreamBufferElement', + event is XMPPStreamElement, + 'The event must be a XMPPStreamElement', ); - final node = (event as XmlStreamBufferElement).node; + final node = (event as XMPPStreamElement).node; // Check if we received a stream error if (node.tag == 'stream:error') { @@ -885,6 +885,9 @@ class XmppConnection { _log.info('Got okay from connectivityManager'); } + // Reset the stream parser + _streamParser.reset(); + final smManager = getStreamManagementManager(); var host = connectionSettings.host; var port = connectionSettings.port; diff --git a/packages/moxxmpp/lib/src/handlers/base.dart b/packages/moxxmpp/lib/src/handlers/base.dart index 7b84886..eaf06db 100644 --- a/packages/moxxmpp/lib/src/handlers/base.dart +++ b/packages/moxxmpp/lib/src/handlers/base.dart @@ -1,9 +1,9 @@ import 'package:logging/logging.dart'; import 'package:meta/meta.dart'; -import 'package:moxxmpp/src/buffer.dart'; import 'package:moxxmpp/src/errors.dart'; import 'package:moxxmpp/src/events.dart'; import 'package:moxxmpp/src/negotiators/negotiator.dart'; +import 'package:moxxmpp/src/parser.dart'; import 'package:moxxmpp/src/settings.dart'; import 'package:moxxmpp/src/stringxml.dart'; @@ -116,8 +116,8 @@ abstract class NegotiationsHandler { } /// Called whenever the stream buffer outputs a new event [event]. - Future negotiate(XmlStreamBufferObject event) async { - if (event is XmlStreamBufferHeader) { + Future negotiate(XMPPStreamObject event) async { + if (event is XMPPStreamHeader) { streamId = event.attributes['id']; } } diff --git a/packages/moxxmpp/lib/src/handlers/client.dart b/packages/moxxmpp/lib/src/handlers/client.dart index a7d2cc0..91ab099 100644 --- a/packages/moxxmpp/lib/src/handlers/client.dart +++ b/packages/moxxmpp/lib/src/handlers/client.dart @@ -1,10 +1,10 @@ import 'package:meta/meta.dart'; -import 'package:moxxmpp/src/buffer.dart'; import 'package:moxxmpp/src/connection_errors.dart'; import 'package:moxxmpp/src/handlers/base.dart'; import 'package:moxxmpp/src/jid.dart'; import 'package:moxxmpp/src/namespaces.dart'; import 'package:moxxmpp/src/negotiators/negotiator.dart'; +import 'package:moxxmpp/src/parser.dart'; import 'package:moxxmpp/src/stringxml.dart'; /// "Nonza" describing the XMPP stream header of a client-to-server connection. @@ -211,8 +211,8 @@ class ClientToServerNegotiator extends NegotiationsHandler { } @override - Future negotiate(XmlStreamBufferObject event) async { - if (event is XmlStreamBufferElement) { + Future negotiate(XMPPStreamObject event) async { + if (event is XMPPStreamElement) { if (event.node.tag == 'stream:features') { // Store the received stream features _streamFeatures diff --git a/packages/moxxmpp/lib/src/handlers/component.dart b/packages/moxxmpp/lib/src/handlers/component.dart index 3fe4cb1..33360dc 100644 --- a/packages/moxxmpp/lib/src/handlers/component.dart +++ b/packages/moxxmpp/lib/src/handlers/component.dart @@ -1,12 +1,12 @@ import 'dart:convert'; import 'package:cryptography/cryptography.dart'; import 'package:hex/hex.dart'; -import 'package:moxxmpp/src/buffer.dart'; import 'package:moxxmpp/src/connection_errors.dart'; import 'package:moxxmpp/src/handlers/base.dart'; import 'package:moxxmpp/src/jid.dart'; import 'package:moxxmpp/src/namespaces.dart'; import 'package:moxxmpp/src/negotiators/negotiator.dart'; +import 'package:moxxmpp/src/parser.dart'; import 'package:moxxmpp/src/stringxml.dart'; /// Nonza describing the XMPP stream header. @@ -70,10 +70,10 @@ class ComponentToServerNegotiator extends NegotiationsHandler { } @override - Future negotiate(XmlStreamBufferObject event) async { + Future negotiate(XMPPStreamObject event) async { switch (_state) { case ComponentToServerState.idle: - if (event is XmlStreamBufferHeader) { + if (event is XMPPStreamHeader) { streamId = event.attributes['id']; assert( streamId != null, @@ -93,7 +93,7 @@ class ComponentToServerNegotiator extends NegotiationsHandler { } break; case ComponentToServerState.handshakeSent: - if (event is XmlStreamBufferElement) { + if (event is XMPPStreamElement) { if (event.node.tag == 'handshake' && event.node.children.isEmpty && event.node.attributes.isEmpty) { diff --git a/packages/moxxmpp/lib/src/parser.dart b/packages/moxxmpp/lib/src/parser.dart new file mode 100644 index 0000000..9292493 --- /dev/null +++ b/packages/moxxmpp/lib/src/parser.dart @@ -0,0 +1,144 @@ +import 'dart:async'; +import 'dart:convert'; +import 'package:moxxmpp/src/stringxml.dart'; +// ignore: implementation_imports +import 'package:xml/src/xml_events/utils/conversion_sink.dart'; +import 'package:xml/xml.dart'; +import 'package:xml/xml_events.dart'; + +/// A result object for XmlStreamBuffer. +abstract class XMPPStreamObject {} + +/// A complete XML element returned by the stream buffer. +class XMPPStreamElement extends XMPPStreamObject { + XMPPStreamElement(this.node); + + /// The actual [XMLNode]. + final XMLNode node; +} + +/// Just the stream header of a new XML stream. +class XMPPStreamHeader extends XMPPStreamObject { + XMPPStreamHeader(this.attributes); + + /// The headers of the stream header. + final Map attributes; +} + +/// A wrapper around a [Converter]'s [Converter.startChunkedConversion] method. +class _ChunkedConversionBuffer { + /// Use the converter [converter]. + _ChunkedConversionBuffer(Converter> converter) { + _outputSink = ConversionSink>(_results.addAll); + _inputSink = converter.startChunkedConversion(_outputSink); + } + + /// The results of the converter. + final List _results = List.empty(growable: true); + + /// The sink that outputs to [_results]. + late ConversionSink> _outputSink; + + /// The sink that we use for input. + late Sink _inputSink; + + /// Close all opened sinks. + void close() { + _inputSink.close(); + _outputSink.close(); + } + + /// Turn the input [input] into a list of [T] according to the initial converter. + List convert(S input) { + _results.clear(); + _inputSink.add(input); + return _results; + } +} + +/// A buffer to put between a socket's input and a full XML stream. +class XMPPStreamParser extends StreamTransformerBase { + final StreamController _streamController = + StreamController(); + + /// Turns a String into a list of [XmlEvent]s in a chunked fashion. + _ChunkedConversionBuffer _eventBuffer = + _ChunkedConversionBuffer(XmlEventDecoder()); + + /// Turns a list of [XmlEvent]s into a list of [XmlNode]s in a chunked fashion. + _ChunkedConversionBuffer, XmlNode> _childBuffer = + _ChunkedConversionBuffer, XmlNode>(const XmlNodeDecoder()); + + void reset() { + try { + _eventBuffer.close(); + } catch (_) { + // Do nothing. A crash here may indicate that we end on invalid XML, which is fine + // since we're not going to use the buffer's output anymore. + } + try { + _childBuffer.close(); + } catch (_) { + // Do nothing. + } + + // Recreate the buffers. + _eventBuffer = + _ChunkedConversionBuffer(XmlEventDecoder()); + _childBuffer = _ChunkedConversionBuffer, XmlNode>( + const XmlNodeDecoder(), + ); + } + + @override + Stream bind(Stream stream) { + // We do not want to use xml's toXmlEvents and toSubtreeEvents methods as they + // create streams we cannot close. We need to be able to destroy and recreate an + // XML parser whenever we start a new connection. + final childSelector = + XmlSubtreeSelector((event) => event.qualifiedName != 'stream:stream'); + final streamHeaderSelector = + XmlSubtreeSelector((event) => event.qualifiedName == 'stream:stream'); + + stream.listen((input) { + final events = _eventBuffer.convert(input); + final streamHeaderEvents = streamHeaderSelector.convert(events); + + // Process the stream header separately. + for (final event in streamHeaderEvents) { + if (event is! XmlStartElementEvent) { + continue; + } + + if (event.name != 'stream:stream') { + continue; + } + + _streamController.add( + XMPPStreamHeader( + Map.fromEntries( + event.attributes.map((attr) { + return MapEntry(attr.name, attr.value); + }), + ), + ), + ); + } + + // Process the children of the element. + final childEvents = childSelector.convert(events); + final children = _childBuffer.convert(childEvents); + for (final node in children) { + if (node.nodeType == XmlNodeType.ELEMENT) { + _streamController.add( + XMPPStreamElement( + XMLNode.fromXmlElement(node as XmlElement), + ), + ); + } + } + }); + + return _streamController.stream; + } +} diff --git a/packages/moxxmpp/test/negotiator_test.dart b/packages/moxxmpp/test/negotiator_test.dart index bd112c4..4323a1b 100644 --- a/packages/moxxmpp/test/negotiator_test.dart +++ b/packages/moxxmpp/test/negotiator_test.dart @@ -1,5 +1,5 @@ import 'package:moxxmpp/moxxmpp.dart'; -import 'package:moxxmpp/src/buffer.dart'; +import 'package:moxxmpp/src/parser.dart'; import 'package:test/test.dart'; import 'helpers/logging.dart'; @@ -83,7 +83,7 @@ void main() { await negotiator.runPostRegisterCallback(); await negotiator.negotiate( - XmlStreamBufferElement( + XMPPStreamElement( XMLNode.fromString( ''' diff --git a/packages/moxxmpp/test/xmlstreambuffer_test.dart b/packages/moxxmpp/test/xmpp_parser_test.dart similarity index 54% rename from packages/moxxmpp/test/xmlstreambuffer_test.dart rename to packages/moxxmpp/test/xmpp_parser_test.dart index eef4245..265f393 100644 --- a/packages/moxxmpp/test/xmlstreambuffer_test.dart +++ b/packages/moxxmpp/test/xmpp_parser_test.dart @@ -1,5 +1,5 @@ import 'dart:async'; -import 'package:moxxmpp/src/buffer.dart'; +import 'package:moxxmpp/src/parser.dart'; import 'package:test/test.dart'; void main() { @@ -7,12 +7,12 @@ void main() { var childa = false; var childb = false; - final buffer = XmlStreamBuffer(); + final parser = XMPPStreamParser(); final controller = StreamController(); unawaited( - controller.stream.transform(buffer).forEach((event) { - if (event is! XmlStreamBufferElement) return; + controller.stream.transform(parser).forEach((event) { + if (event is! XMPPStreamElement) return; final node = event.node; if (node.tag == 'childa') { @@ -32,12 +32,12 @@ void main() { var childa = false; var childb = false; - final buffer = XmlStreamBuffer(); + final parser = XMPPStreamParser(); final controller = StreamController(); unawaited( - controller.stream.transform(buffer).forEach((event) { - if (event is! XmlStreamBufferElement) return; + controller.stream.transform(parser).forEach((event) { + if (event is! XMPPStreamElement) return; final node = event.node; if (node.tag == 'childa') { @@ -60,12 +60,12 @@ void main() { var childa = false; var childb = false; - final buffer = XmlStreamBuffer(); + final parser = XMPPStreamParser(); final controller = StreamController(); unawaited( - controller.stream.transform(buffer).forEach((event) { - if (event is! XmlStreamBufferElement) return; + controller.stream.transform(parser).forEach((event) { + if (event is! XMPPStreamElement) return; final node = event.node; if (node.tag == 'childa') { @@ -89,16 +89,16 @@ void main() { var childa = false; Map? attrs; - final buffer = XmlStreamBuffer(); + final parser = XMPPStreamParser(); final controller = StreamController(); unawaited( - controller.stream.transform(buffer).forEach((node) { - if (node is XmlStreamBufferElement) { + controller.stream.transform(parser).forEach((node) { + if (node is XMPPStreamElement) { if (node.node.tag == 'childa') { childa = true; } - } else if (node is XmlStreamBufferHeader) { + } else if (node is XMPPStreamHeader) { attrs = node.attributes; } }), @@ -111,4 +111,50 @@ void main() { expect(childa, true); expect(attrs!['id'], 'abc123'); }); + + test('Test restarting a broken XML stream', () async { + final parser = XMPPStreamParser(); + final controller = StreamController(); + var gotFeatures = false; + unawaited( + controller.stream.transform(parser).forEach( + (event) { + if (event is! XMPPStreamElement) return; + + if (event.node.tag == 'stream:features') { + gotFeatures = true; + } + }, + ), + ); + + // Begin the stream with invalid XML + controller.add(' + + + PLAIN + + + ''', + ); + + // Let it marinate + await Future.delayed(const Duration(seconds: 1)); + expect(gotFeatures, true); + }); } diff --git a/packages/moxxmpp/test/xmpp_test.dart b/packages/moxxmpp/test/xmpp_test.dart index abb1ad4..6a438e2 100644 --- a/packages/moxxmpp/test/xmpp_test.dart +++ b/packages/moxxmpp/test/xmpp_test.dart @@ -526,4 +526,103 @@ void main() { true, ); }); + + test('Test an invalid XML continuation', () async { + final fakeSocket = StubTCPSocket( + [ + StringExpectation( + "", + ''' + + + + PLAIN + ", + ''' + + + + PLAIN + + ''', + ), + StringExpectation( + "AHRlc3R1c2VyAGFiYzEyMw==", + '', + ), + StringExpectation( + "", + ''' + + + + + + ''', + ), + StanzaExpectation( + '', + 'testuser@example.org/MU29eEZn', + ignoreId: true, + ), + ], + ); + + final conn = XmppConnection( + TestingReconnectionPolicy(), + AlwaysConnectedConnectivityManager(), + ClientToServerNegotiator(), + fakeSocket, + )..connectionSettings = ConnectionSettings( + jid: JID.fromString('testuser@example.org'), + password: 'abc123', + ); + await conn.registerFeatureNegotiators([ + SaslPlainNegotiator(), + ResourceBindingNegotiator(), + ]); + + final result1 = conn.connect( + waitUntilLogin: true, + ); + await Future.delayed(const Duration(seconds: 2)); + + // Inject a fault + fakeSocket.injectSocketFault(); + expect( + (await result1).isType(), + false, + ); + + // Try to connect again + final result2 = await conn.connect( + waitUntilLogin: true, + ); + expect( + fakeSocket.getState(), + 5, + ); + expect( + result2.isType(), + true, + ); + }); }