diff --git a/LibWinDog/Platforms/NodeBB/NodeBB.py b/LibWinDog/Platforms/NodeBB/NodeBB.py
new file mode 100644
index 0000000..dcaf3cf
--- /dev/null
+++ b/LibWinDog/Platforms/NodeBB/NodeBB.py
@@ -0,0 +1,58 @@
+# ==================================== #
+# WinDog multi-purpose chatbot #
+# Licensed under AGPLv3 by OctoSpacc #
+# ==================================== #
+
+""" # windog config start #
+
+# NodeBBUrl = "https://nodebb.example.com"
+# NodeBBToken = "abcdefgh-abcd-efgh-ijkl-mnopqrstuvwx"
+
+# end windog config # """
+
+import polling
+
+NodeBBUrl = NodeBBToken = None
+NodeBBStamps = {}
+
+def nodebb_request(room_id:int='', method:str="GET", body:dict=None):
+ return json.loads(HttpReq(f"{NodeBBUrl}/api/v3/chats/{room_id}", method, headers={
+ "Content-Type": "application/json",
+ "Authorization": f"Bearer {NodeBBToken}",
+ }, body=(body and json.dumps(body).encode())).read().decode())
+
+def NodeBBMain(path:str) -> bool:
+ def handler():
+ try:
+ for room in nodebb_request()["response"]["rooms"]:
+ room_id = room["roomId"]
+ if "roomId" not in NodeBBStamps:
+ NodeBBStamps[room_id] = 0
+ if room["teaser"]["timestamp"] > NodeBBStamps[room_id]:
+ message = nodebb_request(room_id)["response"]["messages"][-1]
+ NodeBBStamps[room_id] = message["timestamp"]
+ if not message["self"]:
+ text_plain = BeautifulSoup(message["content"]).get_text()
+ data = InputMessageData(
+ # id = message["timestamp"],
+ text_html = message["content"],
+ text_plain = text_plain,
+ room = SafeNamespace(
+ id = room_id,
+ ),
+ user = UserData(
+ settings = UserSettingsData(),
+ ),
+ command = TextCommandData(text_plain, "nodebb"),
+ )
+ on_input_message_parsed(data)
+ call_endpoint(EventContext(platform="nodebb"), data)
+ except Exception:
+ app_log()
+ Thread(target=lambda:polling.poll(handler, step=3, poll_forever=True)).start()
+ return True
+
+def NodeBBSender(context:EventContext, data:OutputMessageData):
+ nodebb_request(context.data.room.id, "POST", {"message": data["text_plain"]})
+
+register_platform(name="NodeBB", main=NodeBBMain, sender=NodeBBSender)
\ No newline at end of file
diff --git a/LibWinDog/Platforms/NodeBB/requirements.txt b/LibWinDog/Platforms/NodeBB/requirements.txt
new file mode 100644
index 0000000..81d8a95
--- /dev/null
+++ b/LibWinDog/Platforms/NodeBB/requirements.txt
@@ -0,0 +1 @@
+urllib3
\ No newline at end of file
diff --git a/LibWinDog/Platforms/Telegram/Telegram.py b/LibWinDog/Platforms/Telegram/Telegram.py
index 263920f..2f3a0be 100755
--- a/LibWinDog/Platforms/Telegram/Telegram.py
+++ b/LibWinDog/Platforms/Telegram/Telegram.py
@@ -27,6 +27,12 @@ from hmac import new as hmac_new
TelegramClient = None
+# def telegram_trim_message(text:str) -> str:
+# return trim_message(text, 4096)
+
+# def telegram_trim_caption(text:str) -> str:
+# return trim_message(text, 1024)
+
def TelegramMain(path:str) -> bool:
if not TelegramToken:
return False
diff --git a/LibWinDog/Platforms/Web/Web.py b/LibWinDog/Platforms/Web/Web.py
index baff4e3..9a57d9c 100755
--- a/LibWinDog/Platforms/Web/Web.py
+++ b/LibWinDog/Platforms/Web/Web.py
@@ -11,7 +11,7 @@ WebConfig = {
"anti_drop_interval": 15,
}
-WebTokens = {}
+WebTokens = {} # Generate new tokens with secrets.token_urlsafe()
""" # end windog config # """
diff --git a/ModWinDog/Base/Base.py b/ModWinDog/Base/Base.py
index 5d751b0..ba9d07e 100755
--- a/ModWinDog/Base/Base.py
+++ b/ModWinDog/Base/Base.py
@@ -42,7 +42,8 @@ def cPing(context:EventContext, data:InputMessageData):
# nice experiment, but it won't work with Telegram since time is not to milliseconds (?)
#time_diff = (time_now := int(time.time())) - (time_sent := data.datetime)
#send_message(context, OutputMessageData(text_html=f"Pong!\n\n{time_sent} ā {time_now} = {time_diff}"))
- send_message(context, OutputMessageData(text_html="Pong!"))
+ word = (obj_get({"dick": "cock"}, data.command.name) or data.command.name.replace('i', 'o'))
+ send_message(context, OutputMessageData(text_html=f"{word[0].upper()}{word[1:]}!"))
#def cTime(update:Update, context:CallbackContext) -> None:
# update.message.reply_markdown_v2(
@@ -58,7 +59,7 @@ register_module(name="Base", endpoints=[
"get": True,
}),
#SafeNamespace(names=["gdpr"], summary="Operations for european citizens regarding your personal data.", handler=cGdpr),
- SafeNamespace(names=["ping"], handler=cPing),
+ SafeNamespace(names=["ping", "bing", "ding", "dick"], handler=cPing),
#SafeNamespace(names=["eval"], summary="Execute a Python command (or safe literal operation) in the current context. Currently not implemented.", handler=cEval),
#SafeNamespace(names=["format"], summary="Reformat text using an handful of rules. Not yet implemented.", handler=cFormat),
#SafeNamespace(names=["frame"], summary="Frame someone's message into a platform-styled image. Not yet implemented.", handler=cFrame),
diff --git a/ModWinDog/Internet/Internet.py b/ModWinDog/Internet/Internet.py
index 19f7603..e6d9590 100755
--- a/ModWinDog/Internet/Internet.py
+++ b/ModWinDog/Internet/Internet.py
@@ -12,6 +12,7 @@ MicrosoftBingSettings = {}
from urlextract import URLExtract
from urllib import parse as urlparse
from urllib.request import urlopen, Request
+from translate_shell.translate import translate as ts_translate
def RandomHexString(length:int) -> str:
return ''.join([randchoice('0123456789abcdef') for i in range(length)])
@@ -50,32 +51,71 @@ def cEmbedded(context:EventContext, data:InputMessageData):
# elif urlDomain == "vm.tiktok.com":
# urlDomain = "vm.vxtiktok.com"
# url = (urlDomain + '/' + '/'.join(url.split('/')[1:]))
- if urlDomain in ("facebook.com", "www.facebook.com", "m.facebook.com", "instagram.com", "www.instagram.com", "twitter.com", "x.com", "vm.tiktok.com", "tiktok.com", "www.tiktok.com"):
+ if urlDomain.startswith("www."):
+ urlDomain = '.'.join(urlDomain.split('.')[1:])
+ if urlDomain in ("facebook.com", "m.facebook.com", "instagram.com", "twitter.com", "x.com", "vm.tiktok.com", "tiktok.com", "threads.net", "threads.com"):
url = f"https://proxatore.octt.eu.org/{url}"
proto = ''
+ elif urlDomain in ("youtube.com",):
+ url = f"https://proxatore.octt.eu.org/{url}{'' if url.endswith('?') else '?'}&proxatore-htmlmedia=true&proxatore-mediaproxy=video"
+ proto = ''
return send_message(context, {"text_plain": f"{{{proto}{url}}}"})
return send_message(context, {"text_plain": "No links found."})
+def search_duckduckgo(query:str) -> dict:
+ url = f"https://html.duckduckgo.com/html?q={urlparse.quote(query)}"
+ request = HttpReq(url)
+ results = []
+ for line in request.read().decode().replace('\t', ' ').splitlines():
+ if ' class="result__a" ' in line and ' href="//duckduckgo.com/l/?uddg=' in line:
+ link = urlparse.unquote(line.split(' href="//duckduckgo.com/l/?uddg=')[1].split('&rut=')[0])
+ title = line.strip().split('')[0].strip().split('')[-1].strip().split('>')
+ if len(title) > 1:
+ results.append({"title": html_unescape(title[1].strip()), "link": link})
+ return results
+
+def format_search_result(link:str, title:str, index:int) -> str:
+ return f'[{index + 1}] {title} : {{{link}}}\n\n'
+
def cWeb(context:EventContext, data:InputMessageData):
+ language = data.user.settings.language
+ if not (query := data.command.body):
+ return send_status_400(context, language)
+ try:
+ text = f'š¦š "{query}": https://duckduckgo.com/?q={urlparse.quote(query)}\n\n'
+ for i,e in enumerate(search_duckduckgo(query)):
+ text += format_search_result(e["link"], e["title"], i)
+ return send_message(context, {"text_plain": trim_text(text, 4096, True), "text_mode": "trim"})
+ except Exception:
+ return send_status_error(context, language)
+
+def cWikipedia(context:EventContext, data:InputMessageData):
+ language = data.user.settings.language
+ if not (query := data.command.body):
+ return send_status_400(context, language)
+ try:
+ result = search_duckduckgo(f"site:wikipedia.org {query}")[0]
+ # TODO try to use API: https://*.wikipedia.org/w/api.php?action=parse&page={title}&prop=text&formatversion=2 (?)
+ soup = BeautifulSoup(HttpReq(result["link"]).read().decode(), "html.parser").select('#mw-content-text')[0]
+ if len(elems := soup.select('.infobox')):
+ elems[0].decompose()
+ for elem in soup.select('.mw-editsection'):
+ elem.decompose()
+ text = (f'{result["title"]}\n{{{result["link"]}}}\n\n' + soup.get_text().strip())
+ return send_message(context, {"text_plain": trim_text(text, 4096, True), "text_mode": "trim"})
+ except Exception:
+ return send_status_error(context, language)
+
+def cFrittoMistoOctoSpacc(context:EventContext, data:InputMessageData):
language = data.user.settings.language
if not (query := data.command.body):
return send_status_400(context, language)
try:
query_url = urlparse.quote(query)
- request = HttpReq(f'https://html.duckduckgo.com/html?q={query_url}')
- caption = f'š¦š "{query}": https://duckduckgo.com/?q={query_url}\n\n'
- index = 0
- for line in request.read().decode().replace('\t', ' ').splitlines():
- if ' class="result__a" ' in line and ' href="//duckduckgo.com/l/?uddg=' in line:
- index += 1
- link = urlparse.unquote(line.split(' href="//duckduckgo.com/l/?uddg=')[1].split('&rut=')[0])
- title = line.strip().split('')[0].strip().split('')[-1].strip().split('>')
- if len(title) > 1:
- title = html_unescape(title[1].strip())
- caption += f'[{index}] {title} : {{{link}}}\n\n'
- else:
- continue
- return send_message(context, {"text_plain": f'{caption}...'})
+ text = f'š¤š "{query}": https://octospacc.altervista.org/?s={query_url}\n\n'
+ for i,e in enumerate(json.loads(HttpReq(f"https://octospacc.altervista.org/wp-json/wp/v2/posts?search={query_url}").read().decode())):
+ text += format_search_result(e["link"], (e["title"]["rendered"] or e["slug"]), i)
+ return send_message(context, {"text_html": trim_text(text, 4096, True), "text_mode": "trim"})
except Exception:
return send_status_error(context, language)
@@ -87,14 +127,15 @@ def cNews(context:EventContext, data:InputMessageData):
def cTranslate(context:EventContext, data:InputMessageData):
language = data.user.settings.language
- instances = ["lingva.ml", "lingva.lunar.icu"]
+ #instances = ["lingva.ml", "lingva.lunar.icu"]
language_to = data.command.arguments.language_to
text_input = (data.command.body or (data.quoted and data.quoted.text_plain))
if not (text_input and language_to):
return send_status_400(context, language)
try:
- result = json.loads(HttpReq(f'https://{randchoice(instances)}/api/v1/auto/{language_to}/{urlparse.quote(text_input)}').read())
- return send_message(context, {"text_plain": f"[{result['info']['detectedSource']} (auto) -> {language_to}]\n\n{result['translation']}"})
+ #result = json.loads(HttpReq(f'https://{randchoice(instances)}/api/v1/auto/{language_to}/{urlparse.quote(text_input)}').read())
+ #return send_message(context, {"text_plain": f"[{result['info']['detectedSource']} (auto) -> {language_to}]\n\n{result['translation']}"})
+ return send_message(context, {"text_plain": f'[auto -> {language_to}]\n\n{ts_translate(text_input, language_to).results[0].paraphrase}'})
except Exception:
return send_status_error(context, language)
@@ -146,8 +187,10 @@ def cSafebooru(context:EventContext, data:InputMessageData):
return send_status_error(context, language)
register_module(name="Internet", endpoints=[
- SafeNamespace(names=["embedded"], handler=cEmbedded, body=False, quoted=False),
- SafeNamespace(names=["web"], handler=cWeb, body=True),
+ SafeNamespace(names=["embedded", "embed", "proxy", "proxatore", "sborratore"], handler=cEmbedded, body=False, quoted=False),
+ SafeNamespace(names=["web", "search", "duck", "duckduckgo"], handler=cWeb, body=True),
+ SafeNamespace(names=["wikipedia", "wokipedia", "wiki"], handler=cWikipedia, body=True),
+ SafeNamespace(names=["frittomistodioctospacc", "fmos", "frittomisto", "octospacc"], handler=cFrittoMistoOctoSpacc, body=True),
SafeNamespace(names=["translate"], handler=cTranslate, body=False, quoted=False, arguments={
"language_to": True,
"language_from": False,
diff --git a/ModWinDog/Internet/requirements.txt b/ModWinDog/Internet/requirements.txt
index 6ef9041..c68cdd0 100755
--- a/ModWinDog/Internet/requirements.txt
+++ b/ModWinDog/Internet/requirements.txt
@@ -1,2 +1,3 @@
urllib3
urlextract
+translate-shell
\ No newline at end of file
diff --git a/WinDog.py b/WinDog.py
index a1e963d..f1d9030 100755
--- a/WinDog.py
+++ b/WinDog.py
@@ -23,7 +23,7 @@ from LibWinDog.Database import *
from LibWinDog.Utils import *
def app_log(text:str=None, level:str="?", *, newline:bool|None=None, inline:bool=False) -> None:
- if not text:
+ if text == None:
text = get_exception_text(full=True)
endline = '\n'
if newline == False or (inline and newline == None):
@@ -220,6 +220,15 @@ def send_status_error(context:EventContext, lang:str=None, code:int=500, extra:s
app_log()
return result
+def trim_text(text:str, limit:int, always_footer:bool=False) -> str:
+ ending = "ā¦"
+ footer = "\n\nā¦"
+ if len(text) > limit:
+ text = (text[:(limit - len(ending + footer))].rstrip() + footer)
+ elif always_footer:
+ text = (text.rstrip() + footer)
+ return text
+
def get_link(context:EventContext, data:InputMessageData):
data = (InputMessageData(**data) if type(data) == dict else data)
if (data.room and data.room.id):
@@ -374,42 +383,49 @@ def app_main() -> None:
if platform.main(f"./LibWinDog/Platforms/{platform.name}"):
app_log(f"{platform.name}, ", inline=True)
app_log("...Done. ā
ļø", inline=True, newline=True)
- app_log("š¶ļø WinDog Ready!")
- while True:
- time.sleep(9**9)
if __name__ == '__main__':
app_log("šļø WinDog Starting...")
- GlobalStrings = good_yaml_load(open("./WinDog.yaml", 'r').read())
- Platforms, Modules, ModuleGroups, Endpoints = {}, {}, {}, {}
+ try:
+ GlobalStrings = good_yaml_load(open("./WinDog.yaml", 'r').read())
+ Platforms, Modules, ModuleGroups, Endpoints = {}, {}, {}, {}
- for folder in ("LibWinDog/Platforms", "ModWinDog"):
- match folder:
- case "LibWinDog/Platforms":
- app_log("š©ļø Loading Platforms... ", newline=False)
- case "ModWinDog":
- app_log("š©ļø Loading Modules... ", newline=False)
- for name in listdir(f"./{folder}"):
- path = f"./{folder}/{name}"
- if path.endswith(".py") and isfile(path):
- exec(open(path).read())
- elif isdir(path):
- files = listdir(path)
- if f"{name}.py" in files:
- files.remove(f"{name}.py")
- exec(open(f"{path}/{name}.py", 'r').read())
- #for file in files:
- # if file.endswith(".py"):
- # exec(open(f"{path}/{file}", 'r').read())
- app_log("...Done. ā
ļø", inline=True, newline=True)
+ for folder in ("LibWinDog/Platforms", "ModWinDog"):
+ match folder:
+ case "LibWinDog/Platforms":
+ app_log("š©ļø Loading Platforms... ", newline=False)
+ case "ModWinDog":
+ app_log("š©ļø Loading Modules... ", newline=False)
+ for name in listdir(f"./{folder}"):
+ path = f"./{folder}/{name}"
+ if path.endswith(".py") and isfile(path):
+ exec(open(path).read())
+ elif isdir(path):
+ files = listdir(path)
+ if f"{name}.py" in files:
+ files.remove(f"{name}.py")
+ exec(open(f"{path}/{name}.py", 'r').read())
+ #for file in files:
+ # if file.endswith(".py"):
+ # exec(open(f"{path}/{file}", 'r').read())
+ app_log("...Done. ā
ļø", inline=True, newline=True)
- app_log("š½ļø Loading Configuration... ", newline=False)
- if isfile("./Data/Config.py"):
- exec(open("./Data/Config.py", 'r').read())
- else:
- write_new_config()
- app_log("Done. ā
ļø", inline=True, newline=True)
+ app_log("š½ļø Loading Configuration... ", newline=False)
+ if isfile("./Data/Config.py"):
+ exec(open("./Data/Config.py", 'r').read())
+ else:
+ write_new_config()
+ app_log("Done. ā
ļø", inline=True, newline=True)
- app_main()
+ app_main()
+ except Exception:
+ app_log('')
+ app_log()
+ app_log("š Error starting WinDog. Stopping...")
+ exit(1)
+
+ app_log("š¶ļø WinDog Ready!")
+ while True:
+ time.sleep(9**9)
app_log("šļø WinDog Stopping...")