1
0
mirror of https://github.com/tateisu/SubwayTooter synced 2025-01-27 09:11:23 +01:00
This commit is contained in:
tateisu 2020-12-27 06:03:58 +09:00
parent cb505b3bde
commit ca025f3e53
8 changed files with 70 additions and 60 deletions

View File

@ -571,9 +571,13 @@ enum class ColumnType(
streamFilterMastodon = { stream, item ->
when {
item !is TootStatus -> false
(stream != null && !stream.startsWith("public:domain")) -> false
(stream != null && !stream.endsWith(instance_uri)) -> false
// streamがnullではないならドメインタイムラインのチェック
stream?.startsWith("public:domain")==false ||
stream?.endsWith(instance_uri)==false -> false
with_attachment && item.media_attachments.isNullOrEmpty() -> false
else -> true
}
}

View File

@ -470,7 +470,7 @@ fun JsonObject.encodeQuery(): String {
if (sb.isNotEmpty()) sb.append('&')
sb.append(k).append('=').append(v.toString().encodePercent())
}
is JsonArray -> {
is List<*> -> {
v.forEach {
if (sb.isNotEmpty()) sb.append('&')
sb.append(k).append("[]=").append(it.toString().encodePercent())
@ -503,9 +503,9 @@ internal fun Column.makeHashtagQueryParams(tagKey: String? = "tag") = JsonObject
if (tagKey != null) put(tagKey, hashtag)
hashtag_any.parseExtraTag().notEmpty()?.let { put("any", it) }
hashtag_all.parseExtraTag().notEmpty()?.let { put("all", it) }
hashtag_none.parseExtraTag().notEmpty()?.let { put("none", it) }
hashtag_any.parseExtraTag().notEmpty()?.let { put("any", it.toJsonArray()) }
hashtag_all.parseExtraTag().notEmpty()?.let { put("all", it.toJsonArray()) }
hashtag_none.parseExtraTag().notEmpty()?.let { put("none", it.toJsonArray()) }
}
fun Column.checkHashtagExtra(item: TootStatus): Boolean {

View File

@ -56,7 +56,7 @@ class StreamConnection(
private var lastAliveSend = 0L
private val subscription = ConcurrentHashMap<StreamSpec, StreamGroupKey>()
private val subscription = ConcurrentHashMap<StreamSpec, StreamGroup>()
// Misskeyの投稿キャプチャ
private val capturedId = HashSet<EntityId>()
@ -312,8 +312,8 @@ class StreamConnection(
}
}
private fun unsubscribe(group: StreamGroupKey) {
subscription.remove(group.spec)
private fun unsubscribe(spec:StreamSpec) {
subscription.remove(spec)
try {
val jsonObject = if (acctGroup.account.isMastodon) {
/*
@ -321,9 +321,7 @@ class StreamConnection(
{ "stream": "hashtag:local", "tag": "foo" }
等に後から "type": "unsubscribe" を足す
*/
group.paramsClone().apply {
this["type"] = "unsubscribe"
}
spec.paramsClone().apply { put("type","unsubscribe") }
} else {
/*
Misskeyの場合
@ -332,7 +330,7 @@ class StreamConnection(
jsonObject {
put("type", "disconnect")
put("body", jsonObjectOf("id" to group.channelId))
put("body", jsonObjectOf("id" to spec.channelId))
}
}
socket.get()?.send(jsonObject.toString())
@ -341,7 +339,8 @@ class StreamConnection(
}
}
private fun subscribe(group: StreamGroupKey) {
private fun subscribe(group: StreamGroup) {
val spec = group.spec
try {
val jsonObject = if (acctGroup.account.isMastodon) {
/*
@ -349,9 +348,7 @@ class StreamConnection(
{ "stream": "hashtag:local", "tag": "foo" }
等に後から "type": "subscribe" を足す
*/
group.paramsClone().apply {
put("type","subscribe")
}
spec.paramsClone().apply { put("type","subscribe") }
} else {
/*
Misskeyの場合
@ -361,18 +358,18 @@ class StreamConnection(
*/
jsonObjectOf(
"type" to "connect",
"body" to group.paramsClone().also{ it["id"]= group.channelId }
"body" to spec.paramsClone().apply{ put("id",spec.channelId) }
)
}
socket.get()?.send(jsonObject.toString())
} catch (ex: Throwable) {
log.e(ex, "send failed.")
} finally {
subscription[group.spec] = group
subscription[spec] = group
}
}
private fun subscribeIfChanged(newGroup: StreamGroupKey, oldGroup: StreamGroupKey?) {
private fun subscribeIfChanged(newGroup: StreamGroup, oldGroup: StreamGroup?) {
if (oldGroup == null) subscribe(newGroup)
}
@ -392,7 +389,7 @@ class StreamConnection(
// 購読するべきでないものを購読解除する
subscription.entries.toList().forEach {
if (!existsIds.contains(it.key)) unsubscribe(it.value)
if (!existsIds.contains(it.key)) unsubscribe(it.key)
}
}
}

View File

@ -3,33 +3,27 @@ package jp.juggler.subwaytooter.streaming
import jp.juggler.subwaytooter.api.entity.TimelineItem
import jp.juggler.util.JsonArray
import jp.juggler.util.LogCategory
import jp.juggler.util.decodeJsonObject
import java.util.concurrent.ConcurrentHashMap
// 同じ種類のストリーミングを複数のカラムで受信する場合がある
// subscribe/unsubscribe はまとめて行いたい
class StreamGroupKey(val spec: StreamSpec) {
class StreamGroup(val spec: StreamSpec) {
companion object {
private val log = LogCategory("StreamGroupKey")
private val log = LogCategory("StreamGroup")
}
val keyString = spec.keyString
val channelId = spec.channelId
val destinations = ConcurrentHashMap<Int, StreamRelation>()
val destinations = ConcurrentHashMap<Int, StreamDestination>()
override fun hashCode(): Int = spec.keyString.hashCode()
override fun hashCode(): Int = keyString.hashCode()
override fun equals(other: Any?): Boolean {
if (other is StreamGroupKey) return keyString == keyString
if (other is StreamGroup) return spec.keyString == other.spec.keyString
return false
}
fun paramsClone() =
spec.params.toString().decodeJsonObject()
fun eachCallback(channelId: String?, stream: JsonArray?, item: TimelineItem?, block: (callback: StreamCallback) -> Unit) {
// skip if channel id is provided and not match
if (channelId?.isNotEmpty() == true && channelId != this.channelId) return
if (channelId?.isNotEmpty() == true && channelId != spec.channelId) return
val strStream = stream?.joinToString { "," }

View File

@ -14,7 +14,7 @@ class StreamGroupAcct(
) {
val parser: TootParser = TootParser(manager.appState.context, linkHelper = account)
val keyGroups = ConcurrentHashMap<StreamSpec, StreamGroupKey>()
val keyGroups = ConcurrentHashMap<StreamSpec, StreamGroup>()
// 接続を束ねない場合に使われる
private val connections = ConcurrentHashMap<StreamSpec, StreamConnection>()
@ -24,13 +24,13 @@ class StreamGroupAcct(
// カラムIDから出力先へのマップ
@Volatile
private var destinations = ConcurrentHashMap<Int, StreamDestination>()
private var destinations = ConcurrentHashMap<Int, StreamRelation>()
fun addSpec(dst: StreamDestination) {
fun addSpec(dst: StreamRelation) {
val spec = dst.spec
var group = keyGroups[spec]
if (group == null) {
group = StreamGroupKey(spec)
group = StreamGroup(spec)
keyGroups[spec] = group
}
group.destinations[ dst.columnInternalId] = dst
@ -58,7 +58,7 @@ class StreamGroupAcct(
}
}
this.destinations = ConcurrentHashMap<Int, StreamDestination>().apply {
this.destinations = ConcurrentHashMap<Int, StreamRelation>().apply {
keyGroups.values.forEach { group ->
group.destinations.values.forEach { spec -> put(spec.columnInternalId, spec) }
}

View File

@ -20,14 +20,13 @@ import kotlin.collections.HashMap
import kotlin.collections.set
class StreamManager(val appState: AppState) {
companion object{
companion object {
private val log = LogCategory("StreamManager")
// 画面ONの間は定期的に状況を更新する
const val updateInterval = 5000L
}
val context = appState.context
@ -49,26 +48,39 @@ class StreamManager(val appState: AppState) {
val isScreenOn = isScreenOn.get()
val newMap = HashMap<Acct, StreamGroupAcct>()
val errorAcct = HashSet<Acct>()
suspend fun prepareAcctGroup(accessInfo: SavedAccount): StreamGroupAcct? {
val acct = accessInfo.acct
if (errorAcct.contains(acct)) return null
var acctGroup = newMap[acct]
if (acctGroup == null) {
var (ti, ri) = TootInstance.get(client, account = accessInfo)
if (ti == null) {
log.d("can't get server info. ${ri?.error}")
val tiOld = acctGroups[acct]?.ti
if (tiOld == null) {
errorAcct.add(acct)
return null
}
ti = tiOld
}
acctGroup = StreamGroupAcct(this, accessInfo, ti)
newMap[acct] = acctGroup
}
return acctGroup
}
if (isScreenOn && !Pref.bpDontUseStreaming(appState.pref)) {
for (column in appState.columnList) {
if (column.is_dispose.get()) continue
if (column.dont_streaming) continue
val accessInfo = column.access_info
if (accessInfo.isNA) continue
var server = newMap[accessInfo.acct]
if (server == null) {
var (ti, ri) = TootInstance.get(client, account = accessInfo)
if (ti == null) {
log.d("can't get server info. ${ri?.error}")
val tiOld = acctGroups[accessInfo.acct]?.ti ?: continue
ti = tiOld
}
server = StreamGroupAcct(this, accessInfo, ti)
newMap[accessInfo.acct] = server
}
if (column.is_dispose.get() || column.dont_streaming || accessInfo.isNA) continue
val server = prepareAcctGroup(accessInfo) ?: continue
val streamSpec = column.getStreamDestination()
if (streamSpec != null) server.addSpec(streamSpec)
if (streamSpec != null)
server.addSpec(streamSpec)
}
}
@ -135,7 +147,7 @@ class StreamManager(val appState: AppState) {
fun getStreamingStatus(accessInfo: SavedAccount, columnInternalId: Int) =
acctGroups[accessInfo.acct]?.getStreamingStatus(columnInternalId)
fun getConnection(column: Column)=
fun getConnection(column: Column) =
acctGroups[column.access_info.acct]?.getConnection(column.internalId)
////////////////////////////////////////////////////////////////

View File

@ -4,7 +4,7 @@ import jp.juggler.subwaytooter.Column
import jp.juggler.util.LogCategory
import java.lang.ref.WeakReference
class StreamDestination(
class StreamRelation(
column: Column,
val spec: StreamSpec
) {
@ -29,4 +29,4 @@ class StreamDestination(
}
fun Column.getStreamDestination() =
streamSpec?.let { StreamDestination( spec = it, column = this ) }
streamSpec?.let { StreamRelation( spec = it, column = this ) }

View File

@ -55,6 +55,9 @@ class StreamSpec(
if (other is StreamSpec) return keyString == other.keyString
return false
}
fun paramsClone() =
params.toString().decodeJsonObject()
}
private fun encodeStreamNameMastodon(root: JsonObject) = StringWriter()