{
"source": "...",
"id": "...",
"attributes": {
"compression_ratio_zstd": 0.7
}
}
@cached_or_construct_output(success_suffix="SUCCESS")
def process_file_with_quality_classifier(input_filename: str, output_filename: str, quality_classifier: BaseClassifier):
    """Score a gzipped JSONL file and write per-record attribute rows.

    Reads gzip-compressed JSONL from ``input_filename``, keeps only the
    ``text``/``id``/``source`` columns, scores every record in batches, and
    writes one gzip-compressed JSON line per record containing ``id``,
    ``source``, and ``attributes`` to ``output_filename``.

    Args:
        input_filename: fsspec-compatible path/URL of a gzipped JSONL file.
        output_filename: fsspec-compatible path/URL for the gzipped output.
        quality_classifier: Currently UNUSED — scoring is done by
            ``score_compression_ration_zstd`` instead (see NOTE below).
    """
    # Imported lazily so the module can load without `datasets` installed.
    import datasets

    with fsspec.open(input_filename, "rt", compression="gzip") as f_in:
        records = [json.loads(line) for line in f_in]

    dataset = datasets.Dataset.from_list(records)
    dataset = dataset.select_columns(["text", "id", "source"])

    # NOTE(review): the quality_classifier argument is bypassed in favor of
    # the zstd-compression-ratio scorer. The scorer's name is misspelled
    # ("ration") at its definition site — fix it there, not here, since
    # renaming only the call would break the reference.
    predicted_dataset = dataset.map(
        lambda batch: score_compression_ration_zstd(batch), batched=True, batch_size=512
    )

    with fsspec.open(output_filename, "wt", compression="gzip") as f_out:
        for row in predicted_dataset:
            res = {"id": row["id"], "source": row["source"], "attributes": row["attributes"]}
            f_out.write(json.dumps(res) + "\n")