fix(xep,core): Fix some reconnection issues

- Use handleError instead of directly invoking onFailure
- Stop the ack timer only when we receive ack responses
- NoConnectionPossibleErrors are not irrecoverable
This commit is contained in:
PapaTutuWawa 2023-04-09 13:14:53 +02:00
parent c9173c49bd
commit 0fb66f6aca
5 changed files with 190 additions and 25 deletions

View File

@ -308,7 +308,6 @@ class XmppConnection {
unawaited(
_connectImpl(
waitForConnection: true,
shouldReconnect: false,
),
);
}
@ -902,7 +901,7 @@ class XmppConnection {
port: port,
);
if (!result) {
await handleError(NoConnectionError());
await handleError(NoConnectionPossibleError());
return Result(NoConnectionPossibleError());
} else {

View File

@ -7,13 +7,6 @@ abstract class XmppError {
bool isRecoverable();
}
/// Returned if we could not establish a TCP connection
/// to the server.
class NoConnectionError extends XmppError {
@override
bool isRecoverable() => true;
}
/// Returned if a socket error occured
class SocketError extends XmppError {
SocketError(this.event);

View File

@ -0,0 +1,7 @@
import 'package:moxxmpp/src/errors.dart';
/// Triggered by the StreamManagementManager when an ack request times out.
class StreamManagementAckTimeoutError extends XmppError {
@override
bool isRecoverable() => true;
}

View File

@ -11,6 +11,7 @@ import 'package:moxxmpp/src/namespaces.dart';
import 'package:moxxmpp/src/negotiators/namespaces.dart';
import 'package:moxxmpp/src/stanza.dart';
import 'package:moxxmpp/src/stringxml.dart';
import 'package:moxxmpp/src/xeps/xep_0198/errors.dart';
import 'package:moxxmpp/src/xeps/xep_0198/negotiator.dart';
import 'package:moxxmpp/src/xeps/xep_0198/nonzas.dart';
import 'package:moxxmpp/src/xeps/xep_0198/state.dart';
@ -45,10 +46,10 @@ class StreamManagementManager extends XmppManagerBase {
@internal
final Duration ackTimeout;
/// The time at which the last ack has been sent
/// The time at which the last ack has been received
int _lastAckTimestamp = -1;
/// The timer to see if we timed the connection out
/// The timer to see if the connection timed out
Timer? _ackTimer;
/// Counts how many acks we're waiting for
@ -194,6 +195,7 @@ class StreamManagementManager extends XmppManagerBase {
_stopAckTimer();
break;
case XmppConnectionState.connecting:
_stopAckTimer();
// NOOP
break;
}
@ -214,25 +216,34 @@ class StreamManagementManager extends XmppManagerBase {
/// Stops the timer, if it is running.
void _stopAckTimer() {
if (_ackTimer == null) return;
logger.fine('Stopping ack timer');
_ackTimer!.cancel();
_ackTimer?.cancel();
_ackTimer = null;
}
@visibleForTesting
Future<void> handleAckTimeout() async {
_stopAckTimer();
await getAttributes()
.getConnection()
.handleError(StreamManagementAckTimeoutError());
}
/// Timer callback that checks if all acks have been answered. If not and the last
/// response has been more that [ackTimeout] in the past, declare the session dead.
void _ackTimerCallback(Timer timer) {
_ackLock.synchronized(() async {
Future<void> _ackTimerCallback(Timer timer) async {
logger.finest('Ack timer callback called');
final shouldTimeout = await _ackLock.synchronized(() {
final now = DateTime.now().millisecondsSinceEpoch;
if (now - _lastAckTimestamp >= ackTimeout.inMilliseconds &&
_pendingAcks > 0) {
_stopAckTimer();
await getAttributes().getConnection().reconnectionPolicy.onFailure();
}
return now - _lastAckTimestamp >= ackTimeout.inMilliseconds &&
_pendingAcks > 0;
});
logger.finest('Should timeout: $shouldTimeout');
if (shouldTimeout) {
await handleAckTimeout();
}
}
/// Wrapper around sending an <r /> nonza that starts the ack timeout timer.
@ -240,9 +251,7 @@ class StreamManagementManager extends XmppManagerBase {
logger.fine('_sendAckRequest: Waiting to acquire lock...');
await _ackLock.synchronized(() async {
logger.fine('_sendAckRequest: Done...');
final now = DateTime.now().millisecondsSinceEpoch;
_lastAckTimestamp = now;
_pendingAcks++;
_startAckTimer();
@ -287,16 +296,24 @@ class StreamManagementManager extends XmppManagerBase {
Future<bool> _handleAckResponse(XMLNode nonza) async {
final h = int.parse(nonza.attributes['h']! as String);
_lastAckTimestamp = DateTime.now().millisecondsSinceEpoch;
await _ackLock.synchronized(() async {
await _stateLock.synchronized(() async {
if (_pendingAcks > 0) {
// Prevent diff from becoming negative
final diff = max(_state.c2s - h, 0);
_pendingAcks = diff;
} else {
_stopAckTimer();
// Reset the timer
if (_pendingAcks > 0) {
_startAckTimer();
}
}
if (_pendingAcks == 0) {
_stopAckTimer();
}
logger.fine('_pendingAcks is now at $_pendingAcks');
});
});

View File

@ -634,6 +634,155 @@ void main() {
});
});
test('Test timing out an ack request', () async {
final fakeSocket = StubTCPSocket([
StringExpectation(
"<stream:stream xmlns='jabber:client' version='1.0' xmlns:stream='http://etherx.jabber.org/streams' to='test.server' from='polynomdivision@test.server' xml:lang='en'>",
'''
<stream:stream
xmlns="jabber:client"
version="1.0"
xmlns:stream="http://etherx.jabber.org/streams"
from="test.server"
xml:lang="en">
<stream:features xmlns="http://etherx.jabber.org/streams">
<mechanisms xmlns="urn:ietf:params:xml:ns:xmpp-sasl">
<mechanism>PLAIN</mechanism>
</mechanisms>
</stream:features>''',
),
StringExpectation(
"<auth xmlns='urn:ietf:params:xml:ns:xmpp-sasl' mechanism='PLAIN'>AHBvbHlub21kaXZpc2lvbgBhYWFh</auth>",
'<success xmlns="urn:ietf:params:xml:ns:xmpp-sasl" />',
),
StringExpectation(
"<stream:stream xmlns='jabber:client' version='1.0' xmlns:stream='http://etherx.jabber.org/streams' to='test.server' from='polynomdivision@test.server' xml:lang='en'>",
'''
<stream:stream
xmlns="jabber:client"
version="1.0"
xmlns:stream="http://etherx.jabber.org/streams"
from="test.server"
xml:lang="en">
<stream:features xmlns="http://etherx.jabber.org/streams">
<bind xmlns="urn:ietf:params:xml:ns:xmpp-bind">
<required/>
</bind>
<session xmlns="urn:ietf:params:xml:ns:xmpp-session">
<optional/>
</session>
<csi xmlns="urn:xmpp:csi:0"/>
<sm xmlns="urn:xmpp:sm:3"/>
</stream:features>
''',
),
StringExpectation(
"<resume xmlns='urn:xmpp:sm:3' previd='id-1' h='10' />",
"<resumed xmlns='urn:xmpp:sm:3' h='id-1' h='12' />",
),
StanzaExpectation(
"<iq to='localhost' type='get' from='polynomdivision@test.server/abc123' xmlns='jabber:client' />",
'',
ignoreId: true,
),
StanzaExpectation(
"<r xmlns='urn:xmpp:sm:3' />",
'',
),
StringExpectation(
"<stream:stream xmlns='jabber:client' version='1.0' xmlns:stream='http://etherx.jabber.org/streams' to='test.server' from='polynomdivision@test.server' xml:lang='en'>",
'''
<stream:stream
xmlns="jabber:client"
version="1.0"
xmlns:stream="http://etherx.jabber.org/streams"
from="test.server"
xml:lang="en">
<stream:features xmlns="http://etherx.jabber.org/streams">
<mechanisms xmlns="urn:ietf:params:xml:ns:xmpp-sasl">
<mechanism>PLAIN</mechanism>
</mechanisms>
</stream:features>''',
),
StringExpectation(
"<auth xmlns='urn:ietf:params:xml:ns:xmpp-sasl' mechanism='PLAIN'>AHBvbHlub21kaXZpc2lvbgBhYWFh</auth>",
'<success xmlns="urn:ietf:params:xml:ns:xmpp-sasl" />',
),
StringExpectation(
"<stream:stream xmlns='jabber:client' version='1.0' xmlns:stream='http://etherx.jabber.org/streams' to='test.server' from='polynomdivision@test.server' xml:lang='en'>",
'''
<stream:stream
xmlns="jabber:client"
version="1.0"
xmlns:stream="http://etherx.jabber.org/streams"
from="test.server"
xml:lang="en">
<stream:features xmlns="http://etherx.jabber.org/streams">
<bind xmlns="urn:ietf:params:xml:ns:xmpp-bind">
<required/>
</bind>
<session xmlns="urn:ietf:params:xml:ns:xmpp-session">
<optional/>
</session>
<csi xmlns="urn:xmpp:csi:0"/>
<sm xmlns="urn:xmpp:sm:3"/>
</stream:features>
''',
),
StringExpectation(
"<resume xmlns='urn:xmpp:sm:3' previd='id-1' h='10' />",
"<resumed xmlns='urn:xmpp:sm:3' h='id-1' h='12' />",
),
]);
final conn = XmppConnection(
TestingSleepReconnectionPolicy(1),
AlwaysConnectedConnectivityManager(),
ClientToServerNegotiator(),
fakeSocket,
)..connectionSettings = ConnectionSettings(
jid: JID.fromString('polynomdivision@test.server'),
password: 'aaaa',
);
await conn.registerManagers([
StreamManagementManager(ackTimeout: const Duration(minutes: 9999)),
]);
await conn.registerFeatureNegotiators([
SaslPlainNegotiator(),
StreamManagementNegotiator()..resource = 'abc123',
]);
await conn.getManagerById<StreamManagementManager>(smManager)!.setState(
StreamManagementState(
10,
10,
streamResumptionId: 'id-1',
),
);
await conn.connect(
waitUntilLogin: true,
);
expect(fakeSocket.getState(), 4);
expect(await conn.getConnectionState(), XmppConnectionState.connected);
// Send a bogus stanza
unawaited(
conn.sendStanza(Stanza.iq(to: 'localhost', type: 'get')),
);
await Future<void>.delayed(const Duration(seconds: 5));
expect(fakeSocket.getState(), 6);
// Trigger a fake timeout
await conn
.getManagerById<StreamManagementManager>(smManager)!
.handleAckTimeout();
// Wait for reconnect
await Future<void>.delayed(const Duration(seconds: 5));
expect(fakeSocket.getState(), 14);
});
test('Test SASL2 inline stream resumption', () async {
final fakeSocket = StubTCPSocket([
StringExpectation(