mirror of
				https://bitbucket.org/chromiumembedded/cef
				synced 2025-06-05 21:39:12 +02:00 
			
		
		
		
	
		
			
				
	
	
		
			840 lines
		
	
	
		
			28 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			840 lines
		
	
	
		
			28 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| // Copyright (c) 2019 The Chromium Embedded Framework Authors. Portions
 | |
| // Copyright (c) 2018 The Chromium Authors. All rights reserved. Use of this
 | |
| // source code is governed by a BSD-style license that can be found in the
 | |
| // LICENSE file.
 | |
| 
 | |
| #include "libcef/browser/net_service/stream_reader_url_loader.h"
 | |
| 
 | |
| #include "libcef/browser/thread_util.h"
 | |
| #include "libcef/common/net_service/net_service_util.h"
 | |
| 
 | |
| #include "base/bind.h"
 | |
| #include "base/callback.h"
 | |
| #include "base/strings/string_number_conversions.h"
 | |
| #include "base/strings/string_util.h"
 | |
| #include "base/strings/stringprintf.h"
 | |
| #include "base/task/post_task.h"
 | |
| #include "base/threading/thread.h"
 | |
| #include "base/threading/thread_task_runner_handle.h"
 | |
| #include "content/public/browser/browser_thread.h"
 | |
| #include "net/base/io_buffer.h"
 | |
| #include "net/http/http_status_code.h"
 | |
| #include "net/http/http_util.h"
 | |
| #include "services/network/public/cpp/url_loader_completion_status.h"
 | |
| 
 | |
