Skip to content
Snippets Groups Projects
Commit 397e035d authored by kjk's avatar kjk
Browse files

added caching

parent 149ca789
Branches
No related tags found
No related merge requests found
import os
import logging
from UltraStarSongFile import UltraStarSongFile
class FileValidator:
    """Checks that the media files referenced by a song object exist on disk.

    All referenced file names are resolved relative to the directory that
    contains the song's txt file (``songfile_object.path``).
    """

    # One entry per optional media attribute:
    # (attribute name, log text when the referenced file does not exist,
    #  log text when the attribute is empty).
    # The texts are kept exactly as they have always been logged.
    _OPTIONAL_FILES = (
        ("video", "has invalid videofile.", "has no videofile."),
        ("background", "has invalid backgroundfile.",
         "has no backgroundfile."),
        ("cover", "has invalid coverfile", "has no coverfile."),
    )

    def __init__(self):
        pass

    def validate(self, songfile_object: "UltraStarSongFile"):
        """Check the mp3 plus the optional video, background and cover files.

        Args:
            songfile_object: object exposing ``path``, ``mp3``, ``video``,
                ``background`` and ``cover`` string attributes.

        Returns:
            False as soon as the mp3 is missing/unset or any *referenced*
            optional file does not exist; True otherwise. An optional
            attribute that is empty is only logged, it does not fail the
            validation.
        """
        # The audio check is shared with validate_mp3 instead of duplicated.
        if not self.validate_mp3(songfile_object):
            return False

        basepath = os.path.abspath(os.path.dirname(songfile_object.path))
        txtfile = os.path.basename(songfile_object.path)

        for attribute, invalid_text, missing_text in self._OPTIONAL_FILES:
            filename = getattr(songfile_object, attribute)
            if filename == "":
                logging.info(f"{txtfile} {missing_text}")
                continue
            filepath = os.path.join(basepath, filename)
            if attribute == "video":
                # Only the video branch has ever logged the raw name and
                # the resolved path; preserved so log output is unchanged.
                logging.info(filename)
                logging.info(filepath)
            if not os.path.isfile(filepath):
                logging.error(f"{txtfile} {invalid_text}")
                return False
        return True

    def validate_mp3(self, songfile_object: "UltraStarSongFile"):
        """Check only the audio file of the song.

        Args:
            songfile_object: object exposing ``path`` and ``mp3``.

        Returns:
            True when ``mp3`` is set and names an existing file next to the
            txt file, False (with an error log entry) otherwise.
        """
        txtfile = os.path.basename(songfile_object.path)
        if songfile_object.mp3 == "":
            logging.error(f"{txtfile} has no mp3!")
            return False

        basepath = os.path.abspath(os.path.dirname(songfile_object.path))
        audiopath = os.path.join(basepath, songfile_object.mp3)
        if not os.path.isfile(audiopath):
            logging.error(f"{txtfile} has invalid mp3.")
            return False
        return True
