Created
May 7, 2022 04:04
-
-
Save alashow/8b6536a98819d5e2c1b4eda5c643ca0a to your computer and use it in GitHub Desktop.
devil is in the details
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import json | |
from guessit import guessit | |
from guessit.jsonutils import GuessitEncoder | |
from pprint import pp, pprint | |
from tqdm import tqdm | |
from multiprocessing import cpu_count, Pool | |
import random | |
import logging | |
import sys | |
import re | |
import os | |
# Only errors are emitted; the per-file logging.info calls stay silent.
logging.basicConfig(level=logging.ERROR)

# Extensions (text after the last dot) skipped by read_file_to_list by default.
EXTENSIONS_TO_IGNORE = ['nfo-orig', 'srt', 'sub', 'ass', 'jpg']
# Guess values that get_quality_type recognises as a quality marker.
QUALITY_TYPES = ['2160p', '1080p', '720p', '480p', '360p', '240p']

# Patterns tried in this order by analyze_tv_file_name; group 1 is the title.
FILENAME_REGEX = r'(.*) \([0-9]{4}\) \['
# NOTE(review): the top-level `|` makes `Specials\/` a separate alternative
# with no capture groups (group 1 is None when it matches) - confirm intended.
FILENAME_TV_REGEX_1 = r'(.*)( \([0-9]{4}\))?\/([sS]eason )?([0-9]{1,4})|Specials\/'
FILENAME_TV_REGEX_2 = r'(.*)\/([sS]eason )?([0-9]{1,4})\/'

# Alternative base directory kept for reference:
# BASE_PATH = '/home/alashov/uploader/guess-to-ignore/'
BASE_PATH = '/Users/alashov/Dropbox/docs/other/scripts/media/san-ignores/'
GUESSES_PATH = f'{BASE_PATH}guesses/'
INPUT_LISTS_PATH = f'{BASE_PATH}input-lists/'
OUTPUT_LISTS_PATH = f'{BASE_PATH}output-lists/'
def get_quality_type(guess):
    """Return the first value of ``guess`` that is listed in QUALITY_TYPES.

    Values are scanned in the mapping's iteration order; ``None`` is returned
    when no value matches.
    """
    matching = (candidate for candidate in guess.values()
                if candidate in QUALITY_TYPES)
    return next(matching, None)
def chunks(l, n):
    """Return a generator over consecutive slices of ``l`` of length ``n``.

    A size below 1 is raised to 1. The final slice may be shorter than the
    others. ``len(l)`` is read when the function is called; the slices are
    produced lazily.
    """
    size = n if n > 1 else 1
    starts = range(0, len(l), size)
    return (l[start:start + size] for start in starts)
def read_file_to_list(file_name, ignore_extensions=None):
    """Read a newline-separated list of paths, dropping ignored extensions.

    Each line is stripped of surrounding whitespace. Blank lines are skipped
    rather than returned as empty paths. A line is ignored when the text after
    its last ``.`` is in ``ignore_extensions``; a line without any dot is
    compared as a whole.

    Args:
        file_name: Path of the UTF-8 text file to read.
        ignore_extensions: Extensions (without the dot) to drop. ``None``
            (the default) means EXTENSIONS_TO_IGNORE.

    Returns:
        The kept lines, in file order.

    Raises:
        FileNotFoundError: If ``file_name`` is not an existing file.
    """
    if ignore_extensions is None:
        ignore_extensions = EXTENSIONS_TO_IGNORE
    if not os.path.isfile(file_name):
        # FileNotFoundError is still an Exception, so existing handlers work.
        raise FileNotFoundError(f'File {file_name} does not exist')

    files = []
    ignored_count = 0
    # Explicit encoding: media paths may contain non-ASCII characters and the
    # locale default is not guaranteed to be UTF-8.
    with open(file_name, 'r', encoding='utf-8') as f:
        for line in f:
            file = line.strip()
            if not file:
                continue
            if file.rsplit('.', 1)[-1] in ignore_extensions:
                ignored_count += 1
            else:
                files.append(file)
    print(f'{len(files)} files read from {file_name}, ignored {ignored_count} files')
    return files
def write_guesses_to_file_as_json(guesses: dict, file_name):
    """Write ``guesses`` to ``<GUESSES_PATH><file_name>.json``.

    The JSON is indented by two spaces and serialized with GuessitEncoder.

    Args:
        guesses: Mapping to serialize.
        file_name: Base name of the target file, without the ``.json`` suffix.
    """
    target = f'{GUESSES_PATH}{file_name}.json'
    # ensure_ascii=False emits raw non-ASCII characters, so the file must be
    # opened as UTF-8 explicitly instead of relying on the locale default.
    with open(target, 'w', encoding='utf-8') as f:
        json.dump(guesses, f, cls=GuessitEncoder, ensure_ascii=False, indent=2)
