diff --git a/Assets/ImageCreator-CodeOfConduct.png b/Assets/ImageCreator-CodeOfConduct.png new file mode 100644 index 0000000..a4059d9 Binary files /dev/null and b/Assets/ImageCreator-CodeOfConduct.png differ diff --git a/LibWinDog/Config.py b/LibWinDog/Config.py index 5ef76ef..52e50bb 100755 --- a/LibWinDog/Config.py +++ b/LibWinDog/Config.py @@ -13,7 +13,7 @@ LogToFile = True DumpToConsole = False DumpToFile = False -AdminIds = [ "123456789@telegram", "634314973@telegram", "admin@activitypub@mastodon.example.com", ] +AdminIds = [ "telegram:123456789", "telegram:634314973", "activitypub:admin@mastodon.example.com", ] DefaultLang = "en" Debug = False diff --git a/LibWinDog/Platforms/Mastodon/Mastodon.py b/LibWinDog/Platforms/Mastodon/Mastodon.py index ce1b3c6..4ab9d79 100755 --- a/LibWinDog/Platforms/Mastodon/Mastodon.py +++ b/LibWinDog/Platforms/Mastodon/Mastodon.py @@ -21,21 +21,24 @@ def MastodonMain() -> bool: Mastodon = mastodon.Mastodon(api_base_url=MastodonUrl, access_token=MastodonToken) class MastodonListener(mastodon.StreamListener): def on_notification(self, event): - if event['type'] == 'mention': - #OnMessageParsed() - message = BeautifulSoup(event['status']['content'], 'html.parser').get_text(' ').strip().replace('\t', ' ') - if not message.split('@')[0]: - message = ' '.join('@'.join(message.split('@')[1:]).strip().split(' ')[1:]).strip() - if message[0] in CmdPrefixes: - command = ParseCmd(message) - if command: - command.messageId = event['status']['id'] - if command.Name in Endpoints: - Endpoints[command.Name]["handler"]({"Event": event, "Manager": Mastodon}, command) + MastodonHandler(event) Mastodon.stream_user(MastodonListener(), run_async=True) return True -def MastodonSender(event, manager, data, destination, textPlain, textMarkdown) -> None: +def MastodonHandler(event): + if event['type'] == 'mention': + #OnMessageParsed() + message = BeautifulSoup(event['status']['content'], 'html.parser').get_text(' ').strip().replace('\t', ' ') + if not message.split('@')[0]: + message = ' '.join('@'.join(message.split('@')[1:]).strip().split(' ')[1:]).strip() + if message[0] in CmdPrefixes: + command = ParseCmd(message) + if command: + command.messageId = event['status']['id'] + if command.Name in Endpoints: + Endpoints[command.Name]["handler"]({"Event": event, "Manager": Mastodon}, command) + +def MastodonSender(event, manager, data:OutputMessageData, destination, textPlain, textMarkdown) -> None: if InDict(data, 'Media'): Media = manager.media_post(data['Media'], Magic(mime=True).from_buffer(data['Media'])) while Media['url'] == 'null': diff --git a/LibWinDog/Platforms/Telegram/Telegram.py b/LibWinDog/Platforms/Telegram/Telegram.py index 7aabdea..04c8bb6 100755 --- a/LibWinDog/Platforms/Telegram/Telegram.py +++ b/LibWinDog/Platforms/Telegram/Telegram.py @@ -30,23 +30,35 @@ def TelegramMain() -> bool: #app.run_polling(allowed_updates=Update.ALL_TYPES) return True +def TelegramMakeInputMessageData(message:telegram.Message) -> InputMessageData: + data = InputMessageData( + message_id = f"telegram:{message.message_id}", + text_plain = message.text, + text_markdown = message.text_markdown_v2, + ) + data.text_auto = GetWeightedText(data.text_markdown, data.text_plain) + data.command = ParseCommand(data.text_plain) + data.room = SafeNamespace( + id = f"telegram:{message.chat.id}", + tag = message.chat.username, + name = message.chat.title, + ) + data.user = SafeNamespace( + id = f"telegram:{message.from_user.id}", + tag = message.from_user.username, + name = message.from_user.first_name, + ) + return data + def TelegramHandlerWrapper(update:telegram.Update, context:CallbackContext=None) -> None: Thread(target=lambda:TelegramHandlerCore(update, context)).start() def TelegramHandlerCore(update:telegram.Update, context:CallbackContext=None) -> None: if not update.message: return - data = SimpleNamespace() - data.room_id = f"{update.message.chat.id}@telegram" - data.message_id = f"{update.message.message_id}@telegram" - data.text_plain = update.message.text - data.text_markdown = update.message.text_markdown_v2 - data.text_auto = GetWeightedText(data.text_markdown, data.text_plain) - data.command = ParseCommand(data.text_plain) - data.user = SimpleNamespace() - data.user.name = update.message.from_user.first_name - data.user.tag = update.message.from_user.username - data.user.id = f"{update.message.from_user.id}@telegram" + data = TelegramMakeInputMessageData(update.message) + if update.message.reply_to_message: + data.quoted = TelegramMakeInputMessageData(update.message.reply_to_message) OnMessageParsed(data) cmd = ParseCmd(update.message.text) if cmd: @@ -59,7 +71,7 @@ def TelegramHandlerCore(update:telegram.Update, context:CallbackContext=None) -> cmd.User = SimpleNamespace(**{ "Name": update.message.from_user.first_name, "Tag": update.message.from_user.username, - "Id": f'{update.message.from_user.id}@telegram', + "Id": f'telegram:{update.message.from_user.id}', }) if update.message.reply_to_message: cmd.Quoted = SimpleNamespace(**{ @@ -71,12 +83,13 @@ def TelegramHandlerCore(update:telegram.Update, context:CallbackContext=None) -> "User": SimpleNamespace(**{ "Name": update.message.reply_to_message.from_user.first_name, "Tag": update.message.reply_to_message.from_user.username, - "Id": f'{update.message.reply_to_message.from_user.id}@telegram', + "Id": f'telegram:{update.message.reply_to_message.from_user.id}', }), }) Endpoints[cmd.Name]["handler"]({"Event": update, "Manager": context}, cmd) + #Endpoints[cmd.Name]["handler"](SafeNamespace(platform="telegram", event=update, manager=context), cmd) -def TelegramSender(event, manager, data, destination, textPlain, textMarkdown) -> None: +def TelegramSender(event, manager, data:OutputMessageData, destination, textPlain, textMarkdown) -> None: if destination: manager.bot.send_message(destination, text=textPlain) else: @@ -84,30 +97,6 @@ def TelegramSender(event, manager, data, destination, textPlain, textMarkdown) - if InDict(data, "Media") and not InDict(data, "media"): data["media"] = {"bytes": data["Media"]} if InDict(data, "media"): - #data["media"] = SureArray(data["media"]) - #media = (data["media"][0]["bytes"] if "bytes" in data["media"][0] else data["media"][0]["url"]) - #if len(data["media"]) > 1: - # media_list = [] - # media_list.append(telegram.InputMediaPhoto( - # media[0], - # caption=(textMarkdown if textMarkdown else textPlain if textPlain else None), - # parse_mode=("MarkdownV2" if textMarkdown else None))) - # for medium in media[1:]: - # media_list.append(telegram.InputMediaPhoto(medium)) - # event.message.reply_media_group(media_list, reply_to_message_id=replyToId) - #else: - # event.message.reply_photo( - # media, - # caption=(textMarkdown if textMarkdown else textPlain if textPlain else None), - # parse_mode=("MarkdownV2" if textMarkdown else None), - # reply_to_message_id=replyToId) - #event.message.reply_photo( - # (DictGet(media[0], "bytes") or DictGet(media[0], "url")), - # caption=(textMarkdown if textMarkdown else textPlain if textPlain else None), - # parse_mode=("MarkdownV2" if textMarkdown else None), - # reply_to_message_id=replyToId) - #for medium in media[1:]: - # event.message.reply_photo((DictGet(medium, "bytes") or DictGet(medium, "url")), reply_to_message_id=replyToId) for medium in SureArray(data["media"]): event.message.reply_photo( (DictGet(medium, "bytes") or DictGet(medium, "url")), @@ -119,5 +108,15 @@ def TelegramSender(event, manager, data, destination, textPlain, textMarkdown) - elif textPlain: event.message.reply_text(textPlain, reply_to_message_id=replyToId) -RegisterPlatform(name="Telegram", main=TelegramMain, sender=TelegramSender, eventClass=telegram.Update) +def TelegramLinker(data:InputMessageData) -> SafeNamespace: + linked = SafeNamespace() + if data.room.id: + room_id = data.room.id.split("telegram:")[1] + linked.room = f"https://t.me/{room_id}" + if data.message_id: + message_id = data.message_id.split("telegram:")[1] + linked.message = f"{linked.room}/{message_id}" + return linked + +RegisterPlatform(name="Telegram", main=TelegramMain, sender=TelegramSender, linker=TelegramLinker, eventClass=telegram.Update) diff --git a/ModWinDog/Broadcast.py b/ModWinDog/Broadcast.py new file mode 100755 index 0000000..bf40c34 --- /dev/null +++ b/ModWinDog/Broadcast.py @@ -0,0 +1,20 @@ +# ================================== # +# WinDog multi-purpose chatbot # +# Licensed under AGPLv3 by OctoSpacc # +# ================================== # + +def cBroadcast(context:EventContext, data:InputMessageData) -> None: + if (data.user.id not in AdminIds) and (data.user.tag not in AdminIds): + return SendMessage(context, {"Text": choice(Locale.__('eval'))}) + destination = data.command.arguments["destination"] + if not (destination and data.command.body): + return SendMessage(context, {"Text": "Bad usage."}) + SendMessage(context, {"TextPlain": data.command.body}, destination) + SendMessage(context, {"TextPlain": "Executed."}) + +RegisterModule(name="Broadcast", endpoints={ + "Broadcast": CreateEndpoint(["broadcast"], summary="Sends an admin message over to any chat destination.", handler=cBroadcast, arguments={ + "destination": True, + }), +}) + diff --git a/ModWinDog/ChatGPT/ChatGPT.py b/ModWinDog/ChatGPT/ChatGPT.py new file mode 100644 index 0000000..410ccf2 --- /dev/null +++ b/ModWinDog/ChatGPT/ChatGPT.py @@ -0,0 +1,23 @@ +# ================================== # +# WinDog multi-purpose chatbot # +# Licensed under AGPLv3 by OctoSpacc # +# ================================== # + +from g4f.client import Client as G4FClient + +g4fClient = G4FClient() + +def cGpt(context:EventContext, data:InputMessageData) -> None: + if not data.command.body: + return SendMessage(context, {"Text": "You must type some text."}) + output = "" + while not output or output.startswith("sorry, 您的ip已由于触发防滥用检测而被封禁,本服务网址是"): # quick fix + output = "" + for completion in g4fClient.chat.completions.create(model="gpt-3.5-turbo", messages=[{"role": "user", "content": data.command.body}], stream=True): + output += (completion.choices[0].delta.content or "") + return SendMessage(context, {"TextPlain": f"[🤖️ GPT]\n\n{output}"}) + +RegisterModule(name="ChatGPT", endpoints={ + "GPT": CreateEndpoint(["gpt", "chatgpt"], summary="Sends a message to GPT to get back a response. Note: conversations are not yet supported, and this is more standard GPT than ChatGPT, and in general there are many bugs!", handler=cGpt), +}) + diff --git a/ModWinDog/ChatGPT/requirements.txt b/ModWinDog/ChatGPT/requirements.txt new file mode 100644 index 0000000..021e60f --- /dev/null +++ b/ModWinDog/ChatGPT/requirements.txt @@ -0,0 +1 @@ +g4f diff --git a/ModWinDog/Hashing.py b/ModWinDog/Hashing.py index 7799a40..84c880d 100755 --- a/ModWinDog/Hashing.py +++ b/ModWinDog/Hashing.py @@ -5,16 +5,16 @@ import hashlib -def cHash(context, data) -> None: +def cHash(context:EventContext, data:InputMessageData) -> None: algorithm = data.command.arguments["algorithm"] if data.command.body and algorithm in hashlib.algorithms_available: - hashed = hashlib.new(algorithm, algorithm.join(data.Body.split(algorithm)[1:]).strip().encode()).hexdigest() - SendMsg(context, { + hashed = hashlib.new(algorithm, data.command.body.encode()).hexdigest() + SendMessage(context, { "TextPlain": hashed, "TextMarkdown": MarkdownCode(hashed, True), }) else: - SendMsg(context, {"Text": choice(Locale.__('hash.usage')).format(data.command.tokens[0], hashlib.algorithms_available)}) + SendMessage(context, {"Text": choice(Locale.__('hash.usage')).format(data.command.tokens[0], hashlib.algorithms_available)}) RegisterModule(name="Hashing", group="Geek", summary="Functions for hashing of textual content.", endpoints={ "Hash": CreateEndpoint(names=["hash"], summary="Responds with the hash-sum of a message received.", handler=cHash, arguments={ diff --git a/ModWinDog/Help.py b/ModWinDog/Help.py index 8fb0a24..e1ce245 100755 --- a/ModWinDog/Help.py +++ b/ModWinDog/Help.py @@ -4,7 +4,7 @@ # ================================== # # TODO: implement /help feature -def cHelp(context, data=None) -> None: +def cHelp(context:EventContext, data:InputMessageData) -> None: moduleList = '' for module in Modules: summary = Modules[module]["summary"] @@ -13,7 +13,7 @@ def cHelp(context, data=None) -> None: for endpoint in endpoints: summary = endpoints[endpoint]["summary"] moduleList += (f"\n* /{', /'.join(endpoints[endpoint]['names'])}" + (f": {summary}" if summary else '')) - SendMsg(context, {"Text": f"[ Available Modules ]{moduleList}"}) + SendMessage(context, {"Text": f"[ Available Modules ]{moduleList}"}) RegisterModule(name="Help", group="Basic", endpoints={ "Help": CreateEndpoint(["help"], summary="Provides help for the bot. For now, it just lists the commands.", handler=cHelp), diff --git a/ModWinDog/Internet/Internet.py b/ModWinDog/Internet/Internet.py index 5d3412b..6f2e68a 100755 --- a/ModWinDog/Internet/Internet.py +++ b/ModWinDog/Internet/Internet.py @@ -16,7 +16,7 @@ from urllib.request import urlopen, Request def HttpReq(url:str, method:str|None=None, *, body:bytes=None, headers:dict[str, str]={"User-Agent": WebUserAgent}): return urlopen(Request(url, method=method, data=body, headers=headers)) -def cEmbedded(context, data) -> None: +def cEmbedded(context:EventContext, data:InputMessageData) -> None: if len(data.Tokens) >= 2: # Find links in command body Text = (data.TextMarkdown + ' ' + data.TextPlain) @@ -48,10 +48,10 @@ def cEmbedded(context, data) -> None: elif urlDomain == "vm.tiktok.com": urlDomain = "vm.vxtiktok.com" url = urlDomain + url[len(urlDomain):] - SendMsg(context, {"TextPlain": f"{{{proto}{url}}}"}) + SendMessage(context, {"TextPlain": f"{{{proto}{url}}}"}) # else TODO error message? -def cWeb(context, data) -> None: +def cWeb(context:EventContext, data:InputMessageData) -> None: if data.Body: try: QueryUrl = UrlParse.quote(data.Body) @@ -68,34 +68,35 @@ def cWeb(context, data) -> None: Caption += f'[{Index}] {Title} : {{{Link}}}\n\n' else: continue - SendMsg(context, {"TextPlain": f'{Caption}...'}) + SendMessage(context, {"TextPlain": f'{Caption}...'}) except Exception: raise else: pass -def cImages(context, data) -> None: +def cImages(context:EventContext, data:InputMessageData) -> None: pass -def cNews(context, data) -> None: +def cNews(context:EventContext, data:InputMessageData) -> None: pass -def cTranslate(context, data) -> None: - if len(data.Tokens) < 3: - return +def cTranslate(context:EventContext, data:InputMessageData) -> None: + language_to = data.command.arguments["language_to"] + text_input = (data.command.body or (data.Quoted and data.Quoted.Body)) + if not (text_input and language_to): + return SendMessage(context, {"TextPlain": f"Usage: /translate "}) try: - toLang = data.Tokens[1] # TODO: Use many different public Lingva instances in rotation to avoid overloading a specific one - result = json.loads(HttpReq(f'https://lingva.ml/api/v1/auto/{toLang}/{UrlParse.quote(toLang.join(data.Body.split(toLang)[1:]))}').read()) - SendMsg(context, {"TextPlain": f"[{result['info']['detectedSource']} (auto) -> {toLang}]\n\n{result['translation']}"}) + result = json.loads(HttpReq(f'https://lingva.ml/api/v1/auto/{language_to}/{UrlParse.quote(text_input)}').read()) + SendMessage(context, {"TextPlain": f"[{result['info']['detectedSource']} (auto) -> {language_to}]\n\n{result['translation']}"}) except Exception: raise -def cUnsplash(context, data) -> None: +def cUnsplash(context:EventContext, data:InputMessageData) -> None: try: Req = HttpReq(f'https://source.unsplash.com/random/?{UrlParse.quote(data.Body)}') ImgUrl = Req.geturl().split('?')[0] - SendMsg(context, { + SendMessage(context, { "TextPlain": f'{{{ImgUrl}}}', "TextMarkdown": MarkdownCode(ImgUrl, True), "Media": Req.read(), @@ -103,7 +104,7 @@ def cUnsplash(context, data) -> None: except Exception: raise -def cSafebooru(context, data) -> None: +def cSafebooru(context:EventContext, data:InputMessageData) -> None: ApiUrl = 'https://safebooru.org/index.php?page=dapi&s=post&q=index&limit=100&tags=' try: if data.Body: @@ -114,7 +115,7 @@ def cSafebooru(context, data) -> None: if not ImgUrls: # literal search ImgUrls = HttpReq(f'{ApiUrl}{UrlParse.quote(data.Body)}').read().decode().split(' file_url="')[1:] if not ImgUrls: - return SendMsg(context, {"Text": "Error: Could not get any result from Safebooru."}) + return SendMessage(context, {"Text": "Error: Could not get any result from Safebooru."}) ImgXml = choice(ImgUrls) ImgUrl = ImgXml.split('"')[0] ImgId = ImgXml.split(' id="')[1].split('"')[0] @@ -126,7 +127,7 @@ def cSafebooru(context, data) -> None: ImgId = ImgUrl.split('?')[-1] break if ImgUrl: - SendMsg(context, { + SendMessage(context, { "TextPlain": f'[{ImgId}]\n{{{ImgUrl}}}', "TextMarkdown": (f'\\[`{ImgId}`\\]\n' + MarkdownCode(ImgUrl, True)), "media": {"url": ImgUrl}, #, "bytes": HttpReq(ImgUrl).read()}, @@ -136,45 +137,14 @@ def cSafebooru(context, data) -> None: except Exception as error: raise -def cDalle(context, data) -> None: - if not data.Body: - return SendMsg(context, {"Text": "Please tell me what to generate."}) - image_filter = ""https://th.bing.com/th/id/" - try: - retry_index = 3 - result_list = "" - result_id = HttpReq( - f"https://www.bing.com/images/create?q={UrlParse.quote(data.Body)}&rt=3&FORM=GENCRE",#"4&FORM=GENCRE", - body=f"q={UrlParse.urlencode({'q': data.Body})}&qs=ds".encode(), - headers=MicrosoftBingSettings).read().decode() - print(result_id) - result_id = result_id.split('&id=')[1].split('&')[0] - results_url = f"https://www.bing.com/images/create/-/{result_id}?FORM=GENCRE" - SendMsg(context, {"Text": "Request sent, please wait..."}) - while retry_index < 12 and image_filter not in result_list: - result_list = HttpReq(results_url, headers={"User-Agent": MicrosoftBingSettings["User-Agent"]}).read().decode() - time.sleep(1.25 * retry_index) - retry_index += 1 - if image_filter in result_list: - SendMsg(context, { - "TextPlain": f"{{{results_url}}}", - "TextMarkdown": MarkdownCode(results_url, True), - "Media": HttpReq( - result_list.split(image_filter)[1].split('\\"')[0], - headers={"User-Agent": MicrosoftBingSettings["User-Agent"]}).read(), - }) - else: - raise Exception("Something went wrong.") - except Exception as error: - Log(error) - SendMsg(context, {"TextPlain": error}) - RegisterModule(name="Internet", summary="Tools and toys related to the Internet.", endpoints={ "Embedded": CreateEndpoint(["embedded"], summary="Rewrites a link, trying to bypass embed view protection.", handler=cEmbedded), "Web": CreateEndpoint(["web"], summary="Provides results of a DuckDuckGo search.", handler=cWeb), - "Translate": CreateEndpoint(["translate"], summary="Returns the received message after translating it in another language.", handler=cTranslate), + "Translate": CreateEndpoint(["translate"], summary="Returns the received message after translating it in another language.", handler=cTranslate, arguments={ + "language_to": True, + "language_from": False, + }), "Unsplash": CreateEndpoint(["unsplash"], summary="Sends a picture sourced from Unsplash.", handler=cUnsplash), "Safebooru": CreateEndpoint(["safebooru"], summary="Sends a picture sourced from Safebooru.", handler=cSafebooru), - #"DALL-E": CreateEndpoint(["dalle"], summary="Sends an AI-generated picture from DALL-E 3 via Microsoft Bing.", handler=cDalle), }) diff --git a/ModWinDog/Misc.py b/ModWinDog/Misc.py index 274c984..bbb2c0d 100755 --- a/ModWinDog/Misc.py +++ b/ModWinDog/Misc.py @@ -5,11 +5,11 @@ import re, subprocess -def mPercenter(context, data) -> None: - SendMsg(context, {"Text": choice(Locale.__(f'{data.Name}.{"done" if data.Body else "empty"}')).format( +def mPercenter(context:EventContext, data:InputMessageData) -> None: + SendMessage(context, {"Text": choice(Locale.__(f'{data.Name}.{"done" if data.Body else "empty"}')).format( Cmd=data.Tokens[0], Percent=RandPercent(), Thing=data.Body)}) -def mMultifun(context, data) -> None: +def mMultifun(context:EventContext, data:InputMessageData) -> None: cmdkey = data.Name replyToId = None if data.Quoted: @@ -26,17 +26,20 @@ def mMultifun(context, data) -> None: else: if 'empty' in Locale.__(cmdkey): Text = choice(Locale.__(f'{cmdkey}.empty')) - SendMsg(context, {"Text": Text, "ReplyTo": replyToId}) + SendMessage(context, {"Text": Text, "ReplyTo": replyToId}) -def cStart(context, data) -> None: - SendMsg(context, {"Text": choice(Locale.__('start')).format(data.User.Name)}) +def cStart(context:EventContext, data:InputMessageData) -> None: + SendMessage(context, {"Text": choice(Locale.__('start')).format(data.User.Name)}) -def cSource(context, data=None) -> None: - SendMsg(context, {"TextPlain": ("""\ +def cSource(context:EventContext, data:InputMessageData) -> None: + SendMessage(context, {"TextPlain": ("""\ * Original Code: {https://gitlab.com/octospacc/WinDog} * Mirror: {https://github.com/octospacc/WinDog} """ + (f"* Modified Code: {{{ModifiedSourceUrl}}}" if ModifiedSourceUrl else ""))}) +def cGdpr(context:EventContext, data:InputMessageData) -> None: + pass + # Module: Config # ... #def cConfig(update:telegram.Update, context:CallbackContext) -> None: @@ -46,12 +49,13 @@ def cSource(context, data=None) -> None: # # ... language: en, it, ... # # ... userdata: import, export, delete -def cPing(context, data=None) -> None: - SendMsg(context, {"Text": "*Pong!*"}) +def cPing(context:EventContext, data:InputMessageData) -> None: + SendMessage(context, {"Text": "*Pong!*"}) -def cEcho(context, data) -> None: - if data.Body: +def cEcho(context:EventContext, data:InputMessageData) -> None: + if data.command.body: prefix = "🗣️ " + #prefix = f"[🗣️]({context.linker(data).message}) " if len(data.Tokens) == 2: nonascii = True for char in data.Tokens[1]: @@ -61,50 +65,40 @@ def cEcho(context, data) -> None: if nonascii: # text is not ascii, probably an emoji (altough not necessarily), so just pass as is (useful for Telegram emojis) prefix = '' - SendMsg(context, {"Text": (prefix + data.Body)}) + SendMessage(context, {"Text": (prefix + data.command.body)}) else: - SendMsg(context, {"Text": choice(Locale.__('echo.empty'))}) - -def cBroadcast(context, data) -> None: - if data.User.Id not in AdminIds: - return SendMsg(context, {"Text": choice(Locale.__('eval'))}) - if len(data.Tokens) < 3: - return SendMsg(context, {"Text": "Bad usage."}) - Dest = data.Tokens[1] - Text = ' '.join(data.Tokens[2:]) - SendMsg(context, {"TextPlain": Text}, Dest) - SendMsg(context, {"TextPlain": "Executed."}) + SendMessage(context, {"Text": choice(Locale.__('echo.empty'))}) #def cTime(update:Update, context:CallbackContext) -> None: # update.message.reply_markdown_v2( # CharEscape(choice(Locale.__('time')).format(time.ctime().replace(' ', ' ')), 'MARKDOWN_SPEECH'), # reply_to_message_id=update.message.message_id) -def cEval(context, data=None) -> None: - SendMsg(context, {"Text": choice(Locale.__('eval'))}) +def cEval(context:EventContext, data:InputMessageData) -> None: + SendMessage(context, {"Text": choice(Locale.__('eval'))}) -def cExec(context, data) -> None: +def cExec(context:EventContext, data:InputMessageData) -> None: if len(data.Tokens) >= 2 and data.Tokens[1].lower() in ExecAllowed: cmd = data.Tokens[1].lower() Out = subprocess.run(('sh', '-c', f'export PATH=$PATH:/usr/games; {cmd}'), stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout.decode() # Caption = (re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])').sub('', Out)) - SendMsg(context, { + SendMessage(context, { "TextPlain": Caption, "TextMarkdown": MarkdownCode(Caption, True), }) else: - SendMsg(context, {"Text": choice(Locale.__('eval'))}) + SendMessage(context, {"Text": choice(Locale.__('eval'))}) RegisterModule(name="Misc", endpoints={ "Percenter": CreateEndpoint(["wish", "level"], summary="Provides fun trough percentage-based toys.", handler=mPercenter), "Multifun": CreateEndpoint(["hug", "pat", "poke", "cuddle", "hands", "floor", "sessocto"], summary="Provides fun trough preprogrammed-text-based toys.", handler=mMultifun), "Start": CreateEndpoint(["start"], summary="Salutes the user, hinting that the bot is working and providing basic quick help.", handler=cStart), "Source": CreateEndpoint(["source"], summary="Provides a copy of the bot source codes and/or instructions on how to get it.", handler=cSource), + "GDPR": CreateEndpoint(["gdpr"], summary="Operations for european citizens regarding your personal data.", handler=cGdpr), "Ping": CreateEndpoint(["ping"], summary="Responds pong, useful for testing messaging latency.", handler=cPing), "Echo": CreateEndpoint(["echo"], summary="Responds back with the original text of the received message.", handler=cEcho), - "Broadcast": CreateEndpoint(["broadcast"], summary="Sends an admin message over to any chat destination.", handler=cBroadcast), "Eval": CreateEndpoint(["eval"], summary="Execute a Python command (or safe literal operation) in the current context. Currently not implemented.", handler=cEval), "Exec": CreateEndpoint(["exec"], summary="Execute a system command from the allowed ones and return stdout+stderr.", handler=cExec), #"Format": CreateEndpoint(["format"], summary="Reformat text using an handful of rules. Not yet implemented.", handler=cFormat), diff --git a/ModWinDog/Scrapers/Scrapers.py b/ModWinDog/Scrapers/Scrapers.py index ace7ddd..672d25d 100755 --- a/ModWinDog/Scrapers/Scrapers.py +++ b/ModWinDog/Scrapers/Scrapers.py @@ -9,94 +9,120 @@ SeleniumDriversLimit = 2 """ # end windog config # """ -currentSeleniumDrivers = 0 +currentSeleniumDrivers = [] -#from selenium import webdriver -#from selenium.webdriver import Chrome -#from selenium.webdriver.common.by import By from seleniumbase import Driver -def getSelenium() -> Driver: - global currentSeleniumDrivers - if currentSeleniumDrivers >= SeleniumDriversLimit: +# TODO implement some kind of timeout after a closure of a browser, since otherwise we get in a buggy state sometimes? + +def getSelenium() -> tuple[int, Driver]|bool: + if len(currentSeleniumDrivers) == SeleniumDriversLimit: return False - #options = webdriver.ChromeOptions() - #options.add_argument("headless=new") - #options.add_argument("user-data-dir=./Selenium-WinDog") - #seleniumDriver = Chrome(options=options) - currentSeleniumDrivers += 1 - return Driver(uc=True, headless2=True, user_data_dir=f"./Selenium-WinDog/{currentSeleniumDrivers}") + for index in range(1, (SeleniumDriversLimit + 1)): + if index not in currentSeleniumDrivers: + currentSeleniumDrivers.append(index) + break + return (index, Driver(uc=True, headless2=True, user_data_dir=f"./Selenium-WinDog/{index}")) -def closeSelenium(driver:Driver) -> None: - global currentSeleniumDrivers - try: - driver.close() - driver.quit() - except: - Log(format_exc()) - if currentSeleniumDrivers > 0: - currentSeleniumDrivers -= 1 +def closeSelenium(index:int, driver:Driver) -> None: + if driver: + try: + driver.close() + driver.quit() + except: + Log(format_exc()) + if index: + currentSeleniumDrivers.remove(index) -def cDalleSelenium(context, data) -> None: - if not data.Body: - return SendMsg(context, {"Text": "Please tell me what to generate."}) - #if not seleniumDriver: - # SendMsg(context, {"Text": "Initializing Selenium, please wait..."}) - # loadSeleniumDriver() +def cDalleSelenium(context:EventContext, data:InputMessageData) -> None: + warning_text = "has been blocked by Microsoft because it violates their content policy. Further attempts might lead to a ban on your profile. Please review the Code of Conduct for Image Creator in this picture or at https://www.bing.com/new/termsofuseimagecreator#content-policy." + prompt = data.command.body + if not prompt: + return SendMessage(context, {"Text": "Please tell me what to generate."}) + driver_index, driver = None, None try: driver = getSelenium() if not driver: - return SendMsg(context, {"Text": "Couldn't access a web scraping VM as they are all busy. Please try again later."}) + return SendMessage(context, {"Text": "Couldn't access a web scraping VM as they are all busy. Please try again later."}) + driver_index, driver = driver driver.get("https://www.bing.com/images/create/") driver.refresh() - #retry_index = 3 - #while retry_index < 12: - # time.sleep(retry_index := retry_index + 1) - # try: - #seleniumDriver.find_element(By.CSS_SELECTOR, 'form input[name="q"]').send_keys(data.Body) - #seleniumDriver.find_element(By.CSS_SELECTOR, 'form a[role="button"]').submit() - driver.find_element('form input[name="q"]').send_keys(data.Body) + driver.find_element('form input[name="q"]').send_keys(prompt) driver.find_element('form a[role="button"]').submit() try: - driver.find_element('img[alt="Content warning"]') - SendMsg(context, {"Text": "This prompt has been blocked by Microsoft because it violates their content policy. Further attempts might lead to a ban on your profile."}) - closeSelenium(driver) - return + driver.find_element('img.gil_err_img[alt="Content warning"]') + SendMessage(context, {"Text": f"Content warning: This prompt {warning_text}", "media": {"bytes": open("./Assets/ImageCreator-CodeOfConduct.png", 'rb').read()}}) + return closeSelenium(driver_index, driver) except Exception: # warning element was not found, we should be good pass - SendMsg(context, {"Text": "Request sent successfully, please wait..."}) - # except Exception: - # pass + SendMessage(context, {"Text": "Request sent successfully, please wait..."}) retry_index = 3 while retry_index < 12: - # note that sometimes generation fails and we will never get any image! - #try: + # note that sometimes generation can still fail and we will never get any image! time.sleep(retry_index := retry_index + 1) driver.refresh() - img_list = driver.find_elements(#By.CSS_SELECTOR, - 'div.imgpt a img.mimg') + img_list = driver.find_elements('div.imgpt a img.mimg') if not len(img_list): - continue + try: + driver.find_element('img.gil_err_img[alt="Unsafe image content detected"]') + SendMessage(context, {"Text": "Unsafe image content detected: This result {warning_text}", "media": {"bytes": open("./Assets/ImageCreator-CodeOfConduct.png", 'rb').read()}}) + return closeSelenium(driver_index, driver) + except: # no error is present, so we just have to wait more for the images + continue img_array = [] for img_url in img_list: img_url = img_url.get_attribute("src").split('?')[0] img_array.append({"url": img_url}) #, "bytes": HttpReq(img_url).read()}) page_url = driver.current_url.split('?')[0] - SendMsg(context, { - "TextPlain": f'"{data.Body}"\n{{{page_url}}}', - "TextMarkdown": (f'"_{CharEscape(data.Body, "MARKDOWN")}_"\n' + MarkdownCode(page_url, True)), + SendMessage(context, { + "TextPlain": f'"{prompt}"\n{{{page_url}}}', + "TextMarkdown": (f'"_{CharEscape(prompt, "MARKDOWN")}_"\n' + MarkdownCode(page_url, True)), "media": img_array, }) - closeSelenium(driver) - break - #except Exception as ex: - # pass + return closeSelenium(driver_index, driver) + raise Exception("VM timed out.") except Exception as error: Log(format_exc()) - SendMsg(context, {"TextPlain": "An unexpected error occurred."}) - closeSelenium(driver) + SendMessage(context, {"TextPlain": "An unexpected error occurred."}) + closeSelenium(driver_index, driver) + +def cCraiyonSelenium(context:EventContext, data:InputMessageData) -> None: + prompt = data.command.body + if not prompt: + return SendMessage(context, {"Text": "Please tell me what to generate."}) + driver_index, driver = None, None + try: + driver = getSelenium() + if not driver: + return SendMessage(context, {"Text": "Couldn't access a web scraping VM as they are all busy. Please try again later."}) + driver_index, driver = driver + driver.get("https://www.craiyon.com/") + driver.find_element('textarea#prompt').send_keys(prompt) + driver.execute_script("arguments[0].click();", driver.find_element('button#generateButton')) + SendMessage(context, {"Text": "Request sent successfully, please wait up to 60 seconds..."}) + retry_index = 3 + while retry_index < 16: + time.sleep(retry_index := retry_index + 1) + img_list = driver.find_elements('div.image-container > img') + if not len(img_list): + continue + img_array = [] + for img_elem in img_list: + img_array.append({"url": img_elem.get_attribute("src")}) #, "bytes": HttpReq(img_url).read()}) + SendMessage(context, { + "TextPlain": f'"{prompt}"', + "TextMarkdown": (f'"_{CharEscape(prompt, "MARKDOWN")}_"'), + "media": img_array, + }) + return closeSelenium(driver_index, driver) + raise Exception("VM timed out.") + except Exception as error: + Log(format_exc()) + SendMessage(context, {"TextPlain": "An unexpected error occurred."}) + closeSelenium(driver_index, driver) RegisterModule(name="Scrapers", endpoints={ "DALL-E": CreateEndpoint(["dalle"], summary="Sends an AI-generated picture from DALL-E 3 via Microsoft Bing.", handler=cDalleSelenium), + "Craiyon": CreateEndpoint(["craiyon"], summary="Sends an AI-generated picture from Craiyon.com.", handler=cCraiyonSelenium), }) diff --git a/ModWinDog/Scripting/Scripting.py b/ModWinDog/Scripting/Scripting.py index e3c956b..91671c8 100755 --- a/ModWinDog/Scripting/Scripting.py +++ b/ModWinDog/Scripting/Scripting.py @@ -23,10 +23,11 @@ def luaAttributeFilter(obj, attr_name, is_setting): raise AttributeError("Access Denied.") # TODO make print behave the same as normal Lua, and expose a function for printing without newlines -def cLua(context, data=None) -> None: - scriptText = (data.Body or (data.Quoted and data.Quoted.Body)) +def cLua(context:EventContext, data:InputMessageData) -> None: + # TODO update quoted api getting + scriptText = (data.command.body or (data.Quoted and data.Quoted.Body)) if not scriptText: - return SendMsg(context, {"Text": "You must provide some Lua code to execute."}) + return SendMessage(context, {"Text": "You must provide some Lua code to execute."}) luaRuntime = NewLuaRuntime(max_memory=LuaMemoryLimit, register_eval=False, register_builtins=False, attribute_filter=luaAttributeFilter) luaRuntime.eval(f"""(function() _windog = {{ stdout = "" }} @@ -49,7 +50,7 @@ return _windog.stdout .. (_windog.scriptout or '') end)()""")) except (LuaError, LuaSyntaxError) as error: Log(textOutput := ("Lua Error: " + str(error))) - SendMsg(context, {"TextPlain": textOutput}) + SendMessage(context, {"TextPlain": textOutput}) RegisterModule(name="Scripting", group="Geek", summary="Tools for programming the bot and expanding its features.", endpoints={ "Lua": CreateEndpoint(["lua"], summary="Execute a Lua snippet and get its output.", handler=cLua), diff --git a/WinDog.py b/WinDog.py index 945049e..2f125e6 100755 --- a/WinDog.py +++ b/WinDog.py @@ -22,6 +22,22 @@ from LibWinDog.Database import * # MdEscapes = '\\`*_{}[]()<>#+-.!|=' +class SafeNamespace(SimpleNamespace): + def __getattribute__(self, value): + try: + return super().__getattribute__(value) + except AttributeError: + return None + +class EventContext(SafeNamespace): + pass + +class InputMessageData(SafeNamespace): + pass + +class OutputMessageData(SafeNamespace): + pass + def Log(text:str, level:str="?", *, newline:bool|None=None, inline:bool=False) -> None: endline = '\n' if newline == False or (inline and newline == None): @@ -114,13 +130,13 @@ def HtmlEscapeFull(Raw:str) -> str: def GetRawTokens(text:str) -> list: return text.strip().replace('\t', ' ').replace(' ', ' ').replace(' ', ' ').split(' ') -def ParseCmd(msg) -> SimpleNamespace|None: +def ParseCmd(msg) -> SafeNamespace|None: #if not len(msg) or msg[1] not in CmdPrefixes: # return name = msg.replace('\n', ' ').replace('\t', ' ').replace(' ', ' ').replace(' ', ' ').split(' ')[0][1:].split('@')[0] #if not name: # return - return SimpleNamespace(**{ + return SafeNamespace(**{ "Name": name.lower(), "Body": name.join(msg.split(name)[1:]).strip(), "Tokens": GetRawTokens(msg), @@ -143,42 +159,46 @@ def RandHexStr(length:int) -> str: hexa += choice('0123456789abcdef') return hexa -def ParseCommand(text:str) -> SimpleNamespace|None: +def ParseCommand(text:str) -> SafeNamespace|None: + command = SafeNamespace() + if not text: + return command text = text.strip() - try: # ensure command is not empty + try: # ensure text is a non-empty command if not (text[0] in CmdPrefixes and text[1:].strip()): - return + return command except IndexError: return - command = SimpleNamespace(**{}) command.tokens = text.replace("\r", " ").replace("\n", " ").replace("\t", " ").replace(" ", " ").replace(" ", " ").split(" ") command.name = command.tokens[0][1:].lower() command.body = text[len(command.tokens[0]):].strip() + if command.name not in Endpoints: + return command if (endpoint_arguments := Endpoints[command.name]["arguments"]): command.arguments = {} - # TODO differences between required (True) and optional (False) args - for index, key in enumerate(endpoint_arguments): + index = 1 + for key in endpoint_arguments: + if not endpoint_arguments[key]: + continue # skip optional (False) arguments for now, they will be implemented later try: - value = command.tokens[index + 1] + value = command.tokens[index] command.body = command.body[len(value):].strip() except IndexError: value = None command.arguments[key] = value + index += 1 return command -def OnMessageParsed(data:SimpleNamespace) -> None: +def OnMessageParsed(data:InputMessageData) -> None: if Debug and (DumpToFile or DumpToConsole): text = (data.text_auto.replace('\n', '\\n') if data.text_auto else '') - text = f"[{int(time.time())}] [{time.ctime()}] [{data.room_id}] [{data.message_id}] [{data.user.id}] {text}" + text = f"[{int(time.time())}] [{time.ctime()}] [{data.room.id}] [{data.message_id}] [{data.user.id}] {text}" if DumpToConsole: print(text) if DumpToFile: open((DumpToFile if (DumpToFile and type(DumpToFile) == str) else "./Dump.txt"), 'a').write(text + '\n') -def SendMsg(context, data, destination=None) -> None: - return SendMessage(context, data, destination) - -def SendMessage(context, data, destination=None) -> None: +def SendMessage(context, data:OutputMessageData, destination=None) -> None: if type(context) == dict: event = context['Event'] if 'Event' in context else None manager = context['Manager'] if 'Manager' in context else None @@ -195,14 +215,14 @@ def SendMessage(context, data, destination=None) -> None: textMarkdown = CharEscape(HtmlUnescape(data['Text']), InferMdEscape(HtmlUnescape(data['Text']), textPlain)) for platform in Platforms: platform = Platforms[platform] - if isinstanceSafe(event, InDict(platform, "eventClass")) or isinstanceSafe(manager, InDict(platform, "managerClass")): - platform["sender"](event, manager, data, destination, textPlain, textMarkdown) + if isinstanceSafe(event, platform.eventClass) or isinstanceSafe(manager, platform.managerClass): + platform.sender(event, manager, data, destination, textPlain, textMarkdown) -def SendReaction() -> None: +def SendNotice(context, data) -> None: pass -def RegisterPlatform(name:str, main:callable, sender:callable, *, eventClass=None, managerClass=None) -> None: - Platforms[name] = {"main": main, "sender": sender, "eventClass": eventClass, "managerClass": managerClass} +def RegisterPlatform(name:str, main:callable, sender:callable, linker:callable=None, *, eventClass=None, managerClass=None) -> None: + Platforms[name] = SafeNamespace(main=main, sender=sender, linker=linker, eventClass=eventClass, managerClass=managerClass) Log(f"{name}, ", inline=True) def RegisterModule(name:str, endpoints:dict, *, group:str|None=None, summary:str|None=None) -> None: @@ -217,12 +237,28 @@ def RegisterModule(name:str, endpoints:dict, *, group:str|None=None, summary:str def CreateEndpoint(names:list[str], handler:callable, arguments:dict[str, bool]|None=None, *, summary:str|None=None) -> dict: return {"names": names, "summary": summary, "handler": handler, "arguments": arguments} +def WriteNewConfig() -> None: + Log("💾️ No configuration found! Generating and writing to `./Config.py`... ", inline=True) + with open("./Config.py", 'w') as configFile: + opening = '# windog config start #' + closing = '# end windog config #' + for folder in ("LibWinDog", "ModWinDog"): + for file in glob(f"./{folder}/**/*.py", recursive=True): + try: + name = '/'.join(file.split('/')[1:-1]) + heading = f"# ==={'=' * len(name)}=== #" + source = open(file, 'r').read().replace(f"''' {opening}", f'""" {opening}').replace(f"{closing} '''", f'{closing} """') + content = '\n'.join(content.split(f'""" {opening}')[1].split(f'{closing} """')[0].split('\n')[1:-1]) + configFile.write(f"{heading}\n# 🔽️ {name} 🔽️ #\n{heading}\n{content}\n\n") + except IndexError: + pass + def Main() -> None: #SetupDb() SetupLocales() Log(f"📨️ Initializing Platforms... ", newline=False) for platform in Platforms: - if Platforms[platform]["main"](): + if Platforms[platform].main(): Log(f"{platform}, ", inline=True) Log("...Done. ✅️", inline=True, newline=True) Log("🐶️ WinDog Ready!") @@ -265,20 +301,7 @@ if __name__ == '__main__': if isfile("./Config.py"): from Config import * else: - Log("💾️ No configuration found! Generating and writing to `./Config.py`... ", inline=True) - with open("./Config.py", 'w') as configFile: - opening = '# windog config start #' - closing = '# end windog config #' - for folder in ("LibWinDog", "ModWinDog"): - for file in glob(f"./{folder}/**/*.py", recursive=True): - try: - name = '/'.join(file.split('/')[1:-1]) - heading = f"# ==={'=' * len(name)}=== #" - source = open(file, 'r').read().replace(f"''' {opening}", f'""" {opening}').replace(f"{closing} '''", f'{closing} """') - content = '\n'.join(content.split(f'""" {opening}')[1].split(f'{closing} """')[0].split('\n')[1:-1]) - configFile.write(f"{heading}\n# 🔽️ {name} 🔽️ #\n{heading}\n{content}\n\n") - except IndexError: - pass + WriteNewConfig() Log("Done. ✅️", inline=True, newline=True) Main()