feat(core): Make the ping manager optional

This commit is contained in:
PapaTutuWawa 2023-03-29 15:25:17 +02:00
parent 4321573dfb
commit 2557a2fe5b
9 changed files with 98 additions and 94 deletions

View File

@ -77,7 +77,9 @@ class _MyHomePageState extends State<MyHomePage> {
StreamManagementManager(), StreamManagementManager(),
DiscoManager([]), DiscoManager([]),
RosterManager(TestingRosterStateManager("", [])), RosterManager(TestingRosterStateManager("", [])),
PingManager(), PingManager(
const Duration(minutes: 3),
),
MessageManager(), MessageManager(),
PresenceManager(), PresenceManager(),
]) ])

View File

@ -83,7 +83,6 @@ class XmppConnection {
ReconnectionPolicy reconnectionPolicy, ReconnectionPolicy reconnectionPolicy,
ConnectivityManager connectivityManager, ConnectivityManager connectivityManager,
this._socket, { this._socket, {
this.connectionPingDuration = const Duration(minutes: 3),
this.connectingTimeout = const Duration(minutes: 2), this.connectingTimeout = const Duration(minutes: 2),
}) : _reconnectionPolicy = reconnectionPolicy, }) : _reconnectionPolicy = reconnectionPolicy,
_connectivityManager = connectivityManager { _connectivityManager = connectivityManager {
@ -143,10 +142,6 @@ class XmppConnection {
/// UUID object to generate stanza and origin IDs /// UUID object to generate stanza and origin IDs
final Uuid _uuid = const Uuid(); final Uuid _uuid = const Uuid();
/// The time between sending a ping to keep the connection open
// TODO(Unknown): Only start the timer if we did not send a stanza after n seconds
final Duration connectionPingDuration;
/// The time that we may spent in the "connecting" state /// The time that we may spent in the "connecting" state
final Duration connectingTimeout; final Duration connectingTimeout;
@ -159,9 +154,6 @@ class XmppConnection {
/// True if we are authenticated. False if not. /// True if we are authenticated. False if not.
bool _isAuthenticated = false; bool _isAuthenticated = false;
/// Timer for the keep-alive ping.
Timer? _connectionPingTimer;
/// Timer for the connecting timeout /// Timer for the connecting timeout
Timer? _connectingTimeoutTimer; Timer? _connectingTimeoutTimer;
@ -627,22 +619,7 @@ class XmppConnection {
final oldState = _connectionState; final oldState = _connectionState;
_connectionState = state; _connectionState = state;
final sm = getNegotiatorById<StreamManagementNegotiator>(
streamManagementNegotiator,
);
await _sendEvent(
ConnectionStateChangedEvent(
state,
oldState,
sm?.isResumed ?? false,
),
);
if (state == XmppConnectionState.connected) { if (state == XmppConnectionState.connected) {
_log.finest('Starting _pingConnectionTimer');
_connectionPingTimer =
Timer.periodic(connectionPingDuration, _pingConnectionOpen);
// We are connected, so the timer can stop. // We are connected, so the timer can stop.
_destroyConnectingTimer(); _destroyConnectingTimer();
} else if (state == XmppConnectionState.connecting) { } else if (state == XmppConnectionState.connecting) {
@ -655,14 +632,18 @@ class XmppConnection {
} else { } else {
// Just make sure the connecting timeout timer is not running // Just make sure the connecting timeout timer is not running
_destroyConnectingTimer(); _destroyConnectingTimer();
// The ping timer makes no sense if we are not connected
if (_connectionPingTimer != null) {
_log.finest('Destroying _pingConnectionTimer');
_connectionPingTimer!.cancel();
_connectionPingTimer = null;
}
} }
final sm = getNegotiatorById<StreamManagementNegotiator>(
streamManagementNegotiator,
);
await _sendEvent(
ConnectionStateChangedEvent(
state,
oldState,
sm?.isResumed ?? false,
),
);
} }
/// Sets the routing state and logs the change /// Sets the routing state and logs the change
@ -682,22 +663,6 @@ class XmppConnection {
return _eventStreamController.stream.asBroadcastStream(); return _eventStreamController.stream.asBroadcastStream();
} }
/// Timer callback to prevent the connection from timing out.
Future<void> _pingConnectionOpen(Timer timer) async {
// Follow the recommendation of XEP-0198 and just request an ack. If SM is not enabled,
// send a whitespace ping
_log.finest('_pingConnectionTimer: Callback called.');
if (_connectionState == XmppConnectionState.connected) {
_log.finest('_pingConnectionTimer: Connected. Triggering a ping event.');
unawaited(_sendEvent(SendPingEvent()));
} else {
_log.finest(
'_pingConnectionTimer: Not connected. Not triggering an event.',
);
}
}
/// Iterate over [handlers] and check if the handler matches [stanza]. If it does, /// Iterate over [handlers] and check if the handler matches [stanza]. If it does,
/// call its callback and end the processing if the callback returned true; continue /// call its callback and end the processing if the callback returned true; continue
/// if it returned false. /// if it returned false.
@ -1130,10 +1095,6 @@ class XmppConnection {
_xmppManagers.containsKey(discoManager), _xmppManagers.containsKey(discoManager),
'A DiscoManager is mandatory', 'A DiscoManager is mandatory',
); );
assert(
_xmppManagers.containsKey(pingManager),
'A PingManager is mandatory',
);
} }
/// The private implementation for [XmppConnection.connect]. The parameters have /// The private implementation for [XmppConnection.connect]. The parameters have

