fix(core): Fix getting stuck on invalid stream ends

Whenever the stream ends in a weird state, for example with
`<some-tag`, initiating a new stream will confuse the XML parser
and lead errors causing the connection to get stuck.

To fix this, we had to work around xml's extension methods and implement
the extensions in the new "stream parser" where we can reset the parser
(and by reset I mean replace) and thus continue with a fresh parsing state
whenever we reconnect (or connect).
This commit is contained in:
PapaTutuWawa 2023-05-06 20:58:34 +02:00
parent a8693da262
commit f73daf4d1c
9 changed files with 327 additions and 114 deletions

View File

@ -1,79 +0,0 @@
import 'dart:async';
import 'package:moxxmpp/src/stringxml.dart';
import 'package:xml/xml.dart';
import 'package:xml/xml_events.dart';
/// A result object for XmlStreamBuffer.
abstract class XmlStreamBufferObject {}
/// A complete XML element returned by the stream buffer.
class XmlStreamBufferElement extends XmlStreamBufferObject {
XmlStreamBufferElement(this.node);
/// The actual [XMLNode].
final XMLNode node;
}
/// Just the stream header of a new XML stream.
class XmlStreamBufferHeader extends XmlStreamBufferObject {
XmlStreamBufferHeader(this.attributes);
/// The headers of the stream header.
final Map<String, String> attributes;
}
/// A buffer to put between a socket's input and a full XML stream.
class XmlStreamBuffer
extends StreamTransformerBase<String, XmlStreamBufferObject> {
final StreamController<XmlStreamBufferObject> _streamController =
StreamController<XmlStreamBufferObject>();
@override
Stream<XmlStreamBufferObject> bind(Stream<String> stream) {
final events = stream.toXmlEvents().asBroadcastStream();
events.transform(
StreamTransformer<List<XmlEvent>, XmlStartElementEvent>.fromHandlers(
handleData: (events, sink) {
for (final event in events) {
if (event is! XmlStartElementEvent) {
continue;
}
if (event.name != 'stream:stream') {
continue;
}
sink.add(event);
}
},
),
).listen((event) {
_streamController.add(
XmlStreamBufferHeader(
Map<String, String>.fromEntries(
event.attributes.map((attr) {
return MapEntry(attr.name, attr.value);
}),
),
),
);
});
events
.selectSubtreeEvents((event) {
return event.qualifiedName != 'stream:stream';
})
.transform(const XmlNodeDecoder())
.listen((nodes) {
for (final node in nodes) {
if (node.nodeType == XmlNodeType.ELEMENT) {
_streamController.add(
XmlStreamBufferElement(
XMLNode.fromXmlElement(node as XmlElement),
),
);
}
}
});
return _streamController.stream;
}
}

View File

