mirror of
				https://github.com/OpenVoiceOS/OpenVoiceOS
				synced 2025-06-05 22:19:21 +02:00 
			
		
		
		
	Add deezeridu and py_bandcamp packages into rootfs overlay.
ToDo: Create buildroot packages for these.
This commit is contained in:
		| @@ -0,0 +1 @@ | ||||
| pip | ||||
| @@ -0,0 +1,18 @@ | ||||
| Metadata-Version: 2.1 | ||||
| Name: deezeridu | ||||
| Version: 0.0.2 | ||||
| Summary: Downloads songs, albums or playlists from deezer | ||||
| Home-page: https://github.com/OpenJarbas/deezeridu | ||||
| Author: An0nimia | ||||
| License: CC BY-NC-SA 4.0 | ||||
| Platform: UNKNOWN | ||||
| Requires-Python: >=3.8 | ||||
| Description-Content-Type: text/markdown | ||||
| Requires-Dist: mutagen | ||||
| Requires-Dist: pycryptodomex | ||||
| Requires-Dist: requests | ||||
| Requires-Dist: tqdm | ||||
| Requires-Dist: json-database (>=0.5.6) | ||||
|  | ||||
| UNKNOWN | ||||
|  | ||||
| @@ -0,0 +1,34 @@ | ||||
| deezeridu-0.0.2.dist-info/INSTALLER,sha256=zuuue4knoyJ-UwPPXg8fezS7VCrXJQrAP7zeNuwvFQg,4 | ||||
| deezeridu-0.0.2.dist-info/METADATA,sha256=JBjeAZnVtVZvYgmfElDjDUGdS9EjEwqxaiJGE541aH8,430 | ||||
| deezeridu-0.0.2.dist-info/RECORD,, | ||||
| deezeridu-0.0.2.dist-info/REQUESTED,sha256=47DEQpj8HBSa-_TImW-5JCeuQeRkm5NMpJWZG3hSuFU,0 | ||||
| deezeridu-0.0.2.dist-info/WHEEL,sha256=g4nMs7d-Xl9-xC9XovUrsDHGXt-FT0E17Yqo92DEfvY,92 | ||||
| deezeridu-0.0.2.dist-info/top_level.txt,sha256=5OOrodHgvewBBzVAiYzCWOzPv55J_lxWSYD_H86uoaM,27 | ||||
| deezeridu/__init__.py,sha256=JLrWou6ghlPaCCvudqUMt0WLXNAYcLMkgd6ALyff4xY,9462 | ||||
| deezeridu/__init__.pyc,, | ||||
| deezeridu/api.py,sha256=YRdBnqu4KMgDFyXM1Zutlkux-tREtq10-tkPdssB8Tc,6116 | ||||
| deezeridu/api.pyc,, | ||||
| deezeridu/download.py,sha256=RH9JgXWiKMx3vWeAOL44kG4AjmTUHnaWwAG88_isdCs,16473 | ||||
| deezeridu/download.pyc,, | ||||
| deezeridu/download_utils.py,sha256=fpJJvy5WCYR3jeN23yLZ73_nametlWNUTH5kEePAGeQ,1850 | ||||
| deezeridu/download_utils.pyc,, | ||||
| deezeridu/exceptions.py,sha256=FyXJME7bFs0MpksfHDtbStdegMyTu3I5_V_ttPYsCmA,2152 | ||||
| deezeridu/exceptions.pyc,, | ||||
| deezeridu/gateway.py,sha256=Hv2gqK3qaKFMiJIPbHF98eTbKU4WL_zMSuuxOR0DhGY,7321 | ||||
| deezeridu/gateway.pyc,, | ||||
| deezeridu/models/__init__.py,sha256=MKWdOOYUJVhWjWAUvk7wEm5jC88QCRIFK5oHbDePIKw,138 | ||||
| deezeridu/models/__init__.pyc,, | ||||
| deezeridu/models/album.py,sha256=mcaZDyZQNhpPyNpHzkYZt05u0oZJLJFiOIDBoHM_vnI,504 | ||||
| deezeridu/models/album.pyc,, | ||||
| deezeridu/models/playlist.py,sha256=z37DLc6-lQiXthtJz0kAwxTG3wlc8ZosiL9aRo0VpLk,220 | ||||
| deezeridu/models/playlist.pyc,, | ||||
| deezeridu/models/preferences.py,sha256=UOl-6T9tgXueV_kG5qboF_G4I-Znrne6ctZ8km9OoXw,424 | ||||
| deezeridu/models/preferences.pyc,, | ||||
| deezeridu/models/track.py,sha256=ePJ60dobaI141N-BtIP83jUr8MxOgTU1xyQ665w5rVQ,1644 | ||||
| deezeridu/models/track.pyc,, | ||||
| deezeridu/settings.py,sha256=ZOURy_oQ-lWQWe41Oyx-ED1wpz9zZBidnQgFpf13XWg,571 | ||||
| deezeridu/settings.pyc,, | ||||
| deezeridu/taggers.py,sha256=-iPciR31q9qNrF_QXo3fPmaUCMxoWr8bP_DY3RS9_Zg,4359 | ||||
| deezeridu/taggers.pyc,, | ||||
| deezeridu/utils.py,sha256=L9PxpvqbfEUMccFBvy1B1fplsm7cU9uU_TKV5HnxNuM,5432 | ||||
| deezeridu/utils.pyc,, | ||||
| @@ -0,0 +1,5 @@ | ||||
| Wheel-Version: 1.0 | ||||
| Generator: bdist_wheel (0.34.2) | ||||
| Root-Is-Purelib: true | ||||
| Tag: py3-none-any | ||||
|  | ||||
| @@ -0,0 +1,2 @@ | ||||
| deezeridu | ||||
| deezeridu/models | ||||
| @@ -0,0 +1,301 @@ | ||||
| #!/usr/bin/python3 | ||||
|  | ||||
| from .api import API | ||||
| from .gateway import Gateway | ||||
| from .download import ( | ||||
|     TrackDownloader, AlbumDownloader, PlaylistDownloader, | ||||
|     DownloaderJob | ||||
| ) | ||||
|  | ||||
| from .utils import ( | ||||
|     create_zip, get_ids, link_is_valid, | ||||
|     what_kind, convert_to_date | ||||
| ) | ||||
| from .exceptions import ( | ||||
|     InvalidLink, TrackNotFound, | ||||
|     NoDataApi, AlbumNotFound, CredentialsMissing | ||||
| ) | ||||
| from .models import ( | ||||
|     Track, Album, Playlist, | ||||
|     Preferences | ||||
| ) | ||||
| from json_database import JsonConfigXDG | ||||
|  | ||||
|  | ||||
| class Deezer: | ||||
|     def __init__( | ||||
|             self, | ||||
|             arl=None, | ||||
|             email=None, | ||||
|             password=None | ||||
|     ): | ||||
|         if arl: | ||||
|             self.__gw_api = Gateway(arl=arl) | ||||
|         else: | ||||
|             if not email or not  password: | ||||
|                 creds = JsonConfigXDG("deezer", subfolder="deezeridu") | ||||
|                 email = creds.get("email") | ||||
|                 password = creds.get("password") | ||||
|             if not email or not  password: | ||||
|                 raise CredentialsMissing | ||||
|             self.__gw_api = Gateway( | ||||
|                 email=email, | ||||
|                 password=password | ||||
|             ) | ||||
|  | ||||
|         self.__api = API() | ||||
|         self.__download_job = DownloaderJob(self.__api, self.__gw_api) | ||||
|  | ||||
|     def download_track( | ||||
|             self, link_track, | ||||
|             output_dir, | ||||
|             quality_download="MP3_320", | ||||
|             recursive_quality=False, | ||||
|             recursive_download=False, | ||||
|             not_interface=False, | ||||
|             method_save=2 | ||||
|     ) -> Track: | ||||
|  | ||||
|         link_is_valid(link_track) | ||||
|         ids = get_ids(link_track) | ||||
|  | ||||
|         try: | ||||
|             song_metadata = self.__api.tracking(ids) | ||||
|         except NoDataApi: | ||||
|             infos = self.__gw_api.get_song_data(ids) | ||||
|  | ||||
|             if not "FALLBACK" in infos: | ||||
|                 raise TrackNotFound(link_track) | ||||
|  | ||||
|             ids = infos['FALLBACK']['SNG_ID'] | ||||
|             song_metadata = self.__api.tracking(ids) | ||||
|  | ||||
|         preferences = Preferences() | ||||
|  | ||||
|         preferences.link = link_track | ||||
|         preferences.song_metadata = song_metadata | ||||
|         preferences.quality_download = quality_download | ||||
|         preferences.output_dir = output_dir | ||||
|         preferences.ids = ids | ||||
|         preferences.recursive_quality = recursive_quality | ||||
|         preferences.recursive_download = recursive_download | ||||
|         preferences.not_interface = not_interface | ||||
|         preferences.method_save = method_save | ||||
|  | ||||
|         track = TrackDownloader(preferences, self.__download_job).dw() | ||||
|  | ||||
|         return track | ||||
|  | ||||
|     def download_album( | ||||
|             self, link_album, | ||||
|             output_dir, | ||||
|             quality_download="MP3_320", | ||||
|             recursive_quality=False, | ||||
|             recursive_download=False, | ||||
|             not_interface=False, | ||||
|             make_zip=False, | ||||
|             method_save=2 | ||||
|     ) -> Album: | ||||
|  | ||||
|         link_is_valid(link_album) | ||||
|         ids = get_ids(link_album) | ||||
|  | ||||
|         try: | ||||
|             album_json = self.__api.get_album(ids) | ||||
|         except NoDataApi: | ||||
|             raise AlbumNotFound(link_album) | ||||
|  | ||||
|         song_metadata = { | ||||
|             "music": [], | ||||
|             "artist": [], | ||||
|             "tracknum": [], | ||||
|             "discnum": [], | ||||
|             "bpm": [], | ||||
|             "duration": [], | ||||
|             "isrc": [], | ||||
|             "gain": [], | ||||
|             "album": album_json['title'], | ||||
|             "label": album_json['label'], | ||||
|             "year": convert_to_date(album_json['release_date']), | ||||
|             "upc": album_json['upc'], | ||||
|             "nb_tracks": album_json['nb_tracks'] | ||||
|         } | ||||
|  | ||||
|         genres = [] | ||||
|  | ||||
|         if "genres" in album_json: | ||||
|             for a in album_json['genres']['data']: | ||||
|                 genres.append(a['name']) | ||||
|  | ||||
|         song_metadata['genre'] = " & ".join(genres) | ||||
|         ar_album = [] | ||||
|  | ||||
|         for a in album_json['contributors']: | ||||
|             if a['role'] == "Main": | ||||
|                 ar_album.append(a['name']) | ||||
|  | ||||
|         song_metadata['ar_album'] = " & ".join(ar_album) | ||||
|         sm_items = song_metadata.items() | ||||
|  | ||||
|         for track in album_json['tracks']['data']: | ||||
|             c_ids = track['id'] | ||||
|             detas = self.__api.tracking(c_ids, album=True) | ||||
|  | ||||
|             for key, item in sm_items: | ||||
|                 if type(item) is list: | ||||
|                     song_metadata[key].append(detas[key]) | ||||
|  | ||||
|         preferences = Preferences() | ||||
|  | ||||
|         preferences.link = link_album | ||||
|         preferences.song_metadata = song_metadata | ||||
|         preferences.quality_download = quality_download | ||||
|         preferences.output_dir = output_dir | ||||
|         preferences.ids = ids | ||||
|         preferences.json_data = album_json | ||||
|         preferences.recursive_quality = recursive_quality | ||||
|         preferences.recursive_download = recursive_download | ||||
|         preferences.not_interface = not_interface | ||||
|         preferences.method_save = method_save | ||||
|         preferences.make_zip = make_zip | ||||
|  | ||||
|         album = AlbumDownloader(preferences, self.__download_job).dw() | ||||
|  | ||||
|         return album | ||||
|  | ||||
|     def download_playlist( | ||||
|             self, link_playlist, | ||||
|             output_dir, | ||||
|             quality_download="MP3_320", | ||||
|             recursive_quality=False, | ||||
|             recursive_download=False, | ||||
|             not_interface=False, | ||||
|             make_zip=False, | ||||
|             method_save=2 | ||||
|     ) -> Playlist: | ||||
|  | ||||
|         link_is_valid(link_playlist) | ||||
|         ids = get_ids(link_playlist) | ||||
|  | ||||
|         song_metadata = [] | ||||
|         playlist_json = self.__api.get_playlist(ids) | ||||
|  | ||||
|         for track in playlist_json['tracks']['data']: | ||||
|             c_ids = track['id'] | ||||
|  | ||||
|             try: | ||||
|                 c_song_metadata = self.__api.tracking(c_ids) | ||||
|             except NoDataApi: | ||||
|                 infos = self.__gw_api.get_song_data(c_ids) | ||||
|  | ||||
|                 if not "FALLBACK" in infos: | ||||
|                     c_song_metadata = f"{track['title']} - {track['artist']['name']}" | ||||
|                 else: | ||||
|                     c_song_metadata = self.__api.tracking(c_ids) | ||||
|  | ||||
|             song_metadata.append(c_song_metadata) | ||||
|  | ||||
|         preferences = Preferences() | ||||
|  | ||||
|         preferences.link = link_playlist | ||||
|         preferences.song_metadata = song_metadata | ||||
|         preferences.quality_download = quality_download | ||||
|         preferences.output_dir = output_dir | ||||
|         preferences.ids = ids | ||||
|         preferences.json_data = playlist_json | ||||
|         preferences.recursive_quality = recursive_quality | ||||
|         preferences.recursive_download = recursive_download | ||||
|         preferences.not_interface = not_interface | ||||
|         preferences.method_save = method_save | ||||
|         preferences.make_zip = make_zip | ||||
|  | ||||
|         playlist = PlaylistDownloader(preferences, self.__download_job).dw() | ||||
|  | ||||
|         return playlist | ||||
|  | ||||
|     def download_artist_toptracks( | ||||
|             self, link_artist, | ||||
|             output_dir, | ||||
|             quality_download="MP3_320", | ||||
|             recursive_quality=False, | ||||
|             recursive_download=False, | ||||
|             not_interface=False | ||||
|     ): | ||||
|  | ||||
|         link_is_valid(link_artist) | ||||
|         ids = get_ids(link_artist) | ||||
|  | ||||
|         playlist_json = self.__api.get_artist_top_tracks(ids)['data'] | ||||
|  | ||||
|         names = [ | ||||
|             self.download_track( | ||||
|                 track['link'], output_dir, | ||||
|                 quality_download, recursive_quality, | ||||
|                 recursive_download, not_interface | ||||
|             ) | ||||
|  | ||||
|             for track in playlist_json | ||||
|         ] | ||||
|         return Playlist(names) | ||||
|  | ||||
|     def download( | ||||
|             self, link, | ||||
|             output_dir, | ||||
|             quality_download="MP3_320", | ||||
|             recursive_quality=False, | ||||
|             recursive_download=False, | ||||
|             not_interface=False, | ||||
|             make_zip=False, | ||||
|             method_save=2 | ||||
|     ): | ||||
|  | ||||
|         link_is_valid(link) | ||||
|         link = what_kind(link) | ||||
|  | ||||
|         if "first_result/" in link or "track/" in link: | ||||
|             return self.download_track( | ||||
|                 link, | ||||
|                 output_dir=output_dir, | ||||
|                 quality_download=quality_download, | ||||
|                 recursive_quality=recursive_quality, | ||||
|                 recursive_download=recursive_download, | ||||
|                 not_interface=not_interface, | ||||
|                 method_save=2 | ||||
|             ) | ||||
|         elif "album/" in link: | ||||
|             return self.download_album( | ||||
|                 link, | ||||
|                 output_dir=output_dir, | ||||
|                 quality_download=quality_download, | ||||
|                 recursive_quality=recursive_quality, | ||||
|                 recursive_download=recursive_download, | ||||
|                 not_interface=not_interface, | ||||
|                 make_zip=make_zip, | ||||
|                 method_save=2 | ||||
|             ) | ||||
|         elif "artist/" in link: | ||||
|             return self.download_artist_toptracks( | ||||
|                 link, | ||||
|                 output_dir=output_dir, | ||||
|                 quality_download=quality_download, | ||||
|                 recursive_quality=recursive_quality, | ||||
|                 recursive_download=recursive_download, | ||||
|                 not_interface=not_interface | ||||
|             ) | ||||
|  | ||||
|         elif "playlist/" in link: | ||||
|             return self.download_playlist( | ||||
|                 link, | ||||
|                 output_dir=output_dir, | ||||
|                 quality_download=quality_download, | ||||
|                 recursive_quality=recursive_quality, | ||||
|                 recursive_download=recursive_download, | ||||
|                 not_interface=not_interface, | ||||
|                 make_zip=make_zip, | ||||
|                 method_save=2 | ||||
|             ) | ||||
|  | ||||
|             smart.type = "playlist" | ||||
|             smart._playlist = playlist | ||||
|  | ||||
|         raise InvalidLink(link) | ||||
										
											Binary file not shown.
										
									
								
							| @@ -0,0 +1,212 @@ | ||||
