mirror of
https://github.com/Tech-Workers-Coalition-Italia/mobilizon-reshare.git
synced 2025-02-18 04:30:53 +01:00
* fixed parsing bug * implemented events and publications endpoints split endpoints by entity removed credentials * add pagination (#179) * added pagination * integrated pagination with tortoise * added test for publications * removed converter file * moved publications to dataclasses module * implemented import pattern on dataclasses to prevent circular imports * removed redundant fetch * removed unused query * split build_publications * split failed_publications * removed redundant query functions * split publication retrieve * split all read functions * removed redundant write function * fixed lock
73 lines
2.2 KiB
Python
73 lines
2.2 KiB
Python
from datetime import datetime
|
|
from typing import Iterable, Optional
|
|
|
|
import click
|
|
from arrow import Arrow
|
|
|
|
from mobilizon_reshare.dataclasses import MobilizonEvent
|
|
from mobilizon_reshare.dataclasses.event import (
|
|
_EventPublicationStatus,
|
|
get_all_mobilizon_events,
|
|
get_published_events,
|
|
get_mobilizon_events_with_status,
|
|
get_mobilizon_events_without_publications,
|
|
)
|
|
from mobilizon_reshare.event.event_selection_strategies import select_unpublished_events
|
|
|
|
# Foreground color handed to click.style for each publication status when an
# event row is rendered by pretty().
status_to_color = {
    _EventPublicationStatus.COMPLETED: "green",
    _EventPublicationStatus.FAILED: "red",
    _EventPublicationStatus.PARTIAL: "yellow",
    _EventPublicationStatus.WAITING: "white",
}
|
|
|
|
|
|
def show_events(events: Iterable[MobilizonEvent]):
    """Send the given events to click's pager, one formatted row per line.

    Each event is rendered with ``pretty`` and the rows are joined with
    newlines before being passed to ``click.echo_via_pager``.
    """
    rows = [pretty(event) for event in events]
    click.echo_via_pager("\n".join(rows))
|
|
|
|
|
|
def pretty(event: MobilizonEvent):
    """Render a single event as one fixed-width text row.

    Columns, in order: the name centered in 40 characters, the status name
    (styled by ``click.style`` with the color from ``status_to_color``)
    centered in 22 characters, the Mobilizon id left-aligned in 40 characters,
    the begin datetime in local ISO format left-aligned in 29 characters, and
    the end datetime in local ISO format with no padding.
    """
    # Same format specs as the column layout described above; the styled
    # status string is padded as-is, escape sequences included.
    row_template = "{: ^40}{: ^22}{: <40}{: <29}{}"
    return row_template.format(
        event.name,
        click.style(event.status.name, fg=status_to_color[event.status]),
        str(event.mobilizon_id),
        event.begin_datetime.to("local").isoformat(),
        event.end_datetime.to("local").isoformat(),
    )
|
|
|
|
|
|
async def list_unpublished_events(frm: Arrow = None, to: Arrow = None):
    """Return the events chosen by ``select_unpublished_events`` for a window.

    Fetches the published events and the events without publications, both
    restricted by ``frm``/``to`` (forwarded as ``from_date``/``to_date``),
    materializes each into a list, and hands them to
    ``select_unpublished_events`` in that order.
    """
    already_published = list(await get_published_events(from_date=frm, to_date=to))
    never_published = list(
        await get_mobilizon_events_without_publications(from_date=frm, to_date=to)
    )
    return select_unpublished_events(already_published, never_published)
|
|
|
|
|
|
async def list_events(
    status: Optional[_EventPublicationStatus] = None,
    frm: Optional[datetime] = None,
    to: Optional[datetime] = None,
):
    """Fetch events, optionally filtered by status and date window, and print them.

    ``frm`` and ``to`` are converted to ``Arrow`` objects when given. With no
    ``status`` every event is fetched; ``WAITING`` goes through
    ``list_unpublished_events``; any other status is looked up with
    ``get_mobilizon_events_with_status``. Matching events are shown through
    ``show_events``; otherwise a "No event found" message is echoed.
    """
    window_start = Arrow.fromdatetime(frm) if frm else None
    window_end = Arrow.fromdatetime(to) if to else None

    if status is None:
        fetched = await get_all_mobilizon_events(
            from_date=window_start, to_date=window_end
        )
    elif status == _EventPublicationStatus.WAITING:
        fetched = await list_unpublished_events(frm=window_start, to=window_end)
    else:
        fetched = await get_mobilizon_events_with_status(
            [status], from_date=window_start, to_date=window_end
        )

    found = list(fetched)
    if found:
        show_events(found)
        return

    # Nothing matched: mention the status only when one was requested.
    if status is None:
        click.echo("No event found")
    else:
        click.echo(f"No event found with status: {status.name}")
|