fix: Add the closure expectation to the event

This commit is contained in:
PapaTutuWawa 2023-01-14 12:41:17 +01:00
parent 9baf1ed73c
commit a8d80eaddf

View File

@ -10,22 +10,31 @@ import 'package:moxxmpp_socket_tcp/src/rfc_2782.dart';
/// TCP socket implementation for XmppConnection /// TCP socket implementation for XmppConnection
class TCPSocketWrapper extends BaseSocketWrapper { class TCPSocketWrapper extends BaseSocketWrapper {
TCPSocketWrapper(this._logData) TCPSocketWrapper(this._logData);
: _log = Logger('TCPSocketWrapper'),
_dataStream = StreamController.broadcast(), /// The underlying Socket/SecureSocket instance.
_eventStream = StreamController.broadcast(),
_secure = false,
_ignoreSocketClosure = false;
Socket? _socket; Socket? _socket;
bool _ignoreSocketClosure;
final StreamController<String> _dataStream; /// Indicates that we expect a socket closure.
final StreamController<XmppSocketEvent> _eventStream; bool _expectSocketClosure = false;
/// The stream of incoming data from the socket.
final StreamController<String> _dataStream = StreamController.broadcast();
/// The stream of outgoing (TCPSocketWrapper -> XmppConnection) events.
final StreamController<XmppSocketEvent> _eventStream = StreamController.broadcast();
/// A subscription on the socket's data stream.
StreamSubscription<dynamic>? _socketSubscription; StreamSubscription<dynamic>? _socketSubscription;
final Logger _log; /// Logger
final Logger _log = Logger('TCPSocketWrapper');
/// Flag to indicate if incoming and outgoing data should get logged.
final bool _logData; final bool _logData;
bool _secure; /// Indiacted whether the connection is secure.
bool _secure = false;
@override @override
bool isSecure() => _secure; bool isSecure() => _secure;
@ -72,7 +81,6 @@ class TCPSocketWrapper extends BaseSocketWrapper {
for (final srv in results) { for (final srv in results) {
try { try {
_log.finest('Attempting secure connection to ${srv.target}:${srv.port}...'); _log.finest('Attempting secure connection to ${srv.target}:${srv.port}...');
_ignoreSocketClosure = true;
// Workaround: We cannot set the SNI directly when using SecureSocket.connect. // Workaround: We cannot set the SNI directly when using SecureSocket.connect.
// instead, we connect using a regular socket and then secure it. This allows // instead, we connect using a regular socket and then secure it. This allows
@ -89,13 +97,11 @@ class TCPSocketWrapper extends BaseSocketWrapper {
onBadCertificate: (cert) => onBadCertificate(cert, domain), onBadCertificate: (cert) => onBadCertificate(cert, domain),
); );
_ignoreSocketClosure = false;
_secure = true; _secure = true;
_log.finest('Success!'); _log.finest('Success!');
return true; return true;
} on Exception catch(e) { } on Exception catch(e) {
_log.finest('Failure! $e'); _log.finest('Failure! $e');
_ignoreSocketClosure = false;
if (e is HandshakeException) { if (e is HandshakeException) {
failedDueToTLS = true; failedDueToTLS = true;
@ -118,19 +124,16 @@ class TCPSocketWrapper extends BaseSocketWrapper {
for (final srv in results) { for (final srv in results) {
try { try {
_log.finest('Attempting connection to ${srv.target}:${srv.port}...'); _log.finest('Attempting connection to ${srv.target}:${srv.port}...');
_ignoreSocketClosure = true;
_socket = await Socket.connect( _socket = await Socket.connect(
srv.target, srv.target,
srv.port, srv.port,
timeout: const Duration(seconds: 5), timeout: const Duration(seconds: 5),
); );
_ignoreSocketClosure = false;
_log.finest('Success!'); _log.finest('Success!');
return true; return true;
} on Exception catch(e) { } on Exception catch(e) {
_log.finest('Failure! $e'); _log.finest('Failure! $e');
_ignoreSocketClosure = false;
continue; continue;
} }
} }
@ -144,7 +147,6 @@ class TCPSocketWrapper extends BaseSocketWrapper {
Future<bool> _hostPortConnect(String host, int port) async { Future<bool> _hostPortConnect(String host, int port) async {
try { try {
_log.finest('Attempting fallback connection to $host:$port...'); _log.finest('Attempting fallback connection to $host:$port...');
_ignoreSocketClosure = true;
_socket = await Socket.connect( _socket = await Socket.connect(
host, host,
port, port,
@ -154,7 +156,6 @@ class TCPSocketWrapper extends BaseSocketWrapper {
return true; return true;
} on Exception catch(e) { } on Exception catch(e) {
_log.finest('Failure! $e'); _log.finest('Failure! $e');
_ignoreSocketClosure = false;
return false; return false;
} }
} }
@ -179,10 +180,11 @@ class TCPSocketWrapper extends BaseSocketWrapper {
_log.severe('Failed to secure socket since _socket is null'); _log.severe('Failed to secure socket since _socket is null');
return false; return false;
} }
_ignoreSocketClosure = true;
try { try {
// The socket is closed during the entire process
_expectSocketClosure = true;
_socket = await SecureSocket.secure( _socket = await SecureSocket.secure(
_socket!, _socket!,
supportedProtocols: const [ xmppClientALPNId ], supportedProtocols: const [ xmppClientALPNId ],
@ -190,12 +192,10 @@ class TCPSocketWrapper extends BaseSocketWrapper {
); );
_secure = true; _secure = true;
_ignoreSocketClosure = false;
_setupStreams(); _setupStreams();
return true; return true;
} on Exception catch (e) { } on Exception catch (e) {
_log.severe('Failed to secure socket: $e'); _log.severe('Failed to secure socket: $e');
_ignoreSocketClosure = false;
if (e is HandshakeException) { if (e is HandshakeException) {
_eventStream.add(XmppSocketTLSFailedEvent()); _eventStream.add(XmppSocketTLSFailedEvent());
@ -226,15 +226,14 @@ class TCPSocketWrapper extends BaseSocketWrapper {
); );
// ignore: implicit_dynamic_parameter // ignore: implicit_dynamic_parameter
_socket!.done.then((_) { _socket!.done.then((_) {
if (!_ignoreSocketClosure) { _eventStream.add(XmppSocketClosureEvent(_expectSocketClosure));
_eventStream.add(XmppSocketClosureEvent()); _expectSocketClosure = false;
}
}); });
} }
@override @override
Future<bool> connect(String domain, { String? host, int? port }) async { Future<bool> connect(String domain, { String? host, int? port }) async {
_ignoreSocketClosure = false; _expectSocketClosure = false;
_secure = false; _secure = false;
// Connection order: // Connection order:
@ -267,6 +266,8 @@ class TCPSocketWrapper extends BaseSocketWrapper {
@override @override
void close() { void close() {
_expectSocketClosure = true;
if (_socketSubscription != null) { if (_socketSubscription != null) {
_log.finest('Closing socket subscription'); _log.finest('Closing socket subscription');
_socketSubscription!.cancel(); _socketSubscription!.cancel();
@ -277,13 +278,11 @@ class TCPSocketWrapper extends BaseSocketWrapper {
return; return;
} }
_ignoreSocketClosure = true;
try { try {
_socket!.close(); _socket!.close();
} catch(e) { } catch(e) {
_log.warning('Closing socket threw exception: $e'); _log.warning('Closing socket threw exception: $e');
} }
_ignoreSocketClosure = false;
} }
@override @override
@ -316,7 +315,5 @@ class TCPSocketWrapper extends BaseSocketWrapper {
} }
@override @override
void prepareDisconnect() { void prepareDisconnect() {}
_ignoreSocketClosure = true;
}
} }