| #!/usr/bin/python3 | ||||
|  | ||||
| from time import sleep | ||||
|  | ||||
| from requests import get as req_get | ||||
|  | ||||
| from .settings import header | ||||
| from .utils import artist_sort, convert_to_date | ||||
| from .exceptions import ( | ||||
| 	NoDataApi, QuotaExceeded, TrackNotFound | ||||
| ) | ||||
|  | ||||
|  | ||||
| class API: | ||||
|     def __init__(self): | ||||
|         self.__api_link = "https://api.deezer.com/" | ||||
|         self.__cover = "https://e-cdns-images.dzcdn.net/images/cover/%s/{}-000000-80-0-0.jpg" | ||||
|  | ||||
|     def __get_api(self, url, quota_exceeded=False): | ||||
|         json = req_get(url, headers=header).json() | ||||
|  | ||||
|         if "error" in json: | ||||
|             if json['error']['message'] == "no data": | ||||
|                 raise NoDataApi("No data avalaible :(") | ||||
|  | ||||
|             elif json['error']['message'] == "Quota limit exceeded": | ||||
|                 if not quota_exceeded: | ||||
|                     sleep(0.8) | ||||
|                     json = self.__get_api(url, True) | ||||
|                 else: | ||||
|                     raise QuotaExceeded | ||||
|  | ||||
|         return json | ||||
|  | ||||
|     def get_chart(self, index=0): | ||||
|         url = f"{self.__api_link}chart/{index}" | ||||
|         infos = self.__get_api(url) | ||||
|         return infos | ||||
|  | ||||
|     def get_track(self, ids): | ||||
|         url = f"{self.__api_link}track/{ids}" | ||||
|         infos = self.__get_api(url) | ||||
|         return infos | ||||
|  | ||||
|     def get_album(self, ids): | ||||
|         url = f"{self.__api_link}album/{ids}" | ||||
|         infos = self.__get_api(url) | ||||
|         return infos | ||||
|  | ||||
|     def get_playlist(self, ids): | ||||
|         url = f"{self.__api_link}playlist/{ids}" | ||||
|         infos = self.__get_api(url) | ||||
|         return infos | ||||
|  | ||||
|     def get_artist(self, ids): | ||||
|         url = f"{self.__api_link}artist/{ids}" | ||||
|         infos = self.__get_api(url) | ||||
|         return infos | ||||
|  | ||||
|     def get_artist_top_tracks(self, ids, limit=25): | ||||
|         url = f"{self.__api_link}artist/{ids}/top?limit={limit}" | ||||
|         infos = self.__get_api(url) | ||||
|         return infos | ||||
|  | ||||
|     def get_artist_top_albums(self, ids, limit=25): | ||||
|         url = f"{self.__api_link}artist/{ids}/albums?limit={limit}" | ||||
|         infos = self.__get_api(url) | ||||
|         return infos | ||||
|  | ||||
|     def get_artist_related(self, ids): | ||||
|         url = f"{self.__api_link}artist/{ids}/related" | ||||
|         infos = self.__get_api(url) | ||||
|         return infos | ||||
|  | ||||
|     def get_artist_radio(self, ids): | ||||
|         url = f"{self.__api_link}artist/{ids}/radio" | ||||
|         infos = self.__get_api(url) | ||||
|         return infos | ||||
|  | ||||
|     def get_artist_top_playlists(self, ids, limit=25): | ||||
|         url = f"{self.__api_link}artist/{ids}/playlists?limit={limit}" | ||||
|         infos = self.__get_api(url) | ||||
|         return infos | ||||
|  | ||||
|     def search(self, query): | ||||
|         url = f"{self.__api_link}search/?q={query}" | ||||
|         infos = self.__get_api(url) | ||||
|  | ||||
|         if infos['total'] == 0: | ||||
|             raise NoDataApi(query) | ||||
|  | ||||
|         return infos | ||||
|  | ||||
|     def search_track(self, query): | ||||
|         url = f"{self.__api_link}search/track/?q={query}" | ||||
|         infos = self.__get_api(url) | ||||
|  | ||||
|         if infos['total'] == 0: | ||||
|             raise NoDataApi(query) | ||||
|  | ||||
|         return infos | ||||
|  | ||||
|     def search_album(self, query): | ||||
|         url = f"{self.__api_link}search/album/?q={query}" | ||||
|         infos = self.__get_api(url) | ||||
|  | ||||
|         if infos['total'] == 0: | ||||
|             raise NoDataApi(query) | ||||
|  | ||||
|         return infos | ||||
|  | ||||
|     def search_playlist(self, query): | ||||
|         url = f"{self.__api_link}search/playlist/?q={query}" | ||||
|         infos = self.__get_api(url) | ||||
|  | ||||
|         if infos['total'] == 0: | ||||
|             raise NoDataApi(query) | ||||
|  | ||||
|         return infos | ||||
|  | ||||
|     def search_artist(self, query): | ||||
|         url = f"{self.__api_link}search/artist/?q={query}" | ||||
|         infos = self.__get_api(url) | ||||
|  | ||||
|         if infos['total'] == 0: | ||||
|             raise NoDataApi(query) | ||||
|  | ||||
|         return infos | ||||
|  | ||||
|     def not_found(self, song, title): | ||||
|         try: | ||||
|             data = self.search_track(song)['data'] | ||||
|         except NoDataApi: | ||||
|             raise TrackNotFound(song) | ||||
|  | ||||
|         ids = None | ||||
|  | ||||
|         for track in data: | ||||
|             if ( | ||||
|                     track['title'] == title | ||||
|             ) or ( | ||||
|                     title in track['title_short'] | ||||
|             ): | ||||
|                 ids = track['id'] | ||||
|                 break | ||||
|  | ||||
|         if not ids: | ||||
|             raise TrackNotFound(song) | ||||
|  | ||||
|         return str(ids) | ||||
|  | ||||
|     def get_img_url(self, md5_image, size="1200x1200"): | ||||
|         cover = self.__cover.format(size) | ||||
|         image_url = cover % md5_image | ||||
|         return image_url | ||||
|  | ||||
|     def choose_img(self, md5_image, size="1200x1200"): | ||||
|         image_url = self.get_img_url(md5_image, size) | ||||
|         image = req_get(image_url).content | ||||
|  | ||||
|         if len(image) == 13: | ||||
|             image_url = self.get_img_url("", size) | ||||
|             image = req_get(image_url).content | ||||
|  | ||||
|         return image | ||||
|  | ||||
|     def tracking(self, ids, album=False): | ||||
|         datas = {} | ||||
|         json_track = self.get_track(ids) | ||||
|  | ||||
|         if not album: | ||||
|             album_ids = json_track['album']['id'] | ||||
|             json_album = self.get_album(album_ids) | ||||
|             genres = [] | ||||
|  | ||||
|             if "genres" in json_album: | ||||
|                 for genre in json_album['genres']['data']: | ||||
|                     genres.append(genre['name']) | ||||
|  | ||||
|             datas['genre'] = " & ".join(genres) | ||||
|             ar_album = [] | ||||
|  | ||||
|             for contributor in json_album['contributors']: | ||||
|                 if contributor['role'] == "Main": | ||||
|                     ar_album.append(contributor['name']) | ||||
|  | ||||
|             datas['ar_album'] = " & ".join(ar_album) | ||||
|             datas['album'] = json_album['title'] | ||||
|             datas['label'] = json_album['label'] | ||||
|             datas['upc'] = json_album['upc'] | ||||
|             datas['nb_tracks'] = json_album['nb_tracks'] | ||||
|  | ||||
|         datas['music'] = json_track['title'] | ||||
|         array = [] | ||||
|  | ||||
|         for contributor in json_track['contributors']: | ||||
|             if contributor['name'] != "": | ||||
|                 array.append(contributor['name']) | ||||
|  | ||||
|         array.append( | ||||
|             json_track['artist']['name'] | ||||
|         ) | ||||
|  | ||||
|         datas['artist'] = artist_sort(array) | ||||
|         datas['tracknum'] = json_track['track_position'] | ||||
|         datas['discnum'] = json_track['disk_number'] | ||||
|         datas['year'] = convert_to_date(json_track['release_date']) | ||||
|         datas['bpm'] = json_track['bpm'] | ||||
|         datas['duration'] = json_track['duration'] | ||||
|         datas['isrc'] = json_track['isrc'] | ||||
|         datas['gain'] = json_track['gain'] | ||||
|         return datas | ||||
										
											Binary file not shown.
										
									
								
							| @@ -0,0 +1,519 @@ | ||||
