Use websocket as default streaming method for all timelines

This commit is contained in:
AkiraFukushima 2019-07-29 00:56:55 +09:00
parent c3690b1671
commit 3424fdd603
10 changed files with 28 additions and 258 deletions

View File

@ -39,7 +39,6 @@ const state = (): TimelineSpaceState => {
local: true,
public: true
},
useWebsocket: false,
pleroma: false
}
}
@ -175,7 +174,6 @@ describe('TimelineSpace', () => {
;(Mastodon.get as any).mockResolvedValue(mockedResponse)
await store.dispatch('TimelineSpace/detectPleroma')
expect(store.state.TimelineSpace.pleroma).toEqual(true)
expect(store.state.TimelineSpace.useWebsocket).toEqual(true)
})
})
describe('API is not pleroma', () => {
@ -210,7 +208,6 @@ describe('TimelineSpace', () => {
;(Mastodon.get as any).mockResolvedValue(mockedResponse)
await store.dispatch('TimelineSpace/detectPleroma')
expect(store.state.TimelineSpace.pleroma).toEqual(false)
expect(store.state.TimelineSpace.useWebsocket).toEqual(false)
})
})
})

View File

@ -17,7 +17,6 @@ describe('TimelineSpace', () => {
local: unreadSettings.Local.default,
public: unreadSettings.Public.default
},
useWebsocket: false,
pleroma: false
}
})

View File

@ -78,7 +78,6 @@
"show_replies": "Show replies",
"apply": "Apply"
},
"switch_streaming": "Use websocket for streaming. If the timeline does not update with streaming, please try it.",
"new_toot": "Toot",
"reload": "Reload",
"settings": "Settings"

View File

