feat(core): Allow implementing different negotiation strategies

Fixes #27.
This commit is contained in:
PapaTutuWawa 2023-04-03 16:05:20 +02:00
parent 275d6e0346
commit 03328bdf7a
19 changed files with 442 additions and 287 deletions

View File

@ -61,6 +61,7 @@ class _MyHomePageState extends State<MyHomePage> {
final XmppConnection connection = XmppConnection(
RandomBackoffReconnectionPolicy(1, 60),
AlwaysConnectedConnectivityManager(),
ClientToServerNegotiator(),
// The below causes the app to crash.
//ExampleTcpSocketWrapper(),
// In a production app, the below should be false.

View File

@ -15,6 +15,7 @@ export 'package:moxxmpp/src/managers/namespaces.dart';
export 'package:moxxmpp/src/managers/priorities.dart';
export 'package:moxxmpp/src/message.dart';
export 'package:moxxmpp/src/namespaces.dart';
export 'package:moxxmpp/src/negotiators/handler.dart';
export 'package:moxxmpp/src/negotiators/manager.dart';
export 'package:moxxmpp/src/negotiators/namespaces.dart';
export 'package:moxxmpp/src/negotiators/negotiator.dart';

View File

@ -16,6 +16,7 @@ import 'package:moxxmpp/src/managers/data.dart';
import 'package:moxxmpp/src/managers/handlers.dart';
import 'package:moxxmpp/src/managers/namespaces.dart';
import 'package:moxxmpp/src/namespaces.dart';
import 'package:moxxmpp/src/negotiators/handler.dart';
import 'package:moxxmpp/src/negotiators/namespaces.dart';
import 'package:moxxmpp/src/negotiators/negotiator.dart';
import 'package:moxxmpp/src/presence.dart';
@ -84,6 +85,7 @@ class XmppConnection {
XmppConnection(
ReconnectionPolicy reconnectionPolicy,
ConnectivityManager connectivityManager,
this._negotiationsHandler,
this._socket, {
this.connectingTimeout = const Duration(minutes: 2),
}) : _reconnectionPolicy = reconnectionPolicy,
@ -93,6 +95,14 @@ class XmppConnection {
_attemptReconnection,
);
// Register the negotiations handler
_negotiationsHandler.register(
_onNegotiationsDone,
handleError,
_sendStreamHeaders,
() => _isAuthenticated,
);
_socketStream = _socket.getDataStream();
// TODO(Unknown): Handle on done
_socketStream.transform(_streamBuffer).forEach(handleXmlStream);
@ -161,10 +171,10 @@ class XmppConnection {
// ignore: use_late_for_private_fields_and_variables
Completer<Result<bool, XmppError>>? _connectionCompleter;
/// Negotiators
final Map<String, XmppFeatureNegotiatorBase> _featureNegotiators = {};
XmppFeatureNegotiatorBase? _currentNegotiator;
final List<XMLNode> _streamFeatures = List.empty(growable: true);
/// The handler for dealing with stream feature negotiations.
final NegotiationsHandler _negotiationsHandler;
T? getNegotiatorById<T extends XmppFeatureNegotiatorBase>(String id) =>
_negotiationsHandler.getNegotiatorById<T>(id);
/// Prevent data from being passed to _currentNegotiator.negotiator while the negotiator
/// is still running.
@ -178,11 +188,6 @@ class XmppConnection {
bool get isAuthenticated => _isAuthenticated;
/// Return the registered feature negotiator that has id [id]. Returns null if
/// none can be found.
T? getNegotiatorById<T extends XmppFeatureNegotiatorBase>(String id) =>
_featureNegotiators[id] as T?;
/// Registers a list of [XmppManagerBase] sub-classes as managers on this connection.
Future<void> registerManagers(List<XmppManagerBase> managers) async {
for (final manager in managers) {
@ -197,7 +202,7 @@ class XmppConnection {
getFullJID: () => _connectionSettings.jid.withResource(_resource),
getSocket: () => _socket,
getConnection: () => this,
getNegotiatorById: getNegotiatorById,
getNegotiatorById: _negotiationsHandler.getNegotiatorById,
),
);
@ -231,13 +236,6 @@ class XmppConnection {
_isAuthenticated = true;
}
/// Remove [feature] from the stream features we are currently negotiating.
void _removeNegotiatingFeature(String feature) {
_streamFeatures.removeWhere((node) {
return node.attributes['xmlns'] == feature;
});
}
/// Register a list of negotiator with the connection.
Future<void> registerFeatureNegotiators(
List<XmppFeatureNegotiatorBase> negotiators,
@ -250,34 +248,21 @@ class XmppConnection {
() => this,
() => _connectionSettings,
_sendEvent,
getNegotiatorById,
_negotiationsHandler.getNegotiatorById,
getManagerById,
() => _connectionSettings.jid.withResource(_resource),
() => _socket,
() => _isAuthenticated,
_setAuthenticated,
setResource,
_removeNegotiatingFeature,
_negotiationsHandler.removeNegotiatingFeature,
),
);
_featureNegotiators[negotiator.id] = negotiator;
_negotiationsHandler.registerNegotiator(negotiator);
}
_log.finest('Negotiators registered');
for (final negotiator in _featureNegotiators.values) {
await negotiator.postRegisterCallback();
}
}
/// Reset all registered negotiators.
void _resetNegotiators() {
for (final negotiator in _featureNegotiators.values) {
negotiator.reset();
}
// Prevent leaking the last active negotiator
_currentNegotiator = null;
await _negotiationsHandler.runPostRegisterCallback();
}
/// Generate an Id suitable for an origin-id or stanza id
@ -358,7 +343,7 @@ class XmppConnection {
/// Called when a stream ending error has occurred
Future<void> handleError(XmppError error) async {
_log.severe('handleError called with ${error.toString()}');
// Whenever we encounter an error that would trigger a reconnection attempt while
// the connection result is being awaited, don't attempt a reconnection but instead
// try to gracefully disconnect.
@ -594,6 +579,29 @@ class XmppConnection {
}
}
/// Called once all negotiations are done. Sends the initial presence, performs
/// a disco sweep among other things.
Future<void> _onNegotiationsDone() async {
// Set the new routing state
_updateRoutingState(RoutingState.handleStanzas);
// Set the connection state
await _setConnectionState(XmppConnectionState.connected);
// Enable reconnections
if (_enableReconnectOnSuccess) {
await _reconnectionPolicy.setShouldReconnect(true);
}
// Resolve the connection completion future
_connectionCompleter?.complete(const Result(true));
_connectionCompleter = null;
// Tell consumers of the event stream that we're done with stream feature
// negotiations
await _sendEvent(StreamNegotiationsDoneEvent());
}
/// Sets the connection state to [state] and triggers an event of type
/// [ConnectionStateChangedEvent].
Future<void> _setConnectionState(XmppConnectionState state) async {
@ -619,7 +627,8 @@ class XmppConnection {
_destroyConnectingTimer();
}
final sm = getNegotiatorById<StreamManagementNegotiator>(
final sm =
_negotiationsHandler.getNegotiatorById<StreamManagementNegotiator>(
streamManagementNegotiator,
);
await _sendEvent(
@ -766,167 +775,6 @@ class XmppConnection {
}
}
/// Returns true if all mandatory features in [features] have been negotiated.
/// Otherwise returns false.
bool _isMandatoryNegotiationDone(List<XMLNode> features) {
return features.every((XMLNode feature) {
return feature.firstTag('required') == null &&
feature.tag != 'mechanisms';
});
}
/// Returns true if we can still negotiate. Returns false if no negotiator is
/// matching and ready.
bool _isNegotiationPossible(List<XMLNode> features) {
return getNextNegotiator(features, log: false) != null;
}
/// Returns the next negotiator that matches [features]. Returns null if none can be
/// picked. If [log] is true, then the list of matching negotiators will be logged.
@visibleForTesting
XmppFeatureNegotiatorBase? getNextNegotiator(
List<XMLNode> features, {
bool log = true,
}) {
final matchingNegotiators = _featureNegotiators.values
.where((XmppFeatureNegotiatorBase negotiator) {
return negotiator.state == NegotiatorState.ready &&
negotiator.matchesFeature(features);
}).toList()
..sort((a, b) => b.priority.compareTo(a.priority));
if (log) {
_log.finest(
'List of matching negotiators: ${matchingNegotiators.map((a) => a.id)}',
);
}
if (matchingNegotiators.isEmpty) return null;
return matchingNegotiators.first;
}
/// Called once all negotiations are done. Sends the initial presence, performs
/// a disco sweep among other things.
Future<void> _onNegotiationsDone() async {
// Set the connection state
await _setConnectionState(XmppConnectionState.connected);
// Enable reconnections
if (_enableReconnectOnSuccess) {
await _reconnectionPolicy.setShouldReconnect(true);
}
// Resolve the connection completion future
_connectionCompleter?.complete(const Result(true));
_connectionCompleter = null;
// Tell consumers of the event stream that we're done with stream feature
// negotiations
await _sendEvent(StreamNegotiationsDoneEvent());
}
Future<void> _executeCurrentNegotiator(XMLNode nonza) async {
// If we don't have a negotiator, get one
_currentNegotiator ??= getNextNegotiator(_streamFeatures);
if (_currentNegotiator == null &&
_isMandatoryNegotiationDone(_streamFeatures) &&
!_isNegotiationPossible(_streamFeatures)) {
_log.finest('Negotiations done!');
_updateRoutingState(RoutingState.handleStanzas);
await _onNegotiationsDone();
return;
}
// If we don't have a next negotiator, we have to bail
if (_currentNegotiator == null &&
!_isMandatoryNegotiationDone(_streamFeatures) &&
!_isNegotiationPossible(_streamFeatures)) {
// We failed before authenticating
if (!_isAuthenticated) {
_log.severe('No negotiator could be picked while unauthenticated');
await handleError(NoMatchingAuthenticationMechanismAvailableError());
return;
} else {
_log.severe(
'No negotiator could be picked while negotiations are not done',
);
await handleError(NoAuthenticatorAvailableError());
return;
}
}
final result = await _currentNegotiator!.negotiate(nonza);
if (result.isType<NegotiatorError>()) {
_log.severe('Negotiator returned an error');
await handleError(result.get<NegotiatorError>());
return;
}
final state = result.get<NegotiatorState>();
_currentNegotiator!.state = state;
switch (state) {
case NegotiatorState.ready:
return;
case NegotiatorState.done:
if (_currentNegotiator!.sendStreamHeaderWhenDone) {
_currentNegotiator = null;
_streamFeatures.clear();
_sendStreamHeader();
} else {
_removeNegotiatingFeature(_currentNegotiator!.negotiatingXmlns);
_currentNegotiator = null;
if (_isMandatoryNegotiationDone(_streamFeatures) &&
!_isNegotiationPossible(_streamFeatures)) {
_log.finest('Negotiations done!');
_updateRoutingState(RoutingState.handleStanzas);
await _onNegotiationsDone();
} else {
_currentNegotiator = getNextNegotiator(_streamFeatures);
_log.finest('Chose ${_currentNegotiator!.id} as next negotiator');
final fakeStanza = XMLNode(
tag: 'stream:features',
children: _streamFeatures,
);
await _executeCurrentNegotiator(fakeStanza);
}
}
break;
case NegotiatorState.retryLater:
_log.finest('Negotiator wants to continue later. Picking new one...');
_currentNegotiator!.state = NegotiatorState.ready;
if (_isMandatoryNegotiationDone(_streamFeatures) &&
!_isNegotiationPossible(_streamFeatures)) {
_log.finest('Negotiations done!');
_updateRoutingState(RoutingState.handleStanzas);
await _onNegotiationsDone();
} else {
_log.finest('Picking new negotiator...');
_currentNegotiator = getNextNegotiator(_streamFeatures);
_log.finest('Chose $_currentNegotiator as next negotiator');
final fakeStanza = XMLNode(
tag: 'stream:features',
children: _streamFeatures,
);
await _executeCurrentNegotiator(fakeStanza);
}
break;
case NegotiatorState.skipRest:
_log.finest(
'Negotiator wants to skip the remaining negotiation... Negotiations (assumed) done!',
);
_updateRoutingState(RoutingState.handleStanzas);
await _onNegotiationsDone();
break;
}
}
/// Called whenever we receive data that has been parsed as XML.
Future<void> handleXmlStream(XMLNode node) async {
// Check if we received a stream error
@ -954,14 +802,7 @@ class XmppConnection {
return;
}
if (node.tag == 'stream:features') {
// Store the received stream features
_streamFeatures
..clear()
..addAll(node.children);
}
await _executeCurrentNegotiator(node);
await _negotiationsHandler.negotiate(node);
});
break;
case RoutingState.handleStanzas:
@ -986,15 +827,13 @@ class XmppConnection {
for (final manager in _xmppManagers.values) {
await manager.onXmppEvent(event);
}
for (final negotiator in _featureNegotiators.values) {
await negotiator.onXmppEvent(event);
}
await _negotiationsHandler.sendEventToNegotiators(event);
_eventStreamController.add(event);
}
/// Sends a stream header to the socket.
void _sendStreamHeader() {
void _sendStreamHeaders() {
_socket.write(
XMLNode(
tag: 'xml',
@ -1114,11 +953,11 @@ class XmppConnection {
} else {
await _reconnectionPolicy.onSuccess();
_log.fine('Preparing the internal state for a connection attempt');
_resetNegotiators();
_negotiationsHandler.resetNegotiators();
await _setConnectionState(XmppConnectionState.connecting);
_updateRoutingState(RoutingState.negotiating);
_isAuthenticated = false;
_sendStreamHeader();
_sendStreamHeaders();
if (waitUntilLogin) {
return _connectionCompleter!.future;

View File

@ -0,0 +1,272 @@
import 'package:logging/logging.dart';
import 'package:meta/meta.dart';
import 'package:moxxmpp/src/connection_errors.dart';
import 'package:moxxmpp/src/errors.dart';
import 'package:moxxmpp/src/events.dart';
import 'package:moxxmpp/src/negotiators/negotiator.dart';
import 'package:moxxmpp/src/stringxml.dart';
/// A callback for when the [NegotiationsHandler] is done.
typedef NegotiationsDoneCallback = Future<void> Function();
/// A callback for the case that an error occurs while negotiating.
typedef ErrorCallback = Future<void> Function(XmppError);
/// Trigger stream headers to be sent
typedef SendStreamHeadersFunction = void Function();
/// Return true if the current connection is authenticated. If not, return false.
typedef IsAuthenticatedFunction = bool Function();
abstract class NegotiationsHandler {
@protected
late final Logger log;
/// Map of all negotiators registered against the handler.
@protected
final Map<String, XmppFeatureNegotiatorBase> negotiators = {};
/// Function that is called once the negotiator is done with its stream negotiations.
@protected
late final NegotiationsDoneCallback onNegotiationsDone;
/// XmppConnection's handleError method.
@protected
late final ErrorCallback handleError;
/// Sends stream headers in the stream.
@protected
late final SendStreamHeadersFunction sendStreamHeaders;
/// Returns true if the connection is authenticated. If not, returns false.
@protected
late final IsAuthenticatedFunction isAuthenticated;
/// Returns, if registered, a negotiator with id [id].
T? getNegotiatorById<T extends XmppFeatureNegotiatorBase>(String id) =>
negotiators[id] as T?;
/// Register the parameters as the corresponding methods in this class. Also
/// initializes the logger.
void register(
NegotiationsDoneCallback onNegotiationsDone,
ErrorCallback handleError,
SendStreamHeadersFunction sendStreamHeaders,
IsAuthenticatedFunction isAuthenticated,
) {
this.onNegotiationsDone = onNegotiationsDone;
this.handleError = handleError;
this.sendStreamHeaders = sendStreamHeaders;
this.isAuthenticated = isAuthenticated;
log = Logger(toString());
}
/// Registers the negotiator [negotiator] against this negotiations handler.
void registerNegotiator(XmppFeatureNegotiatorBase negotiator);
/// Runs the post-register callback of all negotiators.
Future<void> runPostRegisterCallback() async {
for (final negotiator in negotiators.values) {
await negotiator.postRegisterCallback();
}
}
Future<void> sendEventToNegotiators(XmppEvent event) async {
for (final negotiator in negotiators.values) {
await negotiator.onXmppEvent(event);
}
}
/// Remove [feature] from the stream features we are currently negotiating.
void removeNegotiatingFeature(String feature) {}
/// Resets all registered negotiators.
@mustCallSuper
void resetNegotiators() {
for (final negotiator in negotiators.values) {
negotiator.reset();
}
}
/// Called whenever a new nonza [nonza] is received while negotiating.
Future<void> negotiate(XMLNode nonza);
}
class ClientToServerNegotiator extends NegotiationsHandler {
ClientToServerNegotiator() : super();
/// Cached list of stream features.
final List<XMLNode> _streamFeatures = List.empty(growable: true);
/// The currently active negotiator.
XmppFeatureNegotiatorBase? _currentNegotiator;
@override
void registerNegotiator(XmppFeatureNegotiatorBase negotiator) {
negotiators[negotiator.id] = negotiator;
}
@override
void resetNegotiators() {
super.resetNegotiators();
// Prevent leaking the last active negotiator
_currentNegotiator = null;
}
@override
void removeNegotiatingFeature(String feature) {
_streamFeatures.removeWhere((node) {
return node.attributes['xmlns'] == feature;
});
}
/// Returns true if all mandatory features in [features] have been negotiated.
/// Otherwise returns false.
bool _isMandatoryNegotiationDone(List<XMLNode> features) {
return features.every((XMLNode feature) {
return feature.firstTag('required') == null &&
feature.tag != 'mechanisms';
});
}
/// Returns true if we can still negotiate. Returns false if no negotiator is
/// matching and ready.
bool _isNegotiationPossible(List<XMLNode> features) {
return getNextNegotiator(features, log: false) != null;
}
/// Returns the next negotiator that matches [features]. Returns null if none can be
/// picked. If [log] is true, then the list of matching negotiators will be logged.
@visibleForTesting
XmppFeatureNegotiatorBase? getNextNegotiator(
List<XMLNode> features, {
bool log = true,
}) {
final matchingNegotiators =
negotiators.values.where((XmppFeatureNegotiatorBase negotiator) {
return negotiator.state == NegotiatorState.ready &&
negotiator.matchesFeature(features);
}).toList()
..sort((a, b) => b.priority.compareTo(a.priority));
if (log) {
this.log.finest(
'List of matching negotiators: ${matchingNegotiators.map((a) => a.id)}',
);
}
if (matchingNegotiators.isEmpty) return null;
return matchingNegotiators.first;
}
Future<void> _executeCurrentNegotiator(XMLNode nonza) async {
// If we don't have a negotiator, get one
_currentNegotiator ??= getNextNegotiator(_streamFeatures);
if (_currentNegotiator == null &&
_isMandatoryNegotiationDone(_streamFeatures) &&
!_isNegotiationPossible(_streamFeatures)) {
log.finest('Negotiations done!');
await onNegotiationsDone();
return;
}
// If we don't have a next negotiator, we have to bail
if (_currentNegotiator == null &&
!_isMandatoryNegotiationDone(_streamFeatures) &&
!_isNegotiationPossible(_streamFeatures)) {
// We failed before authenticating
if (!isAuthenticated()) {
log.severe('No negotiator could be picked while unauthenticated');
await handleError(NoMatchingAuthenticationMechanismAvailableError());
return;
} else {
log.severe(
'No negotiator could be picked while negotiations are not done',
);
await handleError(NoAuthenticatorAvailableError());
return;
}
}
final result = await _currentNegotiator!.negotiate(nonza);
if (result.isType<NegotiatorError>()) {
log.severe('Negotiator returned an error');
await handleError(result.get<NegotiatorError>());
return;
}
final state = result.get<NegotiatorState>();
_currentNegotiator!.state = state;
switch (state) {
case NegotiatorState.ready:
return;
case NegotiatorState.done:
if (_currentNegotiator!.sendStreamHeaderWhenDone) {
_currentNegotiator = null;
_streamFeatures.clear();
sendStreamHeaders();
} else {
removeNegotiatingFeature(_currentNegotiator!.negotiatingXmlns);
_currentNegotiator = null;
if (_isMandatoryNegotiationDone(_streamFeatures) &&
!_isNegotiationPossible(_streamFeatures)) {
log.finest('Negotiations done!');
await onNegotiationsDone();
} else {
_currentNegotiator = getNextNegotiator(_streamFeatures);
log.finest('Chose ${_currentNegotiator!.id} as next negotiator');
final fakeStanza = XMLNode(
tag: 'stream:features',
children: _streamFeatures,
);
await _executeCurrentNegotiator(fakeStanza);
}
}
break;
case NegotiatorState.retryLater:
log.finest('Negotiator wants to continue later. Picking new one...');
_currentNegotiator!.state = NegotiatorState.ready;
if (_isMandatoryNegotiationDone(_streamFeatures) &&
!_isNegotiationPossible(_streamFeatures)) {
log.finest('Negotiations done!');
await onNegotiationsDone();
} else {
log.finest('Picking new negotiator...');
_currentNegotiator = getNextNegotiator(_streamFeatures);
log.finest('Chose $_currentNegotiator as next negotiator');
final fakeStanza = XMLNode(
tag: 'stream:features',
children: _streamFeatures,
);
await _executeCurrentNegotiator(fakeStanza);
}
break;
case NegotiatorState.skipRest:
log.finest(
'Negotiator wants to skip the remaining negotiation... Negotiations (assumed) done!',
);
await onNegotiationsDone();
break;
}
}
@override
Future<void> negotiate(XMLNode nonza) async {
if (nonza.tag == 'stream:features') {
// Store the received stream features
_streamFeatures
..clear()
..addAll(nonza.children);
}
await _executeCurrentNegotiator(nonza);
}
}

View File

@ -17,7 +17,7 @@ abstract class ReconnectionPolicy {
PerformReconnectFunction? performReconnect;
final Lock _lock = Lock();
/// Indicate if a reconnection attempt is currently running.
bool _isReconnecting = false;
@ -25,17 +25,19 @@ abstract class ReconnectionPolicy {
bool _shouldAttemptReconnection = false;
@protected
Future<bool> canTryReconnecting() async => _lock.synchronized(() => !_isReconnecting);
Future<bool> canTryReconnecting() async =>
_lock.synchronized(() => !_isReconnecting);
@protected
Future<bool> getIsReconnecting() async => _lock.synchronized(() => _isReconnecting);
Future<bool> getIsReconnecting() async =>
_lock.synchronized(() => _isReconnecting);
Future<void> _resetIsReconnecting() async {
await _lock.synchronized(() {
_isReconnecting = false;
});
}
/// Called by XmppConnection to register the policy.
void register(
PerformReconnectFunction performReconnect,
@ -62,10 +64,10 @@ abstract class ReconnectionPolicy {
return false;
});
}
/// Called by the XmppConnection when the reconnection failed.
Future<void> onFailure() async {}
/// Caled by the XmppConnection when the reconnection was successful.
Future<void> onSuccess();
@ -75,8 +77,7 @@ abstract class ReconnectionPolicy {
/// Set whether a reconnection attempt should be made.
Future<void> setShouldReconnect(bool value) async {
return _lock
.synchronized(() => _shouldAttemptReconnection = value);
return _lock.synchronized(() => _shouldAttemptReconnection = value);
}
}
@ -106,7 +107,7 @@ class RandomBackoffReconnectionPolicy extends ReconnectionPolicy {
final Logger _log = Logger('RandomBackoffReconnectionPolicy');
final Lock _timerLock = Lock();
/// Called when the backoff expired
@visibleForTesting
Future<void> onTimerElapsed() async {
@ -118,7 +119,7 @@ class RandomBackoffReconnectionPolicy extends ReconnectionPolicy {
if (!(await getShouldReconnect())) {
_log.fine(
'Should not reconnect. Stopping here.',
'Should not reconnect. Stopping here.',
);
return;
}
@ -147,7 +148,7 @@ class RandomBackoffReconnectionPolicy extends ReconnectionPolicy {
_timer = Timer(Duration(seconds: seconds), onTimerElapsed);
}
@override
Future<void> onSuccess() async {
await reset();

View File

@ -4,6 +4,7 @@ import 'package:moxxmpp/src/connectivity.dart';
import 'package:moxxmpp/src/jid.dart';
import 'package:moxxmpp/src/managers/attributes.dart';
import 'package:moxxmpp/src/managers/base.dart';
import 'package:moxxmpp/src/negotiators/handler.dart';
import 'package:moxxmpp/src/reconnect.dart';
import 'package:moxxmpp/src/settings.dart';
import 'package:moxxmpp/src/socket.dart';
@ -50,6 +51,7 @@ class TestingManagerHolder {
getConnection: () => XmppConnection(
TestingReconnectionPolicy(),
AlwaysConnectedConnectivityManager(),
ClientToServerNegotiator(),
_socket,
),
getConnectionSettings: () => settings,

View File

@ -143,7 +143,7 @@ class StubTCPSocket extends BaseSocketWrapper {
void injectSocketFault() {
_eventStream.add(XmppSocketClosureEvent(false));
}
/// Let the "connection" receive [data].
void injectRawXml(String data) {
// ignore: avoid_print

View File

@ -1,7 +1,7 @@
import 'package:moxxmpp/moxxmpp.dart';
import 'package:test/test.dart';
//import 'package:test/test.dart';
import 'helpers/logging.dart';
import 'helpers/xmpp.dart';
//import 'helpers/xmpp.dart';
const exampleXmlns1 = 'im:moxxmpp:example1';
const exampleNamespace1 = 'im.moxxmpp.test.example1';
@ -43,65 +43,67 @@ class StubNegotiator2 extends XmppFeatureNegotiatorBase {
void main() {
initLogger();
final stubSocket = StubTCPSocket(
[
StringExpectation(
"<stream:stream xmlns='jabber:client' version='1.0' xmlns:stream='http://etherx.jabber.org/streams' to='test.server' from='user@test.server' xml:lang='en'>",
'''
<stream:stream
xmlns="jabber:client"
version="1.0"
xmlns:stream="http://etherx.jabber.org/streams"
from="test.server"
xml:lang="en">
<stream:features xmlns="http://etherx.jabber.org/streams">
<example1 xmlns="im:moxxmpp:example1" />
<example2 xmlns="im:moxxmpp:example2" />
</stream:features>''',
),
],
);
// TODO(Unknown): Directly test the ClientToServerNegotiator.
// final stubSocket = StubTCPSocket(
// [
// StringExpectation(
// "<stream:stream xmlns='jabber:client' version='1.0' xmlns:stream='http://etherx.jabber.org/streams' to='test.server' from='user@test.server' xml:lang='en'>",
// '''
// <stream:stream
// xmlns="jabber:client"
// version="1.0"
// xmlns:stream="http://etherx.jabber.org/streams"
// from="test.server"
// xml:lang="en">
// <stream:features xmlns="http://etherx.jabber.org/streams">
// <example1 xmlns="im:moxxmpp:example1" />
// <example2 xmlns="im:moxxmpp:example2" />
// </stream:features>''',
// ),
// ],
// );
final connection = XmppConnection(
TestingReconnectionPolicy(),
AlwaysConnectedConnectivityManager(),
stubSocket,
)
..registerFeatureNegotiators([
StubNegotiator1(),
StubNegotiator2(),
])
..registerManagers([
PresenceManager(),
RosterManager(TestingRosterStateManager('', [])),
DiscoManager([]),
EntityCapabilitiesManager('http://moxxmpp.example'),
])
..setConnectionSettings(
ConnectionSettings(
jid: JID.fromString('user@test.server'),
password: 'abc123',
useDirectTLS: true,
),
);
final features = [
XMLNode.xmlns(tag: 'example1', xmlns: exampleXmlns1),
XMLNode.xmlns(tag: 'example2', xmlns: exampleXmlns2),
];
// final connection = XmppConnection(
// TestingReconnectionPolicy(),
// AlwaysConnectedConnectivityManager(),
// ClientToServerNegotiator(),
// stubSocket,
// )
// ..registerFeatureNegotiators([
// StubNegotiator1(),
// StubNegotiator2(),
// ])
// ..registerManagers([
// PresenceManager(),
// RosterManager(TestingRosterStateManager('', [])),
// DiscoManager([]),
// EntityCapabilitiesManager('http://moxxmpp.example'),
// ])
// ..setConnectionSettings(
// ConnectionSettings(
// jid: JID.fromString('user@test.server'),
// password: 'abc123',
// useDirectTLS: true,
// ),
// );
// final features = [
// XMLNode.xmlns(tag: 'example1', xmlns: exampleXmlns1),
// XMLNode.xmlns(tag: 'example2', xmlns: exampleXmlns2),
// ];
test('Test the priority system', () {
expect(connection.getNextNegotiator(features)?.id, exampleNamespace2);
});
// test('Test the priority system', () {
// expect(connection.getNextNegotiator(features)?.id, exampleNamespace2);
// });
test('Test negotiating features with no stream restarts', () async {
await connection.connect();
await Future.delayed(const Duration(seconds: 3), () {
final negotiator1 =
connection.getNegotiatorById<StubNegotiator1>(exampleNamespace1);
final negotiator2 =
connection.getNegotiatorById<StubNegotiator2>(exampleNamespace2);
expect(negotiator1?.called, true);
expect(negotiator2?.called, true);
});
});
// test('Test negotiating features with no stream restarts', () async {
// await connection.connect();
// await Future.delayed(const Duration(seconds: 3), () {
// final negotiator1 =
// connection.getNegotiatorById<StubNegotiator1>(exampleNamespace1);
// final negotiator2 =
// connection.getNegotiatorById<StubNegotiator2>(exampleNamespace2);
// expect(negotiator1?.called, true);
// expect(negotiator2?.called, true);
// });
// });
}

View File

@ -11,7 +11,7 @@ void main() {
9999,
);
await policy.setShouldReconnect(true);
// We have a failure
expect(
await policy.canTriggerFailure(),
@ -32,7 +32,7 @@ void main() {
9999,
)..register(() async => expect(true, false));
await policy.setShouldReconnect(true);
// We have a failure
expect(
await policy.canTriggerFailure(),
@ -63,7 +63,7 @@ void main() {
counter++;
});
await policy.setShouldReconnect(true);
// We have a failure
expect(
await policy.canTriggerFailure(),

View File

@ -49,6 +49,7 @@ void main() {
() => XmppConnection(
TestingReconnectionPolicy(),
AlwaysConnectedConnectivityManager(),
ClientToServerNegotiator(),
fakeSocket,
),
() => ConnectionSettings(
@ -150,6 +151,7 @@ void main() {
() => XmppConnection(
TestingReconnectionPolicy(),
AlwaysConnectedConnectivityManager(),
ClientToServerNegotiator(),
StubTCPSocket([]),
),
() => ConnectionSettings(
@ -206,6 +208,7 @@ void main() {
() => XmppConnection(
TestingReconnectionPolicy(),
AlwaysConnectedConnectivityManager(),
ClientToServerNegotiator(),
StubTCPSocket([]),
),
() => ConnectionSettings(
@ -252,6 +255,7 @@ void main() {
() => XmppConnection(
TestingReconnectionPolicy(),
AlwaysConnectedConnectivityManager(),
ClientToServerNegotiator(),
StubTCPSocket([]),
),
() => ConnectionSettings(
@ -301,6 +305,7 @@ void main() {
() => XmppConnection(
TestingReconnectionPolicy(),
AlwaysConnectedConnectivityManager(),
ClientToServerNegotiator(),
StubTCPSocket([]),
),
() => ConnectionSettings(

View File

@ -70,6 +70,7 @@ void main() {
final conn = XmppConnection(
TestingReconnectionPolicy(),
AlwaysConnectedConnectivityManager(),
ClientToServerNegotiator(),
fakeSocket,
)..setConnectionSettings(
ConnectionSettings(

View File

@ -159,6 +159,7 @@ void main() {
final connection = XmppConnection(
TestingReconnectionPolicy(),
AlwaysConnectedConnectivityManager(),
ClientToServerNegotiator(),
socket,
);

View File

@ -68,6 +68,7 @@ XmppManagerAttributes mkAttributes(void Function(Stanza) callback) {
getConnection: () => XmppConnection(
TestingReconnectionPolicy(),
AlwaysConnectedConnectivityManager(),
ClientToServerNegotiator(),
StubTCPSocket([]),
),
getNegotiatorById: getNegotiatorNullStub,
@ -280,6 +281,7 @@ void main() {
final conn = XmppConnection(
TestingReconnectionPolicy(),
AlwaysConnectedConnectivityManager(),
ClientToServerNegotiator(),
fakeSocket,
)..setConnectionSettings(
ConnectionSettings(
@ -405,6 +407,7 @@ void main() {
final conn = XmppConnection(
TestingReconnectionPolicy(),
AlwaysConnectedConnectivityManager(),
ClientToServerNegotiator(),
fakeSocket,
)..setConnectionSettings(
ConnectionSettings(
@ -565,6 +568,7 @@ void main() {
final conn = XmppConnection(
TestingReconnectionPolicy(),
AlwaysConnectedConnectivityManager(),
ClientToServerNegotiator(),
fakeSocket,
)..setConnectionSettings(
ConnectionSettings(
@ -659,6 +663,7 @@ void main() {
final conn = XmppConnection(
TestingReconnectionPolicy(),
AlwaysConnectedConnectivityManager(),
ClientToServerNegotiator(),
fakeSocket,
)..setConnectionSettings(
ConnectionSettings(
@ -750,6 +755,7 @@ void main() {
final conn = XmppConnection(
TestingReconnectionPolicy(),
AlwaysConnectedConnectivityManager(),
ClientToServerNegotiator(),
fakeSocket,
)..setConnectionSettings(
ConnectionSettings(
@ -837,6 +843,7 @@ void main() {
final conn = XmppConnection(
TestingReconnectionPolicy(),
AlwaysConnectedConnectivityManager(),
ClientToServerNegotiator(),
fakeSocket,
)
..setConnectionSettings(
@ -936,6 +943,7 @@ void main() {
final conn = XmppConnection(
TestingReconnectionPolicy(),
AlwaysConnectedConnectivityManager(),
ClientToServerNegotiator(),
fakeSocket,
)
..setConnectionSettings(
@ -1039,6 +1047,7 @@ void main() {
final conn = XmppConnection(
TestingReconnectionPolicy(),
AlwaysConnectedConnectivityManager(),
ClientToServerNegotiator(),
fakeSocket,
)
..setConnectionSettings(

View File

@ -35,6 +35,7 @@ void main() {
getConnection: () => XmppConnection(
TestingReconnectionPolicy(),
AlwaysConnectedConnectivityManager(),
ClientToServerNegotiator(),
StubTCPSocket([]),
),
getNegotiatorById: getNegotiatorNullStub,
@ -103,6 +104,7 @@ void main() {
final conn = XmppConnection(
TestingReconnectionPolicy(),
AlwaysConnectedConnectivityManager(),
ClientToServerNegotiator(),
fakeSocket,
)
..setConnectionSettings(

View File

@ -63,6 +63,7 @@ void main() {
getConnection: () => XmppConnection(
TestingReconnectionPolicy(),
AlwaysConnectedConnectivityManager(),
ClientToServerNegotiator(),
StubTCPSocket([]),
),
),
@ -106,6 +107,7 @@ void main() {
getConnection: () => XmppConnection(
TestingReconnectionPolicy(),
AlwaysConnectedConnectivityManager(),
ClientToServerNegotiator(),
StubTCPSocket([]),
),
),
@ -167,6 +169,7 @@ void main() {
final conn = XmppConnection(
TestingReconnectionPolicy(),
AlwaysConnectedConnectivityManager(),
ClientToServerNegotiator(),
fakeSocket,
)..setConnectionSettings(
ConnectionSettings(

View File

@ -44,6 +44,7 @@ void main() {
final conn = XmppConnection(
TestingReconnectionPolicy(),
AlwaysConnectedConnectivityManager(),
ClientToServerNegotiator(),
fakeSocket,
)..setConnectionSettings(
ConnectionSettings(
@ -117,6 +118,7 @@ void main() {
final conn = XmppConnection(
TestingReconnectionPolicy(),
AlwaysConnectedConnectivityManager(),
ClientToServerNegotiator(),
fakeSocket,
)..setConnectionSettings(
ConnectionSettings(

View File

@ -101,6 +101,7 @@ void main() {
final conn = XmppConnection(
TestingReconnectionPolicy(),
AlwaysConnectedConnectivityManager(),
ClientToServerNegotiator(),
fakeSocket,
)..setConnectionSettings(
ConnectionSettings(
@ -181,6 +182,7 @@ void main() {
final conn = XmppConnection(
TestingReconnectionPolicy(),
AlwaysConnectedConnectivityManager(),
ClientToServerNegotiator(),
fakeSocket,
)..setConnectionSettings(
ConnectionSettings(
@ -260,6 +262,7 @@ void main() {
final conn = XmppConnection(
TestingReconnectionPolicy(),
AlwaysConnectedConnectivityManager(),
ClientToServerNegotiator(),
fakeSocket,
)..setConnectionSettings(
ConnectionSettings(
@ -347,6 +350,7 @@ void main() {
final conn = XmppConnection(
TestingReconnectionPolicy(),
AlwaysConnectedConnectivityManager(),
ClientToServerNegotiator(),
fakeSocket,
)..setConnectionSettings(
ConnectionSettings(
@ -432,6 +436,7 @@ void main() {
final conn = XmppConnection(
TestingReconnectionPolicy(),
AlwaysConnectedConnectivityManager(),
ClientToServerNegotiator(),
fakeSocket,
)..setConnectionSettings(
ConnectionSettings(

View File

@ -107,6 +107,7 @@ void main() {
final conn = XmppConnection(
TestingReconnectionPolicy(),
AlwaysConnectedConnectivityManager(),
ClientToServerNegotiator(),
fakeSocket,
)..setConnectionSettings(
ConnectionSettings(
@ -222,6 +223,7 @@ void main() {
final conn = XmppConnection(
TestingReconnectionPolicy(),
AlwaysConnectedConnectivityManager(),
ClientToServerNegotiator(),
fakeSocket,
)..setConnectionSettings(
ConnectionSettings(

View File

@ -40,6 +40,7 @@ Future<bool> testRosterManager(
getConnection: () => XmppConnection(
TestingReconnectionPolicy(),
AlwaysConnectedConnectivityManager(),
ClientToServerNegotiator(),
StubTCPSocket([]),
),
),
@ -124,6 +125,7 @@ void main() {
final conn = XmppConnection(
TestingReconnectionPolicy(),
AlwaysConnectedConnectivityManager(),
ClientToServerNegotiator(),
fakeSocket,
)..setConnectionSettings(
ConnectionSettings(
@ -181,6 +183,7 @@ void main() {
final conn = XmppConnection(
TestingReconnectionPolicy(),
AlwaysConnectedConnectivityManager(),
ClientToServerNegotiator(),
fakeSocket,
)..setConnectionSettings(
ConnectionSettings(
@ -240,6 +243,7 @@ void main() {
final conn = XmppConnection(
TestingReconnectionPolicy(),
AlwaysConnectedConnectivityManager(),
ClientToServerNegotiator(),
fakeSocket,
)..setConnectionSettings(
ConnectionSettings(
@ -301,6 +305,7 @@ void main() {
getConnection: () => XmppConnection(
TestingReconnectionPolicy(),
AlwaysConnectedConnectivityManager(),
ClientToServerNegotiator(),
StubTCPSocket([]),
),
),
@ -399,6 +404,7 @@ void main() {
final conn = XmppConnection(
TestingReconnectionPolicy(),
AlwaysConnectedConnectivityManager(),
ClientToServerNegotiator(),
fakeSocket,
);
await conn.registerManagers([
@ -487,7 +493,7 @@ void main() {
StanzaExpectation(
'<iq xmlns="jabber:client" type="set" id="a"><bind xmlns="urn:ietf:params:xml:ns:xmpp-bind"/></iq>',
'<iq xmlns="jabber:client" type="result" id="a"><bind xmlns="urn:ietf:params:xml:ns:xmpp-bind"><jid>testuser@example.org/MU29eEZn</jid></bind></iq>',
ignoreId: true,
ignoreId: true,
),
],
);
@ -495,6 +501,7 @@ void main() {
final conn = XmppConnection(
TestingReconnectionPolicy(),
AlwaysConnectedConnectivityManager(),
ClientToServerNegotiator(),
fakeSocket,
);
await conn.registerManagers([