| #!/usr/bin/python3 | ||||
|  | ||||
| from copy import deepcopy | ||||
| from os.path import isfile | ||||
|  | ||||
| from tqdm import tqdm | ||||
|  | ||||
| from .api import API | ||||
| from .gateway import Gateway | ||||
| from .settings import qualities | ||||
| from .download_utils import decryptfile, gen_song_hash | ||||
| from .taggers import write_tags, check_track | ||||
| from .utils import ( | ||||
| 	set_path, trasform_sync_lyric, | ||||
| 	create_zip, check_track_ids, | ||||
| 	check_track_md5, check_track_token | ||||
| ) | ||||
| from .exceptions import ( | ||||
| 	TrackNotFound, NoRightOnMedia, QualityNotFound | ||||
| ) | ||||
| from .models import ( | ||||
| 	Track, Album, Playlist, | ||||
| 	Preferences, | ||||
| ) | ||||
|  | ||||
|  | ||||
| class DownloaderJob: | ||||
|     def __init__( | ||||
|             self, | ||||
|             api: API, | ||||
|             gw_api: Gateway | ||||
|     ) -> None: | ||||
|  | ||||
|         self.api = api | ||||
|         self.gw_api = gw_api | ||||
|  | ||||
|     def __get_url( | ||||
|             self, | ||||
|             c_track: Track, | ||||
|             quality_download: str | ||||
|     ) -> dict: | ||||
|  | ||||
|         c_md5, c_media_version = check_track_md5(c_track) | ||||
|         c_ids = check_track_ids(c_track) | ||||
|         n_quality = qualities[quality_download]['n_quality'] | ||||
|  | ||||
|         c_song_hash = gen_song_hash( | ||||
|             c_md5, n_quality, | ||||
|             c_ids, c_media_version | ||||
|         ) | ||||
|  | ||||
|         c_media_url = self.gw_api.get_song_url(c_md5[0], c_song_hash) | ||||
|  | ||||
|         c_media_json = { | ||||
|             "media": [ | ||||
|                 { | ||||
|                     "sources": [ | ||||
|                         { | ||||
|                             "url": c_media_url | ||||
|                         } | ||||
|                     ] | ||||
|                 } | ||||
|             ] | ||||
|         } | ||||
|  | ||||
|         return c_media_json | ||||
|  | ||||
|     def check_sources( | ||||
|             self, | ||||
|             infos_dw: list, | ||||
|             quality_download: str | ||||
|     ) -> list: | ||||
|  | ||||
|         tracks_token = [ | ||||
|             check_track_token(c_track) | ||||
|             for c_track in infos_dw | ||||
|         ] | ||||
|  | ||||
|         try: | ||||
|             medias = self.gw_api.get_medias_url(tracks_token, quality_download) | ||||
|  | ||||
|             for a in range( | ||||
|                     len(medias) | ||||
|             ): | ||||
|                 if "errors" in medias[a]: | ||||
|                     c_media_json = self.__get_url(infos_dw[a], | ||||
|                                                   quality_download) | ||||
|                     medias[a] = c_media_json | ||||
|                 else: | ||||
|                     if not medias[a]['media']: | ||||
|                         c_media_json = self.__get_url(infos_dw[a], | ||||
|                                                       quality_download) | ||||
|                         medias[a] = c_media_json | ||||
|         except NoRightOnMedia: | ||||
|             medias = [] | ||||
|  | ||||
|             for c_track in infos_dw: | ||||
|                 c_media_json = self.__get_url(c_track, quality_download) | ||||
|                 medias.append(c_media_json) | ||||
|  | ||||
|         return medias | ||||
|  | ||||
|  | ||||
| class Downloader: | ||||
|     def __init__( | ||||
|             self, | ||||
|             infos_dw: dict, | ||||
|             preferences: Preferences, | ||||
|             download_job: DownloaderJob, | ||||
|     ) -> None: | ||||
|  | ||||
|         self.__download_job = download_job | ||||
|         self.__api = download_job.api | ||||
|         self.__gw_api = download_job.gw_api | ||||
|  | ||||
|         self.__infos_dw = infos_dw | ||||
|  | ||||
|         self.__ids = preferences.ids | ||||
|         self.__link = preferences.link | ||||
|         self.__output_dir = preferences.output_dir | ||||
|         self.__method_save = preferences.method_save | ||||
|         self.__song_metadata = preferences.song_metadata | ||||
|         self.__not_interface = preferences.not_interface | ||||
|         self.__quality_download = preferences.quality_download | ||||
|         self.__recursive_quality = preferences.recursive_quality | ||||
|         self.__recursive_download = preferences.recursive_download | ||||
|  | ||||
|         self.__c_quality = qualities[self.__quality_download] | ||||
|         self.__set_quality() | ||||
|         self.__set_song_path() | ||||
|  | ||||
|     def __set_quality(self) -> None: | ||||
|         self.__file_format = self.__c_quality['f_format'] | ||||
|         self.__song_quality = self.__c_quality['s_quality'] | ||||
|  | ||||
|     def __set_song_path(self) -> None: | ||||
|         self.__song_path = set_path( | ||||
|             self.__song_metadata, | ||||
|             self.__output_dir, | ||||
|             self.__song_quality, | ||||
|             self.__file_format, | ||||
|             self.__method_save | ||||
|         ) | ||||
|  | ||||
|     def __write_track(self) -> None: | ||||
|         self.__set_song_path() | ||||
|  | ||||
|         self.__c_track = Track( | ||||
|             self.__song_metadata, self.__song_path, | ||||
|             self.__file_format, self.__song_quality, | ||||
|             self.__link, self.__ids | ||||
|         ) | ||||
|  | ||||
|     def easy_dw(self) -> Track: | ||||
|         pic = self.__infos_dw['ALB_PICTURE'] | ||||
|         image = self.__api.choose_img(pic) | ||||
|         self.__song_metadata['image'] = image | ||||
|         song = f"{self.__song_metadata['music']} - {self.__song_metadata['artist']}" | ||||
|  | ||||
|         if not self.__not_interface: | ||||
|             print(f"Downloading: {song}") | ||||
|  | ||||
|         try: | ||||
|             self.download_try() | ||||
|         except TrackNotFound: | ||||
|             try: | ||||
|                 ids = self.__api.not_found(song, self.__song_metadata['music']) | ||||
|                 self.__infos_dw = self.__gw_api.get_song_data(ids) | ||||
|  | ||||
|                 media = self.__download_job.check_sources( | ||||
|                     [self.__infos_dw], self.__quality_download | ||||
|                 ) | ||||
|  | ||||
|                 self.__infos_dw['media_url'] = media[0] | ||||
|                 self.download_try() | ||||
|             except TrackNotFound: | ||||
|                 self.__c_track = Track( | ||||
|                     self.__song_metadata, | ||||
|                     None, None, | ||||
|                     None, None, None, | ||||
|                 ) | ||||
|  | ||||
|                 self.__c_track.success = False | ||||
|  | ||||
|         self.__c_track.md5_image = pic | ||||
|  | ||||
|         return self.__c_track | ||||
|  | ||||
|     def download_try(self) -> Track: | ||||
|         self.__c_track = Track( | ||||
|             self.__song_metadata, self.__song_path, | ||||
|             self.__file_format, self.__song_quality, | ||||
|             self.__link, self.__ids | ||||
|         ) | ||||
|  | ||||
|         if isfile(self.__song_path): | ||||
|             if check_track(self.__c_track): | ||||
|                 return self.__c_track | ||||
|  | ||||
|         media_list = self.__infos_dw['media_url']['media'] | ||||
|         song_link = media_list[0]['sources'][0]['url'] | ||||
|  | ||||
|         try: | ||||
|             crypted_audio = self.__gw_api.song_exist(song_link) | ||||
|         except TrackNotFound: | ||||
|             song = self.__song_metadata['music'] | ||||
|             artist = self.__song_metadata['artist'] | ||||
|             msg = f"\n⚠ The {song} - {artist} can't be downloaded in {self.__quality_download} quality :( ⚠\n" | ||||
|  | ||||
|             if not self.__recursive_quality: | ||||
|                 raise QualityNotFound(msg=msg) | ||||
|  | ||||
|             print(msg) | ||||
|  | ||||
|             for c_quality in qualities: | ||||
|                 if self.__quality_download == c_quality: | ||||
|                     continue | ||||
|  | ||||
|                 print( | ||||
|                     f"🛈 Trying to download {song} - {artist} in {c_quality}") | ||||
|  | ||||
|                 media = self.__download_job.check_sources( | ||||
|                     [self.__infos_dw], c_quality | ||||
|                 ) | ||||
|  | ||||
|                 self.__infos_dw['media_url'] = media[0] | ||||
|                 c_media = self.__infos_dw['media_url'] | ||||
|                 media_list = c_media['media'] | ||||
|                 song_link = media_list[0]['sources'][0]['url'] | ||||
|  | ||||
|                 try: | ||||
|                     crypted_audio = self.__gw_api.song_exist(song_link) | ||||
|                     self.__c_quality = qualities[c_quality] | ||||
|                     self.__set_quality() | ||||
|                     break | ||||
|                 except TrackNotFound: | ||||
|                     if c_quality == "MP3_128": | ||||
|                         raise TrackNotFound("Error with this song", | ||||
|                                             self.__link) | ||||
|  | ||||
|         self.__write_track() | ||||
|         c_crypted_audio = crypted_audio.iter_content(2048) | ||||
|         c_ids = check_track_ids(self.__infos_dw) | ||||
|         self.__c_track.set_fallback_ids(c_ids) | ||||
|  | ||||
|         decryptfile( | ||||
|             c_crypted_audio, c_ids, self.__song_path | ||||
|         ) | ||||
|  | ||||
|         self.__add_more_tags() | ||||
|         write_tags(self.__c_track) | ||||
|  | ||||
|         return self.__c_track | ||||
|  | ||||
|     def __add_more_tags(self) -> None: | ||||
|         contributors = self.__infos_dw['SNG_CONTRIBUTORS'] | ||||
|  | ||||
|         if "author" in contributors: | ||||
|             self.__song_metadata['author'] = " & ".join( | ||||
|                 contributors['author'] | ||||
|             ) | ||||
|         else: | ||||
|             self.__song_metadata['author'] = "" | ||||
|  | ||||
|         if "composer" in contributors: | ||||
|             self.__song_metadata['composer'] = " & ".join( | ||||
|                 contributors['composer'] | ||||
|             ) | ||||
|         else: | ||||
|             self.__song_metadata['composer'] = "" | ||||
|  | ||||
|         if "lyricist" in contributors: | ||||
|             self.__song_metadata['lyricist'] = " & ".join( | ||||
|                 contributors['lyricist'] | ||||
|             ) | ||||
|         else: | ||||
|             self.__song_metadata['lyricist'] = "" | ||||
|  | ||||
|         if "composerlyricist" in contributors: | ||||
|             self.__song_metadata['composer'] = " & ".join( | ||||
|                 contributors['composerlyricist'] | ||||
|             ) | ||||
|         else: | ||||
|             self.__song_metadata['composerlyricist'] = "" | ||||
|  | ||||
|         if "version" in self.__infos_dw: | ||||
|             self.__song_metadata['version'] = self.__infos_dw['VERSION'] | ||||
|         else: | ||||
|             self.__song_metadata['version'] = "" | ||||
|  | ||||
|         self.__song_metadata['lyric'] = "" | ||||
|         self.__song_metadata['copyright'] = "" | ||||
|         self.__song_metadata['lyricist'] = "" | ||||
|         self.__song_metadata['lyric_sync'] = [] | ||||
|  | ||||
|         if self.__infos_dw['LYRICS_ID'] != 0: | ||||
|             need = self.__gw_api.get_lyric(self.__ids) | ||||
|  | ||||
|             if "LYRICS_SYNC_JSON" in need: | ||||
|                 self.__song_metadata['lyric_sync'] = trasform_sync_lyric( | ||||
|                     need['LYRICS_SYNC_JSON'] | ||||
|                 ) | ||||
|  | ||||
|             self.__song_metadata['lyric'] = need['LYRICS_TEXT'] | ||||
|             self.__song_metadata['copyright'] = need['LYRICS_COPYRIGHTS'] | ||||
|             self.__song_metadata['lyricist'] = need['LYRICS_WRITERS'] | ||||
|  | ||||
|  | ||||
| class TrackDownloader: | ||||
|     def __init__( | ||||
|             self, | ||||
|             preferences: Preferences, | ||||
|             download_job: DownloaderJob | ||||
|     ) -> None: | ||||
|         self.__download_job = download_job | ||||
|         self.__gw_api = download_job.gw_api | ||||
|  | ||||
|         self.__preferences = preferences | ||||
|         self.__ids = self.__preferences.ids | ||||
|         self.__song_metadata = self.__preferences.song_metadata | ||||
|         self.__quality_download = self.__preferences.quality_download | ||||
|  | ||||
|     def dw(self) -> Track: | ||||
|         infos_dw = self.__gw_api.get_song_data(self.__ids) | ||||
|  | ||||
|         media = self.__download_job.check_sources( | ||||
|             [infos_dw], self.__quality_download | ||||
|         ) | ||||
|  | ||||
|         infos_dw['media_url'] = media[0] | ||||
|  | ||||
|         track = Downloader( | ||||
|             infos_dw, self.__preferences, self.__download_job, | ||||
|         ).easy_dw() | ||||
|  | ||||
|         if not track.success: | ||||
|             song = f"{self.__song_metadata['music']} - {self.__song_metadata['artist']}" | ||||
|             error_msg = f"Cannot download {song}" | ||||
|  | ||||
|             raise TrackNotFound(message=error_msg) | ||||
|  | ||||
|         return track | ||||
|  | ||||
|  | ||||
| class AlbumDownloader: | ||||
|     def __init__( | ||||
|             self, | ||||
|             preferences: Preferences, | ||||
|             download_job: DownloaderJob | ||||
|     ) -> None: | ||||
|  | ||||
|         self.__api = download_job.api | ||||
|         self.__download_job = download_job | ||||
|         self.__gw_api = download_job.gw_api | ||||
|  | ||||
|         self.__preferences = preferences | ||||
|         self.__ids = self.__preferences.ids | ||||
|         self.__make_zip = self.__preferences.make_zip | ||||
|         self.__output_dir = self.__preferences.output_dir | ||||
|         self.__method_save = self.__preferences.method_save | ||||
|         self.__song_metadata = self.__preferences.song_metadata | ||||
|         self.__not_interface = self.__preferences.not_interface | ||||
|         self.__quality_download = self.__preferences.quality_download | ||||
|  | ||||
|         self.__song_metadata_items = self.__song_metadata.items() | ||||
|  | ||||
|     def dw(self) -> Album: | ||||
|         infos_dw = self.__gw_api.get_album_data(self.__ids)['data'] | ||||
|         md5_image = infos_dw[0]['ALB_PICTURE'] | ||||
|         image = self.__api.choose_img(md5_image) | ||||
|         self.__song_metadata['image'] = image | ||||
|  | ||||
|         album = Album(self.__ids) | ||||
|         album.image = image | ||||
|         album.md5_image = md5_image | ||||
|         album.nb_tracks = self.__song_metadata['nb_tracks'] | ||||
|         album.album_name = self.__song_metadata['album'] | ||||
|         album.upc = self.__song_metadata['upc'] | ||||
|         tracks = album.tracks | ||||
|  | ||||
|         medias = self.__download_job.check_sources( | ||||
|             infos_dw, self.__quality_download | ||||
|         ) | ||||
|  | ||||
|         c_song_metadata = {} | ||||
|  | ||||
|         for key, item in self.__song_metadata_items: | ||||
|             if type(item) is not list: | ||||
|                 c_song_metadata[key] = self.__song_metadata[key] | ||||
|  | ||||
|         t = tqdm( | ||||
|             range( | ||||
|                 len(infos_dw) | ||||
|             ), | ||||
|             desc=c_song_metadata['album'], | ||||
|             disable=self.__not_interface | ||||
|         ) | ||||
|  | ||||
|         for a in t: | ||||
|             for key, item in self.__song_metadata_items: | ||||
|                 if type(item) is list: | ||||
|                     c_song_metadata[key] = self.__song_metadata[key][a] | ||||
|  | ||||
|             c_infos_dw = infos_dw[a] | ||||
|             c_infos_dw['media_url'] = medias[a] | ||||
|             song = f"{c_song_metadata['music']} - {c_song_metadata['artist']}" | ||||
|             t.set_description_str(song) | ||||
|             c_preferences = deepcopy(self.__preferences) | ||||
|             c_preferences.song_metadata = c_song_metadata | ||||
|             c_preferences.ids = c_infos_dw['SNG_ID'] | ||||
|  | ||||
|             try: | ||||
|                 track = Downloader( | ||||
|                     c_infos_dw, c_preferences, self.__download_job | ||||
|                 ).download_try() | ||||
|  | ||||
|                 tracks.append(track) | ||||
|             except TrackNotFound: | ||||
|                 try: | ||||
|                     ids = self.__api.not_found(song, c_song_metadata['music']) | ||||
|                     c_song_data = self.__gw_api.get_song_data(ids) | ||||
|  | ||||
|                     c_media = self.__download_job.check_sources( | ||||
|                         [c_song_data], self.__quality_download | ||||
|                     ) | ||||
|  | ||||
|                     c_infos_dw['media_url'] = c_media[0] | ||||
|  | ||||
|                     track = Downloader( | ||||
|                         c_infos_dw, c_preferences, self.__download_job | ||||
|                     ).download_try() | ||||
|  | ||||
|                     tracks.append(track) | ||||
|                 except TrackNotFound: | ||||
|                     track = Track( | ||||
|                         c_song_metadata, | ||||
|                         None, None, | ||||
|                         None, None, None, | ||||
|                     ) | ||||
|  | ||||
|                     track.success = False | ||||
|                     tracks.append(track) | ||||
|                     print(f"Track not found: {song} :(") | ||||
|                     continue | ||||
|  | ||||
|         if self.__make_zip: | ||||
|             song_quality = tracks[0].quality | ||||
|  | ||||
|             zip_name = create_zip( | ||||
|                 tracks, | ||||
|                 output_dir=self.__output_dir, | ||||
|                 song_metadata=self.__song_metadata, | ||||
|                 song_quality=song_quality, | ||||
|                 method_save=self.__method_save | ||||
|             ) | ||||
|  | ||||
|             album.zip_path = zip_name | ||||
|  | ||||
|         return album | ||||
|  | ||||
|  | ||||
| class PlaylistDownloader: | ||||
|     def __init__( | ||||
|             self, | ||||
|             preferences: Preferences, | ||||
|             download_job: DownloaderJob | ||||
|     ) -> None: | ||||
|  | ||||
|         self.__download_job = download_job | ||||
|         self.__gw_api = download_job.gw_api | ||||
|  | ||||
|         self.__preferences = preferences | ||||
|         self.__ids = self.__preferences.ids | ||||
|         self.__json_data = preferences.json_data | ||||
|         self.__make_zip = self.__preferences.make_zip | ||||
|         self.__output_dir = self.__preferences.output_dir | ||||
|         self.__song_metadata = self.__preferences.song_metadata | ||||
|         self.__quality_download = self.__preferences.quality_download | ||||
|  | ||||
|     def dw(self) -> Playlist: | ||||
|         infos_dw = self.__gw_api.get_playlist_data(self.__ids)['data'] | ||||
|  | ||||
|         playlist = Playlist() | ||||
|         tracks = playlist.tracks | ||||
|  | ||||
|         medias = self.__download_job.check_sources( | ||||
|             infos_dw, self.__quality_download | ||||
|         ) | ||||
|  | ||||
|         for c_infos_dw, c_media, c_song_metadata in zip( | ||||
|                 infos_dw, medias, self.__song_metadata | ||||
|         ): | ||||
|             c_infos_dw['media_url'] = c_media | ||||
|             c_preferences = deepcopy(self.__preferences) | ||||
|             c_preferences.ids = c_infos_dw['SNG_ID'] | ||||
|             c_preferences.song_metadata = c_song_metadata | ||||
|             c_song_metadata = c_preferences.song_metadata | ||||
|  | ||||
|             if type(c_song_metadata) is str: | ||||
|                 print(f"Track not found {c_song_metadata} :(") | ||||
|                 continue | ||||
|  | ||||
|             track = Downloader( | ||||
|                 c_infos_dw, c_preferences, self.__download_job | ||||
|             ).easy_dw() | ||||
|  | ||||
|             if not track.success: | ||||
|                 song = f"{c_song_metadata['music']} - {c_song_metadata['artist']}" | ||||
|                 print(f"Cannot download {song}") | ||||
|  | ||||
|             tracks.append(track) | ||||
|  | ||||
|         if self.__make_zip: | ||||
|             playlist_title = self.__json_data['title'] | ||||
|             zip_name = f"{self.__output_dir}/{playlist_title} [playlist {self.__ids}]" | ||||
|             create_zip(tracks, zip_name=zip_name) | ||||
|             playlist.zip_path = zip_name | ||||
|  | ||||
|         return playlist | ||||
										
											Binary file not shown.
										
									
								
							| @@ -0,0 +1,99 @@ | ||||
| #!/usr/bin/python3 | ||||
|  | ||||
| from binascii import ( | ||||
| 	a2b_hex as __a2b_hex, | ||||
| 	b2a_hex as __b2a_hex | ||||
| ) | ||||
| from hashlib import md5 as __md5 | ||||
|  | ||||
| from Cryptodome.Cipher.AES import ( | ||||
| 	new as __newAES, | ||||
| 	MODE_ECB as __MODE_ECB | ||||
| ) | ||||
| from Cryptodome.Cipher.Blowfish import ( | ||||
| 	new as __newBlowfish, | ||||
| 	MODE_CBC as __MODE_CBC | ||||
| ) | ||||
|  | ||||
| __secret_key = "g4el58wc0zvf9na1" | ||||
| __secret_key2 = b"jo6aey6haid2Teih" | ||||
| __idk = __a2b_hex("0001020304050607") | ||||
|  | ||||
|  | ||||
| def md5hex(data: str): | ||||
|     hashed = __md5( | ||||
|         data.encode() | ||||
|     ).hexdigest() | ||||
|  | ||||
|     return hashed | ||||
|  | ||||
|  | ||||
| def gen_song_hash(md5, quality, ids, media): | ||||
|     data = b"\xa4".join( | ||||
|         a.encode() | ||||
|         for a in [ | ||||
|             md5, quality, ids, media | ||||
|         ] | ||||
|     ) | ||||
|  | ||||
|     hashed = ( | ||||
|         __md5(data) | ||||
|             .hexdigest() | ||||
|             .encode() | ||||
|     ) | ||||
|  | ||||
|     data = b"\xa4".join( | ||||
|         [hashed, data] | ||||
|     ) + b"\xa4" | ||||
|  | ||||
|     if len(data) % 16: | ||||
|         data += b"\x00" * (16 - len(data) % 16) | ||||
|  | ||||
|     c = __newAES(__secret_key2, __MODE_ECB) | ||||
|  | ||||
|     media_url = __b2a_hex( | ||||
|         c.encrypt(data) | ||||
|     ).decode() | ||||
|  | ||||
|     return media_url | ||||
|  | ||||
|  | ||||
| def __calcbfkey(songid): | ||||
|     h = md5hex(songid) | ||||
|  | ||||
|     bfkey = "".join( | ||||
|         chr( | ||||
|             ord(h[i]) ^ ord(h[i + 16]) ^ ord(__secret_key[i]) | ||||
|         ) | ||||
|  | ||||
|         for i in range(16) | ||||
|     ) | ||||
|  | ||||
|     return bfkey | ||||
|  | ||||
|  | ||||
| def __blowfishDecrypt(data, key): | ||||
|     c = __newBlowfish( | ||||
|         key.encode(), __MODE_CBC, __idk | ||||
|     ) | ||||
|  | ||||
|     return c.decrypt(data) | ||||
|  | ||||
|  | ||||
| def decryptfile(content, key, name): | ||||
|     key = __calcbfkey(key) | ||||
|     decrypted_audio = open(name, "wb") | ||||
|     seg = 0 | ||||
|  | ||||
|     for data in content: | ||||
|         if ( | ||||
|                 (seg % 3) == 0 | ||||
|         ) and ( | ||||
|                 len(data) == 2048 | ||||
|         ): | ||||
|             data = __blowfishDecrypt(data, key) | ||||
|  | ||||
|         decrypted_audio.write(data) | ||||
|         seg += 1 | ||||
|  | ||||
|     decrypted_audio.close() | ||||
										
											Binary file not shown.
										
									
								
							| @@ -0,0 +1,87 @@ | ||||
| #!/usr/bin/python3 | ||||
|  | ||||
| class TrackNotFound(Exception): | ||||
|     def __init__(self, url=None, message=None): | ||||
|         self.url = url | ||||
|  | ||||
|         if not message: | ||||
|             self.message = f"Track {self.url} not found :(" | ||||
|         else: | ||||
|             self.message = message | ||||
|  | ||||
|         super().__init__(self.message) | ||||
|  | ||||
|  | ||||
| class AlbumNotFound(Exception): | ||||
|     def __init__(self, url=None): | ||||
|         self.url = url | ||||
|         self.msg = f"Album {self.url} not found :(" | ||||
|         super().__init__(self.msg) | ||||
|  | ||||
|  | ||||
| class InvalidLink(Exception): | ||||
|     def __init__(self, url): | ||||
|         self.url = url | ||||
|         self.msg = f"Invalid Link {self.url} :(" | ||||
|         super().__init__(self.msg) | ||||
|  | ||||
|  | ||||
| class QuotaExceeded(Exception): | ||||
|     def __init__(self, message=None): | ||||
|         if not message: | ||||
|             self.message = "TOO MUCH REQUESTS LIMIT YOURSELF !!! :)" | ||||
|  | ||||
|         super().__init__(self.message) | ||||
|  | ||||
|  | ||||
| class QualityNotFound(Exception): | ||||
|     def __init__(self, quality=None, msg=None): | ||||
|         self.quality = quality | ||||
|  | ||||
|         if not msg: | ||||
|             self.msg = ( | ||||
|                 f"The {quality} quality doesn't exist :)\ | ||||
| 				\nThe qualities have to be FLAC or MP3_320 or MP3_256 or MP3_128" | ||||
|             ) | ||||
|         else: | ||||
|             self.msg = msg | ||||
|  | ||||
|         super().__init__(self.msg) | ||||
|  | ||||
|  | ||||
| class NoRightOnMedia(Exception): | ||||
|     def __init__(self, msg): | ||||
|         self.msg = msg | ||||
|         super().__init__(msg) | ||||
|  | ||||
|  | ||||
| class NoDataApi(Exception): | ||||
|     def __init__(self, message): | ||||
|         super().__init__(message) | ||||
|  | ||||
|  | ||||
| class BadCredentials(Exception): | ||||
|     def __init__( | ||||
|             self, | ||||
|             arl=None, | ||||
|             email=None, | ||||
|             password=None, | ||||
|             msg=None | ||||
|     ): | ||||
|         if msg: | ||||
|             self.msg = msg | ||||
|         else: | ||||
|             self.arl = arl | ||||
|             self.email = email | ||||
|             self.password = password | ||||
|  | ||||
|             if arl: | ||||
|                 self.msg = f"Wrong token: {arl} :(" | ||||
|             else: | ||||
|                 self.msg = f"Wrong credentials email: {self.email}, password: {self.password}" | ||||
|  | ||||
|         super().__init__(self.msg) | ||||
|  | ||||
|  | ||||
| class CredentialsMissing(Exception): | ||||
|     """ Deezer credentials not set! """ | ||||
										
											Binary file not shown.
										
									
								
							| @@ -0,0 +1,279 @@ | ||||
| #!/usr/bin/python3 | ||||
|  | ||||
| from requests import Session | ||||
| from requests import ( | ||||
| 	get as req_get, | ||||
| 	post as req_post | ||||
| ) | ||||
|  | ||||
| from .settings import qualities | ||||
| from .download_utils import md5hex | ||||
| from .exceptions import ( | ||||
| 	BadCredentials, TrackNotFound, NoRightOnMedia | ||||
| ) | ||||
|  | ||||
| client_id = "172365" | ||||
| client_secret = "fb0bec7ccc063dab0417eb7b0d847f34" | ||||
| try_link = "https://api.deezer.com/platform/generic/track/3135556" | ||||
|  | ||||
|  | ||||
| class Gateway: | ||||
|     def __init__( | ||||
|             self, | ||||
|             arl=None, | ||||
|             email=None, | ||||
|             password=None | ||||
|     ): | ||||
|         self.__req = Session() | ||||
|         self.__arl = arl | ||||
|         self.__email = email | ||||
|         self.__password = password | ||||
|         self.__token = "null" | ||||
|         self.__get_lyric = "song.getLyrics" | ||||
|         self.__get_song_data = "song.getData" | ||||
|         self.__get_user_getArl = "user.getArl" | ||||
|         self.__get_page_track = "deezer.pageTrack" | ||||
|         self.__get_user_data = "deezer.getUserData" | ||||
|         self.__get_album_data = "song.getListByAlbum" | ||||
|         self.__get_playlist_data = "playlist.getSongs" | ||||
|         self.__get_media_url = "https://media.deezer.com/v1/get_url" | ||||
|         self.__get_auth_token_url = "https://api.deezer.com/auth/token" | ||||
|         self.__private_api_link = "https://www.deezer.com/ajax/gw-light.php" | ||||
|         self.__song_server = "https://e-cdns-proxy-{}.dzcdn.net/mobile/1/{}" | ||||
|         self.__refresh_token() | ||||
|  | ||||
|     def __login(self): | ||||
|         if ( | ||||
|                 (not self.__arl) and | ||||
|                 (not self.__email) and | ||||
|                 (not self.__password) | ||||
|         ): | ||||
|             msg = f"NO LOGIN STUFF INSERTED :)))" | ||||
|  | ||||
|             raise BadCredentials(msg=msg) | ||||
|  | ||||
|         if self.__arl: | ||||
|             self.__req.cookies['arl'] = self.__arl | ||||
|         else: | ||||
|             self.__get_arl() | ||||
|  | ||||
|     def __get_arl(self): | ||||
|         access_token = self.__get_access_token() | ||||
|  | ||||
|         c_headers = { | ||||
|             "Authorization": f"Bearer {access_token}" | ||||
|         } | ||||
|  | ||||
|         self.__req.get(try_link, headers=c_headers).json() | ||||
|         arl = self.__get_api(self.__get_user_getArl) | ||||
|         self.__req.cookies.get("sid") | ||||
|         self.__arl = arl | ||||
|  | ||||
|     def __get_access_token(self): | ||||
|         password = md5hex(self.__password) | ||||
|  | ||||
|         request_hash = md5hex( | ||||
|             "".join( | ||||
|                 [ | ||||
|                     client_id, self.__email, password, client_secret | ||||
|                 ] | ||||
|             ) | ||||
|         ) | ||||
|  | ||||
|         params = { | ||||
|             "app_id": client_id, | ||||
|             "login": self.__email, | ||||
|             "password": password, | ||||
|             "hash": request_hash | ||||
|         } | ||||
|  | ||||
|         results = req_get(self.__get_auth_token_url, params=params).json() | ||||
|  | ||||
|         if "error" in results: | ||||
|             raise BadCredentials( | ||||
|                 email=self.__email, | ||||
|                 password=self.__password | ||||
|             ) | ||||
|  | ||||
|         access_token = results['access_token'] | ||||
|  | ||||
|         return access_token | ||||
|  | ||||
|     def __cool_api(self): | ||||
|         guest_sid = self.__req.cookies.get("sid") | ||||
|         url = "https://api.deezer.com/1.0/gateway.php" | ||||
|  | ||||
|         params = { | ||||
|             'api_key': "4VCYIJUCDLOUELGD1V8WBVYBNVDYOXEWSLLZDONGBBDFVXTZJRXPR29JRLQFO6ZE", | ||||
|             'sid': guest_sid, | ||||
|             'input': '3', | ||||
|             'output': '3', | ||||
|             'method': 'song_getData' | ||||
|         } | ||||
|  | ||||
|         json = {'sng_id': 302127} | ||||
|  | ||||
|         json = req_post(url, params=params, json=json).json() | ||||
|         print(json) | ||||
|  | ||||
|     def __get_api( | ||||
|             self, method, | ||||
|             json_data=None | ||||
|     ): | ||||
|         params = { | ||||
|             "api_version": "1.0", | ||||
|             "api_token": self.__token, | ||||
|             "input": "3", | ||||
|             "method": method | ||||
|         } | ||||
|  | ||||
|         results = self.__req.post( | ||||
|             self.__private_api_link, | ||||
|             params=params, | ||||
|             json=json_data | ||||
|         ).json()['results'] | ||||
|  | ||||
|         if not results: | ||||
|             self.__refresh_token() | ||||
|             self.__get_api(method, json_data) | ||||
|  | ||||
|         return results | ||||
|  | ||||
|     def get_user(self): | ||||
|         data = self.__get_api(self.__get_user_data) | ||||
|         return data | ||||
|  | ||||
|     def __refresh_token(self): | ||||
|         self.__req.cookies.clear_session_cookies() | ||||
|  | ||||
|         if not self.amIlog(): | ||||
|             self.__login() | ||||
|             self.am_I_log() | ||||
|  | ||||
|         data = self.get_user() | ||||
|         self.__token = data['checkForm'] | ||||
|         self.__license_token = self.__get_license_token() | ||||
|  | ||||
|     def __get_license_token(self): | ||||
|         data = self.get_user() | ||||
|         license_token = data['USER']['OPTIONS']['license_token'] | ||||
|  | ||||
|         return license_token | ||||
|  | ||||
|     def amIlog(self): | ||||
|         data = self.get_user() | ||||
|         user_id = data['USER']['USER_ID'] | ||||
|         is_logged = False | ||||
|  | ||||
|         if user_id != 0: | ||||
|             is_logged = True | ||||
|  | ||||
|         return is_logged | ||||
|  | ||||
|     def am_I_log(self): | ||||
|         if not self.amIlog(): | ||||
|             raise BadCredentials(arl=self.__arl) | ||||
|  | ||||
|     def get_song_data(self, ids): | ||||
|         json_data = { | ||||
|             "sng_id": ids | ||||
|         } | ||||
|  | ||||
|         infos = self.__get_api(self.__get_song_data, json_data) | ||||
|  | ||||
|         return infos | ||||
|  | ||||
|     def get_album_data(self, ids): | ||||
|         json_data = { | ||||
|             "alb_id": ids, | ||||
|             "nb": -1 | ||||
|         } | ||||
|  | ||||
|         infos = self.__get_api(self.__get_album_data, json_data) | ||||
|  | ||||
|         return infos | ||||
|  | ||||
|     def get_lyric(self, ids): | ||||
|         json_data = { | ||||
|             "sng_id": ids | ||||
|         } | ||||
|  | ||||
|         infos = self.__get_api(self.__get_lyric, json_data) | ||||
|  | ||||
|         return infos | ||||
|  | ||||
|     def get_playlist_data(self, ids): | ||||
|         json_data = { | ||||
|             "playlist_id": ids, | ||||
|             "nb": -1 | ||||
|         } | ||||
|  | ||||
|         infos = self.__get_api(self.__get_playlist_data, json_data) | ||||
|  | ||||
|         return infos | ||||
|  | ||||
|     def get_page_track(self, ids): | ||||
|         json_data = { | ||||
|             "sng_id": ids | ||||
|         } | ||||
|  | ||||
|         infos = self.__get_api(self.__get_page_track, json_data) | ||||
|  | ||||
|         return infos | ||||
|  | ||||
|     def get_song_url(self, n, song_hash): | ||||
|         song_url = self.__song_server.format(n, song_hash) | ||||
|  | ||||
|         return song_url | ||||
|  | ||||
|     def song_exist(self, song_url): | ||||
|         crypted_audio = req_get(song_url) | ||||
|  | ||||
|         if len(crypted_audio.content) == 0: | ||||
|             raise TrackNotFound | ||||
|  | ||||
|         return crypted_audio | ||||
|  | ||||
|     def get_medias_url(self, tracks_token, quality): | ||||
|         others_qualities = [] | ||||
|  | ||||
|         for c_quality in qualities: | ||||
|             if c_quality == quality: | ||||
|                 continue | ||||
|  | ||||
|             c_quality_set = { | ||||
|                 "cipher": "BF_CBC_STRIPE", | ||||
|                 "format": c_quality | ||||
|             } | ||||
|  | ||||
|             others_qualities.append(c_quality_set) | ||||
|  | ||||
|         json_data = { | ||||
|             "license_token": self.__license_token, | ||||
|             "media": [ | ||||
|                 { | ||||
|                     "type": "FULL", | ||||
|                     "formats": [ | ||||
|                         { | ||||
|                             "cipher": "BF_CBC_STRIPE", | ||||
|                             "format": quality | ||||
|                         } | ||||
|                     ]  # + others_qualities | ||||
|                 } | ||||
|             ], | ||||
|             "track_tokens": tracks_token | ||||
|         } | ||||
|  | ||||
|         infos = req_post( | ||||
|             self.__get_media_url, | ||||
|             json=json_data | ||||
|         ).json() | ||||
|  | ||||
|         if "errors" in infos: | ||||
|             msg = infos['errors'][0]['message'] | ||||
|  | ||||
|             raise NoRightOnMedia(msg) | ||||
|  | ||||
|         medias = infos['data'] | ||||
|  | ||||
|         return medias | ||||
										
											Binary file not shown.
										
									
								
							| @@ -0,0 +1,6 @@ | ||||
