diff --git a/src/common/bounded_threadsafe_queue.h b/src/common/bounded_threadsafe_queue.h index eb88cc1d1..975215863 100644 --- a/src/common/bounded_threadsafe_queue.h +++ b/src/common/bounded_threadsafe_queue.h @@ -23,97 +23,21 @@ class SPSCQueue { public: bool TryPush(T&& t) { - const size_t write_index = m_write_index.load(); - - // Check if we have free slots to write to. - if ((write_index - m_read_index.load()) == Capacity) { - return false; - } - - // Determine the position to write to. - const size_t pos = write_index % Capacity; - - // Push into the queue. - m_data[pos] = std::move(t); - - // Increment the write index. - ++m_write_index; - - // Notify the consumer that we have pushed into the queue. - std::scoped_lock lock{cv_mutex}; - cv.notify_one(); - - return true; + return Push(std::move(t)); } template - bool TryPush(Args&&... args) { - const size_t write_index = m_write_index.load(); - - // Check if we have free slots to write to. - if ((write_index - m_read_index.load()) == Capacity) { - return false; - } - - // Determine the position to write to. - const size_t pos = write_index % Capacity; - - // Emplace into the queue. - std::construct_at(std::addressof(m_data[pos]), std::forward(args)...); - - // Increment the write index. - ++m_write_index; - - // Notify the consumer that we have pushed into the queue. - std::scoped_lock lock{cv_mutex}; - cv.notify_one(); - - return true; + bool TryEmplace(Args&&... args) { + return Emplace(std::forward(args)...); } - void Push(T&& t) { - const size_t write_index = m_write_index.load(); - - // Wait until we have free slots to write to. - while ((write_index - m_read_index.load()) == Capacity) { - std::this_thread::yield(); - } - - // Determine the position to write to. - const size_t pos = write_index % Capacity; - - // Push into the queue. - m_data[pos] = std::move(t); - - // Increment the write index. - ++m_write_index; - - // Notify the consumer that we have pushed into the queue. - std::scoped_lock lock{cv_mutex}; - cv.notify_one(); + void PushWait(T&& t) { + Push(std::move(t)); } template - void Push(Args&&... args) { - const size_t write_index = m_write_index.load(); - - // Wait until we have free slots to write to. - while ((write_index - m_read_index.load()) == Capacity) { - std::this_thread::yield(); - } - - // Determine the position to write to. - const size_t pos = write_index % Capacity; - - // Emplace into the queue. - std::construct_at(std::addressof(m_data[pos]), std::forward(args)...); - - // Increment the write index. - ++m_write_index; - - // Notify the consumer that we have pushed into the queue. - std::scoped_lock lock{cv_mutex}; - cv.notify_one(); + void EmplaceWait(Args&&... args) { + Emplace(std::forward(args)...); } bool TryPop(T& t) { @@ -147,6 +71,80 @@ public: } private: + enum class PushMode { + Try, + Wait, + Count, + }; + + template + bool Push(T&& t) { + const size_t write_index = m_write_index.load(); + + if constexpr (Mode == PushMode::Try) { + // Check if we have free slots to write to. + if ((write_index - m_read_index.load()) == Capacity) { + return false; + } + } else if constexpr (Mode == PushMode::Wait) { + // Wait until we have free slots to write to. + while ((write_index - m_read_index.load()) == Capacity) { + std::this_thread::yield(); + } + } else { + static_assert(Mode < PushMode::Count, "Invalid PushMode."); + } + + // Determine the position to write to. + const size_t pos = write_index % Capacity; + + // Push into the queue. + m_data[pos] = std::move(t); + + // Increment the write index. + ++m_write_index; + + // Notify the consumer that we have pushed into the queue. + std::scoped_lock lock{cv_mutex}; + cv.notify_one(); + + return true; + } + + template + bool Emplace(Args&&... args) { + const size_t write_index = m_write_index.load(); + + if constexpr (Mode == PushMode::Try) { + // Check if we have free slots to write to. + if ((write_index - m_read_index.load()) == Capacity) { + return false; + } + } else if constexpr (Mode == PushMode::Wait) { + // Wait until we have free slots to write to. + while ((write_index - m_read_index.load()) == Capacity) { + std::this_thread::yield(); + } + } else { + static_assert(Mode < PushMode::Count, "Invalid PushMode."); + } + + // Determine the position to write to. + const size_t pos = write_index % Capacity; + + // Emplace into the queue. + std::construct_at(std::addressof(m_data[pos]), std::forward(args)...); + + // Increment the write index. + ++m_write_index; + + // Notify the consumer that we have pushed into the queue. + std::scoped_lock lock{cv_mutex}; + cv.notify_one(); + + return true; + } + void Pop() { const size_t read_index = m_read_index.load(); @@ -208,20 +206,20 @@ public: } template - bool TryPush(Args&&... args) { + bool TryEmplace(Args&&... args) { std::scoped_lock lock{write_mutex}; - return spsc_queue.TryPush(std::forward(args)...); + return spsc_queue.TryEmplace(std::forward(args)...); } - void Push(T&& t) { + void PushWait(T&& t) { std::scoped_lock lock{write_mutex}; - spsc_queue.Push(std::move(t)); + spsc_queue.PushWait(std::move(t)); } template - void Push(Args&&... args) { + void EmplaceWait(Args&&... args) { std::scoped_lock lock{write_mutex}; - spsc_queue.Push(std::forward(args)...); + spsc_queue.EmplaceWait(std::forward(args)...); } bool TryPop(T& t) { @@ -262,20 +260,20 @@ public: } template - bool TryPush(Args&&... args) { + bool TryEmplace(Args&&... args) { std::scoped_lock lock{write_mutex}; - return spsc_queue.TryPush(std::forward(args)...); + return spsc_queue.TryEmplace(std::forward(args)...); } - void Push(T&& t) { + void PushWait(T&& t) { std::scoped_lock lock{write_mutex}; - spsc_queue.Push(std::move(t)); + spsc_queue.PushWait(std::move(t)); } template - void Push(Args&&... args) { + void EmplaceWait(Args&&... args) { std::scoped_lock lock{write_mutex}; - spsc_queue.Push(std::forward(args)...); + spsc_queue.EmplaceWait(std::forward(args)...); } bool TryPop(T& t) { diff --git a/src/common/logging/backend.cpp b/src/common/logging/backend.cpp index e1ce9db99..f96c7c222 100644 --- a/src/common/logging/backend.cpp +++ b/src/common/logging/backend.cpp @@ -207,7 +207,7 @@ public: if (!filter.CheckMessage(log_class, log_level)) { return; } - message_queue.Push( + message_queue.EmplaceWait( CreateEntry(log_class, log_level, filename, line_num, function, std::move(message))); } diff --git a/src/video_core/gpu_thread.cpp b/src/video_core/gpu_thread.cpp index 469a59cf9..3c5317777 100644 --- a/src/video_core/gpu_thread.cpp +++ b/src/video_core/gpu_thread.cpp @@ -118,7 +118,7 @@ u64 ThreadManager::PushCommand(CommandData&& command_data, bool block) { std::unique_lock lk(state.write_lock); const u64 fence{++state.last_fence}; - state.queue.Push(std::move(command_data), fence, block); + state.queue.EmplaceWait(std::move(command_data), fence, block); if (block) { Common::CondvarWait(state.cv, lk, thread.get_stop_token(), [this, fence] {