mirror of
https://github.com/Tech-Workers-Coalition-Italia/mobilizon-reshare.git
synced 2025-01-30 17:14:53 +01:00
4dc1e4080a
* filtering publications with inactive publishers * filtering publications with inactive publishers * WIP: Generate publications at runtime. TODO: - change `MobilizonEvent.compute_status`'s contract and break everything - while we're at it we should remove `PublicationStatus.WAITING` - test `storage.query.create_publications_for_publishers` * cli: inspect_events: Unnest if-then-else. * publishers: abstract: Remove `EventPublication.make`. * fixed tests * split query.py file * added tests for get_unpublished_events * added tests * more tests * added start test * main: start: Remove filter_publications_with_inactive_publishers. Co-authored-by: Giacomo Leidi <goodoldpaul@autistici.org>
69 lines
2.3 KiB
Python
69 lines
2.3 KiB
Python
from datetime import timedelta
|
|
from typing import Union
|
|
from uuid import UUID
|
|
|
|
import pytest
|
|
|
|
from mobilizon_reshare.models.event import Event
|
|
from mobilizon_reshare.models.publication import PublicationStatus, Publication
|
|
from mobilizon_reshare.models.publisher import Publisher
|
|
from tests.storage import today
|
|
|
|
|
|
async def _generate_publishers(specification):
|
|
|
|
publishers = []
|
|
for i, publisher_name in enumerate(specification["publisher"]):
|
|
publisher = Publisher(
|
|
id=UUID(int=i), name=publisher_name, account_ref=f"account_ref_{i}"
|
|
)
|
|
publishers.append(publisher)
|
|
await publisher.save()
|
|
|
|
return publishers
|
|
|
|
|
|
async def _generate_events(specification):
|
|
events = []
|
|
if "event" in specification.keys():
|
|
for i in range(specification["event"]):
|
|
begin_date = today + timedelta(days=i)
|
|
event = Event(
|
|
id=UUID(int=i),
|
|
name=f"event_{i}",
|
|
description=f"desc_{i}",
|
|
mobilizon_id=UUID(int=i),
|
|
mobilizon_link=f"moblink_{i}",
|
|
thumbnail_link=f"thumblink_{i}",
|
|
location=f"loc_{i}",
|
|
begin_datetime=begin_date,
|
|
end_datetime=begin_date + timedelta(hours=2),
|
|
)
|
|
events.append(event)
|
|
await event.save()
|
|
return events
|
|
|
|
|
|
async def _generate_publications(events, publishers, specification):
|
|
if "publications" in specification.keys():
|
|
for i, publication in enumerate(specification["publications"]):
|
|
status = publication.get("status", PublicationStatus.COMPLETED)
|
|
timestamp = publication.get("timestamp", today + timedelta(hours=i))
|
|
await Publication.create(
|
|
id=UUID(int=i),
|
|
status=status,
|
|
timestamp=timestamp,
|
|
event_id=events[publication["event_idx"]].id,
|
|
publisher_id=publishers[publication["publisher_idx"]].id,
|
|
)
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def generate_models():
|
|
async def _generate_models(specification: dict[str, Union[int, list]]):
|
|
publishers = await _generate_publishers(specification)
|
|
events = await _generate_events(specification)
|
|
await _generate_publications(events, publishers, specification)
|
|
|
|
return _generate_models
|