Skip to content
Snippets Groups Projects
main.py 4.43 KiB
import concurrent
import concurrent.futures
import glob
import logging
import re
import shutil
from pathlib import Path

from txtdownloader import TXTDownloader
from ytdownloader import YTDownloader
from coverdownloader import CoverDownloader
from db import *

# One layout shared by both handlers: timestamp, logger name, thread name,
# level and message.
formatter = logging.Formatter(
    '%(asctime)s | %(name)s | %(threadName)s |  %(levelname)s: %(message)s')

# File handler: only ERROR and above are written to error.log.
logFilePath = "error.log"
file_handler = logging.FileHandler(logFilePath)
file_handler.setLevel(logging.ERROR)
file_handler.setFormatter(formatter)

# Stream handler: INFO and above go to the default stream of StreamHandler.
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
stream_handler.setLevel(logging.INFO)

# The logger itself accepts everything from DEBUG up and leaves the filtering
# to the handlers; propagation is disabled so records are not passed on to
# ancestor loggers.
logger = logging.getLogger("usdbdl")
logger.propagate = False
logger.setLevel(logging.DEBUG)
logger.addHandler(file_handler)
logger.addHandler(stream_handler)

class USDBDownloader:
    """Download the assets of every song in the USDB table and build a library.

    ``download`` / ``download_sample`` fan the rows of ``self.db.data`` out to a
    thread pool; each row is handled by ``download_song``. ``build_library``
    then copies the downloaded files of each song into one folder per song
    below ``libdir``.
    """

    # Characters removed from artist and song names by removeDisallowedChars.
    _DISALLOWED_CHARS = "@$%&\\/:*?\"'<>|~`#^+={}[];."
    # Translation table that deletes every character listed above.
    _DISALLOWED_TABLE = str.maketrans("", "", _DISALLOWED_CHARS)
    # URL cell values that are treated as "no URL available".
    _PLACEHOLDER_URLS = (" ", "MISSING")

    def removeDisallowedChars(self, value: str):
        """Return *value* with every character of ``_DISALLOWED_CHARS`` removed.

        Args:
            value: Text to clean (artist or song name).

        Returns:
            The cleaned string; all other characters are kept unchanged.
        """
        # The table is looked up on the class, so no instance state is needed.
        return value.translate(USDBDownloader._DISALLOWED_TABLE)

    def __init__(self, rawdir: str, libdir: str, cache: bool = True, concurrency: int = 1):
        """Set up the song table and the three downloaders.

        Args:
            rawdir: Directory below which the ``txt``, ``audio``, ``video`` and
                ``covers`` download folders live.
            libdir: Directory in which ``build_library`` creates one folder
                per song.
            cache: Passed through unchanged to every downloader.
            concurrency: Number of worker threads used by ``download`` and
                ``download_sample``.
        """
        self.rawdir = Path(rawdir)
        self.libdir = Path(libdir)
        self.db = USDB("USDB.xlsx")
        self.concurrency = concurrency
        # NOTE(review): the first argument looks like a hard-coded session
        # token -- confirm, and consider moving it to configuration.
        self.txt = TXTDownloader("nv0rb8ma82p37qrvduvch6j3f6", (self.rawdir / "txt").resolve(), cache)
        self.yt = YTDownloader((self.rawdir / "audio").resolve(), (self.rawdir / "video").resolve(), cache)
        self.cv = CoverDownloader((self.rawdir / "covers").resolve(), cache)

    def download(self):
        """Download the assets of every row in ``self.db.data``."""
        self._download_rows(self.db.data)

    def download_sample(self):
        """Download the assets of 5 rows sampled from ``self.db.data``."""
        self._download_rows(self.db.data.sample(5))

    def _download_rows(self, frame):
        """Run ``download_song`` for each row of *frame* on a thread pool.

        Blocks until every submitted task has finished. The executor is used
        as a context manager so its worker threads are released afterwards.
        """
        with concurrent.futures.ThreadPoolExecutor(self.concurrency) as executor:
            futures = [executor.submit(self.download_song, song) for _, song in frame.iterrows()]
            concurrent.futures.wait(futures)

    @staticmethod
    def _is_usable_url(url) -> bool:
        """Return True when *url* is a string other than a placeholder value.

        Non-string values are rejected. ``isinstance`` is used, so subclasses
        of ``str`` are accepted as well.
        """
        return isinstance(url, str) and url not in USDBDownloader._PLACEHOLDER_URLS

    def download_song(self, song: pandas.Series):
        """Download the txt, cover and YouTube assets of one song row.

        Each asset is only requested when its URL passes ``_is_usable_url``.
        Any ``Exception`` raised while reading the row or downloading is
        logged through ``logger.error`` and not re-raised.

        Args:
            song: One row of the song table.
        """
        # Bound before the try block so the error message below can always be
        # formatted, even when a getter fails before the names are read.
        artist = song_name = "<unknown>"
        try:
            usdb_url = get_usdb_url(song)
            cover_url = get_cover_image_url(song)
            yt_url = get_yt_video_url(song)
            song_name = self.removeDisallowedChars(str(get_song_name(song)))
            artist = self.removeDisallowedChars(str(get_artist_name(song)))
            spotify_uri = get_track_id(song)
            gap = get_gap(song)
            video_gap = get_video_gap(song)
            start = get_start(song)
            end = get_end(song)
            language = get_language(song)
            year = str(get_date(song).year)

            logger.info(f"Downloading: {artist} - {song_name}: {usdb_url}, {cover_url}, {yt_url}")

            if self._is_usable_url(usdb_url):
                self.txt.download(usdb_url, artist, song_name, spotify_uri, gap, video_gap, start, end, language, year)

            if self._is_usable_url(cover_url):
                self.cv.download(cover_url, artist, song_name, spotify_uri)

            if self._is_usable_url(yt_url):
                self.yt.download(yt_url, artist, song_name, spotify_uri)

        except Exception as e:
            logger.error(f"{type(e)}: {e} while processing {artist} - {song_name}")

    def build_library(self):
        """Copy the downloaded files of each song into ``libdir/<song>``.

        Every ``*.txt`` file in ``rawdir/txt`` defines one song; its stem is
        used as the folder name and as the base name of the matching
        ``.mp3``, ``.mp4`` and ``.jpg`` files, which are copied when present.
        A failure while copying one song is logged and the remaining songs
        are still processed.
        """
        # (source folder, file suffix) of every asset belonging to a song.
        sources = (
            (self.rawdir / "txt", ".txt"),
            (self.rawdir / "audio", ".mp3"),
            (self.rawdir / "video", ".mp4"),
            (self.rawdir / "covers", ".jpg"),
        )

        for txt_path in glob.glob(str(self.rawdir / "txt" / "*.txt")):
            name = Path(txt_path).stem
            # The try block is per song so one bad song cannot abort the rest.
            try:
                outdir = self.libdir / name
                outdir.mkdir(parents=True, exist_ok=True)

                logger.info(f"Copying {name}")

                for folder, suffix in sources:
                    candidate = folder / f"{name}{suffix}"
                    if candidate.is_file():
                        shutil.copy2(candidate, outdir)

            except Exception as e:
                logger.error(f"{type(e)}: {e} while copying {name}")


# Script entry point: download everything with 24 worker threads, then
# assemble the per-song library folders.
if __name__ == '__main__':
    downloader = USDBDownloader(
        rawdir="./out",
        libdir="./lib",
        cache=True,
        concurrency=24,
    )
    downloader.download()
    downloader.build_library()