Compare commits
No commits in common. "275d6e0346bfba879bb8cd329c39ca869d1a7f39" and "2947e2c5393b263bd41557518230dd6087c16671" have entirely different histories.
275d6e0346
...
2947e2c539
@ -91,6 +91,7 @@ class XmppConnection {
|
|||||||
// Allow the reconnection policy to perform reconnections by itself
|
// Allow the reconnection policy to perform reconnections by itself
|
||||||
_reconnectionPolicy.register(
|
_reconnectionPolicy.register(
|
||||||
_attemptReconnection,
|
_attemptReconnection,
|
||||||
|
_onNetworkConnectionLost,
|
||||||
);
|
);
|
||||||
|
|
||||||
_socketStream = _socket.getDataStream();
|
_socketStream = _socket.getDataStream();
|
||||||
@ -113,7 +114,6 @@ class XmppConnection {
|
|||||||
|
|
||||||
/// A policy on how to reconnect
|
/// A policy on how to reconnect
|
||||||
final ReconnectionPolicy _reconnectionPolicy;
|
final ReconnectionPolicy _reconnectionPolicy;
|
||||||
ReconnectionPolicy get reconnectionPolicy => _reconnectionPolicy;
|
|
||||||
|
|
||||||
/// The class responsible for preventing errors on initial connection due
|
/// The class responsible for preventing errors on initial connection due
|
||||||
/// to no network.
|
/// to no network.
|
||||||
@ -173,9 +173,34 @@ class XmppConnection {
|
|||||||
/// The logger for the class
|
/// The logger for the class
|
||||||
final Logger _log = Logger('XmppConnection');
|
final Logger _log = Logger('XmppConnection');
|
||||||
|
|
||||||
|
/// A value indicating whether a connection attempt is currently running or not
|
||||||
|
bool _isConnectionRunning = false;
|
||||||
|
final Lock _connectionRunningLock = Lock();
|
||||||
|
|
||||||
/// Flag indicating whether reconnection should be enabled after a successful connection.
|
/// Flag indicating whether reconnection should be enabled after a successful connection.
|
||||||
bool _enableReconnectOnSuccess = false;
|
bool _enableReconnectOnSuccess = false;
|
||||||
|
|
||||||
|
/// Enters the critical section for accessing [XmppConnection._isConnectionRunning]
|
||||||
|
/// and does the following:
|
||||||
|
/// - if _isConnectionRunning is false, set it to true and return false.
|
||||||
|
/// - if _isConnectionRunning is true, return true.
|
||||||
|
Future<bool> _testAndSetIsConnectionRunning() async =>
|
||||||
|
_connectionRunningLock.synchronized(() {
|
||||||
|
if (!_isConnectionRunning) {
|
||||||
|
_isConnectionRunning = true;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
});
|
||||||
|
|
||||||
|
/// Enters the critical section for accessing [XmppConnection._isConnectionRunning]
|
||||||
|
/// and sets it to false.
|
||||||
|
Future<void> _resetIsConnectionRunning() async =>
|
||||||
|
_connectionRunningLock.synchronized(() => _isConnectionRunning = false);
|
||||||
|
|
||||||
|
ReconnectionPolicy get reconnectionPolicy => _reconnectionPolicy;
|
||||||
|
|
||||||
bool get isAuthenticated => _isAuthenticated;
|
bool get isAuthenticated => _isAuthenticated;
|
||||||
|
|
||||||
/// Return the registered feature negotiator that has id [id]. Returns null if
|
/// Return the registered feature negotiator that has id [id]. Returns null if
|
||||||
@ -339,6 +364,13 @@ class XmppConnection {
|
|||||||
|
|
||||||
/// Attempts to reconnect to the server by following an exponential backoff.
|
/// Attempts to reconnect to the server by following an exponential backoff.
|
||||||
Future<void> _attemptReconnection() async {
|
Future<void> _attemptReconnection() async {
|
||||||
|
if (await _testAndSetIsConnectionRunning()) {
|
||||||
|
_log.warning(
|
||||||
|
'_attemptReconnection is called but connection attempt is already running. Ignoring...',
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
_log.finest('_attemptReconnection: Setting state to notConnected');
|
_log.finest('_attemptReconnection: Setting state to notConnected');
|
||||||
await _setConnectionState(XmppConnectionState.notConnected);
|
await _setConnectionState(XmppConnectionState.notConnected);
|
||||||
_log.finest('_attemptReconnection: Done');
|
_log.finest('_attemptReconnection: Done');
|
||||||
@ -352,13 +384,14 @@ class XmppConnection {
|
|||||||
_log.finest('Calling _connectImpl() from _attemptReconnection');
|
_log.finest('Calling _connectImpl() from _attemptReconnection');
|
||||||
await _connectImpl(
|
await _connectImpl(
|
||||||
waitForConnection: true,
|
waitForConnection: true,
|
||||||
|
ignoreLock: true,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Called when a stream ending error has occurred
|
/// Called when a stream ending error has occurred
|
||||||
Future<void> handleError(XmppError error) async {
|
Future<void> handleError(XmppError error) async {
|
||||||
_log.severe('handleError called with ${error.toString()}');
|
_log.severe('handleError called with ${error.toString()}');
|
||||||
|
|
||||||
// Whenever we encounter an error that would trigger a reconnection attempt while
|
// Whenever we encounter an error that would trigger a reconnection attempt while
|
||||||
// the connection result is being awaited, don't attempt a reconnection but instead
|
// the connection result is being awaited, don't attempt a reconnection but instead
|
||||||
// try to gracefully disconnect.
|
// try to gracefully disconnect.
|
||||||
@ -397,7 +430,7 @@ class XmppConnection {
|
|||||||
// The error is recoverable
|
// The error is recoverable
|
||||||
await _setConnectionState(XmppConnectionState.notConnected);
|
await _setConnectionState(XmppConnectionState.notConnected);
|
||||||
|
|
||||||
if (await _reconnectionPolicy.canTriggerFailure()) {
|
if (await _reconnectionPolicy.getShouldReconnect()) {
|
||||||
await _reconnectionPolicy.onFailure();
|
await _reconnectionPolicy.onFailure();
|
||||||
} else {
|
} else {
|
||||||
_log.info(
|
_log.info(
|
||||||
@ -582,6 +615,7 @@ class XmppConnection {
|
|||||||
/// Called when we timeout during connecting
|
/// Called when we timeout during connecting
|
||||||
Future<void> _onConnectingTimeout() async {
|
Future<void> _onConnectingTimeout() async {
|
||||||
_log.severe('Connection stuck in "connecting". Causing a reconnection...');
|
_log.severe('Connection stuck in "connecting". Causing a reconnection...');
|
||||||
|
await _resetIsConnectionRunning();
|
||||||
await handleError(TimeoutError());
|
await handleError(TimeoutError());
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -810,6 +844,7 @@ class XmppConnection {
|
|||||||
/// a disco sweep among other things.
|
/// a disco sweep among other things.
|
||||||
Future<void> _onNegotiationsDone() async {
|
Future<void> _onNegotiationsDone() async {
|
||||||
// Set the connection state
|
// Set the connection state
|
||||||
|
await _resetIsConnectionRunning();
|
||||||
await _setConnectionState(XmppConnectionState.connected);
|
await _setConnectionState(XmppConnectionState.connected);
|
||||||
|
|
||||||
// Enable reconnections
|
// Enable reconnections
|
||||||
@ -845,12 +880,14 @@ class XmppConnection {
|
|||||||
// We failed before authenticating
|
// We failed before authenticating
|
||||||
if (!_isAuthenticated) {
|
if (!_isAuthenticated) {
|
||||||
_log.severe('No negotiator could be picked while unauthenticated');
|
_log.severe('No negotiator could be picked while unauthenticated');
|
||||||
|
await _resetIsConnectionRunning();
|
||||||
await handleError(NoMatchingAuthenticationMechanismAvailableError());
|
await handleError(NoMatchingAuthenticationMechanismAvailableError());
|
||||||
return;
|
return;
|
||||||
} else {
|
} else {
|
||||||
_log.severe(
|
_log.severe(
|
||||||
'No negotiator could be picked while negotiations are not done',
|
'No negotiator could be picked while negotiations are not done',
|
||||||
);
|
);
|
||||||
|
await _resetIsConnectionRunning();
|
||||||
await handleError(NoAuthenticatorAvailableError());
|
await handleError(NoAuthenticatorAvailableError());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -859,6 +896,7 @@ class XmppConnection {
|
|||||||
final result = await _currentNegotiator!.negotiate(nonza);
|
final result = await _currentNegotiator!.negotiate(nonza);
|
||||||
if (result.isType<NegotiatorError>()) {
|
if (result.isType<NegotiatorError>()) {
|
||||||
_log.severe('Negotiator returned an error');
|
_log.severe('Negotiator returned an error');
|
||||||
|
await _resetIsConnectionRunning();
|
||||||
await handleError(result.get<NegotiatorError>());
|
await handleError(result.get<NegotiatorError>());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -881,6 +919,7 @@ class XmppConnection {
|
|||||||
!_isNegotiationPossible(_streamFeatures)) {
|
!_isNegotiationPossible(_streamFeatures)) {
|
||||||
_log.finest('Negotiations done!');
|
_log.finest('Negotiations done!');
|
||||||
_updateRoutingState(RoutingState.handleStanzas);
|
_updateRoutingState(RoutingState.handleStanzas);
|
||||||
|
await _resetIsConnectionRunning();
|
||||||
await _onNegotiationsDone();
|
await _onNegotiationsDone();
|
||||||
} else {
|
} else {
|
||||||
_currentNegotiator = getNextNegotiator(_streamFeatures);
|
_currentNegotiator = getNextNegotiator(_streamFeatures);
|
||||||
@ -904,6 +943,7 @@ class XmppConnection {
|
|||||||
_log.finest('Negotiations done!');
|
_log.finest('Negotiations done!');
|
||||||
|
|
||||||
_updateRoutingState(RoutingState.handleStanzas);
|
_updateRoutingState(RoutingState.handleStanzas);
|
||||||
|
await _resetIsConnectionRunning();
|
||||||
await _onNegotiationsDone();
|
await _onNegotiationsDone();
|
||||||
} else {
|
} else {
|
||||||
_log.finest('Picking new negotiator...');
|
_log.finest('Picking new negotiator...');
|
||||||
@ -922,6 +962,7 @@ class XmppConnection {
|
|||||||
);
|
);
|
||||||
|
|
||||||
_updateRoutingState(RoutingState.handleStanzas);
|
_updateRoutingState(RoutingState.handleStanzas);
|
||||||
|
await _resetIsConnectionRunning();
|
||||||
await _onNegotiationsDone();
|
await _onNegotiationsDone();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -1008,6 +1049,13 @@ class XmppConnection {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// To be called when we lost the network connection.
|
||||||
|
Future<void> _onNetworkConnectionLost() async {
|
||||||
|
_socket.close();
|
||||||
|
await _resetIsConnectionRunning();
|
||||||
|
await _setConnectionState(XmppConnectionState.notConnected);
|
||||||
|
}
|
||||||
|
|
||||||
/// Attempt to gracefully close the session
|
/// Attempt to gracefully close the session
|
||||||
Future<void> disconnect() async {
|
Future<void> disconnect() async {
|
||||||
await _disconnect(state: XmppConnectionState.notConnected);
|
await _disconnect(state: XmppConnectionState.notConnected);
|
||||||
@ -1052,24 +1100,34 @@ class XmppConnection {
|
|||||||
|
|
||||||
/// The private implementation for [XmppConnection.connect]. The parameters have
|
/// The private implementation for [XmppConnection.connect]. The parameters have
|
||||||
/// the same meaning as with [XmppConnection.connect].
|
/// the same meaning as with [XmppConnection.connect].
|
||||||
|
///
|
||||||
|
/// [ignoreLock] is a flag that, if true, causes the method to not try to aquire
|
||||||
|
/// a connection lock. This is useful when we already aquired the connection lock,
|
||||||
|
/// for example, in _attemptReconnection.
|
||||||
Future<Result<bool, XmppError>> _connectImpl({
|
Future<Result<bool, XmppError>> _connectImpl({
|
||||||
String? lastResource,
|
String? lastResource,
|
||||||
bool waitForConnection = false,
|
bool waitForConnection = false,
|
||||||
bool shouldReconnect = true,
|
bool shouldReconnect = true,
|
||||||
bool waitUntilLogin = false,
|
bool waitUntilLogin = false,
|
||||||
bool enableReconnectOnSuccess = true,
|
bool enableReconnectOnSuccess = true,
|
||||||
|
bool ignoreLock = false,
|
||||||
}) async {
|
}) async {
|
||||||
// Kill a possibly existing connection
|
if (!ignoreLock) {
|
||||||
_socket.close();
|
if (await _testAndSetIsConnectionRunning()) {
|
||||||
|
_log.fine(
|
||||||
|
'Cancelling this connection attempt as one appears to be already running.',
|
||||||
|
);
|
||||||
|
return Future.value(
|
||||||
|
Result(
|
||||||
|
ConnectionAlreadyRunningError(),
|
||||||
|
),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
await _reconnectionPolicy.reset();
|
await _resetIsConnectionRunning();
|
||||||
_enableReconnectOnSuccess = enableReconnectOnSuccess;
|
|
||||||
if (shouldReconnect) {
|
|
||||||
await _reconnectionPolicy.setShouldReconnect(true);
|
|
||||||
} else {
|
} else {
|
||||||
await _reconnectionPolicy.setShouldReconnect(false);
|
_log.fine('Ignoring connection lock as ignoreLock = true');
|
||||||
}
|
}
|
||||||
await _sendEvent(ConnectingEvent());
|
|
||||||
|
|
||||||
if (waitUntilLogin) {
|
if (waitUntilLogin) {
|
||||||
_log.finest('Setting up completer for awaiting completed login');
|
_log.finest('Setting up completer for awaiting completed login');
|
||||||
@ -1082,6 +1140,16 @@ class XmppConnection {
|
|||||||
setResource('', triggerEvent: false);
|
setResource('', triggerEvent: false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_enableReconnectOnSuccess = enableReconnectOnSuccess;
|
||||||
|
if (shouldReconnect) {
|
||||||
|
await _reconnectionPolicy.setShouldReconnect(true);
|
||||||
|
} else {
|
||||||
|
await _reconnectionPolicy.setShouldReconnect(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
await _reconnectionPolicy.reset();
|
||||||
|
await _sendEvent(ConnectingEvent());
|
||||||
|
|
||||||
// If requested, wait until we have a network connection
|
// If requested, wait until we have a network connection
|
||||||
if (waitForConnection) {
|
if (waitForConnection) {
|
||||||
_log.info('Waiting for okay from connectivityManager');
|
_log.info('Waiting for okay from connectivityManager');
|
||||||
|
@ -4,6 +4,12 @@ import 'package:moxxmpp/src/negotiators/negotiator.dart';
|
|||||||
/// The reason a call to `XmppConnection.connect` failed.
|
/// The reason a call to `XmppConnection.connect` failed.
|
||||||
abstract class XmppConnectionError extends XmppError {}
|
abstract class XmppConnectionError extends XmppError {}
|
||||||
|
|
||||||
|
/// Returned by `XmppConnection.connect` when a connection is already active.
|
||||||
|
class ConnectionAlreadyRunningError extends XmppConnectionError {
|
||||||
|
@override
|
||||||
|
bool isRecoverable() => true;
|
||||||
|
}
|
||||||
|
|
||||||
/// Returned by `XmppConnection.connect` when a negotiator returned an unrecoverable
|
/// Returned by `XmppConnection.connect` when a negotiator returned an unrecoverable
|
||||||
/// error. Only returned when waitUntilLogin is true.
|
/// error. Only returned when waitUntilLogin is true.
|
||||||
class NegotiatorReturnedError extends XmppConnectionError {
|
class NegotiatorReturnedError extends XmppConnectionError {
|
||||||
|
@ -2,6 +2,7 @@ import 'dart:async';
|
|||||||
import 'dart:math';
|
import 'dart:math';
|
||||||
import 'package:logging/logging.dart';
|
import 'package:logging/logging.dart';
|
||||||
import 'package:meta/meta.dart';
|
import 'package:meta/meta.dart';
|
||||||
|
import 'package:moxxmpp/src/util/queue.dart';
|
||||||
import 'package:synchronized/synchronized.dart';
|
import 'package:synchronized/synchronized.dart';
|
||||||
|
|
||||||
/// A callback function to be called when the connection to the server has been lost.
|
/// A callback function to be called when the connection to the server has been lost.
|
||||||
@ -16,68 +17,70 @@ abstract class ReconnectionPolicy {
|
|||||||
/// to perform a reconnection.
|
/// to perform a reconnection.
|
||||||
PerformReconnectFunction? performReconnect;
|
PerformReconnectFunction? performReconnect;
|
||||||
|
|
||||||
final Lock _lock = Lock();
|
/// Function provided by XmppConnection that allows the policy
|
||||||
|
/// to say that we lost the connection.
|
||||||
/// Indicate if a reconnection attempt is currently running.
|
ConnectionLostCallback? triggerConnectionLost;
|
||||||
bool _isReconnecting = false;
|
|
||||||
|
|
||||||
/// Indicate if should try to reconnect.
|
/// Indicate if should try to reconnect.
|
||||||
bool _shouldAttemptReconnection = false;
|
bool _shouldAttemptReconnection = false;
|
||||||
|
|
||||||
|
/// Indicate if a reconnection attempt is currently running.
|
||||||
@protected
|
@protected
|
||||||
Future<bool> canTryReconnecting() async => _lock.synchronized(() => !_isReconnecting);
|
bool isReconnecting = false;
|
||||||
|
|
||||||
|
/// And the corresponding lock
|
||||||
@protected
|
@protected
|
||||||
Future<bool> getIsReconnecting() async => _lock.synchronized(() => _isReconnecting);
|
final Lock lock = Lock();
|
||||||
|
|
||||||
|
/// The lock for accessing [_shouldAttemptReconnection]
|
||||||
|
@protected
|
||||||
|
final Lock shouldReconnectLock = Lock();
|
||||||
|
|
||||||
Future<void> _resetIsReconnecting() async {
|
|
||||||
await _lock.synchronized(() {
|
|
||||||
_isReconnecting = false;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Called by XmppConnection to register the policy.
|
/// Called by XmppConnection to register the policy.
|
||||||
void register(
|
void register(
|
||||||
PerformReconnectFunction performReconnect,
|
PerformReconnectFunction performReconnect,
|
||||||
|
ConnectionLostCallback triggerConnectionLost,
|
||||||
) {
|
) {
|
||||||
this.performReconnect = performReconnect;
|
this.performReconnect = performReconnect;
|
||||||
|
this.triggerConnectionLost = triggerConnectionLost;
|
||||||
|
|
||||||
|
unawaited(reset());
|
||||||
}
|
}
|
||||||
|
|
||||||
/// In case the policy depends on some internal state, this state must be reset
|
/// In case the policy depends on some internal state, this state must be reset
|
||||||
/// to an initial state when reset is called. In case timers run, they must be
|
/// to an initial state when reset is called. In case timers run, they must be
|
||||||
/// terminated.
|
/// terminated.
|
||||||
@mustCallSuper
|
Future<void> reset();
|
||||||
Future<void> reset() async {
|
|
||||||
await _resetIsReconnecting();
|
|
||||||
}
|
|
||||||
|
|
||||||
@mustCallSuper
|
|
||||||
Future<bool> canTriggerFailure() async {
|
|
||||||
return _lock.synchronized(() {
|
|
||||||
if (_shouldAttemptReconnection && !_isReconnecting) {
|
|
||||||
_isReconnecting = true;
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
return false;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Called by the XmppConnection when the reconnection failed.
|
/// Called by the XmppConnection when the reconnection failed.
|
||||||
Future<void> onFailure() async {}
|
Future<void> onFailure() async {}
|
||||||
|
|
||||||
/// Caled by the XmppConnection when the reconnection was successful.
|
/// Caled by the XmppConnection when the reconnection was successful.
|
||||||
Future<void> onSuccess();
|
Future<void> onSuccess();
|
||||||
|
|
||||||
Future<bool> getShouldReconnect() async {
|
Future<bool> getShouldReconnect() async {
|
||||||
return _lock.synchronized(() => _shouldAttemptReconnection);
|
return shouldReconnectLock.synchronized(() => _shouldAttemptReconnection);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Set whether a reconnection attempt should be made.
|
/// Set whether a reconnection attempt should be made.
|
||||||
Future<void> setShouldReconnect(bool value) async {
|
Future<void> setShouldReconnect(bool value) async {
|
||||||
return _lock
|
return shouldReconnectLock
|
||||||
.synchronized(() => _shouldAttemptReconnection = value);
|
.synchronized(() => _shouldAttemptReconnection = value);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns true if the manager is currently triggering a reconnection. If not, returns
|
||||||
|
/// false.
|
||||||
|
Future<bool> isReconnectionRunning() async {
|
||||||
|
return lock.synchronized(() => isReconnecting);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Set the isReconnecting state to [value].
|
||||||
|
@protected
|
||||||
|
Future<void> setIsReconnecting(bool value) async {
|
||||||
|
await lock.synchronized(() async {
|
||||||
|
isReconnecting = value;
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A simple reconnection strategy: Make the reconnection delays exponentially longer
|
/// A simple reconnection strategy: Make the reconnection delays exponentially longer
|
||||||
@ -102,59 +105,90 @@ class RandomBackoffReconnectionPolicy extends ReconnectionPolicy {
|
|||||||
/// Backoff timer.
|
/// Backoff timer.
|
||||||
Timer? _timer;
|
Timer? _timer;
|
||||||
|
|
||||||
|
final Lock _timerLock = Lock();
|
||||||
|
|
||||||
/// Logger.
|
/// Logger.
|
||||||
final Logger _log = Logger('RandomBackoffReconnectionPolicy');
|
final Logger _log = Logger('RandomBackoffReconnectionPolicy');
|
||||||
|
|
||||||
final Lock _timerLock = Lock();
|
/// Event queue
|
||||||
|
final AsyncQueue _eventQueue = AsyncQueue();
|
||||||
|
|
||||||
/// Called when the backoff expired
|
/// Called when the backoff expired
|
||||||
@visibleForTesting
|
Future<void> _onTimerElapsed() async {
|
||||||
Future<void> onTimerElapsed() async {
|
_log.fine('Timer elapsed. Waiting for lock');
|
||||||
_log.fine('Timer elapsed. Waiting for lock...');
|
await lock.synchronized(() async {
|
||||||
await _timerLock.synchronized(() async {
|
_log.fine('Lock aquired');
|
||||||
if (!(await getIsReconnecting())) {
|
if (!(await getShouldReconnect())) {
|
||||||
|
_log.fine(
|
||||||
|
'Backoff timer expired but getShouldReconnect() returned false',
|
||||||
|
);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!(await getShouldReconnect())) {
|
if (isReconnecting) {
|
||||||
_log.fine(
|
_log.fine(
|
||||||
'Should not reconnect. Stopping here.',
|
'Backoff timer expired but a reconnection is running, so doing nothing.',
|
||||||
);
|
);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
_log.fine('Triggering reconnect');
|
_log.fine('Triggering reconnect');
|
||||||
_timer?.cancel();
|
isReconnecting = true;
|
||||||
_timer = null;
|
|
||||||
await performReconnect!();
|
await performReconnect!();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
await _timerLock.synchronized(() {
|
||||||
|
_timer?.cancel();
|
||||||
|
_timer = null;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<void> _reset() async {
|
||||||
|
_log.finest('Resetting internal state');
|
||||||
|
|
||||||
|
await _timerLock.synchronized(() {
|
||||||
|
_timer?.cancel();
|
||||||
|
_timer = null;
|
||||||
|
});
|
||||||
|
|
||||||
|
await setIsReconnecting(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@override
|
@override
|
||||||
Future<void> reset() async {
|
Future<void> reset() async {
|
||||||
_log.finest('Resetting internal state');
|
// ignore: unnecessary_lambdas
|
||||||
_timer?.cancel();
|
await _eventQueue.addJob(() => _reset());
|
||||||
_timer = null;
|
|
||||||
await super.reset();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@override
|
Future<void> _onFailure() async {
|
||||||
Future<void> onFailure() async {
|
final shouldContinue = await _timerLock.synchronized(() {
|
||||||
|
return _timer == null;
|
||||||
|
});
|
||||||
|
if (!shouldContinue) {
|
||||||
|
_log.finest(
|
||||||
|
'_onFailure: Not backing off since _timer is already running',
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
final seconds =
|
final seconds =
|
||||||
Random().nextInt(_maxBackoffTime - _minBackoffTime) + _minBackoffTime;
|
Random().nextInt(_maxBackoffTime - _minBackoffTime) + _minBackoffTime;
|
||||||
_log.finest('Failure occured. Starting random backoff with ${seconds}s');
|
_log.finest('Failure occured. Starting random backoff with ${seconds}s');
|
||||||
_timer?.cancel();
|
_timer?.cancel();
|
||||||
|
|
||||||
_timer = Timer(Duration(seconds: seconds), onTimerElapsed);
|
_timer = Timer(Duration(seconds: seconds), _onTimerElapsed);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
Future<void> onFailure() async {
|
||||||
|
// ignore: unnecessary_lambdas
|
||||||
|
await _eventQueue.addJob(() => _onFailure());
|
||||||
|
}
|
||||||
|
|
||||||
@override
|
@override
|
||||||
Future<void> onSuccess() async {
|
Future<void> onSuccess() async {
|
||||||
await reset();
|
await reset();
|
||||||
}
|
}
|
||||||
|
|
||||||
@visibleForTesting
|
|
||||||
bool isTimerRunning() => _timer != null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A stub reconnection policy for tests.
|
/// A stub reconnection policy for tests.
|
||||||
@ -169,9 +203,7 @@ class TestingReconnectionPolicy extends ReconnectionPolicy {
|
|||||||
Future<void> onFailure() async {}
|
Future<void> onFailure() async {}
|
||||||
|
|
||||||
@override
|
@override
|
||||||
Future<void> reset() async {
|
Future<void> reset() async {}
|
||||||
await super.reset();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A reconnection policy for tests that waits a constant number of seconds before
|
/// A reconnection policy for tests that waits a constant number of seconds before
|
||||||
@ -191,7 +223,5 @@ class TestingSleepReconnectionPolicy extends ReconnectionPolicy {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@override
|
@override
|
||||||
Future<void> reset() async {
|
Future<void> reset() async {}
|
||||||
await super.reset();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -139,11 +139,6 @@ class StubTCPSocket extends BaseSocketWrapper {
|
|||||||
Stream<XmppSocketEvent> getEventStream() =>
|
Stream<XmppSocketEvent> getEventStream() =>
|
||||||
_eventStream.stream.asBroadcastStream();
|
_eventStream.stream.asBroadcastStream();
|
||||||
|
|
||||||
/// "Closes" the socket unexpectedly
|
|
||||||
void injectSocketFault() {
|
|
||||||
_eventStream.add(XmppSocketClosureEvent(false));
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Let the "connection" receive [data].
|
/// Let the "connection" receive [data].
|
||||||
void injectRawXml(String data) {
|
void injectRawXml(String data) {
|
||||||
// ignore: avoid_print
|
// ignore: avoid_print
|
||||||
|
@ -1,78 +0,0 @@
|
|||||||
import 'package:moxxmpp/moxxmpp.dart';
|
|
||||||
import 'package:test/test.dart';
|
|
||||||
import 'helpers/logging.dart';
|
|
||||||
|
|
||||||
void main() {
|
|
||||||
initLogger();
|
|
||||||
|
|
||||||
test('Test triggering a reconnect multiple times', () async {
|
|
||||||
final policy = RandomBackoffReconnectionPolicy(
|
|
||||||
9998,
|
|
||||||
9999,
|
|
||||||
);
|
|
||||||
await policy.setShouldReconnect(true);
|
|
||||||
|
|
||||||
// We have a failure
|
|
||||||
expect(
|
|
||||||
await policy.canTriggerFailure(),
|
|
||||||
true,
|
|
||||||
);
|
|
||||||
await policy.onFailure();
|
|
||||||
|
|
||||||
// Try to trigger another one
|
|
||||||
expect(
|
|
||||||
await policy.canTriggerFailure(),
|
|
||||||
false,
|
|
||||||
);
|
|
||||||
});
|
|
||||||
|
|
||||||
test('Test resetting while reconnecting', () async {
|
|
||||||
final policy = RandomBackoffReconnectionPolicy(
|
|
||||||
9998,
|
|
||||||
9999,
|
|
||||||
)..register(() async => expect(true, false));
|
|
||||||
await policy.setShouldReconnect(true);
|
|
||||||
|
|
||||||
// We have a failure
|
|
||||||
expect(
|
|
||||||
await policy.canTriggerFailure(),
|
|
||||||
true,
|
|
||||||
);
|
|
||||||
await policy.onFailure();
|
|
||||||
expect(policy.isTimerRunning(), true);
|
|
||||||
|
|
||||||
// We reset
|
|
||||||
await policy.reset();
|
|
||||||
expect(policy.isTimerRunning(), false);
|
|
||||||
|
|
||||||
// We have another failure
|
|
||||||
expect(
|
|
||||||
await policy.canTriggerFailure(),
|
|
||||||
true,
|
|
||||||
);
|
|
||||||
});
|
|
||||||
|
|
||||||
test('Test triggering the timer callback twice', () async {
|
|
||||||
final policy = RandomBackoffReconnectionPolicy(
|
|
||||||
9998,
|
|
||||||
9999,
|
|
||||||
);
|
|
||||||
var counter = 0;
|
|
||||||
policy.register(() async {
|
|
||||||
await policy.reset();
|
|
||||||
counter++;
|
|
||||||
});
|
|
||||||
await policy.setShouldReconnect(true);
|
|
||||||
|
|
||||||
// We have a failure
|
|
||||||
expect(
|
|
||||||
await policy.canTriggerFailure(),
|
|
||||||
true,
|
|
||||||
);
|
|
||||||
await policy.onFailure();
|
|
||||||
|
|
||||||
await policy.onTimerElapsed();
|
|
||||||
await policy.onTimerElapsed();
|
|
||||||
expect(counter, 1);
|
|
||||||
});
|
|
||||||
}
|
|
@ -427,115 +427,4 @@ void main() {
|
|||||||
true,
|
true,
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
test('Test losing the connection while negotiation', () async {
|
|
||||||
final fakeSocket = StubTCPSocket(
|
|
||||||
[
|
|
||||||
StringExpectation(
|
|
||||||
"<stream:stream xmlns='jabber:client' version='1.0' xmlns:stream='http://etherx.jabber.org/streams' to='example.org' from='testuser@example.org' xml:lang='en'>",
|
|
||||||
'''
|
|
||||||
<stream:stream
|
|
||||||
xmlns="jabber:client"
|
|
||||||
version="1.0"
|
|
||||||
xmlns:stream="http://etherx.jabber.org/streams"
|
|
||||||
from="test.server"
|
|
||||||
xml:lang="en">
|
|
||||||
<stream:features xmlns="http://etherx.jabber.org/streams">
|
|
||||||
<mechanisms xmlns="urn:ietf:params:xml:ns:xmpp-sasl">
|
|
||||||
<mechanism>PLAIN</mechanism>
|
|
||||||
</mechanisms>
|
|
||||||
</stream:features>''',
|
|
||||||
),
|
|
||||||
StringExpectation(
|
|
||||||
"<auth xmlns='urn:ietf:params:xml:ns:xmpp-sasl' mechanism='PLAIN'>AHRlc3R1c2VyAGFiYzEyMw==</auth>",
|
|
||||||
'',
|
|
||||||
),
|
|
||||||
StringExpectation(
|
|
||||||
"<stream:stream xmlns='jabber:client' version='1.0' xmlns:stream='http://etherx.jabber.org/streams' to='example.org' from='testuser@example.org' xml:lang='en'>",
|
|
||||||
'''
|
|
||||||
<stream:stream
|
|
||||||
xmlns="jabber:client"
|
|
||||||
version="1.0"
|
|
||||||
xmlns:stream="http://etherx.jabber.org/streams"
|
|
||||||
from="test.server"
|
|
||||||
xml:lang="en">
|
|
||||||
<stream:features xmlns="http://etherx.jabber.org/streams">
|
|
||||||
<mechanisms xmlns="urn:ietf:params:xml:ns:xmpp-sasl">
|
|
||||||
<mechanism>PLAIN</mechanism>
|
|
||||||
</mechanisms>
|
|
||||||
</stream:features>''',
|
|
||||||
),
|
|
||||||
StringExpectation(
|
|
||||||
"<auth xmlns='urn:ietf:params:xml:ns:xmpp-sasl' mechanism='PLAIN'>AHRlc3R1c2VyAGFiYzEyMw==</auth>",
|
|
||||||
'<success xmlns="urn:ietf:params:xml:ns:xmpp-sasl" />',
|
|
||||||
),
|
|
||||||
StringExpectation(
|
|
||||||
"<stream:stream xmlns='jabber:client' version='1.0' xmlns:stream='http://etherx.jabber.org/streams' to='example.org' from='testuser@example.org' xml:lang='en'>",
|
|
||||||
'''
|
|
||||||
<stream:stream
|
|
||||||
xmlns="jabber:client"
|
|
||||||
version="1.0"
|
|
||||||
xmlns:stream="http://etherx.jabber.org/streams"
|
|
||||||
from="test.server"
|
|
||||||
xml:lang="en">
|
|
||||||
<stream:features xmlns="http://etherx.jabber.org/streams">
|
|
||||||
<bind xmlns="urn:ietf:params:xml:ns:xmpp-bind">
|
|
||||||
<required/>
|
|
||||||
</bind>
|
|
||||||
</stream:features>''',
|
|
||||||
),
|
|
||||||
StanzaExpectation(
|
|
||||||
'<iq xmlns="jabber:client" type="set" id="a"><bind xmlns="urn:ietf:params:xml:ns:xmpp-bind"/></iq>',
|
|
||||||
'<iq xmlns="jabber:client" type="result" id="a"><bind xmlns="urn:ietf:params:xml:ns:xmpp-bind"><jid>testuser@example.org/MU29eEZn</jid></bind></iq>',
|
|
||||||
ignoreId: true,
|
|
||||||
),
|
|
||||||
],
|
|
||||||
);
|
|
||||||
|
|
||||||
final conn = XmppConnection(
|
|
||||||
TestingReconnectionPolicy(),
|
|
||||||
AlwaysConnectedConnectivityManager(),
|
|
||||||
fakeSocket,
|
|
||||||
);
|
|
||||||
await conn.registerManagers([
|
|
||||||
RosterManager(TestingRosterStateManager('', [])),
|
|
||||||
DiscoManager([]),
|
|
||||||
]);
|
|
||||||
await conn.registerFeatureNegotiators([
|
|
||||||
SaslPlainNegotiator(),
|
|
||||||
ResourceBindingNegotiator(),
|
|
||||||
]);
|
|
||||||
conn.setConnectionSettings(
|
|
||||||
ConnectionSettings(
|
|
||||||
jid: JID.fromString('testuser@example.org'),
|
|
||||||
password: 'abc123',
|
|
||||||
useDirectTLS: false,
|
|
||||||
),
|
|
||||||
);
|
|
||||||
|
|
||||||
final result1 = conn.connect(
|
|
||||||
waitUntilLogin: true,
|
|
||||||
);
|
|
||||||
await Future<void>.delayed(const Duration(seconds: 2));
|
|
||||||
|
|
||||||
// Inject a fault
|
|
||||||
fakeSocket.injectSocketFault();
|
|
||||||
expect(
|
|
||||||
(await result1).isType<bool>(),
|
|
||||||
false,
|
|
||||||
);
|
|
||||||
|
|
||||||
// Try to connect again
|
|
||||||
final result2 = await conn.connect(
|
|
||||||
waitUntilLogin: true,
|
|
||||||
);
|
|
||||||
expect(
|
|
||||||
fakeSocket.getState(),
|
|
||||||
6,
|
|
||||||
);
|
|
||||||
expect(
|
|
||||||
result2.isType<bool>(),
|
|
||||||
true,
|
|
||||||
);
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user