View File

@ -26,6 +26,12 @@ class ConnectionStateChangedEvent extends XmppEvent {
final XmppConnectionState before; final XmppConnectionState before;
final XmppConnectionState state; final XmppConnectionState state;
final bool resumed; final bool resumed;
/// Indicates whether the connection state switched from a not connected state to a
/// connected state.
bool get connectionEstablished =>
before != XmppConnectionState.connected &&
state == XmppConnectionState.connected;
} }
/// Triggered when we encounter a stream error. /// Triggered when we encounter a stream error.

View File

@ -1,10 +1,23 @@
import 'dart:async';
import 'package:meta/meta.dart';
import 'package:moxxmpp/src/events.dart'; import 'package:moxxmpp/src/events.dart';
import 'package:moxxmpp/src/managers/base.dart'; import 'package:moxxmpp/src/managers/base.dart';
import 'package:moxxmpp/src/managers/namespaces.dart'; import 'package:moxxmpp/src/managers/namespaces.dart';
import 'package:moxxmpp/src/xeps/xep_0198/xep_0198.dart'; import 'package:moxxmpp/src/xeps/xep_0198/xep_0198.dart';
import 'package:synchronized/synchronized.dart';
/// This manager class is responsible to sending periodic pings, if required, using
/// either whitespaces or Stream Management. Keep in mind, that without
/// Stream Management, a stale connection cannot be detected.
class PingManager extends XmppManagerBase { class PingManager extends XmppManagerBase {
PingManager() : super(pingManager); PingManager(this._pingDuration) : super(pingManager);
/// The time between pings, when connected.
final Duration _pingDuration;
/// The actual timer.
Timer? _pingTimer;
final Lock _timerLock = Lock();
@override @override
Future<bool> isSupported() async => true; Future<bool> isSupported() async => true;
@ -15,39 +28,73 @@ class PingManager extends XmppManagerBase {
); );
} }
/// Cancel a potentially scheduled ping timer. Can be overriden to cancel a custom timing mechanism.
/// By default, cancels a [Timer.periodic] that was set up prior.
@visibleForOverriding
Future<void> cancelPing() async {
await _timerLock.synchronized(() {
logger.finest('Cancelling timer');
_pingTimer?.cancel();
_pingTimer = null;
});
}
/// Schedule a ping to be sent after a given amount of time. Can be overriden for custom timing mechanisms.
/// By default, uses a [Timer.periodic] timer to trigger a ping.
/// NOTE: This function is called whenever the connection is re-established. Custom
/// implementations should thus guard against multiple timers being started.
@visibleForOverriding
Future<void> schedulePing() async {
await _timerLock.synchronized(() {
logger.finest('Scheduling new timer? ${_pingTimer != null}');
_pingTimer ??= Timer.periodic(
_pingDuration,
_sendPing,
);
});
}
Future<void> _sendPing(Timer _) async {
logger.finest('Attempting to send ping');
final attrs = getAttributes();
final socket = attrs.getSocket();
if (socket.managesKeepalives()) {
logger.finest('Not sending ping as the socket manages it.');
return;
}
final stream = attrs.getManagerById(smManager) as StreamManagementManager?;
if (stream != null) {
if (stream
.isStreamManagementEnabled() /*&& stream.getUnackedStanzaCount() > 0*/) {
logger.finest('Sending an ack ping as Stream Management is enabled');
stream.sendAckRequestPing();
} else if (attrs.getSocket().whitespacePingAllowed()) {
logger.finest(
'Sending a whitespace ping as Stream Management is not enabled',
);
attrs.getConnection().sendWhitespacePing();
} else {
_logWarning();
}
} else {
if (attrs.getSocket().whitespacePingAllowed()) {
attrs.getConnection().sendWhitespacePing();
} else {
_logWarning();
}
}
}
@override @override
Future<void> onXmppEvent(XmppEvent event) async { Future<void> onXmppEvent(XmppEvent event) async {
if (event is SendPingEvent) { if (event is ConnectionStateChangedEvent) {
logger.finest('Received ping event.'); if (event.connectionEstablished) {
final attrs = getAttributes(); await schedulePing();
final socket = attrs.getSocket();
if (socket.managesKeepalives()) {
logger.finest('Not sending ping as the socket manages it.');
return;
}
final stream =
attrs.getManagerById(smManager) as StreamManagementManager?;
if (stream != null) {
if (stream
.isStreamManagementEnabled() /*&& stream.getUnackedStanzaCount() > 0*/) {
logger.finest('Sending an ack ping as Stream Management is enabled');
stream.sendAckRequestPing();
} else if (attrs.getSocket().whitespacePingAllowed()) {
logger.finest(
'Sending a whitespace ping as Stream Management is not enabled',
);
attrs.getConnection().sendWhitespacePing();
} else {
_logWarning();
}
} else { } else {
if (attrs.getSocket().whitespacePingAllowed()) { await cancelPing();
attrs.getConnection().sendWhitespacePing();
} else {
_logWarning();
}
} }
} }
} }

View File

@ -75,7 +75,6 @@ void main() {
PresenceManager(), PresenceManager(),
RosterManager(TestingRosterStateManager('', [])), RosterManager(TestingRosterStateManager('', [])),
DiscoManager([]), DiscoManager([]),
PingManager(),
EntityCapabilitiesManager('http://moxxmpp.example'), EntityCapabilitiesManager('http://moxxmpp.example'),
]) ])
..setConnectionSettings( ..setConnectionSettings(

View File

@ -82,7 +82,6 @@ void main() {
PresenceManager(), PresenceManager(),
RosterManager(TestingRosterStateManager(null, [])), RosterManager(TestingRosterStateManager(null, [])),
DiscoManager([]), DiscoManager([]),
PingManager(),
EntityCapabilitiesManager('http://moxxmpp.example'), EntityCapabilitiesManager('http://moxxmpp.example'),
]); ]);
conn.registerFeatureNegotiators([ conn.registerFeatureNegotiators([

View File

@ -168,7 +168,6 @@ void main() {
PresenceManager(), PresenceManager(),
MessageManager(), MessageManager(),
RosterManager(TestingRosterStateManager(null, [])), RosterManager(TestingRosterStateManager(null, [])),
PingManager(),
]); ]);
connection connection
..registerFeatureNegotiators([ ..registerFeatureNegotiators([

View File

@ -294,7 +294,6 @@ void main() {
PresenceManager(), PresenceManager(),
RosterManager(TestingRosterStateManager('', [])), RosterManager(TestingRosterStateManager('', [])),
DiscoManager([]), DiscoManager([]),
PingManager(),
sm, sm,
CarbonsManager()..forceEnable(), CarbonsManager()..forceEnable(),
EntityCapabilitiesManager('http://moxxmpp.example'), EntityCapabilitiesManager('http://moxxmpp.example'),
@ -420,7 +419,6 @@ void main() {
PresenceManager(), PresenceManager(),
RosterManager(TestingRosterStateManager('', [])), RosterManager(TestingRosterStateManager('', [])),
DiscoManager([]), DiscoManager([]),
PingManager(),
sm, sm,
CarbonsManager()..forceEnable(), CarbonsManager()..forceEnable(),
//EntityCapabilitiesManager('http://moxxmpp.example'), //EntityCapabilitiesManager('http://moxxmpp.example'),
@ -580,7 +578,6 @@ void main() {
PresenceManager(), PresenceManager(),
RosterManager(TestingRosterStateManager('', [])), RosterManager(TestingRosterStateManager('', [])),
DiscoManager([]), DiscoManager([]),
PingManager(),
StreamManagementManager(), StreamManagementManager(),
]); ]);
conn.registerFeatureNegotiators([ conn.registerFeatureNegotiators([
@ -675,7 +672,6 @@ void main() {
PresenceManager(), PresenceManager(),
RosterManager(TestingRosterStateManager('', [])), RosterManager(TestingRosterStateManager('', [])),
DiscoManager([]), DiscoManager([]),
PingManager(),
StreamManagementManager(), StreamManagementManager(),
]); ]);
conn.registerFeatureNegotiators([ conn.registerFeatureNegotiators([
@ -767,7 +763,6 @@ void main() {
PresenceManager(), PresenceManager(),
RosterManager(TestingRosterStateManager('', [])), RosterManager(TestingRosterStateManager('', [])),
DiscoManager([]), DiscoManager([]),
PingManager(),
StreamManagementManager(), StreamManagementManager(),
]); ]);
conn.registerFeatureNegotiators([ conn.registerFeatureNegotiators([

View File

@ -137,7 +137,6 @@ void main() {
PresenceManager(), PresenceManager(),
RosterManager(TestingRosterStateManager('', [])), RosterManager(TestingRosterStateManager('', [])),
DiscoManager([]), DiscoManager([]),
PingManager(),
StreamManagementManager(), StreamManagementManager(),
EntityCapabilitiesManager('http://moxxmpp.example'), EntityCapabilitiesManager('http://moxxmpp.example'),
]); ]);
@ -194,7 +193,6 @@ void main() {
PresenceManager(), PresenceManager(),
RosterManager(TestingRosterStateManager('', [])), RosterManager(TestingRosterStateManager('', [])),
DiscoManager([]), DiscoManager([]),
PingManager(),
EntityCapabilitiesManager('http://moxxmpp.example'), EntityCapabilitiesManager('http://moxxmpp.example'),
]); ]);
conn.registerFeatureNegotiators([ conn.registerFeatureNegotiators([
@ -254,7 +252,6 @@ void main() {
PresenceManager(), PresenceManager(),
RosterManager(TestingRosterStateManager('', [])), RosterManager(TestingRosterStateManager('', [])),
DiscoManager([]), DiscoManager([]),
PingManager(),
EntityCapabilitiesManager('http://moxxmpp.example'), EntityCapabilitiesManager('http://moxxmpp.example'),
]); ]);
conn.registerFeatureNegotiators([SaslPlainNegotiator()]); conn.registerFeatureNegotiators([SaslPlainNegotiator()]);
@ -409,7 +406,6 @@ void main() {
PresenceManager(), PresenceManager(),
RosterManager(TestingRosterStateManager('', [])), RosterManager(TestingRosterStateManager('', [])),
DiscoManager([]), DiscoManager([]),
PingManager(),
]); ]);
conn conn
..registerFeatureNegotiators([ ..registerFeatureNegotiators([