Unsplash, code refactor

This commit is contained in:
octospacc 2023-07-08 17:49:06 +02:00
parent 0b6af811cd
commit 06b7f42cdb
3 changed files with 213 additions and 169 deletions

View File

@ -1,6 +1,6 @@
{ {
"start": [ "start": [
"*Hi* {user.mention_markdown_v2()}*!*" "*Hi* {0}*!*"
], ],
"help": [ "help": [
"*There's no one around to help (yet).*" "*There's no one around to help (yet).*"

156
ModWinDog/Mods.py Normal file
View File

@ -0,0 +1,156 @@
# ================================== #
# WinDog multi-purpose chatbot #
# Licensed under AGPLv3 by OctoSpacc #
# ================================== #
def percenter(update:Update, context:CallbackContext) -> None:
Cmd = HandleCmd(update)
if not Cmd: return
if len(Cmd.Tokens) >= 2:
Text = choice(Locale.__(f'{Cmd.Name}.done'))
else:
Text = choice(Locale.__(f'{Cmd.Name}.empty'))
update.message.reply_markdown_v2(
CharEscape(Text, '.!<>').format(Cmd=Cmd.Tokens[0], Percent=RandPercent(), Thing=Cmd.Body),
reply_to_message_id=update.message.message_id)
def multifun(update:Update, context:CallbackContext) -> None:
Cmd = HandleCmd(update)
if not Cmd: return
Key = ParseCmd(update.message.text).Name
ReplyToMsg = update.message.message_id
if update.message.reply_to_message:
ReplyFromUID = update.message.reply_to_message.from_user.id
if ReplyFromUID == TGID and 'bot' in Locale.__(Key):
Text = CharEscape(choice(Locale.__(f'{Key}.bot')), 'MARKDOWN_SPEECH')
elif ReplyFromUID == update.message.from_user.id and 'self' in Locale.__(Key):
FromUName = CharEscape(update.message.from_user.first_name, 'MARKDOWN')
Text = CharEscape(choice(Locale.__(f'{Key}.self')), 'MARKDOWN_SPEECH').format(FromUName)
else:
if 'others' in Locale.__(Key):
FromUName = CharEscape(update.message.from_user.first_name, 'MARKDOWN')
ToUName = CharEscape(update.message.reply_to_message.from_user.first_name, 'MARKDOWN')
Text = CharEscape(choice(Locale.__(f'{Key}.others')), 'MARKDOWN_SPEECH').format(FromUName,ToUName)
ReplyToMsg = update.message.reply_to_message.message_id
else:
if 'empty' in Locale.__(Key):
Text = CharEscape(choice(Locale.__(f'{Key}.empty')), 'MARKDOWN_SPEECH')
update.message.reply_markdown_v2(Text, reply_to_message_id=ReplyToMsg)
def cStart(update:Update, context:CallbackContext) -> None:
Cmd = HandleCmd(update)
if not Cmd: return
user = update.effective_user
update.message.reply_markdown_v2('Hi\!',
#CharEscape(choice(Locale.__('start')).format(CharEscape(user.mention_markdown_v2(), 'MARKDOWN')), 'MARKDOWN'),
reply_to_message_id=update.message.message_id)
def cHelp(update:Update, context:CallbackContext) -> None:
Cmd = HandleCmd(update)
if not Cmd: return
update.message.reply_markdown_v2(
CharEscape(choice(Locale.__('help')), 'MARKDOWN_SPEECH'),
reply_to_message_id=update.message.message_id)
def cConfig(update:Update, context:CallbackContext) -> None:
Cmd = HandleCmd(update)
if not Cmd: return
def cEcho(update:Update, context:CallbackContext) -> None:
Cmd = HandleCmd(update)
if not Cmd: return
Msg = update.message.text
if len(Msg.split(' ')) >= 2:
Text = Msg[len(Msg.split(' ')[0])+1:]
update.message.reply_text(
Text,
reply_to_message_id=update.message.message_id)
else:
Text = CharEscape(choice(Locale.__('echo.empty')), '.!')
update.message.reply_markdown_v2(
Text,
reply_to_message_id=update.message.message_id)
def cPing(update:Update, context:CallbackContext) -> None:
Cmd = HandleCmd(update)
if not Cmd: return
update.message.reply_markdown_v2(
'*Pong\!*',
reply_to_message_id=update.message.message_id)
#def cTime(update:Update, context:CallbackContext) -> None:
# update.message.reply_markdown_v2(
# CharEscape(choice(Locale.__('time')).format(time.ctime().replace(' ', ' ')), 'MARKDOWN_SPEECH'),
# reply_to_message_id=update.message.message_id)
def cHash(update:Update, context:CallbackContext) -> None:
Cmd = HandleCmd(update)
if not Cmd: return
if len(Cmd.Tokens) >= 3 and Cmd.Tokens[1] in hashlib.algorithms_available:
Alg = Cmd.Tokens[1]
Caption = hashlib.new(Alg, Alg.join(Cmd.Body.split(Alg)[1:]).strip().encode()).hexdigest()
else:
Caption = CharEscape(choice(Locale.__('hash')).format(Cmd.Tokens[0], hashlib.algorithms_available), 'MARKDOWN_SPEECH')
update.message.reply_markdown_v2(Caption, reply_to_message_id=update.message.message_id)
def cEval(update:Update, context:CallbackContext) -> None:
Cmd = HandleCmd(update)
if not Cmd: return
update.message.reply_markdown_v2(
CharEscape(choice(Locale.__('eval')), 'MARKDOWN_SPEECH'),
reply_to_message_id=update.message.message_id)
def cExec(update:Update, context:CallbackContext) -> None:
Cmd = HandleCmd(update)
if not Cmd: return
Toks = Cmd.Tokens
if len(Cmd.Tokens) >= 2 and Cmd.Tokens[1].lower() in ExecAllowed:
Cmd = Toks[1].lower()
Out = subprocess.run(('sh', '-c', f'export PATH=$PATH:/usr/games; {Cmd}'), stdout=subprocess.PIPE).stdout.decode()
# <https://stackoverflow.com/a/14693789>
Caption = (re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])').sub('', Out)) #if ExecAllowed[Cmd] else Out)
update.message.reply_markdown_v2(
'```\n' + CharEscape(Caption, 'MARKDOWN').strip() + '\n```',
reply_to_message_id=update.message.message_id)
else:
update.message.reply_markdown_v2(
CharEscape(choice(Locale.__('eval')), 'MARKDOWN_SPEECH'),
reply_to_message_id=update.message.message_id)
def cWeb(update:Update, context:CallbackContext) -> None:
Cmd = HandleCmd(update)
if not Cmd: return
Msg = update.message.text
Toks = Cmd.Tokens
if len(Cmd.Tokens) >= 2:
try:
Key = ParseCmd(Msg).Name
Query = Key.join(Msg.split(Key)[1:]).strip()
QueryUrl = UrlParse.quote(Query)
Req = HttpGet(f'https://html.duckduckgo.com/html?q={QueryUrl}')
Caption = f'[🦆🔎 "*{CharEscape(Query, "MARKDOWN")}*"](https://duckduckgo.com/?q={CharEscape(QueryUrl, "MARKDOWN")})\n\n'
Index = 0
for Line in Req.read().decode().replace('\t', ' ').splitlines():
if ' class="result__a" ' in Line and ' href="//duckduckgo.com/l/?uddg=' in Line:
Index += 1
Link = CharEscape(UrlParse.unquote(Line.split(' href="//duckduckgo.com/l/?uddg=')[1].split('&amp;rut=')[0]), 'MARKDOWN')
Title = CharEscape(UrlParse.unquote(Line.split('</a>')[0].split('</span>')[-1].split('>')[1]), 'MARKDOWN')
Domain = Link.split('://')[1].split('/')[0]
Caption += f'{Index}\. [{Title}]({Link}) \[`{Domain}`\]\n\n'
update.message.reply_markdown_v2(Caption, reply_to_message_id=update.message.message_id)
except Exception:
raise
def cUnsplash(update:Update, context:CallbackContext) -> None:
Cmd = HandleCmd(update)
if not Cmd: return
try:
update.message.reply_photo(
HttpGet(f'https://source.unsplash.com/random/?{UrlParse.quote(Cmd.Body)}').read(),
reply_to_message_id=update.message.message_id)
except Exception:
raise
def cSafebooru(update:Update, context:CallbackContext) -> None:
Cmd = HandleCmd(update)
if not Cmd: return

222
WinDog.py
View File

@ -1,5 +1,4 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# ================================== # # ================================== #
# WinDog multi-purpose chatbot # # WinDog multi-purpose chatbot #
# Licensed under AGPLv3 by OctoSpacc # # Licensed under AGPLv3 by OctoSpacc #
@ -7,6 +6,7 @@
import json, hashlib, re, time, subprocess import json, hashlib, re, time, subprocess
from os import listdir from os import listdir
from os.path import isfile
from random import choice, randint from random import choice, randint
from types import SimpleNamespace from types import SimpleNamespace
#from traceback import format_exc as TraceText #from traceback import format_exc as TraceText
@ -15,22 +15,16 @@ from telegram.utils.helpers import escape_markdown
from telegram.ext import Updater, CommandHandler, MessageHandler, Filters, CallbackContext from telegram.ext import Updater, CommandHandler, MessageHandler, Filters, CallbackContext
from urllib import parse as UrlParse from urllib import parse as UrlParse
from urllib.request import urlopen, Request from urllib.request import urlopen, Request
from Config import *
# False: ASCII output; True: ANSI Output (must be escaped)
ExecAllowed = {'date': False, 'fortune': False, 'neofetch': True, 'uptime': False}
UserAgent = f'WinDog v.Staging'
Db = {"Chats": {}} Db = {"Chats": {}}
Locale = {"Fallback": {}} Locale = {"Fallback": {}}
def SetupDb() -> None: for Dir in ('Lib', 'Mod'):
global Db for File in listdir(f'./{Dir}WinDog'):
try: File = f'./{Dir}WinDog/{File}'
with open('Database.json', 'r') as File: if isfile(File):
Db = json.load(File) with open(File, 'r') as File:
except Exception: exec(File.read())
pass
def SetupLocale() -> None: def SetupLocale() -> None:
global Locale global Locale
@ -65,6 +59,14 @@ def SetupLocale() -> None:
Locale['Locale'] = Locale Locale['Locale'] = Locale
Locale = SimpleNamespace(**Locale) Locale = SimpleNamespace(**Locale)
def SetupDb() -> None:
global Db
try:
with open('Database.json', 'r') as File:
Db = json.load(File)
except Exception:
pass
def CharEscape(String, Escape='') -> str: def CharEscape(String, Escape='') -> str:
if Escape == 'MARKDOWN': if Escape == 'MARKDOWN':
return escape_markdown(String, version=2) return escape_markdown(String, version=2)
@ -77,99 +79,38 @@ def CharEscape(String, Escape='') -> str:
String = String.replace(c, '\\'+c) String = String.replace(c, '\\'+c)
return String return String
def CmdAllowed(update) -> bool:
if not TGRestrict:
return True
else:
if TGRestrict.lower() == 'whitelist':
if update.message.chat.id in TGWhitelist:
return True
return False
def HandleCmd(update):
filters(update)
if CmdAllowed(update):
return ParseCmd(update.message.text)
else:
return False
def GetRawTokens(Text:str) -> list: def GetRawTokens(Text:str) -> list:
return Text.strip().replace('\t', ' ').replace(' ', ' ').replace(' ', ' ').split(' ') return Text.strip().replace('\t', ' ').replace(' ', ' ').replace(' ', ' ').split(' ')
def CmdFilter(Msg) -> str: def ParseCmd(Msg) -> dict:
return Msg.lower().split(' ')[0][1:].split('@')[0] Name = Msg.lower().split(' ')[0][1:].split('@')[0]
return SimpleNamespace(**{
"Name": Name,
"Body": Name.join(Msg.split(Name)[1:]).strip(),
"Tokens": GetRawTokens(Msg),
})
def RandPercent() -> int: def filters(update:Update, context:CallbackContext=None) -> None:
Num = randint(0,100)
if Num == 100:
Num = str(Num) + '\.00'
else:
Num = str(Num) + '\.' + str(randint(0,9)) + str(randint(0,9))
return Num
def cStart(update:Update, context:CallbackContext) -> None:
if CmdAllowed(update):
user = update.effective_user
update.message.reply_markdown_v2(
CharEscape(choice(Locale.__('start')), 'MARKDOWN_SPEECH').format(user.mention_markdown_v2()),
reply_to_message_id=update.message.message_id)
def cHelp(update:Update, context:CallbackContext) -> None:
if CmdAllowed(update):
update.message.reply_markdown_v2(
CharEscape(choice(Locale.__('help')), 'MARKDOWN_SPEECH'),
reply_to_message_id=update.message.message_id)
def cConfig(update:Update, context:CallbackContext) -> None:
pass
def cEcho(update:Update, context:CallbackContext) -> None:
if CmdAllowed(update):
Msg = update.message.text
if len(Msg.split(' ')) >= 2:
Text = Msg[len(Msg.split(' ')[0])+1:]
update.message.reply_text(
Text,
reply_to_message_id=update.message.message_id)
else:
Text = CharEscape(choice(Locale.__('echo.empty')), '.!')
update.message.reply_markdown_v2(
Text,
reply_to_message_id=update.message.message_id)
def cPing(update:Update, context:CallbackContext) -> None:
if CmdAllowed(update):
update.message.reply_markdown_v2(
'*Pong\!*',
reply_to_message_id=update.message.message_id)
def percenter(update:Update, context:CallbackContext) -> None:
if CmdAllowed(update):
Msg = update.message.text
Key = CmdFilter(Msg)
Toks = GetRawTokens(Msg)
Thing = Key.join(Msg.split(Key)[1:]).strip()
if len(Toks) >= 2:
Text = choice(Locale.__(f'{Key}.done'))
else:
Text = choice(Locale.__(f'{Key}.empty'))
update.message.reply_markdown_v2(
CharEscape(Text, '.!').format(Cmd=Toks[0], Percent=RandPercent(), Thing=Thing),
reply_to_message_id=update.message.message_id)
def multifun(update:Update, context:CallbackContext) -> None:
if CmdAllowed(update):
Key = CmdFilter(update.message.text)
ReplyToMsg = update.message.message_id
if update.message.reply_to_message:
ReplyFromUID = update.message.reply_to_message.from_user.id
if ReplyFromUID == TGID and 'bot' in Locale.__(Key):
Text = CharEscape(choice(Locale.__(f'{Key}.bot')), 'MARKDOWN_SPEECH')
elif ReplyFromUID == update.message.from_user.id and 'self' in Locale.__(Key):
FromUName = CharEscape(update.message.from_user.first_name, 'MARKDOWN')
Text = CharEscape(choice(Locale.__(f'{Key}.self')), 'MARKDOWN_SPEECH').format(FromUName)
else:
if 'others' in Locale.__(Key):
FromUName = CharEscape(update.message.from_user.first_name, 'MARKDOWN')
ToUName = CharEscape(update.message.reply_to_message.from_user.first_name, 'MARKDOWN')
Text = CharEscape(choice(Locale.__(f'{Key}.others')), 'MARKDOWN_SPEECH').format(FromUName,ToUName)
ReplyToMsg = update.message.reply_to_message.message_id
else:
if 'empty' in Locale.__(Key):
Text = CharEscape(choice(Locale.__(f'{Key}.empty')), 'MARKDOWN_SPEECH')
update.message.reply_markdown_v2(Text, reply_to_message_id=ReplyToMsg)
def cUnsplash(update:Update, context:CallbackContext) -> None:
pass
def filters(update:Update, context:CallbackContext) -> None:
if Debug and Dumper: if Debug and Dumper:
Text = update.message.text.replace('\n', '\\n')
with open('Dump.txt', 'a') as File: with open('Dump.txt', 'a') as File:
File.write(f'[{time.ctime()}] [{int(time.time())}] [{update.message.chat.id}] [{update.message.message_id}] [{update.message.from_user.id}] {update.message.text}\n') File.write(f'[{time.ctime()}] [{int(time.time())}] [{update.message.chat.id}] [{update.message.message_id}] [{update.message.from_user.id}] {Text}\n')
''' '''
if CmdAllowed(update): if CmdAllowed(update):
ChatID = update.message.chat.id ChatID = update.message.chat.id
@ -191,66 +132,13 @@ def setfilter(update:Update, context:CallbackContext) -> None:
Private['Chats'][ChatID]['Filters'][update.message.text] = {'Text':0} Private['Chats'][ChatID]['Filters'][update.message.text] = {'Text':0}
''' '''
#def cTime(update:Update, context:CallbackContext) -> None: def RandPercent() -> int:
# update.message.reply_markdown_v2( Num = randint(0,100)
# CharEscape(choice(Locale.__('time')).format(time.ctime().replace(' ', ' ')), 'MARKDOWN_SPEECH'), if Num == 100:
# reply_to_message_id=update.message.message_id) Num = str(Num) + '\.00'
def cHash(update:Update, context:CallbackContext) -> None:
if CmdAllowed(update):
Msg = update.message.text
Toks = GetRawTokens(Msg)
if len(Toks) >= 3 and Toks[1] in hashlib.algorithms_available:
Alg = Toks[1]
Caption = hashlib.new(Alg, Alg.join(Msg.split(Alg)[1:]).strip().encode()).hexdigest()
else: else:
Caption = CharEscape(choice(Locale.__('hash')).format(Toks[0], hashlib.algorithms_available), 'MARKDOWN_SPEECH') Num = str(Num) + '\.' + str(randint(0,9)) + str(randint(0,9))
update.message.reply_markdown_v2(Caption, reply_to_message_id=update.message.message_id) return Num
def cEval(update:Update, context:CallbackContext) -> None:
if CmdAllowed(update):
update.message.reply_markdown_v2(
CharEscape(choice(Locale.__('eval')), 'MARKDOWN_SPEECH'),
reply_to_message_id=update.message.message_id)
def cExec(update:Update, context:CallbackContext) -> None:
if CmdAllowed(update):
Toks = GetRawTokens(update.message.text)
if len(Toks) >= 2 and Toks[1].lower() in ExecAllowed:
Cmd = Toks[1].lower()
Out = subprocess.run(('sh', '-c', f'export PATH=$PATH:/usr/games; {Cmd}'), stdout=subprocess.PIPE).stdout.decode()
# <https://stackoverflow.com/a/14693789>
Caption = (re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])').sub('', Out)) #if ExecAllowed[Cmd] else Out)
update.message.reply_markdown_v2(
'```\n' + CharEscape(Caption, 'MARKDOWN').strip() + '\n```',
reply_to_message_id=update.message.message_id)
else:
update.message.reply_markdown_v2(
CharEscape(choice(Locale.__('eval')), 'MARKDOWN_SPEECH'),
reply_to_message_id=update.message.message_id)
def cWeb(update:Update, context:CallbackContext) -> None:
if not CmdAllowed(update): return
Msg = update.message.text
Toks = GetRawTokens(Msg)
if len(Toks) >= 2:
try:
Key = CmdFilter(Msg)
Query = Key.join(Msg.split(Key)[1:]).strip()
QueryUrl = UrlParse.quote(Query)
Req = urlopen(Request(f'https://html.duckduckgo.com/html?q={QueryUrl}', headers={"User-Agent": UserAgent}))
Caption = f'[🦆🔎 "*{CharEscape(Query, "MARKDOWN")}*"](https://duckduckgo.com/?q={CharEscape(QueryUrl, "MARKDOWN")})\n\n'
Index = 0
for Line in Req.read().decode().replace('\t', ' ').splitlines():
if ' class="result__a" ' in Line and ' href="//duckduckgo.com/l/?uddg=' in Line:
Index += 1
Link = CharEscape(UrlParse.unquote(Line.split(' href="//duckduckgo.com/l/?uddg=')[1].split('&amp;rut=')[0]), 'MARKDOWN')
Title = CharEscape(UrlParse.unquote(Line.split('</a>')[0].split('</span>')[-1].split('>')[1]), 'MARKDOWN')
Domain = Link.split('://')[1].split('/')[0]
Caption += f'{Index}\. [{Title}]({Link}) \[`{Domain}`\]\n\n'
update.message.reply_markdown_v2(Caption, reply_to_message_id=update.message.message_id)
except Exception:
raise
#def CmdArgs(Msg:str, Cfg:tuple=None): #def CmdArgs(Msg:str, Cfg:tuple=None):
# Args = [] # Args = []
@ -262,14 +150,8 @@ def cWeb(update:Update, context:CallbackContext) -> None:
# else: # else:
# return Msg.replace(' ', ' ').replace(' ', ' ').split(' ') # return Msg.replace(' ', ' ').replace(' ', ' ').split(' ')
def CmdAllowed(update) -> bool: def HttpGet(Url:str):
if not TGRestrict: return urlopen(Request(Url, headers={"User-Agent": WebUserAgent}))
return True
else:
if TGRestrict.lower() == 'whitelist':
if update.message.chat.id in TGWhitelist:
return True
return False
#def SendMsg(Data, context): #def SendMsg(Data, context):
# pass # pass
@ -294,6 +176,7 @@ def Main() -> None:
dispatcher.add_handler(CommandHandler('exec', cExec)) dispatcher.add_handler(CommandHandler('exec', cExec))
dispatcher.add_handler(CommandHandler('web', cWeb)) dispatcher.add_handler(CommandHandler('web', cWeb))
dispatcher.add_handler(CommandHandler('unsplash', cUnsplash)) dispatcher.add_handler(CommandHandler('unsplash', cUnsplash))
dispatcher.add_handler(CommandHandler('safebooru', cSafebooru))
for Cmd in ('wish', 'level'): for Cmd in ('wish', 'level'):
dispatcher.add_handler(CommandHandler(Cmd, percenter)) dispatcher.add_handler(CommandHandler(Cmd, percenter))
@ -308,5 +191,10 @@ def Main() -> None:
updater.idle() updater.idle()
if __name__ == '__main__': if __name__ == '__main__':
try:
from Config import *
except Exception:
pass
Main() Main()
print('Closing WinDog...') print('Closing WinDog...')