// Source: cef/libcef/browser/net_service/stream_reader_url_loader.cc
// Snapshot: 2020-03-04 19:31:54 -05:00 (871 lines, 29 KiB, C++)

// Copyright (c) 2019 The Chromium Embedded Framework Authors. Portions
// Copyright (c) 2018 The Chromium Authors. All rights reserved. Use of this
// source code is governed by a BSD-style license that can be found in the
// LICENSE file.
#include "libcef/browser/net_service/stream_reader_url_loader.h"
#include "libcef/browser/thread_util.h"
#include "libcef/common/net_service/net_service_util.h"
#include "base/bind.h"
#include "base/callback.h"
#include "base/strings/string_number_conversions.h"
#include "base/strings/string_util.h"
#include "base/strings/stringprintf.h"
#include "base/task/post_task.h"
#include "base/threading/thread.h"
#include "base/threading/thread_task_runner_handle.h"
#include "content/public/browser/browser_thread.h"
#include "net/base/io_buffer.h"
#include "net/http/http_status_code.h"
#include "net/http/http_util.h"
#include "services/network/public/cpp/url_loader_completion_status.h"
namespace net_service {
namespace {
// Completion callback type used by OpenInputStreamWrapper. It receives the
// Delegate being handed back to the caller together with the InputStream that
// was opened; the stream is null when the open attempt did not produce one.
using OnInputStreamOpenedCallback =
base::OnceCallback<void(std::unique_ptr<StreamReaderURLLoader::Delegate>,
std::unique_ptr<InputStream>)>;
// Helper for executing the OnInputStreamOpenedCallback.
//
// Takes ownership of the Delegate, asks it to open the InputStream on
// |work_thread_task_runner_|, and then runs the callback on
// |job_thread_task_runner_| (the task runner of the thread that called
// Open()). The object is ref-counted; every posted task holds a reference so
// the wrapper stays alive until the last task has run.
class OpenInputStreamWrapper
: public base::RefCountedThreadSafe<OpenInputStreamWrapper> {
public:
// Starts an asynchronous open attempt and returns a closure that cancels it.
// |callback| is run on the calling thread unless the returned closure has
// been run first. The returned closure must be run on the calling thread
// (see the DCHECK in CancelOnJobThread()).
static base::OnceClosure Open(
std::unique_ptr<StreamReaderURLLoader::Delegate> delegate,
scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner,
const RequestId& request_id,
const network::ResourceRequest& request,
OnInputStreamOpenedCallback callback) WARN_UNUSED_RESULT {
scoped_refptr<OpenInputStreamWrapper> wrapper = new OpenInputStreamWrapper(
std::move(delegate), work_thread_task_runner,
base::ThreadTaskRunnerHandle::Get(), std::move(callback));
wrapper->Start(request_id, request);
return wrapper->GetCancelCallback();
}
private:
// Lets RefCountedThreadSafe reach the private destructor.
friend class base::RefCountedThreadSafe<OpenInputStreamWrapper>;
// Only created through Open(). |job_thread_task_runner| is where |callback|
// will eventually run.
OpenInputStreamWrapper(
std::unique_ptr<StreamReaderURLLoader::Delegate> delegate,
scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner,
scoped_refptr<base::SingleThreadTaskRunner> job_thread_task_runner,
OnInputStreamOpenedCallback callback)
: delegate_(std::move(delegate)),
work_thread_task_runner_(work_thread_task_runner),
job_thread_task_runner_(job_thread_task_runner),
callback_(std::move(callback)) {}
virtual ~OpenInputStreamWrapper() {}
// Posts OpenOnWorkThread() to the work task runner.
void Start(const RequestId& request_id,
const network::ResourceRequest& request) {
work_thread_task_runner_->PostTask(
FROM_HERE,
base::BindOnce(&OpenInputStreamWrapper::OpenOnWorkThread,
base::WrapRefCounted(this), request_id, request));
}
// Returns the closure handed back to the caller of Open(). The closure holds
// a reference to this wrapper.
base::OnceClosure GetCancelCallback() {
return base::BindOnce(&OpenInputStreamWrapper::CancelOnJobThread,
base::WrapRefCounted(this));
}
// First half of cancellation. Dropping |callback_| guarantees the caller is
// never notified; the second half then runs on the work task runner.
void CancelOnJobThread() {
DCHECK(job_thread_task_runner_->RunsTasksInCurrentSequence());
// Nothing to do if the callback already ran or cancel was already requested.
if (callback_.is_null())
return;
callback_.Reset();
work_thread_task_runner_->PostTask(
FROM_HERE, base::BindOnce(&OpenInputStreamWrapper::CancelOnWorkThread,
base::WrapRefCounted(this)));
}
// Second half of cancellation. Marks the request canceled so a still-pending
// OpenOnWorkThread() becomes a no-op, then routes through OnCallback() so
// that |delegate_| is released on the job thread.
void CancelOnWorkThread() {
DCHECK(work_thread_task_runner_->RunsTasksInCurrentSequence());
if (is_canceled_)
return;
is_canceled_ = true;
OnCallback(nullptr);
}
// Asks the delegate to open the stream. The delegate reports its result
// through OnCallback(); if OpenInputStream() returns false the attempt is
// treated as failed and a null stream is reported immediately.
void OpenOnWorkThread(const RequestId& request_id,
const network::ResourceRequest& request) {
DCHECK(work_thread_task_runner_->RunsTasksInCurrentSequence());
if (is_canceled_)
return;
// |delegate_| will remain valid until OnCallback() is executed on
// |job_thread_task_runner_|.
if (!delegate_->OpenInputStream(
request_id, request,
base::BindOnce(&OpenInputStreamWrapper::OnCallback,
base::WrapRefCounted(this)))) {
is_canceled_ = true;
OnCallback(nullptr);
}
}
// Delivers the result. May be called on any sequence; when it is not called
// on |job_thread_task_runner_| it re-posts itself there first.
void OnCallback(std::unique_ptr<InputStream> input_stream) {
if (!job_thread_task_runner_->RunsTasksInCurrentSequence()) {
job_thread_task_runner_->PostTask(
FROM_HERE,
base::BindOnce(&OpenInputStreamWrapper::OnCallback,
base::WrapRefCounted(this), std::move(input_stream)));
return;
}
// May be null if CancelOnJobThread() was called on
// |job_thread_task_runner_| while OpenOnWorkThread() was pending on
// |work_thread_task_runner_|.
if (callback_.is_null()) {
// Nobody is waiting for the result; just release the delegate here.
delegate_.reset();
return;
}
// Hand both the delegate and the stream (possibly null) back to the caller.
std::move(callback_).Run(std::move(delegate_), std::move(input_stream));
}
// Owned by the wrapper until it is returned through |callback_| or released
// in OnCallback().
std::unique_ptr<StreamReaderURLLoader::Delegate> delegate_;
// Where the delegate's OpenInputStream() is invoked.
scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner_;
// Where |callback_| is run and where cancellation must be requested.
scoped_refptr<base::SingleThreadTaskRunner> job_thread_task_runner_;
// Only accessed on |job_thread_task_runner_|.
OnInputStreamOpenedCallback callback_;
// Only accessed on |work_thread_task_runner_|.
bool is_canceled_ = false;
DISALLOW_COPY_AND_ASSIGN(OpenInputStreamWrapper);
};
} // namespace
//==============================
// InputStreamReader
//=============================
// Class responsible for reading from the InputStream.
//
// Skip() and Read() only post work; the InputStream itself is touched
// exclusively on |work_thread_task_runner_|. Completion callbacks are posted
// to |job_thread_task_runner_|, which is captured in the constructor. At most
// one Skip() or Read() may be outstanding at a time.
class InputStreamReader : public base::RefCountedThreadSafe<InputStreamReader> {
 public:
  // The constructor is called on the IO thread, not on the worker thread.
  // Callbacks will be executed on the IO thread.
  InputStreamReader(
      std::unique_ptr<InputStream> stream,
      scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner);

