Compare commits

..

No commits in common. "a8d80eaddf8784532d88442d74202a47af7de047" and "ed49212f5a527b2b0084faf97cb5c383221772e2" have entirely different histories.

4 changed files with 44 additions and 55 deletions

View File

@ -198,6 +198,9 @@ class XmppConnection {
// ignore: use_late_for_private_fields_and_variables // ignore: use_late_for_private_fields_and_variables
Completer<XmppConnectionResult>? _connectionCompleter; Completer<XmppConnectionResult>? _connectionCompleter;
/// Controls whether an XmppSocketClosureEvent triggers a reconnection.
bool _socketClosureTriggersReconnect = true;
/// Negotiators /// Negotiators
final Map<String, XmppFeatureNegotiatorBase> _featureNegotiators = {}; final Map<String, XmppFeatureNegotiatorBase> _featureNegotiators = {};
XmppFeatureNegotiatorBase? _currentNegotiator; XmppFeatureNegotiatorBase? _currentNegotiator;
@ -425,6 +428,7 @@ class XmppConnection {
} }
await _setConnectionState(XmppConnectionState.error); await _setConnectionState(XmppConnectionState.error);
await _resetIsConnectionRunning();
await _reconnectionPolicy.onFailure(); await _reconnectionPolicy.onFailure();
} }
@ -433,9 +437,9 @@ class XmppConnection {
if (event is XmppSocketErrorEvent) { if (event is XmppSocketErrorEvent) {
await handleError(SocketError(event)); await handleError(SocketError(event));
} else if (event is XmppSocketClosureEvent) { } else if (event is XmppSocketClosureEvent) {
if (!event.expected) { if (_socketClosureTriggersReconnect) {
_log.fine('Received unexpected XmppSocketClosureEvent. Reconnecting...'); _log.fine('Received XmppSocketClosureEvent. Reconnecting...');
await handleError(SocketError(XmppSocketErrorEvent(event))); await _reconnectionPolicy.onFailure();
} else { } else {
_log.fine('Received XmppSocketClosureEvent. No reconnection attempt since _socketClosureTriggersReconnect is false...'); _log.fine('Received XmppSocketClosureEvent. No reconnection attempt since _socketClosureTriggersReconnect is false...');
} }
@ -846,7 +850,6 @@ class XmppConnection {
final result = await _currentNegotiator!.negotiate(nonza); final result = await _currentNegotiator!.negotiate(nonza);
if (result.isType<NegotiatorError>()) { if (result.isType<NegotiatorError>()) {
_log.severe('Negotiator returned an error'); _log.severe('Negotiator returned an error');
await _resetIsConnectionRunning();
await handleError(result.get<NegotiatorError>()); await handleError(result.get<NegotiatorError>());
return; return;
} }
@ -870,8 +873,6 @@ class XmppConnection {
if (_isMandatoryNegotiationDone(_streamFeatures) && !_isNegotiationPossible(_streamFeatures)) { if (_isMandatoryNegotiationDone(_streamFeatures) && !_isNegotiationPossible(_streamFeatures)) {
_log.finest('Negotiations done!'); _log.finest('Negotiations done!');
_updateRoutingState(RoutingState.handleStanzas); _updateRoutingState(RoutingState.handleStanzas);
await _reconnectionPolicy.onSuccess();
await _resetIsConnectionRunning();
await _onNegotiationsDone(); await _onNegotiationsDone();
} else { } else {
_currentNegotiator = getNextNegotiator(_streamFeatures); _currentNegotiator = getNextNegotiator(_streamFeatures);
@ -895,8 +896,6 @@ class XmppConnection {
_log.finest('Negotiations done!'); _log.finest('Negotiations done!');
_updateRoutingState(RoutingState.handleStanzas); _updateRoutingState(RoutingState.handleStanzas);
await _reconnectionPolicy.onSuccess();
await _resetIsConnectionRunning();
await _onNegotiationsDone(); await _onNegotiationsDone();
} else { } else {
_log.finest('Picking new negotiator...'); _log.finest('Picking new negotiator...');
@ -913,8 +912,6 @@ class XmppConnection {
_log.finest('Negotiator wants to skip the remaining negotiation... Negotiations (assumed) done!'); _log.finest('Negotiator wants to skip the remaining negotiation... Negotiations (assumed) done!');
_updateRoutingState(RoutingState.handleStanzas); _updateRoutingState(RoutingState.handleStanzas);
await _reconnectionPolicy.onSuccess();
await _resetIsConnectionRunning();
await _onNegotiationsDone(); await _onNegotiationsDone();
break; break;
} }
@ -1026,6 +1023,7 @@ class XmppConnection {
Future<void> _disconnect({required XmppConnectionState state, bool triggeredByUser = true}) async { Future<void> _disconnect({required XmppConnectionState state, bool triggeredByUser = true}) async {
_reconnectionPolicy.setShouldReconnect(false); _reconnectionPolicy.setShouldReconnect(false);
_socketClosureTriggersReconnect = false;
if (triggeredByUser) { if (triggeredByUser) {
getPresenceManager().sendUnavailablePresence(); getPresenceManager().sendUnavailablePresence();
@ -1081,6 +1079,7 @@ class XmppConnection {
} }
await _reconnectionPolicy.reset(); await _reconnectionPolicy.reset();
_socketClosureTriggersReconnect = true;
await _sendEvent(ConnectingEvent()); await _sendEvent(ConnectingEvent());
final smManager = getStreamManagementManager(); final smManager = getStreamManagementManager();

View File

@ -5,17 +5,13 @@ abstract class XmppSocketEvent {}
/// Triggered by the socket when an error occurs. /// Triggered by the socket when an error occurs.
class XmppSocketErrorEvent extends XmppSocketEvent { class XmppSocketErrorEvent extends XmppSocketEvent {
XmppSocketErrorEvent(this.error); XmppSocketErrorEvent(this.error);
final Object error; final Object error;
} }
/// Triggered when the socket is closed /// Triggered when the socket is closed
class XmppSocketClosureEvent extends XmppSocketEvent { class XmppSocketClosureEvent extends XmppSocketEvent {}
XmppSocketClosureEvent(this.expected);
/// Indicate that the socket did not close unexpectedly.
final bool expected;
}
/// This class is the base for a socket that XmppConnection can use. /// This class is the base for a socket that XmppConnection can use.
abstract class BaseSocketWrapper { abstract class BaseSocketWrapper {

View File

@ -185,18 +185,9 @@ class StreamManagementManager extends XmppManagerBase {
_disableStreamManagement(); _disableStreamManagement();
_streamResumed = false; _streamResumed = false;
} else if (event is ConnectionStateChangedEvent) { } else if (event is ConnectionStateChangedEvent) {
switch (event.state) { if (event.state == XmppConnectionState.connected) {
case XmppConnectionState.connected:
// Push out all pending stanzas // Push out all pending stanzas
await onStreamResumed(0); await onStreamResumed(0);
break;
case XmppConnectionState.error:
case XmppConnectionState.notConnected:
_stopAckTimer();
break;
case XmppConnectionState.connecting:
// NOOP
break;
} }
} }
} }

View File

@ -10,31 +10,22 @@ import 'package:moxxmpp_socket_tcp/src/rfc_2782.dart';
/// TCP socket implementation for XmppConnection /// TCP socket implementation for XmppConnection
class TCPSocketWrapper extends BaseSocketWrapper { class TCPSocketWrapper extends BaseSocketWrapper {
TCPSocketWrapper(this._logData); TCPSocketWrapper(this._logData)
: _log = Logger('TCPSocketWrapper'),
/// The underlying Socket/SecureSocket instance. _dataStream = StreamController.broadcast(),
_eventStream = StreamController.broadcast(),
_secure = false,
_ignoreSocketClosure = false;
Socket? _socket; Socket? _socket;
bool _ignoreSocketClosure;
/// Indicates that we expect a socket closure. final StreamController<String> _dataStream;
bool _expectSocketClosure = false; final StreamController<XmppSocketEvent> _eventStream;
/// The stream of incoming data from the socket.
final StreamController<String> _dataStream = StreamController.broadcast();
/// The stream of outgoing (TCPSocketWrapper -> XmppConnection) events.
final StreamController<XmppSocketEvent> _eventStream = StreamController.broadcast();
/// A subscription on the socket's data stream.
StreamSubscription<dynamic>? _socketSubscription; StreamSubscription<dynamic>? _socketSubscription;
/// Logger final Logger _log;
final Logger _log = Logger('TCPSocketWrapper');
/// Flag to indicate if incoming and outgoing data should get logged.
final bool _logData; final bool _logData;
/// Indiacted whether the connection is secure. bool _secure;
bool _secure = false;
@override @override
bool isSecure() => _secure; bool isSecure() => _secure;
@ -81,6 +72,7 @@ class TCPSocketWrapper extends BaseSocketWrapper {
for (final srv in results) { for (final srv in results) {
try { try {
_log.finest('Attempting secure connection to ${srv.target}:${srv.port}...'); _log.finest('Attempting secure connection to ${srv.target}:${srv.port}...');
_ignoreSocketClosure = true;
// Workaround: We cannot set the SNI directly when using SecureSocket.connect. // Workaround: We cannot set the SNI directly when using SecureSocket.connect.
// instead, we connect using a regular socket and then secure it. This allows // instead, we connect using a regular socket and then secure it. This allows
@ -97,11 +89,13 @@ class TCPSocketWrapper extends BaseSocketWrapper {
onBadCertificate: (cert) => onBadCertificate(cert, domain), onBadCertificate: (cert) => onBadCertificate(cert, domain),
); );
_ignoreSocketClosure = false;
_secure = true; _secure = true;
_log.finest('Success!'); _log.finest('Success!');
return true; return true;
} on Exception catch(e) { } on Exception catch(e) {
_log.finest('Failure! $e'); _log.finest('Failure! $e');
_ignoreSocketClosure = false;
if (e is HandshakeException) { if (e is HandshakeException) {
failedDueToTLS = true; failedDueToTLS = true;
@ -124,16 +118,19 @@ class TCPSocketWrapper extends BaseSocketWrapper {
for (final srv in results) { for (final srv in results) {
try { try {
_log.finest('Attempting connection to ${srv.target}:${srv.port}...'); _log.finest('Attempting connection to ${srv.target}:${srv.port}...');
_ignoreSocketClosure = true;
_socket = await Socket.connect( _socket = await Socket.connect(
srv.target, srv.target,
srv.port, srv.port,
timeout: const Duration(seconds: 5), timeout: const Duration(seconds: 5),
); );
_ignoreSocketClosure = false;
_log.finest('Success!'); _log.finest('Success!');
return true; return true;
} on Exception catch(e) { } on Exception catch(e) {
_log.finest('Failure! $e'); _log.finest('Failure! $e');
_ignoreSocketClosure = false;
continue; continue;
} }
} }
@ -147,6 +144,7 @@ class TCPSocketWrapper extends BaseSocketWrapper {
Future<bool> _hostPortConnect(String host, int port) async { Future<bool> _hostPortConnect(String host, int port) async {
try { try {
_log.finest('Attempting fallback connection to $host:$port...'); _log.finest('Attempting fallback connection to $host:$port...');
_ignoreSocketClosure = true;
_socket = await Socket.connect( _socket = await Socket.connect(
host, host,
port, port,
@ -156,6 +154,7 @@ class TCPSocketWrapper extends BaseSocketWrapper {
return true; return true;
} on Exception catch(e) { } on Exception catch(e) {
_log.finest('Failure! $e'); _log.finest('Failure! $e');
_ignoreSocketClosure = false;
return false; return false;
} }
} }
@ -181,10 +180,9 @@ class TCPSocketWrapper extends BaseSocketWrapper {
return false; return false;
} }
try { _ignoreSocketClosure = true;
// The socket is closed during the entire process
_expectSocketClosure = true;
try {
_socket = await SecureSocket.secure( _socket = await SecureSocket.secure(
_socket!, _socket!,
supportedProtocols: const [ xmppClientALPNId ], supportedProtocols: const [ xmppClientALPNId ],
@ -192,10 +190,12 @@ class TCPSocketWrapper extends BaseSocketWrapper {
); );
_secure = true; _secure = true;
_ignoreSocketClosure = false;
_setupStreams(); _setupStreams();
return true; return true;
} on Exception catch (e) { } on Exception catch (e) {
_log.severe('Failed to secure socket: $e'); _log.severe('Failed to secure socket: $e');
_ignoreSocketClosure = false;
if (e is HandshakeException) { if (e is HandshakeException) {
_eventStream.add(XmppSocketTLSFailedEvent()); _eventStream.add(XmppSocketTLSFailedEvent());
@ -226,14 +226,15 @@ class TCPSocketWrapper extends BaseSocketWrapper {
); );
// ignore: implicit_dynamic_parameter // ignore: implicit_dynamic_parameter
_socket!.done.then((_) { _socket!.done.then((_) {
_eventStream.add(XmppSocketClosureEvent(_expectSocketClosure)); if (!_ignoreSocketClosure) {
_expectSocketClosure = false; _eventStream.add(XmppSocketClosureEvent());
}
}); });
} }
@override @override
Future<bool> connect(String domain, { String? host, int? port }) async { Future<bool> connect(String domain, { String? host, int? port }) async {
_expectSocketClosure = false; _ignoreSocketClosure = false;
_secure = false; _secure = false;
// Connection order: // Connection order:
@ -266,8 +267,6 @@ class TCPSocketWrapper extends BaseSocketWrapper {
@override @override
void close() { void close() {
_expectSocketClosure = true;
if (_socketSubscription != null) { if (_socketSubscription != null) {
_log.finest('Closing socket subscription'); _log.finest('Closing socket subscription');
_socketSubscription!.cancel(); _socketSubscription!.cancel();
@ -278,11 +277,13 @@ class TCPSocketWrapper extends BaseSocketWrapper {
return; return;
} }
_ignoreSocketClosure = true;
try { try {
_socket!.close(); _socket!.close();
} catch(e) { } catch(e) {
_log.warning('Closing socket threw exception: $e'); _log.warning('Closing socket threw exception: $e');
} }
_ignoreSocketClosure = false;
} }
@override @override
@ -315,5 +316,7 @@ class TCPSocketWrapper extends BaseSocketWrapper {
} }
@override @override
void prepareDisconnect() {} void prepareDisconnect() {
_ignoreSocketClosure = true;
}
} }