Trying to parallelize embedding on Modal
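# Usage sketch (assumptions, not part of the original gist): with the Modal CLI
# installed and this file saved as e.g. embed_fineweb.py, the job can be started
# with `modal run embed_fineweb.py`. It assumes a Modal Secret named
# "huggingface-secret" exists and that the "embedding-fineweb-edu" Volume already
# contains the dataset saved under DATASET_SAVE.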
import os
import json
import time
import asyncio
import subprocess

from modal import App, Image, Secret, Volume, build, enter, exit, gpu, method

# We first set our configuration variables for our script.

## Embedding Containers Configuration
# GPU_CONCURRENCY = 100
MODEL_ID = "nomic-ai/nomic-embed-text-v1.5"
MODEL_SLUG = MODEL_ID.split("/")[-1]
MODEL_DIR = "/model"
MODEL_REVISION = "main"

GPU_CONCURRENCY = 100
# GPU_CONFIG = gpu.A100(size="80GB")
GPU_CONFIG = gpu.A100(size="40GB")
# GPU_CONFIG = gpu.H100()
# BATCH_SIZE = 512
BATCH_SIZE = 64
# BATCH_SIZE = 128
# MAX_TOKENS = 8192
MAX_TOKENS = 2048

## Dataset-Specific Configuration
DATASET_READ_VOLUME = Volume.from_name(
    "embedding-fineweb-edu", create_if_missing=True
)
EMBEDDING_CHECKPOINT_VOLUME = Volume.from_name(
    "checkpoint", create_if_missing=True
)
DATASET_DIR = "/data"
# DATASET_SAVE = "fineweb-edu-sample-10BT"
DATASET_SAVE = "fineweb-edu-sample-10BT-100k"
CHECKPOINT_DIR = "/checkpoint"
SAVE_TO_DISK = True

## Upload-Specific Configuration
# DATASET_HF_UPLOAD_REPO_NAME = "enjalot/fineweb-edu-sample-10BT"
DATASET_HF_UPLOAD_REPO_NAME = f"enjalot/{DATASET_SAVE}"
UPLOAD_TO_HF = True

def download_model_to_image(model_dir, model_name, model_revision):
    from huggingface_hub import snapshot_download
    from transformers.utils import move_cache

    os.makedirs(model_dir, exist_ok=True)
    snapshot_download(
        repo_id=model_name,
        revision=model_revision,
        local_dir=model_dir,
        ignore_patterns=["*.pt", "*.bin"],  # Using safetensors
    )
    move_cache()

st_image = (
    Image.debian_slim(python_version="3.10")
    .pip_install(
        "torch==2.1.2",
        "numpy==1.26.3",
        "transformers==4.39.3",
        "hf-transfer==0.1.6",
        "huggingface_hub==0.22.2",
        "einops==0.7.0",
    )
    .env({"HF_HUB_ENABLE_HF_TRANSFER": "1"})
    .run_function(
        download_model_to_image,
        timeout=60 * 20,
        kwargs={
            "model_dir": MODEL_DIR,
            "model_name": MODEL_ID,
            "model_revision": MODEL_REVISION,
        },
        secrets=[Secret.from_name("huggingface-secret")],
    )
)

with st_image.imports():
    import numpy as np
    import torch
    from torch.cuda.amp import autocast
    from transformers import AutoTokenizer, AutoModel

app = App("fineweb-embeddings-st")

@app.cls(
    gpu=GPU_CONFIG,
    # cpu=16,
    concurrency_limit=GPU_CONCURRENCY,
    timeout=60 * 10,
    container_idle_timeout=60 * 10,
    allow_concurrent_inputs=1,
    image=st_image,
)
class TransformerModel:
    @enter()
    def start_engine(self):
        self.device = torch.device("cuda")
        print("🥶 cold starting inference")
        start = time.monotonic_ns()
        self.model = AutoModel.from_pretrained(
            MODEL_ID, trust_remote_code=True, safe_serialization=True
        )  # , rotary_scaling_factor=2
        self.tokenizer = AutoTokenizer.from_pretrained(
            "bert-base-uncased", model_max_length=MAX_TOKENS
        )
        self.model.to(self.device)
        self.model.eval()
        print(f"CUDA memory allocated: {torch.cuda.memory_allocated() / 1e6} MB")
        duration_s = (time.monotonic_ns() - start) / 1e9
        print(f"🏎️ engine started in {duration_s:.0f}s")

    @method()
    def embed(self, inputs):
        tok = self.tokenizer
        # TODO: better understanding of how this gets called
        print("inputs", len(inputs))

        start = time.monotonic_ns()
        texts = [x[1] for x in inputs]
        # Pre-truncate very long documents to MAX_TOKENS before batch tokenization
        texts = [
            t if len(t) <= 8000 else tok.decode(tok.encode(t)[:MAX_TOKENS])
            for t in texts
        ]
        print("truncated in", (time.monotonic_ns() - start) / 1e9)
        print("texts", len(texts))
        # print(f"CUDA memory allocated before encoding: {torch.cuda.memory_allocated() / 1e6} MB")

        start = time.monotonic_ns()
        encoded_input = tok(texts, padding=True, truncation=True, return_tensors="pt")
        print("encoded in", (time.monotonic_ns() - start) / 1e9)

        start = time.monotonic_ns()
        encoded_input = {key: value.to(self.device) for key, value in encoded_input.items()}
        # print("moved to device", (time.monotonic_ns() - start) / 1e9)
        # print("encoded input size", encoded_input['input_ids'].nelement() * encoded_input['input_ids'].element_size() / 1e6, "MB")
        # print(f"CUDA memory allocated after encoding: {torch.cuda.memory_allocated() / 1e6} MB")

        start = time.monotonic_ns()
        # print(torch.cuda.memory_summary(device=None, abbreviated=False))
        with torch.no_grad(), autocast():
            print(f"CUDA memory allocated before embedding: {torch.cuda.memory_allocated() / 1e6} MB")
            model_output = self.model(**encoded_input)
            print(f"CUDA memory allocated after model output: {torch.cuda.memory_allocated() / 1e6} MB")
            # CLS token embedding from the last hidden state
            embeddings = model_output[0][:, 0]
            # print(f"Embedding size: {embeddings.nelement() * embeddings.element_size() / 1e6} MB")
            normalized_embeddings = torch.nn.functional.normalize(embeddings, p=2, dim=1)
            normalized_embeddings_cpu = normalized_embeddings.cpu().numpy()

        # Clean up torch memory
        del encoded_input
        del model_output
        del embeddings
        del normalized_embeddings
        torch.cuda.empty_cache()

        duration_s = (time.monotonic_ns() - start) / 1e9
        print(f"embedding took {duration_s:.0f}s")
        return inputs, normalized_embeddings_cpu

def generate_chunks_from_dataset(xs, max_tokens: int):
    """
    Generate chunks from a dataset.

    Args:
        xs (list): The dataset containing dictionaries with "id" and "text" keys.
        max_tokens (int): Maximum tokens per chunk (currently unused here;
            truncation happens in TransformerModel.embed).

    Yields:
        tuple: A tuple containing the id and the prefixed text.
    """
    for data in xs:
        # nomic-embed models expect a task instruction prefix; "clustering: "
        # targets the clustering task
        yield (data["id"], "clustering: " + data["text"])

def generate_batches(xs, batch_size):
    batch = []
    for x in xs:
        batch.append(x)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        yield batch

def load_dataset_from_disk():
    """
    Load a dataset from disk and return a subset of the training data.

    Returns:
        Dataset: A subset of the training data.
    """
    import time
    from datasets import load_from_disk

    start = time.perf_counter()
    # Load the dataset as a Hugging Face dataset
    print(f"Loading dataset from {DATASET_DIR}/{DATASET_SAVE}")
    dataset = load_from_disk(f"{DATASET_DIR}/{DATASET_SAVE}")
    print(f"Dataset loaded in {time.perf_counter()-start:.2f} seconds")
    # return dataset["train"]
    # TODO: have the 100k subset be a proper subset
    return dataset  # ["train"]

def save_dataset_to_intermediate_checkpoint(acc_chunks, embeddings, batch_size):
    """Saves the dataset to an intermediate checkpoint.

    Args:
        acc_chunks (list): Accumulated chunks
        embeddings (list): Accumulated embeddings
        batch_size (int): Batch size
    """
    import pyarrow as pa
    from datasets import Dataset

    table = pa.Table.from_arrays(
        [
            pa.array([chunk[0] for chunk in acc_chunks]),  # id
            pa.array([chunk[1] for chunk in acc_chunks]),  # text
            pa.array(embeddings),
        ],
        names=["id", "text", "embedding"],
    )
    path_parent_folder = f"{CHECKPOINT_DIR}/{DATASET_SAVE}/{MODEL_SLUG}-{batch_size}"
    dataset = Dataset(table)
    dataset.save_to_disk(path_parent_folder)
    EMBEDDING_CHECKPOINT_VOLUME.commit()
    print(f"Saved checkpoint at {path_parent_folder}")

def upload_result_to_hf(batch_size: int) -> None:
    """
    Uploads the result to the Hugging Face Hub.

    Args:
        batch_size (int): The batch size for the model.

    Returns:
        None
    """
    import os
    import time
    from huggingface_hub import HfApi

    path_parent_folder = f"{CHECKPOINT_DIR}/{DATASET_SAVE}/{MODEL_SLUG}-{batch_size}"
    api = HfApi(token=os.environ["HUGGINGFACE_TOKEN"])
    api.create_repo(
        repo_id=DATASET_HF_UPLOAD_REPO_NAME,
        private=False,
        repo_type="dataset",
        exist_ok=True,
    )

    print(f"Pushing to hub {DATASET_HF_UPLOAD_REPO_NAME}")
    start = time.perf_counter()
    api.upload_folder(
        folder_path=path_parent_folder,
        repo_id=DATASET_HF_UPLOAD_REPO_NAME,
        repo_type="dataset",
        multi_commits=True,
        multi_commits_verbose=True,
    )
    end = time.perf_counter()
    print(f"Uploaded in {end-start}s")

@app.function(
    # cpu=1,
    image=Image.debian_slim().pip_install(
        "datasets", "pyarrow", "hf_transfer", "huggingface_hub", "transformers"
    ),
    volumes={
        DATASET_DIR: DATASET_READ_VOLUME,
        CHECKPOINT_DIR: EMBEDDING_CHECKPOINT_VOLUME,
    },
    timeout=86400,
    secrets=[Secret.from_name("huggingface-secret")],
)
def embed_dataset(batch_size: int = 512 * 50):
    """
    Embeds a dataset with the TransformerModel containers.

    Args:
        batch_size (int): The number of examples sent to each container call.
            Defaults to 512 * 50.

    Returns:
        dict: A dictionary containing the benchmark results.
    """
    import datetime
    import time

    if UPLOAD_TO_HF and not SAVE_TO_DISK:
        raise ValueError(
            "Uploading to HF requires SAVE_TO_DISK to be set to true in case of intermediate failure."
        )

    data = load_dataset_from_disk()
    model = TransformerModel()

    start = time.perf_counter()
    print("generating chunks")
    text_chunks = generate_chunks_from_dataset(data, max_tokens=MAX_TOKENS)
    print("generated chunks", time.perf_counter() - start)

    start = time.perf_counter()
    print("generating batches")
    batches = generate_batches(text_chunks, batch_size=batch_size)
    print("generated batches", time.perf_counter() - start)

    start = time.perf_counter()
    acc_chunks = []
    embeddings = []
    print("BATCHES", len(data) / batch_size)
    i = 0
    # Fan the batches out across up to GPU_CONCURRENCY containers
    for resp in model.embed.map(batches, order_outputs=False, return_exceptions=True):
        if isinstance(resp, Exception):
            print(f"Exception: {resp}")
            # continue
            return
        batch_chunks, batch_embeddings = resp
        acc_chunks.extend(batch_chunks)
        embeddings.extend(batch_embeddings)
        print("done with batch", i)
        i += 1

    end = time.perf_counter()
    duration = end - start
    resp = {
        "batch_size": batch_size,
        "n_gpu": GPU_CONCURRENCY,
        "duration_mins": duration / 60,
    }

    if SAVE_TO_DISK:
        save_dataset_to_intermediate_checkpoint(acc_chunks, embeddings, batch_size)
    if UPLOAD_TO_HF:
        upload_result_to_hf(batch_size)
    return resp

@app.local_entrypoint()
def full_job():
    batch_size = BATCH_SIZE
    with open("benchmarks.json", "a") as f:
        benchmark = embed_dataset.remote(batch_size=batch_size)
        f.write(json.dumps(benchmark, indent=2) + "\n")
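
# Reading the results back (a sketch, not part of the original run): after the
# job finishes, the checkpoint Volume holds an Arrow dataset with "id", "text",
# and "embedding" columns. Once downloaded locally (e.g. via the `modal volume`
# CLI) and assuming the defaults above, it could be inspected with:
#
#   from datasets import load_from_disk
#   ds = load_from_disk("checkpoint/fineweb-edu-sample-10BT-100k/nomic-embed-text-v1.5-64")
#   print(ds[0]["id"], len(ds[0]["embedding"]))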