fix(xep): Fix resend behaviour leading to period disconnects

It seems that we were expecting acks "in the future" for old stanzas.
Also, this commit should prevent E2EE implementations from re-encrypting
resent stanzas.

Fixes #38.
This commit is contained in:
PapaTutuWawa 2023-06-19 22:54:53 +02:00
parent 4f9a0605c7
commit 2db44e2f51
6 changed files with 138 additions and 26 deletions

View File

@ -28,7 +28,6 @@ import 'package:moxxmpp/src/stringxml.dart';
import 'package:moxxmpp/src/util/queue.dart';
import 'package:moxxmpp/src/util/typed_map.dart';
import 'package:moxxmpp/src/xeps/xep_0030/xep_0030.dart';
import 'package:moxxmpp/src/xeps/xep_0198/types.dart';
import 'package:moxxmpp/src/xeps/xep_0198/xep_0198.dart';
import 'package:moxxmpp/src/xeps/xep_0352.dart';
import 'package:synchronized/synchronized.dart';
@ -533,15 +532,14 @@ class XmppConnection {
// Run post-send handlers
_log.fine('Running post stanza handlers..');
final extensions = TypedMap<StanzaHandlerExtension>()
..set(StreamManagementData(details.excludeFromStreamManagement));
await _runOutgoingPostStanzaHandlers(
newStanza,
initial: StanzaHandlerData(
false,
false,
newStanza,
extensions,
details.postSendExtensions ?? TypedMap<StanzaHandlerExtension>(),
encrypted: data.encrypted,
),
);
_log.fine('Done');

View File

@ -11,6 +11,8 @@ import 'package:moxxmpp/src/negotiators/namespaces.dart';
import 'package:moxxmpp/src/negotiators/negotiator.dart';
import 'package:moxxmpp/src/stanza.dart';
import 'package:moxxmpp/src/stringxml.dart';
import 'package:moxxmpp/src/util/typed_map.dart';
import 'package:moxxmpp/src/xeps/xep_0198/types.dart';
/// A function that will be called when presence, outside of subscription request
/// management, will be sent. Useful for managers that want to add [XMLNode]s to said
@ -156,7 +158,9 @@ class PresenceManager extends XmppManagerBase {
),
awaitable: false,
bypassQueue: true,
excludeFromStreamManagement: true,
postSendExtensions: TypedMap<StanzaHandlerExtension>.fromList([
const StreamManagementData(true, null),
]),
),
);
}

View File

@ -1,5 +1,7 @@
import 'package:moxxmpp/src/managers/data.dart';
import 'package:moxxmpp/src/namespaces.dart';
import 'package:moxxmpp/src/stringxml.dart';
import 'package:moxxmpp/src/util/typed_map.dart';
/// A description of a stanza to send.
class StanzaDetails {
@ -11,7 +13,7 @@ class StanzaDetails {
this.encrypted = false,
this.forceEncryption = false,
this.bypassQueue = false,
this.excludeFromStreamManagement = false,
this.postSendExtensions,
});
/// The stanza to send.
@ -42,7 +44,7 @@ class StanzaDetails {
/// This makes the Stream Management implementation, when available, ignore the stanza,
/// meaning that it gets counted but excluded from resending.
/// This should never have to be set to true.
final bool excludeFromStreamManagement;
final TypedMap<StanzaHandlerExtension>? postSendExtensions;
}
/// A simple description of the <error /> element that may be inside a stanza

View File

@ -1,8 +1,39 @@
import 'package:meta/meta.dart';
import 'package:moxxmpp/src/managers/data.dart';
import 'package:moxxmpp/src/stanza.dart';
class StreamManagementData implements StanzaHandlerExtension {
const StreamManagementData(this.exclude);
const StreamManagementData(this.exclude, this.queueId);
/// Whether the stanza should be exluded from the StreamManagement's resend queue.
final bool exclude;
/// The ID to use when queuing the stanza.
final int? queueId;
/// If we resend a stanza, then we will have [queueId] set, so we should skip
/// incrementing the C2S counter.
bool get shouldCountStanza => queueId == null;
}
/// A queue element for keeping track of stanzas to (potentially) resend.
@immutable
class SMQueueEntry {
const SMQueueEntry(this.stanza, this.encrypted);
/// The actual stanza.
final Stanza stanza;
/// Flag indicating whether the stanza was encrypted before sending.
final bool encrypted;
@override
bool operator ==(Object other) {
return other is SMQueueEntry &&
other.stanza == stanza &&
other.encrypted == encrypted;
}
@override
int get hashCode => stanza.hashCode ^ encrypted.hashCode;
}

View File

@ -11,6 +11,7 @@ import 'package:moxxmpp/src/namespaces.dart';
import 'package:moxxmpp/src/negotiators/namespaces.dart';
import 'package:moxxmpp/src/stanza.dart';
import 'package:moxxmpp/src/stringxml.dart';
import 'package:moxxmpp/src/util/typed_map.dart';
import 'package:moxxmpp/src/xeps/xep_0198/errors.dart';
import 'package:moxxmpp/src/xeps/xep_0198/negotiator.dart';
import 'package:moxxmpp/src/xeps/xep_0198/nonzas.dart';
@ -28,7 +29,7 @@ class StreamManagementManager extends XmppManagerBase {
}) : super(smManager);
/// The queue of stanzas that are not (yet) acked
final Map<int, Stanza> _unackedStanzas = {};
final Map<int, SMQueueEntry> _unackedStanzas = {};
/// Commitable state of the StreamManagementManager
StreamManagementState _state = StreamManagementState(0, 0);
@ -60,8 +61,8 @@ class StreamManagementManager extends XmppManagerBase {
final Lock _ackLock = Lock();
/// Functions for testing
@visibleForTesting
Map<int, Stanza> getUnackedStanzas() => _unackedStanzas;
/// @visibleForTesting
Map<int, SMQueueEntry> getUnackedStanzas() => _unackedStanzas;
@visibleForTesting
Future<int> getPendingAcks() async {
@ -306,6 +307,10 @@ class StreamManagementManager extends XmppManagerBase {
if (_pendingAcks > 0) {
// Prevent diff from becoming negative
final diff = max(_state.c2s - h, 0);
logger.finest(
'Setting _pendingAcks to $diff (was $_pendingAcks before): max(${_state.c2s} - $h, 0)',
);
_pendingAcks = diff;
// Reset the timer
@ -336,15 +341,18 @@ class StreamManagementManager extends XmppManagerBase {
final attrs = getAttributes();
final sequences = _unackedStanzas.keys.toList()..sort();
for (final height in sequences) {
logger.finest('Unacked stanza: height $height, h $h');
// Do nothing if the ack does not concern this stanza
if (height > h) continue;
final stanza = _unackedStanzas[height]!;
logger.finest('Removing stanza with height $height');
final entry = _unackedStanzas[height]!;
_unackedStanzas.remove(height);
// Create a StanzaAckedEvent if the stanza is correct
if (shouldTriggerAckedEvent(stanza)) {
attrs.sendEvent(StanzaAckedEvent(stanza));
if (shouldTriggerAckedEvent(entry.stanza)) {
attrs.sendEvent(StanzaAckedEvent(entry.stanza));
}
}
@ -401,13 +409,29 @@ class StreamManagementManager extends XmppManagerBase {
StanzaHandlerData state,
) async {
if (isStreamManagementEnabled()) {
await _incrementC2S();
final smData = state.extensions.get<StreamManagementData>();
logger.finest('Should count stanza: ${smData?.shouldCountStanza}');
if (smData?.shouldCountStanza ?? true) {
await _incrementC2S();
}
if (state.extensions.get<StreamManagementData>()?.exclude ?? false) {
if (smData?.exclude ?? false) {
return state;
}
_unackedStanzas[_state.c2s] = stanza;
int queueId;
if (smData?.queueId != null) {
logger.finest('Reusing queue id ${smData!.queueId}');
queueId = smData.queueId!;
} else {
queueId = await _stateLock.synchronized(() => _state.c2s);
}
_unackedStanzas[queueId] = SMQueueEntry(
stanza,
// Prevent an E2EE message being encrypted again
state.encrypted,
);
await _sendAckRequest();
}
@ -415,16 +439,23 @@ class StreamManagementManager extends XmppManagerBase {
}
Future<void> _resendStanzas() async {
final stanzas = _unackedStanzas.values.toList();
_unackedStanzas.clear();
for (final stanza in stanzas) {
logger
.finest('Resending ${stanza.tag} with id ${stanza.attributes["id"]}');
final queueCopy = _unackedStanzas.entries.toList();
for (final entry in queueCopy) {
logger.finest(
'Resending ${entry.value.stanza.tag} with id ${entry.value.stanza.attributes["id"]}',
);
await getAttributes().sendStanza(
StanzaDetails(
stanza,
entry.value.stanza,
postSendExtensions: TypedMap<StanzaHandlerExtension>.fromList([
StreamManagementData(
false,
entry.key,
),
]),
awaitable: false,
// Prevent an E2EE message being encrypted again
encrypted: entry.value.encrypted,
),
);
}

View File

@ -1,5 +1,6 @@
import 'dart:async';
import 'package:moxxmpp/moxxmpp.dart';
import 'package:moxxmpp/src/xeps/xep_0198/types.dart';
import 'package:test/test.dart';
import '../helpers/logging.dart';
import '../helpers/xmpp.dart';
@ -42,10 +43,10 @@ Future<void> runOutgoingStanzaHandlers(
}
}
XmppManagerAttributes mkAttributes(void Function(Stanza) callback) {
XmppManagerAttributes mkAttributes(void Function(StanzaDetails) callback) {
return XmppManagerAttributes(
sendStanza: (StanzaDetails details) async {
callback(details.stanza);
callback(details);
return Stanza.message();
},
@ -1069,4 +1070,49 @@ void main() {
expect(smn.streamEnablementFailed, true);
expect(conn.resource, 'test-resource');
});
test('Test resending state changes', () async {
final manager = StreamManagementManager();
final attributes = mkAttributes((details) async {
for (final handler in manager.getOutgoingPostStanzaHandlers()) {
await handler.callback(
details.stanza,
StanzaHandlerData(
false,
false,
stanza,
details.postSendExtensions ?? TypedMap(),
),
);
}
});
manager.register(attributes);
await manager.onXmppEvent(
StreamManagementEnabledEvent(resource: 'hallo'),
);
// Send a stanza 5 times
for (var i = 0; i < 5; i++) {
await runOutgoingStanzaHandlers(manager, stanza);
}
// <a h='3'/>
await manager.runNonzaHandlers(mkAck(3));
//expect(manager.getUnackedStanzas().length, 2);
final oldC2s = manager.state.c2s;
final oldQueue = Map<int, SMQueueEntry>.from(manager.getUnackedStanzas());
// Disconnect and reconnect
await manager.onXmppEvent(ConnectingEvent());
await manager.onXmppEvent(StreamResumedEvent(h: 3));
expect(manager.state.c2s, oldC2s);
expect(manager.getUnackedStanzas(), oldQueue);
// Now they get acked
await manager.runNonzaHandlers(mkAck(5));
expect(manager.getUnackedStanzas().length, 0);
expect(manager.state.c2s, 5);
});
}