Use multithreading for feed updates (using ThreadWeaver)

This contains the following changes:
- Use separate db connections for feed updates (required for
  multithreading.
- Add ThreadWeaver dependency.
- Port update job from KJob to ThreadWeaver::Job
- This should also solve the bug where the update process would hang
  on the "processEvents" call, which was intended to keep the UI
  responsive during updates.

BUG: 452585
This commit is contained in:
Bart De Vries 2022-05-24 13:43:13 +02:00
parent 6f280af176
commit 633f4fd0f0
12 changed files with 130 additions and 92 deletions

View File

@ -10,3 +10,4 @@ Dependencies:
'frameworks/syndication': '@stable'
'frameworks/ki18n': '@stable'
'frameworks/kconfig': '@stable'
'frameworks/threadweaver': '@stable'

View File

@ -37,7 +37,7 @@ ecm_setup_version(${PROJECT_VERSION}
)
find_package(Qt${QT_MAJOR_VERSION} ${QT_MIN_VERSION} REQUIRED NO_MODULE COMPONENTS Core Quick Test Gui QuickControls2 Sql Multimedia)
find_package(KF5 ${KF5_MIN_VERSION} REQUIRED COMPONENTS CoreAddons Syndication Config I18n)
find_package(KF5 ${KF5_MIN_VERSION} REQUIRED COMPONENTS CoreAddons Syndication Config I18n ThreadWeaver)
find_package(Taglib REQUIRED)
find_package(Qt${QT_MAJOR_VERSION}Keychain)
set_package_properties(Qt${QT_MAJOR_VERSION}Keychain PROPERTIES

View File

@ -20,6 +20,7 @@ Note: When using versions of kasts built from git-master, it's possible that the
- Syndication
- TagLib
- QtKeychain
- ThreadWeaver
### On debian
@ -28,7 +29,7 @@ Note: When using versions of kasts built from git-master, it's possible that the
qtdeclarative5-dev qtquickcontrols2-5-dev qtmultimedia5-dev \
libkf5syndication-dev libkf5config-dev libkf5i18n-dev \
libkf5coreaddons-dev libtag1-dev qtkeychain-qt5-dev \
libkf5networkmanagerqt-dev
libkf5networkmanagerqt-dev libkf5threadweaver-dev
```
## Linux

View File

@ -145,7 +145,7 @@ add_executable(kasts ${SRCS})
kconfig_add_kcfg_files(kasts settingsmanager.kcfgc GENERATE_MOC)
target_include_directories(kasts PRIVATE ${CMAKE_BINARY_DIR})
target_link_libraries(kasts PRIVATE Qt::Core Qt::Qml Qt::Quick Qt::QuickControls2 Qt::Sql Qt::Multimedia KF5::Syndication KF5::CoreAddons KF5::ConfigGui KF5::I18n Taglib::Taglib KastsSolidExtras ${QTKEYCHAIN_LIBRARIES})
target_link_libraries(kasts PRIVATE Qt::Core Qt::Qml Qt::Quick Qt::QuickControls2 Qt::Sql Qt::Multimedia KF5::Syndication KF5::CoreAddons KF5::ConfigGui KF5::I18n Taglib::Taglib KastsSolidExtras ${QTKEYCHAIN_LIBRARIES} KF5::ThreadWeaver)
if(ANDROID)
target_link_libraries(kasts PRIVATE

View File

@ -1,6 +1,6 @@
/**
* SPDX-FileCopyrightText: 2020 Tobias Fella <fella@posteo.de>
* SPDX-FileCopyrightText: 2021 Bart De Vries <bart@mogwai.be>
* SPDX-FileCopyrightText: 2021-2022 Bart De Vries <bart@mogwai.be>
*
* SPDX-License-Identifier: GPL-2.0-only OR GPL-3.0-only OR LicenseRef-KDE-Accepted-GPL
*/
@ -14,8 +14,6 @@
#include <QSqlError>
#include <QStandardPaths>
#include <QUrl>
#include <QXmlStreamReader>
#include <QXmlStreamWriter>
#define TRUE_OR_RETURN(x) \
if (!x) \
@ -23,11 +21,7 @@
Database::Database()
{
QSqlDatabase db = QSqlDatabase::addDatabase(QStringLiteral("QSQLITE"));
QString databasePath = QStandardPaths::writableLocation(QStandardPaths::AppDataLocation);
QDir(databasePath).mkpath(databasePath);
db.setDatabaseName(databasePath + QStringLiteral("/database.db3"));
db.open();
Database::openDatabase();
if (!migrate()) {
qCritical() << "Failed to migrate the database";
@ -36,6 +30,21 @@ Database::Database()
cleanup();
}
void Database::openDatabase(const QString &connectionName)
{
QSqlDatabase db = QSqlDatabase::addDatabase(QStringLiteral("QSQLITE"), connectionName);
QString databasePath = QStandardPaths::writableLocation(QStandardPaths::AppDataLocation);
QDir(databasePath).mkpath(databasePath);
db.setDatabaseName(databasePath + QStringLiteral("/") + m_dbName);
db.open();
}
void Database::closeDatabase(const QString &connectionName)
{
QSqlDatabase::database(connectionName).close();
QSqlDatabase::removeDatabase(connectionName);
}
bool Database::migrate()
{
int dbversion = version();
@ -140,15 +149,17 @@ bool Database::migrateTo6()
return true;
}
bool Database::execute(const QString &query)
bool Database::execute(const QString &query, const QString &connectionName)
{
QSqlQuery q;
QSqlQuery q(connectionName);
q.prepare(query);
return execute(q);
}
bool Database::execute(QSqlQuery &query)
{
// NOTE that this will execute the query on the database that was specified
// when the QSqlQuery was created. There is no way to change that later on.
if (!query.exec()) {
qWarning() << "Failed to execute SQL Query";
qWarning() << query.lastQuery();
@ -158,14 +169,14 @@ bool Database::execute(QSqlQuery &query)
return true;
}
bool Database::transaction()
bool Database::transaction(const QString &connectionName)
{
return QSqlDatabase::database().transaction();
return QSqlDatabase::database(connectionName).transaction();
}
bool Database::commit()
bool Database::commit(const QString &connectionName)
{
return QSqlDatabase::database().commit();
return QSqlDatabase::database(connectionName).commit();
}
int Database::version()

View File

@ -1,6 +1,6 @@
/**
* SPDX-FileCopyrightText: 2020 Tobias Fella <fella@posteo.de>
* SPDX-FileCopyrightText: 2021 Bart De Vries <bart@mogwai.be>
* SPDX-FileCopyrightText: 2021-2022 Bart De Vries <bart@mogwai.be>
*
* SPDX-License-Identifier: GPL-2.0-only OR GPL-3.0-only OR LicenseRef-KDE-Accepted-GPL
*/
@ -21,11 +21,15 @@ public:
static Database _instance;
return _instance;
}
bool execute(QSqlQuery &query);
bool execute(const QString &query);
bool transaction();
bool commit();
static void openDatabase(const QString &connectionName = QLatin1String(QSqlDatabase::defaultConnection));
static void closeDatabase(const QString &connectionName = QLatin1String(QSqlDatabase::defaultConnection));
static bool execute(QSqlQuery &query);
static bool execute(const QString &query, const QString &connectionName = QLatin1String(QSqlDatabase::defaultConnection));
static bool transaction(const QString &connectionName = QLatin1String(QSqlDatabase::defaultConnection));
static bool commit(const QString &connectionName = QLatin1String(QSqlDatabase::defaultConnection));
private:
Database();
@ -39,4 +43,6 @@ private:
bool migrateTo5();
bool migrateTo6();
void cleanup();
inline static const QString m_dbName = QStringLiteral("database.db3");
};

View File

@ -1,5 +1,5 @@
/**
* SPDX-FileCopyrightText: 2021 Bart De Vries <bart@mogwai.be>
* SPDX-FileCopyrightText: 2021-2022 Bart De Vries <bart@mogwai.be>
*
* SPDX-License-Identifier: GPL-2.0-only OR GPL-3.0-only OR LicenseRef-KDE-Accepted-GPL
*/
@ -32,7 +32,7 @@ DataManager::DataManager()
&Fetcher::feedDetailsUpdated,
this,
[this](const QString &url, const QString &name, const QString &image, const QString &link, const QString &description, const QDateTime &lastUpdated) {
qCDebug(kastsDataManager) << "Start updating feed details" << m_feeds;
qCDebug(kastsDataManager) << "Start updating feed details for" << url;
Feed *feed = getFeed(url);
if (feed != nullptr) {
feed->setName(name);

View File

@ -1,5 +1,5 @@
/**
* SPDX-FileCopyrightText: 2021 Bart De Vries <bart@mogwai.be>
* SPDX-FileCopyrightText: 2021-2022 Bart De Vries <bart@mogwai.be>
*
* SPDX-License-Identifier: GPL-2.0-only OR GPL-3.0-only OR LicenseRef-KDE-Accepted-GPL
*/

View File

@ -1,5 +1,5 @@
/**
* SPDX-FileCopyrightText: 2021 Bart De Vries <bart@mogwai.be>
* SPDX-FileCopyrightText: 2021-2022 Bart De Vries <bart@mogwai.be>
*
* SPDX-License-Identifier: GPL-2.0-only OR GPL-3.0-only OR LicenseRef-KDE-Accepted-GPL
*/
@ -9,6 +9,7 @@
#include <QTimer>
#include <KLocalizedString>
#include <ThreadWeaver/Queue>
#include "error.h"
#include "fetcher.h"
@ -17,6 +18,8 @@
#include "settingsmanager.h"
#include "updatefeedjob.h"
using namespace ThreadWeaver;
FetchFeedsJob::FetchFeedsJob(const QStringList &urls, QObject *parent)
: KJob(parent)
, m_urls(urls)
@ -43,20 +46,45 @@ void FetchFeedsJob::fetch()
setTotalAmount(KJob::Unit::Items, m_urls.count());
setProcessedAmount(KJob::Unit::Items, 0);
qCDebug(kastsFetcher) << "Number of feed update threads:" << Queue::instance()->currentNumberOfThreads();
for (int i = 0; i < m_urls.count(); i++) {
QString url = m_urls[i];
UpdateFeedJob *updateFeedJob = new UpdateFeedJob(url, this);
m_feedjobs[i] = updateFeedJob;
connect(this, &FetchFeedsJob::aborting, updateFeedJob, &UpdateFeedJob::abort);
connect(updateFeedJob, &UpdateFeedJob::result, this, [this, url, updateFeedJob]() {
if (updateFeedJob->error()) {
Q_EMIT logError(Error::Type::FeedUpdate, url, QString(), updateFeedJob->error(), updateFeedJob->errorString(), QString());
qCDebug(kastsFetcher) << "Starting to fetch" << url;
Q_EMIT Fetcher::instance().feedUpdateStatusChanged(url, true);
QNetworkRequest request((QUrl(url)));
request.setTransferTimeout();
QNetworkReply *reply = Fetcher::instance().get(request);
connect(this, &FetchFeedsJob::aborting, reply, &QNetworkReply::abort);
connect(reply, &QNetworkReply::finished, this, [this, reply, i, url]() {
qCDebug(kastsFetcher) << "got networkreply for" << reply;
if (reply->error()) {
if (!m_abort) {
qCDebug(kastsFetcher) << "Error fetching feed" << reply->errorString();
Q_EMIT logError(Error::Type::FeedUpdate, url, QString(), reply->error(), reply->errorString(), QString());
}
setProcessedAmount(KJob::Unit::Items, processedAmount(KJob::Unit::Items) + 1);
Q_EMIT Fetcher::instance().feedUpdateStatusChanged(url, false);
} else {
QByteArray data = reply->readAll();
UpdateFeedJob *updateFeedJob = new UpdateFeedJob(url, data);
m_feedjobs[i] = updateFeedJob;
connect(this, &FetchFeedsJob::aborting, updateFeedJob, &UpdateFeedJob::abort);
connect(updateFeedJob, &UpdateFeedJob::finished, this, [this, url]() {
setProcessedAmount(KJob::Unit::Items, processedAmount(KJob::Unit::Items) + 1);
Q_EMIT Fetcher::instance().feedUpdateStatusChanged(url, false);
});
stream() << updateFeedJob;
qCDebug(kastsFetcher) << "Just started updateFeedJob" << i + 1;
}
setProcessedAmount(KJob::Unit::Items, processedAmount(KJob::Unit::Items) + 1);
reply->deleteLater();
});
updateFeedJob->start();
qCDebug(kastsFetcher) << "Just started updateFeedJob" << i + 1;
qCDebug(kastsFetcher) << "End of retrieveFeed for" << url;
}
qCDebug(kastsFetcher) << "End of FetchFeedsJob::fetch";
}

View File

@ -1,5 +1,5 @@
/**
* SPDX-FileCopyrightText: 2021 Bart De Vries <bart@mogwai.be>
* SPDX-FileCopyrightText: 2021-2022 Bart De Vries <bart@mogwai.be>
*
* SPDX-License-Identifier: GPL-2.0-only OR GPL-3.0-only OR LicenseRef-KDE-Accepted-GPL
*/

View File

@ -1,11 +1,12 @@
/**
* SPDX-FileCopyrightText: 2021 Bart De Vries <bart@mogwai.be>
* SPDX-FileCopyrightText: 2021-2022 Bart De Vries <bart@mogwai.be>
*
* SPDX-License-Identifier: GPL-2.0-only OR GPL-3.0-only OR LicenseRef-KDE-Accepted-GPL
*/
#include "updatefeedjob.h"
#include <QDir>
#include <QDomElement>
#include <QMultiMap>
#include <QNetworkReply>
@ -14,16 +15,21 @@
#include <QTimer>
#include <KLocalizedString>
#include <ThreadWeaver/Thread>
#include "database.h"
#include "enclosure.h"
#include "fetcher.h"
#include "fetcherlogging.h"
#include "kasts-version.h"
#include "settingsmanager.h"
UpdateFeedJob::UpdateFeedJob(const QString &url, QObject *parent)
: KJob(parent)
using namespace ThreadWeaver;
UpdateFeedJob::UpdateFeedJob(const QString &url, const QByteArray &data, QObject *parent)
: QObject(parent)
, m_url(url)
, m_data(data)
{
// connect to signals in Fetcher such that GUI can pick up the changes
connect(this, &UpdateFeedJob::feedDetailsUpdated, &Fetcher::instance(), &Fetcher::feedDetailsUpdated);
@ -32,45 +38,27 @@ UpdateFeedJob::UpdateFeedJob(const QString &url, QObject *parent)
connect(this, &UpdateFeedJob::feedUpdateStatusChanged, &Fetcher::instance(), &Fetcher::feedUpdateStatusChanged);
}
void UpdateFeedJob::start()
UpdateFeedJob::~UpdateFeedJob()
{
QTimer::singleShot(0, this, &UpdateFeedJob::retrieveFeed);
qCDebug(kastsFetcher) << "destroyed UpdateFeedJob for" << m_url;
}
void UpdateFeedJob::retrieveFeed()
void UpdateFeedJob::run(JobPointer, Thread *)
{
if (m_abort) {
emitResult();
Q_EMIT finished();
return;
}
qCDebug(kastsFetcher) << "Starting to fetch" << m_url;
Q_EMIT feedUpdateStatusChanged(m_url, true);
Database::openDatabase(m_url);
QNetworkRequest request((QUrl(m_url)));
request.setTransferTimeout();
m_reply = Fetcher::instance().get(request);
connect(this, &UpdateFeedJob::aborting, m_reply, &QNetworkReply::abort);
connect(m_reply, &QNetworkReply::finished, this, [this]() {
qCDebug(kastsFetcher) << "got networkreply for" << m_reply;
if (m_reply->error()) {
if (!m_abort) {
qWarning() << "Error fetching feed";
qWarning() << m_reply->errorString();
setError(m_reply->error());
setErrorText(m_reply->errorString());
}
} else {
QByteArray data = m_reply->readAll();
Syndication::DocumentSource document(data, m_url);
Syndication::FeedPtr feed = Syndication::parserCollection()->parse(document, QStringLiteral("Atom"));
processFeed(feed);
}
Q_EMIT feedUpdateStatusChanged(m_url, false);
m_reply->deleteLater();
emitResult();
});
qCDebug(kastsFetcher) << "End of retrieveFeed for" << m_url;
Syndication::DocumentSource document(m_data, m_url);
Syndication::FeedPtr feed = Syndication::parserCollection()->parse(document, QStringLiteral("Atom"));
processFeed(feed);
Database::closeDatabase(m_url);
Q_EMIT finished();
}
void UpdateFeedJob::processFeed(Syndication::FeedPtr feed)
@ -82,10 +70,10 @@ void UpdateFeedJob::processFeed(Syndication::FeedPtr feed)
// First check if this is a newly added feed
m_isNewFeed = false;
QSqlQuery query;
QSqlQuery query(QSqlDatabase::database(m_url));
query.prepare(QStringLiteral("SELECT new FROM Feeds WHERE url=:url;"));
query.bindValue(QStringLiteral(":url"), m_url);
Database::instance().execute(query);
Database::execute(query);
if (query.next()) {
m_isNewFeed = query.value(QStringLiteral("new")).toBool();
} else {
@ -119,7 +107,7 @@ void UpdateFeedJob::processFeed(Syndication::FeedPtr feed)
query.bindValue(QStringLiteral(":image"), image);
// Do the actual database UPDATE of this feed
Database::instance().execute(query);
Database::execute(query);
// Now that we have the feed details, we make vectors of the data that's
// already in the database relating to this feed
@ -127,28 +115,28 @@ void UpdateFeedJob::processFeed(Syndication::FeedPtr feed)
// we can't check for duplicates and we'll keep adding more of the same!
query.prepare(QStringLiteral("SELECT id FROM Entries WHERE feed=:feed;"));
query.bindValue(QStringLiteral(":feed"), m_url);
Database::instance().execute(query);
Database::execute(query);
while (query.next()) {
m_existingEntryIds += query.value(QStringLiteral("id")).toString();
}
query.prepare(QStringLiteral("SELECT id, url FROM Enclosures WHERE feed=:feed;"));
query.bindValue(QStringLiteral(":feed"), m_url);
Database::instance().execute(query);
Database::execute(query);
while (query.next()) {
m_existingEnclosures += qMakePair(query.value(QStringLiteral("id")).toString(), query.value(QStringLiteral("url")).toString());
}
query.prepare(QStringLiteral("SELECT id, name FROM Authors WHERE feed=:feed;"));
query.bindValue(QStringLiteral(":feed"), m_url);
Database::instance().execute(query);
Database::execute(query);
while (query.next()) {
m_existingAuthors += qMakePair(query.value(QStringLiteral("id")).toString(), query.value(QStringLiteral("name")).toString());
}
query.prepare(QStringLiteral("SELECT id, start FROM Chapters WHERE feed=:feed;"));
query.bindValue(QStringLiteral(":feed"), m_url);
Database::instance().execute(query);
Database::execute(query);
while (query.next()) {
m_existingChapters += qMakePair(query.value(QStringLiteral("id")).toString(), query.value(QStringLiteral("start")).toInt());
}
@ -194,7 +182,7 @@ void UpdateFeedJob::processFeed(Syndication::FeedPtr feed)
for (const auto &entry : feed->items()) {
if (m_abort)
return;
QCoreApplication::processEvents(); // keep the main thread semi-responsive
bool isNewEntry = processEntry(entry);
updatedEntries = updatedEntries || isNewEntry;
}
@ -209,7 +197,7 @@ void UpdateFeedJob::processFeed(Syndication::FeedPtr feed)
query.prepare(QStringLiteral("UPDATE Feeds SET new=:new WHERE url=:url;"));
query.bindValue(QStringLiteral(":url"), m_url);
query.bindValue(QStringLiteral(":new"), false);
Database::instance().execute(query);
Database::execute(query);
}
if (updatedEntries || m_isNewFeed)
@ -370,9 +358,9 @@ void UpdateFeedJob::processChapter(const QString &entryId, const int &start, con
void UpdateFeedJob::writeToDatabase()
{
QSqlQuery writeQuery;
QSqlQuery writeQuery(QSqlDatabase::database(m_url));
Database::instance().transaction();
Database::transaction(m_url);
// Entries
writeQuery.prepare(
@ -389,7 +377,7 @@ void UpdateFeedJob::writeToDatabase()
writeQuery.bindValue(QStringLiteral(":read"), entryDetails.read);
writeQuery.bindValue(QStringLiteral(":new"), entryDetails.isNew);
writeQuery.bindValue(QStringLiteral(":image"), entryDetails.image);
Database::instance().execute(writeQuery);
Database::execute(writeQuery);
}
// Authors
@ -400,7 +388,7 @@ void UpdateFeedJob::writeToDatabase()
writeQuery.bindValue(QStringLiteral(":name"), authorDetails.name);
writeQuery.bindValue(QStringLiteral(":uri"), authorDetails.uri);
writeQuery.bindValue(QStringLiteral(":email"), authorDetails.email);
Database::instance().execute(writeQuery);
Database::execute(writeQuery);
}
// Enclosures
@ -415,7 +403,7 @@ void UpdateFeedJob::writeToDatabase()
writeQuery.bindValue(QStringLiteral(":url"), enclosureDetails.url);
writeQuery.bindValue(QStringLiteral(":playposition"), enclosureDetails.playPosition);
writeQuery.bindValue(QStringLiteral(":downloaded"), Enclosure::statusToDb(enclosureDetails.downloaded));
Database::instance().execute(writeQuery);
Database::execute(writeQuery);
}
// Chapters
@ -427,10 +415,10 @@ void UpdateFeedJob::writeToDatabase()
writeQuery.bindValue(QStringLiteral(":title"), chapterDetails.title);
writeQuery.bindValue(QStringLiteral(":link"), chapterDetails.link);
writeQuery.bindValue(QStringLiteral(":image"), chapterDetails.image);
Database::instance().execute(writeQuery);
Database::execute(writeQuery);
}
if (Database::instance().commit()) {
if (Database::commit(m_url)) {
for (EntryDetails entryDetails : m_entries) {
Q_EMIT entryAdded(m_url, entryDetails.id);
}

View File

@ -1,27 +1,30 @@
/**
* SPDX-FileCopyrightText: 2021 Bart De Vries <bart@mogwai.be>
* SPDX-FileCopyrightText: 2021-2022 Bart De Vries <bart@mogwai.be>
*
* SPDX-License-Identifier: GPL-2.0-only OR GPL-3.0-only OR LicenseRef-KDE-Accepted-GPL
*/
#pragma once
#include <KJob>
#include <QNetworkAccessManager>
#include <QNetworkReply>
#include <QNetworkRequest>
#include <QString>
#include <Syndication/Syndication>
#include <ThreadWeaver/Job>
#include "enclosure.h"
class UpdateFeedJob : public KJob
class UpdateFeedJob : public QObject, public ThreadWeaver::Job
{
Q_OBJECT
public:
explicit UpdateFeedJob(const QString &url, QObject *parent = nullptr);
explicit UpdateFeedJob(const QString &url, const QByteArray &data, QObject *parent = nullptr);
~UpdateFeedJob();
void start() override;
void run(ThreadWeaver::JobPointer, ThreadWeaver::Thread *) override;
void abort();
struct EntryDetails {
@ -78,9 +81,9 @@ Q_SIGNALS:
void entryAdded(const QString &feedurl, const QString &id);
void feedUpdateStatusChanged(const QString &url, bool status);
void aborting();
void finished();
private:
void retrieveFeed();
void processFeed(Syndication::FeedPtr feed);
bool processEntry(Syndication::ItemPtr entry);
void processAuthor(const QString &entryId, const QString &authorName, const QString &authorUri, const QString &authorEmail);
@ -91,7 +94,7 @@ private:
bool m_abort = false;
QString m_url;
QNetworkReply *m_reply = nullptr;
QByteArray m_data;
bool m_isNewFeed;
QVector<EntryDetails> m_entries;