See https://gist.github.com/rom1504/67ada3dedbecc113ae2dbdfd9c642d83 for a step-by-step guide to setting up the Spark jars
Last active
August 7, 2023 02:03
-
-
Save rom1504/561aa2c519c8a7892fe36db77a068b88 to your computer and use it in GitHub Desktop.
spark_on_ssh.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pssh.clients import ParallelSSHClient | |
import subprocess | |
import socket | |
import fire | |
import os | |
def get_ips_of_slurm_job(job_id):
    """Return the IP addresses of every node allocated to a slurm job.

    Parameters
    ----------
    job_id : int or str
        Slurm job id, as shown by ``squeue``.

    Returns
    -------
    list[str]
        IPv4 addresses of the job's nodes, resolved via DNS.

    Raises
    ------
    ValueError
        If ``job_id`` is not a valid integer.
    """
    # int() both validates the id and prevents shell injection, since the
    # value is interpolated into a shell=True command line below.
    job_id = int(job_id)
    # squeue column 8 is the nodelist; `sinfo -N -n <nodelist>` expands it to
    # one host per line, and the final tail/awk strip the header and keep the
    # hostname column.
    c = (
        "sinfo -N -n `squeue -j " + str(job_id) + " | tail -1 | awk '{print $8}'`"
        " | tail -n +2 | awk '{print $1}'"
    )
    out = subprocess.check_output(c, shell=True).decode("utf8")
    # splitlines + filter drops the trailing newline and any empty lines
    # (e.g. when the job id is unknown and the pipeline prints nothing).
    hosts = [h for h in out.splitlines() if h]
    return [socket.gethostbyname(host) for host in hosts]
def run(ips_to_run, command):
    """Execute *command* over SSH on every host in *ips_to_run* in parallel.

    Prints the target hosts first, then a list of ``(host, stdout_text)``
    pairs once every command has completed. A failure on one host does not
    abort the others (``stop_on_errors=False``).
    """
    print(ips_to_run)
    ssh = ParallelSSHClient(ips_to_run, timeout=10, pool_size=len(ips_to_run))
    results = []
    for out in list(ssh.run_command(command, stop_on_errors=False)):
        host = out.client.host if out.client is not None else ""
        text = "\n".join(out.stdout) if out.stdout else ""
        results.append((host, text))
    print(results)
def start_spark_cluster(ips, cpus, mem_in_gb, spark_path, spark_local_dir):
    """Start a standalone Spark cluster on the given hosts.

    The first IP becomes the master (listening on port 7077); every host,
    master included, then runs a worker with *cpus* cores and *mem_in_gb* GB
    of memory. Worker scratch directories are placed under *spark_local_dir*.
    """
    master_addr = ips[0]
    sbin = f"{spark_path}/spark-3.3.1-bin-hadoop3/sbin"
    run([master_addr], f"{sbin}/start-master.sh -p 7077 -h {master_addr}")
    worker_cmd = (
        f"{sbin}/start-worker.sh -c {cpus} -m {mem_in_gb}g "
        f'"spark://{master_addr}:7077"'
    )
    # Scratch dirs are passed as environment variables for the worker process.
    env = f"SPARK_WORKER_DIR={spark_local_dir}/work SPARK_LOCAL_DIRS={spark_local_dir}/local"
    run(ips, f"bash -c '{env} {worker_cmd}'")
def stop_spark_cluster(ips):
    """Stop the Spark cluster by killing java on every host in *ips*.

    NOTE: ``pkill java`` is a blunt instrument — it terminates *every* java
    process on each node, not just Spark daemons.
    """
    # Plain string: there is nothing to interpolate here.
    run(ips, 'bash -c "pkill java"')
def main(cpus=48, mem_in_gb=256, spark_path=None, job_id=None, command="start", spark_local_dir="/tmp"):
    """Start or stop a Spark cluster on the nodes of a slurm job.

    Parameters
    ----------
    cpus : int
        Cores per Spark worker.
    mem_in_gb : int
        Memory per Spark worker, in GB.
    spark_path : str or None
        Directory containing the Spark distribution; defaults to the
        current working directory.
    job_id : int or str
        Slurm job id whose nodes host the cluster (required).
    command : str
        Either ``"start"`` or ``"stop"``.
    spark_local_dir : str
        Base directory for worker scratch space.

    Raises
    ------
    ValueError
        If ``job_id`` is missing or ``command`` is not recognized.
    """
    # Validate before doing any subprocess/DNS work: previously a missing
    # job_id produced a bogus `squeue -j None` shell command, and an unknown
    # command silently did nothing.
    if job_id is None:
        raise ValueError("job_id is required (e.g. --job_id=2213)")
    if command not in ("start", "stop"):
        raise ValueError(f"unknown command {command!r}; expected 'start' or 'stop'")
    if spark_path is None:
        spark_path = os.getcwd()
    ips = get_ips_of_slurm_job(job_id)
    if command == "stop":
        stop_spark_cluster(ips)
    else:
        start_spark_cluster(ips, cpus, mem_in_gb, spark_path, spark_local_dir)
# python spark_on_ssh.py --cpus=48 --mem_in_gb=256 --job_id=2213 --spark_local_dir="/scratch/spark" --command="start" | |
# Expose `main` as a CLI; fire maps keyword arguments to --flags.
if __name__ == "__main__":
    fire.Fire(main)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment