Fix reloading on profile, refactor how LS sends feedback to avoid issues.

This commit is contained in:
Buster "Silver Eagle" Neece 2019-02-27 13:18:12 -06:00
parent b9231da6b9
commit 7a872d145b
15 changed files with 283 additions and 165 deletions

View File

@ -3,4 +3,6 @@
return [
\App\Message\AddNewMedia::class => \App\Sync\Task\Media::class,
\App\Message\ReprocessMedia::class => \App\Sync\Task\Media::class,
\App\Message\UpdateNowPlayingMessage::class => \App\Sync\Task\NowPlaying::class,
];

View File

@ -204,7 +204,6 @@ return function (\Azura\Container $di)
// Event dispatcher
$dispatcher = new \Symfony\Component\EventDispatcher\EventDispatcher;
$dispatcher->addSubscriber(new \Bernard\EventListener\LoggerSubscriber($di[\Monolog\Logger::class]));
// Build Producer
$producer = new \Bernard\Producer($queue_factory, $dispatcher);
@ -215,11 +214,15 @@ return function (\Azura\Container $di)
$consumer = new Bernard\Consumer($router, $dispatcher);
return new \App\MessageQueue(
$mq = new \App\MessageQueue(
$queue_factory,
$producer,
$consumer
$consumer,
$di[\Monolog\Logger::class]
);
$dispatcher->addSubscriber($mq);
return $mq;
};
//

View File

@ -2,6 +2,7 @@
namespace App\Console\Command\Internal;
use App\Radio\AutoDJ;
use App\Sync\Task\NowPlaying;
use Azura\Console\Command\CommandAbstract;
use Doctrine\ORM\EntityManager;
use App\Entity;
@ -58,16 +59,15 @@ class Feedback extends CommandAbstract
return null;
}
/** @var AutoDJ $autodj */
$autodj = $this->get(AutoDJ::class);
try {
$autodj->setNextCuedSong(
$station,
$input->getOption('song'),
$input->getOption('media'),
$input->getOption('playlist')
);
/** @var NowPlaying $sync_nowplaying */
$sync_nowplaying = $this->get(NowPlaying::class);
$sync_nowplaying->queueStation($station, [
'song_id' => $input->getOption('song'),
'media_id' => $input->getOption('media'),
'playlist' => $input->getOption('playlist'),
]);
$output->write('OK');
return null;

View File

@ -94,19 +94,15 @@ class InternalController
$station = $request->getStation();
try {
$body = $request->getParsedBody();
$this->autodj->setNextCuedSong(
$station,
$body['song'] ?? null,
$body['media'] ?? null,
$body['playlist'] ?? null
);
$this->sync_nowplaying->queueStation($station, [
'song_id' => $body['song'] ?? null,
'media_id' => $body['media'] ?? null,
'playlist' => $body['playlist'] ?? null,
]);
return $response->write('OK');
} catch (\Exception $e) {
return $response->write('Error: '.$e->getMessage());
}
}
/**

View File

@ -47,9 +47,14 @@ class SongHistoryRepository extends Repository
* @param Entity\Song $song
* @param Entity\Station $station
* @param array $np
* @param array $extra_metadata
* @return Entity\SongHistory
*/
public function register(Entity\Song $song, Entity\Station $station, $np): Entity\SongHistory
public function register(
Entity\Song $song,
Entity\Station $station,
array $np,
array $extra_metadata = []): Entity\SongHistory
{
// Pull the most recent history item for this station.
$last_sh = $this->_em->createQuery('SELECT sh FROM '.Entity\SongHistory::class.' sh
@ -118,6 +123,23 @@ class SongHistoryRepository extends Repository
$sh = new Entity\SongHistory($song, $station);
}
// Set extra metadata (supplied by Liquidsoap, for example)
if (!empty($extra_metadata['song_id']) && $song->getId() === $extra_metadata['song_id']) {
if (!empty($extra_metadata['media_id']) && null === $sh->getMedia()) {
$media = $this->_em->find(Entity\StationMedia::class, $extra_metadata['media_id']);
if ($media instanceof Entity\StationMedia) {
$sh->setMedia($media);
}
}
if (!empty($extra_metadata['playlist_id']) && null === $sh->getPlaylist()) {
$playlist = $this->_em->find(Entity\StationPlaylist::class, $extra_metadata['playlist_id']);
if ($playlist instanceof Entity\StationPlaylist) {
$sh->setPlaylist($playlist);
}
}
}
$sh->setTimestampStart(time());
$sh->setListenersStart($listeners);
$sh->addDeltaPoint($listeners);

View File

@ -617,6 +617,8 @@ class StationMedia
$annotations[$annotation_name] = $prop;
}
$annotations['genre'] = $this->id;
return $annotations;
}

