Compare commits
No commits in common. "a8d80eaddf8784532d88442d74202a47af7de047" and "ed49212f5a527b2b0084faf97cb5c383221772e2" have entirely different histories.
a8d80eaddf
...
ed49212f5a
@ -198,6 +198,9 @@ class XmppConnection {
|
|||||||
// ignore: use_late_for_private_fields_and_variables
|
// ignore: use_late_for_private_fields_and_variables
|
||||||
Completer<XmppConnectionResult>? _connectionCompleter;
|
Completer<XmppConnectionResult>? _connectionCompleter;
|
||||||
|
|
||||||
|
/// Controls whether an XmppSocketClosureEvent triggers a reconnection.
|
||||||
|
bool _socketClosureTriggersReconnect = true;
|
||||||
|
|
||||||
/// Negotiators
|
/// Negotiators
|
||||||
final Map<String, XmppFeatureNegotiatorBase> _featureNegotiators = {};
|
final Map<String, XmppFeatureNegotiatorBase> _featureNegotiators = {};
|
||||||
XmppFeatureNegotiatorBase? _currentNegotiator;
|
XmppFeatureNegotiatorBase? _currentNegotiator;
|
||||||
@ -425,6 +428,7 @@ class XmppConnection {
|
|||||||
}
|
}
|
||||||
|
|
||||||
await _setConnectionState(XmppConnectionState.error);
|
await _setConnectionState(XmppConnectionState.error);
|
||||||
|
await _resetIsConnectionRunning();
|
||||||
await _reconnectionPolicy.onFailure();
|
await _reconnectionPolicy.onFailure();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -433,9 +437,9 @@ class XmppConnection {
|
|||||||
if (event is XmppSocketErrorEvent) {
|
if (event is XmppSocketErrorEvent) {
|
||||||
await handleError(SocketError(event));
|
await handleError(SocketError(event));
|
||||||
} else if (event is XmppSocketClosureEvent) {
|
} else if (event is XmppSocketClosureEvent) {
|
||||||
if (!event.expected) {
|
if (_socketClosureTriggersReconnect) {
|
||||||
_log.fine('Received unexpected XmppSocketClosureEvent. Reconnecting...');
|
_log.fine('Received XmppSocketClosureEvent. Reconnecting...');
|
||||||
await handleError(SocketError(XmppSocketErrorEvent(event)));
|
await _reconnectionPolicy.onFailure();
|
||||||
} else {
|
} else {
|
||||||
_log.fine('Received XmppSocketClosureEvent. No reconnection attempt since _socketClosureTriggersReconnect is false...');
|
_log.fine('Received XmppSocketClosureEvent. No reconnection attempt since _socketClosureTriggersReconnect is false...');
|
||||||
}
|
}
|
||||||
@ -846,7 +850,6 @@ class XmppConnection {
|
|||||||
final result = await _currentNegotiator!.negotiate(nonza);
|
final result = await _currentNegotiator!.negotiate(nonza);
|
||||||
if (result.isType<NegotiatorError>()) {
|
if (result.isType<NegotiatorError>()) {
|
||||||
_log.severe('Negotiator returned an error');
|
_log.severe('Negotiator returned an error');
|
||||||
await _resetIsConnectionRunning();
|
|
||||||
await handleError(result.get<NegotiatorError>());
|
await handleError(result.get<NegotiatorError>());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -870,8 +873,6 @@ class XmppConnection {
|
|||||||
if (_isMandatoryNegotiationDone(_streamFeatures) && !_isNegotiationPossible(_streamFeatures)) {
|
if (_isMandatoryNegotiationDone(_streamFeatures) && !_isNegotiationPossible(_streamFeatures)) {
|
||||||
_log.finest('Negotiations done!');
|
_log.finest('Negotiations done!');
|
||||||
_updateRoutingState(RoutingState.handleStanzas);
|
_updateRoutingState(RoutingState.handleStanzas);
|
||||||
await _reconnectionPolicy.onSuccess();
|
|
||||||
await _resetIsConnectionRunning();
|
|
||||||
await _onNegotiationsDone();
|
await _onNegotiationsDone();
|
||||||
} else {
|
} else {
|
||||||
_currentNegotiator = getNextNegotiator(_streamFeatures);
|
_currentNegotiator = getNextNegotiator(_streamFeatures);
|
||||||
@ -895,8 +896,6 @@ class XmppConnection {
|
|||||||
_log.finest('Negotiations done!');
|
_log.finest('Negotiations done!');
|
||||||
|
|
||||||
_updateRoutingState(RoutingState.handleStanzas);
|
_updateRoutingState(RoutingState.handleStanzas);
|
||||||
await _reconnectionPolicy.onSuccess();
|
|
||||||
await _resetIsConnectionRunning();
|
|
||||||
await _onNegotiationsDone();
|
await _onNegotiationsDone();
|
||||||
} else {
|
} else {
|
||||||
_log.finest('Picking new negotiator...');
|
_log.finest('Picking new negotiator...');
|
||||||
@ -913,8 +912,6 @@ class XmppConnection {
|
|||||||
_log.finest('Negotiator wants to skip the remaining negotiation... Negotiations (assumed) done!');
|
_log.finest('Negotiator wants to skip the remaining negotiation... Negotiations (assumed) done!');
|
||||||
|
|
||||||
_updateRoutingState(RoutingState.handleStanzas);
|
_updateRoutingState(RoutingState.handleStanzas);
|
||||||
await _reconnectionPolicy.onSuccess();
|
|
||||||
await _resetIsConnectionRunning();
|
|
||||||
await _onNegotiationsDone();
|
await _onNegotiationsDone();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -1026,6 +1023,7 @@ class XmppConnection {
|
|||||||
|
|
||||||
Future<void> _disconnect({required XmppConnectionState state, bool triggeredByUser = true}) async {
|
Future<void> _disconnect({required XmppConnectionState state, bool triggeredByUser = true}) async {
|
||||||
_reconnectionPolicy.setShouldReconnect(false);
|
_reconnectionPolicy.setShouldReconnect(false);
|
||||||
|
_socketClosureTriggersReconnect = false;
|
||||||
|
|
||||||
if (triggeredByUser) {
|
if (triggeredByUser) {
|
||||||
getPresenceManager().sendUnavailablePresence();
|
getPresenceManager().sendUnavailablePresence();
|
||||||
@ -1081,6 +1079,7 @@ class XmppConnection {
|
|||||||
}
|
}
|
||||||
|
|
||||||
await _reconnectionPolicy.reset();
|
await _reconnectionPolicy.reset();
|
||||||
|
_socketClosureTriggersReconnect = true;
|
||||||
await _sendEvent(ConnectingEvent());
|
await _sendEvent(ConnectingEvent());
|
||||||
|
|
||||||
final smManager = getStreamManagementManager();
|
final smManager = getStreamManagementManager();
|
||||||
|
@ -5,17 +5,13 @@ abstract class XmppSocketEvent {}
|
|||||||
|
|
||||||
/// Triggered by the socket when an error occurs.
|
/// Triggered by the socket when an error occurs.
|
||||||
class XmppSocketErrorEvent extends XmppSocketEvent {
|
class XmppSocketErrorEvent extends XmppSocketEvent {
|
||||||
|
|
||||||
XmppSocketErrorEvent(this.error);
|
XmppSocketErrorEvent(this.error);
|
||||||
final Object error;
|
final Object error;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Triggered when the socket is closed
|
/// Triggered when the socket is closed
|
||||||
class XmppSocketClosureEvent extends XmppSocketEvent {
|
class XmppSocketClosureEvent extends XmppSocketEvent {}
|
||||||
XmppSocketClosureEvent(this.expected);
|
|
||||||
|
|
||||||
/// Indicate that the socket did not close unexpectedly.
|
|
||||||
final bool expected;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// This class is the base for a socket that XmppConnection can use.
|
/// This class is the base for a socket that XmppConnection can use.
|
||||||
abstract class BaseSocketWrapper {
|
abstract class BaseSocketWrapper {
|
||||||
|
@ -185,18 +185,9 @@ class StreamManagementManager extends XmppManagerBase {
|
|||||||
_disableStreamManagement();
|
_disableStreamManagement();
|
||||||
_streamResumed = false;
|
_streamResumed = false;
|
||||||
} else if (event is ConnectionStateChangedEvent) {
|
} else if (event is ConnectionStateChangedEvent) {
|
||||||
switch (event.state) {
|
if (event.state == XmppConnectionState.connected) {
|
||||||
case XmppConnectionState.connected:
|
|
||||||
// Push out all pending stanzas
|
// Push out all pending stanzas
|
||||||
await onStreamResumed(0);
|
await onStreamResumed(0);
|
||||||
break;
|
|
||||||
case XmppConnectionState.error:
|
|
||||||
case XmppConnectionState.notConnected:
|
|
||||||
_stopAckTimer();
|
|
||||||
break;
|
|
||||||
case XmppConnectionState.connecting:
|
|
||||||
// NOOP
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -10,31 +10,22 @@ import 'package:moxxmpp_socket_tcp/src/rfc_2782.dart';
|
|||||||
|
|
||||||
/// TCP socket implementation for XmppConnection
|
/// TCP socket implementation for XmppConnection
|
||||||
class TCPSocketWrapper extends BaseSocketWrapper {
|
class TCPSocketWrapper extends BaseSocketWrapper {
|
||||||
TCPSocketWrapper(this._logData);
|
TCPSocketWrapper(this._logData)
|
||||||
|
: _log = Logger('TCPSocketWrapper'),
|
||||||
/// The underlying Socket/SecureSocket instance.
|
_dataStream = StreamController.broadcast(),
|
||||||
|
_eventStream = StreamController.broadcast(),
|
||||||
|
_secure = false,
|
||||||
|
_ignoreSocketClosure = false;
|
||||||
Socket? _socket;
|
Socket? _socket;
|
||||||
|
bool _ignoreSocketClosure;
|
||||||
/// Indicates that we expect a socket closure.
|
final StreamController<String> _dataStream;
|
||||||
bool _expectSocketClosure = false;
|
final StreamController<XmppSocketEvent> _eventStream;
|
||||||
|
|
||||||
/// The stream of incoming data from the socket.
|
|
||||||
final StreamController<String> _dataStream = StreamController.broadcast();
|
|
||||||
|
|
||||||
/// The stream of outgoing (TCPSocketWrapper -> XmppConnection) events.
|
|
||||||
final StreamController<XmppSocketEvent> _eventStream = StreamController.broadcast();
|
|
||||||
|
|
||||||
/// A subscription on the socket's data stream.
|
|
||||||
StreamSubscription<dynamic>? _socketSubscription;
|
StreamSubscription<dynamic>? _socketSubscription;
|
||||||
|
|
||||||
/// Logger
|
final Logger _log;
|
||||||
final Logger _log = Logger('TCPSocketWrapper');
|
|
||||||
|
|
||||||
/// Flag to indicate if incoming and outgoing data should get logged.
|
|
||||||
final bool _logData;
|
final bool _logData;
|
||||||
|
|
||||||
/// Indiacted whether the connection is secure.
|
bool _secure;
|
||||||
bool _secure = false;
|
|
||||||
|
|
||||||
@override
|
@override
|
||||||
bool isSecure() => _secure;
|
bool isSecure() => _secure;
|
||||||
@ -81,6 +72,7 @@ class TCPSocketWrapper extends BaseSocketWrapper {
|
|||||||
for (final srv in results) {
|
for (final srv in results) {
|
||||||
try {
|
try {
|
||||||
_log.finest('Attempting secure connection to ${srv.target}:${srv.port}...');
|
_log.finest('Attempting secure connection to ${srv.target}:${srv.port}...');
|
||||||
|
_ignoreSocketClosure = true;
|
||||||
|
|
||||||
// Workaround: We cannot set the SNI directly when using SecureSocket.connect.
|
// Workaround: We cannot set the SNI directly when using SecureSocket.connect.
|
||||||
// instead, we connect using a regular socket and then secure it. This allows
|
// instead, we connect using a regular socket and then secure it. This allows
|
||||||
@ -97,11 +89,13 @@ class TCPSocketWrapper extends BaseSocketWrapper {
|
|||||||
onBadCertificate: (cert) => onBadCertificate(cert, domain),
|
onBadCertificate: (cert) => onBadCertificate(cert, domain),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
_ignoreSocketClosure = false;
|
||||||
_secure = true;
|
_secure = true;
|
||||||
_log.finest('Success!');
|
_log.finest('Success!');
|
||||||
return true;
|
return true;
|
||||||
} on Exception catch(e) {
|
} on Exception catch(e) {
|
||||||
_log.finest('Failure! $e');
|
_log.finest('Failure! $e');
|
||||||
|
_ignoreSocketClosure = false;
|
||||||
|
|
||||||
if (e is HandshakeException) {
|
if (e is HandshakeException) {
|
||||||
failedDueToTLS = true;
|
failedDueToTLS = true;
|
||||||
@ -124,16 +118,19 @@ class TCPSocketWrapper extends BaseSocketWrapper {
|
|||||||
for (final srv in results) {
|
for (final srv in results) {
|
||||||
try {
|
try {
|
||||||
_log.finest('Attempting connection to ${srv.target}:${srv.port}...');
|
_log.finest('Attempting connection to ${srv.target}:${srv.port}...');
|
||||||
|
_ignoreSocketClosure = true;
|
||||||
_socket = await Socket.connect(
|
_socket = await Socket.connect(
|
||||||
srv.target,
|
srv.target,
|
||||||
srv.port,
|
srv.port,
|
||||||
timeout: const Duration(seconds: 5),
|
timeout: const Duration(seconds: 5),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
_ignoreSocketClosure = false;
|
||||||
_log.finest('Success!');
|
_log.finest('Success!');
|
||||||
return true;
|
return true;
|
||||||
} on Exception catch(e) {
|
} on Exception catch(e) {
|
||||||
_log.finest('Failure! $e');
|
_log.finest('Failure! $e');
|
||||||
|
_ignoreSocketClosure = false;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -147,6 +144,7 @@ class TCPSocketWrapper extends BaseSocketWrapper {
|
|||||||
Future<bool> _hostPortConnect(String host, int port) async {
|
Future<bool> _hostPortConnect(String host, int port) async {
|
||||||
try {
|
try {
|
||||||
_log.finest('Attempting fallback connection to $host:$port...');
|
_log.finest('Attempting fallback connection to $host:$port...');
|
||||||
|
_ignoreSocketClosure = true;
|
||||||
_socket = await Socket.connect(
|
_socket = await Socket.connect(
|
||||||
host,
|
host,
|
||||||
port,
|
port,
|
||||||
@ -156,6 +154,7 @@ class TCPSocketWrapper extends BaseSocketWrapper {
|
|||||||
return true;
|
return true;
|
||||||
} on Exception catch(e) {
|
} on Exception catch(e) {
|
||||||
_log.finest('Failure! $e');
|
_log.finest('Failure! $e');
|
||||||
|
_ignoreSocketClosure = false;
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -180,11 +179,10 @@ class TCPSocketWrapper extends BaseSocketWrapper {
|
|||||||
_log.severe('Failed to secure socket since _socket is null');
|
_log.severe('Failed to secure socket since _socket is null');
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_ignoreSocketClosure = true;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// The socket is closed during the entire process
|
|
||||||
_expectSocketClosure = true;
|
|
||||||
|
|
||||||
_socket = await SecureSocket.secure(
|
_socket = await SecureSocket.secure(
|
||||||
_socket!,
|
_socket!,
|
||||||
supportedProtocols: const [ xmppClientALPNId ],
|
supportedProtocols: const [ xmppClientALPNId ],
|
||||||
@ -192,10 +190,12 @@ class TCPSocketWrapper extends BaseSocketWrapper {
|
|||||||
);
|
);
|
||||||
|
|
||||||
_secure = true;
|
_secure = true;
|
||||||
|
_ignoreSocketClosure = false;
|
||||||
_setupStreams();
|
_setupStreams();
|
||||||
return true;
|
return true;
|
||||||
} on Exception catch (e) {
|
} on Exception catch (e) {
|
||||||
_log.severe('Failed to secure socket: $e');
|
_log.severe('Failed to secure socket: $e');
|
||||||
|
_ignoreSocketClosure = false;
|
||||||
|
|
||||||
if (e is HandshakeException) {
|
if (e is HandshakeException) {
|
||||||
_eventStream.add(XmppSocketTLSFailedEvent());
|
_eventStream.add(XmppSocketTLSFailedEvent());
|
||||||
@ -226,14 +226,15 @@ class TCPSocketWrapper extends BaseSocketWrapper {
|
|||||||
);
|
);
|
||||||
// ignore: implicit_dynamic_parameter
|
// ignore: implicit_dynamic_parameter
|
||||||
_socket!.done.then((_) {
|
_socket!.done.then((_) {
|
||||||
_eventStream.add(XmppSocketClosureEvent(_expectSocketClosure));
|
if (!_ignoreSocketClosure) {
|
||||||
_expectSocketClosure = false;
|
_eventStream.add(XmppSocketClosureEvent());
|
||||||
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@override
|
@override
|
||||||
Future<bool> connect(String domain, { String? host, int? port }) async {
|
Future<bool> connect(String domain, { String? host, int? port }) async {
|
||||||
_expectSocketClosure = false;
|
_ignoreSocketClosure = false;
|
||||||
_secure = false;
|
_secure = false;
|
||||||
|
|
||||||
// Connection order:
|
// Connection order:
|
||||||
@ -266,8 +267,6 @@ class TCPSocketWrapper extends BaseSocketWrapper {
|
|||||||
|
|
||||||
@override
|
@override
|
||||||
void close() {
|
void close() {
|
||||||
_expectSocketClosure = true;
|
|
||||||
|
|
||||||
if (_socketSubscription != null) {
|
if (_socketSubscription != null) {
|
||||||
_log.finest('Closing socket subscription');
|
_log.finest('Closing socket subscription');
|
||||||
_socketSubscription!.cancel();
|
_socketSubscription!.cancel();
|
||||||
@ -278,11 +277,13 @@ class TCPSocketWrapper extends BaseSocketWrapper {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_ignoreSocketClosure = true;
|
||||||
try {
|
try {
|
||||||
_socket!.close();
|
_socket!.close();
|
||||||
} catch(e) {
|
} catch(e) {
|
||||||
_log.warning('Closing socket threw exception: $e');
|
_log.warning('Closing socket threw exception: $e');
|
||||||
}
|
}
|
||||||
|
_ignoreSocketClosure = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@override
|
@override
|
||||||
@ -315,5 +316,7 @@ class TCPSocketWrapper extends BaseSocketWrapper {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@override
|
@override
|
||||||
void prepareDisconnect() {}
|
void prepareDisconnect() {
|
||||||
|
_ignoreSocketClosure = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user