File deleted
import logging
import uuid
from pathlib import Path
class UltraStarSongFile:
    """In-memory model of one UltraStar song txt file.

    Every known header tag is held as an attribute (empty string when the
    tag is absent), tags without a matching attribute are collected in
    ``custom_tags`` and the note/lyric lines are kept verbatim in
    ``songdata``.

    Instances define ``__eq__`` without ``__hash__`` and are therefore
    unhashable.
    """

    # Optional header attributes in the order dumps() writes them. The tag
    # written to the file is the upper-cased attribute name.
    _OPTIONAL_TAGS = (
        "creator", "version", "encoding", "edition", "genre", "language",
        "album", "year", "cover", "background", "video", "length", "end",
        "videogap", "previewstart", "resolution", "id", "start", "notesgap",
        "relative", "medleystartbeat", "medleyendbeat", "calcmedley", "p1",
        "p2", "songid", "artistid", "albumid", "duet",
    )

    def __init__(self):
        # Basic information
        self.path = ""
        self.title = ""
        self.artist = ""
        # TXT metadata
        self.creator = ""
        self.version = ""
        self.encoding = ""
        # Detailed song information
        self.edition = ""
        self.genre = ""
        self.language = ""
        self.album = ""
        self.year = ""
        # Files used by the song
        self.cover = ""
        self.mp3 = ""
        self.background = ""
        self.video = ""
        # Technical information about the song
        self.bpm = ""
        self.length = ""
        self.end = ""
        self.gap = ""
        self.videogap = ""
        self.previewstart = ""
        # Miscellaneous
        self.resolution = ""
        self.id = ""
        # Other
        self.start = ""
        self.notesgap = ""
        self.relative = ""
        self.medleystartbeat = ""
        self.medleyendbeat = ""
        self.calcmedley = ""
        self.p1 = ""
        self.p2 = ""
        # Database information
        self.songid = ""
        self.artistid = ""
        self.albumid = ""
        # Custom tags: one single-entry dict {tag: value} per unknown tag
        self.duet = False
        self.custom_tags = []
        # Song data
        self.songdata = []  # list of lines with song data

    def __eq__(self, other):
        """Compare by ``songid``, or by title/artist/duet if an id is missing.

        Returns ``NotImplemented`` for objects of another type so Python can
        fall back to its default comparison (which yields False).
        """
        if not isinstance(other, self.__class__):
            return NotImplemented
        if self.songid == "" or other.songid == "":
            return (self.title == other.title) and (
                self.artist == other.artist) and (self.duet == other.duet)
        return self.songid == other.songid

    def set_attributes(self, parsed_data: dict) -> None:
        """
        Set the attributes of this object from parsed data.

        Keys are matched case-insensitively against the instance attributes;
        unmatched keys are stored in ``custom_tags``. If no song id is set
        afterwards a random UUID is generated.

        Args:
            parsed_data: dict with the parsed tags and a key "songdata"
                         holding a list with the lyric/pitch lines
        """
        # Only data attributes are valid targets. hasattr() would also match
        # methods, so a tag such as "#DUMP" would overwrite dump().
        known = vars(self)
        k: str
        v: str
        for k, v in parsed_data.items():
            name = k.lower()
            if name in known:
                setattr(self, name, v)
            else:
                # A dict keeps the key/value roles; the former set literal
                # {k, v} was unordered and collapsed when k == v.
                self.custom_tags.append({k: v})
                logging.debug("got custom tag: %s", k)
        if self.songid == "":
            self.songid = str(uuid.uuid4())

    def dumps(self) -> str:
        """Serialise the song to UltraStar txt format.

        TITLE, ARTIST, MP3, BPM and GAP are always written; every other tag
        only when its value is truthy. Each header occupies its own line,
        followed by the song data lines exactly as stored. Custom tags are
        not written.
        """
        parts = [
            f"#TITLE:{self.title}\n",
            f"#ARTIST:{self.artist}\n",
            f"#MP3:{self.mp3}\n",
            f"#BPM:{self.bpm}\n",
            f"#GAP:{self.gap}\n",
        ]
        for name in self._OPTIONAL_TAGS:
            value = getattr(self, name)
            if value:
                parts.append(f"#{name.upper()}:{value}\n")
        parts.extend(self.songdata)
        return "".join(parts)

    def dump(self, file: Path, encoding: str = "utf-8"):
        """Write the serialised song to ``file`` with CRLF line endings.

        Args:
            file: target path; an existing file is overwritten.
            encoding: text encoding of the output. Defaults to UTF-8 so the
                result does not depend on the platform's locale encoding.
        """
        with open(file, "w", newline="\r\n", encoding=encoding) as f:
            f.write(self.dumps())
