GstEnginePipeline: Remove upstream events and buffer probes

Also rename some variables and callback functions

Possible fix for #1090
This commit is contained in:
Jonas Kvinge 2023-01-02 00:06:18 +01:00
parent 8b2e8d3804
commit 6d05bb2de5
2 changed files with 140 additions and 87 deletions

View File

@ -104,17 +104,22 @@ GstEnginePipeline::GstEnginePipeline(QObject *parent)
audiobin_(nullptr),
audiosink_(nullptr),
audioqueue_(nullptr),
audioqueueconverter_(nullptr),
volume_(nullptr),
volume_sw_(nullptr),
volume_fading_(nullptr),
audiopanorama_(nullptr),
equalizer_(nullptr),
equalizer_preamp_(nullptr),
element_added_cb_id_(-1),
pad_added_cb_id_(-1),
notify_source_cb_id_(-1),
about_to_finish_cb_id_(-1),
notify_volume_cb_id_(-1),
eventprobe_(nullptr),
upstream_events_probe_cb_id_(-1UL),
buffer_probe_cb_id_(-1UL),
playbin_probe_cb_id_(-1UL),
element_added_cb_id_(-1UL),
pad_added_cb_id_(-1UL),
notify_source_cb_id_(-1UL),
about_to_finish_cb_id_(-1UL),
notify_volume_cb_id_(-1UL),
logged_unsupported_analyzer_format_(false) {
eq_band_gains_.reserve(kEqBandCount);
@ -133,31 +138,49 @@ GstEnginePipeline::~GstEnginePipeline() {
fader_.reset();
}
if (element_added_cb_id_ != -1) {
if (element_added_cb_id_ != -1UL) {
g_signal_handler_disconnect(G_OBJECT(audiobin_), element_added_cb_id_);
}
if (pad_added_cb_id_ != -1) {
if (pad_added_cb_id_ != -1UL) {
g_signal_handler_disconnect(G_OBJECT(pipeline_), pad_added_cb_id_);
}
if (notify_source_cb_id_ != -1) {
if (notify_source_cb_id_ != -1UL) {
g_signal_handler_disconnect(G_OBJECT(pipeline_), notify_source_cb_id_);
}
if (about_to_finish_cb_id_ != -1) {
if (about_to_finish_cb_id_ != -1UL) {
g_signal_handler_disconnect(G_OBJECT(pipeline_), about_to_finish_cb_id_);
}
if (notify_volume_cb_id_ != -1) {
if (notify_volume_cb_id_ != -1UL) {
g_signal_handler_disconnect(G_OBJECT(volume_), notify_volume_cb_id_);
}
GstBus *bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline_));
if (bus) {
gst_bus_remove_watch(bus);
gst_bus_set_sync_handler(bus, nullptr, nullptr, nullptr);
gst_object_unref(bus);
if (upstream_events_probe_cb_id_ > 0UL) {
GstPad *pad = gst_element_get_static_pad(eventprobe_, "src");
if (pad) {
gst_pad_remove_probe(pad, upstream_events_probe_cb_id_);
gst_object_unref(pad);
}
}
if (buffer_probe_cb_id_ > 0UL) {
GstPad *pad = gst_element_get_static_pad(audioqueueconverter_, "src");
if (pad) {
gst_pad_remove_probe(pad, buffer_probe_cb_id_);
gst_object_unref(pad);
}
}
{
GstBus *bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline_));
if (bus) {
gst_bus_remove_watch(bus);
gst_bus_set_sync_handler(bus, nullptr, nullptr, nullptr);
gst_object_unref(bus);
}
}
gst_element_set_state(pipeline_, GST_STATE_NULL);
@ -261,8 +284,8 @@ bool GstEnginePipeline::InitFromUrl(const QByteArray &stream_url, const QUrl &or
pipeline_ = CreateElement("playbin", "pipeline", nullptr, error);
if (!pipeline_) return false;
pad_added_cb_id_ = CHECKED_GCONNECT(G_OBJECT(pipeline_), "pad-added", &NewPadCallback, this);
notify_source_cb_id_ = CHECKED_GCONNECT(G_OBJECT(pipeline_), "notify::source", &SourceSetupCallback, this);
pad_added_cb_id_ = CHECKED_GCONNECT(G_OBJECT(pipeline_), "pad-added", &PadAddedCallback, this);
notify_source_cb_id_ = CHECKED_GCONNECT(G_OBJECT(pipeline_), "notify::source", &NotifySourceCallback, this);
about_to_finish_cb_id_ = CHECKED_GCONNECT(G_OBJECT(pipeline_), "about-to-finish", &AboutToFinishCallback, this);
if (!InitAudioBin(error)) return false;
@ -431,8 +454,8 @@ bool GstEnginePipeline::InitAudioBin(QString &error) {
return false;
}
GstElement *audioqueueconverter = CreateElement("audioconvert", "audioqueueconverter", audiobin_, error);
if (!audioqueueconverter) {
audioqueueconverter_ = CreateElement("audioconvert", "audioqueueconverter", audiobin_, error);
if (!audioqueueconverter_) {
gst_object_unref(GST_OBJECT(audiobin_));
audiobin_ = nullptr;
return false;
@ -530,7 +553,7 @@ bool GstEnginePipeline::InitAudioBin(QString &error) {
}
// Create the replaygain elements if it's enabled.
GstElement *eventprobe = audioqueueconverter;
eventprobe_ = audioqueueconverter_;
GstElement *rgvolume = nullptr;
GstElement *rglimiter = nullptr;
GstElement *rgconverter = nullptr;
@ -553,7 +576,7 @@ bool GstEnginePipeline::InitAudioBin(QString &error) {
audiobin_ = nullptr;
return false;
}
eventprobe = rgconverter;
eventprobe_ = rgconverter;
// Set replaygain settings
g_object_set(G_OBJECT(rgvolume), "album-mode", rg_mode_, nullptr);
g_object_set(G_OBJECT(rgvolume), "pre-amp", rg_preamp_, nullptr);
@ -582,9 +605,9 @@ bool GstEnginePipeline::InitAudioBin(QString &error) {
// Add a data probe on the src pad of the audioconvert element for our scope.
// We do it here because we want pre-equalized and pre-volume samples so that our visualization are not be affected by them.
{
GstPad *pad = gst_element_get_static_pad(eventprobe, "src");
GstPad *pad = gst_element_get_static_pad(eventprobe_, "src");
if (pad) {
gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_EVENT_UPSTREAM, &EventHandoffCallback, this, nullptr);
upstream_events_probe_cb_id_ = gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_EVENT_UPSTREAM, &UpstreamEventsProbeCallback, this, nullptr);
gst_object_unref(pad);
}
}
@ -605,14 +628,14 @@ bool GstEnginePipeline::InitAudioBin(QString &error) {
// Link all elements
if (!gst_element_link(audioqueue_, audioqueueconverter)) {
if (!gst_element_link(audioqueue_, audioqueueconverter_)) {
gst_object_unref(GST_OBJECT(audiobin_));
audiobin_ = nullptr;
error = "gst_element_link() failed.";
return false;
}
GstElement *element_link = audioqueueconverter; // The next element to link from.
GstElement *element_link = audioqueueconverter_; // The next element to link from.
// Link replaygain elements if enabled.
if (rg_enabled_ && rgvolume && rglimiter && rgconverter) {
@ -700,14 +723,20 @@ bool GstEnginePipeline::InitAudioBin(QString &error) {
qLog(Debug) << "Setting channels to" << channels_;
gst_caps_set_simple(caps, "channels", G_TYPE_INT, channels_, nullptr);
}
gst_element_link_filtered(audiosinkconverter, audiosink_, caps);
if (!gst_element_link_filtered(audiosinkconverter, audiosink_, caps)) {
gst_caps_unref(caps);
gst_object_unref(GST_OBJECT(audiobin_));
audiobin_ = nullptr;
error = "gst_element_link_filtered() failed.";
return false;
}
gst_caps_unref(caps);
}
{ // Add probes and handlers.
GstPad *pad = gst_element_get_static_pad(audioqueueconverter, "src");
GstPad *pad = gst_element_get_static_pad(audioqueueconverter_, "src");
if (pad) {
gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_BUFFER, HandoffCallback, this, nullptr);
buffer_probe_cb_id_ = gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_BUFFER, BufferProbeCallback, this, nullptr);
gst_object_unref(pad);
}
}
@ -715,8 +744,8 @@ bool GstEnginePipeline::InitAudioBin(QString &error) {
{
GstBus *bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline_));
if (bus) {
gst_bus_set_sync_handler(bus, BusCallbackSync, this, nullptr);
gst_bus_add_watch(bus, BusCallback, this);
gst_bus_set_sync_handler(bus, BusSyncCallback, this, nullptr);
gst_bus_add_watch(bus, BusWatchCallback, this);
gst_object_unref(bus);
}
}
@ -732,11 +761,13 @@ void GstEnginePipeline::SetupVolume(GstElement *element) {
if (volume_) return;
volume_ = element;
notify_volume_cb_id_ = CHECKED_GCONNECT(G_OBJECT(element), "notify::volume", &VolumeCallback, this);
notify_volume_cb_id_ = CHECKED_GCONNECT(G_OBJECT(element), "notify::volume", &NotifyVolumeCallback, this);
}
GstPadProbeReturn GstEnginePipeline::EventHandoffCallback(GstPad*, GstPadProbeInfo *info, gpointer self) {
GstPadProbeReturn GstEnginePipeline::UpstreamEventsProbeCallback(GstPad *pad, GstPadProbeInfo *info, gpointer self) {
Q_UNUSED(pad)
GstEnginePipeline *instance = reinterpret_cast<GstEnginePipeline*>(self);
@ -787,7 +818,9 @@ void GstEnginePipeline::ElementAddedCallback(GstBin *bin, GstBin*, GstElement *e
}
void GstEnginePipeline::SourceSetupCallback(GstPlayBin *bin, GParamSpec*, gpointer self) {
void GstEnginePipeline::NotifySourceCallback(GstPlayBin *bin, GParamSpec *param_spec, gpointer self) {
Q_UNUSED(param_spec)
GstEnginePipeline *instance = reinterpret_cast<GstEnginePipeline*>(self);
@ -833,7 +866,10 @@ void GstEnginePipeline::SourceSetupCallback(GstPlayBin *bin, GParamSpec*, gpoint
}
void GstEnginePipeline::VolumeCallback(GstElement*, GParamSpec*, gpointer self) {
void GstEnginePipeline::NotifyVolumeCallback(GstElement *element, GParamSpec *param_spec, gpointer self) {
Q_UNUSED(element)
Q_UNUSED(param_spec)
GstEnginePipeline *instance = reinterpret_cast<GstEnginePipeline*>(self);
@ -848,7 +884,9 @@ void GstEnginePipeline::VolumeCallback(GstElement*, GParamSpec*, gpointer self)
}
void GstEnginePipeline::NewPadCallback(GstElement*, GstPad *pad, gpointer self) {
void GstEnginePipeline::PadAddedCallback(GstElement *element, GstPad *pad, gpointer self) {
Q_UNUSED(element)
GstEnginePipeline *instance = reinterpret_cast<GstEnginePipeline*>(self);
@ -869,7 +907,7 @@ void GstEnginePipeline::NewPadCallback(GstElement*, GstPad *pad, gpointer self)
gst_pad_set_offset(pad, static_cast<gint64>(running_time));
// Add a probe to the pad so we can update last_playbin_segment_.
gst_pad_add_probe(pad, static_cast<GstPadProbeType>(GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH), PlaybinProbe, instance, nullptr);
instance->playbin_probe_cb_id_ = gst_pad_add_probe(pad, static_cast<GstPadProbeType>(GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH), PlaybinProbeCallback, instance, nullptr);
instance->pipeline_is_connected_ = true;
if (instance->pending_seek_nanosec_ != -1 && instance->pipeline_is_initialized_) {
@ -878,9 +916,9 @@ void GstEnginePipeline::NewPadCallback(GstElement*, GstPad *pad, gpointer self)
}
GstPadProbeReturn GstEnginePipeline::PlaybinProbe(GstPad *pad, GstPadProbeInfo *info, gpointer data) {
GstPadProbeReturn GstEnginePipeline::PlaybinProbeCallback(GstPad *pad, GstPadProbeInfo *info, gpointer self) {
GstEnginePipeline *instance = reinterpret_cast<GstEnginePipeline*>(data);
GstEnginePipeline *instance = reinterpret_cast<GstEnginePipeline*>(self);
const GstPadProbeType info_type = GST_PAD_PROBE_INFO_TYPE(info);
@ -918,7 +956,7 @@ GstPadProbeReturn GstEnginePipeline::PlaybinProbe(GstPad *pad, GstPadProbeInfo *
}
GstPadProbeReturn GstEnginePipeline::HandoffCallback(GstPad *pad, GstPadProbeInfo *info, gpointer self) {
GstPadProbeReturn GstEnginePipeline::BufferProbeCallback(GstPad *pad, GstPadProbeInfo *info, gpointer self) {
GstEnginePipeline *instance = reinterpret_cast<GstEnginePipeline*>(self);
@ -1081,7 +1119,9 @@ GstPadProbeReturn GstEnginePipeline::HandoffCallback(GstPad *pad, GstPadProbeInf
}
void GstEnginePipeline::AboutToFinishCallback(GstPlayBin*, gpointer self) {
void GstEnginePipeline::AboutToFinishCallback(GstPlayBin *playbin, gpointer self) {
Q_UNUSED(playbin)
GstEnginePipeline *instance = reinterpret_cast<GstEnginePipeline*>(self);
@ -1094,32 +1134,9 @@ void GstEnginePipeline::AboutToFinishCallback(GstPlayBin*, gpointer self) {
}
gboolean GstEnginePipeline::BusCallback(GstBus*, GstMessage *msg, gpointer self) {
GstBusSyncReply GstEnginePipeline::BusSyncCallback(GstBus *bus, GstMessage *msg, gpointer self) {
GstEnginePipeline *instance = reinterpret_cast<GstEnginePipeline*>(self);
switch (GST_MESSAGE_TYPE(msg)) {
case GST_MESSAGE_ERROR:
instance->ErrorMessageReceived(msg);
break;
case GST_MESSAGE_TAG:
instance->TagMessageReceived(msg);
break;
case GST_MESSAGE_STATE_CHANGED:
instance->StateChangedMessageReceived(msg);
break;
default:
break;
}
return TRUE;
}
GstBusSyncReply GstEnginePipeline::BusCallbackSync(GstBus*, GstMessage *msg, gpointer self) {
Q_UNUSED(bus)
GstEnginePipeline *instance = reinterpret_cast<GstEnginePipeline*>(self);
@ -1164,6 +1181,33 @@ GstBusSyncReply GstEnginePipeline::BusCallbackSync(GstBus*, GstMessage *msg, gpo
}
gboolean GstEnginePipeline::BusWatchCallback(GstBus *bus, GstMessage *msg, gpointer self) {
Q_UNUSED(bus)
GstEnginePipeline *instance = reinterpret_cast<GstEnginePipeline*>(self);
switch (GST_MESSAGE_TYPE(msg)) {
case GST_MESSAGE_ERROR:
instance->ErrorMessageReceived(msg);
break;
case GST_MESSAGE_TAG:
instance->TagMessageReceived(msg);
break;
case GST_MESSAGE_STATE_CHANGED:
instance->StateChangedMessageReceived(msg);
break;
default:
break;
}
return TRUE;
}
void GstEnginePipeline::StreamStatusMessageReceived(GstMessage *msg) {
GstStreamStatusType type = GST_STREAM_STATUS_TYPE_CREATE;
@ -1198,7 +1242,11 @@ void GstEnginePipeline::StreamStartMessageReceived() {
}
void GstEnginePipeline::TaskEnterCallback(GstTask*, GThread*, gpointer) {
void GstEnginePipeline::TaskEnterCallback(GstTask *task, GThread *thread, gpointer self) {
Q_UNUSED(task)
Q_UNUSED(thread)
Q_UNUSED(self)
// Bump the priority of the thread only on macOS

View File

@ -146,24 +146,24 @@ class GstEnginePipeline : public QObject {
void SetupVolume(GstElement *element);
// Static callbacks. The GstEnginePipeline instance is passed in the last argument.
static GstPadProbeReturn EventHandoffCallback(GstPad*, GstPadProbeInfo*, gpointer);
static GstPadProbeReturn UpstreamEventsProbeCallback(GstPad *pad, GstPadProbeInfo *info, gpointer self);
static GstPadProbeReturn BufferProbeCallback(GstPad *pad, GstPadProbeInfo *info, gpointer self);
static GstPadProbeReturn PlaybinProbeCallback(GstPad *pad, GstPadProbeInfo *info, gpointer self);
static void ElementAddedCallback(GstBin *bin, GstBin*, GstElement *element, gpointer self);
static void SourceSetupCallback(GstPlayBin*, GParamSpec *pspec, gpointer);
static void VolumeCallback(GstElement*, GParamSpec*, gpointer self);
static void NewPadCallback(GstElement*, GstPad*, gpointer);
static GstPadProbeReturn PlaybinProbe(GstPad*, GstPadProbeInfo*, gpointer);
static GstPadProbeReturn HandoffCallback(GstPad*, GstPadProbeInfo*, gpointer);
static void AboutToFinishCallback(GstPlayBin*, gpointer);
static GstBusSyncReply BusCallbackSync(GstBus*, GstMessage*, gpointer);
static gboolean BusCallback(GstBus*, GstMessage*, gpointer);
static void TaskEnterCallback(GstTask*, GThread*, gpointer);
static void PadAddedCallback(GstElement *element, GstPad *pad, gpointer self);
static void NotifySourceCallback(GstPlayBin *bin, GParamSpec *param_spec, gpointer self);
static void NotifyVolumeCallback(GstElement *element, GParamSpec *param_spec, gpointer self);
static void AboutToFinishCallback(GstPlayBin *playbin, gpointer self);
static GstBusSyncReply BusSyncCallback(GstBus *bus, GstMessage *msg, gpointer self);
static gboolean BusWatchCallback(GstBus *bus, GstMessage *msg, gpointer self);
static void TaskEnterCallback(GstTask *task, GThread *thread, gpointer self);
void TagMessageReceived(GstMessage*);
void ErrorMessageReceived(GstMessage*);
void ElementMessageReceived(GstMessage*);
void StateChangedMessageReceived(GstMessage*);
void BufferingMessageReceived(GstMessage*);
void StreamStatusMessageReceived(GstMessage*);
void TagMessageReceived(GstMessage *msg);
void ErrorMessageReceived(GstMessage *msg);
void ElementMessageReceived(GstMessage *msg);
void StateChangedMessageReceived(GstMessage *msg);
void BufferingMessageReceived(GstMessage *msg);
void StreamStatusMessageReceived(GstMessage *msg);
void StreamStartMessageReceived();
static QString ParseStrTag(GstTagList *list, const char *tag);
@ -285,18 +285,23 @@ class GstEnginePipeline : public QObject {
GstElement *audiobin_;
GstElement *audiosink_;
GstElement *audioqueue_;
GstElement *audioqueueconverter_;
GstElement *volume_;
GstElement *volume_sw_;
GstElement *volume_fading_;
GstElement *audiopanorama_;
GstElement *equalizer_;
GstElement *equalizer_preamp_;
GstElement *eventprobe_;
int element_added_cb_id_;
int pad_added_cb_id_;
int notify_source_cb_id_;
int about_to_finish_cb_id_;
int notify_volume_cb_id_;
gulong upstream_events_probe_cb_id_;
gulong buffer_probe_cb_id_;
gulong playbin_probe_cb_id_;
gulong element_added_cb_id_;
gulong pad_added_cb_id_;
gulong notify_source_cb_id_;
gulong about_to_finish_cb_id_;
gulong notify_volume_cb_id_;
QThreadPool set_state_threadpool_;