| #!/usr/bin/python3 | ||||
|  | ||||
| from .album import Album | ||||
| from .playlist import Playlist | ||||
| from .preferences import Preferences | ||||
| from .track import Track | ||||
										
											Binary file not shown.
										
									
								
							| @@ -0,0 +1,22 @@ | ||||
| #!/usr/bin/python3 | ||||
|  | ||||
|  | ||||
| class Album: | ||||
|     def __init__(self, ids: int) -> None: | ||||
|         self.__t_list = [] | ||||
|         self.zip_path = None | ||||
|         self.image = None | ||||
|         self.album_quality = None | ||||
|         self.md5_image = None | ||||
|         self.ids = ids | ||||
|         self.nb_tracks = None | ||||
|         self.album_name = None | ||||
|         self.upc = None | ||||
|         self.__set_album_md5() | ||||
|  | ||||
|     @property | ||||
|     def tracks(self): | ||||
|         return self.__t_list | ||||
|  | ||||
|     def __set_album_md5(self): | ||||
|         self.album_md5 = f"album/{self.ids}" | ||||
										
											Binary file not shown.
										
									
								
							| @@ -0,0 +1,11 @@ | ||||
| #!/usr/bin/python3 | ||||
|  | ||||
|  | ||||
| class Playlist: | ||||
|     def __init__(self, tracklist=None) -> None: | ||||
|         self.__t_list = tracklist or [] | ||||
|         self.zip_path = None | ||||
|  | ||||
|     @property | ||||
|     def tracks(self): | ||||
|         return self.__t_list | ||||
										
											Binary file not shown.
										
									
								
							| @@ -0,0 +1,15 @@ | ||||
| #!/usr/bin/python3 | ||||
|  | ||||
| class Preferences: | ||||
|     def __init__(self) -> None: | ||||
|         self.link = None | ||||
|         self.song_metadata = None | ||||
|         self.quality_download = None | ||||
|         self.output_dir = None | ||||
|         self.ids = None | ||||
|         self.json_data = None | ||||
|         self.recursive_quality = None | ||||
|         self.recursive_download = None | ||||
|         self.not_interface = None | ||||
|         self.method_save = None | ||||
|         self.make_zip = None | ||||
										
											Binary file not shown.
										
									
								
							| @@ -0,0 +1,61 @@ | ||||
| #!/usr/bin/python3 | ||||
| from tempfile import gettempdir | ||||
| from os.path import isfile | ||||
|  | ||||
|  | ||||
| class Track: | ||||
|     def __init__( | ||||
|             self, | ||||
|             tags: dict, | ||||
|             song_path: str, | ||||
|             file_format: str, | ||||
|             quality: str, | ||||
|             link: str, | ||||
|             ids: int | ||||
|     ) -> None: | ||||
|         self.tags = tags | ||||
|         self.__set_tags() | ||||
|         self.song_name = f"{self.music} - {self.artist}" | ||||
|         self.song_path = song_path | ||||
|         self.file_format = file_format | ||||
|         self.quality = quality | ||||
|         self.link = link | ||||
|         self.ids = ids | ||||
|         self.md5_image = None | ||||
|         self.success = True | ||||
|         self.__set_track_md5() | ||||
|  | ||||
|     @property | ||||
|     def image_path(self): | ||||
|         path = self.song_path + ".jpg" | ||||
|         if not isfile(path): | ||||
|             try: | ||||
|                 with open(path, "wb") as f: | ||||
|                     f.write(self.tags["image"]) | ||||
|             except: | ||||
|                 pass | ||||
|         return path | ||||
|  | ||||
|     @property | ||||
|     def track_info(self): | ||||
|         return { | ||||
|             "title": self.tags.get("music") or self.song_name, | ||||
|             "url": self.link, | ||||
|             "album": self.tags.get("album"), | ||||
|             "genre": self.tags.get("genre"), | ||||
|             "artist": self.tags.get("artist"), | ||||
|             "duration": self.tags.get("duration", 0) | ||||
|         } | ||||
|  | ||||
|     def __set_tags(self): | ||||
|         for tag, value in self.tags.items(): | ||||
|             setattr( | ||||
|                 self, tag, value | ||||
|             ) | ||||
|  | ||||
|     def __set_track_md5(self): | ||||
|         self.track_md5 = f"track/{self.ids}" | ||||
|  | ||||
|     def set_fallback_ids(self, fallback_ids): | ||||
|         self.fallback_ids = fallback_ids | ||||
|         self.fallback_track_md5 = f"track/{self.fallback_ids}" | ||||
										
											Binary file not shown.
										
									
								
							| @@ -0,0 +1,28 @@ | ||||
| #!/usr/bin/python3 | ||||
|  | ||||
| header = { | ||||
|     "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:88.0) Gecko/20100101 Firefox/88.0", | ||||
|     "Accept-Language": "en-US;q=0.5,en;q=0.3" | ||||
| } | ||||
|  | ||||
| method_saves = ["0", "1", "2"] | ||||
|  | ||||
| qualities = { | ||||
|     "FLAC": { | ||||
|         "n_quality": "9", | ||||
|         "f_format": ".flac", | ||||
|         "s_quality": "FLAC" | ||||
|     }, | ||||
|  | ||||
|     "MP3_320": { | ||||
|         "n_quality": "3", | ||||
|         "f_format": ".mp3", | ||||
|         "s_quality": "320" | ||||
|     }, | ||||
|  | ||||
|     "MP3_128": { | ||||
|         "n_quality": "1", | ||||
|         "f_format": ".mp3", | ||||
|         "s_quality": "128" | ||||
|     } | ||||
| } | ||||
										
											Binary file not shown.
										
									
								
							| @@ -0,0 +1,228 @@ | ||||
| #!/usr/bin/python3 | ||||
|  | ||||
| from mutagen.flac import FLAC, Picture | ||||
| from mutagen.id3 import ( | ||||
| 	ID3NoHeaderError, | ||||
| 	ID3, APIC, USLT, SYLT, | ||||
| 	COMM, TSRC, TRCK, TIT2, | ||||
| 	TLEN, TEXT, TCON, TALB, TBPM, | ||||
| 	TPE1, TYER, TDAT, TPOS, TPE2, | ||||
| 	TPUB, TCOP, TXXX, TCOM, IPLS | ||||
| ) | ||||
|  | ||||
| from .models.track import Track | ||||
|  | ||||
|  | ||||
| def __write_flac(song, data): | ||||
|     tag = FLAC(song) | ||||
|     tag.delete() | ||||
|     images = Picture() | ||||
|     images.type = 3 | ||||
|     images.data = data['image'] | ||||
|     tag.clear_pictures() | ||||
|     tag.add_picture(images) | ||||
|     tag['lyrics'] = data['lyric'] | ||||
|     tag['artist'] = data['artist'] | ||||
|     tag['title'] = data['music'] | ||||
|     tag[ | ||||
|         'date'] = f"{data['year'].year}/{data['year'].month}/{data['year'].day}" | ||||
|     tag['album'] = data['album'] | ||||
|     tag['tracknumber'] = f"{data['tracknum']}" | ||||
|     tag['discnumber'] = f"{data['discnum']}" | ||||
|     tag['genre'] = data['genre'] | ||||
|     tag['albumartist'] = data['ar_album'] | ||||
|     tag['author'] = data['author'] | ||||
|     tag['composer'] = data['composer'] | ||||
|     tag['copyright'] = data['copyright'] | ||||
|     tag['bpm'] = f"{data['bpm']}" | ||||
|     tag['length'] = f"{data['duration']}" | ||||
|     tag['organization'] = data['label'] | ||||
|     tag['isrc'] = data['isrc'] | ||||
|     tag['lyricist'] = data['lyricist'] | ||||
|     tag['version'] = data['version'] | ||||
|     tag.save() | ||||
|  | ||||
|  | ||||
| def __write_mp3(song, data): | ||||
|     try: | ||||
|         audio = ID3(song) | ||||
|         audio.delete() | ||||
|     except ID3NoHeaderError: | ||||
|         audio = ID3() | ||||
|  | ||||
|     audio.add( | ||||
|         APIC( | ||||
|             mime="image/jpeg", | ||||
|             type=3, | ||||
|             desc="album front cover", | ||||
|             data=data['image'] | ||||
|         ) | ||||
|     ) | ||||
|  | ||||
|     audio.add( | ||||
|         COMM( | ||||
|             lang="eng", | ||||
|             desc="my comment", | ||||
|             text="DO NOT USE FOR YOUR OWN EARNING" | ||||
|         ) | ||||
|     ) | ||||
|  | ||||
|     audio.add( | ||||
|         USLT( | ||||
|             text=data['lyric'] | ||||
|         ) | ||||
|     ) | ||||
|  | ||||
|     audio.add( | ||||
|         SYLT( | ||||
|             type=1, | ||||
|             format=2, | ||||
|             desc="sync lyric song", | ||||
|             text=data['lyric_sync'] | ||||
|         ) | ||||
|     ) | ||||
|  | ||||
|     audio.add( | ||||
|         TSRC( | ||||
|             text=data['isrc'] | ||||
|         ) | ||||
|     ) | ||||
|  | ||||
|     audio.add( | ||||
|         TRCK( | ||||
|             text=f"{data['tracknum']}/{data['nb_tracks']}" | ||||
|         ) | ||||
|     ) | ||||
|  | ||||
|     audio.add( | ||||
|         TIT2( | ||||
|             text=data['music'] | ||||
|         ) | ||||
|     ) | ||||
|  | ||||
|     audio.add( | ||||
|         TLEN( | ||||
|             text=f"{data['duration']}" | ||||
|         ) | ||||
|     ) | ||||
|  | ||||
|     audio.add( | ||||
|         TEXT( | ||||
|             text=data['lyricist'] | ||||
|         ) | ||||
|     ) | ||||
|  | ||||
|     audio.add( | ||||
|         TCON( | ||||
|             text=data['genre'] | ||||
|         ) | ||||
|     ) | ||||
|  | ||||
|     audio.add( | ||||
|         TALB( | ||||
|             text=data['album'] | ||||
|         ) | ||||
|     ) | ||||
|  | ||||
|     audio.add( | ||||
|         TBPM( | ||||
|             text=f"{data['bpm']}" | ||||
|         ) | ||||
|     ) | ||||
|  | ||||
|     audio.add( | ||||
|         TPE1( | ||||
|             text=data['artist'] | ||||
|         ) | ||||
|     ) | ||||
|  | ||||
|     audio.add( | ||||
|         TYER( | ||||
|             text=f"{data['year'].year}" | ||||
|         ) | ||||
|     ) | ||||
|  | ||||
|     audio.add( | ||||
|         TDAT( | ||||
|             text=f"{data['year'].day}{data['year'].month}" | ||||
|         ) | ||||
|     ) | ||||
|  | ||||
|     audio.add( | ||||
|         TPOS( | ||||
|             text=f"{data['discnum']}/{data['discnum']}" | ||||
|         ) | ||||
|     ) | ||||
|  | ||||
|     audio.add( | ||||
|         TPE2( | ||||
|             text=data['ar_album'] | ||||
|         ) | ||||
|     ) | ||||
|  | ||||
|     audio.add( | ||||
|         TPUB( | ||||
|             text=data['label'] | ||||
|         ) | ||||
|     ) | ||||
|  | ||||
|     audio.add( | ||||
|         TCOP( | ||||
|             text=data['copyright'] | ||||
|         ) | ||||
|     ) | ||||
|  | ||||
|     audio.add( | ||||
|         TXXX( | ||||
|             desc="REPLAYGAIN_TRACK_GAIN", | ||||
|             text=f"{data['gain']}" | ||||
|         ) | ||||
|     ) | ||||
|  | ||||
|     audio.add( | ||||
|         TCOM( | ||||
|             text=data['composer'] | ||||
|         ) | ||||
|     ) | ||||
|  | ||||
|     audio.add( | ||||
|         IPLS( | ||||
|             people=[data['author']] | ||||
|         ) | ||||
|     ) | ||||
|  | ||||
|     audio.save(song, v2_version=3) | ||||
|  | ||||
|  | ||||
| def write_tags(track: Track): | ||||
|     song = track.song_path | ||||
|     song_metadata = track.tags | ||||
|     f_format = track.file_format | ||||
|  | ||||
|     if f_format == ".flac": | ||||
|         __write_flac(song, song_metadata) | ||||
|     else: | ||||
|         __write_mp3(song, song_metadata) | ||||
|  | ||||
|  | ||||
| def check_track(track: Track): | ||||
|     song = track.song_path | ||||
|     f_format = track.file_format | ||||
|     is_ok = False | ||||
|  | ||||
|     if f_format == ".flac": | ||||
|         tags = FLAC(song) | ||||
|     else: | ||||
|         try: | ||||
|             tags = ID3(song) | ||||
|         except ID3NoHeaderError: | ||||
|             return is_ok | ||||
|  | ||||
|     l_tags = len( | ||||
|         tags.keys() | ||||
|     ) | ||||
|  | ||||
|     if l_tags > 15: | ||||
|         is_ok = True | ||||
|  | ||||
|     return is_ok | ||||
										
											Binary file not shown.
										
									
								
							| @@ -0,0 +1,228 @@ | ||||
