fix(tests): Fix tests
This commit is contained in:
parent
3a94dd9634
commit
49d3c6411b
@ -1,6 +1,5 @@
|
||||
import 'dart:async';
|
||||
import 'package:meta/meta.dart';
|
||||
import 'package:moxxmpp/src/jid.dart';
|
||||
import 'package:moxxmpp/src/stringxml.dart';
|
||||
import 'package:synchronized/synchronized.dart';
|
||||
|
||||
@ -11,7 +10,7 @@ class _StanzaSurrogateKey {
|
||||
|
||||
/// The JID the original stanza was sent to. We expect the result to come from the
|
||||
/// same JID.
|
||||
final String sentTo;
|
||||
final String? sentTo;
|
||||
|
||||
/// The ID of the original stanza. We expect the result to have the same ID.
|
||||
final String id;
|
||||
@ -52,7 +51,7 @@ class StanzaAwaiter {
|
||||
/// [tag] is the stanza's tag name.
|
||||
///
|
||||
/// Returns a future that might resolve to the response to the stanza.
|
||||
Future<Future<XMLNode>> addPending(String to, String id, String tag) async {
|
||||
Future<Future<XMLNode>> addPending(String? to, String id, String tag) async {
|
||||
final completer = await _lock.synchronized(() {
|
||||
final completer = Completer<XMLNode>();
|
||||
_pending[_StanzaSurrogateKey(to, id, tag)] = completer;
|
||||
@ -62,20 +61,15 @@ class StanzaAwaiter {
|
||||
return completer.future;
|
||||
}
|
||||
|
||||
/// Checks if the stanza [stanza] is being awaited. [bareJid] is the bare JID of
|
||||
/// the connection.
|
||||
/// Checks if the stanza [stanza] is being awaited.
|
||||
/// If [stanza] is awaited, resolves the future and returns true. If not, returns
|
||||
/// false.
|
||||
Future<bool> onData(XMLNode stanza, JID bareJid) async {
|
||||
assert(bareJid.isBare(), 'bareJid must be bare');
|
||||
|
||||
Future<bool> onData(XMLNode stanza) async {
|
||||
final id = stanza.attributes['id'] as String?;
|
||||
if (id == null) return false;
|
||||
|
||||
final key = _StanzaSurrogateKey(
|
||||
// Section 8.1.2.1 § 3 of RFC 6120 says that an empty "from" indicates that the
|
||||
// attribute is implicitly from our own bare JID.
|
||||
stanza.attributes['from'] as String? ?? bareJid.toString(),
|
||||
stanza.attributes['from'] as String?,
|
||||
id,
|
||||
stanza.tag,
|
||||
);
|
||||
@ -91,4 +85,19 @@ class StanzaAwaiter {
|
||||
return false;
|
||||
});
|
||||
}
|
||||
|
||||
/// Checks if [stanza] represents a stanza that is awaited. Returns true, if [stanza]
|
||||
/// is awaited. False, if not.
|
||||
Future<bool> isAwaited(XMLNode stanza) async {
|
||||
final id = stanza.attributes['id'] as String?;
|
||||
if (id == null) return false;
|
||||
|
||||
final key = _StanzaSurrogateKey(
|
||||
stanza.attributes['from'] as String?,
|
||||
id,
|
||||
stanza.tag,
|
||||
);
|
||||
|
||||
return _lock.synchronized(() => _pending.containsKey(key));
|
||||
}
|
||||
}
|
||||
|
@ -90,7 +90,7 @@ class XmppConnection {
|
||||
},
|
||||
);
|
||||
|
||||
_incomingStanzaQueue = IncomingStanzaQueue(handleXmlStream);
|
||||
_incomingStanzaQueue = IncomingStanzaQueue(handleXmlStream, _stanzaAwaiter);
|
||||
_socketStream = _socket.getDataStream();
|
||||
// TODO(Unknown): Handle on done
|
||||
_socketStream
|
||||
@ -531,12 +531,15 @@ class XmppConnection {
|
||||
_log.finest('==> $prefix${newStanza.toXml()}');
|
||||
|
||||
if (details.awaitable) {
|
||||
final isOwnJid =
|
||||
data.stanza.to == connectionSettings.jid.toBare().toString();
|
||||
|
||||
await _stanzaAwaiter
|
||||
.addPending(
|
||||
// A stanza with no to attribute is for direct processing by the server. As such,
|
||||
// we can correlate it by just *assuming* we have that attribute
|
||||
// (RFC 6120 Section 8.1.1.1)
|
||||
data.stanza.to ?? connectionSettings.jid.toBare().toString(),
|
||||
isOwnJid ? null : data.stanza.to,
|
||||
data.stanza.id!,
|
||||
data.stanza.tag,
|
||||
)
|
||||
@ -773,9 +776,15 @@ class XmppConnection {
|
||||
return;
|
||||
}
|
||||
|
||||
// In case the stanza came from our own bare Jid, remove it so that the stanza
|
||||
// awaiter works correctly.
|
||||
final isOwnJid = incomingPreHandlers.stanza.from ==
|
||||
connectionSettings.jid.toBare().toString();
|
||||
final ownJidStanza = isOwnJid
|
||||
? incomingPreHandlers.stanza.copyWith(from: null)
|
||||
: incomingPreHandlers.stanza;
|
||||
final awaited = await _stanzaAwaiter.onData(
|
||||
incomingPreHandlers.stanza,
|
||||
connectionSettings.jid.toBare(),
|
||||
ownJidStanza,
|
||||
);
|
||||
if (awaited) {
|
||||
return;
|
||||
|
@ -102,6 +102,8 @@ class RemoteServerTimeoutError extends StanzaError {
|
||||
/// An unknown error.
|
||||
class UnknownStanzaError extends StanzaError {}
|
||||
|
||||
const _stanzaNotDefined = Object();
|
||||
|
||||
class Stanza extends XMLNode {
|
||||
// ignore: use_super_parameters
|
||||
Stanza({
|
||||
@ -216,7 +218,7 @@ class Stanza extends XMLNode {
|
||||
|
||||
Stanza copyWith({
|
||||
String? id,
|
||||
String? from,
|
||||
Object? from = _stanzaNotDefined,
|
||||
String? to,
|
||||
String? type,
|
||||
List<XMLNode>? children,
|
||||
@ -225,7 +227,7 @@ class Stanza extends XMLNode {
|
||||
return Stanza(
|
||||
tag: tag,
|
||||
to: to ?? this.to,
|
||||
from: from ?? this.from,
|
||||
from: from != _stanzaNotDefined ? from as String? : this.from,
|
||||
id: id ?? this.id,
|
||||
type: type ?? this.type,
|
||||
children: children ?? this.children,
|
||||
|
@ -1,24 +1,37 @@
|
||||
import 'dart:async';
|
||||
import 'dart:collection';
|
||||
import 'package:logging/logging.dart';
|
||||
import 'package:moxxmpp/src/awaiter.dart';
|
||||
import 'package:moxxmpp/src/parser.dart';
|
||||
import 'package:synchronized/synchronized.dart';
|
||||
|
||||
typedef LockResult = (Completer<void>?, XMPPStreamObject);
|
||||
|
||||
class IncomingStanzaQueue {
|
||||
IncomingStanzaQueue(this._callback);
|
||||
IncomingStanzaQueue(this._callback, this._stanzaAwaiter);
|
||||
|
||||
/// The queue for storing the completer of each
|
||||
/// incoming stanza (or stream object to be precise).
|
||||
/// Only access while holding the lock [_lock].
|
||||
final Queue<Completer<void>> _queue = Queue();
|
||||
|
||||
final Future<void> Function(XMPPStreamObject) _callback;
|
||||
/// Flag indicating whether a callback is already running (true)
|
||||
/// or not. "a callback" and not "the callback" because awaited stanzas
|
||||
/// are allowed to bypass the queue.
|
||||
/// Only access while holding the lock [_lock].
|
||||
bool _isRunning = false;
|
||||
|
||||
/// The function to call to process an incoming stream object.
|
||||
final Future<void> Function(XMPPStreamObject) _callback;
|
||||
|
||||
/// Lock guarding both [_queue] and [_isRunning].
|
||||
final Lock _lock = Lock();
|
||||
|
||||
// TODO: Remove once we can await stanzas (or can we).
|
||||
bool negotiationsDone = false;
|
||||
|
||||
/// Logger.
|
||||
final Logger _log = Logger('IncomingStanzaQueue');
|
||||
|
||||
bool negotiationsDone = false;
|
||||
final StanzaAwaiter _stanzaAwaiter;
|
||||
|
||||
Future<void> _processStreamObject(
|
||||
Future<void>? future,
|
||||
@ -37,37 +50,22 @@ class IncomingStanzaQueue {
|
||||
await future;
|
||||
|
||||
// Run the callback.
|
||||
if (object is XMPPStreamElement) {
|
||||
_log.finest('Running callback for ${object.node.toXml()}');
|
||||
}
|
||||
await _callback(object);
|
||||
if (object is XMPPStreamElement) {
|
||||
_log.finest(
|
||||
'Callback for ${object.node.tag} (${object.node.attributes["id"]}) done',
|
||||
);
|
||||
}
|
||||
|
||||
// Run the next entry.
|
||||
_log.finest('Entering second lock...');
|
||||
await _lock.synchronized(() {
|
||||
_log.finest('Second lock entered...');
|
||||
if (_queue.isNotEmpty) {
|
||||
_log.finest('New queue size: ${_queue.length - 1}');
|
||||
_queue.removeFirst().complete();
|
||||
} else {
|
||||
_isRunning = false;
|
||||
_log.finest('New queue size: 0');
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
Future<void> addStanza(List<XMPPStreamObject> objects) async {
|
||||
_log.finest('Entering initial lock...');
|
||||
await _lock.synchronized(() {
|
||||
_log.finest('Lock entered...');
|
||||
|
||||
await _lock.synchronized(() async {
|
||||
for (final object in objects) {
|
||||
if (canBypassQueue(object)) {
|
||||
if (await canBypassQueue(object)) {
|
||||
unawaited(
|
||||
_processStreamObject(null, object),
|
||||
);
|
||||
@ -89,11 +87,12 @@ class IncomingStanzaQueue {
|
||||
});
|
||||
}
|
||||
|
||||
bool canBypassQueue(XMPPStreamObject object) {
|
||||
// TODO: Ask the StanzaAwaiter if the stanza is awaited
|
||||
return object is XMPPStreamElement &&
|
||||
negotiationsDone &&
|
||||
object.node.tag == 'iq' &&
|
||||
['result', 'error'].contains(object.node.attributes['type'] as String?);
|
||||
Future<bool> canBypassQueue(XMPPStreamObject object) async {
|
||||
if (object is XMPPStreamHeader) {
|
||||
return false;
|
||||
}
|
||||
|
||||
object as XMPPStreamElement;
|
||||
return _stanzaAwaiter.isAwaited(object.node);
|
||||
}
|
||||
}
|
||||
|
@ -16,7 +16,6 @@ import 'package:moxxmpp/src/xeps/xep_0045/errors.dart';
|
||||
import 'package:moxxmpp/src/xeps/xep_0045/events.dart';
|
||||
import 'package:moxxmpp/src/xeps/xep_0045/types.dart';
|
||||
import 'package:moxxmpp/src/xeps/xep_0359.dart';
|
||||
import 'package:synchronized/extension.dart';
|
||||
import 'package:synchronized/synchronized.dart';
|
||||
|
||||
/// (Room JID, nickname)
|
||||
|
@ -173,8 +173,13 @@ class EntityCapabilitiesManager extends XmppManagerBase {
|
||||
});
|
||||
}
|
||||
|
||||
Future<void> _performQuery(Stanza presence, String ver,
|
||||
String hashFunctionName, String capabilityNode, JID from) async {
|
||||
Future<void> _performQuery(
|
||||
Stanza presence,
|
||||
String ver,
|
||||
String hashFunctionName,
|
||||
String capabilityNode,
|
||||
JID from,
|
||||
) async {
|
||||
final dm = getAttributes().getManagerById<DiscoManager>(discoManager)!;
|
||||
final discoRequest = await dm.discoInfoQuery(
|
||||
from,
|
||||
|
@ -3,8 +3,6 @@ import 'package:moxxmpp/src/awaiter.dart';
|
||||
import 'package:test/test.dart';
|
||||
|
||||
void main() {
|
||||
const bareJid = JID('moxxmpp', 'server3.example', '');
|
||||
|
||||
test('Test awaiting an awaited stanza with a from attribute', () async {
|
||||
final awaiter = StanzaAwaiter();
|
||||
|
||||
@ -20,14 +18,12 @@ void main() {
|
||||
XMLNode.fromString(
|
||||
'<iq from="user3@server.example" id="abc123" type="result" />',
|
||||
),
|
||||
bareJid,
|
||||
);
|
||||
expect(result1, false);
|
||||
final result2 = await awaiter.onData(
|
||||
XMLNode.fromString(
|
||||
'<iq from="user1@server.example" id="lol" type="result" />',
|
||||
),
|
||||
bareJid,
|
||||
);
|
||||
expect(result2, false);
|
||||
|
||||
@ -37,7 +33,6 @@ void main() {
|
||||
);
|
||||
final result3 = await awaiter.onData(
|
||||
stanza,
|
||||
bareJid,
|
||||
);
|
||||
expect(result3, true);
|
||||
expect(await future, stanza);
|
||||
@ -47,12 +42,11 @@ void main() {
|
||||
final awaiter = StanzaAwaiter();
|
||||
|
||||
// "Send" a stanza
|
||||
final future = await awaiter.addPending(bareJid.toString(), 'abc123', 'iq');
|
||||
final future = await awaiter.addPending(null, 'abc123', 'iq');
|
||||
|
||||
// Receive the wrong answer
|
||||
final result1 = await awaiter.onData(
|
||||
XMLNode.fromString('<iq id="lol" type="result" />'),
|
||||
bareJid,
|
||||
);
|
||||
expect(result1, false);
|
||||
|
||||
@ -60,7 +54,6 @@ void main() {
|
||||
final stanza = XMLNode.fromString('<iq id="abc123" type="result" />');
|
||||
final result2 = await awaiter.onData(
|
||||
stanza,
|
||||
bareJid,
|
||||
);
|
||||
expect(result2, true);
|
||||
expect(await future, stanza);
|
||||
@ -70,13 +63,12 @@ void main() {
|
||||
final awaiter = StanzaAwaiter();
|
||||
|
||||
// "Send" a stanza
|
||||
final future = await awaiter.addPending(bareJid.toString(), 'abc123', 'iq');
|
||||
final future = await awaiter.addPending(null, 'abc123', 'iq');
|
||||
|
||||
// Receive the correct answer
|
||||
final stanza = XMLNode.fromString('<iq id="abc123" type="result" />');
|
||||
final result1 = await awaiter.onData(
|
||||
stanza,
|
||||
bareJid,
|
||||
);
|
||||
expect(result1, true);
|
||||
expect(await future, stanza);
|
||||
@ -84,7 +76,6 @@ void main() {
|
||||
// Receive it again
|
||||
final result2 = await awaiter.onData(
|
||||
stanza,
|
||||
bareJid,
|
||||
);
|
||||
expect(result2, false);
|
||||
});
|
||||
@ -93,20 +84,18 @@ void main() {
|
||||
final awaiter = StanzaAwaiter();
|
||||
|
||||
// "Send" a stanza
|
||||
final future = await awaiter.addPending(bareJid.toString(), 'abc123', 'iq');
|
||||
final future = await awaiter.addPending(null, 'abc123', 'iq');
|
||||
|
||||
// Receive the wrong answer
|
||||
final stanza = XMLNode.fromString('<iq id="abc123" type="result" />');
|
||||
final result1 = await awaiter.onData(
|
||||
XMLNode.fromString('<message id="abc123" type="result" />'),
|
||||
bareJid,
|
||||
);
|
||||
expect(result1, false);
|
||||
|
||||
// Receive the correct answer
|
||||
final result2 = await awaiter.onData(
|
||||
stanza,
|
||||
bareJid,
|
||||
);
|
||||
expect(result2, true);
|
||||
expect(await future, stanza);
|
||||
|
@ -343,6 +343,7 @@ void main() {
|
||||
stanza,
|
||||
StanzaHandlerData(false, false, stanza, TypedMap()),
|
||||
);
|
||||
await Future<void>.delayed(const Duration(seconds: 2));
|
||||
|
||||
expect(
|
||||
await manager.getCachedDiscoInfoFromJid(aliceJid) != null,
|
||||
@ -513,6 +514,7 @@ void main() {
|
||||
stanza,
|
||||
StanzaHandlerData(false, false, stanza, TypedMap()),
|
||||
);
|
||||
await Future<void>.delayed(const Duration(seconds: 2));
|
||||
|
||||
final cachedItem = await manager.getCachedDiscoInfoFromJid(aliceJid);
|
||||
expect(
|
||||
@ -549,6 +551,7 @@ void main() {
|
||||
stanza,
|
||||
StanzaHandlerData(false, false, stanza, TypedMap()),
|
||||
);
|
||||
await Future<void>.delayed(const Duration(seconds: 2));
|
||||
|
||||
final cachedItem = await manager.getCachedDiscoInfoFromJid(aliceJid);
|
||||
expect(
|
||||
|
Loading…
Reference in New Issue
Block a user