mirror of
				https://bitbucket.org/chromiumembedded/cef
				synced 2025-06-05 21:39:12 +02:00 
			
		
		
		
	
		
			
				
	
	
		
			875 lines
		
	
	
		
			29 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			875 lines
		
	
	
		
			29 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
// Copyright (c) 2019 The Chromium Embedded Framework Authors. Portions
 | 
						|
// Copyright (c) 2018 The Chromium Authors. All rights reserved. Use of this
 | 
						|
// source code is governed by a BSD-style license that can be found in the
 | 
						|
// LICENSE file.
 | 
						|
 | 
						|
#include "libcef/browser/net_service/stream_reader_url_loader.h"
 | 
						|
 | 
						|
#include "libcef/browser/thread_util.h"
 | 
						|
#include "libcef/common/net_service/net_service_util.h"
 | 
						|
 | 
						|
#include "base/bind.h"
 | 
						|
#include "base/callback.h"
 | 
						|
#include "base/strings/string_number_conversions.h"
 | 
						|
#include "base/strings/string_util.h"
 | 
						|
#include "base/strings/stringprintf.h"
 | 
						|
#include "base/task/post_task.h"
 | 
						|
#include "base/threading/thread.h"
 | 
						|
#include "base/threading/thread_task_runner_handle.h"
 | 
						|
#include "content/public/browser/browser_thread.h"
 | 
						|
#include "net/base/io_buffer.h"
 | 
						|
#include "net/http/http_status_code.h"
 | 
						|
#include "net/http/http_util.h"
 | 
						|
#include "services/network/public/cpp/url_loader_completion_status.h"
 | 
						|
 | 
						|