| #!/usr/bin/python3 | ||||
|  | ||||
| from datetime import datetime | ||||
| from os import makedirs | ||||
| from os.path import ( | ||||
| 	isdir, basename, join | ||||
| ) | ||||
| from urllib.parse import urlparse | ||||
| from zipfile import ZipFile, ZIP_DEFLATED | ||||
|  | ||||
| from requests import get as req_get | ||||
|  | ||||
| from .settings import header | ||||
| from .exceptions import InvalidLink | ||||
|  | ||||
|  | ||||
| def link_is_valid(link): | ||||
|     netloc = urlparse(link).netloc | ||||
|  | ||||
|     if not any( | ||||
|             c_link == netloc | ||||
|             for c_link in ["www.deezer.com", "deezer.com", "deezer.page.link"] | ||||
|     ): | ||||
|         raise InvalidLink(link) | ||||
|  | ||||
|  | ||||
| def get_ids(link): | ||||
|     parsed = urlparse(link) | ||||
|     path = parsed.path | ||||
|     ids = path.split("/")[-1] | ||||
|     return ids | ||||
|  | ||||
|  | ||||
| def request(url): | ||||
|     thing = req_get(url, headers=header) | ||||
|     return thing | ||||
|  | ||||
|  | ||||
| def artist_sort(array): | ||||
|     if len(array) > 1: | ||||
|         for a in array: | ||||
|             for b in array: | ||||
|                 if a in b and a != b: | ||||
|                     array.remove(b) | ||||
|  | ||||
|     array = list( | ||||
|         dict.fromkeys(array) | ||||
|     ) | ||||
|  | ||||
|     artists = " & ".join(array) | ||||
|     return artists | ||||
|  | ||||
|  | ||||
| def __check_dir(directory): | ||||
|     if not isdir(directory): | ||||
|         makedirs(directory) | ||||
|  | ||||
|  | ||||
| def check_track_md5(infos: dict): | ||||
|     if "FALLBACK" in infos: | ||||
|         song_md5 = infos['FALLBACK']['MD5_ORIGIN'] | ||||
|         version = infos['FALLBACK']['MEDIA_VERSION'] | ||||
|     else: | ||||
|         song_md5 = infos['MD5_ORIGIN'] | ||||
|         version = infos['MEDIA_VERSION'] | ||||
|  | ||||
|     return song_md5, version | ||||
|  | ||||
|  | ||||
| def check_track_token(infos: dict): | ||||
|     if "FALLBACK" in infos: | ||||
|         track_token = infos['FALLBACK']['TRACK_TOKEN'] | ||||
|     else: | ||||
|         track_token = infos['TRACK_TOKEN'] | ||||
|  | ||||
|     return track_token | ||||
|  | ||||
|  | ||||
| def check_track_ids(infos: dict): | ||||
|     if "FALLBACK" in infos: | ||||
|         ids = infos['FALLBACK']['SNG_ID'] | ||||
|     else: | ||||
|         ids = infos['SNG_ID'] | ||||
|  | ||||
|     return ids | ||||
|  | ||||
|  | ||||
| def __var_excape(string): | ||||
|     string = ( | ||||
|         string | ||||
|             .replace("\\", "") | ||||
|             .replace("/", "") | ||||
|             .replace(":", "") | ||||
|             .replace("*", "") | ||||
|             .replace("?", "") | ||||
|             .replace("\"", "") | ||||
|             .replace("<", "") | ||||
|             .replace(">", "") | ||||
|             .replace("|", "") | ||||
|             .replace("&", "") | ||||
|     ) | ||||
|  | ||||
|     return string | ||||
|  | ||||
|  | ||||
| def convert_to_date(date): | ||||
|     if date == "0000-00-00": | ||||
|         date = "0001-01-01" | ||||
|  | ||||
|     date = datetime.strptime(date, "%Y-%m-%d") | ||||
|     return date | ||||
|  | ||||
|  | ||||
| def what_kind(link): | ||||
|     url = request(link).url | ||||
|     return url | ||||
|  | ||||
|  | ||||
| def __get_dir(song_metadata, output_dir, method_save): | ||||
|     album = __var_excape(song_metadata['album']) | ||||
|     artist = __var_excape(song_metadata['ar_album']) | ||||
|     upc = song_metadata['upc'] | ||||
|  | ||||
|     if method_save == 0: | ||||
|         song_dir = f"{album} [{upc}]" | ||||
|  | ||||
|     elif method_save == 1: | ||||
|         song_dir = f"{album} - {artist}" | ||||
|  | ||||
|     elif method_save == 2: | ||||
|         song_dir = f"{album} - {artist} [{upc}]" | ||||
|  | ||||
|     song_dir = song_dir[:255] | ||||
|     final_dir = join(output_dir, song_dir) | ||||
|     final_dir += "/" | ||||
|     return final_dir | ||||
|  | ||||
|  | ||||
| def set_path( | ||||
|         song_metadata, output_dir, | ||||
|         song_quality, file_format, method_save | ||||
| ): | ||||
|     album = __var_excape(song_metadata['album']) | ||||
|     artist = __var_excape(song_metadata['artist']) | ||||
|     music = __var_excape(song_metadata['music']) | ||||
|  | ||||
|     if method_save == 0: | ||||
|         discnum = song_metadata['discnumber'] | ||||
|         tracknum = song_metadata['tracknumber'] | ||||
|         song_name = f"{album} CD {discnum} TRACK {tracknum}" | ||||
|  | ||||
|     elif method_save == 1: | ||||
|         song_name = f"{music} - {artist}" | ||||
|  | ||||
|     elif method_save == 2: | ||||
|         isrc = song_metadata['isrc'] | ||||
|         song_name = f"{music} - {artist} [{isrc}]" | ||||
|  | ||||
|     song_dir = __get_dir(song_metadata, output_dir, method_save) | ||||
|     __check_dir(song_dir) | ||||
|  | ||||
|     l_encoded = len( | ||||
|         song_name.encode() | ||||
|     ) | ||||
|  | ||||
|     if l_encoded > 242: | ||||
|         n_tronc = l_encoded - 242 | ||||
|         n_tronc = len(song_name) - n_tronc | ||||
|     else: | ||||
|         n_tronc = 242 | ||||
|  | ||||
|     song_path = f"{song_dir}{song_name[:n_tronc]}" | ||||
|     song_path += f" ({song_quality}){file_format}" | ||||
|  | ||||
|     return song_path | ||||
|  | ||||
|  | ||||
| def create_zip( | ||||
|         tracks: [], | ||||
|         output_dir=None, | ||||
|         song_metadata=None, | ||||
|         song_quality=None, | ||||
|         method_save=0, | ||||
|         zip_name=None | ||||
| ): | ||||
|     if not zip_name: | ||||
|         album = __var_excape(song_metadata['album']) | ||||
|         song_dir = __get_dir(song_metadata, output_dir, method_save) | ||||
|  | ||||
|         if method_save == 0: | ||||
|             zip_name = f"{song_dir}{album} ({song_quality})" | ||||
|  | ||||
|         elif method_save == 1: | ||||
|             artist = __var_excape(song_metadata['ar_album']) | ||||
|             zip_name = f"{song_dir}{album} - {artist} ({song_quality})" | ||||
|  | ||||
|         elif method_save == 2: | ||||
|             artist = __var_excape(song_metadata['ar_album']) | ||||
|             upc = song_metadata['upc'] | ||||
|             zip_name = f"{song_dir}{album} - {artist} {upc} ({song_quality})" | ||||
|  | ||||
|     zip_name += ".zip" | ||||
|     z = ZipFile(zip_name, "w", ZIP_DEFLATED) | ||||
|  | ||||
|     for track in tracks: | ||||
|         if not track.success: | ||||
|             continue | ||||
|  | ||||
|         c_song_path = track.song_path | ||||
|         song_path = basename(c_song_path) | ||||
|         z.write(c_song_path, song_path) | ||||
|  | ||||
|     z.close() | ||||
|     return zip_name | ||||
|  | ||||
|  | ||||
| def trasform_sync_lyric(lyric): | ||||
|     sync_array = [] | ||||
|  | ||||
|     for a in lyric: | ||||
|         if "milliseconds" in a: | ||||
|             arr = ( | ||||
|                 a['line'], int(a['milliseconds']) | ||||
|             ) | ||||
|  | ||||
|             sync_array.append(arr) | ||||
|  | ||||
|     return sync_array | ||||
										
											Binary file not shown.
										
									
								
							| @@ -0,0 +1,3 @@ | ||||
