// Copyright (c) 2011 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include #include "browser_socket_stream_bridge.h" #include "base/atomicops.h" #include "base/bind.h" #include "base/memory/ref_counted.h" #include "base/message_loop.h" #include "googleurl/src/gurl.h" #include "net/socket_stream/socket_stream_job.h" #include "net/websockets/websocket_job.h" #include "net/url_request/url_request_context.h" #include "third_party/WebKit/Source/WebKit/chromium/public/WebSocketStreamHandle.h" #include "webkit/glue/websocketstreamhandle_bridge.h" #include "webkit/glue/websocketstreamhandle_delegate.h" using webkit_glue::WebSocketStreamHandleBridge; static const int kNoSocketId = 0; namespace { MessageLoop* g_io_thread; scoped_refptr g_request_context; class WebSocketStreamHandleBridgeImpl : public WebSocketStreamHandleBridge, public net::SocketStream::Delegate { public: WebSocketStreamHandleBridgeImpl( WebKit::WebSocketStreamHandle* handle, webkit_glue::WebSocketStreamHandleDelegate* delegate); // WebSocketStreamHandleBridge methods. virtual void Connect(const GURL& url); virtual bool Send(const std::vector& data); virtual void Close(); // net::SocketStream::Delegate methods. virtual void OnConnected(net::SocketStream* req, int max_pending_send_allowed); virtual void OnSentData(net::SocketStream* req, int amount_sent); virtual void OnReceivedData(net::SocketStream* req, const char* data, int len); virtual void OnClose(net::SocketStream* req); private: virtual ~WebSocketStreamHandleBridgeImpl(); // Runs on |g_io_thread|; void DoConnect(const GURL& url); void DoSend(std::vector* data); void DoClose(); // Runs on |message_loop_|; void DoOnConnected(int max_amount_send_allowed); void DoOnSentData(int amount_sent); void DoOnReceivedData(std::vector* data); void DoOnClose(); int socket_id_; MessageLoop* message_loop_; WebKit::WebSocketStreamHandle* handle_; webkit_glue::WebSocketStreamHandleDelegate* delegate_; scoped_refptr socket_; // Number of pending tasks to handle net::SocketStream::Delegate methods. base::subtle::Atomic32 num_pending_tasks_; DISALLOW_COPY_AND_ASSIGN(WebSocketStreamHandleBridgeImpl); }; WebSocketStreamHandleBridgeImpl::WebSocketStreamHandleBridgeImpl( WebKit::WebSocketStreamHandle* handle, webkit_glue::WebSocketStreamHandleDelegate* delegate) : socket_id_(kNoSocketId), message_loop_(MessageLoop::current()), handle_(handle), delegate_(delegate), num_pending_tasks_(0) { net::WebSocketJob::EnsureInit(); } WebSocketStreamHandleBridgeImpl::~WebSocketStreamHandleBridgeImpl() { DCHECK_EQ(socket_id_, kNoSocketId); } void WebSocketStreamHandleBridgeImpl::Connect(const GURL& url) { DCHECK(g_io_thread); AddRef(); // Released in DoOnClose(). g_io_thread->PostTask( FROM_HERE, base::Bind(&WebSocketStreamHandleBridgeImpl::DoConnect, this, url)); if (delegate_) delegate_->WillOpenStream(handle_, url); } bool WebSocketStreamHandleBridgeImpl::Send( const std::vector& data) { DCHECK(g_io_thread); g_io_thread->PostTask( FROM_HERE, base::Bind(&WebSocketStreamHandleBridgeImpl::DoSend, this, new std::vector(data))); return true; } void WebSocketStreamHandleBridgeImpl::Close() { DCHECK(g_io_thread); g_io_thread->PostTask( FROM_HERE, base::Bind(&WebSocketStreamHandleBridgeImpl::DoClose, this)); } void WebSocketStreamHandleBridgeImpl::OnConnected( net::SocketStream* socket, int max_pending_send_allowed) { base::subtle::NoBarrier_AtomicIncrement(&num_pending_tasks_, 1); message_loop_->PostTask( FROM_HERE, base::Bind(&WebSocketStreamHandleBridgeImpl::DoOnConnected, this, max_pending_send_allowed)); } void WebSocketStreamHandleBridgeImpl::OnSentData( net::SocketStream* socket, int amount_sent) { base::subtle::NoBarrier_AtomicIncrement(&num_pending_tasks_, 1); message_loop_->PostTask( FROM_HERE, base::Bind(&WebSocketStreamHandleBridgeImpl::DoOnSentData, this, amount_sent)); } void WebSocketStreamHandleBridgeImpl::OnReceivedData( net::SocketStream* socket, const char* data, int len) { base::subtle::NoBarrier_AtomicIncrement(&num_pending_tasks_, 1); message_loop_->PostTask( FROM_HERE, base::Bind(&WebSocketStreamHandleBridgeImpl::DoOnReceivedData, this, new std::vector(data, data + len))); } void WebSocketStreamHandleBridgeImpl::OnClose(net::SocketStream* socket) { base::subtle::NoBarrier_AtomicIncrement(&num_pending_tasks_, 1); // Release socket_ on IO thread. socket_ = NULL; socket_id_ = kNoSocketId; message_loop_->PostTask( FROM_HERE, base::Bind(&WebSocketStreamHandleBridgeImpl::DoOnClose, this)); } void WebSocketStreamHandleBridgeImpl::DoConnect(const GURL& url) { DCHECK(MessageLoop::current() == g_io_thread); socket_ = net::SocketStreamJob::CreateSocketStreamJob( url, this, g_request_context->transport_security_state(), g_request_context->ssl_config_service()); socket_->set_context(g_request_context); socket_->Connect(); } void WebSocketStreamHandleBridgeImpl::DoSend(std::vector* data) { DCHECK(MessageLoop::current() == g_io_thread); scoped_ptr > scoped_data(data); if (!socket_) return; if (!socket_->SendData(&(data->at(0)), data->size())) socket_->Close(); } void WebSocketStreamHandleBridgeImpl::DoClose() { DCHECK(MessageLoop::current() == g_io_thread); if (!socket_) return; socket_->Close(); } void WebSocketStreamHandleBridgeImpl::DoOnConnected( int max_pending_send_allowed) { DCHECK(MessageLoop::current() == message_loop_); base::subtle::NoBarrier_AtomicIncrement(&num_pending_tasks_, -1); if (delegate_) delegate_->DidOpenStream(handle_, max_pending_send_allowed); } void WebSocketStreamHandleBridgeImpl::DoOnSentData(int amount_sent) { DCHECK(MessageLoop::current() == message_loop_); base::subtle::NoBarrier_AtomicIncrement(&num_pending_tasks_, -1); if (delegate_) delegate_->DidSendData(handle_, amount_sent); } void WebSocketStreamHandleBridgeImpl::DoOnReceivedData( std::vector* data) { DCHECK(MessageLoop::current() == message_loop_); base::subtle::NoBarrier_AtomicIncrement(&num_pending_tasks_, -1); scoped_ptr > scoped_data(data); if (delegate_) delegate_->DidReceiveData(handle_, &(data->at(0)), data->size()); } void WebSocketStreamHandleBridgeImpl::DoOnClose() { DCHECK(MessageLoop::current() == message_loop_); base::subtle::NoBarrier_AtomicIncrement(&num_pending_tasks_, -1); // Don't handle OnClose if there are pending tasks. DCHECK_EQ(num_pending_tasks_, 0); DCHECK(!socket_); DCHECK_EQ(socket_id_, kNoSocketId); webkit_glue::WebSocketStreamHandleDelegate* delegate = delegate_; delegate_ = NULL; if (delegate) delegate->DidClose(handle_); Release(); } } // namespace /* static */ void BrowserSocketStreamBridge::InitializeOnIOThread( net::URLRequestContext* request_context) { g_io_thread = MessageLoop::current(); g_request_context = request_context; } void BrowserSocketStreamBridge::Cleanup() { g_io_thread = NULL; g_request_context = NULL; } namespace webkit_glue { /* static */ WebSocketStreamHandleBridge* WebSocketStreamHandleBridge::Create( WebKit::WebSocketStreamHandle* handle, WebSocketStreamHandleDelegate* delegate) { return new WebSocketStreamHandleBridgeImpl(handle, delegate); } } // namespace webkit_glue