Improved syncing speed for large amounts of feeds. fixes #371

- Bundle db operations in FeedSyncThread
- Show "Processing downloads" message instead of "0 downloads left"
This commit is contained in:
daniel oeh 2014-05-17 21:31:52 +02:00
parent c1c3dc593e
commit cd91098fde
8 changed files with 337 additions and 148 deletions

View File

@ -109,6 +109,7 @@
<string name="download_error_request_error">Request error</string>
<string name="download_error_db_access">Database access error</string>
<string name="downloads_left">\u0020Downloads left</string>
<string name="downloads_processing">Processing downloads</string>
<string name="download_notification_title">Downloading podcast data</string>
<string name="download_report_content">%1$d downloads succeeded, %2$d failed</string>
<string name="download_log_title_unknown">Unknown title</string>

View File

@ -33,7 +33,7 @@ public class AntennapodHttpClient {
public static final int CONNECTION_TIMEOUT = 30000;
public static final int SOCKET_TIMEOUT = 30000;
public static final int MAX_CONNECTIONS = 6;
public static final int MAX_CONNECTIONS = 8;
private static volatile HttpClient httpClient = null;

View File

@ -38,10 +38,7 @@ import org.xml.sax.SAXException;
import javax.xml.parsers.ParserConfigurationException;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
@ -89,10 +86,12 @@ public class DownloadService extends Service {
private ExecutorService syncExecutor;
private CompletionService<Downloader> downloadExecutor;
private FeedSyncThread feedSyncThread;
/**
* Number of threads of downloadExecutor.
*/
private static final int NUM_PARALLEL_DOWNLOADS = 4;
private static final int NUM_PARALLEL_DOWNLOADS = 6;
private DownloadRequester requester;
@ -168,8 +167,7 @@ public class DownloadService extends Service {
Log.d(TAG, "Requested invalid range, restarting download from the beginning");
FileUtils.deleteQuietly(new File(downloader.getDownloadRequest().getDestination()));
DownloadRequester.getInstance().download(DownloadService.this, downloader.getDownloadRequest());
}
else {
} else {
Log.e(TAG, "Download failed");
saveDownloadStatus(status);
handleFailedDownload(status, downloader.getDownloadRequest());
@ -255,6 +253,9 @@ public class DownloadService extends Service {
}
);
downloadCompletionThread.start();
feedSyncThread = new FeedSyncThread();
feedSyncThread.start();
setupNotificationBuilders();
requester = DownloadRequester.getInstance();
}
@ -278,6 +279,7 @@ public class DownloadService extends Service {
downloadCompletionThread.interrupt();
syncExecutor.shutdown();
schedExecutor.shutdown();
feedSyncThread.shutdown();
cancelNotificationUpdater();
unregisterReceiver(cancelDownloadReceiver);
}
@ -315,8 +317,14 @@ public class DownloadService extends Service {
@SuppressLint("NewApi")
private Notification updateNotifications() {
String contentTitle = getString(R.string.download_notification_title);
String downloadsLeft = requester.getNumberOfDownloads()
+ getString(R.string.downloads_left);
int numDownloads = requester.getNumberOfDownloads();
String downloadsLeft;
if (numDownloads > 0) {
downloadsLeft = requester.getNumberOfDownloads()
+ getString(R.string.downloads_left);
} else {
downloadsLeft = getString(R.string.downloads_processing);
}
if (android.os.Build.VERSION.SDK_INT >= 16) {
if (notificationBuilder != null) {
@ -596,7 +604,7 @@ public class DownloadService extends Service {
private void handleCompletedFeedDownload(DownloadRequest request) {
if (BuildConfig.DEBUG)
Log.d(TAG, "Handling completed Feed Download");
syncExecutor.execute(new FeedSyncThread(request));
feedSyncThread.submitCompletedDownload(request);
}
@ -627,23 +635,187 @@ public class DownloadService extends Service {
* Takes a single Feed, parses the corresponding file and refreshes
* information in the manager
*/
class FeedSyncThread implements Runnable {
class FeedSyncThread extends Thread {
private static final String TAG = "FeedSyncThread";
private DownloadRequest request;
private BlockingQueue<DownloadRequest> completedRequests = new LinkedBlockingDeque<DownloadRequest>();
private CompletionService<Feed> parserService = new ExecutorCompletionService<Feed>(Executors.newSingleThreadExecutor());
private ExecutorService dbService = Executors.newSingleThreadExecutor();
private Future<?> dbUpdateFuture;
private volatile boolean isActive = true;
private volatile boolean isCollectingRequests = false;
private DownloadError reason;
private boolean successful;
private final long WAIT_TIMEOUT = 3000;
public FeedSyncThread(DownloadRequest request) {
if (request == null) {
throw new IllegalArgumentException("Request must not be null");
/**
* Waits for completed requests. Once the first request has been taken, the method will wait WAIT_TIMEOUT ms longer to
* collect more completed requests.
*
* @return Collected feeds or null if the method has been interrupted during the first waiting period.
*/
private List<Feed> collectCompletedRequests() {
List<Feed> results = new LinkedList<Feed>();
DownloadRequester requester = DownloadRequester.getInstance();
int tasks = 0;
try {
DownloadRequest request = completedRequests.take();
parserService.submit(new FeedParserTask(request));
tasks++;
} catch (InterruptedException e) {
return null;
}
this.request = request;
tasks += pollCompletedDownloads();
isCollectingRequests = true;
if (requester.isDownloadingFeeds()) {
// wait for completion of more downloads
long startTime = System.currentTimeMillis();
long currentTime = startTime;
while (requester.isDownloadingFeeds() && (currentTime - startTime) < WAIT_TIMEOUT) {
try {
if (BuildConfig.DEBUG)
Log.d(TAG, "Waiting for " + (startTime + WAIT_TIMEOUT - currentTime) + " ms");
sleep(startTime + WAIT_TIMEOUT - currentTime);
} catch (InterruptedException e) {
if (BuildConfig.DEBUG) Log.d(TAG, "interrupted while waiting for more downloads");
tasks += pollCompletedDownloads();
} finally {
currentTime = System.currentTimeMillis();
}
}
tasks += pollCompletedDownloads();
}
isCollectingRequests = false;
for (int i = 0; i < tasks; i++) {
try {
Feed f = parserService.take().get();
if (f != null) {
results.add(f);
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
return results;
}
private int pollCompletedDownloads() {
int tasks = 0;
for (int i = 0; i < completedRequests.size(); i++) {
parserService.submit(new FeedParserTask(completedRequests.poll()));
tasks++;
}
return tasks;
}
@Override
public void run() {
while (isActive) {
final List<Feed> feeds = collectCompletedRequests();
if (feeds == null) {
continue;
}
if (BuildConfig.DEBUG) Log.d(TAG, "Bundling " + feeds.size() + " feeds");
for (Feed feed : feeds) {
removeDuplicateImages(feed); // duplicate images have to removed because the DownloadRequester does not accept two downloads with the same download URL yet.
}
// Save information of feed in DB
if (dbUpdateFuture != null) {
try {
dbUpdateFuture.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
dbUpdateFuture = dbService.submit(new Runnable() {
@Override
public void run() {
Feed[] savedFeeds = DBTasks.updateFeed(DownloadService.this, feeds.toArray(new Feed[feeds.size()]));
for (Feed savedFeed : savedFeeds) {
// Download Feed Image if provided and not downloaded
if (savedFeed.getImage() != null
&& savedFeed.getImage().isDownloaded() == false) {
if (BuildConfig.DEBUG)
Log.d(TAG, "Feed has image; Downloading....");
savedFeed.getImage().setOwner(savedFeed);
final Feed savedFeedRef = savedFeed;
try {
requester.downloadImage(DownloadService.this,
savedFeedRef.getImage());
} catch (DownloadRequestException e) {
e.printStackTrace();
DBWriter.addDownloadStatus(
DownloadService.this,
new DownloadStatus(
savedFeedRef.getImage(),
savedFeedRef
.getImage()
.getHumanReadableIdentifier(),
DownloadError.ERROR_REQUEST_ERROR,
false, e.getMessage()
)
);
}
}
numberOfDownloads.decrementAndGet();
}
sendDownloadHandledIntent();
queryDownloadsAsync();
}
});
}
if (dbUpdateFuture != null) {
try {
dbUpdateFuture.get();
} catch (InterruptedException e) {
} catch (ExecutionException e) {
e.printStackTrace();
}
}
if (BuildConfig.DEBUG) Log.d(TAG, "Shutting down");
}
private class FeedParserTask implements Callable<Feed> {
private DownloadRequest request;
private FeedParserTask(DownloadRequest request) {
this.request = request;
}
@Override
public Feed call() throws Exception {
return parseFeed(request);
}
}
private Feed parseFeed(DownloadRequest request) {
Feed savedFeed = null;
Feed feed = new Feed(request.getSource(), new Date());
@ -652,9 +824,9 @@ public class DownloadService extends Service {
feed.setDownloaded(true);
feed.setPreferences(new FeedPreferences(0, true, request.getUsername(), request.getPassword()));
reason = null;
DownloadError reason = null;
String reasonDetailed = null;
successful = true;
boolean successful = true;
FeedHandler feedHandler = new FeedHandler();
try {
@ -665,35 +837,6 @@ public class DownloadService extends Service {
throw new InvalidFeedException();
}
removeDuplicateImages(feed); // duplicate images have to removed because the DownloadRequester does not accept two downloads with the same download URL yet.
// Save information of feed in DB
savedFeed = DBTasks.updateFeed(DownloadService.this, feed);
// Download Feed Image if provided and not downloaded
if (savedFeed.getImage() != null
&& savedFeed.getImage().isDownloaded() == false) {
if (BuildConfig.DEBUG)
Log.d(TAG, "Feed has image; Downloading....");
savedFeed.getImage().setOwner(savedFeed);
final Feed savedFeedRef = savedFeed;
try {
requester.downloadImage(DownloadService.this,
savedFeedRef.getImage());
} catch (DownloadRequestException e) {
e.printStackTrace();
DBWriter.addDownloadStatus(
DownloadService.this,
new DownloadStatus(
savedFeedRef.getImage(),
savedFeedRef
.getImage()
.getHumanReadableIdentifier(),
DownloadError.ERROR_REQUEST_ERROR,
false, e.getMessage()
)
);
}
}
} catch (SAXException e) {
successful = false;
e.printStackTrace();
@ -726,14 +869,18 @@ public class DownloadService extends Service {
savedFeed = feed;
}
saveDownloadStatus(new DownloadStatus(savedFeed,
savedFeed.getHumanReadableIdentifier(), reason, successful,
reasonDetailed));
sendDownloadHandledIntent();
numberOfDownloads.decrementAndGet();
queryDownloadsAsync();
if (successful) {
return savedFeed;
} else {
saveDownloadStatus(new DownloadStatus(savedFeed,
savedFeed.getHumanReadableIdentifier(), reason, successful,
reasonDetailed));
return null;
}
}
/**
* Checks if the feed was parsed correctly.
*/
@ -746,8 +893,6 @@ public class DownloadService extends Service {
Log.e(TAG, "Feed has invalid items");
return false;
}
if (BuildConfig.DEBUG)
Log.d(TAG, "Feed appears to be valid.");
return true;
}
@ -805,6 +950,20 @@ public class DownloadService extends Service {
}
}
public void shutdown() {
isActive = false;
if (isCollectingRequests) {
interrupt();
}
}
public void submitCompletedDownload(DownloadRequest request) {
completedRequests.offer(request);
if (isCollectingRequests) {
interrupt();
}
}
}
/**

View File

@ -619,12 +619,18 @@ public final class DBReader {
* database and the items-attribute will be set correctly.
*/
public static Feed getFeed(final Context context, final long feedId) {
PodDBAdapter adapter = new PodDBAdapter(context);
adapter.open();
Feed result = getFeed(context, feedId, adapter);
adapter.close();
return result;
}
static Feed getFeed(final Context context, final long feedId, PodDBAdapter adapter) {
if (BuildConfig.DEBUG)
Log.d(TAG, "Loading feed with id " + feedId);
Feed feed = null;
PodDBAdapter adapter = new PodDBAdapter(context);
adapter.open();
Cursor feedCursor = adapter.getFeedCursor(feedId);
if (feedCursor.moveToFirst()) {
feed = extractFeedFromCursorRow(adapter, feedCursor);
@ -633,7 +639,6 @@ public final class DBReader {
Log.e(TAG, "getFeed could not find feed with id " + feedId);
}
feedCursor.close();
adapter.close();
return feed;
}

View File

@ -228,7 +228,9 @@ public final class DBTasks {
new DownloadStatus(feed, feed
.getHumanReadableIdentifier(),
DownloadError.ERROR_REQUEST_ERROR, false, e
.getMessage()));
.getMessage()
)
);
}
}
@ -249,7 +251,7 @@ public final class DBTasks {
f = new Feed(feed.getDownload_url(), new Date(), feed.getTitle(),
feed.getPreferences().getUsername(), feed.getPreferences().getPassword());
}
f.setId(feed.getId());
DownloadRequester.getInstance().downloadFeed(context, f);
}
@ -347,7 +349,9 @@ public final class DBTasks {
.getMedia()
.getHumanReadableIdentifier(),
DownloadError.ERROR_REQUEST_ERROR,
false, e.getMessage()));
false, e.getMessage()
)
);
}
} else {
requester.downloadMedia(context, item.getMedia());
@ -449,7 +453,8 @@ public final class DBTasks {
try {
downloadFeedItems(false, context,
itemsToDownload.toArray(new FeedItem[itemsToDownload
.size()]));
.size()])
);
} catch (DownloadRequestException e) {
e.printStackTrace();
}
@ -595,14 +600,15 @@ public final class DBTasks {
return QueueAccess.IDListAccess(queue).contains(feedItemId);
}
private static Feed searchFeedByIdentifyingValueOrID(Context context,
Feed feed) {
private static Feed searchFeedByIdentifyingValueOrID(Context context, PodDBAdapter adapter,
Feed feed) {
if (feed.getId() != 0) {
return DBReader.getFeed(context, feed.getId());
return DBReader.getFeed(context, feed.getId(), adapter);
} else {
List<Feed> feeds = DBReader.getFeedList(context);
for (Feed f : feeds) {
if (f.getIdentifyingValue().equals(feed.getIdentifyingValue())) {
f.setItems(DBReader.getFeedItemList(context, f));
return f;
}
}
@ -624,79 +630,95 @@ public final class DBTasks {
}
/**
* Adds a new Feed to the database or updates the old version if it already exists. If another Feed with the same
* Adds new Feeds to the database or updates the old versions if they already exists. If another Feed with the same
* identifying value already exists, this method will add new FeedItems from the new Feed to the existing Feed.
* These FeedItems will be marked as unread.
* <p/>
* This method can update multiple feeds at once. Submitting a feed twice in the same method call can result in undefined behavior.
* <p/>
* This method should NOT be executed on the GUI thread.
*
* @param context Used for accessing the DB.
* @param newFeed The new Feed object.
* @return The updated Feed from the database if it already existed, or the new Feed from the parameters otherwise.
* @param context Used for accessing the DB.
* @param newFeeds The new Feed objects.
* @return The updated Feeds from the database if it already existed, or the new Feed from the parameters otherwise.
*/
public static synchronized Feed updateFeed(final Context context,
final Feed newFeed) {
// Look up feed in the feedslist
final Feed savedFeed = searchFeedByIdentifyingValueOrID(context,
newFeed);
if (savedFeed == null) {
if (BuildConfig.DEBUG)
Log.d(TAG,
"Found no existing Feed with title "
+ newFeed.getTitle() + ". Adding as new one.");
// Add a new Feed
try {
DBWriter.addNewFeed(context, newFeed).get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
return newFeed;
} else {
if (BuildConfig.DEBUG)
Log.d(TAG, "Feed with title " + newFeed.getTitle()
+ " already exists. Syncing new with existing one.");
public static synchronized Feed[] updateFeed(final Context context,
final Feed... newFeeds) {
List<Feed> newFeedsList = new ArrayList<Feed>();
List<Feed> updatedFeedsList = new ArrayList<Feed>();
Feed[] resultFeeds = new Feed[newFeeds.length];
PodDBAdapter adapter = new PodDBAdapter(context);
adapter.open();
Collections.sort(newFeed.getItems(), new FeedItemPubdateComparator());
savedFeed.setItems(DBReader.getFeedItemList(context, savedFeed));
if (savedFeed.compareWithOther(newFeed)) {
for (int feedIdx = 0; feedIdx < newFeeds.length; feedIdx++) {
final Feed newFeed = newFeeds[feedIdx];
// Look up feed in the feedslist
final Feed savedFeed = searchFeedByIdentifyingValueOrID(context, adapter,
newFeed);
if (savedFeed == null) {
if (BuildConfig.DEBUG)
Log.d(TAG,
"Feed has updated attribute values. Updating old feed's attributes");
savedFeed.updateFromOther(newFeed);
}
if (savedFeed.getPreferences().compareWithOther(newFeed.getPreferences())) {
"Found no existing Feed with title "
+ newFeed.getTitle() + ". Adding as new one."
);
// Add a new Feed
newFeedsList.add(newFeed);
resultFeeds[feedIdx] = newFeed;
} else {
if (BuildConfig.DEBUG)
Log.d(TAG, "Feed has updated preferences. Updating old feed's preferences");
savedFeed.getPreferences().updateFromOther(newFeed.getPreferences());
}
// Look for new or updated Items
for (int idx = 0; idx < newFeed.getItems().size(); idx++) {
final FeedItem item = newFeed.getItems().get(idx);
FeedItem oldItem = searchFeedItemByIdentifyingValue(savedFeed,
item.getIdentifyingValue());
if (oldItem == null) {
// item is new
final int i = idx;
item.setFeed(savedFeed);
savedFeed.getItems().add(i, item);
item.setRead(false);
} else {
oldItem.updateFromOther(item);
Log.d(TAG, "Feed with title " + newFeed.getTitle()
+ " already exists. Syncing new with existing one.");
Collections.sort(newFeed.getItems(), new FeedItemPubdateComparator());
if (savedFeed.compareWithOther(newFeed)) {
if (BuildConfig.DEBUG)
Log.d(TAG,
"Feed has updated attribute values. Updating old feed's attributes");
savedFeed.updateFromOther(newFeed);
}
if (savedFeed.getPreferences().compareWithOther(newFeed.getPreferences())) {
if (BuildConfig.DEBUG)
Log.d(TAG, "Feed has updated preferences. Updating old feed's preferences");
savedFeed.getPreferences().updateFromOther(newFeed.getPreferences());
}
// Look for new or updated Items
for (int idx = 0; idx < newFeed.getItems().size(); idx++) {
final FeedItem item = newFeed.getItems().get(idx);
FeedItem oldItem = searchFeedItemByIdentifyingValue(savedFeed,
item.getIdentifyingValue());
if (oldItem == null) {
// item is new
final int i = idx;
item.setFeed(savedFeed);
savedFeed.getItems().add(i, item);
item.setRead(false);
} else {
oldItem.updateFromOther(item);
}
}
// update attributes
savedFeed.setLastUpdate(newFeed.getLastUpdate());
savedFeed.setType(newFeed.getType());
updatedFeedsList.add(savedFeed);
resultFeeds[feedIdx] = savedFeed;
}
// update attributes
savedFeed.setLastUpdate(newFeed.getLastUpdate());
savedFeed.setType(newFeed.getType());
try {
DBWriter.setCompleteFeed(context, savedFeed).get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
return savedFeed;
}
adapter.close();
try {
DBWriter.addNewFeed(context, newFeedsList.toArray(new Feed[newFeedsList.size()])).get();
DBWriter.setCompleteFeed(context, updatedFeedsList.toArray(new Feed[updatedFeedsList.size()])).get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
return resultFeeds;
}
/**

View File

@ -687,18 +687,19 @@ public class DBWriter {
}
static Future<?> addNewFeed(final Context context, final Feed feed) {
static Future<?> addNewFeed(final Context context, final Feed... feeds) {
return dbExec.submit(new Runnable() {
@Override
public void run() {
final PodDBAdapter adapter = new PodDBAdapter(context);
adapter.open();
adapter.setCompleteFeed(feed);
adapter.setCompleteFeed(feeds);
adapter.close();
GpodnetPreferences.addAddedFeed(feed.getDownload_url());
EventDistributor.getInstance().sendFeedUpdateBroadcast();
for (Feed feed : feeds) {
GpodnetPreferences.addAddedFeed(feed.getDownload_url());
}
BackupManager backupManager = new BackupManager(context);
backupManager.dataChanged();
@ -706,17 +707,16 @@ public class DBWriter {
});
}
static Future<?> setCompleteFeed(final Context context, final Feed feed) {
static Future<?> setCompleteFeed(final Context context, final Feed... feeds) {
return dbExec.submit(new Runnable() {
@Override
public void run() {
PodDBAdapter adapter = new PodDBAdapter(context);
adapter.open();
adapter.setCompleteFeed(feed);
adapter.setCompleteFeed(feeds);
adapter.close();
EventDistributor.getInstance().sendFeedUpdateBroadcast();
}
});

View File

@ -497,16 +497,18 @@ public class PodDBAdapter {
* Insert all FeedItems of a feed and the feed object itself in a single
* transaction
*/
public void setCompleteFeed(Feed feed) {
public void setCompleteFeed(Feed... feeds) {
db.beginTransaction();
setFeed(feed);
if (feed.getItems() != null) {
for (FeedItem item : feed.getItems()) {
setFeedItem(item, false);
for (Feed feed : feeds) {
setFeed(feed);
if (feed.getItems() != null) {
for (FeedItem item : feed.getItems()) {
setFeedItem(item, false);
}
}
if (feed.getPreferences() != null) {
setFeedPreferences(feed.getPreferences());
}
}
if (feed.getPreferences() != null) {
setFeedPreferences(feed.getPreferences());
}
db.setTransactionSuccessful();
db.endTransaction();

View File

@ -202,7 +202,7 @@ public class DBTasksTest extends InstrumentationTestCase {
for (int i = 0; i < NUM_ITEMS; i++) {
feed.getItems().add(new FeedItem(0, "item " + i, "id " + i, "link " + i, new Date(), false, feed));
}
Feed newFeed = DBTasks.updateFeed(context, feed);
Feed newFeed = DBTasks.updateFeed(context, feed)[0];
assertTrue(newFeed == feed);
assertTrue(feed.getId() != 0);
@ -222,8 +222,8 @@ public class DBTasksTest extends InstrumentationTestCase {
feed1.setItems(new ArrayList<FeedItem>());
feed2.setItems(new ArrayList<FeedItem>());
Feed savedFeed1 = DBTasks.updateFeed(context, feed1);
Feed savedFeed2 = DBTasks.updateFeed(context, feed2);
Feed savedFeed1 = DBTasks.updateFeed(context, feed1)[0];
Feed savedFeed2 = DBTasks.updateFeed(context, feed2)[0];
assertTrue(savedFeed1.getId() != savedFeed2.getId());
}
@ -258,7 +258,7 @@ public class DBTasksTest extends InstrumentationTestCase {
feed.getItems().add(0, new FeedItem(0, "item " + i, "id " + i, "link " + i, new Date(i), true, feed));
}
final Feed newFeed = DBTasks.updateFeed(context, feed);
final Feed newFeed = DBTasks.updateFeed(context, feed)[0];
assertTrue(feed != newFeed);
updatedFeedTest(newFeed, feedID, itemIDs, NUM_ITEMS_OLD, NUM_ITEMS_NEW);