namespace net_service {
 | 
						|
 | 
						|
namespace {
 | 
						|
 | 
						|
using OnInputStreamOpenedCallback =
 | 
						|
    base::OnceCallback<void(std::unique_ptr<StreamReaderURLLoader::Delegate>,
 | 
						|
                            std::unique_ptr<InputStream>)>;
 | 
						|
 | 
						|
// Helper for executing the OnInputStreamOpenedCallback.
 | 
						|
class OpenInputStreamWrapper
 | 
						|
    : public base::RefCountedThreadSafe<OpenInputStreamWrapper> {
 | 
						|
 public:
 | 
						|
  static base::OnceClosure Open(
 | 
						|
      std::unique_ptr<StreamReaderURLLoader::Delegate> delegate,
 | 
						|
      scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner,
 | 
						|
      const RequestId& request_id,
 | 
						|
      const network::ResourceRequest& request,
 | 
						|
      OnInputStreamOpenedCallback callback) WARN_UNUSED_RESULT {
 | 
						|
    scoped_refptr<OpenInputStreamWrapper> wrapper = new OpenInputStreamWrapper(
 | 
						|
        std::move(delegate), work_thread_task_runner,
 | 
						|
        base::ThreadTaskRunnerHandle::Get(), std::move(callback));
 | 
						|
    wrapper->Start(request_id, request);
 | 
						|
 | 
						|
    return wrapper->GetCancelCallback();
 | 
						|
  }
 | 
						|
 | 
						|
 private:
 | 
						|
  friend class base::RefCountedThreadSafe<OpenInputStreamWrapper>;
 | 
						|
 | 
						|
  OpenInputStreamWrapper(
 | 
						|
      std::unique_ptr<StreamReaderURLLoader::Delegate> delegate,
 | 
						|
      scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner,
 | 
						|
      scoped_refptr<base::SingleThreadTaskRunner> job_thread_task_runner,
 | 
						|
      OnInputStreamOpenedCallback callback)
 | 
						|
      : delegate_(std::move(delegate)),
 | 
						|
        work_thread_task_runner_(work_thread_task_runner),
 | 
						|
        job_thread_task_runner_(job_thread_task_runner),
 | 
						|
        callback_(std::move(callback)) {}
 | 
						|
  virtual ~OpenInputStreamWrapper() {}
 | 
						|
 | 
						|
  void Start(const RequestId& request_id,
 | 
						|
             const network::ResourceRequest& request) {
 | 
						|
    work_thread_task_runner_->PostTask(
 | 
						|
        FROM_HERE,
 | 
						|
        base::BindOnce(&OpenInputStreamWrapper::OpenOnWorkThread,
 | 
						|
                       base::WrapRefCounted(this), request_id, request));
 | 
						|
  }
 | 
						|
 | 
						|
  base::OnceClosure GetCancelCallback() {
 | 
						|
    return base::BindOnce(&OpenInputStreamWrapper::CancelOnJobThread,
 | 
						|
                          base::WrapRefCounted(this));
 | 
						|
  }
 | 
						|
 | 
						|
  void CancelOnJobThread() {
 | 
						|
    DCHECK(job_thread_task_runner_->RunsTasksInCurrentSequence());
 | 
						|
    if (callback_.is_null())
 | 
						|
      return;
 | 
						|
 | 
						|
    callback_.Reset();
 | 
						|
 | 
						|
    work_thread_task_runner_->PostTask(
 | 
						|
        FROM_HERE, base::BindOnce(&OpenInputStreamWrapper::CancelOnWorkThread,
 | 
						|
                                  base::WrapRefCounted(this)));
 | 
						|
  }
 | 
						|
 | 
						|
  void CancelOnWorkThread() {
 | 
						|
    DCHECK(work_thread_task_runner_->RunsTasksInCurrentSequence());
 | 
						|
    if (is_canceled_)
 | 
						|
      return;
 | 
						|
    is_canceled_ = true;
 | 
						|
    OnCallback(nullptr);
 | 
						|
  }
 | 
						|
 | 
						|
  void OpenOnWorkThread(const RequestId& request_id,
 | 
						|
                        const network::ResourceRequest& request) {
 | 
						|
    DCHECK(work_thread_task_runner_->RunsTasksInCurrentSequence());
 | 
						|
    if (is_canceled_)
 | 
						|
      return;
 | 
						|
 | 
						|
    // |delegate_| will remain valid until OnCallback() is executed on
 | 
						|
    // |job_thread_task_runner_|.
 | 
						|
    if (!delegate_->OpenInputStream(
 | 
						|
            request_id, request,
 | 
						|
            base::BindOnce(&OpenInputStreamWrapper::OnCallback,
 | 
						|
                           base::WrapRefCounted(this)))) {
 | 
						|
      is_canceled_ = true;
 | 
						|
      OnCallback(nullptr);
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  void OnCallback(std::unique_ptr<InputStream> input_stream) {
 | 
						|
    if (!job_thread_task_runner_->RunsTasksInCurrentSequence()) {
 | 
						|
      job_thread_task_runner_->PostTask(
 | 
						|
          FROM_HERE,
 | 
						|
          base::BindOnce(&OpenInputStreamWrapper::OnCallback,
 | 
						|
                         base::WrapRefCounted(this), std::move(input_stream)));
 | 
						|
      return;
 | 
						|
    }
 | 
						|
 | 
						|
    // May be null if CancelOnJobThread() was called on
 | 
						|
    // |job_thread_task_runner_| while OpenOnWorkThread() was pending on
 | 
						|
    // |work_thread_task_runner_|.
 | 
						|
    if (callback_.is_null()) {
 | 
						|
      delegate_.reset();
 | 
						|
      return;
 | 
						|
    }
 | 
						|
 | 
						|
    std::move(callback_).Run(std::move(delegate_), std::move(input_stream));
 | 
						|
  }
 | 
						|
 | 
						|
  std::unique_ptr<StreamReaderURLLoader::Delegate> delegate_;
 | 
						|
 | 
						|
  scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner_;
 | 
						|
  scoped_refptr<base::SingleThreadTaskRunner> job_thread_task_runner_;
 | 
						|
 | 
						|
  // Only accessed on |job_thread_task_runner_|.
 | 
						|
  OnInputStreamOpenedCallback callback_;
 | 
						|
 | 
						|
  // Only accessed on |work_thread_task_runner_|.
 | 
						|
  bool is_canceled_ = false;
 | 
						|
 | 
						|
  DISALLOW_COPY_AND_ASSIGN(OpenInputStreamWrapper);
 | 
						|
};
 | 
						|
 | 
						|
}  // namespace
 | 
						|
 | 
						|
//==============================
 | 
						|
// InputStreamReader
 | 
						|
//=============================
 | 
						|
 | 
						|
// Class responsible for reading from the InputStream.
 | 
						|
class InputStreamReader : public base::RefCountedThreadSafe<InputStreamReader> {
 | 
						|
 public:
 | 
						|
  // The constructor is called on the IO thread, not on the worker thread.
 | 
						|
  // Callbacks will be executed on the IO thread.
 | 
						|
  InputStreamReader(
 | 
						|
      std::unique_ptr<InputStream> stream,
 | 
						|
      scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner);
 | 
						|
 | 
						|
  // Skip |skip_bytes| number of bytes from |stream_|. |callback| will be
 | 
						|
  // executed asynchronously on the IO thread. A negative value passed to
 | 
						|
  // |callback| will indicate an error code, a positive value will indicate the
 | 
						|
  // number of bytes skipped.
 | 
						|
  void Skip(int64_t skip_bytes, InputStream::SkipCallback callback);
 | 
						|
 | 
						|
  // Read up to |dest_size| bytes from |stream_| into |dest|. |callback| will be
 | 
						|
  // executed asynchronously on the IO thread. A negative value passed to
 | 
						|
  // |callback| will indicate an error code, a positive value will indicate the
 | 
						|
  // number of bytes read.
 | 
						|
  void Read(scoped_refptr<net::IOBuffer> dest,
 | 
						|
            int dest_size,
 | 
						|
            InputStream::ReadCallback callback);
 | 
						|
 | 
						|
 private:
 | 
						|
  friend class base::RefCountedThreadSafe<InputStreamReader>;
 | 
						|
  virtual ~InputStreamReader();
 | 
						|
 | 
						|
  void SkipOnWorkThread(int64_t skip_bytes, InputStream::SkipCallback callback);
 | 
						|
  void ReadOnWorkThread(scoped_refptr<net::IOBuffer> buffer,
 | 
						|
                        int buffer_size,
 | 
						|
                        InputStream::ReadCallback callback);
 | 
						|
 | 
						|
  void SkipToRequestedRange();
 | 
						|
 | 
						|
  static void ContinueSkipCallback(
 | 
						|
      scoped_refptr<InputStreamReader> stream,
 | 
						|
      scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner,
 | 
						|
      int callback_id,
 | 
						|
      int64_t bytes_skipped);
 | 
						|
  static void ContinueReadCallback(
 | 
						|
      scoped_refptr<InputStreamReader> stream,
 | 
						|
      scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner,
 | 
						|
      int callback_id,
 | 
						|
      int bytes_read);
 | 
						|
 | 
						|
  void ContinueSkipCallbackOnWorkThread(int callback_id, int64_t bytes_skipped);
 | 
						|
  void ContinueReadCallbackOnWorkThread(int callback_id, int bytes_read);
 | 
						|
 | 
						|
  void RunSkipCallback(int64_t bytes_skipped);
 | 
						|
  void RunReadCallback(int bytes_read);
 | 
						|
 | 
						|
  static void RunSkipCallbackOnJobThread(
 | 
						|
      int64_t bytes_skipped,
 | 
						|
      InputStream::SkipCallback skip_callback);
 | 
						|
  static void RunReadCallbackOnJobThread(
 | 
						|
      int bytes_read,
 | 
						|
      InputStream::ReadCallback read_callback);
 | 
						|
 | 
						|
  std::unique_ptr<InputStream> stream_;
 | 
						|
 | 
						|
  // All InputStream methods are called this task runner.
 | 
						|
  scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner_;
 | 
						|
 | 
						|
  // All callbacks are executed on this task runner.
 | 
						|
  scoped_refptr<base::SingleThreadTaskRunner> job_thread_task_runner_;
 | 
						|
 | 
						|
  // The below members are only accessed on the work thread.
 | 
						|
  int64_t bytes_skipped_;
 | 
						|
  int64_t bytes_to_skip_;
 | 
						|
  InputStream::SkipCallback pending_skip_callback_;
 | 
						|
 | 
						|
  scoped_refptr<net::IOBuffer> buffer_;
 | 
						|
  InputStream::ReadCallback pending_read_callback_;
 | 
						|
 | 
						|
  int pending_callback_id_ = -1;
 | 
						|
 | 
						|
  int next_callback_id_ = 0;
 | 
						|
 | 
						|
  DISALLOW_COPY_AND_ASSIGN(InputStreamReader);
 | 
						|
};
 | 
						|
 | 
						|
InputStreamReader::InputStreamReader(
 | 
						|
    std::unique_ptr<InputStream> stream,
 | 
						|
    scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner)
 | 
						|
    : stream_(std::move(stream)),
 | 
						|
      work_thread_task_runner_(work_thread_task_runner),
 | 
						|
      job_thread_task_runner_(base::ThreadTaskRunnerHandle::Get()) {
 | 
						|
  CEF_REQUIRE_IOT();
 | 
						|
  DCHECK(stream_);
 | 
						|
  DCHECK(work_thread_task_runner_);
 | 
						|
}
 | 
						|
 | 
						|
InputStreamReader::~InputStreamReader() {}
 | 
						|
 | 
						|
void InputStreamReader::Skip(int64_t skip_bytes,
 | 
						|
                             InputStream::SkipCallback callback) {
 | 
						|
  work_thread_task_runner_->PostTask(
 | 
						|
      FROM_HERE, base::BindOnce(&InputStreamReader::SkipOnWorkThread,
 | 
						|
                                base::WrapRefCounted(this), skip_bytes,
 | 
						|
                                std::move(callback)));
 | 
						|
}
 | 
						|
 | 
						|
void InputStreamReader::Read(scoped_refptr<net::IOBuffer> dest,
 | 
						|
                             int dest_size,
 | 
						|
                             InputStream::ReadCallback callback) {
 | 
						|
  work_thread_task_runner_->PostTask(
 | 
						|
      FROM_HERE, base::BindOnce(&InputStreamReader::ReadOnWorkThread,
 | 
						|
                                base::WrapRefCounted(this), dest, dest_size,
 | 
						|
                                std::move(callback)));
 | 
						|
}
 | 
						|
 | 
						|
void InputStreamReader::SkipOnWorkThread(int64_t skip_bytes,
 | 
						|
                                         InputStream::SkipCallback callback) {
 | 
						|
  DCHECK(work_thread_task_runner_->RunsTasksInCurrentSequence());
 | 
						|
 | 
						|
  // No callback should currently be pending.
 | 
						|
  DCHECK_EQ(pending_callback_id_, -1);
 | 
						|
  DCHECK(pending_skip_callback_.is_null());
 | 
						|
 | 
						|
  pending_skip_callback_ = std::move(callback);
 | 
						|
 | 
						|
  if (skip_bytes <= 0) {
 | 
						|
    RunSkipCallback(0);
 | 
						|
    return;
 | 
						|
  }
 | 
						|
 | 
						|
  bytes_skipped_ = bytes_to_skip_ = skip_bytes;
 | 
						|
  SkipToRequestedRange();
 | 
						|
}
 | 
						|
 | 
						|
void InputStreamReader::ReadOnWorkThread(scoped_refptr<net::IOBuffer> dest,
 | 
						|
                                         int dest_size,
 | 
						|
                                         InputStream::ReadCallback callback) {
 | 
						|
  DCHECK(work_thread_task_runner_->RunsTasksInCurrentSequence());
 | 
						|
 | 
						|
  // No callback should currently be pending.
 | 
						|
  DCHECK_EQ(pending_callback_id_, -1);
 | 
						|
  DCHECK(pending_read_callback_.is_null());
 | 
						|
 | 
						|
  pending_read_callback_ = std::move(callback);
 | 
						|
 | 
						|
  if (!dest_size) {
 | 
						|
    RunReadCallback(0);
 | 
						|
    return;
 | 
						|
  }
 | 
						|
 | 
						|
  DCHECK_GT(dest_size, 0);
 | 
						|
 | 
						|
  buffer_ = dest;
 | 
						|
  pending_callback_id_ = ++next_callback_id_;
 | 
						|
 | 
						|
  int bytes_read = 0;
 | 
						|
  bool result = stream_->Read(
 | 
						|
      buffer_.get(), dest_size, &bytes_read,
 | 
						|
      base::BindOnce(&InputStreamReader::ContinueReadCallback,
 | 
						|
                     base::WrapRefCounted(this), work_thread_task_runner_,
 | 
						|
                     pending_callback_id_));
 | 
						|
 | 
						|
  // Check if the callback will execute asynchronously.
 | 
						|
  if (result && bytes_read == 0)
 | 
						|
    return;
 | 
						|
 | 
						|
  RunReadCallback(result || bytes_read <= 0 ? bytes_read : net::ERR_FAILED);
 | 
						|
}
 | 
						|
 | 
						|
void InputStreamReader::SkipToRequestedRange() {
 | 
						|
  DCHECK(work_thread_task_runner_->RunsTasksInCurrentSequence());
 | 
						|
 | 
						|
  // Skip to the start of the requested data. This has to be done in a loop
 | 
						|
  // because the underlying InputStream is not guaranteed to skip the requested
 | 
						|
  // number of bytes.
 | 
						|
  do {
 | 
						|
    pending_callback_id_ = ++next_callback_id_;
 | 
						|
 | 
						|
    int64_t skipped = 0;
 | 
						|
    bool result = stream_->Skip(
 | 
						|
        bytes_to_skip_, &skipped,
 | 
						|
        base::BindOnce(&InputStreamReader::ContinueSkipCallback,
 | 
						|
                       base::WrapRefCounted(this), work_thread_task_runner_,
 | 
						|
                       pending_callback_id_));
 | 
						|
 | 
						|
    // Check if the callback will execute asynchronously.
 | 
						|
    if (result && skipped == 0)
 | 
						|
      return;
 | 
						|
 | 
						|
    if (!result || skipped <= 0) {
 | 
						|
      RunSkipCallback(net::ERR_REQUEST_RANGE_NOT_SATISFIABLE);
 | 
						|
      return;
 | 
						|
    }
 | 
						|
    DCHECK_LE(skipped, bytes_to_skip_);
 | 
						|
 | 
						|
    bytes_to_skip_ -= skipped;
 | 
						|
  } while (bytes_to_skip_ > 0);
 | 
						|
 | 
						|
  // All done, the requested number of bytes were skipped.
 | 
						|
  RunSkipCallback(bytes_skipped_);
 | 
						|
}
 | 
						|
 | 
						|
// static
 | 
						|
void InputStreamReader::ContinueSkipCallback(
 | 
						|
    scoped_refptr<InputStreamReader> stream,
 | 
						|
    scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner,
 | 
						|
    int callback_id,
 | 
						|
    int64_t bytes_skipped) {
 | 
						|
  // Always execute asynchronously.
 | 
						|
  work_thread_task_runner->PostTask(
 | 
						|
      FROM_HERE,
 | 
						|
      base::BindOnce(&InputStreamReader::ContinueSkipCallbackOnWorkThread,
 | 
						|
                     stream, callback_id, bytes_skipped));
 | 
						|
}
 | 
						|
 | 
						|
// static
 | 
						|
void InputStreamReader::ContinueReadCallback(
 | 
						|
    scoped_refptr<InputStreamReader> stream,
 | 
						|
    scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner,
 | 
						|
    int callback_id,
 | 
						|
    int bytes_read) {
 | 
						|
  // Always execute asynchronously.
 | 
						|
  work_thread_task_runner->PostTask(
 | 
						|
      FROM_HERE,
 | 
						|
      base::BindOnce(&InputStreamReader::ContinueReadCallbackOnWorkThread,
 | 
						|
                     stream, callback_id, bytes_read));
 | 
						|
}
 | 
						|
 | 
						|
void InputStreamReader::ContinueSkipCallbackOnWorkThread(
 | 
						|
    int callback_id,
 | 
						|
    int64_t bytes_skipped) {
 | 
						|
  DCHECK(work_thread_task_runner_->RunsTasksInCurrentSequence());
 | 
						|
 | 
						|
  // Check for out of order callbacks.
 | 
						|
  if (pending_callback_id_ != callback_id)
 | 
						|
    return;
 | 
						|
 | 
						|
  DCHECK_LE(bytes_skipped, bytes_to_skip_);
 | 
						|
 | 
						|
  if (bytes_to_skip_ > 0 && bytes_skipped > 0)
 | 
						|
    bytes_to_skip_ -= bytes_skipped;
 | 
						|
 | 
						|
  if (bytes_skipped <= 0) {
 | 
						|
    RunSkipCallback(net::ERR_REQUEST_RANGE_NOT_SATISFIABLE);
 | 
						|
  } else if (bytes_to_skip_ > 0) {
 | 
						|
    // Continue execution asynchronously.
 | 
						|
    work_thread_task_runner_->PostTask(
 | 
						|
        FROM_HERE,
 | 
						|
        base::BindOnce(&InputStreamReader::SkipToRequestedRange, this));
 | 
						|
  } else {
 | 
						|
    // All done, the requested number of bytes were skipped.
 | 
						|
    RunSkipCallback(bytes_skipped_);
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
void InputStreamReader::ContinueReadCallbackOnWorkThread(int callback_id,
 | 
						|
                                                         int bytes_read) {
 | 
						|
  DCHECK(work_thread_task_runner_->RunsTasksInCurrentSequence());
 | 
						|
 | 
						|
  // Check for out of order callbacks.
 | 
						|
  if (pending_callback_id_ != callback_id)
 | 
						|
    return;
 | 
						|
 | 
						|
  RunReadCallback(bytes_read);
 | 
						|
}
 | 
						|
 | 
						|
void InputStreamReader::RunSkipCallback(int64_t bytes_skipped) {
 | 
						|
  DCHECK(work_thread_task_runner_->RunsTasksInCurrentSequence());
 | 
						|
 | 
						|
  DCHECK(!pending_skip_callback_.is_null());
 | 
						|
  job_thread_task_runner_->PostTask(
 | 
						|
      FROM_HERE,
 | 
						|
      base::BindOnce(InputStreamReader::RunSkipCallbackOnJobThread,
 | 
						|
                     bytes_skipped, std::move(pending_skip_callback_)));
 | 
						|
 | 
						|
  // Reset callback state.
 | 
						|
  pending_callback_id_ = -1;
 | 
						|
  bytes_skipped_ = bytes_to_skip_ = -1;
 | 
						|
}
 | 
						|
 | 
						|
void InputStreamReader::RunReadCallback(int bytes_read) {
 | 
						|
  DCHECK(work_thread_task_runner_->RunsTasksInCurrentSequence());
 | 
						|
 | 
						|
  DCHECK(!pending_read_callback_.is_null());
 | 
						|
  job_thread_task_runner_->PostTask(
 | 
						|
      FROM_HERE, base::BindOnce(InputStreamReader::RunReadCallbackOnJobThread,
 | 
						|
                                bytes_read, std::move(pending_read_callback_)));
 | 
						|
 | 
						|
  // Reset callback state.
 | 
						|
  pending_callback_id_ = -1;
 | 
						|
  buffer_ = nullptr;
 | 
						|
}
 | 
						|
 | 
						|
// static
 | 
						|
void InputStreamReader::RunSkipCallbackOnJobThread(
 | 
						|
    int64_t bytes_skipped,
 | 
						|
    InputStream::SkipCallback skip_callback) {
 | 
						|
  std::move(skip_callback).Run(bytes_skipped);
 | 
						|
}
 | 
						|
 | 
						|
// static
 | 
						|
void InputStreamReader::RunReadCallbackOnJobThread(
 | 
						|
    int bytes_read,
 | 
						|
    InputStream::ReadCallback read_callback) {
 | 
						|
  std::move(read_callback).Run(bytes_read);
 | 
						|
}
 | 
						|
 | 
						|
//==============================
 | 
						|
// RequestId
 | 
						|
//==============================
 | 
						|
 | 
						|
std::string RequestId::ToString() const {
 | 
						|
  return base::StringPrintf("RequestId(%u, %u)", request_id_, routing_id_);
 | 
						|
}
 | 
						|
 | 
						|
std::string RequestId::ToString(base::StringPiece debug_label) const {
 | 
						|
  return base::StringPrintf("RequestId[%s](%u, %u)",
 | 
						|
                            debug_label.as_string().c_str(), request_id_,
 | 
						|
                            routing_id_);
 | 
						|
}
 | 
						|
 | 
						|
std::ostream& operator<<(std::ostream& out, const RequestId& request_id) {
 | 
						|
  return out << request_id.ToString();
 | 
						|
}
 | 
						|
 | 
						|
//==============================
 | 
						|
// StreamReaderURLLoader
 | 
						|
//=============================
 | 
						|
 | 
						|
StreamReaderURLLoader::StreamReaderURLLoader(
 | 
						|
    const RequestId& request_id,
 | 
						|
    const network::ResourceRequest& request,
 | 
						|
    network::mojom::URLLoaderClientPtr client,
 | 
						|
    network::mojom::TrustedHeaderClientPtr header_client,
 | 
						|
    const net::MutableNetworkTrafficAnnotationTag& traffic_annotation,
 | 
						|
    std::unique_ptr<Delegate> response_delegate)
 | 
						|
    : request_id_(request_id),
 | 
						|
      request_(request),
 | 
						|
      client_(std::move(client)),
 | 
						|
      header_client_(std::move(header_client)),
 | 
						|
      traffic_annotation_(traffic_annotation),
 | 
						|
      response_delegate_(std::move(response_delegate)),
 | 
						|
      writable_handle_watcher_(FROM_HERE,
 | 
						|
                               mojo::SimpleWatcher::ArmingPolicy::MANUAL,
 | 
						|
                               base::SequencedTaskRunnerHandle::Get()),
 | 
						|
      weak_factory_(this) {
 | 
						|
  DCHECK(response_delegate_);
 | 
						|
  // If there is a client error, clean up the request.
 | 
						|
  client_.set_connection_error_handler(
 | 
						|
      base::BindOnce(&StreamReaderURLLoader::RequestComplete,
 | 
						|
                     weak_factory_.GetWeakPtr(), net::ERR_ABORTED));
 | 
						|
 | 
						|
  // All InputStream work will be performed on this task runner.
 | 
						|
  stream_work_task_runner_ =
 | 
						|
      base::CreateSequencedTaskRunner({base::ThreadPool(), base::MayBlock()});
 | 
						|
}
 | 
						|
 | 
						|
StreamReaderURLLoader::~StreamReaderURLLoader() {
 | 
						|
  if (open_cancel_callback_) {
 | 
						|
    // Release the Delegate held by OpenInputStreamWrapper.
 | 
						|
    std::move(open_cancel_callback_).Run();
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
void StreamReaderURLLoader::Start() {
 | 
						|
  DCHECK(thread_checker_.CalledOnValidThread());
 | 
						|
 | 
						|
  if (!ParseRange(request_.headers)) {
 | 
						|
    RequestComplete(net::ERR_REQUEST_RANGE_NOT_SATISFIABLE);
 | 
						|
    return;
 | 
						|
  }
 | 
						|
 | 
						|
  if (header_client_.is_bound()) {
 | 
						|
    header_client_->OnBeforeSendHeaders(
 | 
						|
        request_.headers,
 | 
						|
        base::BindOnce(&StreamReaderURLLoader::ContinueWithRequestHeaders,
 | 
						|
                       weak_factory_.GetWeakPtr()));
 | 
						|
  } else {
 | 
						|
    ContinueWithRequestHeaders(net::OK, base::nullopt);
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
void StreamReaderURLLoader::ContinueWithRequestHeaders(
 | 
						|
    int32_t result,
 | 
						|
    const base::Optional<net::HttpRequestHeaders>& headers) {
 | 
						|
  if (result != net::OK) {
 | 
						|
    RequestComplete(result);
 | 
						|
    return;
 | 
						|
  }
 | 
						|
 | 
						|
  if (headers) {
 | 
						|
    DCHECK(header_client_.is_bound());
 | 
						|
    request_.headers = *headers;
 | 
						|
  }
 | 
						|
 | 
						|
  open_cancel_callback_ = OpenInputStreamWrapper::Open(
 | 
						|
      // This is intentional - the loader could be deleted while
 | 
						|
      // the callback is executing on the background thread. The
 | 
						|
      // delegate will be "returned" to the loader once the
 | 
						|
      // InputStream open attempt is completed.
 | 
						|
      std::move(response_delegate_), stream_work_task_runner_, request_id_,
 | 
						|
      request_,
 | 
						|
      base::BindOnce(&StreamReaderURLLoader::OnInputStreamOpened,
 | 
						|
                     weak_factory_.GetWeakPtr()));
 | 
						|
}
 | 
						|
 | 
						|
void StreamReaderURLLoader::FollowRedirect(
 | 
						|
    const std::vector<std::string>& removed_headers,
 | 
						|
    const net::HttpRequestHeaders& modified_headers,
 | 
						|
    const net::HttpRequestHeaders& modified_cors_exempt_headers,
 | 
						|
    const base::Optional<GURL>& new_url) {
 | 
						|
  NOTREACHED();
 | 
						|
}
 | 
						|
 | 
						|
void StreamReaderURLLoader::SetPriority(net::RequestPriority priority,
 | 
						|
                                        int intra_priority_value) {}
 | 
						|
 | 
						|
void StreamReaderURLLoader::PauseReadingBodyFromNet() {}
 | 
						|
 | 
						|
void StreamReaderURLLoader::ResumeReadingBodyFromNet() {}
 | 
						|
 | 
						|
void StreamReaderURLLoader::OnInputStreamOpened(
 | 
						|
    std::unique_ptr<StreamReaderURLLoader::Delegate> returned_delegate,
 | 
						|
    std::unique_ptr<InputStream> input_stream) {
 | 
						|
  DCHECK(thread_checker_.CalledOnValidThread());
 | 
						|
  DCHECK(returned_delegate);
 | 
						|
  response_delegate_ = std::move(returned_delegate);
 | 
						|
  open_cancel_callback_.Reset();
 | 
						|
 | 
						|
  if (!input_stream) {
 | 
						|
    bool restarted = false;
 | 
						|
    response_delegate_->OnInputStreamOpenFailed(request_id_, &restarted);
 | 
						|
    if (restarted) {
 | 
						|
      // The request has been restarted with a new loader.
 | 
						|
      // |this| will be deleted.
 | 
						|
      CleanUp();
 | 
						|
    } else {
 | 
						|
      HeadersComplete(net::HTTP_NOT_FOUND, -1);
 | 
						|
    }
 | 
						|
    return;
 | 
						|
  }
 | 
						|
 | 
						|
  input_stream_reader_ = base::MakeRefCounted<InputStreamReader>(
 | 
						|
      std::move(input_stream), stream_work_task_runner_);
 | 
						|
 | 
						|
  if (!byte_range_valid()) {
 | 
						|
    OnReaderSkipCompleted(0);
 | 
						|
  } else {
 | 
						|
    input_stream_reader_->Skip(
 | 
						|
        byte_range_.first_byte_position(),
 | 
						|
        base::BindOnce(&StreamReaderURLLoader::OnReaderSkipCompleted,
 | 
						|
                       weak_factory_.GetWeakPtr()));
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
void StreamReaderURLLoader::OnReaderSkipCompleted(int64_t bytes_skipped) {
 | 
						|
  DCHECK(thread_checker_.CalledOnValidThread());
 | 
						|
 | 
						|
  if (!byte_range_valid()) {
 | 
						|
    // Expected content length is unspecified.
 | 
						|
    HeadersComplete(net::HTTP_OK, -1);
 | 
						|
  } else if (bytes_skipped == byte_range_.first_byte_position()) {
 | 
						|
    // We skipped the expected number of bytes.
 | 
						|
    int64_t expected_content_length = -1;
 | 
						|
    if (byte_range_.HasLastBytePosition()) {
 | 
						|
      expected_content_length = byte_range_.last_byte_position() -
 | 
						|
                                byte_range_.first_byte_position() + 1;
 | 
						|
      DCHECK_GE(expected_content_length, 0);
 | 
						|
    }
 | 
						|
    HeadersComplete(net::HTTP_OK, expected_content_length);
 | 
						|
  } else {
 | 
						|
    RequestComplete(bytes_skipped < 0 ? bytes_skipped : net::ERR_FAILED);
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
void StreamReaderURLLoader::HeadersComplete(int orig_status_code,
 | 
						|
                                            int64_t expected_content_length) {
 | 
						|
  DCHECK(thread_checker_.CalledOnValidThread());
 | 
						|
 | 
						|
  int status_code = orig_status_code;
 | 
						|
  std::string status_text =
 | 
						|
      net::GetHttpReasonPhrase(static_cast<net::HttpStatusCode>(status_code));
 | 
						|
  std::string mime_type, charset;
 | 
						|
  int64_t content_length = expected_content_length;
 | 
						|
  ResourceResponse::HeaderMap extra_headers;
 | 
						|
  response_delegate_->GetResponseHeaders(request_id_, &status_code,
 | 
						|
                                         &status_text, &mime_type, &charset,
 | 
						|
                                         &content_length, &extra_headers);
 | 
						|
 | 
						|
  if (status_code < 0) {
 | 
						|
    // Early exit if the handler reported an error.
 | 
						|
    RequestComplete(status_code);
 | 
						|
    return;
 | 
						|
  }
 | 
						|
 | 
						|
  auto pending_response = network::mojom::URLResponseHead::New();
 | 
						|
  pending_response->request_start = base::TimeTicks::Now();
 | 
						|
  pending_response->response_start = base::TimeTicks::Now();
 | 
						|
 | 
						|
  auto headers = MakeResponseHeaders(
 | 
						|
      status_code, status_text, mime_type, charset, content_length,
 | 
						|
      extra_headers, false /* allow_existing_header_override */);
 | 
						|
  pending_response->headers = headers;
 | 
						|
 | 
						|
  if (content_length >= 0)
 | 
						|
    pending_response->content_length = content_length;
 | 
						|
 | 
						|
  if (!mime_type.empty()) {
 | 
						|
    pending_response->mime_type = mime_type;
 | 
						|
    if (!charset.empty())
 | 
						|
      pending_response->charset = charset;
 | 
						|
  }
 | 
						|
 | 
						|
  if (header_client_.is_bound()) {
 | 
						|
    header_client_->OnHeadersReceived(
 | 
						|
        headers->raw_headers(), net::IPEndPoint(),
 | 
						|
        base::BindOnce(&StreamReaderURLLoader::ContinueWithResponseHeaders,
 | 
						|
                       weak_factory_.GetWeakPtr(),
 | 
						|
                       std::move(pending_response)));
 | 
						|
  } else {
 | 
						|
    ContinueWithResponseHeaders(std::move(pending_response), net::OK,
 | 
						|
                                base::nullopt, base::nullopt);
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
void StreamReaderURLLoader::ContinueWithResponseHeaders(
 | 
						|
    network::mojom::URLResponseHeadPtr pending_response,
 | 
						|
    int32_t result,
 | 
						|
    const base::Optional<std::string>& headers,
 | 
						|
    const base::Optional<GURL>& redirect_url) {
 | 
						|
  if (result != net::OK) {
 | 
						|
    RequestComplete(result);
 | 
						|
    return;
 | 
						|
  }
 | 
						|
 | 
						|
  if (headers) {
 | 
						|
    DCHECK(header_client_.is_bound());
 | 
						|
    pending_response->headers =
 | 
						|
        base::MakeRefCounted<net::HttpResponseHeaders>(*headers);
 | 
						|
  }
 | 
						|
 | 
						|
  auto pending_headers = pending_response->headers;
 | 
						|
 | 
						|
  // What the length would be if we sent headers over the network. Used to
 | 
						|
  // calculate data length.
 | 
						|
  header_length_ = pending_headers->raw_headers().length();
 | 
						|
 | 
						|
  DCHECK(client_.is_bound());
 | 
						|
 | 
						|
  std::string location;
 | 
						|
  const auto has_redirect_url = redirect_url && !redirect_url->is_empty();
 | 
						|
  if (has_redirect_url || pending_headers->IsRedirect(&location)) {
 | 
						|
    pending_response->encoded_data_length = header_length_;
 | 
						|
    pending_response->content_length = pending_response->encoded_body_length =
 | 
						|
        0;
 | 
						|
    const GURL new_location =
 | 
						|
        has_redirect_url ? *redirect_url : request_.url.Resolve(location);
 | 
						|
    client_->OnReceiveRedirect(
 | 
						|
        MakeRedirectInfo(request_, pending_headers.get(), new_location,
 | 
						|
                         pending_headers->response_code()),
 | 
						|
        std::move(pending_response));
 | 
						|
    // The client will restart the request with a new loader.
 | 
						|
    // |this| will be deleted.
 | 
						|
    CleanUp();
 | 
						|
  } else {
 | 
						|
    client_->OnReceiveResponse(std::move(pending_response));
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
void StreamReaderURLLoader::ContinueResponse(bool was_redirected) {
 | 
						|
  DCHECK(thread_checker_.CalledOnValidThread());
 | 
						|
 | 
						|
  if (was_redirected) {
 | 
						|
    // Special case where we allow the client to perform the redirect.
 | 
						|
    // The client will restart the request with a new loader.
 | 
						|
    // |this| will be deleted.
 | 
						|
    CleanUp();
 | 
						|
  } else {
 | 
						|
    SendBody();
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
void StreamReaderURLLoader::SendBody() {
 | 
						|
  DCHECK(thread_checker_.CalledOnValidThread());
 | 
						|
 | 
						|
  mojo::ScopedDataPipeConsumerHandle consumer_handle;
 | 
						|
  if (CreateDataPipe(nullptr /*options*/, &producer_handle_,
 | 
						|
                     &consumer_handle) != MOJO_RESULT_OK) {
 | 
						|
    RequestComplete(net::ERR_FAILED);
 | 
						|
    return;
 | 
						|
  }
 | 
						|
  writable_handle_watcher_.Watch(
 | 
						|
      producer_handle_.get(), MOJO_HANDLE_SIGNAL_WRITABLE,
 | 
						|
      base::BindRepeating(&StreamReaderURLLoader::OnDataPipeWritable,
 | 
						|
                          base::Unretained(this)));
 | 
						|
  client_->OnStartLoadingResponseBody(std::move(consumer_handle));
 | 
						|
 | 
						|
  ReadMore();
 | 
						|
}
 | 
						|
 | 
						|
void StreamReaderURLLoader::ReadMore() {
 | 
						|
  DCHECK(thread_checker_.CalledOnValidThread());
 | 
						|
  DCHECK(!pending_buffer_.get());
 | 
						|
 | 
						|
  uint32_t num_bytes;
 | 
						|
  MojoResult mojo_result = network::NetToMojoPendingBuffer::BeginWrite(
 | 
						|
      &producer_handle_, &pending_buffer_, &num_bytes);
 | 
						|
  if (mojo_result == MOJO_RESULT_SHOULD_WAIT) {
 | 
						|
    // The pipe is full. We need to wait for it to have more space.
 | 
						|
    writable_handle_watcher_.ArmOrNotify();
 | 
						|
    return;
 | 
						|
  } else if (mojo_result == MOJO_RESULT_FAILED_PRECONDITION) {
 | 
						|
    // The data pipe consumer handle has been closed.
 | 
						|
    RequestComplete(net::ERR_ABORTED);
 | 
						|
    return;
 | 
						|
  } else if (mojo_result != MOJO_RESULT_OK) {
 | 
						|
    // The body stream is in a bad state. Bail out.
 | 
						|
    RequestComplete(net::ERR_UNEXPECTED);
 | 
						|
    return;
 | 
						|
  }
 | 
						|
  scoped_refptr<net::IOBuffer> buffer(
 | 
						|
      new network::NetToMojoIOBuffer(pending_buffer_.get()));
 | 
						|
 | 
						|
  if (!input_stream_reader_.get()) {
 | 
						|
    // This will happen if opening the InputStream fails in which case the
 | 
						|
    // error is communicated by setting the HTTP response status header rather
 | 
						|
    // than failing the request during the header fetch phase.
 | 
						|
    OnReaderReadCompleted(0);
 | 
						|
    return;
 | 
						|
  }
 | 
						|
 | 
						|
  input_stream_reader_->Read(
 | 
						|
      buffer, base::checked_cast<int>(num_bytes),
 | 
						|
      base::BindOnce(&StreamReaderURLLoader::OnReaderReadCompleted,
 | 
						|
                     weak_factory_.GetWeakPtr()));
 | 
						|
}
 | 
						|
 | 
						|
void StreamReaderURLLoader::OnDataPipeWritable(MojoResult result) {
 | 
						|
  if (result == MOJO_RESULT_FAILED_PRECONDITION) {
 | 
						|
    RequestComplete(net::ERR_ABORTED);
 | 
						|
    return;
 | 
						|
  }
 | 
						|
  DCHECK_EQ(result, MOJO_RESULT_OK) << result;
 | 
						|
 | 
						|
  ReadMore();
 | 
						|
}
 | 
						|
 | 
						|
void StreamReaderURLLoader::OnReaderReadCompleted(int bytes_read) {
 | 
						|
  DCHECK(thread_checker_.CalledOnValidThread());
 | 
						|
 | 
						|
  DCHECK(pending_buffer_);
 | 
						|
  if (bytes_read < 0) {
 | 
						|
    // Error case.
 | 
						|
    RequestComplete(bytes_read);
 | 
						|
    return;
 | 
						|
  }
 | 
						|
  if (bytes_read == 0) {
 | 
						|
    // Eof, read completed.
 | 
						|
    pending_buffer_->Complete(0);
 | 
						|
    RequestComplete(net::OK);
 | 
						|
    return;
 | 
						|
  }
 | 
						|
  producer_handle_ = pending_buffer_->Complete(bytes_read);
 | 
						|
  pending_buffer_ = nullptr;
 | 
						|
 | 
						|
  client_->OnTransferSizeUpdated(bytes_read);
 | 
						|
  total_bytes_read_ += bytes_read;
 | 
						|
 | 
						|
  base::ThreadTaskRunnerHandle::Get()->PostTask(
 | 
						|
      FROM_HERE, base::BindOnce(&StreamReaderURLLoader::ReadMore,
 | 
						|
                                weak_factory_.GetWeakPtr()));
 | 
						|
}
 | 
						|
 | 
						|
void StreamReaderURLLoader::RequestComplete(int status_code) {
 | 
						|
  DCHECK(thread_checker_.CalledOnValidThread());
 | 
						|
 | 
						|
  auto status = network::URLLoaderCompletionStatus(status_code);
 | 
						|
  status.completion_time = base::TimeTicks::Now();
 | 
						|
  status.encoded_data_length = total_bytes_read_ + header_length_;
 | 
						|
  status.encoded_body_length = total_bytes_read_;
 | 
						|
  // We don't support decoders, so use the same value.
 | 
						|
  status.decoded_body_length = total_bytes_read_;
 | 
						|
 | 
						|
  client_->OnComplete(status);
 | 
						|
  CleanUp();
 | 
						|
}
 | 
						|
 | 
						|
void StreamReaderURLLoader::CleanUp() {
 | 
						|
  DCHECK(thread_checker_.CalledOnValidThread());
 | 
						|
 | 
						|
  // Resets the watchers and pipes, so that we will never be called back.
 | 
						|
  writable_handle_watcher_.Cancel();
 | 
						|
  pending_buffer_ = nullptr;
 | 
						|
  producer_handle_.reset();
 | 
						|
 | 
						|
  // Manages its own lifetime.
 | 
						|
  delete this;
 | 
						|
}
 | 
						|
 | 
						|
bool StreamReaderURLLoader::ParseRange(const net::HttpRequestHeaders& headers) {
 | 
						|
  DCHECK(thread_checker_.CalledOnValidThread());
 | 
						|
 | 
						|
  std::string range_header;
 | 
						|
  if (headers.GetHeader(net::HttpRequestHeaders::kRange, &range_header)) {
 | 
						|
    // This loader only cares about the Range header so that we know how many
 | 
						|
    // bytes in the stream to skip and how many to read after that.
 | 
						|
    std::vector<net::HttpByteRange> ranges;
 | 
						|
    if (net::HttpUtil::ParseRangeHeader(range_header, &ranges)) {
 | 
						|
      // In case of multi-range request only use the first range.
 | 
						|
      // We don't support multirange requests.
 | 
						|
      if (ranges.size() == 1)
 | 
						|
        byte_range_ = ranges[0];
 | 
						|
    } else {
 | 
						|
      // This happens if the range header could not be parsed or is invalid.
 | 
						|
      return false;
 | 
						|
    }
 | 
						|
  }
 | 
						|
  return true;
 | 
						|
}
 | 
						|
 | 
						|
bool StreamReaderURLLoader::byte_range_valid() const {
 | 
						|
  return byte_range_.IsValid() && byte_range_.first_byte_position() >= 0;
 | 
						|
}
 | 
						|
 | 
						|
}  // namespace net_service
 |