  // Skip |skip_bytes| number of bytes from |stream_|. |callback| will be
  // executed asynchronously on the IO thread. A negative value passed to
  // |callback| indicates an error code; a non-negative value indicates the
  // number of bytes skipped (0 when |skip_bytes| is not positive).
  void Skip(int64_t skip_bytes, InputStream::SkipCallback callback);

  // Read up to |dest_size| bytes from |stream_| into |dest|. |callback| will be
  // executed asynchronously on the IO thread. A negative value passed to
  // |callback| indicates an error code; a non-negative value indicates the
  // number of bytes read (0 when nothing was read).
  void Read(scoped_refptr<net::IOBuffer> dest,
            int dest_size,
            InputStream::ReadCallback callback);

 private:
  friend class base::RefCountedThreadSafe<InputStreamReader>;
  virtual ~InputStreamReader();

  // Work-sequence halves of Skip() and Read().
  void SkipOnWorkThread(int64_t skip_bytes, InputStream::SkipCallback callback);
  void ReadOnWorkThread(scoped_refptr<net::IOBuffer> buffer,
                        int buffer_size,
                        InputStream::ReadCallback callback);

  // Repeatedly calls InputStream::Skip() until |bytes_to_skip_| reaches zero,
  // the stream goes asynchronous, or an error occurs.
  void SkipToRequestedRange();

  // Passed to the InputStream as its asynchronous completion callbacks. They
  // always bounce to |work_thread_task_runner| before touching |stream|.
  static void ContinueSkipCallback(
      scoped_refptr<InputStreamReader> stream,
      scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner,
      int callback_id,
      int64_t bytes_skipped);
  static void ContinueReadCallback(
      scoped_refptr<InputStreamReader> stream,
      scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner,
      int callback_id,
      int bytes_read);

  // Handle an asynchronous result on the work sequence. Results whose
  // |callback_id| does not match |pending_callback_id_| are ignored.
  void ContinueSkipCallbackOnWorkThread(int callback_id, int64_t bytes_skipped);
  void ContinueReadCallbackOnWorkThread(int callback_id, int bytes_read);

  // Post the pending callback to the job thread and reset the pending state.
  void RunSkipCallback(int64_t bytes_skipped);
  void RunReadCallback(int bytes_read);

