From f37d146dc5b1a9ba9364097d5b95bd61b0bcbe3b Mon Sep 17 00:00:00 2001 From: Andreas Date: Sun, 16 Mar 2014 19:16:16 +0100 Subject: [PATCH] First work on streaming. --- src/engines/gstenginepipeline.cpp | 11 +++++++++++ src/networkremote/networkremote.cpp | 27 +++++++++++++++++++++++++++ src/networkremote/networkremote.h | 4 ++++ src/networkremote/remoteclient.cpp | 19 ++++++++++++++++++- src/networkremote/remoteclient.h | 3 +++ 5 files changed, 63 insertions(+), 1 deletion(-) diff --git a/src/engines/gstenginepipeline.cpp b/src/engines/gstenginepipeline.cpp index ec8c0bbec..35fac42f0 100644 --- a/src/engines/gstenginepipeline.cpp +++ b/src/engines/gstenginepipeline.cpp @@ -254,6 +254,15 @@ bool GstEnginePipeline::Init() { return false; } + GstElement* tcp_queue = engine_->CreateElement("queue", audiobin_); + GstElement* vorbisenc = engine_->CreateElement("vorbisenc", audiobin_); + GstElement* oggmux = engine_->CreateElement("oggmux", audiobin_); + GstElement* tcp_sink = engine_->CreateElement("tcpclientsink", audiobin_); + + g_object_set(tcp_sink, "port", 30001, nullptr); + + gst_element_link_many(tcp_queue, vorbisenc, oggmux, tcp_sink, nullptr); + // Create the replaygain elements if it's enabled. event_probe is the // audioconvert element we attach the probe to, which will change depending // on whether replaygain is enabled. convert_sink is the element after the @@ -355,6 +364,8 @@ bool GstEnginePipeline::Init() { gst_element_get_static_pad(probe_queue, "sink")); gst_pad_link(gst_element_get_request_pad(tee, "src%d"), gst_element_get_static_pad(audio_queue, "sink")); + gst_pad_link(gst_element_get_request_pad(tee, "src%d"), + gst_element_get_static_pad(tcp_queue, "sink")); // Link replaygain elements if enabled. if (rg_enabled_) { diff --git a/src/networkremote/networkremote.cpp b/src/networkremote/networkremote.cpp index 2f211d719..66151eb32 100644 --- a/src/networkremote/networkremote.cpp +++ b/src/networkremote/networkremote.cpp @@ -54,6 +54,8 @@ void NetworkRemote::SetupServer() { incoming_data_parser_.reset(new IncomingDataParser(app_)); outgoing_data_creator_.reset(new OutgoingDataCreator(app_)); + stream_server_.reset(new QTcpServer()); + outgoing_data_creator_->SetClients(&clients_); connect(app_->current_art_loader(), @@ -66,6 +68,8 @@ void NetworkRemote::SetupServer() { SLOT(AcceptConnection())); connect(server_ipv6_.get(), SIGNAL(newConnection()), this, SLOT(AcceptConnection())); + connect(stream_server_.get(), SIGNAL(newConnection()), this, + SLOT(AcceptStream())); } void NetworkRemote::StartServer() { @@ -84,9 +88,11 @@ void NetworkRemote::StartServer() { server_->setProxy(QNetworkProxy::NoProxy); server_ipv6_->setProxy(QNetworkProxy::NoProxy); + stream_server_->setProxy(QNetworkProxy::NoProxy); server_->listen(QHostAddress::Any, port_); server_ipv6_->listen(QHostAddress::AnyIPv6, port_); + stream_server_->listen(QHostAddress::LocalHost, 30001); qLog(Info) << "Listening on port " << port_; @@ -219,3 +225,24 @@ void NetworkRemote::EnableKittens(bool aww) { void NetworkRemote::SendKitten(quint64 id, const QImage& kitten) { if (outgoing_data_creator_.get()) outgoing_data_creator_->SendKitten(kitten); } + +void NetworkRemote::AcceptStream() { + qLog(Debug) << "AcceptStream"; + QTcpServer* server = qobject_cast(sender()); + QTcpSocket* client_socket = server->nextPendingConnection(); + + connect(client_socket, SIGNAL(readyRead()), this, SLOT(IncomingData())); +} + +void NetworkRemote::IncomingData() { + QTcpSocket* client = qobject_cast(sender()); + + while (client->bytesAvailable() > 0) { + QByteArray data = client->read(client->bytesAvailable()); + for (RemoteClient* rc : clients_) { + if (rc->isStreamer()) { + rc->SendRawData(data); + } + } + } +} diff --git a/src/networkremote/networkremote.h b/src/networkremote/networkremote.h index ba455bc23..bdf2610d0 100644 --- a/src/networkremote/networkremote.h +++ b/src/networkremote/networkremote.h @@ -26,6 +26,8 @@ class NetworkRemote : public QObject { void StartServer(); void ReloadSettings(); void AcceptConnection(); + void AcceptStream(); + void IncomingData(); void EnableKittens(bool aww); void SendKitten(quint64 id, const QImage& kitten); @@ -35,6 +37,8 @@ class NetworkRemote : public QObject { std::unique_ptr incoming_data_parser_; std::unique_ptr outgoing_data_creator_; + std::unique_ptr stream_server_; + quint16 port_; bool use_remote_; bool only_non_public_ip_; diff --git a/src/networkremote/remoteclient.cpp b/src/networkremote/remoteclient.cpp index cd05bdade..23812bdc8 100644 --- a/src/networkremote/remoteclient.cpp +++ b/src/networkremote/remoteclient.cpp @@ -24,7 +24,7 @@ #include RemoteClient::RemoteClient(Application* app, QTcpSocket* client) - : app_(app), downloader_(false), client_(client) { + : app_(app), downloader_(false), stream_(false), client_(client) { // Open the buffer buffer_.setData(QByteArray()); buffer_.open(QIODevice::ReadWrite); @@ -56,8 +56,21 @@ RemoteClient::~RemoteClient() { void RemoteClient::setDownloader(bool downloader) { downloader_ = downloader; } void RemoteClient::IncomingData() { + if (stream_) return; + while (client_->bytesAvailable()) { if (!reading_protobuf_) { + qLog(Debug) << "peek" << client_->peek(11); + if (client_->peek(11) == QString("GET /listen").toUtf8()) { + stream_ = true; + qLog(Debug) << "Stream found!"; + client_->write("HTTP/1.0 200 OK\r\n"); + client_->write("Content-type: application/ogg\r\n"); + client_->write("Connection: close\r\n"); + client_->write("\r\n"); + client_->flush(); + return; + } // Read the length of the next message QDataStream s(client_); s >> expected_length_; @@ -181,4 +194,8 @@ void RemoteClient::SendData(pb::remote::Message* msg) { } } +void RemoteClient::SendRawData(QByteArray& data) { + client_->write(data.data(), data.length()); +} + QAbstractSocket::SocketState RemoteClient::State() { return client_->state(); } diff --git a/src/networkremote/remoteclient.h b/src/networkremote/remoteclient.h index b1812b571..8b465f4ad 100644 --- a/src/networkremote/remoteclient.h +++ b/src/networkremote/remoteclient.h @@ -16,9 +16,11 @@ class RemoteClient : public QObject { // This method checks if client is authenticated before sending the data void SendData(pb::remote::Message* msg); + void SendRawData(QByteArray& data); QAbstractSocket::SocketState State(); void setDownloader(bool downloader); bool isDownloader() { return downloader_; } + bool isStreamer() { return stream_; } void DisconnectClient(pb::remote::ReasonDisconnect reason); private slots: @@ -40,6 +42,7 @@ signals: bool authenticated_; bool allow_downloads_; bool downloader_; + bool stream_; QTcpSocket* client_; bool reading_protobuf_;