Compare commits

..

3 Commits

8 changed files with 52 additions and 13 deletions

View File

@ -84,6 +84,10 @@ class XmppConnection {
() => _isAuthenticated,
sendRawXML,
() => connectionSettings,
() {
_log.finest('Resetting stream parser');
_streamParser.reset();
},
);
_socketStream = _socket.getDataStream();
@ -746,7 +750,9 @@ class XmppConnection {
),
);
if (!incomingHandlers.done) {
_log.warning('Returning error for unhandled stanza');
_log.warning(
'Returning error for unhandled stanza ${incomingPreHandlers.stanza.tag}',
);
await handleUnhandledStanza(this, incomingPreHandlers);
}
}
@ -809,8 +815,6 @@ class XmppConnection {
/// Sends an event to the connection's event stream.
Future<void> _sendEvent(XmppEvent event) async {
_log.finest('Event: ${event.toString()}');
for (final manager in _xmppManagers.values) {
await manager.onXmppEvent(event);
}

View File

@ -22,6 +22,9 @@ typedef SendNonzaFunction = void Function(XMLNode);
/// Returns the connection settings.
typedef GetConnectionSettingsFunction = ConnectionSettings Function();
/// Resets the stream parser's state.
typedef ResetStreamParserFunction = void Function();
/// This class implements the stream feature negotiation for XmppConnection.
abstract class NegotiationsHandler {
@protected
@ -51,6 +54,9 @@ abstract class NegotiationsHandler {
@protected
late final GetConnectionSettingsFunction getConnectionSettings;
@protected
late final ResetStreamParserFunction resetStreamParser;
/// The id included in the last stream header.
@protected
String? streamId;
@ -72,12 +78,14 @@ abstract class NegotiationsHandler {
IsAuthenticatedFunction isAuthenticated,
SendNonzaFunction sendNonza,
GetConnectionSettingsFunction getConnectionSettings,
ResetStreamParserFunction resetStreamParser,
) {
this.onNegotiationsDone = onNegotiationsDone;
this.handleError = handleError;
this.isAuthenticated = isAuthenticated;
this.sendNonza = sendNonza;
this.getConnectionSettings = getConnectionSettings;
this.resetStreamParser = resetStreamParser;
log = Logger(toString());
}

View File

@ -60,6 +60,7 @@ class ClientToServerNegotiator extends NegotiationsHandler {
@override
void sendStreamHeader() {
resetStreamParser();
sendNonza(
XMLNode(
tag: 'xml',

View File

@ -49,6 +49,7 @@ class ComponentToServerNegotiator extends NegotiationsHandler {
@override
void sendStreamHeader() {
resetStreamParser();
sendNonza(
XMLNode(
tag: 'xml',

View File

@ -56,7 +56,7 @@ class StanzaHandler extends Handler {
this.tagName,
this.priority = 0,
this.stanzaTag,
this.xmlns = stanzaXmlns,
this.xmlns,
});
/// If specified, then the stanza must contain a direct child with a tag equal to

View File

@ -69,6 +69,16 @@ class XMPPStreamParser extends StreamTransformerBase<String, XMPPStreamObject> {
_ChunkedConversionBuffer<List<XmlEvent>, XmlNode> _childBuffer =
_ChunkedConversionBuffer<List<XmlEvent>, XmlNode>(const XmlNodeDecoder());
/// The selectors.
_ChunkedConversionBuffer<List<XmlEvent>, XmlEvent> _childSelector =
_ChunkedConversionBuffer<List<XmlEvent>, XmlEvent>(
XmlSubtreeSelector((event) => event.qualifiedName != 'stream:stream'),
);
_ChunkedConversionBuffer<List<XmlEvent>, XmlEvent> _streamHeaderSelector =
_ChunkedConversionBuffer<List<XmlEvent>, XmlEvent>(
XmlSubtreeSelector((event) => event.qualifiedName == 'stream:stream'),
);
void reset() {
try {
_eventBuffer.close();
@ -81,6 +91,16 @@ class XMPPStreamParser extends StreamTransformerBase<String, XMPPStreamObject> {
} catch (_) {
// Do nothing.
}
try {
_childSelector.close();
} catch (_) {
// Do nothing.
}
try {
_streamHeaderSelector.close();
} catch (_) {
// Do nothing.
}
// Recreate the buffers.
_eventBuffer =
@ -88,6 +108,12 @@ class XMPPStreamParser extends StreamTransformerBase<String, XMPPStreamObject> {
_childBuffer = _ChunkedConversionBuffer<List<XmlEvent>, XmlNode>(
const XmlNodeDecoder(),
);
_childSelector = _ChunkedConversionBuffer<List<XmlEvent>, XmlEvent>(
XmlSubtreeSelector((event) => event.qualifiedName != 'stream:stream'),
);
_streamHeaderSelector = _ChunkedConversionBuffer<List<XmlEvent>, XmlEvent>(
XmlSubtreeSelector((event) => event.qualifiedName == 'stream:stream'),
);
}
@override
@ -95,14 +121,9 @@ class XMPPStreamParser extends StreamTransformerBase<String, XMPPStreamObject> {
// We do not want to use xml's toXmlEvents and toSubtreeEvents methods as they
// create streams we cannot close. We need to be able to destroy and recreate an
// XML parser whenever we start a new connection.
final childSelector =
XmlSubtreeSelector((event) => event.qualifiedName != 'stream:stream');
final streamHeaderSelector =
XmlSubtreeSelector((event) => event.qualifiedName == 'stream:stream');
stream.listen((input) {
final events = _eventBuffer.convert(input);
final streamHeaderEvents = streamHeaderSelector.convert(events);
final streamHeaderEvents = _streamHeaderSelector.convert(events);
// Process the stream header separately.
for (final event in streamHeaderEvents) {
@ -126,7 +147,7 @@ class XMPPStreamParser extends StreamTransformerBase<String, XMPPStreamObject> {
}
// Process the children of the <stream:stream> element.
final childEvents = childSelector.convert(events);
final childEvents = _childSelector.convert(events);
final children = _childBuffer.convert(childEvents);
for (final node in children) {
if (node.nodeType == XmlNodeType.ELEMENT) {

View File

@ -255,7 +255,7 @@ class StreamManagementManager extends XmppManagerBase {
_pendingAcks++;
_startAckTimer();
logger.fine('_pendingAcks is now at $_pendingAcks');
logger.fine('_pendingAcks is now at $_pendingAcks (caused by <r/>)');
getAttributes().sendNonza(StreamManagementRequestNonza());
@ -294,6 +294,7 @@ class StreamManagementManager extends XmppManagerBase {
/// Called when we receive an <a /> nonza from the server.
/// This is a response to the question "How many of my stanzas have you handled".
Future<bool> _handleAckResponse(XMLNode nonza) async {
logger.finest('Received ack');
final h = int.parse(nonza.attributes['h']! as String);
_lastAckTimestamp = DateTime.now().millisecondsSinceEpoch;
@ -306,6 +307,7 @@ class StreamManagementManager extends XmppManagerBase {
// Reset the timer
if (_pendingAcks > 0) {
_stopAckTimer();
_startAckTimer();
}
}
@ -314,7 +316,7 @@ class StreamManagementManager extends XmppManagerBase {
_stopAckTimer();
}
logger.fine('_pendingAcks is now at $_pendingAcks');
logger.fine('_pendingAcks is now at $_pendingAcks (caused by <a/>)');
});
});

View File

@ -59,6 +59,7 @@ void main() {
jid: JID.fromString('test'),
password: 'abc123',
),
() {},
)
..registerNegotiator(StubNegotiator1())
..registerNegotiator(StubNegotiator2());
@ -77,6 +78,7 @@ void main() {
jid: JID.fromString('test'),
password: 'abc123',
),
() {},
)
..registerNegotiator(StubNegotiator1())
..registerNegotiator(StubNegotiator2());