mirror of
https://gitlab.com/octospacc/WinDog.git
synced 2025-06-05 22:09:20 +02:00
Add months-old WIP code, add global get_linker(),get_message(),delete_message() update Telegram, add /getmessage, multi-type media handling, misc fixes
This commit is contained in:
2
.gitignore
vendored
2
.gitignore
vendored
@ -1,5 +1,7 @@
|
||||
/Data/
|
||||
/downloaded_files/
|
||||
*.bak
|
||||
*.tmp
|
||||
*.pyc
|
||||
*.lock
|
||||
/RunWinDog.py
|
||||
|
@ -7,9 +7,13 @@
|
||||
|
||||
# TelegramToken = "1234567890:abcdefghijklmnopqrstuvwxyz123456789"
|
||||
|
||||
# TelegramGetterChannel = -1001234567890
|
||||
# TelegramGetterGroup = -1001234567890
|
||||
|
||||
# end windog config # """
|
||||
|
||||
TelegramToken = None
|
||||
TelegramGetterChannel = TelegramGetterGroup = None
|
||||
|
||||
import telegram, telegram.ext
|
||||
from telegram import Bot #, Update
|
||||
@ -33,6 +37,8 @@ def TelegramMain(path:str) -> bool:
|
||||
return True
|
||||
|
||||
def TelegramMakeUserData(user:telegram.User) -> UserData:
|
||||
if not user:
|
||||
return None
|
||||
return UserData(
|
||||
id = f"telegram:{user.id}",
|
||||
tag = user.username,
|
||||
@ -42,11 +48,20 @@ def TelegramMakeUserData(user:telegram.User) -> UserData:
|
||||
def TelegramMakeInputMessageData(message:telegram.Message) -> InputMessageData:
|
||||
#if not message:
|
||||
# return None
|
||||
media = None
|
||||
if (photo := (message.photo and message.photo[-1])):
|
||||
media = {"url": photo.file_id, "type": "image/"}
|
||||
elif (video_note := message.video_note):
|
||||
media = {"url": video_note.file_id, "type": "video/"}
|
||||
elif (media := (message.video or message.voice or message.audio or message.document or message.sticker)):
|
||||
media = {"url": media.file_id, "type": media.mime_type}
|
||||
data = InputMessageData(
|
||||
id = f"telegram:{message.message_id}",
|
||||
message_id = f"telegram:{message.message_id}",
|
||||
datetime = int(time.mktime(message.date.timetuple())),
|
||||
text_plain = message.text,
|
||||
text_plain = (message.text or message.caption),
|
||||
text_markdown = message.text_markdown_v2,
|
||||
media = media,
|
||||
user = TelegramMakeUserData(message.from_user),
|
||||
room = SafeNamespace(
|
||||
id = f"telegram:{message.chat.id}",
|
||||
@ -55,7 +70,8 @@ def TelegramMakeInputMessageData(message:telegram.Message) -> InputMessageData:
|
||||
),
|
||||
)
|
||||
data.command = TextCommandData(data.text_plain, "telegram")
|
||||
data.user.settings = UserSettingsData(data.user.id)
|
||||
if data.user:
|
||||
data.user.settings = UserSettingsData(data.user.id)
|
||||
linked = TelegramLinker(data)
|
||||
data.message_url = linked.message
|
||||
data.room.url = linked.room
|
||||
@ -72,6 +88,17 @@ def TelegramHandler(update:telegram.Update, context:CallbackContext=None) -> Non
|
||||
call_endpoint(EventContext(platform="telegram", event=update, manager=context), data)
|
||||
Thread(target=handler).start()
|
||||
|
||||
def TelegramGetter(context:EventContext, data:InputMessageData) -> InputMessageData:
|
||||
# bot API doesn't allow direct access of messages,
|
||||
# so we ask the server to copy it to a service channel, so that the API returns its data, then delete the copy
|
||||
message = TelegramMakeInputMessageData(
|
||||
context.manager.bot.forward_message(
|
||||
message_id=data.message_id,
|
||||
from_chat_id=data.room.id,
|
||||
chat_id=TelegramGetterChannel))
|
||||
delete_message(context, message)
|
||||
return message
|
||||
|
||||
def TelegramSender(context:EventContext, data:OutputMessageData):
|
||||
result = None
|
||||
# TODO clean this
|
||||
@ -81,8 +108,11 @@ def TelegramSender(context:EventContext, data:OutputMessageData):
|
||||
replyToId = (data.ReplyTo or context.event.message.message_id)
|
||||
if data.media:
|
||||
for medium in data.media:
|
||||
result = context.event.message.reply_photo(
|
||||
(obj_get(medium, "bytes") or obj_get(medium, "url")),
|
||||
result = obj_get(context.event.message, (
|
||||
"reply_photo" if medium.type.startswith("image/") else
|
||||
"reply_video" if medium.type.startswith("video/") else
|
||||
"reply_document"))(
|
||||
(medium.bytes or medium.url),
|
||||
caption=(data.text_html or data.text_markdown or data.text_plain),
|
||||
parse_mode=("HTML" if data.text_html else "MarkdownV2" if data.text_markdown else None),
|
||||
reply_to_message_id=replyToId)
|
||||
@ -94,24 +124,30 @@ def TelegramSender(context:EventContext, data:OutputMessageData):
|
||||
result = context.event.message.reply_text(data.text_plain, reply_to_message_id=replyToId)
|
||||
return TelegramMakeInputMessageData(result)
|
||||
|
||||
def TelegramDeleter(context:EventContext, data:MessageData):
|
||||
context.manager.bot.delete_message(chat_id=data.room.id, message_id=data.message_id)
|
||||
|
||||
# TODO support usernames
|
||||
# TODO remove the platform stripping here (after modifying above functions here that use it), it's now implemented in get_link
|
||||
def TelegramLinker(data:InputMessageData) -> SafeNamespace:
|
||||
linked = SafeNamespace()
|
||||
if (room_id := data.room.id):
|
||||
# prefix must be dropped for groups and channels, while direct chats apparently can never be linked
|
||||
if (room_id := "100".join(room_id.split("telegram:")[1].split("100")[1:])):
|
||||
if (room_id := "100".join(room_id.removeprefix("telegram:").split("100")[1:])):
|
||||
# apparently Telegram doesn't really support links to rooms by id without a message id, so we just use a null one
|
||||
linked.room = f"https://t.me/c/{room_id}/0"
|
||||
if data.message_id:
|
||||
message_id = data.message_id.split("telegram:")[1]
|
||||
message_id = data.message_id.removeprefix("telegram:")
|
||||
linked.message = f"https://t.me/c/{room_id}/{message_id}"
|
||||
return linked
|
||||
|
||||
register_platform(
|
||||
name="Telegram",
|
||||
main=TelegramMain,
|
||||
sender=TelegramSender,
|
||||
getter=TelegramGetter,
|
||||
linker=TelegramLinker,
|
||||
sender=TelegramSender,
|
||||
deleter=TelegramDeleter,
|
||||
event_class=telegram.Update,
|
||||
manager_class=(lambda:TelegramClient),
|
||||
agent_info=(lambda:TelegramMakeUserData(TelegramClient.bot.get_me())),
|
||||
|
48
LibWinDog/Utils.py
Executable file
48
LibWinDog/Utils.py
Executable file
@ -0,0 +1,48 @@
|
||||
# ==================================== #
|
||||
# WinDog multi-purpose chatbot #
|
||||
# Licensed under AGPLv3 by OctoSpacc #
|
||||
# ==================================== #
|
||||
|
||||
from LibWinDog.Types import *
|
||||
|
||||
def ObjectUnion(*objects:object, clazz:object=None):
|
||||
dikt = {}
|
||||
auto_clazz = objects[0].__class__
|
||||
for obj in objects:
|
||||
if not obj:
|
||||
continue
|
||||
if type(obj) == dict:
|
||||
obj = (clazz or SafeNamespace)(**obj)
|
||||
for key, value in tuple(obj.__dict__.items()):
|
||||
dikt[key] = value
|
||||
return (clazz or auto_clazz)(**dikt)
|
||||
|
||||
def ObjectClone(obj:object):
|
||||
return ObjectUnion(obj, {});
|
||||
|
||||
def SureArray(array:any) -> list|tuple:
|
||||
return (array if type(array) in [list, tuple] else [array])
|
||||
|
||||
def call_or_return(obj:any, *args) -> any:
|
||||
return (obj(*args) if callable(obj) else obj)
|
||||
|
||||
def obj_get(node:object, query:str, /) -> any:
|
||||
for key in query.split('.'):
|
||||
if hasattr(node, "__getitem__") and node.__getitem__:
|
||||
# dicts and such
|
||||
method = "__getitem__"
|
||||
exception = KeyError
|
||||
else:
|
||||
# namespaces and such
|
||||
method = "__getattribute__"
|
||||
exception = AttributeError
|
||||
try:
|
||||
node = node.__getattribute__(method)(key)
|
||||
except exception:
|
||||
return None
|
||||
return node
|
||||
|
||||
def strip_url_scheme(url:str) -> str:
|
||||
tokens = urlparse.urlparse(url)
|
||||
return f"{tokens.netloc}{tokens.path}"
|
||||
|
@ -5,7 +5,7 @@
|
||||
|
||||
def cBroadcast(context:EventContext, data:InputMessageData):
|
||||
language = data.user.settings.language
|
||||
if (data.user.id not in AdminIds) and (data.user.tag not in AdminIds):
|
||||
if not check_bot_admin(data.user):
|
||||
return send_status(context, 403, language)
|
||||
destination = data.command.arguments.destination
|
||||
text = (data.command.body or (data.quoted and data.quoted.text_plain))
|
||||
|
@ -3,17 +3,45 @@
|
||||
# Licensed under AGPLv3 by OctoSpacc #
|
||||
# ==================================== #
|
||||
|
||||
from json import dumps as json_dumps
|
||||
from json import dumps as json_dumps, loads as json_loads
|
||||
|
||||
def dict_filter_meta(dikt:dict):
|
||||
remove = []
|
||||
for key in dikt:
|
||||
if key.startswith('_'):
|
||||
remove.append(key)
|
||||
elif type(obj := dikt[key]) == dict:
|
||||
dikt[key] = dict_filter_meta(obj)
|
||||
for key in remove:
|
||||
dikt.pop(key)
|
||||
return dikt
|
||||
|
||||
# TODO: assume current room when none specified
|
||||
def get_message_wrapper(context:EventContext, data:InputMessageData):
|
||||
if check_bot_admin(data.user) and (message_id := data.command.arguments.message_id) and (room_id := (data.command.arguments.room_id or data.room.id)):
|
||||
return get_message(context, {"message_id": message_id, "room": {"id": room_id}})
|
||||
|
||||
# TODO work with links to messages
|
||||
# TODO remove "wrong" objects like callables
|
||||
def cDump(context:EventContext, data:InputMessageData):
|
||||
if not (message := data.quoted):
|
||||
if not (message := (data.quoted or get_message_wrapper(context, data))):
|
||||
return send_status_400(context, data.user.settings.language)
|
||||
text = json_dumps(message, default=(lambda obj: (obj.__dict__ if not callable(obj) else None)), indent=" ")
|
||||
data = dict_filter_meta(json_loads(json_dumps(message, default=(lambda obj: (obj.__dict__ if not callable(obj) else None)))))
|
||||
text = json_dumps(data, indent=" ")
|
||||
return send_message(context, {"text_html": f'<pre>{html_escape(text)}</pre>'})
|
||||
|
||||
def cGetMessage(context:EventContext, data:InputMessageData):
|
||||
if not (message := get_message_wrapper(context, data)):
|
||||
return send_status_400(context, data.user.settings.language)
|
||||
return send_message(context, ObjectUnion(message, {"room": None}))
|
||||
|
||||
register_module(name="Dumper", group="Geek", endpoints=[
|
||||
SafeNamespace(names=["dump"], handler=cDump, quoted=True),
|
||||
SafeNamespace(names=["dump"], handler=cDump, quoted=True, arguments={
|
||||
"message_id": True,
|
||||
"room_id": True,
|
||||
}),
|
||||
SafeNamespace(names=["getmessage"], handler=cGetMessage, arguments={
|
||||
"message_id": True,
|
||||
"room_id": True,
|
||||
}),
|
||||
])
|
||||
|
||||
|
@ -12,7 +12,7 @@ def cFilters(context:EventContext, data:InputMessageData):
|
||||
# * (output) setscript <..., script>
|
||||
# * (output) insert, remove <..., groupid, message>
|
||||
#arguments = data.command.parse_arguments(4)
|
||||
if not (action := data.command.arguments.action) or (action not in ["list", "create", "delete"]):
|
||||
if not (action := data.command.arguments.action) or (action not in ["list", "create", "delete", "insert", "remove"]):
|
||||
return send_status_400(context, language)
|
||||
[room_id, filter_id, command_data] = ((None,) * 3)
|
||||
for token in data.command.tokens[2:]:
|
||||
@ -41,6 +41,8 @@ def cFilters(context:EventContext, data:InputMessageData):
|
||||
# TODO filter name validation (no spaces or special symbols, no only numbers)
|
||||
if filter_id and (len(Filter.select().where((Filter.owner == room_id) & (Filter.name == filter_id)).tuples()) > 0):
|
||||
return
|
||||
elif filter_id.isnumeric() or ''.join([c if c not in "abcdefghijklmnopqrstuvwxyz0123456789" else "" for c in filter_id.lower()]):
|
||||
return
|
||||
else:
|
||||
filter_id = Filter.create(name=filter_id, owner=room_id)
|
||||
return send_status(context, 201, language, f"Filter with id <code>{filter_id}</code> in room <code>{room_id}</code> created successfully.", summary=False)
|
||||
|
@ -39,17 +39,20 @@ def cEmbedded(context:EventContext, data:InputMessageData):
|
||||
url = '://'.join(url.split('://')[1:])
|
||||
urlLow = '://'.join(urlLow.split('://')[1:])
|
||||
urlDomain = urlLow.split('/')[0]
|
||||
if urlDomain in ("facebook.com", "www.facebook.com", "m.facebook.com", "mbasic.facebook.com"):
|
||||
url = "https://hlb0.octt.eu.org/cors-main.php/https://" + url
|
||||
# if urlDomain in ("facebook.com", "www.facebook.com", "m.facebook.com", "mbasic.facebook.com"):
|
||||
# url = "https://hlb0.octt.eu.org/cors-main.php/https://" + url
|
||||
# proto = ''
|
||||
# else:
|
||||
# if urlDomain in ("instagram.com", "www.instagram.com"):
|
||||
# urlDomain = "ddinstagram.com"
|
||||
# elif urlDomain in ("twitter.com", "x.com"):
|
||||
# urlDomain = "fxtwitter.com"
|
||||
# elif urlDomain == "vm.tiktok.com":
|
||||
# urlDomain = "vm.vxtiktok.com"
|
||||
# url = (urlDomain + '/' + '/'.join(url.split('/')[1:]))
|
||||
if urlDomain in ("facebook.com", "www.facebook.com", "m.facebook.com", "instagram.com", "www.instagram.com", "twitter.com", "x.com", "vm.tiktok.com", "tiktok.com", "www.tiktok.com"):
|
||||
url = f"https://proxatore.octt.eu.org/{url}"
|
||||
proto = ''
|
||||
else:
|
||||
if urlDomain in ("instagram.com", "www.instagram.com"):
|
||||
urlDomain = "ddinstagram.com"
|
||||
elif urlDomain in ("twitter.com", "x.com"):
|
||||
urlDomain = "fxtwitter.com"
|
||||
elif urlDomain == "vm.tiktok.com":
|
||||
urlDomain = "vm.vxtiktok.com"
|
||||
url = (urlDomain + '/' + '/'.join(url.split('/')[1:]))
|
||||
return send_message(context, {"text_plain": f"{{{proto}{url}}}"})
|
||||
return send_message(context, {"text_plain": "No links found."})
|
||||
|
||||
@ -136,7 +139,7 @@ def cSafebooru(context:EventContext, data:InputMessageData):
|
||||
return send_message(context, OutputMessageData(
|
||||
text_plain=f"[{img_id}]\n{{{img_url}}}",
|
||||
text_html=f"[<code>{img_id}</code>]\n<pre>{img_url}</pre>",
|
||||
media={"url": img_url}))
|
||||
media={"url": img_url, "type": "image/"}))
|
||||
else:
|
||||
return send_status_400(context, language)
|
||||
except Exception:
|
||||
|
@ -50,7 +50,7 @@ def cDalleSelenium(context:EventContext, data:InputMessageData):
|
||||
driver.find_element('form a[role="button"]').submit()
|
||||
try:
|
||||
driver.find_element('img.gil_err_img[alt="Content warning"]')
|
||||
send_message(context, {"text_plain": f"Content warning: This prompt {warning_text}", "media": {"bytes": open("./Assets/ImageCreator-CodeOfConduct.png", 'rb').read()}})
|
||||
send_message(context, {"text_plain": f"Content warning: This prompt {warning_text}", "media": {"bytes": open("./Assets/ImageCreator-CodeOfConduct.png", 'rb').read(), "type": "image/"}})
|
||||
return closeSelenium(driver_index, driver)
|
||||
except Exception: # warning element was not found, we should be good
|
||||
pass
|
||||
@ -64,7 +64,7 @@ def cDalleSelenium(context:EventContext, data:InputMessageData):
|
||||
if not len(img_list):
|
||||
try:
|
||||
driver.find_element('img.gil_err_img[alt="Unsafe image content detected"]')
|
||||
result = send_message(context, {"text_plain": f"Unsafe image content detected: This result {warning_text}", "media": {"bytes": open("./Assets/ImageCreator-CodeOfConduct.png", 'rb').read()}})
|
||||
result = send_message(context, {"text_plain": f"Unsafe image content detected: This result {warning_text}", "media": {"bytes": open("./Assets/ImageCreator-CodeOfConduct.png", 'rb').read(), "type": "image/"}})
|
||||
closeSelenium(driver_index, driver)
|
||||
return result
|
||||
except: # no error is present, so we just have to wait more for the images
|
||||
@ -108,7 +108,7 @@ def cCraiyonSelenium(context:EventContext, data:InputMessageData):
|
||||
continue
|
||||
img_array = []
|
||||
for img_elem in img_list:
|
||||
img_array.append({"url": img_elem.get_attribute("src")}) #, "bytes": HttpReq(img_url).read()})
|
||||
img_array.append({"url": img_elem.get_attribute("src"), "type": "image/"}) #, "bytes": HttpReq(img_url).read()})
|
||||
result = send_message(context, {
|
||||
"text_plain": f'"{prompt}"',
|
||||
"text_html": f'"<i>{html_escape(prompt)}</i>"',
|
||||
|
75
WinDog.py
75
WinDog.py
@ -20,24 +20,7 @@ from bs4 import BeautifulSoup
|
||||
from LibWinDog.Types import *
|
||||
from LibWinDog.Config import *
|
||||
from LibWinDog.Database import *
|
||||
|
||||
def ObjectUnion(*objects:object, clazz:object=None):
|
||||
dikt = {}
|
||||
auto_clazz = objects[0].__class__
|
||||
for obj in objects:
|
||||
if not obj:
|
||||
continue
|
||||
if type(obj) == dict:
|
||||
obj = (clazz or SafeNamespace)(**obj)
|
||||
for key, value in tuple(obj.__dict__.items()):
|
||||
dikt[key] = value
|
||||
return (clazz or auto_clazz)(**dikt)
|
||||
|
||||
def ObjectClone(obj:object):
|
||||
return ObjectUnion(obj, {});
|
||||
|
||||
def SureArray(array:any) -> list|tuple:
|
||||
return (array if type(array) in [list, tuple] else [array])
|
||||
from LibWinDog.Utils import *
|
||||
|
||||
def app_log(text:str=None, level:str="?", *, newline:bool|None=None, inline:bool=False) -> None:
|
||||
if not text:
|
||||
@ -58,25 +41,6 @@ def get_exception_text(full:bool=False):
|
||||
text = f'@{exc_traceback.tb_frame.f_code.co_name}:{exc_traceback.tb_lineno} {text}'
|
||||
return text
|
||||
|
||||
def call_or_return(obj:any, *args) -> any:
|
||||
return (obj(*args) if callable(obj) else obj)
|
||||
|
||||
def obj_get(node:object, query:str, /) -> any:
|
||||
for key in query.split('.'):
|
||||
if hasattr(node, "__getitem__") and node.__getitem__:
|
||||
# dicts and such
|
||||
method = "__getitem__"
|
||||
exception = KeyError
|
||||
else:
|
||||
# namespaces and such
|
||||
method = "__getattribute__"
|
||||
exception = AttributeError
|
||||
try:
|
||||
node = node.__getattribute__(method)(key)
|
||||
except exception:
|
||||
return None
|
||||
return node
|
||||
|
||||
def good_yaml_load(text:str):
|
||||
return yaml_load(text.replace("\t", " "), Loader=yaml_BaseLoader)
|
||||
|
||||
@ -123,10 +87,6 @@ def get_help_text(endpoint, lang:str=None, prefix:str=None) -> str:
|
||||
text += f'\n\n{extra}'
|
||||
return text
|
||||
|
||||
def strip_url_scheme(url:str) -> str:
|
||||
tokens = urlparse.urlparse(url)
|
||||
return f"{tokens.netloc}{tokens.path}"
|
||||
|
||||
def parse_command_arguments(command, endpoint, count:int=None):
|
||||
arguments = SafeNamespace()
|
||||
body = command.body
|
||||
@ -244,6 +204,25 @@ def send_status_error(context:EventContext, lang:str=None, code:int=500, extra:s
|
||||
app_log()
|
||||
return result
|
||||
|
||||
def get_link(context:EventContext, data:InputMessageData) -> InputMessageData:
|
||||
data = (InputMessageData(**data) if type(data) == dict else data)
|
||||
if (data.room and data.room.id):
|
||||
data.room.id = data.room.id.removeprefix(f"{context.platform}:")
|
||||
if data.message_id:
|
||||
data.message_id = data.message_id.removeprefix(f"{context.platform}:")
|
||||
if data.id:
|
||||
data.id = data.id.removeprefix(f"{context.platform}:")
|
||||
return Platforms[context.platform].linker(data)
|
||||
|
||||
def get_message(context:EventContext, data:InputMessageData) -> InputMessageData:
|
||||
data = (InputMessageData(**data) if type(data) == dict else data)
|
||||
message = Platforms[context.platform].getter(context, data)
|
||||
linked = get_link(context, data)
|
||||
return ObjectUnion(message, {
|
||||
"message_id": data.message_id,
|
||||
"room": {"id": data.room.id, "url": linked.room},
|
||||
"message_url": linked.message})
|
||||
|
||||
def send_message(context:EventContext, data:OutputMessageData, *, from_sent:bool=False):
|
||||
context = ObjectClone(context)
|
||||
data = (OutputMessageData(**data) if type(data) == dict else data)
|
||||
@ -279,10 +258,14 @@ def edit_message(context:EventContext, data:MessageData):
|
||||
...
|
||||
|
||||
def delete_message(context:EventContext, data:MessageData):
|
||||
...
|
||||
data = (MessageData(**data) if type(data) == dict else data)
|
||||
data.room.id = data.room.id.removeprefix(f"{context.platform}:")
|
||||
data.message_id = data.message_id.removeprefix(f"{context.platform}:")
|
||||
data.id = data.id.removeprefix(f"{context.platform}:")
|
||||
return Platforms[context.platform].deleter(context, data)
|
||||
|
||||
def register_platform(name:str, main:callable, sender:callable, linker:callable=None, *, event_class=None, manager_class=None, agent_info=None) -> None:
|
||||
Platforms[name.lower()] = SafeNamespace(name=name, main=main, sender=sender, linker=linker, event_class=event_class, manager_class=manager_class, agent_info=agent_info)
|
||||
def register_platform(name:str, main:callable, sender:callable, getter:callable=None, linker:callable=None, deleter:callable=None, *, event_class=None, manager_class=None, agent_info=None) -> None:
|
||||
Platforms[name.lower()] = SafeNamespace(name=name, main=main, getter=getter, linker=linker, sender=sender, deleter=deleter, event_class=event_class, manager_class=manager_class, agent_info=agent_info)
|
||||
app_log(f"{name}, ", inline=True)
|
||||
|
||||
def register_module(name:str, endpoints:dict, *, group:str|None=None) -> None:
|
||||
@ -324,7 +307,7 @@ def call_endpoint(context:EventContext, data:InputMessageData):
|
||||
|
||||
def write_new_config() -> None:
|
||||
app_log("💾️ No configuration found! Generating and writing to `./Data/Config.py`... ", inline=True)
|
||||
with open("./Data/Config.py", 'w') as configFile:
|
||||
with open("./Data/Config.py", 'w') as config_file:
|
||||
opening = '# windog config start #'
|
||||
closing = '# end windog config #'
|
||||
for folder in ("LibWinDog", "ModWinDog"):
|
||||
@ -334,7 +317,7 @@ def write_new_config() -> None:
|
||||
heading = f"# ==={'=' * len(name)}=== #"
|
||||
source = open(file, 'r').read().replace(f"''' {opening}", f'""" {opening}').replace(f"{closing} '''", f'{closing} """')
|
||||
content = '\n'.join(content.split(f'""" {opening}')[1].split(f'{closing} """')[0].split('\n')[1:-1])
|
||||
configFile.write(f"{heading}\n# 🔽️ {name} 🔽️ #\n{heading}\n{content}\n\n")
|
||||
config_file.write(f"{heading}\n# 🔽️ {name} 🔽️ #\n{heading}\n{content}\n\n")
|
||||
except IndexError:
|
||||
pass
|
||||
|
||||
|
Reference in New Issue
Block a user