diff --git a/packages/moxxmpp/lib/src/connection.dart b/packages/moxxmpp/lib/src/connection.dart index bc8466d..d65eaed 100644 --- a/packages/moxxmpp/lib/src/connection.dart +++ b/packages/moxxmpp/lib/src/connection.dart @@ -71,7 +71,11 @@ class XmppConnection { this._socket, { this.connectingTimeout = const Duration(minutes: 2), }) : _reconnectionPolicy = reconnectionPolicy, - _connectivityManager = connectivityManager { + _connectivityManager = connectivityManager, + assert( + _socket.getDataStream().isBroadcast, + "The socket's data stream must be a broadcast stream", + ) { // Allow the reconnection policy to perform reconnections by itself _reconnectionPolicy.register( _attemptReconnection, @@ -95,10 +99,10 @@ class XmppConnection { ); _incomingStanzaQueue = IncomingStanzaQueue(handleXmlStream, _stanzaAwaiter); _socketStream = _socket.getDataStream(); - // TODO(Unknown): Handle on done _socketStream .transform(_streamParser) .forEach(_incomingStanzaQueue.addStanza); + _socketStream.listen(_handleOnDataCallbacks); _socket.getEventStream().listen(handleSocketEvent); _stanzaQueue = AsyncStanzaQueue( @@ -314,6 +318,13 @@ class XmppConnection { return getManagerById(csiManager); } + /// Called whenever we receive data on the socket. + Future _handleOnDataCallbacks(String _) async { + for (final manager in _xmppManagers.values) { + unawaited(manager.onData()); + } + } + /// Attempts to reconnect to the server by following an exponential backoff. Future _attemptReconnection() async { _log.finest('_attemptReconnection: Setting state to notConnected'); diff --git a/packages/moxxmpp/lib/src/managers/base.dart b/packages/moxxmpp/lib/src/managers/base.dart index ab61ed3..ae14458 100644 --- a/packages/moxxmpp/lib/src/managers/base.dart +++ b/packages/moxxmpp/lib/src/managers/base.dart @@ -80,6 +80,9 @@ abstract class XmppManagerBase { /// handler's priority, the earlier it is run. List getNonzaHandlers() => []; + /// Whenever the socket receives data, this method is called, if it is non-null. + Future onData() async {} + /// Return a list of features that should be included in a disco response. List getDiscoFeatures() => []; diff --git a/packages/moxxmpp/lib/src/xeps/xep_0198/xep_0198.dart b/packages/moxxmpp/lib/src/xeps/xep_0198/xep_0198.dart index 49082d9..f89a996 100644 --- a/packages/moxxmpp/lib/src/xeps/xep_0198/xep_0198.dart +++ b/packages/moxxmpp/lib/src/xeps/xep_0198/xep_0198.dart @@ -75,6 +75,11 @@ class StreamManagementManager extends XmppManagerBase { return acks; } + @override + Future onData() async { + logger.finest('Got data!'); + } + /// Called when a stanza has been acked to decide whether we should trigger a /// StanzaAckedEvent. ///