@ -28,7 +28,7 @@ import sanitizeHtml from 'sanitize-html'
import pkg from '~/package.json'
import Authentication from './auth'
import Account from './account'
import StreamingManager from './streamingManager'
import WebSocket from './websocket'
import Preferences from './preferences'
import Fonts from './fonts'
import Hashtags from './hashtags'
@ -438,7 +438,7 @@ ipcMain.on('reset-badge', () => {
})
// user streaming
let userStreamings: { [key: string]: StreamingManager | null } = {}
let userStreamings: { [key: string]: WebSocket | null } = {}
ipcMain.on('start-all-user-streamings', (event: Event, accounts: Array<LocalAccount>) => {
accounts.map(account => {
@ -451,8 +451,8 @@ ipcMain.on('start-all-user-streamings', (event: Event, accounts: Array<LocalAcco
userStreamings[id]!.stop()
userStreamings[id] = null
}
userStreamings[id] = new StreamingManager(acct, true)
userStreamings[id]!.startUser(
userStreamings[id] = new WebSocket(acct)
userStreamings[id]!.startUserStreaming(
(update: Status) => {
if (!event.sender.isDestroyed()) {
event.sender.send(`update-start-all-user-streamings-${id}`, update)
@ -541,13 +541,12 @@ const stopUserStreaming = (id: string) => {
type StreamingSetting = {
account: LocalAccount
useWebsocket: boolean
}
let directMessagesStreaming: StreamingManager | null = null
let directMessagesStreaming: WebSocket | null = null
ipcMain.on('start-directmessages-streaming', (event: Event, obj: StreamingSetting) => {
const { account, useWebsocket } = obj
const { account } = obj
accountManager
.getAccount(account._id!)
.then(acct => {
@ -557,10 +556,9 @@ ipcMain.on('start-directmessages-streaming', (event: Event, obj: StreamingSettin
directMessagesStreaming = null
}
directMessagesStreaming = new StreamingManager(acct, useWebsocket)
directMessagesStreaming = new WebSocket(acct)
directMessagesStreaming.start(
'direct',
'',
(update: Status) => {
if (!event.sender.isDestroyed()) {
event.sender.send('update-start-directmessages-streaming', update)
@ -594,10 +592,10 @@ ipcMain.on('stop-directmessages-streaming', () => {
}
})
let localStreaming: StreamingManager | null = null
let localStreaming: WebSocket | null = null
ipcMain.on('start-local-streaming', (event: Event, obj: StreamingSetting) => {
const { account, useWebsocket } = obj
const { account } = obj
accountManager
.getAccount(account._id!)
.then(acct => {
@ -607,10 +605,9 @@ ipcMain.on('start-local-streaming', (event: Event, obj: StreamingSetting) => {
localStreaming = null
}
localStreaming = new StreamingManager(acct, useWebsocket)
localStreaming = new WebSocket(acct)
localStreaming.start(
'public/local',
'',
'public:local',
(update: Status) => {
if (!event.sender.isDestroyed()) {
event.sender.send('update-start-local-streaming', update)
@ -644,10 +641,10 @@ ipcMain.on('stop-local-streaming', () => {
}
})
let publicStreaming: StreamingManager | null = null
let publicStreaming: WebSocket | null = null
ipcMain.on('start-public-streaming', (event: Event, obj: StreamingSetting) => {
const { account, useWebsocket } = obj
const { account } = obj
accountManager
.getAccount(account._id!)
.then(acct => {
@ -657,10 +654,9 @@ ipcMain.on('start-public-streaming', (event: Event, obj: StreamingSetting) => {
publicStreaming = null
}
publicStreaming = new StreamingManager(acct, useWebsocket)
publicStreaming = new WebSocket(acct)
publicStreaming.start(
'public',
'',
(update: Status) => {
if (!event.sender.isDestroyed()) {
event.sender.send('update-start-public-streaming', update)
@ -694,14 +690,14 @@ ipcMain.on('stop-public-streaming', () => {
}
})
let listStreaming: StreamingManager | null = null
let listStreaming: WebSocket | null = null
type ListID = {
listID: string
}
ipcMain.on('start-list-streaming', (event: Event, obj: ListID & StreamingSetting) => {
const { listID, account, useWebsocket } = obj
const { listID, account } = obj
accountManager
.getAccount(account._id!)
.then(acct => {
@ -711,10 +707,9 @@ ipcMain.on('start-list-streaming', (event: Event, obj: ListID & StreamingSetting
listStreaming = null
}
listStreaming = new StreamingManager(acct, useWebsocket)
listStreaming = new WebSocket(acct)
listStreaming.start(
'list',
`list=${listID}`,
`list&list=${listID}`,
(update: Status) => {
if (!event.sender.isDestroyed()) {
event.sender.send('update-start-list-streaming', update)
@ -748,14 +743,14 @@ ipcMain.on('stop-list-streaming', () => {
}
})
let tagStreaming: StreamingManager | null = null
let tagStreaming: WebSocket | null = null
type Tag = {
tag: string
}
ipcMain.on('start-tag-streaming', (event: Event, obj: Tag & StreamingSetting) => {
const { tag, account, useWebsocket } = obj
const { tag, account } = obj
accountManager
.getAccount(account._id!)
.then(acct => {
@ -765,10 +760,9 @@ ipcMain.on('start-tag-streaming', (event: Event, obj: Tag & StreamingSetting) =>
tagStreaming = null
}
tagStreaming = new StreamingManager(acct, useWebsocket)
tagStreaming = new WebSocket(acct)
tagStreaming.start(
'hashtag',
`tag=${tag}`,
`hashtag&tag=${tag}`,
(update: Status) => {
if (!event.sender.isDestroyed()) {
event.sender.send('update-start-tag-streaming', update)

View File

@ -1,82 +0,0 @@
import Mastodon, { StreamListener, Status, Notification } from 'megalodon'
import log from 'electron-log'
import { LocalAccount } from '~/src/types/localAccount'
export default class Streaming {
private client: Mastodon
private listener: StreamListener | null
constructor(account: LocalAccount) {
this.client = new Mastodon(account.accessToken!, account.baseURL + '/api/v1', 'Whalebird')
this.listener = null
}
startUserStreaming(updateCallback: Function, notificationCallback: Function, deleteCallback: Function, errCallback: Function) {
this.listener = this.client.stream('/streaming/user')
this.listener.on('connect', _ => {
log.info('/streaming/user started')
})
this.listener.on('update', (status: Status) => {
updateCallback(status)
})
this.listener.on('notification', (notification: Notification) => {
notificationCallback(notification)
})
this.listener.on('delete', (id: string) => {
deleteCallback(id)
})
this.listener.on('error', (err: Error) => {
errCallback(err)
})
this.listener.on('connection-limit-exceeded', (err: Error) => {
errCallback(err)
})
}
start(path: string, updateCallback: Function, deleteCallback: Function, errCallback: Function) {
this.listener = this.client.stream(path)
this.listener.on('connect', _ => {
log.info(`${path} started`)
})
this.listener.on('update', (status: Status) => {
updateCallback(status)
})
this.listener.on('delete', (id: string) => {
deleteCallback(id)
})
this.listener.on('error', (err: Error) => {
errCallback(err)
})
this.listener.on('connection-limit-exceeded', err => {
errCallback(err)
})
}
stop() {
if (this.listener) {
this.listener.removeAllListeners('connect')
this.listener.removeAllListeners('update')
this.listener.removeAllListeners('notification')
this.listener.removeAllListeners('error')
this.listener.removeAllListeners('parser-error')
this.listener.on('error', (e: Error) => {
log.error(e)
})
this.listener.on('parser-error', (e: Error) => {
log.error(e)
})
this.listener.stop()
log.info('streaming stopped')
}
}
}

View File

@ -1,71 +0,0 @@
import Streaming from './streaming'
import WebSocket from './websocket'
import { LocalAccount } from '~/src/types/localAccount'
export default class StreamingManager {
private streaming: Streaming
private websocket: WebSocket
private useWebsocket: boolean
constructor(account: LocalAccount, useWebsocket = false) {
this.streaming = new Streaming(account)
this.websocket = new WebSocket(account)
this.useWebsocket = useWebsocket
}
startUser(updateCallback: Function, notificationCallback: Function, deleteCallback: Function, errCallback: Function) {
if (this.useWebsocket) {
this._startUserSocket(updateCallback, notificationCallback, deleteCallback, errCallback)
} else {
this._startUserStreaming(updateCallback, notificationCallback, deleteCallback, errCallback)
}
}
start(path: string, params: string, updateCallback: Function, deleteCallback: Function, errCallback: Function) {
if (this.useWebsocket) {
this._startSocket(path, params, updateCallback, deleteCallback, errCallback)
} else {
this._startStreaming(path, params, updateCallback, deleteCallback, errCallback)
}
}
stop() {
this._stopStreaming()
this._stopSocket()
}
/**
* Using streaming for Mastodon
*/
_startUserStreaming(updateCallback: Function, notificationCallback: Function, deleteCallback: Function, errCallback: Function) {
this.streaming.startUserStreaming(updateCallback, notificationCallback, deleteCallback, errCallback)
}
_startStreaming(path: string, params: string, updateCallback: Function, deleteCallback, errCallback: Function) {
const target = `/streaming/${path}?${params}`
this.streaming.start(target, updateCallback, deleteCallback, errCallback)
}
_stopStreaming() {
this.streaming.stop()
}
/**
* Using websocket for Pleroma
*/
_startUserSocket(updateCallback: Function, notificationCallback: Function, deleteCallback: Function, errCallback: Function) {
this.websocket.startUserStreaming(updateCallback, notificationCallback, deleteCallback, errCallback)
}
_startSocket(path: string, params: string, updateCallback: Function, deleteCallback: Function, errCallback: Function) {
let stream = path
if (stream === 'public/local') {
stream = 'public:local'
}
this.websocket.start(`${stream}&${params}`, updateCallback, deleteCallback, errCallback)
}
_stopSocket() {
this.websocket.stop()
}
}

View File

@ -5,26 +5,6 @@
</div>
<div class="tools">
<img src="../../assets/images/loading-spinner-wide.svg" v-show="loading" class="header-loading" />
<el-button
v-if="streamingSwitchable()"
type="text"
class="action"
@click="switchStreaming"
:title="$t('header_menu.switch_streaming')"
>
<svg
:class="useWebsocket ? 'websocket' : 'not-websocket'"
width="25"
height="18"
viewBox="0 0 256 193"
xmlns="http://www.w3.org/2000/svg"
preserveAspectRatio="xMidYMid"
>
<path
d="M192.44 144.645h31.78V68.339l-35.805-35.804-22.472 22.472 26.497 26.497v63.14zm31.864 15.931H113.452L86.954 134.08l11.237-11.236 21.885 21.885h45.028l-44.357-44.441 11.32-11.32 44.357 44.358V88.296l-21.801-21.801 11.152-11.153L110.685 0H0l31.696 31.696v.084H97.436l23.227 23.227-33.96 33.96L63.476 65.74V47.712h-31.78v31.193l55.007 55.007L64.314 156.3l35.805 35.805H256l-31.696-31.529z"
/>
</svg>
</el-button>
<el-button type="text" class="action" @click="openNewTootModal" :title="$t('header_menu.new_toot')">
<icon name="regular/edit" scale="1.1"></icon>
</el-button>
@ -86,10 +66,6 @@ export default {
...mapState('TimelineSpace/HeaderMenu', {
title: state => state.title,
loading: state => state.loading
}),
...mapState('TimelineSpace', {
useWebsocket: state => state.useWebsocket,
pleroma: state => state.pleroma
})
},
created() {
@ -157,23 +133,6 @@ export default {
break
}
},
streamingSwitchable() {
switch (this.$route.name) {
case 'direct-messages':
case 'local':
case 'public':
case 'tag':
case 'list':
return !this.pleroma
default:
return false
}
},
switchStreaming() {
this.$store.dispatch('TimelineSpace/stopStreamings')
this.$store.commit('TimelineSpace/changeUseWebsocket', !this.useWebsocket)
this.$store.dispatch('TimelineSpace/startStreamings')
},
openNewTootModal() {
this.$store.dispatch('TimelineSpace/Modals/NewToot/openModal')
},
@ -349,18 +308,6 @@ export default {
&:hover {
color: #409eff;
}
.not-websocket {
fill: var(--theme-secondary-color);
&:hover {
fill: #409eff;
}
}
.websocket {
fill: #409eff;
}
}
}
}

View File

@ -24,7 +24,6 @@ export type TimelineSpaceState = {
emojis: Array<MyEmoji>
tootMax: number
unreadNotification: UnreadNotification
useWebsocket: boolean
pleroma: boolean
}
@ -53,7 +52,6 @@ const state = (): TimelineSpaceState => ({
local: unreadSettings.Local.default,
public: unreadSettings.Public.default
},
useWebsocket: false,
pleroma: false
})
@ -64,8 +62,7 @@ export const MUTATION_TYPES = {
UPDATE_EMOJIS: 'updateEmojis',
UPDATE_TOOT_MAX: 'updateTootMax',
UPDATE_UNREAD_NOTIFICATION: 'updateUnreadNotification',
CHANGE_PLEROMA: 'changePleroma',
CHANGE_USE_WEBSOCKET: 'changeUseWebsocket'
CHANGE_PLEROMA: 'changePleroma'
}
const mutations: MutationTree<TimelineSpaceState> = {
@ -98,9 +95,6 @@ const mutations: MutationTree<TimelineSpaceState> = {
},
[MUTATION_TYPES.CHANGE_PLEROMA]: (state, pleroma: boolean) => {
state.pleroma = pleroma
},
[MUTATION_TYPES.CHANGE_USE_WEBSOCKET]: (state, use: boolean) => {
state.useWebsocket = use
}
}
@ -181,10 +175,8 @@ const actions: ActionTree<TimelineSpaceState, RootState> = {
const res = await Mastodon.get<Instance>('/instance', {}, state.account.baseURL + '/api/v1')
if (res.data.version.includes('Pleroma')) {
commit(MUTATION_TYPES.CHANGE_PLEROMA, true)
commit(MUTATION_TYPES.CHANGE_USE_WEBSOCKET, true)
} else {
commit(MUTATION_TYPES.CHANGE_PLEROMA, false)
commit(MUTATION_TYPES.CHANGE_USE_WEBSOCKET, false)
}
},
// -----------------------------------------------
@ -360,8 +352,7 @@ const actions: ActionTree<TimelineSpaceState, RootState> = {
return new Promise((resolve, reject) => {
// eslint-disable-line no-unused-vars
ipcRenderer.send('start-local-streaming', {
account: state.account,
useWebsocket: state.useWebsocket
account: state.account
})
ipcRenderer.once('error-start-local-streaming', (_, err: Error) => {
reject(err)
@ -385,8 +376,7 @@ const actions: ActionTree<TimelineSpaceState, RootState> = {
return new Promise((resolve, reject) => {
// eslint-disable-line no-unused-vars
ipcRenderer.send('start-public-streaming', {
account: state.account,
useWebsocket: state.useWebsocket
account: state.account
})
ipcRenderer.once('error-start-public-streaming', (_, err: Error) => {
reject(err)
@ -410,8 +400,7 @@ const actions: ActionTree<TimelineSpaceState, RootState> = {
return new Promise((resolve, reject) => {
// eslint-disable-line no-unused-vars
ipcRenderer.send('start-directmessages-streaming', {
account: state.account,
useWebsocket: state.useWebsocket
account: state.account
})
ipcRenderer.once('error-start-directmessages-streaming', (_, err: Error) => {
reject(err)

View File

@ -117,8 +117,7 @@ const actions: ActionTree<TagState, RootState> = {
// eslint-disable-line no-unused-vars
ipcRenderer.send('start-tag-streaming', {
tag: encodeURIComponent(tag),
account: rootState.TimelineSpace.account,
useWebsocket: rootState.TimelineSpace.useWebsocket
account: rootState.TimelineSpace.account
})
ipcRenderer.once('error-start-tag-streaming', (_, err: Error) => {
reject(err)

View File

@ -117,8 +117,7 @@ const actions: ActionTree<ShowState, RootState> = {
// eslint-disable-line no-unused-vars
ipcRenderer.send('start-list-streaming', {
listID: listID,
account: rootState.TimelineSpace.account,
useWebsocket: rootState.TimelineSpace.useWebsocket
account: rootState.TimelineSpace.account
})
ipcRenderer.once('error-start-list-streaming', (_, err: Error) => {
reject(err)