Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import concurrent
import concurrent.futures
import glob
import logging
import shutil
from pathlib import Path

from txtdownloader import TXTDownloader
from ytdownloader import YTDownloader
from coverdownloader import CoverDownloader
from db import *
class USDBDownloader:
    """Download the assets of every song in the USDB table and build a library.

    Raw downloads are stored under ``rawdir`` in the ``txt``, ``audio``,
    ``video`` and ``covers`` sub-directories; :meth:`build_library` then
    gathers the files of each song into one folder per song under ``libdir``.
    """

    # Characters stripped from artist and song names before they are handed
    # to the downloaders (presumably because the names end up in file names).
    _DISALLOWED_CHARS = "@$%&\\/:*?\"'<>|~`#^+={}[];."
    _STRIP_DISALLOWED = str.maketrans("", "", _DISALLOWED_CHARS)

    # Cell values that mean "no URL available" for a song.
    _URL_PLACEHOLDERS = frozenset({" ", "MISSING"})

    # (raw sub-directory, file suffix) of every asset copied into the library.
    _ASSETS = (
        ("txt", ".txt"),
        ("audio", ".mp3"),
        ("video", ".mp4"),
        ("covers", ".jpg"),
    )

    # NOTE(review): hard-coded first argument of TXTDownloader. It looks like
    # a session/credential token -- confirm, and consider moving it to config.
    _TXT_DOWNLOADER_KEY = "nv0rb8ma82p37qrvduvch6j3f6"

    def removeDisallowedChars(self, value: str) -> str:
        """Return *value* with every character of ``_DISALLOWED_CHARS`` removed."""
        return value.translate(self._STRIP_DISALLOWED)

    @staticmethod
    def _is_usable_url(value) -> bool:
        """Return True if *value* is a string other than a "missing" placeholder."""
        return isinstance(value, str) and value not in USDBDownloader._URL_PLACEHOLDERS

    def __init__(self, rawdir: str, libdir: str, cache: bool = True, concurrency: int = 1):
        """Set up the song table and the three downloaders.

        Args:
            rawdir: Directory that receives the raw downloads.
            libdir: Directory in which :meth:`build_library` creates the
                per-song folders.
            cache: Forwarded unchanged to each downloader.
            concurrency: Worker count of the thread pool used for downloads.
        """
        self.rawdir = Path(rawdir)
        self.libdir = Path(libdir)
        self.db = USDB("USDB.xlsx")
        self.concurrency = concurrency
        self.txt = TXTDownloader(self._TXT_DOWNLOADER_KEY, (self.rawdir / "txt").resolve(), cache)
        self.yt = YTDownloader((self.rawdir / "audio").resolve(), (self.rawdir / "video").resolve(), cache)
        self.cv = CoverDownloader((self.rawdir / "covers").resolve(), cache)

    def _download_rows(self, rows) -> None:
        """Run :meth:`download_song` for each ``(index, row)`` pair of *rows*."""
        # The context manager shuts the pool down once every task is done,
        # so worker threads are not leaked.
        with concurrent.futures.ThreadPoolExecutor(self.concurrency) as executor:
            futures = [executor.submit(self.download_song, song) for _, song in rows]
            concurrent.futures.wait(futures)

    def download(self):
        """Download the assets of every song in the table."""
        self._download_rows(self.db.data.iterrows())

    def download_sample(self, count: int = 5):
        """Download the assets of a random sample of songs.

        Args:
            count: Number of songs to sample. Clamped to the number of rows
                so that a small table does not make sampling fail.
        """
        table = self.db.data
        sample_size = min(count, len(table))
        self._download_rows(table.sample(sample_size).iterrows())

    def download_song(self, song: "pandas.Series"):
        """Download the txt, cover and YouTube assets of one song row.

        Each asset is downloaded only when its URL is usable (see
        :meth:`_is_usable_url`). Any exception is logged and swallowed so
        that one failing song does not stop the others.
        """
        # Bound up front so the error message below can always be formatted,
        # even when a getter fails before the real names are known.
        artist = "<unknown artist>"
        song_name = "<unknown song>"
        try:
            usdb_url = get_usdb_url(song)
            cover_url = get_cover_image_url(song)
            yt_url = get_yt_video_url(song)
            song_name = self.removeDisallowedChars(get_song_name(song))
            artist = self.removeDisallowedChars(get_artist_name(song))
            spotify_uri = get_track_id(song)
            gap = get_gap(song)
            video_gap = get_video_gap(song)
            start = get_start(song)
            end = get_end(song)
            language = get_language(song)
            year = str(get_date(song).year)
            logger.info(f"Downloading: {artist} - {song_name}: {usdb_url}, {cover_url}, {yt_url}")
            if self._is_usable_url(usdb_url):
                self.txt.download(usdb_url, artist, song_name, spotify_uri, gap, video_gap, start, end, language, year)
            if self._is_usable_url(cover_url):
                self.cv.download(cover_url, artist, song_name, spotify_uri)
            if self._is_usable_url(yt_url):
                self.yt.download(yt_url, artist, song_name, spotify_uri)
        except Exception as e:
            logger.error(f"{type(e)}: {e} while processing {artist} - {song_name}")

    def build_library(self):
        """Copy the assets of each downloaded song into its own library folder.

        For every ``*.txt`` file in ``rawdir/txt`` a folder named after the
        file stem is created under ``libdir``; the matching txt, mp3, mp4 and
        jpg files are copied into it when they exist. A failure is logged per
        song and does not stop the remaining songs.
        """
        for txt_path in glob.glob(str(self.rawdir / "txt" / "*.txt")):
            name = Path(txt_path).stem
            try:
                outdir = self.libdir / name
                outdir.mkdir(parents=True, exist_ok=True)
                logger.info(f"Copying {name}")
                for subdir, suffix in self._ASSETS:
                    source = self.rawdir / subdir / f"{name}{suffix}"
                    if source.is_file():
                        shutil.copy2(source, outdir)
            except Exception as e:
                logger.error(f"{type(e)}: {e} while copying {name}")