Compare commits

..

No commits in common. "c3be199ccaa8ebb94fb6533c0a3548b43c6707d3" and "b1da6e5a538fba335e82f6892e2ffe3cabe21183" have entirely different histories.

29 changed files with 650 additions and 976 deletions

View File

@ -7,7 +7,7 @@ line-length=72
[title-trailing-punctuation] [title-trailing-punctuation]
[title-hard-tab] [title-hard-tab]
[title-match-regex] [title-match-regex]
regex=^((feat|fix|chore|refactor|docs|release|test)\((meta|tests|style|docs|xep|core|example|all)+(,(meta|tests|style|docs|xep|core|example|all))*\)|release): [A-Z0-9].*$ regex=^((feat|fix|chore|refactor|docs|release|test)\((meta|tests|style|docs|xep|core|example)+(,(meta|tests|style|docs|xep|core|example))*\)|release): [A-Z0-9].*$
[body-trailing-whitespace] [body-trailing-whitespace]

View File

@ -9,8 +9,6 @@
- **BREAKING**: Remove `DiscoManager.discoInfoCapHashQuery`. - **BREAKING**: Remove `DiscoManager.discoInfoCapHashQuery`.
- **BREAKING**: The entity argument of `DiscoManager.discoInfoQuery` and `DiscoManager.discoItemsQuery` are now `JID` instead of `String`. - **BREAKING**: The entity argument of `DiscoManager.discoInfoQuery` and `DiscoManager.discoItemsQuery` are now `JID` instead of `String`.
- **BREAKING**: `PubSubManager` and `UserAvatarManager` now use `JID` instead of `String`. - **BREAKING**: `PubSubManager` and `UserAvatarManager` now use `JID` instead of `String`.
- **BREAKING**: `XmppConnection.sendStanza` not only takes a `StanzaDetails` argument.
- Sent stanzas are not kept in a queue until sent.
## 0.3.1 ## 0.3.1

View File

@ -26,7 +26,6 @@ import 'package:moxxmpp/src/socket.dart';
import 'package:moxxmpp/src/stanza.dart'; import 'package:moxxmpp/src/stanza.dart';
import 'package:moxxmpp/src/stringxml.dart'; import 'package:moxxmpp/src/stringxml.dart';
import 'package:moxxmpp/src/types/result.dart'; import 'package:moxxmpp/src/types/result.dart';
import 'package:moxxmpp/src/util/queue.dart';
import 'package:moxxmpp/src/xeps/xep_0030/xep_0030.dart'; import 'package:moxxmpp/src/xeps/xep_0030/xep_0030.dart';
import 'package:moxxmpp/src/xeps/xep_0198/xep_0198.dart'; import 'package:moxxmpp/src/xeps/xep_0198/xep_0198.dart';
import 'package:moxxmpp/src/xeps/xep_0352.dart'; import 'package:moxxmpp/src/xeps/xep_0352.dart';
@ -49,6 +48,18 @@ enum XmppConnectionState {
error error
} }
/// Metadata for [XmppConnection.sendStanza].
enum StanzaFromType {
/// Add the full JID to the stanza as the from attribute
full,
/// Add the bare JID to the stanza as the from attribute
bare,
/// Add no JID as the from attribute
none,
}
/// This class is a connection to the server. /// This class is a connection to the server.
class XmppConnection { class XmppConnection {
XmppConnection( XmppConnection(
@ -80,12 +91,7 @@ class XmppConnection {
_socketStream = _socket.getDataStream(); _socketStream = _socket.getDataStream();
// TODO(Unknown): Handle on done // TODO(Unknown): Handle on done
_socketStream.transform(_streamParser).forEach(handleXmlStream); _socketStream.transform(_streamParser).forEach(handleXmlStream);
_socket.getEventStream().listen(handleSocketEvent); _socket.getEventStream().listen(_handleSocketEvent);
_stanzaQueue = AsyncStanzaQueue(
_sendStanzaImpl,
_canSendData,
);
} }
/// The state that the connection is currently in /// The state that the connection is currently in
@ -169,8 +175,6 @@ class XmppConnection {
bool get isAuthenticated => _isAuthenticated; bool get isAuthenticated => _isAuthenticated;
late final AsyncStanzaQueue _stanzaQueue;
/// Returns the JID we authenticate with and add the resource that we have bound. /// Returns the JID we authenticate with and add the resource that we have bound.
JID _getJidWithResource() { JID _getJidWithResource() {
assert(_resource.isNotEmpty, 'The resource must not be empty'); assert(_resource.isNotEmpty, 'The resource must not be empty');
@ -362,8 +366,7 @@ class XmppConnection {
} }
/// Called whenever the socket creates an event /// Called whenever the socket creates an event
@visibleForTesting Future<void> _handleSocketEvent(XmppSocketEvent event) async {
Future<void> handleSocketEvent(XmppSocketEvent event) async {
if (event is XmppSocketErrorEvent) { if (event is XmppSocketErrorEvent) {
await handleError(SocketError(event)); await handleError(SocketError(event));
} else if (event is XmppSocketClosureEvent) { } else if (event is XmppSocketClosureEvent) {
@ -409,135 +412,133 @@ class XmppConnection {
.contains(await getConnectionState()); .contains(await getConnectionState());
} }
/// Sends a stanza described by [details] to the server. Until sent, the stanza is /// Sends a [stanza] to the server. If stream management is enabled, then keeping track
/// kept in a queue, that is flushed after going online again. If Stream Management /// of the stanza is taken care of. Returns a Future that resolves when we receive a
/// is active, stanza's acknowledgement is tracked. /// response to the stanza.
///
/// If addFrom is true, then a 'from' attribute will be added to the stanza if
/// [stanza] has none.
/// If addId is true, then an 'id' attribute will be added to the stanza if [stanza] has
/// none.
// TODO(Unknown): if addId = false, the function crashes. // TODO(Unknown): if addId = false, the function crashes.
Future<XMLNode?> sendStanza(StanzaDetails details) async { Future<XMLNode> sendStanza(
Stanza stanza, {
StanzaFromType addFrom = StanzaFromType.full,
bool addId = true,
bool awaitable = true,
bool encrypted = false,
bool forceEncryption = false,
}) async {
assert( assert(
implies( implies(addId == false && stanza.id == null, !awaitable),
details.awaitable, 'Cannot await a stanza with no id',
details.stanza.id != null && details.stanza.id!.isNotEmpty ||
details.addId,
),
'An awaitable stanza must have an id',
); );
final completer = details.awaitable ? Completer<XMLNode>() : null; // Add extra data in case it was not set
await _stanzaQueue.enqueueStanza( var stanza_ = stanza;
StanzaQueueEntry( if (addId && (stanza_.id == null || stanza_.id == '')) {
details, stanza_ = stanza.copyWith(id: generateId());
completer,
),
);
return completer?.future;
}
Future<void> _sendStanzaImpl(StanzaQueueEntry entry) async {
final details = entry.details;
var newStanza = details.stanza;
// Generate an id, if requested
if (details.addId && (newStanza.id == null || newStanza.id == '')) {
newStanza = newStanza.copyWith(id: generateId());
} }
if (addFrom != StanzaFromType.none &&
// NOTE: Originally, we handled adding a "from" attribute to the stanza here. (stanza_.from == null || stanza_.from == '')) {
// However, this is not neccessary as RFC 6120 states: switch (addFrom) {
// case StanzaFromType.full:
// > When a server receives an XML stanza from a connected client, the {
// > server MUST add a 'from' attribute to the stanza or override the stanza_ = stanza_.copyWith(
// > 'from' attribute specified by the client, where the value of the from: _getJidWithResource().toString(),
// > 'from' attribute MUST be the full JID );
// > (<localpart@domainpart/resource>) determined by the server for }
// > the connected resource that generated the stanza (see break;
// > Section 4.3.6), or the bare JID (<localpart@domainpart>) in the case StanzaFromType.bare:
// > case of subscription-related presence stanzas (see [XMPP-IM]). {
// stanza_ = stanza_.copyWith(
// This means that even if we add a "from" attribute, the server will discard from: connectionSettings.jid.toBare().toString(),
// it. If we don't specify it, then the server will add the correct value );
// itself. }
break;
// Add the correct stanza namespace case StanzaFromType.none:
newStanza = newStanza.copyWith( break;
}
}
stanza_ = stanza_.copyWith(
xmlns: _negotiationsHandler.getStanzaNamespace(), xmlns: _negotiationsHandler.getStanzaNamespace(),
); );
// Run pre-send handlers
_log.fine('Running pre stanza handlers..'); _log.fine('Running pre stanza handlers..');
final data = await _runOutgoingPreStanzaHandlers( final data = await _runOutgoingPreStanzaHandlers(
newStanza, stanza_,
initial: StanzaHandlerData( initial: StanzaHandlerData(
false, false,
false, false,
null, null,
newStanza, stanza_,
encrypted: details.encrypted, encrypted: encrypted,
forceEncryption: details.forceEncryption, forceEncryption: forceEncryption,
), ),
); );
_log.fine('Done'); _log.fine('Done');
// Cancel sending, if the pre-send handlers indicated it.
if (data.cancel) { if (data.cancel) {
_log.fine('A stanza handler indicated that it wants to cancel sending.'); _log.fine('A stanza handler indicated that it wants to cancel sending.');
await _sendEvent(StanzaSendingCancelledEvent(data)); await _sendEvent(StanzaSendingCancelledEvent(data));
return Stanza(
// Resolve the future, if one was given. tag: data.stanza.tag,
if (details.awaitable) { to: data.stanza.from,
entry.completer!.complete( from: data.stanza.to,
Stanza( attributes: <String, String>{
tag: data.stanza.tag, 'type': 'error',
to: data.stanza.from, ...data.stanza.id != null
from: data.stanza.to, ? {
attributes: <String, String>{ 'id': data.stanza.id!,
'type': 'error', }
if (data.stanza.id != null) 'id': data.stanza.id!, : {},
}, },
), );
);
}
return;
} }
// Log the (raw) stanza
final prefix = data.encrypted ? '(Encrypted) ' : ''; final prefix = data.encrypted ? '(Encrypted) ' : '';
_log.finest('==> $prefix${newStanza.toXml()}'); _log.finest('==> $prefix${stanza_.toXml()}');
if (details.awaitable) { final stanzaString = data.stanza.toXml();
await _stanzaAwaiter
.addPending( // ignore: cascade_invocations
_log.fine('Attempting to acquire lock for ${data.stanza.id}...');
// TODO(PapaTutuWawa): Handle this much more graceful
var future = Future.value(XMLNode(tag: 'not-used'));
if (awaitable) {
future = await _stanzaAwaiter.addPending(
// A stanza with no to attribute is for direct processing by the server. As such, // A stanza with no to attribute is for direct processing by the server. As such,
// we can correlate it by just *assuming* we have that attribute // we can correlate it by just *assuming* we have that attribute
// (RFC 6120 Section 8.1.1.1) // (RFC 6120 Section 8.1.1.1)
data.stanza.to ?? connectionSettings.jid.toBare().toString(), data.stanza.to ?? connectionSettings.jid.toBare().toString(),
data.stanza.id!, data.stanza.id!,
data.stanza.tag, data.stanza.tag,
) );
.then((result) {
entry.completer!.complete(result);
});
} }
// This uses the StreamManager to behave like a send queue
if (await _canSendData()) { if (await _canSendData()) {
_socket.write(data.stanza.toXml()); _socket.write(stanzaString);
// Try to ack every stanza
// NOTE: Here we have send an Ack request nonza. This is now done by StreamManagementManager when receiving the StanzaSentEvent
} else { } else {
_log.fine('Not sending dat as _canSendData() returned false.'); _log.fine('_canSendData() returned false.');
} }
// Run post-send handlers
_log.fine('Running post stanza handlers..'); _log.fine('Running post stanza handlers..');
await _runOutgoingPostStanzaHandlers( await _runOutgoingPostStanzaHandlers(
newStanza, stanza_,
initial: StanzaHandlerData( initial: StanzaHandlerData(
false, false,
false, false,
null, null,
newStanza, stanza_,
), ),
); );
_log.fine('Done'); _log.fine('Done');
return future;
} }
/// Called when we timeout during connecting /// Called when we timeout during connecting
@ -561,11 +562,18 @@ class XmppConnection {
// Set the new routing state // Set the new routing state
_updateRoutingState(RoutingState.handleStanzas); _updateRoutingState(RoutingState.handleStanzas);
// Set the connection state
await _setConnectionState(XmppConnectionState.connected);
// Enable reconnections // Enable reconnections
if (_enableReconnectOnSuccess) { if (_enableReconnectOnSuccess) {
await _reconnectionPolicy.setShouldReconnect(true); await _reconnectionPolicy.setShouldReconnect(true);
} }
// Resolve the connection completion future
_connectionCompleter?.complete(const Result(true));
_connectionCompleter = null;
// Tell consumers of the event stream that we're done with stream feature // Tell consumers of the event stream that we're done with stream feature
// negotiations // negotiations
await _sendEvent( await _sendEvent(
@ -574,16 +582,6 @@ class XmppConnection {
false, false,
), ),
); );
// Set the connection state
await _setConnectionState(XmppConnectionState.connected);
// Resolve the connection completion future
_connectionCompleter?.complete(const Result(true));
_connectionCompleter = null;
// Flush the stanza send queue
await _stanzaQueue.restart();
} }
/// Sets the connection state to [state] and triggers an event of type /// Sets the connection state to [state] and triggers an event of type

View File

@ -23,11 +23,9 @@ Future<void> handleUnhandledStanza(
); );
await conn.sendStanza( await conn.sendStanza(
StanzaDetails( stanza,
stanza, awaitable: false,
awaitable: false, forceEncryption: data.encrypted,
forceEncryption: data.encrypted,
),
); );
} }
} }

View File

@ -23,7 +23,14 @@ class XmppManagerAttributes {
}); });
/// Send a stanza whose response can be awaited. /// Send a stanza whose response can be awaited.
final Future<XMLNode?> Function(StanzaDetails) sendStanza; final Future<XMLNode> Function(
Stanza stanza, {
StanzaFromType addFrom,
bool addId,
bool awaitable,
bool encrypted,
bool forceEncryption,
}) sendStanza;
/// Send a nonza. /// Send a nonza.
final void Function(XMLNode) sendNonza; final void Function(XMLNode) sendNonza;

View File

@ -5,7 +5,6 @@ import 'package:moxxmpp/src/managers/attributes.dart';
import 'package:moxxmpp/src/managers/data.dart'; import 'package:moxxmpp/src/managers/data.dart';
import 'package:moxxmpp/src/managers/handlers.dart'; import 'package:moxxmpp/src/managers/handlers.dart';
import 'package:moxxmpp/src/managers/namespaces.dart'; import 'package:moxxmpp/src/managers/namespaces.dart';
import 'package:moxxmpp/src/stanza.dart';
import 'package:moxxmpp/src/stringxml.dart'; import 'package:moxxmpp/src/stringxml.dart';
import 'package:moxxmpp/src/xeps/xep_0030/errors.dart'; import 'package:moxxmpp/src/xeps/xep_0030/errors.dart';
import 'package:moxxmpp/src/xeps/xep_0030/types.dart'; import 'package:moxxmpp/src/xeps/xep_0030/types.dart';
@ -166,11 +165,9 @@ abstract class XmppManagerBase {
); );
await getAttributes().sendStanza( await getAttributes().sendStanza(
StanzaDetails( stanza,
stanza, awaitable: false,
awaitable: false, forceEncryption: data.encrypted,
forceEncryption: data.encrypted,
),
); );
} }
} }

View File

@ -320,11 +320,6 @@ class MessageManager extends XmppManagerBase {
); );
} }
getAttributes().sendStanza( getAttributes().sendStanza(stanza, awaitable: false);
StanzaDetails(
stanza,
awaitable: false,
),
);
} }
} }

View File

@ -1,4 +1,5 @@
import 'dart:async'; import 'dart:async';
import 'package:moxxmpp/src/connection.dart';
import 'package:moxxmpp/src/events.dart'; import 'package:moxxmpp/src/events.dart';
import 'package:moxxmpp/src/jid.dart'; import 'package:moxxmpp/src/jid.dart';
import 'package:moxxmpp/src/managers/base.dart'; import 'package:moxxmpp/src/managers/base.dart';
@ -6,8 +7,10 @@ import 'package:moxxmpp/src/managers/data.dart';
import 'package:moxxmpp/src/managers/handlers.dart'; import 'package:moxxmpp/src/managers/handlers.dart';
import 'package:moxxmpp/src/managers/namespaces.dart'; import 'package:moxxmpp/src/managers/namespaces.dart';
import 'package:moxxmpp/src/namespaces.dart'; import 'package:moxxmpp/src/namespaces.dart';
import 'package:moxxmpp/src/negotiators/namespaces.dart';
import 'package:moxxmpp/src/stanza.dart'; import 'package:moxxmpp/src/stanza.dart';
import 'package:moxxmpp/src/stringxml.dart'; import 'package:moxxmpp/src/stringxml.dart';
import 'package:moxxmpp/src/xeps/xep_0198/negotiator.dart';
/// A function that will be called when presence, outside of subscription request /// A function that will be called when presence, outside of subscription request
/// management, will be sent. Useful for managers that want to add [XMLNode]s to said /// management, will be sent. Useful for managers that want to add [XMLNode]s to said
@ -46,8 +49,12 @@ class PresenceManager extends XmppManagerBase {
Future<void> onXmppEvent(XmppEvent event) async { Future<void> onXmppEvent(XmppEvent event) async {
if (event is StreamNegotiationsDoneEvent) { if (event is StreamNegotiationsDoneEvent) {
// Send initial presence only when we have not resumed the stream // Send initial presence only when we have not resumed the stream
if (!event.resumed) { final sm = getAttributes().getNegotiatorById<StreamManagementNegotiator>(
await sendInitialPresence(); streamManagementNegotiator,
);
final isResumed = sm?.isResumed ?? false;
if (!isResumed) {
unawaited(sendInitialPresence());
} }
} }
} }
@ -101,77 +108,66 @@ class PresenceManager extends XmppManagerBase {
final attrs = getAttributes(); final attrs = getAttributes();
await attrs.sendStanza( await attrs.sendStanza(
StanzaDetails( Stanza.presence(
Stanza.presence( from: attrs.getFullJID().toString(),
children: children, children: children,
),
awaitable: false,
addId: false,
), ),
awaitable: false,
addId: false,
); );
} }
/// Send an unavailable presence with no 'to' attribute. /// Send an unavailable presence with no 'to' attribute.
void sendUnavailablePresence() { void sendUnavailablePresence() {
getAttributes().sendStanza( getAttributes().sendStanza(
StanzaDetails( Stanza.presence(
Stanza.presence( type: 'unavailable',
type: 'unavailable',
),
awaitable: false,
), ),
addFrom: StanzaFromType.full,
); );
} }
/// Sends a subscription request to [to]. /// Sends a subscription request to [to].
void sendSubscriptionRequest(String to) { void sendSubscriptionRequest(String to) {
getAttributes().sendStanza( getAttributes().sendStanza(
StanzaDetails( Stanza.presence(
Stanza.presence( type: 'subscribe',
type: 'subscribe', to: to,
to: to,
),
awaitable: false,
), ),
addFrom: StanzaFromType.none,
); );
} }
/// Sends an unsubscription request to [to]. /// Sends an unsubscription request to [to].
void sendUnsubscriptionRequest(String to) { void sendUnsubscriptionRequest(String to) {
getAttributes().sendStanza( getAttributes().sendStanza(
StanzaDetails( Stanza.presence(
Stanza.presence( type: 'unsubscribe',
type: 'unsubscribe', to: to,
to: to,
),
awaitable: false,
), ),
addFrom: StanzaFromType.none,
); );
} }
/// Accept a presence subscription request for [to]. /// Accept a presence subscription request for [to].
void sendSubscriptionRequestApproval(String to) { void sendSubscriptionRequestApproval(String to) {
getAttributes().sendStanza( getAttributes().sendStanza(
StanzaDetails( Stanza.presence(
Stanza.presence( type: 'subscribed',
type: 'subscribed', to: to,
to: to,
),
awaitable: false,
), ),
addFrom: StanzaFromType.none,
); );
} }
/// Reject a presence subscription request for [to]. /// Reject a presence subscription request for [to].
void sendSubscriptionRequestRejection(String to) { void sendSubscriptionRequestRejection(String to) {
getAttributes().sendStanza( getAttributes().sendStanza(
StanzaDetails( Stanza.presence(
Stanza.presence( type: 'unsubscribed',
type: 'unsubscribed', to: to,
to: to,
),
awaitable: false,
), ),
addFrom: StanzaFromType.none,
); );
} }
} }

View File

@ -235,16 +235,14 @@ class RosterManager extends XmppManagerBase {
query.attributes['ver'] = rosterVersion; query.attributes['ver'] = rosterVersion;
} }
final response = (await attrs.sendStanza( final response = await attrs.sendStanza(
StanzaDetails( Stanza.iq(
Stanza.iq( type: 'get',
type: 'get', children: [
children: [ query,
query, ],
],
),
), ),
))!; );
if (response.attributes['type'] != 'result') { if (response.attributes['type'] != 'result') {
logger.warning('Error requesting roster: ${response.toXml()}'); logger.warning('Error requesting roster: ${response.toXml()}');
@ -260,22 +258,20 @@ class RosterManager extends XmppManagerBase {
Future<Result<RosterRequestResult?, RosterError>> Future<Result<RosterRequestResult?, RosterError>>
requestRosterPushes() async { requestRosterPushes() async {
final attrs = getAttributes(); final attrs = getAttributes();
final result = (await attrs.sendStanza( final result = await attrs.sendStanza(
StanzaDetails( Stanza.iq(
Stanza.iq( type: 'get',
type: 'get', children: [
children: [ XMLNode.xmlns(
XMLNode.xmlns( tag: 'query',
tag: 'query', xmlns: rosterXmlns,
xmlns: rosterXmlns, attributes: {
attributes: { 'ver': await _stateManager.getRosterVersion() ?? '',
'ver': await _stateManager.getRosterVersion() ?? '', },
}, )
) ],
],
),
), ),
))!; );
if (result.attributes['type'] != 'result') { if (result.attributes['type'] != 'result') {
logger.warning('Requesting roster pushes failed: ${result.toXml()}'); logger.warning('Requesting roster pushes failed: ${result.toXml()}');
@ -300,33 +296,31 @@ class RosterManager extends XmppManagerBase {
List<String>? groups, List<String>? groups,
}) async { }) async {
final attrs = getAttributes(); final attrs = getAttributes();
final response = (await attrs.sendStanza( final response = await attrs.sendStanza(
StanzaDetails( Stanza.iq(
Stanza.iq( type: 'set',
type: 'set', children: [
children: [ XMLNode.xmlns(
XMLNode.xmlns( tag: 'query',
tag: 'query', xmlns: rosterXmlns,
xmlns: rosterXmlns, children: [
children: [ XMLNode(
XMLNode( tag: 'item',
tag: 'item', attributes: <String, String>{
attributes: <String, String>{ 'jid': jid,
'jid': jid, ...title == jid.split('@')[0]
...title == jid.split('@')[0] ? <String, String>{}
? <String, String>{} : <String, String>{'name': title}
: <String, String>{'name': title} },
}, children: (groups ?? [])
children: (groups ?? []) .map((group) => XMLNode(tag: 'group', text: group))
.map((group) => XMLNode(tag: 'group', text: group)) .toList(),
.toList(), )
) ],
], )
), ],
],
),
), ),
))!; );
if (response.attributes['type'] != 'result') { if (response.attributes['type'] != 'result') {
logger.severe('Error adding $jid to roster: $response'); logger.severe('Error adding $jid to roster: $response');
@ -340,28 +334,26 @@ class RosterManager extends XmppManagerBase {
/// false otherwise. /// false otherwise.
Future<RosterRemovalResult> removeFromRoster(String jid) async { Future<RosterRemovalResult> removeFromRoster(String jid) async {
final attrs = getAttributes(); final attrs = getAttributes();
final response = (await attrs.sendStanza( final response = await attrs.sendStanza(
StanzaDetails( Stanza.iq(
Stanza.iq( type: 'set',
type: 'set', children: [
children: [ XMLNode.xmlns(
XMLNode.xmlns( tag: 'query',
tag: 'query', xmlns: rosterXmlns,
xmlns: rosterXmlns, children: [
children: [ XMLNode(
XMLNode( tag: 'item',
tag: 'item', attributes: <String, String>{
attributes: <String, String>{ 'jid': jid,
'jid': jid, 'subscription': 'remove'
'subscription': 'remove' },
}, )
) ],
], )
) ],
],
),
), ),
))!; );
if (response.attributes['type'] != 'result') { if (response.attributes['type'] != 'result') {
logger.severe('Failed to remove roster item: ${response.toXml()}'); logger.severe('Failed to remove roster item: ${response.toXml()}');

View File

@ -1,30 +1,6 @@
import 'package:moxxmpp/src/namespaces.dart'; import 'package:moxxmpp/src/namespaces.dart';
import 'package:moxxmpp/src/stringxml.dart'; import 'package:moxxmpp/src/stringxml.dart';
/// A description of a stanza to send.
class StanzaDetails {
const StanzaDetails(
this.stanza, {
this.addId = true,
this.awaitable = true,
this.encrypted = false,
this.forceEncryption = false,
});
/// The stanza to send.
final Stanza stanza;
/// Flag indicating whether a stanza id should be added before sending.
final bool addId;
/// Track the stanza to allow awaiting its response.
final bool awaitable;
final bool encrypted;
final bool forceEncryption;
}
/// A simple description of the <error /> element that may be inside a stanza /// A simple description of the <error /> element that may be inside a stanza
class StanzaError { class StanzaError {
StanzaError(this.type, this.error); StanzaError(this.type, this.error);

View File

@ -1,68 +1,37 @@
import 'dart:async'; import 'dart:async';
import 'dart:collection'; import 'dart:collection';
import 'package:meta/meta.dart'; import 'package:meta/meta.dart';
import 'package:moxxmpp/src/stanza.dart';
import 'package:moxxmpp/src/stringxml.dart';
import 'package:synchronized/synchronized.dart'; import 'package:synchronized/synchronized.dart';
class StanzaQueueEntry { /// A job to be submitted to an [AsyncQueue].
const StanzaQueueEntry( typedef AsyncQueueJob = Future<void> Function();
this.details,
this.completer,
);
/// The actual data to send.
final StanzaDetails details;
/// The [Completer] to resolve when the response is received.
final Completer<XMLNode>? completer;
}
/// A function that is executed when a job is popped from the queue.
typedef SendStanzaFunction = Future<void> Function(StanzaQueueEntry);
/// A function that is called before popping a queue item. Should return true when
/// the [SendStanzaFunction] can be executed.
typedef CanSendCallback = Future<bool> Function();
/// A (hopefully) async-safe queue that attempts to force /// A (hopefully) async-safe queue that attempts to force
/// in-order execution of its jobs. /// in-order execution of its jobs.
class AsyncStanzaQueue { class AsyncQueue {
AsyncStanzaQueue( /// The lock for accessing [AsyncQueue._lock] and [AsyncQueue._running].
this._sendStanzaFunction,
this._canSendCallback,
);
/// The lock for accessing [AsyncStanzaQueue._lock] and [AsyncStanzaQueue._running].
final Lock _lock = Lock(); final Lock _lock = Lock();
/// The actual job queue. /// The actual job queue.
final Queue<StanzaQueueEntry> _queue = Queue<StanzaQueueEntry>(); final Queue<AsyncQueueJob> _queue = Queue<AsyncQueueJob>();
/// Sends the stanza when we can pop from the queue.
final SendStanzaFunction _sendStanzaFunction;
final CanSendCallback _canSendCallback;
/// Indicates whether we are currently executing a job. /// Indicates whether we are currently executing a job.
bool _running = false; bool _running = false;
@visibleForTesting @visibleForTesting
Queue<StanzaQueueEntry> get queue => _queue; Queue<AsyncQueueJob> get queue => _queue;
@visibleForTesting @visibleForTesting
bool get isRunning => _running; bool get isRunning => _running;
/// Adds a job [entry] to the queue. /// Adds a job [job] to the queue.
Future<void> enqueueStanza(StanzaQueueEntry entry) async { Future<void> addJob(AsyncQueueJob job) async {
await _lock.synchronized(() async { await _lock.synchronized(() {
_queue.add(entry); _queue.add(job);
if (!_running && _queue.isNotEmpty && await _canSendCallback()) { if (!_running && _queue.isNotEmpty) {
_running = true; _running = true;
unawaited( unawaited(_popJob());
_runJob(_queue.removeFirst()),
);
} }
}); });
} }
@ -71,30 +40,17 @@ class AsyncStanzaQueue {
await _lock.synchronized(_queue.clear); await _lock.synchronized(_queue.clear);
} }
Future<void> _runJob(StanzaQueueEntry details) async { Future<void> _popJob() async {
await _sendStanzaFunction(details); final job = _queue.removeFirst();
final future = job();
await future;
await _lock.synchronized(() async { await _lock.synchronized(() {
if (_queue.isNotEmpty && await _canSendCallback()) { if (_queue.isNotEmpty) {
unawaited( unawaited(_popJob());
_runJob(_queue.removeFirst()),
);
} else { } else {
_running = false; _running = false;
} }
}); });
} }
Future<void> restart() async {
if (!(await _canSendCallback())) return;
await _lock.synchronized(() {
if (_queue.isNotEmpty) {
_running = true;
unawaited(
_runJob(_queue.removeFirst()),
);
}
});
}
} }

View File

@ -291,12 +291,10 @@ class DiscoManager extends XmppManagerBase {
} }
} }
final stanza = (await getAttributes().sendStanza( final stanza = await getAttributes().sendStanza(
StanzaDetails( buildDiscoInfoQueryStanza(entity, node),
buildDiscoInfoQueryStanza(entity, node), encrypted: !shouldEncrypt,
encrypted: !shouldEncrypt, );
),
))!;
final query = stanza.firstTag('query'); final query = stanza.firstTag('query');
if (query == null) { if (query == null) {
final result = Result<DiscoError, DiscoInfo>(InvalidResponseDiscoError()); final result = Result<DiscoError, DiscoInfo>(InvalidResponseDiscoError());
@ -333,12 +331,10 @@ class DiscoManager extends XmppManagerBase {
return future; return future;
} }
final stanza = (await getAttributes().sendStanza( final stanza = await getAttributes().sendStanza(
StanzaDetails( buildDiscoItemsQueryStanza(entity, node: node),
buildDiscoItemsQueryStanza(entity, node: node), encrypted: !shouldEncrypt,
encrypted: !shouldEncrypt, ) as Stanza;
),
))!;
final query = stanza.firstTag('query'); final query = stanza.firstTag('query');
if (query == null) { if (query == null) {
@ -348,7 +344,7 @@ class DiscoManager extends XmppManagerBase {
return result; return result;
} }
if (stanza.attributes['type'] == 'error') { if (stanza.type == 'error') {
//final error = stanza.firstTag('error'); //final error = stanza.firstTag('error');
//print("Disco Items error: " + error.toXml()); //print("Disco Items error: " + error.toXml());
final result = final result =

View File

@ -103,21 +103,19 @@ class VCardManager extends XmppManagerBase {
} }
Future<Result<VCardError, VCard>> requestVCard(String jid) async { Future<Result<VCardError, VCard>> requestVCard(String jid) async {
final result = (await getAttributes().sendStanza( final result = await getAttributes().sendStanza(
StanzaDetails( Stanza.iq(
Stanza.iq( to: jid,
to: jid, type: 'get',
type: 'get', children: [
children: [ XMLNode.xmlns(
XMLNode.xmlns( tag: 'vCard',
tag: 'vCard', xmlns: vCardTempXmlns,
xmlns: vCardTempXmlns, )
) ],
],
),
encrypted: true,
), ),
))!; encrypted: true,
);
if (result.attributes['type'] != 'result') { if (result.attributes['type'] != 'result') {
return Result(UnknownVCardError()); return Result(UnknownVCardError());

View File

@ -181,29 +181,27 @@ class PubSubManager extends XmppManagerBase {
Future<Result<PubSubError, bool>> subscribe(String jid, String node) async { Future<Result<PubSubError, bool>> subscribe(String jid, String node) async {
final attrs = getAttributes(); final attrs = getAttributes();
final result = (await attrs.sendStanza( final result = await attrs.sendStanza(
StanzaDetails( Stanza.iq(
Stanza.iq( type: 'set',
type: 'set', to: jid,
to: jid, children: [
children: [ XMLNode.xmlns(
XMLNode.xmlns( tag: 'pubsub',
tag: 'pubsub', xmlns: pubsubXmlns,
xmlns: pubsubXmlns, children: [
children: [ XMLNode(
XMLNode( tag: 'subscribe',
tag: 'subscribe', attributes: <String, String>{
attributes: <String, String>{ 'node': node,
'node': node, 'jid': attrs.getFullJID().toBare().toString(),
'jid': attrs.getFullJID().toBare().toString(), },
}, ),
), ],
], ),
), ],
],
),
), ),
))!; );
if (result.attributes['type'] != 'result') { if (result.attributes['type'] != 'result') {
return Result(UnknownPubSubError()); return Result(UnknownPubSubError());
@ -224,29 +222,27 @@ class PubSubManager extends XmppManagerBase {
Future<Result<PubSubError, bool>> unsubscribe(String jid, String node) async { Future<Result<PubSubError, bool>> unsubscribe(String jid, String node) async {
final attrs = getAttributes(); final attrs = getAttributes();
final result = (await attrs.sendStanza( final result = await attrs.sendStanza(
StanzaDetails( Stanza.iq(
Stanza.iq( type: 'set',
type: 'set', to: jid,
to: jid, children: [
children: [ XMLNode.xmlns(
XMLNode.xmlns( tag: 'pubsub',
tag: 'pubsub', xmlns: pubsubXmlns,
xmlns: pubsubXmlns, children: [
children: [ XMLNode(
XMLNode( tag: 'unsubscribe',
tag: 'unsubscribe', attributes: <String, String>{
attributes: <String, String>{ 'node': node,
'node': node, 'jid': attrs.getFullJID().toBare().toString(),
'jid': attrs.getFullJID().toBare().toString(), },
}, ),
), ],
], ),
), ],
],
),
), ),
))!; );
if (result.attributes['type'] != 'result') { if (result.attributes['type'] != 'result') {
return Result(UnknownPubSubError()); return Result(UnknownPubSubError());
@ -297,40 +293,38 @@ class PubSubManager extends XmppManagerBase {
pubOptions = await preprocessPublishOptions(jid, node, options); pubOptions = await preprocessPublishOptions(jid, node, options);
} }
final result = (await getAttributes().sendStanza( final result = await getAttributes().sendStanza(
StanzaDetails( Stanza.iq(
Stanza.iq( type: 'set',
type: 'set', to: jid.toString(),
to: jid.toString(), children: [
children: [ XMLNode.xmlns(
XMLNode.xmlns( tag: 'pubsub',
tag: 'pubsub', xmlns: pubsubXmlns,
xmlns: pubsubXmlns, children: [
children: [ XMLNode(
XMLNode( tag: 'publish',
tag: 'publish', attributes: <String, String>{'node': node},
attributes: <String, String>{'node': node}, children: [
children: [
XMLNode(
tag: 'item',
attributes: id != null
? <String, String>{'id': id}
: <String, String>{},
children: [payload],
)
],
),
if (pubOptions != null)
XMLNode( XMLNode(
tag: 'publish-options', tag: 'item',
children: [pubOptions.toXml()], attributes: id != null
), ? <String, String>{'id': id}
], : <String, String>{},
) children: [payload],
], )
), ],
),
if (pubOptions != null)
XMLNode(
tag: 'publish-options',
children: [pubOptions.toXml()],
),
],
)
],
), ),
))!; );
if (result.attributes['type'] != 'result') { if (result.attributes['type'] != 'result') {
final error = getPubSubError(result); final error = getPubSubError(result);
@ -401,26 +395,21 @@ class PubSubManager extends XmppManagerBase {
String jid, String jid,
String node, String node,
) async { ) async {
final result = (await getAttributes().sendStanza( final result = await getAttributes().sendStanza(
StanzaDetails( Stanza.iq(
Stanza.iq( type: 'get',
type: 'get', to: jid,
to: jid, children: [
children: [ XMLNode.xmlns(
XMLNode.xmlns( tag: 'pubsub',
tag: 'pubsub', xmlns: pubsubXmlns,
xmlns: pubsubXmlns, children: [
children: [ XMLNode(tag: 'items', attributes: <String, String>{'node': node}),
XMLNode( ],
tag: 'items', )
attributes: <String, String>{'node': node}, ],
),
],
)
],
),
), ),
))!; );
if (result.attributes['type'] != 'result') { if (result.attributes['type'] != 'result') {
return Result(getPubSubError(result)); return Result(getPubSubError(result));
@ -447,32 +436,30 @@ class PubSubManager extends XmppManagerBase {
String node, String node,
String id, String id,
) async { ) async {
final result = (await getAttributes().sendStanza( final result = await getAttributes().sendStanza(
StanzaDetails( Stanza.iq(
Stanza.iq( type: 'get',
type: 'get', to: jid,
to: jid, children: [
children: [ XMLNode.xmlns(
XMLNode.xmlns( tag: 'pubsub',
tag: 'pubsub', xmlns: pubsubXmlns,
xmlns: pubsubXmlns, children: [
children: [ XMLNode(
XMLNode( tag: 'items',
tag: 'items', attributes: <String, String>{'node': node},
attributes: <String, String>{'node': node}, children: [
children: [ XMLNode(
XMLNode( tag: 'item',
tag: 'item', attributes: <String, String>{'id': id},
attributes: <String, String>{'id': id}, ),
), ],
], ),
), ],
], ),
), ],
],
),
), ),
))!; );
if (result.attributes['type'] != 'result') { if (result.attributes['type'] != 'result') {
return Result(getPubSubError(result)); return Result(getPubSubError(result));
@ -501,57 +488,53 @@ class PubSubManager extends XmppManagerBase {
final attrs = getAttributes(); final attrs = getAttributes();
// Request the form // Request the form
final form = (await attrs.sendStanza( final form = await attrs.sendStanza(
StanzaDetails( Stanza.iq(
Stanza.iq( type: 'get',
type: 'get', to: jid.toString(),
to: jid.toString(), children: [
children: [ XMLNode.xmlns(
XMLNode.xmlns( tag: 'pubsub',
tag: 'pubsub', xmlns: pubsubOwnerXmlns,
xmlns: pubsubOwnerXmlns, children: [
children: [ XMLNode(
XMLNode( tag: 'configure',
tag: 'configure', attributes: <String, String>{
attributes: <String, String>{ 'node': node,
'node': node, },
}, ),
), ],
], ),
), ],
],
),
), ),
))!; );
if (form.attributes['type'] != 'result') { if (form.attributes['type'] != 'result') {
return Result(getPubSubError(form)); return Result(getPubSubError(form));
} }
final submit = (await attrs.sendStanza( final submit = await attrs.sendStanza(
StanzaDetails( Stanza.iq(
Stanza.iq( type: 'set',
type: 'set', to: jid.toString(),
to: jid.toString(), children: [
children: [ XMLNode.xmlns(
XMLNode.xmlns( tag: 'pubsub',
tag: 'pubsub', xmlns: pubsubOwnerXmlns,
xmlns: pubsubOwnerXmlns, children: [
children: [ XMLNode(
XMLNode( tag: 'configure',
tag: 'configure', attributes: <String, String>{
attributes: <String, String>{ 'node': node,
'node': node, },
}, children: [
children: [ options.toXml(),
options.toXml(), ],
], ),
), ],
], ),
), ],
],
),
), ),
))!; );
if (submit.attributes['type'] != 'result') { if (submit.attributes['type'] != 'result') {
return Result(getPubSubError(form)); return Result(getPubSubError(form));
} }
@ -560,30 +543,28 @@ class PubSubManager extends XmppManagerBase {
} }
Future<Result<PubSubError, bool>> delete(JID host, String node) async { Future<Result<PubSubError, bool>> delete(JID host, String node) async {
final request = (await getAttributes().sendStanza( final request = await getAttributes().sendStanza(
StanzaDetails( Stanza.iq(
Stanza.iq( type: 'set',
type: 'set', to: host.toString(),
to: host.toString(), children: [
children: [ XMLNode.xmlns(
XMLNode.xmlns( tag: 'pubsub',
tag: 'pubsub', xmlns: pubsubOwnerXmlns,
xmlns: pubsubOwnerXmlns, children: [
children: [ XMLNode(
XMLNode( tag: 'delete',
tag: 'delete', attributes: <String, String>{
attributes: <String, String>{ 'node': node,
'node': node, },
}, ),
), ],
], ),
), ],
],
),
), ),
))!; ) as Stanza;
if (request.attributes['type'] != 'result') { if (request.type != 'result') {
// TODO(Unknown): Be more specific // TODO(Unknown): Be more specific
return Result(UnknownPubSubError()); return Result(UnknownPubSubError());
} }
@ -596,38 +577,36 @@ class PubSubManager extends XmppManagerBase {
String node, String node,
String itemId, String itemId,
) async { ) async {
final request = (await getAttributes().sendStanza( final request = await getAttributes().sendStanza(
StanzaDetails( Stanza.iq(
Stanza.iq( type: 'set',
type: 'set', to: host.toString(),
to: host.toString(), children: [
children: [ XMLNode.xmlns(
XMLNode.xmlns( tag: 'pubsub',
tag: 'pubsub', xmlns: pubsubXmlns,
xmlns: pubsubXmlns, children: [
children: [ XMLNode(
XMLNode( tag: 'retract',
tag: 'retract', attributes: <String, String>{
attributes: <String, String>{ 'node': node,
'node': node, },
}, children: [
children: [ XMLNode(
XMLNode( tag: 'item',
tag: 'item', attributes: <String, String>{
attributes: <String, String>{ 'id': itemId,
'id': itemId, },
}, ),
), ],
], ),
), ],
], ),
), ],
],
),
), ),
))!; ) as Stanza;
if (request.attributes['type'] != 'result') { if (request.type != 'result') {
// TODO(Unknown): Be more specific // TODO(Unknown): Be more specific
return Result(UnknownPubSubError()); return Result(UnknownPubSubError());
} }

View File

@ -111,14 +111,10 @@ class ChatStateManager extends XmppManagerBase {
final tagName = state.toString().split('.').last; final tagName = state.toString().split('.').last;
getAttributes().sendStanza( getAttributes().sendStanza(
StanzaDetails( Stanza.message(
Stanza.message( to: to,
to: to, type: messageType,
type: messageType, children: [XMLNode.xmlns(tag: tagName, xmlns: chatStateXmlns)],
children: [
XMLNode.xmlns(tag: tagName, xmlns: chatStateXmlns),
],
),
), ),
); );
} }

View File

@ -96,43 +96,39 @@ class BlockingManager extends XmppManagerBase {
} }
Future<bool> block(List<String> items) async { Future<bool> block(List<String> items) async {
final result = (await getAttributes().sendStanza( final result = await getAttributes().sendStanza(
StanzaDetails( Stanza.iq(
Stanza.iq( type: 'set',
type: 'set', children: [
children: [ XMLNode.xmlns(
XMLNode.xmlns( tag: 'block',
tag: 'block', xmlns: blockingXmlns,
xmlns: blockingXmlns, children: items.map((item) {
children: items.map((item) { return XMLNode(
return XMLNode( tag: 'item',
tag: 'item', attributes: <String, String>{'jid': item},
attributes: <String, String>{'jid': item}, );
); }).toList(),
}).toList(), )
) ],
],
),
), ),
))!; );
return result.attributes['type'] == 'result'; return result.attributes['type'] == 'result';
} }
Future<bool> unblockAll() async { Future<bool> unblockAll() async {
final result = (await getAttributes().sendStanza( final result = await getAttributes().sendStanza(
StanzaDetails( Stanza.iq(
Stanza.iq( type: 'set',
type: 'set', children: [
children: [ XMLNode.xmlns(
XMLNode.xmlns( tag: 'unblock',
tag: 'unblock', xmlns: blockingXmlns,
xmlns: blockingXmlns, )
) ],
],
),
), ),
))!; );
return result.attributes['type'] == 'result'; return result.attributes['type'] == 'result';
} }
@ -140,45 +136,41 @@ class BlockingManager extends XmppManagerBase {
Future<bool> unblock(List<String> items) async { Future<bool> unblock(List<String> items) async {
assert(items.isNotEmpty, 'The list of items to unblock must be non-empty'); assert(items.isNotEmpty, 'The list of items to unblock must be non-empty');
final result = (await getAttributes().sendStanza( final result = await getAttributes().sendStanza(
StanzaDetails( Stanza.iq(
Stanza.iq( type: 'set',
type: 'set', children: [
children: [ XMLNode.xmlns(
XMLNode.xmlns( tag: 'unblock',
tag: 'unblock', xmlns: blockingXmlns,
xmlns: blockingXmlns, children: items
children: items .map(
.map( (item) => XMLNode(
(item) => XMLNode( tag: 'item',
tag: 'item', attributes: <String, String>{'jid': item},
attributes: <String, String>{'jid': item}, ),
), )
) .toList(),
.toList(), )
) ],
],
),
), ),
))!; );
return result.attributes['type'] == 'result'; return result.attributes['type'] == 'result';
} }
Future<List<String>> getBlocklist() async { Future<List<String>> getBlocklist() async {
final result = (await getAttributes().sendStanza( final result = await getAttributes().sendStanza(
StanzaDetails( Stanza.iq(
Stanza.iq( type: 'get',
type: 'get', children: [
children: [ XMLNode.xmlns(
XMLNode.xmlns( tag: 'blocklist',
tag: 'blocklist', xmlns: blockingXmlns,
xmlns: blockingXmlns, )
) ],
],
),
), ),
))!; );
final blocklist = result.firstTag('blocklist', xmlns: blockingXmlns)!; final blocklist = result.firstTag('blocklist', xmlns: blockingXmlns)!;
return blocklist return blocklist

View File

@ -414,12 +414,7 @@ class StreamManagementManager extends XmppManagerBase {
_unackedStanzas.clear(); _unackedStanzas.clear();
for (final stanza in stanzas) { for (final stanza in stanzas) {
await getAttributes().sendStanza( await getAttributes().sendStanza(stanza, awaitable: false);
StanzaDetails(
stanza,
awaitable: false,
),
);
} }
} }

View File

@ -1,5 +1,6 @@
import 'package:logging/logging.dart'; import 'package:logging/logging.dart';
import 'package:meta/meta.dart'; import 'package:meta/meta.dart';
import 'package:moxxmpp/src/connection.dart';
import 'package:moxxmpp/src/events.dart'; import 'package:moxxmpp/src/events.dart';
import 'package:moxxmpp/src/jid.dart'; import 'package:moxxmpp/src/jid.dart';
import 'package:moxxmpp/src/managers/base.dart'; import 'package:moxxmpp/src/managers/base.dart';
@ -110,20 +111,20 @@ class CarbonsManager extends XmppManagerBase {
/// Returns true if carbons were enabled. False, if not. /// Returns true if carbons were enabled. False, if not.
Future<bool> enableCarbons() async { Future<bool> enableCarbons() async {
final attrs = getAttributes(); final attrs = getAttributes();
final result = (await attrs.sendStanza( final result = await attrs.sendStanza(
StanzaDetails( Stanza.iq(
Stanza.iq( to: attrs.getFullJID().toBare().toString(),
to: attrs.getFullJID().toBare().toString(), type: 'set',
type: 'set', children: [
children: [ XMLNode.xmlns(
XMLNode.xmlns( tag: 'enable',
tag: 'enable', xmlns: carbonsXmlns,
xmlns: carbonsXmlns, )
) ],
],
),
), ),
))!; addFrom: StanzaFromType.full,
addId: true,
);
if (result.attributes['type'] != 'result') { if (result.attributes['type'] != 'result') {
logger.warning('Failed to enable message carbons'); logger.warning('Failed to enable message carbons');
@ -141,19 +142,19 @@ class CarbonsManager extends XmppManagerBase {
/// ///
/// Returns true if carbons were disabled. False, if not. /// Returns true if carbons were disabled. False, if not.
Future<bool> disableCarbons() async { Future<bool> disableCarbons() async {
final result = (await getAttributes().sendStanza( final result = await getAttributes().sendStanza(
StanzaDetails( Stanza.iq(
Stanza.iq( type: 'set',
type: 'set', children: [
children: [ XMLNode.xmlns(
XMLNode.xmlns( tag: 'disable',
tag: 'disable', xmlns: carbonsXmlns,
xmlns: carbonsXmlns, )
) ],
],
),
), ),
))!; addFrom: StanzaFromType.full,
addId: true,
);
if (result.attributes['type'] != 'result') { if (result.attributes['type'] != 'result') {
logger.warning('Failed to disable message carbons'); logger.warning('Failed to disable message carbons');

View File

@ -149,25 +149,23 @@ class HttpFileUploadManager extends XmppManagerBase {
} }
final attrs = getAttributes(); final attrs = getAttributes();
final response = (await attrs.sendStanza( final response = await attrs.sendStanza(
StanzaDetails( Stanza.iq(
Stanza.iq( to: _entityJid.toString(),
to: _entityJid.toString(), type: 'get',
type: 'get', children: [
children: [ XMLNode.xmlns(
XMLNode.xmlns( tag: 'request',
tag: 'request', xmlns: httpFileUploadXmlns,
xmlns: httpFileUploadXmlns, attributes: {
attributes: { 'filename': filename,
'filename': filename, 'size': filesize.toString(),
'size': filesize.toString(), ...contentType != null ? {'content-type': contentType} : {}
...contentType != null ? {'content-type': contentType} : {} },
}, )
) ],
],
),
), ),
))!; );
if (response.attributes['type']! != 'result') { if (response.attributes['type']! != 'result') {
logger.severe('Failed to request HTTP File Upload slot.'); logger.severe('Failed to request HTTP File Upload slot.');

View File

@ -262,26 +262,24 @@ abstract class BaseOmemoManager extends XmppManagerBase {
String toJid, String toJid,
) async { ) async {
await getAttributes().sendStanza( await getAttributes().sendStanza(
StanzaDetails( Stanza.message(
Stanza.message( to: toJid,
to: toJid, type: 'chat',
type: 'chat', children: [
children: [ _buildEncryptedElement(
_buildEncryptedElement( result,
result, toJid,
toJid, await _getDeviceId(),
await _getDeviceId(), ),
),
// Add a storage hint in case this is a message // Add a storage hint in case this is a message
// Taken from the example at // Taken from the example at
// https://xmpp.org/extensions/xep-0384.html#message-structure-description. // https://xmpp.org/extensions/xep-0384.html#message-structure-description.
MessageProcessingHint.store.toXml(), MessageProcessingHint.store.toXml(),
], ],
),
awaitable: false,
encrypted: true,
), ),
awaitable: false,
encrypted: true,
); );
} }

View File

@ -1,100 +1,58 @@
import 'package:moxxmpp/moxxmpp.dart';
import 'package:moxxmpp/src/util/queue.dart'; import 'package:moxxmpp/src/util/queue.dart';
import 'package:test/test.dart'; import 'package:test/test.dart';
void main() { void main() {
test('Test not sending', () async { test('Test the async queue', () async {
final queue = AsyncStanzaQueue( final queue = AsyncQueue();
(entry) async { var future1Finish = 0;
assert(false, 'No stanza should be sent'); var future2Finish = 0;
}, var future3Finish = 0;
() async => false,
);
await queue.enqueueStanza( await queue.addJob(
StanzaQueueEntry( () => Future<void>.delayed(
StanzaDetails( const Duration(seconds: 3),
Stanza.message(), () => future1Finish = DateTime.now().millisecondsSinceEpoch,
),
null,
), ),
); );
await queue.enqueueStanza( await queue.addJob(
StanzaQueueEntry( () => Future<void>.delayed(
StanzaDetails( const Duration(seconds: 3),
Stanza.message(), () => future2Finish = DateTime.now().millisecondsSinceEpoch,
), ),
null, );
await queue.addJob(
() => Future<void>.delayed(
const Duration(seconds: 3),
() => future3Finish = DateTime.now().millisecondsSinceEpoch,
), ),
); );
await Future<void>.delayed(const Duration(seconds: 1)); await Future<void>.delayed(const Duration(seconds: 12));
expect(queue.queue.length, 2);
expect(queue.isRunning, false);
});
test('Test sending', () async { // The three futures must be done
final queue = AsyncStanzaQueue( expect(future1Finish != 0, true);
(entry) async {}, expect(future2Finish != 0, true);
() async => true, expect(future3Finish != 0, true);
// The end times of the futures must be ordered (on a timeline)
// |-- future1Finish -- future2Finish -- future3Finish --|
expect(
future1Finish < future2Finish && future1Finish < future3Finish,
true,
);
expect(
future2Finish < future3Finish && future2Finish > future1Finish,
true,
);
expect(
future3Finish > future1Finish && future3Finish > future2Finish,
true,
); );
await queue.enqueueStanza( // The queue must be empty at the end
StanzaQueueEntry( expect(queue.queue.isEmpty, true);
StanzaDetails(
Stanza.message(),
),
null,
),
);
await queue.enqueueStanza(
StanzaQueueEntry(
StanzaDetails(
Stanza.message(),
),
null,
),
);
await Future<void>.delayed(const Duration(seconds: 1)); // The queue must not be executing anything at the end
expect(queue.queue.length, 0);
expect(queue.isRunning, false);
});
test('Test partial sending and resuming', () async {
var canRun = true;
final queue = AsyncStanzaQueue(
(entry) async {
canRun = false;
},
() async => canRun,
);
await queue.enqueueStanza(
StanzaQueueEntry(
StanzaDetails(
Stanza.message(),
),
null,
),
);
await queue.enqueueStanza(
StanzaQueueEntry(
StanzaDetails(
Stanza.message(),
),
null,
),
);
await Future<void>.delayed(const Duration(seconds: 1));
expect(queue.queue.length, 1);
expect(queue.isRunning, false);
canRun = true;
await queue.restart();
await Future<void>.delayed(const Duration(seconds: 1));
expect(queue.queue.length, 0);
expect(queue.isRunning, false); expect(queue.isRunning, false);
}); });
} }

View File

@ -33,6 +33,7 @@ class TestingManagerHolder {
Future<XMLNode> _sendStanza( Future<XMLNode> _sendStanza(
stanza, { stanza, {
StanzaFromType addFrom = StanzaFromType.full,
bool addId = true, bool addId = true,
bool awaitable = true, bool awaitable = true,
bool encrypted = false, bool encrypted = false,

View File

@ -98,7 +98,7 @@ List<ExpectationBase> buildAuthenticatedPlay(ConnectionSettings settings) {
ignoreId: true, ignoreId: true,
), ),
StanzaExpectation( StanzaExpectation(
"<presence xmlns='jabber:client'><show>chat</show></presence>", "<presence xmlns='jabber:client' from='${settings.jid.toBare()}/MU29eEZn'><show>chat</show></presence>",
'', '',
), ),
]; ];

View File

@ -58,11 +58,11 @@ void main() {
ignoreId: true, ignoreId: true,
), ),
StanzaExpectation( StanzaExpectation(
"<presence xmlns='jabber:client'><show>chat</show><c xmlns='http://jabber.org/protocol/caps' hash='sha-1' node='http://moxxmpp.example' ver='3QvQ2RAy45XBDhArjxy/vEWMl+E=' /></presence>", "<presence xmlns='jabber:client' from='polynomdivision@test.server/MU29eEZn'><show>chat</show><c xmlns='http://jabber.org/protocol/caps' hash='sha-1' node='http://moxxmpp.example' ver='3QvQ2RAy45XBDhArjxy/vEWMl+E=' /></presence>",
'', '',
), ),
StanzaExpectation( StanzaExpectation(
"<iq type='get' id='ec325efc-9924-4c48-93f8-ed34a2b0e5fc' to='romeo@montague.lit/orchard' xmlns='jabber:client'><query xmlns='http://jabber.org/protocol/disco#info' /></iq>", "<iq type='get' id='ec325efc-9924-4c48-93f8-ed34a2b0e5fc' to='romeo@montague.lit/orchard' from='polynomdivision@test.server/MU29eEZn' xmlns='jabber:client'><query xmlns='http://jabber.org/protocol/disco#info' /></iq>",
'', '',
ignoreId: true, ignoreId: true,
), ),

View File

@ -92,7 +92,7 @@ void main() {
[ [
StanzaExpectation( StanzaExpectation(
''' '''
<iq type="get" to="pubsub.server.example.org" id="a" xmlns="jabber:client"> <iq type="get" to="pubsub.server.example.org" id="a" from="testuser@example.org/MU29eEZn" xmlns="jabber:client">
<query xmlns="http://jabber.org/protocol/disco#info" /> <query xmlns="http://jabber.org/protocol/disco#info" />
</iq> </iq>
''', ''',
@ -110,7 +110,7 @@ void main() {
), ),
StanzaExpectation( StanzaExpectation(
''' '''
<iq type="get" to="pubsub.server.example.org" id="a" xmlns="jabber:client"> <iq type="get" to="pubsub.server.example.org" id="a" from="testuser@example.org/MU29eEZn" xmlns="jabber:client">
<query xmlns="http://jabber.org/protocol/disco#items" node="princely_musings" /> <query xmlns="http://jabber.org/protocol/disco#items" node="princely_musings" />
</iq> </iq>
''', ''',
@ -124,7 +124,7 @@ void main() {
), ),
StanzaExpectation( StanzaExpectation(
''' '''
<iq type="set" to="pubsub.server.example.org" id="a" xmlns="jabber:client"> <iq type="set" to="pubsub.server.example.org" id="a" from="testuser@example.org/MU29eEZn" xmlns="jabber:client">
<pubsub xmlns='http://jabber.org/protocol/pubsub'> <pubsub xmlns='http://jabber.org/protocol/pubsub'>
<publish node='princely_musings'> <publish node='princely_musings'>
<item id="current"> <item id="current">

View File

@ -44,8 +44,15 @@ Future<void> runOutgoingStanzaHandlers(
XmppManagerAttributes mkAttributes(void Function(Stanza) callback) { XmppManagerAttributes mkAttributes(void Function(Stanza) callback) {
return XmppManagerAttributes( return XmppManagerAttributes(
sendStanza: (StanzaDetails details) async { sendStanza: (
callback(details.stanza); stanza, {
StanzaFromType addFrom = StanzaFromType.full,
bool addId = true,
bool awaitable = true,
bool encrypted = false,
bool forceEncryption = false,
}) async {
callback(stanza);
return Stanza.message(); return Stanza.message();
}, },
@ -283,8 +290,12 @@ void main() {
); );
final sm = StreamManagementManager(); final sm = StreamManagementManager();
await conn.registerManagers([ await conn.registerManagers([
PresenceManager(),
RosterManager(TestingRosterStateManager('', [])),
DiscoManager([]),
sm, sm,
CarbonsManager()..forceEnable(), CarbonsManager()..forceEnable(),
EntityCapabilitiesManager('http://moxxmpp.example'),
]); ]);
await conn.registerFeatureNegotiators([ await conn.registerFeatureNegotiators([
SaslPlainNegotiator(), SaslPlainNegotiator(),
@ -380,7 +391,7 @@ void main() {
'<enabled xmlns="urn:xmpp:sm:3" id="some-long-sm-id" resume="true" />', '<enabled xmlns="urn:xmpp:sm:3" id="some-long-sm-id" resume="true" />',
), ),
StanzaExpectation( StanzaExpectation(
"<presence xmlns='jabber:client'><show>chat</show></presence>", "<presence xmlns='jabber:client' from='polynomdivision@test.server/MU29eEZn'><show>chat</show></presence>",
'<iq type="result" />', '<iq type="result" />',
), ),
StringExpectation( StringExpectation(
@ -670,7 +681,7 @@ void main() {
"<resumed xmlns='urn:xmpp:sm:3' h='id-1' h='12' />", "<resumed xmlns='urn:xmpp:sm:3' h='id-1' h='12' />",
), ),
StanzaExpectation( StanzaExpectation(
"<iq to='localhost' type='get' xmlns='jabber:client' />", "<iq to='localhost' type='get' from='polynomdivision@test.server/abc123' xmlns='jabber:client' />",
'', '',
ignoreId: true, ignoreId: true,
), ),
@ -723,7 +734,7 @@ void main() {
"<resumed xmlns='urn:xmpp:sm:3' h='id-1' h='12' />", "<resumed xmlns='urn:xmpp:sm:3' h='id-1' h='12' />",
), ),
StanzaExpectation( StanzaExpectation(
"<iq to='localhost' type='get' xmlns='jabber:client' />", "<iq to='localhost' type='get' from='polynomdivision@test.server/abc123' xmlns='jabber:client' />",
'', '',
ignoreId: true, ignoreId: true,
), ),
@ -765,11 +776,7 @@ void main() {
// Send a bogus stanza // Send a bogus stanza
unawaited( unawaited(
conn.sendStanza( conn.sendStanza(Stanza.iq(to: 'localhost', type: 'get')),
StanzaDetails(
Stanza.iq(to: 'localhost', type: 'get'),
),
),
); );
await Future<void>.delayed(const Duration(seconds: 5)); await Future<void>.delayed(const Duration(seconds: 5));

View File

@ -9,9 +9,17 @@ void main() {
test("Test if we're vulnerable against CVE-2020-26547 style vulnerabilities", test("Test if we're vulnerable against CVE-2020-26547 style vulnerabilities",
() async { () async {
final attributes = XmppManagerAttributes( final attributes = XmppManagerAttributes(
sendStanza: (StanzaDetails details) async { sendStanza: (
stanza, {
StanzaFromType addFrom = StanzaFromType.full,
bool addId = true,
bool retransmitted = false,
bool awaitable = true,
bool encrypted = false,
bool forceEncryption = false,
}) async {
// ignore: avoid_print // ignore: avoid_print
print('==> ${details.stanza.toXml()}'); print('==> ${stanza.toXml()}');
return XMLNode(tag: 'iq', attributes: {'type': 'result'}); return XMLNode(tag: 'iq', attributes: {'type': 'result'});
}, },
sendNonza: (nonza) {}, sendNonza: (nonza) {},

View File

@ -39,6 +39,7 @@ void main() {
XmppManagerAttributes( XmppManagerAttributes(
sendStanza: ( sendStanza: (
_, { _, {
StanzaFromType addFrom = StanzaFromType.full,
bool addId = true, bool addId = true,
bool retransmitted = false, bool retransmitted = false,
bool awaitable = true, bool awaitable = true,
@ -77,6 +78,7 @@ void main() {
XmppManagerAttributes( XmppManagerAttributes(
sendStanza: ( sendStanza: (
_, { _, {
StanzaFromType addFrom = StanzaFromType.full,
bool addId = true, bool addId = true,
bool retransmitted = false, bool retransmitted = false,
bool awaitable = true, bool awaitable = true,

View File

@ -4,32 +4,6 @@ import 'package:test/test.dart';
import 'helpers/logging.dart'; import 'helpers/logging.dart';
import 'helpers/xmpp.dart'; import 'helpers/xmpp.dart';
class StubConnectivityManager extends ConnectivityManager {
bool _hasConnection = true;
Completer<void> _goingOnlineCompleter = Completer<void>();
@override
Future<bool> hasConnection() async => _hasConnection;
@override
Future<void> waitForConnection() async {
if (!_hasConnection) {
await _goingOnlineCompleter.future;
}
}
void goOffline() {
_hasConnection = false;
}
void goOnline() {
_hasConnection = true;
_goingOnlineCompleter.complete();
_goingOnlineCompleter = Completer<void>();
}
}
/// Returns true if the roster manager triggeres an event for a given stanza /// Returns true if the roster manager triggeres an event for a given stanza
Future<bool> testRosterManager( Future<bool> testRosterManager(
String bareJid, String bareJid,
@ -42,6 +16,7 @@ Future<bool> testRosterManager(
XmppManagerAttributes( XmppManagerAttributes(
sendStanza: ( sendStanza: (
_, { _, {
StanzaFromType addFrom = StanzaFromType.full,
bool addId = true, bool addId = true,
bool retransmitted = false, bool retransmitted = false,
bool awaitable = true, bool awaitable = true,
@ -156,7 +131,11 @@ void main() {
password: 'aaaa', password: 'aaaa',
); );
await conn.registerManagers([ await conn.registerManagers([
PresenceManager(),
RosterManager(TestingRosterStateManager('', [])),
DiscoManager([]),
StreamManagementManager(), StreamManagementManager(),
EntityCapabilitiesManager('http://moxxmpp.example'),
]); ]);
await conn.registerFeatureNegotiators([ await conn.registerFeatureNegotiators([
SaslPlainNegotiator(), SaslPlainNegotiator(),
@ -292,6 +271,7 @@ void main() {
XmppManagerAttributes( XmppManagerAttributes(
sendStanza: ( sendStanza: (
_, { _, {
StanzaFromType addFrom = StanzaFromType.full,
bool addId = true, bool addId = true,
bool retransmitted = false, bool retransmitted = false,
bool awaitable = true, bool awaitable = true,
@ -645,152 +625,4 @@ void main() {
true, true,
); );
}); });
test('Test sending stanzas while offline', () async {
final fakeSocket = StubTCPSocket(
[
StringExpectation(
"<stream:stream xmlns='jabber:client' version='1.0' xmlns:stream='http://etherx.jabber.org/streams' to='test.server' from='polynomdivision@test.server' xml:lang='en'>",
'''
<stream:stream
xmlns="jabber:client"
version="1.0"
xmlns:stream="http://etherx.jabber.org/streams"
from="test.server"
xml:lang="en">
<stream:features xmlns="http://etherx.jabber.org/streams">
<mechanisms xmlns="urn:ietf:params:xml:ns:xmpp-sasl">
<mechanism>PLAIN</mechanism>
</mechanisms>
</stream:features>''',
),
StringExpectation(
"<auth xmlns='urn:ietf:params:xml:ns:xmpp-sasl' mechanism='PLAIN'>AHBvbHlub21kaXZpc2lvbgBhYWFh</auth>",
'<success xmlns="urn:ietf:params:xml:ns:xmpp-sasl" />',
),
StringExpectation(
"<stream:stream xmlns='jabber:client' version='1.0' xmlns:stream='http://etherx.jabber.org/streams' to='test.server' from='polynomdivision@test.server' xml:lang='en'>",
'''
<stream:stream
xmlns="jabber:client"
version="1.0"
xmlns:stream="http://etherx.jabber.org/streams"
from="test.server"
xml:lang="en">
<stream:features xmlns="http://etherx.jabber.org/streams">
<bind xmlns="urn:ietf:params:xml:ns:xmpp-bind">
<required/>
</bind>
<session xmlns="urn:ietf:params:xml:ns:xmpp-session">
<optional/>
</session>
<csi xmlns="urn:xmpp:csi:0"/>
<sm xmlns="urn:xmpp:sm:3"/>
</stream:features>
''',
),
StanzaExpectation(
'<iq xmlns="jabber:client" type="set" id="a"><bind xmlns="urn:ietf:params:xml:ns:xmpp-bind"/></iq>',
'<iq xmlns="jabber:client" type="result" id="a"><bind xmlns="urn:ietf:params:xml:ns:xmpp-bind"><jid>polynomdivision@test.server/MU29eEZn</jid></bind></iq>',
ignoreId: true,
),
StringExpectation(
"<stream:stream xmlns='jabber:client' version='1.0' xmlns:stream='http://etherx.jabber.org/streams' to='test.server' from='polynomdivision@test.server' xml:lang='en'>",
'''
<stream:stream
xmlns="jabber:client"
version="1.0"
xmlns:stream="http://etherx.jabber.org/streams"
from="test.server"
xml:lang="en">
<stream:features xmlns="http://etherx.jabber.org/streams">
<mechanisms xmlns="urn:ietf:params:xml:ns:xmpp-sasl">
<mechanism>PLAIN</mechanism>
</mechanisms>
</stream:features>''',
),
StringExpectation(
"<auth xmlns='urn:ietf:params:xml:ns:xmpp-sasl' mechanism='PLAIN'>AHBvbHlub21kaXZpc2lvbgBhYWFh</auth>",
'<success xmlns="urn:ietf:params:xml:ns:xmpp-sasl" />',
),
StringExpectation(
"<stream:stream xmlns='jabber:client' version='1.0' xmlns:stream='http://etherx.jabber.org/streams' to='test.server' from='polynomdivision@test.server' xml:lang='en'>",
'''
<stream:stream
xmlns="jabber:client"
version="1.0"
xmlns:stream="http://etherx.jabber.org/streams"
from="test.server"
xml:lang="en">
<stream:features xmlns="http://etherx.jabber.org/streams">
<bind xmlns="urn:ietf:params:xml:ns:xmpp-bind">
<required/>
</bind>
<session xmlns="urn:ietf:params:xml:ns:xmpp-session">
<optional/>
</session>
<csi xmlns="urn:xmpp:csi:0"/>
<sm xmlns="urn:xmpp:sm:3"/>
</stream:features>
''',
),
StanzaExpectation(
'<iq xmlns="jabber:client" type="set" id="a"><bind xmlns="urn:ietf:params:xml:ns:xmpp-bind"/></iq>',
'<iq xmlns="jabber:client" type="result" id="a"><bind xmlns="urn:ietf:params:xml:ns:xmpp-bind"><jid>polynomdivision@test.server/MU29eEZn</jid></bind></iq>',
ignoreId: true,
),
StanzaExpectation(
'<iq xmlns="jabber:client" type="get" id="abc123"></iq>',
'<iq xmlns="jabber:client" type="result" id="abc123"></iq>',
ignoreId: true,
),
],
);
final connectivity = StubConnectivityManager();
final conn = XmppConnection(
TestingReconnectionPolicy(),
connectivity,
ClientToServerNegotiator(),
fakeSocket,
)..connectionSettings = ConnectionSettings(
jid: JID.fromString('polynomdivision@test.server'),
password: 'aaaa',
);
await conn.registerFeatureNegotiators([
SaslPlainNegotiator(),
SaslScramNegotiator(10, '', '', ScramHashType.sha512),
ResourceBindingNegotiator(),
]);
await conn.connect(
waitUntilLogin: true,
);
expect(fakeSocket.getState(), 4);
// Fake going offline
connectivity.goOffline();
await conn.handleSocketEvent(
XmppSocketClosureEvent(false),
);
// Send a stanza while offline
final stanzaFuture = conn.sendStanza(
StanzaDetails(
Stanza.iq(
id: 'abc123',
type: 'get',
),
),
);
// Come online again
connectivity.goOnline();
await conn.connect(
waitUntilLogin: true,
);
await Future<void>.delayed(const Duration(seconds: 6));
expect(fakeSocket.getState(), 9);
expect(await stanzaFuture != null, true);
});
} }