bucket_dedup.py
""" | |
This is a deduplication method using pyspark. | |
input: table with id and 2 columns that contain float values | |
2 items are considered the same if the float values are equal with a threshold of 0.05 | |
algo: multiply the 2 columns by 1/0.05, resulting in 2 longs. Then use pyspark to perform exact dedup using these 2 columns | |
Pyspark does distributed sort then linear dedup, so this scales to 100B | |
""" | |
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = (
    SparkSession.builder
    .config("spark.driver.memory", "16G")
    .config("spark.sql.shuffle.partitions", 300)
    .config("spark.local.dir", "/tmp/spark-tmp2")
    .master("local[64]")
    .appName("spark-stats")
    .getOrCreate()
)
p = "/fsx/mlrichter/dedublication/3billion/chunkall_to_unified_results.csv" | |
df = spark.read.csv(p, header=True) | |
df = df.withColumn("perpl_ontocord/riverbed_kenlm",df["perpl_ontocord/riverbed_kenlm"].cast('float')) | |
df = df.withColumn("perpl_ccnet/wikipedia", df["perpl_ccnet/wikipedia"].cast('float')) | |
df = df.withColumn("b1", F.round(F.col("perpl_ontocord/riverbed_kenlm") / 0.05).cast("long")) | |
df = df.withColumn("b2", F.round(F.col("perpl_ccnet/wikipedia") / 0.05).cast("long")) | |
df = df.drop_duplicates(["b1", "b2"]) | |
df.repartition(100).write.mode("overwrite").parquet("output_parquet2") | |
df = spark.read.parquet("output_parquet2") | |
df.count() | |
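# Sanity check (illustrative, not in the original gist): after dedup the row
# count should equal the number of distinct (b1, b2) bucket pairs.
assert df.count() == df.select("b1", "b2").distinct().count()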
""" | |
It is also possible to convert the 2 longs to bytes then concat the byte and use that as a hash, same idea: | |
def perplexity_to_bytes(perplexity, threshold=0.05): | |
bucket = round(perplexity / threshold).to_bytes(5, 'big') | |
return bucket | |
from pyspark.sql.types import BinaryType | |
conv = F.udf(perplexity_to_bytes, BinaryType()) | |
df = df.withColumn("hash", F.concat(conv(F.col("perpl_ontocord/riverbed_kenlm")), conv(F.col("perpl_ccnet/wikipedia")))) | |
df = df.drop_duplicates(["hash"]) | |
""" |