import 'package:moxxmpp/moxxmpp.dart'; import 'package:test/test.dart'; import '../helpers/logging.dart'; import '../helpers/xmpp.dart'; Future runIncomingStanzaHandlers( StreamManagementManager man, Stanza stanza, ) async { for (final handler in man.getIncomingPreStanzaHandlers()) { if (handler.matches(stanza)) { await handler.callback( stanza, StanzaHandlerData( false, false, null, stanza, ), ); } } } Future runOutgoingStanzaHandlers( StreamManagementManager man, Stanza stanza, ) async { for (final handler in man.getOutgoingPostStanzaHandlers()) { if (handler.matches(stanza)) { await handler.callback( stanza, StanzaHandlerData( false, false, null, stanza, ), ); } } } XmppManagerAttributes mkAttributes(void Function(Stanza) callback) { return XmppManagerAttributes( sendStanza: ( stanza, { StanzaFromType addFrom = StanzaFromType.full, bool addId = true, bool awaitable = true, bool encrypted = false, bool forceEncryption = false, }) async { callback(stanza); return Stanza.message(); }, sendNonza: (nonza) {}, sendEvent: (event) {}, getManagerById: getManagerNullStub, getConnectionSettings: () => ConnectionSettings( jid: JID.fromString('hallo@example.server'), password: 'password', useDirectTLS: true, ), getFullJID: () => JID.fromString('hallo@example.server/uwu'), getSocket: () => StubTCPSocket([]), getConnection: () => XmppConnection( TestingReconnectionPolicy(), AlwaysConnectedConnectivityManager(), StubTCPSocket([]), ), getNegotiatorById: getNegotiatorNullStub, ); } XMLNode mkAck(int h) => XMLNode.xmlns( tag: 'a', xmlns: 'urn:xmpp:sm:3', attributes: { 'h': h.toString(), }, ); void main() { initLogger(); final stanza = Stanza( to: 'some.user@server.example', tag: 'message', ); test('Test stream with SM enablement', () async { final attributes = mkAttributes((_) {}); final manager = StreamManagementManager()..register(attributes); // [...] // // await manager.onXmppEvent(StreamManagementEnabledEvent(resource: 'hallo')); expect(manager.state.c2s, 0); expect(manager.state.s2c, 0); expect(manager.isStreamManagementEnabled(), true); // Send a stanza 5 times for (var i = 0; i < 5; i++) { await runOutgoingStanzaHandlers(manager, stanza); } expect(manager.state.c2s, 5); // Receive 3 stanzas for (var i = 0; i < 3; i++) { await runIncomingStanzaHandlers(manager, stanza); } expect(manager.state.s2c, 3); }); group('Acking', () { test('Test completely clearing the queue', () async { final attributes = mkAttributes((_) {}); final manager = StreamManagementManager()..register(attributes); await manager .onXmppEvent(StreamManagementEnabledEvent(resource: 'hallo')); // Send a stanza 5 times for (var i = 0; i < 5; i++) { await runOutgoingStanzaHandlers(manager, stanza); } // await manager.runNonzaHandlers(mkAck(5)); expect(manager.getUnackedStanzas().length, 0); }); test('Test partially clearing the queue', () async { final attributes = mkAttributes((_) {}); final manager = StreamManagementManager()..register(attributes); await manager.onXmppEvent( StreamManagementEnabledEvent(resource: 'hallo'), ); // Send a stanza 5 times for (var i = 0; i < 5; i++) { await runOutgoingStanzaHandlers(manager, stanza); } // await manager.runNonzaHandlers(mkAck(3)); expect(manager.getUnackedStanzas().length, 2); }); test('Send an ack with h > c2s', () async { final attributes = mkAttributes((_) {}); final manager = StreamManagementManager()..register(attributes); await manager.onXmppEvent( StreamManagementEnabledEvent(resource: 'hallo'), ); // Send a stanza 5 times for (var i = 0; i < 5; i++) { await runOutgoingStanzaHandlers(manager, stanza); } // await manager.runNonzaHandlers(mkAck(6)); expect(manager.getUnackedStanzas().length, 0); expect(manager.state.c2s, 6); }); test('Send an ack with h < c2s', () async { final attributes = mkAttributes((_) {}); final manager = StreamManagementManager()..register(attributes); await manager.onXmppEvent( StreamManagementEnabledEvent(resource: 'hallo'), ); // Send a stanza 5 times for (var i = 0; i < 5; i++) { await runOutgoingStanzaHandlers(manager, stanza); } // await manager.runNonzaHandlers(mkAck(3)); expect(manager.getUnackedStanzas().length, 2); expect(manager.state.c2s, 5); }); }); group('Counting acks', () { test('Sending all pending acks at once', () async { final attributes = mkAttributes((_) {}); final manager = StreamManagementManager()..register(attributes); await manager.onXmppEvent( StreamManagementEnabledEvent(resource: 'hallo'), ); // Send a stanza 5 times for (var i = 0; i < 5; i++) { await runOutgoingStanzaHandlers(manager, stanza); } expect(await manager.getPendingAcks(), 5); // Ack all of them at once await manager.runNonzaHandlers(mkAck(5)); expect(await manager.getPendingAcks(), 0); }); test('Sending partial pending acks at once', () async { final attributes = mkAttributes((_) {}); final manager = StreamManagementManager()..register(attributes); await manager.onXmppEvent( StreamManagementEnabledEvent(resource: 'hallo'), ); // Send a stanza 5 times for (var i = 0; i < 5; i++) { await runOutgoingStanzaHandlers(manager, stanza); } expect(await manager.getPendingAcks(), 5); // Ack only 3 of them at once await manager.runNonzaHandlers(mkAck(3)); expect(await manager.getPendingAcks(), 2); }); test('Test counting incoming stanzas for which handlers end early', () async { final fakeSocket = StubTCPSocket([ StringExpectation( "", ''' PLAIN ''', ), StringExpectation( "AHBvbHlub21kaXZpc2lvbgBhYWFh", '', ), StringExpectation( "", ''' ''', ), StanzaExpectation( '', 'polynomdivision@test.server/MU29eEZn', ignoreId: true, ), StringExpectation( "", '', ), ]); final conn = XmppConnection( TestingReconnectionPolicy(), AlwaysConnectedConnectivityManager(), fakeSocket, )..setConnectionSettings( ConnectionSettings( jid: JID.fromString('polynomdivision@test.server'), password: 'aaaa', useDirectTLS: true, ), ); final sm = StreamManagementManager(); await conn.registerManagers([ PresenceManager(), RosterManager(TestingRosterStateManager('', [])), DiscoManager([]), sm, CarbonsManager()..forceEnable(), EntityCapabilitiesManager('http://moxxmpp.example'), ]); await conn.registerFeatureNegotiators([ SaslPlainNegotiator(), ResourceBindingNegotiator(), StreamManagementNegotiator(), ]); await conn.connect( waitUntilLogin: true, ); expect(fakeSocket.getState(), 5); expect(await conn.getConnectionState(), XmppConnectionState.connected); expect( conn .getManagerById(smManager)! .isStreamManagementEnabled(), true, ); // Send an invalid carbon fakeSocket.injectRawXml(''' What man art thou that, thus bescreen'd in night, so stumblest on my counsel? 0e3141cd80894871a68e6fe6b1ec56fa '''); await Future.delayed(const Duration(seconds: 2)); expect(sm.state.s2c, 1); }); test('Test counting incoming stanzas that are awaited', () async { final fakeSocket = StubTCPSocket([ StringExpectation( "", ''' PLAIN ''', ), StringExpectation( "AHBvbHlub21kaXZpc2lvbgBhYWFh", '', ), StringExpectation( "", ''' ''', ), StanzaExpectation( '', 'polynomdivision@test.server/MU29eEZn', ignoreId: true, ), StringExpectation( "", '', ), StringExpectation( "chat", '', ), StanzaExpectation( "", "", ignoreId: true, adjustId: true, ), ]); final conn = XmppConnection( TestingReconnectionPolicy(), AlwaysConnectedConnectivityManager(), fakeSocket, )..setConnectionSettings( ConnectionSettings( jid: JID.fromString('polynomdivision@test.server'), password: 'aaaa', useDirectTLS: true, ), ); final sm = StreamManagementManager(); await conn.registerManagers([ PresenceManager(), RosterManager(TestingRosterStateManager('', [])), DiscoManager([]), sm, CarbonsManager()..forceEnable(), //EntityCapabilitiesManager('http://moxxmpp.example'), ]); await conn.registerFeatureNegotiators([ SaslPlainNegotiator(), ResourceBindingNegotiator(), StreamManagementNegotiator(), ]); await conn.connect( waitUntilLogin: true, ); expect(fakeSocket.getState(), 6); expect(await conn.getConnectionState(), XmppConnectionState.connected); expect( conn .getManagerById(smManager)! .isStreamManagementEnabled(), true, ); // Await an iq await conn.sendStanza( Stanza.iq( to: 'user@example.com', type: 'get', ), addFrom: StanzaFromType.none, ); expect(sm.state.s2c, 2); }); }); group('Stream resumption', () { test('Stanza retransmission', () async { var stanzaCount = 0; final attributes = mkAttributes((_) { stanzaCount++; }); final manager = StreamManagementManager()..register(attributes); await manager.onXmppEvent( StreamManagementEnabledEvent(resource: 'hallo'), ); // Send 5 stanzas for (var i = 0; i < 5; i++) { await runOutgoingStanzaHandlers(manager, stanza); } // Only ack 3 // await manager.runNonzaHandlers(mkAck(3)); expect(manager.getUnackedStanzas().length, 2); // Lose connection // [ Reconnect ] await manager.onXmppEvent(StreamResumedEvent(h: 3)); expect(stanzaCount, 2); }); test('Resumption with prior state', () async { var stanzaCount = 0; final attributes = mkAttributes((_) { stanzaCount++; }); final manager = StreamManagementManager()..register(attributes); // [ ... ] await manager.onXmppEvent( StreamManagementEnabledEvent(resource: 'hallo'), ); await manager.setState(manager.state.copyWith(c2s: 150, s2c: 70)); // Send some stanzas but don't ack them for (var i = 0; i < 5; i++) { await runOutgoingStanzaHandlers(manager, stanza); } expect(manager.getUnackedStanzas().length, 5); // Lose connection // [ Reconnect ] await manager.onXmppEvent(StreamResumedEvent(h: 150)); expect(manager.getUnackedStanzas().length, 0); expect(stanzaCount, 5); }); }); group('Test the negotiator', () { test('Test successful stream enablement', () async { final fakeSocket = StubTCPSocket([ StringExpectation( "", ''' PLAIN ''', ), StringExpectation( "AHBvbHlub21kaXZpc2lvbgBhYWFh", '', ), StringExpectation( "", ''' ''', ), StanzaExpectation( '', 'polynomdivision@test.server/MU29eEZn', ignoreId: true, ), StringExpectation( "", '', ) ]); final conn = XmppConnection( TestingReconnectionPolicy(), AlwaysConnectedConnectivityManager(), fakeSocket, )..setConnectionSettings( ConnectionSettings( jid: JID.fromString('polynomdivision@test.server'), password: 'aaaa', useDirectTLS: true, ), ); await conn.registerManagers([ PresenceManager(), RosterManager(TestingRosterStateManager('', [])), DiscoManager([]), StreamManagementManager(), ]); await conn.registerFeatureNegotiators([ SaslPlainNegotiator(), ResourceBindingNegotiator(), StreamManagementNegotiator(), ]); await conn.connect( waitUntilLogin: true, ); expect(fakeSocket.getState(), 6); expect(await conn.getConnectionState(), XmppConnectionState.connected); expect( conn .getManagerById(smManager)! .isStreamManagementEnabled(), true, ); }); test('Test a failed stream resumption', () async { final fakeSocket = StubTCPSocket([ StringExpectation( "", ''' PLAIN ''', ), StringExpectation( "AHBvbHlub21kaXZpc2lvbgBhYWFh", '', ), StringExpectation( "", ''' ''', ), StringExpectation( "", "", ), StanzaExpectation( '', 'polynomdivision@test.server/MU29eEZn', ignoreId: true, ), StringExpectation( "", '', ) ]); final conn = XmppConnection( TestingReconnectionPolicy(), AlwaysConnectedConnectivityManager(), fakeSocket, )..setConnectionSettings( ConnectionSettings( jid: JID.fromString('polynomdivision@test.server'), password: 'aaaa', useDirectTLS: true, ), ); await conn.registerManagers([ PresenceManager(), RosterManager(TestingRosterStateManager('', [])), DiscoManager([]), StreamManagementManager(), ]); await conn.registerFeatureNegotiators([ SaslPlainNegotiator(), ResourceBindingNegotiator(), StreamManagementNegotiator(), ]); await conn.getManagerById(smManager)!.setState( StreamManagementState( 10, 10, streamResumptionId: 'id-1', ), ); await conn.connect( waitUntilLogin: true, ); expect(fakeSocket.getState(), 7); expect(await conn.getConnectionState(), XmppConnectionState.connected); expect( conn .getManagerById(smManager)! .isStreamManagementEnabled(), true, ); }); test('Test a successful stream resumption', () async { final fakeSocket = StubTCPSocket([ StringExpectation( "", ''' PLAIN ''', ), StringExpectation( "AHBvbHlub21kaXZpc2lvbgBhYWFh", '', ), StringExpectation( "", ''' ''', ), StringExpectation( "", "", ), ]); final conn = XmppConnection( TestingReconnectionPolicy(), AlwaysConnectedConnectivityManager(), fakeSocket, )..setConnectionSettings( ConnectionSettings( jid: JID.fromString('polynomdivision@test.server'), password: 'aaaa', useDirectTLS: true, ), ); await conn.registerManagers([ PresenceManager(), RosterManager(TestingRosterStateManager('', [])), DiscoManager([]), StreamManagementManager(), ]); await conn.registerFeatureNegotiators([ SaslPlainNegotiator(), ResourceBindingNegotiator(), StreamManagementNegotiator(), ]); await conn.getManagerById(smManager)!.setState( StreamManagementState( 10, 10, streamResumptionId: 'id-1', ), ); await conn.connect( lastResource: 'abc123', waitUntilLogin: true, ); expect(fakeSocket.getState(), 4); expect(await conn.getConnectionState(), XmppConnectionState.connected); final sm = conn.getManagerById(smManager)!; expect(sm.isStreamManagementEnabled(), true); expect(sm.streamResumed, true); }); }); test('Test SASL2 inline stream resumption', () async { final fakeSocket = StubTCPSocket([ StringExpectation( "", ''' PLAIN PLAIN ''', ), StanzaExpectation( "moxxmppPapaTutuWawa's awesome deviceAHBvbHlub21kaXZpc2lvbgBhYWFh", ''' polynomdivision@test.server ''', ), ]); final sm = StreamManagementManager(); await sm.setState( sm.state.copyWith( c2s: 25, s2c: 2, streamResumptionId: 'test-prev-id', ), ); final conn = XmppConnection( TestingReconnectionPolicy(), AlwaysConnectedConnectivityManager(), fakeSocket, ) ..setConnectionSettings( ConnectionSettings( jid: JID.fromString('polynomdivision@test.server'), password: 'aaaa', useDirectTLS: true, ), ) ..setResource('test-resource', triggerEvent: false); await conn.registerManagers([ RosterManager(TestingRosterStateManager('', [])), DiscoManager([]), sm, ]); await conn.registerFeatureNegotiators([ SaslPlainNegotiator(), ResourceBindingNegotiator(), StreamManagementNegotiator()..setResource('test-resource'), Sasl2Negotiator( userAgent: const UserAgent( id: 'd4565fa7-4d72-4749-b3d3-740edbf87770', software: 'moxxmpp', device: "PapaTutuWawa's awesome device", ), ), ]); final result = await conn.connect( waitUntilLogin: true, shouldReconnect: false, enableReconnectOnSuccess: false, ); expect(result.isType(), false); expect( sm.state.c2s, 25, ); expect( sm.state.s2c, 2, ); expect(conn.resource, 'test-resource'); }); test('Test SASL2 inline stream resumption with Bind2', () async { final fakeSocket = StubTCPSocket([ StringExpectation( "", ''' PLAIN PLAIN ''', ), StanzaExpectation( "moxxmppPapaTutuWawa's awesome deviceAHBvbHlub21kaXZpc2lvbgBhYWFh", ''' polynomdivision@test.server ''', ), ]); final sm = StreamManagementManager(); await sm.setState( sm.state.copyWith( c2s: 25, s2c: 2, streamResumptionId: 'test-prev-id', ), ); final conn = XmppConnection( TestingReconnectionPolicy(), AlwaysConnectedConnectivityManager(), fakeSocket, ) ..setConnectionSettings( ConnectionSettings( jid: JID.fromString('polynomdivision@test.server'), password: 'aaaa', useDirectTLS: true, ), ) ..setResource('test-resource', triggerEvent: false); await conn.registerManagers([ RosterManager(TestingRosterStateManager('', [])), DiscoManager([]), sm, ]); await conn.registerFeatureNegotiators([ SaslPlainNegotiator(), ResourceBindingNegotiator(), StreamManagementNegotiator()..setResource('test-resource'), Bind2Negotiator(), Sasl2Negotiator( userAgent: const UserAgent( id: 'd4565fa7-4d72-4749-b3d3-740edbf87770', software: 'moxxmpp', device: "PapaTutuWawa's awesome device", ), ), ]); final result = await conn.connect( waitUntilLogin: true, shouldReconnect: false, enableReconnectOnSuccess: false, ); expect(result.isType(), false); expect( sm.state.c2s, 25, ); expect( sm.state.s2c, 2, ); expect(conn.resource, 'test-resource'); }); test('Test failed SASL2 inline stream resumption with Bind2', () async { final fakeSocket = StubTCPSocket([ StringExpectation( "", ''' PLAIN PLAIN ''', ), StanzaExpectation( "moxxmppPapaTutuWawa's awesome deviceAHBvbHlub21kaXZpc2lvbgBhYWFh", ''' polynomdivision@test.server/test-resource ''', ), ]); final sm = StreamManagementManager(); await sm.setState( sm.state.copyWith( c2s: 25, s2c: 2, streamResumptionId: 'test-prev-id', ), ); final conn = XmppConnection( TestingReconnectionPolicy(), AlwaysConnectedConnectivityManager(), fakeSocket, ) ..setConnectionSettings( ConnectionSettings( jid: JID.fromString('polynomdivision@test.server'), password: 'aaaa', useDirectTLS: true, ), ) ..setResource('test-resource', triggerEvent: false); await conn.registerManagers([ RosterManager(TestingRosterStateManager('', [])), DiscoManager([]), sm, ]); final smn = StreamManagementNegotiator(); await conn.registerFeatureNegotiators([ SaslPlainNegotiator(), ResourceBindingNegotiator(), smn, Bind2Negotiator(), Sasl2Negotiator( userAgent: const UserAgent( id: 'd4565fa7-4d72-4749-b3d3-740edbf87770', software: 'moxxmpp', device: "PapaTutuWawa's awesome device", ), ), ]); final result = await conn.connect( waitUntilLogin: true, shouldReconnect: false, enableReconnectOnSuccess: false, ); expect(result.isType(), false); expect(smn.isResumed, false); expect(smn.resumeFailed, true); expect(smn.streamEnablementFailed, true); expect(conn.resource, 'test-resource'); }); }