Manage sync in an infinite thread

This commit is contained in:
ganfra 2018-10-17 13:59:21 +02:00
parent 4904ac894e
commit 44eb838610
20 changed files with 221 additions and 150 deletions

View File

@ -1,7 +1,9 @@
<component name="ProjectDictionaryState">
<dictionary name="ganfra">
<words>
<w>connectable</w>
<w>coroutine</w>
<w>merlins</w>
<w>moshi</w>
<w>synchronizer</w>
</words>

View File

@ -3,53 +3,31 @@ package im.vector.riotredesign.features.home
import android.content.Context
import android.content.Intent
import android.os.Bundle
import android.view.View
import im.vector.matrix.android.api.Matrix
import im.vector.matrix.android.api.MatrixCallback
import im.vector.matrix.android.api.failure.Failure
import im.vector.matrix.android.internal.database.model.RoomSummaryEntity
import im.vector.matrix.android.internal.events.sync.data.SyncResponse
import im.vector.riotredesign.R
import im.vector.riotredesign.core.platform.RiotActivity
import kotlinx.android.synthetic.main.activity_home.*
import org.koin.android.ext.android.inject
import timber.log.Timber
class HomeActivity : RiotActivity() {
private val matrix by inject<Matrix>()
private val synchronizer = matrix.currentSession?.synchronizer()
private val realmHolder = matrix.currentSession?.realmHolder()
private val currentSession = matrix.currentSession!!
private val realmHolder = currentSession.realmHolder()
private val syncThread = currentSession.syncThread()
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_home)
synchronizeButton.setOnClickListener { synchronize() }
}
private fun synchronize() {
synchronizeButton.visibility = View.GONE
loadingView.visibility = View.VISIBLE
synchronizer?.synchronize(object : MatrixCallback<SyncResponse> {
override fun onSuccess(data: SyncResponse) {
synchronizeButton.visibility = View.VISIBLE
loadingView.visibility = View.GONE
Timber.v("Sync successful")
}
override fun onFailure(failure: Failure) {
synchronizeButton.visibility = View.VISIBLE
loadingView.visibility = View.GONE
Timber.e("Sync has failed : %s", failure.toString())
}
})
if (realmHolder != null) {
val results = realmHolder.instance.where(RoomSummaryEntity::class.java).findAll()
results.addChangeListener { summaries ->
Timber.v("Summaries updated")
}
val results = realmHolder.instance.where(RoomSummaryEntity::class.java).findAll()
results.addChangeListener { summaries ->
Timber.v("Summaries updated")
}
startSyncButton.setOnClickListener { syncThread.restart() }
stopSyncButton.setOnClickListener { syncThread.pause() }
}
companion object {

View File

@ -7,34 +7,34 @@
android:orientation="vertical"
tools:context=".features.login.LoginActivity">
<ProgressBar
android:id="@+id/loadingView"
android:layout_width="wrap_content"
android:layout_height="wrap_content"
<Button
android:id="@+id/startSyncButton"
android:layout_width="0dp"
android:layout_height="0dp"
android:layout_marginBottom="8dp"
android:layout_marginEnd="8dp"
android:layout_marginStart="8dp"
android:layout_marginTop="8dp"
android:visibility="gone"
app:layout_constraintBottom_toBottomOf="parent"
android:text="Start sync"
app:layout_constraintBottom_toTopOf="@+id/stopSyncButton"
app:layout_constraintEnd_toEndOf="parent"
app:layout_constraintStart_toStartOf="parent"
app:layout_constraintTop_toTopOf="parent" />
<Button
android:id="@+id/synchronizeButton"
android:layout_width="wrap_content"
android:layout_height="wrap_content"
android:id="@+id/stopSyncButton"
android:layout_width="0dp"
android:layout_height="0dp"
android:layout_marginBottom="8dp"
android:layout_marginEnd="8dp"
android:layout_marginStart="8dp"
android:layout_marginTop="8dp"
android:text="Synchronize"
android:text="stop sync"
app:layout_constraintBottom_toBottomOf="parent"
app:layout_constraintEnd_toEndOf="parent"
app:layout_constraintHorizontal_bias="0.5"
app:layout_constraintStart_toStartOf="parent"
app:layout_constraintTop_toTopOf="parent" />
app:layout_constraintTop_toBottomOf="@+id/startSyncButton" />
</android.support.constraint.ConstraintLayout>

View File

@ -45,6 +45,7 @@ dependencies {
def support_version = '28.0.0'
def moshi_version = '1.7.0'
def lifecycle_version = "1.1.1"
def work_version = "1.0.0-alpha10"
implementation fileTree(dir: 'libs', include: ['*.aar'])
implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk7:$kotlin_version"
@ -55,7 +56,6 @@ dependencies {
implementation "android.arch.lifecycle:extensions:$lifecycle_version"
kapt "android.arch.lifecycle:compiler:$lifecycle_version"
// Network
implementation 'com.squareup.retrofit2:retrofit:2.4.0'
implementation 'com.squareup.retrofit2:converter-moshi:2.4.0'
@ -64,6 +64,7 @@ dependencies {
implementation 'com.squareup.okhttp3:okhttp:3.10.0'
implementation 'com.squareup.okhttp3:logging-interceptor:3.10.0'
implementation 'com.squareup.okio:okio:1.15.0'
implementation 'com.novoda:merlin:1.1.6'
implementation 'com.google.code.gson:gson:2.8.5'
implementation "com.squareup.moshi:moshi-adapters:$moshi_version"
@ -72,6 +73,10 @@ dependencies {
// Paging
implementation "android.arch.paging:runtime:1.0.1"
// Worker
implementation "android.arch.work:work-runtime-ktx:$work_version"
implementation 'com.evernote:android-job:1.2.6'
implementation "io.arrow-kt:arrow-core:$arrow_version"
// DI

View File

@ -1,5 +1,6 @@
<manifest xmlns:android="http://schemas.android.com/apk/res/android"
package="im.vector.matrix.android" >
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
<uses-permission android:name="android.permission.VIBRATE" />
</manifest>

View File

@ -1,7 +1,7 @@
package im.vector.matrix.android.api
import android.content.Context
import im.vector.matrix.android.BuildConfig
import com.evernote.android.job.JobManager
import im.vector.matrix.android.api.auth.Authenticator
import im.vector.matrix.android.api.session.Session
import im.vector.matrix.android.internal.auth.AuthModule
@ -11,8 +11,6 @@ import io.realm.Realm
import org.koin.standalone.KoinComponent
import org.koin.standalone.StandAloneContext.loadKoinModules
import org.koin.standalone.inject
import timber.log.Timber
import timber.log.Timber.DebugTree
class Matrix(matrixOptions: MatrixOptions) : KoinComponent {
@ -23,13 +21,11 @@ class Matrix(matrixOptions: MatrixOptions) : KoinComponent {
init {
Realm.init(matrixOptions.context)
JobManager.create(matrixOptions.context)
val matrixModule = MatrixModule(matrixOptions)
val networkModule = NetworkModule()
val authModule = AuthModule()
loadKoinModules(listOf(matrixModule, networkModule, authModule))
if (BuildConfig.DEBUG) {
Timber.plant(DebugTree())
}
}
fun authenticator(): Authenticator {

View File

@ -2,14 +2,14 @@ package im.vector.matrix.android.api.session
import android.support.annotation.MainThread
import im.vector.matrix.android.internal.database.SessionRealmHolder
import im.vector.matrix.android.internal.events.sync.Synchronizer
import im.vector.matrix.android.internal.events.sync.job.SyncThread
interface Session {
@MainThread
fun open()
fun synchronizer(): Synchronizer
fun syncThread(): SyncThread
// Visible for testing request directly. Will be deleted
fun realmHolder(): SessionRealmHolder

View File

@ -1,57 +0,0 @@
package im.vector.matrix.android.api.util
interface Logger {
/** Log a verbose message with optional format args. */
fun v(message: String, vararg args: Any)
/** Log a verbose exception and a message with optional format args. */
fun v(t: Throwable, message: String, vararg args: Any)
/** Log a verbose exception. */
fun v(t: Throwable)
/** Log a debug message with optional format args. */
fun d(message: String, vararg args: Any)
/** Log a debug exception and a message with optional format args. */
fun d(t: Throwable, message: String, vararg args: Any)
/** Log a debug exception. */
fun d(t: Throwable)
/** Log an info message with optional format args. */
fun i(message: String, vararg args: Any)
/** Log an info exception and a message with optional format args. */
fun i(t: Throwable, message: String, vararg args: Any)
/** Log an info exception. */
fun i(t: Throwable)
/** Log a warning message with optional format args. */
fun w(message: String, vararg args: Any)
/** Log a warning exception and a message with optional format args. */
fun w(t: Throwable, message: String, vararg args: Any)
/** Log a warning exception. */
fun w(t: Throwable)
/** Log an error message with optional format args. */
fun e(message: String, vararg args: Any)
/** Log an error exception and a message with optional format args. */
fun e(t: Throwable, message: String, vararg args: Any)
/** Log an error exception. */
fun e(t: Throwable)
/** Log an assert message with optional format args. */
fun wtf(message: String, vararg args: Any)
/** Log an assert exception and a message with optional format args. */
fun wtf(t: Throwable, message: String, vararg args: Any)
/** Log an assert exception. */
fun wtf(t: Throwable)
}

View File

@ -15,7 +15,7 @@ object EventMapper {
internal fun map(event: Event): EventEntity {
val eventEntity = EventEntity()
eventEntity.eventId = event.eventId!!
eventEntity.eventId = event.eventId ?: ""
eventEntity.content = adapter.toJson(event.content)
eventEntity.prevContent = adapter.toJson(event.prevContent)
eventEntity.stateKey = event.stateKey

View File

@ -2,6 +2,7 @@ package im.vector.matrix.android.internal.di
import com.jakewharton.retrofit2.adapter.kotlin.coroutines.CoroutineCallAdapterFactory
import im.vector.matrix.android.internal.network.AccessTokenInterceptor
import im.vector.matrix.android.internal.network.NetworkConnectivityChecker
import okhttp3.OkHttpClient
import okhttp3.logging.HttpLoggingInterceptor
import org.koin.dsl.context.ModuleDefinition
@ -49,6 +50,10 @@ class NetworkModule : Module {
CoroutineCallAdapterFactory() as CallAdapter.Factory
}
single {
NetworkConnectivityChecker(get())
}
factory {
Retrofit.Builder()
.client(get())

View File

@ -104,6 +104,7 @@ class RoomSyncHandler(private val realmConfiguration: RealmConfiguration) {
chunkEntity.nextToken = nextToken
chunkEntity.isLimited = isLimited
eventList.forEach { event ->
val eventEntity = event.asEntity().let {
realm.copyToRealmOrUpdate(it)

View File

@ -1,18 +0,0 @@
package im.vector.matrix.android.internal.events.sync
import com.squareup.moshi.Json
import com.squareup.moshi.JsonClass
@JsonClass(generateAdapter = true)
data class StateEvent(
@Json(name = "name") val name: String? = null,
@Json(name = "topic") val topic: String? = null,
@Json(name = "join_rule") val joinRule: String? = null,
@Json(name = "guest_access") val guestAccess: String? = null,
@Json(name = "alias") val canonicalAlias: String? = null,
@Json(name = "aliases") val aliases: List<String>? = null,
@Json(name = "algorithm") val algorithm: String? = null,
@Json(name = "history_visibility") val historyVisibility: String? = null,
@Json(name = "url") val url: String? = null,
@Json(name = "groups") val groups: List<String>? = null
)

View File

@ -1,11 +1,13 @@
package im.vector.matrix.android.internal.events.sync
import im.vector.matrix.android.internal.events.sync.job.SyncThread
import im.vector.matrix.android.internal.session.DefaultSession
import org.koin.dsl.context.ModuleDefinition
import org.koin.dsl.module.Module
import org.koin.dsl.module.module
import retrofit2.Retrofit
class SyncModule : Module {
override fun invoke(): ModuleDefinition = module(override = true) {
@ -24,8 +26,13 @@ class SyncModule : Module {
}
scope(DefaultSession.SCOPE) {
Synchronizer(get(), get(), get())
SyncRequest(get(), get(), get())
}
scope(DefaultSession.SCOPE) {
SyncThread(get(), get())
}
}.invoke()
}

View File

@ -16,28 +16,27 @@ import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
class Synchronizer(private val syncAPI: SyncAPI,
private val coroutineDispatchers: MatrixCoroutineDispatchers,
private val syncResponseHandler: SyncResponseHandler) {
class SyncRequest(private val syncAPI: SyncAPI,
private val coroutineDispatchers: MatrixCoroutineDispatchers,
private val syncResponseHandler: SyncResponseHandler) {
private var token: String? = null
fun synchronize(callback: MatrixCallback<SyncResponse>): Cancelable {
fun execute(token: String?, callback: MatrixCallback<SyncResponse>): Cancelable {
val job = GlobalScope.launch(coroutineDispatchers.main) {
val syncOrFailure = synchronize()
val syncOrFailure = execute(token)
syncOrFailure.bimap({ callback.onFailure(it) }, { callback.onSuccess(it) })
}
return CancelableCoroutine(job)
}
private suspend fun synchronize() = withContext(coroutineDispatchers.io) {
private suspend fun execute(token: String?) = withContext(coroutineDispatchers.io) {
val params = HashMap<String, String>()
val filterBody = FilterBody()
FilterUtil.enableLazyLoading(filterBody, true)
var timeout = 0
if (token != null) {
params["since"] = token as String
timeout = 30
timeout = 30000
}
params["timeout"] = timeout.toString()
params["filter"] = filterBody.toJSONString()
@ -46,7 +45,6 @@ class Synchronizer(private val syncAPI: SyncAPI,
}.leftIfNull {
Failure.Unknown(RuntimeException("Sync response shouln't be null"))
}.flatMap {
token = it?.nextBatch
try {
syncResponseHandler.handleResponse(it, null, false)
Either.right(it)

View File

@ -0,0 +1,105 @@
package im.vector.matrix.android.internal.events.sync.job
import im.vector.matrix.android.api.MatrixCallback
import im.vector.matrix.android.api.failure.Failure
import im.vector.matrix.android.api.util.Cancelable
import im.vector.matrix.android.internal.events.sync.SyncRequest
import im.vector.matrix.android.internal.events.sync.data.SyncResponse
import im.vector.matrix.android.internal.network.NetworkConnectivityChecker
import timber.log.Timber
import java.util.concurrent.CountDownLatch
private const val RETRY_WAIT_TIME_MS = 10_000L
class SyncThread(private val syncRequest: SyncRequest,
private val networkConnectivityChecker: NetworkConnectivityChecker
) : Thread(), NetworkConnectivityChecker.Listener {
enum class State {
IDLE,
RUNNING,
PAUSED,
KILLING,
KILLED
}
private var state: State = State.IDLE
private val lock = Object()
private var nextBatch: String? = null
private var cancelableRequest: Cancelable? = null
fun restart() {
synchronized(lock) {
if (state != State.PAUSED) {
return@synchronized
}
Timber.v("Unpause sync...")
state = State.RUNNING
lock.notify()
}
}
fun pause() {
synchronized(lock) {
if (state != State.RUNNING) {
return@synchronized
}
Timber.v("Pause sync...")
state = State.PAUSED
}
}
fun kill() {
synchronized(lock) {
Timber.v("Kill sync...")
state = State.KILLING
cancelableRequest?.cancel()
lock.notify()
}
}
override fun run() {
Timber.v("Start syncing...")
state = State.RUNNING
networkConnectivityChecker.register(this)
while (state != State.KILLING) {
if (!networkConnectivityChecker.isConnected() || state == State.PAUSED) {
Timber.v("Waiting...")
synchronized(lock) {
lock.wait()
}
} else {
Timber.v("Execute sync request...")
val latch = CountDownLatch(1)
cancelableRequest = syncRequest.execute(nextBatch, object : MatrixCallback<SyncResponse> {
override fun onSuccess(data: SyncResponse) {
nextBatch = data.nextBatch
latch.countDown()
}
override fun onFailure(failure: Failure) {
if (failure !is Failure.NetworkConnection) {
// Wait 10s before retrying
sleep(RETRY_WAIT_TIME_MS)
}
latch.countDown()
}
})
latch.await()
}
}
Timber.v("Sync killed")
state = State.KILLED
networkConnectivityChecker.unregister(this)
}
override fun onConnect() {
synchronized(lock) {
lock.notify()
}
}
}

View File

@ -1075,7 +1075,7 @@ public class MXSession {
}
if (null != mEventsThread) {
Log.d(LOG_TAG, "## resumeEventStream() : unpause");
Log.d(LOG_TAG, "## resumeEventStream() : pickUp");
mEventsThread.unpause();
} else {
Log.e(LOG_TAG, "resumeEventStream : mEventsThread is null");

View File

@ -228,7 +228,7 @@ public class EventsThread extends Thread {
}
/**
* Pause the thread. It will resume where it left off when unpause()d.
* Pause the thread. It will resume where it left off when pickUp()d.
*/
public void pause() {
Log.d(LOG_TAG, "pause()");
@ -264,10 +264,10 @@ public class EventsThread extends Thread {
* Unpause the thread if it had previously been paused. If not, this does nothing.
*/
public void unpause() {
Log.d(LOG_TAG, "## unpause() : thread state " + getState());
Log.d(LOG_TAG, "## pickUp() : thread state " + getState());
if (State.WAITING == getState()) {
Log.d(LOG_TAG, "## unpause() : the thread was paused so resume it.");
Log.d(LOG_TAG, "## pickUp() : the thread was paused so resume it.");
mPaused = false;
synchronized (mSyncObject) {

View File

@ -0,0 +1,44 @@
package im.vector.matrix.android.internal.network
import android.content.Context
import com.novoda.merlin.Merlin
import com.novoda.merlin.MerlinsBeard
import com.novoda.merlin.registerable.connection.Connectable
class NetworkConnectivityChecker(context: Context) {
private val merlin = Merlin.Builder().withConnectableCallbacks().build(context)
private val merlinsBeard = MerlinsBeard.from(context)
private val listeners = ArrayList<Listener>()
fun register(listener: Listener) {
if (listeners.isEmpty()) {
merlin.bind()
}
listeners.add(listener)
val connectable = Connectable {
if (listeners.contains(listener)) {
listener.onConnect()
}
}
merlin.registerConnectable(connectable)
}
fun unregister(listener: Listener) {
if (listeners.remove(listener) && listeners.isEmpty()) {
merlin.unbind()
}
}
fun isConnected(): Boolean {
return merlinsBeard.isConnected
}
interface Listener {
fun onConnect()
}
}

View File

@ -7,13 +7,14 @@ import im.vector.matrix.android.internal.auth.data.SessionParams
import im.vector.matrix.android.internal.database.SessionRealmHolder
import im.vector.matrix.android.internal.di.SessionModule
import im.vector.matrix.android.internal.events.sync.SyncModule
import im.vector.matrix.android.internal.events.sync.Synchronizer
import im.vector.matrix.android.internal.events.sync.job.SyncThread
import org.koin.core.scope.Scope
import org.koin.standalone.KoinComponent
import org.koin.standalone.StandAloneContext
import org.koin.standalone.getKoin
import org.koin.standalone.inject
class DefaultSession(private val sessionParams: SessionParams) : Session, KoinComponent {
companion object {
@ -23,9 +24,8 @@ class DefaultSession(private val sessionParams: SessionParams) : Session, KoinCo
private lateinit var scope: Scope
private val realmInstanceHolder by inject<SessionRealmHolder>()
private val synchronizer by inject<Synchronizer>()
private val roomSummaryObserver by inject<RoomSummaryObserver>()
private val syncThread by inject<SyncThread>()
private var isOpen = false
@MainThread
@ -39,11 +39,7 @@ class DefaultSession(private val sessionParams: SessionParams) : Session, KoinCo
scope = getKoin().getOrCreateScope(SCOPE)
realmInstanceHolder.open()
roomSummaryObserver.start()
}
override fun synchronizer(): Synchronizer {
assert(isOpen)
return synchronizer
syncThread.start()
}
override fun realmHolder(): SessionRealmHolder {
@ -51,16 +47,24 @@ class DefaultSession(private val sessionParams: SessionParams) : Session, KoinCo
return realmInstanceHolder
}
override fun syncThread(): SyncThread {
assert(isOpen)
return syncThread
}
@MainThread
override fun close() {
checkIsMainThread()
assert(isOpen)
syncThread.kill()
roomSummaryObserver.dispose()
realmInstanceHolder.close()
scope.close()
isOpen = false
}
// Private methods *****************************************************************************
private fun checkIsMainThread() {
if (Looper.myLooper() != Looper.getMainLooper()) {
throw IllegalStateException("Should be called on main thread")