483 lines
18 KiB
Python
483 lines
18 KiB
Python
import psutil
|
|
import requests
|
|
import random
|
|
import pyfiglet
|
|
import socket
|
|
import threading
|
|
import datetime
|
|
import telebot
|
|
import time
|
|
import ast
|
|
import ffmpeg
|
|
import os
|
|
import python_weather
|
|
import asyncio
|
|
import json
|
|
import goslate
|
|
import logging
|
|
import deepl
|
|
import spotipy
|
|
import pyshorteners
|
|
import pdf2docx
|
|
import PyPDF2
|
|
import urllib
|
|
from currency_converter import CurrencyConverter
|
|
from googletrans import Translator
|
|
from random import randint
|
|
from random import random
|
|
from fileinput import filename
|
|
from pdf2docx import Converter, parse
|
|
from spotipy.oauth2 import SpotifyClientCredentials
|
|
from youtube_search import YoutubeSearch
|
|
from dotenv import load_dotenv
|
|
from yt_dlp import YoutubeDL
|
|
from platform import system
|
|
from tqdm.auto import tqdm
|
|
from importlib.metadata import files
|
|
from dataclasses import dataclass
|
|
from jmespath import search
|
|
from telebot import types, telebot
|
|
from pytube import YouTube
|
|
from bs4 import BeautifulSoup
|
|
from gc import callbacks
|
|
import logging
|
|
import sys
|
|
|
|
# Configure logging once for the whole script: every record goes both to the
# "debug.log" file and to standard output.
logging.basicConfig(
    handlers=[
        logging.FileHandler("debug.log"),
        logging.StreamHandler(sys.stdout),
    ],
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)

# load_dotenv() runs before the token is read, so BOT_TOKEN may come from a
# .env file as well as from the real environment.
load_dotenv()
API_TOKEN = os.getenv('BOT_TOKEN')
bot = telebot.TeleBot(API_TOKEN)

# Start bot
logging.info("Il bot si è avviato con successo!")
|
|
|
|
|
|
#Command /start
|
|
@bot.message_handler(commands=['start'])
def send_welcome(message):
    """Answer /start with the banner picture followed by the welcome text.

    Args:
        message: The incoming Telegram message that triggered the command.
    """
    target_chat = message.chat.id
    logging.info("Triggered command START.")
    bot.send_photo(target_chat, photo='https://i.imgur.com/6YPJBze.png')
    welcome_html = "✋ Benvenuto su <b>RetniNet!</b>\n\n<b>RetniNet</b> è un bot privato per <b>automatizzare</b> e <b>semplificare</b> cose che facciamo quotidianamente. \n\n👨💻 Creato & sviluppato da @Stef58_Official"
    bot.send_message(target_chat, welcome_html, parse_mode="HTML")
|
|
|
|
#Command /music
|
|
|
|
|
|
@bot.message_handler(commands=['music'])
def select_music(pm):
    """Handle /music: ask for a song link and route the reply to music_step.

    Args:
        pm: The incoming Telegram message that triggered the command.
    """
    logging.info("Triggered command MUSIC.")
    prompt = bot.send_message(pm.chat.id, "Inserisci il link della canzone:")
    bot.register_next_step_handler(prompt, music_step)
|
|
|
|
|
|
def _download_and_send_audio(chat_id, url):
    """Download *url* as an MP3 with yt-dlp, send it to *chat_id*, then delete it."""
    import html  # local import: used to escape the title for HTML parse mode

    ytdl_opts = {
        'format': 'bestaudio/best',
        'outtmpl': 'music_cache/%(title)s.%(ext)s',
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3',
            'preferredquality': '192',
        }],
    }
    with YoutubeDL(ytdl_opts) as ydl:
        info = ydl.extract_info(url, download=True)
        # yt-dlp sanitises the title when building the output path, so ask it
        # for the real path instead of rebuilding it from the raw title. The
        # FFmpegExtractAudio post-processor then replaces the extension.
        audio_path = os.path.splitext(ydl.prepare_filename(info))[0] + '.mp3'

    title = info.get('title') or ''
    try:
        bot.send_message(
            chat_id,
            "🎶 La canzone <b>" + html.escape(title) + "</b> è stata scaricata con successo!",
            parse_mode="HTML",
        )
        # The handle must be closed before the file can be removed on Windows.
        with open(audio_path, 'rb') as audio_file:
            bot.send_audio(chat_id, audio=audio_file)
    finally:
        if os.path.exists(audio_path):
            os.remove(audio_path)
    logging.info("\"" + title + "\" è stato scaricato ed eliminato con successo!")


def _youtube_url_for_spotify_track(track_url):
    """Look up a Spotify track and return the URL of the first YouTube match."""
    sp = spotipy.Spotify(auth_manager=SpotifyClientCredentials(
        client_id=os.getenv('SPOTIFY_ID'),
        client_secret=os.getenv('SPOTIFY_SECRET'),
    ))
    track = sp.track(track_url)
    query = track["artists"][0]["name"] + " - " + track["name"]
    results = json.loads(YoutubeSearch(query, max_results=1).to_json())
    return 'https://youtube.com' + results['videos'][0]['url_suffix']


def music_step(pm):
    """Download the song behind the link the user sent and send it back as MP3.

    Spotify track links are first resolved to a YouTube video through a
    search for "<artist> - <title>"; every other http(s) link is handed to
    yt-dlp directly. Links starting with ``https://youtube.com/`` or
    ``youtube.com/`` are rejected, as in the original implementation.

    Args:
        pm: The user's reply to the /music prompt.
    """
    chat_id = pm.chat.id
    # pm.text is None when the reply is not a text message (photo, sticker...).
    link = (pm.text or '').strip()

    if link.startswith(('https://youtube.com/', 'youtube.com/')):
        bot.send_message(
            chat_id,
            "🚫 <b>Errore</b>\n\n<b>Il link</b> inserito non è valido!",
            parse_mode="HTML",
        )
        return
    if not link.startswith(('http://', 'https://')):
        bot.send_message(chat_id, "🚫 Devi inserire un <b>link</b> valido!", parse_mode="HTML")
        return

    try:
        if link.startswith('https://open.spotify.com/track/'):
            link = _youtube_url_for_spotify_track(link)
        _download_and_send_audio(chat_id, link)
    except Exception:
        # Handler boundary: report any download/conversion failure to the user
        # and keep the traceback in the log instead of swallowing it.
        logging.exception("Impossibile scaricare %s", link)
        bot.send_message(
            chat_id,
            "🚫 Errore!\n\n<b>Errore:</b> <i>Impossibile scaricare la canzone</i>",
            parse_mode="HTML",
        )
|
|
|
|
|
|
#Command /meteo
|
|
@bot.message_handler(commands=['meteo'])
def meteo(pm):
    """Handle /meteo: ask for a city and route the reply to meteo_step.

    Args:
        pm: The incoming Telegram message that triggered the command.
    """
    logging.info("Triggered command METEO.")
    prompt = bot.send_message(pm.chat.id, "🏙️ Inserisci la città:")
    bot.register_next_step_handler(prompt, meteo_step)
