#6796 -- Revert to Centrifugo for high-performance NP updates.

This commit is contained in:
Buster Neece 2023-12-13 09:57:51 -06:00
parent 68a555fe7e
commit 4285561eda
No known key found for this signature in database
20 changed files with 363 additions and 1502 deletions

1
.gitignore vendored
View File

@ -1,6 +1,5 @@
# Frontend
node_modules
/frontend/hpnp*
# Junk/cache files.
*Thumbs.db

View File

@ -10,6 +10,8 @@ RUN go install github.com/jwilder/dockerize@v0.6.1
RUN go install github.com/aptible/supercronic@v0.2.28
RUN go install github.com/centrifugal/centrifugo/v5@v5.1.2
#
# MariaDB dependencies build step
#
@ -35,6 +37,7 @@ ENV TZ="UTC"
# Add Go dependencies
COPY --from=go-dependencies /go/bin/dockerize /usr/local/bin
COPY --from=go-dependencies /go/bin/supercronic /usr/local/bin/supercronic
COPY --from=go-dependencies /go/bin/centrifugo /usr/local/bin/centrifugo
# Add MariaDB dependencies
COPY --from=mariadb /usr/local/bin/healthcheck.sh /usr/local/bin/db_healthcheck.sh
@ -161,18 +164,6 @@ ENV TZ="UTC" \
ENTRYPOINT ["tini", "--", "/usr/local/bin/my_init"]
CMD ["--no-main-command"]
#
# High-Performance Now Playing (HPNP) Build
#
FROM node:20-alpine AS hpnp
COPY --chown=node:node ./frontend /data
WORKDIR /data
USER node
RUN npm ci --include=dev \
&& npm run build-hpnp
#
# Final build (Just environment vars and squishing the FS)
#
@ -180,10 +171,6 @@ FROM ubuntu:jammy AS final
COPY --from=pre-final / /
# Add HPNP from previous step
COPY --from=hpnp --chown=azuracast:azuracast /data/hpnp /usr/local/bin/hpnp
RUN chmod a+x /usr/local/bin/hpnp
USER azuracast
WORKDIR /var/azuracast/www

View File

@ -130,6 +130,7 @@ return static function (CallableEventDispatcherInterface $dispatcher) {
App\Sync\Task\RotateLogsTask::class,
App\Sync\Task\RunAnalyticsTask::class,
App\Sync\Task\RunBackupTask::class,
App\Sync\Task\SendTimeOnSocketTask::class,
App\Sync\Task\UpdateGeoLiteTask::class,
App\Sync\Task\UpdateStorageLocationSizesTask::class,
]);

File diff suppressed because it is too large Load Diff

View File

