diff --git a/poe-client.js b/poe-client.js index 2ffa5c8ab..eb122fc28 100644 --- a/poe-client.js +++ b/poe-client.js @@ -259,7 +259,16 @@ class Client { constructor(auto_reconnect = false, use_cached_bots = false) { this.auto_reconnect = auto_reconnect; this.use_cached_bots = use_cached_bots; - this.abortController = new AbortController(); + } + + async reconnect() { + if (!this.ws_connected) { + console.log("WebSocket died. Reconnecting..."); + this.disconnect_ws() + this.next_data = await this.get_next_data() + await this.subscribe(); + await this.connect_ws(); + } } async init(token, proxy = null) { @@ -268,7 +277,6 @@ class Client { timeout: 60000, httpAgent: new http.Agent({ keepAlive: true }), httpsAgent: new https.Agent({ keepAlive: true }), - signal: this.abortController.signal, }); if (proxy) { this.session.defaults.proxy = { @@ -285,18 +293,17 @@ class Client { "Cookie": cookies, }; this.session.defaults.headers.common = this.headers; - this.next_data = await this.get_next_data(); - this.channel = await this.get_channel_data(); + [this.next_data, this.channel] = await Promise.all([this.get_next_data(), this.get_channel_data()]); this.bots = await this.get_bots(); this.bot_names = this.get_bot_names(); - this.ws_domain = `tch${Math.floor(Math.random() * 1e6)}`; this.gql_headers = { "poe-formkey": this.formkey, "poe-tchannel": this.channel["channel"], ...this.headers, }; - await this.connect_ws(); await this.subscribe(); + await this.connect_ws(); + console.log('Client initialized.'); } async get_next_data() { @@ -321,28 +328,37 @@ class Client { const botList = viewer.viewerBotList; const retries = 2; const bots = {}; + const promises = []; for (const bot of botList.filter(x => x.deletionState == 'not_deleted')) { - try { - const url = `https://poe.com/_next/data/${this.next_data.buildId}/${bot.displayName}.json`; - let r; + const promise = new Promise(async (resolve, reject) => { + try { + const url = `https://poe.com/_next/data/${this.next_data.buildId}/${bot.displayName}.json`; + let r; - if (this.use_cached_bots && cached_bots[url]) { - r = cached_bots[url]; - } - else { - logger.info(`Downloading ${url}`); - r = await request_with_retries(() => this.session.get(url), retries); - cached_bots[url] = r; - } + if (this.use_cached_bots && cached_bots[url]) { + r = cached_bots[url]; + } + else { + logger.info(`Downloading ${url}`); + r = await request_with_retries(() => this.session.get(url), retries); + cached_bots[url] = r; + } - const chatData = r.data.pageProps.payload.chatOfBotDisplayName; - bots[chatData.defaultBotObject.nickname] = chatData; - } - catch { - console.log(`Could not load bot: ${bot.displayName}`); - } + const chatData = r.data.pageProps.payload.chatOfBotDisplayName; + bots[chatData.defaultBotObject.nickname] = chatData; + resolve(); + + } + catch { + console.log(`Could not load bot: ${bot.displayName}`); + reject(); + } + }); + + promises.push(promise); } + await Promise.allSettled(promises); return bots; } @@ -439,6 +455,7 @@ class Client { } async connect_ws() { + this.ws_domain = `tch${Math.floor(Math.random() * 1e6)}`; this.ws_connected = false; this.ws_run_thread(); while (!this.ws_connected) { @@ -460,10 +477,6 @@ class Client { on_ws_error(ws, error) { logger.warn(`Websocket returned error: ${error}`); this.disconnect_ws(); - - if (this.auto_reconnect) { - this.connect_ws(); - } } async on_message(ws, msg) { @@ -510,7 +523,11 @@ class Client { } } - async *send_message(chatbot, message, with_chat_break = false, timeout = 20) { + async *send_message(chatbot, message, with_chat_break = false, timeout = 30, signal = null) { + if (this.auto_reconnect) { + await this.reconnect(); + } + //if there is another active message, wait until it has finished sending while (Object.values(this.active_messages).includes(null)) { await new Promise(resolve => setTimeout(resolve, 10)); @@ -551,10 +568,17 @@ class Client { let messageId; while (true) { try { - this.abortController.signal.throwIfAborted(); + if (signal instanceof AbortSignal) { + signal.throwIfAborted(); + } + + if (timeout == 0) { + throw new Error("Response timed out."); + } const message = this.message_queues[humanMessageId].shift(); if (!message) { + timeout -= 1; await new Promise(resolve => setTimeout(() => resolve(), 1000)); continue; //throw new Error("Queue is empty"); diff --git a/server.js b/server.js index 79c8bfc86..7ff6b95ec 100644 --- a/server.js +++ b/server.js @@ -2196,9 +2196,20 @@ app.post('/deletegroup', jsonParser, async (request, response) => { const POE_DEFAULT_BOT = 'a2'; +const poeClientCache = {}; + async function getPoeClient(token, useCache = false) { - let client = new poe.Client(false, useCache); - await client.init(token); + let client; + + if (useCache && poeClientCache[token]) { + client = poeClientCache[token]; + } + else { + client = new poe.Client(true, useCache); + await client.init(token); + } + + poeClientCache[token] = client; return client; } @@ -2210,9 +2221,9 @@ app.post('/status_poe', jsonParser, async (request, response) => { } try { - const client = await getPoeClient(token); + const client = await getPoeClient(token, false); const botNames = client.get_bot_names(); - client.disconnect_ws(); + //client.disconnect_ws(); return response.send({ 'bot_names': botNames }); } @@ -2240,7 +2251,7 @@ app.post('/purge_poe', jsonParser, async (request, response) => { else { await client.send_chat_break(bot); } - client.disconnect_ws(); + //client.disconnect_ws(); return response.send({ "ok": true }); } @@ -2262,12 +2273,13 @@ app.post('/generate_poe', jsonParser, async (request, response) => { } let isGenerationStopped = false; + const abortController = new AbortController(); request.socket.removeAllListeners('close'); request.socket.on('close', function () { isGenerationStopped = true; if (client) { - client.abortController.abort(); + abortController.abort(); } }); const prompt = request.body.prompt; @@ -2293,7 +2305,7 @@ app.post('/generate_poe', jsonParser, async (request, response) => { }); let reply = ''; - for await (const mes of client.send_message(bot, prompt)) { + for await (const mes of client.send_message(bot, prompt, false, 30, abortController.signal)) { if (isGenerationStopped) { console.error('Streaming stopped by user. Closing websocket...'); break; @@ -2309,22 +2321,22 @@ app.post('/generate_poe', jsonParser, async (request, response) => { console.error(err); } finally { - client.disconnect_ws(); + //client.disconnect_ws(); response.end(); } } else { try { let reply; - for await (const mes of client.send_message(bot, prompt)) { + for await (const mes of client.send_message(bot, prompt, false, 30, abortController.signal)) { reply = mes.text; } console.log(reply); - client.disconnect_ws(); + //client.disconnect_ws(); return response.send({ 'reply': reply }); } catch { - client.disconnect_ws(); + //client.disconnect_ws(); return response.sendStatus(500); } }