Lifecycle-aware streaming, streaming error handling, DI

This commit is contained in:
charlag 2019-05-29 21:00:38 +02:00
parent bc812a8f6c
commit 0682eaea63
4 changed files with 179 additions and 71 deletions

View File

@ -45,6 +45,7 @@ import com.keylesspalace.tusky.appstore.ProfileEditedEvent;
import com.keylesspalace.tusky.components.conversation.ConversationsRepository;
import com.keylesspalace.tusky.components.search.SearchActivity;
import com.keylesspalace.tusky.db.AccountEntity;
import com.keylesspalace.tusky.di.ProfileStreamingListenerProvider;
import com.keylesspalace.tusky.entity.Account;
import com.keylesspalace.tusky.fragment.SFragment;
import com.keylesspalace.tusky.interfaces.ActionButtonActivity;
@ -63,6 +64,7 @@ import com.mikepenz.materialdrawer.model.PrimaryDrawerItem;
import com.mikepenz.materialdrawer.model.ProfileDrawerItem;
import com.mikepenz.materialdrawer.model.ProfileSettingDrawerItem;
import com.mikepenz.materialdrawer.model.SecondaryDrawerItem;
import com.mikepenz.materialdrawer.model.ToggleDrawerItem;
import com.mikepenz.materialdrawer.model.interfaces.IDrawerItem;
import com.mikepenz.materialdrawer.model.interfaces.IProfile;
import com.mikepenz.materialdrawer.util.AbstractDrawerImageLoader;
@ -108,8 +110,9 @@ public final class MainActivity extends BottomSheetActivity implements ActionBut
@Inject
ConversationsRepository conversationRepository;
@Inject
public ProfileStreamListener streamingListener;
public ProfileStreamingListenerProvider streamingListenerProvider;
private ProfileStreamListener profileStreamListener;
private FloatingActionButton composeButton;
private AccountHeader headerResult;
private Drawer drawer;
@ -132,6 +135,10 @@ public final class MainActivity extends BottomSheetActivity implements ActionBut
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
profileStreamListener = streamingListenerProvider.get(this);
this.profileStreamListener.resume();
if (accountManager.getActiveAccount() == null) {
// will be redirected to LoginActivity by BaseActivity
return;
@ -362,7 +369,7 @@ public final class MainActivity extends BottomSheetActivity implements ActionBut
DrawerImageLoader.init(new AbstractDrawerImageLoader() {
@Override
public void set(ImageView imageView, Uri uri, Drawable placeholder, String tag) {
if(animateAvatars) {
if (animateAvatars) {
Glide.with(MainActivity.this)
.load(uri)
.placeholder(placeholder)
@ -384,6 +391,10 @@ public final class MainActivity extends BottomSheetActivity implements ActionBut
});
List<IDrawerItem> listItems = new ArrayList<>(10);
listItems.add(new ToggleDrawerItem().withChecked(true).withName("Streaming").withSelectable(false).withOnCheckedChangeListener((d, iv, checked) -> {
if (checked) this.profileStreamListener.resume();
else this.profileStreamListener.stop();
}));
listItems.add(new PrimaryDrawerItem().withIdentifier(DRAWER_ITEM_EDIT_PROFILE).withName(R.string.action_edit_profile).withSelectable(false).withIcon(GoogleMaterial.Icon.gmd_person));
listItems.add(new PrimaryDrawerItem().withIdentifier(DRAWER_ITEM_FAVOURITES).withName(R.string.action_view_favourites).withSelectable(false).withIcon(GoogleMaterial.Icon.gmd_star));
listItems.add(new PrimaryDrawerItem().withIdentifier(DRAWER_ITEM_LISTS).withName(R.string.action_lists).withSelectable(false).withIcon(GoogleMaterial.Icon.gmd_list));
@ -502,7 +513,7 @@ public final class MainActivity extends BottomSheetActivity implements ActionBut
private void changeAccount(long newSelectedId, @Nullable Intent forward) {
cacheUpdater.stop();
SFragment.flushFilters();
streamingListener.stop();
profileStreamListener.stop();
accountManager.setActiveAccount(newSelectedId);
Intent intent = new Intent(this, MainActivity.class);

View File

@ -2,103 +2,169 @@ package com.keylesspalace.tusky
import android.content.Context
import android.util.Log
import androidx.lifecycle.Lifecycle
import androidx.lifecycle.LifecycleObserver
import androidx.lifecycle.LifecycleOwner
import androidx.lifecycle.OnLifecycleEvent
import com.google.gson.Gson
import com.keylesspalace.tusky.appstore.EventHub
import com.keylesspalace.tusky.appstore.NewHomeTimelineStatusEvent
import com.keylesspalace.tusky.appstore.NewNotificationEvent
import com.keylesspalace.tusky.appstore.StatusDeletedEvent
import com.keylesspalace.tusky.entity.Notification
import com.keylesspalace.tusky.entity.Status
import com.keylesspalace.tusky.network.MastodonApi
import com.keylesspalace.tusky.util.NotificationHelper
import io.reactivex.Flowable
import io.reactivex.Single
import io.reactivex.disposables.Disposable
import io.reactivex.disposables.CompositeDisposable
import io.reactivex.rxkotlin.addTo
import io.reactivex.schedulers.Schedulers
import retrofit2.Call
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference
import javax.inject.Inject
private const val TAG = "ProfileStreamListener"
private data class Optional<T>(val value: T)
/**
* Subscribes to the profile events using SSE.
*/
class ProfileStreamListener @Inject constructor(
api: MastodonApi,
gson: Gson,
context: Context,
accountManager: com.keylesspalace.tusky.db.AccountManager,
eventHub: EventHub
) {
private val disposable: Disposable = Single
.fromCallable {
// TODO: listen for the network state
api.userStream().execute().body()!!.charStream().useLines { linesSequence ->
// Mastodon Event types: update, notification, delete, filters_changed
// we react only on notification for now
// To detect that it's a notification, we should notice that previous line was
// "event: notification"
// Then notification will be on the next line as:
// "data: {...}"
var lastEventType: String? = null
linesSequence.forEach { line ->
Log.d(TAG, "new line: $line")
failedAttempts = 0
val parts = line.split(": ", limit = 2)
if (parts.size < 2) {
return@forEach
}
when (parts[0]) {
"event" -> lastEventType = parts[1]
"data" -> when (lastEventType) {
null -> {
private val api: MastodonApi,
private val gson: Gson,
private val context: Context,
private val accountManager: com.keylesspalace.tusky.db.AccountManager,
private val eventHub: EventHub,
lifecycleOwner: LifecycleOwner
) : LifecycleObserver {
private val disposable = CompositeDisposable()
private var failedAttempts = 0
private var isStoppedManually = true
private val call: AtomicReference<Call<*>?> = AtomicReference(null)
init {
lifecycleOwner.lifecycle.addObserver(this)
}
fun resume() {
Log.d(TAG, "resume")
this.isStoppedManually = false
this.internalResume()
}
fun stop() {
Log.d(TAG, "pause")
this.isStoppedManually = true
this.internalStop()
}
private fun internalResume() {
Log.d(TAG, "internal resume")
if (this.isStoppedManually) {
Log.d(TAG, "Was stopped manually, not resumming")
return
}
Single.fromCallable {
val call = api.userStream()
// If it wasn't null, don't do anything
if (!this.call.compareAndSet(null, call)) {
Log.d(TAG, "internal resume cancelled: ther was a call alrady")
return@fromCallable Optional(null)
}
call.execute().body()!!.charStream().useLines { linesSequence ->
// Mastodon Event types: update, notification, delete, filters_changed
// we react only on notification for now
// To detect that it's a notification, we should notice that previous line was
// "event: notification"
// Then notification will be on the next line as:
// "data: {...}"
var lastEventType: String? = null
linesSequence.forEach { line ->
failedAttempts = 0
val parts = line.split(": ", limit = 2)
if (parts.size < 2) {
return@forEach
}
when (parts[0]) {
"event" -> lastEventType = parts[1]
"data" -> when (lastEventType) {
null -> {
}
"notification" -> {
Log.d(TAG, "new notification")
val account = accountManager.activeAccount
if (account != null) {
val notification = gson.fromJson(parts[1], Notification::class.java)
NotificationHelper.make(context, notification, account, true)
account.lastNotificationId = notification.id
accountManager.saveAccount(account)
eventHub.dispatch(NewNotificationEvent(notification))
}
"notification" -> {
Log.d(TAG, "new notification")
val account = accountManager.activeAccount
if (account != null) {
val notification = gson.fromJson(parts[1], Notification::class.java)
NotificationHelper.make(context, notification, account, true)
account.lastNotificationId = notification.id
accountManager.saveAccount(account)
eventHub.dispatch(NewNotificationEvent(notification))
}
}
"update" -> {
Log.d(TAG, "new update")
accountManager.activeAccount?.let { account ->
val status = gson.fromJson(parts[1], Status::class.java)
eventHub.dispatch(NewHomeTimelineStatusEvent(status))
}
}
"update" -> {
Log.d(TAG, "new update")
accountManager.activeAccount?.let { account ->
val status = gson.fromJson(parts[1], Status::class.java)
eventHub.dispatch(NewHomeTimelineStatusEvent(status))
}
}
"delete" -> {
Log.d(TAG, "new delete")
eventHub.dispatch(StatusDeletedEvent(parts[1]))
}
}
}
}
}
.retryWhen { attempts ->
attempts.flatMap { error ->
this.failedAttempts++
if (failedAttempts < 10) {
val delay = failedAttempts * 10L
Log.d(TAG, "Error while listening to profile stream, trying after $delay",
error)
Flowable.timer(delay, TimeUnit.SECONDS)
} else {
Flowable.error(error)
Optional(call)
}
.retryWhen { attempts ->
attempts.flatMap { error ->
this.failedAttempts++
val existingCall = this.call.get()
Log.d(TAG, "Error, existing call: ${existingCall != null}")
if (failedAttempts < 10 && existingCall != null) {
val delay = failedAttempts * 10L
Log.d(TAG, "Error while listening to profile stream, trying after $delay",
error)
Flowable.timer(delay, TimeUnit.SECONDS)
} else {
Flowable.error(error)
}
}
}
}
.subscribeOn(Schedulers.newThread())
.subscribe { _, err ->
if (err != null) {
Log.w(TAG, "Error while listening to profile stream", err)
.subscribeOn(Schedulers.newThread())
.subscribe { maybeCall, err ->
// Override it with null if it's the same call
val wasThisCall = err == null && this.call.compareAndSet(maybeCall.value, null)
if (wasThisCall) {
Log.w(TAG, "Error while listening to profile stream")
}
}
}
private var failedAttempts = 0
fun stop() {
this.disposable.dispose()
.addTo(disposable)
}
@OnLifecycleEvent(Lifecycle.Event.ON_RESUME)
private fun onResume() {
Log.d(TAG, "onResume")
this.internalResume()
}
@OnLifecycleEvent(Lifecycle.Event.ON_PAUSE)
private fun onPause() {
Log.d(TAG, "onPause")
this.internalStop()
}
private fun internalStop() {
// Set it to null. Cancel it if it was there.
this.call.getAndSet(null)
?.also { previous ->
Log.d(TAG, "internal stop, previous call: $previous")
}?.cancel()
}
}

View File

@ -35,7 +35,8 @@ import javax.inject.Singleton
ServicesModule::class,
BroadcastReceiverModule::class,
ViewModelModule::class,
RepositoryModule::class
RepositoryModule::class,
StreamingModule::class
])
interface AppComponent {
@Component.Builder

View File

@ -0,0 +1,30 @@
package com.keylesspalace.tusky.di
import android.content.Context
import androidx.lifecycle.LifecycleOwner
import com.google.gson.Gson
import com.keylesspalace.tusky.ProfileStreamListener
import com.keylesspalace.tusky.appstore.EventHub
import com.keylesspalace.tusky.db.AccountManager
import com.keylesspalace.tusky.network.MastodonApi
import dagger.Module
import dagger.Provides
// Interface is for Java
interface ProfileStreamingListenerProvider {
fun get(lifecycleOwner: LifecycleOwner): ProfileStreamListener
}
@Module
class StreamingModule {
@Provides
fun providesSProfileStreamingListener(
api: MastodonApi,
gson: Gson,
context: Context,
accountManager: AccountManager,
eventHub: EventHub
): ProfileStreamingListenerProvider = object : ProfileStreamingListenerProvider {
override fun get(lifecycleOwner: LifecycleOwner) = ProfileStreamListener(api, gson, context, accountManager, eventHub, lifecycleOwner)
}
}