Simone Robutti 5335ed8cc3
command tests (#97)
* filtering publications with inactive publishers

* filtering publications with inactive publishers

* WIP: Generate publications at runtime.

TODO:
- change `MobilizonEvent.compute_status`'s contract and break everything
- while we're at it we should remove `PublicationStatus.WAITING`
- test `storage.query.create_publications_for_publishers`

* cli: inspect_events: Unnest if-then-else.

* publishers: abstract: Remove `EventPublication.make`.

* fixed tests

* split query.py file

* added tests for get_unpublished_events

* added tests

* more tests

* added start test

* refactored start test

* added test start with db event

* added test recap

* added failed publication test

* added format test

Co-authored-by: Giacomo Leidi <goodoldpaul@autistici.org>
2021-11-11 16:20:50 +01:00

53 lines
1.7 KiB
Python

import logging
from typing import Optional, List
from arrow import now
from mobilizon_reshare.event.event import EventPublicationStatus, MobilizonEvent
from mobilizon_reshare.publishers import get_active_publishers
from mobilizon_reshare.publishers.abstract import RecapPublication
from mobilizon_reshare.publishers.coordinator import (
RecapCoordinator,
PublicationFailureNotifiersCoordinator,
BaseCoordinatorReport,
)
from mobilizon_reshare.publishers.platforms.platform_mapping import (
get_publisher_class,
get_formatter_class,
)
from mobilizon_reshare.storage.query.read_query import events_with_status
logger = logging.getLogger(__name__)
async def select_events_to_recap() -> List[MobilizonEvent]:
    """Return the events that are candidates for a recap.

    Queries storage for events whose publication status is ``COMPLETED``,
    passing the current time as ``from_date`` (presumably this restricts the
    result to events that have not happened yet; confirm against
    ``events_with_status``).

    Returns:
        The matching events, materialized into a list.
    """
    completed_events = await events_with_status(
        status=[EventPublicationStatus.COMPLETED], from_date=now()
    )
    return list(completed_events)
async def recap() -> Optional[BaseCoordinatorReport]:
    """Publish a recap of the selected events through every active publisher.

    One ``RecapPublication`` is built per active publisher, all of them are
    run through ``RecapCoordinator``, and the failure notifiers are triggered
    for each individual report whose status is ``FAILED``.

    Returns:
        The coordinator report, or ``None`` when there is nothing to recap.
    """
    # Only events that were successfully published and have not happened yet
    # are meant to be recapped.
    events = await select_events_to_recap()

    # Guard clause: nothing to recap, so no publisher is instantiated.
    if not events:
        logger.debug("Found no events")
        return None

    logger.debug(f"Found {len(events)} events to recap.")

    publications = []
    for publisher_name in get_active_publishers():
        # Publisher first, formatter second: same instantiation order as the
        # positional arguments of RecapPublication.
        publisher = get_publisher_class(publisher_name)()
        formatter = get_formatter_class(publisher_name)()
        publications.append(RecapPublication(publisher, formatter, events))

    coordinator_report = RecapCoordinator(publications).run()

    for publication_report in coordinator_report.reports:
        # NOTE(review): the status is compared against EventPublicationStatus;
        # confirm that per-publication reports actually use this enum.
        if publication_report.status == EventPublicationStatus.FAILED:
            PublicationFailureNotifiersCoordinator(
                publication_report
            ).notify_failure()

    return coordinator_report