import codecs
import logging
import re
import os.path
from UltraStarSongFile import UltraStarSongFile
class UltraStarSongFileParser:
    """Parser turning an UltraStar song txt file into an UltraStarSongFile.

    Header lines (``#TAG:value``) are collected into a dict, note lines are
    kept verbatim, and parsing stops at the ``E`` end marker.
    """

    # Known header identifiers. A value of None means "use the lower-cased
    # identifier as the tag name".
    IDENTIFIER_KEYS = {
        "TITLE": None,
        "ARTIST": None,
        "MP3": None,
        "BPM": None,
        "GAP": None,
        "COVER": None,
        "BACKGROUND": None,
        "VIDEO": None,
        "VIDEOGAP": None,
        "GENRE": None,
        "EDITION": None,
        "CREATOR": None,
        "LANGUAGE": None,
        "YEAR": None,
        "START": None,
        "END": None,
        "RESOLUTION": None,
        "NOTESGAP": None,
        "RELATIVE": None,
        "ENCODING": None,
        "PREVIEWSTART": None,
        "MEDLEYSTARTBEAT": None,
        "MEDLEYENDBEAT": None,
        "CALCMEDLEY": None,
        "DUETSINGERP1": None,
        "DUETSINGERP2": None,
        "P1": None,
        "P2": None,
    }
    HEADER_PATTERN = re.compile(r"#([A-Za-z0-9]+):(.*)")
    # Case-insensitive because _parse_header also accepts identifiers in any
    # case.
    _ENCODING_PATTERN = re.compile(r"#ENCODING:(.*)", re.IGNORECASE)

    def __init__(self, strict_mode=False):
        """
        Args:
            strict_mode: when True, unknown or duplicate header identifiers
                raise ValueError instead of only being logged.
        """
        self.strict_mode = strict_mode

    def parse_file(self, file, encoding=None):
        """Parse ``file`` and return the populated UltraStarSongFile.

        Args:
            file: path of the txt file.
            encoding: text encoding to read with. When falsy, the file's
                own ``#ENCODING`` header is used, falling back to latin1.

        Returns:
            An UltraStarSongFile filled via ``set_attributes``.

        Raises:
            ParseIgnore: the file name looks like a readme/license file.
            ParseErrorFileBroken: the file is empty or only null bytes.
            FileNotFoundError: ``file`` does not exist.
            ValueError: a line has an unknown prefix or a header line is
                malformed (or violates strict mode).
        """
        song = {}
        content = []
        self._check_file(file)
        if not os.path.isfile(file):
            raise FileNotFoundError(f"File {file} not found.")
        if not encoding:
            logging.debug("=> Searching for Encoding")
            encoding = self._find_encoding(file)
        if not encoding:
            logging.warning("No encoding specified and none found in file. "
                            "Fallback to latin1.")
            encoding = "latin1"
        with open(file, mode="r", encoding=encoding) as f:
            logging.debug(f"=> Reading file {file}")
            for linenum, line in enumerate(f, 1):
                if linenum == 1:
                    # A UTF-8 byte order mark decodes to U+FEFF and would
                    # hide the "#" of the first header.
                    line = line.lstrip("\ufeff")
                logging.log(5, f"=> Reading line: {line}")
                # Lines read from the file still carry their line ending,
                # so blank lines and the end marker must be recognised on
                # the stripped text.
                stripped = line.strip()
                if not stripped:
                    continue
                elif line[0] == "#":
                    logging.debug(f"=> Parsing header line: {line}")
                    self._parse_header(stripped, song, linenum)
                elif line[0] in (":", "-", "*", "F", "P", "B"):
                    self._parse_content(line, content, linenum)
                elif stripped == "E":
                    logging.debug("=> Parsed content end marker")
                    break
                else:
                    raise ValueError(f"Line {linenum} unparsable, prefix "
                                     f"unknown: {line}")
        song["path"] = file
        song["songdata"] = content
        file_obj = UltraStarSongFile()
        file_obj.set_attributes(parsed_data=song)
        # .get(): a file without TITLE/ARTIST must not fail in the log call.
        logging.info("Parsed song %s by %s, from \"%s\"", song.get("title"),
                     song.get("artist"), file)
        return file_obj

    def _find_encoding(self, file):
        """Return the codec named by the file's ``#ENCODING`` header.

        The file is scanned as iso-8859-1 (which can decode any byte), and
        only the first ``#ENCODING`` line is considered.

        Returns:
            The lower-cased encoding name, or None when there is no header,
            its value is empty or "auto", or Python does not know the codec.
        """
        encoding = None
        with open(file, mode="r", errors="ignore", encoding="iso-8859-1") as f:
            for line in f:
                # "." does not match the trailing newline, so fullmatch has
                # to run on the stripped line.
                match = self._ENCODING_PATTERN.fullmatch(line.strip())
                if match:
                    encoding = match.group(1).strip().lower() or None
                    logging.debug("Found encoding identifier in file: %s",
                                  encoding)
                    if encoding == "auto":
                        encoding = None
                    break
        if encoding:
            try:
                # invalid encoding will raise LookupError
                codecs.lookup(encoding)
            except LookupError:
                logging.warning(
                    "Encoding %s is not known by python. Using fallback.",
                    encoding)
                encoding = None
        return encoding

    def _check_file(self, filename):
        """Reject files that are obviously not parsable song files.

        Raises:
            ParseIgnore: the file name contains "license" or "readme".
            ParseErrorFileBroken: the file is empty or holds only null
                bytes.
        """
        # Look at the file name only, so a parent directory that happens to
        # contain "readme"/"license" does not exclude every song below it.
        name = os.path.basename(filename).lower()
        if "license" in name or "readme" in name:
            raise ParseIgnore("Filename sounds like a readme or license "
                              "file, skipping")
        with open(filename, mode="rb") as f:
            filebytes = f.read()
        if not filebytes.strip(b"\x00"):
            raise ParseErrorFileBroken("This file is empty, "
                                       "or only contains null bytes.")

    def _parse_header(self, line, song, linenum):
        """Parse one ``#TAG:value`` line into ``song``.

        Known identifiers are stored under their lower-cased name, unknown
        ones under the identifier as written. Any P1/P2 (or legacy
        DUETSINGERP1/2) tag sets ``song["duet"] = True``; the legacy names
        are stored as "p1"/"p2". The first occurrence of a tag wins.

        Args:
            line: header line without surrounding whitespace.
            song: dict receiving the parsed tag.
            linenum: 1-based line number, used in messages only.

        Raises:
            ValueError: the line is not a valid header, or - in strict
                mode - the identifier is unknown or duplicated.
        """
        match = re.fullmatch(self.HEADER_PATTERN, line)
        if not match:
            raise ValueError(f"Line {linenum}: Could not parse line: {line}")
        identifier, value = match.group(1, 2)
        if identifier.upper() not in self.IDENTIFIER_KEYS:
            if self.strict_mode:
                raise ValueError(f"Line {linenum}: Identifier {identifier} is "
                                 f"unknown and strict mode is set.")
            logging.warning(
                "Line %3i: Identifier %s is not known, adding as custom tag.",
                linenum, identifier)
            tag = identifier
        else:
            tag = self.IDENTIFIER_KEYS[identifier.upper()]
            if not tag:
                tag = identifier.lower()
        if tag in ("p1", "p2", "duetsingerp1", "duetsingerp2"):
            song["duet"] = True
        # convert legacy tags
        if tag == "duetsingerp1":
            tag = "p1"
        if tag == "duetsingerp2":
            tag = "p2"
        if tag in song:
            if self.strict_mode:
                raise ValueError(
                    f"Line {linenum} Identifier {identifier} is duplicate.")
            logging.error("Line %3i: Identifier %s is duplicate, ignoring.",
                          linenum, identifier)
        else:
            logging.debug("Line %3i: Parsed tag %s with value %s", linenum,
                          tag, value)
            song[tag] = value

    def _parse_content(self, line, content, linenum):
        """Append the raw note line (including its line ending) to ``content``."""
        content.append(line)
