mirror of
https://github.com/clementine-player/Clementine
synced 2025-01-29 18:49:51 +01:00
Queue messages in WorkerPool instead of in each individual handler, so the entire queue isn't lost if a worker crashes.
This commit is contained in:
parent
84a1ae2f87
commit
58c0f464cd
@ -8,6 +8,7 @@ set(SOURCES
|
|||||||
core/encoding.cpp
|
core/encoding.cpp
|
||||||
core/logging.cpp
|
core/logging.cpp
|
||||||
core/messagehandler.cpp
|
core/messagehandler.cpp
|
||||||
|
core/messagereply.cpp
|
||||||
core/waitforsignal.cpp
|
core/waitforsignal.cpp
|
||||||
core/workerpool.cpp
|
core/workerpool.cpp
|
||||||
)
|
)
|
||||||
@ -15,6 +16,7 @@ set(SOURCES
|
|||||||
set(HEADERS
|
set(HEADERS
|
||||||
core/closure.h
|
core/closure.h
|
||||||
core/messagehandler.h
|
core/messagehandler.h
|
||||||
|
core/messagereply.h
|
||||||
core/workerpool.h
|
core/workerpool.h
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -100,24 +100,4 @@ void _MessageHandlerBase::WriteMessage(const QByteArray& data) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_MessageReplyBase::_MessageReplyBase(int id, QObject* parent)
|
|
||||||
: QObject(parent),
|
|
||||||
id_(id),
|
|
||||||
finished_(false),
|
|
||||||
success_(false)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
bool _MessageReplyBase::WaitForFinished() {
|
|
||||||
semaphore_.acquire();
|
|
||||||
return success_;
|
|
||||||
}
|
|
||||||
|
|
||||||
void _MessageReplyBase::Abort() {
|
|
||||||
Q_ASSERT(!finished_);
|
|
||||||
finished_ = true;
|
|
||||||
success_ = false;
|
|
||||||
|
|
||||||
emit Finished(success_);
|
|
||||||
semaphore_.release();
|
|
||||||
}
|
}
|
||||||
|
@ -30,6 +30,9 @@
|
|||||||
#include <QSemaphore>
|
#include <QSemaphore>
|
||||||
#include <QThread>
|
#include <QThread>
|
||||||
|
|
||||||
|
#include "core/logging.h"
|
||||||
|
#include "core/messagereply.h"
|
||||||
|
|
||||||
class QAbstractSocket;
|
class QAbstractSocket;
|
||||||
class QIODevice;
|
class QIODevice;
|
||||||
class QLocalSocket;
|
class QLocalSocket;
|
||||||
@ -40,54 +43,6 @@ class QLocalSocket;
|
|||||||
x.toUtf8().constData(), x.toUtf8().length()
|
x.toUtf8().constData(), x.toUtf8().length()
|
||||||
|
|
||||||
|
|
||||||
// Base QObject for a reply future class that is returned immediately for
|
|
||||||
// requests that will occur in the background. Similar to QNetworkReply.
|
|
||||||
// Use MessageReply instead.
|
|
||||||
class _MessageReplyBase : public QObject {
|
|
||||||
Q_OBJECT
|
|
||||||
|
|
||||||
public:
|
|
||||||
_MessageReplyBase(int id, QObject* parent = 0);
|
|
||||||
|
|
||||||
int id() const { return id_; }
|
|
||||||
bool is_finished() const { return finished_; }
|
|
||||||
bool is_successful() const { return success_; }
|
|
||||||
|
|
||||||
// Waits for the reply to finish by waiting on a semaphore. Never call this
|
|
||||||
// from the MessageHandler's thread or it will block forever.
|
|
||||||
// Returns true if the call was successful.
|
|
||||||
bool WaitForFinished();
|
|
||||||
|
|
||||||
void Abort();
|
|
||||||
|
|
||||||
signals:
|
|
||||||
void Finished(bool success);
|
|
||||||
|
|
||||||
protected:
|
|
||||||
int id_;
|
|
||||||
bool finished_;
|
|
||||||
bool success_;
|
|
||||||
|
|
||||||
QSemaphore semaphore_;
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
// A reply future class that is returned immediately for requests that will
|
|
||||||
// occur in the background. Similar to QNetworkReply.
|
|
||||||
template <typename MessageType>
|
|
||||||
class MessageReply : public _MessageReplyBase {
|
|
||||||
public:
|
|
||||||
MessageReply(int id, QObject* parent = 0);
|
|
||||||
|
|
||||||
const MessageType& message() const { return message_; }
|
|
||||||
|
|
||||||
void SetReply(const MessageType& message);
|
|
||||||
|
|
||||||
private:
|
|
||||||
MessageType message_;
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
// Reads and writes uint32 length encoded protobufs to a socket.
|
// Reads and writes uint32 length encoded protobufs to a socket.
|
||||||
// This base QObject is separate from AbstractMessageHandler because moc can't
|
// This base QObject is separate from AbstractMessageHandler because moc can't
|
||||||
// handle templated classes. Use AbstractMessageHandler instead.
|
// handle templated classes. Use AbstractMessageHandler instead.
|
||||||
@ -126,12 +81,13 @@ protected:
|
|||||||
// Reads and writes uint32 length encoded MessageType messages to a socket.
|
// Reads and writes uint32 length encoded MessageType messages to a socket.
|
||||||
// You should subclass this and implement the MessageArrived(MessageType)
|
// You should subclass this and implement the MessageArrived(MessageType)
|
||||||
// method.
|
// method.
|
||||||
template <typename MessageType>
|
template <typename MT>
|
||||||
class AbstractMessageHandler : public _MessageHandlerBase {
|
class AbstractMessageHandler : public _MessageHandlerBase {
|
||||||
public:
|
public:
|
||||||
AbstractMessageHandler(QIODevice* device, QObject* parent);
|
AbstractMessageHandler(QIODevice* device, QObject* parent);
|
||||||
|
|
||||||
typedef MessageReply<MessageType> ReplyType;
|
typedef MT MessageType;
|
||||||
|
typedef MessageReply<MT> ReplyType;
|
||||||
|
|
||||||
// Serialises the message and writes it to the socket. This version MUST be
|
// Serialises the message and writes it to the socket. This version MUST be
|
||||||
// called from the thread in which the AbstractMessageHandler was created.
|
// called from the thread in which the AbstractMessageHandler was created.
|
||||||
@ -141,15 +97,10 @@ public:
|
|||||||
// called from any thread.
|
// called from any thread.
|
||||||
void SendMessageAsync(const MessageType& message);
|
void SendMessageAsync(const MessageType& message);
|
||||||
|
|
||||||
// Creates a new reply future for the request with the next sequential ID,
|
// Sends the request message inside and takes ownership of the MessageReply.
|
||||||
// and sets the request's ID to the ID of the reply. When a reply arrives
|
// The MessageReply's Finished() signal will be emitted when a reply arrives
|
||||||
// for this request the reply is triggered automatically and MessageArrived
|
// with the same ID. Must be called from my thread.
|
||||||
// is NOT called. Can be called from any thread.
|
void SendRequest(ReplyType* reply);
|
||||||
ReplyType* NewReply(MessageType* message);
|
|
||||||
|
|
||||||
// Same as NewReply, except the message is sent as well. Can be called from
|
|
||||||
// any thread.
|
|
||||||
ReplyType* SendMessageWithReply(MessageType* message);
|
|
||||||
|
|
||||||
// Sets the "id" field of reply to the same as the request, and sends the
|
// Sets the "id" field of reply to the same as the request, and sends the
|
||||||
// reply on the socket. Used on the worker side.
|
// reply on the socket. Used on the worker side.
|
||||||
@ -164,54 +115,53 @@ protected:
|
|||||||
void SocketClosed();
|
void SocketClosed();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
QMutex mutex_;
|
|
||||||
int next_id_;
|
|
||||||
QMap<int, ReplyType*> pending_replies_;
|
QMap<int, ReplyType*> pending_replies_;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
template<typename MessageType>
|
template<typename MT>
|
||||||
AbstractMessageHandler<MessageType>::AbstractMessageHandler(
|
AbstractMessageHandler<MT>::AbstractMessageHandler(
|
||||||
QIODevice* device, QObject* parent)
|
QIODevice* device, QObject* parent)
|
||||||
: _MessageHandlerBase(device, parent),
|
: _MessageHandlerBase(device, parent)
|
||||||
next_id_(1)
|
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
template<typename MessageType>
|
template<typename MT>
|
||||||
void AbstractMessageHandler<MessageType>::SendMessage(const MessageType& message) {
|
void AbstractMessageHandler<MT>::SendMessage(const MessageType& message) {
|
||||||
Q_ASSERT(QThread::currentThread() == thread());
|
Q_ASSERT(QThread::currentThread() == thread());
|
||||||
|
|
||||||
std::string data = message.SerializeAsString();
|
std::string data = message.SerializeAsString();
|
||||||
WriteMessage(QByteArray(data.data(), data.size()));
|
WriteMessage(QByteArray(data.data(), data.size()));
|
||||||
}
|
}
|
||||||
|
|
||||||
template<typename MessageType>
|
template<typename MT>
|
||||||
void AbstractMessageHandler<MessageType>::SendMessageAsync(const MessageType& message) {
|
void AbstractMessageHandler<MT>::SendMessageAsync(const MessageType& message) {
|
||||||
std::string data = message.SerializeAsString();
|
std::string data = message.SerializeAsString();
|
||||||
metaObject()->invokeMethod(this, "WriteMessage", Qt::QueuedConnection,
|
metaObject()->invokeMethod(this, "WriteMessage", Qt::QueuedConnection,
|
||||||
Q_ARG(QByteArray, QByteArray(data.data(), data.size())));
|
Q_ARG(QByteArray, QByteArray(data.data(), data.size())));
|
||||||
}
|
}
|
||||||
|
|
||||||
template<typename MessageType>
|
template<typename MT>
|
||||||
void AbstractMessageHandler<MessageType>::SendReply(const MessageType& request,
|
void AbstractMessageHandler<MT>::SendRequest(ReplyType* reply) {
|
||||||
MessageType* reply) {
|
pending_replies_[reply->id()] = reply;
|
||||||
|
SendMessage(reply->request_message());
|
||||||
|
}
|
||||||
|
|
||||||
|
template<typename MT>
|
||||||
|
void AbstractMessageHandler<MT>::SendReply(const MessageType& request,
|
||||||
|
MessageType* reply) {
|
||||||
reply->set_id(request.id());
|
reply->set_id(request.id());
|
||||||
SendMessage(*reply);
|
SendMessage(*reply);
|
||||||
}
|
}
|
||||||
|
|
||||||
template<typename MessageType>
|
template<typename MT>
|
||||||
bool AbstractMessageHandler<MessageType>::RawMessageArrived(const QByteArray& data) {
|
bool AbstractMessageHandler<MT>::RawMessageArrived(const QByteArray& data) {
|
||||||
MessageType message;
|
MessageType message;
|
||||||
if (!message.ParseFromArray(data.constData(), data.size())) {
|
if (!message.ParseFromArray(data.constData(), data.size())) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
ReplyType* reply = NULL;
|
ReplyType* reply = pending_replies_.take(message.id());
|
||||||
{
|
|
||||||
QMutexLocker l(&mutex_);
|
|
||||||
reply = pending_replies_.take(message.id());
|
|
||||||
}
|
|
||||||
|
|
||||||
if (reply) {
|
if (reply) {
|
||||||
// This is a reply to a message that we created earlier.
|
// This is a reply to a message that we created earlier.
|
||||||
@ -223,37 +173,8 @@ bool AbstractMessageHandler<MessageType>::RawMessageArrived(const QByteArray& da
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
template<typename MessageType>
|
template<typename MT>
|
||||||
typename AbstractMessageHandler<MessageType>::ReplyType*
|
void AbstractMessageHandler<MT>::SocketClosed() {
|
||||||
AbstractMessageHandler<MessageType>::NewReply(
|
|
||||||
MessageType* message) {
|
|
||||||
ReplyType* reply = NULL;
|
|
||||||
|
|
||||||
{
|
|
||||||
QMutexLocker l(&mutex_);
|
|
||||||
|
|
||||||
const int id = next_id_ ++;
|
|
||||||
reply = new ReplyType(id);
|
|
||||||
pending_replies_[id] = reply;
|
|
||||||
}
|
|
||||||
|
|
||||||
message->set_id(reply->id());
|
|
||||||
return reply;
|
|
||||||
}
|
|
||||||
|
|
||||||
template<typename MessageType>
|
|
||||||
typename AbstractMessageHandler<MessageType>::ReplyType*
|
|
||||||
AbstractMessageHandler<MessageType>::SendMessageWithReply(
|
|
||||||
MessageType* message) {
|
|
||||||
ReplyType* reply = NewReply(message);
|
|
||||||
|
|
||||||
SendMessageAsync(*message);
|
|
||||||
|
|
||||||
return reply;
|
|
||||||
}
|
|
||||||
|
|
||||||
template<typename MessageType>
|
|
||||||
void AbstractMessageHandler<MessageType>::SocketClosed() {
|
|
||||||
QMutexLocker l(&mutex_);
|
QMutexLocker l(&mutex_);
|
||||||
|
|
||||||
foreach (ReplyType* reply, pending_replies_) {
|
foreach (ReplyType* reply, pending_replies_) {
|
||||||
@ -262,22 +183,6 @@ void AbstractMessageHandler<MessageType>::SocketClosed() {
|
|||||||
pending_replies_.clear();
|
pending_replies_.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
template<typename MessageType>
|
|
||||||
MessageReply<MessageType>::MessageReply(int id, QObject* parent)
|
|
||||||
: _MessageReplyBase(id, parent)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
template<typename MessageType>
|
|
||||||
void MessageReply<MessageType>::SetReply(const MessageType& message) {
|
|
||||||
Q_ASSERT(!finished_);
|
|
||||||
|
|
||||||
message_.MergeFrom(message);
|
|
||||||
finished_ = true;
|
|
||||||
success_ = true;
|
|
||||||
|
|
||||||
emit Finished(success_);
|
|
||||||
semaphore_.release();
|
|
||||||
}
|
|
||||||
|
|
||||||
#endif // MESSAGEHANDLER_H
|
#endif // MESSAGEHANDLER_H
|
||||||
|
42
ext/libclementine-common/core/messagereply.cpp
Normal file
42
ext/libclementine-common/core/messagereply.cpp
Normal file
@ -0,0 +1,42 @@
|
|||||||
|
/* This file is part of Clementine.
|
||||||
|
Copyright 2011, David Sansome <me@davidsansome.com>
|
||||||
|
|
||||||
|
Clementine is free software: you can redistribute it and/or modify
|
||||||
|
it under the terms of the GNU General Public License as published by
|
||||||
|
the Free Software Foundation, either version 3 of the License, or
|
||||||
|
(at your option) any later version.
|
||||||
|
|
||||||
|
Clementine is distributed in the hope that it will be useful,
|
||||||
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
GNU General Public License for more details.
|
||||||
|
|
||||||
|
You should have received a copy of the GNU General Public License
|
||||||
|
along with Clementine. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "messagereply.h"
|
||||||
|
|
||||||
|
_MessageReplyBase::_MessageReplyBase(QObject* parent)
|
||||||
|
: QObject(parent),
|
||||||
|
finished_(false),
|
||||||
|
success_(false)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
bool _MessageReplyBase::WaitForFinished() {
|
||||||
|
qLog(Debug) << "Waiting on ID" << id();
|
||||||
|
semaphore_.acquire();
|
||||||
|
qLog(Debug) << "Acquired ID" << id();
|
||||||
|
return success_;
|
||||||
|
}
|
||||||
|
|
||||||
|
void _MessageReplyBase::Abort() {
|
||||||
|
Q_ASSERT(!finished_);
|
||||||
|
finished_ = true;
|
||||||
|
success_ = false;
|
||||||
|
|
||||||
|
emit Finished(success_);
|
||||||
|
qLog(Debug) << "Releasing ID" << id() << "(aborted)";
|
||||||
|
semaphore_.release();
|
||||||
|
}
|
97
ext/libclementine-common/core/messagereply.h
Normal file
97
ext/libclementine-common/core/messagereply.h
Normal file
@ -0,0 +1,97 @@
|
|||||||
|
/* This file is part of Clementine.
|
||||||
|
Copyright 2011, David Sansome <me@davidsansome.com>
|
||||||
|
|
||||||
|
Clementine is free software: you can redistribute it and/or modify
|
||||||
|
it under the terms of the GNU General Public License as published by
|
||||||
|
the Free Software Foundation, either version 3 of the License, or
|
||||||
|
(at your option) any later version.
|
||||||
|
|
||||||
|
Clementine is distributed in the hope that it will be useful,
|
||||||
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
GNU General Public License for more details.
|
||||||
|
|
||||||
|
You should have received a copy of the GNU General Public License
|
||||||
|
along with Clementine. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef MESSAGEREPLY_H
|
||||||
|
#define MESSAGEREPLY_H
|
||||||
|
|
||||||
|
#include <QObject>
|
||||||
|
#include <QSemaphore>
|
||||||
|
|
||||||
|
#include "core/logging.h"
|
||||||
|
|
||||||
|
// Base QObject for a reply future class that is returned immediately for
|
||||||
|
// requests that will occur in the background. Similar to QNetworkReply.
|
||||||
|
// Use MessageReply instead.
|
||||||
|
class _MessageReplyBase : public QObject {
|
||||||
|
Q_OBJECT
|
||||||
|
|
||||||
|
public:
|
||||||
|
_MessageReplyBase(QObject* parent = 0);
|
||||||
|
|
||||||
|
virtual int id() const = 0;
|
||||||
|
bool is_finished() const { return finished_; }
|
||||||
|
bool is_successful() const { return success_; }
|
||||||
|
|
||||||
|
// Waits for the reply to finish by waiting on a semaphore. Never call this
|
||||||
|
// from the MessageHandler's thread or it will block forever.
|
||||||
|
// Returns true if the call was successful.
|
||||||
|
bool WaitForFinished();
|
||||||
|
|
||||||
|
void Abort();
|
||||||
|
|
||||||
|
signals:
|
||||||
|
void Finished(bool success);
|
||||||
|
|
||||||
|
protected:
|
||||||
|
bool finished_;
|
||||||
|
bool success_;
|
||||||
|
|
||||||
|
QSemaphore semaphore_;
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
// A reply future class that is returned immediately for requests that will
|
||||||
|
// occur in the background. Similar to QNetworkReply.
|
||||||
|
template <typename MessageType>
|
||||||
|
class MessageReply : public _MessageReplyBase {
|
||||||
|
public:
|
||||||
|
MessageReply(const MessageType& request_message, QObject* parent = 0);
|
||||||
|
|
||||||
|
int id() const { return request_message_.id(); }
|
||||||
|
const MessageType& request_message() const { return request_message_; }
|
||||||
|
const MessageType& message() const { return reply_message_; }
|
||||||
|
|
||||||
|
void SetReply(const MessageType& message);
|
||||||
|
|
||||||
|
private:
|
||||||
|
MessageType request_message_;
|
||||||
|
MessageType reply_message_;
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
template<typename MessageType>
|
||||||
|
MessageReply<MessageType>::MessageReply(const MessageType& request_message,
|
||||||
|
QObject* parent)
|
||||||
|
: _MessageReplyBase(parent)
|
||||||
|
{
|
||||||
|
request_message_.MergeFrom(request_message);
|
||||||
|
}
|
||||||
|
|
||||||
|
template<typename MessageType>
|
||||||
|
void MessageReply<MessageType>::SetReply(const MessageType& message) {
|
||||||
|
Q_ASSERT(!finished_);
|
||||||
|
|
||||||
|
reply_message_.MergeFrom(message);
|
||||||
|
finished_ = true;
|
||||||
|
success_ = true;
|
||||||
|
|
||||||
|
emit Finished(success_);
|
||||||
|
qLog(Debug) << "Releasing ID" << id() << "(finished)";
|
||||||
|
semaphore_.release();
|
||||||
|
}
|
||||||
|
|
||||||
|
#endif // MESSAGEREPLY_H
|
@ -18,17 +18,19 @@
|
|||||||
#ifndef WORKERPOOL_H
|
#ifndef WORKERPOOL_H
|
||||||
#define WORKERPOOL_H
|
#define WORKERPOOL_H
|
||||||
|
|
||||||
|
#include <QAtomicInt>
|
||||||
#include <QCoreApplication>
|
#include <QCoreApplication>
|
||||||
#include <QFile>
|
#include <QFile>
|
||||||
#include <QLocalServer>
|
#include <QLocalServer>
|
||||||
#include <QLocalSocket>
|
#include <QLocalSocket>
|
||||||
|
#include <QMutex>
|
||||||
#include <QObject>
|
#include <QObject>
|
||||||
#include <QProcess>
|
#include <QProcess>
|
||||||
|
#include <QQueue>
|
||||||
#include <QThread>
|
#include <QThread>
|
||||||
|
|
||||||
#include "core/closure.h"
|
#include "core/closure.h"
|
||||||
#include "core/logging.h"
|
#include "core/logging.h"
|
||||||
#include "core/waitforsignal.h"
|
|
||||||
|
|
||||||
|
|
||||||
// Base class containing signals and slots - required because moc doesn't do
|
// Base class containing signals and slots - required because moc doesn't do
|
||||||
@ -44,14 +46,11 @@ signals:
|
|||||||
// worker wasn't found, or couldn't be executed.
|
// worker wasn't found, or couldn't be executed.
|
||||||
void WorkerFailedToStart();
|
void WorkerFailedToStart();
|
||||||
|
|
||||||
// A worker connected and a handler was created for it. The next call to
|
|
||||||
// NextHandler() won't return NULL.
|
|
||||||
void WorkerConnected();
|
|
||||||
|
|
||||||
protected slots:
|
protected slots:
|
||||||
virtual void DoStart() {}
|
virtual void DoStart() {}
|
||||||
virtual void NewConnection() {}
|
virtual void NewConnection() {}
|
||||||
virtual void ProcessError(QProcess::ProcessError) {}
|
virtual void ProcessError(QProcess::ProcessError) {}
|
||||||
|
virtual void SendQueuedMessages() {}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
@ -59,12 +58,16 @@ protected slots:
|
|||||||
// started for each process, and the address is passed to the process as
|
// started for each process, and the address is passed to the process as
|
||||||
// argv[1]. The process is expected to connect back to the socket server, and
|
// argv[1]. The process is expected to connect back to the socket server, and
|
||||||
// when it does a HandlerType is created for it.
|
// when it does a HandlerType is created for it.
|
||||||
|
// Instances of HandlerType are created in the WorkerPool's thread.
|
||||||
template <typename HandlerType>
|
template <typename HandlerType>
|
||||||
class WorkerPool : public _WorkerPoolBase {
|
class WorkerPool : public _WorkerPoolBase {
|
||||||
public:
|
public:
|
||||||
WorkerPool(QObject* parent = 0);
|
WorkerPool(QObject* parent = 0);
|
||||||
~WorkerPool();
|
~WorkerPool();
|
||||||
|
|
||||||
|
typedef typename HandlerType::MessageType MessageType;
|
||||||
|
typedef typename HandlerType::ReplyType ReplyType;
|
||||||
|
|
||||||
// Sets the name of the worker executable. This is looked for first in the
|
// Sets the name of the worker executable. This is looked for first in the
|
||||||
// current directory, and then in $PATH. You must call this before calling
|
// current directory, and then in $PATH. You must call this before calling
|
||||||
// Start().
|
// Start().
|
||||||
@ -82,14 +85,18 @@ public:
|
|||||||
// Starts all workers.
|
// Starts all workers.
|
||||||
void Start();
|
void Start();
|
||||||
|
|
||||||
// Returns a handler in a round-robin fashion. Will block if no handlers are
|
// Fills in the message's "id" field and creates a reply future. The message
|
||||||
// available yet.
|
// is queued and the WorkerPool's thread will send it to the next available
|
||||||
HandlerType* NextHandler();
|
// worker. Can be called from any thread.
|
||||||
|
ReplyType* SendMessageWithReply(MessageType* message);
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
|
// These are all reimplemented slots, they are called on the WorkerPool's
|
||||||
|
// thread.
|
||||||
void DoStart();
|
void DoStart();
|
||||||
void NewConnection();
|
void NewConnection();
|
||||||
void ProcessError(QProcess::ProcessError error);
|
void ProcessError(QProcess::ProcessError error);
|
||||||
|
void SendQueuedMessages();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
struct Worker {
|
struct Worker {
|
||||||
@ -102,6 +109,7 @@ private:
|
|||||||
HandlerType* handler_;
|
HandlerType* handler_;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Must only ever be called on my thread.
|
||||||
void StartOneWorker(Worker* worker);
|
void StartOneWorker(Worker* worker);
|
||||||
|
|
||||||
template <typename T>
|
template <typename T>
|
||||||
@ -123,21 +131,36 @@ private:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Creates a new reply future for the request with the next sequential ID,
|
||||||
|
// and sets the request's ID to the ID of the reply. Can be called from any
|
||||||
|
// thread
|
||||||
|
ReplyType* NewReply(MessageType* message);
|
||||||
|
|
||||||
|
// Returns the next handler, or NULL if there isn't one. Must be called from
|
||||||
|
// my thread.
|
||||||
|
HandlerType* NextHandler() const;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
QString local_server_name_;
|
QString local_server_name_;
|
||||||
QString executable_name_;
|
QString executable_name_;
|
||||||
QString executable_path_;
|
QString executable_path_;
|
||||||
|
|
||||||
int worker_count_;
|
int worker_count_;
|
||||||
int next_worker_;
|
mutable int next_worker_;
|
||||||
QList<Worker> workers_;
|
QList<Worker> workers_;
|
||||||
|
|
||||||
|
QAtomicInt next_id_;
|
||||||
|
|
||||||
|
QMutex message_queue_mutex_;
|
||||||
|
QQueue<ReplyType*> message_queue_;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
template <typename HandlerType>
|
template <typename HandlerType>
|
||||||
WorkerPool<HandlerType>::WorkerPool(QObject* parent)
|
WorkerPool<HandlerType>::WorkerPool(QObject* parent)
|
||||||
: _WorkerPoolBase(parent),
|
: _WorkerPoolBase(parent),
|
||||||
next_worker_(0)
|
next_worker_(0),
|
||||||
|
next_id_(0)
|
||||||
{
|
{
|
||||||
worker_count_ = qBound(1, QThread::idealThreadCount() / 2, 2);
|
worker_count_ = qBound(1, QThread::idealThreadCount() / 2, 2);
|
||||||
local_server_name_ = qApp->applicationName().toLower();
|
local_server_name_ = qApp->applicationName().toLower();
|
||||||
@ -194,6 +217,7 @@ template <typename HandlerType>
|
|||||||
void WorkerPool<HandlerType>::DoStart() {
|
void WorkerPool<HandlerType>::DoStart() {
|
||||||
Q_ASSERT(workers_.isEmpty());
|
Q_ASSERT(workers_.isEmpty());
|
||||||
Q_ASSERT(!executable_name_.isEmpty());
|
Q_ASSERT(!executable_name_.isEmpty());
|
||||||
|
Q_ASSERT(QThread::currentThread() == thread());
|
||||||
|
|
||||||
// Find the executable if we can, default to searching $PATH
|
// Find the executable if we can, default to searching $PATH
|
||||||
executable_path_ = executable_name_;
|
executable_path_ = executable_name_;
|
||||||
@ -223,6 +247,8 @@ void WorkerPool<HandlerType>::DoStart() {
|
|||||||
|
|
||||||
template <typename HandlerType>
|
template <typename HandlerType>
|
||||||
void WorkerPool<HandlerType>::StartOneWorker(Worker* worker) {
|
void WorkerPool<HandlerType>::StartOneWorker(Worker* worker) {
|
||||||
|
Q_ASSERT(QThread::currentThread() == thread());
|
||||||
|
|
||||||
DeleteQObjectPointerLater(&worker->local_server_);
|
DeleteQObjectPointerLater(&worker->local_server_);
|
||||||
DeleteQObjectPointerLater(&worker->local_socket_);
|
DeleteQObjectPointerLater(&worker->local_socket_);
|
||||||
DeleteQObjectPointerLater(&worker->process_);
|
DeleteQObjectPointerLater(&worker->process_);
|
||||||
@ -245,7 +271,7 @@ void WorkerPool<HandlerType>::StartOneWorker(Worker* worker) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
qLog(Debug) << "Starting worker" << executable_path_
|
qLog(Debug) << "Starting worker" << worker << executable_path_
|
||||||
<< worker->local_server_->fullServerName();
|
<< worker->local_server_->fullServerName();
|
||||||
|
|
||||||
// Start the process
|
// Start the process
|
||||||
@ -256,6 +282,8 @@ void WorkerPool<HandlerType>::StartOneWorker(Worker* worker) {
|
|||||||
|
|
||||||
template <typename HandlerType>
|
template <typename HandlerType>
|
||||||
void WorkerPool<HandlerType>::NewConnection() {
|
void WorkerPool<HandlerType>::NewConnection() {
|
||||||
|
Q_ASSERT(QThread::currentThread() == thread());
|
||||||
|
|
||||||
QLocalServer* server = qobject_cast<QLocalServer*>(sender());
|
QLocalServer* server = qobject_cast<QLocalServer*>(sender());
|
||||||
|
|
||||||
// Find the worker with this server.
|
// Find the worker with this server.
|
||||||
@ -263,7 +291,7 @@ void WorkerPool<HandlerType>::NewConnection() {
|
|||||||
if (!worker)
|
if (!worker)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
qLog(Debug) << "Worker connected to" << server->fullServerName();
|
qLog(Debug) << "Worker" << worker << "connected to" << server->fullServerName();
|
||||||
|
|
||||||
// Accept the connection.
|
// Accept the connection.
|
||||||
worker->local_socket_ = server->nextPendingConnection();
|
worker->local_socket_ = server->nextPendingConnection();
|
||||||
@ -276,11 +304,13 @@ void WorkerPool<HandlerType>::NewConnection() {
|
|||||||
// Create the handler.
|
// Create the handler.
|
||||||
worker->handler_ = new HandlerType(worker->local_socket_, this);
|
worker->handler_ = new HandlerType(worker->local_socket_, this);
|
||||||
|
|
||||||
emit WorkerConnected();
|
SendQueuedMessages();
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename HandlerType>
|
template <typename HandlerType>
|
||||||
void WorkerPool<HandlerType>::ProcessError(QProcess::ProcessError error) {
|
void WorkerPool<HandlerType>::ProcessError(QProcess::ProcessError error) {
|
||||||
|
Q_ASSERT(QThread::currentThread() == thread());
|
||||||
|
|
||||||
QProcess* process = qobject_cast<QProcess*>(sender());
|
QProcess* process = qobject_cast<QProcess*>(sender());
|
||||||
|
|
||||||
// Find the worker with this process.
|
// Find the worker with this process.
|
||||||
@ -299,27 +329,74 @@ void WorkerPool<HandlerType>::ProcessError(QProcess::ProcessError error) {
|
|||||||
|
|
||||||
default:
|
default:
|
||||||
// On any other error we just restart the process.
|
// On any other error we just restart the process.
|
||||||
qLog(Debug) << "Worker failed with error" << error << "- restarting";
|
qLog(Debug) << "Worker" << worker << "failed with error" << error << "- restarting";
|
||||||
StartOneWorker(worker);
|
StartOneWorker(worker);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename HandlerType>
|
template <typename HandlerType>
|
||||||
HandlerType* WorkerPool<HandlerType>::NextHandler() {
|
typename WorkerPool<HandlerType>::ReplyType*
|
||||||
forever {
|
WorkerPool<HandlerType>::NewReply(MessageType* message) {
|
||||||
for (int i=0 ; i<workers_.count() ; ++i) {
|
const int id = next_id_.fetchAndAddOrdered(1);
|
||||||
const int worker_index = (next_worker_ + i) % workers_.count();
|
message->set_id(id);
|
||||||
|
|
||||||
if (workers_[worker_index].handler_) {
|
return new ReplyType(*message);
|
||||||
next_worker_ = (worker_index + 1) % workers_.count();
|
}
|
||||||
return workers_[worker_index].handler_;
|
|
||||||
}
|
template <typename HandlerType>
|
||||||
|
typename WorkerPool<HandlerType>::ReplyType*
|
||||||
|
WorkerPool<HandlerType>::SendMessageWithReply(MessageType* message) {
|
||||||
|
ReplyType* reply = NewReply(message);
|
||||||
|
|
||||||
|
// Copy the message
|
||||||
|
MessageType copy;
|
||||||
|
copy.MergeFrom(*message);
|
||||||
|
|
||||||
|
// Add the copy to the queue
|
||||||
|
{
|
||||||
|
QMutexLocker l(&message_queue_mutex_);
|
||||||
|
message_queue_.enqueue(reply);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wake up the main thread
|
||||||
|
metaObject()->invokeMethod(this, "SendQueuedMessages", Qt::QueuedConnection);
|
||||||
|
|
||||||
|
return reply;
|
||||||
|
}
|
||||||
|
|
||||||
|
template <typename HandlerType>
|
||||||
|
void WorkerPool<HandlerType>::SendQueuedMessages() {
|
||||||
|
QMutexLocker l(&message_queue_mutex_);
|
||||||
|
|
||||||
|
while (!message_queue_.isEmpty()) {
|
||||||
|
ReplyType* reply = message_queue_.dequeue();
|
||||||
|
|
||||||
|
// Find a worker for this message
|
||||||
|
HandlerType* handler = NextHandler();
|
||||||
|
if (!handler) {
|
||||||
|
// No available handlers - put the message on the front of the queue.
|
||||||
|
message_queue_.prepend(reply);
|
||||||
|
qLog(Debug) << "No available handlers to process request";
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
// No workers were connected, wait for one.
|
handler->SendRequest(reply);
|
||||||
WaitForSignal(this, SIGNAL(WorkerConnected()));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
template <typename HandlerType>
|
||||||
|
HandlerType* WorkerPool<HandlerType>::NextHandler() const {
|
||||||
|
for (int i=0 ; i<workers_.count() ; ++i) {
|
||||||
|
const int worker_index = (next_worker_ + i) % workers_.count();
|
||||||
|
|
||||||
|
if (workers_[worker_index].handler_) {
|
||||||
|
next_worker_ = (worker_index + 1) % workers_.count();
|
||||||
|
return workers_[worker_index].handler_;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
#endif // WORKERPOOL_H
|
#endif // WORKERPOOL_H
|
||||||
|
@ -52,7 +52,7 @@ TagReaderReply* TagReaderClient::ReadFile(const QString& filename) {
|
|||||||
|
|
||||||
req->set_filename(DataCommaSizeFromQString(filename));
|
req->set_filename(DataCommaSizeFromQString(filename));
|
||||||
|
|
||||||
return worker_pool_->NextHandler()->SendMessageWithReply(&message);
|
return worker_pool_->SendMessageWithReply(&message);
|
||||||
}
|
}
|
||||||
|
|
||||||
TagReaderReply* TagReaderClient::SaveFile(const QString& filename, const Song& metadata) {
|
TagReaderReply* TagReaderClient::SaveFile(const QString& filename, const Song& metadata) {
|
||||||
@ -62,7 +62,7 @@ TagReaderReply* TagReaderClient::SaveFile(const QString& filename, const Song& m
|
|||||||
req->set_filename(DataCommaSizeFromQString(filename));
|
req->set_filename(DataCommaSizeFromQString(filename));
|
||||||
metadata.ToProtobuf(req->mutable_metadata());
|
metadata.ToProtobuf(req->mutable_metadata());
|
||||||
|
|
||||||
return worker_pool_->NextHandler()->SendMessageWithReply(&message);
|
return worker_pool_->SendMessageWithReply(&message);
|
||||||
}
|
}
|
||||||
|
|
||||||
TagReaderReply* TagReaderClient::IsMediaFile(const QString& filename) {
|
TagReaderReply* TagReaderClient::IsMediaFile(const QString& filename) {
|
||||||
@ -71,7 +71,7 @@ TagReaderReply* TagReaderClient::IsMediaFile(const QString& filename) {
|
|||||||
|
|
||||||
req->set_filename(DataCommaSizeFromQString(filename));
|
req->set_filename(DataCommaSizeFromQString(filename));
|
||||||
|
|
||||||
return worker_pool_->NextHandler()->SendMessageWithReply(&message);
|
return worker_pool_->SendMessageWithReply(&message);
|
||||||
}
|
}
|
||||||
|
|
||||||
TagReaderReply* TagReaderClient::LoadEmbeddedArt(const QString& filename) {
|
TagReaderReply* TagReaderClient::LoadEmbeddedArt(const QString& filename) {
|
||||||
@ -80,7 +80,7 @@ TagReaderReply* TagReaderClient::LoadEmbeddedArt(const QString& filename) {
|
|||||||
|
|
||||||
req->set_filename(DataCommaSizeFromQString(filename));
|
req->set_filename(DataCommaSizeFromQString(filename));
|
||||||
|
|
||||||
return worker_pool_->NextHandler()->SendMessageWithReply(&message);
|
return worker_pool_->SendMessageWithReply(&message);
|
||||||
}
|
}
|
||||||
|
|
||||||
void TagReaderClient::ReadFileBlocking(const QString& filename, Song* song) {
|
void TagReaderClient::ReadFileBlocking(const QString& filename, Song* song) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user