/* This file is part of Strawberry. Copyright 2011, David Sansome Strawberry is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. Strawberry is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with Strawberry. If not, see . */ #ifndef WORKERPOOL_H #define WORKERPOOL_H #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "core/logging.h" class QLocalSocket; // Base class containing signals and slots - required because moc doesn't do templated objects. class _WorkerPoolBase : public QObject { Q_OBJECT public: _WorkerPoolBase(QObject *parent = nullptr); signals: // Emitted when a worker failed to start. This usually happens when the worker wasn't found, or couldn't be executed. void WorkerFailedToStart(); protected slots: virtual void DoStart() {} virtual void NewConnection() {} virtual void ProcessError(QProcess::ProcessError) {} virtual void SendQueuedMessages() {} }; // Manages a pool of one or more external processes. // A local socket server is started for each process, and the address is passed to the process as argv[1]. // The process is expected to connect back to the socket server, and when it does a HandlerType is created for it. // Instances of HandlerType are created in the WorkerPool's thread. template class WorkerPool : public _WorkerPoolBase { public: WorkerPool(QObject *parent = nullptr); ~WorkerPool(); typedef typename HandlerType::MessageType MessageType; typedef typename HandlerType::ReplyType ReplyType; // Sets the name of the worker executable. This is looked for first in the current directory, and then in $PATH. // You must call this before calling Start(). void SetExecutableName(const QString &executable_name); // Sets the number of worker process to use. Defaults to 1 <= (processors / 2) <= 2. void SetWorkerCount(int count); // Sets the prefix to use for the local server (on unix this is a named pipe in /tmp). // Defaults to QApplication::applicationName(). // A random number is appended to this name when creating each server. void SetLocalServerName(const QString &local_server_name); // Starts all workers. void Start(); // Fills in the message's "id" field and creates a reply future. // The message is queued and the WorkerPool's thread will send it to the next available worker. // Can be called from any thread. ReplyType *SendMessageWithReply(MessageType *message); protected: // These are all reimplemented slots, they are called on the WorkerPool's thread. void DoStart(); void NewConnection(); void ProcessError(QProcess::ProcessError error); void SendQueuedMessages(); private: struct Worker { Worker() : local_server_(nullptr), local_socket_(nullptr), process_(nullptr), handler_(nullptr) {} QLocalServer *local_server_; QLocalSocket *local_socket_; QProcess *process_; HandlerType *handler_; }; // Must only ever be called on my thread. void StartOneWorker(Worker *worker); template Worker *FindWorker(T Worker::*member, T value) { for (typename QList::iterator it = workers_.begin() ; it != workers_.end() ; ++it) { if ((*it).*member == value) { return &(*it); } } return nullptr; } template void DeleteQObjectPointerLater(T **p) { if (*p) { (*p)->deleteLater(); *p = nullptr; } } // Creates a new reply future for the request with the next sequential ID, // and sets the request's ID to the ID of the reply. Can be called from any thread ReplyType *NewReply(MessageType *message); // Returns the next handler, or nullptr if there isn't one. Must be called from my thread. HandlerType *NextHandler() const; private: QString local_server_name_; QString executable_name_; QString executable_path_; int worker_count_; mutable int next_worker_; QList workers_; QAtomicInt next_id_; QMutex message_queue_mutex_; QQueue message_queue_; }; template WorkerPool::WorkerPool(QObject *parent) : _WorkerPoolBase(parent), next_worker_(0), next_id_(0) { worker_count_ = qBound(1, QThread::idealThreadCount() / 2, 4); local_server_name_ = qApp->applicationName().toLower(); if (local_server_name_.isEmpty()) local_server_name_ = "workerpool"; } template WorkerPool::~WorkerPool() { for (const Worker &worker : workers_) { if (worker.local_socket_ && worker.process_) { disconnect(worker.process_, SIGNAL(error(QProcess::ProcessError)), this, SLOT(ProcessError(QProcess::ProcessError))); // The worker is connected. Close his socket and wait for him to exit. qLog(Debug) << "Closing worker socket"; worker.local_socket_->close(); worker.process_->waitForFinished(500); } if (worker.process_ && worker.process_->state() == QProcess::Running) { // The worker is still running - kill it. qLog(Debug) << "Killing worker process"; worker.process_->terminate(); if (!worker.process_->waitForFinished(500)) { worker.process_->kill(); } } } for (ReplyType *reply : message_queue_) { reply->Abort(); } } template void WorkerPool::SetWorkerCount(int count) { Q_ASSERT(workers_.isEmpty()); worker_count_ = count; } template void WorkerPool::SetLocalServerName(const QString &local_server_name) { Q_ASSERT(workers_.isEmpty()); local_server_name_ = local_server_name; } template void WorkerPool::SetExecutableName(const QString &executable_name) { Q_ASSERT(workers_.isEmpty()); executable_name_ = executable_name; } template void WorkerPool::Start() { metaObject()->invokeMethod(this, "DoStart"); } template void WorkerPool::DoStart() { Q_ASSERT(workers_.isEmpty()); Q_ASSERT(!executable_name_.isEmpty()); Q_ASSERT(QThread::currentThread() == thread()); // Find the executable if we can, default to searching $PATH executable_path_ = executable_name_; QStringList search_path; search_path << qApp->applicationDirPath(); #if defined(Q_OS_MACOS) && defined(USE_BUNDLE) search_path << qApp->applicationDirPath() + "/" + USE_BUNDLE_DIR; #endif for (const QString &path_prefix : search_path) { const QString executable_path = path_prefix + "/" + executable_name_; if (QFile::exists(executable_path)) { executable_path_ = executable_path; break; } } // Start all the workers for (int i = 0; i < worker_count_; ++i) { Worker worker; StartOneWorker(&worker); workers_ << worker; } } template void WorkerPool::StartOneWorker(Worker *worker) { Q_ASSERT(QThread::currentThread() == thread()); DeleteQObjectPointerLater(&worker->local_server_); DeleteQObjectPointerLater(&worker->local_socket_); DeleteQObjectPointerLater(&worker->process_); DeleteQObjectPointerLater(&worker->handler_); worker->local_server_ = new QLocalServer(this); worker->process_ = new QProcess(this); connect(worker->local_server_, SIGNAL(newConnection()), SLOT(NewConnection())); connect(worker->process_, SIGNAL(error(QProcess::ProcessError)), SLOT(ProcessError(QProcess::ProcessError))); // Create a server, find an unused name and start listening forever { const int unique_number = qrand() ^ ((int)(quint64(this) & 0xFFFFFFFF)); const QString name = QString("%1_%2").arg(local_server_name_).arg(unique_number); if (worker->local_server_->listen(name)) { break; } } qLog(Debug) << "Starting worker" << worker << executable_path_ << worker->local_server_->fullServerName(); // Start the process worker->process_->setProcessChannelMode(QProcess::ForwardedChannels); worker->process_->start(executable_path_, QStringList() << worker->local_server_->fullServerName()); } template void WorkerPool::NewConnection() { Q_ASSERT(QThread::currentThread() == thread()); QLocalServer *server = qobject_cast(sender()); // Find the worker with this server. Worker *worker = FindWorker(&Worker::local_server_, server); if (!worker) return; qLog(Debug) << "Worker" << worker << "connected to" << server->fullServerName(); // Accept the connection. worker->local_socket_ = server->nextPendingConnection(); // We only ever accept one connection per worker, so destroy the server now. worker->local_socket_->setParent(this); worker->local_server_->deleteLater(); worker->local_server_ = nullptr; // Create the handler. worker->handler_ = new HandlerType(worker->local_socket_, this); SendQueuedMessages(); } template void WorkerPool::ProcessError(QProcess::ProcessError error) { Q_ASSERT(QThread::currentThread() == thread()); QProcess *process = qobject_cast(sender()); // Find the worker with this process. Worker *worker = FindWorker(&Worker::process_, process); if (!worker) return; switch (error) { case QProcess::FailedToStart: // Failed to start errors are bad - it usually means the worker isn't installed. // Don't restart the process, but tell our owner, who will probably want to do something fatal. qLog(Error) << "Worker failed to start"; emit WorkerFailedToStart(); break; default: // On any other error we just restart the process. qLog(Debug) << "Worker" << worker << "failed with error" << error << "- restarting"; StartOneWorker(worker); break; } } template typename WorkerPool::ReplyType* WorkerPool::NewReply(MessageType *message) { const int id = next_id_.fetchAndAddOrdered(1); message->set_id(id); return new ReplyType(*message); } template typename WorkerPool::ReplyType* WorkerPool::SendMessageWithReply(MessageType *message) { ReplyType *reply = NewReply(message); // Add the pending reply to the queue { QMutexLocker l(&message_queue_mutex_); message_queue_.enqueue(reply); } // Wake up the main thread metaObject()->invokeMethod(this, "SendQueuedMessages", Qt::QueuedConnection); return reply; } template void WorkerPool::SendQueuedMessages() { QMutexLocker l(&message_queue_mutex_); while (!message_queue_.isEmpty()) { ReplyType *reply = message_queue_.dequeue(); // Find a worker for this message HandlerType *handler = NextHandler(); if (!handler) { // No available handlers - put the message on the front of the queue. message_queue_.prepend(reply); qLog(Debug) << "No available handlers to process request"; break; } handler->SendRequest(reply); } } template HandlerType *WorkerPool::NextHandler() const { for (int i = 0; i < workers_.count(); ++i) { const int worker_index = (next_worker_ + i) % workers_.count(); if (workers_[worker_index].handler_ && !workers_[worker_index].handler_->is_device_closed()) { next_worker_ = (worker_index + 1) % workers_.count(); return workers_[worker_index].handler_; } } return nullptr; } #endif // WORKERPOOL_H