Add months-old local changes, restructure platforms api and some modules, fix some bugs

This commit is contained in:
octospacc 2024-06-15 02:08:09 +02:00
parent 2512df4f98
commit b7eb53e6a7
13 changed files with 393 additions and 332 deletions

View File

@ -3,6 +3,9 @@
# Licensed under AGPLv3 by OctoSpacc #
# ================================== #
# If you have modified the bot's code, you should set this
ModifiedSourceUrl = ''
MastodonUrl = ''
MastodonToken = ''
@ -33,14 +36,16 @@ Endpoints = {
#"repeat": cRepeat,
"wish": percenter,
"level": percenter,
#"hug": multifun,
#"pat": multifun,
#"poke": multifun,
#"cuddle": multifun,
#"floor": multifun,
#"hands": multifun,
#"sessocto": multifun,
"hug": multifun,
"pat": multifun,
"poke": multifun,
"cuddle": multifun,
"floor": multifun,
"hands": multifun,
"sessocto": multifun,
"hash": cHash,
#"encode": cEncode,
#"decode": cDecode,
#"time": cTime,
"eval": cEval,
"exec": cExec,

View File

@ -0,0 +1,34 @@
import mastodon
def MastodonSender(event, manager, Data, Destination, TextPlain, TextMarkdown) -> None:
if InDict(Data, 'Media'):
Media = manager.media_post(Data['Media'], Magic(mime=True).from_buffer(Data['Media']))
while Media['url'] == 'null':
Media = manager.media(Media)
if TextPlain:
manager.status_post(
status=(TextPlain + '\n\n@' + event['account']['acct']),
media_ids=(Media if InDict(Data, 'Media') else None),
in_reply_to_id=event['status']['id'],
visibility=('direct' if event['status']['visibility'] == 'direct' else 'unlisted'),
)
# TODO make this non-blocking or else we can't load it dynamically
def MastodonMain() -> None:
if not (MastodonUrl and MastodonToken):
return
Mastodon = mastodon.Mastodon(api_base_url=MastodonUrl, access_token=MastodonToken)
class MastodonListener(mastodon.StreamListener):
def on_notification(self, event):
if event['type'] == 'mention':
Msg = BeautifulSoup(event['status']['content'], 'html.parser').get_text(' ').strip().replace('\t', ' ')
if not Msg.split('@')[0]:
Msg = ' '.join('@'.join(Msg.split('@')[1:]).strip().split(' ')[1:]).strip()
if Msg[0] in CmdPrefixes:
Cmd = ParseCmd(Msg)
Cmd.messageId = event['status']['id']
if Cmd.Name in Endpoints:
Endpoints[Cmd.Name]({"Event": event, "Manager": Mastodon}, Cmd)
Mastodon.stream_user(MastodonListener())
Platforms["Mastodon"] = {"main": MastodonMain, "sender": MastodonSender, "managerClass": mastodon.Mastodon}

View File

@ -0,0 +1,4 @@
def MatrixMain() -> None:
pass
#Platforms["Matrix"] = {"main": MatrixMain}

View File

@ -0,0 +1,81 @@
import telegram, telegram.ext
from telegram import ForceReply, Bot
from telegram.utils.helpers import escape_markdown
from telegram.ext import CommandHandler, MessageHandler, Filters, CallbackContext
def TelegramCmdAllowed(update:telegram.Update) -> bool:
if not TelegramRestrict:
return True
if TelegramRestrict.lower() == 'whitelist':
if update.message.chat.id in TelegramWhitelist:
return True
return False
def TelegramHandleCmd(update:telegram.Update):
TelegramQueryHandle(update)
if TelegramCmdAllowed(update):
return ParseCmd(update.message.text)
else:
return False
def TelegramQueryHandle(update:telegram.Update, context:CallbackContext=None) -> None:
if not (update and update.message):
return
Cmd = ParseCmd(update.message.text)
Cmd.messageId = update.message.message_id
Cmd.TextPlain = Cmd.Body
Cmd.TextMarkdown = update.message.text_markdown_v2
Cmd.Text = GetWeightedText((Cmd.TextMarkdown, Cmd.TextPlain))
if Cmd and Cmd.Tokens[0][0] in CmdPrefixes and Cmd.Name in Endpoints:
Cmd.User = {
"Name": update.message.from_user.first_name,
"Tag": update.message.from_user.username,
"Id": f'{update.message.from_user.id}@telegram',
}
if update.message.reply_to_message:
Cmd.Quoted = SimpleNamespace(**{
"messageId": update.message.reply_to_message.message_id,
"Body": update.message.reply_to_message.text,
"TextPlain": update.message.reply_to_message.text,
"TextMarkdown": update.message.reply_to_message.text_markdown_v2,
"Text": GetWeightedText((update.message.reply_to_message.text_markdown_v2, update.message.reply_to_message.text)),
"User": {
"Name": update.message.reply_to_message.from_user.first_name,
"Tag": update.message.reply_to_message.from_user.username,
"Id": f'{update.message.reply_to_message.from_user.id}@telegram',
},
})
Endpoints[Cmd.Name]({"Event": update, "Manager": context}, Cmd)
if Debug and Dumper:
Text = update.message.text
Text = (Text.replace('\n', '\\n') if Text else '')
with open('Dump.txt', 'a') as File:
File.write(f'[{time.ctime()}] [{int(time.time())}] [{update.message.chat.id}] [{update.message.message_id}] [{update.message.from_user.id}] {Text}\n')
def TelegramSender(event, manager, Data, Destination, TextPlain, TextMarkdown) -> None:
if Destination:
manager.bot.send_message(Destination, text=TextPlain)
else:
replyToId = (Data["ReplyTo"] if ("ReplyTo" in Data and Data["ReplyTo"]) else event.message.message_id)
if InDict(Data, 'Media'):
event.message.reply_photo(
Data['Media'],
caption=(TextMarkdown if TextMarkdown else TextPlain if TextPlain else None),
parse_mode=('MarkdownV2' if TextMarkdown else None),
reply_to_message_id=replyToId,
)
elif TextMarkdown:
event.message.reply_markdown_v2(TextMarkdown, reply_to_message_id=replyToId)
elif TextPlain:
event.message.reply_text(TextPlain, reply_to_message_id=replyToId)
def TelegramMain() -> None:
if not TelegramToken:
return
updater = telegram.ext.Updater(TelegramToken)
dispatcher = updater.dispatcher
#dispatcher.add_handler(CommandHandler('config', cConfig))
dispatcher.add_handler(MessageHandler(Filters.text | Filters.command, TelegramQueryHandle))
updater.start_polling()
Platforms["Telegram"] = {"main": TelegramMain, "sender": TelegramSender, "eventClass": telegram.Update}

View File

@ -1,6 +1,6 @@
{
"start": [
"*Hi* {0}*!*"
"*Hi* {0}*!*\n\nUse /help to read a list of available commands."
],
"help": [
"*There's no one around to help (yet).*"

View File

@ -1,6 +1,6 @@
{
"start": [
"*Ciao* {0}*!*"
"*Ciao* {0}*!*\n\nUsa /help per leggere la lista dei comandi."
],
"help": [
"*Non c'è nessuno qui ad aiutarti (per ora).*"
@ -134,7 +134,7 @@
"*Vuoi fare sessocto con qualcuno? Rispondi ad un suo messaggio.*"
],
"bot": [
"*Vorrei anche io sessocto ma non ho un corpo...😥️*"
"*Vorrei anche io sessocto ma non ho un corpo... 😥️*"
],
"self": [
"*{0} fa sessocto in singleplayer 😳️*"

0
ModWinDog/Codings.py Normal file
View File

15
ModWinDog/Hash.py Normal file
View File

@ -0,0 +1,15 @@
import hashlib
# Module: Hash
# Responds with the hash-sum of a message received.
def cHash(Context, Data) -> None:
if len(Data.Tokens) >= 3 and Data.Tokens[1] in hashlib.algorithms_available:
Alg = Data.Tokens[1]
Hash = hashlib.new(Alg, Alg.join(Data.Body.split(Alg)[1:]).strip().encode()).hexdigest()
SendMsg(Context, {
"TextPlain": Hash,
"TextMarkdown": MarkdownCode(Hash, True),
})
else:
SendMsg(Context, {"Text": choice(Locale.__('hash.usage')).format(Data.Tokens[0], hashlib.algorithms_available)})

129
ModWinDog/Internet.py Normal file
View File

@ -0,0 +1,129 @@
from urlextract import URLExtract
from urllib import parse as UrlParse
from urllib.request import urlopen, Request
def HttpGet(Url:str):
return urlopen(Request(Url, headers={"User-Agent": WebUserAgent}))
# Module: Embedded
# Rewrite a link trying to make sure we have an embed view.
def cEmbedded(Context, Data) -> None:
if len(Data.Tokens) >= 2:
# Find links in command body
Text = (Data.TextMarkdown + ' ' + Data.TextPlain)
elif Data.Quoted and Data.Quoted.Text:
# Find links in quoted message
Text = (Data.Quoted.TextMarkdown + ' ' + Data.Quoted.TextPlain)
else:
# TODO Error message
return
pass
urls = URLExtract().find_urls(Text)
if len(urls) > 0:
proto = 'https://'
url = urls[0]
urlLow = url.lower()
if urlLow.startswith('http://') or urlLow.startswith('https://'):
proto = url.split('://')[0] + '://'
url = '://'.join(url.split('://')[1:])
urlLow = '://'.join(urlLow.split('://')[1:])
urlDomain = urlLow.split('/')[0]
if urlDomain in ("facebook.com", "www.facebook.com", "m.facebook.com", "mbasic.facebook.com"):
url = "https://hlb0.octt.eu.org/cors-main.php/https://" + url
proto = ''
else:
if urlDomain == "instagram.com":
urlDomain = "ddinstagram.com"
elif urlDomain in ("twitter.com", "x.com"):
urlDomain = "fxtwitter.com"
elif urlDomain == "vm.tiktok.com":
urlDomain = "vm.vxtiktok.com"
url = urlDomain + url[len(urlDomain):]
SendMsg(Context, {"TextPlain": f"{{{proto}{url}}}"})
# else TODO error message?
# Module: Web
# Provides results of a DuckDuckGo search.
def cWeb(Context, Data) -> None:
if Data.Body:
try:
QueryUrl = UrlParse.quote(Data.Body)
Req = HttpGet(f'https://html.duckduckgo.com/html?q={QueryUrl}')
Caption = f'🦆🔎 "{Data.Body}": https://duckduckgo.com/?q={QueryUrl}\n\n'
Index = 0
for Line in Req.read().decode().replace('\t', ' ').splitlines():
if ' class="result__a" ' in Line and ' href="//duckduckgo.com/l/?uddg=' in Line:
Index += 1
Link = UrlParse.unquote(Line.split(' href="//duckduckgo.com/l/?uddg=')[1].split('&rut=')[0])
Title = Line.strip().split('</a>')[0].strip().split('</span>')[-1].strip().split('>')
if len(Title) > 1:
Title = HtmlUnescape(Title[1].strip())
Caption += f'[{Index}] {Title} : {{{Link}}}\n\n'
else:
continue
SendMsg(Context, {"TextPlain": f'{Caption}...'})
except Exception:
raise
else:
pass
# Module: Translate
# Return the received message after translating it in another language.
def cTranslate(Context, Data) -> None:
if len(Data.Tokens) < 3:
return
try:
Lang = Data.Tokens[1]
# TODO: Use many different public Lingva instances in rotation to avoid overloading a specific one
Result = json.loads(HttpGet(f'https://lingva.ml/api/v1/auto/{Lang}/{UrlParse.quote(Lang.join(Data.Body.split(Lang)[1:]))}').read())["translation"]
SendMsg(Context, {"TextPlain": Result})
except Exception:
raise
# Module: Unsplash
# Send a picture sourced from Unsplash.
def cUnsplash(Context, Data) -> None:
try:
Req = HttpGet(f'https://source.unsplash.com/random/?{UrlParse.quote(Data.Body)}')
ImgUrl = Req.geturl().split('?')[0]
SendMsg(Context, {
"TextPlain": f'{{{ImgUrl}}}',
"TextMarkdown": MarkdownCode(ImgUrl, True),
"Media": Req.read(),
})
except Exception:
raise
# Module: Safebooru
# Send a picture sourced from Safebooru.
def cSafebooru(Context, Data) -> None:
ApiUrl = 'https://safebooru.org/index.php?page=dapi&s=post&q=index&limit=100&tags='
try:
if Data.Body:
for i in range(7):
ImgUrls = HttpGet(f'{ApiUrl}md5:{RandHexStr(3)}%20{UrlParse.quote(Data.Body)}').read().decode().split(' file_url="')[1:]
if ImgUrls:
break
if not ImgUrls:
ImgUrls = HttpGet(f'{ApiUrl}{UrlParse.quote(Data.Body)}').read().decode().split(' file_url="')[1:]
ImgXml = choice(ImgUrls)
ImgUrl = ImgXml.split('"')[0]
ImgId = ImgXml.split(' id="')[1].split('"')[0]
else:
HtmlReq = HttpGet(HttpGet('https://safebooru.org/index.php?page=post&s=random').geturl())
for Line in HtmlReq.read().decode().replace('\t', ' ').splitlines():
if '<img ' in Line and ' id="image" ' in Line and ' src="':
ImgUrl = Line.split(' src="')[1].split('"')[0]
ImgId = ImgUrl.split('?')[-1]
break
if ImgUrl:
SendMsg(Context, {
"TextPlain": f'[{ImgId}]\n{{{ImgUrl}}}',
"TextMarkdown": f'\\[`{ImgId}`\\]\n{MarkdownCode(ImgUrl, True)}',
"Media": HttpGet(ImgUrl).read(),
})
else:
pass
except Exception:
raise

View File

@ -3,45 +3,36 @@
# Licensed under AGPLv3 by OctoSpacc #
# ==================================== #
from urlextract import URLExtract
# Module: Percenter
# Provides fun trough percentage-based toys.
def percenter(Context, Data=None) -> None:
if Data.Body:
Text = choice(Locale.__(f'{Data.Name}.done'))
else:
Text = choice(Locale.__(f'{Data.Name}.empty'))
SendMsg(Context, {"Text": Text.format(Cmd=Data.Tokens[0], Percent=RandPercent(), Thing=Data.Body)})
def percenter(Context, Data) -> None:
SendMsg(Context, {"Text": choice(Locale.__(f'{Data.Name}.{"done" if Data.Body else "empty"}')).format(
Cmd=Data.Tokens[0], Percent=RandPercent(), Thing=Data.Body)})
# Module: Multifun
# Provides fun trough preprogrammed-text-based toys.
def multifun(update:Update, context:CallbackContext) -> None:
Cmd = HandleCmd(update)
if not Cmd: return
Key = ParseCmd(update.message.text).Name
ReplyToMsg = update.message.message_id
if update.message.reply_to_message:
ReplyFromUID = update.message.reply_to_message.from_user.id
if ReplyFromUID == TelegramId and 'bot' in Locale.__(Key):
Text = CharEscape(choice(Locale.__(f'{Key}.bot')), 'MARKDOWN_SPEECH')
elif ReplyFromUID == update.message.from_user.id and 'self' in Locale.__(Key):
FromUName = CharEscape(update.message.from_user.first_name, 'MARKDOWN')
Text = CharEscape(choice(Locale.__(f'{Key}.self')), 'MARKDOWN_SPEECH').format(FromUName)
def multifun(Context, Data) -> None:
cmdkey = Data.Name
replyToId = None
if Data.Quoted:
replyFromUid = Data.Quoted.User["Id"]
# TODO work on all platforms for the bot id
if int(replyFromUid.split('@')[0]) == int(TelegramId) and 'bot' in Locale.__(cmdkey):
Text = choice(Locale.__(f'{cmdkey}.bot'))
elif replyFromUid == Data.User["Id"] and 'self' in Locale.__(cmdkey):
Text = choice(Locale.__(f'{cmdkey}.self')).format(Data.User["Name"])
else:
if 'others' in Locale.__(Key):
FromUName = CharEscape(update.message.from_user.first_name, 'MARKDOWN')
ToUName = CharEscape(update.message.reply_to_message.from_user.first_name, 'MARKDOWN')
Text = CharEscape(choice(Locale.__(f'{Key}.others')), 'MARKDOWN_SPEECH').format(FromUName,ToUName)
ReplyToMsg = update.message.reply_to_message.message_id
if 'others' in Locale.__(cmdkey):
Text = choice(Locale.__(f'{cmdkey}.others')).format(Data.User["Name"], Data.Quoted.User["Name"])
replyToId = Data.Quoted.messageId
else:
if 'empty' in Locale.__(Key):
Text = CharEscape(choice(Locale.__(f'{Key}.empty')), 'MARKDOWN_SPEECH')
update.message.reply_markdown_v2(Text, reply_to_message_id=ReplyToMsg)
if 'empty' in Locale.__(cmdkey):
Text = choice(Locale.__(f'{cmdkey}.empty'))
SendMsg(Context, {"Text": Text, "ReplyTo": replyToId})
# Module: Start
# Salutes the user, for now no other purpose except giving a feel that the bot is working.
def cStart(Context, Data=None) -> None:
def cStart(Context, Data) -> None:
SendMsg(Context, {"Text": choice(Locale.__('start')).format(Data.User['Name'])})
# Module: Help
@ -55,16 +46,19 @@ def cHelp(Context, Data=None) -> None:
# Module: Source
# Provides a copy of the bot source codes and/or instructions on how to get it.
def cSource(Context, Data=None) -> None:
SendMsg(Context, {"TextPlain": "{https://gitlab.com/octospacc/WinDog}"})
SendMsg(Context, {"TextPlain": ("""\
* Original Source Code: {https://gitlab.com/octospacc/WinDog}
* Mirror: {https://github.com/octospacc/WinDog}
""" + (f"* Modified Source Code: {{{ModifiedSourceUrl}}}" if ModifiedSourceUrl else ""))})
# Module: Config
# ...
def cConfig(update:Update, context:CallbackContext) -> None:
Cmd = HandleCmd(update)
if not Cmd: return
# ... area: eu, us, ...
# ... language: en, it, ...
# ... userdata: import, export, delete
#def cConfig(update:telegram.Update, context:CallbackContext) -> None:
# Cmd = TelegramHandleCmd(update)
# if not Cmd: return
# # ... area: eu, us, ...
# # ... language: en, it, ...
# # ... userdata: import, export, delete
# Module: Ping
# Responds pong, useful for testing messaging latency.
@ -73,7 +67,7 @@ def cPing(Context, Data=None) -> None:
# Module: Echo
# Responds back with the original text of the received message.
def cEcho(Context, Data=None) -> None:
def cEcho(Context, Data) -> None:
if Data.Body:
SendMsg(Context, {"Text": Data.Body})
else:
@ -81,33 +75,21 @@ def cEcho(Context, Data=None) -> None:
# Module: Broadcast
# Sends an admin message over to another destination
def cBroadcast(Context, Data=None) -> None:
if len(Data.Tokens) >= 3 and Data.User['Id'] in AdminIds:
Dest = Data.Tokens[1]
Text = ' '.join(Data.Tokens[2:])
SendMsg(Context, {"TextPlain": Text}, Dest)
SendMsg(Context, {"TextPlain": "Executed."})
else:
SendMsg(Context, {"Text": choice(Locale.__('eval'))})
def cBroadcast(Context, Data) -> None:
if Data.User['Id'] not in AdminIds:
return SendMsg(Context, {"Text": choice(Locale.__('eval'))})
if len(Data.Tokens) < 3:
return SendMsg(Context, {"Text": "Bad usage."})
Dest = Data.Tokens[1]
Text = ' '.join(Data.Tokens[2:])
SendMsg(Context, {"TextPlain": Text}, Dest)
SendMsg(Context, {"TextPlain": "Executed."})
#def cTime(update:Update, context:CallbackContext) -> None:
# update.message.reply_markdown_v2(
# CharEscape(choice(Locale.__('time')).format(time.ctime().replace(' ', ' ')), 'MARKDOWN_SPEECH'),
# reply_to_message_id=update.message.message_id)
# Module: Hash
# Responds with the hash-sum of a message received.
def cHash(Context, Data=None) -> None:
if len(Data.Tokens) >= 3 and Data.Tokens[1] in hashlib.algorithms_available:
Alg = Data.Tokens[1]
Hash = hashlib.new(Alg, Alg.join(Data.Body.split(Alg)[1:]).strip().encode()).hexdigest()
SendMsg(Context, {
"TextPlain": Hash,
"TextMarkdown": MarkdownCode(Hash, True),
})
else:
SendMsg(Context, {"Text": choice(Locale.__('hash.usage')).format(Data.Tokens[0], hashlib.algorithms_available)})
# Module: Eval
# Execute a Python command (or safe literal operation) in the current context. Currently not implemented.
def cEval(Context, Data=None) -> None:
@ -115,7 +97,7 @@ def cEval(Context, Data=None) -> None:
# Module: Exec
# Execute a system command from the allowed ones and return stdout/stderr.
def cExec(Context, Data=None) -> None:
def cExec(Context, Data) -> None:
if len(Data.Tokens) >= 2 and Data.Tokens[1].lower() in ExecAllowed:
Cmd = Data.Tokens[1].lower()
Out = subprocess.run(('sh', '-c', f'export PATH=$PATH:/usr/games; {Cmd}'), stdout=subprocess.PIPE).stdout.decode()
@ -138,115 +120,3 @@ def cFormat(Context, Data=None) -> None:
def cFrame(Context, Data=None) -> None:
pass
# Module: Embedded
# Rewrite a link trying to make sure we have an embed view.
def cEmbedded(Context, Data=None) -> None:
if len(Data.Tokens) >= 2:
# Find links in command body
Text = Data.Body
elif Data.Quoted and Data.Quoted['Body']:
# Find links in quoted message
Text = Data.Quoted['Body']
else:
# Error
return
pass
Urls = URLExtract().find_urls(Text)
if len(Urls) > 0:
Proto = 'https://'
Url = Urls[0]
UrlLow = Url.lower()
if UrlLow.startswith('http://') or UrlLow.startswith('https://'):
Proto = Url.split('://')[0] + '://'
Url = '://'.join(Url.split('://')[1:])
UrlLow = '://'.join(UrlLow.split('://')[1:])
if UrlLow.startswith('facebook.com/') or UrlLow.startswith('www.facebook.com/') or UrlLow.startswith('m.facebook.com/') or UrlLow.startswith('mbasic.facebook.com/'):
Url = 'https://hlb0.octt.eu.org/cors-main.php/https://' + Url
Proto = ''
elif UrlLow.startswith('instagram.com/'):
Url = 'ddinstagram.com/' + Url[len('instagram.com/'):]
elif UrlLow.startswith('twitter.com/'):
Url = 'fxtwitter.com/' + Url[len('twitter.com/'):]
SendMsg(Context, {"TextPlain": Proto+Url})
# Module: Web
# Provides results of a DuckDuckGo search.
def cWeb(Context, Data=None) -> None:
if Data.Body:
try:
QueryUrl = UrlParse.quote(Data.Body)
Req = HttpGet(f'https://html.duckduckgo.com/html?q={QueryUrl}')
Caption = f'🦆🔎 "{Data.Body}": https://duckduckgo.com/?q={QueryUrl}\n\n'
Index = 0
for Line in Req.read().decode().replace('\t', ' ').splitlines():
if ' class="result__a" ' in Line and ' href="//duckduckgo.com/l/?uddg=' in Line:
Index += 1
Link = UrlParse.unquote(Line.split(' href="//duckduckgo.com/l/?uddg=')[1].split('&amp;rut=')[0])
Title = HtmlUnescape(Line.split('</a>')[0].split('</span>')[-1].split('>')[1])
Caption += f'[{Index}] {Title} : {{{Link}}}\n\n'
SendMsg(Context, {"TextPlain": f'{Caption}...'})
except Exception:
raise
else:
pass
# Module: Translate
# Return the received message after translating it in another language.
def cTranslate(Context, Data=None) -> None:
if Data.Body:
try:
Lang = Data.Tokens[1]
# TODO: Use many different public Lingva instances in rotation to avoid overloading a specific one
Result = json.loads(HttpGet(f'https://lingva.ml/api/v1/auto/{Lang}/{UrlParse.quote(Lang.join(Data.Body.split(Lang)[1:]))}').read())["translation"]
SendMsg(Context, {"TextPlain": Result})
except Exception:
raise
else:
pass
# Module: Unsplash
# Send a picture sourced from Unsplash.
def cUnsplash(Context, Data=None) -> None:
try:
Req = HttpGet(f'https://source.unsplash.com/random/?{UrlParse.quote(Data.Body)}')
ImgUrl = Req.geturl().split('?')[0]
SendMsg(Context, {
"TextPlain": f'{{{ImgUrl}}}',
"TextMarkdown": MarkdownCode(ImgUrl, True),
"Media": Req.read(),
})
except Exception:
raise
# Module: Safebooru
# Send a picture sourced from Safebooru.
def cSafebooru(Context, Data=None) -> None:
ApiUrl = 'https://safebooru.org/index.php?page=dapi&s=post&q=index&limit=100&tags='
try:
if Data.Body:
for i in range(7):
ImgUrls = HttpGet(f'{ApiUrl}md5:{RandHexStr(3)}%20{UrlParse.quote(Data.Body)}').read().decode().split(' file_url="')[1:]
if ImgUrls:
break
if not ImgUrls:
ImgUrls = HttpGet(f'{ApiUrl}{UrlParse.quote(Data.Body)}').read().decode().split(' file_url="')[1:]
ImgXml = choice(ImgUrls)
ImgUrl = ImgXml.split('"')[0]
ImgId = ImgXml.split(' id="')[1].split('"')[0]
else:
HtmlReq = HttpGet(HttpGet('https://safebooru.org/index.php?page=post&s=random').geturl())
for Line in HtmlReq.read().decode().replace('\t', ' ').splitlines():
if '<img ' in Line and ' id="image" ' in Line and ' src="':
ImgUrl = Line.split(' src="')[1].split('"')[0]
ImgId = ImgUrl.split('?')[-1]
break
if ImgUrl:
SendMsg(Context, {
"TextPlain": f'[{ImgId}]\n{{{ImgUrl}}}',
"TextMarkdown": f'\\[`{ImgId}`\\]\n{MarkdownCode(ImgUrl, True)}',
"Media": HttpGet(ImgUrl).read(),
})
else:
pass
except Exception:
raise

19
README.md Normal file
View File

@ -0,0 +1,19 @@
# WinDog
WinDog/WinDogBot is a chatbot I've been (lazily) developing for years, with some special characteristics:
* multi-purpose: it's created for doing a myriad of different things, from the funny to the useful.
* multi-platform: it's an experiment in automagical multiplatform compatibility, with modules targeting a common abstracted API.
The officially-hosted instances of this bot are, respectively:
* [@WinDogBot](https://t.me/WinDogBot) on Telegram
* [@WinDog@botsin.space](https://botsin.space/@WinDog) on Mastodon (can also be used from any other Fediverse platform)
In case you want to run your own instance:
1. `git clone --depth 1 https://gitlab.com/octospacc/WinDog && cd ./WinDog` to get the code.
2. `python3 -m pip install -U -r ./requirements.txt` to install dependencies.
3. `cp ./LibWinDog/Config.py ./` and, in the new file, edit essential fields like user credentials, then delete the unmodified fields.
4. `sh ./StartWinDog.sh` to start the bot every time.

205
WinDog.py
View File

@ -4,36 +4,34 @@
# Licensed under AGPLv3 by OctoSpacc #
# ==================================== #
import json, hashlib, re, time, subprocess
import json, re, time, subprocess
from binascii import hexlify
from magic import Magic
from os import listdir
from os.path import isfile
from os.path import isfile, isdir
from random import choice, randint
from types import SimpleNamespace
#from traceback import format_exc as TraceText
import mastodon, telegram
from bs4 import BeautifulSoup
from html import unescape as HtmlUnescape
from markdown import markdown
from telegram import Update, ForceReply, Bot
from telegram.utils.helpers import escape_markdown
from telegram.ext import Updater, CommandHandler, MessageHandler, Filters, CallbackContext
from urllib import parse as UrlParse
from urllib.request import urlopen, Request
# <https://daringfireball.net/projects/markdown/syntax#backslash>
MdEscapes = '\\`*_{}[]()<>#+-.!|='
Db = { "Rooms": {}, "Users": {}, }
Locale = { "Fallback": {}, }
Db = {"Rooms": {}, "Users": {}}
Locale = {"Fallback": {}}
Platforms = {}
Commands = {}
for Dir in ('Mod', 'Lib'):
for File in listdir(f'./{Dir}WinDog'):
File = f'./{Dir}WinDog/{File}'
if isfile(File):
with open(File, 'r') as File:
exec(File.read())
for dir in ("LibWinDog/Platforms", "ModWinDog"):
for path in listdir(f"./{dir}"):
path = f"./{dir}/{path}"
if isfile(path):
exec(open(path, 'r').read())
elif isdir(path):
exec(open(f"{path}/mod.py", 'r').read())
exec(open("./LibWinDog/Config.py", 'r').read())
def SetupLocale() -> None:
global Locale
@ -94,18 +92,18 @@ def CharEscape(String:str, Escape:str='') -> str:
String = String.replace(c, '\\'+c)
return String
def InferMdEscape(Raw:str, Plain:str) -> str:
Chs = ''
for Ch in MdEscapes:
if Ch in Raw and Ch in Plain:
Chs += Ch
return Chs
def InferMdEscape(raw:str, plain:str) -> str:
chars = ''
for char in MdEscapes:
if char in raw and char in plain:
chars += char
return chars
def MarkdownCode(Text:str, Block:bool) -> str:
return '```\n' + Text.strip().replace('`', '\`') + '\n```'
def MarkdownCode(text:str, block:bool) -> str:
return '```\n' + text.strip().replace('`', '\`') + '\n```'
def MdToTxt(Md:str) -> str:
return BeautifulSoup(markdown(Md), 'html.parser').get_text(' ')
def MdToTxt(md:str) -> str:
return BeautifulSoup(markdown(md), 'html.parser').get_text(' ')
def HtmlEscapeFull(Raw:str) -> str:
New = ''
@ -114,78 +112,29 @@ def HtmlEscapeFull(Raw:str) -> str:
New += f'&#x{Hex[i] + Hex[i+1]};'
return New
def CmdAllowed(update) -> bool:
if not TelegramRestrict:
return True
else:
if TelegramRestrict.lower() == 'whitelist':
if update.message.chat.id in TelegramWhitelist:
return True
return False
def GetRawTokens(text:str) -> list:
return text.strip().replace('\t', ' ').replace(' ', ' ').replace(' ', ' ').split(' ')
def HandleCmd(update):
TelegramQueryHandle(update)
if CmdAllowed(update):
return ParseCmd(update.message.text)
else:
return False
def ParseCmd(msg) -> dict|None:
name = msg.lower().split(' ')[0][1:].split('@')[0]
if not name:
return
return SimpleNamespace(**{
"Name": name,
"Body": name.join(msg.split(name)[1:]).strip(),
"Tokens": GetRawTokens(msg),
"User": None,
"Quoted": None,
})
def GetRawTokens(Text:str) -> list:
return Text.strip().replace('\t', ' ').replace(' ', ' ').replace(' ', ' ').split(' ')
def ParseCmd(Msg) -> dict:
Name = Msg.lower().split(' ')[0][1:].split('@')[0]
if Name:
return SimpleNamespace(**{
"Name": Name,
"Body": Name.join(Msg.split(Name)[1:]).strip(),
"Tokens": GetRawTokens(Msg),
"User": None,
"Quoted": None,
})
def TelegramQueryHandle(update:Update, context:CallbackContext=None) -> None:
if update and update.message:
Cmd = ParseCmd(update.message.text)
if Cmd and Cmd.Tokens[0][0] in CmdPrefixes and Cmd.Name in Endpoints:
Cmd.User = {
"Name": update.message.from_user.first_name,
"Tag": update.message.from_user.username,
"Id": f'{update.message.from_user.id}@telegram',
}
if update.message.reply_to_message:
Cmd.Quoted = {
"Body": update.message.reply_to_message.text,
"User": {
"Name": update.message.reply_to_message.from_user.first_name,
"Tag": update.message.reply_to_message.from_user.username,
"Id": f'{update.message.reply_to_message.from_user.id}@telegram',
},
}
Endpoints[Cmd.Name]({ "Event": update, "Manager": context }, Cmd)
if Debug and Dumper:
Text = update.message.text
Text = (Text.replace('\n', '\\n') if Text else '')
with open('Dump.txt', 'a') as File:
File.write(f'[{time.ctime()}] [{int(time.time())}] [{update.message.chat.id}] [{update.message.message_id}] [{update.message.from_user.id}] {Text}\n')
'''
if CmdAllowed(update):
ChatID = update.message.chat.id
if ChatID in Private['Chats'] and 'Filters' in Private['Chats'][ChatID]:
for f in Private['Chats'][ChatID]['Filters']:
if f in update.message.text:
update.message.reply_text(
Private['Chats'][ChatID]['Filters'][f],
reply_to_message_id=update.message.message_id)
'''
def GetWeightedText(texts:tuple) -> str:
for text in texts:
if text:
return text
def RandPercent() -> int:
Num = randint(0,100)
if Num == 100:
Num = f'{Num}.00'
else:
Num = f'{Num}.{randint(0,9)}{randint(0,9)}'
return Num
num = randint(0,100)
return (f'{num}.00' if num == 100 else f'{num}.{randint(0,9)}{randint(0,9)}')
def RandHexStr(Len:int) -> str:
Hex = ''
@ -193,16 +142,12 @@ def RandHexStr(Len:int) -> str:
Hex += choice('0123456789abcdef')
return Hex
def HttpGet(Url:str):
return urlopen(Request(Url, headers={"User-Agent": WebUserAgent}))
def SendMsg(Context, Data, Destination=None) -> None:
if type(Context) == dict:
Event = Context['Event'] if 'Event' in Context else None
Manager = Context['Manager'] if 'Manager' in Context else None
else:
[Event, Manager] = [Context, Context]
if InDict(Data, 'TextPlain') or InDict(Data, 'TextMarkdown'):
TextPlain = InDict(Data, 'TextPlain')
TextMarkdown = InDict(Data, 'TextMarkdown')
@ -212,62 +157,20 @@ def SendMsg(Context, Data, Destination=None) -> None:
# our old system attemps to always receive Markdown and retransform when needed
TextPlain = MdToTxt(Data['Text'])
TextMarkdown = CharEscape(HtmlUnescape(Data['Text']), InferMdEscape(HtmlUnescape(Data['Text']), TextPlain))
if isinstance(Manager, mastodon.Mastodon):
if InDict(Data, 'Media'):
Media = Manager.media_post(Data['Media'], Magic(mime=True).from_buffer(Data['Media']))
while Media['url'] == 'null':
Media = Manager.media(Media)
if TextPlain:
Manager.status_post(
status=(TextPlain + '\n\n@' + Event['account']['acct']),
media_ids=(Media if InDict(Data, 'Media') else None),
in_reply_to_id=Event['status']['id'],
visibility=('direct' if Event['status']['visibility'] == 'direct' else 'unlisted'),
)
elif isinstance(Event, telegram.Update):
if Destination:
Manager.bot.send_message(Destination, text=TextPlain)
else:
if InDict(Data, 'Media'):
Event.message.reply_photo(
Data['Media'],
caption=(TextMarkdown if TextMarkdown else TextPlain if TextPlain else None),
parse_mode=('MarkdownV2' if TextMarkdown else None),
reply_to_message_id=Event.message.message_id,
)
elif TextMarkdown:
Event.message.reply_markdown_v2(TextMarkdown, reply_to_message_id=Event.message.message_id)
elif TextPlain:
Event.message.reply_text(TextPlain,reply_to_message_id=Event.message.message_id)
for platform in Platforms:
platform = Platforms[platform]
if ("eventClass" in platform and isinstance(Event, platform["eventClass"])) \
or ("managerClass" in platform and isinstance(Manager, platform["managerClass"])):
platform["sender"](Event, Manager, Data, Destination, TextPlain, TextMarkdown)
def Main() -> None:
SetupDb()
SetupLocale()
if TelegramToken:
updater = Updater(TelegramToken)
dispatcher = updater.dispatcher
dispatcher.add_handler(CommandHandler('config', cConfig))
for Cmd in ('hug', 'pat', 'poke', 'cuddle', 'floor', 'hands', 'sessocto'):
dispatcher.add_handler(CommandHandler(Cmd, multifun))
dispatcher.add_handler(MessageHandler(Filters.text | Filters.command, TelegramQueryHandle))
updater.start_polling()
if MastodonUrl and MastodonToken:
Mastodon = mastodon.Mastodon(api_base_url=MastodonUrl, access_token=MastodonToken)
class MastodonListener(mastodon.StreamListener):
def on_notification(self, Event):
if Event['type'] == 'mention':
Msg = BeautifulSoup(Event['status']['content'], 'html.parser').get_text(' ').strip().replace('\t', ' ')
if not Msg.split('@')[0]:
Msg = ' '.join('@'.join(Msg.split('@')[1:]).strip().split(' ')[1:]).strip()
if Msg[0] in CmdPrefixes:
Cmd = ParseCmd(Msg)
if Cmd.Name in Endpoints:
Endpoints[Cmd.Name]({"Event": Event, "Manager": Mastodon}, Cmd)
Mastodon.stream_user(MastodonListener())
TelegramMain()
MastodonMain()
#MatrixMain()
#for platform in Platforms:
# Platforms[platform]["main"]()
while True:
time.sleep(9**9)
@ -276,7 +179,7 @@ if __name__ == '__main__':
from Config import *
except Exception:
pass
print('Starting WinDog...')
Main()
print('Closing WinDog...')

View File

@ -12,3 +12,4 @@ Markdown
# Telegram support
python-telegram-bot==13.4.1