-
-
Save KShaibani/62d26a8df9aa3df448b5318f64b68118 to your computer and use it in GitHub Desktop.
Youtube video downloader
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
A tiny youtube video downloader. | |
usage: yt.py [-h] [--only-audio] [--num_retries NUM_RETRIES] [--output OUTPUT] video_url | |
Built by Abigail Adegbiji on April 7, 2024. | |
Inspried by this blog post: https://tyrrrz.me/blog/reverse-engineering-youtube-revisited | |
Note that the get_video_info function will be subject to change when Youtube changes their api. | |
Also note that ffmpeg and ffmpeg-python are required dependencies. | |
""" | |
import argparse | |
import ffmpeg | |
import os | |
import requests | |
import signal | |
import sys | |
import time | |
def format_size(size): | |
units = ["B", "KB", "MB", "GB", "TB"] | |
index = 0 | |
while size > 1000: | |
size /= 1000 | |
index += 1 | |
return f"{round(size, 2)} {units[index]}" | |
def print_download_progress(bar_length, downloaded, total): | |
if downloaded != 0 and total != 0: | |
percentage = downloaded / total | |
else: | |
percentage = 0 | |
amount = percentage * bar_length | |
progress = ("=" * int(amount)).ljust(bar_length - 1) | |
sys.stdout.write("\033[K") | |
message = f"[{progress}] {format_size(downloaded)}/{format_size(total)}" | |
print(message, end="\r") | |
def download_byte_range(filename, url): | |
chunk_size = 1024 * 100 | |
total = int(requests.head(url).headers["Content-Length"]) | |
amount_read = 0 | |
try: | |
# Continue from where we stopped | |
if os.path.exists(filename): | |
filesize = os.path.getsize(filename) | |
if filesize >= total: | |
print("Already downloaded") | |
return True | |
headers = {"Range": f"bytes={filesize}-"} | |
amount_read = filesize | |
else: | |
headers = {} | |
with requests.get(url, headers=headers, stream=True) as req: | |
req.raise_for_status() | |
mode = "wb" if headers == {} else "ab" | |
with open(filename, mode) as file: | |
for chunk in req.iter_content(chunk_size=chunk_size): | |
file.write(chunk) | |
amount_read += sys.getsizeof(chunk) | |
print_download_progress(30, amount_read, total) | |
return True | |
except requests.exceptions.RequestException as e: | |
print(f"Download error: {e}") | |
return False | |
def download_bytes(filename, url, max_retries): | |
retries = 0 | |
while retries < max_retries: | |
success = download_byte_range(filename, url) | |
print() | |
if not success: | |
print(f"Retrying {retries}/{max_retries}") | |
retries += 1 | |
else: | |
break | |
time.sleep(0.25) | |
if retries == max_retries: | |
print("Max retries exceeded. Stopping") | |
exit() | |
def get_video_info(video_id): | |
key = "AIzaSyA8eiZmM1FaDVjRy-df2KTyQ_vz_yYM39w" | |
endpoint = f"https://www.youtube.com/youtubei/v1/player?key={key}" | |
payload = { | |
"videoId": video_id, | |
"context": { | |
"client": { | |
"clientName": "ANDROID_TESTSUITE", | |
"clientVersion": "1.9", | |
"androidSdkVersion": 30, | |
"hl": "en", | |
"gl": "US", | |
"utcOffsetMinutes": 0 | |
} | |
} | |
} | |
headers = { | |
"User-Agent": "com.google.android.youtube/17.36.4 (Linux; U; Android 12; GB) gzip" | |
} | |
response = requests.post(endpoint, json=payload, headers=headers) | |
if response.status_code != 200: | |
print("Error getting video info.") | |
exit() | |
return response.json() | |
def get_best_stream(streams, searchingAudio): | |
filtered = [] | |
for stream in streams: | |
mimetype = stream["mimeType"] | |
if searchingAudio and "audio" in mimetype: | |
filtered.append(stream) | |
elif not searchingAudio and "video" in mimetype: | |
filtered.append(stream) | |
# The higher the bitrate the better the stream quality | |
def bitrates(s): return int(s["averageBitrate"]) | |
return max(filtered, key=bitrates) | |
def download_stream(streams, title, is_audio, max_retries): | |
stream = get_best_stream(streams, is_audio) | |
container = stream["mimeType"].split(";")[0][6:] | |
stream_type = "audio" if is_audio else "video" | |
url = stream["url"] | |
file = f"{title} -- {stream_type}.{container}" | |
download_bytes(file, url, max_retries) | |
return file | |
def merge_streams(video_file, audio_file, output_file, only_audio): | |
if not only_audio: | |
vstream = ffmpeg.input(video_file) | |
astream = ffmpeg.input(audio_file) | |
stream = ffmpeg.output(vstream, astream, output_file, loglevel="quiet") | |
ffmpeg.run(stream, overwrite_output=True) | |
os.remove(video_file) | |
os.remove(audio_file) | |
else: | |
astream = ffmpeg.input(audio_file) | |
stream = ffmpeg.output(astream, output_file, loglevel="quiet") | |
ffmpeg.run(stream, overwrite_output=True) | |
os.remove(audio_file) | |
def download_video(video_id, only_audio, max_retries, output_file): | |
info = get_video_info(video_id) | |
title = info["videoDetails"]["title"] | |
print(f"Downloading '{title}'") | |
video_file = "" | |
audio_file = "" | |
streams = info["streamingData"]["adaptiveFormats"] | |
if output_file is None: | |
output_file = f"{title}.{'mp3' if only_audio else 'mp4'}" | |
if not only_audio: | |
print("Downloading video ...") | |
video_file = download_stream(streams, title, False, max_retries) | |
print("Downloading audio ...") | |
audio_file = download_stream(streams, title, True, max_retries) | |
merge_streams(video_file, audio_file, output_file, only_audio) | |
def signal_handler(signal, frame): | |
print("Exiting ...") | |
exit(0) | |
if __name__ == "__main__": | |
signal.signal(signal.SIGINT, signal_handler) | |
parser = argparse.ArgumentParser(description="Youtube video downloader") | |
parser.add_argument("video_url", type=str, help="Url to the youtube video") | |
parser.add_argument("--only-audio", action="store_true", | |
help="Only download audio") | |
parser.add_argument("--num_retries", type=int, | |
help="Set the number of retries.") | |
parser.add_argument("--output", type=str, help="The file to download to") | |
args = parser.parse_args() | |
if args.num_retries is None: | |
args.num_retries = 3 | |
id_index = args.video_url.find("v=") | |
if id_index == -1: | |
print("Please supply a valid youtube video url.") | |
exit() | |
video_id = args.video_url[id_index + 2: id_index + 13] | |
download_video(video_id, args.only_audio, args.num_retries, args.output) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment