Speed-up Poe client

This commit is contained in:
SillyLossy
2023-06-03 16:23:45 +03:00
parent f1924b6c12
commit b0f8e51c42
2 changed files with 76 additions and 40 deletions

View File

@@ -259,7 +259,16 @@ class Client {
constructor(auto_reconnect = false, use_cached_bots = false) { constructor(auto_reconnect = false, use_cached_bots = false) {
this.auto_reconnect = auto_reconnect; this.auto_reconnect = auto_reconnect;
this.use_cached_bots = use_cached_bots; this.use_cached_bots = use_cached_bots;
this.abortController = new AbortController(); }
async reconnect() {
if (!this.ws_connected) {
console.log("WebSocket died. Reconnecting...");
this.disconnect_ws()
this.next_data = await this.get_next_data()
await this.subscribe();
await this.connect_ws();
}
} }
async init(token, proxy = null) { async init(token, proxy = null) {
@@ -268,7 +277,6 @@ class Client {
timeout: 60000, timeout: 60000,
httpAgent: new http.Agent({ keepAlive: true }), httpAgent: new http.Agent({ keepAlive: true }),
httpsAgent: new https.Agent({ keepAlive: true }), httpsAgent: new https.Agent({ keepAlive: true }),
signal: this.abortController.signal,
}); });
if (proxy) { if (proxy) {
this.session.defaults.proxy = { this.session.defaults.proxy = {
@@ -285,18 +293,17 @@ class Client {
"Cookie": cookies, "Cookie": cookies,
}; };
this.session.defaults.headers.common = this.headers; this.session.defaults.headers.common = this.headers;
this.next_data = await this.get_next_data(); [this.next_data, this.channel] = await Promise.all([this.get_next_data(), this.get_channel_data()]);
this.channel = await this.get_channel_data();
this.bots = await this.get_bots(); this.bots = await this.get_bots();
this.bot_names = this.get_bot_names(); this.bot_names = this.get_bot_names();
this.ws_domain = `tch${Math.floor(Math.random() * 1e6)}`;
this.gql_headers = { this.gql_headers = {
"poe-formkey": this.formkey, "poe-formkey": this.formkey,
"poe-tchannel": this.channel["channel"], "poe-tchannel": this.channel["channel"],
...this.headers, ...this.headers,
}; };
await this.connect_ws();
await this.subscribe(); await this.subscribe();
await this.connect_ws();
console.log('Client initialized.');
} }
async get_next_data() { async get_next_data() {
@@ -321,28 +328,37 @@ class Client {
const botList = viewer.viewerBotList; const botList = viewer.viewerBotList;
const retries = 2; const retries = 2;
const bots = {}; const bots = {};
const promises = [];
for (const bot of botList.filter(x => x.deletionState == 'not_deleted')) { for (const bot of botList.filter(x => x.deletionState == 'not_deleted')) {
try { const promise = new Promise(async (resolve, reject) => {
const url = `https://poe.com/_next/data/${this.next_data.buildId}/${bot.displayName}.json`; try {
let r; const url = `https://poe.com/_next/data/${this.next_data.buildId}/${bot.displayName}.json`;
let r;
if (this.use_cached_bots && cached_bots[url]) { if (this.use_cached_bots && cached_bots[url]) {
r = cached_bots[url]; r = cached_bots[url];
} }
else { else {
logger.info(`Downloading ${url}`); logger.info(`Downloading ${url}`);
r = await request_with_retries(() => this.session.get(url), retries); r = await request_with_retries(() => this.session.get(url), retries);
cached_bots[url] = r; cached_bots[url] = r;
} }
const chatData = r.data.pageProps.payload.chatOfBotDisplayName; const chatData = r.data.pageProps.payload.chatOfBotDisplayName;
bots[chatData.defaultBotObject.nickname] = chatData; bots[chatData.defaultBotObject.nickname] = chatData;
} resolve();
catch {
console.log(`Could not load bot: ${bot.displayName}`); }
} catch {
console.log(`Could not load bot: ${bot.displayName}`);
reject();
}
});
promises.push(promise);
} }
await Promise.allSettled(promises);
return bots; return bots;
} }
@@ -439,6 +455,7 @@ class Client {
} }
async connect_ws() { async connect_ws() {
this.ws_domain = `tch${Math.floor(Math.random() * 1e6)}`;
this.ws_connected = false; this.ws_connected = false;
this.ws_run_thread(); this.ws_run_thread();
while (!this.ws_connected) { while (!this.ws_connected) {
@@ -460,10 +477,6 @@ class Client {
on_ws_error(ws, error) { on_ws_error(ws, error) {
logger.warn(`Websocket returned error: ${error}`); logger.warn(`Websocket returned error: ${error}`);
this.disconnect_ws(); this.disconnect_ws();
if (this.auto_reconnect) {
this.connect_ws();
}
} }
async on_message(ws, msg) { async on_message(ws, msg) {
@@ -510,7 +523,11 @@ class Client {
} }
} }
async *send_message(chatbot, message, with_chat_break = false, timeout = 20) { async *send_message(chatbot, message, with_chat_break = false, timeout = 30, signal = null) {
if (this.auto_reconnect) {
await this.reconnect();
}
//if there is another active message, wait until it has finished sending //if there is another active message, wait until it has finished sending
while (Object.values(this.active_messages).includes(null)) { while (Object.values(this.active_messages).includes(null)) {
await new Promise(resolve => setTimeout(resolve, 10)); await new Promise(resolve => setTimeout(resolve, 10));
@@ -551,10 +568,17 @@ class Client {
let messageId; let messageId;
while (true) { while (true) {
try { try {
this.abortController.signal.throwIfAborted(); if (signal instanceof AbortSignal) {
signal.throwIfAborted();
}
if (timeout == 0) {
throw new Error("Response timed out.");
}
const message = this.message_queues[humanMessageId].shift(); const message = this.message_queues[humanMessageId].shift();
if (!message) { if (!message) {
timeout -= 1;
await new Promise(resolve => setTimeout(() => resolve(), 1000)); await new Promise(resolve => setTimeout(() => resolve(), 1000));
continue; continue;
//throw new Error("Queue is empty"); //throw new Error("Queue is empty");

View File

@@ -2196,9 +2196,20 @@ app.post('/deletegroup', jsonParser, async (request, response) => {
const POE_DEFAULT_BOT = 'a2'; const POE_DEFAULT_BOT = 'a2';
const poeClientCache = {};
async function getPoeClient(token, useCache = false) { async function getPoeClient(token, useCache = false) {
let client = new poe.Client(false, useCache); let client;
await client.init(token);
if (useCache && poeClientCache[token]) {
client = poeClientCache[token];
}
else {
client = new poe.Client(true, useCache);
await client.init(token);
}
poeClientCache[token] = client;
return client; return client;
} }
@@ -2210,9 +2221,9 @@ app.post('/status_poe', jsonParser, async (request, response) => {
} }
try { try {
const client = await getPoeClient(token); const client = await getPoeClient(token, false);
const botNames = client.get_bot_names(); const botNames = client.get_bot_names();
client.disconnect_ws(); //client.disconnect_ws();
return response.send({ 'bot_names': botNames }); return response.send({ 'bot_names': botNames });
} }
@@ -2240,7 +2251,7 @@ app.post('/purge_poe', jsonParser, async (request, response) => {
else { else {
await client.send_chat_break(bot); await client.send_chat_break(bot);
} }
client.disconnect_ws(); //client.disconnect_ws();
return response.send({ "ok": true }); return response.send({ "ok": true });
} }
@@ -2262,12 +2273,13 @@ app.post('/generate_poe', jsonParser, async (request, response) => {
} }
let isGenerationStopped = false; let isGenerationStopped = false;
const abortController = new AbortController();
request.socket.removeAllListeners('close'); request.socket.removeAllListeners('close');
request.socket.on('close', function () { request.socket.on('close', function () {
isGenerationStopped = true; isGenerationStopped = true;
if (client) { if (client) {
client.abortController.abort(); abortController.abort();
} }
}); });
const prompt = request.body.prompt; const prompt = request.body.prompt;
@@ -2293,7 +2305,7 @@ app.post('/generate_poe', jsonParser, async (request, response) => {
}); });
let reply = ''; let reply = '';
for await (const mes of client.send_message(bot, prompt)) { for await (const mes of client.send_message(bot, prompt, false, 30, abortController.signal)) {
if (isGenerationStopped) { if (isGenerationStopped) {
console.error('Streaming stopped by user. Closing websocket...'); console.error('Streaming stopped by user. Closing websocket...');
break; break;
@@ -2309,22 +2321,22 @@ app.post('/generate_poe', jsonParser, async (request, response) => {
console.error(err); console.error(err);
} }
finally { finally {
client.disconnect_ws(); //client.disconnect_ws();
response.end(); response.end();
} }
} }
else { else {
try { try {
let reply; let reply;
for await (const mes of client.send_message(bot, prompt)) { for await (const mes of client.send_message(bot, prompt, false, 30, abortController.signal)) {
reply = mes.text; reply = mes.text;
} }
console.log(reply); console.log(reply);
client.disconnect_ws(); //client.disconnect_ws();
return response.send({ 'reply': reply }); return response.send({ 'reply': reply });
} }
catch { catch {
client.disconnect_ws(); //client.disconnect_ws();
return response.sendStatus(500); return response.sendStatus(500);
} }
} }