Format code
This commit is contained in:
parent
7e6c16b287
commit
3a27a3e868
@ -107,10 +107,10 @@ void _MessageHandlerBase::WriteMessage(const QByteArray &data) {
|
|||||||
else if (flush_local_socket_) {
|
else if (flush_local_socket_) {
|
||||||
((qobject_cast<QLocalSocket*>(device_))->*(flush_local_socket_))();
|
((qobject_cast<QLocalSocket*>(device_))->*(flush_local_socket_))();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void _MessageHandlerBase::DeviceClosed() {
|
void _MessageHandlerBase::DeviceClosed() {
|
||||||
is_device_closed_ = true;
|
is_device_closed_ = true;
|
||||||
AbortAll();
|
AbortAll();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -43,7 +43,7 @@ class QIODevice;
|
|||||||
class _MessageHandlerBase : public QObject {
|
class _MessageHandlerBase : public QObject {
|
||||||
Q_OBJECT
|
Q_OBJECT
|
||||||
|
|
||||||
public:
|
public:
|
||||||
// device can be nullptr, in which case you must call SetDevice before writing any messages.
|
// device can be nullptr, in which case you must call SetDevice before writing any messages.
|
||||||
_MessageHandlerBase(QIODevice *device, QObject *parent);
|
_MessageHandlerBase(QIODevice *device, QObject *parent);
|
||||||
|
|
||||||
@ -52,16 +52,16 @@ public:
|
|||||||
// After this is true, messages cannot be sent to the handler any more.
|
// After this is true, messages cannot be sent to the handler any more.
|
||||||
bool is_device_closed() const { return is_device_closed_; }
|
bool is_device_closed() const { return is_device_closed_; }
|
||||||
|
|
||||||
protected slots:
|
protected slots:
|
||||||
void WriteMessage(const QByteArray &data);
|
void WriteMessage(const QByteArray &data);
|
||||||
void DeviceReadyRead();
|
void DeviceReadyRead();
|
||||||
virtual void DeviceClosed();
|
virtual void DeviceClosed();
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
virtual bool RawMessageArrived(const QByteArray &data) = 0;
|
virtual bool RawMessageArrived(const QByteArray &data) = 0;
|
||||||
virtual void AbortAll() = 0;
|
virtual void AbortAll() = 0;
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
typedef bool (QAbstractSocket::*FlushAbstractSocket)();
|
typedef bool (QAbstractSocket::*FlushAbstractSocket)();
|
||||||
typedef bool (QLocalSocket::*FlushLocalSocket)();
|
typedef bool (QLocalSocket::*FlushLocalSocket)();
|
||||||
|
|
||||||
@ -80,7 +80,7 @@ protected:
|
|||||||
// You should subclass this and implement the MessageArrived(MessageType) method.
|
// You should subclass this and implement the MessageArrived(MessageType) method.
|
||||||
template <typename MT>
|
template <typename MT>
|
||||||
class AbstractMessageHandler : public _MessageHandlerBase {
|
class AbstractMessageHandler : public _MessageHandlerBase {
|
||||||
public:
|
public:
|
||||||
AbstractMessageHandler(QIODevice *device, QObject *parent);
|
AbstractMessageHandler(QIODevice *device, QObject *parent);
|
||||||
~AbstractMessageHandler() override { AbortAll(); }
|
~AbstractMessageHandler() override { AbortAll(); }
|
||||||
|
|
||||||
@ -102,7 +102,7 @@ public:
|
|||||||
// Sets the "id" field of reply to the same as the request, and sends the reply on the socket. Used on the worker side.
|
// Sets the "id" field of reply to the same as the request, and sends the reply on the socket. Used on the worker side.
|
||||||
void SendReply(const MessageType &request, MessageType *reply);
|
void SendReply(const MessageType &request, MessageType *reply);
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
// Called when a message is received from the socket.
|
// Called when a message is received from the socket.
|
||||||
virtual void MessageArrived(const MessageType &message) { Q_UNUSED(message); }
|
virtual void MessageArrived(const MessageType &message) { Q_UNUSED(message); }
|
||||||
|
|
||||||
@ -110,7 +110,7 @@ protected:
|
|||||||
bool RawMessageArrived(const QByteArray &data) override;
|
bool RawMessageArrived(const QByteArray &data) override;
|
||||||
void AbortAll() override;
|
void AbortAll() override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
QMap<int, ReplyType*> pending_replies_;
|
QMap<int, ReplyType*> pending_replies_;
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -146,6 +146,7 @@ void AbstractMessageHandler<MT>::SendReply(const MessageType &request, MessageTy
|
|||||||
|
|
||||||
template<typename MT>
|
template<typename MT>
|
||||||
bool AbstractMessageHandler<MT>::RawMessageArrived(const QByteArray &data) {
|
bool AbstractMessageHandler<MT>::RawMessageArrived(const QByteArray &data) {
|
||||||
|
|
||||||
MessageType message;
|
MessageType message;
|
||||||
if (!message.ParseFromArray(data.constData(), data.size())) {
|
if (!message.ParseFromArray(data.constData(), data.size())) {
|
||||||
return false;
|
return false;
|
||||||
@ -161,14 +162,17 @@ bool AbstractMessageHandler<MT>::RawMessageArrived(const QByteArray &data) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
template<typename MT>
|
template<typename MT>
|
||||||
void AbstractMessageHandler<MT>::AbortAll() {
|
void AbstractMessageHandler<MT>::AbortAll() {
|
||||||
|
|
||||||
for (ReplyType *reply : pending_replies_) {
|
for (ReplyType *reply : pending_replies_) {
|
||||||
reply->Abort();
|
reply->Abort();
|
||||||
}
|
}
|
||||||
pending_replies_.clear();
|
pending_replies_.clear();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#endif // MESSAGEHANDLER_H
|
#endif // MESSAGEHANDLER_H
|
||||||
|
@ -57,29 +57,27 @@ class _MessageReplyBase : public QObject {
|
|||||||
template <typename MessageType>
|
template <typename MessageType>
|
||||||
class MessageReply : public _MessageReplyBase {
|
class MessageReply : public _MessageReplyBase {
|
||||||
public:
|
public:
|
||||||
explicit MessageReply(const MessageType& request_message, QObject *parent = nullptr);
|
explicit MessageReply(const MessageType &request_message, QObject *parent = nullptr);
|
||||||
|
|
||||||
int id() const override { return request_message_.id(); }
|
int id() const override { return request_message_.id(); }
|
||||||
const MessageType& request_message() const { return request_message_; }
|
const MessageType &request_message() const { return request_message_; }
|
||||||
const MessageType& message() const { return reply_message_; }
|
const MessageType &message() const { return reply_message_; }
|
||||||
|
|
||||||
void SetReply(const MessageType& message);
|
void SetReply(const MessageType &message);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
MessageType request_message_;
|
MessageType request_message_;
|
||||||
MessageType reply_message_;
|
MessageType reply_message_;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
template<typename MessageType>
|
template<typename MessageType>
|
||||||
MessageReply<MessageType>::MessageReply(const MessageType& request_message, QObject *parent)
|
MessageReply<MessageType>::MessageReply(const MessageType &request_message, QObject *parent) : _MessageReplyBase(parent) {
|
||||||
: _MessageReplyBase(parent)
|
|
||||||
{
|
|
||||||
request_message_.MergeFrom(request_message);
|
request_message_.MergeFrom(request_message);
|
||||||
}
|
}
|
||||||
|
|
||||||
template<typename MessageType>
|
template<typename MessageType>
|
||||||
void MessageReply<MessageType>::SetReply(const MessageType& message) {
|
void MessageReply<MessageType>::SetReply(const MessageType &message) {
|
||||||
|
|
||||||
Q_ASSERT(!finished_);
|
Q_ASSERT(!finished_);
|
||||||
|
|
||||||
|
@ -48,11 +48,11 @@ class _WorkerPoolBase : public QObject {
|
|||||||
public:
|
public:
|
||||||
explicit _WorkerPoolBase(QObject *parent = nullptr);
|
explicit _WorkerPoolBase(QObject *parent = nullptr);
|
||||||
|
|
||||||
signals:
|
signals:
|
||||||
// Emitted when a worker failed to start. This usually happens when the worker wasn't found, or couldn't be executed.
|
// Emitted when a worker failed to start. This usually happens when the worker wasn't found, or couldn't be executed.
|
||||||
void WorkerFailedToStart();
|
void WorkerFailedToStart();
|
||||||
|
|
||||||
protected slots:
|
protected slots:
|
||||||
virtual void DoStart() {}
|
virtual void DoStart() {}
|
||||||
virtual void NewConnection() {}
|
virtual void NewConnection() {}
|
||||||
virtual void ProcessError(QProcess::ProcessError) {}
|
virtual void ProcessError(QProcess::ProcessError) {}
|
||||||
@ -78,7 +78,7 @@ class WorkerPool : public _WorkerPoolBase {
|
|||||||
void SetExecutableName(const QString &executable_name);
|
void SetExecutableName(const QString &executable_name);
|
||||||
|
|
||||||
// Sets the number of worker process to use. Defaults to 1 <= (processors / 2) <= 2.
|
// Sets the number of worker process to use. Defaults to 1 <= (processors / 2) <= 2.
|
||||||
void SetWorkerCount(int count);
|
void SetWorkerCount(const int count);
|
||||||
|
|
||||||
// Sets the prefix to use for the local server (on unix this is a named pipe in /tmp).
|
// Sets the prefix to use for the local server (on unix this is a named pipe in /tmp).
|
||||||
// Defaults to QApplication::applicationName().
|
// Defaults to QApplication::applicationName().
|
||||||
@ -93,14 +93,14 @@ class WorkerPool : public _WorkerPoolBase {
|
|||||||
// Can be called from any thread.
|
// Can be called from any thread.
|
||||||
ReplyType *SendMessageWithReply(MessageType *message);
|
ReplyType *SendMessageWithReply(MessageType *message);
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
// These are all reimplemented slots, they are called on the WorkerPool's thread.
|
// These are all reimplemented slots, they are called on the WorkerPool's thread.
|
||||||
void DoStart() override;
|
void DoStart() override;
|
||||||
void NewConnection() override;
|
void NewConnection() override;
|
||||||
void ProcessError(QProcess::ProcessError error) override;
|
void ProcessError(QProcess::ProcessError error) override;
|
||||||
void SendQueuedMessages() override;
|
void SendQueuedMessages() override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
struct Worker {
|
struct Worker {
|
||||||
Worker() : local_server_(nullptr), local_socket_(nullptr), process_(nullptr), handler_(nullptr) {}
|
Worker() : local_server_(nullptr), local_socket_(nullptr), process_(nullptr), handler_(nullptr) {}
|
||||||
|
|
||||||
@ -138,7 +138,7 @@ private:
|
|||||||
// Returns the next handler, or nullptr if there isn't one. Must be called from my thread.
|
// Returns the next handler, or nullptr if there isn't one. Must be called from my thread.
|
||||||
HandlerType *NextHandler() const;
|
HandlerType *NextHandler() const;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
QString local_server_name_;
|
QString local_server_name_;
|
||||||
QString executable_name_;
|
QString executable_name_;
|
||||||
QString executable_path_;
|
QString executable_path_;
|
||||||
@ -170,6 +170,7 @@ WorkerPool<HandlerType>::WorkerPool(QObject *parent)
|
|||||||
|
|
||||||
template <typename HandlerType>
|
template <typename HandlerType>
|
||||||
WorkerPool<HandlerType>::~WorkerPool() {
|
WorkerPool<HandlerType>::~WorkerPool() {
|
||||||
|
|
||||||
for (const Worker &worker : workers_) {
|
for (const Worker &worker : workers_) {
|
||||||
if (worker.local_socket_ && worker.process_) {
|
if (worker.local_socket_ && worker.process_) {
|
||||||
disconnect(worker.process_, SIGNAL(errorOccurred(QProcess::ProcessError)), this, SLOT(ProcessError(QProcess::ProcessError)));
|
disconnect(worker.process_, SIGNAL(errorOccurred(QProcess::ProcessError)), this, SLOT(ProcessError(QProcess::ProcessError)));
|
||||||
@ -193,10 +194,11 @@ WorkerPool<HandlerType>::~WorkerPool() {
|
|||||||
for (ReplyType *reply : message_queue_) {
|
for (ReplyType *reply : message_queue_) {
|
||||||
reply->Abort();
|
reply->Abort();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename HandlerType>
|
template <typename HandlerType>
|
||||||
void WorkerPool<HandlerType>::SetWorkerCount(int count) {
|
void WorkerPool<HandlerType>::SetWorkerCount(const int count) {
|
||||||
Q_ASSERT(workers_.isEmpty());
|
Q_ASSERT(workers_.isEmpty());
|
||||||
worker_count_ = count;
|
worker_count_ = count;
|
||||||
}
|
}
|
||||||
@ -220,6 +222,7 @@ void WorkerPool<HandlerType>::Start() {
|
|||||||
|
|
||||||
template <typename HandlerType>
|
template <typename HandlerType>
|
||||||
void WorkerPool<HandlerType>::DoStart() {
|
void WorkerPool<HandlerType>::DoStart() {
|
||||||
|
|
||||||
Q_ASSERT(workers_.isEmpty());
|
Q_ASSERT(workers_.isEmpty());
|
||||||
Q_ASSERT(!executable_name_.isEmpty());
|
Q_ASSERT(!executable_name_.isEmpty());
|
||||||
Q_ASSERT(QThread::currentThread() == thread());
|
Q_ASSERT(QThread::currentThread() == thread());
|
||||||
@ -248,10 +251,12 @@ void WorkerPool<HandlerType>::DoStart() {
|
|||||||
|
|
||||||
workers_ << worker;
|
workers_ << worker;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename HandlerType>
|
template <typename HandlerType>
|
||||||
void WorkerPool<HandlerType>::StartOneWorker(Worker *worker) {
|
void WorkerPool<HandlerType>::StartOneWorker(Worker *worker) {
|
||||||
|
|
||||||
Q_ASSERT(QThread::currentThread() == thread());
|
Q_ASSERT(QThread::currentThread() == thread());
|
||||||
|
|
||||||
DeleteQObjectPointerLater(&worker->local_server_);
|
DeleteQObjectPointerLater(&worker->local_server_);
|
||||||
@ -284,6 +289,7 @@ void WorkerPool<HandlerType>::StartOneWorker(Worker *worker) {
|
|||||||
// Start the process
|
// Start the process
|
||||||
worker->process_->setProcessChannelMode(QProcess::ForwardedChannels);
|
worker->process_->setProcessChannelMode(QProcess::ForwardedChannels);
|
||||||
worker->process_->start(executable_path_, QStringList() << worker->local_server_->fullServerName());
|
worker->process_->start(executable_path_, QStringList() << worker->local_server_->fullServerName());
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename HandlerType>
|
template <typename HandlerType>
|
||||||
@ -338,20 +344,24 @@ void WorkerPool<HandlerType>::ProcessError(QProcess::ProcessError error) {
|
|||||||
StartOneWorker(worker);
|
StartOneWorker(worker);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename HandlerType>
|
template <typename HandlerType>
|
||||||
typename WorkerPool<HandlerType>::ReplyType*
|
typename WorkerPool<HandlerType>::ReplyType*
|
||||||
WorkerPool<HandlerType>::NewReply(MessageType *message) {
|
WorkerPool<HandlerType>::NewReply(MessageType *message) {
|
||||||
|
|
||||||
const int id = next_id_.fetchAndAddOrdered(1);
|
const int id = next_id_.fetchAndAddOrdered(1);
|
||||||
message->set_id(id);
|
message->set_id(id);
|
||||||
|
|
||||||
return new ReplyType(*message);
|
return new ReplyType(*message);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename HandlerType>
|
template <typename HandlerType>
|
||||||
typename WorkerPool<HandlerType>::ReplyType*
|
typename WorkerPool<HandlerType>::ReplyType*
|
||||||
WorkerPool<HandlerType>::SendMessageWithReply(MessageType *message) {
|
WorkerPool<HandlerType>::SendMessageWithReply(MessageType *message) {
|
||||||
|
|
||||||
ReplyType *reply = NewReply(message);
|
ReplyType *reply = NewReply(message);
|
||||||
|
|
||||||
// Add the pending reply to the queue
|
// Add the pending reply to the queue
|
||||||
@ -364,10 +374,12 @@ WorkerPool<HandlerType>::SendMessageWithReply(MessageType *message) {
|
|||||||
metaObject()->invokeMethod(this, "SendQueuedMessages", Qt::QueuedConnection);
|
metaObject()->invokeMethod(this, "SendQueuedMessages", Qt::QueuedConnection);
|
||||||
|
|
||||||
return reply;
|
return reply;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename HandlerType>
|
template <typename HandlerType>
|
||||||
void WorkerPool<HandlerType>::SendQueuedMessages() {
|
void WorkerPool<HandlerType>::SendQueuedMessages() {
|
||||||
|
|
||||||
QMutexLocker l(&message_queue_mutex_);
|
QMutexLocker l(&message_queue_mutex_);
|
||||||
|
|
||||||
while (!message_queue_.isEmpty()) {
|
while (!message_queue_.isEmpty()) {
|
||||||
@ -384,10 +396,12 @@ void WorkerPool<HandlerType>::SendQueuedMessages() {
|
|||||||
|
|
||||||
handler->SendRequest(reply);
|
handler->SendRequest(reply);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename HandlerType>
|
template <typename HandlerType>
|
||||||
HandlerType *WorkerPool<HandlerType>::NextHandler() const {
|
HandlerType *WorkerPool<HandlerType>::NextHandler() const {
|
||||||
|
|
||||||
for (int i = 0; i < workers_.count(); ++i) {
|
for (int i = 0; i < workers_.count(); ++i) {
|
||||||
const int worker_index = (next_worker_ + i) % workers_.count();
|
const int worker_index = (next_worker_ + i) % workers_.count();
|
||||||
|
|
||||||
@ -398,6 +412,7 @@ HandlerType *WorkerPool<HandlerType>::NextHandler() const {
|
|||||||
}
|
}
|
||||||
|
|
||||||
return nullptr;
|
return nullptr;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#endif // WORKERPOOL_H
|
#endif // WORKERPOOL_H
|
||||||
|
@ -27,11 +27,9 @@
|
|||||||
#include "tagreaderworker.h"
|
#include "tagreaderworker.h"
|
||||||
|
|
||||||
TagReaderWorker::TagReaderWorker(QIODevice *socket, QObject *parent)
|
TagReaderWorker::TagReaderWorker(QIODevice *socket, QObject *parent)
|
||||||
: AbstractMessageHandler<pb::tagreader::Message>(socket, parent)
|
: AbstractMessageHandler<pb::tagreader::Message>(socket, parent) {}
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
void TagReaderWorker::MessageArrived(const pb::tagreader::Message& message) {
|
void TagReaderWorker::MessageArrived(const pb::tagreader::Message &message) {
|
||||||
|
|
||||||
pb::tagreader::Message reply;
|
pb::tagreader::Message reply;
|
||||||
|
|
||||||
@ -51,6 +49,7 @@ void TagReaderWorker::MessageArrived(const pb::tagreader::Message& message) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
SendReply(message, &reply);
|
SendReply(message, &reply);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -29,14 +29,14 @@
|
|||||||
class QIODevice;
|
class QIODevice;
|
||||||
|
|
||||||
class TagReaderWorker : public AbstractMessageHandler<pb::tagreader::Message> {
|
class TagReaderWorker : public AbstractMessageHandler<pb::tagreader::Message> {
|
||||||
public:
|
public:
|
||||||
explicit TagReaderWorker(QIODevice *socket, QObject *parent = nullptr);
|
explicit TagReaderWorker(QIODevice *socket, QObject *parent = nullptr);
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
void MessageArrived(const pb::tagreader::Message &message) override;
|
void MessageArrived(const pb::tagreader::Message &message) override;
|
||||||
void DeviceClosed() override;
|
void DeviceClosed() override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
TagReader tag_reader_;
|
TagReader tag_reader_;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user