Compare commits

..

No commits in common. "ed49212f5a527b2b0084faf97cb5c383221772e2" and "d7723615fe311a2fc8028946fd27c091848d788d" have entirely different histories.

6 changed files with 103 additions and 204 deletions

View File

@ -29,35 +29,22 @@ import 'package:moxxmpp/src/xeps/xep_0352.dart';
import 'package:synchronized/synchronized.dart'; import 'package:synchronized/synchronized.dart';
import 'package:uuid/uuid.dart'; import 'package:uuid/uuid.dart';
/// The states the XmppConnection can be in
enum XmppConnectionState { enum XmppConnectionState {
/// The XmppConnection instance is not connected to the server. This is either the
/// case before connecting or after disconnecting.
notConnected, notConnected,
/// We are currently trying to connect to the server.
connecting, connecting,
/// We are currently connected to the server.
connected, connected,
/// We have received an unrecoverable error and the server killed the connection
error error
} }
/// Metadata for [XmppConnection.sendStanza].
enum StanzaFromType { enum StanzaFromType {
/// Add the full JID to the stanza as the from attribute // Add the full JID to the stanza as the from attribute
full, full,
// Add the bare JID to the stanza as the from attribute
/// Add the bare JID to the stanza as the from attribute
bare, bare,
// Add no JID as the from attribute
/// Add no JID as the from attribute none
none,
} }
/// Nonza describing the XMPP stream header.
class StreamHeaderNonza extends XMLNode { class StreamHeaderNonza extends XMLNode {
StreamHeaderNonza(String serverDomain) : super( StreamHeaderNonza(String serverDomain) : super(
tag: 'stream:stream', tag: 'stream:stream',
@ -72,7 +59,6 @@ class StreamHeaderNonza extends XMLNode {
); );
} }
/// The result of an awaited connection.
class XmppConnectionResult { class XmppConnectionResult {
const XmppConnectionResult( const XmppConnectionResult(
this.success, this.success,
@ -81,24 +67,16 @@ class XmppConnectionResult {
} }
); );
/// True if the connection was successful. False if it failed for any reason.
final bool success; final bool success;
// If a connection attempt fails, i.e. success is false, then this indicates the // If a connection attempt fails, i.e. success is false, then this indicates the
// reason the connection failed. // reason the connection failed.
final XmppError? error; final XmppError? error;
} }
/// A surrogate key for awaiting stanzas.
@immutable @immutable
class _StanzaAwaitableData { class _StanzaAwaitableData {
const _StanzaAwaitableData(this.sentTo, this.id); const _StanzaAwaitableData(this.sentTo, this.id);
/// The JID the original stanza was sent to. We expect the result to come from the
/// same JID.
final String sentTo; final String sentTo;
/// The ID of the original stanza. We expect the result to have the same ID.
final String id; final String id;
@override @override
@ -112,8 +90,10 @@ class _StanzaAwaitableData {
} }
} }
/// This class is a connection to the server.
class XmppConnection { class XmppConnection {
/// [_socket] is for debugging purposes.
/// [connectionPingDuration] is the duration after which a ping will be sent to keep
/// the connection open. Defaults to 15 minutes.
XmppConnection( XmppConnection(
ReconnectionPolicy reconnectionPolicy, ReconnectionPolicy reconnectionPolicy,
this._socket, this._socket,
@ -121,7 +101,26 @@ class XmppConnection {
this.connectionPingDuration = const Duration(minutes: 3), this.connectionPingDuration = const Duration(minutes: 3),
this.connectingTimeout = const Duration(minutes: 2), this.connectingTimeout = const Duration(minutes: 2),
} }
) : _reconnectionPolicy = reconnectionPolicy { ) :
_connectionState = XmppConnectionState.notConnected,
_routingState = RoutingState.preConnection,
_eventStreamController = StreamController.broadcast(),
_resource = '',
_streamBuffer = XmlStreamBuffer(),
_uuid = const Uuid(),
_awaitingResponse = {},
_awaitingResponseLock = Lock(),
_xmppManagers = {},
_incomingStanzaHandlers = List.empty(growable: true),
_incomingPreStanzaHandlers = List.empty(growable: true),
_outgoingPreStanzaHandlers = List.empty(growable: true),
_outgoingPostStanzaHandlers = List.empty(growable: true),
_reconnectionPolicy = reconnectionPolicy,
_featureNegotiators = {},
_streamFeatures = List.empty(growable: true),
_negotiationLock = Lock(),
_isAuthenticated = false,
_log = Logger('XmppConnection') {
// Allow the reconnection policy to perform reconnections by itself // Allow the reconnection policy to perform reconnections by itself
_reconnectionPolicy.register( _reconnectionPolicy.register(
_attemptReconnection, _attemptReconnection,
@ -135,103 +134,69 @@ class XmppConnection {
} }
/// The state that the connection is currently in /// Connection properties
XmppConnectionState _connectionState = XmppConnectionState.notConnected; ///
/// The state that the connection currently is in
XmppConnectionState _connectionState;
/// The socket that we are using for the connection and its data stream /// The socket that we are using for the connection and its data stream
final BaseSocketWrapper _socket; final BaseSocketWrapper _socket;
/// The data stream of the socket
late final Stream<String> _socketStream; late final Stream<String> _socketStream;
/// Account settings
/// Connection settings
late ConnectionSettings _connectionSettings; late ConnectionSettings _connectionSettings;
/// A policy on how to reconnect /// A policy on how to reconnect
final ReconnectionPolicy _reconnectionPolicy; final ReconnectionPolicy _reconnectionPolicy;
/// A list of stanzas we are tracking with its corresponding critical section
final Map<_StanzaAwaitableData, Completer<XMLNode>> _awaitingResponse;
final Lock _awaitingResponseLock;
/// A list of stanzas we are tracking with its corresponding critical section lock /// Helpers
final Map<_StanzaAwaitableData, Completer<XMLNode>> _awaitingResponse = {}; ///
final Lock _awaitingResponseLock = Lock(); final List<StanzaHandler> _incomingStanzaHandlers;
final List<StanzaHandler> _incomingPreStanzaHandlers;
/// Sorted list of handlers that we call or incoming and outgoing stanzas final List<StanzaHandler> _outgoingPreStanzaHandlers;
final List<StanzaHandler> _incomingStanzaHandlers = List.empty(growable: true); final List<StanzaHandler> _outgoingPostStanzaHandlers;
final List<StanzaHandler> _incomingPreStanzaHandlers = List.empty(growable: true); final StreamController<XmppEvent> _eventStreamController;
final List<StanzaHandler> _outgoingPreStanzaHandlers = List.empty(growable: true); final Map<String, XmppManagerBase> _xmppManagers;
final List<StanzaHandler> _outgoingPostStanzaHandlers = List.empty(growable: true);
final StreamController<XmppEvent> _eventStreamController = StreamController.broadcast();
final Map<String, XmppManagerBase> _xmppManagers = {};
/// Stream properties
///
/// Disco info we got after binding a resource (xmlns) /// Disco info we got after binding a resource (xmlns)
final List<String> _serverFeatures = List.empty(growable: true); final List<String> _serverFeatures = List.empty(growable: true);
/// The buffer object to keep split up stanzas together /// The buffer object to keep split up stanzas together
final XmlStreamBuffer _streamBuffer = XmlStreamBuffer(); final XmlStreamBuffer _streamBuffer;
/// UUID object to generate stanza and origin IDs /// UUID object to generate stanza and origin IDs
final Uuid _uuid = const Uuid(); final Uuid _uuid;
/// The time between sending a ping to keep the connection open /// The time between sending a ping to keep the connection open
// TODO(Unknown): Only start the timer if we did not send a stanza after n seconds // TODO(Unknown): Only start the timer if we did not send a stanza after n seconds
final Duration connectionPingDuration; final Duration connectionPingDuration;
/// The time that we may spent in the "connecting" state /// The time that we may spent in the "connecting" state
final Duration connectingTimeout; final Duration connectingTimeout;
/// The current state of the connection handling state machine. /// The current state of the connection handling state machine.
RoutingState _routingState = RoutingState.preConnection; RoutingState _routingState;
/// The currently bound resource or '' if none has been bound yet. /// The currently bound resource or '' if none has been bound yet.
String _resource = ''; String _resource;
/// True if we are authenticated. False if not. /// True if we are authenticated. False if not.
bool _isAuthenticated = false; bool _isAuthenticated;
/// Timer for the keep-alive ping. /// Timer for the keep-alive ping.
Timer? _connectionPingTimer; Timer? _connectionPingTimer;
/// Timer for the connecting timeout /// Timer for the connecting timeout
Timer? _connectingTimeoutTimer; Timer? _connectingTimeoutTimer;
/// Completers for certain actions /// Completers for certain actions
// ignore: use_late_for_private_fields_and_variables // ignore: use_late_for_private_fields_and_variables
Completer<XmppConnectionResult>? _connectionCompleter; Completer<XmppConnectionResult>? _connectionCompleter;
/// Controls whether an XmppSocketClosureEvent triggers a reconnection. /// Controls whether an XmppSocketClosureEvent triggers a reconnection.
bool _socketClosureTriggersReconnect = true; bool _socketClosureTriggersReconnect = true;
/// Negotiators /// Negotiators
final Map<String, XmppFeatureNegotiatorBase> _featureNegotiators = {}; final Map<String, XmppFeatureNegotiatorBase> _featureNegotiators;
XmppFeatureNegotiatorBase? _currentNegotiator; XmppFeatureNegotiatorBase? _currentNegotiator;
final List<XMLNode> _streamFeatures = List.empty(growable: true); final List<XMLNode> _streamFeatures;
/// Prevent data from being passed to _currentNegotiator.negotiator while the negotiator /// Prevent data from being passed to _currentNegotiator.negotiator while the negotiator
/// is still running. /// is still running.
final Lock _negotiationLock = Lock(); final Lock _negotiationLock;
/// The logger for the class /// Misc
final Logger _log = Logger('XmppConnection'); final Logger _log;
/// A value indicating whether a connection attempt is currently running or not
bool _isConnectionRunning = false;
final Lock _connectionRunningLock = Lock();
/// Enters the critical section for accessing [XmppConnection._isConnectionRunning]
/// and does the following:
/// - if _isConnectionRunning is false, set it to true and return false.
/// - if _isConnectionRunning is true, return true.
Future<bool> _testAndSetIsConnectionRunning() async => _connectionRunningLock.synchronized(() {
if (!_isConnectionRunning) {
_isConnectionRunning = true;
return false;
}
return true;
});
/// Enters the critical section for accessing [XmppConnection._isConnectionRunning]
/// and sets it to false.
Future<void> _resetIsConnectionRunning() async => _connectionRunningLock.synchronized(() => _isConnectionRunning = false);
ReconnectionPolicy get reconnectionPolicy => _reconnectionPolicy; ReconnectionPolicy get reconnectionPolicy => _reconnectionPolicy;
@ -388,11 +353,6 @@ class XmppConnection {
/// Attempts to reconnect to the server by following an exponential backoff. /// Attempts to reconnect to the server by following an exponential backoff.
Future<void> _attemptReconnection() async { Future<void> _attemptReconnection() async {
if (await _testAndSetIsConnectionRunning()) {
_log.warning('_attemptReconnection is called but connection attempt is already running. Ignoring...');
return;
}
_log.finest('_attemptReconnection: Setting state to notConnected'); _log.finest('_attemptReconnection: Setting state to notConnected');
await _setConnectionState(XmppConnectionState.notConnected); await _setConnectionState(XmppConnectionState.notConnected);
_log.finest('_attemptReconnection: Done'); _log.finest('_attemptReconnection: Done');
@ -428,7 +388,6 @@ class XmppConnection {
} }
await _setConnectionState(XmppConnectionState.error); await _setConnectionState(XmppConnectionState.error);
await _resetIsConnectionRunning();
await _reconnectionPolicy.onFailure(); await _reconnectionPolicy.onFailure();
} }
@ -826,7 +785,6 @@ class XmppConnection {
/// a disco sweep among other things. /// a disco sweep among other things.
Future<void> _onNegotiationsDone() async { Future<void> _onNegotiationsDone() async {
// Set the connection state // Set the connection state
await _resetIsConnectionRunning();
await _setConnectionState(XmppConnectionState.connected); await _setConnectionState(XmppConnectionState.connected);
// Resolve the connection completion future // Resolve the connection completion future
@ -1010,10 +968,9 @@ class XmppConnection {
} }
/// To be called when we lost the network connection. /// To be called when we lost the network connection.
Future<void> _onNetworkConnectionLost() async { void _onNetworkConnectionLost() {
_socket.close(); _socket.close();
await _resetIsConnectionRunning(); _setConnectionState(XmppConnectionState.notConnected);
await _setConnectionState(XmppConnectionState.notConnected);
} }
/// Attempt to gracefully close the session /// Attempt to gracefully close the session
@ -1054,12 +1011,11 @@ class XmppConnection {
/// Like [connect] but the Future resolves when the resource binding is either done or /// Like [connect] but the Future resolves when the resource binding is either done or
/// SASL has failed. /// SASL has failed.
Future<XmppConnectionResult> connectAwaitable({ String? lastResource }) async { Future<XmppConnectionResult> connectAwaitable({ String? lastResource }) {
_runPreConnectionAssertions(); _runPreConnectionAssertions();
await _resetIsConnectionRunning();
_connectionCompleter = Completer(); _connectionCompleter = Completer();
_log.finest('Calling connect() from connectAwaitable'); _log.finest('Calling connect() from connectAwaitable');
await connect(lastResource: lastResource); connect(lastResource: lastResource);
return _connectionCompleter!.future; return _connectionCompleter!.future;
} }
@ -1071,7 +1027,6 @@ class XmppConnection {
} }
_runPreConnectionAssertions(); _runPreConnectionAssertions();
await _resetIsConnectionRunning();
_reconnectionPolicy.setShouldReconnect(true); _reconnectionPolicy.setShouldReconnect(true);
if (lastResource != null) { if (lastResource != null) {

View File

@ -1,11 +1,10 @@
import 'package:meta/meta.dart'; import 'package:meta/meta.dart';
/// Represents a Jabber ID in parsed form.
@immutable @immutable
class JID { class JID {
const JID(this.local, this.domain, this.resource); const JID(this.local, this.domain, this.resource);
/// Parses the string [jid] into a JID instance.
factory JID.fromString(String jid) { factory JID.fromString(String jid) {
// Algorithm taken from here: https://blog.samwhited.com/2021/02/xmpp-addresses/ // Algorithm taken from here: https://blog.samwhited.com/2021/02/xmpp-addresses/
var localPart = ''; var localPart = '';
@ -44,29 +43,12 @@ class JID {
final String domain; final String domain;
final String resource; final String resource;
/// Returns true if the JID is bare.
bool isBare() => resource.isEmpty; bool isBare() => resource.isEmpty;
/// Returns true if the JID is full.
bool isFull() => resource.isNotEmpty; bool isFull() => resource.isNotEmpty;
/// Converts the JID into a bare JID.
JID toBare() => JID(local, domain, ''); JID toBare() => JID(local, domain, '');
/// Converts the JID into one with a resource part of [resource].
JID withResource(String resource) => JID(local, domain, resource); JID withResource(String resource) => JID(local, domain, resource);
/// Compares the JID with [other]. This function assumes that JID and [other]
/// are bare, i.e. only the domain- and localparts are compared. If [ensureBare]
/// is optionally set to true, then [other] MUST be bare. Otherwise, false is returned.
bool bareCompare(JID other, { bool ensureBare = false }) {
if (ensureBare && !other.isBare()) return false;
return local == other.local && domain == other.domain;
}
/// Converts to JID instance into its string representation of
/// localpart@domainpart/resource.
@override @override
String toString() { String toString() {
var result = ''; var result = '';

View File

@ -4,33 +4,27 @@ import 'package:logging/logging.dart';
import 'package:meta/meta.dart'; import 'package:meta/meta.dart';
import 'package:synchronized/synchronized.dart'; import 'package:synchronized/synchronized.dart';
/// A callback function to be called when the connection to the server has been lost.
typedef ConnectionLostCallback = Future<void> Function();
/// A function that, when called, causes the XmppConnection to connect to the server, if
/// another reconnection is not already running.
typedef PerformReconnectFunction = Future<void> Function();
abstract class ReconnectionPolicy { abstract class ReconnectionPolicy {
ReconnectionPolicy()
: _shouldAttemptReconnection = false,
_isReconnecting = false,
_isReconnectingLock = Lock();
/// Function provided by XmppConnection that allows the policy /// Function provided by XmppConnection that allows the policy
/// to perform a reconnection. /// to perform a reconnection.
PerformReconnectFunction? performReconnect; Future<void> Function()? performReconnect;
/// Function provided by XmppConnection that allows the policy /// Function provided by XmppConnection that allows the policy
/// to say that we lost the connection. /// to say that we lost the connection.
ConnectionLostCallback? triggerConnectionLost; void Function()? triggerConnectionLost;
/// Indicate if should try to reconnect. /// Indicate if should try to reconnect.
bool _shouldAttemptReconnection = false; bool _shouldAttemptReconnection;
/// Indicate if a reconnection attempt is currently running. /// Indicate if a reconnection attempt is currently running.
bool _isReconnecting = false; bool _isReconnecting;
/// And the corresponding lock /// And the corresponding lock
final Lock _isReconnectingLock = Lock(); final Lock _isReconnectingLock;
/// Called by XmppConnection to register the policy. /// Called by XmppConnection to register the policy.
void register(PerformReconnectFunction performReconnect, ConnectionLostCallback triggerConnectionLost) { void register(Future<void> Function() performReconnect, void Function() triggerConnectionLost) {
this.performReconnect = performReconnect; this.performReconnect = performReconnect;
this.triggerConnectionLost = triggerConnectionLost; this.triggerConnectionLost = triggerConnectionLost;

View File

@ -12,17 +12,10 @@ import 'package:moxxmpp/src/stringxml.dart';
import 'package:moxxmpp/src/xeps/xep_0030/xep_0030.dart'; import 'package:moxxmpp/src/xeps/xep_0030/xep_0030.dart';
import 'package:moxxmpp/src/xeps/xep_0297.dart'; import 'package:moxxmpp/src/xeps/xep_0297.dart';
/// This manager class implements support for XEP-0280.
class CarbonsManager extends XmppManagerBase { class CarbonsManager extends XmppManagerBase {
CarbonsManager() : super(); CarbonsManager() : super();
/// Indicates that message carbons are enabled.
bool _isEnabled = false; bool _isEnabled = false;
/// Indicates that the server supports message carbons.
bool _supported = false; bool _supported = false;
/// Indicates that we know that [CarbonsManager._supported] is accurate.
bool _gotSupported = false; bool _gotSupported = false;
@override @override
@ -109,14 +102,9 @@ class CarbonsManager extends XmppManagerBase {
); );
} }
/// Send a request to the server, asking it to enable Message Carbons.
///
/// Returns true if carbons were enabled. False, if not.
Future<bool> enableCarbons() async { Future<bool> enableCarbons() async {
final attrs = getAttributes(); final result = await getAttributes().sendStanza(
final result = await attrs.sendStanza(
Stanza.iq( Stanza.iq(
to: attrs.getFullJID().toBare().toString(),
type: 'set', type: 'set',
children: [ children: [
XMLNode.xmlns( XMLNode.xmlns(
@ -141,9 +129,6 @@ class CarbonsManager extends XmppManagerBase {
return true; return true;
} }
/// Send a request to the server, asking it to disable Message Carbons.
///
/// Returns true if carbons were disabled. False, if not.
Future<bool> disableCarbons() async { Future<bool> disableCarbons() async {
final result = await getAttributes().sendStanza( final result = await getAttributes().sendStanza(
Stanza.iq( Stanza.iq(
@ -179,14 +164,7 @@ class CarbonsManager extends XmppManagerBase {
_isEnabled = true; _isEnabled = true;
} }
/// Checks if a carbon sent by [senderJid] is valid to prevent vulnerabilities like
/// the ones listed at https://xmpp.org/extensions/xep-0280.html#security.
///
/// Returns true if the carbon is valid. Returns false if not.
bool isCarbonValid(JID senderJid) { bool isCarbonValid(JID senderJid) {
return _isEnabled && getAttributes().getFullJID().bareCompare( return _isEnabled && senderJid == getAttributes().getConnectionSettings().jid.toBare();
senderJid,
ensureBare: true,
);
} }
} }

View File

@ -1,5 +1,6 @@
import 'package:moxxmpp/moxxmpp.dart'; import 'package:moxxmpp/moxxmpp.dart';
import 'package:test/test.dart'; import 'package:test/test.dart';
void main() { void main() {
test('Parse a full JID', () { test('Parse a full JID', () {
final jid = JID.fromString('test@server/abc'); final jid = JID.fromString('test@server/abc');
@ -41,15 +42,4 @@ void main() {
test('Parse resource with a slash', () { test('Parse resource with a slash', () {
expect(JID.fromString('hallo@welt.example./test/welt') == JID('hallo', 'welt.example', 'test/welt'), true); expect(JID.fromString('hallo@welt.example./test/welt') == JID('hallo', 'welt.example', 'test/welt'), true);
}); });
test('bareCompare', () {
final jid1 = JID('hallo', 'welt', 'lol');
final jid2 = JID('hallo', 'welt', '');
final jid3 = JID('hallo', 'earth', 'true');
expect(jid1.bareCompare(jid2), true);
expect(jid2.bareCompare(jid1), true);
expect(jid2.bareCompare(jid1, ensureBare: true), false);
expect(jid2.bareCompare(jid3), false);
});
} }