Created
October 14, 2021 20:00
-
-
Save skchronicles/afbdfb8b41317f1a78d1eaac618a779a to your computer and use it in GitHub Desktop.
Real world parallel processing example using Ray
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
"""md5.py: calculates MD5 checksums of multiple files in parallel.

The MD5 calculation is memory safe: each file is read in
blocks of 64 KiB instead of being loaded into memory at once.

USAGE:
    python3 md5.py file1.txt file2.csv 8

The last positional argument is the number of concurrent
MD5 processes to spawn using ray.
"""
# Python standard library | |
from __future__ import print_function | |
import os, sys, hashlib | |
# 3rd party imports | |
import ray | |
@ray.remote
def md5sum(filename, first_block_only = False, blocksize = 65536):
    """Compute the MD5 checksum of a file without loading it whole.

    The file is opened in binary mode and consumed in pieces of at most
    `blocksize` bytes, so memory use stays bounded even for very large files.

    @param filename <str>:
        Path of the file on the local filesystem to checksum
    @param first_block_only <bool>:
        When True, hash only the first block that was read and stop
    @param blocksize <int>:
        Number of bytes requested from the file per read
    @return (filename, checksum) <tuple>:
        The input filename paired with the hexadecimal MD5 digest
    """
    digest = hashlib.md5()
    with open(filename, 'rb') as stream:
        chunk = stream.read(blocksize)
        if first_block_only:
            # Cheap heuristic: hashing just the leading block is much
            # faster when checksumming thousands or millions of files.
            digest.update(chunk)
        else:
            # Hash the whole file; an empty read marks end of file.
            while chunk:
                digest.update(chunk)
                chunk = stream.read(blocksize)
    return (filename, digest.hexdigest())
def parse_args(argv, default_threads=4):
    """Split a command line into the files to hash and the worker count.

    The last argument is treated as the number of concurrent workers when
    it parses as an integer. Otherwise every argument is a file and
    `default_threads` is used, so a trailing filename is never dropped
    when the worker count is omitted.

    @param argv <list[str]>:
        Full argument vector, including the program name at index 0
    @param default_threads <int>:
        Worker count to use when none is given on the command line
    @return (files, threads) <tuple>:
        List of file paths and the number of workers to start
    """
    args = list(argv[1:])
    if args:
        try:
            threads = int(args[-1])
        except ValueError:
            # Last argument is not a number: it is a file, not a count.
            pass
        else:
            return args[:-1], threads
    return args, default_threads


def main(argv=None):
    """Checksum the files named on the command line in parallel.

    Prints one `filename<TAB>md5` line per file to standard output, in
    the same order as the files were given. A failure on one file is
    reported on standard error and does not stop the remaining files.

    @param argv <list[str]>:
        Argument vector to parse; defaults to sys.argv
    """
    files, threads = parse_args(sys.argv if argv is None else argv)
    # Start ray with the requested number of CPUs.
    ray.init(num_cpus=threads)
    # .remote() schedules each task and returns a future immediately.
    pending = [md5sum.remote(path) for path in files]
    # ray.get() blocks until that task is done; walking the futures in
    # submission order keeps the output order equal to the input order.
    for future in pending:
        try:
            filename, checksum = ray.get(future)
        except Exception as e:
            print('An error occured:\n{}'.format(e), file=sys.stderr)
            continue
        print("{}\t{}".format(filename, checksum))


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment