Compare commits

..

No commits in common. "a8693da2629da00b35888694b8ea6570a25f135a" and "4d312b21000731ab80ce655890546ac54bc2d878" have entirely different histories.

7 changed files with 27 additions and 193 deletions

View File

@ -111,6 +111,7 @@ class _MyHomePageState extends State<MyHomePage> {
connection.connectionSettings = ConnectionSettings(
jid: JID.fromString(jidController.text),
password: passwordController.text,
useDirectTLS: true,
);
final result = await connection.connect(waitUntilLogin: true);
setState(() {

View File

@ -57,6 +57,4 @@ component_interfaces = { '127.0.0.1' }
VirtualHost "localhost"
Component "component.localhost"
component_secret = "abc123"
Component "muc.localhost" "muc"
component_secret = "abc123"

View File

@ -308,6 +308,7 @@ class XmppConnection {
unawaited(
_connectImpl(
waitForConnection: true,
shouldReconnect: false,
),
);
}
@ -901,7 +902,7 @@ class XmppConnection {
port: port,
);
if (!result) {
await handleError(NoConnectionPossibleError());
await handleError(NoConnectionError());
return Result(NoConnectionPossibleError());
} else {

View File

@ -7,6 +7,13 @@ abstract class XmppError {
bool isRecoverable();
}
/// Returned if we could not establish a TCP connection
/// to the server.
class NoConnectionError extends XmppError {
@override
bool isRecoverable() => true;
}
/// Returned if a socket error occured
class SocketError extends XmppError {
SocketError(this.event);

View File

@ -1,7 +0,0 @@
import 'package:moxxmpp/src/errors.dart';
/// Triggered by the StreamManagementManager when an ack request times out.
class StreamManagementAckTimeoutError extends XmppError {
@override
bool isRecoverable() => true;
}

View File

@ -11,7 +11,6 @@ import 'package:moxxmpp/src/namespaces.dart';
import 'package:moxxmpp/src/negotiators/namespaces.dart';
import 'package:moxxmpp/src/stanza.dart';
import 'package:moxxmpp/src/stringxml.dart';
import 'package:moxxmpp/src/xeps/xep_0198/errors.dart';
import 'package:moxxmpp/src/xeps/xep_0198/negotiator.dart';
import 'package:moxxmpp/src/xeps/xep_0198/nonzas.dart';
import 'package:moxxmpp/src/xeps/xep_0198/state.dart';
@ -46,10 +45,10 @@ class StreamManagementManager extends XmppManagerBase {
@internal
final Duration ackTimeout;
/// The time at which the last ack has been received
/// The time at which the last ack has been sent
int _lastAckTimestamp = -1;
/// The timer to see if the connection timed out
/// The timer to see if we timed the connection out
Timer? _ackTimer;
/// Counts how many acks we're waiting for
@ -195,7 +194,6 @@ class StreamManagementManager extends XmppManagerBase {
_stopAckTimer();
break;
case XmppConnectionState.connecting:
_stopAckTimer();
// NOOP
break;
}
@ -216,34 +214,25 @@ class StreamManagementManager extends XmppManagerBase {
/// Stops the timer, if it is running.
void _stopAckTimer() {
logger.fine('Stopping ack timer');
_ackTimer?.cancel();
_ackTimer = null;
}
if (_ackTimer == null) return;
@visibleForTesting
Future<void> handleAckTimeout() async {
_stopAckTimer();
await getAttributes()
.getConnection()
.handleError(StreamManagementAckTimeoutError());
logger.fine('Stopping ack timer');
_ackTimer!.cancel();
_ackTimer = null;
}
/// Timer callback that checks if all acks have been answered. If not and the last
/// response has been more that [ackTimeout] in the past, declare the session dead.
Future<void> _ackTimerCallback(Timer timer) async {
logger.finest('Ack timer callback called');
final shouldTimeout = await _ackLock.synchronized(() {
void _ackTimerCallback(Timer timer) {
_ackLock.synchronized(() async {
final now = DateTime.now().millisecondsSinceEpoch;
return now - _lastAckTimestamp >= ackTimeout.inMilliseconds &&
_pendingAcks > 0;
if (now - _lastAckTimestamp >= ackTimeout.inMilliseconds &&
_pendingAcks > 0) {
_stopAckTimer();
await getAttributes().getConnection().reconnectionPolicy.onFailure();
}
});
logger.finest('Should timeout: $shouldTimeout');
if (shouldTimeout) {
await handleAckTimeout();
}
}
/// Wrapper around sending an <r /> nonza that starts the ack timeout timer.
@ -251,7 +240,9 @@ class StreamManagementManager extends XmppManagerBase {
logger.fine('_sendAckRequest: Waiting to acquire lock...');
await _ackLock.synchronized(() async {
logger.fine('_sendAckRequest: Done...');
final now = DateTime.now().millisecondsSinceEpoch;
_lastAckTimestamp = now;
_pendingAcks++;
_startAckTimer();
@ -296,21 +287,13 @@ class StreamManagementManager extends XmppManagerBase {
Future<bool> _handleAckResponse(XMLNode nonza) async {
final h = int.parse(nonza.attributes['h']! as String);
_lastAckTimestamp = DateTime.now().millisecondsSinceEpoch;
await _ackLock.synchronized(() async {
await _stateLock.synchronized(() async {
if (_pendingAcks > 0) {
// Prevent diff from becoming negative
final diff = max(_state.c2s - h, 0);
_pendingAcks = diff;
// Reset the timer
if (_pendingAcks > 0) {
_startAckTimer();
}
}
if (_pendingAcks == 0) {
} else {
_stopAckTimer();
}

View File

@ -634,155 +634,6 @@ void main() {
});
});
test('Test timing out an ack request', () async {
final fakeSocket = StubTCPSocket([
StringExpectation(
"<stream:stream xmlns='jabber:client' version='1.0' xmlns:stream='http://etherx.jabber.org/streams' to='test.server' from='polynomdivision@test.server' xml:lang='en'>",
'''
<stream:stream
xmlns="jabber:client"
version="1.0"
xmlns:stream="http://etherx.jabber.org/streams"
from="test.server"
xml:lang="en">
<stream:features xmlns="http://etherx.jabber.org/streams">
<mechanisms xmlns="urn:ietf:params:xml:ns:xmpp-sasl">
<mechanism>PLAIN</mechanism>
</mechanisms>
</stream:features>''',
),
StringExpectation(
"<auth xmlns='urn:ietf:params:xml:ns:xmpp-sasl' mechanism='PLAIN'>AHBvbHlub21kaXZpc2lvbgBhYWFh</auth>",
'<success xmlns="urn:ietf:params:xml:ns:xmpp-sasl" />',
),
StringExpectation(
"<stream:stream xmlns='jabber:client' version='1.0' xmlns:stream='http://etherx.jabber.org/streams' to='test.server' from='polynomdivision@test.server' xml:lang='en'>",
'''
<stream:stream
xmlns="jabber:client"
version="1.0"
xmlns:stream="http://etherx.jabber.org/streams"
from="test.server"
xml:lang="en">
<stream:features xmlns="http://etherx.jabber.org/streams">
<bind xmlns="urn:ietf:params:xml:ns:xmpp-bind">
<required/>
</bind>
<session xmlns="urn:ietf:params:xml:ns:xmpp-session">
<optional/>
</session>
<csi xmlns="urn:xmpp:csi:0"/>
<sm xmlns="urn:xmpp:sm:3"/>
</stream:features>
''',
),
StringExpectation(
"<resume xmlns='urn:xmpp:sm:3' previd='id-1' h='10' />",
"<resumed xmlns='urn:xmpp:sm:3' h='id-1' h='12' />",
),
StanzaExpectation(
"<iq to='localhost' type='get' from='polynomdivision@test.server/abc123' xmlns='jabber:client' />",
'',
ignoreId: true,
),
StanzaExpectation(
"<r xmlns='urn:xmpp:sm:3' />",
'',
),
StringExpectation(
"<stream:stream xmlns='jabber:client' version='1.0' xmlns:stream='http://etherx.jabber.org/streams' to='test.server' from='polynomdivision@test.server' xml:lang='en'>",
'''
<stream:stream
xmlns="jabber:client"
version="1.0"
xmlns:stream="http://etherx.jabber.org/streams"
from="test.server"
xml:lang="en">
<stream:features xmlns="http://etherx.jabber.org/streams">
<mechanisms xmlns="urn:ietf:params:xml:ns:xmpp-sasl">
<mechanism>PLAIN</mechanism>
</mechanisms>
</stream:features>''',
),
StringExpectation(
"<auth xmlns='urn:ietf:params:xml:ns:xmpp-sasl' mechanism='PLAIN'>AHBvbHlub21kaXZpc2lvbgBhYWFh</auth>",
'<success xmlns="urn:ietf:params:xml:ns:xmpp-sasl" />',
),
StringExpectation(
"<stream:stream xmlns='jabber:client' version='1.0' xmlns:stream='http://etherx.jabber.org/streams' to='test.server' from='polynomdivision@test.server' xml:lang='en'>",
'''
<stream:stream
xmlns="jabber:client"
version="1.0"
xmlns:stream="http://etherx.jabber.org/streams"
from="test.server"
xml:lang="en">
<stream:features xmlns="http://etherx.jabber.org/streams">
<bind xmlns="urn:ietf:params:xml:ns:xmpp-bind">
<required/>
</bind>
<session xmlns="urn:ietf:params:xml:ns:xmpp-session">
<optional/>
</session>
<csi xmlns="urn:xmpp:csi:0"/>
<sm xmlns="urn:xmpp:sm:3"/>
</stream:features>
''',
),
StringExpectation(
"<resume xmlns='urn:xmpp:sm:3' previd='id-1' h='10' />",
"<resumed xmlns='urn:xmpp:sm:3' h='id-1' h='12' />",
),
]);
final conn = XmppConnection(
TestingSleepReconnectionPolicy(1),
AlwaysConnectedConnectivityManager(),
ClientToServerNegotiator(),
fakeSocket,
)..connectionSettings = ConnectionSettings(
jid: JID.fromString('polynomdivision@test.server'),
password: 'aaaa',
);
await conn.registerManagers([
StreamManagementManager(ackTimeout: const Duration(minutes: 9999)),
]);
await conn.registerFeatureNegotiators([
SaslPlainNegotiator(),
StreamManagementNegotiator()..resource = 'abc123',
]);
await conn.getManagerById<StreamManagementManager>(smManager)!.setState(
StreamManagementState(
10,
10,
streamResumptionId: 'id-1',
),
);
await conn.connect(
waitUntilLogin: true,
);
expect(fakeSocket.getState(), 4);
expect(await conn.getConnectionState(), XmppConnectionState.connected);
// Send a bogus stanza
unawaited(
conn.sendStanza(Stanza.iq(to: 'localhost', type: 'get')),
);
await Future<void>.delayed(const Duration(seconds: 5));
expect(fakeSocket.getState(), 6);
// Trigger a fake timeout
await conn
.getManagerById<StreamManagementManager>(smManager)!
.handleAckTimeout();
// Wait for reconnect
await Future<void>.delayed(const Duration(seconds: 5));
expect(fakeSocket.getState(), 14);
});
test('Test SASL2 inline stream resumption', () async {
final fakeSocket = StubTCPSocket([
StringExpectation(