feat(core): Allow tracking the stream id

This commit is contained in:
PapaTutuWawa 2023-04-03 23:18:10 +02:00
parent 3e43ac22d7
commit 63c84d9479
4 changed files with 127 additions and 23 deletions

View File

@ -1,29 +1,76 @@
import 'dart:async';
import 'package:moxxmpp/src/stringxml.dart';
import 'package:xml/xml.dart';
import 'package:xml/xml_events.dart';
class XmlStreamBuffer extends StreamTransformerBase<String, XMLNode> {
XmlStreamBuffer()
: _streamController = StreamController(),
_decoder = const XmlNodeDecoder();
final StreamController<XMLNode> _streamController;
final XmlNodeDecoder _decoder;
/// A result object for XmlStreamBuffer.
abstract class XmlStreamBufferObject {}
/// A complete XML element returned by the stream buffer.
class XmlStreamBufferElement extends XmlStreamBufferObject {
XmlStreamBufferElement(this.node);
/// The actual [XMLNode].
final XMLNode node;
}
/// Just the stream header of a new XML stream.
class XmlStreamBufferHeader extends XmlStreamBufferObject {
XmlStreamBufferHeader(this.attributes);
/// The headers of the stream header.
final Map<String, String> attributes;
}
/// A buffer to put between a socket's input and a full XML stream.
class XmlStreamBuffer
extends StreamTransformerBase<String, XmlStreamBufferObject> {
final StreamController<XmlStreamBufferObject> _streamController =
StreamController<XmlStreamBufferObject>();
@override
Stream<XMLNode> bind(Stream<String> stream) {
stream
.toXmlEvents()
Stream<XmlStreamBufferObject> bind(Stream<String> stream) {
final events = stream.toXmlEvents().asBroadcastStream();
events.transform(
StreamTransformer<List<XmlEvent>, XmlStartElementEvent>.fromHandlers(
handleData: (events, sink) {
for (final event in events) {
if (event is! XmlStartElementEvent) {
continue;
}
if (event.name != 'stream:stream') {
continue;
}
sink.add(event);
}
},
),
).listen((event) {
_streamController.add(
XmlStreamBufferHeader(
Map<String, String>.fromEntries(
event.attributes.map((attr) {
return MapEntry(attr.name, attr.value);
}),
),
),
);
});
events
.selectSubtreeEvents((event) {
return event.qualifiedName != 'stream:stream';
})
.transform(_decoder)
.transform(const XmlNodeDecoder())
.listen((nodes) {
for (final node in nodes) {
if (node.nodeType == XmlNodeType.ELEMENT) {
_streamController.add(XMLNode.fromXmlElement(node as XmlElement));
_streamController.add(
XmlStreamBufferElement(
XMLNode.fromXmlElement(node as XmlElement),
),
);
}
}
});

View File

@ -766,7 +766,18 @@ class XmppConnection {
}
/// Called whenever we receive data that has been parsed as XML.
Future<void> handleXmlStream(XMLNode node) async {
Future<void> handleXmlStream(XmlStreamBufferObject event) async {
if (event is XmlStreamBufferHeader) {
_negotiationsHandler.setStreamHeaderId(event.attributes['id']);
return;
}
assert(
event is XmlStreamBufferElement,
'The event must be a XmlStreamBufferElement',
);
final node = (event as XmlStreamBufferElement).node;
// Check if we received a stream error
if (node.tag == 'stream:error') {
_log
@ -788,7 +799,7 @@ class XmppConnection {
// prevent this issue.
await _negotiationLock.synchronized(() async {
if (_routingState != RoutingState.negotiating) {
unawaited(handleXmlStream(node));
unawaited(handleXmlStream(XmlStreamBufferElement(node)));
return;
}
@ -940,7 +951,7 @@ class XmppConnection {
} else {
await _reconnectionPolicy.onSuccess();
_log.fine('Preparing the internal state for a connection attempt');
_negotiationsHandler.resetNegotiators();
_negotiationsHandler.reset();
await _setConnectionState(XmppConnectionState.connecting);
_updateRoutingState(RoutingState.negotiating);
_isAuthenticated = false;

View File

@ -43,6 +43,15 @@ abstract class NegotiationsHandler {
@protected
late final IsAuthenticatedFunction isAuthenticated;
/// The id included in the last stream header.
@protected
String? streamId;
/// Set the id of the last stream header.
void setStreamHeaderId(String? id) {
streamId = id;
}
/// Returns, if registered, a negotiator with id [id].
T? getNegotiatorById<T extends XmppFeatureNegotiatorBase>(String id) =>
negotiators[id] as T?;
@ -81,9 +90,10 @@ abstract class NegotiationsHandler {
/// Remove [feature] from the stream features we are currently negotiating.
void removeNegotiatingFeature(String feature) {}
/// Resets all registered negotiators.
/// Resets all registered negotiators and the negotiation handler.
@mustCallSuper
void resetNegotiators() {
void reset() {
streamId = null;
for (final negotiator in negotiators.values) {
negotiator.reset();
}
@ -110,8 +120,8 @@ class ClientToServerNegotiator extends NegotiationsHandler {
}
@override
void resetNegotiators() {
super.resetNegotiators();
void reset() {
super.reset();
// Prevent leaking the last active negotiator
_currentNegotiator = null;

View File

@ -11,7 +11,10 @@ void main() {
final controller = StreamController<String>();
unawaited(
controller.stream.transform(buffer).forEach((node) {
controller.stream.transform(buffer).forEach((event) {
if (event is! XmlStreamBufferElement) return;
final node = event.node;
if (node.tag == 'childa') {
childa = true;
} else if (node.tag == 'childb') {
@ -33,7 +36,10 @@ void main() {
final controller = StreamController<String>();
unawaited(
controller.stream.transform(buffer).forEach((node) {
controller.stream.transform(buffer).forEach((event) {
if (event is! XmlStreamBufferElement) return;
final node = event.node;
if (node.tag == 'childa') {
childa = true;
} else if (node.tag == 'childb') {
@ -58,7 +64,10 @@ void main() {
final controller = StreamController<String>();
unawaited(
controller.stream.transform(buffer).forEach((node) {
controller.stream.transform(buffer).forEach((event) {
if (event is! XmlStreamBufferElement) return;
final node = event.node;
if (node.tag == 'childa') {
childa = true;
} else if (node.tag == 'childb') {
@ -75,4 +84,31 @@ void main() {
expect(childa, true);
expect(childb, true);
});
test('Test opening the stream', () async {
var childa = false;
Map<String, String>? attrs;
final buffer = XmlStreamBuffer();
final controller = StreamController<String>();
unawaited(
controller.stream.transform(buffer).forEach((node) {
if (node is XmlStreamBufferElement) {
if (node.node.tag == 'childa') {
childa = true;
}
} else if (node is XmlStreamBufferHeader) {
attrs = node.attributes;
}
}),
);
controller
..add('<stream:stream id="abc123"><childa')
..add(' />');
await Future<void>.delayed(const Duration(seconds: 2));
expect(childa, true);
expect(attrs!['id'], 'abc123');
});
}