Merge pull request #9931 from liamwhite/sched
vk_scheduler: split work queue waits and execution waits
This commit is contained in:
		@@ -47,14 +47,15 @@ Scheduler::Scheduler(const Device& device_, StateTracker& state_tracker_)
 | 
			
		||||
Scheduler::~Scheduler() = default;
 | 
			
		||||
 | 
			
		||||
void Scheduler::Flush(VkSemaphore signal_semaphore, VkSemaphore wait_semaphore) {
 | 
			
		||||
    // When flushing, we only send data to the worker thread; no waiting is necessary.
 | 
			
		||||
    SubmitExecution(signal_semaphore, wait_semaphore);
 | 
			
		||||
    AllocateNewContext();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void Scheduler::Finish(VkSemaphore signal_semaphore, VkSemaphore wait_semaphore) {
 | 
			
		||||
    // When finishing, we need to wait for the submission to have executed on the device.
 | 
			
		||||
    const u64 presubmit_tick = CurrentTick();
 | 
			
		||||
    SubmitExecution(signal_semaphore, wait_semaphore);
 | 
			
		||||
    WaitWorker();
 | 
			
		||||
    Wait(presubmit_tick);
 | 
			
		||||
    AllocateNewContext();
 | 
			
		||||
}
 | 
			
		||||
@@ -63,8 +64,13 @@ void Scheduler::WaitWorker() {
 | 
			
		||||
    MICROPROFILE_SCOPE(Vulkan_WaitForWorker);
 | 
			
		||||
    DispatchWork();
 | 
			
		||||
 | 
			
		||||
    std::unique_lock lock{work_mutex};
 | 
			
		||||
    wait_cv.wait(lock, [this] { return work_queue.empty(); });
 | 
			
		||||
    // Ensure the queue is drained.
 | 
			
		||||
    std::unique_lock ql{queue_mutex};
 | 
			
		||||
    event_cv.wait(ql, [this] { return work_queue.empty(); });
 | 
			
		||||
 | 
			
		||||
    // Now wait for execution to finish.
 | 
			
		||||
    // This needs to be done in the same order as WorkerThread.
 | 
			
		||||
    std::unique_lock el{execution_mutex};
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void Scheduler::DispatchWork() {
 | 
			
		||||
@@ -72,10 +78,10 @@ void Scheduler::DispatchWork() {
 | 
			
		||||
        return;
 | 
			
		||||
    }
 | 
			
		||||
    {
 | 
			
		||||
        std::scoped_lock lock{work_mutex};
 | 
			
		||||
        std::scoped_lock ql{queue_mutex};
 | 
			
		||||
        work_queue.push(std::move(chunk));
 | 
			
		||||
    }
 | 
			
		||||
    work_cv.notify_one();
 | 
			
		||||
    event_cv.notify_all();
 | 
			
		||||
    AcquireNewChunk();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -137,30 +143,55 @@ bool Scheduler::UpdateRescaling(bool is_rescaling) {
 | 
			
		||||
 | 
			
		||||
void Scheduler::WorkerThread(std::stop_token stop_token) {
 | 
			
		||||
    Common::SetCurrentThreadName("VulkanWorker");
 | 
			
		||||
    do {
 | 
			
		||||
        std::unique_ptr<CommandChunk> work;
 | 
			
		||||
        bool has_submit{false};
 | 
			
		||||
        {
 | 
			
		||||
            std::unique_lock lock{work_mutex};
 | 
			
		||||
            if (work_queue.empty()) {
 | 
			
		||||
                wait_cv.notify_all();
 | 
			
		||||
            }
 | 
			
		||||
            Common::CondvarWait(work_cv, lock, stop_token, [&] { return !work_queue.empty(); });
 | 
			
		||||
            if (stop_token.stop_requested()) {
 | 
			
		||||
                continue;
 | 
			
		||||
            }
 | 
			
		||||
            work = std::move(work_queue.front());
 | 
			
		||||
            work_queue.pop();
 | 
			
		||||
 | 
			
		||||
            has_submit = work->HasSubmit();
 | 
			
		||||
    const auto TryPopQueue{[this](auto& work) -> bool {
 | 
			
		||||
        if (work_queue.empty()) {
 | 
			
		||||
            return false;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        work = std::move(work_queue.front());
 | 
			
		||||
        work_queue.pop();
 | 
			
		||||
        event_cv.notify_all();
 | 
			
		||||
        return true;
 | 
			
		||||
    }};
 | 
			
		||||
 | 
			
		||||
    while (!stop_token.stop_requested()) {
 | 
			
		||||
        std::unique_ptr<CommandChunk> work;
 | 
			
		||||
 | 
			
		||||
        {
 | 
			
		||||
            std::unique_lock lk{queue_mutex};
 | 
			
		||||
 | 
			
		||||
            // Wait for work.
 | 
			
		||||
            Common::CondvarWait(event_cv, lk, stop_token, [&] { return TryPopQueue(work); });
 | 
			
		||||
 | 
			
		||||
            // If we've been asked to stop, we're done.
 | 
			
		||||
            if (stop_token.stop_requested()) {
 | 
			
		||||
                return;
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            // Exchange lock ownership so that we take the execution lock before
 | 
			
		||||
            // the queue lock goes out of scope. This allows us to force execution
 | 
			
		||||
            // to complete in the next step.
 | 
			
		||||
            std::exchange(lk, std::unique_lock{execution_mutex});
 | 
			
		||||
 | 
			
		||||
            // Perform the work, tracking whether the chunk was a submission
 | 
			
		||||
            // before executing.
 | 
			
		||||
            const bool has_submit = work->HasSubmit();
 | 
			
		||||
            work->ExecuteAll(current_cmdbuf);
 | 
			
		||||
 | 
			
		||||
            // If the chunk was a submission, reallocate the command buffer.
 | 
			
		||||
            if (has_submit) {
 | 
			
		||||
                AllocateWorkerCommandBuffer();
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        if (has_submit) {
 | 
			
		||||
            AllocateWorkerCommandBuffer();
 | 
			
		||||
 | 
			
		||||
        {
 | 
			
		||||
            std::scoped_lock rl{reserve_mutex};
 | 
			
		||||
 | 
			
		||||
            // Recycle the chunk back to the reserve.
 | 
			
		||||
            chunk_reserve.emplace_back(std::move(work));
 | 
			
		||||
        }
 | 
			
		||||
        std::scoped_lock reserve_lock{reserve_mutex};
 | 
			
		||||
        chunk_reserve.push_back(std::move(work));
 | 
			
		||||
    } while (!stop_token.stop_requested());
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void Scheduler::AllocateWorkerCommandBuffer() {
 | 
			
		||||
@@ -289,13 +320,16 @@ void Scheduler::EndRenderPass() {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void Scheduler::AcquireNewChunk() {
 | 
			
		||||
    std::scoped_lock lock{reserve_mutex};
 | 
			
		||||
    std::scoped_lock rl{reserve_mutex};
 | 
			
		||||
 | 
			
		||||
    if (chunk_reserve.empty()) {
 | 
			
		||||
        // If we don't have anything reserved, we need to make a new chunk.
 | 
			
		||||
        chunk = std::make_unique<CommandChunk>();
 | 
			
		||||
        return;
 | 
			
		||||
    } else {
 | 
			
		||||
        // Otherwise, we can just take from the reserve.
 | 
			
		||||
        chunk = std::make_unique<CommandChunk>();
 | 
			
		||||
        chunk_reserve.pop_back();
 | 
			
		||||
    }
 | 
			
		||||
    chunk = std::move(chunk_reserve.back());
 | 
			
		||||
    chunk_reserve.pop_back();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
} // namespace Vulkan
 | 
			
		||||
 
 | 
			
		||||
@@ -232,10 +232,10 @@ private:
 | 
			
		||||
 | 
			
		||||
    std::queue<std::unique_ptr<CommandChunk>> work_queue;
 | 
			
		||||
    std::vector<std::unique_ptr<CommandChunk>> chunk_reserve;
 | 
			
		||||
    std::mutex execution_mutex;
 | 
			
		||||
    std::mutex reserve_mutex;
 | 
			
		||||
    std::mutex work_mutex;
 | 
			
		||||
    std::condition_variable_any work_cv;
 | 
			
		||||
    std::condition_variable wait_cv;
 | 
			
		||||
    std::mutex queue_mutex;
 | 
			
		||||
    std::condition_variable_any event_cv;
 | 
			
		||||
    std::jthread worker_thread;
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user