fix(core,xep): Fix multiple issues

- Fix a deadlock in the RandomBackoffReconnectionPolicy on failure
- Fix enablement of stream management happening too early
This commit is contained in:
PapaTutuWawa 2023-04-07 15:25:50 +02:00
parent c88ab940c4
commit c1ad646905
6 changed files with 88 additions and 52 deletions

View File

@ -10,6 +10,7 @@ import 'package:moxxmpp/src/errors.dart';
import 'package:moxxmpp/src/events.dart';
import 'package:moxxmpp/src/handlers/base.dart';
import 'package:moxxmpp/src/iq.dart';
import 'package:moxxmpp/src/jid.dart';
import 'package:moxxmpp/src/managers/attributes.dart';
import 'package:moxxmpp/src/managers/base.dart';
import 'package:moxxmpp/src/managers/data.dart';
@ -140,6 +141,8 @@ class XmppConnection {
RoutingState _routingState = RoutingState.preConnection;
/// The currently bound resource or '' if none has been bound yet.
/// NOTE: A Using the empty string is okay since RFC7622 says that
/// the resource MUST NOT be zero octets.
String _resource = '';
String get resource => _resource;
@ -170,6 +173,13 @@ class XmppConnection {
bool get isAuthenticated => _isAuthenticated;
/// Returns the JID we authenticate with and add the resource that we have bound.
JID _getJidWithResource() {
assert(_resource.isNotEmpty, 'The resource must not be empty');
return connectionSettings.jid.withResource(_resource);
}
/// Registers a list of [XmppManagerBase] sub-classes as managers on this connection.
Future<void> registerManagers(List<XmppManagerBase> managers) async {
for (final manager in managers) {
@ -181,7 +191,7 @@ class XmppConnection {
sendEvent: _sendEvent,
getConnectionSettings: () => connectionSettings,
getManagerById: getManagerById,
getFullJID: () => connectionSettings.jid.withResource(_resource),
getFullJID: _getJidWithResource,
getSocket: () => _socket,
getConnection: () => this,
getNegotiatorById: _negotiationsHandler.getNegotiatorById,
@ -232,7 +242,7 @@ class XmppConnection {
_sendEvent,
_negotiationsHandler.getNegotiatorById,
getManagerById,
() => connectionSettings.jid.withResource(_resource),
_getJidWithResource,
() => _socket,
() => _isAuthenticated,
_setAuthenticated,
@ -295,8 +305,11 @@ class XmppConnection {
// Connect again
// ignore: cascade_invocations
_log.finest('Calling _connectImpl() from _attemptReconnection');
await _connectImpl(
waitForConnection: true,
unawaited(
_connectImpl(
waitForConnection: true,
shouldReconnect: false,
),
);
}
@ -431,7 +444,7 @@ class XmppConnection {
case StanzaFromType.full:
{
stanza_ = stanza_.copyWith(
from: connectionSettings.jid.withResource(_resource).toString(),
from: _getJidWithResource().toString(),
);
}
break;
@ -840,7 +853,6 @@ class XmppConnection {
/// The private implementation for [XmppConnection.connect]. The parameters have
/// the same meaning as with [XmppConnection.connect].
Future<Result<bool, XmppError>> _connectImpl({
String? lastResource,
bool waitForConnection = false,
bool shouldReconnect = true,
bool waitUntilLogin = false,
@ -863,11 +875,9 @@ class XmppConnection {
_connectionCompleter = Completer();
}
if (lastResource != null) {
setResource(lastResource, triggerEvent: false);
} else {
setResource('', triggerEvent: false);
}
// Reset the resource. If we use stream resumption from XEP-0198, then the
// manager will set it on successful resumption.
setResource('', triggerEvent: false);
// If requested, wait until we have a network connection
if (waitForConnection) {
@ -914,9 +924,6 @@ class XmppConnection {
/// Start the connection process using the provided connection settings.
///
/// If [lastResource] is set, then its value is used as the connection's resource.
/// Useful for stream resumption.
///
/// [shouldReconnect] indicates whether the reconnection attempts should be
/// automatically performed after a fatal failure of any kind occurs.
///
@ -931,14 +938,12 @@ class XmppConnection {
/// [enableReconnectOnSuccess] indicates that automatic reconnection is to be
/// enabled once the connection has been successfully established.
Future<Result<bool, XmppError>> connect({
String? lastResource,
bool? shouldReconnect,
bool waitForConnection = false,
bool waitUntilLogin = false,
bool enableReconnectOnSuccess = true,
}) async {
final result = _connectImpl(
lastResource: lastResource,
shouldReconnect: shouldReconnect ?? !waitUntilLogin,
waitForConnection: waitForConnection,
waitUntilLogin: waitUntilLogin,

View File

@ -111,31 +111,47 @@ class RandomBackoffReconnectionPolicy extends ReconnectionPolicy {
/// Called when the backoff expired
@visibleForTesting
Future<void> onTimerElapsed() async {
_log.fine('Timer elapsed. Waiting for lock...');
await _timerLock.synchronized(() async {
_log.finest('Timer elapsed. Waiting for lock...');
final shouldContinue = await _timerLock.synchronized(() async {
_log.finest('Timer lock aquired');
if (_timer == null) {
_log.finest(
'The timer is already set to null. Doing nothing.',
);
return false;
}
if (!(await getIsReconnecting())) {
return;
return false;
}
if (!(await getShouldReconnect())) {
_log.fine(
_log.finest(
'Should not reconnect. Stopping here.',
);
return;
return false;
}
_log.fine('Triggering reconnect');
_timer?.cancel();
_timer = null;
await performReconnect!();
return true;
});
if (!shouldContinue) {
return;
}
_log.fine('Triggering reconnect');
await performReconnect!();
}
@override
Future<void> reset() async {
_log.finest('Resetting internal state');
_timer?.cancel();
_timer = null;
await _timerLock.synchronized(() {
_timer?.cancel();
_timer = null;
});
await super.reset();
}
@ -144,9 +160,11 @@ class RandomBackoffReconnectionPolicy extends ReconnectionPolicy {
final seconds =
Random().nextInt(_maxBackoffTime - _minBackoffTime) + _minBackoffTime;
_log.finest('Failure occured. Starting random backoff with ${seconds}s');
_timer?.cancel();
_timer = Timer(Duration(seconds: seconds), onTimerElapsed);
await _timerLock.synchronized(() {
_timer?.cancel();
_timer = Timer(Duration(seconds: seconds), onTimerElapsed);
});
}
@override

View File

@ -1,6 +1,5 @@
import 'package:collection/collection.dart';
import 'package:logging/logging.dart';
import 'package:meta/meta.dart';
import 'package:moxxmpp/src/events.dart';
import 'package:moxxmpp/src/managers/namespaces.dart';
import 'package:moxxmpp/src/namespaces.dart';
@ -60,11 +59,7 @@ class StreamManagementNegotiator extends Sasl2FeatureNegotiator
bool _inlineStreamEnablementRequested = false;
/// Cached resource for stream resumption
String _resource = '';
@visibleForTesting
void setResource(String resource) {
_resource = resource;
}
String resource = '';
@override
bool canInlineFeature(List<XMLNode> features) {
@ -90,7 +85,7 @@ class StreamManagementNegotiator extends Sasl2FeatureNegotiator
@override
Future<void> onXmppEvent(XmppEvent event) async {
if (event is ResourceBoundEvent) {
_resource = event.resource;
resource = event.resource;
}
}
@ -100,7 +95,9 @@ class StreamManagementNegotiator extends Sasl2FeatureNegotiator
if (sm.state.streamResumptionId != null && !_resumeFailed) {
// We could do Stream resumption
return super.matchesFeature(features) && attributes.isAuthenticated();
return super.matchesFeature(features) &&
attributes.isAuthenticated() &&
resource.isNotEmpty;
} else {
// We cannot do a stream resumption
return super.matchesFeature(features) &&
@ -122,23 +119,23 @@ class StreamManagementNegotiator extends Sasl2FeatureNegotiator
_resumeFailed = true;
_isResumed = false;
_state = _StreamManagementNegotiatorState.ready;
resource = '';
attributes.setResource('', triggerEvent: false);
}
Future<void> _onStreamResumptionSuccessful(XMLNode resumed) async {
assert(resumed.tag == 'resumed', 'The correct element must be passed');
assert(
resource.isNotEmpty,
'The Stream Management Negotiator must know of the previous resource',
);
final h = int.parse(resumed.attributes['h']! as String);
await attributes.sendEvent(StreamResumedEvent(h: h));
_resumeFailed = false;
_isResumed = true;
if (attributes.getConnection().resource.isEmpty && _resource.isNotEmpty) {
attributes.setResource(_resource);
} else if (attributes.getConnection().resource.isNotEmpty &&
_resource.isEmpty) {
_resource = attributes.getConnection().resource;
}
attributes.setResource(resource);
}
Future<void> _onStreamEnablementSuccessful(XMLNode enabled) async {
@ -152,7 +149,7 @@ class StreamManagementNegotiator extends Sasl2FeatureNegotiator
await attributes.sendEvent(
StreamManagementEnabledEvent(
resource: attributes.getFullJID().resource,
resource: resource,
id: id,
location: enabled.attributes['location'] as String?,
),
@ -197,10 +194,12 @@ class StreamManagementNegotiator extends Sasl2FeatureNegotiator
_log.finest('Stream Management resumption successful');
assert(
attributes.getFullJID().resource != '',
resource.isNotEmpty,
'Resume only works when we already have a resource bound and know about it',
);
// TODO(Unknown): Don't do this here. We trigger an event that the CSIManager
// can consume.
final csi = attributes.getManagerById(csiManager) as CSIManager?;
if (csi != null) {
csi.restoreCSIState();

View File

@ -42,7 +42,6 @@ const _doNotEncryptList = [
DoNotEncrypt('stanza-id', stableIdXmlns),
];
@mustCallSuper
abstract class BaseOmemoManager extends XmppManagerBase {
BaseOmemoManager() : super(omemoManager);

View File

@ -1,3 +1,4 @@
import 'dart:async';
import 'package:moxxmpp/moxxmpp.dart';
import 'package:test/test.dart';
import 'helpers/logging.dart';
@ -63,6 +64,8 @@ void main() {
counter++;
});
await policy.setShouldReconnect(true);
// ignore: avoid_print
print('policy.setShouldReconnect(true) done');
// We have a failure
expect(
@ -70,9 +73,13 @@ void main() {
true,
);
await policy.onFailure();
// ignore: avoid_print
print('policy.onFailure() done');
await policy.onTimerElapsed();
await policy.onTimerElapsed();
unawaited(policy.onTimerElapsed());
unawaited(policy.onTimerElapsed());
await Future<void>.delayed(const Duration(seconds: 3));
expect(counter, 1);
});
}

View File

@ -514,7 +514,7 @@ void main() {
await conn.registerFeatureNegotiators([
SaslPlainNegotiator(),
ResourceBindingNegotiator(),
StreamManagementNegotiator(),
StreamManagementNegotiator()..resource = 'test-resource',
]);
await conn.getManagerById<StreamManagementManager>(smManager)!.setState(
StreamManagementState(
@ -536,6 +536,15 @@ void main() {
.isStreamManagementEnabled(),
true,
);
expect(conn.resource, 'MU29eEZn');
expect(
conn
.getNegotiatorById<StreamManagementNegotiator>(
streamManagementNegotiator,
)!
.resource,
'MU29eEZn',
);
});
test('Test a successful stream resumption', () async {
@ -604,7 +613,7 @@ void main() {
await conn.registerFeatureNegotiators([
SaslPlainNegotiator(),
ResourceBindingNegotiator(),
StreamManagementNegotiator(),
StreamManagementNegotiator()..resource = 'abc123',
]);
await conn.getManagerById<StreamManagementManager>(smManager)!.setState(
StreamManagementState(
@ -615,7 +624,6 @@ void main() {
);
await conn.connect(
lastResource: 'abc123',
waitUntilLogin: true,
);
expect(fakeSocket.getState(), 4);
@ -690,7 +698,7 @@ void main() {
await conn.registerFeatureNegotiators([
SaslPlainNegotiator(),
ResourceBindingNegotiator(),
StreamManagementNegotiator()..setResource('test-resource'),
StreamManagementNegotiator()..resource = 'test-resource',
Sasl2Negotiator(
userAgent: const UserAgent(
id: 'd4565fa7-4d72-4749-b3d3-740edbf87770',
@ -787,7 +795,7 @@ void main() {
await conn.registerFeatureNegotiators([
SaslPlainNegotiator(),
ResourceBindingNegotiator(),
StreamManagementNegotiator()..setResource('test-resource'),
StreamManagementNegotiator()..resource = 'test-resource',
Bind2Negotiator(),
Sasl2Negotiator(
userAgent: const UserAgent(