fix: Fix reconnections when the connection is awaited

This commit is contained in:
PapaTutuWawa 2022-11-12 21:49:13 +01:00
parent 988db718a2
commit 9cc735d854
7 changed files with 131 additions and 23 deletions

3
.gitignore vendored
View File

@ -10,3 +10,6 @@ build/
# Omit committing pubspec.lock for library packages; see
# https://dart.dev/guides/libraries/private-files#pubspeclock.
pubspec.lock
# Omit pubspec override files generated by melos
**/pubspec_overrides.yaml

View File

@ -0,0 +1 @@
pubspec_overrides.yaml

View File

@ -0,0 +1,55 @@
import 'package:logging/logging.dart';
import 'package:moxxmpp/moxxmpp.dart';
import 'package:moxxmpp_socket_tcp/moxxmpp_socket_tcp.dart';
import 'package:test/test.dart';
void main() {
Logger.root.level = Level.ALL;
Logger.root.onRecord.listen((record) {
print('${record.level.name}: ${record.time}: ${record.message}');
});
final log = Logger('FailureReconnectionTest');
test('Failing an awaited connection', () async {
var errors = 0;
final connection = XmppConnection(
TestingSleepReconnectionPolicy(10),
TCPSocketWrapper(false),
);
connection.registerFeatureNegotiators([
StartTlsNegotiator(),
]);
connection.registerManagers([
DiscoManager(),
RosterManager(),
PingManager(),
MessageManager(),
PresenceManager('http://moxxmpp.example'),
]);
connection.asBroadcastStream().listen((event) {
if (event is ConnectionStateChangedEvent) {
if (event.state == XmppConnectionState.error) {
errors++;
}
}
});
connection.setConnectionSettings(
ConnectionSettings(
jid: JID.fromString('testuser@no-sasl.badxmpp.eu'),
password: 'abc123',
useDirectTLS: true,
allowPlainAuth: true,
),
);
final result = await connection.connectAwaitable();
log.info('Connection failed as expected');
expect(result.success, false);
expect(errors, 1);
log.info('Waiting 20 seconds for unexpected reconnections');
await Future.delayed(const Duration(seconds: 20));
expect(errors, 1);
}, timeout: Timeout.factor(2));
}

View File