| namespace net_service {
 | |
| 
 | |
| namespace {
 | |
| 
 | |
| using OnInputStreamOpenedCallback =
 | |
|     base::OnceCallback<void(std::unique_ptr<StreamReaderURLLoader::Delegate>,
 | |
|                             std::unique_ptr<InputStream>)>;
 | |
| 
 | |
| // Helper for executing the OnInputStreamOpenedCallback.
 | |
| class OpenInputStreamWrapper
 | |
|     : public base::RefCountedThreadSafe<OpenInputStreamWrapper> {
 | |
|  public:
 | |
|   static base::OnceClosure Open(
 | |
|       std::unique_ptr<StreamReaderURLLoader::Delegate> delegate,
 | |
|       scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner,
 | |
|       const RequestId& request_id,
 | |
|       const network::ResourceRequest& request,
 | |
|       OnInputStreamOpenedCallback callback) WARN_UNUSED_RESULT {
 | |
|     scoped_refptr<OpenInputStreamWrapper> wrapper = new OpenInputStreamWrapper(
 | |
|         std::move(delegate), base::ThreadTaskRunnerHandle::Get(),
 | |
|         std::move(callback));
 | |
| 
 | |
|     work_thread_task_runner->PostTask(
 | |
|         FROM_HERE,
 | |
|         base::BindOnce(OpenInputStreamWrapper::OpenOnWorkThread,
 | |
|                        // This is intentional - the loader could be deleted
 | |
|                        // while the callback is executing on the background
 | |
|                        // thread. The delegate will be "returned" to the loader
 | |
|                        // once the InputStream open attempt is completed.
 | |
|                        wrapper, request_id, request));
 | |
| 
 | |
|     return wrapper->GetCancelCallback();
 | |
|   }
 | |
| 
 | |
|  private:
 | |
|   friend class base::RefCountedThreadSafe<OpenInputStreamWrapper>;
 | |
| 
 | |
|   OpenInputStreamWrapper(
 | |
|       std::unique_ptr<StreamReaderURLLoader::Delegate> delegate,
 | |
|       scoped_refptr<base::SingleThreadTaskRunner> job_thread_task_runner,
 | |
|       OnInputStreamOpenedCallback callback)
 | |
|       : delegate_(std::move(delegate)),
 | |
|         job_thread_task_runner_(job_thread_task_runner),
 | |
|         callback_(std::move(callback)) {}
 | |
|   virtual ~OpenInputStreamWrapper() {}
 | |
| 
 | |
|   static void OpenOnWorkThread(scoped_refptr<OpenInputStreamWrapper> wrapper,
 | |
|                                const RequestId& request_id,
 | |
|                                const network::ResourceRequest& request) {
 | |
|     DCHECK(!CEF_CURRENTLY_ON_IOT());
 | |
|     DCHECK(!CEF_CURRENTLY_ON_UIT());
 | |
| 
 | |
|     wrapper->Open(request_id, request);
 | |
|   }
 | |
| 
 | |
|   base::OnceClosure GetCancelCallback() {
 | |
|     return base::BindOnce(&OpenInputStreamWrapper::Cancel,
 | |
|                           base::WrapRefCounted(this));
 | |
|   }
 | |
| 
 | |
|   void Cancel() {
 | |
|     DCHECK(job_thread_task_runner_->RunsTasksInCurrentSequence());
 | |
|     delegate_.reset();
 | |
|     callback_.Reset();
 | |
|   }
 | |
| 
 | |
|   void Open(const RequestId& request_id,
 | |
|             const network::ResourceRequest& request) {
 | |
|     if (!delegate_->OpenInputStream(
 | |
|             request_id, request,
 | |
|             base::BindOnce(&OpenInputStreamWrapper::OnCallback,
 | |
|                            base::WrapRefCounted(this)))) {
 | |
|       OnCallback(nullptr);
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   void OnCallback(std::unique_ptr<InputStream> input_stream) {
 | |
|     if (!job_thread_task_runner_->RunsTasksInCurrentSequence()) {
 | |
|       job_thread_task_runner_->PostTask(
 | |
|           FROM_HERE,
 | |
|           base::BindOnce(&OpenInputStreamWrapper::OnCallback,
 | |
|                          base::WrapRefCounted(this), std::move(input_stream)));
 | |
|       return;
 | |
|     }
 | |
| 
 | |
|     // May be null if we were canceled.
 | |
|     if (callback_.is_null())
 | |
|       return;
 | |
| 
 | |
|     std::move(callback_).Run(std::move(delegate_), std::move(input_stream));
 | |
|   }
 | |
| 
 | |
|   std::unique_ptr<StreamReaderURLLoader::Delegate> delegate_;
 | |
|   scoped_refptr<base::SingleThreadTaskRunner> job_thread_task_runner_;
 | |
|   OnInputStreamOpenedCallback callback_;
 | |
| 
 | |
|   DISALLOW_COPY_AND_ASSIGN(OpenInputStreamWrapper);
 | |
| };
 | |
| 
 | |
| }  // namespace
 | |
| 
 | |
| //==============================
 | |
| // InputStreamReader
 | |
| //=============================
 | |
| 
 | |
| // Class responsible for reading from the InputStream.
 | |
| class InputStreamReader : public base::RefCountedThreadSafe<InputStreamReader> {
 | |
|  public:
 | |
|   // The constructor is called on the IO thread, not on the worker thread.
 | |
|   // Callbacks will be executed on the IO thread.
 | |
|   InputStreamReader(
 | |
|       std::unique_ptr<InputStream> stream,
 | |
|       scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner);
 | |
| 
 | |
|   // Skip |skip_bytes| number of bytes from |stream_|. |callback| will be
 | |
|   // executed asynchronously on the IO thread. A negative value passed to
 | |
|   // |callback| will indicate an error code, a positive value will indicate the
 | |
|   // number of bytes skipped.
 | |
|   void Skip(int64_t skip_bytes, InputStream::SkipCallback callback);
 | |
| 
 | |
|   // Read up to |dest_size| bytes from |stream_| into |dest|. |callback| will be
 | |
|   // executed asynchronously on the IO thread. A negative value passed to
 | |
|   // |callback| will indicate an error code, a positive value will indicate the
 | |
|   // number of bytes read.
 | |
|   void Read(scoped_refptr<net::IOBuffer> dest,
 | |
|             int dest_size,
 | |
|             InputStream::ReadCallback callback);
 | |
| 
 | |
|  private:
 | |
|   friend class base::RefCountedThreadSafe<InputStreamReader>;
 | |
|   virtual ~InputStreamReader();
 | |
| 
 | |
|   void SkipOnWorkThread(int64_t skip_bytes, InputStream::SkipCallback callback);
 | |
|   void ReadOnWorkThread(scoped_refptr<net::IOBuffer> buffer,
 | |
|                         int buffer_size,
 | |
|                         InputStream::ReadCallback callback);
 | |
| 
 | |
|   void SkipToRequestedRange();
 | |
| 
 | |
|   static void ContinueSkipCallback(
 | |
|       scoped_refptr<InputStreamReader> stream,
 | |
|       scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner,
 | |
|       int callback_id,
 | |
|       int64_t bytes_skipped);
 | |
|   static void ContinueReadCallback(
 | |
|       scoped_refptr<InputStreamReader> stream,
 | |
|       scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner,
 | |
|       int callback_id,
 | |
|       int bytes_read);
 | |
| 
 | |
|   void ContinueSkipCallbackOnWorkThread(int callback_id, int64_t bytes_skipped);
 | |
|   void ContinueReadCallbackOnWorkThread(int callback_id, int bytes_read);
 | |
| 
 | |
|   void RunSkipCallback(int64_t bytes_skipped);
 | |
|   void RunReadCallback(int bytes_read);
 | |
| 
 | |
|   static void RunSkipCallbackOnJobThread(
 | |
|       int64_t bytes_skipped,
 | |
|       InputStream::SkipCallback skip_callback);
 | |
|   static void RunReadCallbackOnJobThread(
 | |
|       int bytes_read,
 | |
|       InputStream::ReadCallback read_callback);
 | |
| 
 | |
|   std::unique_ptr<InputStream> stream_;
 | |
| 
 | |
|   // All InputStream methods are called this task runner.
 | |
|   scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner_;
 | |
| 
 | |
|   // All callbacks are executed on this task runner.
 | |
|   scoped_refptr<base::SingleThreadTaskRunner> job_thread_task_runner_;
 | |
| 
 | |
|   // The below members are only accessed on the work thread.
 | |
|   int64_t bytes_skipped_;
 | |
|   int64_t bytes_to_skip_;
 | |
|   InputStream::SkipCallback pending_skip_callback_;
 | |
| 
 | |
|   scoped_refptr<net::IOBuffer> buffer_;
 | |
|   InputStream::ReadCallback pending_read_callback_;
 | |
| 
 | |
|   int pending_callback_id_ = -1;
 | |
| 
 | |
|   int next_callback_id_ = 0;
 | |
| 
 | |
|   DISALLOW_COPY_AND_ASSIGN(InputStreamReader);
 | |
| };
 | |
| 
 | |
| InputStreamReader::InputStreamReader(
 | |
|     std::unique_ptr<InputStream> stream,
 | |
|     scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner)
 | |
|     : stream_(std::move(stream)),
 | |
|       work_thread_task_runner_(work_thread_task_runner),
 | |
|       job_thread_task_runner_(base::ThreadTaskRunnerHandle::Get()) {
 | |
|   CEF_REQUIRE_IOT();
 | |
|   DCHECK(stream_);
 | |
|   DCHECK(work_thread_task_runner_);
 | |
| }
 | |
| 
 | |
| InputStreamReader::~InputStreamReader() {}
 | |
| 
 | |
| void InputStreamReader::Skip(int64_t skip_bytes,
 | |
|                              InputStream::SkipCallback callback) {
 | |
|   work_thread_task_runner_->PostTask(
 | |
|       FROM_HERE, base::BindOnce(&InputStreamReader::SkipOnWorkThread,
 | |
|                                 base::WrapRefCounted(this), skip_bytes,
 | |
|                                 std::move(callback)));
 | |
| }
 | |
| 
 | |
| void InputStreamReader::Read(scoped_refptr<net::IOBuffer> dest,
 | |
|                              int dest_size,
 | |
|                              InputStream::ReadCallback callback) {
 | |
|   work_thread_task_runner_->PostTask(
 | |
|       FROM_HERE, base::BindOnce(&InputStreamReader::ReadOnWorkThread,
 | |
|                                 base::WrapRefCounted(this), dest, dest_size,
 | |
|                                 std::move(callback)));
 | |
| }
 | |
| 
 | |
| void InputStreamReader::SkipOnWorkThread(int64_t skip_bytes,
 | |
|                                          InputStream::SkipCallback callback) {
 | |
|   DCHECK(work_thread_task_runner_->RunsTasksInCurrentSequence());
 | |
| 
 | |
|   // No callback should currently be pending.
 | |
|   DCHECK_EQ(pending_callback_id_, -1);
 | |
|   DCHECK(pending_skip_callback_.is_null());
 | |
| 
 | |
|   pending_skip_callback_ = std::move(callback);
 | |
| 
 | |
|   if (skip_bytes <= 0) {
 | |
|     RunSkipCallback(0);
 | |
|     return;
 | |
|   }
 | |
| 
 | |
|   bytes_skipped_ = bytes_to_skip_ = skip_bytes;
 | |
|   SkipToRequestedRange();
 | |
| }
 | |
| 
 | |
| void InputStreamReader::ReadOnWorkThread(scoped_refptr<net::IOBuffer> dest,
 | |
|                                          int dest_size,
 | |
|                                          InputStream::ReadCallback callback) {
 | |
|   DCHECK(work_thread_task_runner_->RunsTasksInCurrentSequence());
 | |
| 
 | |
|   // No callback should currently be pending.
 | |
|   DCHECK_EQ(pending_callback_id_, -1);
 | |
|   DCHECK(pending_read_callback_.is_null());
 | |
| 
 | |
|   pending_read_callback_ = std::move(callback);
 | |
| 
 | |
|   if (!dest_size) {
 | |
|     RunReadCallback(0);
 | |
|     return;
 | |
|   }
 | |
| 
 | |
|   DCHECK_GT(dest_size, 0);
 | |
| 
 | |
|   buffer_ = dest;
 | |
|   pending_callback_id_ = ++next_callback_id_;
 | |
| 
 | |
|   int bytes_read = 0;
 | |
|   bool result = stream_->Read(
 | |
|       buffer_.get(), dest_size, &bytes_read,
 | |
|       base::BindOnce(&InputStreamReader::ContinueReadCallback,
 | |
|                      base::WrapRefCounted(this), work_thread_task_runner_,
 | |
|                      pending_callback_id_));
 | |
| 
 | |
|   // Check if the callback will execute asynchronously.
 | |
|   if (result && bytes_read == 0)
 | |
|     return;
 | |
| 
 | |
|   RunReadCallback(result || bytes_read <= 0 ? bytes_read : net::ERR_FAILED);
 | |
| }
 | |
| 
 | |
| void InputStreamReader::SkipToRequestedRange() {
 | |
|   DCHECK(work_thread_task_runner_->RunsTasksInCurrentSequence());
 | |
| 
 | |
|   // Skip to the start of the requested data. This has to be done in a loop
 | |
|   // because the underlying InputStream is not guaranteed to skip the requested
 | |
|   // number of bytes.
 | |
|   do {
 | |
|     pending_callback_id_ = ++next_callback_id_;
 | |
| 
 | |
|     int64_t skipped = 0;
 | |
|     bool result = stream_->Skip(
 | |
|         bytes_to_skip_, &skipped,
 | |
|         base::BindOnce(&InputStreamReader::ContinueSkipCallback,
 | |
|                        base::WrapRefCounted(this), work_thread_task_runner_,
 | |
|                        pending_callback_id_));
 | |
| 
 | |
|     // Check if the callback will execute asynchronously.
 | |
|     if (result && skipped == 0)
 | |
|       return;
 | |
| 
 | |
|     if (!result || skipped <= 0) {
 | |
|       RunSkipCallback(net::ERR_REQUEST_RANGE_NOT_SATISFIABLE);
 | |
|       return;
 | |
|     }
 | |
|     DCHECK_LE(skipped, bytes_to_skip_);
 | |
| 
 | |
|     bytes_to_skip_ -= skipped;
 | |
|   } while (bytes_to_skip_ > 0);
 | |
| 
 | |
|   // All done, the requested number of bytes were skipped.
 | |
|   RunSkipCallback(bytes_skipped_);
 | |
| }
 | |
| 
 | |
| // static
 | |
| void InputStreamReader::ContinueSkipCallback(
 | |
|     scoped_refptr<InputStreamReader> stream,
 | |
|     scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner,
 | |
|     int callback_id,
 | |
|     int64_t bytes_skipped) {
 | |
|   // Always execute asynchronously.
 | |
|   work_thread_task_runner->PostTask(
 | |
|       FROM_HERE,
 | |
|       base::BindOnce(&InputStreamReader::ContinueSkipCallbackOnWorkThread,
 | |
|                      stream, callback_id, bytes_skipped));
 | |
| }
 | |
| 
 | |
| // static
 | |
| void InputStreamReader::ContinueReadCallback(
 | |
|     scoped_refptr<InputStreamReader> stream,
 | |
|     scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner,
 | |
|     int callback_id,
 | |
|     int bytes_read) {
 | |
|   // Always execute asynchronously.
 | |
|   work_thread_task_runner->PostTask(
 | |
|       FROM_HERE,
 | |
|       base::BindOnce(&InputStreamReader::ContinueReadCallbackOnWorkThread,
 | |
|                      stream, callback_id, bytes_read));
 | |
| }
 | |
| 
 | |
| void InputStreamReader::ContinueSkipCallbackOnWorkThread(
 | |
|     int callback_id,
 | |
|     int64_t bytes_skipped) {
 | |
|   DCHECK(work_thread_task_runner_->RunsTasksInCurrentSequence());
 | |
| 
 | |
|   // Check for out of order callbacks.
 | |
|   if (pending_callback_id_ != callback_id)
 | |
|     return;
 | |
| 
 | |
|   DCHECK_LE(bytes_skipped, bytes_to_skip_);
 | |
| 
 | |
|   if (bytes_to_skip_ > 0 && bytes_skipped > 0)
 | |
|     bytes_to_skip_ -= bytes_skipped;
 | |
| 
 | |
|   if (bytes_skipped <= 0) {
 | |
|     RunSkipCallback(net::ERR_REQUEST_RANGE_NOT_SATISFIABLE);
 | |
|   } else if (bytes_to_skip_ > 0) {
 | |
|     // Continue execution asynchronously.
 | |
|     work_thread_task_runner_->PostTask(
 | |
|         FROM_HERE,
 | |
|         base::BindOnce(&InputStreamReader::SkipToRequestedRange, this));
 | |
|   } else {
 | |
|     // All done, the requested number of bytes were skipped.
 | |
|     RunSkipCallback(bytes_skipped_);
 | |
|   }
 | |
| }
 | |
| 
 | |
| void InputStreamReader::ContinueReadCallbackOnWorkThread(int callback_id,
 | |
|                                                          int bytes_read) {
 | |
|   DCHECK(work_thread_task_runner_->RunsTasksInCurrentSequence());
 | |
| 
 | |
|   // Check for out of order callbacks.
 | |
|   if (pending_callback_id_ != callback_id)
 | |
|     return;
 | |
| 
 | |
|   RunReadCallback(bytes_read);
 | |
| }
 | |
| 
 | |
| void InputStreamReader::RunSkipCallback(int64_t bytes_skipped) {
 | |
|   DCHECK(work_thread_task_runner_->RunsTasksInCurrentSequence());
 | |
| 
 | |
|   DCHECK(!pending_skip_callback_.is_null());
 | |
|   job_thread_task_runner_->PostTask(
 | |
|       FROM_HERE,
 | |
|       base::Bind(InputStreamReader::RunSkipCallbackOnJobThread, bytes_skipped,
 | |
|                  base::Passed(std::move(pending_skip_callback_))));
 | |
| 
 | |
|   // Reset callback state.
 | |
|   pending_callback_id_ = -1;
 | |
|   bytes_skipped_ = bytes_to_skip_ = -1;
 | |
| }
 | |
| 
 | |
| void InputStreamReader::RunReadCallback(int bytes_read) {
 | |
|   DCHECK(work_thread_task_runner_->RunsTasksInCurrentSequence());
 | |
| 
 | |
|   DCHECK(!pending_read_callback_.is_null());
 | |
|   job_thread_task_runner_->PostTask(
 | |
|       FROM_HERE,
 | |
|       base::Bind(InputStreamReader::RunReadCallbackOnJobThread, bytes_read,
 | |
|                  base::Passed(std::move(pending_read_callback_))));
 | |
| 
 | |
|   // Reset callback state.
 | |
|   pending_callback_id_ = -1;
 | |
|   buffer_ = nullptr;
 | |
| }
 | |
| 
 | |
| // static
 | |
| void InputStreamReader::RunSkipCallbackOnJobThread(
 | |
|     int64_t bytes_skipped,
 | |
|     InputStream::SkipCallback skip_callback) {
 | |
|   std::move(skip_callback).Run(bytes_skipped);
 | |
| }
 | |
| 
 | |
| // static
 | |
| void InputStreamReader::RunReadCallbackOnJobThread(
 | |
|     int bytes_read,
 | |
|     InputStream::ReadCallback read_callback) {
 | |
|   std::move(read_callback).Run(bytes_read);
 | |
| }
 | |
| 
 | |
| //==============================
 | |
| // RequestId
 | |
| //==============================
 | |
| 
 | |
| std::string RequestId::ToString() const {
 | |
|   return base::StringPrintf("RequestId(%u, %u)", request_id_, routing_id_);
 | |
| }
 | |
| 
 | |
| std::string RequestId::ToString(base::StringPiece debug_label) const {
 | |
|   return base::StringPrintf("RequestId[%s](%u, %u)",
 | |
|                             debug_label.as_string().c_str(), request_id_,
 | |
|                             routing_id_);
 | |
| }
 | |
| 
 | |
| std::ostream& operator<<(std::ostream& out, const RequestId& request_id) {
 | |
|   return out << request_id.ToString();
 | |
| }
 | |
| 
 | |
| //==============================
 | |
| // StreamReaderURLLoader
 | |
| //=============================
 | |
| 
 | |
| StreamReaderURLLoader::StreamReaderURLLoader(
 | |
|     const RequestId& request_id,
 | |
|     const network::ResourceRequest& request,
 | |
|     network::mojom::URLLoaderClientPtr client,
 | |
|     network::mojom::TrustedHeaderClientPtr header_client,
 | |
|     const net::MutableNetworkTrafficAnnotationTag& traffic_annotation,
 | |
|     std::unique_ptr<Delegate> response_delegate)
 | |
|     : request_id_(request_id),
 | |
|       request_(request),
 | |
|       client_(std::move(client)),
 | |
|       header_client_(std::move(header_client)),
 | |
|       traffic_annotation_(traffic_annotation),
 | |
|       response_delegate_(std::move(response_delegate)),
 | |
|       writable_handle_watcher_(FROM_HERE,
 | |
|                                mojo::SimpleWatcher::ArmingPolicy::MANUAL,
 | |
|                                base::SequencedTaskRunnerHandle::Get()),
 | |
|       weak_factory_(this) {
 | |
|   DCHECK(response_delegate_);
 | |
|   // If there is a client error, clean up the request.
 | |
|   client_.set_connection_error_handler(
 | |
|       base::BindOnce(&StreamReaderURLLoader::RequestComplete,
 | |
|                      weak_factory_.GetWeakPtr(), net::ERR_ABORTED));
 | |
| 
 | |
|   // All InputStream work will be performed on this task runner.
 | |
|   stream_work_task_runner_ =
 | |
|       base::CreateSequencedTaskRunner({base::ThreadPool(), base::MayBlock()});
 | |
| }
 | |
| 
 | |
| StreamReaderURLLoader::~StreamReaderURLLoader() {
 | |
|   if (open_cancel_callback_) {
 | |
|     // Release the Delegate held by OpenInputStreamWrapper.
 | |
|     std::move(open_cancel_callback_).Run();
 | |
|   }
 | |
| }
 | |
| 
 | |
| void StreamReaderURLLoader::Start() {
 | |
|   DCHECK(thread_checker_.CalledOnValidThread());
 | |
| 
 | |
|   if (!ParseRange(request_.headers)) {
 | |
|     RequestComplete(net::ERR_REQUEST_RANGE_NOT_SATISFIABLE);
 | |
|     return;
 | |
|   }
 | |
| 
 | |
|   if (header_client_.is_bound()) {
 | |
|     header_client_->OnBeforeSendHeaders(
 | |
|         request_.headers,
 | |
|         base::BindOnce(&StreamReaderURLLoader::ContinueWithRequestHeaders,
 | |
|                        weak_factory_.GetWeakPtr()));
 | |
|   } else {
 | |
|     ContinueWithRequestHeaders(net::OK, base::nullopt);
 | |
|   }
 | |
| }
 | |
| 
 | |
| void StreamReaderURLLoader::ContinueWithRequestHeaders(
 | |
|     int32_t result,
 | |
|     const base::Optional<net::HttpRequestHeaders>& headers) {
 | |
|   if (result != net::OK) {
 | |
|     RequestComplete(result);
 | |
|     return;
 | |
|   }
 | |
| 
 | |
|   if (headers) {
 | |
|     DCHECK(header_client_.is_bound());
 | |
|     request_.headers = *headers;
 | |
|   }
 | |
| 
 | |
|   open_cancel_callback_ = OpenInputStreamWrapper::Open(
 | |
|       // This is intentional - the loader could be deleted while
 | |
|       // the callback is executing on the background thread. The
 | |
|       // delegate will be "returned" to the loader once the
 | |
|       // InputStream open attempt is completed.
 | |
|       std::move(response_delegate_), stream_work_task_runner_, request_id_,
 | |
|       request_,
 | |
|       base::BindOnce(&StreamReaderURLLoader::OnInputStreamOpened,
 | |
|                      weak_factory_.GetWeakPtr()));
 | |
| }
 | |
| 
 | |
| void StreamReaderURLLoader::FollowRedirect(
 | |
|     const std::vector<std::string>& removed_headers,
 | |
|     const net::HttpRequestHeaders& modified_headers,
 | |
|     const base::Optional<GURL>& new_url) {
 | |
|   NOTREACHED();
 | |
| }
 | |
| 
 | |
| void StreamReaderURLLoader::SetPriority(net::RequestPriority priority,
 | |
|                                         int intra_priority_value) {}
 | |
| 
 | |
| void StreamReaderURLLoader::PauseReadingBodyFromNet() {}
 | |
| 
 | |
| void StreamReaderURLLoader::ResumeReadingBodyFromNet() {}
 | |
| 
 | |
| void StreamReaderURLLoader::OnInputStreamOpened(
 | |
|     std::unique_ptr<StreamReaderURLLoader::Delegate> returned_delegate,
 | |
|     std::unique_ptr<InputStream> input_stream) {
 | |
|   DCHECK(thread_checker_.CalledOnValidThread());
 | |
|   DCHECK(returned_delegate);
 | |
|   response_delegate_ = std::move(returned_delegate);
 | |
|   open_cancel_callback_.Reset();
 | |
| 
 | |
|   if (!input_stream) {
 | |
|     bool restarted = false;
 | |
|     response_delegate_->OnInputStreamOpenFailed(request_id_, &restarted);
 | |
|     if (restarted) {
 | |
|       // The request has been restarted with a new loader.
 | |
|       // |this| will be deleted.
 | |
|       CleanUp();
 | |
|     } else {
 | |
|       HeadersComplete(net::HTTP_NOT_FOUND, -1);
 | |
|     }
 | |
|     return;
 | |
|   }
 | |
| 
 | |
|   input_stream_reader_ = base::MakeRefCounted<InputStreamReader>(
 | |
|       std::move(input_stream), stream_work_task_runner_);
 | |
| 
 | |
|   if (!byte_range_valid()) {
 | |
|     OnReaderSkipCompleted(0);
 | |
|   } else {
 | |
|     input_stream_reader_->Skip(
 | |
|         byte_range_.first_byte_position(),
 | |
|         base::BindOnce(&StreamReaderURLLoader::OnReaderSkipCompleted,
 | |
|                        weak_factory_.GetWeakPtr()));
 | |
|   }
 | |
| }
 | |
| 
 | |
| void StreamReaderURLLoader::OnReaderSkipCompleted(int64_t bytes_skipped) {
 | |
|   DCHECK(thread_checker_.CalledOnValidThread());
 | |
| 
 | |
|   if (!byte_range_valid()) {
 | |
|     // Expected content length is unspecified.
 | |
|     HeadersComplete(net::HTTP_OK, -1);
 | |
|   } else if (bytes_skipped == byte_range_.first_byte_position()) {
 | |
|     // We skipped the expected number of bytes.
 | |
|     int64_t expected_content_length = byte_range_.last_byte_position() -
 | |
|                                       byte_range_.first_byte_position() + 1;
 | |
|     DCHECK_GE(expected_content_length, 0);
 | |
|     HeadersComplete(net::HTTP_OK, expected_content_length);
 | |
|   } else {
 | |
|     RequestComplete(bytes_skipped < 0 ? bytes_skipped : net::ERR_FAILED);
 | |
|   }
 | |
| }
 | |
| 
 | |
| void StreamReaderURLLoader::HeadersComplete(int orig_status_code,
 | |
|                                             int64_t expected_content_length) {
 | |
|   DCHECK(thread_checker_.CalledOnValidThread());
 | |
| 
 | |
|   int status_code = orig_status_code;
 | |
|   std::string status_text =
 | |
|       net::GetHttpReasonPhrase(static_cast<net::HttpStatusCode>(status_code));
 | |
|   std::string mime_type, charset;
 | |
|   int64_t content_length = expected_content_length;
 | |
|   ResourceResponse::HeaderMap extra_headers;
 | |
|   response_delegate_->GetResponseHeaders(request_id_, &status_code,
 | |
|                                          &status_text, &mime_type, &charset,
 | |
|                                          &content_length, &extra_headers);
 | |
| 
 | |
|   if (status_code < 0) {
 | |
|     // Early exit if the handler reported an error.
 | |
|     RequestComplete(status_code);
 | |
|     return;
 | |
|   }
 | |
| 
 | |
|   pending_response_.request_start = base::TimeTicks::Now();
 | |
|   pending_response_.response_start = base::TimeTicks::Now();
 | |
|   pending_response_.headers = MakeResponseHeaders(
 | |
|       status_code, status_text, mime_type, charset, content_length,
 | |
|       extra_headers, false /* allow_existing_header_override */);
 | |
| 
 | |
|   if (content_length >= 0)
 | |
|     pending_response_.content_length = content_length;
 | |
| 
 | |
|   if (!mime_type.empty()) {
 | |
|     pending_response_.mime_type = mime_type;
 | |
|     if (!charset.empty())
 | |
|       pending_response_.charset = charset;
 | |
|   }
 | |
| 
 | |
|   if (header_client_.is_bound()) {
 | |
|     header_client_->OnHeadersReceived(
 | |
|         pending_response_.headers->raw_headers(),
 | |
|         base::BindOnce(&StreamReaderURLLoader::ContinueWithResponseHeaders,
 | |
|                        weak_factory_.GetWeakPtr()));
 | |
|   } else {
 | |
|     ContinueWithResponseHeaders(net::OK, base::nullopt, GURL());
 | |
|   }
 | |
| }
 | |
| 
 | |
| void StreamReaderURLLoader::ContinueWithResponseHeaders(
 | |
|     int32_t result,
 | |
|     const base::Optional<std::string>& headers,
 | |
|     const GURL& redirect_url) {
 | |
|   if (result != net::OK) {
 | |
|     RequestComplete(result);
 | |
|     return;
 | |
|   }
 | |
| 
 | |
|   if (headers) {
 | |
|     DCHECK(header_client_.is_bound());
 | |
|     pending_response_.headers =
 | |
|         base::MakeRefCounted<net::HttpResponseHeaders>(*headers);
 | |
|   }
 | |
| 
 | |
|   // What the length would be if we sent headers over the network. Used to
 | |
|   // calculate data length.
 | |
|   header_length_ = pending_response_.headers->raw_headers().length();
 | |
| 
 | |
|   DCHECK(client_.is_bound());
 | |
| 
 | |
|   std::string location;
 | |
|   if (!redirect_url.is_empty() ||
 | |
|       pending_response_.headers->IsRedirect(&location)) {
 | |
|     pending_response_.encoded_data_length = header_length_;
 | |
|     pending_response_.content_length = pending_response_.encoded_body_length =
 | |
|         0;
 | |
|     const GURL new_location =
 | |
|         redirect_url.is_empty() ? request_.url.Resolve(location) : redirect_url;
 | |
|     client_->OnReceiveRedirect(
 | |
|         MakeRedirectInfo(request_, pending_response_.headers.get(),
 | |
|                          new_location,
 | |
|                          pending_response_.headers->response_code()),
 | |
|         pending_response_);
 | |
|     // The client will restart the request with a new loader.
 | |
|     // |this| will be deleted.
 | |
|     CleanUp();
 | |
|   } else {
 | |
|     client_->OnReceiveResponse(pending_response_);
 | |
|   }
 | |
| }
 | |
| 
 | |
| void StreamReaderURLLoader::ContinueResponse(bool was_redirected) {
 | |
|   DCHECK(thread_checker_.CalledOnValidThread());
 | |
| 
 | |
|   if (was_redirected) {
 | |
|     // Special case where we allow the client to perform the redirect.
 | |
|     // The client will restart the request with a new loader.
 | |
|     // |this| will be deleted.
 | |
|     CleanUp();
 | |
|   } else {
 | |
|     SendBody();
 | |
|   }
 | |
| }
 | |
| 
 | |
| void StreamReaderURLLoader::SendBody() {
 | |
|   DCHECK(thread_checker_.CalledOnValidThread());
 | |
| 
 | |
|   mojo::ScopedDataPipeConsumerHandle consumer_handle;
 | |
|   if (CreateDataPipe(nullptr /*options*/, &producer_handle_,
 | |
|                      &consumer_handle) != MOJO_RESULT_OK) {
 | |
|     RequestComplete(net::ERR_FAILED);
 | |
|     return;
 | |
|   }
 | |
|   writable_handle_watcher_.Watch(
 | |
|       producer_handle_.get(), MOJO_HANDLE_SIGNAL_WRITABLE,
 | |
|       base::BindRepeating(&StreamReaderURLLoader::OnDataPipeWritable,
 | |
|                           base::Unretained(this)));
 | |
|   client_->OnStartLoadingResponseBody(std::move(consumer_handle));
 | |
| 
 | |
|   ReadMore();
 | |
| }
 | |
| 
 | |
| void StreamReaderURLLoader::ReadMore() {
 | |
|   DCHECK(thread_checker_.CalledOnValidThread());
 | |
|   DCHECK(!pending_buffer_.get());
 | |
| 
 | |
|   uint32_t num_bytes;
 | |
|   MojoResult mojo_result = network::NetToMojoPendingBuffer::BeginWrite(
 | |
|       &producer_handle_, &pending_buffer_, &num_bytes);
 | |
|   if (mojo_result == MOJO_RESULT_SHOULD_WAIT) {
 | |
|     // The pipe is full. We need to wait for it to have more space.
 | |
|     writable_handle_watcher_.ArmOrNotify();
 | |
|     return;
 | |
|   } else if (mojo_result == MOJO_RESULT_FAILED_PRECONDITION) {
 | |
|     // The data pipe consumer handle has been closed.
 | |
|     RequestComplete(net::ERR_ABORTED);
 | |
|     return;
 | |
|   } else if (mojo_result != MOJO_RESULT_OK) {
 | |
|     // The body stream is in a bad state. Bail out.
 | |
|     RequestComplete(net::ERR_UNEXPECTED);
 | |
|     return;
 | |
|   }
 | |
|   scoped_refptr<net::IOBuffer> buffer(
 | |
|       new network::NetToMojoIOBuffer(pending_buffer_.get()));
 | |
| 
 | |
|   if (!input_stream_reader_.get()) {
 | |
|     // This will happen if opening the InputStream fails in which case the
 | |
|     // error is communicated by setting the HTTP response status header rather
 | |
|     // than failing the request during the header fetch phase.
 | |
|     OnReaderReadCompleted(0);
 | |
|     return;
 | |
|   }
 | |
| 
 | |
|   input_stream_reader_->Read(
 | |
|       buffer, base::checked_cast<int>(num_bytes),
 | |
|       base::BindOnce(&StreamReaderURLLoader::OnReaderReadCompleted,
 | |
|                      weak_factory_.GetWeakPtr()));
 | |
| }
 | |
| 
 | |
| void StreamReaderURLLoader::OnDataPipeWritable(MojoResult result) {
 | |
|   if (result == MOJO_RESULT_FAILED_PRECONDITION) {
 | |
|     RequestComplete(net::ERR_ABORTED);
 | |
|     return;
 | |
|   }
 | |
|   DCHECK_EQ(result, MOJO_RESULT_OK) << result;
 | |
| 
 | |
|   ReadMore();
 | |
| }
 | |
| 
 | |
| void StreamReaderURLLoader::OnReaderReadCompleted(int bytes_read) {
 | |
|   DCHECK(thread_checker_.CalledOnValidThread());
 | |
| 
 | |
|   DCHECK(pending_buffer_);
 | |
|   if (bytes_read < 0) {
 | |
|     // Error case.
 | |
|     RequestComplete(bytes_read);
 | |
|     return;
 | |
|   }
 | |
|   if (bytes_read == 0) {
 | |
|     // Eof, read completed.
 | |
|     pending_buffer_->Complete(0);
 | |
|     RequestComplete(net::OK);
 | |
|     return;
 | |
|   }
 | |
|   producer_handle_ = pending_buffer_->Complete(bytes_read);
 | |
|   pending_buffer_ = nullptr;
 | |
| 
 | |
|   client_->OnTransferSizeUpdated(bytes_read);
 | |
|   total_bytes_read_ += bytes_read;
 | |
| 
 | |
|   base::ThreadTaskRunnerHandle::Get()->PostTask(
 | |
|       FROM_HERE, base::BindOnce(&StreamReaderURLLoader::ReadMore,
 | |
|                                 weak_factory_.GetWeakPtr()));
 | |
| }
 | |
| 
 | |
| void StreamReaderURLLoader::RequestComplete(int status_code) {
 | |
|   DCHECK(thread_checker_.CalledOnValidThread());
 | |
| 
 | |
|   auto status = network::URLLoaderCompletionStatus(status_code);
 | |
|   status.completion_time = base::TimeTicks::Now();
 | |
|   status.encoded_data_length = total_bytes_read_ + header_length_;
 | |
|   status.encoded_body_length = total_bytes_read_;
 | |
|   // We don't support decoders, so use the same value.
 | |
|   status.decoded_body_length = total_bytes_read_;
 | |
| 
 | |
|   client_->OnComplete(status);
 | |
|   CleanUp();
 | |
| }
 | |
| 
 | |
| void StreamReaderURLLoader::CleanUp() {
 | |
|   DCHECK(thread_checker_.CalledOnValidThread());
 | |
| 
 | |
|   // Resets the watchers and pipes, so that we will never be called back.
 | |
|   writable_handle_watcher_.Cancel();
 | |
|   pending_buffer_ = nullptr;
 | |
|   producer_handle_.reset();
 | |
| 
 | |
|   // Manages its own lifetime.
 | |
|   delete this;
 | |
| }
 | |
| 
 | |
| bool StreamReaderURLLoader::ParseRange(const net::HttpRequestHeaders& headers) {
 | |
|   DCHECK(thread_checker_.CalledOnValidThread());
 | |
| 
 | |
|   std::string range_header;
 | |
|   if (headers.GetHeader(net::HttpRequestHeaders::kRange, &range_header)) {
 | |
|     // This loader only cares about the Range header so that we know how many
 | |
|     // bytes in the stream to skip and how many to read after that.
 | |
|     std::vector<net::HttpByteRange> ranges;
 | |
|     if (net::HttpUtil::ParseRangeHeader(range_header, &ranges)) {
 | |
|       // In case of multi-range request only use the first range.
 | |
|       // We don't support multirange requests.
 | |
|       if (ranges.size() == 1)
 | |
|         byte_range_ = ranges[0];
 | |
|     } else {
 | |
|       // This happens if the range header could not be parsed or is invalid.
 | |
|       return false;
 | |
|     }
 | |
|   }
 | |
|   return true;
 | |
| }
 | |
| 
 | |
| bool StreamReaderURLLoader::byte_range_valid() const {
 | |
|   return byte_range_.IsValid() && byte_range_.first_byte_position() >= 0;
 | |
| }
 | |
| 
 | |
| }  // namespace net_service
 |