class ParseIgnore(Exception):
    """Parse error which indicates that the file should be ignored.

    Raised by ``UltraStarSongFileParser._check_file`` when the file name
    contains "license" or "readme".
    """
class ParseErrorFileBroken(Exception):
    """Parse error which indicates the file is broken beyond repair.

    Raised by ``UltraStarSongFileParser._check_file`` when the file is empty
    or contains nothing but null bytes.
    """
...@@ -2,10 +2,14 @@ import requests ...@@ -2,10 +2,14 @@ import requests
from pathlib import Path from pathlib import Path
class CoverDownloader: class CoverDownloader:
def __init__(self, outdir: str): def __init__(self, outdir: Path, cache: bool):
self.outdir = outdir self.outdir = outdir
self.cache = cache
def download(self, url: str, artist: str, song: str, spotify_uri: str): def download(self, url: str, artist: str, song: str, spotify_uri: str):
if (Path(self.outdir) / f"{artist} - {song}.jpg").is_file() and self.cache:
return
Path(self.outdir).mkdir(parents=True, exist_ok=True) Path(self.outdir).mkdir(parents=True, exist_ok=True)
with open(Path(self.outdir) / f"{artist} - {song} - {spotify_uri}.jpg", "wb") as f: with open(Path(self.outdir) / f"{artist} - {song}.jpg", "wb") as f:
f.write(requests.get(url).content) f.write(requests.get(url).content)
\ No newline at end of file
import datetime
import pandas as pd import pandas as pd
import pandas.core.series import pandas.core.series
...@@ -6,24 +8,54 @@ def load(file: str): ...@@ -6,24 +8,54 @@ def load(file: str):
with open(file, 'rb') as f: with open(file, 'rb') as f:
return pd.read_excel(f) return pd.read_excel(f)
def get_usdb_url(song: pandas.Series): def get_usdb_url(song: pandas.Series):
return song["TXT Link"] return song["TXT Link"]
def get_cover_image_url(song: pandas.Series): def get_cover_image_url(song: pandas.Series):
return song["Cover Link"] return song["Cover Link"]
def get_yt_video_url(song: pandas.Series): def get_yt_video_url(song: pandas.Series):
return song["Video Link"] return song["Video Link"]
def get_artist_name(song: pandas.Series): def get_artist_name(song: pandas.Series):
return song["Artist Name"] return song["Artist Name"]
def get_song_name(song: pandas.Series): def get_song_name(song: pandas.Series):
return song["Track Name"] return song["Track Name"]
def get_track_id(song: pandas.Series): def get_track_id(song: pandas.Series):
return song["Spotify URI"] return song["Spotify URI"]
def get_gap(song: pandas.Series):
    """Return the value of the "GAP" column of one song row."""
    gap_value = song["GAP"]
    return gap_value