@ -163,6 +163,8 @@ class XmppConnection {
/// Completers for certain actions
// ignore: use_late_for_private_fields_and_variables
Completer<XmppConnectionResult>? _connectionCompleter;
/// Controls whether an XmppSocketClosureEvent triggers a reconnection.
bool _socketClosureTriggersReconnect = true;
/// Negotiators
final Map<String, XmppFeatureNegotiatorBase> _featureNegotiators;
@ -350,8 +352,18 @@ class XmppConnection {
_log.severe('handleError: Called with null');
}
// TODO(Unknown): This may be too harsh for every error
await _setConnectionState(XmppConnectionState.notConnected);
// Whenever we encounter an error that would trigger a reconnection attempt while
// the connection result is being awaited, don't attempt a reconnection but instead
// try to gracefully disconnect.
if (_connectionCompleter != null) {
_log.info('Not triggering reconnection since connection result is being awaited');
await _disconnect(triggeredByUser: false, state: XmppConnectionState.error);
_connectionCompleter?.complete(const XmppConnectionResult(false));
_connectionCompleter = null;
return;
}
await _setConnectionState(XmppConnectionState.error);
await _reconnectionPolicy.onFailure();
}
@ -360,8 +372,12 @@ class XmppConnection {
if (event is XmppSocketErrorEvent) {
await handleError(event.error);
} else if (event is XmppSocketClosureEvent) {
_log.fine('Received XmppSocketClosureEvent. Reconnecting...');
await _reconnectionPolicy.onFailure();
if (_socketClosureTriggersReconnect) {
_log.fine('Received XmppSocketClosureEvent. Reconnecting...');
await _reconnectionPolicy.onFailure();
} else {
_log.fine('Received XmppSocketClosureEvent. No reconnection attempt since _socketClosureTriggersReconnect is false...');
}
}
}
@ -798,16 +814,10 @@ class XmppConnection {
await _onNegotiationsDone();
} else if (_currentNegotiator!.state == NegotiatorState.error) {
_log.severe('Negotiator returned an error');
_updateRoutingState(RoutingState.error);
await _setConnectionState(XmppConnectionState.error);
_connectionCompleter?.complete(const XmppConnectionResult(false));
_connectionCompleter = null;
_closeSocket();
await handleError(null);
}
}
void _closeSocket() {
_socket.close();
}
@ -965,17 +975,32 @@ class XmppConnection {
/// Attempt to gracefully close the session
Future<void> disconnect() async {
_reconnectionPolicy.setShouldReconnect(false);
getPresenceManager().sendUnavailablePresence();
_socket.prepareDisconnect();
sendRawString('</stream:stream>');
await _setConnectionState(XmppConnectionState.notConnected);
_socket.close();
// Clear Stream Management state, if available
await getStreamManagementManager()?.resetState();
await _disconnect(state: XmppConnectionState.notConnected);
}
Future<void> _disconnect({required XmppConnectionState state, bool triggeredByUser = true}) async {
_reconnectionPolicy.setShouldReconnect(false);
_socketClosureTriggersReconnect = false;
if (triggeredByUser) {
getPresenceManager().sendUnavailablePresence();
}
_socket.prepareDisconnect();
if (triggeredByUser) {
sendRawString('</stream:stream>');
}
await _setConnectionState(state);
_socket.close();
if (triggeredByUser) {
// Clear Stream Management state, if available
await getStreamManagementManager()?.resetState();
}
}
/// Make sure that all required managers are registered
void _runPreConnectionAssertions() {
assert(_xmppManagers.containsKey(presenceManager), 'A PresenceManager is mandatory');
@ -1009,7 +1034,7 @@ class XmppConnection {
}
await _reconnectionPolicy.reset();
_socketClosureTriggersReconnect = true;
await _sendEvent(ConnectingEvent());
final smManager = getStreamManagementManager();

View File

@ -93,6 +93,7 @@ class ExponentialBackoffReconnectionPolicy extends ReconnectionPolicy {
final isReconnecting = await isReconnectionRunning();
if (shouldReconnect) {
if (!isReconnecting) {
await setIsReconnecting(true);
await performReconnect!();
} else {
// Should never happen.
@ -117,7 +118,6 @@ class ExponentialBackoffReconnectionPolicy extends ReconnectionPolicy {
Future<void> onFailure() async {
_log.finest('Failure occured. Starting exponential backoff');
_counter++;
await setIsReconnecting(true);
if (_timer != null) {
_timer!.cancel();
@ -148,3 +148,23 @@ class TestingReconnectionPolicy extends ReconnectionPolicy {
@override
Future<void> reset() async {}
}
/// A reconnection policy for tests that waits a constant number of seconds before
/// attempting a reconnection.
@visibleForTesting
class TestingSleepReconnectionPolicy extends ReconnectionPolicy {
TestingSleepReconnectionPolicy(this._sleepAmount) : super();
final int _sleepAmount;
@override
Future<void> onSuccess() async {}
@override
Future<void> onFailure() async {
await Future<void>.delayed(Duration(seconds: _sleepAmount));
await performReconnect!();
}
@override
Future<void> reset() async {}
}

View File

@ -29,5 +29,8 @@ dependencies:
dev_dependencies:
build_runner: ^2.1.11
moxxmpp_socket_tcp:
hosted: https://git.polynom.me/api/packages/Moxxy/pub
version: ^0.1.2+1
test: ^1.16.0
very_good_analysis: ^3.0.1

View File

@ -0,0 +1 @@
pubspec_overrides.yaml