| UNKNOWN | ||||
|  | ||||
|  | ||||
| @@ -0,0 +1 @@ | ||||
| pip | ||||
| @@ -0,0 +1,16 @@ | ||||
| Metadata-Version: 2.0 | ||||
| Name: py-bandcamp | ||||
| Version: 0.7.0 | ||||
| Summary: bandcamp data scrapper | ||||
| Home-page: https://github.com/OpenJarbas/py_bandcamp | ||||
| Author: jarbasAI | ||||
| Author-email: jarbasai@mailfence.com | ||||
| License: Apache2 | ||||
| Platform: UNKNOWN | ||||
| Requires-Dist: beautifulsoup4 | ||||
| Requires-Dist: requests | ||||
| Requires-Dist: requests-cache | ||||
|  | ||||
| UNKNOWN | ||||
|  | ||||
|  | ||||
| @@ -0,0 +1,16 @@ | ||||
| py_bandcamp-0.7.0.dist-info/DESCRIPTION.rst,sha256=OCTuuN6LcWulhHS3d5rfjdsQtW22n7HENFRh6jC6ego,10 | ||||
| py_bandcamp-0.7.0.dist-info/INSTALLER,sha256=zuuue4knoyJ-UwPPXg8fezS7VCrXJQrAP7zeNuwvFQg,4 | ||||
| py_bandcamp-0.7.0.dist-info/METADATA,sha256=v64yE1qTYkPrI6J2MtCwmVQdOOZbFHrdr5vQC5guAmk,324 | ||||
| py_bandcamp-0.7.0.dist-info/RECORD,, | ||||
| py_bandcamp-0.7.0.dist-info/REQUESTED,sha256=47DEQpj8HBSa-_TImW-5JCeuQeRkm5NMpJWZG3hSuFU,0 | ||||
| py_bandcamp-0.7.0.dist-info/WHEEL,sha256=rNo05PbNqwnXiIHFsYm0m22u4Zm6YJtugFG2THx4w3g,92 | ||||
| py_bandcamp-0.7.0.dist-info/metadata.json,sha256=N3uWHAUUKCiP03daPAIJfc8j6sUY8GyWfXM1r4KBukg,512 | ||||
| py_bandcamp-0.7.0.dist-info/top_level.txt,sha256=bbnj4Boxiv1B7Bo1GHfc5_jVs9p4egzSunq5FQtaCsQ,12 | ||||
| py_bandcamp/__init__.py,sha256=NI84FGwg_EouRuSqRp4JsO5MvSKbuizjOzXsWlNo_R0,9801 | ||||
| py_bandcamp/__init__.pyc,, | ||||
| py_bandcamp/models.py,sha256=bCz1oNwDEioNvMAzy8mVmX6bX7NhcjYVKew1_ncsajI,10877 | ||||
| py_bandcamp/models.pyc,, | ||||
| py_bandcamp/session.py,sha256=j3KLPYMn5YV6rgZNNT9MFpLJJ374HBY0f3SDnTUPnjA,101 | ||||
| py_bandcamp/session.pyc,, | ||||
| py_bandcamp/utils.py,sha256=k5tJESDviwz5wJawRqFR-99TOcYLwilUFInPfoa19oU,2076 | ||||
| py_bandcamp/utils.pyc,, | ||||
| @@ -0,0 +1,5 @@ | ||||
| Wheel-Version: 1.0 | ||||
| Generator: bdist_wheel (0.29.0) | ||||
| Root-Is-Purelib: true | ||||
| Tag: py3-none-any | ||||
|  | ||||
| @@ -0,0 +1 @@ | ||||
| {"extensions": {"python.details": {"contacts": [{"email": "jarbasai@mailfence.com", "name": "jarbasAI", "role": "author"}], "document_names": {"description": "DESCRIPTION.rst"}, "project_urls": {"Home": "https://github.com/OpenJarbas/py_bandcamp"}}}, "extras": [], "generator": "bdist_wheel (0.29.0)", "license": "Apache2", "metadata_version": "2.0", "name": "py-bandcamp", "run_requires": [{"requires": ["beautifulsoup4", "requests", "requests-cache"]}], "summary": "bandcamp data scrapper", "version": "0.7.0"} | ||||
| @@ -0,0 +1 @@ | ||||
| py_bandcamp | ||||
| @@ -0,0 +1,248 @@ | ||||
| from py_bandcamp.models import * | ||||
| from py_bandcamp.session import SESSION as requests | ||||
| from py_bandcamp.utils import extract_ldjson_blob, get_props, extract_blob, \ | ||||
|     get_stream_data | ||||
|  | ||||
|  | ||||
| class BandCamp: | ||||
|     @staticmethod | ||||
|     def tags(tag_list=True): | ||||
|         data = extract_blob("https://bandcamp.com/tags") | ||||
|         tags = {"genres": data["signup_params"]["genres"], | ||||
|                 "subgenres": data["signup_params"]["subgenres"]} | ||||
|         if not tag_list: | ||||
|             return tags | ||||
|         tag_list = [] | ||||
|         for genre in tags["subgenres"]: | ||||
|             tag_list.append(genre) | ||||
|             tag_list += [sub["norm_name"] for sub in tags["subgenres"][genre]] | ||||
|         return tag_list | ||||
|  | ||||
|     @classmethod | ||||
|     def search_tag(cls, tag, page=1, pop_date=1): | ||||
|         tag = tag.strip().replace(" ", "-").lower() | ||||
|         if tag not in cls.tags(): | ||||
|             return [] | ||||
|         params = {"page": page, "sort_field": pop_date} | ||||
|         url = 'http://bandcamp.com/tag/' + str(tag) | ||||
|         data = extract_blob(url, params=params) | ||||
|  | ||||
|         related_tags = [{"name": t["norm_name"], "score": t["relation"]} | ||||
|                         for t in data["hub"].pop("related_tags")] | ||||
|  | ||||
|         collections, dig_deeper = data["hub"].pop("tabs") | ||||
|         dig_deeper = dig_deeper["dig_deeper"]["results"] | ||||
|         collections = collections["collections"] | ||||
|  | ||||
|         _to_remove = ['custom_domain', 'custom_domain_verified', "item_type", | ||||
|                       'packages', 'slug_text', 'subdomain', 'is_preorder', | ||||
|                       'item_id', 'num_comments', 'tralbum_id', 'band_id', | ||||
|                       'tralbum_type', 'tag_id', 'audio_track_id'] | ||||
|  | ||||
|         for c in collections: | ||||
|             if c["name"] == "bc_dailys": | ||||
|                 continue | ||||
|             for result in c["items"]: | ||||
|                 result["image"] = "https://f4.bcbits.com/img/a{art_id}_1.jpg". \ | ||||
|                     format(art_id=result.pop("art_id")) | ||||
|                 for _ in _to_remove: | ||||
|                     if _ in result: | ||||
|                         result.pop(_) | ||||
|                 result["related_tags"] = related_tags | ||||
|                 result["collection"] = c["name"] | ||||
|                 if "tralbum_url" in result: | ||||
|                     result["album_url"] = result.pop("tralbum_url") | ||||
|                 # TODO featured track object | ||||
|                 yield BandcampTrack(result, parse=False) | ||||
|  | ||||
|         for k in dig_deeper: | ||||
|             for result in dig_deeper[k]["items"]: | ||||
|                 for _ in _to_remove: | ||||
|                     if _ in result: | ||||
|                         result.pop(_) | ||||
|                 result["related_tags"] = related_tags | ||||
|                 result["collection"] = "dig_deeper" | ||||
|                 if "tralbum_url" in result: | ||||
|                     result["album_url"] = result.pop("tralbum_url") | ||||
|                 # TODO featured track object | ||||
|                 yield BandcampTrack(result, parse=False) | ||||
|  | ||||
|     @classmethod | ||||
|     def search_albums(cls, album_name): | ||||
|         for album in cls.search(album_name, albums=True, tracks=False, | ||||
|                                 artists=False, labels=False): | ||||
|             yield album | ||||
|  | ||||
|     @classmethod | ||||
|     def search_tracks(cls, track_name): | ||||
|         for t in cls.search(track_name, albums=False, tracks=True, | ||||
|                             artists=False, labels=False): | ||||
|             yield t | ||||
|  | ||||
|     @classmethod | ||||
|     def search_artists(cls, artist_name): | ||||
|         for a in cls.search(artist_name, albums=False, tracks=False, | ||||
|                             artists=True, labels=False): | ||||
|             yield a | ||||
|  | ||||
|     @classmethod | ||||
|     def search_labels(cls, label_name): | ||||
|         for a in cls.search(label_name, albums=False, tracks=False, | ||||
|                             artists=False, labels=True): | ||||
|             yield a | ||||
|  | ||||
|     @classmethod | ||||
|     def search(cls, name, page=1, albums=True, tracks=True, artists=True, | ||||
|                labels=False): | ||||
|         params = {"page": page, "q": name} | ||||
|         response = requests.get('http://bandcamp.com/search', params=params) | ||||
|         html_doc = response.content | ||||
|         soup = BeautifulSoup(html_doc, 'html.parser') | ||||
|  | ||||
|         seen = [] | ||||
|         for item in soup.find_all("li", class_="searchresult"): | ||||
|             item_type = item.find('div', | ||||
|                                   class_='itemtype').text.strip().lower() | ||||
|             if item_type == "album" and albums: | ||||
|                 data = cls._parse_album(item) | ||||
|             elif item_type == "track" and tracks: | ||||
|                 data = cls._parse_track(item) | ||||
|             elif item_type == "artist" and artists: | ||||
|                 data = cls._parse_artist(item) | ||||
|             elif item_type == "label" and labels: | ||||
|                 data = cls._parse_label(item) | ||||
|             else: | ||||
|                 continue | ||||
|             # data["type"] = type | ||||
|             yield data | ||||
|             seen.append(data) | ||||
|         if not len(seen): | ||||
|             return  # no more pages | ||||
|         for item in cls.search(name, page=page + 1, albums=albums, | ||||
|                                tracks=tracks, artists=artists, | ||||
|                                labels=labels): | ||||
|             if item in seen: | ||||
|                 return  # duplicate data, fail safe out of loops | ||||
|             yield item | ||||
|  | ||||
|     @staticmethod | ||||
|     def get_track_lyrics(track_url): | ||||
|         track_page = requests.get(track_url) | ||||
|         track_soup = BeautifulSoup(track_page.text, 'html.parser') | ||||
|         track_lyrics = track_soup.find("div", {"class": "lyricsText"}) | ||||
|         if track_lyrics: | ||||
|             return track_lyrics.text | ||||
|         return "lyrics unavailable" | ||||
|  | ||||
|     @classmethod | ||||
|     def get_streams(cls, urls): | ||||
|         if not isinstance(urls, list): | ||||
|             urls = [urls] | ||||
|         direct_links = [cls.get_stream_url(url) for url in urls] | ||||
|         return direct_links | ||||
|  | ||||
|     @classmethod | ||||
|     def get_stream_url(cls, url): | ||||
|         data = get_stream_data(url) | ||||
|         print(data) | ||||
|         for p in data['additionalProperty']: | ||||
|             if p['name'] == 'file_mp3-128': | ||||
|                 return p["value"] | ||||
|         return url | ||||
|  | ||||
|     @staticmethod | ||||
|     def _parse_label(item): | ||||
|         art = item.find("div", {"class": "art"}).find("img") | ||||
|         if art: | ||||
|             art = art["src"] | ||||
|         name = item.find('div', class_='heading').text.strip() | ||||
|         url = item.find( | ||||
|             'div', class_='heading').find('a')['href'].split("?")[0] | ||||
|         location = item.find('div', class_='subhead').text.strip() | ||||
|         try: | ||||
|             tags = item.find( | ||||
|                 'div', class_='tags').text.replace("tags:", "").split(",") | ||||
|             tags = [t.strip().lower() for t in tags] | ||||
|         except:  # sometimes missing | ||||
|             tags = [] | ||||
|  | ||||
|         data = {"name": name, "location": location, | ||||
|                 "tags": tags, "url": url, "image": art | ||||
|                 } | ||||
|         return BandcampLabel(data) | ||||
|  | ||||
|     @staticmethod | ||||
|     def _parse_artist(item): | ||||
|         name = item.find('div', class_='heading').text.strip() | ||||
|         url = item.find( | ||||
|             'div', class_='heading').find('a')['href'].split("?")[0] | ||||
|         genre = item.find( | ||||
|             'div', class_='genre').text.strip().replace("genre: ", "") | ||||
|         location = item.find('div', class_='subhead').text.strip() | ||||
|         try: | ||||
|             tags = item.find( | ||||
|                 'div', class_='tags').text.replace("tags:", "").split(",") | ||||
|             tags = [t.strip().lower() for t in tags] | ||||
|         except:  # sometimes missing | ||||
|             tags = [] | ||||
|         art = item.find("div", {"class": "art"}).find("img")["src"] | ||||
|  | ||||
|         data = {"name": name, "genre": genre, "location": location, | ||||
|                 "tags": tags, "url": url, "image": art, "albums": [] | ||||
|                 } | ||||
|         return BandcampArtist(data) | ||||
|  | ||||
|     @staticmethod | ||||
|     def _parse_track(item): | ||||
|         track_name = item.find('div', class_='heading').text.strip() | ||||
|         url = item.find( | ||||
|             'div', class_='heading').find('a')['href'].split("?")[0] | ||||
|         album_name, artist = item.find( | ||||
|             'div', class_='subhead').text.strip().split("by") | ||||
|         album_name = album_name.strip().replace("from ", "") | ||||
|         artist = artist.strip() | ||||
|         released = item.find( | ||||
|             'div', class_='released').text.strip().replace("released ", "") | ||||
|         try: | ||||
|             tags = item.find( | ||||
|                 'div', class_='tags').text.replace("tags:", "").split(",") | ||||
|             tags = [t.strip().lower() for t in tags] | ||||
|         except:  # sometimes missing | ||||
|             tags = [] | ||||
|  | ||||
|         art = item.find("div", {"class": "art"}).find("img")["src"] | ||||
|         data = {"track_name": track_name, "released": released, "url": url, | ||||
|                 "tags": tags, "album_name": album_name, "artist": artist, | ||||
|                 "image": art | ||||
|                 } | ||||
|         return BandcampTrack(data) | ||||
|  | ||||
|     @staticmethod | ||||
|     def _parse_album(item): | ||||
|         art = item.find("div", {"class": "art"}).find("img")["src"] | ||||
|         album_name = item.find('div', class_='heading').text.strip() | ||||
|         url = item.find( | ||||
|             'div', class_='heading').find('a')['href'].split("?")[0] | ||||
|         length = item.find('div', class_='length').text.strip() | ||||
|         tracks, minutes = length.split(",") | ||||
|         tracks = tracks.replace(" tracks", "").replace(" track", "").strip() | ||||
|         minutes = minutes.replace(" minutes", "").strip() | ||||
|         released = item.find( | ||||
|             'div', class_='released').text.strip().replace("released ", "") | ||||
|         tags = item.find( | ||||
|             'div', class_='tags').text.replace("tags:", "").split(",") | ||||
|         tags = [t.strip().lower() for t in tags] | ||||
|         artist = item.find("div", {"class": "subhead"}).text.strip() | ||||
|         if artist.startswith("by "): | ||||
|             artist = artist[3:] | ||||
|         data = {"album_name": album_name, | ||||
|                 "length": length, | ||||
|                 "minutes": minutes, | ||||
|                 "url": url, | ||||
|                 "image": art, | ||||
|                 "artist": artist, | ||||
|                 "track_number": tracks, | ||||
|                 "released": released, | ||||
|                 "tags": tags | ||||
|                 } | ||||
|         return BandcampAlbum(data, scrap=False) | ||||
										
											Binary file not shown.
										
									
								
							| @@ -0,0 +1,406 @@ | ||||
