Start Matrix support, more refactoring, cleaner /help and /translate

This commit is contained in:
2024-06-17 00:32:03 +02:00
parent d687cbd51e
commit 09cf925850
19 changed files with 236 additions and 189 deletions

5
.gitignore vendored
View File

@ -1,4 +1,5 @@
Database.json
Dump.txt
/Config.py
/Database.sqlite
/Dump.txt
/session.txt
*.pyc

View File

@ -4,16 +4,18 @@
# ================================== #
# If you have modified the bot's code, you should set this
ModifiedSourceUrl = ''
ModifiedSourceUrl = ""
MastodonUrl = ''
MastodonToken = ''
# Only for the platforms you want to use, uncomment the below credentials and fill with your own:
TelegramId = 1637713483
TelegramToken = "0123456789:abcdefghijklmnopqrstuvwxyz123456789"
TelegramAdmins = [ 123456789, 634314973, ]
TelegramWhitelist = [ 123456789, 634314973, ]
TelegramRestrict = False
# MastodonUrl = "https://mastodon.example.com"
# MastodonToken = ""
# MatrixUrl = "https://matrix.example.com"
# MatrixUsername = "username"
# MatrixPassword = "hunter2"
# TelegramToken = "1234567890:abcdefghijklmnopqrstuvwxyz123456789"
AdminIds = [ "123456789@telegram", "634314973@telegram", "admin@activitypub@mastodon.example.com", ]
@ -25,29 +27,8 @@ CmdPrefixes = ".!/"
ExecAllowed = {"date": False, "fortune": False, "neofetch": True, "uptime": False}
WebUserAgent = "WinDog v.Staging"
# TODO deprecate this in favour of new module API
Endpoints = (Endpoints | {
"start": cStart,
#"config": cConfig,
"source": cSource,
"ping": cPing,
"echo": cEcho,
"broadcast": cBroadcast,
#"repeat": cRepeat,
"wish": percenter,
"level": percenter,
"hug": multifun,
"pat": multifun,
"poke": multifun,
"cuddle": multifun,
"floor": multifun,
"hands": multifun,
"sessocto": multifun,
#"encode": cEncode,
#"decode": cDecode,
#"time": cTime,
"eval": cEval,
"exec": cExec,
#"format": cFormat,
#"frame": cFrame,
ModuleGroups = (ModuleGroups | {
"Basic": "",
"Geek": "",
})

8
LibWinDog/Database.py Normal file
View File

@ -0,0 +1,8 @@
from peewee import *
Db = SqliteDatabase("Database.sqlite")
class BaseModel(Model):
class Meta:
database = Db

View File

@ -1,36 +1,45 @@
# ================================== #
# WinDog multi-purpose chatbot #
# Licensed under AGPLv3 by OctoSpacc #
# ================================== #
MastodonUrl, MastodonToken = None, None
import mastodon
from bs4 import BeautifulSoup
def MastodonSender(event, manager, Data, Destination, TextPlain, TextMarkdown) -> None:
if InDict(Data, 'Media'):
Media = manager.media_post(Data['Media'], Magic(mime=True).from_buffer(Data['Media']))
while Media['url'] == 'null':
Media = manager.media(Media)
if TextPlain:
manager.status_post(
status=(TextPlain + '\n\n@' + event['account']['acct']),
media_ids=(Media if InDict(Data, 'Media') else None),
in_reply_to_id=event['status']['id'],
visibility=('direct' if event['status']['visibility'] == 'direct' else 'unlisted'),
)
def MastodonMain() -> None:
def MastodonMain() -> bool:
if not (MastodonUrl and MastodonToken):
return
return False
Mastodon = mastodon.Mastodon(api_base_url=MastodonUrl, access_token=MastodonToken)
class MastodonListener(mastodon.StreamListener):
def on_notification(self, event):
if event['type'] == 'mention':
Msg = BeautifulSoup(event['status']['content'], 'html.parser').get_text(' ').strip().replace('\t', ' ')
if not Msg.split('@')[0]:
Msg = ' '.join('@'.join(Msg.split('@')[1:]).strip().split(' ')[1:]).strip()
if Msg[0] in CmdPrefixes:
cmd = ParseCmd(Msg)
if cmd:
cmd.messageId = event['status']['id']
if cmd.Name in Endpoints:
Endpoints[cmd.Name]({"Event": event, "Manager": Mastodon}, cmd)
OnMessageReceived()
message = BeautifulSoup(event['status']['content'], 'html.parser').get_text(' ').strip().replace('\t', ' ')
if not message.split('@')[0]:
message = ' '.join('@'.join(message.split('@')[1:]).strip().split(' ')[1:]).strip()
if message[0] in CmdPrefixes:
command = ParseCmd(message)
if command:
command.messageId = event['status']['id']
if command.Name in Endpoints:
Endpoints[command.Name]({"Event": event, "Manager": Mastodon}, command)
Mastodon.stream_user(MastodonListener(), run_async=True)
return True
def MastodonSender(event, manager, data, destination, textPlain, textMarkdown) -> None:
if InDict(data, 'Media'):
Media = manager.media_post(data['Media'], Magic(mime=True).from_buffer(data['Media']))
while Media['url'] == 'null':
Media = manager.media(Media)
if textPlain or Media:
manager.status_post(
status=(textPlain + '\n\n@' + event['account']['acct']),
media_ids=(Media if InDict(data, 'Media') else None),
in_reply_to_id=event['status']['id'],
visibility=('direct' if event['status']['visibility'] == 'direct' else 'unlisted'),
)
RegisterPlatform(name="Mastodon", main=MastodonMain, sender=MastodonSender, managerClass=mastodon.Mastodon)

View File

@ -1,8 +0,0 @@
def MatrixMain() -> None:
pass
def MatrixSender() -> None:
pass
#RegisterPlatform(name="Matrix", main=MatrixMain, sender=MatrixSender)

View File

@ -0,0 +1,34 @@
# ================================== #
# WinDog multi-purpose chatbot #
# Licensed under AGPLv3 by OctoSpacc #
# ================================== #
MatrixUrl, MatrixUsername, MatrixPassword = None, None, None
import nio
import simplematrixbotlib as MatrixBotLib
from threading import Thread
def MatrixMain() -> bool:
if not (MatrixUrl and MatrixUsername and MatrixPassword):
return False
MatrixBot = MatrixBotLib.Bot(MatrixBotLib.Creds(MatrixUrl, MatrixUsername, MatrixPassword))
@MatrixBot.listener.on_message_event
@MatrixBot.listener.on_custom_event(nio.events.room_events.RoomMessageFile)
async def MatrixMessageListener(room, message) -> None:
pass
#print(message)
#match = MatrixBotLib.MessageMatch(room, message, MatrixBot)
#OnMessageReceived()
#if match.is_not_from_this_bot() and match.command("windogtest"):
# pass #await MatrixBot.api.send_text_message(room.room_id, " ".join(arg for arg in match.args()))
def runMatrixBot() -> None:
MatrixBot.run()
Thread(target=runMatrixBot).start()
return True
def MatrixSender() -> None:
pass
#RegisterPlatform(name="Matrix", main=MatrixMain, sender=MatrixSender)

View File

@ -0,0 +1 @@
simplematrixbotlib

View File

@ -1,26 +1,28 @@
# ================================== #
# WinDog multi-purpose chatbot #
# Licensed under AGPLv3 by OctoSpacc #
# ================================== #
TelegramToken = None
import telegram, telegram.ext
from telegram import ForceReply, Bot
from telegram.utils.helpers import escape_markdown
from telegram.ext import CommandHandler, MessageHandler, Filters, CallbackContext
def TelegramCmdAllowed(update:telegram.Update) -> bool:
if not TelegramRestrict:
return True
if TelegramRestrict.lower() == 'whitelist':
if update.message.chat.id in TelegramWhitelist:
return True
def TelegramMain() -> bool:
if not TelegramToken:
return False
updater = telegram.ext.Updater(TelegramToken)
dispatcher = updater.dispatcher
dispatcher.add_handler(MessageHandler(Filters.text | Filters.command, TelegramHandler))
updater.start_polling()
return True
def TelegramHandleCmd(update:telegram.Update):
TelegramQueryHandle(update)
if TelegramCmdAllowed(update):
return ParseCmd(update.message.text)
else:
return False
def TelegramQueryHandle(update:telegram.Update, context:CallbackContext=None) -> None:
def TelegramHandler(update:telegram.Update, context:CallbackContext=None) -> None:
if not (update and update.message):
return
OnMessageReceived()
cmd = ParseCmd(update.message.text)
if cmd:
cmd.messageId = update.message.message_id
@ -70,13 +72,5 @@ def TelegramSender(event, manager, Data, Destination, TextPlain, TextMarkdown) -
elif TextPlain:
event.message.reply_text(TextPlain, reply_to_message_id=replyToId)
def TelegramMain() -> None:
if not TelegramToken:
return
updater = telegram.ext.Updater(TelegramToken)
dispatcher = updater.dispatcher
dispatcher.add_handler(MessageHandler(Filters.text | Filters.command, TelegramQueryHandle))
updater.start_polling()
RegisterPlatform(name="Telegram", main=TelegramMain, sender=TelegramSender, eventClass=telegram.Update)

View File

@ -1,3 +1,8 @@
# ================================== #
# WinDog multi-purpose chatbot #
# Licensed under AGPLv3 by OctoSpacc #
# ================================== #
def WebMain() -> None:
pass

View File

@ -113,7 +113,7 @@
},
"hands": {
"empty": [
"*Le @manineuwu? 😳️*",
"*Le t.me/manineuwu? 😳️*",
"*A chi vuoi dare le manine? Rispondi a qualcuno.*"
],
"bot": [

View File

@ -1 +1,7 @@
import base64
#RegisterModule(name="Codings", group="Geek", endpoints={
# "Encode": CreateEndpoint(["encode"], summary="", handler=cEncode),
# "Decode": CreateEndpoint(["decode"], summary="", handler=cDecode),
#})

View File

@ -1,3 +1,8 @@
# ================================== #
# WinDog multi-purpose chatbot #
# Licensed under AGPLv3 by OctoSpacc #
# ================================== #
import hashlib
def cHash(context, data) -> None:

View File

@ -1,7 +1,11 @@
# TODO: implement /help <commandname> feature
# ================================== #
# WinDog multi-purpose chatbot #
# Licensed under AGPLv3 by OctoSpacc #
# ================================== #
# TODO: implement /help <commandname> feature
def cHelp(context, data=None) -> None:
moduleList, commands = '', ''
moduleList = ''
for module in Modules:
summary = Modules[module]["summary"]
endpoints = Modules[module]["endpoints"]
@ -9,9 +13,7 @@ def cHelp(context, data=None) -> None:
for endpoint in endpoints:
summary = endpoints[endpoint]["summary"]
moduleList += (f"\n* /{', /'.join(endpoints[endpoint]['names'])}" + (f": {summary}" if summary else ''))
for cmd in Endpoints.keys():
commands += f'* /{cmd}\n'
SendMsg(context, {"Text": f"[ Available Modules ]{moduleList}\n\nFull Endpoints List:\n{commands}"})
SendMsg(context, {"Text": f"[ Available Modules ]{moduleList}"})
RegisterModule(name="Help", group="Basic", endpoints={
"Help": CreateEndpoint(["help"], summary="Provides help for the bot. For now, it just lists the commands.", handler=cHelp),

View File

@ -1,3 +1,8 @@
# ================================== #
# WinDog multi-purpose chatbot #
# Licensed under AGPLv3 by OctoSpacc #
# ================================== #
from urlextract import URLExtract
from urllib import parse as UrlParse
from urllib.request import urlopen, Request
@ -63,14 +68,20 @@ def cWeb(context, data) -> None:
else:
pass
def cImages(context, data) -> None:
pass
def cNews(context, data) -> None:
pass
def cTranslate(context, data) -> None:
if len(data.Tokens) < 3:
return
try:
Lang = data.Tokens[1]
toLang = data.Tokens[1]
# TODO: Use many different public Lingva instances in rotation to avoid overloading a specific one
Result = json.loads(HttpGet(f'https://lingva.ml/api/v1/auto/{Lang}/{UrlParse.quote(Lang.join(data.Body.split(Lang)[1:]))}').read())["translation"]
SendMsg(context, {"TextPlain": Result})
result = json.loads(HttpGet(f'https://lingva.ml/api/v1/auto/{toLang}/{UrlParse.quote(toLang.join(data.Body.split(toLang)[1:]))}').read())
SendMsg(context, {"TextPlain": f"[{result['info']['detectedSource']} (auto) -> {toLang}]\n\n{result['translation']}"})
except Exception:
raise
@ -119,7 +130,7 @@ def cSafebooru(context, data) -> None:
except Exception:
raise
RegisterModule(name="Internet", group="Internet", summary="Tools and toys related to the Internet.", endpoints={
RegisterModule(name="Internet", summary="Tools and toys related to the Internet.", endpoints={
"Embedded": CreateEndpoint(["embedded"], summary="Rewrites a link, trying to bypass embed view protection.", handler=cEmbedded),
"Web": CreateEndpoint(["web"], summary="Provides results of a DuckDuckGo search.", handler=cWeb),
"Translate": CreateEndpoint(["translate"], summary="Returns the received message after translating it in another language.", handler=cTranslate),

View File

@ -3,21 +3,19 @@
# Licensed under AGPLv3 by OctoSpacc #
# ==================================== #
# Module: Percenter
# Provides fun trough percentage-based toys.
def percenter(context, data) -> None:
import re, subprocess
def mPercenter(context, data) -> None:
SendMsg(context, {"Text": choice(Locale.__(f'{data.Name}.{"done" if data.Body else "empty"}')).format(
Cmd=data.Tokens[0], Percent=RandPercent(), Thing=data.Body)})
# Module: Multifun
# Provides fun trough preprogrammed-text-based toys.
def multifun(context, data) -> None:
def mMultifun(context, data) -> None:
cmdkey = data.Name
replyToId = None
if data.Quoted:
replyFromUid = data.Quoted.User.Id
# TODO work on all platforms for the bot id
if int(replyFromUid.split('@')[0]) == int(TelegramId) and 'bot' in Locale.__(cmdkey):
if replyFromUid.split('@')[0] == TelegramToken.split(':')[0] and 'bot' in Locale.__(cmdkey):
Text = choice(Locale.__(f'{cmdkey}.bot'))
elif replyFromUid == data.User.Id and 'self' in Locale.__(cmdkey):
Text = choice(Locale.__(f'{cmdkey}.self')).format(data.User.Name)
@ -30,13 +28,9 @@ def multifun(context, data) -> None:
Text = choice(Locale.__(f'{cmdkey}.empty'))
SendMsg(context, {"Text": Text, "ReplyTo": replyToId})
# Module: Start
# Salutes the user, hinting that the bot is working and providing basic quick help.
def cStart(context, data) -> None:
SendMsg(context, {"Text": choice(Locale.__('start')).format(data.User.Name)})
# Module: Source
# Provides a copy of the bot source codes and/or instructions on how to get it.
def cSource(context, data=None) -> None:
SendMsg(context, {"TextPlain": ("""\
* Original Code: {https://gitlab.com/octospacc/WinDog}
@ -52,13 +46,9 @@ def cSource(context, data=None) -> None:
# # ... language: en, it, ...
# # ... userdata: import, export, delete
# Module: Ping
# Responds pong, useful for testing messaging latency.
def cPing(context, data=None) -> None:
SendMsg(context, {"Text": "*Pong!*"})
# Module: Echo
# Responds back with the original text of the received message.
def cEcho(context, data) -> None:
if data.Body:
prefix = "🗣️ "
@ -75,8 +65,6 @@ def cEcho(context, data) -> None:
else:
SendMsg(context, {"Text": choice(Locale.__('echo.empty'))})
# Module: Broadcast
# Sends an admin message over to another destination
def cBroadcast(context, data) -> None:
if data.User.Id not in AdminIds:
return SendMsg(context, {"Text": choice(Locale.__('eval'))})
@ -92,13 +80,9 @@ def cBroadcast(context, data) -> None:
# CharEscape(choice(Locale.__('time')).format(time.ctime().replace(' ', ' ')), 'MARKDOWN_SPEECH'),
# reply_to_message_id=update.message.message_id)
# Module: Eval
# Execute a Python command (or safe literal operation) in the current context. Currently not implemented.
def cEval(context, data=None) -> None:
SendMsg(context, {"Text": choice(Locale.__('eval'))})
# Module: Exec
# Execute a system command from the allowed ones and return stdout/stderr.
def cExec(context, data) -> None:
if len(data.Tokens) >= 2 and data.Tokens[1].lower() in ExecAllowed:
cmd = data.Tokens[1].lower()
@ -113,13 +97,18 @@ def cExec(context, data) -> None:
else:
SendMsg(context, {"Text": choice(Locale.__('eval'))})
# Module: Format
# Reformat text using an handful of rules. Currently not implemented.
def cFormat(context, data=None) -> None:
pass
# Module: Frame
# Frame someone's message into a platform-styled image. Currently not implemented.
def cFrame(context, data=None) -> None:
pass
RegisterModule(name="Misc", endpoints={
"Percenter": CreateEndpoint(["wish", "level"], summary="Provides fun trough percentage-based toys.", handler=mPercenter),
"Multifun": CreateEndpoint(["hug", "pat", "poke", "cuddle", "hands", "floor", "sessocto"], summary="Provides fun trough preprogrammed-text-based toys.", handler=mMultifun),
"Start": CreateEndpoint(["start"], summary="Salutes the user, hinting that the bot is working and providing basic quick help.", handler=cStart),
"Source": CreateEndpoint(["source"], summary="Provides a copy of the bot source codes and/or instructions on how to get it.", handler=cSource),
"Ping": CreateEndpoint(["ping"], summary="Responds pong, useful for testing messaging latency.", handler=cPing),
"Echo": CreateEndpoint(["echo"], summary="Responds back with the original text of the received message.", handler=cEcho),
"Broadcast": CreateEndpoint(["broadcast"], summary="Sends an admin message over to any chat destination.", handler=cBroadcast),
"Eval": CreateEndpoint(["eval"], summary="Execute a Python command (or safe literal operation) in the current context. Currently not implemented.", handler=cEval),
"Exec": CreateEndpoint(["exec"], summary="Execute a system command from the allowed ones and return stdout+stderr.", handler=cExec),
#"Format": CreateEndpoint(["format"], summary="Reformat text using an handful of rules. Not yet implemented.", handler=cFormat),
#"Frame": CreateEndpoint(["frame"], summary="Frame someone's message into a platform-styled image. Not yet implemented.", handler=cFrame),
#"Repeat": CreateEndpoint(["repeat"], summary="I had this planned but I don't remember what this should have done. Not yet implemented.", handler=cRepeat),
})

View File

@ -1,24 +1,18 @@
# ================================== #
# WinDog multi-purpose chatbot #
# Licensed under AGPLv3 by OctoSpacc #
# ================================== #
luaCycleLimit = 10000
luaMemoryLimit = (512 * 1024) # 512 KB
luaCrashMessage = f"Lua Error: Script has been forcefully terminated due to having exceeded the max cycle count limit ({luaCycleLimit})."
luaCrashMessage = f"Script has been forcefully terminated due to having exceeded the max cycle count limit ({luaCycleLimit})."
from lupa import LuaRuntime as NewLuaRuntime, LuaError, LuaSyntaxError
# Use specific Lua version; always using the latest is risky due to possible new APIs and using JIT is vulnerable
from lupa.lua54 import LuaRuntime as NewLuaRuntime, LuaError, LuaSyntaxError
def luaAttributeFilter(obj, attr_name, is_setting):
raise AttributeError("Access Denied.")
#LuaRuntime = NewLuaRuntime(max_memory=(16 * 1024**2), register_eval=False, register_builtins=False, attribute_filter=luaAttributeFilter)
#for key in LuaRuntime.globals():
# if key not in ("error", "assert", "math", "type"):
# del LuaRuntime.globals()[key]
#luaGlobalsCopy = dict(LuaRuntime.globals()) # should this manually handle nested stuff?
# this way to prevent overwriting of global fields is flawed since this doesn't protect concurrent scripts
# better to use the currently active solution of a dedicated instance for each new script running
#def luaFunctionRunner(userFunction:callable):
# for key in luaGlobalsCopy:
# LuaRuntime.globals()[key] = luaGlobalsCopy[key]
# return userFunction()
# TODO make print behave the same as normal Lua, and expose a function for printing without newlines
def cLua(context, data=None) -> None:
scriptText = (data.Body or (data.Quoted and data.Quoted.Body))
@ -27,12 +21,12 @@ def cLua(context, data=None) -> None:
luaRuntime = NewLuaRuntime(max_memory=luaMemoryLimit, register_eval=False, register_builtins=False, attribute_filter=luaAttributeFilter)
luaRuntime.eval(f"""(function()
_windog = {{ stdout = "" }}
function print (text, endl) _windog.stdout = _windog.stdout .. text .. (endl ~= false and "\\n" or "") end
function print (text, endl) _windog.stdout = _windog.stdout .. tostring(text) .. (endl ~= false and "\\n" or "") end
function luaCrashHandler () return error("{luaCrashMessage}") end
debug.sethook(luaCrashHandler, "", {luaCycleLimit})
end)()""")
for key in luaRuntime.globals():
if key not in ("error", "assert", "math", "string", "print", "_windog"):
if key not in ["error", "assert", "math", "string", "tostring", "print", "_windog"]:
del luaRuntime.globals()[key]
try:
textOutput = ("[ʟᴜᴀ ꜱᴛᴅᴏᴜᴛ]\n\n" + luaRuntime.eval(f"""(function()
@ -40,7 +34,7 @@ _windog.scriptout = (function()\n{scriptText}\nend)()
return _windog.stdout .. (_windog.scriptout or '')
end)()"""))
except (LuaError, LuaSyntaxError) as error:
Log(textOutput := str("Lua Error: " + error))
Log(textOutput := ("Lua Error: " + str(error)))
SendMsg(context, {"TextPlain": textOutput})
RegisterModule(name="Scripting", group="Geek", summary="Tools for programming the bot and expanding its features.", endpoints={

View File

@ -15,6 +15,12 @@ In case you want to run your own instance:
1. `git clone --depth 1 https://gitlab.com/octospacc/WinDog && cd ./WinDog` to get the code.
2. `find -type f -name requirements.txt -exec python3 -m pip install -U -r {} \;` to install the full package of dependencies.
3. `cp ./LibWinDog/Config.py ./` and, in the new file, edit essential fields like user credentials, then delete the unmodified fields.
3. `cp ./LibWinDog/Config.py ./` and, in the new file, edit essential fields (like user credentials), uncommenting them where needed, then delete the unmodified fields.
4. `sh ./StartWinDog.sh` to start the bot every time.
All my source code mirrors for the bot:
* GitLab (primary): <https://gitlab.com/octospacc/WinDog>
* GitHub: <https://github.com/octospacc/WinDog>
* Gitea.it: <https://gitea.it/octospacc/WinDog>

View File

@ -4,7 +4,7 @@
# Licensed under AGPLv3 by OctoSpacc #
# ==================================== #
import json, re, time, subprocess
import json, time
from binascii import hexlify
from magic import Magic
from os import listdir
@ -15,12 +15,16 @@ from traceback import format_exc
from bs4 import BeautifulSoup
from html import unescape as HtmlUnescape
from markdown import markdown
from LibWinDog.Database import *
# <https://daringfireball.net/projects/markdown/syntax#backslash>
MdEscapes = '\\`*_{}[]()<>#+-.!|='
def Log(text:str, level:str="?") -> None:
print(f"[{level}] [{int(time.time())}] {text}")
def Log(text:str, level:str="?", *, newline:bool|None=None, inline:bool=False) -> None:
endline = '\n'
if newline == False or (inline and newline == None):
endline = ''
print((text if inline else f"[{level}] [{int(time.time())}] {text}"), end=endline)
def SetupLocales() -> None:
global Locale
@ -55,14 +59,6 @@ def SetupLocales() -> None:
Locale['Locale'] = Locale
Locale = SimpleNamespace(**Locale)
def SetupDb() -> None:
global Db
try:
with open('Database.json', 'r') as file:
Db = json.load(file)
except Exception:
pass
def InDict(Dict:dict, Key:str) -> any:
if Key in Dict:
return Dict[Key]
@ -94,7 +90,7 @@ def InferMdEscape(raw:str, plain:str) -> str:
return chars
def MarkdownCode(text:str, block:bool) -> str:
return '```\n' + text.strip().replace('`', '\`') + '\n```'
return ('```\n' + text.strip().replace('`', '\`') + '\n```')
def MdToTxt(md:str) -> str:
return BeautifulSoup(markdown(md), 'html.parser').get_text(' ')
@ -135,58 +131,68 @@ def RandHexStr(length:int) -> str:
hexa += choice('0123456789abcdef')
return hexa
def SendMsg(Context, Data, Destination=None) -> None:
if type(Context) == dict:
Event = Context['Event'] if 'Event' in Context else None
Manager = Context['Manager'] if 'Manager' in Context else None
def OnMessageReceived() -> None:
pass
def SendMsg(context, data, destination=None) -> None:
if type(context) == dict:
event = context['Event'] if 'Event' in context else None
manager = context['Manager'] if 'Manager' in context else None
else:
[Event, Manager] = [Context, Context]
if InDict(Data, 'TextPlain') or InDict(Data, 'TextMarkdown'):
TextPlain = InDict(Data, 'TextPlain')
TextMarkdown = InDict(Data, 'TextMarkdown')
if not TextPlain:
TextPlain = TextMarkdown
elif InDict(Data, 'Text'):
# our old system attemps to always receive Markdown and retransform when needed
TextPlain = MdToTxt(Data['Text'])
TextMarkdown = CharEscape(HtmlUnescape(Data['Text']), InferMdEscape(HtmlUnescape(Data['Text']), TextPlain))
[event, manager] = [context, context]
if InDict(data, 'TextPlain') or InDict(data, 'TextMarkdown'):
textPlain = InDict(data, 'TextPlain')
textMarkdown = InDict(data, 'TextMarkdown')
if not textPlain:
textPlain = textMarkdown
elif InDict(data, 'Text'):
# our old system attempts to always receive Markdown and retransform when needed
textPlain = MdToTxt(data['Text'])
textMarkdown = CharEscape(HtmlUnescape(data['Text']), InferMdEscape(HtmlUnescape(data['Text']), textPlain))
for platform in Platforms:
platform = Platforms[platform]
if isinstanceSafe(Event, InDict(platform, "eventClass")) or isinstanceSafe(Manager, InDict(platform, "managerClass")):
platform["sender"](Event, Manager, Data, Destination, TextPlain, TextMarkdown)
if isinstanceSafe(event, InDict(platform, "eventClass")) or isinstanceSafe(manager, InDict(platform, "managerClass")):
platform["sender"](event, manager, data, destination, textPlain, textMarkdown)
def RegisterPlatform(name:str, main:callable, sender:callable, *, eventClass=None, managerClass=None) -> None:
Platforms[name] = {"main": main, "sender": sender, "eventClass": eventClass, "managerClass": managerClass}
Log(f"Registered Platform: {name}.")
Log(f"{name}, ", inline=True)
def RegisterModule(name:str, endpoints:dict, *, group:str=None, summary:str=None) -> None:
def RegisterModule(name:str, endpoints:dict, *, group:str|None=None, summary:str|None=None) -> None:
Modules[name] = {"group": group, "summary": summary, "endpoints": endpoints}
Log(f"Registered Module: {name}.")
Log(f"{name}, ", inline=True)
for endpoint in endpoints:
endpoint = endpoints[endpoint]
for name in endpoint["names"]:
Endpoints[name] = endpoint["handler"]
def CreateEndpoint(names:list[str]|tuple[str], handler:callable, *, summary:str=None) -> dict:
return {"names": names, "summary": summary, "handler": handler}
def CreateEndpoint(names:list[str], handler:callable, arguments:dict[str, dict]={}, *, summary:str|None=None) -> dict:
return {"names": names, "summary": summary, "handler": handler, "arguments": arguments}
def Main() -> None:
SetupDb()
#SetupDb()
SetupLocales()
Log(f"📨️ Initializing Platforms... ", newline=False)
for platform in Platforms:
Platforms[platform]["main"]()
Log(f"Initialized Platform: {platform}.")
Log('WinDog Ready!')
if Platforms[platform]["main"]():
Log(f"{platform}, ", inline=True)
Log("...Done. ✅️", inline=True, newline=True)
Log("🐶️ WinDog Ready!")
while True:
time.sleep(9**9)
if __name__ == '__main__':
Log('Starting WinDog...')
Db = {"Rooms": {}, "Users": {}}
Log("🌞️ WinDog Starting...")
#Db = {"Rooms": {}, "Users": {}}
Locale = {"Fallback": {}}
Platforms, Modules, Endpoints = {}, {}, {}
Platforms, Modules, ModuleGroups, Endpoints = {}, {}, {}, {}
for dir in ("LibWinDog/Platforms", "ModWinDog"):
match dir:
case "LibWinDog/Platforms":
Log("📩️ Loading Platforms... ", newline=False)
case "ModWinDog":
Log("🔩️ Loading Modules... ", newline=False)
for name in listdir(f"./{dir}"):
path = f"./{dir}/{name}"
if isfile(path):
@ -197,14 +203,16 @@ if __name__ == '__main__':
#for name in listdir(path):
# if name.lower().endswith('.json'):
#
Log("...Done. ✅️", inline=True, newline=True)
Log('Loading Configuration...')
Log("💽️ Loading Configuration", newline=False)
exec(open("./LibWinDog/Config.py", 'r').read())
try:
from Config import *
except Exception:
Log(format_exc())
Log("...Done. ✅️", inline=True, newline=True)
Main()
Log('Closing WinDog...')
Log("🌚️ WinDog Stopping...")

View File

@ -1,2 +1,3 @@
beautifulsoup4
Markdown
peewee