strawberry-audio-player-win.../ext/libstrawberry-common/core/workerpool.h

466 lines
14 KiB
C++

/* This file is part of Strawberry.
Copyright 2011, David Sansome <me@davidsansome.com>
Copyright 2018-2021, Jonas Kvinge <jonas@jkvinge.net>
Strawberry is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
Strawberry is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with Strawberry. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef WORKERPOOL_H
#define WORKERPOOL_H
#include "config.h"
#include <cstdio>
#include <cstddef>
#include <QtGlobal>
#include <QObject>
#include <QCoreApplication>
#include <QThread>
#include <QMutex>
#include <QLocalServer>
#include <QProcess>
#include <QDir>
#include <QFile>
#include <QList>
#include <QQueue>
#include <QString>
#include <QStringList>
#include <QAtomicInt>
#include <QRandomGenerator>
#include "core/logging.h"
class QLocalSocket;
// Base class containing signals and slots - required because moc doesn't do templated objects.
// Non-templated QObject base for WorkerPool<HandlerType>.
// It declares the signal and the slots so that moc can process them; every slot has an empty default body
// and is overridden by WorkerPool<HandlerType>.
class _WorkerPoolBase : public QObject {
Q_OBJECT
public:
// parent: optional QObject parent, passed on to QObject.
explicit _WorkerPoolBase(QObject *parent = nullptr);
signals:
// Emitted when a worker failed to start. This usually happens when the worker wasn't found, or couldn't be executed.
void WorkerFailedToStart();
protected slots:
// Starts the workers.
virtual void DoStart() {}
// Connected to QLocalServer::newConnection of each worker's local server.
virtual void NewConnection() {}
// Connected to QProcess::readyReadStandardOutput of each worker process.
virtual void ProcessReadyReadStandardOutput() {}
// Connected to QProcess::readyReadStandardError of each worker process.
virtual void ProcessReadyReadStandardError() {}
// Connected to QProcess::errorOccurred of each worker process.
virtual void ProcessError(QProcess::ProcessError) {}
// Hands queued messages over to the available workers.
virtual void SendQueuedMessages() {}
};
// Manages a pool of one or more external processes.
// A local socket server is started for each process, and the address is passed to the process as argv[1].
// The process is expected to connect back to the socket server, and when it does a HandlerType is created for it.
// Instances of HandlerType are created in the WorkerPool's thread.
template<typename HandlerType>
class WorkerPool : public _WorkerPoolBase {
 public:
  // The request and reply types are taken from the handler implementation.
  using MessageType = typename HandlerType::MessageType;
  using ReplyType = typename HandlerType::ReplyType;

  explicit WorkerPool(QObject *parent = nullptr);
  ~WorkerPool() override;

  // Name of the worker executable to launch.
  // Has to be set before Start(): DoStart() asserts that it is not empty.
  void SetExecutableName(const QString &executable_name);

  // Number of worker processes to launch. The constructor sets this to 1.
  void SetWorkerCount(const int count);

  // Prefix for the local server names.
  // The constructor defaults it to the lower-cased application name, or "workerpool" when that is empty.
  // A random number is appended to the prefix for every server that is created.
  void SetLocalServerName(const QString &local_server_name);

  // Starts all workers.
  void Start();

  // Assigns the next sequential ID to the message, wraps it in a new reply object and queues that reply.
  // The queue is drained on the WorkerPool's thread, which hands each reply to the next available worker.
  // Can be called from any thread.
  ReplyType *SendMessageWithReply(MessageType *message);

 protected:
  // Overrides of the base class slots; they run on the WorkerPool's thread.
  void DoStart() override;
  void NewConnection() override;
  void ProcessReadyReadStandardOutput() override;
  void ProcessReadyReadStandardError() override;
  void ProcessError(QProcess::ProcessError error) override;
  void SendQueuedMessages() override;

 private:
  // Everything that belongs to a single worker process. All pointers start out null.
  struct Worker {
    QLocalServer *local_server_ = nullptr;
    QLocalSocket *local_socket_ = nullptr;
    QProcess *process_ = nullptr;
    HandlerType *handler_ = nullptr;
  };

  // Must only ever be called on my thread.
  void StartOneWorker(Worker *worker);

  // Returns the first worker whose given member equals value, or nullptr when none matches.
  template<typename T>
  Worker *FindWorker(T Worker::*member, T value) {
    for (Worker &candidate : workers_) {
      if (candidate.*member == value) {
        return &candidate;
      }
    }
    return nullptr;
  }

  // Schedules the pointed-to object for deletion and nulls the pointer. Null pointers are left alone.
  template<typename T>
  void DeleteQObjectPointerLater(T **p) {
    if (*p == nullptr) {
      return;
    }
    (*p)->deleteLater();
    *p = nullptr;
  }

  // Creates a new reply for the request with the next sequential ID,
  // and stores that ID in the request. Can be called from any thread.
  ReplyType *NewReply(MessageType *message);

  // Returns the next usable handler, or nullptr if there isn't one. Must be called from my thread.
  HandlerType *NextHandler() const;

 private:
  QString local_server_name_;
  QString executable_name_;
  QString executable_path_;

  int worker_count_;
  mutable int next_worker_;  // Mutable because the const NextHandler() advances it.
  QList<Worker> workers_;

  QAtomicInt next_id_;

  QMutex message_queue_mutex_;  // Guards message_queue_.
  QQueue<ReplyType *> message_queue_;
};
// Creates a pool configured for a single worker.
// The local server name prefix is the lower-cased application name, or "workerpool" when no name is set.
template<typename HandlerType>
WorkerPool<HandlerType>::WorkerPool(QObject *parent)
    : _WorkerPoolBase(parent),
      worker_count_(1),
      next_worker_(0),
      next_id_(0) {

  const QString application_name = QCoreApplication::applicationName().toLower();
  local_server_name_ = application_name.isEmpty() ? QString("workerpool") : application_name;

}
// Shuts down every worker and aborts all replies that are still waiting in the queue.
template<typename HandlerType>
WorkerPool<HandlerType>::~WorkerPool() {

  for (const Worker &worker : workers_) {
    QProcess *process = worker.process_;

    if (process) {
      // Stop listening to the process before shutting it down, whether or not the worker ever connected.
      // QProcess can report a timed-out wait or a terminated process through errorOccurred,
      // and ProcessError() reacts to such errors by restarting the worker - which must not happen while the pool is being destroyed.
      QObject::disconnect(process, &QProcess::errorOccurred, this, &WorkerPool::ProcessError);
      QObject::disconnect(process, &QProcess::readyReadStandardOutput, this, &WorkerPool::ProcessReadyReadStandardOutput);
      QObject::disconnect(process, &QProcess::readyReadStandardError, this, &WorkerPool::ProcessReadyReadStandardError);
    }

    if (worker.local_socket_ && process) {
      // The worker is connected. Close its socket and give it up to 500 ms to exit on its own.
      qLog(Debug) << "Closing worker socket";
      worker.local_socket_->close();
      process->waitForFinished(500);
    }

    if (process && process->state() == QProcess::Running) {
      // The worker is still running - ask it to terminate, and kill it if it has not exited within 500 ms.
      qLog(Debug) << "Killing worker process";
      process->terminate();
      if (!process->waitForFinished(500)) {
        process->kill();
      }
    }
  }

  // Replies that were never handed to a worker will not be answered anymore.
  for (ReplyType *reply : message_queue_) {
    reply->Abort();
  }

}
// Stores how many worker processes DoStart() launches.
// Asserts that no worker has been created yet.
template<typename HandlerType>
void WorkerPool<HandlerType>::SetWorkerCount(const int count) {

  Q_ASSERT(workers_.isEmpty());

  this->worker_count_ = count;

}
// Stores the prefix used when naming the local servers.
// Asserts that no worker has been created yet.
template<typename HandlerType>
void WorkerPool<HandlerType>::SetLocalServerName(const QString &local_server_name) {

  Q_ASSERT(workers_.isEmpty());

  this->local_server_name_ = local_server_name;

}
// Stores the file name of the worker executable; DoStart() resolves it to a path.
// Asserts that no worker has been created yet.
template<typename HandlerType>
void WorkerPool<HandlerType>::SetExecutableName(const QString &executable_name) {

  Q_ASSERT(workers_.isEmpty());

  this->executable_name_ = executable_name;

}
// Public entry point for starting the workers.
// DoStart() is invoked through the meta-object system with the default (automatic) connection type
// rather than being called directly.
template<typename HandlerType>
void WorkerPool<HandlerType>::Start() {

  QMetaObject::invokeMethod(this, &WorkerPool<HandlerType>::DoStart, Qt::AutoConnection);

}
// Resolves the worker executable and launches worker_count_ workers.
// Asserts that it runs on the WorkerPool's thread, that no worker exists yet and that an executable name was set.
template<typename HandlerType>
void WorkerPool<HandlerType>::DoStart() {

  Q_ASSERT(workers_.isEmpty());
  Q_ASSERT(!executable_name_.isEmpty());
  Q_ASSERT(QThread::currentThread() == thread());

  // Find the executable if we can, otherwise fall back to the bare name.
  executable_path_ = executable_name_;

  QStringList search_path;
  search_path << QCoreApplication::applicationDirPath();
#if defined(Q_OS_UNIX)
  search_path << "/usr/libexec";
  search_path << "/usr/local/libexec";
#endif
#if defined(Q_OS_MACOS)
  search_path << QDir::cleanPath(QCoreApplication::applicationDirPath() + "/../PlugIns");
#endif

  // The first directory that contains the executable wins.
  for (const QString &path_prefix : search_path) {
    const QString executable_path = path_prefix + "/" + executable_name_;
    if (QFile::exists(executable_path)) {
      executable_path_ = executable_path;
      qLog(Debug) << "Using worker" << executable_name_ << "from" << path_prefix;
      break;
    }
  }

  if (executable_path_ == executable_name_) {
    qLog(Debug) << "Using worker" << executable_name_;
  }

  // Start all the workers.
  // Each worker is added to workers_ *before* it is started: the slots identify their worker by searching workers_,
  // so a signal emitted while the process is being started (e.g. a start failure reported from within QProcess::start())
  // would otherwise not be matched to any worker and would be dropped.
  for (int i = 0; i < worker_count_; ++i) {
    workers_ << Worker();
    StartOneWorker(&workers_.last());
  }

}
// (Re)starts a single worker: creates a fresh local server and process for it, starts listening on a
// uniquely named server and launches the executable with the full server name as its only argument.
// Emits WorkerFailedToStart() and returns without launching the process when no server could be opened.
// Must only be called on the WorkerPool's thread.
template<typename HandlerType>
void WorkerPool<HandlerType>::StartOneWorker(Worker *worker) {

  Q_ASSERT(QThread::currentThread() == thread());

  // Discard whatever is left of a previous run of this worker.
  DeleteQObjectPointerLater(&worker->local_server_);
  DeleteQObjectPointerLater(&worker->local_socket_);
  DeleteQObjectPointerLater(&worker->process_);
  DeleteQObjectPointerLater(&worker->handler_);

  worker->local_server_ = new QLocalServer(this);
  worker->process_ = new QProcess(this);

  QObject::connect(worker->local_server_, &QLocalServer::newConnection, this, &WorkerPool::NewConnection);
  QObject::connect(worker->process_, &QProcess::errorOccurred, this, &WorkerPool::ProcessError);
  QObject::connect(worker->process_, &QProcess::readyReadStandardOutput, this, &WorkerPool::ProcessReadyReadStandardOutput);
  QObject::connect(worker->process_, &QProcess::readyReadStandardError, this, &WorkerPool::ProcessReadyReadStandardError);

  // Create a server, find an unused name and start listening.
  // The number of attempts is bounded: listen() can also fail for reasons a different name does not fix,
  // and retrying forever would then block this thread indefinitely.
  constexpr int kMaxListenAttempts = 100;
  bool listening = false;
  for (int attempt = 0; attempt < kMaxListenAttempts && !listening; ++attempt) {
    // A full 32-bit random value, so the range of names does not depend on where this object lives in memory.
    const quint32 unique_number = QRandomGenerator::global()->generate();
    const QString name = QString("%1_%2").arg(local_server_name_).arg(unique_number);
    listening = worker->local_server_->listen(name);
  }

  if (!listening) {
    qLog(Error) << "Unable to start local server for worker" << worker->local_server_->errorString();
    emit WorkerFailedToStart();
    return;
  }

  qLog(Debug) << "Starting worker" << worker << executable_path_ << worker->local_server_->fullServerName();

  // On Windows the output channels are read by this class and forwarded by the ProcessReadyRead* slots,
  // elsewhere QProcess forwards them itself.
#ifdef Q_OS_WIN32
  worker->process_->setProcessChannelMode(QProcess::SeparateChannels);
#else
  worker->process_->setProcessChannelMode(QProcess::ForwardedChannels);
#endif

  // The worker receives the full server name as its only argument.
  worker->process_->start(executable_path_, QStringList() << worker->local_server_->fullServerName());

}
// Slot for QLocalServer::newConnection: a worker process connected to its local server.
// Takes over the socket, retires the server, creates the worker's handler and flushes the message queue.
template<typename HandlerType>
void WorkerPool<HandlerType>::NewConnection() {

  Q_ASSERT(QThread::currentThread() == thread());

  QLocalServer *listening_server = qobject_cast<QLocalServer*>(sender());

  // Look up which worker the signalling server belongs to; ignore servers we no longer track.
  Worker *owner = FindWorker(&Worker::local_server_, listening_server);
  if (owner == nullptr) {
    return;
  }

  qLog(Debug) << "Worker" << owner << "connected to" << listening_server->fullServerName();

  // Accept the connection and make the pool the socket's parent.
  owner->local_socket_ = listening_server->nextPendingConnection();
  owner->local_socket_->setParent(this);

  // Only one connection is accepted per worker, so the server is not needed anymore.
  DeleteQObjectPointerLater(&owner->local_server_);

  // The handler talks to the worker over the accepted socket.
  owner->handler_ = new HandlerType(owner->local_socket_, this);

  // A handler just became available, try to deliver what has been waiting.
  SendQueuedMessages();

}
// Slot for QProcess::errorOccurred.
// A process that failed to start is reported through WorkerFailedToStart(), any other error restarts the worker.
template<typename HandlerType>
void WorkerPool<HandlerType>::ProcessError(QProcess::ProcessError error) {

  Q_ASSERT(QThread::currentThread() == thread());

  QProcess *failed_process = qobject_cast<QProcess*>(sender());

  // Look up which worker the signalling process belongs to; ignore processes we no longer track.
  Worker *owner = FindWorker(&Worker::process_, failed_process);
  if (owner == nullptr) {
    return;
  }

  if (error == QProcess::FailedToStart) {
    // Failed to start errors are bad - it usually means the worker isn't installed.
    // Restarting would not help, so tell our owner instead, who will probably want to do something fatal.
    qLog(Error) << "Worker failed to start";
    emit WorkerFailedToStart();
    return;
  }

  // Every other error is answered with a restart of the worker.
  qLog(Debug) << "Worker" << owner << "failed with error" << error << "- restarting";
  StartOneWorker(owner);

}
// Slot for QProcess::readyReadStandardOutput: copies everything the worker wrote to its standard output
// to our own stdout and flushes it.
template<typename HandlerType>
void WorkerPool<HandlerType>::ProcessReadyReadStandardOutput() {

  Q_ASSERT(QThread::currentThread() == thread());

  // sender() is null when this is not called through a signal; there is nothing to read from then.
  QProcess *process = qobject_cast<QProcess*>(sender());
  if (!process) {
    return;
  }

  const QByteArray data = process->readAllStandardOutput();

  // Write the exact number of bytes that were read: a "%s" format would stop at the first NUL byte in the output.
  fwrite(data.constData(), 1, static_cast<size_t>(data.size()), stdout);
  fflush(stdout);

}
// Slot for QProcess::readyReadStandardError: copies everything the worker wrote to its standard error
// to our own stderr and flushes it.
template<typename HandlerType>
void WorkerPool<HandlerType>::ProcessReadyReadStandardError() {

  Q_ASSERT(QThread::currentThread() == thread());

  // sender() is null when this is not called through a signal; there is nothing to read from then.
  QProcess *process = qobject_cast<QProcess*>(sender());
  if (!process) {
    return;
  }

  const QByteArray data = process->readAllStandardError();

  // Write the exact number of bytes that were read: a "%s" format would stop at the first NUL byte in the output.
  fwrite(data.constData(), 1, static_cast<size_t>(data.size()), stderr);
  fflush(stderr);

}
// Stamps the message with the next sequential ID and returns a newly allocated reply built from it.
// The ID counter is advanced atomically.
template <typename HandlerType>
typename WorkerPool<HandlerType>::ReplyType*
WorkerPool<HandlerType>::NewReply(MessageType *message) {

  // fetchAndAddOrdered() yields the value from before the increment.
  message->set_id(next_id_.fetchAndAddOrdered(1));

  return new ReplyType(*message);

}
// Queues the message for delivery to a worker and returns the reply object that belongs to it.
// The queue is protected by a mutex and the delivery itself is scheduled as a queued call to SendQueuedMessages().
template <typename HandlerType>
typename WorkerPool<HandlerType>::ReplyType*
WorkerPool<HandlerType>::SendMessageWithReply(MessageType *message) {

  ReplyType *pending_reply = NewReply(message);

  // Add the pending reply to the queue; the lock is only held for the enqueue.
  QMutexLocker locker(&message_queue_mutex_);
  message_queue_.enqueue(pending_reply);
  locker.unlock();

  // Wake up the pool's thread so that it drains the queue.
  QMetaObject::invokeMethod(this, &WorkerPool<HandlerType>::SendQueuedMessages, Qt::QueuedConnection);

  return pending_reply;

}
// Hands queued replies to handlers, in queue order, until the queue is empty or no handler is available.
// The queue mutex is held for the whole run.
template<typename HandlerType>
void WorkerPool<HandlerType>::SendQueuedMessages() {

  QMutexLocker locker(&message_queue_mutex_);

  while (!message_queue_.isEmpty()) {
    // Find a worker for the message at the head of the queue.
    HandlerType *handler = NextHandler();
    if (handler == nullptr) {
      // No available handlers - the message stays at the front of the queue.
      qLog(Debug) << "No available handlers to process request";
      return;
    }

    handler->SendRequest(message_queue_.dequeue());
  }

}
// Picks the next usable handler in round-robin order, starting at next_worker_.
// A handler is usable when it exists and its device is not closed.
// Returns nullptr when no worker has a usable handler.
template<typename HandlerType>
HandlerType *WorkerPool<HandlerType>::NextHandler() const {

  const auto total = workers_.count();

  for (int offset = 0; offset < total; ++offset) {
    const int index = static_cast<int>((next_worker_ + offset) % total);
    HandlerType *candidate = workers_[index].handler_;
    if (!candidate || candidate->is_device_closed()) {
      continue;
    }
    // Remember where to continue so that the following call starts with the worker after this one.
    next_worker_ = static_cast<int>((index + 1) % total);
    return candidate;
  }

  return nullptr;

}
#endif // WORKERPOOL_H