Compare commits

...

3 Commits

6 changed files with 264 additions and 174 deletions

View File

@ -91,7 +91,6 @@ class XmppConnection {
// Allow the reconnection policy to perform reconnections by itself // Allow the reconnection policy to perform reconnections by itself
_reconnectionPolicy.register( _reconnectionPolicy.register(
_attemptReconnection, _attemptReconnection,
_onNetworkConnectionLost,
); );
_socketStream = _socket.getDataStream(); _socketStream = _socket.getDataStream();
@ -114,6 +113,7 @@ class XmppConnection {
/// A policy on how to reconnect /// A policy on how to reconnect
final ReconnectionPolicy _reconnectionPolicy; final ReconnectionPolicy _reconnectionPolicy;
ReconnectionPolicy get reconnectionPolicy => _reconnectionPolicy;
/// The class responsible for preventing errors on initial connection due /// The class responsible for preventing errors on initial connection due
/// to no network. /// to no network.
@ -173,34 +173,9 @@ class XmppConnection {
/// The logger for the class /// The logger for the class
final Logger _log = Logger('XmppConnection'); final Logger _log = Logger('XmppConnection');
/// A value indicating whether a connection attempt is currently running or not
bool _isConnectionRunning = false;
final Lock _connectionRunningLock = Lock();
/// Flag indicating whether reconnection should be enabled after a successful connection. /// Flag indicating whether reconnection should be enabled after a successful connection.
bool _enableReconnectOnSuccess = false; bool _enableReconnectOnSuccess = false;
/// Enters the critical section for accessing [XmppConnection._isConnectionRunning]
/// and does the following:
/// - if _isConnectionRunning is false, set it to true and return false.
/// - if _isConnectionRunning is true, return true.
Future<bool> _testAndSetIsConnectionRunning() async =>
_connectionRunningLock.synchronized(() {
if (!_isConnectionRunning) {
_isConnectionRunning = true;
return false;
}
return true;
});
/// Enters the critical section for accessing [XmppConnection._isConnectionRunning]
/// and sets it to false.
Future<void> _resetIsConnectionRunning() async =>
_connectionRunningLock.synchronized(() => _isConnectionRunning = false);
ReconnectionPolicy get reconnectionPolicy => _reconnectionPolicy;
bool get isAuthenticated => _isAuthenticated; bool get isAuthenticated => _isAuthenticated;
/// Return the registered feature negotiator that has id [id]. Returns null if /// Return the registered feature negotiator that has id [id]. Returns null if
@ -364,13 +339,6 @@ class XmppConnection {
/// Attempts to reconnect to the server by following an exponential backoff. /// Attempts to reconnect to the server by following an exponential backoff.
Future<void> _attemptReconnection() async { Future<void> _attemptReconnection() async {
if (await _testAndSetIsConnectionRunning()) {
_log.warning(
'_attemptReconnection is called but connection attempt is already running. Ignoring...',
);
return;
}
_log.finest('_attemptReconnection: Setting state to notConnected'); _log.finest('_attemptReconnection: Setting state to notConnected');
await _setConnectionState(XmppConnectionState.notConnected); await _setConnectionState(XmppConnectionState.notConnected);
_log.finest('_attemptReconnection: Done'); _log.finest('_attemptReconnection: Done');
@ -384,14 +352,13 @@ class XmppConnection {
_log.finest('Calling _connectImpl() from _attemptReconnection'); _log.finest('Calling _connectImpl() from _attemptReconnection');
await _connectImpl( await _connectImpl(
waitForConnection: true, waitForConnection: true,
ignoreLock: true,
); );
} }
/// Called when a stream ending error has occurred /// Called when a stream ending error has occurred
Future<void> handleError(XmppError error) async { Future<void> handleError(XmppError error) async {
_log.severe('handleError called with ${error.toString()}'); _log.severe('handleError called with ${error.toString()}');
// Whenever we encounter an error that would trigger a reconnection attempt while // Whenever we encounter an error that would trigger a reconnection attempt while
// the connection result is being awaited, don't attempt a reconnection but instead // the connection result is being awaited, don't attempt a reconnection but instead
// try to gracefully disconnect. // try to gracefully disconnect.
@ -430,7 +397,7 @@ class XmppConnection {
// The error is recoverable // The error is recoverable
await _setConnectionState(XmppConnectionState.notConnected); await _setConnectionState(XmppConnectionState.notConnected);
if (await _reconnectionPolicy.getShouldReconnect()) { if (await _reconnectionPolicy.canTriggerFailure()) {
await _reconnectionPolicy.onFailure(); await _reconnectionPolicy.onFailure();
} else { } else {
_log.info( _log.info(
@ -615,7 +582,6 @@ class XmppConnection {
/// Called when we timeout during connecting /// Called when we timeout during connecting
Future<void> _onConnectingTimeout() async { Future<void> _onConnectingTimeout() async {
_log.severe('Connection stuck in "connecting". Causing a reconnection...'); _log.severe('Connection stuck in "connecting". Causing a reconnection...');
await _resetIsConnectionRunning();
await handleError(TimeoutError()); await handleError(TimeoutError());
} }
@ -844,7 +810,6 @@ class XmppConnection {
/// a disco sweep among other things. /// a disco sweep among other things.
Future<void> _onNegotiationsDone() async { Future<void> _onNegotiationsDone() async {
// Set the connection state // Set the connection state
await _resetIsConnectionRunning();
await _setConnectionState(XmppConnectionState.connected); await _setConnectionState(XmppConnectionState.connected);
// Enable reconnections // Enable reconnections
@ -880,14 +845,12 @@ class XmppConnection {
// We failed before authenticating // We failed before authenticating
if (!_isAuthenticated) { if (!_isAuthenticated) {
_log.severe('No negotiator could be picked while unauthenticated'); _log.severe('No negotiator could be picked while unauthenticated');
await _resetIsConnectionRunning();
await handleError(NoMatchingAuthenticationMechanismAvailableError()); await handleError(NoMatchingAuthenticationMechanismAvailableError());
return; return;
} else { } else {
_log.severe( _log.severe(
'No negotiator could be picked while negotiations are not done', 'No negotiator could be picked while negotiations are not done',
); );
await _resetIsConnectionRunning();
await handleError(NoAuthenticatorAvailableError()); await handleError(NoAuthenticatorAvailableError());
return; return;
} }
@ -896,7 +859,6 @@ class XmppConnection {
final result = await _currentNegotiator!.negotiate(nonza); final result = await _currentNegotiator!.negotiate(nonza);
if (result.isType<NegotiatorError>()) { if (result.isType<NegotiatorError>()) {
_log.severe('Negotiator returned an error'); _log.severe('Negotiator returned an error');
await _resetIsConnectionRunning();
await handleError(result.get<NegotiatorError>()); await handleError(result.get<NegotiatorError>());
return; return;
} }
@ -919,7 +881,6 @@ class XmppConnection {
!_isNegotiationPossible(_streamFeatures)) { !_isNegotiationPossible(_streamFeatures)) {
_log.finest('Negotiations done!'); _log.finest('Negotiations done!');
_updateRoutingState(RoutingState.handleStanzas); _updateRoutingState(RoutingState.handleStanzas);
await _resetIsConnectionRunning();
await _onNegotiationsDone(); await _onNegotiationsDone();
} else { } else {
_currentNegotiator = getNextNegotiator(_streamFeatures); _currentNegotiator = getNextNegotiator(_streamFeatures);
@ -943,7 +904,6 @@ class XmppConnection {
_log.finest('Negotiations done!'); _log.finest('Negotiations done!');
_updateRoutingState(RoutingState.handleStanzas); _updateRoutingState(RoutingState.handleStanzas);
await _resetIsConnectionRunning();
await _onNegotiationsDone(); await _onNegotiationsDone();
} else { } else {
_log.finest('Picking new negotiator...'); _log.finest('Picking new negotiator...');
@ -962,7 +922,6 @@ class XmppConnection {
); );
_updateRoutingState(RoutingState.handleStanzas); _updateRoutingState(RoutingState.handleStanzas);
await _resetIsConnectionRunning();
await _onNegotiationsDone(); await _onNegotiationsDone();
break; break;
} }
@ -1049,13 +1008,6 @@ class XmppConnection {
); );
} }
/// To be called when we lost the network connection.
Future<void> _onNetworkConnectionLost() async {
_socket.close();
await _resetIsConnectionRunning();
await _setConnectionState(XmppConnectionState.notConnected);
}
/// Attempt to gracefully close the session /// Attempt to gracefully close the session
Future<void> disconnect() async { Future<void> disconnect() async {
await _disconnect(state: XmppConnectionState.notConnected); await _disconnect(state: XmppConnectionState.notConnected);
@ -1100,34 +1052,24 @@ class XmppConnection {
/// The private implementation for [XmppConnection.connect]. The parameters have /// The private implementation for [XmppConnection.connect]. The parameters have
/// the same meaning as with [XmppConnection.connect]. /// the same meaning as with [XmppConnection.connect].
///
/// [ignoreLock] is a flag that, if true, causes the method to not try to aquire
/// a connection lock. This is useful when we already aquired the connection lock,
/// for example, in _attemptReconnection.
Future<Result<bool, XmppError>> _connectImpl({ Future<Result<bool, XmppError>> _connectImpl({
String? lastResource, String? lastResource,
bool waitForConnection = false, bool waitForConnection = false,
bool shouldReconnect = true, bool shouldReconnect = true,
bool waitUntilLogin = false, bool waitUntilLogin = false,
bool enableReconnectOnSuccess = true, bool enableReconnectOnSuccess = true,
bool ignoreLock = false,
}) async { }) async {
if (!ignoreLock) { // Kill a possibly existing connection
if (await _testAndSetIsConnectionRunning()) { _socket.close();
_log.fine(
'Cancelling this connection attempt as one appears to be already running.',
);
return Future.value(
Result(
ConnectionAlreadyRunningError(),
),
);
}
await _resetIsConnectionRunning(); await _reconnectionPolicy.reset();
_enableReconnectOnSuccess = enableReconnectOnSuccess;
if (shouldReconnect) {
await _reconnectionPolicy.setShouldReconnect(true);
} else { } else {
_log.fine('Ignoring connection lock as ignoreLock = true'); await _reconnectionPolicy.setShouldReconnect(false);
} }
await _sendEvent(ConnectingEvent());
if (waitUntilLogin) { if (waitUntilLogin) {
_log.finest('Setting up completer for awaiting completed login'); _log.finest('Setting up completer for awaiting completed login');
@ -1140,16 +1082,6 @@ class XmppConnection {
setResource('', triggerEvent: false); setResource('', triggerEvent: false);
} }
_enableReconnectOnSuccess = enableReconnectOnSuccess;
if (shouldReconnect) {
await _reconnectionPolicy.setShouldReconnect(true);
} else {
await _reconnectionPolicy.setShouldReconnect(false);
}
await _reconnectionPolicy.reset();
await _sendEvent(ConnectingEvent());
// If requested, wait until we have a network connection // If requested, wait until we have a network connection
if (waitForConnection) { if (waitForConnection) {
_log.info('Waiting for okay from connectivityManager'); _log.info('Waiting for okay from connectivityManager');

View File

@ -4,12 +4,6 @@ import 'package:moxxmpp/src/negotiators/negotiator.dart';
/// The reason a call to `XmppConnection.connect` failed. /// The reason a call to `XmppConnection.connect` failed.
abstract class XmppConnectionError extends XmppError {} abstract class XmppConnectionError extends XmppError {}
/// Returned by `XmppConnection.connect` when a connection is already active.
class ConnectionAlreadyRunningError extends XmppConnectionError {
@override
bool isRecoverable() => true;
}
/// Returned by `XmppConnection.connect` when a negotiator returned an unrecoverable /// Returned by `XmppConnection.connect` when a negotiator returned an unrecoverable
/// error. Only returned when waitUntilLogin is true. /// error. Only returned when waitUntilLogin is true.
class NegotiatorReturnedError extends XmppConnectionError { class NegotiatorReturnedError extends XmppConnectionError {

View File

@ -2,7 +2,6 @@ import 'dart:async';
import 'dart:math'; import 'dart:math';
import 'package:logging/logging.dart'; import 'package:logging/logging.dart';
import 'package:meta/meta.dart'; import 'package:meta/meta.dart';
import 'package:moxxmpp/src/util/queue.dart';
import 'package:synchronized/synchronized.dart'; import 'package:synchronized/synchronized.dart';
/// A callback function to be called when the connection to the server has been lost. /// A callback function to be called when the connection to the server has been lost.
@ -17,70 +16,68 @@ abstract class ReconnectionPolicy {
/// to perform a reconnection. /// to perform a reconnection.
PerformReconnectFunction? performReconnect; PerformReconnectFunction? performReconnect;
/// Function provided by XmppConnection that allows the policy final Lock _lock = Lock();
/// to say that we lost the connection.
ConnectionLostCallback? triggerConnectionLost; /// Indicate if a reconnection attempt is currently running.
bool _isReconnecting = false;
/// Indicate if should try to reconnect. /// Indicate if should try to reconnect.
bool _shouldAttemptReconnection = false; bool _shouldAttemptReconnection = false;
/// Indicate if a reconnection attempt is currently running.
@protected @protected
bool isReconnecting = false; Future<bool> canTryReconnecting() async => _lock.synchronized(() => !_isReconnecting);
/// And the corresponding lock
@protected @protected
final Lock lock = Lock(); Future<bool> getIsReconnecting() async => _lock.synchronized(() => _isReconnecting);
/// The lock for accessing [_shouldAttemptReconnection]
@protected
final Lock shouldReconnectLock = Lock();
Future<void> _resetIsReconnecting() async {
await _lock.synchronized(() {
_isReconnecting = false;
});
}
/// Called by XmppConnection to register the policy. /// Called by XmppConnection to register the policy.
void register( void register(
PerformReconnectFunction performReconnect, PerformReconnectFunction performReconnect,
ConnectionLostCallback triggerConnectionLost,
) { ) {
this.performReconnect = performReconnect; this.performReconnect = performReconnect;
this.triggerConnectionLost = triggerConnectionLost;
unawaited(reset());
} }
/// In case the policy depends on some internal state, this state must be reset /// In case the policy depends on some internal state, this state must be reset
/// to an initial state when reset is called. In case timers run, they must be /// to an initial state when reset is called. In case timers run, they must be
/// terminated. /// terminated.
Future<void> reset(); @mustCallSuper
Future<void> reset() async {
await _resetIsReconnecting();
}
@mustCallSuper
Future<bool> canTriggerFailure() async {
return _lock.synchronized(() {
if (_shouldAttemptReconnection && !_isReconnecting) {
_isReconnecting = true;
return true;
}
return false;
});
}
/// Called by the XmppConnection when the reconnection failed. /// Called by the XmppConnection when the reconnection failed.
Future<void> onFailure() async {} Future<void> onFailure() async {}
/// Caled by the XmppConnection when the reconnection was successful. /// Caled by the XmppConnection when the reconnection was successful.
Future<void> onSuccess(); Future<void> onSuccess();
Future<bool> getShouldReconnect() async { Future<bool> getShouldReconnect() async {
return shouldReconnectLock.synchronized(() => _shouldAttemptReconnection); return _lock.synchronized(() => _shouldAttemptReconnection);
} }
/// Set whether a reconnection attempt should be made. /// Set whether a reconnection attempt should be made.
Future<void> setShouldReconnect(bool value) async { Future<void> setShouldReconnect(bool value) async {
return shouldReconnectLock return _lock
.synchronized(() => _shouldAttemptReconnection = value); .synchronized(() => _shouldAttemptReconnection = value);
} }
/// Returns true if the manager is currently triggering a reconnection. If not, returns
/// false.
Future<bool> isReconnectionRunning() async {
return lock.synchronized(() => isReconnecting);
}
/// Set the isReconnecting state to [value].
@protected
Future<void> setIsReconnecting(bool value) async {
await lock.synchronized(() async {
isReconnecting = value;
});
}
} }
/// A simple reconnection strategy: Make the reconnection delays exponentially longer /// A simple reconnection strategy: Make the reconnection delays exponentially longer
@ -105,90 +102,59 @@ class RandomBackoffReconnectionPolicy extends ReconnectionPolicy {
/// Backoff timer. /// Backoff timer.
Timer? _timer; Timer? _timer;
final Lock _timerLock = Lock();
/// Logger. /// Logger.
final Logger _log = Logger('RandomBackoffReconnectionPolicy'); final Logger _log = Logger('RandomBackoffReconnectionPolicy');
/// Event queue final Lock _timerLock = Lock();
final AsyncQueue _eventQueue = AsyncQueue();
/// Called when the backoff expired /// Called when the backoff expired
Future<void> _onTimerElapsed() async { @visibleForTesting
_log.fine('Timer elapsed. Waiting for lock'); Future<void> onTimerElapsed() async {
await lock.synchronized(() async { _log.fine('Timer elapsed. Waiting for lock...');
_log.fine('Lock aquired'); await _timerLock.synchronized(() async {
if (!(await getShouldReconnect())) { if (!(await getIsReconnecting())) {
_log.fine(
'Backoff timer expired but getShouldReconnect() returned false',
);
return; return;
} }
if (isReconnecting) { if (!(await getShouldReconnect())) {
_log.fine( _log.fine(
'Backoff timer expired but a reconnection is running, so doing nothing.', 'Should not reconnect. Stopping here.',
); );
return; return;
} }
_log.fine('Triggering reconnect'); _log.fine('Triggering reconnect');
isReconnecting = true; _timer?.cancel();
_timer = null;
await performReconnect!(); await performReconnect!();
}); });
await _timerLock.synchronized(() {
_timer?.cancel();
_timer = null;
});
}
Future<void> _reset() async {
_log.finest('Resetting internal state');
await _timerLock.synchronized(() {
_timer?.cancel();
_timer = null;
});
await setIsReconnecting(false);
} }
@override @override
Future<void> reset() async { Future<void> reset() async {
// ignore: unnecessary_lambdas _log.finest('Resetting internal state');
await _eventQueue.addJob(() => _reset()); _timer?.cancel();
_timer = null;
await super.reset();
} }
Future<void> _onFailure() async { @override
final shouldContinue = await _timerLock.synchronized(() { Future<void> onFailure() async {
return _timer == null;
});
if (!shouldContinue) {
_log.finest(
'_onFailure: Not backing off since _timer is already running',
);
return;
}
final seconds = final seconds =
Random().nextInt(_maxBackoffTime - _minBackoffTime) + _minBackoffTime; Random().nextInt(_maxBackoffTime - _minBackoffTime) + _minBackoffTime;
_log.finest('Failure occured. Starting random backoff with ${seconds}s'); _log.finest('Failure occured. Starting random backoff with ${seconds}s');
_timer?.cancel(); _timer?.cancel();
_timer = Timer(Duration(seconds: seconds), _onTimerElapsed); _timer = Timer(Duration(seconds: seconds), onTimerElapsed);
} }
@override
Future<void> onFailure() async {
// ignore: unnecessary_lambdas
await _eventQueue.addJob(() => _onFailure());
}
@override @override
Future<void> onSuccess() async { Future<void> onSuccess() async {
await reset(); await reset();
} }
@visibleForTesting
bool isTimerRunning() => _timer != null;
} }
/// A stub reconnection policy for tests. /// A stub reconnection policy for tests.
@ -203,7 +169,9 @@ class TestingReconnectionPolicy extends ReconnectionPolicy {
Future<void> onFailure() async {} Future<void> onFailure() async {}
@override @override
Future<void> reset() async {} Future<void> reset() async {
await super.reset();
}
} }
/// A reconnection policy for tests that waits a constant number of seconds before /// A reconnection policy for tests that waits a constant number of seconds before
@ -223,5 +191,7 @@ class TestingSleepReconnectionPolicy extends ReconnectionPolicy {
} }
@override @override
Future<void> reset() async {} Future<void> reset() async {
await super.reset();
}
} }

View File

@ -139,6 +139,11 @@ class StubTCPSocket extends BaseSocketWrapper {
Stream<XmppSocketEvent> getEventStream() => Stream<XmppSocketEvent> getEventStream() =>
_eventStream.stream.asBroadcastStream(); _eventStream.stream.asBroadcastStream();
/// "Closes" the socket unexpectedly
void injectSocketFault() {
_eventStream.add(XmppSocketClosureEvent(false));
}
/// Let the "connection" receive [data]. /// Let the "connection" receive [data].
void injectRawXml(String data) { void injectRawXml(String data) {
// ignore: avoid_print // ignore: avoid_print

View File

@ -0,0 +1,78 @@
import 'package:moxxmpp/moxxmpp.dart';
import 'package:test/test.dart';
import 'helpers/logging.dart';
void main() {
initLogger();
test('Test triggering a reconnect multiple times', () async {
final policy = RandomBackoffReconnectionPolicy(
9998,
9999,
);
await policy.setShouldReconnect(true);
// We have a failure
expect(
await policy.canTriggerFailure(),
true,
);
await policy.onFailure();
// Try to trigger another one
expect(
await policy.canTriggerFailure(),
false,
);
});
test('Test resetting while reconnecting', () async {
final policy = RandomBackoffReconnectionPolicy(
9998,
9999,
)..register(() async => expect(true, false));
await policy.setShouldReconnect(true);
// We have a failure
expect(
await policy.canTriggerFailure(),
true,
);
await policy.onFailure();
expect(policy.isTimerRunning(), true);
// We reset
await policy.reset();
expect(policy.isTimerRunning(), false);
// We have another failure
expect(
await policy.canTriggerFailure(),
true,
);
});
test('Test triggering the timer callback twice', () async {
final policy = RandomBackoffReconnectionPolicy(
9998,
9999,
);
var counter = 0;
policy.register(() async {
await policy.reset();
counter++;
});
await policy.setShouldReconnect(true);
// We have a failure
expect(
await policy.canTriggerFailure(),
true,
);
await policy.onFailure();
await policy.onTimerElapsed();
await policy.onTimerElapsed();
expect(counter, 1);
});
}

View File

@ -427,4 +427,115 @@ void main() {
true, true,
); );
}); });
test('Test losing the connection while negotiation', () async {
final fakeSocket = StubTCPSocket(
[
StringExpectation(
"<stream:stream xmlns='jabber:client' version='1.0' xmlns:stream='http://etherx.jabber.org/streams' to='example.org' from='testuser@example.org' xml:lang='en'>",
'''
<stream:stream
xmlns="jabber:client"
version="1.0"
xmlns:stream="http://etherx.jabber.org/streams"
from="test.server"
xml:lang="en">
<stream:features xmlns="http://etherx.jabber.org/streams">
<mechanisms xmlns="urn:ietf:params:xml:ns:xmpp-sasl">
<mechanism>PLAIN</mechanism>
</mechanisms>
</stream:features>''',
),
StringExpectation(
"<auth xmlns='urn:ietf:params:xml:ns:xmpp-sasl' mechanism='PLAIN'>AHRlc3R1c2VyAGFiYzEyMw==</auth>",
'',
),
StringExpectation(
"<stream:stream xmlns='jabber:client' version='1.0' xmlns:stream='http://etherx.jabber.org/streams' to='example.org' from='testuser@example.org' xml:lang='en'>",
'''
<stream:stream
xmlns="jabber:client"
version="1.0"
xmlns:stream="http://etherx.jabber.org/streams"
from="test.server"
xml:lang="en">
<stream:features xmlns="http://etherx.jabber.org/streams">
<mechanisms xmlns="urn:ietf:params:xml:ns:xmpp-sasl">
<mechanism>PLAIN</mechanism>
</mechanisms>
</stream:features>''',
),
StringExpectation(
"<auth xmlns='urn:ietf:params:xml:ns:xmpp-sasl' mechanism='PLAIN'>AHRlc3R1c2VyAGFiYzEyMw==</auth>",
'<success xmlns="urn:ietf:params:xml:ns:xmpp-sasl" />',
),
StringExpectation(
"<stream:stream xmlns='jabber:client' version='1.0' xmlns:stream='http://etherx.jabber.org/streams' to='example.org' from='testuser@example.org' xml:lang='en'>",
'''
<stream:stream
xmlns="jabber:client"
version="1.0"
xmlns:stream="http://etherx.jabber.org/streams"
from="test.server"
xml:lang="en">
<stream:features xmlns="http://etherx.jabber.org/streams">
<bind xmlns="urn:ietf:params:xml:ns:xmpp-bind">
<required/>
</bind>
</stream:features>''',
),
StanzaExpectation(
'<iq xmlns="jabber:client" type="set" id="a"><bind xmlns="urn:ietf:params:xml:ns:xmpp-bind"/></iq>',
'<iq xmlns="jabber:client" type="result" id="a"><bind xmlns="urn:ietf:params:xml:ns:xmpp-bind"><jid>testuser@example.org/MU29eEZn</jid></bind></iq>',
ignoreId: true,
),
],
);
final conn = XmppConnection(
TestingReconnectionPolicy(),
AlwaysConnectedConnectivityManager(),
fakeSocket,
);
await conn.registerManagers([
RosterManager(TestingRosterStateManager('', [])),
DiscoManager([]),
]);
await conn.registerFeatureNegotiators([
SaslPlainNegotiator(),
ResourceBindingNegotiator(),
]);
conn.setConnectionSettings(
ConnectionSettings(
jid: JID.fromString('testuser@example.org'),
password: 'abc123',
useDirectTLS: false,
),
);
final result1 = conn.connect(
waitUntilLogin: true,
);
await Future<void>.delayed(const Duration(seconds: 2));
// Inject a fault
fakeSocket.injectSocketFault();
expect(
(await result1).isType<bool>(),
false,
);
// Try to connect again
final result2 = await conn.connect(
waitUntilLogin: true,
);
expect(
fakeSocket.getState(),
6,
);
expect(
result2.isType<bool>(),
true,
);
});
} }