Mobilizon-Reshare-condividi.../mobilizon_reshare/publishers/coordinator.py

210 lines
6.5 KiB
Python
Raw Normal View History

import logging
from dataclasses import dataclass
from typing import List, Optional
from mobilizon_reshare.models.publication import PublicationStatus
from mobilizon_reshare.publishers import get_active_notifiers
from mobilizon_reshare.publishers.abstract import (
EventPublication,
AbstractPlatform,
RecapPublication,
)
from mobilizon_reshare.publishers.exceptions import PublisherError
from mobilizon_reshare.publishers.platforms.platform_mapping import get_notifier_class
logger = logging.getLogger(__name__)
@dataclass
class BasePublicationReport:
    """Outcome of a single publication attempt."""

    # Status the attempt ended with.
    status: PublicationStatus
    # Explanation attached to the outcome; ``None`` when no reason was recorded.
    reason: Optional[str]

    @property
    def successful(self) -> bool:
        """Return True when the status is ``PublicationStatus.COMPLETED``."""
        return self.status == PublicationStatus.COMPLETED

    @property
    def succesful(self) -> bool:
        """Misspelled alias of ``successful``, kept so existing callers keep working."""
        return self.successful

    def get_failure_message(self) -> str:
        """Return a two-line text with the status and the reason of the failure."""
        return (
            f"Publication failed with status: {self.status}.\n" f"Reason: {self.reason}"
        )
@dataclass
class EventPublicationReport(BasePublicationReport):
    """Publication report that also carries the publication it refers to."""

    # The publication whose outcome this report describes.
    publication: EventPublication

    def get_failure_message(self):
        """Return the failure text including publication id and publisher name.

        An empty or missing reason is logged as an error; the message is
        still built and returned in that case.
        """
        has_reason = bool(self.reason)
        if not has_reason:
            # NOTE(review): exc_info=True only attaches a traceback when an
            # exception is currently being handled - confirm this is intended.
            logger.error("Report of failure without reason.", exc_info=True)

        failure_message = (
            f"Publication {self.publication.id} failed with status: {self.status}.\n"
            f"Reason: {self.reason}\n"
            f"Publisher: {self.publication.publisher.name}"
        )
        return failure_message
@dataclass
class BaseCoordinatorReport:
    """Collection of the individual reports produced by one coordinator run."""

    # One report per publication that was processed.
    reports: List[BasePublicationReport]

    @property
    def successful(self):
        """Return True if every report is COMPLETED (also True with no reports)."""
        for report in self.reports:
            if report.status == PublicationStatus.COMPLETED:
                continue
            # First report that is not completed decides the outcome.
            return False
        return True
@dataclass
class PublisherCoordinatorReport(BaseCoordinatorReport):
    """Coordinator report for event publications.

    Narrows ``reports`` to event publication reports and additionally keeps
    the list of publications the coordinator was given.
    """

    # Per-publication outcomes.
    reports: List[EventPublicationReport]
    # Publications handed to the coordinator that produced this report.
    publications: List[EventPublication]
class PublisherCoordinator:
    """Validate a batch of event publications and, if all pass, send them."""

    def __init__(self, publications: List[EventPublication]):
        self.publications = publications

    def run(self) -> PublisherCoordinatorReport:
        """Validate all publications, then publish them.

        Returns:
            A report holding only the failed validations when at least one
            publication does not validate (nothing is sent in that case);
            otherwise the report produced by the publishing step.
        """
        errors = self._validate()
        if errors:
            return PublisherCoordinatorReport(
                reports=errors, publications=self.publications
            )

        return self._post()

    def _post(self) -> PublisherCoordinatorReport:
        """Send every publication and collect one report per publication.

        A ``PublisherError`` raised while building or sending a message is
        recorded as a FAILED report; the remaining publications are still
        processed.
        """
        reports = []

        for publication in self.publications:
            try:
                message = publication.formatter.get_message_from_event(
                    publication.event
                )
                publication.publisher.send(message)
            except PublisherError as e:
                reports.append(
                    EventPublicationReport(
                        status=PublicationStatus.FAILED,
                        reason=str(e),
                        publication=publication,
                    )
                )
            else:
                reports.append(
                    EventPublicationReport(
                        status=PublicationStatus.COMPLETED,
                        publication=publication,
                        reason=None,
                    )
                )

        return PublisherCoordinatorReport(
            publications=self.publications, reports=reports
        )

    def _safe_run(self, reasons, f, *args, **kwargs):
        """Call ``f(*args, **kwargs)`` and turn any exception into a reason.

        Returns ``reasons`` unchanged when the call succeeds, or a new list
        with the stringified exception appended when it raises.
        """
        try:
            f(*args, **kwargs)
            return reasons
        except Exception as e:
            logger.error(str(e))
            return reasons + [str(e)]

    @staticmethod
    def _validate_message(publication):
        """Build the message for ``publication`` and validate it."""
        formatter = publication.formatter
        formatter.validate_message(formatter.get_message_from_event(publication.event))

    def _validate(self) -> List[EventPublicationReport]:
        """Run credential, event and message validation on every publication.

        Returns:
            One FAILED report per publication that produced at least one
            validation error; an empty list when everything validates.
        """
        errors = []

        for publication in self.publications:
            reasons = []
            reasons = self._safe_run(
                reasons, publication.publisher.validate_credentials,
            )
            reasons = self._safe_run(
                reasons, publication.formatter.validate_event, publication.event
            )
            # The message is rendered inside the guarded call: building it as
            # an argument would let a rendering error escape _safe_run and
            # abort the whole run instead of being reported as a reason.
            reasons = self._safe_run(reasons, self._validate_message, publication)

            if reasons:
                errors.append(
                    EventPublicationReport(
                        status=PublicationStatus.FAILED,
                        reason=", ".join(reasons),
                        publication=publication,
                    )
                )

        return errors