def get_video_gap(song: pandas.Series):
    """Return the value of the "VideoGAP" column of one song row."""
    video_gap_value = song["VideoGAP"]
    return video_gap_value
def get_start(song: pandas.Series):
    """Return the value of the "Start" column of one song row."""
    start_value = song["Start"]
    return start_value
def get_end(song: pandas.Series):
    """Return the value of the "End" column of one song row."""
    end_value = song["End"]
    return end_value
def get_language(song: pandas.Series):
    """Return the value of the "Language" column of one song row."""
    language_value = song["Language"]
    return language_value
def get_date(song: pandas.Series):
    """Return the "Release Date" column of one song row as a ``datetime.date``.

    Accepted cell values:
      * a full ISO date string ("2020-05-17"), as before;
      * a year-only or year-month string ("1999", "1999-07"), mapped to the
        first day of that year/month;
      * a ``datetime.date`` / ``datetime.datetime`` object (which includes
        pandas timestamps), returned as a plain date.
        NOTE(review): whether the sheet delivers strings or date cells
        depends on how the column is formatted - confirm against the data.

    Raises:
        ValueError: the string is not a recognisable date.
        TypeError: the cell holds anything else (for example a missing
            value), exactly as ``date.fromisoformat`` raised before.
    """
    raw = song["Release Date"]
    # Missing date cells can be datetime-like (NaT); let them fall through
    # to the TypeError path below instead of returning a bogus date.
    if isinstance(raw, datetime.date) and not pandas.isna(raw):
        return raw.date() if isinstance(raw, datetime.datetime) else raw
    if isinstance(raw, str):
        text = raw.strip()
        parts = text.split("-")
        # "YYYY" or "YYYY-MM": fromisoformat rejects these partial dates.
        if (len(parts) <= 2 and len(parts[0]) == 4
                and all(part.isdigit() for part in parts)):
            month = int(parts[1]) if len(parts) == 2 else 1
            return datetime.date(int(parts[0]), month, 1)
        return datetime.date.fromisoformat(text)
    return datetime.date.fromisoformat(raw)
class USDB: class USDB:
def __init__(self, file: str): def __init__(self, file: str):
self.data = load(file) self.data = load(file)
lib.zip 0 → 100644
File added
import concurrent import concurrent
import glob
import logging import logging
import re
import shutil
from pathlib import Path
from txtdownloader import TXTDownloader from txtdownloader import TXTDownloader
from ytdownloader import YTDownloader from ytdownloader import YTDownloader
...@@ -8,32 +12,72 @@ from db import * ...@@ -8,32 +12,72 @@ from db import *
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
db = USDB("USDB.xlsx") class USDBDownloader:
txt = TXTDownloader("nv0rb8ma82p37qrvduvch6j3f6", "./out/txt/") def __init__(self, rawdir: str, libdir: str, cache=True):
yt = YTDownloader("./out/audio/", "./out/video") self.rawdir = Path(rawdir)
cv = CoverDownloader("./out/covers/") self.libdir = Path(libdir)
self.db = USDB("USDB.xlsx")
self.txt = TXTDownloader("nv0rb8ma82p37qrvduvch6j3f6", (self.rawdir / "txt").resolve(), cache)
self.yt = YTDownloader((self.rawdir / "audio").resolve(), (self.rawdir / "video").resolve(), cache)
self.cv = CoverDownloader((self.rawdir / "covers").resolve(), cache)
def download_song(song: pandas.Series): def download(self):
usdb_url = get_usdb_url(song) executor = concurrent.futures.ThreadPoolExecutor(24)
cover_url = get_cover_image_url(song) futures = [executor.submit(self.download_song, song) for _, song in self.db.data.iterrows()]
yt_url = get_yt_video_url(song) concurrent.futures.wait(futures)
song_name = get_song_name(song).replace("/", "")
artist = get_artist_name(song).replace("/", "")
spotify_uri = get_track_id(song).replace("/", "")
logging.info(f"Downloading: {artist} - {song_name}") def download_sample(self):
executor = concurrent.futures.ThreadPoolExecutor(24)
futures = [executor.submit(self.download_song, song) for _, song in self.db.data.sample(5).iterrows()]
concurrent.futures.wait(futures)
if type(usdb_url) == str and usdb_url != " ": def download_song(self, song: pandas.Series):
txt.download(usdb_url, artist, song_name, spotify_uri) try:
usdb_url = get_usdb_url(song)
cover_url = get_cover_image_url(song)
yt_url = get_yt_video_url(song)
song_name = get_song_name(song).replace("/", "")
artist = get_artist_name(song).replace("/", "")
spotify_uri = get_track_id(song)
gap = get_gap(song)
video_gap = get_video_gap(song)
start = get_start(song)
end = get_end(song)
language = get_language(song)
year = str(get_date(song).year)
if type(cover_url) == str and cover_url != " ": logging.info(f"Downloading: {artist} - {song_name}: {usdb_url}, {cover_url}, {type(yt_url)}")
cv.download(cover_url, artist, song_name, spotify_uri)
if type(yt_url) == str and yt_url != " ": if type(usdb_url) == str and usdb_url != " " and usdb_url != "MISSING":
yt.download(yt_url, artist, song_name, spotify_uri) self.txt.download(usdb_url, artist, song_name, spotify_uri, gap, video_gap, start, end, language, year)
if type(cover_url) == str and cover_url != " " and cover_url != "MISSING":
self.cv.download(cover_url, artist, song_name, spotify_uri)
executor = concurrent.futures.ThreadPoolExecutor(24) if type(yt_url) == str and yt_url != " " and yt_url != "MISSING":
futures = [executor.submit(download_song, song) for _, song in db.data.iterrows()] self.yt.download(yt_url, artist, song_name, spotify_uri)
concurrent.futures.wait(futures)
except Exception as e:
logging.error(e)
def build_library(self):
    """Assemble one folder per song under ``self.libdir``.

    For every ``*.txt`` found in ``<rawdir>/txt`` a folder named after the
    file (without extension) is created in ``libdir``. The txt file and any
    same-named ``.mp3`` (from ``audio``), ``.mp4`` (from ``video``) and
    ``.jpg`` (from ``covers``) that exist are copied into it.
    """
    # (sub-folder of rawdir, file extension) in the order they are copied.
    sources = (
        ("txt", ".txt"),
        ("audio", ".mp3"),
        ("video", ".mp4"),
        ("covers", ".jpg"),
    )
    for txt_path in glob.glob(str(self.rawdir / "txt" / "*.txt")):
        stem = Path(txt_path).stem
        target_dir = self.libdir / stem
        Path(target_dir).mkdir(parents=True, exist_ok=True)
        for folder, suffix in sources:
            candidate = self.rawdir / folder / f"{stem}{suffix}"
            if candidate.is_file():
                shutil.copy2(candidate, target_dir)
