diff --git a/ext/libstrawberry-common/core/messagehandler.cpp b/ext/libstrawberry-common/core/messagehandler.cpp index 52e2ab1f..129a41e7 100644 --- a/ext/libstrawberry-common/core/messagehandler.cpp +++ b/ext/libstrawberry-common/core/messagehandler.cpp @@ -107,10 +107,10 @@ void _MessageHandlerBase::WriteMessage(const QByteArray &data) { else if (flush_local_socket_) { ((qobject_cast(device_))->*(flush_local_socket_))(); } + } void _MessageHandlerBase::DeviceClosed() { is_device_closed_ = true; AbortAll(); } - diff --git a/ext/libstrawberry-common/core/messagehandler.h b/ext/libstrawberry-common/core/messagehandler.h index 67df9a71..e482a67b 100644 --- a/ext/libstrawberry-common/core/messagehandler.h +++ b/ext/libstrawberry-common/core/messagehandler.h @@ -43,7 +43,7 @@ class QIODevice; class _MessageHandlerBase : public QObject { Q_OBJECT -public: + public: // device can be nullptr, in which case you must call SetDevice before writing any messages. _MessageHandlerBase(QIODevice *device, QObject *parent); @@ -52,16 +52,16 @@ public: // After this is true, messages cannot be sent to the handler any more. bool is_device_closed() const { return is_device_closed_; } -protected slots: + protected slots: void WriteMessage(const QByteArray &data); void DeviceReadyRead(); virtual void DeviceClosed(); -protected: + protected: virtual bool RawMessageArrived(const QByteArray &data) = 0; virtual void AbortAll() = 0; -protected: + protected: typedef bool (QAbstractSocket::*FlushAbstractSocket)(); typedef bool (QLocalSocket::*FlushLocalSocket)(); @@ -80,7 +80,7 @@ protected: // You should subclass this and implement the MessageArrived(MessageType) method. template class AbstractMessageHandler : public _MessageHandlerBase { -public: + public: AbstractMessageHandler(QIODevice *device, QObject *parent); ~AbstractMessageHandler() override { AbortAll(); } @@ -102,7 +102,7 @@ public: // Sets the "id" field of reply to the same as the request, and sends the reply on the socket. Used on the worker side. void SendReply(const MessageType &request, MessageType *reply); -protected: + protected: // Called when a message is received from the socket. virtual void MessageArrived(const MessageType &message) { Q_UNUSED(message); } @@ -110,7 +110,7 @@ protected: bool RawMessageArrived(const QByteArray &data) override; void AbortAll() override; -private: + private: QMap pending_replies_; }; @@ -146,6 +146,7 @@ void AbstractMessageHandler::SendReply(const MessageType &request, MessageTy template bool AbstractMessageHandler::RawMessageArrived(const QByteArray &data) { + MessageType message; if (!message.ParseFromArray(data.constData(), data.size())) { return false; @@ -161,14 +162,17 @@ bool AbstractMessageHandler::RawMessageArrived(const QByteArray &data) { } return true; + } template void AbstractMessageHandler::AbortAll() { + for (ReplyType *reply : pending_replies_) { reply->Abort(); } pending_replies_.clear(); + } #endif // MESSAGEHANDLER_H diff --git a/ext/libstrawberry-common/core/messagereply.h b/ext/libstrawberry-common/core/messagereply.h index 5eb71a50..abcf28d5 100644 --- a/ext/libstrawberry-common/core/messagereply.h +++ b/ext/libstrawberry-common/core/messagereply.h @@ -57,29 +57,27 @@ class _MessageReplyBase : public QObject { template class MessageReply : public _MessageReplyBase { public: - explicit MessageReply(const MessageType& request_message, QObject *parent = nullptr); + explicit MessageReply(const MessageType &request_message, QObject *parent = nullptr); int id() const override { return request_message_.id(); } - const MessageType& request_message() const { return request_message_; } - const MessageType& message() const { return reply_message_; } + const MessageType &request_message() const { return request_message_; } + const MessageType &message() const { return reply_message_; } - void SetReply(const MessageType& message); + void SetReply(const MessageType &message); -private: + private: MessageType request_message_; MessageType reply_message_; }; template -MessageReply::MessageReply(const MessageType& request_message, QObject *parent) - : _MessageReplyBase(parent) -{ +MessageReply::MessageReply(const MessageType &request_message, QObject *parent) : _MessageReplyBase(parent) { request_message_.MergeFrom(request_message); } template -void MessageReply::SetReply(const MessageType& message) { +void MessageReply::SetReply(const MessageType &message) { Q_ASSERT(!finished_); diff --git a/ext/libstrawberry-common/core/workerpool.h b/ext/libstrawberry-common/core/workerpool.h index 8c3be8a7..9bf3dba0 100644 --- a/ext/libstrawberry-common/core/workerpool.h +++ b/ext/libstrawberry-common/core/workerpool.h @@ -48,11 +48,11 @@ class _WorkerPoolBase : public QObject { public: explicit _WorkerPoolBase(QObject *parent = nullptr); -signals: + signals: // Emitted when a worker failed to start. This usually happens when the worker wasn't found, or couldn't be executed. void WorkerFailedToStart(); -protected slots: + protected slots: virtual void DoStart() {} virtual void NewConnection() {} virtual void ProcessError(QProcess::ProcessError) {} @@ -78,7 +78,7 @@ class WorkerPool : public _WorkerPoolBase { void SetExecutableName(const QString &executable_name); // Sets the number of worker process to use. Defaults to 1 <= (processors / 2) <= 2. - void SetWorkerCount(int count); + void SetWorkerCount(const int count); // Sets the prefix to use for the local server (on unix this is a named pipe in /tmp). // Defaults to QApplication::applicationName(). @@ -93,14 +93,14 @@ class WorkerPool : public _WorkerPoolBase { // Can be called from any thread. ReplyType *SendMessageWithReply(MessageType *message); -protected: + protected: // These are all reimplemented slots, they are called on the WorkerPool's thread. void DoStart() override; void NewConnection() override; void ProcessError(QProcess::ProcessError error) override; void SendQueuedMessages() override; -private: + private: struct Worker { Worker() : local_server_(nullptr), local_socket_(nullptr), process_(nullptr), handler_(nullptr) {} @@ -138,7 +138,7 @@ private: // Returns the next handler, or nullptr if there isn't one. Must be called from my thread. HandlerType *NextHandler() const; -private: + private: QString local_server_name_; QString executable_name_; QString executable_path_; @@ -170,6 +170,7 @@ WorkerPool::WorkerPool(QObject *parent) template WorkerPool::~WorkerPool() { + for (const Worker &worker : workers_) { if (worker.local_socket_ && worker.process_) { disconnect(worker.process_, SIGNAL(errorOccurred(QProcess::ProcessError)), this, SLOT(ProcessError(QProcess::ProcessError))); @@ -193,10 +194,11 @@ WorkerPool::~WorkerPool() { for (ReplyType *reply : message_queue_) { reply->Abort(); } + } template -void WorkerPool::SetWorkerCount(int count) { +void WorkerPool::SetWorkerCount(const int count) { Q_ASSERT(workers_.isEmpty()); worker_count_ = count; } @@ -220,6 +222,7 @@ void WorkerPool::Start() { template void WorkerPool::DoStart() { + Q_ASSERT(workers_.isEmpty()); Q_ASSERT(!executable_name_.isEmpty()); Q_ASSERT(QThread::currentThread() == thread()); @@ -248,10 +251,12 @@ void WorkerPool::DoStart() { workers_ << worker; } + } template void WorkerPool::StartOneWorker(Worker *worker) { + Q_ASSERT(QThread::currentThread() == thread()); DeleteQObjectPointerLater(&worker->local_server_); @@ -284,6 +289,7 @@ void WorkerPool::StartOneWorker(Worker *worker) { // Start the process worker->process_->setProcessChannelMode(QProcess::ForwardedChannels); worker->process_->start(executable_path_, QStringList() << worker->local_server_->fullServerName()); + } template @@ -338,20 +344,24 @@ void WorkerPool::ProcessError(QProcess::ProcessError error) { StartOneWorker(worker); break; } + } template typename WorkerPool::ReplyType* WorkerPool::NewReply(MessageType *message) { + const int id = next_id_.fetchAndAddOrdered(1); message->set_id(id); return new ReplyType(*message); + } template typename WorkerPool::ReplyType* WorkerPool::SendMessageWithReply(MessageType *message) { + ReplyType *reply = NewReply(message); // Add the pending reply to the queue @@ -364,10 +374,12 @@ WorkerPool::SendMessageWithReply(MessageType *message) { metaObject()->invokeMethod(this, "SendQueuedMessages", Qt::QueuedConnection); return reply; + } template void WorkerPool::SendQueuedMessages() { + QMutexLocker l(&message_queue_mutex_); while (!message_queue_.isEmpty()) { @@ -384,10 +396,12 @@ void WorkerPool::SendQueuedMessages() { handler->SendRequest(reply); } + } template HandlerType *WorkerPool::NextHandler() const { + for (int i = 0; i < workers_.count(); ++i) { const int worker_index = (next_worker_ + i) % workers_.count(); @@ -398,6 +412,7 @@ HandlerType *WorkerPool::NextHandler() const { } return nullptr; + } #endif // WORKERPOOL_H diff --git a/ext/strawberry-tagreader/tagreaderworker.cpp b/ext/strawberry-tagreader/tagreaderworker.cpp index 55536198..d348f7b6 100644 --- a/ext/strawberry-tagreader/tagreaderworker.cpp +++ b/ext/strawberry-tagreader/tagreaderworker.cpp @@ -27,11 +27,9 @@ #include "tagreaderworker.h" TagReaderWorker::TagReaderWorker(QIODevice *socket, QObject *parent) - : AbstractMessageHandler(socket, parent) -{ -} + : AbstractMessageHandler(socket, parent) {} -void TagReaderWorker::MessageArrived(const pb::tagreader::Message& message) { +void TagReaderWorker::MessageArrived(const pb::tagreader::Message &message) { pb::tagreader::Message reply; @@ -51,6 +49,7 @@ void TagReaderWorker::MessageArrived(const pb::tagreader::Message& message) { } SendReply(message, &reply); + } diff --git a/ext/strawberry-tagreader/tagreaderworker.h b/ext/strawberry-tagreader/tagreaderworker.h index e7063aa5..1e75b0ff 100644 --- a/ext/strawberry-tagreader/tagreaderworker.h +++ b/ext/strawberry-tagreader/tagreaderworker.h @@ -29,14 +29,14 @@ class QIODevice; class TagReaderWorker : public AbstractMessageHandler { -public: + public: explicit TagReaderWorker(QIODevice *socket, QObject *parent = nullptr); -protected: + protected: void MessageArrived(const pb::tagreader::Message &message) override; void DeviceClosed() override; -private: + private: TagReader tag_reader_; };