diff --git a/app/src/main/java/org/schabi/newpipe/player/BasePlayer.java b/app/src/main/java/org/schabi/newpipe/player/BasePlayer.java index 3746927be..6e68253f1 100644 --- a/app/src/main/java/org/schabi/newpipe/player/BasePlayer.java +++ b/app/src/main/java/org/schabi/newpipe/player/BasePlayer.java @@ -47,9 +47,7 @@ import com.google.android.exoplayer2.RenderersFactory; import com.google.android.exoplayer2.SimpleExoPlayer; import com.google.android.exoplayer2.Timeline; import com.google.android.exoplayer2.extractor.DefaultExtractorsFactory; -import com.google.android.exoplayer2.source.DynamicConcatenatingMediaSource; import com.google.android.exoplayer2.source.ExtractorMediaSource; -import com.google.android.exoplayer2.source.LoopingMediaSource; import com.google.android.exoplayer2.source.MediaSource; import com.google.android.exoplayer2.source.TrackGroupArray; import com.google.android.exoplayer2.source.dash.DashMediaSource; @@ -72,6 +70,7 @@ import com.nostra13.universalimageloader.core.listener.SimpleImageLoadingListene import org.schabi.newpipe.Downloader; import org.schabi.newpipe.R; +import org.schabi.newpipe.playlist.PlayQueue; import java.io.File; import java.text.DecimalFormat; @@ -86,9 +85,9 @@ import java.util.concurrent.atomic.AtomicBoolean; * @author mauriciocolli */ @SuppressWarnings({"WeakerAccess", "unused"}) -public abstract class BasePlayer implements Player.EventListener, AudioManager.OnAudioFocusChangeListener { +public abstract class BasePlayer implements Player.EventListener, + AudioManager.OnAudioFocusChangeListener, PlaybackManager.PlaybackListener { // TODO: Check api version for deprecated audio manager methods - public static final boolean DEBUG = false; public static final String TAG = "BasePlayer"; @@ -117,6 +116,13 @@ public abstract class BasePlayer implements Player.EventListener, AudioManager.O protected long videoStartPos = -1; protected String uploaderName = ""; + /*////////////////////////////////////////////////////////////////////////// + // Playlist + //////////////////////////////////////////////////////////////////////////*/ + + protected PlaybackManager playbackManager; + protected PlayQueue playQueue; + /*////////////////////////////////////////////////////////////////////////// // Player //////////////////////////////////////////////////////////////////////////*/ @@ -540,6 +546,22 @@ public abstract class BasePlayer implements Player.EventListener, AudioManager.O @Override public void onPositionDiscontinuity() { + int newIndex = simpleExoPlayer.getCurrentWindowIndex(); + + } + + /*////////////////////////////////////////////////////////////////////////// + // Playback Listener + //////////////////////////////////////////////////////////////////////////*/ + + @Override + public void block() { + if (currentState != STATE_LOADING) changeState(STATE_LOADING); + } + + @Override + public void unblock() { + if (currentState != STATE_PLAYING) changeState(STATE_PLAYING); } /*////////////////////////////////////////////////////////////////////////// diff --git a/app/src/main/java/org/schabi/newpipe/player/PlaybackManager.java b/app/src/main/java/org/schabi/newpipe/player/PlaybackManager.java index 8b3973a54..3873d7c1c 100644 --- a/app/src/main/java/org/schabi/newpipe/player/PlaybackManager.java +++ b/app/src/main/java/org/schabi/newpipe/player/PlaybackManager.java @@ -3,31 +3,200 @@ package org.schabi.newpipe.player; import com.google.android.exoplayer2.source.DynamicConcatenatingMediaSource; import com.google.android.exoplayer2.source.MediaSource; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.schabi.newpipe.extractor.stream_info.StreamInfo; import org.schabi.newpipe.playlist.PlayQueue; +import org.schabi.newpipe.playlist.PlayQueueEvent; +import org.schabi.newpipe.playlist.PlayQueueItem; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import io.reactivex.Maybe; +import io.reactivex.annotations.NonNull; + public class PlaybackManager { - private DynamicConcatenatingMediaSource source; + private DynamicConcatenatingMediaSource mediaSource; + private List queueSource; + private int sourceIndex; + private PlaybackListener listener; private PlayQueue playQueue; - private int index; - - private List sources; - - public PlaybackManager(PlayQueue playQueue, int index) { - this.source = new DynamicConcatenatingMediaSource(); - - this.playQueue = playQueue; - this.index = index; + private Subscription playQueueReactor; + interface PlaybackListener { + void block(); + void unblock(); + void sync(); + MediaSource sourceOf(StreamInfo info); } - interface OnChangeListener { - void isLoading(); - void isLoaded(); + public PlaybackManager(@NonNull final PlaybackListener listener, + @NonNull final PlayQueue playQueue) { + this.mediaSource = new DynamicConcatenatingMediaSource(); + this.queueSource = Collections.synchronizedList(new ArrayList(10)); + this.sourceIndex = 0; + + this.listener = listener; + this.playQueue = playQueue; + + playQueue.getPlayQueueFlowable().subscribe(getReactor()); + } + + @NonNull + public DynamicConcatenatingMediaSource getMediaSource() { + return mediaSource; + } + + private void reload() { + listener.block(); + load(0); + } + + public void refreshMedia(final int newMediaIndex) { + if (newMediaIndex == sourceIndex) return; + + if (newMediaIndex == sourceIndex + 1) { + playQueue.incrementIndex(); + mediaSource.removeMediaSource(0); + queueSource.remove(0); + } else { + //something went wrong + onInit(); + } + } + + private void removeCurrent() { + listener.block(); + mediaSource.removeMediaSource(0); + queueSource.remove(0); + listener.unblock(); + } + + private Subscription loaderReactor; + + private void load() { + if (mediaSource.getSize() < 5 && queueSource.size() < 5) load(mediaSource.getSize()); + } + + private void load(final int from) { + clear(from); + + if (loaderReactor != null) loaderReactor.cancel(); + + List> maybes = new ArrayList<>(); + for (int i = from; i < 5; i++) { + final int index = playQueue.getIndex() + i; + final PlayQueueItem item = playQueue.get(index); + queueSource.set(i, item); + maybes.add(item.getStream()); + } + + Maybe.concat(maybes).subscribe(new Subscriber() { + @Override + public void onSubscribe(Subscription s) { + loaderReactor = s; + } + + @Override + public void onNext(StreamInfo streamInfo) { + mediaSource.addMediaSource(listener.sourceOf(streamInfo)); + onLoaded(); + } + + @Override + public void onError(Throwable t) { + playQueue.remove(queueSource.size()); + } + + @Override + public void onComplete() { + } + }); + } + + private void onLoaded() { + if (mediaSource.getSize() > 0 && queueSource.size() > 0) listener.unblock(); + } + + private void onInit() { + listener.block(); + load(); + } + + private void clear(int from) { + listener.block(); + while (mediaSource.getSize() > from) { + queueSource.remove(from); + mediaSource.removeMediaSource(from); + } + listener.unblock(); + } + + private Subscriber getReactor() { + return new Subscriber() { + @Override + public void onSubscribe(@NonNull Subscription d) { + if (playQueueReactor != null) playQueueReactor.cancel(); + playQueueReactor = d; + playQueueReactor.request(1); + } + + @Override + public void onNext(@NonNull PlayQueueEvent event) { + if (playQueue.getStreams().size() - playQueue.getIndex() < 10 && !playQueue.isComplete()) { + listener.block(); + playQueue.fetch(); + } + + switch (event) { + case INIT: + onInit(); + break; + case APPEND: + load(); + break; + case REMOVE_CURRENT: + removeCurrent(); + load(); + break; + case SELECT: + reload(); + break; + case REMOVE: + case SWAP: + load(1); + break; + case CLEAR: + clear(0); + break; + case NEXT: + default: + break; + } + + onLoaded(); + if (playQueueReactor != null) playQueueReactor.request(1); + } + + @Override + public void onError(@NonNull Throwable e) { + + } + + @Override + public void onComplete() { + // Never completes, only canceled + } + }; + } + + public void dispose() { + if (playQueueReactor != null) playQueueReactor.cancel(); } } diff --git a/app/src/main/java/org/schabi/newpipe/playlist/ExternalPlayQueue.java b/app/src/main/java/org/schabi/newpipe/playlist/ExternalPlayQueue.java index fba48d82d..3d1831a0e 100644 --- a/app/src/main/java/org/schabi/newpipe/playlist/ExternalPlayQueue.java +++ b/app/src/main/java/org/schabi/newpipe/playlist/ExternalPlayQueue.java @@ -1,44 +1,45 @@ package org.schabi.newpipe.playlist; import org.schabi.newpipe.extractor.InfoItem; -import org.schabi.newpipe.extractor.NewPipe; import org.schabi.newpipe.extractor.StreamingService; -import org.schabi.newpipe.extractor.exceptions.ExtractionException; import org.schabi.newpipe.extractor.playlist.PlayListExtractor; import org.schabi.newpipe.extractor.playlist.PlayListInfo; -import org.schabi.newpipe.extractor.playlist.PlayListInfoItem; import org.schabi.newpipe.extractor.stream_info.StreamInfo; import org.schabi.newpipe.extractor.stream_info.StreamInfoItem; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicInteger; import io.reactivex.Maybe; import io.reactivex.android.schedulers.AndroidSchedulers; +import io.reactivex.disposables.Disposable; import io.reactivex.functions.Consumer; import io.reactivex.schedulers.Schedulers; public class ExternalPlayQueue extends PlayQueue { - - private final static int LOAD_PROXIMITY = 10; + private final String TAG = "ExternalPlayQueue@" + Integer.toHexString(hashCode()); private boolean isComplete; - private AtomicInteger pageNumber; - private StreamingService service; + private String playlistUrl; - private PlayListInfoItem playlist; + private AtomicInteger pageNumber; + private Disposable fetchReactor; - public ExternalPlayQueue(final PlayListInfoItem playlist) { - super(); - this.service = getService(playlist.serviceId); - this.pageNumber = new AtomicInteger(0); - this.playlist = playlist; + public ExternalPlayQueue(final String playlistUrl, + final PlayListInfo info, + final int nextPage, + final int index) { + super(index); + this.service = getService(info.service_id); + this.pageNumber = new AtomicInteger(nextPage); + this.playlistUrl = playlistUrl; - fetch(); + getStreams().addAll(extractPlaylistItems(info)); } @Override @@ -47,36 +48,25 @@ public class ExternalPlayQueue extends PlayQueue { } @Override - public void load(int index, boolean loadNeighbors) { - if (index > streams.size() || streams.get(index) == null) return; - - streams.get(index).load(); - - if (loadNeighbors) { - int leftBound = index - LOAD_BOUND >= 0 ? index - LOAD_BOUND : 0; - int rightBound = index + LOAD_BOUND < streams.size() ? index + LOAD_BOUND : streams.size() - 1; - - for (int i = leftBound; i < rightBound; i++) { - final PlayQueueItem item = streams.get(i); - if (item != null) item.load(); - } - } + public void load(int index) { + if (index > getStreams().size() || getStreams().get(index) == null) return; + getStreams().get(index).load(); } @Override - public Maybe get(int index) { - if (index > streams.size() || streams.get(index) == null) return Maybe.empty(); - return streams.get(index).getStream(); + public PlayQueueItem get(int index) { + if (index > getStreams().size() || getStreams().get(index) == null) return null; + return getStreams().get(index); } - - public synchronized void fetch() { - final int page = pageNumber.getAndIncrement(); + @Override + public void fetch() { + if (fetchReactor != null && !fetchReactor.isDisposed()) return; final Callable task = new Callable() { @Override public PlayListInfo call() throws Exception { - PlayListExtractor extractor = service.getPlayListExtractorInstance(playlist.getLink(), page); + PlayListExtractor extractor = service.getPlayListExtractorInstance(playlistUrl, pageNumber.get()); return PlayListInfo.getInfo(extractor); } }; @@ -86,18 +76,23 @@ public class ExternalPlayQueue extends PlayQueue { public void accept(PlayListInfo playListInfo) throws Exception { if (!playListInfo.hasNextPage) isComplete = true; - streams.addAll(extractPlaylistItems(playListInfo)); - notifyChange(); + append(extractPlaylistItems(playListInfo)); + pageNumber.incrementAndGet(); } }; - Maybe.fromCallable(task) + fetchReactor = Maybe.fromCallable(task) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .onErrorComplete() .subscribe(onSuccess); } + @Override + public void dispose() { + if (fetchReactor != null) fetchReactor.dispose(); + } + private List extractPlaylistItems(final PlayListInfo info) { List result = new ArrayList<>(); for (final InfoItem stream : info.related_streams) { @@ -107,12 +102,4 @@ public class ExternalPlayQueue extends PlayQueue { } return result; } - - private StreamingService getService(final int serviceId) { - try { - return NewPipe.getService(serviceId); - } catch (ExtractionException e) { - return null; - } - } } diff --git a/app/src/main/java/org/schabi/newpipe/playlist/PlayQueue.java b/app/src/main/java/org/schabi/newpipe/playlist/PlayQueue.java index ecf9e578f..87e21cfee 100644 --- a/app/src/main/java/org/schabi/newpipe/playlist/PlayQueue.java +++ b/app/src/main/java/org/schabi/newpipe/playlist/PlayQueue.java @@ -2,41 +2,138 @@ package org.schabi.newpipe.playlist; import android.support.annotation.NonNull; +import org.schabi.newpipe.extractor.NewPipe; +import org.schabi.newpipe.extractor.StreamingService; +import org.schabi.newpipe.extractor.exceptions.ExtractionException; import org.schabi.newpipe.extractor.stream_info.StreamInfo; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import io.reactivex.BackpressureStrategy; +import io.reactivex.Flowable; import io.reactivex.Maybe; import io.reactivex.subjects.BehaviorSubject; public abstract class PlayQueue { private final String TAG = "PlayQueue@" + Integer.toHexString(hashCode()); - final int LOAD_BOUND = 2; + private List streams; + private AtomicInteger queueIndex; - protected List streams; - private BehaviorSubject> changeBroadcast; + private BehaviorSubject changeBroadcast; + private Flowable playQueueFlowable; - PlayQueue() { + PlayQueue(final int index) { streams = Collections.synchronizedList(new ArrayList()); + queueIndex = new AtomicInteger(index); + changeBroadcast = BehaviorSubject.create(); + playQueueFlowable = changeBroadcast.startWith(PlayQueueEvent.INIT).toFlowable(BackpressureStrategy.BUFFER); + } + + // a queue is complete if it has loaded all items in an external playlist + // single stream or local queues are always complete + public abstract boolean isComplete(); + + // load in the background the item at index, may do nothing if the queue is incomplete + public abstract void load(int index); + + // load partial queue in the background, does nothing if the queue is complete + public abstract void fetch(); + + // returns a Rx Future to the stream info of the play queue item at index + // may return an empty of the queue is incomplete + public abstract PlayQueueItem get(int index); + + public abstract void dispose(); + + public int size() { + return streams.size(); } @NonNull public List getStreams() { - return streams; + return Collections.unmodifiableList(streams); } - public void notifyChange() { - changeBroadcast.onNext(streams); + @NonNull + public Flowable getPlayQueueFlowable() { + return playQueueFlowable; } - public abstract boolean isComplete(); + private void broadcast(final PlayQueueEvent event) { + changeBroadcast.onNext(event); + } - public abstract void load(int index, boolean loadNeighbors); + public int getIndex() { + return queueIndex.get(); + } - public abstract Maybe get(int index); + public void setIndex(final int index) { + queueIndex.set(index); + broadcast(PlayQueueEvent.SELECT); + } + + public void incrementIndex() { + queueIndex.incrementAndGet(); + broadcast(PlayQueueEvent.NEXT); + } + + protected void append(final PlayQueueItem item) { + streams.add(item); + broadcast(PlayQueueEvent.APPEND); + } + + protected void append(final Collection items) { + streams.addAll(items); + broadcast(PlayQueueEvent.APPEND); + } + + public void remove(final int index) { + if (index < streams.size()) { + streams.remove(index); + broadcast(PlayQueueEvent.REMOVE); + } + } + + protected void clear() { + if (!streams.isEmpty()) { + streams.clear(); + broadcast(PlayQueueEvent.CLEAR); + } + } + + protected void swap(final int source, final int target) { + final List items = streams; + if (source < items.size() && target < items.size()) { + // Swap two items + final PlayQueueItem sourceItem = items.get(source); + final PlayQueueItem targetItem = items.get(target); + + items.set(target, sourceItem); + items.set(source, targetItem); + + // If the current playing index is one of the swapped indices, change that as well + final int index = queueIndex.get(); + if (index == source || index == target) { + final int newIndex = index == source ? target : source; + queueIndex.set(newIndex); + } + + broadcast(PlayQueueEvent.SWAP); + } + } + + protected StreamingService getService(final int serviceId) { + try { + return NewPipe.getService(serviceId); + } catch (ExtractionException e) { + return null; + } + } } diff --git a/app/src/main/java/org/schabi/newpipe/playlist/PlayQueueAdapter.java b/app/src/main/java/org/schabi/newpipe/playlist/PlayQueueAdapter.java index 662f9f2f1..170311f7d 100644 --- a/app/src/main/java/org/schabi/newpipe/playlist/PlayQueueAdapter.java +++ b/app/src/main/java/org/schabi/newpipe/playlist/PlayQueueAdapter.java @@ -11,6 +11,9 @@ import org.schabi.newpipe.info_list.StreamInfoItemHolder; import java.util.List; +import io.reactivex.disposables.Disposable; +import io.reactivex.functions.Consumer; + /** * Created by Christian Schabesberger on 01.08.16. * @@ -34,12 +37,14 @@ import java.util.List; public class PlayQueueAdapter extends RecyclerView.Adapter { private static final String TAG = PlayQueueAdapter.class.toString(); - private final PlaylistItemBuilder playlistItemBuilder; + private final PlayQueueItemBuilder playQueueItemBuilder; private final PlayQueue playQueue; private boolean showFooter = false; private View header = null; private View footer = null; + private Disposable playQueueReactor; + public class HFHolder extends RecyclerView.ViewHolder { public HFHolder(View v) { super(v); @@ -48,66 +53,57 @@ public class PlayQueueAdapter extends RecyclerView.Adapter data) { - if(data != null) { - playQueue.getStreams().addAll(data); - notifyPlaylistChange(); - } + public void add(final List data) { + playQueue.append(data); } - public void addItem(PlayQueueItem data) { - if (data != null) { - playQueue.getStreams().add(data); - notifyPlaylistChange(); - } + public void add(final PlayQueueItem data) { + playQueue.append(data); } - public void removeItem(int index) { - if (index < playQueue.getStreams().size()) { - playQueue.getStreams().remove(index); - notifyPlaylistChange(); - } + public void remove(final int index) { + playQueue.remove(index); } - public void swapItems(int source, int target) { - final List items = playQueue.getStreams(); - if (source < items.size() && target < items.size()) { - final PlayQueueItem sourceItem = items.get(source); - final PlayQueueItem targetItem = items.get(target); - - items.set(target, sourceItem); - items.set(source, targetItem); - - notifyPlaylistChange(); - } + public void swap(final int source, final int target) { + playQueue.swap(source, target); } public void clear() { - if(playQueue.getStreams().isEmpty()) { - return; - } - playQueue.getStreams().clear(); - - notifyPlaylistChange(); + playQueue.clear(); } - private void notifyPlaylistChange() { - playQueue.notifyChange(); - notifyDataSetChanged(); + private Disposable getReactor() { + final Consumer onNext = new Consumer() { + @Override + public void accept(PlayQueueEvent playQueueEvent) throws Exception { + notifyDataSetChanged(); + } + }; + + return playQueue.getPlayQueueFlowable() + .toObservable() + .subscribe(onNext); + } + + public void dispose() { + if (playQueueReactor != null) playQueueReactor.dispose(); } public void setHeader(View header) { @@ -155,7 +151,7 @@ public class PlayQueueAdapter extends RecyclerView.Adapter