View File

@ -0,0 +1,14 @@
<?php
namespace App\Message;
abstract class AbstractDelayedMessage extends AbstractMessage
{
/** @var int One millisecond in microseconds. */
public const ONE_MSEC = 1000;
/** @var int One second in microseconds. */
public const ONE_SEC = 1000000;
/** @var int The delay before the message should be processed, in seconds. */
public $delay = 0;
}

View File

@ -0,0 +1,16 @@
<?php
namespace App\Message;
class UpdateNowPlayingMessage extends AbstractDelayedMessage
{
public function __construct()
{
$this->delay = self::ONE_SEC;
}
/** @var int */
public $station_id;
/** @var array */
public $extra_metadata = [];
}

View File

@ -1,15 +1,21 @@
<?php
namespace App;
use App\Message\AbstractDelayedMessage;
use Bernard\BernardEvents;
use Bernard\Consumer;
use Bernard\Event\EnvelopeEvent;
use Bernard\Event\RejectEnvelopeEvent;
use Bernard\Message;
use Bernard\Producer;
use Bernard\Queue;
use Bernard\QueueFactory;
use Monolog\Logger;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
class MessageQueue
class MessageQueue implements EventSubscriberInterface
{
const GLOBAL_QUEUE_NAME = 'azuracast';
public const GLOBAL_QUEUE_NAME = 'azuracast';
/** @var QueueFactory */
protected $queues;
@ -20,16 +26,25 @@ class MessageQueue
/** @var Consumer */
protected $consumer;
/** @var Logger */
protected $logger;
/**
* @param QueueFactory $queues
* @param Producer $producer
* @param Consumer $consumer
* @param Logger $logger
*/
public function __construct(QueueFactory $queues, Producer $producer, Consumer $consumer)
public function __construct(
QueueFactory $queues,
Producer $producer,
Consumer $consumer,
Logger $logger)
{
$this->queues = $queues;
$this->producer = $producer;
$this->consumer = $consumer;
$this->logger = $logger;
}
/**
@ -75,4 +90,74 @@ class MessageQueue
{
$this->consumer->consume($this->queues->create(self::GLOBAL_QUEUE_NAME), $options);
}
/**
* @return array The event names to listen to
*/
public static function getSubscribedEvents()
{
return [
BernardEvents::PRODUCE => [
['logProduce', -5],
],
BernardEvents::INVOKE => [
['logInvoke', -5],
['handleDelay', 0],
],
BernardEvents::REJECT => [
['logReject', -5],
],
];
}
/**
* @param EnvelopeEvent $e
*/
public function handleDelay(EnvelopeEvent $e): void
{
$message = $e->getEnvelope()->getMessage();
if ($message instanceof AbstractDelayedMessage) {
$this->logger->debug(sprintf(
'Delaying queued message %s by %d microseconds.',
$message->getName(),
$message->delay
));
$delay_us = $message->delay;
usleep($delay_us);
}
}
/**
* @param EnvelopeEvent $event
*/
public function logProduce(EnvelopeEvent $event): void
{
$this->logger->info(sprintf(
'New message of type %s added to queue.', $event->getEnvelope()->getMessage()->getName()
));
}
/**
* @param EnvelopeEvent $event
*/
public function logInvoke(EnvelopeEvent $event): void
{
$this->logger->info(sprintf(
'Handling message of type %s.', $event->getEnvelope()->getMessage()->getName()
));
}
/**
* @param RejectEnvelopeEvent $event
*/
public function logReject(RejectEnvelopeEvent $event): void
{
$this->logger->error(sprintf(
'Exception when processing message type %s: %s',
$event->getEnvelope()->getMessage()->getName(),
$event->getException()->getMessage()
));
}
}