@ -6,8 +6,7 @@
"build": "vite build",
"serve": "vite",
"generate-locales": "vue-gettext-extract",
"generate-api": "swagger-typescript-api --path http://localhost/api/openapi.yml --output ./src/entities --name ApiInterfaces.ts --no-client",
"build-hpnp": "esbuild --bundle --target=node18 --platform=node ./src/hpnp/index.ts > ./hpnp.cjs && pkg --target=node18-linux --output ./hpnp ./hpnp.cjs"
"generate-api": "swagger-typescript-api --path http://localhost/api/openapi.yml --output ./src/entities --name ApiInterfaces.ts --no-client"
},
"dependencies": {
"@codemirror/lang-css": "^6.0.1",
@ -20,28 +19,23 @@
"@fullcalendar/timegrid": "^6",
"@fullcalendar/vue3": "^6",
"@popperjs/core": "^2.11.8",
"@tinyhttp/app": "^2.2.1",
"@vuelidate/core": "^2.0.0",
"@vuelidate/validators": "^2.0.0",
"@vuepic/vue-datepicker": "^7",
"@vueuse/core": "^10",
"axios": "^1",
"better-sse": "^0.10.0",
"bootstrap": "^5.3.0",
"chart.js": "^4.2.1",
"chartjs-adapter-luxon": "^1.1.0",
"chartjs-plugin-zoom": "^2.0.0",
"codemirror": "^6",
"codemirror-lang-liquidsoap": "^0.2.5",
"esbuild": "^0.19.8",
"hls.js": "^1.1.5",
"leaflet": "^1.7.1",
"leaflet-fullscreen": "^1.0.2",
"lodash": "^4.17.21",
"luxon": "^3",
"milliparsec": "^2.3.0",
"nprogress": "^0.2.0",
"pkg": "^5.8.1",
"qrcode": "^1.5.3",
"roboto-fontface": "^0.10.0",
"sweetalert2": "11.4.8",
@ -67,6 +61,7 @@
"@vitejs/plugin-vue": "^4.2.3",
"@vue/eslint-config-typescript": "^12",
"del": "^7",
"esbuild": "^0.19.9",
"eslint": "^8.45.0",
"eslint-plugin-vue": "^9.8.0",
"glob": "^10.2.7",

View File

@ -5,6 +5,7 @@ import {ApiNowPlaying} from "~/entities/ApiInterfaces.ts";
import {getApiUrl} from "~/router.ts";
import {useAxios} from "~/vendor/axios.ts";
import formatTime from "~/functions/formatTime.ts";
import {has} from "lodash";
export const nowPlayingProps = {
stationShortName: {
@ -40,7 +41,6 @@ interface SSEResponse {
export default function useNowPlaying(props) {
const np: ShallowRef<ApiNowPlaying> = shallowRef(NowPlaying);
const npUpdated: Ref<number> = ref(0);
const currentTime: Ref<number> = ref(Math.floor(Date.now() / 1000));
const currentTrackDuration: Ref<number> = ref(0);
@ -48,7 +48,6 @@ export default function useNowPlaying(props) {
const setNowPlaying = (np_new: ApiNowPlaying) => {
np.value = np_new;
npUpdated.value = currentTime.value;
currentTrackDuration.value = np_new.now_playing.duration ?? 0;
@ -68,41 +67,59 @@ export default function useNowPlaying(props) {
}));
}
if (props.useSse) {
const sseUri = getApiUrl(`/live/nowplaying/${props.stationShortName}`);
const nowPlayingUri = props.useStatic
? getApiUrl(`/nowplaying_static/${props.stationShortName}.json`)
: getApiUrl(`/nowplaying/${props.stationShortName}`);
const {data} = useEventSource(sseUri.value);
const timeUri = getApiUrl('/time');
const {axiosSilent} = useAxios();
const axiosNoCacheConfig = {
headers: {
'Cache-Control': 'no-cache',
'Pragma': 'no-cache',
'Expires': '0',
}
};
if (props.useSse) {
const sseBaseUri = getApiUrl('/live/nowplaying/sse');
const sseUriParams = new URLSearchParams({
"cf_connect": JSON.stringify({
"subs": {
[`station:${props.stationShortName}`]: {},
"global:time": {},
}
}),
});
const sseUri = sseBaseUri.value + '?' + sseUriParams.toString();
// Make an initial AJAX request before SSE takes over.
axiosSilent.get(nowPlayingUri.value, axiosNoCacheConfig).then((response) => {
setNowPlaying(response.data);
});
axiosSilent.get(timeUri.value, axiosNoCacheConfig).then((response) => {
currentTime.value = response.data.timestamp;
});
// Subsequent events come from SSE.
const {data} = useEventSource(sseUri);
watch(data, (dataRaw: string) => {
const jsonData: SSEResponse = JSON.parse(dataRaw);
const jsonDataNp = jsonData?.pub?.data ?? {};
if (jsonData.type === 'time') {
currentTime.value = jsonData.payload.timestamp;
} else if (jsonData.type === 'nowplaying') {
if (npUpdated.value === 0) {
setNowPlaying(jsonData.payload.np);
} else {
setTimeout(() => {
setNowPlaying(jsonData.payload.np);
}, 3000);
}
if (has(jsonDataNp, 'np')) {
setTimeout(() => {
setNowPlaying(jsonDataNp.np);
}, 3000);
} else if (has(jsonDataNp, 'time')) {
currentTime.value = jsonDataNp.time;
}
});
} else {
const nowPlayingUri = props.useStatic
? getApiUrl(`/nowplaying_static/${props.stationShortName}`)
: getApiUrl(`/nowplaying/${props.stationShortName}`);
const timeUri = getApiUrl('/time');
const {axios} = useAxios();
const checkNowPlaying = () => {
axios.get(nowPlayingUri.value, {
headers: {
'Cache-Control': 'no-cache',
'Pragma': 'no-cache',
'Expires': '0',
}
}).then((response) => {
axiosSilent.get(nowPlayingUri.value, axiosNoCacheConfig).then((response) => {
setNowPlaying(response.data);
setTimeout(checkNowPlaying, (!document.hidden) ? 15000 : 30000);
@ -112,13 +129,7 @@ export default function useNowPlaying(props) {
};
const checkTime = () => {
axios.get(timeUri.value, {
headers: {
'Cache-Control': 'no-cache',
'Pragma': 'no-cache',
'Expires': '0',
}
}).then((response) => {
axiosSilent.get(timeUri.value, axiosNoCacheConfig).then((response) => {
currentTime.value = response.data.timestamp;
}).finally(() => {
setTimeout(checkTime, (!document.hidden) ? 300000 : 600000);

View File

@ -1,136 +0,0 @@
#!/usr/bin/env node
import {ApiNowPlaying} from "~/entities/ApiInterfaces.ts";
import {Channel, createChannel, createSession, Session} from "better-sse";
import {App} from '@tinyhttp/app';
import {json} from "milliparsec";
const publicPort: number = 6050;
const internalPort: number = 6055;
interface NowPlayingSubmission {
station: string,
np: ApiNowPlaying,
triggers: string[] | null
}
interface StationChannelState extends Record<string, unknown> {
timestamp: number,
lastMessage: NowPlayingSubmission
}
const unixTimestamp = (): number => Math.floor(Date.now() / 1000);
const timeChannel = createChannel();
timeChannel.on("session-registered", (session: Session) => {
session.push({
type: 'time',
payload: {
timestamp: unixTimestamp()
}
});
});
const stationChannels: Map<string, Channel<StationChannelState>> = new Map();
// Routine time ping.
setInterval(() => {
console.debug('Sending time ping...');
timeChannel.broadcast({
type: 'time',
payload: {
timestamp: unixTimestamp()
}
});
}, 15000);
// If a station hasn't posted NP updates in a specified time, close its channel and garbage-collect its sessions.
setInterval(() => {
const threshold = unixTimestamp() - 120;
for (const [key, channel] of stationChannels) {
if (channel.state.timestamp < threshold) {
channel.activeSessions.forEach((session) => {
channel.deregister(session);
});
stationChannels.delete(key);
}
}
}, 60000);
const publicServer = new App();
publicServer.get('/:station', async (req, res) => {
res.set("Access-Control-Allow-Origin", "*");
res.set("X-Accel-Buffering", "no");
const stations: string[] = req.params.station.split(',');
let anyStationsFound: boolean = false;
for (const stationId of stations) {
if (stationChannels.has(stationId)) {
anyStationsFound = true;
break;
}
}
if (!anyStationsFound) {
return res.status(404).send('Station(s) Not Found');
}
const session = await createSession(req, res, {
retry: 5000,
keepAlive: null,
});
timeChannel.register(session);
for (const stationId of stations) {
const stationChannel = stationChannels.get(stationId);
stationChannel!.register(session);
}
});
publicServer.listen(publicPort, () => {
console.debug(`Public server listening on port ${publicPort}...`);
});
const privateServer = new App();
privateServer.use(json());
privateServer.post('/', async (req, res) => {
const body: NowPlayingSubmission = req.body;
console.debug(
`NP Update received for channel ${body.station}.`
);
let channel: Channel<StationChannelState>;
if (stationChannels.has(body.station)) {
channel = stationChannels.get(body.station)!;
} else {
// Create a new channel if none exists.
channel = createChannel();
channel.on("session-registered", (session: Session) => {
session.push({
type: 'nowplaying',
payload: channel.state.lastMessage
});
});
stationChannels.set(body.station, channel);
}
channel.state.timestamp = unixTimestamp();
channel.state.lastMessage = body;
channel.broadcast({
type: 'nowplaying',
payload: body
});
return res.send('OK');
});
privateServer.listen(internalPort, () => {
console.debug(`Internal server listening on port ${internalPort}...`);
});

View File

@ -0,0 +1,68 @@
<?php
declare(strict_types=1);
namespace App\Service;
use App\Container\EnvironmentAwareTrait;
use App\Entity\Station;
use GuzzleHttp\Client;
final class Centrifugo
{
use EnvironmentAwareTrait;
public const GLOBAL_TIME_CHANNEL = 'global:time';
public function __construct(
private readonly Client $client,
) {
}
public function isSupported(): bool
{
return $this->environment->isDocker() && !$this->environment->isTesting();
}
public function sendTime(): void
{
$this->send([
'method' => 'publish',
'params' => [
'channel' => self::GLOBAL_TIME_CHANNEL,
'data' => [
'time' => time(),
],
],
]);
}
public function publishToStation(Station $station, mixed $message, array $triggers): void
{
$this->send([
'method' => 'publish',
'params' => [
'channel' => $this->getChannelName($station),
'data' => [
'np' => $message,
'triggers' => $triggers,
],
],
]);
}
private function send(array $body): void
{
$this->client->post(
'http://localhost:6025/api',
[
'json' => $body,
]
);
}
public function getChannelName(Station $station): string
{
return 'station:' . $station->getShortName();
}
}

View File

@ -1,41 +0,0 @@
<?php
declare(strict_types=1);
namespace App\Service;
use App\Container\EnvironmentAwareTrait;
use App\Entity\Station;
use GuzzleHttp\Client;
/**
* Utility class for the High-Performance Now-Playing (HPNP) library.
*/
final class HpNp
{
use EnvironmentAwareTrait;
public function __construct(
private readonly Client $client,
) {
}
public function isSupported(): bool
{
return $this->environment->isDocker() && !$this->environment->isTesting();
}
public function publishToStation(Station $station, mixed $message, array $triggers = []): void
{
$this->client->post(
'http://localhost:6055',
[
'json' => [
'station' => $station->getShortName(),
'np' => $message,
'triggers' => $triggers,
],
]
);
}
}

View File

@ -20,7 +20,7 @@ final class ServiceControl
public function __construct(
private readonly SupervisorInterface $supervisor,
private readonly HpNp $hpNp
private readonly Centrifugo $centrifugo
) {
}
@ -85,12 +85,12 @@ final class ServiceControl
'php-worker' => __('PHP queue processing worker'),
'redis' => __('Cache'),
'sftpgo' => __('SFTP service'),
'hpnp' => __('High-Performance Now Playing updates'),
'centrifugo' => __('Live Now Playing updates'),
'vite' => __('Frontend Assets'),
];
if (!$this->hpNp->isSupported()) {
unset($services['hpnp']);
if (!$this->centrifugo->isSupported()) {
unset($services['centrifugo']);
}
if (!$this->environment->useLocalDatabase()) {

View File

@ -0,0 +1,29 @@
<?php
declare(strict_types=1);
namespace App\Sync\Task;
use App\Service\Centrifugo;
final class SendTimeOnSocketTask extends AbstractTask
{
public function __construct(
private readonly Centrifugo $centrifugo,
) {
}
public static function getSchedulePattern(): string
{
return self::SCHEDULE_EVERY_MINUTE;
}
public function run(bool $force = false): void
{
if (!$this->centrifugo->isSupported()) {
return;
}
$this->centrifugo->sendTime();
}
}

View File

@ -5,12 +5,12 @@ declare(strict_types=1);
namespace App\VueComponent;
use App\Http\ServerRequest;
use App\Service\HpNp;
use App\Service\Centrifugo;
final class NowPlayingComponent implements VueComponentInterface
{
public function __construct(
private readonly HpNp $hpNp
private readonly Centrifugo $centrifugo
) {
}
@ -38,7 +38,7 @@ final class NowPlayingComponent implements VueComponentInterface
return [
'stationShortName' => $station->getShortName(),
'useStatic' => $customization->useStaticNowPlaying(),
'useSse' => $customization->useStaticNowPlaying() && $this->hpNp->isSupported(),
'useSse' => $customization->useStaticNowPlaying() && $this->centrifugo->isSupported(),
];
}
}

View File

@ -8,7 +8,7 @@ use App\Container\EnvironmentAwareTrait;
use App\Container\LoggerAwareTrait;
use App\Entity\Api\NowPlaying\NowPlaying;
use App\Entity\Station;
use App\Service\HpNp;
use App\Service\Centrifugo;
use Symfony\Component\Filesystem\Filesystem;
use const JSON_PRETTY_PRINT;
@ -21,7 +21,7 @@ final class LocalWebhookHandler
public const NAME = 'local';
public function __construct(
private readonly HpNp $hpNp
private readonly Centrifugo $centrifugo
) {
}
@ -69,8 +69,8 @@ final class LocalWebhookHandler
);
// Publish to websocket library
if ($this->hpNp->isSupported()) {
$this->hpNp->publishToStation($station, $np, $triggers);
if ($this->centrifugo->isSupported()) {
$this->centrifugo->publishToStation($station, $np, $triggers);
}
}
}

View File

@ -1,19 +0,0 @@
[program:hpnp]
directory=/var/azuracast/www/frontend
command=bun --hot ./src/hpnp/index.ts
user=azuracast
priority=700
numprocs=1
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
stdout_logfile=/var/azuracast/www_tmp/service_hpnp.log
stdout_logfile_maxbytes=5MB
stdout_logfile_backups=5
redirect_stderr=true
stdout_events_enabled = true
stderr_events_enabled = true

View File

@ -1,8 +0,0 @@
#!/bin/bash
set -e
set -x
curl -fsSL https://bun.sh/install | gosu azuracast bash
ln -s /var/azuracast/.bun/bin/bun /usr/local/bin/bun
ln -s /var/azuracast/.bun/bin/bunx /usr/local/bin/bunx

View File

@ -0,0 +1,34 @@
allow_anonymous_connect_without_token: true
api_insecure: true
admin: true
admin_insecure: true
port: 6020
internal_port: 6025
websocket_disable: true
uni_websocket: true
uni_sse: true
uni_http_stream: true
allowed_origins:
- "*"
namespaces:
- name: "station"
history_size: 1
history_ttl: "30s"
allow_subscribe_for_client: true
allow_subscribe_for_anonymous: true
allow_history_for_client: true
allow_history_for_anonymous: true
- name: "global"
history_size: 0
allow_subscribe_for_client: true
allow_subscribe_for_anonymous: true
allow_history_for_client: true
allow_history_for_anonymous: true
{{if isTrue .Env.ENABLE_REDIS }}
engine: "redis"
redis_address: "{{ .Env.REDIS_HOST }}:{{ default .Env.REDIS_PORT "6379" }}"
redis_db: 0
{{end}}

View File

@ -6,8 +6,8 @@ upstream php-fpm-www {
server unix:/var/run/php-fpm-www.sock;
}
upstream hpnp {
server 127.0.0.1:6050;
upstream centrifugo {
server 127.0.0.1:6020;
}
{{if eq .Env.APPLICATION_ENV "development"}}
@ -95,10 +95,10 @@ server {
try_files $uri =404;
}
# SSE Now Playing Updates
location ~ ^/api/live/nowplaying/(.*)$ {
# Websocket/SSE Now Playing Updates
location ~ ^/api/live/nowplaying/(\w+)$ {
include proxy_params;
proxy_pass http://hpnp/$1?$args;
proxy_pass http://centrifugo/connection/uni_$1?$args;
}
# Default clean URL routing

View File

@ -1,6 +1,6 @@
[program:hpnp]
directory=/var/azuracast
command=hpnp
[program:centrifugo]
directory=/var/azuracast/centrifugo
command=centrifugo -c /var/azuracast/centrifugo/config.yaml
user=azuracast
priority=700
numprocs=1
@ -10,7 +10,7 @@ autorestart=true
stopasgroup=true
killasgroup=true
stdout_logfile=/var/azuracast/www_tmp/service_hpnp.log
stdout_logfile=/var/azuracast/www_tmp/service_centrifugo.log
stdout_logfile_maxbytes=5MB
stdout_logfile_backups=5
redirect_stderr=true

View File

@ -0,0 +1,6 @@
#!/bin/bash
set -e
set -x
mkdir -p /var/azuracast/centrifugo
cp /bd_build/web/centrifugo/config.yaml.tmpl /var/azuracast/centrifugo/config.yaml.tmpl

View File

@ -0,0 +1,6 @@
#!/bin/bash
ENABLE_REDIS=${ENABLE_REDIS:-true}
export ENABLE_REDIS
dockerize -template "/var/azuracast/centrifugo/config.yaml.tmpl:/var/azuracast/centrifugo/config.yaml"