service: Begin moving code over into the HttpFileTransferService
This commit is contained in:
parent
b87da625b5
commit
d000c7b7b1
@ -4,6 +4,7 @@ import 'package:connectivity_plus/connectivity_plus.dart';
|
|||||||
import 'package:get_it/get_it.dart';
|
import 'package:get_it/get_it.dart';
|
||||||
import 'package:logging/logging.dart';
|
import 'package:logging/logging.dart';
|
||||||
import 'package:meta/meta.dart';
|
import 'package:meta/meta.dart';
|
||||||
|
import 'package:moxxyv2/service/httpfiletransfer/httpfiletransfer.dart';
|
||||||
import 'package:moxxyv2/service/moxxmpp/reconnect.dart';
|
import 'package:moxxyv2/service/moxxmpp/reconnect.dart';
|
||||||
import 'package:moxxyv2/xmpp/connection.dart';
|
import 'package:moxxyv2/xmpp/connection.dart';
|
||||||
|
|
||||||
@ -30,11 +31,15 @@ class ConnectivityService {
|
|||||||
// See https://github.com/fluttercommunity/plus_plugins/issues/567
|
// See https://github.com/fluttercommunity/plus_plugins/issues/567
|
||||||
final skipAmount = Platform.isAndroid ? 1 : 0;
|
final skipAmount = Platform.isAndroid ? 1 : 0;
|
||||||
conn.onConnectivityChanged.skip(skipAmount).listen((ConnectivityResult result) {
|
conn.onConnectivityChanged.skip(skipAmount).listen((ConnectivityResult result) {
|
||||||
|
final regained = _connectivity == ConnectivityResult.none && result != ConnectivityResult.none;
|
||||||
_connectivity = result;
|
_connectivity = result;
|
||||||
|
|
||||||
|
// TODO(PapaTutuWawa): Should we use Streams?
|
||||||
// Notify other services
|
// Notify other services
|
||||||
final policy = GetIt.I.get<XmppConnection>().reconnectionPolicy;
|
final policy = GetIt.I.get<XmppConnection>().reconnectionPolicy;
|
||||||
(policy as MoxxyReconnectionPolicy).onConnectivityChanged(result);
|
(policy as MoxxyReconnectionPolicy).onConnectivityChanged(result);
|
||||||
|
|
||||||
|
GetIt.I.get<HttpFileTransferService>().onConnectivityChanged(regained);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
42
lib/service/httpfiletransfer/helpers.dart
Normal file
42
lib/service/httpfiletransfer/helpers.dart
Normal file
@ -0,0 +1,42 @@
|
|||||||
|
import 'dart:io';
|
||||||
|
import 'package:external_path/external_path.dart';
|
||||||
|
import 'package:moxxyv2/shared/helpers.dart';
|
||||||
|
import 'package:path/path.dart' as path;
|
||||||
|
|
||||||
|
/// Calculates the path for a given file to be saved to and, if neccessary, create it.
|
||||||
|
Future<String> getDownloadPath(String filename, String conversationJid, String? mime) async {
|
||||||
|
String type;
|
||||||
|
var prependMoxxy = true;
|
||||||
|
if (mime != null && ['image/', 'video/'].any((e) => mime.startsWith(e))) {
|
||||||
|
type = ExternalPath.DIRECTORY_PICTURES;
|
||||||
|
} else {
|
||||||
|
type = ExternalPath.DIRECTORY_DOWNLOADS;
|
||||||
|
prependMoxxy = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
final externalDir = await ExternalPath.getExternalStoragePublicDirectory(type);
|
||||||
|
final fileDirectory = prependMoxxy ? path.join(externalDir, 'Moxxy', conversationJid) : externalDir;
|
||||||
|
final dir = Directory(fileDirectory);
|
||||||
|
if (!dir.existsSync()) {
|
||||||
|
await dir.create(recursive: true);
|
||||||
|
}
|
||||||
|
|
||||||
|
var i = 0;
|
||||||
|
while (true) {
|
||||||
|
final filenameSuffix = i == 0 ? '' : '($i)';
|
||||||
|
final suffixedFilename = filenameWithSuffix(filename, filenameSuffix);
|
||||||
|
|
||||||
|
final filePath = path.join(fileDirectory, suffixedFilename);
|
||||||
|
if (!File(filePath).existsSync()) {
|
||||||
|
return filePath;
|
||||||
|
}
|
||||||
|
|
||||||
|
i++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns true if the request was successful based on [statusCode].
|
||||||
|
/// Based on https://developer.mozilla.org/en-US/docs/Web/HTTP/Status
|
||||||
|
bool isRequestOkay(int? statusCode) {
|
||||||
|
return statusCode != null && statusCode >= 200 && statusCode <= 399;
|
||||||
|
}
|
235
lib/service/httpfiletransfer/httpfiletransfer.dart
Normal file
235
lib/service/httpfiletransfer/httpfiletransfer.dart
Normal file
@ -0,0 +1,235 @@
|
|||||||
|
import 'dart:async';
|
||||||
|
import 'dart:collection';
|
||||||
|
import 'dart:io';
|
||||||
|
import 'package:connectivity_plus/connectivity_plus.dart';
|
||||||
|
import 'package:dio/dio.dart' as dio;
|
||||||
|
import 'package:get_it/get_it.dart';
|
||||||
|
import 'package:logging/logging.dart';
|
||||||
|
import 'package:mime/mime.dart';
|
||||||
|
import 'package:moxplatform/moxplatform.dart';
|
||||||
|
import 'package:moxxyv2/service/connectivity.dart';
|
||||||
|
import 'package:moxxyv2/service/conversation.dart';
|
||||||
|
import 'package:moxxyv2/service/database.dart';
|
||||||
|
import 'package:moxxyv2/service/httpfiletransfer/helpers.dart';
|
||||||
|
import 'package:moxxyv2/service/httpfiletransfer/jobs.dart';
|
||||||
|
import 'package:moxxyv2/service/message.dart';
|
||||||
|
import 'package:moxxyv2/service/notifications.dart';
|
||||||
|
import 'package:moxxyv2/service/service.dart';
|
||||||
|
import 'package:moxxyv2/shared/events.dart';
|
||||||
|
import 'package:synchronized/synchronized.dart';
|
||||||
|
|
||||||
|
/// This service is responsible for managing the up- and download of files using Http.
|
||||||
|
class HttpFileTransferService {
|
||||||
|
HttpFileTransferService()
|
||||||
|
: _uploadQueue = Queue<FileUploadJob>(),
|
||||||
|
_downloadQueue = Queue<FileDownloadJob>(),
|
||||||
|
_uploadLock = Lock(),
|
||||||
|
_downloadLock = Lock(),
|
||||||
|
_log = Logger('HttpFileTransferService');
|
||||||
|
|
||||||
|
final Logger _log;
|
||||||
|
|
||||||
|
/// Queues for tracking up- and download tasks
|
||||||
|
final Queue<FileDownloadJob> _downloadQueue;
|
||||||
|
final Queue<FileUploadJob> _uploadQueue;
|
||||||
|
|
||||||
|
/// The currently running job and their lock
|
||||||
|
FileUploadJob? _currentUploadJob;
|
||||||
|
FileDownloadJob? _currentDownloadJob;
|
||||||
|
|
||||||
|
/// Locks for upload and download state
|
||||||
|
final Lock _uploadLock;
|
||||||
|
final Lock _downloadLock;
|
||||||
|
|
||||||
|
/// Called by the ConnectivityService if the connection got lost but then was regained.
|
||||||
|
Future<void> onConnectivityChanged(bool regained) async {
|
||||||
|
if (!regained) return;
|
||||||
|
|
||||||
|
await _uploadLock.synchronized(() async {
|
||||||
|
if (_currentUploadJob != null) {
|
||||||
|
_log.finest('Connectivity regained and there is still an upload job. Restarting it.');
|
||||||
|
unawaited(_performFileUpload(_currentUploadJob!));
|
||||||
|
} else {
|
||||||
|
if (_uploadQueue.isNotEmpty) {
|
||||||
|
_log.finest('Connectivity regained and the upload queue is not empty. Starting a new upload job.');
|
||||||
|
_currentUploadJob = _uploadQueue.removeFirst();
|
||||||
|
unawaited(_performFileUpload(_currentUploadJob!));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
await _downloadLock.synchronized(() async {
|
||||||
|
if (_currentDownloadJob != null) {
|
||||||
|
_log.finest('Connectivity regained and there is still a download job. Restarting it.');
|
||||||
|
unawaited(_performFileDownload(_currentDownloadJob!));
|
||||||
|
} else {
|
||||||
|
if (_downloadQueue.isNotEmpty) {
|
||||||
|
_log.finest('Connectivity regained and the download queue is not empty. Starting a new download job.');
|
||||||
|
_currentDownloadJob = _downloadQueue.removeFirst();
|
||||||
|
unawaited(_performFileDownload(_currentDownloadJob!));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Queue the upload job [job] to be performed.
|
||||||
|
Future<void> uploadFile(FileUploadJob job) async {
|
||||||
|
var canUpload = false;
|
||||||
|
await _uploadLock.synchronized(() async {
|
||||||
|
if (_currentUploadJob != null) {
|
||||||
|
_uploadQueue.add(job);
|
||||||
|
} else {
|
||||||
|
_currentUploadJob = job;
|
||||||
|
canUpload = true;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
if (canUpload) {
|
||||||
|
unawaited(_performFileUpload(job));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Queue the download job [job] to be performed.
|
||||||
|
Future<void> downloadFile(FileDownloadJob job) async {
|
||||||
|
var canDownload = false;
|
||||||
|
await _uploadLock.synchronized(() async {
|
||||||
|
if (_currentDownloadJob != null) {
|
||||||
|
_downloadQueue.add(job);
|
||||||
|
} else {
|
||||||
|
_currentDownloadJob = job;
|
||||||
|
canDownload = true;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
if (canDownload) {
|
||||||
|
unawaited(_performFileDownload(job));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Actually attempt to upload the file described by the job [job].
|
||||||
|
Future<void> _performFileUpload(FileUploadJob job) async {
|
||||||
|
_log.finest('Beginning upload of ${job.path}');
|
||||||
|
final data = await File(job.path).readAsBytes();
|
||||||
|
final putUri = Uri.parse(job.putUrl);
|
||||||
|
|
||||||
|
try {
|
||||||
|
final response = await dio.Dio().putUri<dynamic>(
|
||||||
|
putUri,
|
||||||
|
options: dio.Options(
|
||||||
|
headers: job.headers,
|
||||||
|
contentType: 'application/octet-stream',
|
||||||
|
requestEncoder: (_, __) => data,
|
||||||
|
),
|
||||||
|
data: data,
|
||||||
|
onSendProgress: (count, total) {
|
||||||
|
final progress = count.toDouble() / total.toDouble();
|
||||||
|
sendEvent(
|
||||||
|
ProgressEvent(
|
||||||
|
id: job.mId,
|
||||||
|
progress: progress == 1 ? 0.99 : progress,
|
||||||
|
),
|
||||||
|
);
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
if (response.statusCode != 201) {
|
||||||
|
// TODO(PapaTutuWawa): Trigger event
|
||||||
|
_log.severe('Upload failed');
|
||||||
|
} else {
|
||||||
|
// TODO(PapaTutuWawa): Trigger event
|
||||||
|
_log.fine('Upload was successful');
|
||||||
|
}
|
||||||
|
} on dio.DioError {
|
||||||
|
// TODO(PapaTutuWawa): Check if this is a timeout
|
||||||
|
_log.finest('Upload failed due to connection error');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Free the upload resources for the next one
|
||||||
|
if (GetIt.I.get<ConnectivityService>().currentState == ConnectivityResult.none) return;
|
||||||
|
await _uploadLock.synchronized(() async {
|
||||||
|
if (_uploadQueue.isNotEmpty) {
|
||||||
|
_currentUploadJob = _uploadQueue.removeFirst();
|
||||||
|
unawaited(_performFileUpload(_currentUploadJob!));
|
||||||
|
} else {
|
||||||
|
_currentUploadJob = null;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Actually attempt to download the file described by the job [job].
|
||||||
|
Future<void> _performFileDownload(FileDownloadJob job) async {
|
||||||
|
_log.finest('Downloading ${job.url}');
|
||||||
|
final uri = Uri.parse(job.url);
|
||||||
|
final filename = uri.pathSegments.last;
|
||||||
|
final downloadedPath = await getDownloadPath(filename, job.conversationJid, job.mimeGuess);
|
||||||
|
|
||||||
|
try {
|
||||||
|
final response = await dio.Dio().downloadUri(
|
||||||
|
uri,
|
||||||
|
downloadedPath,
|
||||||
|
onReceiveProgress: (count, total) {
|
||||||
|
final progress = count.toDouble() / total.toDouble();
|
||||||
|
sendEvent(
|
||||||
|
ProgressEvent(
|
||||||
|
id: job.mId,
|
||||||
|
progress: progress == 1 ? 0.99 : progress,
|
||||||
|
),
|
||||||
|
);
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
if (!isRequestOkay(response.statusCode)) {
|
||||||
|
// TODO(PapaTutuWawa): Error handling
|
||||||
|
// TODO(PapaTutuWawa): Trigger event
|
||||||
|
_log.warning('HTTP GET of ${job.url} returned ${response.statusCode}');
|
||||||
|
} else {
|
||||||
|
// Check the MIME type
|
||||||
|
final notification = GetIt.I.get<NotificationsService>();
|
||||||
|
final mime = job.mimeGuess ?? lookupMimeType(downloadedPath);
|
||||||
|
|
||||||
|
if (mime != null && ['image/', 'video/', 'audio/'].any(mime.startsWith)) {
|
||||||
|
MoxplatformPlugin.media.scanFile(downloadedPath);
|
||||||
|
}
|
||||||
|
|
||||||
|
final msg = await GetIt.I.get<MessageService>().updateMessage(
|
||||||
|
job.mId,
|
||||||
|
mediaUrl: downloadedPath,
|
||||||
|
mediaType: mime,
|
||||||
|
);
|
||||||
|
|
||||||
|
sendEvent(MessageUpdatedEvent(message: msg.copyWith(isDownloading: false)));
|
||||||
|
|
||||||
|
if (notification.shouldShowNotification(msg.conversationJid)) {
|
||||||
|
_log.finest('Creating notification with bigPicture $downloadedPath');
|
||||||
|
await notification.showNotification(msg, '');
|
||||||
|
}
|
||||||
|
|
||||||
|
final conv = (await GetIt.I.get<ConversationService>().getConversationByJid(job.conversationJid))!;
|
||||||
|
final sharedMedium = await GetIt.I.get<DatabaseService>().addSharedMediumFromData(
|
||||||
|
downloadedPath,
|
||||||
|
msg.timestamp,
|
||||||
|
mime: mime,
|
||||||
|
);
|
||||||
|
final newConv = await GetIt.I.get<ConversationService>().updateConversation(
|
||||||
|
conv.id,
|
||||||
|
sharedMedia: [sharedMedium],
|
||||||
|
);
|
||||||
|
sendEvent(ConversationUpdatedEvent(conversation: newConv));
|
||||||
|
}
|
||||||
|
} on dio.DioError {
|
||||||
|
// TODO(PapaTutuWawa): Do
|
||||||
|
}
|
||||||
|
|
||||||
|
// Free the download resources for the next one
|
||||||
|
if (GetIt.I.get<ConnectivityService>().currentState == ConnectivityResult.none) return;
|
||||||
|
await _uploadLock.synchronized(() async {
|
||||||
|
if (_uploadQueue.isNotEmpty) {
|
||||||
|
_currentDownloadJob = _downloadQueue.removeFirst();
|
||||||
|
unawaited(_performFileDownload(_currentDownloadJob!));
|
||||||
|
} else {
|
||||||
|
_currentDownloadJob = null;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
46
lib/service/httpfiletransfer/jobs.dart
Normal file
46
lib/service/httpfiletransfer/jobs.dart
Normal file
@ -0,0 +1,46 @@
|
|||||||
|
import 'package:meta/meta.dart';
|
||||||
|
|
||||||
|
/// A job describing the download of a file.
|
||||||
|
@immutable
|
||||||
|
class FileUploadJob {
|
||||||
|
|
||||||
|
const FileUploadJob(this.path, this.putUrl, this.headers, this.mId);
|
||||||
|
final String path;
|
||||||
|
final String putUrl;
|
||||||
|
final Map<String, String> headers;
|
||||||
|
final int mId;
|
||||||
|
|
||||||
|
@override
|
||||||
|
bool operator ==(Object other) {
|
||||||
|
return other is FileUploadJob &&
|
||||||
|
path == other.path &&
|
||||||
|
putUrl == other.putUrl &&
|
||||||
|
headers == other.headers &&
|
||||||
|
mId == other.mId;
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
int get hashCode => path.hashCode ^ putUrl.hashCode ^ headers.hashCode ^ mId.hashCode;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A job describing the upload of a file.
|
||||||
|
@immutable
|
||||||
|
class FileDownloadJob {
|
||||||
|
|
||||||
|
const FileDownloadJob(this.url, this.mId, this.conversationJid, this.mimeGuess);
|
||||||
|
final String url;
|
||||||
|
final int mId;
|
||||||
|
final String conversationJid;
|
||||||
|
final String? mimeGuess;
|
||||||
|
|
||||||
|
@override
|
||||||
|
bool operator ==(Object other) {
|
||||||
|
return other is FileDownloadJob &&
|
||||||
|
url == other.url &&
|
||||||
|
mId == other.mId &&
|
||||||
|
conversationJid == other.conversationJid &&
|
||||||
|
mimeGuess == other.mimeGuess;
|
||||||
|
}
|
||||||
|
@override
|
||||||
|
int get hashCode => url.hashCode ^ mId.hashCode ^ conversationJid.hashCode ^ mimeGuess.hashCode;
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user