Migrate to Flows.

This commit is contained in:
Antoine POPINEAU 2019-10-31 16:17:37 +01:00
parent 4e60906fe9
commit ba12854e6c
No known key found for this signature in database
GPG Key ID: A78AC64694F84063
14 changed files with 104 additions and 133 deletions

View File

@ -32,6 +32,7 @@ import kotlinx.android.synthetic.main.partial_now_playing.*
import kotlinx.coroutines.Dispatchers.IO
import kotlinx.coroutines.Dispatchers.Main
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.launch
class MainActivity : AppCompatActivity() {
@ -178,7 +179,7 @@ class MainActivity : AppCompatActivity() {
@SuppressLint("NewApi")
private fun watchEventBus() {
GlobalScope.launch(Main) {
for (message in EventBus.asChannel<Event>()) {
EventBus.get().collect { message ->
when (message) {
is Event.LogOut -> {
PowerPreference.clearAllData()
@ -315,7 +316,7 @@ class MainActivity : AppCompatActivity() {
}
GlobalScope.launch(Main) {
for ((current, duration, percent) in ProgressBus.asChannel()) {
ProgressBus.get().collect { (current, duration, percent) ->
now_playing_progress.progress = percent
now_playing_details_progress.progress = percent

View File

@ -9,6 +9,7 @@ import com.github.apognu.otter.utils.*
import kotlinx.android.synthetic.main.fragment_favorites.*
import kotlinx.coroutines.Dispatchers.Main
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.launch
class FavoritesFragment : FunkwhaleFragment<Track, FavoritesAdapter>() {
@ -43,7 +44,7 @@ class FavoritesFragment : FunkwhaleFragment<Track, FavoritesAdapter>() {
private fun watchEventBus() {
GlobalScope.launch(Main) {
for (message in EventBus.asChannel<Event>()) {
EventBus.get().collect { message ->
when (message) {
is Event.TrackPlayed -> {
GlobalScope.launch(Main) {

View File

@ -10,6 +10,7 @@ import androidx.recyclerview.widget.RecyclerView
import com.github.apognu.otter.repositories.HttpUpstream
import com.github.apognu.otter.repositories.Repository
import com.github.apognu.otter.utils.Cache
import com.github.apognu.otter.utils.log
import com.github.apognu.otter.utils.untilNetwork
import com.google.gson.Gson
import kotlinx.android.synthetic.main.fragment_artists.*
@ -44,8 +45,8 @@ abstract class FunkwhaleFragment<D : Any, A : FunkwhaleAdapter<D, *>> : Fragment
(repository.upstream as? HttpUpstream<*, *>)?.let { upstream ->
if (upstream.behavior == HttpUpstream.Behavior.Progressive) {
recycler.setOnScrollChangeListener { _, _, _, _, _ ->
if (!recycler.canScrollVertically(1)) {
recycler.setOnScrollChangeListener { _, _, y, _, _ ->
if (y > 0 && !recycler.canScrollVertically(1)) {
fetch(Repository.Origin.Network.origin, adapter.data.size)
}
}
@ -66,24 +67,28 @@ abstract class FunkwhaleFragment<D : Any, A : FunkwhaleAdapter<D, *>> : Fragment
open fun onDataFetched(data: List<D>) {}
private fun fetch(upstreams: Int = (Repository.Origin.Network.origin and Repository.Origin.Cache.origin), size: Int = 0) {
var cleared = false
var first = true
swiper?.isRefreshing = true
if (size == 0) {
cleared = true
adapter.data.clear()
}
repository.fetch(upstreams, size).untilNetwork(IO) { data, isCache, hasMore ->
if (isCache) {
adapter.data = data.toMutableList()
adapter.notifyDataSetChanged()
return@untilNetwork
}
if (first) {
first = false
adapter.data.clear()
}
onDataFetched(data)
adapter.data.addAll(data)
if (!hasMore) {
swiper?.isRefreshing = false
@ -97,13 +102,12 @@ abstract class FunkwhaleFragment<D : Any, A : FunkwhaleAdapter<D, *>> : Fragment
}
GlobalScope.launch(Main) {
adapter.data.addAll(data)
when (cleared) {
when (first) {
true -> {
adapter.notifyDataSetChanged()
cleared = false
first = false
}
false -> adapter.notifyItemRangeInserted(adapter.data.size, data.size)
}
}

View File

@ -13,6 +13,7 @@ import com.squareup.picasso.Picasso
import kotlinx.android.synthetic.main.fragment_tracks.*
import kotlinx.coroutines.Dispatchers.Main
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.launch
class PlaylistTracksFragment : FunkwhaleFragment<PlaylistTrack, PlaylistTracksAdapter>() {
@ -109,7 +110,7 @@ class PlaylistTracksFragment : FunkwhaleFragment<PlaylistTrack, PlaylistTracksAd
private fun watchEventBus() {
GlobalScope.launch(Main) {
for (message in EventBus.asChannel<Event>()) {
EventBus.get().collect { message ->
when (message) {
is Event.TrackPlayed -> {
GlobalScope.launch(Main) {

View File

@ -16,6 +16,7 @@ import kotlinx.android.synthetic.main.fragment_queue.*
import kotlinx.android.synthetic.main.fragment_queue.view.*
import kotlinx.coroutines.Dispatchers.Main
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.launch
class QueueFragment : BottomSheetDialogFragment() {
@ -78,7 +79,7 @@ class QueueFragment : BottomSheetDialogFragment() {
private fun watchEventBus() {
GlobalScope.launch(Main) {
for (message in EventBus.asChannel<Event>()) {
EventBus.get().collect { message ->
when (message) {
is Event.TrackPlayed -> refresh()
is Event.QueueChanged -> refresh()

View File

@ -13,6 +13,7 @@ import com.squareup.picasso.Picasso
import kotlinx.android.synthetic.main.fragment_tracks.*
import kotlinx.coroutines.Dispatchers.Main
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.launch
class TracksFragment : FunkwhaleFragment<Track, TracksAdapter>() {
@ -95,7 +96,7 @@ class TracksFragment : FunkwhaleFragment<Track, TracksAdapter>() {
private fun watchEventBus() {
GlobalScope.launch(Main) {
for (message in EventBus.asChannel<Event>()) {
EventBus.get().collect { message ->
when (message) {
is Event.TrackPlayed -> {
GlobalScope.launch(Main) {

View File

@ -22,6 +22,7 @@ import kotlinx.coroutines.Dispatchers.Main
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.launch
class PlayerService : Service() {
@ -183,7 +184,7 @@ class PlayerService : Service() {
})
jobs.add(GlobalScope.launch(Main) {
for (request in RequestBus.asChannel<Request>()) {
RequestBus.get().collect { request ->
when (request) {
is Request.GetCurrentTrack -> request.channel?.offer(
Response.CurrentTrack(

View File

@ -10,10 +10,10 @@ import com.github.kittinunf.fuel.coroutines.awaitObjectResult
import com.github.kittinunf.result.Result
import com.google.gson.Gson
import com.preference.PowerPreference
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.Dispatchers.IO
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import java.io.Reader
import java.lang.reflect.Type
import kotlin.math.ceil
@ -23,53 +23,39 @@ class HttpUpstream<D : Any, R : FunkwhaleResponse<D>>(val behavior: Behavior, pr
Single, AtOnce, Progressive
}
private var _channel: Channel<Repository.Response<D>>? = null
private val channel: Channel<Repository.Response<D>>
get() {
if (_channel?.isClosedForSend ?: true) {
_channel = Channel()
}
return _channel!!
}
override fun fetch(size: Int): Channel<Repository.Response<D>>? {
if (behavior == Behavior.Single && size != 0) return null
override fun fetch(size: Int): Flow<Repository.Response<D>> = flow {
if (behavior == Behavior.Single && size != 0) return@flow
val page = ceil(size / AppContext.PAGE_SIZE.toDouble()).toInt() + 1
GlobalScope.launch(Dispatchers.IO) {
val offsetUrl =
Uri.parse(url)
.buildUpon()
.appendQueryParameter("page_size", AppContext.PAGE_SIZE.toString())
.appendQueryParameter("page", page.toString())
.build()
.toString()
val offsetUrl =
Uri.parse(url)
.buildUpon()
.appendQueryParameter("page_size", AppContext.PAGE_SIZE.toString())
.appendQueryParameter("page", page.toString())
.build()
.toString()
get(offsetUrl).fold(
{ response ->
val data = response.getData()
get(offsetUrl).fold(
{ response ->
val data = response.getData()
if (behavior == Behavior.Progressive || response.next == null) {
channel.offer(Repository.Response(Repository.Origin.Network, data, false))
} else {
channel.offer(Repository.Response(Repository.Origin.Network, data, true))
if (behavior == Behavior.Progressive || response.next == null) {
emit(Repository.Response(Repository.Origin.Network, data, false))
} else {
emit(Repository.Response(Repository.Origin.Network, data, true))
fetch(size + data.size)
}
},
{ error ->
when (error.exception) {
is RefreshError -> EventBus.send(Event.LogOut)
else -> channel.offer(Repository.Response(Repository.Origin.Network, listOf(), false))
}
fetch(size + data.size)
}
)
}
return channel
}
},
{ error ->
when (error.exception) {
is RefreshError -> EventBus.send(Event.LogOut)
else -> emit(Repository.Response(Repository.Origin.Network, listOf(), false))
}
}
)
}.flowOn(IO)
class GenericDeserializer<T : FunkwhaleResponse<*>>(val type: Type) : ResponseDeserializable<T> {
override fun deserialize(reader: Reader): T? {

View File

@ -7,6 +7,8 @@ import com.github.apognu.otter.utils.PlaylistTracksCache
import com.github.apognu.otter.utils.PlaylistTracksResponse
import com.github.kittinunf.fuel.gson.gsonDeserializerOf
import com.google.gson.reflect.TypeToken
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import java.io.BufferedReader
@ -18,7 +20,10 @@ class PlaylistTracksRepository(override val context: Context?, playlistId: Int)
override fun uncache(reader: BufferedReader) = gsonDeserializerOf(PlaylistTracksCache::class.java).deserialize(reader)
override fun onDataFetched(data: List<PlaylistTrack>): List<PlaylistTrack> = runBlocking {
val favorites = FavoritedRepository(context).fetch(Origin.Network.origin).receive().data
val favorites = FavoritedRepository(context).fetch(Origin.Network.origin)
.map { it.data }
.toList()
.flatten()
data.map { track ->
track.track.favorite = favorites.contains(track.track.id)

View File

@ -3,17 +3,12 @@ package com.github.apognu.otter.repositories
import android.content.Context
import com.github.apognu.otter.utils.Cache
import com.github.apognu.otter.utils.CacheItem
import com.github.apognu.otter.utils.log
import com.github.apognu.otter.utils.untilNetwork
import com.google.gson.Gson
import kotlinx.coroutines.Dispatchers.IO
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.flow.*
import java.io.BufferedReader
interface Upstream<D> {
fun fetch(size: Int = 0): Channel<Repository.Response<D>>?
fun fetch(size: Int = 0): Flow<Repository.Response<D>>
}
abstract class Repository<D : Any, C : CacheItem<D>> {
@ -28,44 +23,29 @@ abstract class Repository<D : Any, C : CacheItem<D>> {
abstract val cacheId: String?
abstract val upstream: Upstream<D>
private var _channel: Channel<Response<D>>? = null
private val channel: Channel<Response<D>>
get() {
if (_channel?.isClosedForSend ?: true) {
_channel = Channel(10)
}
return _channel!!
}
open fun cache(data: List<D>): C? = null
protected open fun uncache(reader: BufferedReader): C? = null
fun fetch(upstreams: Int = Origin.Cache.origin and Origin.Network.origin, size: Int = 0): Channel<Response<D>> {
if (Origin.Cache.origin and upstreams == upstreams) fromCache()
if (Origin.Network.origin and upstreams == upstreams) fromNetwork(size)
return channel
fun fetch(upstreams: Int = Origin.Cache.origin and Origin.Network.origin, size: Int = 0): Flow<Response<D>> = flow {
if (Origin.Cache.origin and upstreams == upstreams) fromCache().collect { emit(it) }
if (Origin.Network.origin and upstreams == upstreams) fromNetwork(size).collect { emit(it) }
}
private fun fromCache() {
GlobalScope.launch(IO) {
cacheId?.let { cacheId ->
Cache.get(context, cacheId)?.let { reader ->
uncache(reader)?.let { cache ->
channel.offer(Response(Origin.Cache, cache.data, false))
}
private fun fromCache() = flow {
cacheId?.let { cacheId ->
Cache.get(context, cacheId)?.let { reader ->
uncache(reader)?.let { cache ->
emit(Response(Origin.Cache, cache.data, false))
}
}
}
}
}.flowOn(IO)
private fun fromNetwork(size: Int) {
upstream.fetch(size)?.untilNetwork(IO) { data, _, hasMore ->
val data = onDataFetched(data)
channel.offer(Response(Origin.Network, data, hasMore))
}
private fun fromNetwork(size: Int) = flow {
upstream
.fetch(size)
.map { response -> Response(Origin.Network, onDataFetched(response.data), response.hasMore) }
.collect { response -> emit(Response(Origin.Network, response.data, response.hasMore)) }
}
protected open fun onDataFetched(data: List<D>) = data

View File

@ -7,6 +7,8 @@ import com.github.apognu.otter.utils.TracksCache
import com.github.apognu.otter.utils.TracksResponse
import com.github.kittinunf.fuel.gson.gsonDeserializerOf
import com.google.gson.reflect.TypeToken
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import java.io.BufferedReader
@ -18,7 +20,10 @@ class SearchRepository(override val context: Context?, query: String) : Reposito
override fun uncache(reader: BufferedReader) = gsonDeserializerOf(TracksCache::class.java).deserialize(reader)
override fun onDataFetched(data: List<Track>): List<Track> = runBlocking {
val favorites = FavoritedRepository(context).fetch(Origin.Network.origin).receive().data
val favorites = FavoritedRepository(context).fetch(Origin.Network.origin)
.map { it.data }
.toList()
.flatten()
data.map { track ->
track.favorite = favorites.contains(track.id)

View File

@ -7,6 +7,8 @@ import com.github.apognu.otter.utils.TracksCache
import com.github.apognu.otter.utils.TracksResponse
import com.github.kittinunf.fuel.gson.gsonDeserializerOf
import com.google.gson.reflect.TypeToken
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import java.io.BufferedReader
@ -18,7 +20,10 @@ class TracksRepository(override val context: Context?, albumId: Int) : Repositor
override fun uncache(reader: BufferedReader) = gsonDeserializerOf(TracksCache::class.java).deserialize(reader)
override fun onDataFetched(data: List<Track>): List<Track> = runBlocking {
val favorites = FavoritedRepository(context).fetch(Origin.Network.origin).receive().data
val favorites = FavoritedRepository(context).fetch(Origin.Network.origin)
.map { it.data }
.toList()
.flatten()
data.map { track ->
track.favorite = favorites.contains(track.id)

View File

@ -7,6 +7,8 @@ import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.filter
import kotlinx.coroutines.channels.map
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.conflate
import kotlinx.coroutines.launch
sealed class Command {
@ -55,15 +57,11 @@ sealed class Response {
object EventBus {
fun send(event: Event) {
GlobalScope.launch {
get().offer(event)
Otter.get().eventBus.send(event)
}
}
fun get() = Otter.get().eventBus
inline fun <reified T : Event> asChannel(): ReceiveChannel<T> {
return get().openSubscription().filter { it is T }.map { it as T }
}
fun get() = Otter.get().eventBus.asFlow()
}
object CommandBus {
@ -82,16 +80,12 @@ object RequestBus {
GlobalScope.launch(Main) {
request.channel = it
get().offer(request)
Otter.get().requestBus.offer(request)
}
}
}
fun get() = Otter.get().requestBus
inline fun <reified T> asChannel(): ReceiveChannel<T> {
return get().openSubscription().filter { it is T }.map { it as T }
}
fun get() = Otter.get().requestBus.asFlow()
}
object ProgressBus {
@ -101,9 +95,7 @@ object ProgressBus {
}
}
fun asChannel(): ReceiveChannel<Triple<Int, Int, Int>> {
return Otter.get().progressBus.openSubscription()
}
fun get() = Otter.get().progressBus.asFlow().conflate()
}
suspend inline fun <reified T> Channel<Response>.wait(): T? {

View File

@ -11,7 +11,8 @@ import com.squareup.picasso.Picasso
import com.squareup.picasso.RequestCreator
import kotlinx.coroutines.Dispatchers.Main
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.launch
import kotlin.coroutines.CoroutineContext
@ -19,23 +20,10 @@ fun Context.getColor(colorRes: Int): Int {
return ContextCompat.getColor(this, colorRes)
}
inline fun <D> Channel<Repository.Response<D>>.await(context: CoroutineContext = Main, crossinline callback: (data: List<D>) -> Unit) {
inline fun <D> Flow<Repository.Response<D>>.untilNetwork(context: CoroutineContext = Main, crossinline callback: (data: List<D>, isCache: Boolean, hasMore: Boolean) -> Unit) {
GlobalScope.launch(context) {
this@await.receive().also {
callback(it.data)
close()
}
}
}
inline fun <D> Channel<Repository.Response<D>>.untilNetwork(context: CoroutineContext = Main, crossinline callback: (data: List<D>, isCache: Boolean, hasMore: Boolean) -> Unit) {
GlobalScope.launch(context) {
for (data in this@untilNetwork) {
collect { data ->
callback(data.data, data.origin == Repository.Origin.Cache, data.hasMore)
if (data.origin == Repository.Origin.Network && !data.hasMore) {
close()
}
}
}
}