| from bs4 import BeautifulSoup | ||||
| from py_bandcamp.session import SESSION as requests | ||||
| from py_bandcamp.utils import extract_ldjson_blob, get_props | ||||
|  | ||||
|  | ||||
| class BandcampTrack: | ||||
|     def __init__(self, data, parse=True): | ||||
|         self._url = data.get("url") | ||||
|         self._data = data or {} | ||||
|         self._page_data = {} | ||||
|         if parse: | ||||
|             self.parse_page() | ||||
|         if not self.url: | ||||
|             raise ValueError("bandcamp url is not set") | ||||
|  | ||||
|     def parse_page(self): | ||||
|         self._page_data = self.get_track_data(self.url) | ||||
|         return self._page_data | ||||
|  | ||||
|     @staticmethod | ||||
|     def from_url(url): | ||||
|         return BandcampTrack({"url": url}) | ||||
|  | ||||
|     @property | ||||
|     def url(self): | ||||
|         return self._url or self.data.get("url") | ||||
|  | ||||
|     @property | ||||
|     def album(self): | ||||
|         return self.get_album(self.url) | ||||
|  | ||||
|     @property | ||||
|     def artist(self): | ||||
|         return self.get_artist(self.url) | ||||
|  | ||||
|     @property | ||||
|     def data(self): | ||||
|         for k, v in self._page_data.items(): | ||||
|             self._data[k] = v | ||||
|         return self._data | ||||
|  | ||||
|     @property | ||||
|     def title(self): | ||||
|         return self.data.get("title") or self.data.get("name") or \ | ||||
|                self.url.split("/")[-1] | ||||
|  | ||||
|     @property | ||||
|     def image(self): | ||||
|         return self.data.get("image") | ||||
|  | ||||
|     @property | ||||
|     def track_num(self): | ||||
|         return self.data.get("tracknum") | ||||
|  | ||||
|     @property | ||||
|     def duration(self): | ||||
|         return self.data.get("duration_secs") or 0 | ||||
|  | ||||
|     @property | ||||
|     def stream(self): | ||||
|         return self.data.get("file_mp3-128") | ||||
|  | ||||
|     @staticmethod | ||||
|     def get_album(url): | ||||
|         data = extract_ldjson_blob(url, clean=True) | ||||
|         if data.get('inAlbum'): | ||||
|             return BandcampAlbum({ | ||||
|                 "title": data['inAlbum'].get('name'), | ||||
|                 "url": data['inAlbum'].get('id', url).split("#")[0], | ||||
|                 'type': data['inAlbum'].get("type"), | ||||
|             }) | ||||
|  | ||||
|     @staticmethod | ||||
|     def get_artist(url): | ||||
|         data = extract_ldjson_blob(url, clean=True) | ||||
|         d = data.get("byArtist") | ||||
|         if d: | ||||
|             return BandcampArtist({ | ||||
|                 "title": d.get('name'), | ||||
|                 "url": d.get('id', url).split("#")[0], | ||||
|                 'genre': d.get('genre'), | ||||
|                 "artist_type": d.get('type') | ||||
|             }, scrap=False) | ||||
|         return None | ||||
|  | ||||
|     @staticmethod | ||||
|     def get_track_data(url): | ||||
|         data = extract_ldjson_blob(url, clean=True) | ||||
|         kwords = data.get('keywords', "") | ||||
|         if isinstance(kwords, str): | ||||
|             kwords = kwords.split(", ") | ||||
|         track = { | ||||
|             'dateModified': data.get('dateModified'), | ||||
|             'datePublished': data.get('datePublished'), | ||||
|             "url": data.get('id') or url, | ||||
|             "title": data.get("name"), | ||||
|             "type": data.get("type"), | ||||
|             'image': data.get('image'), | ||||
|             'keywords': kwords | ||||
|         } | ||||
|         for k, v in get_props(data).items(): | ||||
|             track[k] = v | ||||
|         return track | ||||
|  | ||||
|     def __repr__(self): | ||||
|         return self.__class__.__name__ + ":" + self.title | ||||
|  | ||||
|     def __str__(self): | ||||
|         return self.url | ||||
|  | ||||
|  | ||||
| class BandcampAlbum: | ||||
|     def __init__(self, data, scrap=True): | ||||
|         self._url = data.get("url") | ||||
|         self._data = data or {} | ||||
|         self._page_data = {} | ||||
|         if scrap: | ||||
|             self.scrap() | ||||
|         if not self.url: | ||||
|             raise ValueError("bandcamp url is not set") | ||||
|  | ||||
|     def scrap(self): | ||||
|         self._page_data = self.get_album_data(self.url) | ||||
|         return self._page_data | ||||
|  | ||||
|     @staticmethod | ||||
|     def from_url(url): | ||||
|         return BandcampAlbum({"url": url}) | ||||
|  | ||||
|     @property | ||||
|     def image(self): | ||||
|         return self.data.get("image") | ||||
|  | ||||
|     @property | ||||
|     def url(self): | ||||
|         return self._url or self.data.get("url") | ||||
|  | ||||
|     @property | ||||
|     def title(self): | ||||
|         return self.data.get("title") or self.data.get("name") or \ | ||||
|                self.url.split("/")[-1] | ||||
|  | ||||
|     @property | ||||
|     def releases(self): | ||||
|         return self.get_releases(self.url) | ||||
|  | ||||
|     @property | ||||
|     def artist(self): | ||||
|         return self.get_artist(self.url) | ||||
|  | ||||
|     @property | ||||
|     def keywords(self): | ||||
|         return self.data.get("keywords") or [] | ||||
|  | ||||
|     @property | ||||
|     def tracks(self): | ||||
|         return self.get_tracks(self.url) | ||||
|  | ||||
|     @property | ||||
|     def featured_track(self): | ||||
|         if not len(self.tracks): | ||||
|             return None | ||||
|         num = self.data.get('featured_track_num', 1) or 1 | ||||
|         return self.tracks[int(num) - 1] | ||||
|  | ||||
|     @property | ||||
|     def comments(self): | ||||
|         return self.get_comments(self.url) | ||||
|  | ||||
|     @property | ||||
|     def data(self): | ||||
|         for k, v in self._page_data.items(): | ||||
|             self._data[k] = v | ||||
|         return self._data | ||||
|  | ||||
|     @staticmethod | ||||
|     def get_releases(url): | ||||
|         data = extract_ldjson_blob(url, clean=True) | ||||
|         releases = [] | ||||
|         for d in data.get("albumRelease", []): | ||||
|             release = { | ||||
|                 "description": d.get("description"), | ||||
|                 'image': d.get('image'), | ||||
|                 "title": d.get('name'), | ||||
|                 "url": d.get('id', url).split("#")[0], | ||||
|                 'format': d.get('musicReleaseFormat'), | ||||
|             } | ||||
|             releases.append(release) | ||||
|         return releases | ||||
|  | ||||
|     @staticmethod | ||||
|     def get_artist(url): | ||||
|         data = extract_ldjson_blob(url, clean=True) | ||||
|         d = data.get("byArtist") | ||||
|         if d: | ||||
|             return BandcampArtist({ | ||||
|                 "description": d.get("description"), | ||||
|                 'image': d.get('image'), | ||||
|                 "title": d.get('name'), | ||||
|                 "url": d.get('id', url).split("#")[0], | ||||
|                 'genre': d.get('genre'), | ||||
|                 "artist_type": d.get('type') | ||||
|             }, scrap=False) | ||||
|         return None | ||||
|  | ||||
|     @staticmethod | ||||
|     def get_tracks(url): | ||||
|         data = extract_ldjson_blob(url, clean=True) | ||||
|         if not data.get("track"): | ||||
|             return [] | ||||
|  | ||||
|         data = data['track'] | ||||
|  | ||||
|         tracks = [] | ||||
|  | ||||
|         for d in data.get('itemListElement', []): | ||||
|             d = d['item'] | ||||
|             track = { | ||||
|                 "title": d.get('name'), | ||||
|                 "url": d.get('id') or url, | ||||
|                 'type': d.get('type'), | ||||
|             } | ||||
|             for k, v in get_props(d).items(): | ||||
|                 track[k] = v | ||||
|             tracks.append(BandcampTrack(track, parse=False)) | ||||
|         return tracks | ||||
|  | ||||
|     @staticmethod | ||||
|     def get_comments(url): | ||||
|         data = extract_ldjson_blob(url, clean=True) | ||||
|         comments = [] | ||||
|         for d in data.get("comment", []): | ||||
|             comment = { | ||||
|                 "text": d["text"], | ||||
|                 'image': d["author"].get("image"), | ||||
|                 "author": d["author"]["name"] | ||||
|             } | ||||
|             comments.append(comment) | ||||
|         return comments | ||||
|  | ||||
|     @staticmethod | ||||
|     def get_album_data(url): | ||||
|         data = extract_ldjson_blob(url, clean=True) | ||||
|         props = get_props(data) | ||||
|         kwords = data.get('keywords', "") | ||||
|         if isinstance(kwords, str): | ||||
|             kwords = kwords.split(", ") | ||||
|         return { | ||||
|             'dateModified': data.get('dateModified'), | ||||
|             'datePublished': data.get('datePublished'), | ||||
|             'description': data.get('description'), | ||||
|             "url": data.get('id') or url, | ||||
|             "title": data.get("name"), | ||||
|             "type": data.get("type"), | ||||
|             "n_tracks": data.get('numTracks'), | ||||
|             'image': data.get('image'), | ||||
|             'featured_track_num': props.get('featured_track_num'), | ||||
|             'keywords': kwords | ||||
|         } | ||||
|  | ||||
|     def __repr__(self): | ||||
|         return self.__class__.__name__ + ":" + self.title | ||||
|  | ||||
|     def __str__(self): | ||||
|         return self.url | ||||
|  | ||||
|  | ||||
| class BandcampLabel: | ||||
|     def __init__(self, data, scrap=True): | ||||
|         self._url = data.get("url") | ||||
|         self._data = data or {} | ||||
|         self._page_data = {} | ||||
|         if scrap: | ||||
|             self.scrap() | ||||
|         if not self.url: | ||||
|             raise ValueError("bandcamp url is not set") | ||||
|  | ||||
|     def scrap(self): | ||||
|         self._page_data = {}  # TODO | ||||
|         return self._page_data | ||||
|  | ||||
|     @staticmethod | ||||
|     def from_url(url): | ||||
|         return BandcampTrack({"url": url}) | ||||
|  | ||||
|     @property | ||||
|     def url(self): | ||||
|         return self._url or self.data.get("url") | ||||
|  | ||||
|     @property | ||||
|     def data(self): | ||||
|         for k, v in self._page_data.items(): | ||||
|             self._data[k] = v | ||||
|         return self._data | ||||
|  | ||||
|     @property | ||||
|     def name(self): | ||||
|         return self.data.get("title") or self.data.get("name") or \ | ||||
|                self.url.split("/")[-1] | ||||
|  | ||||
|     @property | ||||
|     def location(self): | ||||
|         return self.data.get("location") | ||||
|  | ||||
|     @property | ||||
|     def tags(self): | ||||
|         return self.data.get("tags") or [] | ||||
|  | ||||
|     @property | ||||
|     def image(self): | ||||
|         return self.data.get("image") | ||||
|  | ||||
|     def __repr__(self): | ||||
|         return self.__class__.__name__ + ":" + self.name | ||||
|  | ||||
|     def __str__(self): | ||||
|         return self.url | ||||
|  | ||||
|  | ||||
| class BandcampArtist: | ||||
|     def __init__(self, data, scrap=True): | ||||
|         self._url = data.get("url") | ||||
|         self._data = data or {} | ||||
|         self._page_data = {} | ||||
|         if scrap: | ||||
|             self.scrap() | ||||
|  | ||||
|     def scrap(self): | ||||
|         self._page_data = {}  # TODO | ||||
|         return self._page_data | ||||
|  | ||||
|     @property | ||||
|     def featured_album(self): | ||||
|         return BandcampAlbum.from_url(self.url + "/releases") | ||||
|  | ||||
|     @property | ||||
|     def featured_track(self): | ||||
|         if not self.featured_album: | ||||
|             return None | ||||
|         return self.featured_album.featured_track | ||||
|  | ||||
|     @staticmethod | ||||
|     def get_albums(url): | ||||
|         albums = [] | ||||
|         soup = BeautifulSoup(requests.get(url).text, "html.parser") | ||||
|         for album in soup.find_all("a"): | ||||
|             album_url = album.find("p", {"class": "title"}) | ||||
|             if album_url: | ||||
|                 title = album_url.text.strip() | ||||
|                 art = album.find("div", {"class": "art"}).find("img")["src"] | ||||
|                 album_url = url + album["href"] | ||||
|                 album = BandcampAlbum({"album_name": title, | ||||
|                                        "image": art, | ||||
|                                        "url": album_url}) | ||||
|                 albums.append(album) | ||||
|         return albums | ||||
|  | ||||
|     @property | ||||
|     def albums(self): | ||||
|         return self.get_albums(self.url) | ||||
|  | ||||
|     @staticmethod | ||||
|     def from_url(url): | ||||
|         return BandcampTrack({"url": url}) | ||||
|  | ||||
|     @property | ||||
|     def url(self): | ||||
|         return self._url or self.data.get("url") | ||||
|  | ||||
|     @property | ||||
|     def data(self): | ||||
|         for k, v in self._page_data.items(): | ||||
|             self._data[k] = v | ||||
|         return self._data | ||||
|  | ||||
|     @property | ||||
|     def name(self): | ||||
|         return self.data.get("title") or self.data.get("name") or \ | ||||
|                self.url.split("/")[-1] | ||||
|  | ||||
|     @property | ||||
|     def location(self): | ||||
|         return self.data.get("location") | ||||
|  | ||||
|     @property | ||||
|     def genre(self): | ||||
|         return self.data.get("genre") | ||||
|  | ||||
|     @property | ||||
|     def tags(self): | ||||
|         return self.data.get("tags") or [] | ||||
|  | ||||
|     @property | ||||
|     def image(self): | ||||
|         return self.data.get("image") | ||||
|  | ||||
|     def __repr__(self): | ||||
|         return self.__class__.__name__ + ":" + self.name | ||||
|  | ||||
|     def __str__(self): | ||||
|         return self.url | ||||
|  | ||||
|     def __eq__(self, other): | ||||
|         if str(self) == str(other): | ||||
|             return True | ||||
|         return False | ||||
										
											Binary file not shown.
										
									
								
							| @@ -0,0 +1,3 @@ | ||||
| import requests_cache | ||||
|  | ||||
| SESSION = requests_cache.CachedSession(expire_after=5 * 60, backend="memory") | ||||
										
											Binary file not shown.
										
									
								
							| @@ -0,0 +1,77 @@ | ||||
| import json | ||||
|  | ||||
| from py_bandcamp.session import SESSION as requests | ||||
|  | ||||
|  | ||||
| def extract_blob(url, params=None): | ||||
|     blob = requests.get(url, params=params).text | ||||
|     for b in blob.split("data-blob='")[1:]: | ||||
|         json_blob = b.split("'")[0] | ||||
|         return json.loads(json_blob) | ||||
|     for b in blob.split("data-blob=\"")[1:]: | ||||
|         json_blob = b.split("\"")[0].replace(""", '"') | ||||
|         return json.loads(json_blob) | ||||
|  | ||||
|  | ||||
| def extract_ldjson_blob(url, clean=False): | ||||
|     txt_string = requests.get(url).text | ||||
|  | ||||
|     json_blob = txt_string. \ | ||||
|         split('<script type="application/ld+json">')[-1]. \ | ||||
|         split("</script>")[0] | ||||
|  | ||||
|     data = json.loads(json_blob) | ||||
|  | ||||
|     def _clean_list(l): | ||||
|         for idx, v in enumerate(l): | ||||
|             if isinstance(v, dict): | ||||
|                 l[idx] = _clean_dict(v) | ||||
|             if isinstance(v, list): | ||||
|                 l[idx] = _clean_list(v) | ||||
|         return l | ||||
|  | ||||
|     def _clean_dict(d): | ||||
|         clean = {} | ||||
|         for k, v in d.items(): | ||||
|             if isinstance(v, dict): | ||||
|                 v = _clean_dict(v) | ||||
|             if isinstance(v, list): | ||||
|                 v = _clean_list(v) | ||||
|             k = k.replace("@", "") | ||||
|             clean[k] = v | ||||
|         return clean | ||||
|  | ||||
|     if clean: | ||||
|         return _clean_dict(data) | ||||
|     return data | ||||
|  | ||||
|  | ||||
| def get_props(d, props=None): | ||||
|     props = props or [] | ||||
|     data = {} | ||||
|     for p in d['additionalProperty']: | ||||
|         if p['name'] in props or not props: | ||||
|             data[p['name']] = p['value'] | ||||
|     return data | ||||
|  | ||||
|  | ||||
| def get_stream_data(url): | ||||
|     data = extract_ldjson_blob(url) | ||||
|     artist_data = data['byArtist'] | ||||
|     album_data = data['inAlbum'] | ||||
|     kws = data["keywords"] | ||||
|     if isinstance(kws, str): | ||||
|         kws = kws.split(", ") | ||||
|     result = { | ||||
|         "categories": data["@type"], | ||||
|         'album_name': album_data['name'], | ||||
|         'artist': artist_data['name'], | ||||
|         'image': data['image'], | ||||
|         "title": data['name'], | ||||
|         "url": url, | ||||
|         "tags": kws + data.get("tags", []) | ||||
|     } | ||||
|     for p in data['additionalProperty']: | ||||
|         if p['name'] == 'file_mp3-128': | ||||
|             result["stream"] = p["value"] | ||||
|     return result | ||||
										
											Binary file not shown.
										
									
								
							
		Reference in New Issue
	
	Block a user