Adressed review comments
This commit is contained in:
		| @@ -83,7 +83,8 @@ private: | |||||||
|                     backend->Write(e); |                     backend->Write(e); | ||||||
|                 } |                 } | ||||||
|             }; |             }; | ||||||
|             while (message_queue.PopWait(entry)) { |             while (true) { | ||||||
|  |                 entry = message_queue.PopWait(); | ||||||
|                 if (entry.final_entry) { |                 if (entry.final_entry) { | ||||||
|                     break; |                     break; | ||||||
|                 } |                 } | ||||||
|   | |||||||
| @@ -40,7 +40,7 @@ public: | |||||||
|     template <typename Arg> |     template <typename Arg> | ||||||
|     void Push(Arg&& t) { |     void Push(Arg&& t) { | ||||||
|         // create the element, add it to the queue |         // create the element, add it to the queue | ||||||
|         write_ptr->current = std::move(t); |         write_ptr->current = std::forward<Arg>(t); | ||||||
|         // set the next pointer to a new element ptr |         // set the next pointer to a new element ptr | ||||||
|         // then advance the write pointer |         // then advance the write pointer | ||||||
|         ElementPtr* new_ptr = new ElementPtr(); |         ElementPtr* new_ptr = new ElementPtr(); | ||||||
| @@ -69,7 +69,6 @@ public: | |||||||
|         --size; |         --size; | ||||||
|  |  | ||||||
|         ElementPtr* tmpptr = read_ptr; |         ElementPtr* tmpptr = read_ptr; | ||||||
|  |  | ||||||
|         read_ptr = tmpptr->next.load(std::memory_order_acquire); |         read_ptr = tmpptr->next.load(std::memory_order_acquire); | ||||||
|         t = std::move(tmpptr->current); |         t = std::move(tmpptr->current); | ||||||
|         tmpptr->next.store(nullptr); |         tmpptr->next.store(nullptr); | ||||||
| @@ -77,12 +76,14 @@ public: | |||||||
|         return true; |         return true; | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     bool PopWait(T& t) { |     T PopWait() { | ||||||
|         if (Empty()) { |         if (Empty()) { | ||||||
|             std::unique_lock<std::mutex> lock(cv_mutex); |             std::unique_lock<std::mutex> lock(cv_mutex); | ||||||
|             cv.wait(lock, [this]() { return !Empty(); }); |             cv.wait(lock, [this]() { return !Empty(); }); | ||||||
|         } |         } | ||||||
|         return Pop(t); |         T t; | ||||||
|  |         Pop(t); | ||||||
|  |         return t; | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     // not thread-safe |     // not thread-safe | ||||||
| @@ -148,8 +149,8 @@ public: | |||||||
|         return spsc_queue.Pop(t); |         return spsc_queue.Pop(t); | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     bool PopWait(T& t) { |     T PopWait() { | ||||||
|         return spsc_queue.PopWait(t); |         return spsc_queue.PopWait(); | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     // not thread-safe |     // not thread-safe | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user