View File

@ -89,7 +89,8 @@ class SyncProvider implements ServiceProviderInterface
$di[\App\Radio\AutoDJ::class],
$di[\Azura\Cache::class],
$di[\InfluxDB\Database::class],
$di[\Azura\EventDispatcher::class]
$di[\Azura\EventDispatcher::class],
$di[\App\MessageQueue::class]
);
};

View File

@ -354,108 +354,37 @@ class AutoDJ implements EventSubscriberInterface
$this->em->persist($spm);
// Log in history
return $this->setNextCuedSong(
$playlist->getStation(),
$media_to_play->getSong(),
$media_to_play,
$playlist
);
$sh = new Entity\SongHistory($media_to_play->getSong(), $playlist->getStation());
$sh->setPlaylist($playlist);
$sh->setMedia($media_to_play);
$sh->setDuration($media_to_play->getCalculatedLength());
$sh->setTimestampCued(time());
$this->em->persist($sh);
$this->em->flush();
return $sh;
}
if (is_array($media_to_play)) {
[$media_uri, $media_duration] = $media_to_play;
return $this->setNextCuedSong(
$playlist->getStation(),
$song_repo->getOrCreate(['text' => 'Internal AutoDJ URI']),
null,
$playlist,
$media_duration,
$media_uri
);
$sh = new Entity\SongHistory($song_repo->getOrCreate([
'text' => 'Internal AutoDJ URI',
]), $playlist->getStation());
$sh->setPlaylist($playlist);
$sh->setAutodjCustomUri($media_uri);
$sh->setDuration($media_duration);
$sh->setTimestampCued(time());
$this->em->persist($sh);
$this->em->flush();
return $sh;
}
return null;
}
/**
* @param Entity\Station $station
* @param Entity\Song|string $song
* @param Entity\StationMedia|string|int|null $media
* @param Entity\StationPlaylist|string|int|null $playlist
* @param int|null $duration
* @param string|null $custom_uri
* @return Entity\SongHistory
*/
public function setNextCuedSong(
Entity\Station $station,
$song,
$media = null,
$playlist = null,
$duration = null,
$custom_uri = null): Entity\SongHistory
{
/** @var Entity\Song|null $song */
$song = $this->getEntity(Entity\Song::class, $song);
if (!($song instanceof Entity\Song)) {
throw new \Azura\Exception('Error: Song ID is not valid.');
}
/** @var Entity\Repository\SongHistoryRepository $sh_repo */
$sh_repo = $this->em->getRepository(Entity\SongHistory::class);
$sh = $sh_repo->getCuedSong($song, $station);
if ($sh instanceof Entity\SongHistory) {
return $sh;
}
$sh = new Entity\SongHistory($song, $station);
$sh->setTimestampCued(time());
$media = $this->getEntity(Entity\StationMedia::class, $media);
if ($media instanceof Entity\StationMedia) {
$sh->setMedia($media);
}
$playlist = $this->getEntity(Entity\StationPlaylist::class, $playlist);
if ($playlist instanceof Entity\StationPlaylist) {
$sh->setPlaylist($playlist);
}
if (!empty($duration)) {
$sh->setDuration($duration);
} else if ($media instanceof Entity\StationMedia) {
$sh->setDuration($media->getCalculatedLength());
}
if (!empty($custom_uri)) {
$sh->setAutodjCustomUri($custom_uri);
}
$this->em->persist($sh);
$this->em->flush($sh);
return $sh;
}
/**
* Fetch an entity if given either the entity object itself OR its identifier.
*
* @param string $class_name
* @param object|string|int $identifier
* @return object|null
*/
protected function getEntity($class_name, $identifier): ?object
{
if ($identifier instanceof $class_name) {
return $identifier;
}
if (empty($identifier)) {
return null;
}
return $this->em->find($class_name, $identifier);
}
}

View File

