From ba12854e6c0bec5f7c452758e626fb5260189f91 Mon Sep 17 00:00:00 2001 From: Antoine POPINEAU Date: Thu, 31 Oct 2019 16:17:37 +0100 Subject: [PATCH] Migrate to Flows. --- .../apognu/otter/activities/MainActivity.kt | 5 +- .../otter/fragments/FavoritesFragment.kt | 3 +- .../otter/fragments/FunkwhaleFragment.kt | 28 ++++--- .../otter/fragments/PlaylistTracksFragment.kt | 3 +- .../apognu/otter/fragments/QueueFragment.kt | 3 +- .../apognu/otter/fragments/TracksFragment.kt | 3 +- .../apognu/otter/playback/PlayerService.kt | 3 +- .../apognu/otter/repositories/HttpUpstream.kt | 74 ++++++++----------- .../repositories/PlaylistTracksRepository.kt | 7 +- .../apognu/otter/repositories/Repository.kt | 52 ++++--------- .../otter/repositories/SearchRepository.kt | 7 +- .../otter/repositories/TracksRepository.kt | 7 +- .../com/github/apognu/otter/utils/EventBus.kt | 22 ++---- .../github/apognu/otter/utils/Extensions.kt | 20 +---- 14 files changed, 104 insertions(+), 133 deletions(-) diff --git a/app/src/main/java/com/github/apognu/otter/activities/MainActivity.kt b/app/src/main/java/com/github/apognu/otter/activities/MainActivity.kt index 568330a..128ebff 100644 --- a/app/src/main/java/com/github/apognu/otter/activities/MainActivity.kt +++ b/app/src/main/java/com/github/apognu/otter/activities/MainActivity.kt @@ -32,6 +32,7 @@ import kotlinx.android.synthetic.main.partial_now_playing.* import kotlinx.coroutines.Dispatchers.IO import kotlinx.coroutines.Dispatchers.Main import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.flow.collect import kotlinx.coroutines.launch class MainActivity : AppCompatActivity() { @@ -178,7 +179,7 @@ class MainActivity : AppCompatActivity() { @SuppressLint("NewApi") private fun watchEventBus() { GlobalScope.launch(Main) { - for (message in EventBus.asChannel()) { + EventBus.get().collect { message -> when (message) { is Event.LogOut -> { PowerPreference.clearAllData() @@ -315,7 +316,7 @@ class MainActivity : AppCompatActivity() { } GlobalScope.launch(Main) { - for ((current, duration, percent) in ProgressBus.asChannel()) { + ProgressBus.get().collect { (current, duration, percent) -> now_playing_progress.progress = percent now_playing_details_progress.progress = percent diff --git a/app/src/main/java/com/github/apognu/otter/fragments/FavoritesFragment.kt b/app/src/main/java/com/github/apognu/otter/fragments/FavoritesFragment.kt index ef1c815..bb3b196 100644 --- a/app/src/main/java/com/github/apognu/otter/fragments/FavoritesFragment.kt +++ b/app/src/main/java/com/github/apognu/otter/fragments/FavoritesFragment.kt @@ -9,6 +9,7 @@ import com.github.apognu.otter.utils.* import kotlinx.android.synthetic.main.fragment_favorites.* import kotlinx.coroutines.Dispatchers.Main import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.flow.collect import kotlinx.coroutines.launch class FavoritesFragment : FunkwhaleFragment() { @@ -43,7 +44,7 @@ class FavoritesFragment : FunkwhaleFragment() { private fun watchEventBus() { GlobalScope.launch(Main) { - for (message in EventBus.asChannel()) { + EventBus.get().collect { message -> when (message) { is Event.TrackPlayed -> { GlobalScope.launch(Main) { diff --git a/app/src/main/java/com/github/apognu/otter/fragments/FunkwhaleFragment.kt b/app/src/main/java/com/github/apognu/otter/fragments/FunkwhaleFragment.kt index d3f90fa..745fe0f 100644 --- a/app/src/main/java/com/github/apognu/otter/fragments/FunkwhaleFragment.kt +++ b/app/src/main/java/com/github/apognu/otter/fragments/FunkwhaleFragment.kt @@ -10,6 +10,7 @@ import androidx.recyclerview.widget.RecyclerView import com.github.apognu.otter.repositories.HttpUpstream import com.github.apognu.otter.repositories.Repository import com.github.apognu.otter.utils.Cache +import com.github.apognu.otter.utils.log import com.github.apognu.otter.utils.untilNetwork import com.google.gson.Gson import kotlinx.android.synthetic.main.fragment_artists.* @@ -44,8 +45,8 @@ abstract class FunkwhaleFragment> : Fragment (repository.upstream as? HttpUpstream<*, *>)?.let { upstream -> if (upstream.behavior == HttpUpstream.Behavior.Progressive) { - recycler.setOnScrollChangeListener { _, _, _, _, _ -> - if (!recycler.canScrollVertically(1)) { + recycler.setOnScrollChangeListener { _, _, y, _, _ -> + if (y > 0 && !recycler.canScrollVertically(1)) { fetch(Repository.Origin.Network.origin, adapter.data.size) } } @@ -66,24 +67,28 @@ abstract class FunkwhaleFragment> : Fragment open fun onDataFetched(data: List) {} private fun fetch(upstreams: Int = (Repository.Origin.Network.origin and Repository.Origin.Cache.origin), size: Int = 0) { - var cleared = false + var first = true swiper?.isRefreshing = true - if (size == 0) { - cleared = true - adapter.data.clear() - } - repository.fetch(upstreams, size).untilNetwork(IO) { data, isCache, hasMore -> if (isCache) { adapter.data = data.toMutableList() + adapter.notifyDataSetChanged() return@untilNetwork } + if (first) { + first = false + + adapter.data.clear() + } + onDataFetched(data) + adapter.data.addAll(data) + if (!hasMore) { swiper?.isRefreshing = false @@ -97,13 +102,12 @@ abstract class FunkwhaleFragment> : Fragment } GlobalScope.launch(Main) { - adapter.data.addAll(data) - - when (cleared) { + when (first) { true -> { adapter.notifyDataSetChanged() - cleared = false + first = false } + false -> adapter.notifyItemRangeInserted(adapter.data.size, data.size) } } diff --git a/app/src/main/java/com/github/apognu/otter/fragments/PlaylistTracksFragment.kt b/app/src/main/java/com/github/apognu/otter/fragments/PlaylistTracksFragment.kt index 1415d0a..cdef053 100644 --- a/app/src/main/java/com/github/apognu/otter/fragments/PlaylistTracksFragment.kt +++ b/app/src/main/java/com/github/apognu/otter/fragments/PlaylistTracksFragment.kt @@ -13,6 +13,7 @@ import com.squareup.picasso.Picasso import kotlinx.android.synthetic.main.fragment_tracks.* import kotlinx.coroutines.Dispatchers.Main import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.flow.collect import kotlinx.coroutines.launch class PlaylistTracksFragment : FunkwhaleFragment() { @@ -109,7 +110,7 @@ class PlaylistTracksFragment : FunkwhaleFragment()) { + EventBus.get().collect { message -> when (message) { is Event.TrackPlayed -> { GlobalScope.launch(Main) { diff --git a/app/src/main/java/com/github/apognu/otter/fragments/QueueFragment.kt b/app/src/main/java/com/github/apognu/otter/fragments/QueueFragment.kt index 8f2a7e5..55eb780 100644 --- a/app/src/main/java/com/github/apognu/otter/fragments/QueueFragment.kt +++ b/app/src/main/java/com/github/apognu/otter/fragments/QueueFragment.kt @@ -16,6 +16,7 @@ import kotlinx.android.synthetic.main.fragment_queue.* import kotlinx.android.synthetic.main.fragment_queue.view.* import kotlinx.coroutines.Dispatchers.Main import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.flow.collect import kotlinx.coroutines.launch class QueueFragment : BottomSheetDialogFragment() { @@ -78,7 +79,7 @@ class QueueFragment : BottomSheetDialogFragment() { private fun watchEventBus() { GlobalScope.launch(Main) { - for (message in EventBus.asChannel()) { + EventBus.get().collect { message -> when (message) { is Event.TrackPlayed -> refresh() is Event.QueueChanged -> refresh() diff --git a/app/src/main/java/com/github/apognu/otter/fragments/TracksFragment.kt b/app/src/main/java/com/github/apognu/otter/fragments/TracksFragment.kt index 53c905f..a1829df 100644 --- a/app/src/main/java/com/github/apognu/otter/fragments/TracksFragment.kt +++ b/app/src/main/java/com/github/apognu/otter/fragments/TracksFragment.kt @@ -13,6 +13,7 @@ import com.squareup.picasso.Picasso import kotlinx.android.synthetic.main.fragment_tracks.* import kotlinx.coroutines.Dispatchers.Main import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.flow.collect import kotlinx.coroutines.launch class TracksFragment : FunkwhaleFragment() { @@ -95,7 +96,7 @@ class TracksFragment : FunkwhaleFragment() { private fun watchEventBus() { GlobalScope.launch(Main) { - for (message in EventBus.asChannel()) { + EventBus.get().collect { message -> when (message) { is Event.TrackPlayed -> { GlobalScope.launch(Main) { diff --git a/app/src/main/java/com/github/apognu/otter/playback/PlayerService.kt b/app/src/main/java/com/github/apognu/otter/playback/PlayerService.kt index d568782..f72a940 100644 --- a/app/src/main/java/com/github/apognu/otter/playback/PlayerService.kt +++ b/app/src/main/java/com/github/apognu/otter/playback/PlayerService.kt @@ -22,6 +22,7 @@ import kotlinx.coroutines.Dispatchers.Main import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.Job import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.collect import kotlinx.coroutines.launch class PlayerService : Service() { @@ -183,7 +184,7 @@ class PlayerService : Service() { }) jobs.add(GlobalScope.launch(Main) { - for (request in RequestBus.asChannel()) { + RequestBus.get().collect { request -> when (request) { is Request.GetCurrentTrack -> request.channel?.offer( Response.CurrentTrack( diff --git a/app/src/main/java/com/github/apognu/otter/repositories/HttpUpstream.kt b/app/src/main/java/com/github/apognu/otter/repositories/HttpUpstream.kt index a55c126..8818cb6 100644 --- a/app/src/main/java/com/github/apognu/otter/repositories/HttpUpstream.kt +++ b/app/src/main/java/com/github/apognu/otter/repositories/HttpUpstream.kt @@ -10,10 +10,10 @@ import com.github.kittinunf.fuel.coroutines.awaitObjectResult import com.github.kittinunf.result.Result import com.google.gson.Gson import com.preference.PowerPreference -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.GlobalScope -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.launch +import kotlinx.coroutines.Dispatchers.IO +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.flowOn import java.io.Reader import java.lang.reflect.Type import kotlin.math.ceil @@ -23,53 +23,39 @@ class HttpUpstream>(val behavior: Behavior, pr Single, AtOnce, Progressive } - private var _channel: Channel>? = null - private val channel: Channel> - get() { - if (_channel?.isClosedForSend ?: true) { - _channel = Channel() - } - - return _channel!! - } - - override fun fetch(size: Int): Channel>? { - if (behavior == Behavior.Single && size != 0) return null + override fun fetch(size: Int): Flow> = flow { + if (behavior == Behavior.Single && size != 0) return@flow val page = ceil(size / AppContext.PAGE_SIZE.toDouble()).toInt() + 1 - GlobalScope.launch(Dispatchers.IO) { - val offsetUrl = - Uri.parse(url) - .buildUpon() - .appendQueryParameter("page_size", AppContext.PAGE_SIZE.toString()) - .appendQueryParameter("page", page.toString()) - .build() - .toString() + val offsetUrl = + Uri.parse(url) + .buildUpon() + .appendQueryParameter("page_size", AppContext.PAGE_SIZE.toString()) + .appendQueryParameter("page", page.toString()) + .build() + .toString() - get(offsetUrl).fold( - { response -> - val data = response.getData() + get(offsetUrl).fold( + { response -> + val data = response.getData() - if (behavior == Behavior.Progressive || response.next == null) { - channel.offer(Repository.Response(Repository.Origin.Network, data, false)) - } else { - channel.offer(Repository.Response(Repository.Origin.Network, data, true)) + if (behavior == Behavior.Progressive || response.next == null) { + emit(Repository.Response(Repository.Origin.Network, data, false)) + } else { + emit(Repository.Response(Repository.Origin.Network, data, true)) - fetch(size + data.size) - } - }, - { error -> - when (error.exception) { - is RefreshError -> EventBus.send(Event.LogOut) - else -> channel.offer(Repository.Response(Repository.Origin.Network, listOf(), false)) - } + fetch(size + data.size) } - ) - } - - return channel - } + }, + { error -> + when (error.exception) { + is RefreshError -> EventBus.send(Event.LogOut) + else -> emit(Repository.Response(Repository.Origin.Network, listOf(), false)) + } + } + ) + }.flowOn(IO) class GenericDeserializer>(val type: Type) : ResponseDeserializable { override fun deserialize(reader: Reader): T? { diff --git a/app/src/main/java/com/github/apognu/otter/repositories/PlaylistTracksRepository.kt b/app/src/main/java/com/github/apognu/otter/repositories/PlaylistTracksRepository.kt index b1f9e00..668d59c 100644 --- a/app/src/main/java/com/github/apognu/otter/repositories/PlaylistTracksRepository.kt +++ b/app/src/main/java/com/github/apognu/otter/repositories/PlaylistTracksRepository.kt @@ -7,6 +7,8 @@ import com.github.apognu.otter.utils.PlaylistTracksCache import com.github.apognu.otter.utils.PlaylistTracksResponse import com.github.kittinunf.fuel.gson.gsonDeserializerOf import com.google.gson.reflect.TypeToken +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.toList import kotlinx.coroutines.runBlocking import java.io.BufferedReader @@ -18,7 +20,10 @@ class PlaylistTracksRepository(override val context: Context?, playlistId: Int) override fun uncache(reader: BufferedReader) = gsonDeserializerOf(PlaylistTracksCache::class.java).deserialize(reader) override fun onDataFetched(data: List): List = runBlocking { - val favorites = FavoritedRepository(context).fetch(Origin.Network.origin).receive().data + val favorites = FavoritedRepository(context).fetch(Origin.Network.origin) + .map { it.data } + .toList() + .flatten() data.map { track -> track.track.favorite = favorites.contains(track.track.id) diff --git a/app/src/main/java/com/github/apognu/otter/repositories/Repository.kt b/app/src/main/java/com/github/apognu/otter/repositories/Repository.kt index bb50bb4..d6ba54d 100644 --- a/app/src/main/java/com/github/apognu/otter/repositories/Repository.kt +++ b/app/src/main/java/com/github/apognu/otter/repositories/Repository.kt @@ -3,17 +3,12 @@ package com.github.apognu.otter.repositories import android.content.Context import com.github.apognu.otter.utils.Cache import com.github.apognu.otter.utils.CacheItem -import com.github.apognu.otter.utils.log -import com.github.apognu.otter.utils.untilNetwork -import com.google.gson.Gson import kotlinx.coroutines.Dispatchers.IO -import kotlinx.coroutines.GlobalScope -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.launch +import kotlinx.coroutines.flow.* import java.io.BufferedReader interface Upstream { - fun fetch(size: Int = 0): Channel>? + fun fetch(size: Int = 0): Flow> } abstract class Repository> { @@ -28,44 +23,29 @@ abstract class Repository> { abstract val cacheId: String? abstract val upstream: Upstream - private var _channel: Channel>? = null - private val channel: Channel> - get() { - if (_channel?.isClosedForSend ?: true) { - _channel = Channel(10) - } - - return _channel!! - } - open fun cache(data: List): C? = null protected open fun uncache(reader: BufferedReader): C? = null - fun fetch(upstreams: Int = Origin.Cache.origin and Origin.Network.origin, size: Int = 0): Channel> { - if (Origin.Cache.origin and upstreams == upstreams) fromCache() - if (Origin.Network.origin and upstreams == upstreams) fromNetwork(size) - - return channel + fun fetch(upstreams: Int = Origin.Cache.origin and Origin.Network.origin, size: Int = 0): Flow> = flow { + if (Origin.Cache.origin and upstreams == upstreams) fromCache().collect { emit(it) } + if (Origin.Network.origin and upstreams == upstreams) fromNetwork(size).collect { emit(it) } } - private fun fromCache() { - GlobalScope.launch(IO) { - cacheId?.let { cacheId -> - Cache.get(context, cacheId)?.let { reader -> - uncache(reader)?.let { cache -> - channel.offer(Response(Origin.Cache, cache.data, false)) - } + private fun fromCache() = flow { + cacheId?.let { cacheId -> + Cache.get(context, cacheId)?.let { reader -> + uncache(reader)?.let { cache -> + emit(Response(Origin.Cache, cache.data, false)) } } } - } + }.flowOn(IO) - private fun fromNetwork(size: Int) { - upstream.fetch(size)?.untilNetwork(IO) { data, _, hasMore -> - val data = onDataFetched(data) - - channel.offer(Response(Origin.Network, data, hasMore)) - } + private fun fromNetwork(size: Int) = flow { + upstream + .fetch(size) + .map { response -> Response(Origin.Network, onDataFetched(response.data), response.hasMore) } + .collect { response -> emit(Response(Origin.Network, response.data, response.hasMore)) } } protected open fun onDataFetched(data: List) = data diff --git a/app/src/main/java/com/github/apognu/otter/repositories/SearchRepository.kt b/app/src/main/java/com/github/apognu/otter/repositories/SearchRepository.kt index 32cda6d..7607cb4 100644 --- a/app/src/main/java/com/github/apognu/otter/repositories/SearchRepository.kt +++ b/app/src/main/java/com/github/apognu/otter/repositories/SearchRepository.kt @@ -7,6 +7,8 @@ import com.github.apognu.otter.utils.TracksCache import com.github.apognu.otter.utils.TracksResponse import com.github.kittinunf.fuel.gson.gsonDeserializerOf import com.google.gson.reflect.TypeToken +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.toList import kotlinx.coroutines.runBlocking import java.io.BufferedReader @@ -18,7 +20,10 @@ class SearchRepository(override val context: Context?, query: String) : Reposito override fun uncache(reader: BufferedReader) = gsonDeserializerOf(TracksCache::class.java).deserialize(reader) override fun onDataFetched(data: List): List = runBlocking { - val favorites = FavoritedRepository(context).fetch(Origin.Network.origin).receive().data + val favorites = FavoritedRepository(context).fetch(Origin.Network.origin) + .map { it.data } + .toList() + .flatten() data.map { track -> track.favorite = favorites.contains(track.id) diff --git a/app/src/main/java/com/github/apognu/otter/repositories/TracksRepository.kt b/app/src/main/java/com/github/apognu/otter/repositories/TracksRepository.kt index a140aea..074f32f 100644 --- a/app/src/main/java/com/github/apognu/otter/repositories/TracksRepository.kt +++ b/app/src/main/java/com/github/apognu/otter/repositories/TracksRepository.kt @@ -7,6 +7,8 @@ import com.github.apognu.otter.utils.TracksCache import com.github.apognu.otter.utils.TracksResponse import com.github.kittinunf.fuel.gson.gsonDeserializerOf import com.google.gson.reflect.TypeToken +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.toList import kotlinx.coroutines.runBlocking import java.io.BufferedReader @@ -18,7 +20,10 @@ class TracksRepository(override val context: Context?, albumId: Int) : Repositor override fun uncache(reader: BufferedReader) = gsonDeserializerOf(TracksCache::class.java).deserialize(reader) override fun onDataFetched(data: List): List = runBlocking { - val favorites = FavoritedRepository(context).fetch(Origin.Network.origin).receive().data + val favorites = FavoritedRepository(context).fetch(Origin.Network.origin) + .map { it.data } + .toList() + .flatten() data.map { track -> track.favorite = favorites.contains(track.id) diff --git a/app/src/main/java/com/github/apognu/otter/utils/EventBus.kt b/app/src/main/java/com/github/apognu/otter/utils/EventBus.kt index 56f8918..61954a5 100644 --- a/app/src/main/java/com/github/apognu/otter/utils/EventBus.kt +++ b/app/src/main/java/com/github/apognu/otter/utils/EventBus.kt @@ -7,6 +7,8 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.ReceiveChannel import kotlinx.coroutines.channels.filter import kotlinx.coroutines.channels.map +import kotlinx.coroutines.flow.asFlow +import kotlinx.coroutines.flow.conflate import kotlinx.coroutines.launch sealed class Command { @@ -55,15 +57,11 @@ sealed class Response { object EventBus { fun send(event: Event) { GlobalScope.launch { - get().offer(event) + Otter.get().eventBus.send(event) } } - fun get() = Otter.get().eventBus - - inline fun asChannel(): ReceiveChannel { - return get().openSubscription().filter { it is T }.map { it as T } - } + fun get() = Otter.get().eventBus.asFlow() } object CommandBus { @@ -82,16 +80,12 @@ object RequestBus { GlobalScope.launch(Main) { request.channel = it - get().offer(request) + Otter.get().requestBus.offer(request) } } } - fun get() = Otter.get().requestBus - - inline fun asChannel(): ReceiveChannel { - return get().openSubscription().filter { it is T }.map { it as T } - } + fun get() = Otter.get().requestBus.asFlow() } object ProgressBus { @@ -101,9 +95,7 @@ object ProgressBus { } } - fun asChannel(): ReceiveChannel> { - return Otter.get().progressBus.openSubscription() - } + fun get() = Otter.get().progressBus.asFlow().conflate() } suspend inline fun Channel.wait(): T? { diff --git a/app/src/main/java/com/github/apognu/otter/utils/Extensions.kt b/app/src/main/java/com/github/apognu/otter/utils/Extensions.kt index 71bec1a..55bee25 100644 --- a/app/src/main/java/com/github/apognu/otter/utils/Extensions.kt +++ b/app/src/main/java/com/github/apognu/otter/utils/Extensions.kt @@ -11,7 +11,8 @@ import com.squareup.picasso.Picasso import com.squareup.picasso.RequestCreator import kotlinx.coroutines.Dispatchers.Main import kotlinx.coroutines.GlobalScope -import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.collect import kotlinx.coroutines.launch import kotlin.coroutines.CoroutineContext @@ -19,23 +20,10 @@ fun Context.getColor(colorRes: Int): Int { return ContextCompat.getColor(this, colorRes) } -inline fun Channel>.await(context: CoroutineContext = Main, crossinline callback: (data: List) -> Unit) { +inline fun Flow>.untilNetwork(context: CoroutineContext = Main, crossinline callback: (data: List, isCache: Boolean, hasMore: Boolean) -> Unit) { GlobalScope.launch(context) { - this@await.receive().also { - callback(it.data) - close() - } - } -} - -inline fun Channel>.untilNetwork(context: CoroutineContext = Main, crossinline callback: (data: List, isCache: Boolean, hasMore: Boolean) -> Unit) { - GlobalScope.launch(context) { - for (data in this@untilNetwork) { + collect { data -> callback(data.data, data.origin == Repository.Origin.Cache, data.hasMore) - - if (data.origin == Repository.Origin.Network && !data.hasMore) { - close() - } } } }