Cancelling the transcoder now works

This commit is contained in:
David Sansome 2010-05-06 16:28:19 +00:00
parent 111f0d066b
commit 1f0733caf5
3 changed files with 156 additions and 63 deletions

View File

@ -107,6 +107,7 @@ void TranscodeDialog::Start() {
}
void TranscodeDialog::Cancel() {
transcoder_->Cancel();
SetWorking(false);
}

View File

@ -19,14 +19,15 @@
#include <QtConcurrentMap>
#include <QtDebug>
#include <QEventLoop>
#include <QFile>
#include <QCoreApplication>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
using boost::shared_ptr;
int Transcoder::JobFinishedEvent::sEventType = -1;
GstElement* TranscoderFormat::CreateElement(const QString &factory_name,
GstElement *bin,
@ -75,18 +76,32 @@ GstElement* TranscoderFormat::CreateBin(const QStringList& elements) const {
return bin;
}
Transcoder::JobFinishedEvent::JobFinishedEvent(JobState *state, bool success)
: QEvent(QEvent::Type(sEventType)),
state_(state),
success_(success)
{
}
void Transcoder::JobState::PostFinished(bool success) {
QCoreApplication::postEvent(parent_, new Transcoder::JobFinishedEvent(this, success));
}
Transcoder::Transcoder(QObject* parent)
: QObject(parent),
future_watcher_(new QFutureWatcher<void>(this))
max_threads_(QThread::idealThreadCount())
{
if (JobFinishedEvent::sEventType == -1)
JobFinishedEvent::sEventType = QEvent::registerEventType();
formats_ << new OggVorbisTranscoder;
formats_ << new OggSpeexTranscoder;
formats_ << new FlacTranscoder;
formats_ << new Mp3Transcoder;
formats_ << new M4aTranscoder;
formats_ << new ThreeGPTranscoder;
connect(future_watcher_, SIGNAL(finished()), SLOT(JobsFinished()));
}
Transcoder::~Transcoder() {
@ -103,8 +118,6 @@ QList<const TranscoderFormat*> Transcoder::formats() const {
void Transcoder::AddJob(const QString &input,
const TranscoderFormat *output_format,
const QString &output) {
Q_ASSERT(!future_watcher_->isRunning());
Job job;
job.input = input;
job.output_format = output_format;
@ -127,26 +140,34 @@ void Transcoder::AddJob(const QString &input,
}
}
jobs_ << job;
queued_jobs_ << job;
}
void Transcoder::Start() {
Q_ASSERT(!future_watcher_->isRunning());
QFuture<void> future = QtConcurrent::map(
jobs_, boost::bind(&Transcoder::RunJob, this, _1));
future_watcher_->setFuture(future);
forever {
StartJobStatus status = MaybeStartNextJob();
if (status == AllThreadsBusy || status == NoMoreJobs)
break;
}
}
void Transcoder::RunJob(const Job& job) {
bool success = Transcode(job);
Transcoder::StartJobStatus Transcoder::MaybeStartNextJob() {
if (current_jobs_.count() >= max_threads())
return AllThreadsBusy;
if (queued_jobs_.isEmpty()) {
if (current_jobs_.isEmpty())
emit AllJobsComplete();
emit JobComplete(job.input, success);
return NoMoreJobs;
}
Job job = queued_jobs_.takeFirst();
return StartJob(job) ? StartedSuccessfully : FailedToStart;
}
void Transcoder::NewPadCallback(GstElement*, GstPad* pad, gboolean, gpointer data) {
JobState* state = reinterpret_cast<JobState*>(data);
GstPad* const audiopad = gst_element_get_pad(state->convert_element, "sink");
GstPad* const audiopad = gst_element_get_pad(state->convert_element_, "sink");
if (GST_PAD_IS_LINKED(audiopad)) {
qDebug() << "audiopad is already linked. Unlinking old pad.";
@ -162,8 +183,7 @@ gboolean Transcoder::BusCallback(GstBus*, GstMessage* msg, gpointer data) {
switch (GST_MESSAGE_TYPE(msg)) {
case GST_MESSAGE_ERROR:
state->success = false;
state->event_loop->exit();
state->PostFinished(false);
break;
default:
@ -176,12 +196,11 @@ GstBusSyncReply Transcoder::BusCallbackSync(GstBus*, GstMessage* msg, gpointer d
JobState* state = reinterpret_cast<JobState*>(data);
switch (GST_MESSAGE_TYPE(msg)) {
case GST_MESSAGE_EOS:
state->event_loop->exit();
state->PostFinished(true);
break;
case GST_MESSAGE_ERROR:
state->success = false;
state->event_loop->exit();
state->PostFinished(false);
break;
default:
@ -190,27 +209,29 @@ GstBusSyncReply Transcoder::BusCallbackSync(GstBus*, GstMessage* msg, gpointer d
return GST_BUS_PASS;
}
bool Transcoder::Transcode(const Job &job) const {
bool Transcoder::StartJob(const Job &job) {
shared_ptr<JobState> state(new JobState(job, this));
// Create the pipeline.
// This should be a scoped_ptr, but scoped_ptr doesn't support custom
// destructors.
shared_ptr<GstElement> pipeline(gst_pipeline_new("pipeline"),
boost::bind(gst_object_unref, _1));
if (!pipeline) return false;
state->pipeline_.reset(gst_pipeline_new("pipeline"),
boost::bind(gst_object_unref, _1));
if (!state->pipeline_) return false;
// Create all the elements
const TranscoderFormat* f = job.output_format;
GstElement* src = f->CreateElement("filesrc", pipeline.get());
GstElement* decode = f->CreateElement("decodebin", pipeline.get());
GstElement* convert = f->CreateElement("audioconvert", pipeline.get());
GstElement* src = f->CreateElement("filesrc", state->pipeline_.get());
GstElement* decode = f->CreateElement("decodebin", state->pipeline_.get());
GstElement* convert = f->CreateElement("audioconvert", state->pipeline_.get());
GstElement* encode = f->CreateEncodeBin();
GstElement* sink = f->CreateElement("filesink", pipeline.get());
GstElement* sink = f->CreateElement("filesink", state->pipeline_.get());
if (!src || !decode || !convert || !encode || !sink)
return false;
// Join them together
gst_bin_add(GST_BIN(pipeline.get()), encode);
gst_bin_add(GST_BIN(state->pipeline_.get()), encode);
gst_element_link(src, decode);
gst_element_link_many(convert, encode, sink, NULL);
@ -219,29 +240,71 @@ bool Transcoder::Transcode(const Job &job) const {
g_object_set(sink, "location", job.output.toLocal8Bit().constData(), NULL);
// Set callbacks
JobState state;
state.convert_element = convert;
state.event_loop.reset(new QEventLoop);
state.success = true;
state->convert_element_ = convert;
g_signal_connect(decode, "new-decoded-pad", G_CALLBACK(NewPadCallback), &state);
gst_bus_set_sync_handler(gst_pipeline_get_bus(GST_PIPELINE(pipeline.get())), BusCallbackSync, &state);
gst_bus_add_watch(gst_pipeline_get_bus(GST_PIPELINE(pipeline.get())), BusCallback, &state);
g_signal_connect(decode, "new-decoded-pad", G_CALLBACK(NewPadCallback), state.get());
gst_bus_set_sync_handler(gst_pipeline_get_bus(GST_PIPELINE(state->pipeline_.get())), BusCallbackSync, state.get());
gst_bus_add_watch(gst_pipeline_get_bus(GST_PIPELINE(state->pipeline_.get())), BusCallback, state.get());
// Start the pipeline and wait until it finishes
gst_element_set_state(pipeline.get(), GST_STATE_PLAYING);
// Start the pipeline
gst_element_set_state(state->pipeline_.get(), GST_STATE_PLAYING);
state.event_loop->exec();
// GStreamer now transcodes in another thread, so we can return now and do
// something else. Keep the JobState object around. It'll post an event
// to our event loop when it finishes.
current_jobs_ << state;
// Do this explicitly so that it's guaranteed to happen before the event
// loop is destroyed.
pipeline.reset();
return state.success;
return true;
}
void Transcoder::JobsFinished() {
jobs_.clear();
emit AllJobsComplete();
bool Transcoder::event(QEvent* e) {
if (e->type() == JobFinishedEvent::sEventType) {
JobFinishedEvent* finished_event = static_cast<JobFinishedEvent*>(e);
// Find this job in the list
JobStateList::iterator it = current_jobs_.begin();
while (it != current_jobs_.end()) {
if (it->get() == finished_event->state_)
break;
++it;
}
if (it == current_jobs_.end()) {
// Couldn't find it, maybe GStreamer gave us an event after we'd destroyed
// the pipeline?
return true;
}
// Emit the finished signal
emit JobComplete((*it)->job_.input, finished_event->success_);
// Remove it from the list - this will also destroy the GStreamer pipeline
current_jobs_.erase(it);
// Start some more jobs
MaybeStartNextJob();
return true;
}
return QObject::event(e);
}
void Transcoder::Cancel() {
// Remove all pending jobs
queued_jobs_.clear();
// Stop the running ones
JobStateList::iterator it = current_jobs_.begin();
while (it != current_jobs_.end()) {
shared_ptr<JobState> state(*it);
// Stop the pipeline
if (gst_element_set_state(state->pipeline_.get(), GST_STATE_NULL) == GST_STATE_CHANGE_ASYNC) {
// Wait for it to finish stopping...
gst_element_get_state(state->pipeline_.get(), NULL, NULL, GST_CLOCK_TIME_NONE);
}
// Remove the job, this destroys the GStreamer pipeline too
it = current_jobs_.erase(it);
}
}

View File

@ -21,13 +21,12 @@
#include <QObject>
#include <QStringList>
#include <QFutureWatcher>
#include <QEvent>
#include <QMetaType>
#include <boost/shared_ptr.hpp>
#include <boost/scoped_ptr.hpp>
class QEventLoop;
class Transcoder;
class TranscoderFormat {
@ -57,17 +56,24 @@ class Transcoder : public QObject {
~Transcoder();
QList<const TranscoderFormat*> formats() const;
int max_threads() const { return max_threads_; }
void set_max_threads(int count) { max_threads_ = count; }
void AddJob(const QString& input, const TranscoderFormat* output_format,
const QString& output = QString());
public slots:
void Start();
void Cancel();
signals:
void JobComplete(const QString& filename, bool success);
void AllJobsComplete();
protected:
bool event(QEvent* e);
private:
// The description of a file to transcode - lives in the main thread.
struct Job {
@ -79,26 +85,49 @@ class Transcoder : public QObject {
// State held by a job and shared across gstreamer callbacks - lives in the
// job's thread.
struct JobState {
GstElement* convert_element;
boost::scoped_ptr<QEventLoop> event_loop;
bool success;
JobState(const Job& job, Transcoder* parent)
: job_(job), parent_(parent), convert_element_(NULL) {}
void PostFinished(bool success);
Job job_;
Transcoder* parent_;
boost::shared_ptr<GstElement> pipeline_;
GstElement* convert_element_;
};
void RunJob(const Job& job);
bool Transcode(const Job& job) const;
// Event passed from a GStreamer callback to the Transcoder when a job
// finishes.
struct JobFinishedEvent : public QEvent {
JobFinishedEvent(JobState* state, bool success);
static int sEventType;
JobState* state_;
bool success_;
};
enum StartJobStatus {
StartedSuccessfully,
FailedToStart,
NoMoreJobs,
AllThreadsBusy,
};
StartJobStatus MaybeStartNextJob();
bool StartJob(const Job& job);
static void NewPadCallback(GstElement*, GstPad* pad, gboolean, gpointer data);
static gboolean BusCallback(GstBus*, GstMessage* msg, gpointer data);
static GstBusSyncReply BusCallbackSync(GstBus*, GstMessage* msg, gpointer data);
private slots:
void JobsFinished();
private:
QList<TranscoderFormat*> formats_;
QList<Job> jobs_;
typedef QList<boost::shared_ptr<JobState> > JobStateList;
QFutureWatcher<void>* future_watcher_;
int max_threads_;
QList<TranscoderFormat*> formats_;
QList<Job> queued_jobs_;
JobStateList current_jobs_;
};
#endif // TRANSCODER_H