xmpp: Add support for setting reconnection policies

This commit is contained in:
PapaTutuWawa 2022-05-30 16:26:34 +02:00
parent dfacbca446
commit aae126a3de
7 changed files with 130 additions and 39 deletions

View File

@ -9,6 +9,7 @@ import "package:moxxyv2/xmpp/connection.dart";
import "package:moxxyv2/xmpp/presence.dart";
import "package:moxxyv2/xmpp/message.dart";
import "package:moxxyv2/xmpp/ping.dart";
import "package:moxxyv2/xmpp/reconnect.dart";
import "package:moxxyv2/xmpp/xeps/xep_0054.dart";
import "package:moxxyv2/xmpp/xeps/xep_0060.dart";
import "package:moxxyv2/xmpp/xeps/xep_0066.dart";
@ -166,7 +167,7 @@ Future<void> entrypoint() async {
// Init the UDPLogger
await initUDPLogger();
final connection = XmppConnection();
final connection = XmppConnection(ExponentialBackoffReconnectionPolicy());
connection.registerManagers([
MoxxyStreamManagementManager(),
MoxxyDiscoManager(),

View File

@ -1,5 +1,4 @@
import "dart:async";
import "dart:math";
import "package:moxxyv2/xmpp/socket.dart";
import "package:moxxyv2/xmpp/buffer.dart";
@ -12,6 +11,7 @@ import "package:moxxyv2/xmpp/events.dart";
import "package:moxxyv2/xmpp/iq.dart";
import "package:moxxyv2/xmpp/presence.dart";
import "package:moxxyv2/xmpp/roster.dart";
import "package:moxxyv2/xmpp/reconnect.dart";
import "package:moxxyv2/xmpp/sasl/authenticator.dart";
import "package:moxxyv2/xmpp/sasl/authenticators.dart";
import "package:moxxyv2/xmpp/managers/base.dart";
@ -30,6 +30,7 @@ import "package:uuid/uuid.dart";
import "package:synchronized/synchronized.dart";
import "package:logging/logging.dart";
import "package:meta/meta.dart";
import "package:moxlib/moxlib.dart";
enum XmppConnectionState {
notConnected,
@ -94,6 +95,7 @@ class XmppConnection {
XmppConnectionState _connectionState;
late final Stream<String> _socketStream;
late ConnectionSettings _connectionSettings;
final ReconnectionPolicy _reconnectionPolicy;
/// Stream properties
///
@ -112,8 +114,6 @@ class XmppConnection {
RoutingState _routingState;
/// The currently bound resource or "" if none has been bound yet.
String _resource;
/// Counter for how manyy we have tried to reconnect.
int _currentBackoffAttempt;
/// For indicating in a [ConnectionStateChangedEvent] that the event occured because we
/// did a reconnection.
bool _resuming;
@ -122,9 +122,8 @@ class XmppConnection {
bool _disconnecting;
/// For indicating whether we expect a socket closure due to StartTLS.
bool _performingStartTLS;
/// Timers for the keep-alive ping and the backoff connection process.
/// Timers for the keep-alive ping.
Timer? _connectionPingTimer;
Timer? _backoffTimer;
/// Completers for certain actions
Completer<XmppConnectionResult>? _connectionCompleter;
@ -137,16 +136,18 @@ class XmppConnection {
/// [socket] is for debugging purposes.
/// [connectionPingDuration] is the duration after which a ping will be sent to keep
/// the connection open. Defaults to 15 minutes.
XmppConnection({
XmppConnection(
ReconnectionPolicy reconnectionPolicy,
{
BaseSocketWrapper? socket,
this.connectionPingDuration = const Duration(minutes: 5)
}) :
}
) :
_connectionState = XmppConnectionState.notConnected,
_routingState = RoutingState.unauthenticated,
_eventStreamController = StreamController.broadcast(),
_resource = "",
_streamBuffer = XmlStreamBuffer(),
_currentBackoffAttempt = 0,
_resuming = true,
_performingStartTLS = false,
_disconnecting = false,
@ -159,11 +160,15 @@ class XmppConnection {
_incomingStanzaHandlers = List.empty(growable: true),
_outgoingPreStanzaHandlers = List.empty(growable: true),
_outgoingPostStanzaHandlers = List.empty(growable: true),
_reconnectionPolicy = reconnectionPolicy,
_log = Logger("XmppConnection") {
_socketStream = _socket.getDataStream();
// TODO: Handle on done
_socketStream.transform(_streamBuffer).forEach(handleXmlStream);
_socket.getEventStream().listen(_handleSocketEvent);
// Allow the reconnection policy to perform reconnections by itself
_reconnectionPolicy.register(_attemptReconnection);
_socketStream = _socket.getDataStream();
// TODO: Handle on done
_socketStream.transform(_streamBuffer).forEach(handleXmlStream);
_socket.getEventStream().listen(_handleSocketEvent);
}
List<String> get streamFeatures => _streamFeatures;
@ -295,15 +300,7 @@ class XmppConnection {
void _attemptReconnection() {
_setConnectionState(XmppConnectionState.notConnected);
_socket.close();
if (_currentBackoffAttempt == 0) {
// TODO: This may to too long
final minutes = pow(2, _currentBackoffAttempt).toInt();
_currentBackoffAttempt++;
_backoffTimer = Timer(Duration(minutes: minutes), () {
connect();
});
}
connect();
}
/// Called when a stream ending error has occurred
@ -316,7 +313,7 @@ class XmppConnection {
}
// TODO: This may be too harsh for every error
_attemptReconnection();
_reconnectionPolicy.onFailure();
}
/// Called whenever the socket creates an event
@ -892,6 +889,11 @@ class XmppConnection {
Future<void> _sendEvent(XmppEvent event) async {
_log.finest("Event: ${event.toString()}");
// Specific event handling
if (event is AckRequestResponseTimeoutEvent) {
_reconnectionPolicy.onFailure();
}
for (var manager in _xmppManagers.values) {
await manager.onXmppEvent(event);
}
@ -966,11 +968,8 @@ class XmppConnection {
if (lastResource != null) {
_resource = lastResource;
}
if (_backoffTimer != null) {
_backoffTimer!.cancel();
_backoffTimer = null;
}
_reconnectionPolicy.reset();
_resuming = true;
await _sendEvent(ConnectingEvent());
@ -993,7 +992,7 @@ class XmppConnection {
if (!result) {
handleError(null);
} else {
_currentBackoffAttempt = 0;
_reconnectionPolicy.onSuccess();
_log.fine("Preparing the internal state for a connection attempt");
_performingStartTLS = false;
_setConnectionState(XmppConnectionState.connecting);

87
lib/xmpp/reconnect.dart Normal file
View File

@ -0,0 +1,87 @@
import "dart:async";
import "dart:math";
import "package:logging/logging.dart";
import "package:meta/meta.dart";
abstract class ReconnectionPolicy {
/// Function provided by [XmppConnection] that allows the policy
/// to perform a reconnection.
void Function()? performReconnect;
/// Called by [XmppConnection] to register the policy.
void register(void Function() performReconnect) {
this.performReconnect = performReconnect;
reset();
}
/// In case the policy depends on some internal state, this state must be reset
/// to an initial state when [reset] is called. In case timers run, they must be
/// terminated.
void reset();
/// Called by the [XmppConnection] when the reconnection failed.
void onFailure();
/// Caled by the [XmppConnection] when the reconnection was successful.
void onSuccess();
}
/// A simple reconnection strategy: Make the reconnection delays exponentially longer
/// for every failed attempt.
class ExponentialBackoffReconnectionPolicy extends ReconnectionPolicy {
int _counter;
Timer? _timer;
Logger _log;
ExponentialBackoffReconnectionPolicy()
: _counter = 0,
_log = Logger("ExponentialBackoffReconnectionPolicy");
/// Called when the backoff expired
void _onTimerElapsed() {
performReconnect!();
}
@override
void reset() {
_log.finest("Resetting internal state");
_counter = 0;
if (_timer != null) {
_timer!.cancel();
_timer = null;
}
}
@override
void onFailure() {
_log.finest("Failure occured. Starting exponential backoff");
_counter++;
if (_timer != null) {
_timer!.cancel();
}
_timer = Timer(Duration(seconds: pow(2, _counter).toInt()), _onTimerElapsed);
}
@override
void onSuccess() {
reset();
}
}
/// A stub reconnection policy for tests
@visibleForTesting
class TestingReconnectionPolicy extends ReconnectionPolicy {
@override
void onSuccess() {}
@override
void onFailure() {}
@override
void reset() {}
}

View File

@ -4,6 +4,7 @@ import "package:moxxyv2/xmpp/stanza.dart";
import "package:moxxyv2/xmpp/settings.dart";
import "package:moxxyv2/xmpp/jid.dart";
import "package:moxxyv2/xmpp/connection.dart";
import "package:moxxyv2/xmpp/reconnect.dart";
import "package:moxxyv2/xmpp/managers/attributes.dart";
import "package:moxxyv2/xmpp/managers/data.dart";
import "package:moxxyv2/xmpp/xeps/xep_0198/xep_0198.dart";
@ -45,7 +46,7 @@ XmppManagerAttributes mkAttributes(void Function(Stanza) callback) {
isFeatureSupported: (_) => false,
getFullJID: () => JID.fromString("hallo@example.server/uwu"),
getSocket: () => StubTCPSocket(play: []),
getConnection: () => XmppConnection()
getConnection: () => XmppConnection(TestingReconnectionPolicy())
);
}

View File

@ -2,6 +2,7 @@ import "package:moxxyv2/xmpp/connection.dart";
import "package:moxxyv2/xmpp/jid.dart";
import "package:moxxyv2/xmpp/settings.dart";
import "package:moxxyv2/xmpp/stringxml.dart";
import "package:moxxyv2/xmpp/reconnect.dart";
import "package:moxxyv2/xmpp/managers/attributes.dart";
import "package:moxxyv2/xmpp/xeps/xep_0280.dart";
@ -31,7 +32,7 @@ void main() {
isFeatureSupported: (_) => false,
getFullJID: () => JID.fromString("bob@xmpp.example/uwu"),
getSocket: () => StubTCPSocket(play: []),
getConnection: () => XmppConnection()
getConnection: () => XmppConnection(TestingReconnectionPolicy())
);
final manager = CarbonsManager();
manager.register(attributes);

View File

@ -3,6 +3,7 @@ import "package:moxxyv2/xmpp/settings.dart";
import "package:moxxyv2/xmpp/namespaces.dart";
import "package:moxxyv2/xmpp/stringxml.dart";
import "package:moxxyv2/xmpp/jid.dart";
import "package:moxxyv2/xmpp/reconnect.dart";
import "package:moxxyv2/xmpp/managers/attributes.dart";
import "package:moxxyv2/xmpp/xeps/xep_0352.dart";
@ -33,7 +34,7 @@ void main() {
isFeatureSupported: (_) => false,
getFullJID: () => JID.fromString("some.user@example.server/aaaaa"),
getSocket: () => StubTCPSocket(play: []),
getConnection: () => XmppConnection()
getConnection: () => XmppConnection(TestingReconnectionPolicy())
)
);
@ -62,7 +63,7 @@ void main() {
isFeatureSupported: (_) => false,
getFullJID: () => JID.fromString("some.user@example.server/aaaaa"),
getSocket: () => StubTCPSocket(play: []),
getConnection: () => XmppConnection()
getConnection: () => XmppConnection(TestingReconnectionPolicy())
));
csi.setActive();

View File

@ -8,6 +8,7 @@ import "package:moxxyv2/xmpp/stanza.dart";
import "package:moxxyv2/xmpp/presence.dart";
import "package:moxxyv2/xmpp/roster.dart";
import "package:moxxyv2/xmpp/events.dart";
import "package:moxxyv2/xmpp/reconnect.dart";
import "package:moxxyv2/xmpp/managers/attributes.dart";
import "package:moxxyv2/xmpp/managers/data.dart";
import "package:moxxyv2/xmpp/xeps/xep_0030/xep_0030.dart";
@ -39,7 +40,7 @@ Future<bool> testRosterManager(String bareJid, String resource, String stanzaStr
isFeatureSupported: (_) => false,
getFullJID: () => JID.fromString("$bareJid/$resource"),
getSocket: () => StubTCPSocket(play: []),
getConnection: () => XmppConnection()
getConnection: () => XmppConnection(TestingReconnectionPolicy())
));
final stanza = Stanza.fromXMLNode(XMLNode.fromString(stanzaString));
@ -215,7 +216,7 @@ void main() {
]
);
// TODO: This test is broken since we query the server and enable carbons
final XmppConnection conn = XmppConnection(socket: fakeSocket);
final XmppConnection conn = XmppConnection(TestingReconnectionPolicy(), socket: fakeSocket);
conn.setConnectionSettings(ConnectionSettings(
jid: JID.fromString("polynomdivision@test.server"),
password: "aaaa",
@ -294,7 +295,7 @@ void main() {
]
);
bool receivedEvent = false;
final XmppConnection conn = XmppConnection(socket: fakeSocket);
final XmppConnection conn = XmppConnection(TestingReconnectionPolicy(), socket: fakeSocket);
conn.setConnectionSettings(ConnectionSettings(
jid: JID.fromString("polynomdivision@test.server"),
password: "aaaa",
@ -378,7 +379,7 @@ void main() {
]
);
bool receivedEvent = false;
final XmppConnection conn = XmppConnection(socket: fakeSocket);
final XmppConnection conn = XmppConnection(TestingReconnectionPolicy(), socket: fakeSocket);
conn.setConnectionSettings(ConnectionSettings(
jid: JID.fromString("polynomdivision@test.server"),
password: "aaaa",
@ -466,7 +467,7 @@ void main() {
)
]
);
final XmppConnection conn = XmppConnection(socket: fakeSocket);
final XmppConnection conn = XmppConnection(TestingReconnectionPolicy(), socket: fakeSocket);
conn.setConnectionSettings(ConnectionSettings(
jid: JID.fromString("polynomdivision@test.server"),
password: "aaaa",
@ -505,7 +506,7 @@ void main() {
isFeatureSupported: (_) => false,
getFullJID: () => JID.fromString("some.user@example.server/aaaaa"),
getSocket: () => StubTCPSocket(play: []),
getConnection: () => XmppConnection()
getConnection: () => XmppConnection(TestingReconnectionPolicy())
));
// NOTE: Based on https://gultsch.de/gajim_roster_push_and_message_interception.html