#3020 -- Switch to MariaDB-based message queue.

This commit is contained in:
Buster "Silver Eagle" Neece 2020-07-11 00:22:02 -05:00
parent 755d4c9259
commit f3f845cf1a
No known key found for this signature in database
GPG Key ID: 6D9E12FF03411F4E
6 changed files with 171 additions and 39 deletions

34
composer.lock generated
View File

@ -1076,16 +1076,16 @@
},
{
"name": "doctrine/cache",
"version": "1.10.1",
"version": "1.10.2",
"source": {
"type": "git",
"url": "https://github.com/doctrine/cache.git",
"reference": "35a4a70cd94e09e2259dfae7488afc6b474ecbd3"
"reference": "13e3381b25847283a91948d04640543941309727"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/doctrine/cache/zipball/35a4a70cd94e09e2259dfae7488afc6b474ecbd3",
"reference": "35a4a70cd94e09e2259dfae7488afc6b474ecbd3",
"url": "https://api.github.com/repos/doctrine/cache/zipball/13e3381b25847283a91948d04640543941309727",
"reference": "13e3381b25847283a91948d04640543941309727",
"shasum": ""
},
"require": {
@ -1168,20 +1168,20 @@
"type": "tidelift"
}
],
"time": "2020-05-27T16:24:54+00:00"
"time": "2020-07-07T18:54:01+00:00"
},
{
"name": "doctrine/collections",
"version": "1.6.5",
"version": "1.6.6",
"source": {
"type": "git",
"url": "https://github.com/doctrine/collections.git",
"reference": "fc0206348e17e530d09463fef07ba8968406cd6d"
"reference": "5f0470363ff042d0057006ae7acabc5d7b5252d5"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/doctrine/collections/zipball/fc0206348e17e530d09463fef07ba8968406cd6d",
"reference": "fc0206348e17e530d09463fef07ba8968406cd6d",
"url": "https://api.github.com/repos/doctrine/collections/zipball/5f0470363ff042d0057006ae7acabc5d7b5252d5",
"reference": "5f0470363ff042d0057006ae7acabc5d7b5252d5",
"shasum": ""
},
"require": {
@ -1247,7 +1247,7 @@
"type": "tidelift"
}
],
"time": "2020-05-25T19:24:35+00:00"
"time": "2020-06-22T19:14:02+00:00"
},
{
"name": "doctrine/common",
@ -7165,16 +7165,16 @@
},
{
"name": "symfony/service-contracts",
"version": "v2.1.2",
"version": "v2.1.3",
"source": {
"type": "git",
"url": "https://github.com/symfony/service-contracts.git",
"reference": "66a8f0957a3ca54e4f724e49028ab19d75a8918b"
"reference": "58c7475e5457c5492c26cc740cc0ad7464be9442"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/symfony/service-contracts/zipball/66a8f0957a3ca54e4f724e49028ab19d75a8918b",
"reference": "66a8f0957a3ca54e4f724e49028ab19d75a8918b",
"url": "https://api.github.com/repos/symfony/service-contracts/zipball/58c7475e5457c5492c26cc740cc0ad7464be9442",
"reference": "58c7475e5457c5492c26cc740cc0ad7464be9442",
"shasum": ""
},
"require": {
@ -7188,6 +7188,10 @@
"extra": {
"branch-alias": {
"dev-master": "2.1-dev"
},
"thanks": {
"name": "symfony/contracts",
"url": "https://github.com/symfony/contracts"
}
},
"autoload": {
@ -7233,7 +7237,7 @@
"type": "tidelift"
}
],
"time": "2020-05-20T17:43:50+00:00"
"time": "2020-07-06T13:23:11+00:00"
},
{
"name": "symfony/stopwatch",

View File

@ -162,7 +162,7 @@ return [
return $redis;
},
// Configuration management
App\Config::class => function (App\Settings $settings) {
return new App\Config($settings[App\Settings::CONFIG_DIR]);
@ -238,7 +238,11 @@ return [
new App\Normalizer\DoctrineEntityNormalizer($em, $annotation_reader, $meta_factory),
new Symfony\Component\Serializer\Normalizer\ObjectNormalizer($meta_factory),
];
return new Symfony\Component\Serializer\Serializer($normalizers);
$encoders = [
new Symfony\Component\Serializer\Encoder\JsonEncoder,
];
return new Symfony\Component\Serializer\Serializer($normalizers, $encoders);
},
// Symfony Validator
@ -254,29 +258,32 @@ return [
return $builder->getValidator();
},
Symfony\Component\Messenger\Bridge\Redis\Transport\RedisTransport::class => function (
Redis $redis
Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineTransport::class => function (
Doctrine\DBAL\Connection $db,
Symfony\Component\Serializer\Serializer $serializer
) {
// Configure message transport middleware
$redisConnection = new Symfony\Component\Messenger\Bridge\Redis\Transport\Connection(
[],
array_filter([
'host' => $redis->getHost(),
'port' => $redis->getPort(),
'auth' => $redis->getAuth(),
])
$doctrineConnection = new Symfony\Component\Messenger\Bridge\Doctrine\Transport\Connection(
[
'table_name' => 'messenger_messages',
'auto_setup' => false,
],
$db
);
return new Symfony\Component\Messenger\Bridge\Redis\Transport\RedisTransport($redisConnection);
return new Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineTransport(
$doctrineConnection,
new Symfony\Component\Messenger\Transport\Serialization\PhpSerializer
);
},
Symfony\Component\Messenger\MessageBus::class => function (
ContainerInterface $di,
Monolog\Logger $logger
) {
// Configure message sending middleware
$senders = [
App\Message\AbstractMessage::class => [
Symfony\Component\Messenger\Bridge\Redis\Transport\RedisTransport::class,
Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineTransport::class,
],
];

View File

@ -5,7 +5,7 @@ use App\Doctrine\Messenger\ClearEntityManagerSubscriber;
use App\EventDispatcher;
use Doctrine\ORM\EntityManagerInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Bridge\Redis\Transport\RedisTransport;
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineTransport;
use Symfony\Component\Messenger\EventListener\StopWorkerOnTimeLimitListener;
use Symfony\Component\Messenger\MessageBus;
use Symfony\Component\Messenger\Worker;
@ -15,13 +15,13 @@ class ProcessMessageQueueCommand extends CommandAbstract
public function __invoke(
MessageBus $messageBus,
EventDispatcher $eventDispatcher,
RedisTransport $redisTransport,
DoctrineTransport $doctrineTransport,
LoggerInterface $logger,
EntityManagerInterface $em,
int $runtime = 0
) {
$receivers = [
RedisTransport::class => $redisTransport,
DoctrineTransport::class => $doctrineTransport,
];
$eventDispatcher->addSubscriber(new ClearEntityManagerSubscriber($em));

View File

@ -0,0 +1,63 @@
<?php
namespace App\Entity;
use Doctrine\ORM\Mapping as ORM;
/**
* Internal table used for Symfony Messenger handling.
*
* @ORM\Table(name="messenger_messages", indexes={
* @ORM\Index(columns={"queue_name"}),
* @ORM\Index(columns={"available_at"}),
* @ORM\Index(columns={"delivered_at"}),
* })
* @ORM\Entity()
*
* @internal
*/
class MessengerMessage
{
/**
* @ORM\Column(name="id", type="bigint")
* @ORM\Id
* @ORM\GeneratedValue(strategy="IDENTITY")
* @var int
*/
protected $id;
/**
* @ORM\Column(name="body", type="text")
* @var string
*/
protected $body;
/**
* @ORM\Column(name="headers", type="text")
* @var string
*/
protected $headers;
/**
* @ORM\Column(name="queue_name", type="string", length=190)
* @var string
*/
protected $queue_name;
/**
* @ORM\Column(name="created_at", type="datetime")
* @var \DateTime
*/
protected $created_at;
/**
* @ORM\Column(name="available_at", type="datetime")
* @var \DateTime
*/
protected $available_at;
/**
* @ORM\Column(name="delivered_at", type="datetime", nullable=true)
* @var \DateTime
*/
protected $delivered_at;
}

View File

@ -0,0 +1,31 @@
<?php
declare(strict_types=1);
namespace App\Entity\Migration;
use Doctrine\DBAL\Schema\Schema;
use Doctrine\Migrations\AbstractMigration;
/**
* Auto-generated Migration: Please modify to your needs!
*/
final class Version20200711002451 extends AbstractMigration
{
public function getDescription(): string
{
return 'Add Messenger messages table.';
}
public function up(Schema $schema): void
{
// this up() migration is auto-generated, please modify it to your needs
$this->addSql('CREATE TABLE messenger_messages (id BIGINT AUTO_INCREMENT NOT NULL, body LONGTEXT NOT NULL, headers LONGTEXT NOT NULL, queue_name VARCHAR(190) NOT NULL, created_at DATETIME NOT NULL, available_at DATETIME NOT NULL, delivered_at DATETIME DEFAULT NULL, INDEX IDX_75EA56E0FB7336F0 (queue_name), INDEX IDX_75EA56E0E3BD61CE (available_at), INDEX IDX_75EA56E016BA31DB (delivered_at), PRIMARY KEY(id)) DEFAULT CHARACTER SET utf8mb4 COLLATE `utf8mb4_general_ci` ENGINE = InnoDB');
}
public function down(Schema $schema): void
{
// this down() migration is auto-generated, please modify it to your needs
$this->addSql('DROP TABLE messenger_messages');
}
}

View File

@ -11,6 +11,7 @@ use DoctrineBatchUtils\BatchProcessing\SimpleBatchIteratorAggregate;
use Jhofm\FlysystemIterator\Filter\FilterFactory;
use Psr\Log\LoggerInterface;
use Symfony\Component\Finder\Finder;
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineTransport;
use Symfony\Component\Messenger\MessageBus;
class Media extends AbstractTask
@ -23,6 +24,8 @@ class Media extends AbstractTask
protected MessageBus $messageBus;
protected DoctrineTransport $doctrineTransport;
public function __construct(
EntityManagerInterface $em,
Entity\Repository\SettingsRepository $settingsRepo,
@ -30,7 +33,8 @@ class Media extends AbstractTask
Entity\Repository\StationMediaRepository $mediaRepo,
Entity\Repository\StationPlaylistMediaRepository $spmRepo,
Filesystem $filesystem,
MessageBus $messageBus
MessageBus $messageBus,
DoctrineTransport $doctrineTransport
) {
parent::__construct($em, $settingsRepo, $logger);
@ -38,6 +42,7 @@ class Media extends AbstractTask
$this->spmRepo = $spmRepo;
$this->filesystem = $filesystem;
$this->messageBus = $messageBus;
$this->doctrineTransport = $doctrineTransport;
}
/**
@ -117,6 +122,22 @@ class Media extends AbstractTask
$stats['total_size'] = $total_size . ' (' . Quota::getReadableSize($total_size) . ')';
$stats['total_files'] = count($music_files);
// Check queue for existing pending processing entries.
$queued_media_updates = [];
$queued_new_files = [];
$queue = $this->doctrineTransport->all();
foreach ($queue as $envelope) {
$message = $envelope->getMessage();
if ($message instanceof Message\ReprocessMediaMessage) {
$queued_media_updates[$message->media_id] = true;
} elseif ($message instanceof Message\AddNewMediaMessage && $message->station_id === $station->getId()) {
$queued_new_files[$message->path] = true;
}
}
$existingMediaQuery = $this->em->createQuery(/** @lang DQL */ 'SELECT
sm
FROM App\Entity\StationMedia sm
@ -139,7 +160,9 @@ class Media extends AbstractTask
}
$file_info = $music_files[$path_hash];
if ($force_reprocess || $media_row->needsReprocessing($file_info['timestamp'])) {
if (isset($queued_media_updates[$media_row->getId()])) {
$stats['already_queued']++;
} elseif ($force_reprocess || $media_row->needsReprocessing($file_info['timestamp'])) {
$message = new Message\ReprocessMediaMessage;
$message->media_id = $media_row->getId();
$message->force = $force_reprocess;
@ -163,13 +186,17 @@ class Media extends AbstractTask
// Create files that do not currently exist.
foreach ($music_files as $path_hash => $new_music_file) {
$message = new Message\AddNewMediaMessage;
$message->station_id = $station->getId();
$message->path = $new_music_file['path'];
if (isset($queued_new_files[$new_music_file['path']])) {
$stats['already_queued']++;
} else {
$message = new Message\AddNewMediaMessage;
$message->station_id = $station->getId();
$message->path = $new_music_file['path'];
$this->messageBus->dispatch($message);
$this->messageBus->dispatch($message);
$stats['created']++;
$stats['created']++;
}
}
$this->logger->debug(sprintf('Media processed for station "%s".', $station->getName()), $stats);