def read_guesses_to_file_as_json(file_name):
    """Load the guesses stored in ``<GUESSES_PATH><file_name>.json``.

    Args:
        file_name: Base name of the file, without the ``.json`` suffix.

    Returns:
        The decoded JSON content, or an empty dict when the file does not
        exist.
    """
    source = f'{GUESSES_PATH}{file_name}.json'
    if not os.path.isfile(source):
        return {}
    # The context manager closes the handle; it was previously left open.
    with open(source, encoding='utf-8') as f:
        return json.load(f)
def _tv_episode_label(guess, file):
    """Return the episode part of a tv key, or None when the guess has none."""
    if 'season' in guess and 'episode' in guess:
        return f"s{guess['season']}e{guess['episode']}"
    if 'date' in guess:
        return guess['date']
    if 'episode_title' in guess:
        episode = guess['episode_title']
        print('Overrode episode with episode title', episode)
        return episode
    if 'alternative_title' in guess:
        # This branch used to read guess['episode_title'], which is never
        # present here and always raised KeyError.
        episode = guess['alternative_title']
        print('Overrode episode with alternative title', episode)
        return episode
    print('No episode found: ', file, guess)
    return None


def _tv_title(guess, file):
    """Return the show title from the path patterns, else from the guess."""
    for pattern in (FILENAME_REGEX, FILENAME_TV_REGEX_1, FILENAME_TV_REGEX_2):
        match = re.match(pattern, file)
        if match:
            return match.group(1)
    return guess.get('title')


def analyze_tv_file_name(file):
    """Guess the metadata of a tv file path and derive a grouping key.

    The key has the form ``<title>-<episode>-<quality>``. The raw path is used
    as the key instead when no episode or title can be determined, or when
    building the key fails for any reason.

    Args:
        file: Path of the tv file, as listed in the input file.

    Returns:
        ``(key, serialized)`` where ``serialized`` is a JSON string holding
        ``{'path': file, 'guess': guess}``. ``(file, None)`` when guessit
        raises or classifies the path as a movie.
    """
    logging.info('Analyzing tv file name: %s', file)
    try:
        guess = guessit(file)
    except Exception as e:
        logging.error('Error processing file: %s', file)
        logging.error(e)
        return file, None
    if guess.get('type') == 'movie':
        return file, None

    try:
        episode_title = guess.get('episode_title')
        # Without a season, the episode title stands in for it. This mutates
        # the guess, so the substitution is also part of the serialized data.
        if 'season' not in guess and episode_title:
            guess['season'] = episode_title
        episode = _tv_episode_label(guess, file)
        quality = get_quality_type(guess)
        title = _tv_title(guess, file)
        if not title:
            print('No regex match or guess!: ', file)
        if episode is None or title is None:
            # Nothing reliable to group on: keep the path as a unique key.
            key = file
        else:
            if episode_title and title.isnumeric():
                title = episode_title
            if ', The' in title:
                title = 'The ' + title.replace(', The', '')
            if "'" in title:
                title = title.replace("'", "")
            key = f"{title}-{episode}-{quality}"
    except Exception:
        # Deliberate best effort: an unexpected guess shape must not abort a
        # whole batch, so the file is kept under its own path.
        logging.debug('Falling back to the path as key for: %s', file, exc_info=True)
        key = file

    data = {'path': file, 'guess': guess}
    serialized = json.dumps(data, cls=GuessitEncoder, ensure_ascii=False)
    return key, serialized
def analyze_movie_file_name(file):
    """Guess the metadata of a movie file path and derive a grouping key.

    The key has the form ``<title>-<year>-<quality>``, with apostrophes
    removed from the title and ``unknown`` used when no quality is found. The
    raw path is used as the key when the guess lacks a title or year, or when
    building the key fails for any other reason.

    Args:
        file: Path of the movie file, as listed in the input file.

    Returns:
        ``(key, serialized)`` where ``serialized`` is a JSON string holding
        ``{'path': file, 'guess': guess}``. ``(file, None)`` when guessit
        raises.
    """
    logging.info('Analyzing movie file name: %s', file)
    try:
        guess = guessit(file)
    except Exception as e:
        logging.error('Error processing file: %s', file)
        logging.error(e)
        return file, None

    try:
        quality = get_quality_type(guess) or "unknown"
        year = guess['year']
        title = guess['title'].replace("'", "")
        key = f"{title}-{year}-{quality}"
    except Exception:
        # Was a bare `except:`, which also swallowed KeyboardInterrupt and
        # SystemExit. The best-effort fallback to the path is kept.
        logging.debug('Falling back to the path as key for: %s', file, exc_info=True)
        key = file

    data = {'path': file, 'guess': guess}
    serialized = json.dumps(data, cls=GuessitEncoder, ensure_ascii=False)
    return key, serialized