class AbstractCoordinator:
    """Send one message to a list of platforms, logging individual failures."""

    def __init__(
        self, message: str, platforms: Optional[List[AbstractPlatform]] = None
    ):
        """Store the message and the target platforms.

        Args:
            message: text passed to each platform's ``send``.
            platforms: platforms to send to; ``None`` is treated as no platforms.
        """
        self.message = message
        # The default is None, which cannot be iterated by send_to_all;
        # normalise it to an empty list so the call becomes a no-op.
        self.platforms = platforms if platforms is not None else []

    def send_to_all(self):
        """Send the message to every platform.

        Any exception raised by a platform is logged and swallowed so the
        remaining platforms are still attempted.
        """
        for platform in self.platforms:
            try:
                platform.send(self.message)
            except Exception as e:
                logger.critical(f"Notifier failed to send message:\n{self.message}")
                logger.exception(e)
class AbstractNotifiersCoordinator(AbstractCoordinator):
    """Coordinator whose platforms are notifiers."""

    def __init__(self, message: str, notifiers: List[AbstractPlatform] = None):
        """Use ``notifiers`` when given and non-empty, else the active notifiers.

        The fallback instantiates the class returned by ``get_notifier_class``
        for each entry of ``get_active_notifiers()``.
        """
        if notifiers:
            platforms = notifiers
        else:
            platforms = [
                get_notifier_class(active)() for active in get_active_notifiers()
            ]
        super().__init__(message, platforms)
class PublicationFailureNotifiersCoordinator(AbstractNotifiersCoordinator):
    """Notify about a publication report, using its failure message as text."""

    def __init__(self, report: BasePublicationReport, platforms=None):
        """Keep the report and build the message from ``get_failure_message``."""
        self.report = report
        failure_message = report.get_failure_message()
        super().__init__(message=failure_message, notifiers=platforms)

    def notify_failure(self):
        """Send the message to all notifiers, but only for a FAILED report.

        The informational log line is emitted regardless of the status.
        """
        logger.info("Sending failure notifications")
        has_failed = self.report.status == PublicationStatus.FAILED
        if not has_failed:
            return
        self.send_to_all()
class RecapCoordinator:
    """Build and send one recap message per recap publication."""

    def __init__(self, recap_publications: List[RecapPublication]):
        self.recap_publications = recap_publications

    def run(self) -> BaseCoordinatorReport:
        """Send every recap and return one report per recap publication.

        Each message is the recap header followed by one fragment per event,
        separated by blank lines. A ``PublisherError`` raised while building
        or sending a recap yields a FAILED report carrying the error text.
        """
        outcomes = []

        for recap in self.recap_publications:
            try:
                parts = [recap.formatter.get_recap_header()]
                parts.extend(
                    recap.formatter.get_recap_fragment(event)
                    for event in recap.events
                )
                recap.publisher.send("\n\n".join(parts))
                outcome = BasePublicationReport(
                    status=PublicationStatus.COMPLETED, reason=None,
                )
            except PublisherError as error:
                outcome = BasePublicationReport(
                    status=PublicationStatus.FAILED, reason=str(error),
                )
            outcomes.append(outcome)

        return BaseCoordinatorReport(reports=outcomes)