Move reporting item changes to a background task.

This commit is contained in:
Tom Hacohen 2021-01-12 11:57:43 +02:00
parent 61bd82f1e3
commit 240469342b

View File

@ -1,11 +1,11 @@
import typing as t
from asgiref.sync import sync_to_async, async_to_sync
from asgiref.sync import sync_to_async
from django.core import exceptions as django_exceptions
from django.core.files.base import ContentFile
from django.db import transaction, IntegrityError
from django.db.models import Q, QuerySet
from fastapi import APIRouter, Depends, status, Request
from fastapi import APIRouter, Depends, status, Request, BackgroundTasks
from django_etebase import models
from myauth.models import UserType
@ -191,14 +191,13 @@ class ItemBatchIn(BaseModel):
)
# FIXME: make it a background task
def report_items_changed(col_uid: str, stoken: str, items: t.List[CollectionItemIn]):
async def report_items_changed(col_uid: str, stoken: str, items: t.List[CollectionItemIn]):
if not redisw.is_active:
return
redis = redisw.redis
content = msgpack_encode(CollectionItemListResponse(data=items, stoken=stoken, done=True).dict())
async_to_sync(redis.publish)(f"col.{col_uid}", content)
await redis.publish(f"col.{col_uid}", content)
def collection_list_common(
@ -462,7 +461,14 @@ async def item_list_subscription_ticket(
return await get_ticket(TicketRequest(collection=collection.uid), user)
def item_bulk_common(data: ItemBatchIn, user: UserType, stoken: t.Optional[str], uid: str, validate_etag: bool):
def item_bulk_common(
data: ItemBatchIn,
user: UserType,
stoken: t.Optional[str],
uid: str,
validate_etag: bool,
background_tasks: BackgroundTasks,
):
queryset = get_collection_queryset(user)
with transaction.atomic(): # We need this for locking the collection object
collection_object = queryset.select_for_update().get(uid=uid)
@ -487,7 +493,7 @@ def item_bulk_common(data: ItemBatchIn, user: UserType, stoken: t.Optional[str],
status_code=status.HTTP_409_CONFLICT,
)
report_items_changed(collection_object.uid, collection_object.stoken, data.items)
background_tasks.add_task(report_items_changed, collection_object.uid, collection_object.stoken, data.items)
@item_router.get(
@ -564,20 +570,22 @@ def fetch_updates(
def item_transaction(
collection_uid: str,
data: ItemBatchIn,
background_tasks: BackgroundTasks,
stoken: t.Optional[str] = None,
user: UserType = Depends(get_authenticated_user),
):
return item_bulk_common(data, user, stoken, collection_uid, validate_etag=True)
return item_bulk_common(data, user, stoken, collection_uid, validate_etag=True, background_tasks=background_tasks)
@item_router.post("/item/batch/", dependencies=[Depends(has_write_access), *PERMISSIONS_READWRITE])
def item_batch(
collection_uid: str,
data: ItemBatchIn,
background_tasks: BackgroundTasks,
stoken: t.Optional[str] = None,
user: UserType = Depends(get_authenticated_user),
):
return item_bulk_common(data, user, stoken, collection_uid, validate_etag=False)
return item_bulk_common(data, user, stoken, collection_uid, validate_etag=False, background_tasks=background_tasks)
# Chunks