@ -3,7 +3,6 @@ import 'package:logging/logging.dart';
import 'package:meta/meta.dart'; import 'package:meta/meta.dart';
import 'package:moxlib/moxlib.dart'; import 'package:moxlib/moxlib.dart';
import 'package:moxxmpp/src/awaiter.dart'; import 'package:moxxmpp/src/awaiter.dart';
import 'package:moxxmpp/src/buffer.dart';
import 'package:moxxmpp/src/connection_errors.dart'; import 'package:moxxmpp/src/connection_errors.dart';
import 'package:moxxmpp/src/connectivity.dart'; import 'package:moxxmpp/src/connectivity.dart';
import 'package:moxxmpp/src/errors.dart'; import 'package:moxxmpp/src/errors.dart';
@ -18,6 +17,7 @@ import 'package:moxxmpp/src/managers/handlers.dart';
import 'package:moxxmpp/src/managers/namespaces.dart'; import 'package:moxxmpp/src/managers/namespaces.dart';
import 'package:moxxmpp/src/negotiators/namespaces.dart'; import 'package:moxxmpp/src/negotiators/namespaces.dart';
import 'package:moxxmpp/src/negotiators/negotiator.dart'; import 'package:moxxmpp/src/negotiators/negotiator.dart';
import 'package:moxxmpp/src/parser.dart';
import 'package:moxxmpp/src/presence.dart'; import 'package:moxxmpp/src/presence.dart';
import 'package:moxxmpp/src/reconnect.dart'; import 'package:moxxmpp/src/reconnect.dart';
import 'package:moxxmpp/src/roster/roster.dart'; import 'package:moxxmpp/src/roster/roster.dart';
@ -88,7 +88,7 @@ class XmppConnection {
_socketStream = _socket.getDataStream(); _socketStream = _socket.getDataStream();
// TODO(Unknown): Handle on done // TODO(Unknown): Handle on done
_socketStream.transform(_streamBuffer).forEach(handleXmlStream); _socketStream.transform(_streamParser).forEach(handleXmlStream);
_socket.getEventStream().listen(_handleSocketEvent); _socket.getEventStream().listen(_handleSocketEvent);
} }
@ -128,8 +128,8 @@ class XmppConnection {
StreamController.broadcast(); StreamController.broadcast();
final Map<String, XmppManagerBase> _xmppManagers = {}; final Map<String, XmppManagerBase> _xmppManagers = {};
/// The buffer object to keep split up stanzas together /// The parser for the entire XMPP XML stream.
final XmlStreamBuffer _streamBuffer = XmlStreamBuffer(); final XMPPStreamParser _streamParser = XMPPStreamParser();
/// UUID object to generate stanza and origin IDs /// UUID object to generate stanza and origin IDs
final Uuid _uuid = const Uuid(); final Uuid _uuid = const Uuid();
@ -752,17 +752,17 @@ class XmppConnection {
} }
/// Called whenever we receive data that has been parsed as XML. /// Called whenever we receive data that has been parsed as XML.
Future<void> handleXmlStream(XmlStreamBufferObject event) async { Future<void> handleXmlStream(XMPPStreamObject event) async {
if (event is XmlStreamBufferHeader) { if (event is XMPPStreamHeader) {
await _negotiationsHandler.negotiate(event); await _negotiationsHandler.negotiate(event);
return; return;
} }
assert( assert(
event is XmlStreamBufferElement, event is XMPPStreamElement,
'The event must be a XmlStreamBufferElement', 'The event must be a XMPPStreamElement',
); );
final node = (event as XmlStreamBufferElement).node; final node = (event as XMPPStreamElement).node;
// Check if we received a stream error // Check if we received a stream error
if (node.tag == 'stream:error') { if (node.tag == 'stream:error') {
@ -885,6 +885,9 @@ class XmppConnection {
_log.info('Got okay from connectivityManager'); _log.info('Got okay from connectivityManager');
} }
// Reset the stream parser
_streamParser.reset();
final smManager = getStreamManagementManager(); final smManager = getStreamManagementManager();
var host = connectionSettings.host; var host = connectionSettings.host;
var port = connectionSettings.port; var port = connectionSettings.port;

View File

@ -1,9 +1,9 @@
import 'package:logging/logging.dart'; import 'package:logging/logging.dart';
import 'package:meta/meta.dart'; import 'package:meta/meta.dart';
import 'package:moxxmpp/src/buffer.dart';
import 'package:moxxmpp/src/errors.dart'; import 'package:moxxmpp/src/errors.dart';
import 'package:moxxmpp/src/events.dart'; import 'package:moxxmpp/src/events.dart';
import 'package:moxxmpp/src/negotiators/negotiator.dart'; import 'package:moxxmpp/src/negotiators/negotiator.dart';
import 'package:moxxmpp/src/parser.dart';
import 'package:moxxmpp/src/settings.dart'; import 'package:moxxmpp/src/settings.dart';
import 'package:moxxmpp/src/stringxml.dart'; import 'package:moxxmpp/src/stringxml.dart';
@ -116,8 +116,8 @@ abstract class NegotiationsHandler {
} }
/// Called whenever the stream buffer outputs a new event [event]. /// Called whenever the stream buffer outputs a new event [event].
Future<void> negotiate(XmlStreamBufferObject event) async { Future<void> negotiate(XMPPStreamObject event) async {
if (event is XmlStreamBufferHeader) { if (event is XMPPStreamHeader) {
streamId = event.attributes['id']; streamId = event.attributes['id'];
} }
} }

View File

@ -1,10 +1,10 @@
import 'package:meta/meta.dart'; import 'package:meta/meta.dart';
import 'package:moxxmpp/src/buffer.dart';
import 'package:moxxmpp/src/connection_errors.dart'; import 'package:moxxmpp/src/connection_errors.dart';
import 'package:moxxmpp/src/handlers/base.dart'; import 'package:moxxmpp/src/handlers/base.dart';
import 'package:moxxmpp/src/jid.dart'; import 'package:moxxmpp/src/jid.dart';
import 'package:moxxmpp/src/namespaces.dart'; import 'package:moxxmpp/src/namespaces.dart';
import 'package:moxxmpp/src/negotiators/negotiator.dart'; import 'package:moxxmpp/src/negotiators/negotiator.dart';
import 'package:moxxmpp/src/parser.dart';
import 'package:moxxmpp/src/stringxml.dart'; import 'package:moxxmpp/src/stringxml.dart';
/// "Nonza" describing the XMPP stream header of a client-to-server connection. /// "Nonza" describing the XMPP stream header of a client-to-server connection.
@ -211,8 +211,8 @@ class ClientToServerNegotiator extends NegotiationsHandler {
} }
@override @override
Future<void> negotiate(XmlStreamBufferObject event) async { Future<void> negotiate(XMPPStreamObject event) async {
if (event is XmlStreamBufferElement) { if (event is XMPPStreamElement) {
if (event.node.tag == 'stream:features') { if (event.node.tag == 'stream:features') {
// Store the received stream features // Store the received stream features
_streamFeatures _streamFeatures

View File

@ -1,12 +1,12 @@
import 'dart:convert'; import 'dart:convert';
import 'package:cryptography/cryptography.dart'; import 'package:cryptography/cryptography.dart';
import 'package:hex/hex.dart'; import 'package:hex/hex.dart';
import 'package:moxxmpp/src/buffer.dart';
import 'package:moxxmpp/src/connection_errors.dart'; import 'package:moxxmpp/src/connection_errors.dart';
import 'package:moxxmpp/src/handlers/base.dart'; import 'package:moxxmpp/src/handlers/base.dart';
import 'package:moxxmpp/src/jid.dart'; import 'package:moxxmpp/src/jid.dart';
import 'package:moxxmpp/src/namespaces.dart'; import 'package:moxxmpp/src/namespaces.dart';
import 'package:moxxmpp/src/negotiators/negotiator.dart'; import 'package:moxxmpp/src/negotiators/negotiator.dart';
import 'package:moxxmpp/src/parser.dart';
import 'package:moxxmpp/src/stringxml.dart'; import 'package:moxxmpp/src/stringxml.dart';
/// Nonza describing the XMPP stream header. /// Nonza describing the XMPP stream header.
@ -70,10 +70,10 @@ class ComponentToServerNegotiator extends NegotiationsHandler {
} }
@override @override
Future<void> negotiate(XmlStreamBufferObject event) async { Future<void> negotiate(XMPPStreamObject event) async {
switch (_state) { switch (_state) {
case ComponentToServerState.idle: case ComponentToServerState.idle:
if (event is XmlStreamBufferHeader) { if (event is XMPPStreamHeader) {
streamId = event.attributes['id']; streamId = event.attributes['id'];
assert( assert(
streamId != null, streamId != null,
@ -93,7 +93,7 @@ class ComponentToServerNegotiator extends NegotiationsHandler {
} }
break; break;
case ComponentToServerState.handshakeSent: case ComponentToServerState.handshakeSent:
if (event is XmlStreamBufferElement) { if (event is XMPPStreamElement) {
if (event.node.tag == 'handshake' && if (event.node.tag == 'handshake' &&
event.node.children.isEmpty && event.node.children.isEmpty &&
event.node.attributes.isEmpty) { event.node.attributes.isEmpty) {

View File

@ -0,0 +1,144 @@
import 'dart:async';
import 'dart:convert';
import 'package:moxxmpp/src/stringxml.dart';
// ignore: implementation_imports
import 'package:xml/src/xml_events/utils/conversion_sink.dart';
import 'package:xml/xml.dart';
import 'package:xml/xml_events.dart';
/// A result object for XmlStreamBuffer.
abstract class XMPPStreamObject {}
/// A complete XML element returned by the stream buffer.
class XMPPStreamElement extends XMPPStreamObject {
XMPPStreamElement(this.node);
/// The actual [XMLNode].
final XMLNode node;
}
/// Just the stream header of a new XML stream.
class XMPPStreamHeader extends XMPPStreamObject {
XMPPStreamHeader(this.attributes);
/// The headers of the stream header.
final Map<String, String> attributes;
}
/// A wrapper around a [Converter]'s [Converter.startChunkedConversion] method.
class _ChunkedConversionBuffer<S, T> {
/// Use the converter [converter].
_ChunkedConversionBuffer(Converter<S, List<T>> converter) {
_outputSink = ConversionSink<List<T>>(_results.addAll);
_inputSink = converter.startChunkedConversion(_outputSink);
}
/// The results of the converter.
final List<T> _results = List<T>.empty(growable: true);
/// The sink that outputs to [_results].
late ConversionSink<List<T>> _outputSink;
/// The sink that we use for input.
late Sink<S> _inputSink;
/// Close all opened sinks.
void close() {
_inputSink.close();
_outputSink.close();
}
/// Turn the input [input] into a list of [T] according to the initial converter.
List<T> convert(S input) {
_results.clear();
_inputSink.add(input);
return _results;
}
}
/// A buffer to put between a socket's input and a full XML stream.
class XMPPStreamParser extends StreamTransformerBase<String, XMPPStreamObject> {
final StreamController<XMPPStreamObject> _streamController =
StreamController<XMPPStreamObject>();
/// Turns a String into a list of [XmlEvent]s in a chunked fashion.
_ChunkedConversionBuffer<String, XmlEvent> _eventBuffer =
_ChunkedConversionBuffer<String, XmlEvent>(XmlEventDecoder());
/// Turns a list of [XmlEvent]s into a list of [XmlNode]s in a chunked fashion.
_ChunkedConversionBuffer<List<XmlEvent>, XmlNode> _childBuffer =
_ChunkedConversionBuffer<List<XmlEvent>, XmlNode>(const XmlNodeDecoder());
void reset() {
try {
_eventBuffer.close();
} catch (_) {
// Do nothing. A crash here may indicate that we end on invalid XML, which is fine
// since we're not going to use the buffer's output anymore.
}
try {
_childBuffer.close();
} catch (_) {
// Do nothing.
}
// Recreate the buffers.
_eventBuffer =
_ChunkedConversionBuffer<String, XmlEvent>(XmlEventDecoder());
_childBuffer = _ChunkedConversionBuffer<List<XmlEvent>, XmlNode>(
const XmlNodeDecoder(),
);
}
@override
Stream<XMPPStreamObject> bind(Stream<String> stream) {
// We do not want to use xml's toXmlEvents and toSubtreeEvents methods as they
// create streams we cannot close. We need to be able to destroy and recreate an
// XML parser whenever we start a new connection.
final childSelector =
XmlSubtreeSelector((event) => event.qualifiedName != 'stream:stream');
final streamHeaderSelector =
XmlSubtreeSelector((event) => event.qualifiedName == 'stream:stream');
stream.listen((input) {
final events = _eventBuffer.convert(input);
final streamHeaderEvents = streamHeaderSelector.convert(events);
// Process the stream header separately.
for (final event in streamHeaderEvents) {
if (event is! XmlStartElementEvent) {
continue;
}
if (event.name != 'stream:stream') {
continue;
}
_streamController.add(
XMPPStreamHeader(
Map<String, String>.fromEntries(
event.attributes.map((attr) {
return MapEntry(attr.name, attr.value);
}),
),
),
);
}
// Process the children of the <stream:stream> element.
final childEvents = childSelector.convert(events);
final children = _childBuffer.convert(childEvents);
for (final node in children) {
if (node.nodeType == XmlNodeType.ELEMENT) {
_streamController.add(
XMPPStreamElement(
XMLNode.fromXmlElement(node as XmlElement),
),
);
}
}
});
return _streamController.stream;
}
}

View File

@ -1,5 +1,5 @@
import 'package:moxxmpp/moxxmpp.dart'; import 'package:moxxmpp/moxxmpp.dart';
import 'package:moxxmpp/src/buffer.dart'; import 'package:moxxmpp/src/parser.dart';
import 'package:test/test.dart'; import 'package:test/test.dart';
import 'helpers/logging.dart'; import 'helpers/logging.dart';
@ -83,7 +83,7 @@ void main() {
await negotiator.runPostRegisterCallback(); await negotiator.runPostRegisterCallback();
await negotiator.negotiate( await negotiator.negotiate(
XmlStreamBufferElement( XMPPStreamElement(
XMLNode.fromString( XMLNode.fromString(
''' '''
<stream:features xmlns="http://etherx.jabber.org/streams"> <stream:features xmlns="http://etherx.jabber.org/streams">

View File

@ -1,5 +1,5 @@
import 'dart:async'; import 'dart:async';
import 'package:moxxmpp/src/buffer.dart'; import 'package:moxxmpp/src/parser.dart';
import 'package:test/test.dart'; import 'package:test/test.dart';
void main() { void main() {
@ -7,12 +7,12 @@ void main() {
var childa = false; var childa = false;
var childb = false; var childb = false;
final buffer = XmlStreamBuffer(); final parser = XMPPStreamParser();
final controller = StreamController<String>(); final controller = StreamController<String>();
unawaited( unawaited(
controller.stream.transform(buffer).forEach((event) { controller.stream.transform(parser).forEach((event) {
if (event is! XmlStreamBufferElement) return; if (event is! XMPPStreamElement) return;
final node = event.node; final node = event.node;
if (node.tag == 'childa') { if (node.tag == 'childa') {
@ -32,12 +32,12 @@ void main() {
var childa = false; var childa = false;
var childb = false; var childb = false;
final buffer = XmlStreamBuffer(); final parser = XMPPStreamParser();
final controller = StreamController<String>(); final controller = StreamController<String>();
unawaited( unawaited(
controller.stream.transform(buffer).forEach((event) { controller.stream.transform(parser).forEach((event) {
if (event is! XmlStreamBufferElement) return; if (event is! XMPPStreamElement) return;
final node = event.node; final node = event.node;
if (node.tag == 'childa') { if (node.tag == 'childa') {
@ -60,12 +60,12 @@ void main() {
var childa = false; var childa = false;
var childb = false; var childb = false;
final buffer = XmlStreamBuffer(); final parser = XMPPStreamParser();
final controller = StreamController<String>(); final controller = StreamController<String>();
unawaited( unawaited(
controller.stream.transform(buffer).forEach((event) { controller.stream.transform(parser).forEach((event) {
if (event is! XmlStreamBufferElement) return; if (event is! XMPPStreamElement) return;
final node = event.node; final node = event.node;
if (node.tag == 'childa') { if (node.tag == 'childa') {
@ -89,16 +89,16 @@ void main() {
var childa = false; var childa = false;
Map<String, String>? attrs; Map<String, String>? attrs;
final buffer = XmlStreamBuffer(); final parser = XMPPStreamParser();
final controller = StreamController<String>(); final controller = StreamController<String>();
unawaited( unawaited(
controller.stream.transform(buffer).forEach((node) { controller.stream.transform(parser).forEach((node) {
if (node is XmlStreamBufferElement) { if (node is XMPPStreamElement) {
if (node.node.tag == 'childa') { if (node.node.tag == 'childa') {
childa = true; childa = true;
} }
} else if (node is XmlStreamBufferHeader) { } else if (node is XMPPStreamHeader) {
attrs = node.attributes; attrs = node.attributes;
} }
}), }),
@ -111,4 +111,50 @@ void main() {
expect(childa, true); expect(childa, true);
expect(attrs!['id'], 'abc123'); expect(attrs!['id'], 'abc123');
}); });
test('Test restarting a broken XML stream', () async {
final parser = XMPPStreamParser();
final controller = StreamController<String>();
var gotFeatures = false;
unawaited(
controller.stream.transform(parser).forEach(
(event) {
if (event is! XMPPStreamElement) return;
if (event.node.tag == 'stream:features') {
gotFeatures = true;
}
},
),
);
// Begin the stream with invalid XML
controller.add('<stream:stream xmlns="jabber:client');
// Let it marinate
await Future<void>.delayed(const Duration(seconds: 1));
expect(gotFeatures, false);
// Start a new stream
parser.reset();
controller.add(
'''
<stream:stream
xmlns="jabber:client"
version="1.0"
xmlns:stream="http://etherx.jabber.org/streams"
from="test.server"
xml:lang="en">
<stream:features xmlns="http://etherx.jabber.org/streams">
<mechanisms xmlns="urn:ietf:params:xml:ns:xmpp-sasl">
<mechanism>PLAIN</mechanism>
</mechanisms>
</stream:features>
''',
);
// Let it marinate
await Future<void>.delayed(const Duration(seconds: 1));
expect(gotFeatures, true);
});
} }

View File

@ -526,4 +526,103 @@ void main() {
true, true,
); );
}); });
test('Test an invalid XML continuation', () async {
final fakeSocket = StubTCPSocket(
[
StringExpectation(
"<stream:stream xmlns='jabber:client' version='1.0' xmlns:stream='http://etherx.jabber.org/streams' to='example.org' from='testuser@example.org' xml:lang='en'>",
'''
<stream:stream
xmlns="jabber:client"
version="1.0"
xmlns:stream="http://etherx.jabber.org/streams"
from="test.server"
xml:lang="en">
<stream:features xmlns="http://etherx.jabber.org/streams">
<mechanisms xmlns="urn:ietf:params:xml:ns:xmpp-sasl">
<mechanism>PLAIN</mechanism>
</mechanisms''',
),
StringExpectation(
"<stream:stream xmlns='jabber:client' version='1.0' xmlns:stream='http://etherx.jabber.org/streams' to='example.org' from='testuser@example.org' xml:lang='en'>",
'''
<stream:stream
xmlns="jabber:client"
version="1.0"
xmlns:stream="http://etherx.jabber.org/streams"
from="test.server"
xml:lang="en">
<stream:features xmlns="http://etherx.jabber.org/streams">
<mechanisms xmlns="urn:ietf:params:xml:ns:xmpp-sasl">
<mechanism>PLAIN</mechanism>
</mechanisms>
</stream:features>''',
),
StringExpectation(
"<auth xmlns='urn:ietf:params:xml:ns:xmpp-sasl' mechanism='PLAIN'>AHRlc3R1c2VyAGFiYzEyMw==</auth>",
'<success xmlns="urn:ietf:params:xml:ns:xmpp-sasl" />',
),
StringExpectation(
"<stream:stream xmlns='jabber:client' version='1.0' xmlns:stream='http://etherx.jabber.org/streams' to='example.org' from='testuser@example.org' xml:lang='en'>",
'''
<stream:stream
xmlns="jabber:client"
version="1.0"
xmlns:stream="http://etherx.jabber.org/streams"
from="test.server"
xml:lang="en">
<stream:features xmlns="http://etherx.jabber.org/streams">
<bind xmlns="urn:ietf:params:xml:ns:xmpp-bind">
<required/>
</bind>
</stream:features>''',
),
StanzaExpectation(
'<iq xmlns="jabber:client" type="set" id="a"><bind xmlns="urn:ietf:params:xml:ns:xmpp-bind"/></iq>',
'<iq xmlns="jabber:client" type="result" id="a"><bind xmlns="urn:ietf:params:xml:ns:xmpp-bind"><jid>testuser@example.org/MU29eEZn</jid></bind></iq>',
ignoreId: true,
),
],
);
final conn = XmppConnection(
TestingReconnectionPolicy(),
AlwaysConnectedConnectivityManager(),
ClientToServerNegotiator(),
fakeSocket,
)..connectionSettings = ConnectionSettings(
jid: JID.fromString('testuser@example.org'),
password: 'abc123',
);
await conn.registerFeatureNegotiators([
SaslPlainNegotiator(),
ResourceBindingNegotiator(),
]);
final result1 = conn.connect(
waitUntilLogin: true,
);
await Future<void>.delayed(const Duration(seconds: 2));
// Inject a fault
fakeSocket.injectSocketFault();
expect(
(await result1).isType<bool>(),
false,
);
// Try to connect again
final result2 = await conn.connect(
waitUntilLogin: true,
);
expect(
fakeSocket.getState(),
5,
);
expect(
result2.isType<bool>(),
true,
);
});
} }