feat(core): Attempt to improve the ReconnectionPolicy

This commit is contained in:
PapaTutuWawa 2023-04-03 13:40:13 +02:00
parent 0d9afd546c
commit 275d6e0346
3 changed files with 147 additions and 107 deletions

View File

@ -91,7 +91,6 @@ class XmppConnection {
// Allow the reconnection policy to perform reconnections by itself
_reconnectionPolicy.register(
_attemptReconnection,
_onNetworkConnectionLost,
);
_socketStream = _socket.getDataStream();
@ -398,7 +397,7 @@ class XmppConnection {
// The error is recoverable
await _setConnectionState(XmppConnectionState.notConnected);
if (await _reconnectionPolicy.getShouldReconnect()) {
if (await _reconnectionPolicy.canTriggerFailure()) {
await _reconnectionPolicy.onFailure();
} else {
_log.info(
@ -1009,12 +1008,6 @@ class XmppConnection {
);
}
/// To be called when we lost the network connection.
Future<void> _onNetworkConnectionLost() async {
_socket.close();
await _setConnectionState(XmppConnectionState.notConnected);
}
/// Attempt to gracefully close the session
Future<void> disconnect() async {
await _disconnect(state: XmppConnectionState.notConnected);
@ -1069,6 +1062,15 @@ class XmppConnection {
// Kill a possibly existing connection
_socket.close();
await _reconnectionPolicy.reset();
_enableReconnectOnSuccess = enableReconnectOnSuccess;
if (shouldReconnect) {
await _reconnectionPolicy.setShouldReconnect(true);
} else {
await _reconnectionPolicy.setShouldReconnect(false);
}
await _sendEvent(ConnectingEvent());
if (waitUntilLogin) {
_log.finest('Setting up completer for awaiting completed login');
_connectionCompleter = Completer();
@ -1080,16 +1082,6 @@ class XmppConnection {
setResource('', triggerEvent: false);
}
_enableReconnectOnSuccess = enableReconnectOnSuccess;
if (shouldReconnect) {
await _reconnectionPolicy.setShouldReconnect(true);
} else {
await _reconnectionPolicy.setShouldReconnect(false);
}
await _reconnectionPolicy.reset();
await _sendEvent(ConnectingEvent());
// If requested, wait until we have a network connection
if (waitForConnection) {
_log.info('Waiting for okay from connectivityManager');

View File

@ -2,7 +2,6 @@ import 'dart:async';
import 'dart:math';
import 'package:logging/logging.dart';
import 'package:meta/meta.dart';
import 'package:moxxmpp/src/util/queue.dart';
import 'package:synchronized/synchronized.dart';
/// A callback function to be called when the connection to the server has been lost.
@ -17,40 +16,52 @@ abstract class ReconnectionPolicy {
/// to perform a reconnection.
PerformReconnectFunction? performReconnect;
/// Function provided by XmppConnection that allows the policy
/// to say that we lost the connection.
ConnectionLostCallback? triggerConnectionLost;
final Lock _lock = Lock();
/// Indicate if a reconnection attempt is currently running.
bool _isReconnecting = false;
/// Indicate if should try to reconnect.
bool _shouldAttemptReconnection = false;
/// Indicate if a reconnection attempt is currently running.
@protected
bool isReconnecting = false;
Future<bool> canTryReconnecting() async => _lock.synchronized(() => !_isReconnecting);
/// And the corresponding lock
@protected
final Lock lock = Lock();
Future<bool> getIsReconnecting() async => _lock.synchronized(() => _isReconnecting);
/// The lock for accessing [_shouldAttemptReconnection]
@protected
final Lock shouldReconnectLock = Lock();
Future<void> _resetIsReconnecting() async {
await _lock.synchronized(() {
_isReconnecting = false;
});
}
/// Called by XmppConnection to register the policy.
void register(
PerformReconnectFunction performReconnect,
ConnectionLostCallback triggerConnectionLost,
) {
this.performReconnect = performReconnect;
this.triggerConnectionLost = triggerConnectionLost;
unawaited(reset());
}
/// In case the policy depends on some internal state, this state must be reset
/// to an initial state when reset is called. In case timers run, they must be
/// terminated.
Future<void> reset();
@mustCallSuper
Future<void> reset() async {
await _resetIsReconnecting();
}
@mustCallSuper
Future<bool> canTriggerFailure() async {
return _lock.synchronized(() {
if (_shouldAttemptReconnection && !_isReconnecting) {
_isReconnecting = true;
return true;
}
return false;
});
}
/// Called by the XmppConnection when the reconnection failed.
Future<void> onFailure() async {}
@ -59,28 +70,14 @@ abstract class ReconnectionPolicy {
Future<void> onSuccess();
Future<bool> getShouldReconnect() async {
return shouldReconnectLock.synchronized(() => _shouldAttemptReconnection);
return _lock.synchronized(() => _shouldAttemptReconnection);
}
/// Set whether a reconnection attempt should be made.
Future<void> setShouldReconnect(bool value) async {
return shouldReconnectLock
return _lock
.synchronized(() => _shouldAttemptReconnection = value);
}
/// Returns true if the manager is currently triggering a reconnection. If not, returns
/// false.
Future<bool> isReconnectionRunning() async {
return lock.synchronized(() => isReconnecting);
}
/// Set the isReconnecting state to [value].
@protected
Future<void> setIsReconnecting(bool value) async {
await lock.synchronized(() async {
isReconnecting = value;
});
}
}
/// A simple reconnection strategy: Make the reconnection delays exponentially longer
@ -105,90 +102,59 @@ class RandomBackoffReconnectionPolicy extends ReconnectionPolicy {
/// Backoff timer.
Timer? _timer;
final Lock _timerLock = Lock();
/// Logger.
final Logger _log = Logger('RandomBackoffReconnectionPolicy');
/// Event queue
final AsyncQueue _eventQueue = AsyncQueue();
final Lock _timerLock = Lock();
/// Called when the backoff expired
Future<void> _onTimerElapsed() async {
_log.fine('Timer elapsed. Waiting for lock');
await lock.synchronized(() async {
_log.fine('Lock aquired');
if (!(await getShouldReconnect())) {
_log.fine(
'Backoff timer expired but getShouldReconnect() returned false',
);
@visibleForTesting
Future<void> onTimerElapsed() async {
_log.fine('Timer elapsed. Waiting for lock...');
await _timerLock.synchronized(() async {
if (!(await getIsReconnecting())) {
return;
}
if (isReconnecting) {
if (!(await getShouldReconnect())) {
_log.fine(
'Backoff timer expired but a reconnection is running, so doing nothing.',
'Should not reconnect. Stopping here.',
);
return;
}
_log.fine('Triggering reconnect');
isReconnecting = true;
_timer?.cancel();
_timer = null;
await performReconnect!();
});
await _timerLock.synchronized(() {
_timer?.cancel();
_timer = null;
});
}
Future<void> _reset() async {
_log.finest('Resetting internal state');
await _timerLock.synchronized(() {
_timer?.cancel();
_timer = null;
});
await setIsReconnecting(false);
}
@override
Future<void> reset() async {
// ignore: unnecessary_lambdas
await _eventQueue.addJob(() => _reset());
}
Future<void> _onFailure() async {
final shouldContinue = await _timerLock.synchronized(() {
return _timer == null;
});
if (!shouldContinue) {
_log.finest(
'_onFailure: Not backing off since _timer is already running',
);
return;
_log.finest('Resetting internal state');
_timer?.cancel();
_timer = null;
await super.reset();
}
@override
Future<void> onFailure() async {
final seconds =
Random().nextInt(_maxBackoffTime - _minBackoffTime) + _minBackoffTime;
_log.finest('Failure occured. Starting random backoff with ${seconds}s');
_timer?.cancel();
_timer = Timer(Duration(seconds: seconds), _onTimerElapsed);
}
@override
Future<void> onFailure() async {
// ignore: unnecessary_lambdas
await _eventQueue.addJob(() => _onFailure());
_timer = Timer(Duration(seconds: seconds), onTimerElapsed);
}
@override
Future<void> onSuccess() async {
await reset();
}
@visibleForTesting
bool isTimerRunning() => _timer != null;
}
/// A stub reconnection policy for tests.
@ -203,7 +169,9 @@ class TestingReconnectionPolicy extends ReconnectionPolicy {
Future<void> onFailure() async {}
@override
Future<void> reset() async {}
Future<void> reset() async {
await super.reset();
}
}
/// A reconnection policy for tests that waits a constant number of seconds before
@ -223,5 +191,7 @@ class TestingSleepReconnectionPolicy extends ReconnectionPolicy {
}
@override
Future<void> reset() async {}
Future<void> reset() async {
await super.reset();
}
}

View File

@ -0,0 +1,78 @@
import 'package:moxxmpp/moxxmpp.dart';
import 'package:test/test.dart';
import 'helpers/logging.dart';
void main() {
initLogger();
test('Test triggering a reconnect multiple times', () async {
final policy = RandomBackoffReconnectionPolicy(
9998,
9999,
);
await policy.setShouldReconnect(true);
// We have a failure
expect(
await policy.canTriggerFailure(),
true,
);
await policy.onFailure();
// Try to trigger another one
expect(
await policy.canTriggerFailure(),
false,
);
});
test('Test resetting while reconnecting', () async {
final policy = RandomBackoffReconnectionPolicy(
9998,
9999,
)..register(() async => expect(true, false));
await policy.setShouldReconnect(true);
// We have a failure
expect(
await policy.canTriggerFailure(),
true,
);
await policy.onFailure();
expect(policy.isTimerRunning(), true);
// We reset
await policy.reset();
expect(policy.isTimerRunning(), false);
// We have another failure
expect(
await policy.canTriggerFailure(),
true,
);
});
test('Test triggering the timer callback twice', () async {
final policy = RandomBackoffReconnectionPolicy(
9998,
9999,
);
var counter = 0;
policy.register(() async {
await policy.reset();
counter++;
});
await policy.setShouldReconnect(true);
// We have a failure
expect(
await policy.canTriggerFailure(),
true,
);
await policy.onFailure();
await policy.onTimerElapsed();
await policy.onTimerElapsed();
expect(counter, 1);
});
}