  // Trampolines that run the user callback on the job thread.
  static void RunSkipCallbackOnJobThread(
      int64_t bytes_skipped,
      InputStream::SkipCallback skip_callback);
  static void RunReadCallbackOnJobThread(
      int bytes_read,
      InputStream::ReadCallback read_callback);

  std::unique_ptr<InputStream> stream_;

  // All InputStream methods are called on this task runner.
  scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner_;

  // All callbacks are executed on this task runner.
  scoped_refptr<base::SingleThreadTaskRunner> job_thread_task_runner_;

  // The below members are only accessed on the work thread.

  // Total byte count requested by the in-flight Skip(); reported to the
  // callback on success. -1 while no skip is in progress.
  int64_t bytes_skipped_ = -1;
  // Bytes still to be skipped for the in-flight Skip(). -1 while idle.
  int64_t bytes_to_skip_ = -1;
  InputStream::SkipCallback pending_skip_callback_;

  // Destination of the in-flight Read(); null while idle.
  scoped_refptr<net::IOBuffer> buffer_;
  InputStream::ReadCallback pending_read_callback_;

  // Identifier of the outstanding InputStream request, or -1 if none. Used to
  // discard stale asynchronous results.
  int pending_callback_id_ = -1;
  int next_callback_id_ = 0;

