fixed streaming cancellation

This commit is contained in:
Mariotaku Lee 2017-03-13 09:40:11 +08:00
parent 62a52c78ed
commit a89dbdf842
No known key found for this signature in database
GPG Key ID: 15C10F89D7C33535
1 changed files with 40 additions and 17 deletions

View File

@ -46,7 +46,6 @@ import org.mariotaku.twidere.util.streaming.TwitterTimelineStreamCallback
import java.util.*
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.Future
import java.util.concurrent.TimeUnit
class StreamingService : BaseService() {
@ -54,7 +53,7 @@ class StreamingService : BaseService() {
internal lateinit var threadPoolExecutor: ExecutorService
internal lateinit var handler: Handler
private val stateMap = WeakHashMap<UserKey, Future<*>>()
private val submittedTasks = WeakHashMap<UserKey, StreamingRunnable<*>>()
private val accountChangeObserver = OnAccountsUpdateListener {
setupStreaming()
@ -63,14 +62,19 @@ class StreamingService : BaseService() {
override fun onCreate() {
super.onCreate()
GeneralComponentHelper.build(this).inject(this)
threadPoolExecutor = Executors.newCachedThreadPool(BasicThreadFactory.Builder().priority(Thread.NORM_PRIORITY - 1).build())
threadPoolExecutor = Executors.newCachedThreadPool(BasicThreadFactory.Builder()
.namingPattern("twidere-streaming-%d")
.priority(Thread.NORM_PRIORITY - 1).build())
handler = Handler(Looper.getMainLooper())
AccountManager.get(this).addOnAccountsUpdatedListenerSafe(accountChangeObserver, updateImmediately = false)
}
override fun onDestroy() {
submittedTasks.forEach { _, future ->
future.cancel()
}
threadPoolExecutor.shutdown()
stateMap.clear()
submittedTasks.clear()
removeNotification()
AccountManager.get(this).removeOnAccountsUpdatedListenerSafe(accountChangeObserver)
super.onDestroy()
@ -105,33 +109,31 @@ class StreamingService : BaseService() {
}
}
private val Future<*>.isRunning: Boolean
get() = !isCancelled && !isDone
private fun updateStreamingInstances(): Boolean {
val am = AccountManager.get(this)
val supportedAccounts = AccountUtils.getAllAccountDetails(am, true).filter { it.isStreamingSupported }
val enabledPrefs = supportedAccounts.map { AccountPreferences(this, it.key) }
val enabledAccounts = supportedAccounts.filter { account ->
return@filter enabledPrefs.any {
account.key == it.accountKey
account.key == it.accountKey && it.isStreamingEnabled
}
}
if (enabledAccounts.isEmpty()) return false
// Remove all disabled instances
stateMap.forEach { k, v ->
if (enabledAccounts.none { k == it.key } && v.isRunning) {
v.cancel(true)
submittedTasks.forEach { k, v ->
if (enabledAccounts.none { k == it.key } && !v.cancelled) {
v.cancel()
}
}
// Add instances if not running
enabledAccounts.forEach { account ->
val existing = stateMap[account.key]
if (existing == null || !existing.isRunning) {
val existing = submittedTasks[account.key]
if (existing == null || existing.cancelled) {
val runnable = account.newStreamingRunnable() ?: return@forEach
stateMap[account.key] = threadPoolExecutor.submit(runnable)
threadPoolExecutor.submit(runnable)
submittedTasks[account.key] = runnable
}
}
return true
@ -157,6 +159,10 @@ class StreamingService : BaseService() {
stopForeground(true)
}
private fun buildNotification() {
}
private fun AccountDetails.newStreamingRunnable(): StreamingRunnable<*>? {
when (type) {
AccountType.TWITTER -> {
@ -168,21 +174,33 @@ class StreamingService : BaseService() {
internal abstract class StreamingRunnable<T>(val context: Context, val account: AccountDetails) : Runnable {
var cancelled: Boolean = false
private set
override fun run() {
val instance = createStreamingInstance()
while (!Thread.currentThread().isInterrupted) {
while (!cancelled && !Thread.currentThread().isInterrupted) {
try {
instance.beginStreaming()
} catch (e: MicroBlogException) {
DebugLog.w(LOGTAG, tr = e)
DebugLog.w(LOGTAG, msg = "Can't stream for ${account.key}", tr = e)
}
Thread.sleep(TimeUnit.MINUTES.toMillis(1))
}
}
fun cancel(): Boolean {
if (cancelled) return false
cancelled = true
onCancelled()
return true
}
abstract fun createStreamingInstance(): T
abstract fun T.beginStreaming()
abstract fun onCancelled()
}
internal class TwitterStreamingRunnable(context: Context, val handler: Handler, account: AccountDetails) :
@ -208,6 +226,7 @@ class StreamingService : BaseService() {
private var homeInsertGap = false
private var interactionsInsertGap = false
override fun onConnected(): Boolean {
homeInsertGap = true
interactionsInsertGap = true
@ -268,7 +287,7 @@ class StreamingService : BaseService() {
}
override fun onException(ex: Throwable): Boolean {
DebugLog.w(LOGTAG, tr = ex)
DebugLog.w(LOGTAG, msg = "Exception for ${account.key}", tr = ex)
return true
}
@ -310,6 +329,10 @@ class StreamingService : BaseService() {
getUserStream(StreamWith.USER, callback)
}
override fun onCancelled() {
callback.disconnect()
}
}
companion object {