From 9489a1baa25c878e3f44bf3587e31ba8d02e86d7 Mon Sep 17 00:00:00 2001 From: charlag Date: Mon, 12 Aug 2019 00:31:18 +0200 Subject: [PATCH] Load on reconnecting streaming, fix some races --- .../com/keylesspalace/tusky/MainActivity.java | 3 +- .../tusky/ProfileStreamListener.kt | 13 ++- .../keylesspalace/tusky/appstore/Events.kt | 3 +- .../tusky/fragment/TimelineFragment.java | 110 +++++++++++------- 4 files changed, 82 insertions(+), 47 deletions(-) diff --git a/app/src/main/java/com/keylesspalace/tusky/MainActivity.java b/app/src/main/java/com/keylesspalace/tusky/MainActivity.java index 1d96c4919..d729a2427 100644 --- a/app/src/main/java/com/keylesspalace/tusky/MainActivity.java +++ b/app/src/main/java/com/keylesspalace/tusky/MainActivity.java @@ -112,7 +112,7 @@ public final class MainActivity extends BottomSheetActivity implements ActionBut @Inject public ProfileStreamingListenerProvider streamingListenerProvider; - private ProfileStreamListener profileStreamListener; + public ProfileStreamListener profileStreamListener; private FloatingActionButton composeButton; private AccountHeader headerResult; private Drawer drawer; @@ -266,7 +266,6 @@ public final class MainActivity extends BottomSheetActivity implements ActionBut super.onResume(); NotificationHelper.clearNotificationsForActiveAccount(this, accountManager); - } @Override diff --git a/app/src/main/java/com/keylesspalace/tusky/ProfileStreamListener.kt b/app/src/main/java/com/keylesspalace/tusky/ProfileStreamListener.kt index d380b8de0..56263875e 100644 --- a/app/src/main/java/com/keylesspalace/tusky/ProfileStreamListener.kt +++ b/app/src/main/java/com/keylesspalace/tusky/ProfileStreamListener.kt @@ -7,10 +7,7 @@ import androidx.lifecycle.LifecycleObserver import androidx.lifecycle.LifecycleOwner import androidx.lifecycle.OnLifecycleEvent import com.google.gson.Gson -import com.keylesspalace.tusky.appstore.EventHub -import com.keylesspalace.tusky.appstore.NewHomeTimelineStatusEvent -import com.keylesspalace.tusky.appstore.NewNotificationEvent -import com.keylesspalace.tusky.appstore.StatusDeletedEvent +import com.keylesspalace.tusky.appstore.* import com.keylesspalace.tusky.entity.Notification import com.keylesspalace.tusky.entity.Status import com.keylesspalace.tusky.network.MastodonApi @@ -62,6 +59,9 @@ class ProfileStreamListener @Inject constructor( this.internalStop() } + val isStreaming + get() = !isStoppedManually + private fun internalResume() { Log.d(TAG, "internal resume") if (this.isStoppedManually) { @@ -75,7 +75,10 @@ class ProfileStreamListener @Inject constructor( Log.d(TAG, "internal resume cancelled: ther was a call alrady") return@fromCallable Optional(null) } - call.execute().body()!!.charStream().useLines { linesSequence -> + val callResponse = call.execute() + Log.d(TAG, "Dispatching reconenct event") + eventHub.dispatch(ProflileStreamingReconnectedEvent) + callResponse.body()!!.charStream().useLines { linesSequence -> // Mastodon Event types: update, notification, delete, filters_changed // we react only on notification for now // To detect that it's a notification, we should notice that previous line was diff --git a/app/src/main/java/com/keylesspalace/tusky/appstore/Events.kt b/app/src/main/java/com/keylesspalace/tusky/appstore/Events.kt index 15f6778bc..38a56b2b6 100644 --- a/app/src/main/java/com/keylesspalace/tusky/appstore/Events.kt +++ b/app/src/main/java/com/keylesspalace/tusky/appstore/Events.kt @@ -19,4 +19,5 @@ data class MainTabsChangedEvent(val newTabs: List) : Dispatchable data class PollVoteEvent(val statusId: String, val poll: Poll) : Dispatchable data class DomainMuteEvent(val instance: String): Dispatchable data class NewNotificationEvent(val notification: Notification) : Dispatchable -data class NewHomeTimelineStatusEvent(val status: Status) : Dispatchable \ No newline at end of file +data class NewHomeTimelineStatusEvent(val status: Status) : Dispatchable +object ProflileStreamingReconnectedEvent : Dispatchable \ No newline at end of file diff --git a/app/src/main/java/com/keylesspalace/tusky/fragment/TimelineFragment.java b/app/src/main/java/com/keylesspalace/tusky/fragment/TimelineFragment.java index 259ac2498..53d2a7a35 100644 --- a/app/src/main/java/com/keylesspalace/tusky/fragment/TimelineFragment.java +++ b/app/src/main/java/com/keylesspalace/tusky/fragment/TimelineFragment.java @@ -55,6 +55,7 @@ import com.keylesspalace.tusky.appstore.FavoriteEvent; import com.keylesspalace.tusky.appstore.MuteEvent; import com.keylesspalace.tusky.appstore.NewHomeTimelineStatusEvent; import com.keylesspalace.tusky.appstore.PreferenceChangedEvent; +import com.keylesspalace.tusky.appstore.ProflileStreamingReconnectedEvent; import com.keylesspalace.tusky.appstore.ReblogEvent; import com.keylesspalace.tusky.appstore.StatusComposedEvent; import com.keylesspalace.tusky.appstore.StatusDeletedEvent; @@ -97,6 +98,7 @@ import javax.inject.Inject; import at.connyduck.sparkbutton.helpers.Utils; import io.reactivex.Observable; +import io.reactivex.Single; import io.reactivex.android.schedulers.AndroidSchedulers; import kotlin.Unit; import kotlin.collections.CollectionsKt; @@ -120,7 +122,7 @@ public class TimelineFragment extends SFragment implements private static final int LOAD_AT_ONCE = 30; private boolean isSwipeToRefreshEnabled = true; private boolean isNeedRefresh; - private Status newStatusEventStatus; + private List newStreamingStatuses = new ArrayList<>(); public enum Kind { HOME, @@ -170,6 +172,7 @@ public class TimelineFragment extends SFragment implements private boolean alwaysShowSensitiveMedia; private boolean alwaysOpenSpoiler; private boolean initialUpdateFailed = false; + private Single loadingAbove; private PairedList, StatusViewData> statuses = new PairedList<>(new Function, StatusViewData>() { @@ -310,7 +313,7 @@ public class TimelineFragment extends SFragment implements Iterator> iterator = this.statuses.iterator(); while (iterator.hasNext()) { Either item = iterator.next(); - if(item.isRight()) { + if (item.isRight()) { Status status = item.asRight(); if (status.getId().length() < topId.length() || status.getId().compareTo(topId) < 0) { @@ -526,9 +529,20 @@ public class TimelineFragment extends SFragment implements } else if (event instanceof PreferenceChangedEvent) { onPreferenceChanged(((PreferenceChangedEvent) event).getPreferenceKey()); } else if (event instanceof NewHomeTimelineStatusEvent && kind == Kind.HOME) { - this.newStatusEventStatus = ((NewHomeTimelineStatusEvent) event).getStatus(); - this.statuses.add(0, new Either.Right<>(this.newStatusEventStatus)); - this.updateAdapter(); + this.loadingAbove.as(autoDisposable(from(this, Lifecycle.Event.ON_DESTROY))) + .subscribe((__) -> { + Status status = + ((NewHomeTimelineStatusEvent) event).getStatus(); + Either.Right either = + new Either.Right<>(status); + if (!this.statuses.contains(either)) { + this.newStreamingStatuses.add(status.getId()); + this.statuses.add(0, either); + this.updateAdapter(); + } + }); + } else if (event instanceof ProflileStreamingReconnectedEvent) { + this.loadAbove(); } }); eventRegistered = true; @@ -563,9 +577,9 @@ public class TimelineFragment extends SFragment implements } } if (firstOrNull != null) { - this.sendFetchTimelineRequest(null, firstOrNull, secondOrNull, FetchEnd.TOP, -1); + this.loadingAbove = this.sendFetchTimelineRequest(null, firstOrNull, secondOrNull, FetchEnd.TOP, -1); } else { - this.sendFetchTimelineRequest(null, null, null, FetchEnd.BOTTOM, -1); + this.loadingAbove = this.sendFetchTimelineRequest(null, null, null, FetchEnd.BOTTOM, -1); } } @@ -962,9 +976,9 @@ public class TimelineFragment extends SFragment implements } } - private void sendFetchTimelineRequest(@Nullable String maxId, @Nullable String sinceId, - @Nullable String sinceIdMinusOne, - final FetchEnd fetchEnd, final int pos) { + private Single sendFetchTimelineRequest(@Nullable String maxId, @Nullable String sinceId, + @Nullable String sinceIdMinusOne, + final FetchEnd fetchEnd, final int pos) { if (isAdded() && (fetchEnd == FetchEnd.TOP || fetchEnd == FetchEnd.BOTTOM && maxId == null && progressBar.getVisibility() != View.VISIBLE) && !isSwipeToRefreshEnabled) topProgressBar.show(); @@ -976,33 +990,50 @@ public class TimelineFragment extends SFragment implements } else { mode = TimelineRequestMode.NETWORK; } - timelineRepo.getStatuses(maxId, sinceId, sinceIdMinusOne, LOAD_AT_ONCE, mode) - .observeOn(AndroidSchedulers.mainThread()) + + Single result = + timelineRepo.getStatuses(maxId, sinceId, sinceIdMinusOne, LOAD_AT_ONCE, mode) + .observeOn(AndroidSchedulers.mainThread()) + .doOnSuccess((response) -> onFetchTimelineSuccess(response, fetchEnd, pos)) + .doOnError((err) -> onFetchTimelineFailure(new Exception(err), fetchEnd, pos)) + .cache(); // So that multiple subscriptions don't call API multiple times + + result .as(autoDisposable(from(this, Lifecycle.Event.ON_DESTROY))) - .subscribe( - (result) -> onFetchTimelineSuccess(result, fetchEnd, pos), - (err) -> onFetchTimelineFailure(new Exception(err), fetchEnd, pos) - ); + .subscribe(); + return result; } else { - Callback> callback = new Callback>() { - @Override - public void onResponse(@NonNull Call> call, @NonNull Response> response) { - if (response.isSuccessful()) { - onFetchTimelineSuccess(liftStatusList(response.body()), fetchEnd, pos); - } else { - onFetchTimelineFailure(new Exception(response.message()), fetchEnd, pos); - } - } + Single result = Single + .create((supplier) -> { + Callback> callback = new Callback>() { + @Override + public void onResponse(@NonNull Call> call, @NonNull Response> response) { + if (response.isSuccessful()) { + onFetchTimelineSuccess(liftStatusList(response.body()), fetchEnd, pos); + supplier.onSuccess(response.body()); + } else { + Exception exception = new Exception(response.message()); + onFetchTimelineFailure(exception, fetchEnd, pos); + supplier.onError(exception); + } + } - @Override - public void onFailure(@NonNull Call> call, @NonNull Throwable t) { - onFetchTimelineFailure((Exception) t, fetchEnd, pos); - } - }; + @Override + public void onFailure(@NonNull Call> call, @NonNull Throwable t) { + onFetchTimelineFailure((Exception) t, fetchEnd, pos); + supplier.onError(t); + } + }; - Call> listCall = getFetchCallByTimelineType(kind, hashtagOrId, maxId, sinceId); - callList.add(listCall); - listCall.enqueue(callback); + Call> listCall = getFetchCallByTimelineType(kind, hashtagOrId, maxId, sinceId); + callList.add(listCall); + listCall.enqueue(callback); + }) + .cache(); + result + .as(autoDisposable(from(this, Lifecycle.Event.ON_DESTROY))) + .subscribe(); + return result; } } @@ -1241,7 +1272,7 @@ public class TimelineFragment extends SFragment implements if ((someOldViewData instanceof StatusViewData.Placeholder) || !((StatusViewData.Concrete) someOldViewData).getId().equals(status.getId())) { // try to find the status we need to update - int foundPos = statuses.indexOf(new Either.Right<>(status)); + int foundPos = statuses.indexOf(new Either.Right(status)); if (foundPos < 0) return null; // okay, it's hopeless, give up statusToUpdate = ((StatusViewData.Concrete) statuses.getPairedItem(foundPos)); @@ -1302,11 +1333,12 @@ public class TimelineFragment extends SFragment implements if (isAdded()) { adapter.notifyItemRangeInserted(position, count); Context context = getContext(); - if ( - count > - 0 && position == 0 - && context != null - && statuses.get(0).asRightOrNull() != newStatusEventStatus + // Scroll when it's very unlikely that this is a streaming update + if (count > 0 + && position == 0 + && context != null + && (count > 1 + || !newStreamingStatuses.contains(statuses.get(0).asRight().getId())) ) { if (isSwipeToRefreshEnabled) recyclerView.scrollBy(0, Utils.dpToPx(context, -30));