Load on reconnecting streaming, fix some races

This commit is contained in:
charlag 2019-08-12 00:31:18 +02:00
parent 0682eaea63
commit 9489a1baa2
4 changed files with 82 additions and 47 deletions

View File

@ -112,7 +112,7 @@ public final class MainActivity extends BottomSheetActivity implements ActionBut
@Inject
public ProfileStreamingListenerProvider streamingListenerProvider;
private ProfileStreamListener profileStreamListener;
public ProfileStreamListener profileStreamListener;
private FloatingActionButton composeButton;
private AccountHeader headerResult;
private Drawer drawer;
@ -266,7 +266,6 @@ public final class MainActivity extends BottomSheetActivity implements ActionBut
super.onResume();
NotificationHelper.clearNotificationsForActiveAccount(this, accountManager);
}
@Override

View File

@ -7,10 +7,7 @@ import androidx.lifecycle.LifecycleObserver
import androidx.lifecycle.LifecycleOwner
import androidx.lifecycle.OnLifecycleEvent
import com.google.gson.Gson
import com.keylesspalace.tusky.appstore.EventHub
import com.keylesspalace.tusky.appstore.NewHomeTimelineStatusEvent
import com.keylesspalace.tusky.appstore.NewNotificationEvent
import com.keylesspalace.tusky.appstore.StatusDeletedEvent
import com.keylesspalace.tusky.appstore.*
import com.keylesspalace.tusky.entity.Notification
import com.keylesspalace.tusky.entity.Status
import com.keylesspalace.tusky.network.MastodonApi
@ -62,6 +59,9 @@ class ProfileStreamListener @Inject constructor(
this.internalStop()
}
val isStreaming
get() = !isStoppedManually
private fun internalResume() {
Log.d(TAG, "internal resume")
if (this.isStoppedManually) {
@ -75,7 +75,10 @@ class ProfileStreamListener @Inject constructor(
Log.d(TAG, "internal resume cancelled: ther was a call alrady")
return@fromCallable Optional(null)
}
call.execute().body()!!.charStream().useLines { linesSequence ->
val callResponse = call.execute()
Log.d(TAG, "Dispatching reconenct event")
eventHub.dispatch(ProflileStreamingReconnectedEvent)
callResponse.body()!!.charStream().useLines { linesSequence ->
// Mastodon Event types: update, notification, delete, filters_changed
// we react only on notification for now
// To detect that it's a notification, we should notice that previous line was

View File

@ -19,4 +19,5 @@ data class MainTabsChangedEvent(val newTabs: List<TabData>) : Dispatchable
data class PollVoteEvent(val statusId: String, val poll: Poll) : Dispatchable
data class DomainMuteEvent(val instance: String): Dispatchable
data class NewNotificationEvent(val notification: Notification) : Dispatchable
data class NewHomeTimelineStatusEvent(val status: Status) : Dispatchable
data class NewHomeTimelineStatusEvent(val status: Status) : Dispatchable
object ProflileStreamingReconnectedEvent : Dispatchable

View File

@ -55,6 +55,7 @@ import com.keylesspalace.tusky.appstore.FavoriteEvent;
import com.keylesspalace.tusky.appstore.MuteEvent;
import com.keylesspalace.tusky.appstore.NewHomeTimelineStatusEvent;
import com.keylesspalace.tusky.appstore.PreferenceChangedEvent;
import com.keylesspalace.tusky.appstore.ProflileStreamingReconnectedEvent;
import com.keylesspalace.tusky.appstore.ReblogEvent;
import com.keylesspalace.tusky.appstore.StatusComposedEvent;
import com.keylesspalace.tusky.appstore.StatusDeletedEvent;
@ -97,6 +98,7 @@ import javax.inject.Inject;
import at.connyduck.sparkbutton.helpers.Utils;
import io.reactivex.Observable;
import io.reactivex.Single;
import io.reactivex.android.schedulers.AndroidSchedulers;
import kotlin.Unit;
import kotlin.collections.CollectionsKt;
@ -120,7 +122,7 @@ public class TimelineFragment extends SFragment implements
private static final int LOAD_AT_ONCE = 30;
private boolean isSwipeToRefreshEnabled = true;
private boolean isNeedRefresh;
private Status newStatusEventStatus;
private List<String> newStreamingStatuses = new ArrayList<>();
public enum Kind {
HOME,
@ -170,6 +172,7 @@ public class TimelineFragment extends SFragment implements
private boolean alwaysShowSensitiveMedia;
private boolean alwaysOpenSpoiler;
private boolean initialUpdateFailed = false;
private Single<?> loadingAbove;
private PairedList<Either<Placeholder, Status>, StatusViewData> statuses =
new PairedList<>(new Function<Either<Placeholder, Status>, StatusViewData>() {
@ -310,7 +313,7 @@ public class TimelineFragment extends SFragment implements
Iterator<Either<Placeholder, Status>> iterator = this.statuses.iterator();
while (iterator.hasNext()) {
Either<Placeholder, Status> item = iterator.next();
if(item.isRight()) {
if (item.isRight()) {
Status status = item.asRight();
if (status.getId().length() < topId.length() || status.getId().compareTo(topId) < 0) {
@ -526,9 +529,20 @@ public class TimelineFragment extends SFragment implements
} else if (event instanceof PreferenceChangedEvent) {
onPreferenceChanged(((PreferenceChangedEvent) event).getPreferenceKey());
} else if (event instanceof NewHomeTimelineStatusEvent && kind == Kind.HOME) {
this.newStatusEventStatus = ((NewHomeTimelineStatusEvent) event).getStatus();
this.statuses.add(0, new Either.Right<>(this.newStatusEventStatus));
this.updateAdapter();
this.loadingAbove.as(autoDisposable(from(this, Lifecycle.Event.ON_DESTROY)))
.subscribe((__) -> {
Status status =
((NewHomeTimelineStatusEvent) event).getStatus();
Either.Right<Placeholder, Status> either =
new Either.Right<>(status);
if (!this.statuses.contains(either)) {
this.newStreamingStatuses.add(status.getId());
this.statuses.add(0, either);
this.updateAdapter();
}
});
} else if (event instanceof ProflileStreamingReconnectedEvent) {
this.loadAbove();
}
});
eventRegistered = true;
@ -563,9 +577,9 @@ public class TimelineFragment extends SFragment implements
}
}
if (firstOrNull != null) {
this.sendFetchTimelineRequest(null, firstOrNull, secondOrNull, FetchEnd.TOP, -1);
this.loadingAbove = this.sendFetchTimelineRequest(null, firstOrNull, secondOrNull, FetchEnd.TOP, -1);
} else {
this.sendFetchTimelineRequest(null, null, null, FetchEnd.BOTTOM, -1);
this.loadingAbove = this.sendFetchTimelineRequest(null, null, null, FetchEnd.BOTTOM, -1);
}
}
@ -962,9 +976,9 @@ public class TimelineFragment extends SFragment implements
}
}
private void sendFetchTimelineRequest(@Nullable String maxId, @Nullable String sinceId,
@Nullable String sinceIdMinusOne,
final FetchEnd fetchEnd, final int pos) {
private Single<?> sendFetchTimelineRequest(@Nullable String maxId, @Nullable String sinceId,
@Nullable String sinceIdMinusOne,
final FetchEnd fetchEnd, final int pos) {
if (isAdded() && (fetchEnd == FetchEnd.TOP || fetchEnd == FetchEnd.BOTTOM && maxId == null && progressBar.getVisibility() != View.VISIBLE) && !isSwipeToRefreshEnabled)
topProgressBar.show();
@ -976,33 +990,50 @@ public class TimelineFragment extends SFragment implements
} else {
mode = TimelineRequestMode.NETWORK;
}
timelineRepo.getStatuses(maxId, sinceId, sinceIdMinusOne, LOAD_AT_ONCE, mode)
.observeOn(AndroidSchedulers.mainThread())
Single<?> result =
timelineRepo.getStatuses(maxId, sinceId, sinceIdMinusOne, LOAD_AT_ONCE, mode)
.observeOn(AndroidSchedulers.mainThread())
.doOnSuccess((response) -> onFetchTimelineSuccess(response, fetchEnd, pos))
.doOnError((err) -> onFetchTimelineFailure(new Exception(err), fetchEnd, pos))
.cache(); // So that multiple subscriptions don't call API multiple times
result
.as(autoDisposable(from(this, Lifecycle.Event.ON_DESTROY)))
.subscribe(
(result) -> onFetchTimelineSuccess(result, fetchEnd, pos),
(err) -> onFetchTimelineFailure(new Exception(err), fetchEnd, pos)
);
.subscribe();
return result;
} else {
Callback<List<Status>> callback = new Callback<List<Status>>() {
@Override
public void onResponse(@NonNull Call<List<Status>> call, @NonNull Response<List<Status>> response) {
if (response.isSuccessful()) {
onFetchTimelineSuccess(liftStatusList(response.body()), fetchEnd, pos);
} else {
onFetchTimelineFailure(new Exception(response.message()), fetchEnd, pos);
}
}
Single<?> result = Single
.create((supplier) -> {
Callback<List<Status>> callback = new Callback<List<Status>>() {
@Override
public void onResponse(@NonNull Call<List<Status>> call, @NonNull Response<List<Status>> response) {
if (response.isSuccessful()) {
onFetchTimelineSuccess(liftStatusList(response.body()), fetchEnd, pos);
supplier.onSuccess(response.body());
} else {
Exception exception = new Exception(response.message());
onFetchTimelineFailure(exception, fetchEnd, pos);
supplier.onError(exception);
}
}
@Override
public void onFailure(@NonNull Call<List<Status>> call, @NonNull Throwable t) {
onFetchTimelineFailure((Exception) t, fetchEnd, pos);
}
};
@Override
public void onFailure(@NonNull Call<List<Status>> call, @NonNull Throwable t) {
onFetchTimelineFailure((Exception) t, fetchEnd, pos);
supplier.onError(t);
}
};
Call<List<Status>> listCall = getFetchCallByTimelineType(kind, hashtagOrId, maxId, sinceId);
callList.add(listCall);
listCall.enqueue(callback);
Call<List<Status>> listCall = getFetchCallByTimelineType(kind, hashtagOrId, maxId, sinceId);
callList.add(listCall);
listCall.enqueue(callback);
})
.cache();
result
.as(autoDisposable(from(this, Lifecycle.Event.ON_DESTROY)))
.subscribe();
return result;
}
}
@ -1241,7 +1272,7 @@ public class TimelineFragment extends SFragment implements
if ((someOldViewData instanceof StatusViewData.Placeholder) ||
!((StatusViewData.Concrete) someOldViewData).getId().equals(status.getId())) {
// try to find the status we need to update
int foundPos = statuses.indexOf(new Either.Right<>(status));
int foundPos = statuses.indexOf(new Either.Right<Placeholder, Status>(status));
if (foundPos < 0) return null; // okay, it's hopeless, give up
statusToUpdate = ((StatusViewData.Concrete)
statuses.getPairedItem(foundPos));
@ -1302,11 +1333,12 @@ public class TimelineFragment extends SFragment implements
if (isAdded()) {
adapter.notifyItemRangeInserted(position, count);
Context context = getContext();
if (
count >
0 && position == 0
&& context != null
&& statuses.get(0).asRightOrNull() != newStatusEventStatus
// Scroll when it's very unlikely that this is a streaming update
if (count > 0
&& position == 0
&& context != null
&& (count > 1
|| !newStreamingStatuses.contains(statuses.get(0).asRight().getId()))
) {
if (isSwipeToRefreshEnabled)
recyclerView.scrollBy(0, Utils.dpToPx(context, -30));