vk_scheduler: split work queue waits and execution waits
This commit is contained in:
@ -43,13 +43,14 @@ Scheduler::Scheduler(const Instance& instance, RenderpassCache& renderpass_cache
|
|||||||
Scheduler::~Scheduler() = default;
|
Scheduler::~Scheduler() = default;
|
||||||
|
|
||||||
void Scheduler::Flush(vk::Semaphore signal, vk::Semaphore wait) {
|
void Scheduler::Flush(vk::Semaphore signal, vk::Semaphore wait) {
|
||||||
|
// When flushing, we only send data to the worker thread; no waiting is necessary.
|
||||||
SubmitExecution(signal, wait);
|
SubmitExecution(signal, wait);
|
||||||
}
|
}
|
||||||
|
|
||||||
void Scheduler::Finish(vk::Semaphore signal, vk::Semaphore wait) {
|
void Scheduler::Finish(vk::Semaphore signal, vk::Semaphore wait) {
|
||||||
|
// When finishing, we need to wait for the submission to have executed on the device.
|
||||||
const u64 presubmit_tick = CurrentTick();
|
const u64 presubmit_tick = CurrentTick();
|
||||||
SubmitExecution(signal, wait);
|
SubmitExecution(signal, wait);
|
||||||
WaitWorker();
|
|
||||||
Wait(presubmit_tick);
|
Wait(presubmit_tick);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -61,8 +62,15 @@ void Scheduler::WaitWorker() {
|
|||||||
MICROPROFILE_SCOPE(Vulkan_WaitForWorker);
|
MICROPROFILE_SCOPE(Vulkan_WaitForWorker);
|
||||||
DispatchWork();
|
DispatchWork();
|
||||||
|
|
||||||
std::unique_lock lock{work_mutex};
|
// Ensure the queue is drained.
|
||||||
wait_cv.wait(lock, [this] { return work_queue.empty(); });
|
{
|
||||||
|
std::unique_lock ql{queue_mutex};
|
||||||
|
event_cv.wait(ql, [this] { return work_queue.empty(); });
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now wait for execution to finish.
|
||||||
|
// This needs to be done in the same order as WorkerThread.
|
||||||
|
std::scoped_lock el{execution_mutex};
|
||||||
}
|
}
|
||||||
|
|
||||||
void Scheduler::Wait(u64 tick) {
|
void Scheduler::Wait(u64 tick) {
|
||||||
@ -79,40 +87,65 @@ void Scheduler::DispatchWork() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
std::scoped_lock lock{work_mutex};
|
std::scoped_lock ql{queue_mutex};
|
||||||
work_queue.push(std::move(chunk));
|
work_queue.push(std::move(chunk));
|
||||||
}
|
}
|
||||||
|
|
||||||
work_cv.notify_one();
|
event_cv.notify_all();
|
||||||
AcquireNewChunk();
|
AcquireNewChunk();
|
||||||
}
|
}
|
||||||
|
|
||||||
void Scheduler::WorkerThread(std::stop_token stop_token) {
|
void Scheduler::WorkerThread(std::stop_token stop_token) {
|
||||||
Common::SetCurrentThreadName("VulkanWorker");
|
Common::SetCurrentThreadName("VulkanWorker");
|
||||||
do {
|
|
||||||
std::unique_ptr<CommandChunk> work;
|
const auto TryPopQueue{[this](auto& work) -> bool {
|
||||||
bool has_submit{false};
|
|
||||||
{
|
|
||||||
std::unique_lock lock{work_mutex};
|
|
||||||
if (work_queue.empty()) {
|
if (work_queue.empty()) {
|
||||||
wait_cv.notify_all();
|
return false;
|
||||||
}
|
|
||||||
Common::CondvarWait(work_cv, lock, stop_token, [&] { return !work_queue.empty(); });
|
|
||||||
if (stop_token.stop_requested()) {
|
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
work = std::move(work_queue.front());
|
work = std::move(work_queue.front());
|
||||||
work_queue.pop();
|
work_queue.pop();
|
||||||
|
event_cv.notify_all();
|
||||||
|
return true;
|
||||||
|
}};
|
||||||
|
|
||||||
has_submit = work->HasSubmit();
|
while (!stop_token.stop_requested()) {
|
||||||
work->ExecuteAll(current_cmdbuf);
|
std::unique_ptr<CommandChunk> work;
|
||||||
|
|
||||||
|
{
|
||||||
|
std::unique_lock lk{queue_mutex};
|
||||||
|
|
||||||
|
// Wait for work.
|
||||||
|
Common::CondvarWait(event_cv, lk, stop_token, [&] { return TryPopQueue(work); });
|
||||||
|
|
||||||
|
// If we've been asked to stop, we're done.
|
||||||
|
if (stop_token.stop_requested()) {
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Exchange lock ownership so that we take the execution lock before
|
||||||
|
// the queue lock goes out of scope. This allows us to force execution
|
||||||
|
// to complete in the next step.
|
||||||
|
std::exchange(lk, std::unique_lock{execution_mutex});
|
||||||
|
|
||||||
|
// Perform the work, tracking whether the chunk was a submission
|
||||||
|
// before executing.
|
||||||
|
const bool has_submit = work->HasSubmit();
|
||||||
|
work->ExecuteAll(current_cmdbuf);
|
||||||
|
|
||||||
|
// If the chunk was a submission, reallocate the command buffer.
|
||||||
if (has_submit) {
|
if (has_submit) {
|
||||||
AllocateWorkerCommandBuffers();
|
AllocateWorkerCommandBuffers();
|
||||||
}
|
}
|
||||||
std::scoped_lock reserve_lock{reserve_mutex};
|
}
|
||||||
chunk_reserve.push_back(std::move(work));
|
|
||||||
} while (!stop_token.stop_requested());
|
{
|
||||||
|
std::scoped_lock rl{reserve_mutex};
|
||||||
|
|
||||||
|
// Recycle the chunk back to the reserve.
|
||||||
|
chunk_reserve.emplace_back(std::move(work));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void Scheduler::AllocateWorkerCommandBuffers() {
|
void Scheduler::AllocateWorkerCommandBuffers() {
|
||||||
|
@ -207,11 +207,10 @@ private:
|
|||||||
std::vector<std::unique_ptr<CommandChunk>> chunk_reserve;
|
std::vector<std::unique_ptr<CommandChunk>> chunk_reserve;
|
||||||
vk::CommandBuffer current_cmdbuf;
|
vk::CommandBuffer current_cmdbuf;
|
||||||
StateFlags state{};
|
StateFlags state{};
|
||||||
|
std::mutex execution_mutex;
|
||||||
std::mutex reserve_mutex;
|
std::mutex reserve_mutex;
|
||||||
std::mutex work_mutex;
|
|
||||||
std::mutex queue_mutex;
|
std::mutex queue_mutex;
|
||||||
std::condition_variable_any work_cv;
|
std::condition_variable_any event_cv;
|
||||||
std::condition_variable wait_cv;
|
|
||||||
std::jthread worker_thread;
|
std::jthread worker_thread;
|
||||||
bool use_worker_thread;
|
bool use_worker_thread;
|
||||||
};
|
};
|
||||||
|
Reference in New Issue
Block a user