Switch to new Symfony Lock component for our mutex locks.
This commit is contained in:
parent
41937eb4eb
commit
7bac35b2ff
|
@ -44,7 +44,6 @@
|
|||
"league/flysystem-cached-adapter": "^1.0",
|
||||
"league/plates": "^3.1",
|
||||
"lstrojny/fxmlrpc": "dev-master",
|
||||
"malkusch/lock": "^2.1",
|
||||
"maxmind-db/reader": "~1.0",
|
||||
"mezzio/mezzio-session": "^1.3",
|
||||
"mezzio/mezzio-session-cache": "^1.4",
|
||||
|
@ -67,6 +66,7 @@
|
|||
"symfony/console": "^5",
|
||||
"symfony/event-dispatcher": "^5",
|
||||
"symfony/finder": "^5",
|
||||
"symfony/lock": "^5.1",
|
||||
"symfony/messenger": "^5",
|
||||
"symfony/process": "^5",
|
||||
"symfony/property-access": "^5",
|
||||
|
|
|
@ -4,7 +4,7 @@
|
|||
"Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies",
|
||||
"This file is @generated automatically"
|
||||
],
|
||||
"content-hash": "6d6b75f418dc3dab7a0f32d0e6986719",
|
||||
"content-hash": "9a9cb6e8e6a42a17e3b5bfbaffee22cb",
|
||||
"packages": [
|
||||
{
|
||||
"name": "aws/aws-sdk-php",
|
||||
|
@ -3493,87 +3493,6 @@
|
|||
],
|
||||
"time": "2020-07-02T22:06:59+00:00"
|
||||
},
|
||||
{
|
||||
"name": "malkusch/lock",
|
||||
"version": "v2.1",
|
||||
"source": {
|
||||
"type": "git",
|
||||
"url": "https://github.com/php-lock/lock.git",
|
||||
"reference": "093f389ec2f38fc8686d2f70e23378182fce7714"
|
||||
},
|
||||
"dist": {
|
||||
"type": "zip",
|
||||
"url": "https://api.github.com/repos/php-lock/lock/zipball/093f389ec2f38fc8686d2f70e23378182fce7714",
|
||||
"reference": "093f389ec2f38fc8686d2f70e23378182fce7714",
|
||||
"shasum": ""
|
||||
},
|
||||
"require": {
|
||||
"php": ">=7.1",
|
||||
"psr/log": "^1"
|
||||
},
|
||||
"require-dev": {
|
||||
"eloquent/liberator": "^2.0",
|
||||
"ext-memcached": "*",
|
||||
"ext-pcntl": "*",
|
||||
"ext-pdo_mysql": "*",
|
||||
"ext-pdo_sqlite": "*",
|
||||
"ext-redis": "*",
|
||||
"ext-sysvsem": "*",
|
||||
"johnkary/phpunit-speedtrap": "^3.0",
|
||||
"kriswallsmith/spork": "^0.3",
|
||||
"mikey179/vfsstream": "^1.6",
|
||||
"php-mock/php-mock-phpunit": "^2.1",
|
||||
"phpunit/phpunit": "^7.4",
|
||||
"predis/predis": "^1.1",
|
||||
"squizlabs/php_codesniffer": "^3.3"
|
||||
},
|
||||
"suggest": {
|
||||
"ext-pnctl": "Enables locking with flock without busy waiting in CLI scripts.",
|
||||
"ext-redis": "To use this library with the PHP Redis extension.",
|
||||
"ext-sysvsem": "Enables locking using semaphores.",
|
||||
"predis/predis": "To use this library with predis."
|
||||
},
|
||||
"type": "library",
|
||||
"autoload": {
|
||||
"psr-4": {
|
||||
"malkusch\\lock\\": "classes/"
|
||||
}
|
||||
},
|
||||
"notification-url": "https://packagist.org/downloads/",
|
||||
"license": [
|
||||
"WTFPL"
|
||||
],
|
||||
"authors": [
|
||||
{
|
||||
"name": "Markus Malkusch",
|
||||
"email": "markus@malkusch.de",
|
||||
"homepage": "http://markus.malkusch.de",
|
||||
"role": "Developer"
|
||||
},
|
||||
{
|
||||
"name": "Willem Stuursma-Ruwen",
|
||||
"email": "willem@stuursma.name",
|
||||
"role": "Developer"
|
||||
}
|
||||
],
|
||||
"description": "Mutex library for exclusive code execution.",
|
||||
"homepage": "https://github.com/malkusch/lock",
|
||||
"keywords": [
|
||||
"advisory-locks",
|
||||
"cas",
|
||||
"flock",
|
||||
"lock",
|
||||
"locking",
|
||||
"memcache",
|
||||
"mutex",
|
||||
"mysql",
|
||||
"postgresql",
|
||||
"redis",
|
||||
"redlock",
|
||||
"semaphore"
|
||||
],
|
||||
"time": "2018-12-12T19:53:29+00:00"
|
||||
},
|
||||
{
|
||||
"name": "maxmind-db/reader",
|
||||
"version": "v1.7.0",
|
||||
|
@ -6532,6 +6451,87 @@
|
|||
],
|
||||
"time": "2020-08-17T10:01:29+00:00"
|
||||
},
|
||||
{
|
||||
"name": "symfony/lock",
|
||||
"version": "v5.1.5",
|
||||
"source": {
|
||||
"type": "git",
|
||||
"url": "https://github.com/symfony/lock.git",
|
||||
"reference": "178b8bdea788ca1a2bb8ff01d235de4035596e33"
|
||||
},
|
||||
"dist": {
|
||||
"type": "zip",
|
||||
"url": "https://api.github.com/repos/symfony/lock/zipball/178b8bdea788ca1a2bb8ff01d235de4035596e33",
|
||||
"reference": "178b8bdea788ca1a2bb8ff01d235de4035596e33",
|
||||
"shasum": ""
|
||||
},
|
||||
"require": {
|
||||
"php": ">=7.2.5",
|
||||
"psr/log": "~1.0",
|
||||
"symfony/polyfill-php80": "^1.15"
|
||||
},
|
||||
"conflict": {
|
||||
"doctrine/dbal": "<2.5"
|
||||
},
|
||||
"require-dev": {
|
||||
"doctrine/dbal": "^2.5|^3.0",
|
||||
"mongodb/mongodb": "~1.1",
|
||||
"predis/predis": "~1.0"
|
||||
},
|
||||
"type": "library",
|
||||
"extra": {
|
||||
"branch-alias": {
|
||||
"dev-master": "5.1-dev"
|
||||
}
|
||||
},
|
||||
"autoload": {
|
||||
"psr-4": {
|
||||
"Symfony\\Component\\Lock\\": ""
|
||||
},
|
||||
"exclude-from-classmap": [
|
||||
"/Tests/"
|
||||
]
|
||||
},
|
||||
"notification-url": "https://packagist.org/downloads/",
|
||||
"license": [
|
||||
"MIT"
|
||||
],
|
||||
"authors": [
|
||||
{
|
||||
"name": "Jérémy Derussé",
|
||||
"email": "jeremy@derusse.com"
|
||||
},
|
||||
{
|
||||
"name": "Symfony Community",
|
||||
"homepage": "https://symfony.com/contributors"
|
||||
}
|
||||
],
|
||||
"description": "Symfony Lock Component",
|
||||
"homepage": "https://symfony.com",
|
||||
"keywords": [
|
||||
"cas",
|
||||
"flock",
|
||||
"locking",
|
||||
"mutex",
|
||||
"redlock",
|
||||
"semaphore"
|
||||
],
|
||||
"funding": [
|
||||
{
|
||||
"url": "https://symfony.com/sponsor",
|
||||
"type": "custom"
|
||||
},
|
||||
{
|
||||
"url": "https://github.com/fabpot",
|
||||
"type": "github"
|
||||
},
|
||||
{
|
||||
"url": "https://tidelift.com/funding/github/packagist/symfony/symfony",
|
||||
"type": "tidelift"
|
||||
}
|
||||
],
|
||||
"time": "2020-08-19T10:50:35+00:00"
|
||||
},
|
||||
{
|
||||
"name": "symfony/messenger",
|
||||
"version": "v5.1.5",
|
||||
|
|
|
@ -276,6 +276,19 @@ return [
|
|||
);
|
||||
},
|
||||
|
||||
Symfony\Component\Lock\LockFactory::class => function (
|
||||
Redis $redis,
|
||||
Psr\Log\LoggerInterface $logger
|
||||
) {
|
||||
$redisStore = new Symfony\Component\Lock\Store\RedisStore($redis);
|
||||
$retryStore = new Symfony\Component\Lock\Store\RetryTillSaveStore($redisStore, 1000, 60);
|
||||
|
||||
$lockFactory = new Symfony\Component\Lock\LockFactory($retryStore);
|
||||
$lockFactory->setLogger($logger);
|
||||
|
||||
return $lockFactory;
|
||||
},
|
||||
|
||||
Symfony\Component\Messenger\MessageBus::class => function (
|
||||
ContainerInterface $di,
|
||||
Monolog\Logger $logger
|
||||
|
|
|
@ -1,85 +0,0 @@
|
|||
<?php
|
||||
namespace App\Lock;
|
||||
|
||||
use malkusch\lock\exception\ExecutionOutsideLockException;
|
||||
use malkusch\lock\exception\LockAcquireException;
|
||||
use malkusch\lock\exception\LockReleaseException;
|
||||
use malkusch\lock\mutex\PHPRedisMutex;
|
||||
use malkusch\lock\util\Loop;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Redis;
|
||||
|
||||
class Lock extends PHPRedisMutex
|
||||
{
|
||||
protected LoggerInterface $logger;
|
||||
|
||||
protected Loop $loop;
|
||||
|
||||
protected int $timeout;
|
||||
|
||||
protected string $key;
|
||||
|
||||
protected float $acquired;
|
||||
|
||||
protected bool $waitForLock = false;
|
||||
|
||||
protected int $waitTimeout = 30;
|
||||
|
||||
public function __construct(
|
||||
Redis $redis,
|
||||
LoggerInterface $logger,
|
||||
string $key,
|
||||
int $ttl = 30,
|
||||
bool $waitForLock = false,
|
||||
?int $waitTimeout = null
|
||||
) {
|
||||
parent::__construct([$redis], $key, $ttl);
|
||||
|
||||
$this->logger = $logger;
|
||||
$this->setLogger($logger);
|
||||
|
||||
$this->key = $key;
|
||||
$this->timeout = $ttl;
|
||||
$this->waitForLock = $waitForLock;
|
||||
$this->waitTimeout = $waitTimeout ?? $ttl;
|
||||
}
|
||||
|
||||
protected function lock(): void
|
||||
{
|
||||
if ($this->waitForLock) {
|
||||
$this->loop = new Loop($this->waitTimeout);
|
||||
|
||||
$this->loop->execute(function (): void {
|
||||
$this->acquired = microtime(true);
|
||||
|
||||
if ($this->acquire($this->key, $this->timeout + 1)) {
|
||||
$this->loop->end();
|
||||
}
|
||||
});
|
||||
} else {
|
||||
$this->acquired = microtime(true);
|
||||
|
||||
if (!$this->acquire($this->key, $this->timeout + 1)) {
|
||||
throw new LockAcquireException('Failed to acquire the lock.');
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected function unlock(): void
|
||||
{
|
||||
$elapsed_time = microtime(true) - $this->acquired;
|
||||
if ($elapsed_time > $this->timeout) {
|
||||
$e = ExecutionOutsideLockException::create($elapsed_time, $this->timeout);
|
||||
$this->logger->error($e->getMessage());
|
||||
}
|
||||
|
||||
if (!$this->release($this->key)) {
|
||||
throw new LockReleaseException("Failed to release the lock.");
|
||||
}
|
||||
}
|
||||
|
||||
public function run(callable $code)
|
||||
{
|
||||
return $this->synchronized($code);
|
||||
}
|
||||
}
|
|
@ -1,27 +0,0 @@
|
|||
<?php
|
||||
namespace App\Lock;
|
||||
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Redis;
|
||||
|
||||
class LockManager
|
||||
{
|
||||
protected Redis $redis;
|
||||
|
||||
protected LoggerInterface $logger;
|
||||
|
||||
public function __construct(Redis $redis, LoggerInterface $logger)
|
||||
{
|
||||
$this->redis = $redis;
|
||||
$this->logger = $logger;
|
||||
}
|
||||
|
||||
public function getLock(
|
||||
string $key,
|
||||
int $ttl = 30,
|
||||
bool $waitForLock = false,
|
||||
?int $waitTimeout = null
|
||||
): Lock {
|
||||
return new Lock($this->redis, $this->logger, $key, $ttl, $waitForLock, $waitTimeout);
|
||||
}
|
||||
}
|
|
@ -5,9 +5,9 @@ use App\Entity;
|
|||
use App\Entity\Repository\SettingsRepository;
|
||||
use App\Event\GetSyncTasks;
|
||||
use App\EventDispatcher;
|
||||
use App\Lock\LockManager;
|
||||
use App\Settings;
|
||||
use Monolog\Logger;
|
||||
use Symfony\Component\Lock\LockFactory;
|
||||
|
||||
/**
|
||||
* The runner of scheduled synchronization tasks.
|
||||
|
@ -18,19 +18,19 @@ class Runner
|
|||
|
||||
protected SettingsRepository $settingsRepo;
|
||||
|
||||
protected LockManager $lockManager;
|
||||
protected LockFactory $lockFactory;
|
||||
|
||||
protected EventDispatcher $eventDispatcher;
|
||||
|
||||
public function __construct(
|
||||
SettingsRepository $settingsRepo,
|
||||
Logger $logger,
|
||||
LockManager $lockManager,
|
||||
LockFactory $lockFactory,
|
||||
EventDispatcher $eventDispatcher
|
||||
) {
|
||||
$this->settingsRepo = $settingsRepo;
|
||||
$this->logger = $logger;
|
||||
$this->lockManager = $lockManager;
|
||||
$this->lockFactory = $lockFactory;
|
||||
$this->eventDispatcher = $eventDispatcher;
|
||||
}
|
||||
|
||||
|
@ -100,10 +100,13 @@ class Runner
|
|||
|
||||
$this->logger->info(sprintf('Running sync task: %s', $syncInfo['name']));
|
||||
|
||||
$lock = $this->lockManager->getLock('sync_' . $type, $syncInfo['timeout'], $force);
|
||||
$lock = $this->lockFactory->createLock('sync_' . $type, $syncInfo['timeout']);
|
||||
|
||||
$lock->run(function () use ($syncInfo, $type, $force) {
|
||||
if (!$lock->acquire($force)) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
$event = new GetSyncTasks($type);
|
||||
$this->eventDispatcher->dispatch($event);
|
||||
|
||||
|
@ -125,7 +128,9 @@ class Runner
|
|||
}
|
||||
|
||||
$this->settingsRepo->setSetting($syncInfo['lastRunSetting'], time());
|
||||
});
|
||||
} finally {
|
||||
$lock->release();
|
||||
}
|
||||
}
|
||||
|
||||
public function getSyncTimes(): array
|
||||
|
|
|
@ -2,28 +2,28 @@
|
|||
namespace App\Sync\Task;
|
||||
|
||||
use App\Entity;
|
||||
use App\Lock\LockManager;
|
||||
use App\Radio\AutoDJ;
|
||||
use Doctrine\ORM\EntityManagerInterface;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Symfony\Component\Lock\LockFactory;
|
||||
|
||||
class BuildQueue extends AbstractTask
|
||||
{
|
||||
protected AutoDJ $autoDJ;
|
||||
|
||||
protected LockManager $lockManager;
|
||||
protected LockFactory $lockFactory;
|
||||
|
||||
public function __construct(
|
||||
EntityManagerInterface $em,
|
||||
Entity\Repository\SettingsRepository $settingsRepo,
|
||||
LoggerInterface $logger,
|
||||
AutoDJ $autoDJ,
|
||||
LockManager $lockManager
|
||||
LockFactory $lockFactory
|
||||
) {
|
||||
parent::__construct($em, $settingsRepo, $logger);
|
||||
|
||||
$this->autoDJ = $autoDJ;
|
||||
$this->lockManager = $lockManager;
|
||||
$this->lockFactory = $lockFactory;
|
||||
}
|
||||
|
||||
public function run(bool $force = false): void
|
||||
|
@ -44,9 +44,16 @@ class BuildQueue extends AbstractTask
|
|||
return;
|
||||
}
|
||||
|
||||
$lock = $this->lockManager->getLock('autodj_queue_' . $station->getId(), 60, $force);
|
||||
$lock->run(function () use ($station) {
|
||||
$lock = $this->lockFactory->createLock('autodj_queue_' . $station->getId(), 60);
|
||||
|
||||
if (!$lock->acquire($force)) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
$this->autoDJ->buildQueue($station);
|
||||
});
|
||||
} finally {
|
||||
$lock->release();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -7,7 +7,6 @@ use App\Entity\Station;
|
|||
use App\Event\Radio\GenerateRawNowPlaying;
|
||||
use App\Event\SendWebhooks;
|
||||
use App\EventDispatcher;
|
||||
use App\Lock\LockManager;
|
||||
use App\Message;
|
||||
use App\Radio\Adapters;
|
||||
use App\Radio\AutoDJ;
|
||||
|
@ -22,6 +21,7 @@ use NowPlaying\Result\Result;
|
|||
use Psr\Log\LoggerInterface;
|
||||
use Psr\SimpleCache\CacheInterface;
|
||||
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
|
||||
use Symfony\Component\Lock\LockFactory;
|
||||
use Symfony\Component\Messenger\MessageBus;
|
||||
use Symfony\Component\Messenger\Stamp\DelayStamp;
|
||||
use function DeepCopy\deep_copy;
|
||||
|
@ -50,7 +50,7 @@ class NowPlaying extends AbstractTask implements EventSubscriberInterface
|
|||
|
||||
protected Entity\Repository\ListenerRepository $listener_repo;
|
||||
|
||||
protected LockManager $lockManager;
|
||||
protected LockFactory $lockFactory;
|
||||
|
||||
protected string $analytics_level = Entity\Analytics::LEVEL_ALL;
|
||||
|
||||
|
@ -64,7 +64,7 @@ class NowPlaying extends AbstractTask implements EventSubscriberInterface
|
|||
LoggerInterface $logger,
|
||||
EventDispatcher $event_dispatcher,
|
||||
MessageBus $messageBus,
|
||||
LockManager $lockManager,
|
||||
LockFactory $lockFactory,
|
||||
Entity\Repository\SongHistoryRepository $historyRepository,
|
||||
Entity\Repository\SongRepository $songRepository,
|
||||
Entity\Repository\ListenerRepository $listenerRepository,
|
||||
|
@ -80,7 +80,7 @@ class NowPlaying extends AbstractTask implements EventSubscriberInterface
|
|||
$this->event_dispatcher = $event_dispatcher;
|
||||
$this->messageBus = $messageBus;
|
||||
$this->influx = $influx;
|
||||
$this->lockManager = $lockManager;
|
||||
$this->lockFactory = $lockFactory;
|
||||
|
||||
$this->history_repo = $historyRepository;
|
||||
$this->song_repo = $songRepository;
|
||||
|
@ -174,9 +174,11 @@ class NowPlaying extends AbstractTask implements EventSubscriberInterface
|
|||
Entity\Station $station,
|
||||
$standalone = false
|
||||
): Entity\Api\NowPlaying {
|
||||
$lock = $this->lockManager->getLock('nowplaying_station_' . $station->getId(), 600, true, 30);
|
||||
$lock = $this->lockFactory->createLock('nowplaying_station_' . $station->getId(), 600);
|
||||
|
||||
return $lock->run(function () use ($station, $standalone) {
|
||||
$lock->acquire(true);
|
||||
|
||||
try {
|
||||
/** @var Logger $logger */
|
||||
$logger = $this->logger;
|
||||
|
||||
|
@ -338,7 +340,9 @@ class NowPlaying extends AbstractTask implements EventSubscriberInterface
|
|||
$logger->popProcessor();
|
||||
|
||||
return $np;
|
||||
});
|
||||
} finally {
|
||||
$lock->release();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
Loading…
Reference in New Issue