def build_guesses_for_file_names(files, filename_analyzer, parallel=True):
    """Analyze every path in ``files`` and group the results by key.

    Args:
        files: Sequence of paths to analyze (its length sizes the progress
            bar).
        filename_analyzer: Callable returning ``(key, serialized)`` for one
            path, where ``serialized`` is a JSON string or a falsy value.
        parallel: When true, run the analyzer in a process pool with one
            worker per CPU; results then arrive in arbitrary order.

    Returns:
        Dict mapping each key to the list of decoded results sharing it.
        Results whose serialized part is falsy are dropped.
    """
    if parallel:
        # The pool is created only when needed and released on exit. All
        # results are consumed inside the block, before the pool terminates.
        with Pool(cpu_count()) as pool:
            results = list(tqdm(pool.imap_unordered(filename_analyzer, files),
                                total=len(files)))
    else:
        results = [filename_analyzer(file) for file in tqdm(files)]

    guesses = {}
    for key, data in results:
        if data:
            guesses.setdefault(key, []).append(json.loads(data))
    return guesses
def analyze_tv_files(analyze_missing=None, analyze_existing=None):
    """Analyze the missing and/or existing tv lists and store the guesses.

    For each enabled list, ``<INPUT_LISTS_PATH>tv-<kind>.txt`` is read,
    analyzed with analyze_tv_file_name, and written to the ``tv-<kind>``
    guesses file.

    Args:
        analyze_missing: Whether to process the missing list. ``None`` asks
            on stdin; only ``y`` in either case counts as yes.
        analyze_existing: Same, for the existing list.
    """
    # Ask before analyzing; both questions come before any work starts.
    if analyze_missing is None:
        analyze_missing = input('Analyze missing tv files? (y/n) ').lower() == 'y'
    if analyze_existing is None:
        analyze_existing = input('Analyze existing tv files? (y/n) ').lower() == 'y'

    for enabled, kind in ((analyze_missing, 'missing'), (analyze_existing, 'existing')):
        if not enabled:
            continue
        tv_files = read_file_to_list(f'{INPUT_LISTS_PATH}tv-{kind}.txt')
        print(f'{len(tv_files)} {kind} tv files, analyzing...')
        tv_guesses = build_guesses_for_file_names(tv_files, analyze_tv_file_name)
        write_guesses_to_file_as_json(tv_guesses, f'tv-{kind}')
def analyze_movie_files(analyze_missing=True, analyze_existing=True):
    """Analyze the missing and/or existing movie lists and store the guesses.

    For each enabled list, ``<INPUT_LISTS_PATH>movies-<kind>.txt`` is read,
    analyzed with analyze_movie_file_name, and written to the
    ``movies-<kind>`` guesses file.

    Args:
        analyze_missing: Whether to process the missing list. Passing
            ``None`` explicitly asks on stdin (only ``y`` in either case
            counts as yes). With the default of ``True`` no question is
            asked.
        analyze_existing: Same, for the existing list.
    """
    if analyze_missing is None:
        analyze_missing = input('Analyze missing movie files? (y/n) ').lower() == 'y'
    if analyze_existing is None:
        analyze_existing = input('Analyze existing movie files? (y/n) ').lower() == 'y'

    for enabled, kind in ((analyze_missing, 'missing'), (analyze_existing, 'existing')):
        if not enabled:
            continue
        movie_files = read_file_to_list(f'{INPUT_LISTS_PATH}movies-{kind}.txt')
        print(f'{len(movie_files)} {kind} movie files, analyzing...')
        movie_guesses = build_guesses_for_file_names(movie_files, analyze_movie_file_name)
        write_guesses_to_file_as_json(movie_guesses, f'movies-{kind}')
def find_existing_tv_missing_files():
    """Collect missing tv files whose key also appears in the existing guesses.

    Reads the ``tv-missing`` and ``tv-existing`` guesses. The matching paths
    overwrite ``<OUTPUT_LISTS_PATH>tv-auto-ignores.txt`` and are appended to
    ``<OUTPUT_LISTS_PATH>tv-auto-ignores-latest.txt``, one path per line.

    Returns:
        The list of matching paths.
    """
    tv_missing_guesses = read_guesses_to_file_as_json('tv-missing')
    tv_existing_guesses = read_guesses_to_file_as_json('tv-existing')

    existing_files = []
    for key, data in tv_missing_guesses.items():
        if key not in tv_existing_guesses:
            continue
        # An entry may be a list of guesses or a single guess dict.
        entries = data if isinstance(data, list) else [data]
        existing_files.extend(entry['path'] for entry in entries)

    print(f'Found {len(existing_files)} existing tv files')
    lines = [path + '\n' for path in existing_files]
    # Explicit UTF-8: paths may contain non-ASCII characters and the locale
    # default encoding may be unable to represent them.
    with open(f'{OUTPUT_LISTS_PATH}tv-auto-ignores.txt', 'w', encoding='utf-8') as ignore_file, \
            open(f'{OUTPUT_LISTS_PATH}tv-auto-ignores-latest.txt', 'a', encoding='utf-8') as latest_ignore_file:
        ignore_file.writelines(lines)
        latest_ignore_file.writelines(lines)
    return existing_files
