Group MP3s by BPM and distribute into CD-XX folders (max 700MB each) with BPM subfolders inside
This commit is contained in:
74
src/app.py
74
src/app.py
@@ -1,8 +1,13 @@
|
|||||||
|
import os
|
||||||
import shutil
|
import shutil
|
||||||
from mutagen.id3 import ID3
|
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from collections import defaultdict
|
from mutagen.id3 import ID3
|
||||||
|
from mutagen.mp3 import MP3
|
||||||
|
|
||||||
|
CD_SIZE = 700 * 1024 * 1024 # 700 MB in bytes
|
||||||
|
GROUP_SIZE = 5 # BPM bin size
|
||||||
|
|
||||||
|
# Directories
|
||||||
dir_a = Path(r"C:\Users\Edgar\Desktop\Peak Time BOF (ed1337x)\Already where in MP3")
|
dir_a = Path(r"C:\Users\Edgar\Desktop\Peak Time BOF (ed1337x)\Already where in MP3")
|
||||||
dir_b = Path(r"C:\Users\Edgar\Desktop\Peak Time BOF (ed1337x)\CD")
|
dir_b = Path(r"C:\Users\Edgar\Desktop\Peak Time BOF (ed1337x)\CD")
|
||||||
dir_b.mkdir(parents=True, exist_ok=True)
|
dir_b.mkdir(parents=True, exist_ok=True)
|
||||||
@@ -10,39 +15,52 @@ dir_b.mkdir(parents=True, exist_ok=True)
|
|||||||
def get_bpm(file_path):
|
def get_bpm(file_path):
|
||||||
try:
|
try:
|
||||||
tags = ID3(file_path)
|
tags = ID3(file_path)
|
||||||
bpm_tag = tags.get('TBPM')
|
bpm_tag = tags.get("TBPM")
|
||||||
if bpm_tag:
|
if bpm_tag:
|
||||||
bpm_str = str(bpm_tag.text[0])
|
return int(float(bpm_tag.text[0]))
|
||||||
return int(float(bpm_str))
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"[!] Error reading BPM from {file_path.name}: {e}")
|
print(f"Skipping {file_path.name} (no BPM): {e}")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
# Step 1: Scan all files and collect BPMs
|
# Collect tracks
|
||||||
binned_files = defaultdict(list) # bin_index -> list of (file, bpm)
|
all_tracks = []
|
||||||
|
|
||||||
for file in dir_a.glob("*.mp3"):
|
for file in dir_a.glob("*.mp3"):
|
||||||
bpm = get_bpm(file)
|
bpm = get_bpm(file)
|
||||||
if bpm is not None:
|
if bpm is None:
|
||||||
bin_index = bpm // 5
|
continue
|
||||||
binned_files[bin_index].append((file, bpm))
|
size = file.stat().st_size
|
||||||
else:
|
bpm_range = f"{(bpm // GROUP_SIZE) * GROUP_SIZE}-to-{((bpm // GROUP_SIZE) * GROUP_SIZE) + GROUP_SIZE - 1}"
|
||||||
print(f"[-] No BPM found for {file.name}, skipping.")
|
all_tracks.append({"file": file, "size": size, "bpm_range": bpm_range})
|
||||||
|
|
||||||
if not binned_files:
|
# Sort by size descending
|
||||||
print("No BPM data found in any files. Exiting.")
|
all_tracks.sort(key=lambda x: x["size"], reverse=True)
|
||||||
exit(1)
|
|
||||||
|
|
||||||
# Step 2: Process each bin, and use actual min/max BPM in the bin for folder name
|
# Distribute into CD folders with BPM subfolders
|
||||||
for bin_index, file_data in binned_files.items():
|
cd_index = 1
|
||||||
bpms_in_bin = [bpm for _, bpm in file_data]
|
current_cd = []
|
||||||
actual_min = min(bpms_in_bin)
|
current_size = 0
|
||||||
actual_max = max(bpms_in_bin)
|
|
||||||
folder_name = f"{actual_min}-to-{actual_max}"
|
|
||||||
target_folder = dir_b / folder_name
|
|
||||||
target_folder.mkdir(exist_ok=True)
|
|
||||||
|
|
||||||
for file, _ in file_data:
|
def write_cd(cd_index, tracks):
|
||||||
shutil.copy(file, target_folder / file.name)
|
cd_folder = dir_b / f"CD-{cd_index:02}"
|
||||||
print(f"[+] Copied {file.name} to {folder_name}")
|
cd_folder.mkdir(parents=True, exist_ok=True)
|
||||||
|
for track in tracks:
|
||||||
|
bpm_subfolder = cd_folder / track["bpm_range"]
|
||||||
|
bpm_subfolder.mkdir(parents=True, exist_ok=True)
|
||||||
|
dest_file = bpm_subfolder / track["file"].name
|
||||||
|
shutil.copy(track["file"], dest_file)
|
||||||
|
total_size = sum(t['size'] for t in tracks)
|
||||||
|
print(f"[WRITE] CD-{cd_index:02} - {len(tracks)} tracks, {total_size / 1024**2:.2f} MB")
|
||||||
|
|
||||||
|
for track in all_tracks:
|
||||||
|
if current_size + track["size"] > CD_SIZE and current_cd:
|
||||||
|
write_cd(cd_index, current_cd)
|
||||||
|
cd_index += 1
|
||||||
|
current_cd = []
|
||||||
|
current_size = 0
|
||||||
|
current_cd.append(track)
|
||||||
|
current_size += track["size"]
|
||||||
|
|
||||||
|
# Final CD
|
||||||
|
if current_cd:
|
||||||
|
write_cd(cd_index, current_cd)
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user