feat: Try to lock reconnections behind a flag

This commit is contained in:
PapaTutuWawa 2023-01-13 15:16:51 +01:00
parent 890fcfb506
commit ad1242c47d
2 changed files with 56 additions and 17 deletions

View File

@ -49,10 +49,12 @@ enum XmppConnectionState {
enum StanzaFromType { enum StanzaFromType {
/// Add the full JID to the stanza as the from attribute /// Add the full JID to the stanza as the from attribute
full, full,
/// Add the bare JID to the stanza as the from attribute /// Add the bare JID to the stanza as the from attribute
bare, bare,
/// Add no JID as the from attribute /// Add no JID as the from attribute
none none,
} }
/// Nonza describing the XMPP stream header. /// Nonza describing the XMPP stream header.
@ -207,9 +209,30 @@ class XmppConnection {
/// is still running. /// is still running.
final Lock _negotiationLock = Lock(); final Lock _negotiationLock = Lock();
/// Misc /// The logger for the class
final Logger _log = Logger('XmppConnection'); final Logger _log = Logger('XmppConnection');
/// A value indicating whether a connection attempt is currently running or not
bool _isConnectionRunning = false;
final Lock _connectionRunningLock = Lock();
/// Enters the critical section for accessing [XmppConnection._isConnectionRunning]
/// and does the following:
/// - if _isConnectionRunning is false, set it to true and return false.
/// - if _isConnectionRunning is true, return true.
Future<bool> _testAndSetIsConnectionRunning() async => _connectionRunningLock.synchronized(() {
if (!_isConnectionRunning) {
_isConnectionRunning = true;
return false;
}
return true;
});
/// Enters the critical section for accessing [XmppConnection._isConnectionRunning]
/// and sets it to false.
Future<void> _resetIsConnectionRunning() async => _connectionRunningLock.synchronized(() => _isConnectionRunning = false);
ReconnectionPolicy get reconnectionPolicy => _reconnectionPolicy; ReconnectionPolicy get reconnectionPolicy => _reconnectionPolicy;
List<String> get serverFeatures => _serverFeatures; List<String> get serverFeatures => _serverFeatures;
@ -365,6 +388,11 @@ class XmppConnection {
/// Attempts to reconnect to the server by following an exponential backoff. /// Attempts to reconnect to the server by following an exponential backoff.
Future<void> _attemptReconnection() async { Future<void> _attemptReconnection() async {
if (await _testAndSetIsConnectionRunning()) {
_log.warning('_attemptReconnection is called but connection attempt is already running. Ignoring...');
return;
}
_log.finest('_attemptReconnection: Setting state to notConnected'); _log.finest('_attemptReconnection: Setting state to notConnected');
await _setConnectionState(XmppConnectionState.notConnected); await _setConnectionState(XmppConnectionState.notConnected);
_log.finest('_attemptReconnection: Done'); _log.finest('_attemptReconnection: Done');
@ -400,6 +428,7 @@ class XmppConnection {
} }
await _setConnectionState(XmppConnectionState.error); await _setConnectionState(XmppConnectionState.error);
await _resetIsConnectionRunning();
await _reconnectionPolicy.onFailure(); await _reconnectionPolicy.onFailure();
} }
@ -797,6 +826,7 @@ class XmppConnection {
/// a disco sweep among other things. /// a disco sweep among other things.
Future<void> _onNegotiationsDone() async { Future<void> _onNegotiationsDone() async {
// Set the connection state // Set the connection state
await _resetIsConnectionRunning();
await _setConnectionState(XmppConnectionState.connected); await _setConnectionState(XmppConnectionState.connected);
// Resolve the connection completion future // Resolve the connection completion future
@ -980,9 +1010,10 @@ class XmppConnection {
} }
/// To be called when we lost the network connection. /// To be called when we lost the network connection.
void _onNetworkConnectionLost() { Future<void> _onNetworkConnectionLost() async {
_socket.close(); _socket.close();
_setConnectionState(XmppConnectionState.notConnected); await _resetIsConnectionRunning();
await _setConnectionState(XmppConnectionState.notConnected);
} }
/// Attempt to gracefully close the session /// Attempt to gracefully close the session
@ -1023,11 +1054,12 @@ class XmppConnection {
/// Like [connect] but the Future resolves when the resource binding is either done or /// Like [connect] but the Future resolves when the resource binding is either done or
/// SASL has failed. /// SASL has failed.
Future<XmppConnectionResult> connectAwaitable({ String? lastResource }) { Future<XmppConnectionResult> connectAwaitable({ String? lastResource }) async {
_runPreConnectionAssertions(); _runPreConnectionAssertions();
await _resetIsConnectionRunning();
_connectionCompleter = Completer(); _connectionCompleter = Completer();
_log.finest('Calling connect() from connectAwaitable'); _log.finest('Calling connect() from connectAwaitable');
connect(lastResource: lastResource); await connect(lastResource: lastResource);
return _connectionCompleter!.future; return _connectionCompleter!.future;
} }
@ -1039,6 +1071,7 @@ class XmppConnection {
} }
_runPreConnectionAssertions(); _runPreConnectionAssertions();
await _resetIsConnectionRunning();
_reconnectionPolicy.setShouldReconnect(true); _reconnectionPolicy.setShouldReconnect(true);
if (lastResource != null) { if (lastResource != null) {

View File

@ -4,27 +4,33 @@ import 'package:logging/logging.dart';
import 'package:meta/meta.dart'; import 'package:meta/meta.dart';
import 'package:synchronized/synchronized.dart'; import 'package:synchronized/synchronized.dart';
abstract class ReconnectionPolicy { /// A callback function to be called when the connection to the server has been lost.
typedef ConnectionLostCallback = Future<void> Function();
ReconnectionPolicy() /// A function that, when called, causes the XmppConnection to connect to the server, if
: _shouldAttemptReconnection = false, /// another reconnection is not already running.
_isReconnecting = false, typedef PerformReconnectFunction = Future<void> Function();
_isReconnectingLock = Lock();
abstract class ReconnectionPolicy {
/// Function provided by XmppConnection that allows the policy /// Function provided by XmppConnection that allows the policy
/// to perform a reconnection. /// to perform a reconnection.
Future<void> Function()? performReconnect; PerformReconnectFunction? performReconnect;
/// Function provided by XmppConnection that allows the policy /// Function provided by XmppConnection that allows the policy
/// to say that we lost the connection. /// to say that we lost the connection.
void Function()? triggerConnectionLost; ConnectionLostCallback? triggerConnectionLost;
/// Indicate if should try to reconnect. /// Indicate if should try to reconnect.
bool _shouldAttemptReconnection; bool _shouldAttemptReconnection = false;
/// Indicate if a reconnection attempt is currently running. /// Indicate if a reconnection attempt is currently running.
bool _isReconnecting; bool _isReconnecting = false;
/// And the corresponding lock /// And the corresponding lock
final Lock _isReconnectingLock; final Lock _isReconnectingLock = Lock();
/// Called by XmppConnection to register the policy. /// Called by XmppConnection to register the policy.
void register(Future<void> Function() performReconnect, void Function() triggerConnectionLost) { void register(PerformReconnectFunction performReconnect, ConnectionLostCallback triggerConnectionLost) {
this.performReconnect = performReconnect; this.performReconnect = performReconnect;
this.triggerConnectionLost = triggerConnectionLost; this.triggerConnectionLost = triggerConnectionLost;