diff --git a/packages/moxxmpp/lib/src/buffer.dart b/packages/moxxmpp/lib/src/buffer.dart index ab0e497..c9904bb 100644 --- a/packages/moxxmpp/lib/src/buffer.dart +++ b/packages/moxxmpp/lib/src/buffer.dart @@ -1,29 +1,76 @@ import 'dart:async'; - import 'package:moxxmpp/src/stringxml.dart'; - import 'package:xml/xml.dart'; import 'package:xml/xml_events.dart'; -class XmlStreamBuffer extends StreamTransformerBase { - XmlStreamBuffer() - : _streamController = StreamController(), - _decoder = const XmlNodeDecoder(); - final StreamController _streamController; - final XmlNodeDecoder _decoder; +/// A result object for XmlStreamBuffer. +abstract class XmlStreamBufferObject {} + +/// A complete XML element returned by the stream buffer. +class XmlStreamBufferElement extends XmlStreamBufferObject { + XmlStreamBufferElement(this.node); + + /// The actual [XMLNode]. + final XMLNode node; +} + +/// Just the stream header of a new XML stream. +class XmlStreamBufferHeader extends XmlStreamBufferObject { + XmlStreamBufferHeader(this.attributes); + + /// The headers of the stream header. + final Map attributes; +} + +/// A buffer to put between a socket's input and a full XML stream. +class XmlStreamBuffer + extends StreamTransformerBase { + final StreamController _streamController = + StreamController(); @override - Stream bind(Stream stream) { - stream - .toXmlEvents() + Stream bind(Stream stream) { + final events = stream.toXmlEvents().asBroadcastStream(); + events.transform( + StreamTransformer, XmlStartElementEvent>.fromHandlers( + handleData: (events, sink) { + for (final event in events) { + if (event is! XmlStartElementEvent) { + continue; + } + if (event.name != 'stream:stream') { + continue; + } + + sink.add(event); + } + }, + ), + ).listen((event) { + _streamController.add( + XmlStreamBufferHeader( + Map.fromEntries( + event.attributes.map((attr) { + return MapEntry(attr.name, attr.value); + }), + ), + ), + ); + }); + + events .selectSubtreeEvents((event) { return event.qualifiedName != 'stream:stream'; }) - .transform(_decoder) + .transform(const XmlNodeDecoder()) .listen((nodes) { for (final node in nodes) { if (node.nodeType == XmlNodeType.ELEMENT) { - _streamController.add(XMLNode.fromXmlElement(node as XmlElement)); + _streamController.add( + XmlStreamBufferElement( + XMLNode.fromXmlElement(node as XmlElement), + ), + ); } } }); diff --git a/packages/moxxmpp/lib/src/connection.dart b/packages/moxxmpp/lib/src/connection.dart index bd5ed09..78107f5 100644 --- a/packages/moxxmpp/lib/src/connection.dart +++ b/packages/moxxmpp/lib/src/connection.dart @@ -766,7 +766,18 @@ class XmppConnection { } /// Called whenever we receive data that has been parsed as XML. - Future handleXmlStream(XMLNode node) async { + Future handleXmlStream(XmlStreamBufferObject event) async { + if (event is XmlStreamBufferHeader) { + _negotiationsHandler.setStreamHeaderId(event.attributes['id']); + return; + } + + assert( + event is XmlStreamBufferElement, + 'The event must be a XmlStreamBufferElement', + ); + final node = (event as XmlStreamBufferElement).node; + // Check if we received a stream error if (node.tag == 'stream:error') { _log @@ -788,7 +799,7 @@ class XmppConnection { // prevent this issue. await _negotiationLock.synchronized(() async { if (_routingState != RoutingState.negotiating) { - unawaited(handleXmlStream(node)); + unawaited(handleXmlStream(XmlStreamBufferElement(node))); return; } @@ -940,7 +951,7 @@ class XmppConnection { } else { await _reconnectionPolicy.onSuccess(); _log.fine('Preparing the internal state for a connection attempt'); - _negotiationsHandler.resetNegotiators(); + _negotiationsHandler.reset(); await _setConnectionState(XmppConnectionState.connecting); _updateRoutingState(RoutingState.negotiating); _isAuthenticated = false; diff --git a/packages/moxxmpp/lib/src/negotiators/handler.dart b/packages/moxxmpp/lib/src/negotiators/handler.dart index 258d67c..67608dc 100644 --- a/packages/moxxmpp/lib/src/negotiators/handler.dart +++ b/packages/moxxmpp/lib/src/negotiators/handler.dart @@ -43,6 +43,15 @@ abstract class NegotiationsHandler { @protected late final IsAuthenticatedFunction isAuthenticated; + /// The id included in the last stream header. + @protected + String? streamId; + + /// Set the id of the last stream header. + void setStreamHeaderId(String? id) { + streamId = id; + } + /// Returns, if registered, a negotiator with id [id]. T? getNegotiatorById(String id) => negotiators[id] as T?; @@ -81,9 +90,10 @@ abstract class NegotiationsHandler { /// Remove [feature] from the stream features we are currently negotiating. void removeNegotiatingFeature(String feature) {} - /// Resets all registered negotiators. + /// Resets all registered negotiators and the negotiation handler. @mustCallSuper - void resetNegotiators() { + void reset() { + streamId = null; for (final negotiator in negotiators.values) { negotiator.reset(); } @@ -110,8 +120,8 @@ class ClientToServerNegotiator extends NegotiationsHandler { } @override - void resetNegotiators() { - super.resetNegotiators(); + void reset() { + super.reset(); // Prevent leaking the last active negotiator _currentNegotiator = null; diff --git a/packages/moxxmpp/test/xmlstreambuffer_test.dart b/packages/moxxmpp/test/xmlstreambuffer_test.dart index 8c8e26f..eef4245 100644 --- a/packages/moxxmpp/test/xmlstreambuffer_test.dart +++ b/packages/moxxmpp/test/xmlstreambuffer_test.dart @@ -11,7 +11,10 @@ void main() { final controller = StreamController(); unawaited( - controller.stream.transform(buffer).forEach((node) { + controller.stream.transform(buffer).forEach((event) { + if (event is! XmlStreamBufferElement) return; + final node = event.node; + if (node.tag == 'childa') { childa = true; } else if (node.tag == 'childb') { @@ -33,7 +36,10 @@ void main() { final controller = StreamController(); unawaited( - controller.stream.transform(buffer).forEach((node) { + controller.stream.transform(buffer).forEach((event) { + if (event is! XmlStreamBufferElement) return; + final node = event.node; + if (node.tag == 'childa') { childa = true; } else if (node.tag == 'childb') { @@ -58,7 +64,10 @@ void main() { final controller = StreamController(); unawaited( - controller.stream.transform(buffer).forEach((node) { + controller.stream.transform(buffer).forEach((event) { + if (event is! XmlStreamBufferElement) return; + final node = event.node; + if (node.tag == 'childa') { childa = true; } else if (node.tag == 'childb') { @@ -75,4 +84,31 @@ void main() { expect(childa, true); expect(childb, true); }); + + test('Test opening the stream', () async { + var childa = false; + Map? attrs; + + final buffer = XmlStreamBuffer(); + final controller = StreamController(); + + unawaited( + controller.stream.transform(buffer).forEach((node) { + if (node is XmlStreamBufferElement) { + if (node.node.tag == 'childa') { + childa = true; + } + } else if (node is XmlStreamBufferHeader) { + attrs = node.attributes; + } + }), + ); + controller + ..add(''); + + await Future.delayed(const Duration(seconds: 2)); + expect(childa, true); + expect(attrs!['id'], 'abc123'); + }); }