Simone Robutti 2197e07213
notify failure (#52)
* fixed visualization

* simplified tests

* split into files

* refactored test expected publications

* split update tests

* expanded specifications and tests

* added event_status window tests

* fixed 'all' command

* renamed everything

* fixed uppercase

* refactored main and publisher to add notifications

* tested report successful

* added tests to publisher coordinator

* added more coordinator tests

* test coordinator success
2021-08-27 23:45:24 +02:00

51 lines
2.0 KiB
Python

from datetime import datetime, timezone, timedelta
from uuid import UUID
from mobilizon_reshare.models.publication import Publication
from mobilizon_reshare.models.publication import PublicationStatus
today = datetime(
year=2021, month=6, day=6, hour=5, minute=0, tzinfo=timezone(timedelta(hours=2)),
)
complete_specification = {
"event": 4,
"publications": [
{"event_idx": 0, "publisher_idx": 0, "status": PublicationStatus.COMPLETED},
{"event_idx": 0, "publisher_idx": 1, "status": PublicationStatus.COMPLETED},
{"event_idx": 0, "publisher_idx": 2, "status": PublicationStatus.COMPLETED},
{"event_idx": 1, "publisher_idx": 0, "status": PublicationStatus.WAITING},
{"event_idx": 1, "publisher_idx": 1, "status": PublicationStatus.WAITING},
{"event_idx": 1, "publisher_idx": 2, "status": PublicationStatus.COMPLETED},
{"event_idx": 2, "publisher_idx": 0, "status": PublicationStatus.FAILED},
{"event_idx": 2, "publisher_idx": 1, "status": PublicationStatus.COMPLETED},
{"event_idx": 2, "publisher_idx": 2, "status": PublicationStatus.WAITING},
{"event_idx": 3, "publisher_idx": 0, "status": PublicationStatus.WAITING},
{"event_idx": 3, "publisher_idx": 1, "status": PublicationStatus.WAITING},
{"event_idx": 3, "publisher_idx": 2, "status": PublicationStatus.WAITING},
],
"publisher": ["telegram", "twitter", "mastodon"],
}
def _make_test_publication(publication_id, status, event_id, publisher_id):
return Publication(
id=UUID(int=publication_id),
status=status,
timestamp=today + timedelta(hours=publication_id),
event_id=UUID(int=event_id),
publisher_id=UUID(int=publisher_id),
)
result_publication = {
i: _make_test_publication(
i,
publisher_id=publication["publisher_idx"],
event_id=publication["event_idx"],
status=publication["status"],
)
for i, publication in enumerate(complete_specification["publications"])
}