  DISALLOW_COPY_AND_ASSIGN(InputStreamReader);
};
// Stores the stream and the work task runner, and records the current
// thread's task runner as the place where completion callbacks will run.
InputStreamReader::InputStreamReader(
    std::unique_ptr<InputStream> stream,
    scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner)
    : stream_(std::move(stream)),
      work_thread_task_runner_(work_thread_task_runner),
      job_thread_task_runner_(base::ThreadTaskRunnerHandle::Get()) {
  CEF_REQUIRE_IOT();

  // Both collaborators are mandatory.
  DCHECK(stream_);
  DCHECK(work_thread_task_runner_);
}
// Nothing to release explicitly; members clean up after themselves.
InputStreamReader::~InputStreamReader() = default;
// Schedules SkipOnWorkThread() on the work task runner. The bound reference
// keeps this reader alive until the task has run.
void InputStreamReader::Skip(int64_t skip_bytes,
                             InputStream::SkipCallback callback) {
  auto skip_task =
      base::BindOnce(&InputStreamReader::SkipOnWorkThread,
                     base::WrapRefCounted(this), skip_bytes,
                     std::move(callback));
  work_thread_task_runner_->PostTask(FROM_HERE, std::move(skip_task));
}
// Schedules ReadOnWorkThread() on the work task runner. The bound reference
// keeps this reader alive until the task has run.
void InputStreamReader::Read(scoped_refptr<net::IOBuffer> dest,
                             int dest_size,
                             InputStream::ReadCallback callback) {
  auto read_task =
      base::BindOnce(&InputStreamReader::ReadOnWorkThread,
                     base::WrapRefCounted(this), dest, dest_size,
                     std::move(callback));
  work_thread_task_runner_->PostTask(FROM_HERE, std::move(read_task));
}
// Work-sequence entry point for Skip(). Stores |callback| as the pending skip
// callback and either completes immediately (nothing to skip) or starts the
// skip loop.
void InputStreamReader::SkipOnWorkThread(int64_t skip_bytes,
                                         InputStream::SkipCallback callback) {
  DCHECK(work_thread_task_runner_->RunsTasksInCurrentSequence());

  // A new request must not start while another one is still pending.
  DCHECK_EQ(pending_callback_id_, -1);
  DCHECK(pending_skip_callback_.is_null());

  pending_skip_callback_ = std::move(callback);

  if (skip_bytes > 0) {
    // Remember both the total (reported on success) and the remaining amount.
    bytes_to_skip_ = skip_bytes;
    bytes_skipped_ = skip_bytes;
    SkipToRequestedRange();
  } else {
    // Nothing to skip.
    RunSkipCallback(0);
  }
}
// Work-sequence entry point for Read(). Stores |callback| as the pending read
// callback, issues the read on |stream_| and reports the outcome unless the
// stream indicated that it will complete asynchronously.
void InputStreamReader::ReadOnWorkThread(scoped_refptr<net::IOBuffer> dest,
                                         int dest_size,
                                         InputStream::ReadCallback callback) {
  DCHECK(work_thread_task_runner_->RunsTasksInCurrentSequence());

  // A new request must not start while another one is still pending.
  DCHECK_EQ(pending_callback_id_, -1);
  DCHECK(pending_read_callback_.is_null());

  pending_read_callback_ = std::move(callback);

  if (dest_size == 0) {
    // Zero-length read: complete right away without touching the stream.
    RunReadCallback(0);
    return;
  }

  DCHECK_GT(dest_size, 0);

  buffer_ = dest;
  pending_callback_id_ = ++next_callback_id_;

  int bytes_read = 0;
  const bool succeeded = stream_->Read(
      buffer_.get(), dest_size, &bytes_read,
      base::BindOnce(&InputStreamReader::ContinueReadCallback,
                     base::WrapRefCounted(this), work_thread_task_runner_,
                     pending_callback_id_));

  // Success with no bytes means the stream will invoke the callback later.
  if (succeeded && bytes_read == 0)
    return;

  // Synchronous completion. A failed call that nevertheless claims a positive
  // byte count is reported as a generic failure; every other combination
  // forwards |bytes_read| unchanged.
  int outcome = bytes_read;
  if (!succeeded && bytes_read > 0)
    outcome = net::ERR_FAILED;
  RunReadCallback(outcome);
}
// Drives InputStream::Skip() until |bytes_to_skip_| is exhausted. Returns
// early when the stream switches to asynchronous completion (the continuation
// callback takes over) or when skipping fails.
void InputStreamReader::SkipToRequestedRange() {
  DCHECK(work_thread_task_runner_->RunsTasksInCurrentSequence());

  // The underlying InputStream is not guaranteed to skip the requested number
  // of bytes in one call, so keep asking until the target is reached.
  while (true) {
    pending_callback_id_ = ++next_callback_id_;

    int64_t skipped = 0;
    const bool succeeded = stream_->Skip(
        bytes_to_skip_, &skipped,
        base::BindOnce(&InputStreamReader::ContinueSkipCallback,
                       base::WrapRefCounted(this), work_thread_task_runner_,
                       pending_callback_id_));

    // Success with nothing skipped means the callback will run later.
    if (succeeded && skipped == 0)
      return;

    if (!succeeded || skipped <= 0) {
      RunSkipCallback(net::ERR_REQUEST_RANGE_NOT_SATISFIABLE);
      return;
    }

    DCHECK_LE(skipped, bytes_to_skip_);
    bytes_to_skip_ -= skipped;

    if (bytes_to_skip_ <= 0)
      break;
  }

  // All done, the requested number of bytes were skipped.
  RunSkipCallback(bytes_skipped_);
}
// static
// Completion callback given to InputStream::Skip(). The result is never
// handled inline; it is always forwarded to the work task runner.
void InputStreamReader::ContinueSkipCallback(
    scoped_refptr<InputStreamReader> stream,
    scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner,
    int callback_id,
    int64_t bytes_skipped) {
  auto continuation =
      base::BindOnce(&InputStreamReader::ContinueSkipCallbackOnWorkThread,
                     stream, callback_id, bytes_skipped);
  work_thread_task_runner->PostTask(FROM_HERE, std::move(continuation));
}
// static
// Completion callback given to InputStream::Read(). The result is never
// handled inline; it is always forwarded to the work task runner.
void InputStreamReader::ContinueReadCallback(
    scoped_refptr<InputStreamReader> stream,
    scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner,
    int callback_id,
    int bytes_read) {
  auto continuation =
      base::BindOnce(&InputStreamReader::ContinueReadCallbackOnWorkThread,
                     stream, callback_id, bytes_read);
  work_thread_task_runner->PostTask(FROM_HERE, std::move(continuation));
}
// Handles the result of an asynchronous InputStream::Skip() on the work
// sequence.
//
// |callback_id| identifies the request the result belongs to; results that do
// not match the currently pending request are dropped. |bytes_skipped| <= 0 is
// reported as ERR_REQUEST_RANGE_NOT_SATISFIABLE. Otherwise the remaining count
// is reduced and either another skip round is scheduled or the pending
// callback is completed with the total number of bytes requested.
void InputStreamReader::ContinueSkipCallbackOnWorkThread(
    int callback_id,
    int64_t bytes_skipped) {
  DCHECK(work_thread_task_runner_->RunsTasksInCurrentSequence());

  // Check for out of order callbacks.
  if (pending_callback_id_ != callback_id)
    return;

  DCHECK_LE(bytes_skipped, bytes_to_skip_);

  if (bytes_skipped <= 0) {
    // The stream failed or made no progress.
    RunSkipCallback(net::ERR_REQUEST_RANGE_NOT_SATISFIABLE);
    return;
  }

  if (bytes_to_skip_ > 0)
    bytes_to_skip_ -= bytes_skipped;

  if (bytes_to_skip_ > 0) {
    // Continue execution asynchronously. The reference is taken explicitly,
    // matching every other task posted by this class.
    work_thread_task_runner_->PostTask(
        FROM_HERE, base::BindOnce(&InputStreamReader::SkipToRequestedRange,
                                  base::WrapRefCounted(this)));
    return;
  }

  // All done, the requested number of bytes were skipped.
  RunSkipCallback(bytes_skipped_);
}
// Handles the result of an asynchronous InputStream::Read() on the work
// sequence. Results that do not belong to the currently pending request are
// dropped.
void InputStreamReader::ContinueReadCallbackOnWorkThread(int callback_id,
                                                         int bytes_read) {
  DCHECK(work_thread_task_runner_->RunsTasksInCurrentSequence());

  // Only the callback for the outstanding request is honored.
  if (callback_id == pending_callback_id_)
    RunReadCallback(bytes_read);
}
// Posts the pending skip callback, with |bytes_skipped| as its argument, to
// the job thread and returns the reader to its idle state.
void InputStreamReader::RunSkipCallback(int64_t bytes_skipped) {
  DCHECK(work_thread_task_runner_->RunsTasksInCurrentSequence());
  DCHECK(!pending_skip_callback_.is_null());

  auto reply = base::BindOnce(&InputStreamReader::RunSkipCallbackOnJobThread,
                              bytes_skipped,
                              std::move(pending_skip_callback_));
  job_thread_task_runner_->PostTask(FROM_HERE, std::move(reply));

  // Back to idle: no request outstanding, no skip in progress.
  pending_callback_id_ = -1;
  bytes_to_skip_ = -1;
  bytes_skipped_ = -1;
}
// Posts the pending read callback, with |bytes_read| as its argument, to the
// job thread and returns the reader to its idle state.
void InputStreamReader::RunReadCallback(int bytes_read) {
  DCHECK(work_thread_task_runner_->RunsTasksInCurrentSequence());
  DCHECK(!pending_read_callback_.is_null());

  auto reply = base::BindOnce(&InputStreamReader::RunReadCallbackOnJobThread,
                              bytes_read, std::move(pending_read_callback_));
  job_thread_task_runner_->PostTask(FROM_HERE, std::move(reply));

  // Back to idle: no request outstanding and the destination buffer released.
  pending_callback_id_ = -1;
  buffer_ = nullptr;
}
// static
// Trampoline posted by RunSkipCallback(): runs |skip_callback| with
// |bytes_skipped| on whichever thread executes this task.
void InputStreamReader::RunSkipCallbackOnJobThread(
int64_t bytes_skipped,
InputStream::SkipCallback skip_callback) {
std::move(skip_callback).Run(bytes_skipped);
}
// static
// Trampoline posted by RunReadCallback(): runs |read_callback| with
// |bytes_read| on whichever thread executes this task.
void InputStreamReader::RunReadCallbackOnJobThread(
int bytes_read,
InputStream::ReadCallback read_callback) {
std::move(read_callback).Run(bytes_read);
}
//==============================
// RequestId
//==============================
// Formats the id as "RequestId(<request_id_>, <routing_id_>)".
std::string RequestId::ToString() const {
return base::StringPrintf("RequestId(%u, %u)", request_id_, routing_id_);
}
// Formats the id as "RequestId[<debug_label>](<request_id_>, <routing_id_>)".
std::string RequestId::ToString(base::StringPiece debug_label) const {
  // StringPrintf needs a NUL-terminated string, so materialize the label.
  const std::string label = debug_label.as_string();
  return base::StringPrintf("RequestId[%s](%u, %u)", label.c_str(),
                            request_id_, routing_id_);
}
// Streams the unlabeled string form of |request_id|.
std::ostream& operator<<(std::ostream& out, const RequestId& request_id) {
  out << request_id.ToString();
  return out;
}
//==============================
// StreamReaderURLLoader
//=============================
// Captures the request state, arranges for a client connection error to
// complete the request as aborted, and creates the task runner on which all
// InputStream work will run.
StreamReaderURLLoader::StreamReaderURLLoader(
    const RequestId& request_id,
    const network::ResourceRequest& request,
    network::mojom::URLLoaderClientPtr client,
    network::mojom::TrustedHeaderClientPtr header_client,
    const net::MutableNetworkTrafficAnnotationTag& traffic_annotation,
    std::unique_ptr<Delegate> response_delegate)
    : request_id_(request_id),
      request_(request),
      client_(std::move(client)),
      header_client_(std::move(header_client)),
      traffic_annotation_(traffic_annotation),
      response_delegate_(std::move(response_delegate)),
      writable_handle_watcher_(FROM_HERE,
                               mojo::SimpleWatcher::ArmingPolicy::MANUAL,
                               base::SequencedTaskRunnerHandle::Get()),
      weak_factory_(this) {
  DCHECK(response_delegate_);

  // If there is a client error, clean up the request.
  auto on_client_error =
      base::BindOnce(&StreamReaderURLLoader::RequestComplete,
                     weak_factory_.GetWeakPtr(), net::ERR_ABORTED);
  client_.set_connection_error_handler(std::move(on_client_error));

  // All InputStream work will be performed on this task runner.
  stream_work_task_runner_ =
      base::CreateSequencedTaskRunner({base::ThreadPool(), base::MayBlock()});
}
// Cancels a still-pending open attempt, if any, so that the Delegate held by
// OpenInputStreamWrapper is released.
StreamReaderURLLoader::~StreamReaderURLLoader() {
  if (!open_cancel_callback_)
    return;

  std::move(open_cancel_callback_).Run();
}
// Begins the request: validates the Range header, then gives the header
// client (when bound) the chance to inspect or modify the request headers
// before continuing.
void StreamReaderURLLoader::Start() {
  DCHECK(thread_checker_.CalledOnValidThread());

  if (!ParseRange(request_.headers)) {
    RequestComplete(net::ERR_REQUEST_RANGE_NOT_SATISFIABLE);
    return;
  }

  if (!header_client_.is_bound()) {
    // No header client; continue with the headers as they are.
    ContinueWithRequestHeaders(net::OK, base::nullopt);
    return;
  }

  header_client_->OnBeforeSendHeaders(
      request_.headers,
      base::BindOnce(&StreamReaderURLLoader::ContinueWithRequestHeaders,
                     weak_factory_.GetWeakPtr()));
}
// Second step of Start(). Fails the request when |result| is an error, adopts
// replacement |headers| when provided, and then kicks off the asynchronous
// InputStream open.
void StreamReaderURLLoader::ContinueWithRequestHeaders(
    int32_t result,
    const base::Optional<net::HttpRequestHeaders>& headers) {
  if (result != net::OK) {
    RequestComplete(result);
    return;
  }

  if (headers.has_value()) {
    DCHECK(header_client_.is_bound());
    request_.headers = *headers;
  }

  auto on_opened = base::BindOnce(&StreamReaderURLLoader::OnInputStreamOpened,
                                  weak_factory_.GetWeakPtr());

  // Giving up |response_delegate_| here is intentional - the loader could be
  // deleted while the open is executing on the background thread. The
  // delegate will be "returned" to the loader once the InputStream open
  // attempt is completed.
  open_cancel_callback_ = OpenInputStreamWrapper::Open(
      std::move(response_delegate_), stream_work_task_runner_, request_id_,
      request_, std::move(on_opened));
}
// Not expected to be called on this loader: reaching this method trips
// NOTREACHED(). On a redirect, ContinueWithResponseHeaders() notifies the
// client and then deletes this loader instead.
void StreamReaderURLLoader::FollowRedirect(
const std::vector<std::string>& removed_headers,
const net::HttpRequestHeaders& modified_headers,
const base::Optional<GURL>& new_url) {
NOTREACHED();
}
// No-op: both arguments are ignored.
void StreamReaderURLLoader::SetPriority(net::RequestPriority priority,
int intra_priority_value) {}
// No-op: this loader does nothing when asked to pause.
void StreamReaderURLLoader::PauseReadingBodyFromNet() {}
// No-op: this loader does nothing when asked to resume.
void StreamReaderURLLoader::ResumeReadingBodyFromNet() {}
// Receives the result of the asynchronous open. Takes the delegate back, then
// either handles the failure (null |input_stream|) or creates the reader and
// skips to the start of the requested byte range.
void StreamReaderURLLoader::OnInputStreamOpened(
    std::unique_ptr<StreamReaderURLLoader::Delegate> returned_delegate,
    std::unique_ptr<InputStream> input_stream) {
  DCHECK(thread_checker_.CalledOnValidThread());
  DCHECK(returned_delegate);

  response_delegate_ = std::move(returned_delegate);
  open_cancel_callback_.Reset();

  if (!input_stream) {
    bool restarted = false;
    response_delegate_->OnInputStreamOpenFailed(request_id_, &restarted);
    if (!restarted) {
      // Report the failure through the response status instead.
      HeadersComplete(net::HTTP_NOT_FOUND, -1);
      return;
    }

    // The request has been restarted with a new loader.
    // |this| will be deleted.
    CleanUp();
    return;
  }

  input_stream_reader_ = base::MakeRefCounted<InputStreamReader>(
      std::move(input_stream), stream_work_task_runner_);

  if (byte_range_valid()) {
    input_stream_reader_->Skip(
        byte_range_.first_byte_position(),
        base::BindOnce(&StreamReaderURLLoader::OnReaderSkipCompleted,
                       weak_factory_.GetWeakPtr()));
  } else {
    // No usable range; proceed as if zero bytes had been skipped.
    OnReaderSkipCompleted(0);
  }
}
// Called once the reader has skipped to the start of the requested range (or
// directly with 0 when there is no usable range).
//
// |bytes_skipped| is the value reported by InputStreamReader::Skip(): the
// number of bytes skipped, or a negative net error code.
//
// Outcomes:
//  - No usable byte range: headers are completed with an unspecified (-1)
//    content length.
//  - Skip reached the range start: headers are completed with the range
//    length when the range has an explicit last byte position. For an
//    open-ended range ("bytes=N-") the length cannot be derived from the range
//    alone, so it is reported as unspecified (-1) instead of computing a
//    negative value from the unset last position.
//  - Anything else: the request fails with the reported error, or with
//    ERR_FAILED if the skip stopped short without an error code.
void StreamReaderURLLoader::OnReaderSkipCompleted(int64_t bytes_skipped) {
  DCHECK(thread_checker_.CalledOnValidThread());

  if (!byte_range_valid()) {
    // Expected content length is unspecified.
    HeadersComplete(net::HTTP_OK, -1);
    return;
  }

  if (bytes_skipped == byte_range_.first_byte_position()) {
    // We skipped the expected number of bytes.
    int64_t expected_content_length = -1;
    if (byte_range_.HasLastBytePosition()) {
      expected_content_length = byte_range_.last_byte_position() -
                                byte_range_.first_byte_position() + 1;
      DCHECK_GE(expected_content_length, 0);
    }
    HeadersComplete(net::HTTP_OK, expected_content_length);
    return;
  }

  // Negative values are net error codes and fit in an int.
  RequestComplete(bytes_skipped < 0 ? static_cast<int>(bytes_skipped)
                                    : net::ERR_FAILED);
}
// Builds the response head. Starts from |orig_status_code| and
// |expected_content_length|, lets the delegate override status, MIME type,
// charset, length and extra headers, and then passes the result through the
// header client (when bound) before continuing.
void StreamReaderURLLoader::HeadersComplete(int orig_status_code,
                                            int64_t expected_content_length) {
  DCHECK(thread_checker_.CalledOnValidThread());

  // Defaults that the delegate may overwrite below.
  int status_code = orig_status_code;
  std::string status_text =
      net::GetHttpReasonPhrase(static_cast<net::HttpStatusCode>(status_code));
  std::string mime_type, charset;
  int64_t content_length = expected_content_length;
  ResourceResponse::HeaderMap extra_headers;

  response_delegate_->GetResponseHeaders(request_id_, &status_code,
                                         &status_text, &mime_type, &charset,
                                         &content_length, &extra_headers);

  if (status_code < 0) {
    // Early exit if the handler reported an error.
    RequestComplete(status_code);
    return;
  }

  auto pending_response = network::mojom::URLResponseHead::New();
  pending_response->request_start = base::TimeTicks::Now();
  pending_response->response_start = base::TimeTicks::Now();

  auto headers = MakeResponseHeaders(
      status_code, status_text, mime_type, charset, content_length,
      extra_headers, false /* allow_existing_header_override */);
  pending_response->headers = headers;

  // A negative length means "unspecified" and is left unset.
  if (content_length >= 0)
    pending_response->content_length = content_length;

  // The charset is only recorded together with a MIME type.
  if (!mime_type.empty()) {
    pending_response->mime_type = mime_type;
    if (!charset.empty())
      pending_response->charset = charset;
  }

  if (!header_client_.is_bound()) {
    ContinueWithResponseHeaders(std::move(pending_response), net::OK,
                                base::nullopt, base::nullopt);
    return;
  }

  header_client_->OnHeadersReceived(
      headers->raw_headers(), net::IPEndPoint(),
      base::BindOnce(&StreamReaderURLLoader::ContinueWithResponseHeaders,
                     weak_factory_.GetWeakPtr(),
                     std::move(pending_response)));
}
// Final step of header processing. Fails the request when |result| is an
// error, adopts replacement |headers| when provided, and then either reports
// a redirect to the client (deleting this loader) or delivers the response
// head.
void StreamReaderURLLoader::ContinueWithResponseHeaders(
    network::mojom::URLResponseHeadPtr pending_response,
    int32_t result,
    const base::Optional<std::string>& headers,
    const base::Optional<GURL>& redirect_url) {
  if (result != net::OK) {
    RequestComplete(result);
    return;
  }

  if (headers) {
    DCHECK(header_client_.is_bound());
    pending_response->headers =
        base::MakeRefCounted<net::HttpResponseHeaders>(*headers);
  }

  auto pending_headers = pending_response->headers;

  // What the length would be if we sent headers over the network. Used to
  // calculate data length.
  header_length_ = pending_headers->raw_headers().length();

  DCHECK(client_.is_bound());

  std::string location;
  const bool has_redirect_url = redirect_url && !redirect_url->is_empty();

  if (!has_redirect_url && !pending_headers->IsRedirect(&location)) {
    // Ordinary response.
    client_->OnReceiveResponse(std::move(pending_response));
    return;
  }

  // Redirect: only headers are sent, there is no body.
  pending_response->encoded_data_length = header_length_;
  pending_response->encoded_body_length = 0;
  pending_response->content_length = 0;

  // An explicit |redirect_url| wins over the Location header.
  const GURL new_location =
      has_redirect_url ? *redirect_url : request_.url.Resolve(location);
  client_->OnReceiveRedirect(
      MakeRedirectInfo(request_, pending_headers.get(), new_location,
                       pending_headers->response_code()),
      std::move(pending_response));

  // The client will restart the request with a new loader.
  // |this| will be deleted.
  CleanUp();
}
// Proceeds after the response head was delivered: streams the body, or, when
// the client handled a redirect itself, tears this loader down.
void StreamReaderURLLoader::ContinueResponse(bool was_redirected) {
  DCHECK(thread_checker_.CalledOnValidThread());

  if (!was_redirected) {
    SendBody();
    return;
  }

  // Special case where we allow the client to perform the redirect.
  // The client will restart the request with a new loader.
  // |this| will be deleted.
  CleanUp();
}
void StreamReaderURLLoader::SendBody() {
DCHECK(thread_checker_.CalledOnValidThread());
mojo::ScopedDataPipeConsumerHandle consumer_handle;
if (CreateDataPipe(nullptr /*options*/, &producer_handle_,
&consumer_handle) != MOJO_RESULT_OK) {
RequestComplete(net::ERR_FAILED);
return;
}
writable_handle_watcher_.Watch(
producer_handle_.get(), MOJO_HANDLE_SIGNAL_WRITABLE,
base::BindRepeating(&StreamReaderURLLoader::OnDataPipeWritable,
base::Unretained(this)));
client_->OnStartLoadingResponseBody(std::move(consumer_handle));
ReadMore();
}
void StreamReaderURLLoader::ReadMore() {
DCHECK(thread_checker_.CalledOnValidThread());
DCHECK(!pending_buffer_.get());
uint32_t num_bytes;
MojoResult mojo_result = network::NetToMojoPendingBuffer::BeginWrite(
&producer_handle_, &pending_buffer_, &num_bytes);
if (mojo_result == MOJO_RESULT_SHOULD_WAIT) {
// The pipe is full. We need to wait for it to have more space.
writable_handle_watcher_.ArmOrNotify();
return;
} else if (mojo_result == MOJO_RESULT_FAILED_PRECONDITION) {
// The data pipe consumer handle has been closed.
RequestComplete(net::ERR_ABORTED);
return;
} else if (mojo_result != MOJO_RESULT_OK) {
// The body stream is in a bad state. Bail out.
RequestComplete(net::ERR_UNEXPECTED);
return;
}
scoped_refptr<net::IOBuffer> buffer(
new network::NetToMojoIOBuffer(pending_buffer_.get()));
if (!input_stream_reader_.get()) {
// This will happen if opening the InputStream fails in which case the
// error is communicated by setting the HTTP response status header rather
// than failing the request during the header fetch phase.
OnReaderReadCompleted(0);
return;
}
input_stream_reader_->Read(
buffer, base::checked_cast<int>(num_bytes),
base::BindOnce(&StreamReaderURLLoader::OnReaderReadCompleted,
weak_factory_.GetWeakPtr()));
}
// Watcher callback for the producer handle. Resumes writing, or aborts the
// request when the watcher reports MOJO_RESULT_FAILED_PRECONDITION.
void StreamReaderURLLoader::OnDataPipeWritable(MojoResult result) {
  const bool pipe_closed = (result == MOJO_RESULT_FAILED_PRECONDITION);
  if (pipe_closed) {
    RequestComplete(net::ERR_ABORTED);
    return;
  }

  DCHECK_EQ(result, MOJO_RESULT_OK) << result;

  ReadMore();
}
// Receives the outcome of a read into the pending pipe buffer. Negative
// values fail the request, zero finishes it successfully, and positive values
// commit the bytes to the pipe and schedule the next read.
void StreamReaderURLLoader::OnReaderReadCompleted(int bytes_read) {
  DCHECK(thread_checker_.CalledOnValidThread());
  DCHECK(pending_buffer_);

  if (bytes_read <= 0) {
    if (bytes_read == 0) {
      // Eof, read completed.
      pending_buffer_->Complete(0);
    }
    // Zero maps to success; a negative value is the error code itself.
    RequestComplete(bytes_read == 0 ? net::OK : bytes_read);
    return;
  }

  // Commit the written bytes and take the producer handle back.
  producer_handle_ = pending_buffer_->Complete(bytes_read);
  pending_buffer_ = nullptr;

  client_->OnTransferSizeUpdated(bytes_read);
  total_bytes_read_ += bytes_read;

  // Continue reading from a fresh task.
  auto next_read = base::BindOnce(&StreamReaderURLLoader::ReadMore,
                                  weak_factory_.GetWeakPtr());
  base::ThreadTaskRunnerHandle::Get()->PostTask(FROM_HERE,
                                                std::move(next_read));
}
void StreamReaderURLLoader::RequestComplete(int status_code) {
DCHECK(thread_checker_.CalledOnValidThread());
auto status = network::URLLoaderCompletionStatus(status_code);
status.completion_time = base::TimeTicks::Now();
status.encoded_data_length = total_bytes_read_ + header_length_;
status.encoded_body_length = total_bytes_read_;
// We don't support decoders, so use the same value.
status.decoded_body_length = total_bytes_read_;
client_->OnComplete(status);
CleanUp();
}
// Releases the pipe-related state and destroys this loader. Callers must not
// touch |this| after calling CleanUp().
void StreamReaderURLLoader::CleanUp() {
DCHECK(thread_checker_.CalledOnValidThread());
// Resets the watchers and pipes, so that we will never be called back.
writable_handle_watcher_.Cancel();
pending_buffer_ = nullptr;
producer_handle_.reset();
// Manages its own lifetime.
delete this;
}
// Extracts the byte range from the request's Range header, if present.
//
// Returns false only when a Range header exists but cannot be parsed. When
// there is no Range header, or the header lists anything other than exactly
// one range, |byte_range_| is left untouched and true is returned.
bool StreamReaderURLLoader::ParseRange(const net::HttpRequestHeaders& headers) {
  DCHECK(thread_checker_.CalledOnValidThread());

  std::string range_header;
  if (!headers.GetHeader(net::HttpRequestHeaders::kRange, &range_header)) {
    // No Range header; nothing to parse.
    return true;
  }

  // This loader only cares about the Range header so that we know how many
  // bytes in the stream to skip and how many to read after that.
  std::vector<net::HttpByteRange> ranges;
  if (!net::HttpUtil::ParseRangeHeader(range_header, &ranges)) {
    // This happens if the range header could not be parsed or is invalid.
    return false;
  }

  // Multi-range requests are not supported: the range is only honored when
  // the header contains exactly one.
  if (ranges.size() == 1)
    byte_range_ = ranges[0];

  return true;
}
// True when |byte_range_| is valid and has a non-negative first byte
// position, i.e. when there is a concrete offset to skip to.
bool StreamReaderURLLoader::byte_range_valid() const {
  if (!byte_range_.IsValid())
    return false;

  return byte_range_.first_byte_position() >= 0;
}
} // namespace net_service