Improve code structure, improve /help, /safebooru, /echo, add /lua

This commit is contained in:
octospacc 2024-06-16 01:40:30 +02:00
parent b7eb53e6a7
commit d687cbd51e
19 changed files with 335 additions and 233 deletions

View File

@ -10,24 +10,24 @@ MastodonUrl = ''
MastodonToken = '' MastodonToken = ''
TelegramId = 1637713483 TelegramId = 1637713483
TelegramToken = '' TelegramToken = "0123456789:abcdefghijklmnopqrstuvwxyz123456789"
TelegramAdmins = [ 123456789, ] TelegramAdmins = [ 123456789, 634314973, ]
TelegramWhitelist = [ 123456789, ] TelegramWhitelist = [ 123456789, 634314973, ]
TelegramRestrict = False TelegramRestrict = False
AdminIds = [ "123456789@telegram", "admin@activitypub@mastodon.example.com", ] AdminIds = [ "123456789@telegram", "634314973@telegram", "admin@activitypub@mastodon.example.com", ]
DefaultLang = 'en' DefaultLang = "en"
Debug = False Debug = False
Dumper = False Dumper = False
CmdPrefixes = '.!/' CmdPrefixes = ".!/"
# False: ASCII output; True: ANSI Output (must be escaped) # False: ASCII output; True: ANSI Output (must be escaped)
ExecAllowed = {'date': False, 'fortune': False, 'neofetch': True, 'uptime': False} ExecAllowed = {"date": False, "fortune": False, "neofetch": True, "uptime": False}
WebUserAgent = f'WinDog v.Staging' WebUserAgent = "WinDog v.Staging"
Endpoints = { # TODO deprecate this in favour of new module API
Endpoints = (Endpoints | {
"start": cStart, "start": cStart,
"help": cHelp,
#"config": cConfig, #"config": cConfig,
"source": cSource, "source": cSource,
"ping": cPing, "ping": cPing,
@ -43,7 +43,6 @@ Endpoints = {
"floor": multifun, "floor": multifun,
"hands": multifun, "hands": multifun,
"sessocto": multifun, "sessocto": multifun,
"hash": cHash,
#"encode": cEncode, #"encode": cEncode,
#"decode": cDecode, #"decode": cDecode,
#"time": cTime, #"time": cTime,
@ -51,9 +50,4 @@ Endpoints = {
"exec": cExec, "exec": cExec,
#"format": cFormat, #"format": cFormat,
#"frame": cFrame, #"frame": cFrame,
"embedded": cEmbedded, })
"web": cWeb,
"translate": cTranslate,
"unsplash": cUnsplash,
"safebooru": cSafebooru,
}

View File

@ -1,4 +1,5 @@
import mastodon import mastodon
from bs4 import BeautifulSoup
def MastodonSender(event, manager, Data, Destination, TextPlain, TextMarkdown) -> None: def MastodonSender(event, manager, Data, Destination, TextPlain, TextMarkdown) -> None:
if InDict(Data, 'Media'): if InDict(Data, 'Media'):
@ -13,7 +14,6 @@ def MastodonSender(event, manager, Data, Destination, TextPlain, TextMarkdown) -
visibility=('direct' if event['status']['visibility'] == 'direct' else 'unlisted'), visibility=('direct' if event['status']['visibility'] == 'direct' else 'unlisted'),
) )
# TODO make this non-blocking or else we can't load it dynamically
def MastodonMain() -> None: def MastodonMain() -> None:
if not (MastodonUrl and MastodonToken): if not (MastodonUrl and MastodonToken):
return return
@ -25,10 +25,12 @@ def MastodonMain() -> None:
if not Msg.split('@')[0]: if not Msg.split('@')[0]:
Msg = ' '.join('@'.join(Msg.split('@')[1:]).strip().split(' ')[1:]).strip() Msg = ' '.join('@'.join(Msg.split('@')[1:]).strip().split(' ')[1:]).strip()
if Msg[0] in CmdPrefixes: if Msg[0] in CmdPrefixes:
Cmd = ParseCmd(Msg) cmd = ParseCmd(Msg)
Cmd.messageId = event['status']['id'] if cmd:
if Cmd.Name in Endpoints: cmd.messageId = event['status']['id']
Endpoints[Cmd.Name]({"Event": event, "Manager": Mastodon}, Cmd) if cmd.Name in Endpoints:
Mastodon.stream_user(MastodonListener()) Endpoints[cmd.Name]({"Event": event, "Manager": Mastodon}, cmd)
Mastodon.stream_user(MastodonListener(), run_async=True)
RegisterPlatform(name="Mastodon", main=MastodonMain, sender=MastodonSender, managerClass=mastodon.Mastodon)
Platforms["Mastodon"] = {"main": MastodonMain, "sender": MastodonSender, "managerClass": mastodon.Mastodon}

View File

@ -0,0 +1,2 @@
Mastodon.py
beautifulsoup4

View File

@ -1,4 +1,8 @@
def MatrixMain() -> None: def MatrixMain() -> None:
pass pass
#Platforms["Matrix"] = {"main": MatrixMain} def MatrixSender() -> None:
pass
#RegisterPlatform(name="Matrix", main=MatrixMain, sender=MatrixSender)

View File

@ -21,31 +21,32 @@ def TelegramHandleCmd(update:telegram.Update):
def TelegramQueryHandle(update:telegram.Update, context:CallbackContext=None) -> None: def TelegramQueryHandle(update:telegram.Update, context:CallbackContext=None) -> None:
if not (update and update.message): if not (update and update.message):
return return
Cmd = ParseCmd(update.message.text) cmd = ParseCmd(update.message.text)
Cmd.messageId = update.message.message_id if cmd:
Cmd.TextPlain = Cmd.Body cmd.messageId = update.message.message_id
Cmd.TextMarkdown = update.message.text_markdown_v2 cmd.TextPlain = cmd.Body
Cmd.Text = GetWeightedText((Cmd.TextMarkdown, Cmd.TextPlain)) cmd.TextMarkdown = update.message.text_markdown_v2
if Cmd and Cmd.Tokens[0][0] in CmdPrefixes and Cmd.Name in Endpoints: cmd.Text = GetWeightedText((cmd.TextMarkdown, cmd.TextPlain))
Cmd.User = { if cmd.Tokens[0][0] in CmdPrefixes and cmd.Name in Endpoints:
"Name": update.message.from_user.first_name, cmd.User = SimpleNamespace(**{
"Tag": update.message.from_user.username, "Name": update.message.from_user.first_name,
"Id": f'{update.message.from_user.id}@telegram', "Tag": update.message.from_user.username,
} "Id": f'{update.message.from_user.id}@telegram',
if update.message.reply_to_message:
Cmd.Quoted = SimpleNamespace(**{
"messageId": update.message.reply_to_message.message_id,
"Body": update.message.reply_to_message.text,
"TextPlain": update.message.reply_to_message.text,
"TextMarkdown": update.message.reply_to_message.text_markdown_v2,
"Text": GetWeightedText((update.message.reply_to_message.text_markdown_v2, update.message.reply_to_message.text)),
"User": {
"Name": update.message.reply_to_message.from_user.first_name,
"Tag": update.message.reply_to_message.from_user.username,
"Id": f'{update.message.reply_to_message.from_user.id}@telegram',
},
}) })
Endpoints[Cmd.Name]({"Event": update, "Manager": context}, Cmd) if update.message.reply_to_message:
cmd.Quoted = SimpleNamespace(**{
"messageId": update.message.reply_to_message.message_id,
"Body": update.message.reply_to_message.text,
"TextPlain": update.message.reply_to_message.text,
"TextMarkdown": update.message.reply_to_message.text_markdown_v2,
"Text": GetWeightedText((update.message.reply_to_message.text_markdown_v2, update.message.reply_to_message.text)),
"User": SimpleNamespace(**{
"Name": update.message.reply_to_message.from_user.first_name,
"Tag": update.message.reply_to_message.from_user.username,
"Id": f'{update.message.reply_to_message.from_user.id}@telegram',
}),
})
Endpoints[cmd.Name]({"Event": update, "Manager": context}, cmd)
if Debug and Dumper: if Debug and Dumper:
Text = update.message.text Text = update.message.text
Text = (Text.replace('\n', '\\n') if Text else '') Text = (Text.replace('\n', '\\n') if Text else '')
@ -74,8 +75,8 @@ def TelegramMain() -> None:
return return
updater = telegram.ext.Updater(TelegramToken) updater = telegram.ext.Updater(TelegramToken)
dispatcher = updater.dispatcher dispatcher = updater.dispatcher
#dispatcher.add_handler(CommandHandler('config', cConfig))
dispatcher.add_handler(MessageHandler(Filters.text | Filters.command, TelegramQueryHandle)) dispatcher.add_handler(MessageHandler(Filters.text | Filters.command, TelegramQueryHandle))
updater.start_polling() updater.start_polling()
Platforms["Telegram"] = {"main": TelegramMain, "sender": TelegramSender, "eventClass": telegram.Update} RegisterPlatform(name="Telegram", main=TelegramMain, sender=TelegramSender, eventClass=telegram.Update)

View File

@ -0,0 +1 @@
python-telegram-bot==13.4.1

View File

@ -0,0 +1,8 @@
def WebMain() -> None:
pass
def WebSender() -> None:
pass
#RegisterPlatform(name="Web", main=WebMain, sender=WebSender)

View File

@ -0,0 +1 @@
import base64

View File

@ -1,15 +0,0 @@
import hashlib
# Module: Hash
# Responds with the hash-sum of a message received.
def cHash(Context, Data) -> None:
if len(Data.Tokens) >= 3 and Data.Tokens[1] in hashlib.algorithms_available:
Alg = Data.Tokens[1]
Hash = hashlib.new(Alg, Alg.join(Data.Body.split(Alg)[1:]).strip().encode()).hexdigest()
SendMsg(Context, {
"TextPlain": Hash,
"TextMarkdown": MarkdownCode(Hash, True),
})
else:
SendMsg(Context, {"Text": choice(Locale.__('hash.usage')).format(Data.Tokens[0], hashlib.algorithms_available)})

17
ModWinDog/Hashing.py Normal file
View File

@ -0,0 +1,17 @@
import hashlib
def cHash(context, data) -> None:
if len(data.Tokens) >= 3 and data.Tokens[1] in hashlib.algorithms_available:
Alg = data.Tokens[1]
Hash = hashlib.new(Alg, Alg.join(data.Body.split(Alg)[1:]).strip().encode()).hexdigest()
SendMsg(context, {
"TextPlain": Hash,
"TextMarkdown": MarkdownCode(Hash, True),
})
else:
SendMsg(context, {"Text": choice(Locale.__('hash.usage')).format(data.Tokens[0], hashlib.algorithms_available)})
RegisterModule(name="Hashing", group="Geek", summary="Functions for hashing of textual content.", endpoints={
"Hash": CreateEndpoint(["hash"], summary="Responds with the hash-sum of a message received.", handler=cHash),
})

