AzuraCast/src/Console/Command/Sync/NowPlayingCommand.php

150 lines
4.2 KiB
PHP

<?php
declare(strict_types=1);
namespace App\Console\Command\Sync;
use App\Cache\NowPlayingCache;
use App\Container\EntityManagerAwareTrait;
use App\Container\SettingsAwareTrait;
use App\Lock\LockFactory;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use function random_int;
#[AsCommand(
name: 'azuracast:sync:nowplaying',
description: 'Task to run the Now Playing worker task.'
)]
final class NowPlayingCommand extends AbstractSyncCommand
{
use EntityManagerAwareTrait;
use SettingsAwareTrait;
public final const MAX_CONCURRENT_PROCESSES = 5;
public function __construct(
private readonly NowPlayingCache $nowPlayingCache,
LockFactory $lockFactory,
) {
parent::__construct($lockFactory);
}
protected function configure(): void
{
$this->addOption(
'timeout',
't',
InputOption::VALUE_OPTIONAL,
'Amount of time (in seconds) to run the worker process.',
600
);
}
protected function execute(InputInterface $input, OutputInterface $output): int
{
$io = new SymfonyStyle($input, $output);
$settings = $this->readSettings();
if ($settings->getSyncDisabled()) {
$this->logger->error('Automated synchronization is temporarily disabled.');
return 1;
}
$timeout = (int)$input->getOption('timeout');
$this->loop($io, $timeout);
return 0;
}
private function loop(SymfonyStyle $io, int $timeout): void
{
$threshold = time() + $timeout;
while (time() < $threshold || !empty($this->processes)) {
// Check existing processes.
$this->checkRunningProcesses();
$numProcesses = count($this->processes);
if (
$numProcesses < self::MAX_CONCURRENT_PROCESSES
&& time() < $threshold - 5
) {
// Ensure a process is running for every active station.
$npDelay = max(min($this->environment->getNowPlayingDelayTime(), 60), 5);
$npThreshold = time() - $npDelay - random_int(0, 5);
foreach ($this->getStationsToRun($npThreshold) as $shortName) {
if (count($this->processes) >= self::MAX_CONCURRENT_PROCESSES) {
break;
}
if (isset($this->processes[$shortName])) {
continue;
}
$this->logger->debug('Starting NP process for station: ' . $shortName);
$this->start($io, $shortName);
usleep(250000);
}
}
$this->em->clear();
gc_collect_cycles();
usleep(1000000);
}
}
private function getStationsToRun(
int $threshold
): array {
$lookupRaw = $this->nowPlayingCache->getLookup();
$lookup = [];
foreach ($lookupRaw as $stationRow) {
$lookup[$stationRow['short_name']] = (int)($stationRow['updated_at'] ?? 0);
}
$allStations = $this->em->createQuery(
<<<'DQL'
SELECT s.short_name
FROM App\Entity\Station s
WHERE s.is_enabled = 1 AND s.has_started = 1
DQL
)->getSingleColumnResult();
$stationsByUpdated = [];
foreach ($allStations as $shortName) {
$stationsByUpdated[$shortName] = $lookup[$shortName] ?? 0;
}
asort($stationsByUpdated, SORT_NUMERIC);
return array_keys(
array_filter(
$stationsByUpdated,
fn($timestamp) => $timestamp < $threshold
)
);
}
private function start(
SymfonyStyle $io,
string $shortName
): void {
$this->lockAndRunConsoleCommand(
$io,
$shortName,
'nowplaying',
[
'azuracast:sync:nowplaying:station',
$shortName,
]
);
}
}