# Skip to content
# Snippets Groups Projects
# usdbdownloader.py 3.77 KiB
# Newer Older
# TheJoKlLa's avatar
# TheJoKlLa committed
import concurrent
import concurrent.futures
import glob
import logging
import shutil
from pathlib import Path

from txtdownloader import TXTDownloader
from ytdownloader import YTDownloader
from coverdownloader import CoverDownloader
from db import *

class USDBDownloader:
    """Download the assets of the songs listed in the USDB sheet and build a library.

    For every row of ``self.db.data`` the text file, the cover image and the
    YouTube media are fetched through ``TXTDownloader``, ``CoverDownloader``
    and ``YTDownloader``. ``build_library`` afterwards gathers the files that
    share a name into one folder per song below ``libdir``.
    """

    # Characters stripped from artist and song names before the names are
    # handed to the downloaders.
    # NOTE(review): presumably because the names end up in file names -- confirm
    # against the downloader implementations.
    _DISALLOWED_CHARS = "@$%&\\/:*?\"'<>|~`#^+={}[];."
    _DISALLOWED_TABLE = str.maketrans("", "", _DISALLOWED_CHARS)

    # (raw sub-directory, file extension) of every asset ``build_library`` copies.
    _ASSET_KINDS = (
        ("txt", ".txt"),
        ("audio", ".mp3"),
        ("video", ".mp4"),
        ("covers", ".jpg"),
    )

    def __init__(self, rawdir: str, libdir: str, cache: bool = True, concurrency: int = 1):
        """Open the song database and create the three asset downloaders.

        Args:
            rawdir: Directory whose ``txt``, ``audio``, ``video`` and ``covers``
                sub-directories are passed to the downloaders.
            libdir: Directory in which ``build_library`` creates one folder
                per song.
            cache: Passed through unchanged to every downloader.
            concurrency: Number of worker threads used by ``download`` and
                ``download_sample``.
        """
        self.rawdir = Path(rawdir)
        self.libdir = Path(libdir)
        self.db = USDB("USDB.xlsx")
        self.concurrency = concurrency
        # NOTE(review): the first argument looks like a hard-coded session
        # token -- confirm, and consider loading it from configuration instead.
        self.txt = TXTDownloader("nv0rb8ma82p37qrvduvch6j3f6", (self.rawdir / "txt").resolve(), cache)
        self.yt = YTDownloader((self.rawdir / "audio").resolve(), (self.rawdir / "video").resolve(), cache)
        self.cv = CoverDownloader((self.rawdir / "covers").resolve(), cache)

    def removeDisallowedChars(self, value: str):
        """Return ``value`` with every character of ``_DISALLOWED_CHARS`` removed."""
        return value.translate(self._DISALLOWED_TABLE)

    @staticmethod
    def _is_usable_url(value) -> bool:
        """Tell whether a sheet cell holds a URL worth downloading.

        Non-string cells, blank strings and the ``"MISSING"`` placeholder are
        rejected.
        """
        if not isinstance(value, str):
            return False
        stripped = value.strip()
        return bool(stripped) and stripped != "MISSING"

    def _download_all(self, songs):
        """Run ``download_song`` for every row of ``songs`` on a thread pool."""
        # The context manager shuts the pool down once all songs are processed.
        with concurrent.futures.ThreadPoolExecutor(self.concurrency) as executor:
            futures = [executor.submit(self.download_song, song) for _, song in songs.iterrows()]
            concurrent.futures.wait(futures)

    def download(self):
        """Download the assets of every song in the database."""
        self._download_all(self.db.data)

    def download_sample(self):
        """Download the assets of up to five randomly sampled songs."""
        songs = self.db.data
        # ``sample`` raises when asked for more rows than exist, so cap the size.
        self._download_all(songs.sample(min(5, len(songs))))

    def download_song(self, song: pandas.Series):
        """Download text file, cover and YouTube media for one database row.

        Any exception is logged and swallowed so that one broken row does not
        stop the remaining downloads.

        Args:
            song: One row of ``self.db.data``.
        """
        # Placeholders keep the error log usable when the failure happens
        # before the real names have been read from the row.
        artist = song_name = "<unknown>"
        try:
            song_name = self.removeDisallowedChars(get_song_name(song))
            artist = self.removeDisallowedChars(get_artist_name(song))
            usdb_url = get_usdb_url(song)
            cover_url = get_cover_image_url(song)
            yt_url = get_yt_video_url(song)
            spotify_uri = get_track_id(song)
            gap = get_gap(song)
            video_gap = get_video_gap(song)
            start = get_start(song)
            end = get_end(song)
            language = get_language(song)
            year = str(get_date(song).year)

            logger.info(f"Downloading: {artist} - {song_name}: {usdb_url}, {cover_url}, {yt_url}")

            if self._is_usable_url(usdb_url):
                self.txt.download(usdb_url, artist, song_name, spotify_uri, gap, video_gap, start, end, language, year)

            if self._is_usable_url(cover_url):
                self.cv.download(cover_url, artist, song_name, spotify_uri)

            if self._is_usable_url(yt_url):
                self.yt.download(yt_url, artist, song_name, spotify_uri)

        except Exception as e:
            logger.error(f"{type(e)}: {e} while processing {artist} - {song_name}")

    def build_library(self):
        """Copy the downloaded files into one folder per song below ``libdir``.

        Every ``*.txt`` file in ``rawdir/txt`` defines a song. Its folder is
        named after the file stem and receives the text file plus the audio,
        video and cover files with the same stem, where they exist. A failure
        is logged and only skips the affected song.
        """
        for txt_path in glob.glob(str(self.rawdir / "txt" / "*.txt")):
            name = Path(txt_path).stem
            try:
                outdir = self.libdir / name
                outdir.mkdir(parents=True, exist_ok=True)

                logger.info(f"Copying {name}")

                for subdir, suffix in self._ASSET_KINDS:
                    source = self.rawdir / subdir / f"{name}{suffix}"
                    if source.is_file():
                        shutil.copy2(source, outdir)

            except Exception as e:
                logger.error(f"{type(e)}: {e} while copying {name}")