* fixed visualization

* simplified tests

* split into files

* refactored test expected publications

* split update tests

* expanded specifications and tests

* added event_status window tests

* fixed 'all' command
This commit is contained in:
Simone Robutti 2021-08-14 18:23:55 +02:00 committed by GitHub
parent 41317f062d
commit 450e5b0044
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 368 additions and 371 deletions

View File

@ -10,8 +10,18 @@ from mobilizon_bots.cli.main import main
from mobilizon_bots.event.event import EventPublicationStatus
settings_file_option = click.option("--settings-file", type=click.Path(exists=True))
from_date_option = click.option("--begin", type=click.DateTime(), expose_value=True)
to_date_option = click.option("--end", type=click.DateTime(), expose_value=True)
from_date_option = click.option(
"--begin",
type=click.DateTime(),
expose_value=True,
help="Include only events that begin after this datetime",
)
to_date_option = click.option(
"--end",
type=click.DateTime(),
expose_value=True,
help="Include only events that begin before this datetime",
)
@click.group()
@ -38,13 +48,17 @@ def inspect(ctx, begin, end):
@inspect.command()
@settings_file_option
def all(settings_file):
safe_execution(inspect_events, settings_file)
@pass_obj
def all(obj, settings_file):
safe_execution(
functools.partial(inspect_events, frm=obj["begin"], to=obj["end"],),
settings_file,
)
@inspect.command()
@settings_file_option
@pass_obj
@settings_file_option
def waiting(obj, settings_file):
safe_execution(
functools.partial(
@ -58,8 +72,8 @@ def waiting(obj, settings_file):
@inspect.command()
@settings_file_option
@pass_obj
@settings_file_option
def failed(obj, settings_file):
safe_execution(
functools.partial(
@ -73,8 +87,8 @@ def failed(obj, settings_file):
@inspect.command()
@settings_file_option
@pass_obj
@settings_file_option
def partial(obj, settings_file):
safe_execution(
functools.partial(

View File

@ -24,7 +24,7 @@ def show_events(events: Iterable[MobilizonEvent]):
def pretty(event: MobilizonEvent):
return (
f"{event.name}|{click.style(event.status.name, fg=status_to_color[event.status])}"
f"|{event.mobilizon_id}|{event.publication_time['telegram'].isoformat()}"
f"|{event.mobilizon_id}|{event.begin_datetime.isoformat()}->{event.end_datetime.isoformat()}"
)

View File

@ -39,6 +39,7 @@ class MobilizonEvent:
assert self.status in [
EventPublicationStatus.COMPLETED,
EventPublicationStatus.PARTIAL,
EventPublicationStatus.FAILED,
]
def _fill_template(self, pattern: Template) -> str:
@ -85,10 +86,10 @@ class MobilizonEvent:
description=event.description,
begin_datetime=arrow.get(
tortoise.timezone.localtime(value=event.begin_datetime, timezone=tz)
).to('local'),
).to("local"),
end_datetime=arrow.get(
tortoise.timezone.localtime(value=event.end_datetime, timezone=tz)
).to('local'),
).to("local"),
mobilizon_link=event.mobilizon_link,
mobilizon_id=event.mobilizon_id,
thumbnail_link=event.thumbnail_link,
@ -96,7 +97,7 @@ class MobilizonEvent:
publication_time={
pub.publisher.name: arrow.get(
tortoise.timezone.localtime(value=pub.timestamp, timezone=tz)
).to('local')
).to("local")
for pub in event.publications
}
if publication_status != PublicationStatus.WAITING

View File

@ -0,0 +1,50 @@
from datetime import datetime, timezone, timedelta
from uuid import UUID
from mobilizon_bots.models.publication import PublicationStatus
from mobilizon_bots.models.publication import Publication
today = datetime(
year=2021, month=6, day=6, hour=5, minute=0, tzinfo=timezone(timedelta(hours=2)),
)
complete_specification = {
"event": 4,
"publications": [
{"event_idx": 0, "publisher_idx": 0, "status": PublicationStatus.COMPLETED},
{"event_idx": 0, "publisher_idx": 1, "status": PublicationStatus.COMPLETED},
{"event_idx": 0, "publisher_idx": 2, "status": PublicationStatus.COMPLETED},
{"event_idx": 1, "publisher_idx": 0, "status": PublicationStatus.WAITING},
{"event_idx": 1, "publisher_idx": 1, "status": PublicationStatus.WAITING},
{"event_idx": 1, "publisher_idx": 2, "status": PublicationStatus.COMPLETED},
{"event_idx": 2, "publisher_idx": 0, "status": PublicationStatus.FAILED},
{"event_idx": 2, "publisher_idx": 1, "status": PublicationStatus.COMPLETED},
{"event_idx": 2, "publisher_idx": 2, "status": PublicationStatus.WAITING},
{"event_idx": 3, "publisher_idx": 0, "status": PublicationStatus.WAITING},
{"event_idx": 3, "publisher_idx": 1, "status": PublicationStatus.WAITING},
{"event_idx": 3, "publisher_idx": 2, "status": PublicationStatus.WAITING},
],
}
def _make_test_publication(publication_id, status, event_id, publisher_id):
return Publication(
id=UUID(int=publication_id),
status=status,
timestamp=today + timedelta(hours=publication_id),
event_id=UUID(int=event_id),
publisher_id=UUID(int=publisher_id),
)
result_publication = {
i: _make_test_publication(
i,
publisher_id=publication["publisher_idx"],
event_id=publication["event_idx"],
status=publication["status"],
)
for i, publication in enumerate(complete_specification["publications"])
}

70
tests/storage/conftest.py Normal file
View File

@ -0,0 +1,70 @@
from datetime import timedelta
from typing import Union
from uuid import UUID
import pytest
from mobilizon_bots.models.event import Event
from mobilizon_bots.models.publication import PublicationStatus, Publication
from mobilizon_bots.models.publisher import Publisher
from tests.storage import today
async def _generate_publishers(specification):
publishers = []
for i in range(
specification["publisher"] if "publisher" in specification.keys() else 3
):
publisher = Publisher(
id=UUID(int=i), name=f"publisher_{i}", account_ref=f"account_ref_{i}"
)
publishers.append(publisher)
await publisher.save()
return publishers
async def _generate_events(specification):
events = []
if "event" in specification.keys():
for i in range(specification["event"]):
begin_date = today + timedelta(days=i)
event = Event(
id=UUID(int=i),
name=f"event_{i}",
description=f"desc_{i}",
mobilizon_id=f"mobid_{i}",
mobilizon_link=f"moblink_{i}",
thumbnail_link=f"thumblink_{i}",
location=f"loc_{i}",
begin_datetime=begin_date,
end_datetime=begin_date + timedelta(hours=2),
)
events.append(event)
await event.save()
return events
async def _generate_publications(events, publishers, specification):
if "publications" in specification.keys():
for i in range(len(specification["publications"])):
publication = specification["publications"][i]
status = publication.get("status", PublicationStatus.WAITING)
timestamp = publication.get("timestamp", today + timedelta(hours=i))
await Publication.create(
id=UUID(int=i),
status=status,
timestamp=timestamp,
event_id=events[publication["event_idx"]].id,
publisher_id=publishers[publication["publisher_idx"]].id,
)
@pytest.fixture(scope="module")
def generate_models():
async def _generate_models(specification: dict[str, Union[int, list]]):
publishers = await _generate_publishers(specification)
events = await _generate_events(specification)
await _generate_publications(events, publishers, specification)
return _generate_models

View File

@ -1,19 +1,12 @@
from datetime import datetime, timedelta, timezone
from typing import Union
from uuid import UUID
from datetime import timedelta
import arrow
import pytest
from mobilizon_bots.event.event import MobilizonEvent, EventPublicationStatus
from mobilizon_bots.models.event import Event
from mobilizon_bots.models.publication import PublicationStatus, Publication
from mobilizon_bots.models.publisher import Publisher
from mobilizon_bots.publishers.coordinator import (
PublisherCoordinatorReport,
PublicationReport,
)
from mobilizon_bots.models.publication import PublicationStatus
from mobilizon_bots.storage.query import events_with_status
from mobilizon_bots.storage.query import (
get_published_events,
get_unpublished_events,
@ -22,107 +15,36 @@ from mobilizon_bots.storage.query import (
prefetch_event_relations,
get_publishers,
publications_with_status,
update_publishers,
save_publication_report,
)
from tests.storage import result_publication
from tests.storage import complete_specification
from tests.storage import today
event_0 = MobilizonEvent(
name="event_0",
description="desc_0",
mobilizon_id="mobid_0",
mobilizon_link="moblink_0",
thumbnail_link="thumblink_0",
location="loc_0",
publication_time={
"publisher_0": arrow.get(today + timedelta(hours=0)),
"publisher_1": arrow.get(today + timedelta(hours=1)),
"publisher_2": arrow.get(today + timedelta(hours=2)),
},
status=EventPublicationStatus.COMPLETED,
begin_datetime=arrow.get(today + timedelta(days=0)),
end_datetime=arrow.get(today + timedelta(days=0) + timedelta(hours=2)),
)
today = datetime(
year=2021,
month=6,
day=6,
hour=5,
minute=0,
tzinfo=timezone(timedelta(hours=2)),
)
@pytest.mark.asyncio
async def test_get_published_events(generate_models):
await generate_models(complete_specification)
published_events = list(await get_published_events())
two_publishers_specification = {"publisher": 2}
complete_specification = {
"event": 4,
"publications": [
{"event_idx": 0, "publisher_idx": 0},
{
"event_idx": 0,
"publisher_idx": 1,
"status": PublicationStatus.COMPLETED,
},
{
"event_idx": 1,
"publisher_idx": 0,
"status": PublicationStatus.WAITING,
},
{
"event_idx": 1,
"publisher_idx": 1,
},
{
"event_idx": 2,
"publisher_idx": 2,
"status": PublicationStatus.FAILED,
},
{
"event_idx": 2,
"publisher_idx": 1,
"status": PublicationStatus.COMPLETED,
},
{
"event_idx": 3,
"publisher_idx": 2,
"status": PublicationStatus.COMPLETED,
},
],
}
@pytest.fixture(scope="module")
def generate_models():
async def _generate_models(specification: dict[str, Union[int, list]]):
publishers = []
for i in range(
specification["publisher"] if "publisher" in specification.keys() else 3
):
publisher = Publisher(
id=UUID(int=i), name=f"publisher_{i}", account_ref=f"account_ref_{i}"
)
publishers.append(publisher)
await publisher.save()
events = []
if "event" in specification.keys():
for i in range(specification["event"]):
begin_date = today + timedelta(days=i)
event = Event(
id=UUID(int=i),
name=f"event_{i}",
description=f"desc_{i}",
mobilizon_id=f"mobid_{i}",
mobilizon_link=f"moblink_{i}",
thumbnail_link=f"thumblink_{i}",
location=f"loc_{i}",
begin_datetime=begin_date,
end_datetime=begin_date + timedelta(hours=2),
)
events.append(event)
await event.save()
if "publications" in specification.keys():
for i in range(len(specification["publications"])):
await Publication.create(
id=UUID(int=i),
status=specification["publications"][i].get(
"status", PublicationStatus.WAITING
),
timestamp=specification["publications"][i].get(
"timestamp", today + timedelta(hours=i)
),
event_id=events[specification["publications"][i]["event_idx"]].id,
publisher_id=publishers[
specification["publications"][i]["publisher_idx"]
].id,
)
return _generate_models
assert len(published_events) == 1
assert published_events == [event_0]
@pytest.mark.asyncio
@ -139,46 +61,11 @@ def generate_models():
mobilizon_link="moblink_3",
thumbnail_link="thumblink_3",
location="loc_3",
publication_time={
"publisher_2": arrow.get(today + timedelta(hours=6)),
},
status=EventPublicationStatus.COMPLETED,
status=EventPublicationStatus.WAITING,
begin_datetime=arrow.get(today + timedelta(days=3)),
end_datetime=arrow.get(
today + timedelta(days=3) + timedelta(hours=2)
),
)
],
]
],
)
async def test_get_published_events(specification, expected_result, generate_models):
await generate_models(specification)
published_events = list(await get_published_events())
assert len(published_events) == len(expected_result)
assert published_events == expected_result
@pytest.mark.asyncio
@pytest.mark.parametrize(
"specification,expected_result",
[
[
complete_specification,
[
MobilizonEvent(
name="event_1",
description="desc_1",
mobilizon_id="mobid_1",
mobilizon_link="moblink_1",
thumbnail_link="thumblink_1",
location="loc_1",
status=EventPublicationStatus.WAITING,
begin_datetime=arrow.get(today + timedelta(days=1)),
end_datetime=arrow.get(
today + timedelta(days=1) + timedelta(hours=2)
),
),
],
]
@ -194,10 +81,9 @@ async def test_get_unpublished_events(specification, expected_result, generate_m
@pytest.mark.asyncio
@pytest.mark.parametrize(
"specification,expected_result",
"expected_result",
[
[
complete_specification,
[
Event(
name="event_1",
@ -234,12 +120,9 @@ async def test_get_unpublished_events(specification, expected_result, generate_m
],
)
async def test_create_unpublished_events(
specification,
expected_result,
generate_models,
event_generator,
expected_result, generate_models, event_generator,
):
await generate_models(specification)
await generate_models(complete_specification)
event_3 = event_generator(begin_date=arrow.get(today + timedelta(days=6)))
event_4 = event_generator(
@ -255,21 +138,12 @@ async def test_create_unpublished_events(
)
unpublished_events = list(await get_unpublished_events())
assert len(unpublished_events) == 3
assert unpublished_events[0].mobilizon_id == unpublished_events[0].mobilizon_id
assert unpublished_events[1].mobilizon_id == unpublished_events[1].mobilizon_id
assert unpublished_events[2].mobilizon_id == unpublished_events[2].mobilizon_id
assert len(unpublished_events) == 4
@pytest.mark.asyncio
@pytest.mark.parametrize(
"specification",
[
complete_specification,
],
)
async def test_get_mobilizon_event_publications(specification, generate_models):
await generate_models(specification)
async def test_get_mobilizon_event_publications(generate_models):
await generate_models(complete_specification)
models = await prefetch_event_relations(Event.filter(name="event_0"))
mobilizon_event = MobilizonEvent.from_model(models[0])
@ -279,44 +153,33 @@ async def test_get_mobilizon_event_publications(specification, generate_models):
await pub.fetch_related("event")
await pub.fetch_related("publisher")
assert len(publications) == 2
assert len(publications) == 3
assert publications[0].event.name == "event_0"
assert publications[0].publisher.name == "publisher_0"
assert publications[0].status == PublicationStatus.WAITING
assert publications[0].status == PublicationStatus.COMPLETED
assert publications[1].event.name == "event_0"
assert publications[1].publisher.name == "publisher_1"
assert publications[1].status == PublicationStatus.COMPLETED
assert publications[2].event.name == "event_0"
assert publications[2].publisher.name == "publisher_2"
assert publications[2].status == PublicationStatus.COMPLETED
@pytest.mark.asyncio
@pytest.mark.parametrize(
"specification,name,expected_result",
"name,expected_result",
[
[
complete_specification,
None,
{
"publisher_0",
"publisher_1",
"publisher_2",
},
],
[
complete_specification,
"publisher_0",
{"publisher_0"},
],
[None, {"publisher_0", "publisher_1", "publisher_2"}],
["publisher_0", {"publisher_0"}],
],
)
async def test_get_publishers(
specification,
name,
expected_result,
generate_models,
name, expected_result, generate_models,
):
await generate_models(specification)
await generate_models(complete_specification)
result = await get_publishers(name)
if type(result) == list:
@ -330,128 +193,56 @@ async def test_get_publishers(
@pytest.mark.asyncio
@pytest.mark.parametrize(
"specification,status,mobilizon_id,from_date,to_date,expected_result",
"status,mobilizon_id,from_date,to_date,expected_result",
[
[
complete_specification,
PublicationStatus.WAITING,
None,
None,
None,
[
Publication(
id=UUID(int=0),
status=PublicationStatus.WAITING,
timestamp=today + timedelta(hours=0),
event_id=UUID(int=0),
publisher_id=UUID(int=0),
),
Publication(
id=UUID(int=2),
status=PublicationStatus.WAITING,
timestamp=today + timedelta(hours=2),
event_id=UUID(int=0),
publisher_id=UUID(int=1),
),
Publication(
id=UUID(int=3),
status=PublicationStatus.WAITING,
timestamp=today + timedelta(hours=3),
event_id=UUID(int=1),
publisher_id=UUID(int=1),
),
result_publication[3],
result_publication[4],
result_publication[8],
result_publication[9],
result_publication[10],
result_publication[11],
],
],
[
complete_specification,
PublicationStatus.WAITING,
"mobid_1",
None,
None,
[
Publication(
id=UUID(int=2),
status=PublicationStatus.COMPLETED,
timestamp=today + timedelta(hours=2),
event_id=UUID(int=1),
publisher_id=UUID(int=1),
),
Publication(
id=UUID(int=3),
status=PublicationStatus.WAITING,
timestamp=today + timedelta(hours=5),
event_id=UUID(int=1),
publisher_id=UUID(int=1),
),
],
[result_publication[3], result_publication[4]],
],
[
complete_specification,
PublicationStatus.WAITING,
None,
arrow.get(today + timedelta(hours=-1)),
arrow.get(today + timedelta(hours=1)),
[
Publication(
id=UUID(int=0),
status=PublicationStatus.WAITING,
timestamp=today + timedelta(hours=0),
event_id=UUID(int=0),
publisher_id=UUID(int=0),
),
],
arrow.get(today),
arrow.get(today + timedelta(hours=6)),
[result_publication[3], result_publication[4]],
],
[
complete_specification,
PublicationStatus.WAITING,
PublicationStatus.COMPLETED,
None,
arrow.get(today + timedelta(hours=1)),
None,
[
Publication(
id=UUID(int=2),
status=PublicationStatus.WAITING,
timestamp=today + timedelta(hours=2),
event_id=UUID(int=0),
publisher_id=UUID(int=1),
),
Publication(
id=UUID(int=3),
status=PublicationStatus.WAITING,
timestamp=today + timedelta(hours=5),
event_id=UUID(int=1),
publisher_id=UUID(int=1),
),
],
[result_publication[2], result_publication[5], result_publication[7]],
],
[
complete_specification,
PublicationStatus.WAITING,
PublicationStatus.COMPLETED,
None,
None,
arrow.get(today + timedelta(hours=1)),
[
Publication(
id=UUID(int=0),
status=PublicationStatus.WAITING,
timestamp=today + timedelta(hours=0),
event_id=UUID(int=0),
publisher_id=UUID(int=0),
),
],
arrow.get(today + timedelta(hours=2)),
[result_publication[0], result_publication[1]],
],
],
)
async def test_publications_with_status(
specification,
status,
mobilizon_id,
from_date,
to_date,
expected_result,
generate_models,
status, mobilizon_id, from_date, to_date, expected_result, generate_models,
):
await generate_models(specification)
await generate_models(complete_specification)
publications = await publications_with_status(
status=status,
event_mobilizon_id=mobilizon_id,
@ -459,107 +250,62 @@ async def test_publications_with_status(
to_date=to_date,
)
assert len(publications) == len(expected_result)
assert publications == expected_result
@pytest.mark.asyncio
@pytest.mark.parametrize(
"specification,names,expected_result",
"status, expected_events_count",
[
[
two_publishers_specification,
["publisher_0", "publisher_1"],
{
Publisher(id=UUID(int=0), name="publisher_0"),
Publisher(id=UUID(int=1), name="publisher_1"),
},
],
[
{"publisher": 0},
["publisher_0", "publisher_1"],
{"publisher_0", "publisher_1"},
],
[
two_publishers_specification,
["publisher_0", "publisher_2", "publisher_3"],
{"publisher_0", "publisher_1", "publisher_2", "publisher_3"},
],
(EventPublicationStatus.COMPLETED, 1),
(EventPublicationStatus.FAILED, 1),
(EventPublicationStatus.PARTIAL, 1),
(EventPublicationStatus.WAITING, 1),
],
)
async def test_update_publishers(
specification,
names,
expected_result,
generate_models,
):
await generate_models(specification)
await update_publishers(names)
if type(list(expected_result)[0]) == Publisher:
publishers = set(await get_publishers())
else:
publishers = set(p.name for p in await get_publishers())
async def test_event_with_status(generate_models, status, expected_events_count):
await generate_models(complete_specification)
result = list(await events_with_status([status]))
assert len(publishers) == len(expected_result)
assert publishers == expected_result
assert len(result) == expected_events_count
@pytest.mark.asyncio
@pytest.mark.parametrize(
"specification,report,event,expected_result",
"status, expected_events_count, begin_window, end_window",
[
[
complete_specification,
PublisherCoordinatorReport(
reports={
UUID(int=2): PublicationReport(
status=PublicationStatus.FAILED, reason="Invalid credentials"
),
UUID(int=3): PublicationReport(
status=PublicationStatus.COMPLETED, reason=""
),
}
),
MobilizonEvent(
name="event_1",
description="desc_1",
mobilizon_id="mobid_1",
mobilizon_link="moblink_1",
thumbnail_link="thumblink_1",
location="loc_1",
status=EventPublicationStatus.WAITING,
begin_datetime=arrow.get(today + timedelta(days=1)),
end_datetime=arrow.get(today + timedelta(days=1) + timedelta(hours=2)),
),
{
UUID(int=2): Publication(
id=UUID(int=2),
status=PublicationStatus.FAILED,
reason="Invalid credentials",
),
UUID(int=3): Publication(
id=UUID(int=0), status=PublicationStatus.COMPLETED, reason=""
),
},
],
(
EventPublicationStatus.COMPLETED,
1,
arrow.get(today + timedelta(hours=-1)),
None,
),
(
EventPublicationStatus.COMPLETED,
0,
arrow.get(today + timedelta(hours=1)),
None,
),
(
EventPublicationStatus.COMPLETED,
1,
arrow.get(today + timedelta(hours=-2)),
arrow.get(today + timedelta(hours=1)),
),
(
EventPublicationStatus.COMPLETED,
0,
arrow.get(today + timedelta(hours=-2)),
arrow.get(today + timedelta(hours=0)),
),
],
)
async def test_save_publication_report(
specification,
report,
event,
expected_result,
generate_models,
async def test_event_with_status_window(
generate_models, status, expected_events_count, begin_window, end_window
):
await generate_models(specification)
await save_publication_report(report, event)
publication_ids = set(report.reports.keys())
publications = {
p_id: await Publication.filter(id=p_id).first() for p_id in publication_ids
}
await generate_models(complete_specification)
result = list(
await events_with_status([status], from_date=begin_window, to_date=end_window)
)
assert len(publications) == len(expected_result)
for i in publication_ids:
assert publications[i].status == expected_result[i].status
assert publications[i].reason == expected_result[i].reason
assert publications[i].timestamp
assert len(result) == expected_events_count

View File

@ -0,0 +1,116 @@
from datetime import timedelta
from uuid import UUID
from tests.storage import complete_specification
import arrow
import pytest
from mobilizon_bots.event.event import MobilizonEvent, EventPublicationStatus
from mobilizon_bots.models.publication import PublicationStatus, Publication
from mobilizon_bots.models.publisher import Publisher
from mobilizon_bots.publishers.coordinator import (
PublisherCoordinatorReport,
PublicationReport,
)
from mobilizon_bots.storage.query import (
get_publishers,
update_publishers,
save_publication_report,
)
from tests.storage import today
two_publishers_specification = {"publisher": 2}
@pytest.mark.asyncio
@pytest.mark.parametrize(
"specification,names,expected_result",
[
[
two_publishers_specification,
["publisher_0", "publisher_1"],
{
Publisher(id=UUID(int=0), name="publisher_0"),
Publisher(id=UUID(int=1), name="publisher_1"),
},
],
[
{"publisher": 0},
["publisher_0", "publisher_1"],
{"publisher_0", "publisher_1"},
],
[
two_publishers_specification,
["publisher_0", "publisher_2", "publisher_3"],
{"publisher_0", "publisher_1", "publisher_2", "publisher_3"},
],
],
)
async def test_update_publishers(
specification, names, expected_result, generate_models,
):
await generate_models(specification)
await update_publishers(names)
if type(list(expected_result)[0]) == Publisher:
publishers = set(await get_publishers())
else:
publishers = set(p.name for p in await get_publishers())
assert len(publishers) == len(expected_result)
assert publishers == expected_result
@pytest.mark.asyncio
@pytest.mark.parametrize(
"specification,report,event,expected_result",
[
[
complete_specification,
PublisherCoordinatorReport(
reports={
UUID(int=3): PublicationReport(
status=PublicationStatus.FAILED, reason="Invalid credentials"
),
UUID(int=4): PublicationReport(
status=PublicationStatus.COMPLETED, reason=""
),
}
),
MobilizonEvent(
name="event_1",
description="desc_1",
mobilizon_id="mobid_1",
mobilizon_link="moblink_1",
thumbnail_link="thumblink_1",
location="loc_1",
status=EventPublicationStatus.WAITING,
begin_datetime=arrow.get(today + timedelta(days=1)),
end_datetime=arrow.get(today + timedelta(days=1) + timedelta(hours=2)),
),
{
UUID(int=3): Publication(
id=UUID(int=3),
status=PublicationStatus.FAILED,
reason="Invalid credentials",
),
UUID(int=4): Publication(
id=UUID(int=4), status=PublicationStatus.COMPLETED, reason=""
),
},
],
],
)
async def test_save_publication_report(
specification, report, event, expected_result, generate_models,
):
await generate_models(specification)
await save_publication_report(report, event)
publication_ids = set(report.reports.keys())
publications = {
p_id: await Publication.filter(id=p_id).first() for p_id in publication_ids
}
assert len(publications) == len(expected_result)
for i in publication_ids:
assert publications[i].status == expected_result[i].status
assert publications[i].reason == expected_result[i].reason
assert publications[i].timestamp