mirror of
https://gitlab.com/octospacc/WinDog.git
synced 2025-06-05 22:09:20 +02:00
Legacy removals, code restructuring, add send_... functions and better help
This commit is contained in:
218
WinDog.py
218
WinDog.py
@ -11,6 +11,7 @@ from html import escape as html_escape, unescape as html_unescape
|
||||
from os import listdir
|
||||
from os.path import isfile, isdir
|
||||
from random import choice, choice as randchoice, randint
|
||||
from sys import exc_info as sys_exc_info
|
||||
from threading import Thread
|
||||
from traceback import format_exc, format_exc as traceback_format_exc
|
||||
from urllib import parse as urlparse, parse as urllib_parse
|
||||
@ -32,23 +33,35 @@ def ObjectUnion(*objects:object, clazz:object=None):
|
||||
dikt[key] = value
|
||||
return (clazz or auto_clazz)(**dikt)
|
||||
|
||||
def Log(text:str, level:str="?", *, newline:bool|None=None, inline:bool=False) -> None:
|
||||
endline = '\n'
|
||||
if newline == False or (inline and newline == None):
|
||||
endline = ''
|
||||
text = (text if inline else f"[{level}] [{time.ctime()}] [{int(time.time())}] {text}")
|
||||
if LogToConsole:
|
||||
print(text, end=endline)
|
||||
if LogToFile:
|
||||
open("./Log.txt", 'a').write(text + endline)
|
||||
|
||||
def call_or_return(obj:any) -> any:
|
||||
return (obj() if callable(obj) else obj)
|
||||
def ObjectClone(obj:object):
|
||||
return ObjectUnion(obj, {});
|
||||
|
||||
def SureArray(array:any) -> list|tuple:
|
||||
return (array if type(array) in [list, tuple] else [array])
|
||||
|
||||
def ObjGet(node:object, query:str, /) -> any:
|
||||
def app_log(text:str=None, level:str="?", *, newline:bool|None=None, inline:bool=False) -> None:
|
||||
if not text:
|
||||
text = get_exception_text(full=True)
|
||||
endline = '\n'
|
||||
if newline == False or (inline and newline == None):
|
||||
endline = ''
|
||||
text = (str(text) if inline else f"[{level}] [{time.ctime()}] [{int(time.time())}] {text}")
|
||||
if LogToConsole:
|
||||
print(text, end=endline)
|
||||
if LogToFile:
|
||||
open((DumpToFile if (DumpToFile and type(DumpToFile) == str) else "./Data/Log.txt"), 'a').write(text + endline)
|
||||
|
||||
def get_exception_text(full:bool=False):
|
||||
exc_type, exc_value, exc_traceback = sys_exc_info()
|
||||
text = f'{exc_type.__qualname__}: {exc_value}'
|
||||
if full:
|
||||
text = f'@{exc_traceback.tb_frame.f_code.co_name}:{exc_traceback.tb_lineno} {text}'
|
||||
return text
|
||||
|
||||
def call_or_return(obj:any, *args) -> any:
|
||||
return (obj(*args) if callable(obj) else obj)
|
||||
|
||||
def obj_get(node:object, query:str, /) -> any:
|
||||
for key in query.split('.'):
|
||||
if hasattr(node, "__getitem__") and node.__getitem__:
|
||||
# dicts and such
|
||||
@ -67,19 +80,30 @@ def ObjGet(node:object, query:str, /) -> any:
|
||||
def good_yaml_load(text:str):
|
||||
return yaml_load(text.replace("\t", " "), Loader=yaml_BaseLoader)
|
||||
|
||||
def get_string(bank:dict, query:str|dict, lang:str=None) -> str|list[str]|None:
|
||||
if not (result := ObjGet(bank, f"{query}.{lang or DefaultLanguage}")):
|
||||
if not (result := ObjGet(bank, f"{query}.en")):
|
||||
result = ObjGet(bank, query)
|
||||
def get_string(bank:dict, query:str, lang:str=None) -> str|list[str]|None:
|
||||
if type(result := obj_get(bank, query)) != str:
|
||||
if not (result := obj_get(bank, f"{query}.{lang or DefaultLanguage}")):
|
||||
if not (result := obj_get(bank, f"{query}.en")):
|
||||
result = obj_get(bank, query)
|
||||
if result:
|
||||
result = result.strip()
|
||||
return result
|
||||
|
||||
def help_text(endpoint, lang:str=None) -> str:
|
||||
def get_help_text(endpoint, lang:str=None, prefix:str=None) -> str:
|
||||
if type(endpoint) == str:
|
||||
endpoint = instanciate_endpoint(endpoint, prefix)
|
||||
global_string = (lambda query: get_string(GlobalStrings, query, lang))
|
||||
text = f'{endpoint.get_string("summary", lang) or ""}\n\n{global_string("usage")}:'
|
||||
text = f'{endpoint.get_string("summary", lang) or ""}\n\n{global_string("usage")}: {prefix or ""}{endpoint.name}'
|
||||
if endpoint.arguments:
|
||||
for argument in endpoint.arguments:
|
||||
if endpoint.arguments[argument]:
|
||||
text += f' <{endpoint.get_string(f"arguments.{argument}", lang) or endpoint.module.get_string(f"arguments.{argument}", lang) or argument}>'
|
||||
if not ((endpoint.body != None) and (endpoint.arguments[argument] == False)):
|
||||
argument_help = (endpoint.get_string(f"arguments.{argument}", lang)
|
||||
or endpoint.module.get_string(f"arguments.{argument}", lang)
|
||||
or argument)
|
||||
if endpoint.arguments[argument] == True:
|
||||
text += f' <{argument_help}>'
|
||||
elif endpoint.arguments[argument] == False:
|
||||
text += f' [{argument_help}]'
|
||||
body_help = (endpoint.get_string("body", lang) or endpoint.module.get_string("body", lang))
|
||||
quoted_help = (global_string("quoted_message") + (f': {body_help}' if body_help else ''))
|
||||
if not body_help:
|
||||
@ -95,51 +119,38 @@ def help_text(endpoint, lang:str=None) -> str:
|
||||
text += f' <{quoted_help}>'
|
||||
elif endpoint.quoted == False:
|
||||
text += f' [{quoted_help}]'
|
||||
if (extra := call_or_return(endpoint.help_extra, endpoint, lang)):
|
||||
text += f'\n\n{extra}'
|
||||
return text
|
||||
|
||||
def strip_url_scheme(url:str) -> str:
|
||||
tokens = urlparse.urlparse(url)
|
||||
return f"{tokens.netloc}{tokens.path}"
|
||||
|
||||
def RandPercent() -> int:
|
||||
num = randint(0,100)
|
||||
return (f'{num}.00' if num == 100 else f'{num}.{randint(0,9)}{randint(0,9)}')
|
||||
|
||||
def RandHexStr(length:int) -> str:
|
||||
hexa = ''
|
||||
for char in range(length):
|
||||
hexa += choice('0123456789abcdef')
|
||||
return hexa
|
||||
|
||||
def GetUserSettings(user_id:str) -> SafeNamespace|None:
|
||||
try:
|
||||
return SafeNamespace(**EntitySettings.select().join(User).where(User.id == user_id).dicts().get())
|
||||
except EntitySettings.DoesNotExist:
|
||||
return None
|
||||
|
||||
def ParseCommand(text:str, platform:str) -> SafeNamespace|None:
|
||||
def TextCommandData(text:str, platform:str) -> CommandData|None:
|
||||
if not text:
|
||||
return None
|
||||
text = text.strip()
|
||||
try: # ensure text is a non-empty command
|
||||
if not (text[0] in CmdPrefixes and text[1:].strip()):
|
||||
try: # ensure text is non-empty and an actual command
|
||||
if not (text[0] in CommandPrefixes and text[1:].strip()):
|
||||
return None
|
||||
except IndexError:
|
||||
return None
|
||||
command = SafeNamespace()
|
||||
command.tokens = text.split()
|
||||
command.prefix = command.tokens[0][0]
|
||||
command.name, command_target = (command.tokens[0][1:].lower().split('@') + [''])[:2]
|
||||
if command_target and not (command_target == call_or_return(Platforms[platform].agent_info).tag.lower()):
|
||||
return None
|
||||
command.body = text[len(command.tokens[0]):].strip()
|
||||
if command.name not in Endpoints:
|
||||
return command
|
||||
if (endpoint_arguments := Endpoints[command.name].arguments):
|
||||
if not (endpoint := obj_get(Endpoints, command.name)):
|
||||
return command # TODO shouldn't this return None?
|
||||
if (endpoint.arguments):
|
||||
command.arguments = SafeNamespace()
|
||||
index = 1
|
||||
for key in endpoint_arguments:
|
||||
if not endpoint_arguments[key]:
|
||||
continue # skip optional (False) arguments for now, they will be implemented later
|
||||
for key in endpoint.arguments:
|
||||
if (endpoint.body != None) and (endpoint.arguments[key] == False):
|
||||
continue # skip optional (False) arguments for now if command expects a body, they will be implemented later
|
||||
try:
|
||||
value = command.tokens[index]
|
||||
command.body = command.body[len(value):].strip()
|
||||
@ -151,7 +162,7 @@ def ParseCommand(text:str, platform:str) -> SafeNamespace|None:
|
||||
|
||||
def OnInputMessageParsed(data:InputMessageData) -> None:
|
||||
dump_message(data, prefix='> ')
|
||||
handle_bridging(SendMessage, data, from_sent=False)
|
||||
handle_bridging(send_message, data, from_sent=False)
|
||||
update_user_db(data.user)
|
||||
|
||||
def OnOutputMessageSent(output_data:OutputMessageData, input_data:InputMessageData, from_sent:bool) -> None:
|
||||
@ -159,14 +170,13 @@ def OnOutputMessageSent(output_data:OutputMessageData, input_data:InputMessageDa
|
||||
output_data = ObjectUnion(output_data, {"room": input_data.room})
|
||||
dump_message(output_data, prefix=f'<{"*" if from_sent else " "}')
|
||||
if not from_sent:
|
||||
handle_bridging(SendMessage, output_data, from_sent=True)
|
||||
handle_bridging(send_message, output_data, from_sent=True)
|
||||
|
||||
# TODO: fix to send messages to different rooms, this overrides destination data but that gives problems with rebroadcasting the bot's own messages
|
||||
def handle_bridging(method:callable, data:MessageData, from_sent:bool):
|
||||
if data.user:
|
||||
if (text_plain := ObjGet(data, "text_plain")):
|
||||
if (text_plain := obj_get(data, "text_plain")):
|
||||
text_plain = f"<{data.user.name}>: {text_plain}"
|
||||
if (text_html := ObjGet(data, "text_html")):
|
||||
if (text_html := obj_get(data, "text_html")):
|
||||
text_html = (urlparse.quote(f"<{data.user.name}>: ") + text_html)
|
||||
for bridge in BridgesConfig:
|
||||
if data.room.id not in bridge:
|
||||
@ -200,9 +210,28 @@ def dump_message(data:InputMessageData, prefix:str='') -> None:
|
||||
if DumpToConsole:
|
||||
print(text, data)
|
||||
if DumpToFile:
|
||||
open((DumpToFile if (DumpToFile and type(DumpToFile) == str) else "./Dump.txt"), 'a').write(text + '\n')
|
||||
open((DumpToFile if (DumpToFile and type(DumpToFile) == str) else "./Data/Dump.txt"), 'a').write(text + '\n')
|
||||
|
||||
def SendMessage(context:EventContext, data:OutputMessageData, from_sent:bool=False) -> None:
|
||||
def send_status(context:EventContext, code:int, lang:str=None, extra:str=None, preamble:bool=True, summary:bool=True):
|
||||
global_string = (lambda query: get_string(GlobalStrings, query, lang))
|
||||
summary_text = (global_string(f"statuses.{code}.summary") or '')
|
||||
return send_message(context, {"text_html": (
|
||||
(((f'{global_string(f"statuses.{code}.icon")} {global_string("error") if code >= 400 else ""}'.strip()
|
||||
+ f' {code}: {global_string(f"statuses.{code}.title")}. {summary_text if summary else ""}').strip()) if preamble else '')
|
||||
+ '\n\n' + (extra or "")).strip()})
|
||||
|
||||
def send_status_400(context:EventContext, lang:str=None, extra:str=None):
|
||||
return send_status(context, 400, lang,
|
||||
f'{context.endpoint.get_help_text(lang)}\n\n{extra or ""}', preamble=False, summary=False)
|
||||
|
||||
def send_status_error(context:EventContext, lang:str=None, code:int=500, extra:str=None):
|
||||
result = send_status(context, code, lang,
|
||||
f'{html_escape(get_exception_text())}\n\n{extra or ""}')
|
||||
app_log()
|
||||
return result
|
||||
|
||||
def send_message(context:EventContext, data:OutputMessageData, *, from_sent:bool=False):
|
||||
context = ObjectClone(context)
|
||||
data = (OutputMessageData(**data) if type(data) == dict else data)
|
||||
if data.text_html and not data.text_plain:
|
||||
data.text_plain = BeautifulSoup(data.text_html, "html.parser").get_text()
|
||||
@ -229,15 +258,18 @@ def SendMessage(context:EventContext, data:OutputMessageData, from_sent:bool=Fal
|
||||
OnOutputMessageSent(data, context.data, from_sent)
|
||||
return result
|
||||
|
||||
def SendNotice(context:EventContext, data) -> None:
|
||||
def send_notice(context:EventContext, data):
|
||||
pass
|
||||
|
||||
def DeleteMessage(context:EventContext, data) -> None:
|
||||
def edit_message(context:EventContext, data:MessageData):
|
||||
pass
|
||||
|
||||
def delete_message(context:EventContext, data:MessageData):
|
||||
pass
|
||||
|
||||
def RegisterPlatform(name:str, main:callable, sender:callable, linker:callable=None, *, event_class=None, manager_class=None, agent_info=None) -> None:
|
||||
Platforms[name.lower()] = SafeNamespace(name=name, main=main, sender=sender, linker=linker, event_class=event_class, manager_class=manager_class, agent_info=agent_info)
|
||||
Log(f"{name}, ", inline=True)
|
||||
app_log(f"{name}, ", inline=True)
|
||||
|
||||
def RegisterModule(name:str, endpoints:dict, *, group:str|None=None) -> None:
|
||||
module = SafeNamespace(group=group, endpoints=endpoints, get_string=(lambda query, lang=None: None))
|
||||
@ -245,27 +277,40 @@ def RegisterModule(name:str, endpoints:dict, *, group:str|None=None) -> None:
|
||||
module.strings = good_yaml_load(open(file, 'r').read())
|
||||
module.get_string = (lambda query, lang=None: get_string(module.strings, query, lang))
|
||||
Modules[name] = module
|
||||
Log(f"{name}, ", inline=True)
|
||||
if group not in ModuleGroups:
|
||||
ModuleGroups[group] = []
|
||||
ModuleGroups[group].append(name)
|
||||
app_log(f"{name}, ", inline=True)
|
||||
for endpoint in endpoints:
|
||||
endpoint.module = module
|
||||
for name in endpoint.names:
|
||||
Endpoints[name] = endpoint
|
||||
|
||||
def CallEndpoint(name:str, context:EventContext, data:InputMessageData):
|
||||
endpoint = Endpoints[name]
|
||||
def instanciate_endpoint(name:str, prefix:str):
|
||||
if not (endpoint := obj_get(Endpoints, name)):
|
||||
return None
|
||||
endpoint = ObjectClone(endpoint)
|
||||
endpoint.name = name
|
||||
endpoint.get_string = (lambda query=name, lang=None:
|
||||
endpoint.module.get_string(f"endpoints.{name}.{query}", lang))
|
||||
endpoint.get_help_text = (lambda lang=None: get_help_text(endpoint, lang, prefix))
|
||||
return endpoint
|
||||
|
||||
def call_endpoint(context:EventContext, data:InputMessageData):
|
||||
if not ((command := data.command) and (name := command.name)):
|
||||
return
|
||||
if not (endpoint := instanciate_endpoint(name, command.prefix)):
|
||||
return
|
||||
context.data = data
|
||||
context.module = endpoint.module
|
||||
context.endpoint = endpoint
|
||||
context.endpoint.get_string = (lambda query=data.command.name, lang=None:
|
||||
endpoint.module.get_string(f"endpoints.{data.command.name}.{query}", lang))
|
||||
context.endpoint.help_text = (lambda lang=None: help_text(endpoint, lang))
|
||||
if callable(agent_info := Platforms[context.platform].agent_info):
|
||||
Platforms[context.platform].agent_info = agent_info()
|
||||
return endpoint.handler(context, data)
|
||||
|
||||
def WriteNewConfig() -> None:
|
||||
Log("💾️ No configuration found! Generating and writing to `./Config.py`... ", inline=True)
|
||||
with open("./Config.py", 'w') as configFile:
|
||||
def write_new_config() -> None:
|
||||
app_log("💾️ No configuration found! Generating and writing to `./Data/Config.py`... ", inline=True)
|
||||
with open("./Data/Config.py", 'w') as configFile:
|
||||
opening = '# windog config start #'
|
||||
closing = '# end windog config #'
|
||||
for folder in ("LibWinDog", "ModWinDog"):
|
||||
@ -279,50 +324,49 @@ def WriteNewConfig() -> None:
|
||||
except IndexError:
|
||||
pass
|
||||
|
||||
def Main() -> None:
|
||||
def app_main() -> None:
|
||||
#SetupDb()
|
||||
Log(f"📨️ Initializing Platforms... ", newline=False)
|
||||
app_log(f"📨️ Initializing Platforms... ", newline=False)
|
||||
for platform in Platforms.values():
|
||||
if platform.main():
|
||||
Log(f"{platform.name}, ", inline=True)
|
||||
Log("...Done. ✅️", inline=True, newline=True)
|
||||
Log("🐶️ WinDog Ready!")
|
||||
app_log(f"{platform.name}, ", inline=True)
|
||||
app_log("...Done. ✅️", inline=True, newline=True)
|
||||
app_log("🐶️ WinDog Ready!")
|
||||
while True:
|
||||
time.sleep(9**9)
|
||||
|
||||
if __name__ == '__main__':
|
||||
Log("🌞️ WinDog Starting...")
|
||||
app_log("🌞️ WinDog Starting...")
|
||||
GlobalStrings = good_yaml_load(open("./WinDog.yaml", 'r').read())
|
||||
Platforms, Modules, ModuleGroups, Endpoints = {}, {}, {}, {}
|
||||
|
||||
for folder in ("LibWinDog/Platforms", "ModWinDog"):
|
||||
match folder:
|
||||
case "LibWinDog/Platforms":
|
||||
Log("📩️ Loading Platforms... ", newline=False)
|
||||
app_log("📩️ Loading Platforms... ", newline=False)
|
||||
case "ModWinDog":
|
||||
Log("🔩️ Loading Modules... ", newline=False)
|
||||
app_log("🔩️ Loading Modules... ", newline=False)
|
||||
for name in listdir(f"./{folder}"):
|
||||
path = f"./{folder}/{name}"
|
||||
if isfile(path):
|
||||
exec(open(path, 'r').read())
|
||||
if path.endswith(".py") and isfile(path):
|
||||
exec(open(path).read())
|
||||
elif isdir(path):
|
||||
files = listdir(path)
|
||||
if f"{name}.py" in files:
|
||||
files.remove(f"{name}.py")
|
||||
exec(open(f"{path}/{name}.py", 'r').read())
|
||||
for file in files:
|
||||
if file.endswith(".py"):
|
||||
exec(open(f"{path}/{name}.py", 'r').read())
|
||||
Log("...Done. ✅️", inline=True, newline=True)
|
||||
#for file in files:
|
||||
# if file.endswith(".py"):
|
||||
# exec(open(f"{path}/{file}", 'r').read())
|
||||
app_log("...Done. ✅️", inline=True, newline=True)
|
||||
|
||||
Log("💽️ Loading Configuration... ", newline=False)
|
||||
from Config import *
|
||||
if isfile("./Config.py"):
|
||||
from Config import *
|
||||
app_log("💽️ Loading Configuration... ", newline=False)
|
||||
if isfile("./Data/Config.py"):
|
||||
exec(open("./Data/Config.py", 'r').read())
|
||||
else:
|
||||
WriteNewConfig()
|
||||
Log("Done. ✅️", inline=True, newline=True)
|
||||
write_new_config()
|
||||
app_log("Done. ✅️", inline=True, newline=True)
|
||||
|
||||
Main()
|
||||
Log("🌚️ WinDog Stopping...")
|
||||
app_main()
|
||||
app_log("🌚️ WinDog Stopping...")
|
||||
|
||||
|
Reference in New Issue
Block a user