19
ModWinDog/Help.py Normal file
View File

@ -0,0 +1,19 @@
# TODO: implement /help <commandname> feature
def cHelp(context, data=None) -> None:
moduleList, commands = '', ''
for module in Modules:
summary = Modules[module]["summary"]
endpoints = Modules[module]["endpoints"]
moduleList += (f"\n\n{module}" + (f": {summary}" if summary else ''))
for endpoint in endpoints:
summary = endpoints[endpoint]["summary"]
moduleList += (f"\n* /{', /'.join(endpoints[endpoint]['names'])}" + (f": {summary}" if summary else ''))
for cmd in Endpoints.keys():
commands += f'* /{cmd}\n'
SendMsg(context, {"Text": f"[ Available Modules ]{moduleList}\n\nFull Endpoints List:\n{commands}"})
RegisterModule(name="Help", group="Basic", endpoints={
"Help": CreateEndpoint(["help"], summary="Provides help for the bot. For now, it just lists the commands.", handler=cHelp),
})

View File

@ -2,18 +2,16 @@ from urlextract import URLExtract
from urllib import parse as UrlParse from urllib import parse as UrlParse
from urllib.request import urlopen, Request from urllib.request import urlopen, Request
def HttpGet(Url:str): def HttpGet(url:str):
return urlopen(Request(Url, headers={"User-Agent": WebUserAgent})) return urlopen(Request(url, headers={"User-Agent": WebUserAgent}))
# Module: Embedded def cEmbedded(context, data) -> None:
# Rewrite a link trying to make sure we have an embed view. if len(data.Tokens) >= 2:
def cEmbedded(Context, Data) -> None:
if len(Data.Tokens) >= 2:
# Find links in command body # Find links in command body
Text = (Data.TextMarkdown + ' ' + Data.TextPlain) Text = (data.TextMarkdown + ' ' + data.TextPlain)
elif Data.Quoted and Data.Quoted.Text: elif data.Quoted and data.Quoted.Text:
# Find links in quoted message # Find links in quoted message
Text = (Data.Quoted.TextMarkdown + ' ' + Data.Quoted.TextPlain) Text = (data.Quoted.TextMarkdown + ' ' + data.Quoted.TextPlain)
else: else:
# TODO Error message # TODO Error message
return return
@ -39,17 +37,15 @@ def cEmbedded(Context, Data) -> None:
elif urlDomain == "vm.tiktok.com": elif urlDomain == "vm.tiktok.com":
urlDomain = "vm.vxtiktok.com" urlDomain = "vm.vxtiktok.com"
url = urlDomain + url[len(urlDomain):] url = urlDomain + url[len(urlDomain):]
SendMsg(Context, {"TextPlain": f"{{{proto}{url}}}"}) SendMsg(context, {"TextPlain": f"{{{proto}{url}}}"})
# else TODO error message? # else TODO error message?
# Module: Web def cWeb(context, data) -> None:
# Provides results of a DuckDuckGo search. if data.Body:
def cWeb(Context, Data) -> None:
if Data.Body:
try: try:
QueryUrl = UrlParse.quote(Data.Body) QueryUrl = UrlParse.quote(data.Body)
Req = HttpGet(f'https://html.duckduckgo.com/html?q={QueryUrl}') Req = HttpGet(f'https://html.duckduckgo.com/html?q={QueryUrl}')
Caption = f'🦆🔎 "{Data.Body}": https://duckduckgo.com/?q={QueryUrl}\n\n' Caption = f'🦆🔎 "{data.Body}": https://duckduckgo.com/?q={QueryUrl}\n\n'
Index = 0 Index = 0
for Line in Req.read().decode().replace('\t', ' ').splitlines(): for Line in Req.read().decode().replace('\t', ' ').splitlines():
if ' class="result__a" ' in Line and ' href="//duckduckgo.com/l/?uddg=' in Line: if ' class="result__a" ' in Line and ' href="//duckduckgo.com/l/?uddg=' in Line:
@ -61,32 +57,28 @@ def cWeb(Context, Data) -> None:
Caption += f'[{Index}] {Title} : {{{Link}}}\n\n' Caption += f'[{Index}] {Title} : {{{Link}}}\n\n'
else: else:
continue continue
SendMsg(Context, {"TextPlain": f'{Caption}...'}) SendMsg(context, {"TextPlain": f'{Caption}...'})
except Exception: except Exception:
raise raise
else: else:
pass pass
# Module: Translate def cTranslate(context, data) -> None:
# Return the received message after translating it in another language. if len(data.Tokens) < 3:
def cTranslate(Context, Data) -> None:
if len(Data.Tokens) < 3:
return return
try: try:
Lang = Data.Tokens[1] Lang = data.Tokens[1]
# TODO: Use many different public Lingva instances in rotation to avoid overloading a specific one # TODO: Use many different public Lingva instances in rotation to avoid overloading a specific one
Result = json.loads(HttpGet(f'https://lingva.ml/api/v1/auto/{Lang}/{UrlParse.quote(Lang.join(Data.Body.split(Lang)[1:]))}').read())["translation"] Result = json.loads(HttpGet(f'https://lingva.ml/api/v1/auto/{Lang}/{UrlParse.quote(Lang.join(data.Body.split(Lang)[1:]))}').read())["translation"]
SendMsg(Context, {"TextPlain": Result}) SendMsg(context, {"TextPlain": Result})
except Exception: except Exception:
raise raise
# Module: Unsplash def cUnsplash(context, data) -> None:
# Send a picture sourced from Unsplash.
def cUnsplash(Context, Data) -> None:
try: try:
Req = HttpGet(f'https://source.unsplash.com/random/?{UrlParse.quote(Data.Body)}') Req = HttpGet(f'https://source.unsplash.com/random/?{UrlParse.quote(data.Body)}')
ImgUrl = Req.geturl().split('?')[0] ImgUrl = Req.geturl().split('?')[0]
SendMsg(Context, { SendMsg(context, {
"TextPlain": f'{{{ImgUrl}}}', "TextPlain": f'{{{ImgUrl}}}',
"TextMarkdown": MarkdownCode(ImgUrl, True), "TextMarkdown": MarkdownCode(ImgUrl, True),
"Media": Req.read(), "Media": Req.read(),
@ -94,18 +86,18 @@ def cUnsplash(Context, Data) -> None:
except Exception: except Exception:
raise raise
# Module: Safebooru def cSafebooru(context, data) -> None:
# Send a picture sourced from Safebooru.
def cSafebooru(Context, Data) -> None:
ApiUrl = 'https://safebooru.org/index.php?page=dapi&s=post&q=index&limit=100&tags=' ApiUrl = 'https://safebooru.org/index.php?page=dapi&s=post&q=index&limit=100&tags='
try: try:
if Data.Body: if data.Body:
for i in range(7): for i in range(7): # retry a bunch of times if we can't find a really random result
ImgUrls = HttpGet(f'{ApiUrl}md5:{RandHexStr(3)}%20{UrlParse.quote(Data.Body)}').read().decode().split(' file_url="')[1:] ImgUrls = HttpGet(f'{ApiUrl}md5:{RandHexStr(3)}%20{UrlParse.quote(data.Body)}').read().decode().split(' file_url="')[1:]
if ImgUrls: if ImgUrls:
break break
if not ImgUrls: # literal search
ImgUrls = HttpGet(f'{ApiUrl}{UrlParse.quote(data.Body)}').read().decode().split(' file_url="')[1:]
if not ImgUrls: if not ImgUrls:
ImgUrls = HttpGet(f'{ApiUrl}{UrlParse.quote(Data.Body)}').read().decode().split(' file_url="')[1:] return SendMsg(context, {"Text": "Error: Could not get any result from Safebooru."})
ImgXml = choice(ImgUrls) ImgXml = choice(ImgUrls)
ImgUrl = ImgXml.split('"')[0] ImgUrl = ImgXml.split('"')[0]
ImgId = ImgXml.split(' id="')[1].split('"')[0] ImgId = ImgXml.split(' id="')[1].split('"')[0]
@ -117,9 +109,9 @@ def cSafebooru(Context, Data) -> None:
ImgId = ImgUrl.split('?')[-1] ImgId = ImgUrl.split('?')[-1]
break break
if ImgUrl: if ImgUrl:
SendMsg(Context, { SendMsg(context, {
"TextPlain": f'[{ImgId}]\n{{{ImgUrl}}}', "TextPlain": f'[{ImgId}]\n{{{ImgUrl}}}',
"TextMarkdown": f'\\[`{ImgId}`\\]\n{MarkdownCode(ImgUrl, True)}', "TextMarkdown": (f'\\[`{ImgId}`\\]\n' + MarkdownCode(ImgUrl, True)),
"Media": HttpGet(ImgUrl).read(), "Media": HttpGet(ImgUrl).read(),
}) })
else: else:
@ -127,3 +119,11 @@ def cSafebooru(Context, Data) -> None:
except Exception: except Exception:
raise raise
RegisterModule(name="Internet", group="Internet", summary="Tools and toys related to the Internet.", endpoints={
"Embedded": CreateEndpoint(["embedded"], summary="Rewrites a link, trying to bypass embed view protection.", handler=cEmbedded),
"Web": CreateEndpoint(["web"], summary="Provides results of a DuckDuckGo search.", handler=cWeb),
"Translate": CreateEndpoint(["translate"], summary="Returns the received message after translating it in another language.", handler=cTranslate),
"Unsplash": CreateEndpoint(["unsplash"], summary="Sends a picture sourced from Unsplash.", handler=cUnsplash),
"Safebooru": CreateEndpoint(["safebooru"], summary="Sends a picture sourced from Safebooru.", handler=cSafebooru),
})

View File

@ -0,0 +1,2 @@
urllib3
urlextract

View File

@ -5,51 +5,43 @@
# Module: Percenter # Module: Percenter
# Provides fun trough percentage-based toys. # Provides fun trough percentage-based toys.
def percenter(Context, Data) -> None: def percenter(context, data) -> None:
SendMsg(Context, {"Text": choice(Locale.__(f'{Data.Name}.{"done" if Data.Body else "empty"}')).format( SendMsg(context, {"Text": choice(Locale.__(f'{data.Name}.{"done" if data.Body else "empty"}')).format(
Cmd=Data.Tokens[0], Percent=RandPercent(), Thing=Data.Body)}) Cmd=data.Tokens[0], Percent=RandPercent(), Thing=data.Body)})
# Module: Multifun # Module: Multifun
# Provides fun trough preprogrammed-text-based toys. # Provides fun trough preprogrammed-text-based toys.
def multifun(Context, Data) -> None: def multifun(context, data) -> None:
cmdkey = Data.Name cmdkey = data.Name
replyToId = None replyToId = None
if Data.Quoted: if data.Quoted:
replyFromUid = Data.Quoted.User["Id"] replyFromUid = data.Quoted.User.Id
# TODO work on all platforms for the bot id # TODO work on all platforms for the bot id
if int(replyFromUid.split('@')[0]) == int(TelegramId) and 'bot' in Locale.__(cmdkey): if int(replyFromUid.split('@')[0]) == int(TelegramId) and 'bot' in Locale.__(cmdkey):
Text = choice(Locale.__(f'{cmdkey}.bot')) Text = choice(Locale.__(f'{cmdkey}.bot'))
elif replyFromUid == Data.User["Id"] and 'self' in Locale.__(cmdkey): elif replyFromUid == data.User.Id and 'self' in Locale.__(cmdkey):
Text = choice(Locale.__(f'{cmdkey}.self')).format(Data.User["Name"]) Text = choice(Locale.__(f'{cmdkey}.self')).format(data.User.Name)
else: else:
if 'others' in Locale.__(cmdkey): if 'others' in Locale.__(cmdkey):
Text = choice(Locale.__(f'{cmdkey}.others')).format(Data.User["Name"], Data.Quoted.User["Name"]) Text = choice(Locale.__(f'{cmdkey}.others')).format(data.User.Name, data.Quoted.User.Name)
replyToId = Data.Quoted.messageId replyToId = data.Quoted.messageId
else: else:
if 'empty' in Locale.__(cmdkey): if 'empty' in Locale.__(cmdkey):
Text = choice(Locale.__(f'{cmdkey}.empty')) Text = choice(Locale.__(f'{cmdkey}.empty'))
SendMsg(Context, {"Text": Text, "ReplyTo": replyToId}) SendMsg(context, {"Text": Text, "ReplyTo": replyToId})
# Module: Start # Module: Start
# Salutes the user, for now no other purpose except giving a feel that the bot is working. # Salutes the user, hinting that the bot is working and providing basic quick help.
def cStart(Context, Data) -> None: def cStart(context, data) -> None:
SendMsg(Context, {"Text": choice(Locale.__('start')).format(Data.User['Name'])}) SendMsg(context, {"Text": choice(Locale.__('start')).format(data.User.Name)})
# Module: Help
# Provides help for the bot. For now, it just lists the commands.
def cHelp(Context, Data=None) -> None:
Commands = ''
for Cmd in Endpoints.keys():
Commands += f'* /{Cmd}\n'
SendMsg(Context, {"TextPlain": f'Available Endpoints (WIP):\n{Commands}'})
# Module: Source # Module: Source
# Provides a copy of the bot source codes and/or instructions on how to get it. # Provides a copy of the bot source codes and/or instructions on how to get it.
def cSource(Context, Data=None) -> None: def cSource(context, data=None) -> None:
SendMsg(Context, {"TextPlain": ("""\ SendMsg(context, {"TextPlain": ("""\
* Original Source Code: {https://gitlab.com/octospacc/WinDog} * Original Code: {https://gitlab.com/octospacc/WinDog}
* Mirror: {https://github.com/octospacc/WinDog} * Mirror: {https://github.com/octospacc/WinDog}
""" + (f"* Modified Source Code: {{{ModifiedSourceUrl}}}" if ModifiedSourceUrl else ""))}) """ + (f"* Modified Code: {{{ModifiedSourceUrl}}}" if ModifiedSourceUrl else ""))})
# Module: Config # Module: Config
# ... # ...
@ -62,28 +54,38 @@ def cSource(Context, Data=None) -> None:
# Module: Ping # Module: Ping
# Responds pong, useful for testing messaging latency. # Responds pong, useful for testing messaging latency.
def cPing(Context, Data=None) -> None: def cPing(context, data=None) -> None:
SendMsg(Context, {"Text": "*Pong!*"}) SendMsg(context, {"Text": "*Pong!*"})
# Module: Echo # Module: Echo
# Responds back with the original text of the received message. # Responds back with the original text of the received message.
def cEcho(Context, Data) -> None: def cEcho(context, data) -> None:
if Data.Body: if data.Body:
SendMsg(Context, {"Text": Data.Body}) prefix = "🗣️ "
if len(data.Tokens) == 2:
nonascii = True
for char in data.Tokens[1]:
if ord(char) < 256:
nonascii = False
break
if nonascii:
# text is not ascii, probably an emoji (altough not necessarily), so just pass as is (useful for Telegram emojis)
prefix = ''
SendMsg(context, {"Text": (prefix + data.Body)})
else: else:
SendMsg(Context, {"Text": choice(Locale.__('echo.empty'))}) SendMsg(context, {"Text": choice(Locale.__('echo.empty'))})
# Module: Broadcast # Module: Broadcast
# Sends an admin message over to another destination # Sends an admin message over to another destination
def cBroadcast(Context, Data) -> None: def cBroadcast(context, data) -> None:
if Data.User['Id'] not in AdminIds: if data.User.Id not in AdminIds:
return SendMsg(Context, {"Text": choice(Locale.__('eval'))}) return SendMsg(context, {"Text": choice(Locale.__('eval'))})
if len(Data.Tokens) < 3: if len(data.Tokens) < 3:
return SendMsg(Context, {"Text": "Bad usage."}) return SendMsg(context, {"Text": "Bad usage."})
Dest = Data.Tokens[1] Dest = data.Tokens[1]
Text = ' '.join(Data.Tokens[2:]) Text = ' '.join(data.Tokens[2:])
SendMsg(Context, {"TextPlain": Text}, Dest) SendMsg(context, {"TextPlain": Text}, Dest)
SendMsg(Context, {"TextPlain": "Executed."}) SendMsg(context, {"TextPlain": "Executed."})
#def cTime(update:Update, context:CallbackContext) -> None: #def cTime(update:Update, context:CallbackContext) -> None:
# update.message.reply_markdown_v2( # update.message.reply_markdown_v2(
@ -92,31 +94,32 @@ def cBroadcast(Context, Data) -> None:
# Module: Eval # Module: Eval
# Execute a Python command (or safe literal operation) in the current context. Currently not implemented. # Execute a Python command (or safe literal operation) in the current context. Currently not implemented.
def cEval(Context, Data=None) -> None: def cEval(context, data=None) -> None:
SendMsg(Context, {"Text": choice(Locale.__('eval'))}) SendMsg(context, {"Text": choice(Locale.__('eval'))})
# Module: Exec # Module: Exec
# Execute a system command from the allowed ones and return stdout/stderr. # Execute a system command from the allowed ones and return stdout/stderr.
def cExec(Context, Data) -> None: def cExec(context, data) -> None:
if len(Data.Tokens) >= 2 and Data.Tokens[1].lower() in ExecAllowed: if len(data.Tokens) >= 2 and data.Tokens[1].lower() in ExecAllowed:
Cmd = Data.Tokens[1].lower() cmd = data.Tokens[1].lower()
Out = subprocess.run(('sh', '-c', f'export PATH=$PATH:/usr/games; {Cmd}'), stdout=subprocess.PIPE).stdout.decode() Out = subprocess.run(('sh', '-c', f'export PATH=$PATH:/usr/games; {cmd}'),
stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout.decode()
# <https://stackoverflow.com/a/14693789> # <https://stackoverflow.com/a/14693789>
Caption = (re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])').sub('', Out)) Caption = (re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])').sub('', Out))
SendMsg(Context, { SendMsg(context, {
"TextPlain": Caption, "TextPlain": Caption,
"TextMarkdown": MarkdownCode(Caption, True), "TextMarkdown": MarkdownCode(Caption, True),
}) })
else: else:
SendMsg(Context, {"Text": choice(Locale.__('eval'))}) SendMsg(context, {"Text": choice(Locale.__('eval'))})
# Module: Format # Module: Format
# Reformat text using an handful of rules. Currently not implemented. # Reformat text using an handful of rules. Currently not implemented.
def cFormat(Context, Data=None) -> None: def cFormat(context, data=None) -> None:
pass pass
# Module: Frame # Module: Frame
# Frame someone's message into a platform-styled image. Currently not implemented. # Frame someone's message into a platform-styled image. Currently not implemented.
def cFrame(Context, Data=None) -> None: def cFrame(context, data=None) -> None:
pass pass

View File

@ -0,0 +1,49 @@
luaCycleLimit = 10000
luaMemoryLimit = (512 * 1024) # 512 KB
luaCrashMessage = f"Lua Error: Script has been forcefully terminated due to having exceeded the max cycle count limit ({luaCycleLimit})."
from lupa import LuaRuntime as NewLuaRuntime, LuaError, LuaSyntaxError
def luaAttributeFilter(obj, attr_name, is_setting):
raise AttributeError("Access Denied.")
#LuaRuntime = NewLuaRuntime(max_memory=(16 * 1024**2), register_eval=False, register_builtins=False, attribute_filter=luaAttributeFilter)
#for key in LuaRuntime.globals():
# if key not in ("error", "assert", "math", "type"):
# del LuaRuntime.globals()[key]
#luaGlobalsCopy = dict(LuaRuntime.globals()) # should this manually handle nested stuff?
# this way to prevent overwriting of global fields is flawed since this doesn't protect concurrent scripts
# better to use the currently active solution of a dedicated instance for each new script running
#def luaFunctionRunner(userFunction:callable):
# for key in luaGlobalsCopy:
# LuaRuntime.globals()[key] = luaGlobalsCopy[key]
# return userFunction()
# TODO make print behave the same as normal Lua, and expose a function for printing without newlines
def cLua(context, data=None) -> None:
scriptText = (data.Body or (data.Quoted and data.Quoted.Body))
if not scriptText:
return SendMsg(context, {"Text": "You must provide some Lua code to execute."})
luaRuntime = NewLuaRuntime(max_memory=luaMemoryLimit, register_eval=False, register_builtins=False, attribute_filter=luaAttributeFilter)
luaRuntime.eval(f"""(function()
_windog = {{ stdout = "" }}
function print (text, endl) _windog.stdout = _windog.stdout .. text .. (endl ~= false and "\\n" or "") end
function luaCrashHandler () return error("{luaCrashMessage}") end
debug.sethook(luaCrashHandler, "", {luaCycleLimit})
end)()""")
for key in luaRuntime.globals():
if key not in ("error", "assert", "math", "string", "print", "_windog"):
del luaRuntime.globals()[key]
try:
textOutput = ("[ʟᴜᴀ ꜱᴛᴅᴏᴜᴛ]\n\n" + luaRuntime.eval(f"""(function()
_windog.scriptout = (function()\n{scriptText}\nend)()
return _windog.stdout .. (_windog.scriptout or '')
end)()"""))
except (LuaError, LuaSyntaxError) as error:
Log(textOutput := str("Lua Error: " + error))
SendMsg(context, {"TextPlain": textOutput})
RegisterModule(name="Scripting", group="Geek", summary="Tools for programming the bot and expanding its features.", endpoints={
"Lua": CreateEndpoint(["lua"], summary="Execute a Lua snippet and get its output.", handler=cLua),
})

View File

@ -0,0 +1 @@
lupa

View File

@ -2,8 +2,9 @@
WinDog/WinDogBot is a chatbot I've been (lazily) developing for years, with some special characteristics: WinDog/WinDogBot is a chatbot I've been (lazily) developing for years, with some special characteristics:
* multi-purpose: it's created for doing a myriad of different things, from the funny to the useful. * multi-purpose: it's created for doing a myriad of different things, from the funny to the useful (moderation features will be implemented in the future).
* multi-platform: it's an experiment in automagical multiplatform compatibility, with modules targeting a common abstracted API. * multi-platform: it's an experiment in automagical multiplatform compatibility, with modules targeting a common abstracted API.
* modular: in all of this, the bot is modular, and allows features to be easily activated or removed at will (like some other ones).
The officially-hosted instances of this bot are, respectively: The officially-hosted instances of this bot are, respectively:
@ -13,7 +14,7 @@ The officially-hosted instances of this bot are, respectively:
In case you want to run your own instance: In case you want to run your own instance:
1. `git clone --depth 1 https://gitlab.com/octospacc/WinDog && cd ./WinDog` to get the code. 1. `git clone --depth 1 https://gitlab.com/octospacc/WinDog && cd ./WinDog` to get the code.
2. `python3 -m pip install -U -r ./requirements.txt` to install dependencies. 2. `find -type f -name requirements.txt -exec python3 -m pip install -U -r {} \;` to install the full package of dependencies.
3. `cp ./LibWinDog/Config.py ./` and, in the new file, edit essential fields like user credentials, then delete the unmodified fields. 3. `cp ./LibWinDog/Config.py ./` and, in the new file, edit essential fields like user credentials, then delete the unmodified fields.
4. `sh ./StartWinDog.sh` to start the bot every time. 4. `sh ./StartWinDog.sh` to start the bot every time.

149
WinDog.py
View File

@ -11,7 +11,7 @@ from os import listdir
from os.path import isfile, isdir from os.path import isfile, isdir
from random import choice, randint from random import choice, randint
from types import SimpleNamespace from types import SimpleNamespace
#from traceback import format_exc as TraceText from traceback import format_exc
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
from html import unescape as HtmlUnescape from html import unescape as HtmlUnescape
from markdown import markdown from markdown import markdown
@ -19,67 +19,61 @@ from markdown import markdown
# <https://daringfireball.net/projects/markdown/syntax#backslash> # <https://daringfireball.net/projects/markdown/syntax#backslash>
MdEscapes = '\\`*_{}[]()<>#+-.!|=' MdEscapes = '\\`*_{}[]()<>#+-.!|='
Db = {"Rooms": {}, "Users": {}} def Log(text:str, level:str="?") -> None:
Locale = {"Fallback": {}} print(f"[{level}] [{int(time.time())}] {text}")
Platforms = {}
Commands = {}
for dir in ("LibWinDog/Platforms", "ModWinDog"): def SetupLocales() -> None:
for path in listdir(f"./{dir}"):
path = f"./{dir}/{path}"
if isfile(path):
exec(open(path, 'r').read())
elif isdir(path):
exec(open(f"{path}/mod.py", 'r').read())
exec(open("./LibWinDog/Config.py", 'r').read())
def SetupLocale() -> None:
global Locale global Locale
for File in listdir('./Locale'): for file in listdir('./Locale'):
Lang = File.split('.')[0] lang = file.split('.')[0]
try: try:
with open(f'./Locale/{File}') as File: with open(f'./Locale/{file}') as file:
Locale[Lang] = json.load(File) Locale[lang] = json.load(file)
except Exception: except Exception:
print(f'Cannot load {Lang} locale, exiting.') Log(f'Cannot load {lang} locale, exiting.')
raise raise
exit(1) exit(1)
for Key in Locale[DefaultLang]: for key in Locale[DefaultLang]:
Locale['Fallback'][Key] = Locale[DefaultLang][Key] Locale['Fallback'][key] = Locale[DefaultLang][key]
for Lang in Locale: for lang in Locale:
for Key in Locale[Lang]: for key in Locale[lang]:
if not Key in Locale['Fallback']: if not key in Locale['Fallback']:
Locale['Fallback'][Key] = Locale[Lang][Key] Locale['Fallback'][key] = Locale[lang][key]
def __(Key:str, Lang:str=DefaultLang): def querier(query:str, lang:str=DefaultLang):
Set = None value = None
Key = Key.split('.') query = query.split('.')
try: try:
Set = Locale.Locale[Lang] value = Locale.Locale[lang]
for El in Key: for key in query:
Set = Set[El] value = value[key]
except Exception: except Exception:
Set = Locale.Locale['Fallback'] value = Locale.Locale['Fallback']
for El in Key: for key in query:
Set = Set[El] value = value[key]
return Set return value
Locale['__'] = __ Locale['__'] = querier
Locale['Locale'] = Locale Locale['Locale'] = Locale
Locale = SimpleNamespace(**Locale) Locale = SimpleNamespace(**Locale)
def SetupDb() -> None: def SetupDb() -> None:
global Db global Db
try: try:
with open('Database.json', 'r') as File: with open('Database.json', 'r') as file:
Db = json.load(File) Db = json.load(file)
except Exception: except Exception:
pass pass
def InDict(Dict:dict, Key:str): def InDict(Dict:dict, Key:str) -> any:
if Key in Dict: if Key in Dict:
return Dict[Key] return Dict[Key]
else: else:
return None return None
def isinstanceSafe(clazz:any, instance:any) -> bool:
if instance != None:
return isinstance(clazz, instance)
return False
def CharEscape(String:str, Escape:str='') -> str: def CharEscape(String:str, Escape:str='') -> str:
if Escape == 'MARKDOWN': if Escape == 'MARKDOWN':
return escape_markdown(String, version=2) return escape_markdown(String, version=2)
@ -116,18 +110,17 @@ def GetRawTokens(text:str) -> list:
return text.strip().replace('\t', ' ').replace(' ', ' ').replace(' ', ' ').split(' ') return text.strip().replace('\t', ' ').replace(' ', ' ').replace(' ', ' ').split(' ')
def ParseCmd(msg) -> dict|None: def ParseCmd(msg) -> dict|None:
name = msg.lower().split(' ')[0][1:].split('@')[0] name = msg.replace('\n', ' ').replace('\t', ' ').replace(' ', ' ').replace(' ', ' ').split(' ')[0][1:].split('@')[0]
if not name: if not name: return
return
return SimpleNamespace(**{ return SimpleNamespace(**{
"Name": name, "Name": name.lower(),
"Body": name.join(msg.split(name)[1:]).strip(), "Body": name.join(msg.split(name)[1:]).strip(),
"Tokens": GetRawTokens(msg), "Tokens": GetRawTokens(msg),
"User": None, "User": None,
"Quoted": None, "Quoted": None,
}) })
def GetWeightedText(texts:tuple) -> str: def GetWeightedText(texts:tuple) -> str|None:
for text in texts: for text in texts:
if text: if text:
return text return text
@ -136,11 +129,11 @@ def RandPercent() -> int:
num = randint(0,100) num = randint(0,100)
return (f'{num}.00' if num == 100 else f'{num}.{randint(0,9)}{randint(0,9)}') return (f'{num}.00' if num == 100 else f'{num}.{randint(0,9)}{randint(0,9)}')
def RandHexStr(Len:int) -> str: def RandHexStr(length:int) -> str:
Hex = '' hexa = ''
for Char in range(Len): for char in range(length):
Hex += choice('0123456789abcdef') hexa += choice('0123456789abcdef')
return Hex return hexa
def SendMsg(Context, Data, Destination=None) -> None: def SendMsg(Context, Data, Destination=None) -> None:
if type(Context) == dict: if type(Context) == dict:
@ -159,27 +152,59 @@ def SendMsg(Context, Data, Destination=None) -> None:
TextMarkdown = CharEscape(HtmlUnescape(Data['Text']), InferMdEscape(HtmlUnescape(Data['Text']), TextPlain)) TextMarkdown = CharEscape(HtmlUnescape(Data['Text']), InferMdEscape(HtmlUnescape(Data['Text']), TextPlain))
for platform in Platforms: for platform in Platforms:
platform = Platforms[platform] platform = Platforms[platform]
if ("eventClass" in platform and isinstance(Event, platform["eventClass"])) \ if isinstanceSafe(Event, InDict(platform, "eventClass")) or isinstanceSafe(Manager, InDict(platform, "managerClass")):
or ("managerClass" in platform and isinstance(Manager, platform["managerClass"])):
platform["sender"](Event, Manager, Data, Destination, TextPlain, TextMarkdown) platform["sender"](Event, Manager, Data, Destination, TextPlain, TextMarkdown)
def RegisterPlatform(name:str, main:callable, sender:callable, *, eventClass=None, managerClass=None) -> None:
Platforms[name] = {"main": main, "sender": sender, "eventClass": eventClass, "managerClass": managerClass}
Log(f"Registered Platform: {name}.")
def RegisterModule(name:str, endpoints:dict, *, group:str=None, summary:str=None) -> None:
Modules[name] = {"group": group, "summary": summary, "endpoints": endpoints}
Log(f"Registered Module: {name}.")
for endpoint in endpoints:
endpoint = endpoints[endpoint]
for name in endpoint["names"]:
Endpoints[name] = endpoint["handler"]
def CreateEndpoint(names:list[str]|tuple[str], handler:callable, *, summary:str=None) -> dict:
return {"names": names, "summary": summary, "handler": handler}
def Main() -> None: def Main() -> None:
SetupDb() SetupDb()
SetupLocale() SetupLocales()
TelegramMain() for platform in Platforms:
MastodonMain() Platforms[platform]["main"]()
#MatrixMain() Log(f"Initialized Platform: {platform}.")
#for platform in Platforms: Log('WinDog Ready!')
# Platforms[platform]["main"]()
while True: while True:
time.sleep(9**9) time.sleep(9**9)
if __name__ == '__main__': if __name__ == '__main__':
Log('Starting WinDog...')
Db = {"Rooms": {}, "Users": {}}
Locale = {"Fallback": {}}
Platforms, Modules, Endpoints = {}, {}, {}
for dir in ("LibWinDog/Platforms", "ModWinDog"):
for name in listdir(f"./{dir}"):
path = f"./{dir}/{name}"
if isfile(path):
exec(open(path, 'r').read())
elif isdir(path):
exec(open(f"{path}/{name}.py", 'r').read())
# TODO load locales
#for name in listdir(path):
# if name.lower().endswith('.json'):
#
Log('Loading Configuration...')
exec(open("./LibWinDog/Config.py", 'r').read())
try: try:
from Config import * from Config import *
except Exception: except Exception:
pass Log(format_exc())
print('Starting WinDog...')
Main() Main()
print('Closing WinDog...') Log('Closing WinDog...')

View File

@ -1,15 +1,2 @@
# Program core
# ...
# Required by some bot modules
urllib3
urlextract
# Mastodon support
Mastodon.py
beautifulsoup4 beautifulsoup4
Markdown Markdown
# Telegram support
python-telegram-bot==13.4.1