if __name__ == '__main__':
    # Script entry point: download into ./out (third argument is the cache
    # flag), fetching only a sample of the songs, then build the per-song
    # folders under ./lib from what was downloaded.
    dl = USDBDownloader("./out", "./lib", True)
    dl.download_sample()
    dl.build_library()
import urllib.parse import urllib.parse
from pathlib import Path from pathlib import Path
import pandas
from UltraStarSongFileParser import UltraStarSongFileParser
import requests import requests
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
class TXTDownloader: class TXTDownloader:
def __init__(self, sessid, outdir): def __init__(self, sessid: str, outdir: Path, cache: bool):
self.sessid = sessid self.sessid = sessid
self.outdir = outdir self.outdir = outdir
self.txtparser = UltraStarSongFileParser()
self.cache = cache
def download(self, url: str, artist: str, song: str, spotify_uri: str, gap: str, video_gap: str, start: str, end: str, language: str, year: str):
if (Path(self.outdir) / f"{artist} - {song}.txt").is_file() and self.cache:
return
def download(self, url: str, artist: str, song: str, spotify_uri: str):
url = url.replace("detail", "gettxt") url = url.replace("detail", "gettxt")
with requests.Session() as s: with requests.Session() as s:
...@@ -19,6 +27,41 @@ class TXTDownloader: ...@@ -19,6 +27,41 @@ class TXTDownloader:
txt = soup.find(name="textarea").text txt = soup.find(name="textarea").text
Path(self.outdir).mkdir(parents=True, exist_ok=True) Path(self.outdir).mkdir(parents=True, exist_ok=True)
with open(Path(self.outdir) / f"{artist} - {song} - {spotify_uri}.txt", "w", encoding="utf-8") as f: with open(Path(self.outdir) / f"{artist} - {song}.txt", "w", encoding="utf-8") as f:
f.write(txt) f.write(txt)
self.fix_file(Path(self.outdir) / f"{artist} - {song}.txt", artist, song, spotify_uri, gap, video_gap, start, end, language, year)
def fix_file(self, file: Path, artist: str, song: str, spotify_uri: str, gap: str, video_gap: str, start: str, end: str, language: str, year: str):
    """Rewrite a downloaded txt file so it matches the local library.

    The file is parsed as UTF-8, its mp3/video/cover references are pointed
    at files named like the txt itself (``.mp3`` / ``.mp4`` / ``.jpg``), the
    given metadata overrides the parsed values, ``songid`` becomes
    ``spotify_uri`` and the result is written back to ``file``.

    A metadata value is applied only when it is exactly of type ``str`` and
    is not a single space; any other value leaves the parsed one untouched.
    """
    parsed = self.txtparser.parse_file(str(file), encoding="utf-8")

    # Media references are plain file names next to the txt file.
    for attribute, suffix in (("mp3", ".mp3"), ("video", ".mp4"), ("cover", ".jpg")):
        setattr(parsed, attribute, file.with_suffix(suffix).name)

    overrides = (
        ("gap", gap),
        ("videogap", video_gap),
        ("start", start),
        ("end", end),
        ("language", language),
        ("artist", artist),
        ("title", song),
        ("year", year),
    )
    for attribute, value in overrides:
        # Exact type check on purpose: str subclasses are skipped as before.
        if type(value) is str and value != " ":
            setattr(parsed, attribute, value)

    parsed.songid = spotify_uri
    parsed.dump(file)
