This commit is contained in:
tateisu 2020-12-22 05:16:33 +09:00
parent 953d4620a1
commit 21508be87a
14 changed files with 747 additions and 565 deletions

View File

@ -201,7 +201,6 @@ class Column(
internal const val HASHTAG_ELLIPSIZE = 26
const val STREAM = "stream"
@Suppress("UNCHECKED_CAST")
private inline fun <reified T> getParamAt(params: Array<out Any>, idx: Int): T {
@ -373,33 +372,9 @@ class Column(
// カラムオブジェクトの識別に使うID。
val internalId = internalIdSeed.incrementAndGet()
override fun hashCode(): Int = internalId
override fun equals(other: Any?): Boolean = this===other
val type = ColumnType.parse(typeId)
fun getIconId(): Int = type.iconId(access_info.acct)
fun getColumnName(long: Boolean) =
type.name2(this, long) ?: type.name1(context)
private var lastStreamPath: String? = null
internal fun makeHashtagQueryParams(tagKey:String? = "tag") =JsonObject().apply{
if(tagKey!=null) put(tagKey,hashtag)
hashtag_any.split(" ").filter { it.isNotEmpty() }.toJsonArray()
.notEmpty()?.let{ put("any",it)}
hashtag_all.split(" ").filter { it.isNotEmpty() }.toJsonArray()
.notEmpty()?.let{ put("all",it)}
hashtag_none.split(" ").filter { it.isNotEmpty() }.toJsonArray()
.notEmpty()?.let{ put("none",it)}
}
internal var dont_close: Boolean = false
@ -762,6 +737,18 @@ class Column(
}
}
override fun hashCode(): Int = internalId
override fun equals(other: Any?): Boolean = this === other
fun getIconId(): Int = type.iconId(access_info.acct)
fun getColumnName(long: Boolean) =
type.name2(this, long) ?: type.name1(context)
private fun JsonObject.putIfTrue(key: String, value: Boolean) {
if (value) put(key, true)
}
@ -2314,7 +2301,6 @@ class Column(
}
internal fun hasHashtagExtra() = when {
isMisskey -> false
type == ColumnType.HASHTAG -> true
@ -2431,7 +2417,7 @@ class Column(
}
override fun onTimelineItem(item: TimelineItem, channelId: String?) {
override fun onTimelineItem(item: TimelineItem, channelId: String?,stream:JsonArray?) {
if (is_dispose.get()) return
if (item is TootConversationSummary) {
@ -2467,20 +2453,19 @@ class Column(
override fun onNoteUpdated(ev: MisskeyNoteUpdate, channelId: String?) {
// userId が自分かどうか調べる
// アクセストークンの更新をして自分のuserIdが分かる状態でないとキャプチャ結果を反映させない
// でないとリアクションの2重カウントなどが発生してしまう)
val myId = EntityId.from(access_info.token_info, TootApiClient.KEY_USER_ID)
if (myId == null) {
log.w("onNoteUpdated: missing my userId. updating access token is recommenced!!")
return
}
val byMe = myId == ev.userId
runOnMainLooper {
if (is_dispose.get()) return@runOnMainLooper
// userId が自分かどうか調べる
// アクセストークンの更新をして自分のuserIdが分かる状態でないとキャプチャ結果を反映させない
// でないとリアクションの2重カウントなどが発生してしまう)
val myId = EntityId.from(access_info.token_info, TootApiClient.KEY_USER_ID)
if (myId == null) {
log.w("onNoteUpdated: missing my userId. updating access token is recommenced!!")
}
val byMe = myId == ev.userId
val changeList = ArrayList<AdapterChange>()
fun scanStatus1(s: TootStatus?, idx: Int, block: (s: TootStatus) -> Boolean) {
@ -2538,64 +2523,70 @@ class Column(
}
override fun onAnnouncementUpdate(item: TootAnnouncement) {
val list = announcements
if (list == null) {
announcements = mutableListOf(item)
} else {
val index = list.indexOfFirst { it.id == item.id }
list.add(
0,
if (index == -1) {
item
} else {
TootAnnouncement.merge(list.removeAt(index), item)
}
)
runOnMainLooper {
val list = announcements
if (list == null) {
announcements = mutableListOf(item)
} else {
val index = list.indexOfFirst { it.id == item.id }
list.add(
0,
if (index == -1) {
item
} else {
TootAnnouncement.merge(list.removeAt(index), item)
}
)
}
announcementUpdated = SystemClock.elapsedRealtime()
fireShowColumnHeader()
}
announcementUpdated = SystemClock.elapsedRealtime()
fireShowColumnHeader()
}
override fun onAnnouncementDelete(id: EntityId) {
val it = announcements?.iterator() ?: return
while (it.hasNext()) {
val item = it.next()
if (item.id == id) {
it.remove()
announcementUpdated = SystemClock.elapsedRealtime()
fireShowColumnHeader()
return
runOnMainLooper {
val it = announcements?.iterator() ?: return@runOnMainLooper
while (it.hasNext()) {
val item = it.next()
if (item.id == id) {
it.remove()
announcementUpdated = SystemClock.elapsedRealtime()
fireShowColumnHeader()
return@runOnMainLooper
}
}
}
}
override fun onAnnouncementReaction(reaction: TootAnnouncement.Reaction) {
// find announcement
val announcement_id = reaction.announcement_id ?: return
val announcement = announcements?.find { it.id == announcement_id } ?: return
runOnMainLooper {
// find announcement
val announcement_id = reaction.announcement_id ?: return@runOnMainLooper
val announcement = announcements?.find { it.id == announcement_id } ?: return@runOnMainLooper
// find reaction
val index = announcement.reactions?.indexOfFirst { it.name == reaction.name }
when {
reaction.count <= 0L -> {
if (index != null && index != -1) announcement.reactions?.removeAt(index)
}
// find reaction
val index = announcement.reactions?.indexOfFirst { it.name == reaction.name }
when {
reaction.count <= 0L -> {
if (index != null && index != -1) announcement.reactions?.removeAt(index)
}
index == null -> {
announcement.reactions = ArrayList<TootAnnouncement.Reaction>().apply {
add(reaction)
index == null -> {
announcement.reactions = ArrayList<TootAnnouncement.Reaction>().apply {
add(reaction)
}
}
index == -1 -> announcement.reactions?.add(reaction)
else -> announcement.reactions?.get(index)?.let { old ->
old.count = reaction.count
// ストリーミングイベントにはmeが含まれないので、oldにあるmeは変更されない
}
}
index == -1 -> announcement.reactions?.add(reaction)
else -> announcement.reactions?.get(index)?.let { old ->
old.count = reaction.count
// ストリーミングイベントにはmeが含まれないので、oldにあるmeは変更されない
}
announcementUpdated = SystemClock.elapsedRealtime()
fireShowColumnHeader()
}
announcementUpdated = SystemClock.elapsedRealtime()
fireShowColumnHeader()
}
}
@ -2603,13 +2594,13 @@ class Column(
fun canStartStreaming() = when {
// 未初期化なら何もしない
!bFirstInitialized -> {
log.d("resumeStreaming: column is not initialized.")
log.v("canStartStreaming: column is not initialized.")
false
}
// 初期ロード中なら何もしない
bInitialLoading -> {
log.d("resumeStreaming: is in initial loading.")
log.v("canStartStreaming: is in initial loading.")
false
}
@ -2619,12 +2610,11 @@ class Column(
private fun resumeColumn(bPutGap: Boolean) {
// カラム種別によってはストリーミングAPIを利用できない
val stream_path = streamPath ?: return
streamSpec ?: return
if (!canStartStreaming()) return
this.bPutGap = bPutGap
this.lastStreamPath = stream_path
// TODO キューのクリアって必要? stream_data_queue.clear()
@ -2820,7 +2810,7 @@ class Column(
if (!isMisskey) return
val streamConnection = app_state.streamManager.getConnection(this)
?:return
?: return
val max = 40
val list = ArrayList<EntityId>(max * 2) // リブログなどで膨れる場合がある
@ -3054,9 +3044,6 @@ class Column(
}
// fun findListIndexByTimelineId(orderId : EntityId) : Int? {
// list_data.forEachIndexed { i, v ->
// if(v.getOrderId() == orderId) return i

View File

@ -13,13 +13,11 @@ import jp.juggler.subwaytooter.search.NotestockHelper.loadingNotestock
import jp.juggler.subwaytooter.search.NotestockHelper.refreshNotestock
import jp.juggler.subwaytooter.search.TootsearchHelper.loadingTootsearch
import jp.juggler.subwaytooter.search.TootsearchHelper.refreshTootsearch
import jp.juggler.util.LogCategory
import jp.juggler.util.ellipsizeDot3
import jp.juggler.util.notEmpty
import jp.juggler.util.toJsonArray
import jp.juggler.util.*
import java.util.*
import kotlin.math.max
import kotlin.math.min
import jp.juggler.subwaytooter.streaming.*
/*
カラム種別ごとの処理
@ -80,6 +78,8 @@ enum class ColumnType(
val bAllowMastodon: Boolean = true,
val headerType: HeaderType? = null,
val gapDirection: Column.(head: Boolean) -> Boolean = gapDirectionNone,
val streamKeyMastodon: Column.()->JsonObject? = {null},
val streamFilterMastodon: Column.(String?,TimelineItem)->Boolean = {_,_->true},
) {
ProfileStatusMastodon(
@ -409,7 +409,14 @@ enum class ColumnType(
)
},
gapDirection = gapDirectionBoth,
bAllowPseudo = false
bAllowPseudo = false,
streamKeyMastodon = {
jsonObject(StreamSpec.STREAM to "user")
},
streamFilterMastodon = { stream,item->
item is TootStatus && (stream == null || stream == "user")
}
),
LOCAL(
@ -428,6 +435,21 @@ enum class ColumnType(
)
},
gapDirection = gapDirectionBoth,
streamKeyMastodon = {
jsonObject(StreamSpec.STREAM to
"public:local"
.appendIf(":media", with_attachment)
)
},
streamFilterMastodon = { stream,item->
when{
item !is TootStatus -> false
(stream != null && !stream.startsWith("public:local")) -> false
with_attachment && item.media_attachments.isNullOrEmpty() -> false
else->true
}
}
),
FEDERATE(
@ -447,6 +469,25 @@ enum class ColumnType(
)
},
gapDirection = gapDirectionBoth,
streamKeyMastodon = {
jsonObject(StreamSpec.STREAM to
"public"
.appendIf(":remote", remote_only)
.appendIf(":media", with_attachment)
)
},
streamFilterMastodon = { stream,item->
when{
item !is TootStatus -> false
(stream != null && !stream.startsWith("public")) -> false
(stream !=null && stream.contains(":local")) ->false
remote_only && item.account.acct == access_info.acct -> false
with_attachment && item.media_attachments.isNullOrEmpty() -> false
else->true
}
}
),
MISSKEY_HYBRID(
@ -489,6 +530,24 @@ enum class ColumnType(
)
},
gapDirection = gapDirectionBoth,
streamKeyMastodon = {
jsonObject(StreamSpec.STREAM to
"public:domain"
.appendIf(":media", with_attachment),
"domain" to instance_uri
)
},
streamFilterMastodon = { stream,item->
when{
item !is TootStatus -> false
(stream != null && !stream.startsWith("public:domain")) -> false
(stream != null && !stream.endsWith(instance_uri)) -> false
with_attachment && item.media_attachments.isNullOrEmpty() -> false
else->true
}
}
),
LOCAL_AROUND(29,
@ -654,7 +713,18 @@ enum class ColumnType(
gap = { client -> getNotificationList(client, mastodonFilterByIdRange = true) },
gapDirection = gapDirectionBoth,
bAllowPseudo = false
bAllowPseudo = false,
streamKeyMastodon = {
jsonObject(StreamSpec.STREAM to "user")
},
streamFilterMastodon = { stream, item ->
when {
item !is TootNotification -> false
(stream != null && stream != "user") -> false
else->true
}
}
),
NOTIFICATION_FROM_ACCT(
@ -741,6 +811,23 @@ enum class ColumnType(
}
},
gapDirection = gapDirectionBoth,
streamKeyMastodon = {
jsonObject(
StreamSpec.STREAM to "hashtag".appendIf(":local", instance_local),
"tag" to hashtag
)
},
streamFilterMastodon = { stream,item->
when{
item !is TootStatus -> false
(stream != null && !stream.startsWith("hashtag")) -> false
instance_local && (stream !=null && !stream.contains(":local")) ->false
else-> this.checkHashtagExtra(item)
}
}
),
HASHTAG_FROM_ACCT(
@ -1196,6 +1283,18 @@ enum class ColumnType(
}
},
gapDirection = gapDirectionBoth,
streamKeyMastodon = {
jsonObject(jp.juggler.subwaytooter.streaming.StreamSpec.STREAM to "list", "list" to profile_id.toString())
},
streamFilterMastodon = { stream,item->
when{
item !is TootStatus -> false
(stream != null && stream != "list:${profile_id}") -> false
else-> true
}
}
),
LIST_MEMBER(21,
@ -1292,7 +1391,19 @@ enum class ColumnType(
gapDirection = gapDirectionMastodonWorkaround,
bAllowPseudo = false,
bAllowMisskey = false
bAllowMisskey = false,
streamKeyMastodon = {
jsonObject(StreamSpec.STREAM to "direct")
},
streamFilterMastodon = { stream,item->
when{
(stream != null && stream != "direct") -> false
else-> true
}
}
),
TREND_TAG(24,

View File

@ -1,5 +1,7 @@
package jp.juggler.subwaytooter
import android.graphics.Bitmap
import android.util.LruCache
import jp.juggler.subwaytooter.Column.Companion.READ_LIMIT
import jp.juggler.subwaytooter.Column.Companion.log
import jp.juggler.subwaytooter.api.TootApiClient
@ -10,10 +12,11 @@ import jp.juggler.subwaytooter.api.syncAccountByAcct
import jp.juggler.util.*
import java.util.*
internal inline fun <reified T : TimelineItem> addAll(
dstArg: ArrayList<TimelineItem>?,
src: List<T>,
head: Boolean = false
dstArg: ArrayList<TimelineItem>?,
src: List<T>,
head: Boolean = false
): ArrayList<TimelineItem> =
(dstArg ?: ArrayList(src.size)).apply {
if (head) {
@ -24,9 +27,9 @@ internal inline fun <reified T : TimelineItem> addAll(
}
internal fun addOne(
dstArg: ArrayList<TimelineItem>?,
item: TimelineItem?,
head: Boolean = false
dstArg: ArrayList<TimelineItem>?,
item: TimelineItem?,
head: Boolean = false
): ArrayList<TimelineItem> =
(dstArg ?: ArrayList()).apply {
if (item != null) {
@ -39,9 +42,9 @@ internal fun addOne(
}
internal fun ColumnTask.addWithFilterStatus(
dstArg: ArrayList<TimelineItem>?,
srcArg: List<TootStatus>,
head: Boolean = false
dstArg: ArrayList<TimelineItem>?,
srcArg: List<TootStatus>,
head: Boolean = false
): ArrayList<TimelineItem> =
(dstArg ?: ArrayList(srcArg.size)).apply {
val src = srcArg.filter { !column.isFiltered(it) }
@ -53,9 +56,9 @@ internal fun ColumnTask.addWithFilterStatus(
}
internal fun ColumnTask.addWithFilterConversationSummary(
dstArg: ArrayList<TimelineItem>?,
srcArg: List<TootConversationSummary>,
head: Boolean = false
dstArg: ArrayList<TimelineItem>?,
srcArg: List<TootConversationSummary>,
head: Boolean = false
): ArrayList<TimelineItem> =
(dstArg ?: ArrayList(srcArg.size)).apply {
val src = srcArg.filter { !column.isFiltered(it.last_status) }
@ -68,9 +71,9 @@ internal fun ColumnTask.addWithFilterConversationSummary(
}
internal fun ColumnTask.addWithFilterNotification(
dstArg: ArrayList<TimelineItem>?,
srcArg: List<TootNotification>,
head: Boolean = false
dstArg: ArrayList<TimelineItem>?,
srcArg: List<TootNotification>,
head: Boolean = false
): ArrayList<TimelineItem> =
(dstArg ?: ArrayList(srcArg.size)).apply {
val src = srcArg.filter { !column.isFiltered(it) }
@ -117,11 +120,11 @@ internal suspend fun Column.loadListInfo(client: TootApiClient, bForceReload: Bo
if (bForceReload || this.list_info == null) {
val result = if (isMisskey) {
client.request(
"/api/users/lists/show",
makeMisskeyBaseParameter(parser).apply {
put("listId", profile_id)
}.toPostRequestBuilder()
)
"/api/users/lists/show",
makeMisskeyBaseParameter(parser).apply {
put("listId", profile_id)
}.toPostRequestBuilder()
)
} else {
client.request("/api/v1/lists/${profile_id.toString()}")
}
@ -143,11 +146,11 @@ internal suspend fun Column.loadAntennaInfo(client: TootApiClient, bForceReload:
val result = if (isMisskey) {
client.request(
"/api/antennas/show",
makeMisskeyBaseParameter(parser).apply {
put("antennaId", profile_id)
}.toPostRequestBuilder()
)
"/api/antennas/show",
makeMisskeyBaseParameter(parser).apply {
put("antennaId", profile_id)
}.toPostRequestBuilder()
)
} else {
TootApiResult("antenna feature is not supported on Mastodon")
}
@ -185,56 +188,56 @@ internal fun JsonObject.addRangeMisskey(column: Column, bBottom: Boolean): JsonO
internal fun JsonObject.addMisskeyNotificationFilter(column: Column): JsonObject {
when (column.quick_filter) {
Column.QUICK_FILTER_ALL -> {
val excludeList = jsonArray {
// Misskeyのお気に入りは通知されない
// if(dont_show_favourite) ...
Column.QUICK_FILTER_ALL -> {
val excludeList = jsonArray {
// Misskeyのお気に入りは通知されない
// if(dont_show_favourite) ...
if (column.dont_show_boost) {
add("renote")
add("quote")
}
if (column.dont_show_follow) {
add("follow")
add("receiveFollowRequest")
}
if (column.dont_show_reply) {
add("mention")
add("reply")
}
if (column.dont_show_reaction) {
add("reaction")
}
if (column.dont_show_vote) {
add("poll_vote")
}
if (column.dont_show_boost) {
add("renote")
add("quote")
}
if (column.dont_show_follow) {
add("follow")
add("receiveFollowRequest")
}
if (column.dont_show_reply) {
add("mention")
add("reply")
}
if (column.dont_show_reaction) {
add("reaction")
}
if (column.dont_show_vote) {
add("poll_vote")
}
// // FIXME Misskeyには特定フォロー者からの投稿を通知する機能があるのか
// if(column.dont_show_normal_toot) {
// }
}
}
if (excludeList.isNotEmpty()) put("excludeTypes", excludeList)
}
if (excludeList.isNotEmpty()) put("excludeTypes", excludeList)
}
// QUICK_FILTER_FAVOURITE // misskeyはお気に入りの通知はない
Column.QUICK_FILTER_BOOST -> put(
"includeTypes",
jsonArray("renote", "quote")
)
Column.QUICK_FILTER_FOLLOW -> put(
"includeTypes",
jsonArray("follow", "receiveFollowRequest")
)
Column.QUICK_FILTER_MENTION -> put(
"includeTypes",
jsonArray("mention", "reply")
)
Column.QUICK_FILTER_REACTION -> put("includeTypes", jp.juggler.util.jsonArray("reaction"))
Column.QUICK_FILTER_VOTE -> put("includeTypes", jp.juggler.util.jsonArray("poll_vote"))
Column.QUICK_FILTER_BOOST -> put(
"includeTypes",
jsonArray("renote", "quote")
)
Column.QUICK_FILTER_FOLLOW -> put(
"includeTypes",
jsonArray("follow", "receiveFollowRequest")
)
Column.QUICK_FILTER_MENTION -> put(
"includeTypes",
jsonArray("mention", "reply")
)
Column.QUICK_FILTER_REACTION -> put("includeTypes", jp.juggler.util.jsonArray("reaction"))
Column.QUICK_FILTER_VOTE -> put("includeTypes", jp.juggler.util.jsonArray("poll_vote"))
Column.QUICK_FILTER_POST -> {
// FIXME Misskeyには特定フォロー者からの投稿を通知する機能があるのか
}
Column.QUICK_FILTER_POST -> {
// FIXME Misskeyには特定フォロー者からの投稿を通知する機能があるのか
}
}
return this
@ -256,7 +259,7 @@ internal suspend fun Column.makeHashtagAcctUrl(client: TootApiClient): String? {
null
} else {
if (profile_id == null) {
val (result, whoRef) = client.syncAccountByAcct(access_info, hashtag_acct)
val (result, whoRef) = client.syncAccountByAcct(access_info, hashtag_acct)
result ?: return null // cancelled.
if (whoRef == null) {
log.w("makeHashtagAcctUrl: ${result.error ?: "?"}")
@ -272,7 +275,7 @@ internal suspend fun Column.makeHashtagAcctUrl(client: TootApiClient): String? {
if (with_attachment) sb.append("&only_media=true")
if (instance_local) sb.append("&local=true")
makeHashtagQueryParams(tagKey=null).encodeQuery().notEmpty()?.let{
makeHashtagQueryParams(tagKey = null).encodeQuery().notEmpty()?.let {
sb.append('&').append(it)
}
@ -354,8 +357,8 @@ internal fun Column.makeHomeTlUrl(): String {
}
internal suspend fun Column.makeNotificationUrl(
client: TootApiClient,
fromAcct: String? = null
client: TootApiClient,
fromAcct: String? = null
): String {
return when {
access_info.isMisskey -> "/api/i/notifications"
@ -363,14 +366,14 @@ internal suspend fun Column.makeNotificationUrl(
else -> {
val sb = StringBuilder(Column.PATH_NOTIFICATIONS) // always contain "?limit=XX"
when (val quick_filter = quick_filter) {
Column.QUICK_FILTER_ALL -> {
if (dont_show_favourite) sb.append("&exclude_types[]=favourite")
if (dont_show_boost) sb.append("&exclude_types[]=reblog")
if (dont_show_follow) sb.append("&exclude_types[]=follow")
if (dont_show_reply) sb.append("&exclude_types[]=mention")
if (dont_show_vote) sb.append("&exclude_types[]=poll")
if (dont_show_normal_toot) sb.append("&exclude_types[]=status")
}
Column.QUICK_FILTER_ALL -> {
if (dont_show_favourite) sb.append("&exclude_types[]=favourite")
if (dont_show_boost) sb.append("&exclude_types[]=reblog")
if (dont_show_follow) sb.append("&exclude_types[]=follow")
if (dont_show_reply) sb.append("&exclude_types[]=mention")
if (dont_show_vote) sb.append("&exclude_types[]=poll")
if (dont_show_normal_toot) sb.append("&exclude_types[]=status")
}
else -> {
if (quick_filter != Column.QUICK_FILTER_FAVOURITE) sb.append("&exclude_types[]=favourite")
@ -383,7 +386,7 @@ internal suspend fun Column.makeNotificationUrl(
if (fromAcct?.isNotEmpty() == true) {
if (profile_id == null) {
val (result, whoRef) = client.syncAccountByAcct(access_info, hashtag_acct)
val (result, whoRef) = client.syncAccountByAcct(access_info, hashtag_acct)
if (result != null) {
whoRef ?: error(result.error ?: "unknown error")
profile_id = whoRef.get().id
@ -416,27 +419,70 @@ internal fun Column.makeAntennaTlUrl(): String {
}
}
fun JsonObject.encodeQuery():String {
fun JsonObject.encodeQuery(): String {
val sb = StringBuilder()
entries.forEach { pair->
val(k,v)=pair
when(v){
null,is String,is Number,is Boolean ->{
if(sb.isNotEmpty())sb.append('&')
sb.append(k).append('=').append( v.toString().encodePercent())
entries.forEach { pair ->
val (k, v) = pair
when (v) {
null, is String, is Number, is Boolean -> {
if (sb.isNotEmpty()) sb.append('&')
sb.append(k).append('=').append(v.toString().encodePercent())
}
is JsonArray->{
is JsonArray -> {
v.forEach {
if(sb.isNotEmpty())sb.append('&')
sb.append(k).append("[]=").append( it.toString().encodePercent())
if (sb.isNotEmpty()) sb.append('&')
sb.append(k).append("[]=").append(it.toString().encodePercent())
}
}
else-> error("encodeQuery: unsupported type ${v.javaClass.name}")
else -> error("encodeQuery: unsupported type ${v.javaClass.name}")
}
}
return sb.toString()
}
private val extraTagCache by lazy {
object : LruCache<String, List<String>>(1024 * 80) {
override fun sizeOf(key: String, value: List<String>): Int =
key.length
}
}
// parse extra tags with LRU cache.
private fun String.parseExtraTag() = synchronized(extraTagCache) {
var result = extraTagCache.get(this)
if (result == null) {
result = this.split(" ").filter { it.isNotEmpty() }
extraTagCache.put(this, result)
}
result
}
internal fun Column.makeHashtagQueryParams(tagKey: String? = "tag") = JsonObject().apply {
if (tagKey != null) put(tagKey, hashtag)
hashtag_any.parseExtraTag().notEmpty()?.let { put("any", it) }
hashtag_all.parseExtraTag().notEmpty()?.let { put("all", it) }
hashtag_none.parseExtraTag().notEmpty()?.let { put("none", it) }
}
fun Column.checkHashtagExtra(item: TootStatus): Boolean {
hashtag_any.parseExtraTag().notEmpty()
?.any { item.tags?.any { tag -> tag.name.equals(it,ignoreCase = true) } ?: false }
?.let { if (!it) return false }
hashtag_all.parseExtraTag().notEmpty()
.notEmpty()
?.all { item.tags?.any { tag -> tag.name.equals(it,ignoreCase = true) } ?: false }
?.let { if (!it) return false }
hashtag_none.parseExtraTag().notEmpty()
?.any { item.tags?.any { tag -> tag.name.equals(it,ignoreCase = true) } ?: false }
?.not()
?.let { if (!it) return false }
return true
}
internal fun Column.makeHashtagUrl(): String {
return if (isMisskey) {
@ -450,7 +496,7 @@ internal fun Column.makeHashtagUrl(): String {
if (with_attachment) sb.append("&only_media=true")
if (instance_local) sb.append("&local=true")
makeHashtagQueryParams(tagKey=null).encodeQuery().notEmpty()?.let{
makeHashtagQueryParams(tagKey = null).encodeQuery().notEmpty()?.let {
sb.append('&').append(it)
}
@ -489,7 +535,7 @@ internal val defaultAccountListParser: (parser: TootParser, jsonArray: JsonArray
private fun misskeyUnwrapRelationAccount(parser: TootParser, srcList: JsonArray, key: String) =
srcList.objectList().mapNotNull {
when (val relationId = EntityId.mayNull(it.string("id"))) {
null -> null
null -> null
else -> TootAccountRef.mayNull(parser, parser.account(it.jsonObject(key)))
?.apply { _orderId = relationId }
}
@ -520,7 +566,7 @@ internal val misskeyCustomParserFavorites: (TootParser, JsonArray) -> List<TootS
{ parser, jsonArray ->
jsonArray.objectList().mapNotNull {
when (val relationId = EntityId.mayNull(it.string("id"))) {
null -> null
null -> null
else -> parser.status(it.jsonObject("note"))?.apply {
favourited = true
_orderId = relationId

View File

@ -4,9 +4,10 @@ import jp.juggler.subwaytooter.api.entity.EntityId
import jp.juggler.subwaytooter.api.entity.MisskeyNoteUpdate
import jp.juggler.subwaytooter.api.entity.TimelineItem
import jp.juggler.subwaytooter.api.entity.TootAnnouncement
import jp.juggler.util.JsonArray
interface StreamCallback {
fun onTimelineItem(item: TimelineItem, channelId: String?)
fun onTimelineItem(item: TimelineItem, channelId: String?,stream: JsonArray?)
fun onListeningStateChanged(status: StreamStatus)
fun onNoteUpdated(ev: MisskeyNoteUpdate, channelId: String?)
fun onAnnouncementUpdate(item: TootAnnouncement)

View File

@ -22,10 +22,10 @@ import java.util.regex.Pattern
class StreamConnection(
private val manager: StreamManager,
private val server: StreamGroupAcct,
val name: String,
val streamKey: String? = null // null if merged connection
) : WebSocketListener() {
private val acctGroup: StreamGroupAcct,
val spec: StreamSpec? = null, // null if merged connection
val name :String
) : WebSocketListener() ,TootApiCallback {
companion object{
private val log = LogCategory("StreamConnection")
@ -37,13 +37,11 @@ class StreamConnection(
private val isDisposed = AtomicBoolean(false)
val client = TootApiClient(manager.context, callback = object : TootApiCallback {
override val isApiCancelled: Boolean
get() = isDisposed.get()
override val isApiCancelled: Boolean
get() = isDisposed.get()
}).apply {
account = server.accessInfo
}
val client = TootApiClient(manager.context, callback = this)
.apply { account = acctGroup.account }
private val _status = AtomicReference(StreamStatus.Closed)
@ -58,15 +56,11 @@ class StreamConnection(
private var lastAliveSend = 0L
private val subscription = ConcurrentHashMap<String, StreamGroupKey>()
private val subscription = ConcurrentHashMap<StreamSpec, StreamGroupKey>()
// Misskeyの投稿キャプチャ
private val capturedId = HashSet<EntityId>()
// internal val callback_list = LinkedList<StreamCallback>()
// internal val parser : TootParser =
// TootParser(context, access_info, highlightTrie = highlight_trie, fromStream = true)
///////////////////////////////////////////////////////////////////
// methods
@ -76,80 +70,50 @@ class StreamConnection(
socket.set(null)
}
fun getStreamingStatus(streamKey: String?) = when {
streamKey == null -> null
fun getStreamingStatus(streamSpec: StreamSpec?) = when {
streamSpec == null -> null
status != StreamStatus.Open || null == socket.get() ->
StreamIndicatorState.REGISTERED
subscription[streamKey] == null ->
subscription[streamSpec] == null ->
StreamIndicatorState.REGISTERED
else -> StreamIndicatorState.LISTENING
}
private inline fun eachCallback(channelId: String? = null, block: (callback: StreamCallback) -> Unit) {
synchronized(this) {
if (isDisposed.get()) return@synchronized
if (streamKey == null) {
server.groups.values.forEach { group ->
if (channelId?.isNotEmpty() == true && channelId != group.channelId) {
// skip if channel id is provided and not match
} else {
group.values.forEach { spec ->
try {
block(spec.streamCallback)
} catch (ex: Throwable) {
log.trace(ex)
}
}
}
}
} else {
try {
val group = server.groups[streamKey]
if (group == null) {
// skip if missing group for streamKey
} else if (channelId?.isNotEmpty() == true && channelId != group.channelId) {
// skip if channel id is provided and not match
} else {
group.values.forEach { spec ->
try {
block(spec.streamCallback)
} catch (ex: Throwable) {
log.trace(ex)
}
}
}
} catch (ex: Throwable) {
log.trace(ex)
}
}
private fun eachCallback(
channelId: String? = null,
stream:JsonArray? = null,
item:TimelineItem?=null,
block: (callback: StreamCallback) -> Unit
) {
if (isDisposed.get()) return
if (spec == null) {
acctGroup.keyGroups.values.forEach { it.eachCallback(channelId,stream,item,block) }
} else {
acctGroup.keyGroups[spec]?.eachCallback(channelId,stream,item,block)
}
}
private fun fireTimelineItem(item: TimelineItem?, channelId: String? = null) {
item ?: return
eachCallback(channelId) { it.onTimelineItem(item, channelId) }
private fun fireTimelineItem(item: TimelineItem?, channelId: String? = null,stream:JsonArray?=null) {
item?:return
eachCallback(channelId,stream,item=item) { it.onTimelineItem(item, channelId,stream) }
}
// コールバックによってrunOnMainLooperの有無が異なるの直したい
private fun fireNoteUpdated(ev: MisskeyNoteUpdate, channelId: String? = null) {
runOnMainLooper { eachCallback(channelId) { it.onNoteUpdated(ev, channelId) } }
eachCallback(channelId) { it.onNoteUpdated(ev, channelId) }
}
private fun fireDeleteId(id: EntityId) {
val tl_host = server.accessInfo.apiHost
runOnMainLooper {
synchronized(this) {
if (isDisposed.get()) return@synchronized
if (Pref.bpDontRemoveDeletedToot(manager.appState.pref)) return@synchronized
manager.appState.columnList.forEach {
runCatching { it.onStatusRemoved(tl_host, id) }
.onFailure { log.trace(it) }
if (Pref.bpDontRemoveDeletedToot(manager.appState.pref)) return
val tl_host = acctGroup.account.apiHost
manager.appState.columnList.forEach {
runOnMainLooper {
try {
if(!it.is_dispose.get()) it.onStatusRemoved(tl_host, id)
}catch(ex:Throwable) {
log.trace(ex)
}
}
}
@ -193,7 +157,7 @@ class StreamConnection(
"note" -> {
val body = obj.jsonObject("body")
fireTimelineItem(server.parser.status(body), channelId)
fireTimelineItem(acctGroup.parser.status(body), channelId)
}
"noteUpdated" -> {
@ -211,8 +175,8 @@ class StreamConnection(
log.e("$name handleMisskeyMessage: notification body is null")
return
}
log.d("$name misskey notification: ${server.parser.apiHost} ${body}")
fireTimelineItem(server.parser.notification(body), channelId)
log.d("$name misskey notification: ${acctGroup.parser.apiHost} ${body}")
fireTimelineItem(acctGroup.parser.notification(body), channelId)
}
else -> log.v("$name ignore streaming event $type")
@ -222,15 +186,18 @@ class StreamConnection(
private fun handleMastodonMessage(obj: JsonObject, text: String) {
val stream = obj.jsonArray("stream")
if(stream!=null) log.w("stream=${stream}")
when (val event = obj.string("event")) {
null, "" ->
log.d("$name onMessage: missing event parameter")
"filters_changed" ->
Column.onFiltersChanged(manager.context, server.accessInfo)
Column.onFiltersChanged(manager.context, acctGroup.account)
else -> {
val payload = TootPayload.parsePayload(server.parser, event, obj, text)
val payload = TootPayload.parsePayload(acctGroup.parser, event, obj, text)
when (event) {
"delete" -> when (payload) {
@ -242,9 +209,7 @@ class StreamConnection(
// {"event":"announcement","payload":"{\"id\":\"3\",\"content\":\"<p>追加</p>\",\"starts_at\":null,\"ends_at\":null,\"all_day\":false,\"mentions\":[],\"tags\":[],\"emojis\":[],\"reactions\":[]}"}
"announcement" -> {
if (payload is TootAnnouncement) {
runOnMainLooper {
eachCallback { it.onAnnouncementUpdate(payload) }
}
eachCallback { it.onAnnouncementUpdate(payload) }
}
}
@ -252,23 +217,19 @@ class StreamConnection(
"announcement.delete" -> {
val id = EntityId.mayNull(payload?.toString())
if (id != null) {
runOnMainLooper {
eachCallback { it.onAnnouncementDelete(id) }
}
eachCallback { it.onAnnouncementDelete(id) }
}
}
// {"event":"announcement.reaction","payload":"{\"name\":\"hourglass_gif\",\"count\":1,\"url\":\"https://m2j.zzz.ac/...\",\"static_url\":\"https://m2j.zzz.ac/...\",\"announcement_id\":\"9\"}"}
"announcement.reaction" -> {
if (payload is TootAnnouncement.Reaction) {
runOnMainLooper {
eachCallback { it.onAnnouncementReaction(payload) }
}
eachCallback { it.onAnnouncementReaction(payload) }
}
}
else -> when (payload) {
is TimelineItem -> fireTimelineItem(payload)
is TimelineItem -> fireTimelineItem(payload,stream=stream)
else -> log.d("$name unsupported payload type. $payload")
}
}
@ -281,7 +242,7 @@ class StreamConnection(
override fun onOpen(webSocket: WebSocket, response: Response) {
manager.enqueue {
log.d("$name WebSocket onOpen.")
log.v("$name WebSocket onOpen.")
status = StreamStatus.Open
checkSubscription()
}
@ -289,11 +250,11 @@ class StreamConnection(
override fun onMessage(webSocket: WebSocket, text: String) {
manager.enqueue {
log.d("$name WebSocket onMessage.")
log.v("$name WebSocket onMessage.")
try {
val obj = text.decodeJsonObject()
when {
server.accessInfo.isMisskey -> handleMisskeyMessage(obj)
acctGroup.account.isMisskey -> handleMisskeyMessage(obj)
else -> handleMastodonMessage(obj, text)
}
} catch (ex: Throwable) {
@ -305,7 +266,7 @@ class StreamConnection(
override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
manager.enqueue {
log.d("$name WebSocket onClosing code=$code, reason=$reason")
log.v("$name WebSocket onClosing code=$code, reason=$reason")
webSocket.cancel()
status = StreamStatus.Closed
}
@ -313,7 +274,7 @@ class StreamConnection(
override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
manager.enqueue {
log.d("$name WebSocket onClosed code=$code, reason=$reason")
log.v("$name WebSocket onClosed code=$code, reason=$reason")
status = StreamStatus.Closed
}
}
@ -336,9 +297,11 @@ class StreamConnection(
}
}
///////////////////////////////////////////////////////////////////////////////
private fun postMisskeyAlive() {
if (isDisposed.get()) return
if (server.accessInfo.isMisskey) {
if (acctGroup.account.isMisskey) {
val now = SystemClock.elapsedRealtime()
if (now - lastAliveSend >= misskeyAliveInterval) {
try {
@ -351,15 +314,15 @@ class StreamConnection(
}
private fun unsubscribe(group: StreamGroupKey) {
subscription.remove(group.streamKey)
subscription.remove(group.spec)
try {
val jsonObject = if (server.accessInfo.isMastodon) {
val jsonObject = if (acctGroup.account.isMastodon) {
/*
Mastodonの場合
{ "stream": "hashtag:local", "tag": "foo" }
等に後から "type": "unsubscribe" を足す
*/
group.streamKey.decodeJsonObject().apply {
group.paramsClone().apply {
this["type"] = "unsubscribe"
}
} else {
@ -381,14 +344,15 @@ class StreamConnection(
private fun subscribe(group: StreamGroupKey) {
try {
var jsonObject = group.streamKey.decodeJsonObject()
if (server.accessInfo.isMastodon) {
val jsonObject = if (acctGroup.account.isMastodon) {
/*
マストドンの場合
{ "stream": "hashtag:local", "tag": "foo" }
等に後から "type": "subscribe" を足す
*/
jsonObject["type"] = "subscribe"
group.paramsClone().apply {
put("type","subscribe")
}
} else {
/*
Misskeyの場合
@ -396,15 +360,16 @@ class StreamConnection(
後から body.put("id", "xxx")して
さらに外側を {"type": "connect", "body": body} でラップする
*/
jsonObject["id"] = group.channelId
jsonObject = jsonObject("type" to "connect", "body" to jsonObject)
jsonObject(
"type" to "connect",
"body" to group.paramsClone().also{ it["id"]= group.channelId }
)
}
socket.get()?.send(jsonObject.toString())
} catch (ex: Throwable) {
log.e(ex, "send failed.")
} finally {
subscription[group.streamKey] = group
subscription[group.spec] = group
}
}
@ -414,14 +379,14 @@ class StreamConnection(
private fun checkSubscription() {
postMisskeyAlive()
if (streamKey != null) {
val group = server.groups[streamKey]
if (group != null) subscribeIfChanged(group, subscription[streamKey])
if (spec != null) {
val group = acctGroup.keyGroups[spec]
if (group != null) subscribeIfChanged(group, subscription[spec])
} else {
val existsIds = HashSet<String>()
val existsIds = HashSet<StreamSpec>()
// 購読するべきものを購読する
server.groups.entries.forEach {
acctGroup.keyGroups.entries.forEach {
existsIds.add(it.key)
subscribeIfChanged(it.value, subscription[it.key])
}
@ -433,20 +398,6 @@ class StreamConnection(
}
}
private fun StreamSpec.canStartStreaming(): Boolean {
val column = refColumn.get()
return when {
column == null -> {
log.w("$name updateConnection: missing column.")
false
}
!column.canStartStreaming() -> {
log.w("$name updateConnection: canStartStreaming returns false.")
false
}
else -> true
}
}
internal suspend fun updateConnection() {
if (isDisposed.get()) {
@ -454,16 +405,13 @@ class StreamConnection(
return
}
val group = server.groups[streamKey]
val group = spec?.let{ acctGroup.keyGroups[it] }
if (group != null) {
if (!group.values.any { it.canStartStreaming() }) {
// 準備できたカラムがまったくないなら接続開始しない
log.w("$name updateConnection: column is not prepared.")
return
}
// 準備できたカラムがまったくないなら接続開始しない
if (!group.destinations.values.any { it.canStartStreaming() }) return
} else {
// merged connection ではないのにgroupがなくなってしまったら再接続しない
if (streamKey != null) {
if (spec != null) {
log.w("$name updateConnection: missing group.")
return
}
@ -490,8 +438,8 @@ class StreamConnection(
status = StreamStatus.Connecting
val path = group?.spec?.streamPath ?: when {
server.accessInfo.isMisskey -> "/streaming"
val path = group?.spec?.path ?: when {
acctGroup.account.isMisskey -> "/streaming"
else -> "/api/v1/streaming/"
}

View File

@ -0,0 +1,33 @@
package jp.juggler.subwaytooter.streaming
import jp.juggler.subwaytooter.Column
import jp.juggler.util.LogCategory
import java.lang.ref.WeakReference
class StreamDestination(
column: Column,
val spec: StreamSpec
) {
companion object{
private val log =LogCategory("StreamDestination")
}
val columnInternalId = column.internalId
val refColumn = WeakReference(column)
val refCallback = WeakReference(column.streamCallback)
fun canStartStreaming(): Boolean {
val column = refColumn.get()
return when {
column == null -> {
log.w("${spec.name} canStartStreaming: missing column.")
false
}
else -> column.canStartStreaming()
}
}
}
fun Column.getStreamDestination() =
streamSpec?.let { StreamDestination( spec = it, column = this ) }

View File

@ -1,5 +1,13 @@
package jp.juggler.subwaytooter.streaming
// ストリーミング接続の状態
enum class StreamStatus {
Closed,
Connecting,
Open,
}
// インジケータの状態
enum class StreamIndicatorState {
NONE,
REGISTERED, // registered, but not listening

View File

@ -9,29 +9,31 @@ import java.util.concurrent.ConcurrentHashMap
// ストリーミング接続をacct単位でグルーピングする
class StreamGroupAcct(
private val manager: StreamManager,
val accessInfo: SavedAccount,
val account: SavedAccount,
var ti: TootInstance
) {
val parser: TootParser = TootParser(manager.appState.context, linkHelper = accessInfo)
val parser: TootParser = TootParser(manager.appState.context, linkHelper = account)
// map key is Column.internalId
val groups = ConcurrentHashMap<String, StreamGroupKey>()
val keyGroups = ConcurrentHashMap<StreamSpec, StreamGroupKey>()
private var specs = ConcurrentHashMap<Int, StreamSpec>()
//val specs = ConcurrentHashMap<Int, StreamSpec>()
private val connections = ConcurrentHashMap<String, StreamConnection>()
// 接続を束ねない場合に使われる
private val connections = ConcurrentHashMap<StreamSpec, StreamConnection>()
// 接続を束ねる場合に使われる
private var mergedConnection: StreamConnection? = null
fun addSpec(spec: StreamSpec) {
val key = spec.streamKey
var group = groups[key]
// カラムIDから出力先へのマップ
@Volatile
private var destinations = ConcurrentHashMap<Int, StreamDestination>()
fun addSpec(dst: StreamDestination) {
val spec = dst.spec
var group = keyGroups[spec]
if (group == null) {
group = StreamGroupKey(streamKey = key)
groups[key] = group
group = StreamGroupKey(spec)
keyGroups[spec] = group
}
group[spec.columnInternalId] = spec
group.destinations[ dst.columnInternalId] = dst
}
fun merge(newServer: StreamGroupAcct) {
@ -39,26 +41,26 @@ class StreamGroupAcct(
// 新スペックの値をコピー
this.ti = newServer.ti
newServer.groups.entries.forEach {
groups[it.key] = it.value
newServer.keyGroups.entries.forEach {
keyGroups[it.key] = it.value
}
// 新グループにないグループを削除
groups.entries.toList().forEach {
if (!newServer.groups.containsKey(it.key)) groups.remove(it.key)
keyGroups.entries.toList().forEach {
if (!newServer.keyGroups.containsKey(it.key)) keyGroups.remove(it.key)
}
// グループにない接続を破棄
connections.entries.toList().forEach {
if (!groups.containsKey(it.key)) {
if (!keyGroups.containsKey(it.key)) {
it.value.dispose()
connections.remove(it.key)
}
}
this.specs = ConcurrentHashMap<Int, StreamSpec>().apply {
groups.values.forEach { group ->
group.values.forEach { spec -> put(spec.columnInternalId, spec) }
this.destinations = ConcurrentHashMap<Int, StreamDestination>().apply {
keyGroups.values.forEach { group ->
group.destinations.values.forEach { spec -> put(spec.columnInternalId, spec) }
}
}
}
@ -71,31 +73,31 @@ class StreamGroupAcct(
mergedConnection?.dispose()
mergedConnection = null
groups.clear()
keyGroups.clear()
}
private fun findConnection(streamKey: String?) =
mergedConnection ?: connections.values.firstOrNull { it.streamKey == streamKey }
private fun findConnection(streamSpec: StreamSpec?) =
mergedConnection ?: connections.values.firstOrNull { it.spec == streamSpec }
// ストリーミング接続インジケータ
fun getStreamingStatus(columnInternalId: Int): StreamIndicatorState? {
val streamKey = specs[columnInternalId]?.streamKey
return findConnection(streamKey)?.getStreamingStatus(streamKey)
val spec = destinations[columnInternalId]?.spec
return findConnection(spec)?.getStreamingStatus(spec)
}
suspend fun updateConnection() {
val multiplex = if (accessInfo.isMastodon) {
val multiplex = if (account.isMastodon) {
ti.versionGE(TootInstance.VERSION_3_3_0_rc1)
} else {
accessInfo.misskeyVersion >= 11
account.misskeyVersion >= 11
}
if (multiplex) {
connections.values.forEach { it.dispose() }
connections.clear()
if(specs.isEmpty()){
if(destinations.isEmpty()){
mergedConnection?.dispose()
mergedConnection = null
}else{
@ -103,8 +105,8 @@ class StreamGroupAcct(
mergedConnection = StreamConnection(
manager,
this,
"[${accessInfo.acct.pretty}:multiplex",
streamKey = null
spec = null,
name="[${account.acct.pretty}:multiplex]",
)
}
mergedConnection?.updateConnection()
@ -114,15 +116,15 @@ class StreamGroupAcct(
mergedConnection?.dispose()
mergedConnection = null
groups.entries.forEach { pair ->
keyGroups.entries.forEach { pair ->
val(streamKey,group)=pair
var conn = connections[streamKey]
if (conn == null) {
conn = StreamConnection(
manager,
this,
"[${accessInfo.acct.pretty}:${group.streamKey}]",
streamKey = group.streamKey
spec = group.spec,
"[${account.acct.pretty}:${group.spec.name}]",
)
connections[streamKey] = conn
}
@ -132,6 +134,6 @@ class StreamGroupAcct(
}
fun getConnection(internalId: Int)=
mergedConnection ?: connections[ specs[internalId]?.streamKey]
mergedConnection ?: destinations[internalId]?.spec?.let{ connections[it]}
}

View File

@ -1,20 +1,48 @@
package jp.juggler.subwaytooter.streaming
import jp.juggler.util.digestSHA256Base64Url
import jp.juggler.subwaytooter.api.entity.TimelineItem
import jp.juggler.util.JsonArray
import jp.juggler.util.LogCategory
import jp.juggler.util.decodeJsonObject
import java.util.concurrent.ConcurrentHashMap
// 同じ種類のストリーミングを複数のカラムで受信する場合がある
// subscribe/unsubscribe はまとめて行いたい
class StreamGroupKey(val streamKey: String) : ConcurrentHashMap<Int, StreamSpec>() {
class StreamGroupKey(val spec: StreamSpec) {
companion object {
private val log = LogCategory("StreamGroupKey")
}
val channelId = streamKey.digestSHA256Base64Url()
val keyString = spec.keyString
val channelId = spec.channelId
override fun hashCode(): Int = streamKey.hashCode()
val destinations = ConcurrentHashMap<Int, StreamDestination>()
override fun hashCode(): Int = keyString.hashCode()
override fun equals(other: Any?): Boolean {
if (other is StreamGroupKey) return streamKey == other.streamKey
if (other is StreamGroupKey) return keyString == keyString
return false
}
val spec: StreamSpec
get() = this.values.first()
fun paramsClone() =
spec.params.toString().decodeJsonObject()
fun eachCallback(channelId: String?, stream: JsonArray?, item: TimelineItem?, block: (callback: StreamCallback) -> Unit) {
// skip if channel id is provided and not match
if (channelId?.isNotEmpty() == true && channelId != this.channelId) return
val strStream = stream?.joinToString { "," }
destinations.values.forEach { dst ->
try {
if (strStream != null && item != null) {
val column = dst.refColumn.get() ?: return@forEach
if (!dst.spec.streamFilter(column, strStream, item)) return@forEach
}
dst.refCallback.get()?.let { block(it) }
} catch (ex: Throwable) {
log.trace(ex)
}
}
}
}

View File

@ -14,10 +14,8 @@ import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean
import java.util.regex.Pattern
import kotlin.collections.HashMap
import kotlin.collections.set
@ -37,7 +35,7 @@ class StreamManager(val appState: AppState) {
private val handler = appState.handler
private val isScreenOn = AtomicBoolean(false)
private val serverMap = ConcurrentHashMap<Acct, StreamGroupAcct>()
private val acctGroups = ConcurrentHashMap<Acct, StreamGroupAcct>()
val client = TootApiClient(
appState.context,
@ -63,29 +61,29 @@ class StreamManager(val appState: AppState) {
var (ti, ri) = TootInstance.get(client, account = accessInfo)
if (ti == null) {
log.d("can't get server info. ${ri?.error}")
val tiOld = serverMap[accessInfo.acct]?.ti ?: continue
val tiOld = acctGroups[accessInfo.acct]?.ti ?: continue
ti = tiOld
}
server = StreamGroupAcct(this, accessInfo, ti)
newMap[accessInfo.acct] = server
}
val streamSpec = column.getStreamSpec()
val streamSpec = column.getStreamDestination()
if (streamSpec != null) server.addSpec(streamSpec)
}
}
// 新構成にないサーバは破棄する
serverMap.entries.toList().forEach {
acctGroups.entries.toList().forEach {
if (!newMap.containsKey(it.key)) {
it.value.dispose()
serverMap.remove(it.key)
acctGroups.remove(it.key)
}
}
// 追加.変更されたサーバをマージする
newMap.entries.forEach {
when (val current = serverMap[it.key]) {
null -> serverMap[it.key] = it.value
when (val current = acctGroups[it.key]) {
null -> acctGroups[it.key] = it.value
else -> current.merge(it.value)
}
}
@ -93,7 +91,7 @@ class StreamManager(val appState: AppState) {
// ハイライトツリーを読み直す
val highlight_trie = HighlightWord.nameSet
serverMap.values.forEach {
acctGroups.values.forEach {
// パーサーを更新する
it.parser.highlightTrie = highlight_trie
@ -135,10 +133,10 @@ class StreamManager(val appState: AppState) {
// カラムヘッダの表示更新から、インジケータを取得するために呼ばれる
// UIスレッドから呼ばれる
fun getStreamingStatus(accessInfo: SavedAccount, columnInternalId: Int) =
serverMap[accessInfo.acct]?.getStreamingStatus(columnInternalId)
acctGroups[accessInfo.acct]?.getStreamingStatus(columnInternalId)
fun getConnection(column: Column)=
serverMap[column.access_info.acct]?.getConnection(column.internalId)
acctGroups[column.access_info.acct]?.getConnection(column.internalId)
////////////////////////////////////////////////////////////////
@ -150,7 +148,7 @@ class StreamManager(val appState: AppState) {
} catch (_: ClosedReceiveChannelException) {
// 発生しない
} catch (ex: Throwable) {
log.e(ex, "lambda failed.")
log.trace(ex, "error.")
}
}
}

View File

@ -1,17 +1,210 @@
package jp.juggler.subwaytooter.streaming
import jp.juggler.subwaytooter.Column
import jp.juggler.util.JsonObject
import java.lang.ref.WeakReference
import jp.juggler.subwaytooter.ColumnType
import jp.juggler.subwaytooter.api.entity.TimelineItem
import jp.juggler.subwaytooter.encodeQuery
import jp.juggler.subwaytooter.makeHashtagQueryParams
import jp.juggler.subwaytooter.streaming.StreamSpec.Companion.CHANNEL
import jp.juggler.subwaytooter.streaming.StreamSpec.Companion.PARAMS
import jp.juggler.subwaytooter.streaming.StreamSpec.Companion.STREAM
import jp.juggler.util.*
import java.io.StringWriter
private fun StringWriter.appendValue(v: Any?) {
when (v) {
is JsonArray -> {
append('[')
v.forEachIndexed { i, child ->
if (i > 0) append(',')
appendValue(child)
}
append(']')
}
is JsonObject -> {
append('{')
v.entries.sortedBy { it.key }.forEachIndexed { i, child ->
if (i > 0) append(',')
append(child.key)
append('=')
appendValue(child)
}
append('}')
}
else -> append(v.toString())
}
}
class StreamSpec(
column: Column,
val streamPath: String,
streamParam: JsonObject,
val streamCallback: StreamCallback
val params: JsonObject,
val path: String,
val name: String,
val streamFilter: Column.(String?,TimelineItem)->Boolean = { _, _ -> true }
) {
val streamKey = streamParam.toString(indentFactor = 0, sort = true)
companion object {
const val STREAM = "stream"
const val CHANNEL = "channel"
const val PARAMS = "params"
}
val columnInternalId = column.internalId
val refColumn = WeakReference(column)
}
val keyString = "$path?${params.toString(indentFactor = 0, sort = true)}"
val channelId = keyString.digestSHA256Base64Url()
override fun hashCode(): Int = keyString.hashCode()
override fun equals(other: Any?): Boolean {
if (other is StreamSpec) return keyString == other.keyString
return false
}
fun match(stream: JsonArray): Boolean {
return true
}
}
private fun Column.streamKeyMastodon(): StreamSpec? {
val root = type.streamKeyMastodon(this) ?: return null
val filter = type.streamFilterMastodon
val path = "/api/v1/streaming/?${root.encodeQuery()}"
val sw = StringWriter()
synchronized(sw.buffer) {
sw.append(root.string(STREAM)!!)
root.entries.sortedBy { it.key }.forEach { pair ->
val (k, v) = pair
if (k != STREAM && v !is JsonArray && v !is JsonObject) {
sw.append(',').append(k).append('=').appendValue(v)
}
}
root.entries.sortedBy { it.key }.forEach { pair ->
val (k, v) = pair
if (v is JsonArray || v is JsonObject) {
sw.append(',').append(k).append('=').appendValue(v)
}
}
}
return StreamSpec(root, path, sw.toString(),streamFilter=filter)
}
fun Column.streamKeyMisskey(): StreamSpec? {
// 使われ方は StreamConnection.subscribe を参照のこと
fun x(channel: String, params: JsonObject = JsonObject()) =
jsonObject(CHANNEL to channel, PARAMS to params)
val misskeyApiToken = access_info.misskeyApiToken
val root = when (misskeyApiToken) {
null -> when (type) {
ColumnType.LOCAL -> x("localTimeline")
else -> null
}
else -> when (type) {
ColumnType.HOME ->
x("homeTimeline")
ColumnType.LOCAL ->
x("localTimeline")
ColumnType.MISSKEY_HYBRID ->
x("hybridTimeline")
ColumnType.FEDERATE ->
x("globalTimeline")
ColumnType.NOTIFICATIONS ->
x("main")
ColumnType.MISSKEY_ANTENNA_TL ->
x("antenna", jsonObject { put("antennaId", profile_id.toString()) })
ColumnType.LIST_TL ->
x("userList", jsonObject { put("listId", profile_id.toString()) })
ColumnType.HASHTAG ->
x("hashtag", jsonObject { put("q", hashtag) })
else -> null
}
} ?: return null
val path = when {
// Misskey 11以降は統合されてる
misskeyVersion >= 11 -> "/streaming"
// Misskey 10 認証なし
// Misskey 8.25 からLTLだけ認証なしでも見れるようになった
access_info.isPseudo -> when (type) {
ColumnType.LOCAL -> "/local-timeline"
else -> null
}
// Misskey 10 認証あり
// Misskey 8.25 からLTLだけ認証なしでも見れるようになった
else -> when (type) {
ColumnType.HOME, ColumnType.NOTIFICATIONS -> "/"
ColumnType.LOCAL -> "/local-timeline"
ColumnType.MISSKEY_HYBRID -> "/hybrid-timeline"
ColumnType.FEDERATE -> "/global-timeline"
ColumnType.LIST_TL -> "/user-list?listId=${profile_id.toString()}"
// タグやアンテナには対応しない
else -> null
}
} ?: return null
val sw = StringWriter()
synchronized(sw.buffer) {
sw.append(root.string(CHANNEL)!!)
val params = root.jsonObject(PARAMS)!!
params.entries.sortedBy { it.key }.forEach { pair ->
val (k, v) = pair
if (v !is JsonArray && v !is JsonObject) {
sw.append(',').append(k).append('=').appendValue(v)
}
}
params.entries.sortedBy { it.key }.forEach { pair ->
val (k, v) = pair
if (v is JsonArray || v is JsonObject) {
sw.append(',').append(k).append('=').appendValue(v)
}
}
}
return StreamSpec(root, path, sw.toString())
}
// 公開ストリームなら真
val Column.isPublicStream: Boolean
get() {
return when (type) {
ColumnType.LOCAL,
ColumnType.FEDERATE,
ColumnType.HASHTAG,
ColumnType.LOCAL_AROUND,
ColumnType.FEDERATED_AROUND,
ColumnType.DOMAIN_TIMELINE -> true
else -> false
}
}
val Column.streamSpec: StreamSpec?
get() = when {
// 疑似アカウントではストリーミングAPIを利用できない
// 2.1 では公開ストリームのみ利用できるらしい
(access_info.isNA || access_info.isPseudo && !isPublicStream) -> null
access_info.isMastodon -> streamKeyMastodon()
access_info.isMisskey -> streamKeyMisskey()
else -> null
}
fun Column.canStreaming() = when {
access_info.isNA -> false
access_info.isPseudo -> isPublicStream && streamSpec != null
else -> streamSpec != null
}
fun Column.canAutoRefresh() = canStreaming()

View File

@ -1,7 +0,0 @@
package jp.juggler.subwaytooter.streaming
enum class StreamStatus {
Closed,
Connecting,
Open,
}

View File

@ -1,179 +1,11 @@
package jp.juggler.subwaytooter.streaming
import jp.juggler.subwaytooter.Column
import jp.juggler.subwaytooter.ColumnType
import jp.juggler.subwaytooter.encodeQuery
import jp.juggler.util.JsonObject
import jp.juggler.util.jsonObject
val Column.isPublicStream: Boolean
get() {
return when (type) {
ColumnType.LOCAL,
ColumnType.FEDERATE,
ColumnType.HASHTAG,
ColumnType.LOCAL_AROUND,
ColumnType.FEDERATED_AROUND,
ColumnType.DOMAIN_TIMELINE -> true
else -> false
}
}
fun Column.getStreamingStatus() =
app_state.streamManager.getStreamingStatus(access_info, internalId)
?: StreamIndicatorState.NONE
fun Column.getStreamSpec(): StreamSpec? {
// 疑似アカウントではストリーミングAPIを利用できない
// 2.1 では公開ストリームのみ利用できるらしい
if (access_info.isNA || access_info.isPseudo && !isPublicStream) return null
val streamParam = when {
access_info.isMastodon -> streamArgMastodon()
access_info.isMisskey -> streamArgMisskey()
else ->null
}
return StreamSpec(
column = this,
streamPath = streamPath ?: return null,
streamParam= streamParam ?: return null,
streamCallback = streamCallback
)
}
// "/api/v1/streaming/?stream=user"
// "/api/v1/streaming/?stream=public:local"
fun Column.streamArgMastodon() = when (type) {
ColumnType.HOME, ColumnType.NOTIFICATIONS ->
jsonObject(Column.STREAM to "user")
ColumnType.LOCAL ->
jsonObject(Column.STREAM to "public:local")
ColumnType.FEDERATE ->
jsonObject(Column.STREAM to if(remote_only) "public:remote" else "public" )
ColumnType.LIST_TL ->
jsonObject(Column.STREAM to "list",
"list" to profile_id.toString() )
ColumnType.DOMAIN_TIMELINE ->
jsonObject(Column.STREAM to if (with_attachment) "public:domain:media" else "public:domain",
"domain" to instance_uri )
ColumnType.DIRECT_MESSAGES ->
jsonObject(Column.STREAM to "direct")
ColumnType.HASHTAG ->
makeHashtagQueryParams().apply{
put(Column.STREAM,if(instance_local) "hashtag:local" else "hashtag")
}
else -> null
}
fun Column.streamArgMisskey(): JsonObject?{
fun x( channel: String, params: JsonObject = JsonObject() ) = JsonObject().apply {
// put("type", "connect")
put("body", JsonObject().apply {
put("channel", channel)
put("id", internalId.toString())
put("params", params)
})
}
return if (access_info.misskeyVersion < 11) {
null
} else {
val misskeyApiToken = access_info.misskeyApiToken
if (misskeyApiToken == null) {
when (type) {
ColumnType.LOCAL -> x("localTimeline")
else -> null
}
} else {
when (type) {
ColumnType.HOME -> x("homeTimeline")
ColumnType.LOCAL -> x("localTimeline")
ColumnType.MISSKEY_HYBRID -> x("hybridTimeline")
ColumnType.FEDERATE -> x("globalTimeline")
ColumnType.NOTIFICATIONS -> x("main")
ColumnType.MISSKEY_ANTENNA_TL ->
x(
"antenna",
jsonObject { put("antennaId", profile_id.toString()) }
)
ColumnType.LIST_TL ->
x(
"userList",
jsonObject { put("listId", profile_id.toString()) }
)
ColumnType.HASHTAG ->
x(
"hashtag",
jsonObject { put("q", hashtag) }
)
else -> null
}
}
}
}
fun Column.canSpeech(): Boolean {
return canStreaming() && !isNotificationColumn
}
fun Column.canStreaming() = when {
access_info.isNA -> false
access_info.isMisskey -> streamPath != null
access_info.isPseudo -> isPublicStream
else -> streamPath != null
}
fun Column.canAutoRefresh() = streamPath != null
val Column.streamPath: String?
get() = if (isMisskey) {
val misskeyApiToken = access_info.misskeyApiToken
when {
// Misskey 11以降
access_info.misskeyVersion >= 11 -> when {
streamArgMisskey() == null -> null
else -> "/streaming"
}
// Misskey 10 認証なし
// Misskey 8.25 からLTLだけ認証なしでも見れるようになった
misskeyApiToken ==null -> when (type) {
ColumnType.LOCAL -> "/local-timeline"
else -> null
}
// Misskey 10 認証あり
else -> when (type) {
ColumnType.HOME, ColumnType.NOTIFICATIONS -> "/"
ColumnType.LOCAL -> "/local-timeline"
ColumnType.MISSKEY_HYBRID -> "/hybrid-timeline"
ColumnType.FEDERATE -> "/global-timeline"
ColumnType.LIST_TL -> "/user-list?listId=$profile_id"
ColumnType.MISSKEY_ANTENNA_TL -> "/antenna?listId=$profile_id"
else -> null
}
}
} else {
when (val params = streamArgMastodon()) {
null -> null
else -> "/api/v1/streaming/?${params.encodeQuery()}"
}
}
fun Column.canSpeech() =
canStreaming() && !isNotificationColumn

View File

@ -159,6 +159,8 @@ fun CharSequence?.mayUri() : Uri? = try {
val charsetUTF8 = Charsets.UTF_8
fun String.appendIf(text: String, flag: Boolean) = if (flag) "$this$text" else this
// 文字列とバイト列の変換
fun String.encodeUTF8() = this.toByteArray(charsetUTF8)