mirror of
https://github.com/KoboldAI/KoboldAI-Client.git
synced 2025-06-05 21:59:24 +02:00
Merge pull request #166 from ebolam/united
Add file browser to soft prompts and user scripts
This commit is contained in:
251
aiserver.py
251
aiserver.py
@ -7,7 +7,7 @@
|
||||
|
||||
# External packages
|
||||
import eventlet
|
||||
eventlet.monkey_patch(all=True, thread=False)
|
||||
eventlet.monkey_patch(all=True, thread=False, os=False)
|
||||
import os
|
||||
os.system("")
|
||||
__file__ = os.path.dirname(os.path.realpath(__file__))
|
||||
@ -370,11 +370,15 @@ log.setLevel(logging.ERROR)
|
||||
|
||||
# Start flask & SocketIO
|
||||
print("{0}Initializing Flask... {1}".format(colors.PURPLE, colors.END), end="")
|
||||
from flask import Flask, render_template, Response, request, copy_current_request_context, send_from_directory
|
||||
from flask import Flask, render_template, Response, request, copy_current_request_context, send_from_directory, session
|
||||
from flask_socketio import SocketIO, emit
|
||||
from flask_session import Session
|
||||
import secrets
|
||||
app = Flask(__name__, root_path=os.getcwd())
|
||||
app.config['SECRET KEY'] = 'secret!'
|
||||
app.secret_key = secrets.token_hex()
|
||||
app.config['SESSION_TYPE'] = 'filesystem'
|
||||
app.config['TEMPLATES_AUTO_RELOAD'] = True
|
||||
Session(app)
|
||||
socketio = SocketIO(app, async_method="eventlet")
|
||||
print("{0}OK!{1}".format(colors.GREEN, colors.END))
|
||||
|
||||
@ -464,7 +468,7 @@ def check_if_dir_is_model(path):
|
||||
if os.path.exists(path):
|
||||
try:
|
||||
from transformers import AutoConfig
|
||||
model_config = AutoConfig.from_pretrained(path, revision=vars.revision, cache_dir="cache")
|
||||
model_config = AutoConfig.from_pretrained(path)
|
||||
except:
|
||||
return False
|
||||
return True
|
||||
@ -6068,6 +6072,245 @@ def send_debug():
|
||||
|
||||
emit('from_server', {'cmd': 'debug_info', 'data': debug_info}, broadcast=True)
|
||||
|
||||
#==================================================================#
|
||||
# Load file browser for soft prompts
|
||||
#==================================================================#
|
||||
@socketio.on('show_folder_soft_prompt')
|
||||
def show_folder_soft_prompt(data):
|
||||
file_popup("Load Softprompt", "./softprompts", "", renameable=True, folder_only=False, editable=False, deleteable=True, jailed=True, item_check=None)
|
||||
|
||||
#==================================================================#
|
||||
# Load file browser for user scripts
|
||||
#==================================================================#
|
||||
@socketio.on('show_folder_usersripts')
|
||||
def show_folder_usersripts(data):
|
||||
file_popup("Load Softprompt", "./userscripts", "", renameable=True, folder_only=False, editable=True, deleteable=True, jailed=True, item_check=None)
|
||||
|
||||
|
||||
|
||||
#==================================================================#
|
||||
# File Popup options
|
||||
#==================================================================#
|
||||
|
||||
@socketio.on('upload_file')
|
||||
def upload_file(data):
|
||||
print("upload_file {}".format(data['filename']))
|
||||
print('current_folder' in session)
|
||||
print('popup_jailed_dir' not in session)
|
||||
print(session['popup_jailed_dir'])
|
||||
print(session['current_folder'])
|
||||
if 'current_folder' in session:
|
||||
path = os.path.abspath(os.path.join(session['current_folder'], data['filename']).replace("\\", "/")).replace("\\", "/")
|
||||
print(path)
|
||||
print(os.path.exists(path))
|
||||
if 'popup_jailed_dir' not in session:
|
||||
print("Someone is trying to upload a file to your server. Blocked.")
|
||||
elif session['popup_jailed_dir'] is None:
|
||||
if os.path.exists(path):
|
||||
print("popup error")
|
||||
emit("error_popup", "The file already exists. Please delete it or rename the file before uploading", room="UI_2");
|
||||
else:
|
||||
with open(path, "wb") as f:
|
||||
f.write(data['data'])
|
||||
get_files_folders(session['current_folder'])
|
||||
print("saved")
|
||||
elif session['popup_jailed_dir'] in session['current_folder']:
|
||||
if os.path.exists(path):
|
||||
print("popup error")
|
||||
emit("error_popup", "The file already exists. Please delete it or rename the file before uploading", room="UI_2");
|
||||
else:
|
||||
with open(path, "wb") as f:
|
||||
f.write(data['data'])
|
||||
get_files_folders(session['current_folder'])
|
||||
print("saved")
|
||||
|
||||
@socketio.on('popup_change_folder')
|
||||
def popup_change_folder(data):
|
||||
print("Doing popup change folder: {}".format(data))
|
||||
if 'popup_jailed_dir' not in session:
|
||||
print("Someone is trying to get at files in your server. Blocked.")
|
||||
return
|
||||
if session['popup_jailed_dir'] is None:
|
||||
get_files_folders(data)
|
||||
elif session['popup_jailed_dir'] in data:
|
||||
get_files_folders(data)
|
||||
else:
|
||||
print("User is trying to get at files in your server outside the jail. Blocked. Jailed Dir: {} Requested Dir: {}".format(session['popup_jailed_dir'], data))
|
||||
|
||||
@socketio.on('popup_rename')
|
||||
def popup_rename(data):
|
||||
if 'popup_renameable' not in session:
|
||||
print("Someone is trying to rename a file in your server. Blocked.")
|
||||
return
|
||||
if not session['popup_renameable']:
|
||||
print("Someone is trying to rename a file in your server. Blocked.")
|
||||
return
|
||||
|
||||
if session['popup_jailed_dir'] is None:
|
||||
os.rename(data['file'], data['new_name'])
|
||||
get_files_folders(os.path.dirname(data['file']))
|
||||
elif session['popup_jailed_dir'] in data:
|
||||
os.rename(data['file'], data['new_name'])
|
||||
get_files_folders(os.path.dirname(data['file']))
|
||||
else:
|
||||
print("User is trying to rename files in your server outside the jail. Blocked. Jailed Dir: {} Requested Dir: {}".format(session['popup_jailed_dir'], data['file']))
|
||||
|
||||
|
||||
@socketio.on('popup_delete')
|
||||
def popup_delete(data):
|
||||
if 'popup_deletable' not in session:
|
||||
print("Someone is trying to delete a file in your server. Blocked.")
|
||||
return
|
||||
if not session['popup_deletable']:
|
||||
print("Someone is trying to delete a file in your server. Blocked.")
|
||||
return
|
||||
|
||||
if session['popup_jailed_dir'] is None:
|
||||
import shutil
|
||||
if os.path.isdir(data):
|
||||
shutil.rmtree(data)
|
||||
else:
|
||||
os.remove(data)
|
||||
path = os.path.abspath(data).replace("\\", "/")
|
||||
if path[-1] == "/":
|
||||
path = path[:-1]
|
||||
path = "/".join(path.split("/")[:-1])
|
||||
get_files_folders(path)
|
||||
elif session['popup_jailed_dir'] in data:
|
||||
import shutil
|
||||
if os.path.isdir(data):
|
||||
shutil.rmtree(data)
|
||||
else:
|
||||
os.remove(data)
|
||||
path = os.path.abspath(data).replace("\\", "/")
|
||||
if path[-1] == "/":
|
||||
path = path[:-1]
|
||||
path = "/".join(path.split("/")[:-1])
|
||||
get_files_folders(path)
|
||||
else:
|
||||
print("User is trying to delete files in your server outside the jail. Blocked. Jailed Dir: {} Requested Dir: {}".format(session['popup_jailed_dir'], data))
|
||||
|
||||
@socketio.on('popup_edit')
|
||||
def popup_edit(data):
|
||||
if 'popup_editable' not in session:
|
||||
print("Someone is trying to edit a file in your server. Blocked.")
|
||||
return
|
||||
if not session['popup_editable']:
|
||||
print("Someone is trying to edit a file in your server. Blocked.")
|
||||
return
|
||||
|
||||
if session['popup_jailed_dir'] is None:
|
||||
emit("popup_edit_file", {"file": data, "text": open(data, 'r').read()});
|
||||
elif session['popup_jailed_dir'] in data:
|
||||
emit("popup_edit_file", {"file": data, "text": open(data, 'r').read()});
|
||||
else:
|
||||
print("User is trying to delete files in your server outside the jail. Blocked. Jailed Dir: {} Requested Dir: {}".format(session['popup_jailed_dir'], data))
|
||||
|
||||
@socketio.on('popup_change_file')
|
||||
def popup_change_file(data):
|
||||
if 'popup_editable' not in session:
|
||||
print("Someone is trying to edit a file in your server. Blocked.")
|
||||
return
|
||||
if not session['popup_editable']:
|
||||
print("Someone is trying to edit a file in your server. Blocked.")
|
||||
return
|
||||
|
||||
if session['popup_jailed_dir'] is None:
|
||||
with open(data['file'], 'w') as f:
|
||||
f.write(data['data'])
|
||||
elif session['popup_jailed_dir'] in data['file']:
|
||||
with open(data['file'], 'w') as f:
|
||||
f.write(data['data'])
|
||||
else:
|
||||
print("User is trying to delete files in your server outside the jail. Blocked. Jailed Dir: {} Requested Dir: {}".format(session['popup_jailed_dir'], data))
|
||||
|
||||
def file_popup(popup_title, starting_folder, return_event, upload=True, jailed=True, folder_only=True, renameable=False, deleteable=False, editable=False, show_breadcrumbs=True, item_check=None, show_hidden=False):
|
||||
#starting_folder = The folder we're going to get folders and/or items from
|
||||
#return_event = the socketio event that will be emitted when the load button is clicked
|
||||
#jailed = if set to true will look for the session variable jailed_folder and prevent navigation outside of that folder
|
||||
#folder_only = will only show folders, no files
|
||||
#deletable = will show the delete icons/methods.
|
||||
#editable = will show the edit icons/methods
|
||||
#show_breadcrumbs = will show the breadcrumbs at the top of the screen
|
||||
#item_check will call this function to check if the item is valid as a selection if not none. Will pass absolute directory as only argument to function
|
||||
#show_hidden = ... really, you have to ask?
|
||||
if jailed:
|
||||
session['popup_jailed_dir'] = os.path.abspath(starting_folder).replace("\\", "/")
|
||||
else:
|
||||
session['popup_jailed_dir'] = None
|
||||
session['popup_deletable'] = deleteable
|
||||
session['popup_renameable'] = renameable
|
||||
session['popup_editable'] = editable
|
||||
session['popup_show_hidden'] = show_hidden
|
||||
session['popup_item_check'] = item_check
|
||||
session['popup_folder_only'] = folder_only
|
||||
session['popup_show_breadcrumbs'] = show_breadcrumbs
|
||||
session['upload'] = upload
|
||||
|
||||
socketio.emit("load_popup", {"popup_title": popup_title, "call_back": return_event, "renameable": renameable, "deleteable": deleteable, "editable": editable, 'upload': upload}, broadcast=True)
|
||||
|
||||
get_files_folders(starting_folder)
|
||||
|
||||
|
||||
def get_files_folders(starting_folder):
|
||||
import stat
|
||||
session['current_folder'] = os.path.abspath(starting_folder).replace("\\", "/")
|
||||
item_check = session['popup_item_check']
|
||||
show_breadcrumbs = session['popup_show_breadcrumbs']
|
||||
show_hidden = session['popup_show_hidden']
|
||||
folder_only = session['popup_folder_only']
|
||||
|
||||
if starting_folder == 'This PC':
|
||||
breadcrumbs = [['This PC', 'This PC']]
|
||||
items = [["{}:/".format(chr(i)), "{}:\\".format(chr(i))] for i in range(65, 91) if os.path.exists("{}:".format(chr(i)))]
|
||||
else:
|
||||
path = os.path.abspath(starting_folder).replace("\\", "/")
|
||||
if path[-1] == "/":
|
||||
path = path[:-1]
|
||||
breadcrumbs = []
|
||||
for i in range(len(path.split("/"))):
|
||||
breadcrumbs.append(["/".join(path.split("/")[:i+1]),
|
||||
path.split("/")[i]])
|
||||
if len(breadcrumbs) == 1:
|
||||
breadcrumbs = [["{}:/".format(chr(i)), "{}:\\".format(chr(i))] for i in range(65, 91) if os.path.exists("{}:".format(chr(i)))]
|
||||
else:
|
||||
if len([["{}:/".format(chr(i)), "{}:\\".format(chr(i))] for i in range(65, 91) if os.path.exists("{}:".format(chr(i)))]) > 0:
|
||||
breadcrumbs.insert(0, ['This PC', 'This PC'])
|
||||
|
||||
#if we're jailed, remove the stuff before the jail from the breadcrumbs
|
||||
if session['popup_jailed_dir'] is not None:
|
||||
|
||||
breadcrumbs = breadcrumbs[len(session['popup_jailed_dir'].split("/")):]
|
||||
|
||||
folders = []
|
||||
files = []
|
||||
base_path = os.path.abspath(starting_folder).replace("\\", "/")
|
||||
for item in os.listdir(base_path):
|
||||
item_full_path = os.path.join(base_path, item).replace("\\", "/")
|
||||
if hasattr(os.stat(item_full_path), "st_file_attributes"):
|
||||
hidden = bool(os.stat(item_full_path).st_file_attributes & stat.FILE_ATTRIBUTE_HIDDEN)
|
||||
else:
|
||||
hidden = item[0] == "."
|
||||
if item_check is None:
|
||||
valid_selection = True
|
||||
else:
|
||||
valid_selection = item_check(item_full_path)
|
||||
|
||||
if (show_hidden and hidden) or not hidden:
|
||||
if os.path.isdir(os.path.join(base_path, item)):
|
||||
folders.append([True, item_full_path, item, valid_selection])
|
||||
else:
|
||||
files.append([False, item_full_path, item, valid_selection])
|
||||
items = folders
|
||||
if not folder_only:
|
||||
items += files
|
||||
|
||||
socketio.emit("popup_items", items, broadcast=True, include_self=True)
|
||||
if show_breadcrumbs:
|
||||
socketio.emit("popup_breadcrumbs", breadcrumbs, broadcast=True)
|
||||
|
||||
|
||||
|
||||
#==================================================================#
|
||||
# Final startup commands to launch Flask app
|
||||
#==================================================================#
|
||||
|
Reference in New Issue
Block a user