...@@ -5,14 +5,17 @@ import yt_dlp.postprocessor ...@@ -5,14 +5,17 @@ import yt_dlp.postprocessor
from yt_dlp import YoutubeDL from yt_dlp import YoutubeDL
class YTDownloader: class YTDownloader:
def __init__(self, audiodir, videodir): def __init__(self, audiodir: Path, videodir: Path, cache: bool):
self.audiodir = audiodir self.audiodir = audiodir
self.videodir = videodir self.videodir = videodir
self.cache = cache
def download(self, url: str, artist: str, song: str, spotify_uri: str): def download(self, url: str, artist: str, song: str, spotify_uri: str):
if (Path(self.videodir) / f"{artist} - {song}.mp4").is_file() and (Path(self.audiodir) / f"{artist} - {song}.mp3").is_file() and self.cache:
return
ydl_opts = { ydl_opts = {
'format': 'mp4/best', 'format': 'mp4/best',
'outtmpl': f'{str((Path(self.audiodir) / f"{artist} - {song} - {spotify_uri}").resolve())}.%(ext)s', 'outtmpl': f'{str((Path(self.audiodir) / f"{artist} - {song}").resolve())}.%(ext)s',
# ℹ️ See help(yt_dlp.postprocessor) for a list of available Postprocessors and their arguments # ℹ️ See help(yt_dlp.postprocessor) for a list of available Postprocessors and their arguments
'keepvideo': True, 'keepvideo': True,
'postprocessors': [{ # Extract audio using ffmpeg 'postprocessors': [{ # Extract audio using ffmpeg
...@@ -24,4 +27,4 @@ class YTDownloader: ...@@ -24,4 +27,4 @@ class YTDownloader:
ydl.download(url) ydl.download(url)
Path(self.videodir).mkdir(parents=True, exist_ok=True) Path(self.videodir).mkdir(parents=True, exist_ok=True)
shutil.move(Path(self.audiodir) / f"{artist} - {song} - {spotify_uri}.mp4", Path(self.videodir) / f"{artist} - {song} - {spotify_uri}.mp4") shutil.move(Path(self.audiodir) / f"{artist} - {song}.mp4", Path(self.videodir) / f"{artist} - {song}.mp4")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment