import concurrent.futures
import glob
import logging
import shutil
from pathlib import Path

from txtdownloader import TXTDownloader
from ytdownloader import YTDownloader
from coverdownloader import CoverDownloader
from db import *

# NOTE(review): `logger`, `pandas`, `USDB` and the `get_*` accessors used below
# are not defined in this file; presumably they are provided by
# `from db import *` -- confirm, and prefer explicit imports once verified.

# Characters stripped from artist and song names before they are handed to the
# downloaders.
_DISALLOWED_CHARS = "@$%&\\/:*?\"'<>|~`#^+={}[];."
_DISALLOWED_TABLE = str.maketrans("", "", _DISALLOWED_CHARS)

# URL cell values that mean "nothing to download".
_URL_PLACEHOLDERS = (" ", "MISSING")

# (sub-directory of rawdir, file extension) pairs gathered for each song when
# the library is built.
_LIBRARY_ASSETS = (
    ("txt", "txt"),
    ("audio", "mp3"),
    ("video", "mp4"),
    ("covers", "jpg"),
)


class USDBDownloader:
    """Download song assets listed in the USDB spreadsheet and build a library.

    Raw downloads go to the ``txt``, ``audio``, ``video`` and ``covers``
    sub-directories of ``rawdir``; :meth:`build_library` then copies them into
    one folder per song under ``libdir``.
    """

    def removeDisallowedChars(self, value: str):
        """Return *value* with every character of the disallowed set removed."""
        # One translate pass instead of one ``replace`` call per character.
        return value.translate(_DISALLOWED_TABLE)

    def __init__(self, rawdir: str, libdir: str, cache: bool = True, concurrency: int = 1):
        """Set up the database handle and the three downloaders.

        Args:
            rawdir: Directory that receives the raw downloads.
            libdir: Directory in which the per-song library folders are built.
            cache: Passed through unchanged to every downloader.
            concurrency: Number of worker threads used by :meth:`download`
                and :meth:`download_sample`.
        """
        self.rawdir = Path(rawdir)
        self.libdir = Path(libdir)
        # Relative path: resolved against the current working directory.
        self.db = USDB("USDB.xlsx")
        self.concurrency = concurrency
        # NOTE(review): the first argument is a hard-coded token (it looks like
        # a session id) -- consider loading it from configuration instead of
        # keeping it in source control.
        self.txt = TXTDownloader("nv0rb8ma82p37qrvduvch6j3f6", (self.rawdir / "txt").resolve(), cache)
        self.yt = YTDownloader((self.rawdir / "audio").resolve(), (self.rawdir / "video").resolve(), cache)
        self.cv = CoverDownloader((self.rawdir / "covers").resolve(), cache)

    @staticmethod
    def _has_url(url) -> bool:
        """Return True when *url* is a string that is not a placeholder."""
        return isinstance(url, str) and url not in _URL_PLACEHOLDERS

    def _download_rows(self, data) -> None:
        """Run :meth:`download_song` for every row of *data* on a thread pool."""
        # The ``with`` block shuts the pool down, so worker threads are
        # released once all submitted songs have been processed.
        with concurrent.futures.ThreadPoolExecutor(self.concurrency) as executor:
            futures = [executor.submit(self.download_song, song) for _, song in data.iterrows()]
            concurrent.futures.wait(futures)

    def download(self):
        """Download the assets of every song in the database."""
        self._download_rows(self.db.data)

    def download_sample(self, count: int = 5):
        """Download the assets of a random sample of songs.

        Args:
            count: Maximum number of songs to sample. It is clamped to the
                number of available rows, because sampling more rows than
                exist raises ``ValueError``.
        """
        data = self.db.data
        self._download_rows(data.sample(min(count, len(data))))

    def download_song(self, song: "pandas.Series"):
        """Download the lyrics file, cover and video of a single song.

        Each asset is skipped when its URL is not a string or is one of the
        placeholder values. Any exception is logged rather than raised, so one
        failing song does not stop the others.

        Args:
            song: One row of the song database.
        """
        # Bound before the ``try`` so the error handler can always format its
        # message, even when the failure happens before the names are read.
        artist = song_name = "<unknown>"
        try:
            usdb_url = get_usdb_url(song)
            cover_url = get_cover_image_url(song)
            yt_url = get_yt_video_url(song)
            song_name = self.removeDisallowedChars(get_song_name(song))
            artist = self.removeDisallowedChars(get_artist_name(song))
            spotify_uri = get_track_id(song)
            gap = get_gap(song)
            video_gap = get_video_gap(song)
            start = get_start(song)
            end = get_end(song)
            language = get_language(song)
            year = str(get_date(song).year)

            logger.info(f"Downloading: {artist} - {song_name}: {usdb_url}, {cover_url}, {yt_url}")

            if self._has_url(usdb_url):
                self.txt.download(usdb_url, artist, song_name, spotify_uri, gap, video_gap, start, end, language, year)
            if self._has_url(cover_url):
                self.cv.download(cover_url, artist, song_name, spotify_uri)
            if self._has_url(yt_url):
                self.yt.download(yt_url, artist, song_name, spotify_uri)
        except Exception as e:
            logger.error(f"{type(e)}: {e} while processing {artist} - {song_name}")

    def build_library(self):
        """Copy the downloaded assets into one folder per song under ``libdir``.

        Every ``*.txt`` file in ``rawdir/txt`` defines a song; its stem names
        the output folder and the matching ``.mp3``, ``.mp4`` and ``.jpg``
        files, which are copied when they exist. Errors are handled per song,
        so a failed copy is logged and the remaining songs are still processed.
        """
        for txt_path in glob.glob(str(self.rawdir / "txt" / "*.txt")):
            file = Path(txt_path).stem
            try:
                outdir = self.libdir / file
                outdir.mkdir(parents=True, exist_ok=True)
                logger.info(f"Copying {file}")
                for subdir, extension in _LIBRARY_ASSETS:
                    asset = self.rawdir / subdir / f"{file}.{extension}"
                    if asset.is_file():
                        shutil.copy2(asset, outdir)
            except Exception as e:
                logger.error(f"{type(e)}: {e} while copying {file}")