def find_existing_movie_missing_files():
    """Collect missing movie files whose key also appears in the existing guesses.

    Reads the ``movies-missing`` and ``movies-existing`` guesses. The matching
    paths overwrite ``<OUTPUT_LISTS_PATH>movies-auto-ignores.txt`` and are
    appended to ``<OUTPUT_LISTS_PATH>movies-auto-ignores-latest.txt``, one
    path per line.

    Returns:
        The list of matching paths.
    """
    missing_guesses = read_guesses_to_file_as_json('movies-missing')
    existing_guesses = read_guesses_to_file_as_json('movies-existing')

    existing_files = [
        item['path']
        for key, data in missing_guesses.items()
        if key in existing_guesses
        for item in data
    ]

    print(f'Found {len(existing_files)} existing movie files to ignore, writing to file...')
    lines = [path + '\n' for path in existing_files]
    # Explicit UTF-8: paths may contain non-ASCII characters and the locale
    # default encoding may be unable to represent them.
    with open(f'{OUTPUT_LISTS_PATH}movies-auto-ignores.txt', 'w', encoding='utf-8') as ignore_file, \
            open(f'{OUTPUT_LISTS_PATH}movies-auto-ignores-latest.txt', 'a', encoding='utf-8') as latest_ignore_file:
        ignore_file.writelines(lines)
        latest_ignore_file.writelines(lines)
    return existing_files
def find_new_shows_to_ignore(file_name, min_episode_count=8):
    """Print every listed file of shows that have enough files.

    The show name is the part of each path before the first ``/``. Shows are
    visited in ascending order of file count. For each show with at least
    ``min_episode_count`` files, all its paths are printed in file order.

    Args:
        file_name: Path of the list file; no extension is filtered out.
        min_episode_count: Minimum number of files for a show to be printed.
    """
    paths_by_show = {}
    for path in read_file_to_list(file_name, ignore_extensions=[]):
        show, _, _ = path.partition('/')
        paths_by_show.setdefault(show, []).append(path)

    by_count = sorted(paths_by_show.items(), key=lambda entry: len(entry[1]))
    for _show, show_paths in by_count:
        if len(show_paths) < min_episode_count:
            continue
        for path in show_paths:
            print(path)
def _run_interactive():
    """Ask which media kinds to analyze and run the chosen analyses."""
    if input('Analyze tv files? (y/n) ').lower() == 'y':
        analyze_tv_files()
    if input('Analyze movie files? (y/n) ').lower() == 'y':
        analyze_movie_files()


def _main(argv):
    """Run the command named by ``argv[1]``; extra items are its arguments."""
    command = argv[1] if len(argv) > 1 else None
    arguments = argv[2:]

    if command == 'find_new_shows_to_ignore':
        # Previously a missing argument surfaced as a bare IndexError.
        if not arguments:
            sys.exit('find_new_shows_to_ignore requires the path of a file list')
        find_new_shows_to_ignore(arguments[0])
        return

    # Each command maps to the (function, keyword arguments) steps it runs.
    steps_by_command = {
        'interactive': [(_run_interactive, {})],
        'analyze-missing-tv': [
            (analyze_tv_files, {'analyze_missing': True, 'analyze_existing': False}),
        ],
        'analyze-existing-tv': [
            (analyze_tv_files, {'analyze_missing': False, 'analyze_existing': True}),
        ],
        'analyze-existing-movies': [
            (analyze_movie_files, {'analyze_missing': False, 'analyze_existing': True}),
        ],
        'analyze-find-missing-tv': [
            (analyze_tv_files, {'analyze_missing': True, 'analyze_existing': False}),
            (find_existing_tv_missing_files, {}),
        ],
        'analyze-find-missing-movies': [
            (analyze_movie_files, {'analyze_missing': True, 'analyze_existing': False}),
            (find_existing_movie_missing_files, {}),
        ],
        'analyze-find-missing-existing-movies': [
            (analyze_movie_files, {'analyze_missing': True, 'analyze_existing': True}),
            (find_existing_movie_missing_files, {}),
        ],
        'find-missing-movies': [(find_existing_movie_missing_files, {})],
        'find-missing-tv': [(find_existing_tv_missing_files, {})],
    }

    steps = steps_by_command.get(command)
    if steps is None:
        print("Nothing to do")
        return
    for step, kwargs in steps:
        step(**kwargs)


if __name__ == '__main__':
    _main(sys.argv)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment