diff --git a/CHANGELOG.md b/CHANGELOG.md index 810851d63..1e2849975 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,10 @@ release channel, you can take advantage of these new features and fixes. ## Code Quality/Technical Changes +- We have once again switched our message queue implementation, this time from the MariaDB database to a + super-lightweight standalone tool called Beanstalkd. We hope this will resolve issues we've encountered with parallel + workers causing database lockups and other problems. + - The main web Docker container will now automatically initialize itself upon startup, performing essential tasks like updating the database, clearing the cache and ensuring the system is set up properly. This means even if you miss a step in installation (or use the Docker images directly) they should still work without issue. diff --git a/composer.json b/composer.json index cb3dcae3f..1e09fd0fb 100644 --- a/composer.json +++ b/composer.json @@ -62,6 +62,7 @@ "spatie/flysystem-dropbox": "^2", "spomky-labs/otphp": "^10.0", "supervisorphp/supervisor": "dev-main", + "symfony/beanstalkd-messenger": "^5.3", "symfony/cache": "^5.2", "symfony/console": "^5", "symfony/event-dispatcher": "^5", diff --git a/composer.lock b/composer.lock index 46e838dad..0feba1a16 100644 --- a/composer.lock +++ b/composer.lock @@ -4,7 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "b8d01546a885d3765652fdf3d8ba59fc", + "content-hash": "9238ec827505c7ffc33c58d9ceea2633", "packages": [ { "name": "aws/aws-sdk-php", @@ -4633,6 +4633,61 @@ }, "time": "2020-12-06T15:14:20+00:00" }, + { + "name": "pda/pheanstalk", + "version": "v4.0.3", + "source": { + "type": "git", + "url": "https://github.com/pheanstalk/pheanstalk.git", + "reference": "6165573aad525d39b3ac8ae916214cb483a61263" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/pheanstalk/pheanstalk/zipball/6165573aad525d39b3ac8ae916214cb483a61263", + "reference": "6165573aad525d39b3ac8ae916214cb483a61263", + "shasum": "" + }, + "require": { + "ext-mbstring": "*", + "php": ">=7.1.0" + }, + "require-dev": { + "phpunit/phpunit": "^7" + }, + "type": "library", + "autoload": { + "psr-4": { + "Pheanstalk\\": "src/" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Paul Annesley", + "email": "paul@annesley.cc", + "homepage": "http://paul.annesley.cc/", + "role": "Developer" + }, + { + "name": "Sam Mousa", + "email": "sam@mousa.nl", + "role": "Maintainer" + } + ], + "description": "PHP client for beanstalkd queue", + "homepage": "https://github.com/pheanstalk/pheanstalk", + "keywords": [ + "beanstalkd" + ], + "support": { + "issues": "https://github.com/pheanstalk/pheanstalk/issues", + "source": "https://github.com/pheanstalk/pheanstalk/tree/v4.0.3" + }, + "time": "2020-09-22T07:17:48+00:00" + }, { "name": "php-di/invoker", "version": "2.3.0", @@ -6310,6 +6365,69 @@ ], "time": "2021-06-17T12:34:27+00:00" }, + { + "name": "symfony/beanstalkd-messenger", + "version": "v5.3.0", + "source": { + "type": "git", + "url": "https://github.com/symfony/beanstalkd-messenger.git", + "reference": "7ca9b2c142ab3a0e13b81910fd5451ebf094bada" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/symfony/beanstalkd-messenger/zipball/7ca9b2c142ab3a0e13b81910fd5451ebf094bada", + "reference": "7ca9b2c142ab3a0e13b81910fd5451ebf094bada", + "shasum": "" + }, + "require": { + "pda/pheanstalk": "^4.0", + "php": ">=7.2.5", + "symfony/messenger": "^4.4|^5.0" + }, + "require-dev": { + "symfony/property-access": "^4.4|^5.0", + "symfony/serializer": "^4.4|^5.0" + }, + "type": "symfony-bridge", + "autoload": { + "psr-4": { + "Symfony\\Component\\Messenger\\Bridge\\Beanstalkd\\": "" + }, + "exclude-from-classmap": [ + "/Tests/" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Antonio Pauletich", + "email": "antonio.pauletich95@gmail.com" + } + ], + "description": "Symfony Beanstalkd Messenger Bridge", + "homepage": "https://symfony.com", + "support": { + "source": "https://github.com/symfony/beanstalkd-messenger/tree/v5.3.0" + }, + "funding": [ + { + "url": "https://symfony.com/sponsor", + "type": "custom" + }, + { + "url": "https://github.com/fabpot", + "type": "github" + }, + { + "url": "https://tidelift.com/funding/github/packagist/symfony/symfony", + "type": "tidelift" + } + ], + "time": "2021-05-26T17:33:56+00:00" + }, { "name": "symfony/cache", "version": "v5.3.3", @@ -13471,5 +13589,5 @@ "ext-xmlwriter": "*" }, "platform-dev": [], - "plugin-api-version": "2.1.0" + "plugin-api-version": "2.0.0" } diff --git a/config/services.php b/config/services.php index cfb3f6976..c945eba9d 100644 --- a/config/services.php +++ b/config/services.php @@ -346,6 +346,22 @@ return [ return $builder->getValidator(); }, + Pheanstalk\Pheanstalk::class => static function () { + return Pheanstalk\Pheanstalk::create('127.0.0.1', 11300); + }, + + App\MessageQueue\QueueManagerInterface::class => static function ( + Environment $environment, + ContainerInterface $di + ) { + if ($environment->isTesting()) { + return new App\MessageQueue\TestQueueManager(); + } + + $pheanstalk = $di->get(Pheanstalk\Pheanstalk::class); + return new App\MessageQueue\QueueManager($pheanstalk); + }, + Symfony\Component\Messenger\MessageBus::class => static function ( App\MessageQueue\QueueManager $queueManager, App\LockFactory $lockFactory, diff --git a/src/Console/Command/InitializeCommand.php b/src/Console/Command/InitializeCommand.php index 905d15cb7..74386815e 100644 --- a/src/Console/Command/InitializeCommand.php +++ b/src/Console/Command/InitializeCommand.php @@ -47,7 +47,6 @@ class InitializeCommand extends CommandAbstract $io->section(__('Reload System Data')); $this->runCommand($output, 'cache:clear'); - $this->runCommand($output, 'queue:clear'); // Ensure default storage locations exist. $storageLocationRepo->createDefaultStorageLocations(); diff --git a/src/Console/Command/MessageQueue/ClearCommand.php b/src/Console/Command/MessageQueue/ClearCommand.php index 3c054f3cb..86bda4c91 100644 --- a/src/Console/Command/MessageQueue/ClearCommand.php +++ b/src/Console/Command/MessageQueue/ClearCommand.php @@ -5,22 +5,22 @@ declare(strict_types=1); namespace App\Console\Command\MessageQueue; use App\Console\Command\CommandAbstract; -use App\Entity\Repository\MessengerMessageRepository; -use App\MessageQueue\QueueManager; +use App\MessageQueue\AbstractQueueManager; +use App\MessageQueue\QueueManagerInterface; use Symfony\Component\Console\Style\SymfonyStyle; class ClearCommand extends CommandAbstract { public function __invoke( SymfonyStyle $io, - MessengerMessageRepository $messengerMessageRepo, + QueueManagerInterface $queueManager, ?string $queue = null ): int { - $allQueues = QueueManager::getAllQueues(); + $allQueues = AbstractQueueManager::getAllQueues(); if (!empty($queue)) { if (in_array($queue, $allQueues, true)) { - $messengerMessageRepo->clearQueue($queue); + $queueManager->clearQueue($queue); $io->success(sprintf('Message queue "%s" cleared.', $queue)); } else { @@ -28,7 +28,10 @@ class ClearCommand extends CommandAbstract return 1; } } else { - $messengerMessageRepo->clearQueue(); + foreach ($allQueues as $queueName) { + $queueManager->clearQueue($queueName); + } + $io->success('All message queues cleared.'); } diff --git a/src/Console/Command/MessageQueue/ProcessCommand.php b/src/Console/Command/MessageQueue/ProcessCommand.php index be97ea747..0e8c3c94d 100644 --- a/src/Console/Command/MessageQueue/ProcessCommand.php +++ b/src/Console/Command/MessageQueue/ProcessCommand.php @@ -9,7 +9,7 @@ use App\Doctrine\Messenger\ClearEntityManagerSubscriber; use App\Environment; use App\EventDispatcher; use App\MessageQueue\LogWorkerExceptionSubscriber; -use App\MessageQueue\QueueManager; +use App\MessageQueue\QueueManagerInterface; use App\MessageQueue\ResetArrayCacheMiddleware; use Psr\Log\LoggerInterface; use Symfony\Component\Messenger\EventListener\StopWorkerOnTimeLimitListener; @@ -22,7 +22,7 @@ class ProcessCommand extends CommandAbstract public function __invoke( MessageBus $messageBus, EventDispatcher $eventDispatcher, - QueueManager $queueManager, + QueueManagerInterface $queueManager, LoggerInterface $logger, Environment $environment, ?int $runtime = 0, diff --git a/src/Controller/Admin/DebugController.php b/src/Controller/Admin/DebugController.php index 1ede312e2..236051399 100644 --- a/src/Controller/Admin/DebugController.php +++ b/src/Controller/Admin/DebugController.php @@ -10,7 +10,8 @@ use App\Entity; use App\Http\Response; use App\Http\ServerRequest; use App\Message\RunSyncTaskMessage; -use App\MessageQueue\QueueManager; +use App\MessageQueue\AbstractQueueManager; +use App\MessageQueue\QueueManagerInterface; use App\Radio\AutoDJ; use App\Radio\Backend\Liquidsoap; use App\Session\Flash; @@ -39,9 +40,9 @@ class DebugController extends AbstractLogViewerController Response $response, Entity\Repository\StationRepository $stationRepo, Runner $sync, - QueueManager $queueManager + QueueManagerInterface $queueManager ): ResponseInterface { - $queues = QueueManager::getAllQueues(); + $queues = AbstractQueueManager::getAllQueues(); $queueTotals = []; foreach ($queues as $queue) { diff --git a/src/Controller/Api/Stations/Files/BatchAction.php b/src/Controller/Api/Stations/Files/BatchAction.php index 7c68996e1..0c608816c 100644 --- a/src/Controller/Api/Stations/Files/BatchAction.php +++ b/src/Controller/Api/Stations/Files/BatchAction.php @@ -11,7 +11,7 @@ use App\Http\Response; use App\Http\ServerRequest; use App\Media\BatchUtilities; use App\Message; -use App\MessageQueue\QueueManager; +use App\MessageQueue\QueueManagerInterface; use App\Radio\Backend\Liquidsoap; use App\Utilities\File; use Azura\Files\ExtendedFilesystemInterface; @@ -28,7 +28,7 @@ class BatchAction protected BatchUtilities $batchUtilities, protected ReloadableEntityManagerInterface $em, protected MessageBus $messageBus, - protected QueueManager $queueManager, + protected QueueManagerInterface $queueManager, protected Entity\Repository\StationPlaylistMediaRepository $playlistMediaRepo, protected Entity\Repository\StationPlaylistFolderRepository $playlistFolderRepo, ) { @@ -288,7 +288,7 @@ class BatchAction $queuedMediaUpdates = []; $queuedNewFiles = []; - foreach ($this->queueManager->getMessagesInTransport(QueueManager::QUEUE_MEDIA) as $message) { + foreach ($this->queueManager->getMessagesInTransport(QueueManagerInterface::QUEUE_MEDIA) as $message) { if ($message instanceof Message\ReprocessMediaMessage) { $queuedMediaUpdates[$message->media_id] = true; } elseif ( diff --git a/src/Entity/MessengerMessage.php b/src/Entity/MessengerMessage.php deleted file mode 100644 index f3738c582..000000000 --- a/src/Entity/MessengerMessage.php +++ /dev/null @@ -1,44 +0,0 @@ -addSql('DROP TABLE messenger_messages'); + } + + public function down(Schema $schema): void + { + // this down() migration is auto-generated, please modify it to your needs + $this->addSql('CREATE TABLE messenger_messages (id BIGINT AUTO_INCREMENT NOT NULL, body LONGTEXT CHARACTER SET utf8mb4 NOT NULL COLLATE `utf8mb4_general_ci`, headers LONGTEXT CHARACTER SET utf8mb4 NOT NULL COLLATE `utf8mb4_general_ci`, queue_name VARCHAR(190) CHARACTER SET utf8mb4 NOT NULL COLLATE `utf8mb4_general_ci`, created_at DATETIME NOT NULL, available_at DATETIME NOT NULL, delivered_at DATETIME DEFAULT NULL, INDEX IDX_75EA56E0FB7336F0 (queue_name), INDEX IDX_75EA56E0E3BD61CE (available_at), INDEX IDX_75EA56E016BA31DB (delivered_at), PRIMARY KEY(id)) DEFAULT CHARACTER SET utf8 COLLATE `utf8_unicode_ci` ENGINE = InnoDB COMMENT = \'\' '); + } +} diff --git a/src/Entity/Repository/MessengerMessageRepository.php b/src/Entity/Repository/MessengerMessageRepository.php deleted file mode 100644 index f227d5c9c..000000000 --- a/src/Entity/Repository/MessengerMessageRepository.php +++ /dev/null @@ -1,24 +0,0 @@ -em->createQueryBuilder() - ->delete(Entity\MessengerMessage::class, 'mm'); - - if (!empty($queueName)) { - $qb->where('mm.queueName = :queueName') - ->setParameter('queueName', $queueName); - } - - $qb->getQuery()->execute(); - } -} diff --git a/src/Message/AbstractMessage.php b/src/Message/AbstractMessage.php index 60a86000f..384787874 100644 --- a/src/Message/AbstractMessage.php +++ b/src/Message/AbstractMessage.php @@ -4,12 +4,12 @@ declare(strict_types=1); namespace App\Message; -use App\MessageQueue\QueueManager; +use App\MessageQueue\QueueManagerInterface; abstract class AbstractMessage { public function getQueue(): string { - return QueueManager::QUEUE_NORMAL_PRIORITY; + return QueueManagerInterface::QUEUE_NORMAL_PRIORITY; } } diff --git a/src/Message/AddNewMediaMessage.php b/src/Message/AddNewMediaMessage.php index 2aaa17590..a0066e6de 100644 --- a/src/Message/AddNewMediaMessage.php +++ b/src/Message/AddNewMediaMessage.php @@ -4,7 +4,7 @@ declare(strict_types=1); namespace App\Message; -use App\MessageQueue\QueueManager; +use App\MessageQueue\QueueManagerInterface; class AddNewMediaMessage extends AbstractUniqueMessage { @@ -16,6 +16,6 @@ class AddNewMediaMessage extends AbstractUniqueMessage public function getQueue(): string { - return QueueManager::QUEUE_MEDIA; + return QueueManagerInterface::QUEUE_MEDIA; } } diff --git a/src/Message/AddNewPodcastMediaMessage.php b/src/Message/AddNewPodcastMediaMessage.php index 8d931f4e4..ca720bf84 100644 --- a/src/Message/AddNewPodcastMediaMessage.php +++ b/src/Message/AddNewPodcastMediaMessage.php @@ -4,7 +4,7 @@ declare(strict_types=1); namespace App\Message; -use App\MessageQueue\QueueManager; +use App\MessageQueue\QueueManagerInterface; class AddNewPodcastMediaMessage extends AbstractUniqueMessage { @@ -16,6 +16,6 @@ class AddNewPodcastMediaMessage extends AbstractUniqueMessage public function getQueue(): string { - return QueueManager::QUEUE_PODCAST_MEDIA; + return QueueManagerInterface::QUEUE_PODCAST_MEDIA; } } diff --git a/src/Message/BackupMessage.php b/src/Message/BackupMessage.php index d99ad0d04..76ed71fcf 100644 --- a/src/Message/BackupMessage.php +++ b/src/Message/BackupMessage.php @@ -4,7 +4,7 @@ declare(strict_types=1); namespace App\Message; -use App\MessageQueue\QueueManager; +use App\MessageQueue\QueueManagerInterface; class BackupMessage extends AbstractUniqueMessage { @@ -33,6 +33,6 @@ class BackupMessage extends AbstractUniqueMessage public function getQueue(): string { - return QueueManager::QUEUE_LOW_PRIORITY; + return QueueManagerInterface::QUEUE_LOW_PRIORITY; } } diff --git a/src/Message/ReprocessMediaMessage.php b/src/Message/ReprocessMediaMessage.php index 81cee262c..17cf88670 100644 --- a/src/Message/ReprocessMediaMessage.php +++ b/src/Message/ReprocessMediaMessage.php @@ -4,7 +4,7 @@ declare(strict_types=1); namespace App\Message; -use App\MessageQueue\QueueManager; +use App\MessageQueue\QueueManagerInterface; class ReprocessMediaMessage extends AbstractUniqueMessage { @@ -21,6 +21,6 @@ class ReprocessMediaMessage extends AbstractUniqueMessage public function getQueue(): string { - return QueueManager::QUEUE_MEDIA; + return QueueManagerInterface::QUEUE_MEDIA; } } diff --git a/src/Message/ReprocessPodcastMediaMessage.php b/src/Message/ReprocessPodcastMediaMessage.php index 185a453ff..7333a591f 100644 --- a/src/Message/ReprocessPodcastMediaMessage.php +++ b/src/Message/ReprocessPodcastMediaMessage.php @@ -4,7 +4,7 @@ declare(strict_types=1); namespace App\Message; -use App\MessageQueue\QueueManager; +use App\MessageQueue\QueueManagerInterface; class ReprocessPodcastMediaMessage extends AbstractUniqueMessage { @@ -21,6 +21,6 @@ class ReprocessPodcastMediaMessage extends AbstractUniqueMessage public function getQueue(): string { - return QueueManager::QUEUE_PODCAST_MEDIA; + return QueueManagerInterface::QUEUE_PODCAST_MEDIA; } } diff --git a/src/Message/RunSyncTaskMessage.php b/src/Message/RunSyncTaskMessage.php index e9865b884..f0c01674e 100644 --- a/src/Message/RunSyncTaskMessage.php +++ b/src/Message/RunSyncTaskMessage.php @@ -5,7 +5,7 @@ declare(strict_types=1); namespace App\Message; use App\Environment; -use App\MessageQueue\QueueManager; +use App\MessageQueue\QueueManagerInterface; class RunSyncTaskMessage extends AbstractUniqueMessage { @@ -26,6 +26,6 @@ class RunSyncTaskMessage extends AbstractUniqueMessage public function getQueue(): string { - return QueueManager::QUEUE_HIGH_PRIORITY; + return QueueManagerInterface::QUEUE_HIGH_PRIORITY; } } diff --git a/src/Message/UpdateNowPlayingMessage.php b/src/Message/UpdateNowPlayingMessage.php index 6cb7d3261..1cc7b8200 100644 --- a/src/Message/UpdateNowPlayingMessage.php +++ b/src/Message/UpdateNowPlayingMessage.php @@ -4,7 +4,7 @@ declare(strict_types=1); namespace App\Message; -use App\MessageQueue\QueueManager; +use App\MessageQueue\QueueManagerInterface; class UpdateNowPlayingMessage extends AbstractMessage { @@ -12,6 +12,6 @@ class UpdateNowPlayingMessage extends AbstractMessage public function getQueue(): string { - return QueueManager::QUEUE_HIGH_PRIORITY; + return QueueManagerInterface::QUEUE_HIGH_PRIORITY; } } diff --git a/src/Message/WritePlaylistFileMessage.php b/src/Message/WritePlaylistFileMessage.php index db91c3152..a4e23b498 100644 --- a/src/Message/WritePlaylistFileMessage.php +++ b/src/Message/WritePlaylistFileMessage.php @@ -4,7 +4,7 @@ declare(strict_types=1); namespace App\Message; -use App\MessageQueue\QueueManager; +use App\MessageQueue\QueueManagerInterface; class WritePlaylistFileMessage extends AbstractUniqueMessage { @@ -18,6 +18,6 @@ class WritePlaylistFileMessage extends AbstractUniqueMessage public function getQueue(): string { - return QueueManager::QUEUE_LOW_PRIORITY; + return QueueManagerInterface::QUEUE_LOW_PRIORITY; } } diff --git a/src/MessageQueue/AbstractQueueManager.php b/src/MessageQueue/AbstractQueueManager.php new file mode 100644 index 000000000..5411402cd --- /dev/null +++ b/src/MessageQueue/AbstractQueueManager.php @@ -0,0 +1,76 @@ +workerName = $workerName; + } + + /** + * @inheritDoc + */ + public function getSenders(Envelope $envelope): iterable + { + $message = $envelope->getMessage(); + + if (!$message instanceof AbstractMessage) { + return [ + $this->getTransport(self::QUEUE_NORMAL_PRIORITY), + ]; + } + + $queue = $message->getQueue(); + return [ + $this->getTransport($queue), + ]; + } + + /** + * @inheritDoc + */ + public function getMessagesInTransport(string $queueName): Generator + { + foreach ($this->getTransport($queueName)->get() as $envelope) { + $message = $envelope->getMessage(); + if ($message instanceof AbstractMessage) { + yield $message; + } + } + } + + public function getTransports(): array + { + $allQueues = self::getAllQueues(); + + $transports = []; + foreach ($allQueues as $queueName) { + $transports[$queueName] = $this->getTransport($queueName); + } + return $transports; + } + + /** + * @return string[] + */ + public static function getAllQueues(): array + { + return [ + self::QUEUE_HIGH_PRIORITY, + self::QUEUE_NORMAL_PRIORITY, + self::QUEUE_LOW_PRIORITY, + self::QUEUE_MEDIA, + self::QUEUE_PODCAST_MEDIA, + ]; + } +} diff --git a/src/MessageQueue/QueueManager.php b/src/MessageQueue/QueueManager.php index 63def64dd..cbad7cc08 100644 --- a/src/MessageQueue/QueueManager.php +++ b/src/MessageQueue/QueueManager.php @@ -4,133 +4,52 @@ declare(strict_types=1); namespace App\MessageQueue; -use App\Message\AbstractMessage; -use Doctrine\DBAL\Connection; -use Generator; -use Symfony\Component\Messenger\Bridge\Doctrine\Transport\Connection as MessengerConnection; -use Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineTransport; -use Symfony\Component\Messenger\Envelope; -use Symfony\Component\Messenger\Transport\Sender\SendersLocatorInterface; +use Pheanstalk\Pheanstalk; +use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdTransport; +use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\Connection as MessengerConnection; +use Symfony\Component\Messenger\Exception\TransportException; use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; -class QueueManager implements SendersLocatorInterface +class QueueManager extends AbstractQueueManager { - public const QUEUE_HIGH_PRIORITY = 'high_priority'; - public const QUEUE_NORMAL_PRIORITY = 'normal_priority'; - public const QUEUE_LOW_PRIORITY = 'low_priority'; - public const QUEUE_MEDIA = 'media'; - public const QUEUE_PODCAST_MEDIA = 'podcast_media'; - - protected string $workerName = 'app'; - public function __construct( - protected Connection $db + protected Pheanstalk $pheanstalk ) { } - public function setWorkerName(string $workerName): void + public function clearQueue(string $queueName): void { - $this->workerName = $workerName; - } + $pheanstalk = $this->pheanstalk->useTube($queueName); - /** - * @inheritDoc - */ - public function getSenders(Envelope $envelope): iterable - { - $message = $envelope->getMessage(); - - if (!$message instanceof AbstractMessage) { - return [ - $this->getTransport(self::QUEUE_NORMAL_PRIORITY), - ]; + while ($job = $pheanstalk->reserve()) { + $pheanstalk->delete($job); } - - $queue = $message->getQueue(); - return [ - $this->getTransport($queue), - ]; } - public function getConnection(string $queueName): MessengerConnection + public function getTransport(string $queueName): BeanstalkdTransport { - return new MessengerConnection( - [ - 'table_name' => 'messenger_messages', - 'queue_name' => $queueName, - 'auto_setup' => false, - ], - $this->db - ); - } - - public function getTransport(string $queueName): DoctrineTransport - { - return new DoctrineTransport( + return new BeanstalkdTransport( $this->getConnection($queueName), new PhpSerializer() ); } - /** - * @param string $queueName - * - * @return Generator - */ - public function getMessagesInTransport(string $queueName): Generator + protected function getConnection(string $queueName): MessengerConnection { - foreach ($this->getTransport($queueName)->all() as $envelope) { - $message = $envelope->getMessage(); - if ($message instanceof AbstractMessage) { - yield $message; - } - } - } - - /** - * @return DoctrineTransport[] - */ - public function getTransports(): array - { - $allQueues = self::getAllQueues(); - - $transports = []; - foreach ($allQueues as $queueName) { - $transports[$queueName] = $this->getTransport($queueName); - } - return $transports; - } - - /** - * @return MessengerConnection[] - */ - public function getConnections(): array - { - $allQueues = self::getAllQueues(); - - $connections = []; - foreach ($allQueues as $queueName) { - $connections[$queueName] = $this->getConnection($queueName); - } - return $connections; + return new MessengerConnection( + [ + 'tube_name' => $queueName, + ], + $this->pheanstalk + ); } public function getQueueCount(string $queueName): int { - return $this->getConnection($queueName)->getMessageCount(); - } - - /** - * @return string[] - */ - public static function getAllQueues(): array - { - return [ - self::QUEUE_HIGH_PRIORITY, - self::QUEUE_NORMAL_PRIORITY, - self::QUEUE_LOW_PRIORITY, - self::QUEUE_MEDIA, - self::QUEUE_PODCAST_MEDIA, - ]; + try { + return $this->getConnection($queueName)->getMessageCount(); + } catch (TransportException) { + return 0; + } } } diff --git a/src/MessageQueue/QueueManagerInterface.php b/src/MessageQueue/QueueManagerInterface.php new file mode 100644 index 000000000..08771e58b --- /dev/null +++ b/src/MessageQueue/QueueManagerInterface.php @@ -0,0 +1,41 @@ + + */ + public function getMessagesInTransport(string $queueName): Generator; + + /** + * @return TransportInterface[] + */ + public function getTransports(): array; + + public function getQueueCount(string $queueName): int; + + public static function getAllQueues(): array; +} diff --git a/src/MessageQueue/TestQueueManager.php b/src/MessageQueue/TestQueueManager.php new file mode 100644 index 000000000..dccbe70fe --- /dev/null +++ b/src/MessageQueue/TestQueueManager.php @@ -0,0 +1,26 @@ +queueManager->getMessagesInTransport(QueueManager::QUEUE_MEDIA) as $message) { + foreach ($this->queueManager->getMessagesInTransport(QueueManagerInterface::QUEUE_MEDIA) as $message) { if ($message instanceof Message\ReprocessMediaMessage) { $queuedMediaUpdates[$message->media_id] = true; } elseif ( diff --git a/update.sh b/update.sh index 86803a84c..089f1a682 100755 --- a/update.sh +++ b/update.sh @@ -34,7 +34,7 @@ else fi APP_ENV="${APP_ENV:-production}" -UPDATE_REVISION="${UPDATE_REVISION:-64}" +UPDATE_REVISION="${UPDATE_REVISION:-65}" echo "Updating AzuraCast (Environment: $APP_ENV, Update revision: $UPDATE_REVISION)" diff --git a/util/ansible/deploy.yml b/util/ansible/deploy.yml index 4d26380e4..daf9a1747 100644 --- a/util/ansible/deploy.yml +++ b/util/ansible/deploy.yml @@ -24,6 +24,7 @@ - nginx - php - redis + - beanstalkd - mariadb - azuracast-db-install - ufw diff --git a/util/ansible/roles/beanstalkd/tasks/main.yml b/util/ansible/roles/beanstalkd/tasks/main.yml new file mode 100644 index 000000000..073ad1c8f --- /dev/null +++ b/util/ansible/roles/beanstalkd/tasks/main.yml @@ -0,0 +1,5 @@ +--- +- name: Install Beanstalkd + apt: + name: beanstalkd + state: latest diff --git a/util/ansible/roles/services/tasks/main.yml b/util/ansible/roles/services/tasks/main.yml index 03fdb95d4..8ccd34a70 100644 --- a/util/ansible/roles/services/tasks/main.yml +++ b/util/ansible/roles/services/tasks/main.yml @@ -11,3 +11,4 @@ - "nginx" - "redis-server" - "redis" + - "beanstalkd" diff --git a/util/ansible/update.yml b/util/ansible/update.yml index 8c22cc8e2..925eb9399 100644 --- a/util/ansible/update.yml +++ b/util/ansible/update.yml @@ -21,6 +21,7 @@ - { role : mariadb, when : update_revision|int < 63 } - { role : nginx, when : update_revision|int < 60 } - { role : redis, when : update_revision|int < 57 } + - { role : beanstalkd, when : update_revision|int < 65 } - { role : php, when : update_revision|int < 62 } - composer - { role : influxdb, when : update_revision|int < 58 } diff --git a/util/docker/web/runit/beanstalkd/run b/util/docker/web/runit/beanstalkd/run new file mode 100644 index 000000000..872088c37 --- /dev/null +++ b/util/docker/web/runit/beanstalkd/run @@ -0,0 +1,5 @@ +#!/bin/bash + +echo 'Spinning up Beanstalkd process...' + +setuser azuracast beanstalkd -p 11300 diff --git a/util/docker/web/setup/beanstalkd.sh b/util/docker/web/setup/beanstalkd.sh new file mode 100644 index 000000000..d936d46f9 --- /dev/null +++ b/util/docker/web/setup/beanstalkd.sh @@ -0,0 +1,8 @@ +#!/bin/bash +set -e +source /bd_build/buildconfig +set -x + +apt-get update + +$minimal_apt_get_install beanstalkd