@ -1,9 +1,11 @@
<?php
namespace App\Sync\Task;
use App\MessageQueue;
use Azura\Cache;
use App\Event\Radio\GenerateRawNowPlaying;
use App\Event\SendWebhooks;
use App\Message;
use Azura\EventDispatcher;
use App\Radio\AutoDJ;
use App\ApiUtilities;
@ -32,6 +34,9 @@ class NowPlaying extends AbstractTask implements EventSubscriberInterface
/** @var EventDispatcher */
protected $event_dispatcher;
/** @var MessageQueue */
protected $message_queue;
/** @var ApiUtilities */
protected $api_utils;
@ -59,6 +64,7 @@ class NowPlaying extends AbstractTask implements EventSubscriberInterface
* @param Cache $cache
* @param Database $influx
* @param EventDispatcher $event_dispatcher
* @param MessageQueue $message_queue
*
* @see \App\Provider\SyncProvider
*/
@ -70,8 +76,9 @@ class NowPlaying extends AbstractTask implements EventSubscriberInterface
AutoDJ $autodj,
Cache $cache,
Database $influx,
EventDispatcher $event_dispatcher
) {
EventDispatcher $event_dispatcher,
MessageQueue $message_queue)
{
parent::__construct($em, $logger);
$this->adapters = $adapters;
@ -79,6 +86,7 @@ class NowPlaying extends AbstractTask implements EventSubscriberInterface
$this->autodj = $autodj;
$this->cache = $cache;
$this->event_dispatcher = $event_dispatcher;
$this->message_queue = $message_queue;
$this->influx = $influx;
$this->history_repo = $em->getRepository(Entity\SongHistory::class);
@ -182,17 +190,50 @@ class NowPlaying extends AbstractTask implements EventSubscriberInterface
return $nowplaying;
}
/**
* Queue an individual station for processing its "Now Playing" metadata.
*
* @param Entity\Station $station
* @param array $extra_metadata
*/
public function queueStation(Entity\Station $station, array $extra_metadata = []): void
{
$message = new Message\UpdateNowPlayingMessage;
$message->station_id = $station->getId();
$message->extra_metadata = $extra_metadata;
$this->message_queue->produce($message);
}
/**
* Handle event dispatch.
*
* @param Message\AbstractMessage $message
*/
public function __invoke(Message\AbstractMessage $message)
{
try {
if ($message instanceof Message\UpdateNowPlayingMessage) {
$station = $this->em->find(Entity\Station::class, $message->station_id);
$this->processStation($station, true);
}
} finally {
$this->em->clear();
}
}
/**
* Generate Structured NowPlaying Data for a given station.
*
* @param Entity\Station $station
* @param string|null $payload The request body from the watcher notification service (if applicable).
* @param array $extra_metadata
* @param bool $standalone Whether the request is for this station alone or part of the regular sync process.
* @return Entity\Api\NowPlaying
* @throws \Doctrine\ORM\ORMException
* @throws \Doctrine\ORM\OptimisticLockException
* @throws \Exception
*/
public function processStation(Entity\Station $station, $payload = null): Entity\Api\NowPlaying
public function processStation(
Entity\Station $station,
array $extra_metadata = [],
$standalone = false): Entity\Api\NowPlaying
{
$this->logger->pushProcessor(function($record) use ($station) {
$record['extra']['station'] = [
@ -211,7 +252,7 @@ class NowPlaying extends AbstractTask implements EventSubscriberInterface
$np_old = $station->getNowplaying();
// Build the new "raw" NowPlaying data.
$event = new GenerateRawNowPlaying($station, $frontend_adapter, $remote_adapters, $payload, $include_clients);
$event = new GenerateRawNowPlaying($station, $frontend_adapter, $remote_adapters, null, $include_clients);
$this->event_dispatcher->dispatch(GenerateRawNowPlaying::NAME, $event);
$np_raw = $event->getRawResponse();
@ -247,7 +288,7 @@ class NowPlaying extends AbstractTask implements EventSubscriberInterface
/** @var Entity\Song $song_obj */
$song_obj = $this->song_repo->find($current_song_hash);
$sh_obj = $this->history_repo->register($song_obj, $station, $np_raw);
$sh_obj = $this->history_repo->register($song_obj, $station, $np_raw, $extra_metadata);
$np->song_history = $np_old->song_history;
$np->playing_next = $np_old->playing_next;
@ -255,7 +296,7 @@ class NowPlaying extends AbstractTask implements EventSubscriberInterface
// SongHistory registration must ALWAYS come before the history/nextsong calls
// otherwise they will not have up-to-date database info!
$song_obj = $this->song_repo->getOrCreate($np_raw['current_song'], true);
$sh_obj = $this->history_repo->register($song_obj, $station, $np_raw);
$sh_obj = $this->history_repo->register($song_obj, $station, $np_raw, $extra_metadata);
$np->song_history = $this->history_repo->getHistoryForStation($station, $this->api_utils, $uri_empty);
@ -295,7 +336,7 @@ class NowPlaying extends AbstractTask implements EventSubscriberInterface
$this->em->flush();
// Trigger the dispatching of webhooks.
$webhook_event = new SendWebhooks($station, $np, $np_old, ($payload !== null));
$webhook_event = new SendWebhooks($station, $np, $np_old, $standalone);
$this->event_dispatcher->dispatch(SendWebhooks::NAME, $webhook_event);
$this->logger->popProcessor();

View File

@ -1,11 +1,35 @@
var nowPlaying;
var nowPlaying, nowPlayingTimeout;
function loadNowPlaying() {
$.getJSON('<?=$router->fromHere('api:nowplaying:index') ?>', function(row) {
nowPlaying.np = row;
if ('mediaSession' in navigator) {
navigator.mediaSession.metadata = new MediaMetadata({
title: row.now_playing.song.title,
artist: row.now_playing.song.artist
});
}
nowPlayingTimeout = setTimeout(loadNowPlaying, 15000);
}).fail(function() {
nowPlayingTimeout = setTimeout(loadNowPlaying, 30000);
});
}
function iterateTimer() {
var np_elapsed = nowPlaying.np.now_playing.elapsed;
var np_total = nowPlaying.np.now_playing.duration;
if (np_elapsed < np_total) {
nowPlaying.np.now_playing.elapsed = np_elapsed + 1;
np_elapsed++;
nowPlaying.np.now_playing.elapsed = np_elapsed;
if (np_elapsed == np_total) {
// If the song JUST reached its endpoint, check now-playing much sooner.
clearTimeout(nowPlayingTimeout);
nowPlayingTimeout = setTimeout(loadNowPlaying, 3000);
}
}
}
@ -52,24 +76,7 @@ $(function() {
setInterval(iterateTimer, 1000);
function loadNowPlaying() {
$.getJSON('<?=$router->fromHere('api:nowplaying:index') ?>', function(row) {
nowPlaying.np = row;
if ('mediaSession' in navigator) {
navigator.mediaSession.metadata = new MediaMetadata({
title: row.now_playing.song.title,
artist: row.now_playing.song.artist
});
}
setTimeout(loadNowPlaying, 15000);
}).fail(function() {
setTimeout(loadNowPlaying, 30000);
});
}
setTimeout(loadNowPlaying, 15000);
nowPlayingTimeout = setTimeout(loadNowPlaying, 15000);
var service_status_lang = {
"running": <?=$this->escapeJs(__('Running')) ?>,

View File

@ -103,11 +103,11 @@ $user = $request->getUser();
</div>
<?php if ($backend_type === \App\Radio\Adapters::BACKEND_LIQUIDSOAP && $acl->userAllowed($user, \App\Acl::STATION_BROADCASTING, $station->getId())): ?>
<div class="card-actions">
<a id="btn_skip_song" class="btn btn-outline-primary api-call" role="button" href="<?=$router->fromHere('api:stations:backend', ['do' => 'skip']) ?>" v-if="!np.live.is_live">
<a id="btn_skip_song" class="btn btn-outline-primary api-call no-reload" role="button" href="<?=$router->fromHere('api:stations:backend', ['do' => 'skip']) ?>" v-if="!np.live.is_live">
<i class="material-icons" aria-hidden="true">skip_next</i>
<?=__('Skip Song') ?>
</a>
<a id="btn_disconnect_streamer" class="btn btn-outline-primary api-call" role="button" href="<?=$router->fromHere('api:stations:backend', ['do' => 'disconnect']) ?>" v-if="np.live.is_live">
<a id="btn_disconnect_streamer" class="btn btn-outline-primary api-call no-reload" role="button" href="<?=$router->fromHere('api:stations:backend', ['do' => 'disconnect']) ?>" v-if="np.live.is_live">
<i class="material-icons" aria-hidden="true">volume_off</i>
<?=__('Disconnect Streamer') ?>
</a>
@ -327,15 +327,15 @@ $user = $request->getUser();
<?php if ($acl->userAllowed($user, 'manage station broadcasting', $station->getId())): ?>
<div class="card-actions">
<a class="api-call btn btn-outline-secondary" href="<?=$router->fromHere('api:stations:frontend', ['do' => 'restart']) ?>">
<a class="api-call no-reload btn btn-outline-secondary" href="<?=$router->fromHere('api:stations:frontend', ['do' => 'restart']) ?>">
<i class="material-icons" aria-hidden="true">update</i>
<?=__('Restart') ?>
</a>
<a class="api-call btn btn-outline-secondary" href="<?=$router->fromHere('api:stations:frontend', ['do' => 'start']) ?>">
<a class="api-call no-reload btn btn-outline-secondary" href="<?=$router->fromHere('api:stations:frontend', ['do' => 'start']) ?>">
<i class="material-icons" aria-hidden="true">play_arrow</i>
<?=__('Start') ?>
</a>
<a class="api-call btn btn-outline-secondary" href="<?=$router->fromHere('api:stations:frontend', ['do' => 'stop']) ?>">
<a class="api-call no-reload btn btn-outline-secondary" href="<?=$router->fromHere('api:stations:frontend', ['do' => 'stop']) ?>">
<i class="material-icons" aria-hidden="true">stop</i>
<?=__('Stop') ?>
</a>
@ -372,15 +372,15 @@ $user = $request->getUser();
</div>
<?php if ($acl->userAllowed($user, 'manage station broadcasting', $station->getId())): ?>
<div class="card-actions">
<a class="api-call btn btn-outline-secondary" href="<?=$router->fromHere('api:stations:backend', ['do' => 'restart']) ?>">
<a class="api-call no-reload btn btn-outline-secondary" href="<?=$router->fromHere('api:stations:backend', ['do' => 'restart']) ?>">
<i class="material-icons" aria-hidden="true">update</i>
<?=__('Restart') ?>
</a>
<a class="api-call btn btn-outline-secondary" href="<?=$router->fromHere('api:stations:backend', ['do' => 'start']) ?>">
<a class="api-call no-reload btn btn-outline-secondary" href="<?=$router->fromHere('api:stations:backend', ['do' => 'start']) ?>">
<i class="material-icons" aria-hidden="true">play_arrow</i>
<?=__('Start') ?>
</a>
<a class="api-call btn btn-outline-secondary" href="<?=$router->fromHere('api:stations:backend', ['do' => 'stop']) ?>">
<a class="api-call no-reload btn btn-outline-secondary" href="<?=$router->fromHere('api:stations:backend', ['do' => 'stop']) ?>">
<i class="material-icons" aria-hidden="true">stop</i>
<?=__('Stop') ?>
</a>

View File

@ -4,7 +4,7 @@ $(function() {
var btn = $(this);
var btn_original_text = btn.html();
var trigger_restart = (btn.data('restart') != false);
var trigger_reload = !btn.hasClass('no-reload');
btn.text(<?=$this->escapeJs(__('Please wait...')) ?>);
btn.addClass('disabled');
@ -13,13 +13,13 @@ $(function() {
type: "POST",
url: btn.attr('href'),
success: function(data) {
if (trigger_restart) {
if (trigger_reload) {
// Only restart if the user isn't on a form page
if ($('form').length === 0) {
setTimeout('location.reload()', 2000);
}
} else {
btn.html(btn_original_text);
btn.removeClass('disabled').html(btn_original_text);
}
var notify_type = (data.success) ? 'success' : 'warning';
@ -29,7 +29,7 @@ $(function() {
data = jQuery.parseJSON(response.responseText);
notify(data.message, 'danger');
btn.html(btn_original_text);
btn.removeClass('disabled').html(btn_original_text);
},
dataType: "json"
});