feat(core): Remove the connection lock

This commit is contained in:
PapaTutuWawa 2023-04-03 12:46:15 +02:00
parent 2947e2c539
commit 3da334b5cf
4 changed files with 120 additions and 64 deletions

View File

@ -114,6 +114,7 @@ class XmppConnection {
/// A policy on how to reconnect
final ReconnectionPolicy _reconnectionPolicy;
ReconnectionPolicy get reconnectionPolicy => _reconnectionPolicy;
/// The class responsible for preventing errors on initial connection due
/// to no network.
@ -173,34 +174,9 @@ class XmppConnection {
/// The logger for the class
final Logger _log = Logger('XmppConnection');
/// A value indicating whether a connection attempt is currently running or not
bool _isConnectionRunning = false;
final Lock _connectionRunningLock = Lock();
/// Flag indicating whether reconnection should be enabled after a successful connection.
bool _enableReconnectOnSuccess = false;
/// Enters the critical section for accessing [XmppConnection._isConnectionRunning]
/// and does the following:
/// - if _isConnectionRunning is false, set it to true and return false.
/// - if _isConnectionRunning is true, return true.
Future<bool> _testAndSetIsConnectionRunning() async =>
_connectionRunningLock.synchronized(() {
if (!_isConnectionRunning) {
_isConnectionRunning = true;
return false;
}
return true;
});
/// Enters the critical section for accessing [XmppConnection._isConnectionRunning]
/// and sets it to false.
Future<void> _resetIsConnectionRunning() async =>
_connectionRunningLock.synchronized(() => _isConnectionRunning = false);
ReconnectionPolicy get reconnectionPolicy => _reconnectionPolicy;
bool get isAuthenticated => _isAuthenticated;
/// Return the registered feature negotiator that has id [id]. Returns null if
@ -364,13 +340,6 @@ class XmppConnection {
/// Attempts to reconnect to the server by following an exponential backoff.
Future<void> _attemptReconnection() async {
if (await _testAndSetIsConnectionRunning()) {
_log.warning(
'_attemptReconnection is called but connection attempt is already running. Ignoring...',
);
return;
}
_log.finest('_attemptReconnection: Setting state to notConnected');
await _setConnectionState(XmppConnectionState.notConnected);
_log.finest('_attemptReconnection: Done');
@ -615,7 +584,6 @@ class XmppConnection {
/// Called when we timeout during connecting
Future<void> _onConnectingTimeout() async {
_log.severe('Connection stuck in "connecting". Causing a reconnection...');
await _resetIsConnectionRunning();
await handleError(TimeoutError());
}
@ -844,7 +812,6 @@ class XmppConnection {
/// a disco sweep among other things.
Future<void> _onNegotiationsDone() async {
// Set the connection state
await _resetIsConnectionRunning();
await _setConnectionState(XmppConnectionState.connected);
// Enable reconnections
@ -880,14 +847,12 @@ class XmppConnection {
// We failed before authenticating
if (!_isAuthenticated) {
_log.severe('No negotiator could be picked while unauthenticated');
await _resetIsConnectionRunning();
await handleError(NoMatchingAuthenticationMechanismAvailableError());
return;
} else {
_log.severe(
'No negotiator could be picked while negotiations are not done',
);
await _resetIsConnectionRunning();
await handleError(NoAuthenticatorAvailableError());
return;
}
@ -896,7 +861,6 @@ class XmppConnection {
final result = await _currentNegotiator!.negotiate(nonza);
if (result.isType<NegotiatorError>()) {
_log.severe('Negotiator returned an error');
await _resetIsConnectionRunning();
await handleError(result.get<NegotiatorError>());
return;
}
@ -919,7 +883,6 @@ class XmppConnection {
!_isNegotiationPossible(_streamFeatures)) {
_log.finest('Negotiations done!');
_updateRoutingState(RoutingState.handleStanzas);
await _resetIsConnectionRunning();
await _onNegotiationsDone();
} else {
_currentNegotiator = getNextNegotiator(_streamFeatures);
@ -943,7 +906,6 @@ class XmppConnection {
_log.finest('Negotiations done!');
_updateRoutingState(RoutingState.handleStanzas);
await _resetIsConnectionRunning();
await _onNegotiationsDone();
} else {
_log.finest('Picking new negotiator...');
@ -962,7 +924,6 @@ class XmppConnection {
);
_updateRoutingState(RoutingState.handleStanzas);
await _resetIsConnectionRunning();
await _onNegotiationsDone();
break;
}
@ -1052,7 +1013,6 @@ class XmppConnection {
/// To be called when we lost the network connection.
Future<void> _onNetworkConnectionLost() async {
_socket.close();
await _resetIsConnectionRunning();
await _setConnectionState(XmppConnectionState.notConnected);
}
@ -1112,22 +1072,8 @@ class XmppConnection {
bool enableReconnectOnSuccess = true,
bool ignoreLock = false,
}) async {
if (!ignoreLock) {
if (await _testAndSetIsConnectionRunning()) {
_log.fine(
'Cancelling this connection attempt as one appears to be already running.',
);
return Future.value(
Result(
ConnectionAlreadyRunningError(),
),
);
}
await _resetIsConnectionRunning();
} else {
_log.fine('Ignoring connection lock as ignoreLock = true');
}
// Kill a possibly existing connection
_socket.close();
if (waitUntilLogin) {
_log.finest('Setting up completer for awaiting completed login');

View File

@ -4,12 +4,6 @@ import 'package:moxxmpp/src/negotiators/negotiator.dart';
/// The reason a call to `XmppConnection.connect` failed.
abstract class XmppConnectionError extends XmppError {}
/// Returned by `XmppConnection.connect` when a connection is already active.
class ConnectionAlreadyRunningError extends XmppConnectionError {
@override
bool isRecoverable() => true;
}
/// Returned by `XmppConnection.connect` when a negotiator returned an unrecoverable
/// error. Only returned when waitUntilLogin is true.
class NegotiatorReturnedError extends XmppConnectionError {

View File

@ -139,6 +139,11 @@ class StubTCPSocket extends BaseSocketWrapper {
Stream<XmppSocketEvent> getEventStream() =>
_eventStream.stream.asBroadcastStream();
/// "Closes" the socket unexpectedly
void injectSocketFault() {
_eventStream.add(XmppSocketClosureEvent(false));
}
/// Let the "connection" receive [data].
void injectRawXml(String data) {
// ignore: avoid_print

View File

@ -427,4 +427,115 @@ void main() {
true,
);
});
test('Test losing the connection while negotiation', () async {
final fakeSocket = StubTCPSocket(
[
StringExpectation(
"<stream:stream xmlns='jabber:client' version='1.0' xmlns:stream='http://etherx.jabber.org/streams' to='example.org' from='testuser@example.org' xml:lang='en'>",
'''
<stream:stream
xmlns="jabber:client"
version="1.0"
xmlns:stream="http://etherx.jabber.org/streams"
from="test.server"
xml:lang="en">
<stream:features xmlns="http://etherx.jabber.org/streams">
<mechanisms xmlns="urn:ietf:params:xml:ns:xmpp-sasl">
<mechanism>PLAIN</mechanism>
</mechanisms>
</stream:features>''',
),
StringExpectation(
"<auth xmlns='urn:ietf:params:xml:ns:xmpp-sasl' mechanism='PLAIN'>AHRlc3R1c2VyAGFiYzEyMw==</auth>",
'',
),
StringExpectation(
"<stream:stream xmlns='jabber:client' version='1.0' xmlns:stream='http://etherx.jabber.org/streams' to='example.org' from='testuser@example.org' xml:lang='en'>",
'''
<stream:stream
xmlns="jabber:client"
version="1.0"
xmlns:stream="http://etherx.jabber.org/streams"
from="test.server"
xml:lang="en">
<stream:features xmlns="http://etherx.jabber.org/streams">
<mechanisms xmlns="urn:ietf:params:xml:ns:xmpp-sasl">
<mechanism>PLAIN</mechanism>
</mechanisms>
</stream:features>''',
),
StringExpectation(
"<auth xmlns='urn:ietf:params:xml:ns:xmpp-sasl' mechanism='PLAIN'>AHRlc3R1c2VyAGFiYzEyMw==</auth>",
'<success xmlns="urn:ietf:params:xml:ns:xmpp-sasl" />',
),
StringExpectation(
"<stream:stream xmlns='jabber:client' version='1.0' xmlns:stream='http://etherx.jabber.org/streams' to='example.org' from='testuser@example.org' xml:lang='en'>",
'''
<stream:stream
xmlns="jabber:client"
version="1.0"
xmlns:stream="http://etherx.jabber.org/streams"
from="test.server"
xml:lang="en">
<stream:features xmlns="http://etherx.jabber.org/streams">
<bind xmlns="urn:ietf:params:xml:ns:xmpp-bind">
<required/>
</bind>
</stream:features>''',
),
StanzaExpectation(
'<iq xmlns="jabber:client" type="set" id="a"><bind xmlns="urn:ietf:params:xml:ns:xmpp-bind"/></iq>',
'<iq xmlns="jabber:client" type="result" id="a"><bind xmlns="urn:ietf:params:xml:ns:xmpp-bind"><jid>testuser@example.org/MU29eEZn</jid></bind></iq>',
ignoreId: true,
),
],
);
final conn = XmppConnection(
TestingReconnectionPolicy(),
AlwaysConnectedConnectivityManager(),
fakeSocket,
);
await conn.registerManagers([
RosterManager(TestingRosterStateManager('', [])),
DiscoManager([]),
]);
await conn.registerFeatureNegotiators([
SaslPlainNegotiator(),
ResourceBindingNegotiator(),
]);
conn.setConnectionSettings(
ConnectionSettings(
jid: JID.fromString('testuser@example.org'),
password: 'abc123',
useDirectTLS: false,
),
);
final result1 = conn.connect(
waitUntilLogin: true,
);
await Future<void>.delayed(const Duration(seconds: 2));
// Inject a fault
fakeSocket.injectSocketFault();
expect(
(await result1).isType<bool>(),
false,
);
// Try to connect again
final result2 = await conn.connect(
waitUntilLogin: true,
);
expect(
fakeSocket.getState(),
6,
);
expect(
result2.isType<bool>(),
true,
);
});
}