Mirror of https://github.com/Tech-Workers-Coalition-Italia/mobilizon-reshare.git
Synced 2025-01-28 16:20:10 +01:00

Commit 370e00d187:
* fixed parsing bug
* implemented events and publications endpoints, split endpoints by entity, removed credentials
* add pagination (#179)
* added pagination
* integrated pagination with tortoise
* added test for publications
* removed converter file
* moved publications to dataclasses module
* implemented import pattern on dataclasses to prevent circular imports
* removed redundant fetch
* removed unused query
* split build_publications
* split failed_publications
* removed redundant query functions
* split publication retrieve
* split all read functions
* removed redundant write function
* fixed lock
182 lines
5.5 KiB
Python
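"""
Tests for the write queries in mobilizon_reshare.storage.query.write:
update_publishers, create_unpublished_events and save_publication_report.
"""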
from uuid import UUID

import pytest

from mobilizon_reshare.dataclasses.publication import _EventPublication
from mobilizon_reshare.models.publication import PublicationStatus, Publication
from mobilizon_reshare.models.publisher import Publisher
from mobilizon_reshare.publishers.coordinators.event_publishing.publish import (
    EventPublicationReport,
    PublisherCoordinatorReport,
)
from mobilizon_reshare.publishers.platforms.telegram import (
    TelegramFormatter,
    TelegramPublisher,
)
from mobilizon_reshare.storage.query.write import (
    save_publication_report,
    update_publishers,
    create_unpublished_events,
)
from tests.conftest import event_6, event_0, event_1, event_2, event_3, event_3_updated
from tests.storage import complete_specification

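# Database specifications consumed by the generate_models fixture: each dict
# lists how many events to create, which publications exist (by event and
# publisher index, with their status) and which publishers are known.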
two_publishers_specification = {"publisher": ["telegram", "twitter"]}

all_published_specification = {
    "event": 2,
    "publications": [
        {"event_idx": 0, "publisher_idx": 1, "status": PublicationStatus.FAILED},
        {"event_idx": 1, "publisher_idx": 0, "status": PublicationStatus.COMPLETED},
    ],
    "publisher": ["telegram", "twitter"],
}

two_events_specification = {
    "event": 2,
    "publications": [
        {"event_idx": 0, "publisher_idx": 1, "status": PublicationStatus.FAILED},
    ],
    "publisher": ["telegram", "twitter"],
}

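# update_publishers() should create a publisher for every name it does not know
# yet and never drop existing publishers, even when they are absent from the
# new list of names.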
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "specification,names,expected_result",
    [
        [
            two_publishers_specification,
            ["telegram", "twitter"],
            {
                Publisher(id=UUID(int=0), name="telegram"),
                Publisher(id=UUID(int=1), name="twitter"),
            },
        ],
        [
            {"publisher": ["telegram"]},
            ["telegram", "twitter"],
            {"telegram", "twitter"},
        ],
        [
            two_publishers_specification,
            ["telegram", "mastodon", "facebook"],
            {"telegram", "twitter", "mastodon", "facebook"},
        ],
    ],
)
async def test_update_publishers(
    specification, names, expected_result, generate_models,
):
    await generate_models(specification)
    await update_publishers(names)
    if type(list(expected_result)[0]) == Publisher:
        publishers = set(await Publisher.all())
    else:
        publishers = set(p.name for p in await Publisher.all())

    assert len(publishers) == len(expected_result)
    assert publishers == expected_result

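# create_unpublished_events() merges the events fetched from Mobilizon with the
# local database state and should return only the events that still need to be
# published, including updated versions of events it already knows about.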
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "specification,events_from_mobilizon,expected_result",
    [
        [
            # Empty DB
            {"event": 0, "publications": [], "publisher": []},
            [event_1],
            [event_1],
        ],
        [
            # Test whether the query actually does nothing when all events are published
            all_published_specification,
            [event_1],
            [],
        ],
        [
            # Test whether the query actually returns only unknown unpublished events
            all_published_specification,
            [event_2],
            [event_2],
        ],
        [
            # Test whether the query actually merges remote and local state
            {"event": 2, "publisher": ["telegram", "mastodon", "facebook"]},
            [event_2],
            [event_0, event_1, event_2],
        ],
        [
            # Test whether the query actually merges remote and local state
            complete_specification,
            [event_0, event_1, event_2, event_6],
            [event_3, event_6],
        ],
        [
            # Test update
            complete_specification,
            [event_0, event_3_updated, event_6],
            [event_3_updated, event_6],
        ],
    ],
)
async def test_create_unpublished_events(
    specification, events_from_mobilizon, expected_result, generate_models,
):
    await generate_models(specification)

    unpublished_events = await create_unpublished_events(events_from_mobilizon)

    assert len(unpublished_events) == len(expected_result)
    assert unpublished_events == expected_result

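# save_publication_report() should persist every entry of the coordinator
# report as a new Publication row, carrying over its status and reason and
# setting a timestamp.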
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "specification,report,event,expected_result",
    [
        [
            complete_specification,
            PublisherCoordinatorReport(
                publications=[],
                reports=[
                    EventPublicationReport(
                        status=PublicationStatus.COMPLETED,
                        reason="",
                        publication=_EventPublication(
                            id=UUID(int=6),
                            formatter=TelegramFormatter(),
                            event=event_1,
                            publisher=TelegramPublisher(),
                        ),
                    ),
                ],
            ),
            event_1,
            {
                UUID(int=6): Publication(
                    id=UUID(int=6), status=PublicationStatus.COMPLETED, reason=""
                ),
            },
        ],
    ],
)
async def test_save_publication_report(
    specification, report, event, expected_result, generate_models,
):
    await generate_models(specification)
    known_publication_ids = set(p.id for p in await Publication.all())

    await save_publication_report(report)

    publications = {
        p.id: p for p in await Publication.filter(id__not_in=known_publication_ids)
    }

    assert len(publications) == len(expected_result)
    for i in publications.keys():
        assert publications[i].status == expected_result[i].status
        assert publications[i].reason == expected_result[i].reason
        assert publications[i].timestamp