SongLoader: Remove probe and bus watch

This commit is contained in:
Jonas Kvinge 2023-03-19 05:08:30 +01:00
parent 7b282e21de
commit 0c62147536
2 changed files with 97 additions and 34 deletions

View File

@ -68,15 +68,19 @@ const int SongLoader::kDefaultTimeout = 5000;
SongLoader::SongLoader(CollectionBackendInterface *collection, const Player *player, QObject *parent) SongLoader::SongLoader(CollectionBackendInterface *collection, const Player *player, QObject *parent)
: QObject(parent), : QObject(parent),
player_(player),
collection_(collection),
timeout_timer_(new QTimer(this)), timeout_timer_(new QTimer(this)),
playlist_parser_(new PlaylistParser(collection, this)), playlist_parser_(new PlaylistParser(collection, this)),
cue_parser_(new CueParser(collection, this)), cue_parser_(new CueParser(collection, this)),
timeout_(kDefaultTimeout),
state_(State::WaitingForType),
success_(false),
parser_(nullptr), parser_(nullptr),
collection_(collection), state_(State::WaitingForType),
player_(player) { timeout_(kDefaultTimeout),
#ifdef HAVE_GSTREAMER
fakesink_(nullptr),
buffer_probe_cb_id_(0),
#endif
success_(false) {
if (sRawUriSchemes.isEmpty()) { if (sRawUriSchemes.isEmpty()) {
sRawUriSchemes << "udp" sRawUriSchemes << "udp"
@ -99,10 +103,7 @@ SongLoader::SongLoader(CollectionBackendInterface *collection, const Player *pla
SongLoader::~SongLoader() { SongLoader::~SongLoader() {
#ifdef HAVE_GSTREAMER #ifdef HAVE_GSTREAMER
if (pipeline_) { CleanupPipeline();
state_ = State::Finished;
gst_element_set_state(pipeline_.get(), GST_STATE_NULL);
}
#endif #endif
} }
@ -130,7 +131,7 @@ SongLoader::Result SongLoader::Load(const QUrl &url) {
return Result::BlockingLoadRequired; return Result::BlockingLoadRequired;
#else #else
errors_ << tr("You need GStreamer for this URL."); errors_ << tr("You need GStreamer for this URL.");
return Error; return Result::Error;
#endif #endif
} }
else { else {
@ -417,9 +418,11 @@ void SongLoader::AddAsRawStream() {
} }
void SongLoader::Timeout() { void SongLoader::Timeout() {
state_ = State::Finished; state_ = State::Finished;
success_ = false; success_ = false;
StopTypefind(); StopTypefind();
} }
void SongLoader::StopTypefind() { void SongLoader::StopTypefind() {
@ -428,7 +431,7 @@ void SongLoader::StopTypefind() {
// Destroy the pipeline // Destroy the pipeline
if (pipeline_) { if (pipeline_) {
gst_element_set_state(pipeline_.get(), GST_STATE_NULL); gst_element_set_state(pipeline_.get(), GST_STATE_NULL);
pipeline_.reset(); CleanupPipeline();
} }
#endif #endif
timeout_timer_->stop(); timeout_timer_->stop();
@ -477,33 +480,52 @@ SongLoader::Result SongLoader::LoadRemote() {
errors_ << tr("Couldn't create GStreamer source element for %1").arg(url_.toString()); errors_ << tr("Couldn't create GStreamer source element for %1").arg(url_.toString());
return Result::Error; return Result::Error;
} }
gst_bin_add(GST_BIN(pipeline.get()), source);
g_object_set(source, "ssl-strict", FALSE, nullptr); g_object_set(source, "ssl-strict", FALSE, nullptr);
// Create the other elements and link them up // Create the other elements and link them up
GstElement *typefind = gst_element_factory_make("typefind", nullptr); GstElement *typefind = gst_element_factory_make("typefind", nullptr);
GstElement *fakesink = gst_element_factory_make("fakesink", nullptr); if (!typefind) {
errors_ << tr("Couldn't create GStreamer typefind element for %1").arg(url_.toString());
return Result::Error;
}
gst_bin_add(GST_BIN(pipeline.get()), typefind);
gst_bin_add_many(GST_BIN(pipeline.get()), source, typefind, fakesink, nullptr); fakesink_ = gst_element_factory_make("fakesink", nullptr);
gst_element_link_many(source, typefind, fakesink, nullptr); if (!fakesink_) {
errors_ << tr("Couldn't create GStreamer fakesink element for %1").arg(url_.toString());
return Result::Error;
}
gst_bin_add(GST_BIN(pipeline.get()), fakesink_);
if (!gst_element_link_many(source, typefind, fakesink_, nullptr)) {
errors_ << tr("Couldn't link GStreamer source, typefind and fakesink elements for %1").arg(url_.toString());
return Result::Error;
}
// Connect callbacks // Connect callbacks
GstBus *bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline.get()));
CHECKED_GCONNECT(typefind, "have-type", &TypeFound, this); CHECKED_GCONNECT(typefind, "have-type", &TypeFound, this);
GstBus *bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline.get()));
if (bus) {
gst_bus_set_sync_handler(bus, BusCallbackSync, this, nullptr); gst_bus_set_sync_handler(bus, BusCallbackSync, this, nullptr);
gst_bus_add_watch(bus, BusCallback, this); gst_bus_add_watch(bus, BusWatchCallback, this);
gst_object_unref(bus); gst_object_unref(bus);
}
// Add a probe to the sink so we can capture the data if it's a playlist // Add a probe to the sink so we can capture the data if it's a playlist
GstPad *pad = gst_element_get_static_pad(fakesink, "sink"); GstPad *pad = gst_element_get_static_pad(fakesink_, "sink");
gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_BUFFER, &DataReady, this, nullptr); if (pad) {
buffer_probe_cb_id_ = gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_BUFFER, &DataReady, this, nullptr);
gst_object_unref(pad); gst_object_unref(pad);
}
QEventLoop loop; QEventLoop loop;
loop.connect(this, &SongLoader::LoadRemoteFinished, &loop, &QEventLoop::quit); loop.connect(this, &SongLoader::LoadRemoteFinished, &loop, &QEventLoop::quit);
// Start "playing" // Start "playing"
gst_element_set_state(pipeline.get(), GST_STATE_PLAYING);
pipeline_ = pipeline; pipeline_ = pipeline;
gst_element_set_state(pipeline.get(), GST_STATE_PLAYING);
// Wait until loading is finished // Wait until loading is finished
loop.exec(); loop.exec();
@ -563,7 +585,7 @@ GstPadProbeReturn SongLoader::DataReady(GstPad*, GstPadProbeInfo *info, gpointer
#endif #endif
#ifdef HAVE_GSTREAMER #ifdef HAVE_GSTREAMER
gboolean SongLoader::BusCallback(GstBus*, GstMessage *msg, gpointer self) { gboolean SongLoader::BusWatchCallback(GstBus*, GstMessage *msg, gpointer self) {
SongLoader *instance = reinterpret_cast<SongLoader*>(self); SongLoader *instance = reinterpret_cast<SongLoader*>(self);
@ -633,6 +655,7 @@ void SongLoader::ErrorMessageReceived(GstMessage *msg) {
void SongLoader::EndOfStreamReached() { void SongLoader::EndOfStreamReached() {
qLog(Debug) << Q_FUNC_INFO << static_cast<int>(state_); qLog(Debug) << Q_FUNC_INFO << static_cast<int>(state_);
switch (state_) { switch (state_) {
case State::Finished: case State::Finished:
break; break;
@ -661,6 +684,7 @@ void SongLoader::EndOfStreamReached() {
void SongLoader::MagicReady() { void SongLoader::MagicReady() {
qLog(Debug) << Q_FUNC_INFO; qLog(Debug) << Q_FUNC_INFO;
parser_ = playlist_parser_->ParserForMagic(buffer_, mime_type_); parser_ = playlist_parser_->ParserForMagic(buffer_, mime_type_);
if (!parser_) { if (!parser_) {
@ -673,6 +697,7 @@ void SongLoader::MagicReady() {
// We'll get more data and parse the whole thing in EndOfStreamReached // We'll get more data and parse the whole thing in EndOfStreamReached
qLog(Debug) << "Magic says" << parser_->name(); qLog(Debug) << "Magic says" << parser_->name();
if (parser_->name() == "ASX/INI" && url_.scheme() == "http") { if (parser_->name() == "ASX/INI" && url_.scheme() == "http") {
// This is actually a weird MS-WMSP stream. Changing the protocol to MMS from HTTP makes it playable. // This is actually a weird MS-WMSP stream. Changing the protocol to MMS from HTTP makes it playable.
parser_ = nullptr; parser_ = nullptr;
@ -706,7 +731,7 @@ bool SongLoader::IsPipelinePlaying() {
#endif #endif
#ifdef HAVE_GSTREAMER #ifdef HAVE_GSTREAMER
void SongLoader::StopTypefindAsync(bool success) { void SongLoader::StopTypefindAsync(const bool success) {
state_ = State::Finished; state_ = State::Finished;
success_ = success; success_ = success;
@ -716,7 +741,6 @@ void SongLoader::StopTypefindAsync(bool success) {
} }
#endif #endif
void SongLoader::ScheduleTimeoutAsync() { void SongLoader::ScheduleTimeoutAsync() {
if (QThread::currentThread() == thread()) { if (QThread::currentThread() == thread()) {
@ -733,3 +757,38 @@ void SongLoader::ScheduleTimeout() {
timeout_timer_->start(timeout_); timeout_timer_->start(timeout_);
} }
#ifdef HAVE_GSTREAMER
void SongLoader::CleanupPipeline() {
if (pipeline_) {
gst_element_set_state(pipeline_.get(), GST_STATE_NULL);
if (fakesink_ && buffer_probe_cb_id_ != 0) {
GstPad *pad = gst_element_get_static_pad(fakesink_, "src");
if (pad) {
gst_pad_remove_probe(pad, buffer_probe_cb_id_);
gst_object_unref(pad);
}
}
{
GstBus *bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline_.get()));
if (bus) {
gst_bus_remove_watch(bus);
gst_bus_set_sync_handler(bus, nullptr, nullptr, nullptr);
gst_object_unref(bus);
}
}
pipeline_.reset();
}
state_ = State::Finished;
}
#endif

View File

@ -56,6 +56,7 @@ class CddaSongLoader;
class SongLoader : public QObject { class SongLoader : public QObject {
Q_OBJECT Q_OBJECT
public: public:
explicit SongLoader(CollectionBackendInterface *collection, const Player *player, QObject *parent = nullptr); explicit SongLoader(CollectionBackendInterface *collection, const Player *player, QObject *parent = nullptr);
~SongLoader() override; ~SongLoader() override;
@ -124,13 +125,14 @@ class SongLoader : public QObject {
static void TypeFound(GstElement *typefind, uint probability, GstCaps *caps, void *self); static void TypeFound(GstElement *typefind, uint probability, GstCaps *caps, void *self);
static GstPadProbeReturn DataReady(GstPad*, GstPadProbeInfo *info, gpointer self); static GstPadProbeReturn DataReady(GstPad*, GstPadProbeInfo *info, gpointer self);
static GstBusSyncReply BusCallbackSync(GstBus*, GstMessage*, gpointer); static GstBusSyncReply BusCallbackSync(GstBus*, GstMessage*, gpointer);
static gboolean BusCallback(GstBus*, GstMessage*, gpointer); static gboolean BusWatchCallback(GstBus*, GstMessage*, gpointer);
void StopTypefindAsync(bool success);
void ErrorMessageReceived(GstMessage *msg); void ErrorMessageReceived(GstMessage *msg);
void EndOfStreamReached(); void EndOfStreamReached();
void MagicReady(); void MagicReady();
bool IsPipelinePlaying(); bool IsPipelinePlaying();
void StopTypefindAsync(const bool success);
void CleanupPipeline();
#endif #endif
void ScheduleTimeoutAsync(); void ScheduleTimeoutAsync();
@ -141,29 +143,31 @@ class SongLoader : public QObject {
QUrl url_; QUrl url_;
SongList songs_; SongList songs_;
const Player *player_;
CollectionBackendInterface *collection_;
QTimer *timeout_timer_; QTimer *timeout_timer_;
PlaylistParser *playlist_parser_; PlaylistParser *playlist_parser_;
CueParser *cue_parser_; CueParser *cue_parser_;
// For async loads // For async loads
std::function<Result()> preload_func_; std::function<Result()> preload_func_;
int timeout_;
State state_;
bool success_;
ParserBase *parser_;
QString mime_type_; QString mime_type_;
QByteArray buffer_; QByteArray buffer_;
CollectionBackendInterface *collection_; ParserBase *parser_;
const Player *player_; State state_;
int timeout_;
#ifdef HAVE_GSTREAMER #ifdef HAVE_GSTREAMER
std::shared_ptr<GstElement> pipeline_; std::shared_ptr<GstElement> pipeline_;
GstElement *fakesink_;
gulong buffer_probe_cb_id_;
#endif #endif
QThreadPool thread_pool_; QThreadPool thread_pool_;
QStringList errors_; QStringList errors_;
bool success_;
}; };
#endif // SONGLOADER_H #endif // SONGLOADER_H