Fix race between OpenInputStreamWrapper Open and Cancel (fixes issue #2843)

This commit is contained in:
Marshall Greenblatt 2020-01-29 16:31:08 -05:00
parent c3d4f24720
commit 1da3f71357
1 changed files with 54 additions and 30 deletions

View File

@ -41,17 +41,9 @@ class OpenInputStreamWrapper
const network::ResourceRequest& request, const network::ResourceRequest& request,
OnInputStreamOpenedCallback callback) WARN_UNUSED_RESULT { OnInputStreamOpenedCallback callback) WARN_UNUSED_RESULT {
scoped_refptr<OpenInputStreamWrapper> wrapper = new OpenInputStreamWrapper( scoped_refptr<OpenInputStreamWrapper> wrapper = new OpenInputStreamWrapper(
std::move(delegate), base::ThreadTaskRunnerHandle::Get(), std::move(delegate), work_thread_task_runner,
std::move(callback)); base::ThreadTaskRunnerHandle::Get(), std::move(callback));
wrapper->Start(request_id, request);
work_thread_task_runner->PostTask(
FROM_HERE,
base::BindOnce(OpenInputStreamWrapper::OpenOnWorkThread,
// This is intentional - the loader could be deleted
// while the callback is executing on the background
// thread. The delegate will be "returned" to the loader
// once the InputStream open attempt is completed.
wrapper, request_id, request));
return wrapper->GetCancelCallback(); return wrapper->GetCancelCallback();
} }
@ -61,40 +53,61 @@ class OpenInputStreamWrapper
OpenInputStreamWrapper( OpenInputStreamWrapper(
std::unique_ptr<StreamReaderURLLoader::Delegate> delegate, std::unique_ptr<StreamReaderURLLoader::Delegate> delegate,
scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner,
scoped_refptr<base::SingleThreadTaskRunner> job_thread_task_runner, scoped_refptr<base::SingleThreadTaskRunner> job_thread_task_runner,
OnInputStreamOpenedCallback callback) OnInputStreamOpenedCallback callback)
: delegate_(std::move(delegate)), : delegate_(std::move(delegate)),
work_thread_task_runner_(work_thread_task_runner),
job_thread_task_runner_(job_thread_task_runner), job_thread_task_runner_(job_thread_task_runner),
callback_(std::move(callback)) {} callback_(std::move(callback)) {}
virtual ~OpenInputStreamWrapper() {} virtual ~OpenInputStreamWrapper() {}
static void OpenOnWorkThread(scoped_refptr<OpenInputStreamWrapper> wrapper, void Start(const RequestId& request_id,
const RequestId& request_id,
const network::ResourceRequest& request) { const network::ResourceRequest& request) {
DCHECK(!CEF_CURRENTLY_ON_IOT()); work_thread_task_runner_->PostTask(
DCHECK(!CEF_CURRENTLY_ON_UIT()); FROM_HERE,
base::BindOnce(&OpenInputStreamWrapper::OpenOnWorkThread,
wrapper->Open(request_id, request); base::WrapRefCounted(this), request_id, request));
} }
base::OnceClosure GetCancelCallback() { base::OnceClosure GetCancelCallback() {
return base::BindOnce(&OpenInputStreamWrapper::Cancel, return base::BindOnce(&OpenInputStreamWrapper::CancelOnJobThread,
base::WrapRefCounted(this)); base::WrapRefCounted(this));
} }
void Cancel() { void CancelOnJobThread() {
DCHECK(job_thread_task_runner_->RunsTasksInCurrentSequence()); DCHECK(job_thread_task_runner_->RunsTasksInCurrentSequence());
delegate_.reset(); if (callback_.is_null())
return;
callback_.Reset(); callback_.Reset();
work_thread_task_runner_->PostTask(
FROM_HERE, base::BindOnce(&OpenInputStreamWrapper::CancelOnWorkThread,
base::WrapRefCounted(this)));
} }
void Open(const RequestId& request_id, void CancelOnWorkThread() {
DCHECK(work_thread_task_runner_->RunsTasksInCurrentSequence());
if (is_canceled_)
return;
is_canceled_ = true;
OnCallback(nullptr);
}
void OpenOnWorkThread(const RequestId& request_id,
const network::ResourceRequest& request) { const network::ResourceRequest& request) {
// |delegate_| may be null if we were canceled. DCHECK(work_thread_task_runner_->RunsTasksInCurrentSequence());
if (delegate_ && !delegate_->OpenInputStream( if (is_canceled_)
return;
// |delegate_| will remain valid until OnCallback() is executed on
// |job_thread_task_runner_|.
if (!delegate_->OpenInputStream(
request_id, request, request_id, request,
base::BindOnce(&OpenInputStreamWrapper::OnCallback, base::BindOnce(&OpenInputStreamWrapper::OnCallback,
base::WrapRefCounted(this)))) { base::WrapRefCounted(this)))) {
is_canceled_ = true;
OnCallback(nullptr); OnCallback(nullptr);
} }
} }
@ -108,17 +121,28 @@ class OpenInputStreamWrapper
return; return;
} }
// May be null if we were canceled. // May be null if CancelOnJobThread() was called on
if (callback_.is_null()) // |job_thread_task_runner_| while OpenOnWorkThread() was pending on
// |work_thread_task_runner_|.
if (callback_.is_null()) {
delegate_.reset();
return; return;
}
std::move(callback_).Run(std::move(delegate_), std::move(input_stream)); std::move(callback_).Run(std::move(delegate_), std::move(input_stream));
} }
std::unique_ptr<StreamReaderURLLoader::Delegate> delegate_; std::unique_ptr<StreamReaderURLLoader::Delegate> delegate_;
scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner_;
scoped_refptr<base::SingleThreadTaskRunner> job_thread_task_runner_; scoped_refptr<base::SingleThreadTaskRunner> job_thread_task_runner_;
// Only accessed on |job_thread_task_runner_|.
OnInputStreamOpenedCallback callback_; OnInputStreamOpenedCallback callback_;
// Only accessed on |work_thread_task_runner_|.
bool is_canceled_ = false;
DISALLOW_COPY_AND_ASSIGN(OpenInputStreamWrapper); DISALLOW_COPY_AND_ASSIGN(OpenInputStreamWrapper);
}; };