|
|
|
|
|
|
def meteo_step(message):
    """Send the current temperature and weather condition for the given city.

    The city comes from the user's reply. The lookup is restricted to Italy
    (",it" suffix) and the condition name returned by OpenWeatherMap is
    translated to Italian with DeepL.

    Args:
        message: The user's reply to the /meteo prompt.
    """
    chat_id = message.chat.id
    # message.text is None when the reply is not a text message.
    city = (message.text or '').strip()
    if not city:
        bot.send_message(chat_id, "🚫 Devi inserire il nome di una città!")
        return

    try:
        # `params` lets requests URL-encode the city (spaces, accents, '&').
        response = requests.get(
            "https://api.openweathermap.org/data/2.5/weather",
            params={"q": city + ",it", "APPID": os.environ.get('WEATHER_TOKEN')},
            timeout=10,
        ).json()
    except (requests.exceptions.RequestException, ValueError):
        logging.exception("Richiesta meteo fallita per %s", city)
        response = {}

    # Error payloads (unknown city, bad token) carry no weather/main sections.
    if 'weather' not in response or 'main' not in response:
        bot.send_message(chat_id, "🚫 Impossibile ottenere il meteo per " + city)
        return

    condition = response['weather'][0]['main']
    # The code subtracts 273.15, i.e. it treats the API value as Kelvin.
    celsius = round(response['main']['temp'] - 273.15, 1)

    translator = deepl.Translator(os.getenv('DEEPL_TOKEN'))
    condition_it = translator.translate_text(condition, target_lang='IT').text

    bot.send_message(chat_id, "🌡️ La temperatura in " + city + " è di " + str(celsius) + "°C")
    bot.send_message(chat_id, "🌧️ La condizione è " + condition_it)
|
|
|
|
#Command /stats
|
|
|
|
|
|
@bot.message_handler(commands=['stats'])
def uptime(message):
    """Handle /stats: reply with the host's current CPU and RAM usage.

    NOTE(review): a later handler in this file (/uptime) reuses the name
    ``uptime``. Both stay registered because registration happens in the
    decorator, but the module-level name ends up bound to the later one.

    Args:
        message: The incoming Telegram message that triggered the command.
    """
    logging.info("Triggered command STATS.")
    cpuUsage = psutil.cpu_percent(interval=1)

    # Take one snapshot so all RAM figures describe the same instant.
    memory = psutil.virtual_memory()
    bytes_per_mb = 1024 * 1024
    ramTotal = int(memory.total / bytes_per_mb)  # MB
    ramUsage = int(memory.used / bytes_per_mb)  # MB
    ramFree = int(memory.free / bytes_per_mb)  # MB
    ramUsagePercent = memory.percent

    msg = '''
CPU & RAM Info

🟩 Utilizzo CPU = {} %
RAM
Totale = {} MB
Usato = {} MB
Libero = {} MB
In uso = {} %\n'''.format(cpuUsage, ramTotal, ramUsage, ramFree, ramUsagePercent)
    bot.send_message(message.chat.id, msg)
|
|
|
|
#Command /pastebin
|
|
|
|
|
|
@bot.message_handler(commands=['pastebin'])
def pastebin(message):
    """Handle /pastebin: ask for the text to upload and route the reply on.

    Args:
        message: The incoming Telegram message that triggered the command.
    """
    logging.info("Triggered command PASTEBIN.")
    prompt = bot.send_message(message.chat.id, "📋 Inserisci il testo:")
    bot.register_next_step_handler(prompt, pastebin_step)
|
|
|
|
|
|
def pastebin_step(message):
    """Upload the user's text to Pastebin and reply with the API response.

    Args:
        message: The user's reply to the /pastebin prompt.
    """
    # `import urllib` alone does not guarantee the submodules are loaded, so
    # import the two that are used here explicitly.
    import urllib.parse
    import urllib.request

    chat = message.chat.id
    code = message.text
    # message.text is None when the reply is not a text message.
    if not code:
        bot.send_message(chat, "🚫 Devi inserire del testo!")
        return

    our_data = urllib.parse.urlencode({
        "api_dev_key": os.environ.get('PASTEBIN_TOKEN'),
        "api_option": "paste",
        "api_paste_code": code,
    }).encode()

    with urllib.request.urlopen('https://pastebin.com/api/api_post.php', our_data) as resp:
        # Decode the body: str() on bytes would render as "b'https://...'".
        link = resp.read().decode('utf-8', errors='replace')

    send_msg = "📋 Il tuo <b>codice</b> è stato inviato con successo!\n\n<b>Link:</b> " + link
    bot.send_message(chat, send_msg, parse_mode="HTML")
|
|
|
|
#Command /epicgames
|
|
|
|
|
|
@bot.message_handler(commands=['epicgames'])
def epicgames(message):
    """Handle /epicgames: ask whether to show the current or upcoming free game.

    Args:
        message: The incoming Telegram message that triggered the command.
    """
    sent_msg = bot.send_message(
        message.chat.id,
        "🎮 Vuoi vedere il gioco disponibile al momento o quello futuro? (disponibile/futuro)",
    )
    # Register on the prompt, like the other two-step commands in this file.
    bot.register_next_step_handler(sent_msg, epicgames_step)
|
|
|
|
|
|
def _send_epic_game(chat_id, game, headline, translator):
    """Send one game's cover image, then its title and Italian description."""
    description_it = translator.translate_text(game['description'], target_lang='IT').text
    bot.send_photo(chat_id, game['keyImages'][0]['url'])
    bot.send_message(chat_id, headline + game['title'] + "\n\n" + description_it)


def epicgames_step(message):
    """Show the current free Epic Games title or the upcoming ones.

    The answer "disponibile" (any letter case) shows the first current game.
    Any other answer shows up to two upcoming games, matching the original
    fall-through behaviour.

    Args:
        message: The user's reply to the /epicgames prompt.
    """
    chat_id = message.chat.id
    # message.text is None when the reply is not a text message.
    wants_current = (message.text or '').strip().lower() == 'disponibile'
    if wants_current:
        logging.info("Triggered command EPICGAMES DISPONIBILE.")
    else:
        logging.info("Triggered command EPICGAMES FUTURO.")

    # URL API
    url = "https://api.plenusbot.xyz/epic_games?country=IT"
    response = requests.get(url, timeout=10).json()

    if wants_current:
        games = response['currentGames'][:1]
        headline = "🎮 Il gioco gratis di oggi è "
    else:
        # Slice instead of indexing [0] and [1]: the API may list fewer games.
        games = response['nextGames'][:2]
        headline = "🎮 Il gioco futuro è "

    if not games:
        bot.send_message(chat_id, "🚫 Nessun gioco trovato!")
        return

    translator = deepl.Translator(os.getenv('DEEPL_TOKEN'))
    for game in games:
        _send_epic_game(chat_id, game, headline, translator)
|
|
|
|
#Command /shutdown
|
|
@bot.message_handler(commands=['shutdown'])
def shutdown(message):
    """Handle /shutdown: ask for confirmation and route the reply on.

    Args:
        message: The incoming Telegram message that triggered the command.
    """
    # Was `logging.ingo`, which raised AttributeError on every /shutdown.
    logging.info("Triggered SHUTDOWN")
    sent_msg = bot.send_message(message.chat.id, "Sei sicuro di voler spegnere il pc?")
    bot.register_next_step_handler(sent_msg, shutdown_step)
|
|
|
|
def shutdown_step(message):
    """Trigger the shutdown URL if the owner confirmed, otherwise cancel.

    Only the owner may confirm. The owner id is read from the USER_ID
    environment variable and falls back to the previously hard-coded id when
    the variable is unset. Messages from anyone else are ignored.

    Args:
        message: The user's reply to the /shutdown confirmation prompt.
    """
    # Compare as strings so a non-numeric USER_ID cannot raise here.
    owner_id = os.getenv('USER_ID') or '771375637'
    if str(message.from_user.id) != owner_id:
        logging.warning("Shutdown richiesto da un utente non autorizzato: %s", message.from_user.id)
        return

    # message.text is None when the reply is not a text message.
    answer = (message.text or '').strip().lower()
    if answer not in ('si', 'sì', 'y', 'yes'):
        bot.send_message(message.chat.id, "⚠️ Hai annullato l'operazione!")
        return

    # Confirmation is sent before the request, as in the original code.
    # NOTE(review): presumably because the request may take this host down
    # before a later message could be delivered -- confirm.
    bot.send_message(message.chat.id, "Il computer è stato spento con successo!")
    logging.info("Triggered Shutdown")
    # The PASSWORD variable holds the URL that is requested to shut down.
    url = os.getenv("PASSWORD")
    requests.get(url, timeout=10)
|
|
|
|
#Command /shortlink
|
|
|
|
@bot.message_handler(commands=['shortlink'])
def shortlink(message):
    """Handle /shortlink: ask for a URL and route the reply to shortlink_step.

    Args:
        message: The incoming Telegram message that triggered the command.
    """
    logging.info("Triggered SHORTLINK")
    sent_msg = bot.send_message(message.chat.id, "Inserisci il link:")
    bot.register_next_step_handler(sent_msg, shortlink_step)
|
|
|
|
def shortlink_step(message):
    """Shorten the link the user sent with TinyURL and reply with the result.

    Args:
        message: The user's reply to the /shortlink prompt.
    """
    chat_id = message.chat.id
    # message.text is None when the reply is not a text message.
    link = (message.text or '').strip()
    if not link:
        bot.send_message(chat_id, "🚫 Devi inserire un link valido!")
        return

    try:
        short_url = pyshorteners.Shortener().tinyurl.short(link)
    except Exception:
        # Handler boundary: pyshorteners' exception classes are not visible
        # here, so log the traceback and tell the user instead of crashing.
        logging.exception("Impossibile accorciare %s", link)
        bot.send_message(chat_id, "🚫 Devi inserire un link valido!")
        return

    bot.send_message(chat_id, "Ecco a te lo shortlink: " + short_url)
|
|
|
|
#Command /uptime
|
|
@bot.message_handler(commands=['uptime'])
def uptime(message):
    """Handle /uptime: ask for a site URL and route the reply to uptime_step.

    Args:
        message: The incoming Telegram message that triggered the command.
    """
    logging.info("Triggered UPTIME")
    prompt = bot.send_message(message.chat.id, "Manda il link del sito che vuoi controllare.")
    bot.register_next_step_handler(prompt, uptime_step)
|
|
|
|
def uptime_step(message):
    """Tell the user whether the site they sent answers with HTTP 200.

    Any request failure (malformed URL, DNS error, refused connection,
    timeout) and any non-200 status is reported as offline, so the user
    always gets a reply.

    Args:
        message: The user's reply to the /uptime prompt.
    """
    # message.text is None when the reply is not a text message.
    url = (message.text or '').strip()
    try:
        is_online = requests.get(url, timeout=10).status_code == 200
    except requests.exceptions.RequestException:
        # Base class of ConnectionError, Timeout, MissingSchema, InvalidURL...
        is_online = False

    if is_online:
        # Telegram does not expand ":tada:" shortcodes, so send the emoji itself.
        bot.send_message(message.chat.id, "Il sito è online, puoi festeggiare adesso! 🎉")
    else:
        bot.send_message(message.chat.id, "Il sito non esiste oppure è offline, sad...")
|
|
# status_code = urllib.request.urlopen(url).getcode()
|
|
# website_is_up = status_code == 200
|
|
# try
|
|
# if website_is_up == True:
|
|
# bot.send_message(message.chat.id, "Il sito è online!")
|
|
# except:
|
|
# bot.send_message(message.chat.id, "Si è verificato un errore")
|
|
|
|
#Command /convertpdf
|
|
@bot.message_handler(commands=["convertpdf"])
def addfile(message):
    """Handle /convertpdf: ask for a PDF and route the reply to convert.

    Args:
        message: The incoming Telegram message that triggered the command.
    """
    logging.info("Triggered CONVERT")
    prompt = bot.send_message(message.chat.id, "Manda il file pdf che vuoi convertire in docx")
    bot.register_next_step_handler(prompt, convert)
|
|
|
|
def convert(message):
    """Convert the PDF the user sent to DOCX and send the result back.

    The work happens in a temporary directory, so neither the downloaded PDF
    nor the generated DOCX is left behind, even when the conversion fails.

    Args:
        message: The user's reply to the /convertpdf prompt; expected to
            carry a document attachment.
    """
    import tempfile  # local import: only this handler needs it

    chat_id = message.chat.id
    document = message.document
    # message.document is None when the reply carries no file attachment.
    if document is None:
        bot.send_message(chat_id, "🚫 Devi inviare un file pdf!")
        return

    # Keep only the last path component so a crafted name stays in the work dir.
    file_name = os.path.basename(document.file_name or '') or 'document.pdf'
    file_info = bot.get_file(document.file_id)
    downloaded_file = bot.download_file(file_info.file_path)

    with tempfile.TemporaryDirectory() as work_dir:
        pdf_file = os.path.join(work_dir, file_name)
        docx_file = pdf_file + '.docx'
        with open(pdf_file, 'wb') as new_file:
            new_file.write(downloaded_file)

        cv = Converter(pdf_file)
        try:
            cv.convert(docx_file)
        finally:
            # Release the converter's handle on the PDF even on failure.
            cv.close()

        bot.send_message(chat_id, "Il file è stato convertito con successo!")
        # Close the handle before the temporary directory is cleaned up.
        with open(docx_file, 'rb') as docx_handle:
            bot.send_document(chat_id, docx_handle)
|
|
|
|
#Command /cloud
|
|
@bot.message_handler(commands=["cloud"])
def cloud(message):
    """Handle /cloud: ask for a file to store and route the reply to cloud_step.

    Args:
        message: The incoming Telegram message that triggered the command.
    """
    logging.info("Triggered CLOUD")
    prompt = bot.send_message(message.chat.id, "Invia il file che vuoi caricare")
    bot.register_next_step_handler(prompt, cloud_step)
|
|
|
|
def cloud_step(message):
    """Store the uploaded file under ``storage/`` and record who uploaded it.

    The stored name is the original file name prefixed with a random number
    between 1 and 9999; that prefixed name is reported to the user as the
    file id. One JSON object per upload is appended to ``data.json``.

    Args:
        message: The user's reply to the /cloud prompt; expected to carry a
            document attachment.
    """
    chat_id = message.chat.id
    document = message.document
    # message.document is None when the reply carries no file attachment.
    if document is None:
        bot.send_message(chat_id, "🚫 Devi inviare un file!")
        return

    # Keep only the last path component so the file cannot land outside storage/.
    file_name = os.path.basename(document.file_name or '') or 'file'
    file_info = bot.get_file(document.file_id)
    downloaded_file = bot.download_file(file_info.file_path)

    os.makedirs('storage', exist_ok=True)
    while True:
        new_name = str(randint(1, 9999)) + file_name
        try:
            # 'x' mode fails if the name is taken, so an earlier upload with
            # the same random prefix is never overwritten.
            with open(os.path.join('storage', new_name), 'xb') as new_file:
                new_file.write(downloaded_file)
            break
        except FileExistsError:
            continue

    bot.send_message(chat_id, "Il tuo file è stato caricato con successo! Ecco l'id del tuo file "+new_name)
    bot.send_message(chat_id, "Se vuoi scaricare il file fai /cloud download + l'id del file")

    dic_exm = {
        "filename": new_name,
        "user_id": message.from_user.id,
    }
    # Append mode: the file holds a sequence of JSON objects, one per upload.
    with open('data.json', 'a') as f:
        json.dump(dic_exm, f, indent=2)
        f.write('\n')
|
|
|
|
@bot.message_handler(commands=["translatepdf"])
def translatepdf(message):
    """Handle /translatepdf: ask for a PDF and route the reply on.

    Args:
        message: The incoming Telegram message that triggered the command.
    """
    logging.info("Triggered TRANSLATE PDF")
    # The next step reads message.document, so the prompt must ask for a file;
    # it previously asked the user to type a message.
    sent_msg = bot.send_message(message.chat.id, "Manda il file pdf che vuoi tradurre.")
    bot.register_next_step_handler(sent_msg, translatepdf_step)
|
|
|
|
def translatepdf_step(message):
    """Translate the first page of the PDF the user sent into Italian.

    The PDF is parsed from memory, so nothing is written to disk. Only the
    first page is translated, as in the original implementation.

    Args:
        message: The user's reply to the /translatepdf prompt; expected to
            carry a document attachment.
    """
    import io  # local import: only this handler needs it

    chat_id = message.chat.id
    document = message.document
    # message.document is None when the reply carries no file attachment.
    if document is None:
        bot.send_message(chat_id, "🚫 Devi inviare un file pdf!")
        return

    file_info = bot.get_file(document.file_id)
    downloaded_file = bot.download_file(file_info.file_path)

    # NOTE(review): PdfFileReader / getNumPages / extractText are kept from the
    # original code; they appear to be PyPDF2's legacy names -- confirm against
    # the installed PyPDF2 version before upgrading it.
    read_pdf = PyPDF2.PdfFileReader(io.BytesIO(downloaded_file))
    page_content = ''
    if read_pdf.getNumPages() > 0:
        page_content = read_pdf.pages[0].extractText()

    # Nothing to translate: an empty Telegram message would be rejected.
    if not page_content.strip():
        bot.send_message(chat_id, "🚫 Nessun testo trovato nel pdf!")
        return

    translated_text = Translator().translate(page_content, dest='it').text
    # Telegram rejects messages longer than 4096 characters, so send in chunks.
    for start in range(0, len(translated_text), 4096):
        bot.send_message(chat_id, translated_text[start:start + 4096])
|
|
|
|
#Command /convertmoney
|
|
@bot.message_handler(commands=["convertmoney"])
def convertmoney(message):
    """Handle /convertmoney: ask for the target currency and route the reply on.

    Args:
        message: The incoming Telegram message that triggered the command.
    """
    logging.info("Triggered CONVERT MONEY")
    sent_msg = bot.send_message(message.chat.id, "In che valuta vuoi convertire?")
    bot.register_next_step_handler(sent_msg, convertmoney_step)
|
|
|
|
def convertmoney_step(message):
    """Remember the target currency and ask for the amount to convert.

    Args:
        message: The user's reply holding the target currency code.
    """
    # The reply text is forwarded unchanged to the next step, which performs
    # the conversion; no converter is needed at this point.
    second = message.text
    sent_msg = bot.send_message(message.chat.id, "Inserisci quanto vuoi convertire?")
    bot.register_next_step_handler(sent_msg, convertmoney_step2, second)
|
|
|
|
def convertmoney_step2(message, second):
    """Convert the amount the user sent from EUR into the chosen currency.

    Args:
        message: The user's reply holding the amount; a decimal comma is
            accepted.
        second: The target currency code collected in the previous step.
    """
    chat_id = message.chat.id
    try:
        # message.text is None for non-text replies; float('') raises ValueError.
        amount = float((message.text or '').strip().replace(',', '.'))
        currency = (second or '').strip().upper()
        # convert() needs a number (it does arithmetic on the amount) and
        # raises ValueError for a currency it does not know.
        converted = CurrencyConverter().convert(amount, 'EUR', currency)
    except ValueError:
        bot.send_message(chat_id, "🚫 Importo o valuta non validi!")
        return

    bot.send_message(chat_id, str(converted))
|
|
|
|
#Plenus is off or on?
|
|
def _monitor_plenus(interval=3600):
    """Check https://plenusbot.xyz every *interval* seconds, alerting on failure."""
    while True:
        try:
            x = requests.get('https://plenusbot.xyz', timeout=30)
            if x.status_code == 200:
                logging.info("Plenus ON!")
        except requests.exceptions.RequestException:
            try:
                bot.send_message(771375637, "Plenus è OFF!")
            except Exception:
                # Keep the monitor alive even if the alert cannot be delivered.
                logging.exception("Impossibile inviare l'avviso Plenus OFF")
        # Sleep on every path so a non-200 answer cannot turn into a busy loop.
        time.sleep(interval)


# The monitor runs in a daemon thread. Previously its infinite loop ran before
# bot.polling(), so the bot never started handling commands.
threading.Thread(target=_monitor_plenus, daemon=True).start()

try:
    bot.polling()
except KeyboardInterrupt:
    pass
logging.info("Il bot è stato spento con successo!")