Last active
April 8, 2018 01:35
-
-
Save roddds/655c1e64b19ed7b0f1e1f23c6c2b68b4 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import subprocess | |
import asyncio | |
import os | |
import re | |
from collections import namedtuple | |
from bs4 import BeautifulSoup | |
import requests | |
'''
Download all episodes from Destroy All Software during its free week.
Requires
Python 3.5+
requests (pip install requests)
BeautifulSoup (pip install bs4)
axel (sudo apt install axel)
'''

# Catalog page listing every season/episode, and the site root used to
# turn the catalog's relative episode hrefs into absolute URLs.
CATALOG_URL = 'https://www.destroyallsoftware.com/screencasts/catalog'
EPISODE_PREFIX = 'https://www.destroyallsoftware.com'

# Lightweight records for the scraped data. Episode is filled in two
# passes: ep_number and video_url start as None and are set later via
# namedtuple._replace once the episode page has been fetched.
Season = namedtuple('Season', ['name', 'episodes'])
Episode = namedtuple('Episode', ['url', 'title', 'subtitle', 'season', 'ep_number', 'video_url'])
def get_soup():
    """Fetch the screencast catalog page and parse it into a soup tree."""
    response = requests.get(CATALOG_URL)
    soup = BeautifulSoup(response.content, 'html.parser')
    return soup
def get_seasons(soup):
    """Return the per-season container divs from the catalog soup."""
    season_divs = soup.select('.container.season')
    return season_divs
def get_season_name(season):
    """Extract the season's anchor name (e.g. 'season-1') from its heading link."""
    heading_link = season.select_one('h1 > a')
    return heading_link.attrs['name']
def get_video_url(ep):
    """Fetch an episode's page and return the HD video URL from its player script.

    NOTE(review): relies on the player being the second <script> tag and the
    HD stream being the second quoted http URL inside it — fragile if the
    site's markup changes.
    """
    page = requests.get(f'{EPISODE_PREFIX}{ep.url}')
    episode_soup = BeautifulSoup(page.content, 'html.parser')
    player_script = episode_soup.select('script')[1].text
    quoted_urls = re.findall(r'(http.+?)\"', player_script)
    return quoted_urls[1]
def get_episode_number(video_url):
    """Pull the numeric episode id (the digits after 'das-') out of a video URL."""
    digit_groups = re.findall(r'das-(\d+)', video_url)
    return digit_groups[0]
def get_full_episode_data(episode, season_name):
    """Build a fully-populated Episode record from one catalog episode div."""
    episode_title = episode.select_one('.title').text
    print(f'Getting details for "{episode_title}"')
    # First pass: everything available from the catalog page itself.
    partial = Episode(
        url=episode.find('a').attrs['href'],
        title=episode_title,
        subtitle=episode.select_one('.subtitle').text,
        season=season_name,
        ep_number=None,
        video_url=None,
    )
    # Second pass: the video URL requires fetching the episode page, and
    # the episode number is derived from that URL.
    video_url = get_video_url(partial)
    return partial._replace(
        video_url=video_url,
        ep_number=get_episode_number(video_url),
    )
def parse_season(season):
    """Turn one season div into a Season record with full data per episode."""
    name = get_season_name(season)
    episodes = []
    for episode_div in season.select('.episode'):
        episodes.append(get_full_episode_data(episode_div, name))
    return Season(name=name, episodes=episodes)
def get_episode_path(ep):
    """Return the relative download path '<season>/[<num>] <subtitle> - <title>.mp4'.

    Bug fix: the original built and slash-sanitized `filename` but then
    never used it — the path contained a hard-coded placeholder instead,
    so every episode of a season collided on the same file name.
    """
    filename = f'[{ep.ep_number}] {ep.subtitle} - {ep.title}'
    # Strip '/' from the episode part so titles can't create extra directories.
    filename = filename.replace('/', '')
    path = f'{ep.season}/{filename}.mp4'
    # Keep only filesystem-friendly characters (plus the season separator).
    return ''.join(
        c for c in path
        if c.isalpha() or c.isdigit() or c in '/-[](). '
    ).strip()
def download_url(url, path):
    """Stream `url` to `path` with requests (plain-Python fallback to axel).

    Bug fixes: the original wrapped the iterator in `tqdm(...)` without
    ever importing tqdm, so the first call raised NameError; it also used
    iter_content() with no chunk size, which yields one byte at a time.
    """
    directory, _filename = os.path.split(path)
    if directory:  # os.makedirs('') raises when path has no directory part
        os.makedirs(directory, exist_ok=True)
    response = requests.get(url, stream=True)
    response.raise_for_status()  # fail loudly instead of saving an error page
    with open(path, 'wb') as handle:
        for chunk in response.iter_content(chunk_size=64 * 1024):
            if chunk:  # skip keep-alive chunks
                handle.write(chunk)
def download_with_axel(url, path):
    """Download `url` to `path` via the external 'axel' accelerator.

    Creates the parent directory if needed and blocks until axel exits;
    on Ctrl-C the child process is killed (best-effort).
    """
    directory, _ = os.path.split(path)
    os.makedirs(directory, exist_ok=True)
    proc = subprocess.Popen(['axel', url, '-q', '-o', path])
    try:
        proc.wait()
        print(f'Finished {path}')
    except KeyboardInterrupt:
        try:
            proc.kill()
        except OSError:
            # Child already exited; nothing to clean up.
            pass
async def download_season(season):
    """Download every episode of `season` concurrently.

    Each download runs the blocking axel subprocess in the default
    thread-pool executor so the waits overlap; this coroutine completes
    when all episodes have finished.
    """
    loop = asyncio.get_event_loop()
    futures = []
    for episode in season.episodes:
        path = get_episode_path(episode)
        print(f'Downloading {path}')
        futures.append(
            loop.run_in_executor(
                None,
                download_with_axel,
                episode.video_url,
                path,
            )
        )
    # The original looped over the gathered results only to discard them;
    # awaiting the gather directly is equivalent and clearer.
    await asyncio.gather(*futures)
def main():
    """Scrape the catalog and download every season, one season at a time."""
    print('Getting seasons...')
    soup = get_soup()
    loop = asyncio.get_event_loop()

    # Add season names here (e.g. 'season-1') if your download
    # stopped halfway through for some reason.
    seasons_to_skip = []

    print('Fetching episode data...')
    for season_soup in get_seasons(soup):
        # Check the skip list BEFORE parsing: parse_season fetches every
        # episode page over the network, which the original wasted on
        # seasons that were then skipped. (Also removed an unused
        # `seasons = []` accumulator that was never read.)
        season_name = get_season_name(season_soup)
        if season_name in seasons_to_skip:
            print(f'Skipping {season_name}')
            continue
        season = parse_season(season_soup)
        print(f'Downloading {season_name}')
        loop.run_until_complete(download_season(season))
        print(f'Finished downloading {season_name}')
    print('Done')


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment