added mypy

This commit is contained in:
Simone Robutti 2022-04-15 22:12:01 +02:00
parent 94c85d8b48
commit 51f5210f83
8 changed files with 46 additions and 42 deletions

View File

@ -8,3 +8,10 @@ repos:
rev: v1.2.3
hooks:
- id: flake8
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v0.902
hooks:
- id: mypy
exclude: ^tests/
args: [--strict]

View File

@ -193,3 +193,6 @@
language: python
types: [text]
minimum_pre_commit_version: 0.15.0

View File

@ -20,7 +20,7 @@ from mobilizon_reshare.cli.commands.retry.main import (
)
from mobilizon_reshare.models.publication import PublicationStatus
status_name_to_enum = {
status_name_to_enum: dict = {
"event": {
"waiting": EventPublicationStatus.WAITING,
"completed": EventPublicationStatus.COMPLETED,
@ -82,36 +82,26 @@ def mobilizon_reshare(obj):
help="Synchronize and publish events. It is equivalent to running consecutively pull and then publish."
)
@pass_context
def start(
ctx,
):
def start(ctx,):
ctx.ensure_object(dict)
safe_execution(
start_main,
)
safe_execution(start_main,)
@mobilizon_reshare.command(help="Publish a recap of already published events.")
def recap():
safe_execution(
recap_main,
)
safe_execution(recap_main,)
@mobilizon_reshare.command(
help="Fetch the latest events from Mobilizon and store them."
)
def pull():
safe_execution(
pull_main,
)
safe_execution(pull_main,)
@mobilizon_reshare.command(help="Select an event and publish it.")
def publish():
safe_execution(
publish_main,
)
safe_execution(publish_main,)
@mobilizon_reshare.group(help="Operations that pertain to events")
@ -132,10 +122,7 @@ def event_list(status, begin, end):
safe_execution(
functools.partial(
list_events,
status_name_to_enum["event"][status],
frm=begin,
to=end,
list_events, status_name_to_enum["event"][status], frm=begin, to=end,
),
)
@ -162,28 +149,21 @@ def publication_list(status, begin, end):
@click.argument("event-id", type=click.UUID)
@click.argument("publisher", type=click.Choice(publisher_names))
def format(
event_id,
publisher,
event_id, publisher,
):
safe_execution(
functools.partial(format_event, event_id, publisher),
)
safe_execution(functools.partial(format_event, event_id, publisher),)
@event.command(name="retry", help="Retries all the failed publications")
@click.argument("event-id", type=click.UUID)
def event_retry(event_id):
safe_execution(
functools.partial(retry_event_command, event_id),
)
safe_execution(functools.partial(retry_event_command, event_id),)
@publication.command(name="retry", help="Retries a specific publication")
@click.argument("publication-id", type=click.UUID)
def publication_retry(publication_id):
safe_execution(
functools.partial(retry_publication_command, publication_id),
)
safe_execution(functools.partial(retry_publication_command, publication_id),)
if __name__ == "__main__":

View File

@ -48,14 +48,16 @@ async def list_events(
to: Optional[datetime] = None,
):
frm = Arrow.fromdatetime(frm) if frm else None
to = Arrow.fromdatetime(to) if to else None
frm_arrow = Arrow.fromdatetime(frm) if frm else None
to_arrow = Arrow.fromdatetime(to) if to else None
if status is None:
events = await get_all_events(from_date=frm, to_date=to)
events = await get_all_events(from_date=frm_arrow, to_date=to_arrow)
elif status == EventPublicationStatus.WAITING:
events = await list_unpublished_events(frm=frm, to=to)
events = await list_unpublished_events(frm=frm_arrow, to=to_arrow)
else:
events = await events_with_status([status], from_date=frm, to_date=to)
events = await events_with_status(
[status], from_date=frm_arrow, to_date=to_arrow
)
events = list(events)
if events:
show_events(events)

View File

@ -34,12 +34,14 @@ async def list_publications(
to: Optional[datetime] = None,
):
frm = Arrow.fromdatetime(frm) if frm else None
to = Arrow.fromdatetime(to) if to else None
frm_arrow = Arrow.fromdatetime(frm) if frm else None
to_arrow = Arrow.fromdatetime(to) if to else None
if status is None:
publications = await get_all_publications(from_date=frm, to_date=to)
publications = await get_all_publications(from_date=frm_arrow, to_date=to_arrow)
else:
publications = await publications_with_status(status, from_date=frm, to_date=to)
publications = await publications_with_status(
status, from_date=frm_arrow, to_date=to_arrow
)
if publications:
show_publications(publications)

View File

@ -33,7 +33,7 @@ async def retry_publication(publication_id) -> Optional[PublisherCoordinatorRepo
publication = await get_publication(publication_id)
if not publication:
logger.info(f"Publication {publication_id} not found.")
return
return None
logger.info(f"Publication {publication_id} found.")
reports = PublisherCoordinator([publication]).run()

View File

@ -108,6 +108,16 @@ class AbstractPlatform(ABC, LoggerMixin, ConfLoaderMixin):
class AbstractEventFormatter(LoggerMixin, ConfLoaderMixin):
@property
@abstractmethod
def default_template_path(self):
raise
@property
@abstractmethod
def default_recap_template_path(self):
raise
@abstractmethod
def _validate_event(self, event: MobilizonEvent) -> None:
"""

View File

@ -5,7 +5,7 @@ from mobilizon_reshare.models.publication import Publication
from mobilizon_reshare.models.publication import PublicationStatus
from tests import today
complete_specification = {
complete_specification: dict = {
"event": 4,
"publications": [
{"event_idx": 0, "publisher_idx": 0, "status": PublicationStatus.COMPLETED},