feat(core): Implement the send queue

This commit is contained in:
PapaTutuWawa 2023-05-24 13:34:36 +02:00
parent b1da6e5a53
commit bd4e1d28ea
3 changed files with 313 additions and 65 deletions

View File

@ -26,6 +26,7 @@ import 'package:moxxmpp/src/socket.dart';
import 'package:moxxmpp/src/stanza.dart'; import 'package:moxxmpp/src/stanza.dart';
import 'package:moxxmpp/src/stringxml.dart'; import 'package:moxxmpp/src/stringxml.dart';
import 'package:moxxmpp/src/types/result.dart'; import 'package:moxxmpp/src/types/result.dart';
import 'package:moxxmpp/src/util/queue.dart';
import 'package:moxxmpp/src/xeps/xep_0030/xep_0030.dart'; import 'package:moxxmpp/src/xeps/xep_0030/xep_0030.dart';
import 'package:moxxmpp/src/xeps/xep_0198/xep_0198.dart'; import 'package:moxxmpp/src/xeps/xep_0198/xep_0198.dart';
import 'package:moxxmpp/src/xeps/xep_0352.dart'; import 'package:moxxmpp/src/xeps/xep_0352.dart';
@ -92,6 +93,11 @@ class XmppConnection {
// TODO(Unknown): Handle on done // TODO(Unknown): Handle on done
_socketStream.transform(_streamParser).forEach(handleXmlStream); _socketStream.transform(_streamParser).forEach(handleXmlStream);
_socket.getEventStream().listen(_handleSocketEvent); _socket.getEventStream().listen(_handleSocketEvent);
_stanzaQueue = AsyncStanzaQueue(
_sendStanzaImpl,
_canSendData,
);
} }
/// The state that the connection is currently in /// The state that the connection is currently in
@ -175,6 +181,8 @@ class XmppConnection {
bool get isAuthenticated => _isAuthenticated; bool get isAuthenticated => _isAuthenticated;
late final AsyncStanzaQueue _stanzaQueue;
/// Returns the JID we authenticate with and add the resource that we have bound. /// Returns the JID we authenticate with and add the resource that we have bound.
JID _getJidWithResource() { JID _getJidWithResource() {
assert(_resource.isNotEmpty, 'The resource must not be empty'); assert(_resource.isNotEmpty, 'The resource must not be empty');
@ -412,6 +420,136 @@ class XmppConnection {
.contains(await getConnectionState()); .contains(await getConnectionState());
} }
Future<Future<XMLNode>?> sendStanza2(StanzaDetails details) async {
assert(
implies(
details.awaitable,
details.stanza.id != null && details.stanza.id!.isNotEmpty ||
details.addId),
'An awaitable stanza must have an id',
);
final completer = details.awaitable ? Completer<XMLNode>() : null;
await _stanzaQueue.enqueueStanza(
StanzaQueueEntry(
details,
completer,
),
);
return completer?.future;
}
Future<void> _sendStanzaImpl(StanzaQueueEntry entry) async {
final details = entry.details;
var newStanza = details.stanza;
// Generate an id, if requested
if (details.addId && (newStanza.id == null || newStanza.id == '')) {
newStanza = newStanza.copyWith(id: generateId());
}
// Add a from type, if requested
if (details.addFrom != StanzaFromType.none &&
(newStanza.from == null || newStanza.from == '')) {
switch (details.addFrom) {
case StanzaFromType.full:
newStanza = newStanza.copyWith(
from: _getJidWithResource().toString(),
);
break;
case StanzaFromType.bare:
newStanza = newStanza.copyWith(
from: connectionSettings.jid.toBare().toString(),
);
break;
case StanzaFromType.none:
// NOOP
break;
}
}
// Add the correct stanza namespace
newStanza = newStanza.copyWith(
xmlns: _negotiationsHandler.getStanzaNamespace(),
);
// Run pre-send handlers
_log.fine('Running pre stanza handlers..');
final data = await _runOutgoingPreStanzaHandlers(
newStanza,
initial: StanzaHandlerData(
false,
false,
null,
newStanza,
encrypted: details.encrypted,
forceEncryption: details.forceEncryption,
),
);
_log.fine('Done');
// Cancel sending, if the pre-send handlers indicated it.
if (data.cancel) {
_log.fine('A stanza handler indicated that it wants to cancel sending.');
await _sendEvent(StanzaSendingCancelledEvent(data));
// Resolve the future, if one was given.
if (details.awaitable) {
entry.completer!.complete(
Stanza(
tag: data.stanza.tag,
to: data.stanza.from,
from: data.stanza.to,
attributes: <String, String>{
'type': 'error',
if (data.stanza.id != null) 'id': data.stanza.id!,
},
),
);
}
return;
}
// Log the (raw) stanza
final prefix = data.encrypted ? '(Encrypted) ' : '';
_log.finest('==> $prefix${newStanza.toXml()}');
if (details.awaitable) {
await _stanzaAwaiter
.addPending(
// A stanza with no to attribute is for direct processing by the server. As such,
// we can correlate it by just *assuming* we have that attribute
// (RFC 6120 Section 8.1.1.1)
data.stanza.to ?? connectionSettings.jid.toBare().toString(),
data.stanza.id!,
data.stanza.tag,
)
.then((result) {
entry.completer!.complete(result);
});
}
if (await _canSendData()) {
_socket.write(data.stanza.toXml());
} else {
_log.fine('Not sending dat as _canSendData() returned false.');
}
// Run post-send handlers
_log.fine('Running post stanza handlers..');
await _runOutgoingPostStanzaHandlers(
newStanza,
initial: StanzaHandlerData(
false,
false,
null,
newStanza,
),
);
_log.fine('Done');
}
/// Sends a [stanza] to the server. If stream management is enabled, then keeping track /// Sends a [stanza] to the server. If stream management is enabled, then keeping track
/// of the stanza is taken care of. Returns a Future that resolves when we receive a /// of the stanza is taken care of. Returns a Future that resolves when we receive a
/// response to the stanza. /// response to the stanza.
@ -487,11 +625,7 @@ class XmppConnection {
from: data.stanza.to, from: data.stanza.to,
attributes: <String, String>{ attributes: <String, String>{
'type': 'error', 'type': 'error',
...data.stanza.id != null if (data.stanza.id != null) 'id': data.stanza.id!,
? {
'id': data.stanza.id!,
}
: {},
}, },
); );
} }

View File

@ -1,37 +1,96 @@
import 'dart:async'; import 'dart:async';
import 'dart:collection'; import 'dart:collection';
import 'package:meta/meta.dart'; import 'package:meta/meta.dart';
import 'package:moxxmpp/src/connection.dart';
import 'package:moxxmpp/src/stanza.dart';
import 'package:moxxmpp/src/stringxml.dart';
import 'package:synchronized/synchronized.dart'; import 'package:synchronized/synchronized.dart';
/// A job to be submitted to an [AsyncQueue]. class StanzaDetails {
typedef AsyncQueueJob = Future<void> Function(); const StanzaDetails(
this.stanza, {
this.addFrom = StanzaFromType.full,
this.addId = true,
this.awaitable = true,
this.encrypted = false,
this.forceEncryption = false,
});
/// The stanza to send.
final Stanza stanza;
/// How to set the "from" attribute of the stanza.
final StanzaFromType addFrom;
/// Flag indicating whether a stanza id should be added before sending.
final bool addId;
/// Track the stanza to allow awaiting its response.
final bool awaitable;
final bool encrypted;
final bool forceEncryption;
}
class StanzaQueueEntry {
const StanzaQueueEntry(
this.details,
this.completer,
);
/// The actual data to send.
final StanzaDetails details;
/// The [Completer] to resolve when the response is received.
final Completer<XMLNode>? completer;
}
/// A function that is executed when a job is popped from the queue.
typedef SendStanzaFunction = Future<void> Function(StanzaQueueEntry);
/// A function that is called before popping a queue item. Should return true when
/// the [SendStanzaFunction] can be executed.
typedef CanSendCallback = Future<bool> Function();
/// A (hopefully) async-safe queue that attempts to force /// A (hopefully) async-safe queue that attempts to force
/// in-order execution of its jobs. /// in-order execution of its jobs.
class AsyncQueue { class AsyncStanzaQueue {
/// The lock for accessing [AsyncQueue._lock] and [AsyncQueue._running]. AsyncStanzaQueue(
this._sendStanzaFunction,
this._canSendCallback,
);
/// The lock for accessing [AsyncStanzaQueue._lock] and [AsyncStanzaQueue._running].
final Lock _lock = Lock(); final Lock _lock = Lock();
/// The actual job queue. /// The actual job queue.
final Queue<AsyncQueueJob> _queue = Queue<AsyncQueueJob>(); final Queue<StanzaQueueEntry> _queue = Queue<StanzaQueueEntry>();
/// Sends the stanza when we can pop from the queue.
final SendStanzaFunction _sendStanzaFunction;
final CanSendCallback _canSendCallback;
/// Indicates whether we are currently executing a job. /// Indicates whether we are currently executing a job.
bool _running = false; bool _running = false;
@visibleForTesting @visibleForTesting
Queue<AsyncQueueJob> get queue => _queue; Queue<StanzaQueueEntry> get queue => _queue;
@visibleForTesting @visibleForTesting
bool get isRunning => _running; bool get isRunning => _running;
/// Adds a job [job] to the queue. /// Adds a job [entry] to the queue.
Future<void> addJob(AsyncQueueJob job) async { Future<void> enqueueStanza(StanzaQueueEntry entry) async {
await _lock.synchronized(() { await _lock.synchronized(() async {
_queue.add(job); _queue.add(entry);
if (!_running && _queue.isNotEmpty) { if (!_running && _queue.isNotEmpty && await _canSendCallback()) {
_running = true; _running = true;
unawaited(_popJob()); unawaited(
_runJob(_queue.removeFirst()),
);
} }
}); });
} }
@ -40,17 +99,30 @@ class AsyncQueue {
await _lock.synchronized(_queue.clear); await _lock.synchronized(_queue.clear);
} }
Future<void> _popJob() async { Future<void> _runJob(StanzaQueueEntry details) async {
final job = _queue.removeFirst(); await _sendStanzaFunction(details);
final future = job();
await future;
await _lock.synchronized(() { await _lock.synchronized(() async {
if (_queue.isNotEmpty) { if (_queue.isNotEmpty && await _canSendCallback()) {
unawaited(_popJob()); unawaited(
_runJob(_queue.removeFirst()),
);
} else { } else {
_running = false; _running = false;
} }
}); });
} }
Future<void> restart() async {
if (!(await _canSendCallback())) return;
await _lock.synchronized(() {
if (_queue.isNotEmpty) {
_running = true;
unawaited(
_runJob(_queue.removeFirst()),
);
}
});
}
} }

View File

@ -1,58 +1,100 @@
import 'package:moxxmpp/moxxmpp.dart';
import 'package:moxxmpp/src/util/queue.dart'; import 'package:moxxmpp/src/util/queue.dart';
import 'package:test/test.dart'; import 'package:test/test.dart';
void main() { void main() {
test('Test the async queue', () async { test('Test not sending', () async {
final queue = AsyncQueue(); final queue = AsyncStanzaQueue(
var future1Finish = 0; (entry) async {
var future2Finish = 0; assert(false, 'No stanza should be sent');
var future3Finish = 0; },
() async => false,
);
await queue.addJob( await queue.enqueueStanza(
() => Future<void>.delayed( StanzaQueueEntry(
const Duration(seconds: 3), StanzaDetails(
() => future1Finish = DateTime.now().millisecondsSinceEpoch, Stanza.message(),
),
null,
), ),
); );
await queue.addJob( await queue.enqueueStanza(
() => Future<void>.delayed( StanzaQueueEntry(
const Duration(seconds: 3), StanzaDetails(
() => future2Finish = DateTime.now().millisecondsSinceEpoch, Stanza.message(),
), ),
); null,
await queue.addJob(
() => Future<void>.delayed(
const Duration(seconds: 3),
() => future3Finish = DateTime.now().millisecondsSinceEpoch,
), ),
); );
await Future<void>.delayed(const Duration(seconds: 12)); await Future<void>.delayed(const Duration(seconds: 1));
expect(queue.queue.length, 2);
expect(queue.isRunning, false);
});
// The three futures must be done test('Test sending', () async {
expect(future1Finish != 0, true); final queue = AsyncStanzaQueue(
expect(future2Finish != 0, true); (entry) async {},
expect(future3Finish != 0, true); () async => true,
// The end times of the futures must be ordered (on a timeline)
// |-- future1Finish -- future2Finish -- future3Finish --|
expect(
future1Finish < future2Finish && future1Finish < future3Finish,
true,
);
expect(
future2Finish < future3Finish && future2Finish > future1Finish,
true,
);
expect(
future3Finish > future1Finish && future3Finish > future2Finish,
true,
); );
// The queue must be empty at the end await queue.enqueueStanza(
expect(queue.queue.isEmpty, true); StanzaQueueEntry(
StanzaDetails(
Stanza.message(),
),
null,
),
);
await queue.enqueueStanza(
StanzaQueueEntry(
StanzaDetails(
Stanza.message(),
),
null,
),
);
// The queue must not be executing anything at the end await Future<void>.delayed(const Duration(seconds: 1));
expect(queue.queue.length, 0);
expect(queue.isRunning, false);
});
test('Test partial sending and resuming', () async {
var canRun = true;
final queue = AsyncStanzaQueue(
(entry) async {
canRun = false;
},
() async => canRun,
);
await queue.enqueueStanza(
StanzaQueueEntry(
StanzaDetails(
Stanza.message(),
),
null,
),
);
await queue.enqueueStanza(
StanzaQueueEntry(
StanzaDetails(
Stanza.message(),
),
null,
),
);
await Future<void>.delayed(const Duration(seconds: 1));
expect(queue.queue.length, 1);
expect(queue.isRunning, false);
canRun = true;
await queue.restart();
await Future<void>.delayed(const Duration(seconds: 1));
expect(queue.queue.length, 0);
expect(queue.isRunning, false); expect(queue.isRunning, false);
}); });
} }