@vinothchandar
Last active December 10, 2024 17:33
Spark SQL Plans on Amazon Reviews Dataset

Dataset

https://s3.amazonaws.com/amazon-reviews-pds/readme.html

scala> spark.sql("select count(*) from amazon_reviews").show
+---------+
| count(1)|
+---------+
|160796570|
+---------+


scala> spark.sql("select count(*) from amazon_reviews where review_date > '2007' and review_date < '2009'").show
+--------+
|count(1)|
+--------+
| 4683044|
+--------+


scala>

Query with no filter

select sum(total_votes), product_category from amazon_reviews group by product_category 

takes 15 seconds.

== Parsed Logical Plan ==
GlobalLimit 1001
+- LocalLimit 1001
   +- Aggregate [product_category#332], [sum(cast(total_votes#325 as bigint)) AS sum(total_votes)#434L, product_category#332]
      +- SubqueryAlias `amazon_reviews`
         +- Relation[marketplace#317,customer_id#318,review_id#319,product_id#320,product_parent#321,product_title#322,star_rating#323,helpful_votes#324,total_votes#325,vine#326,verified_purchase#327,review_headline#328,review_body#329,review_date#330,year#331,product_category#332] parquet

== Analyzed Logical Plan ==
sum(total_votes): bigint, product_category: string
GlobalLimit 1001
+- LocalLimit 1001
   +- Aggregate [product_category#332], [sum(cast(total_votes#325 as bigint)) AS sum(total_votes)#434L, product_category#332]
      +- SubqueryAlias `amazon_reviews`
         +- Relation[marketplace#317,customer_id#318,review_id#319,product_id#320,product_parent#321,product_title#322,star_rating#323,helpful_votes#324,total_votes#325,vine#326,verified_purchase#327,review_headline#328,review_body#329,review_date#330,year#331,product_category#332] parquet

== Optimized Logical Plan ==
GlobalLimit 1001
+- LocalLimit 1001
   +- Aggregate [product_category#332], [sum(cast(total_votes#325 as bigint)) AS sum(total_votes)#434L, product_category#332]
      +- Project [total_votes#325, product_category#332]
         +- Relation[marketplace#317,customer_id#318,review_id#319,product_id#320,product_parent#321,product_title#322,star_rating#323,helpful_votes#324,total_votes#325,vine#326,verified_purchase#327,review_headline#328,review_body#329,review_date#330,year#331,product_category#332] parquet

== Physical Plan ==
CollectLimit 1001
+- *(2) HashAggregate(keys=[product_category#332], functions=[sum(cast(total_votes#325 as bigint))], output=[sum(total_votes)#434L, product_category#332])
   +- Exchange hashpartitioning(product_category#332, 200)
      +- *(1) HashAggregate(keys=[product_category#332], functions=[partial_sum(cast(total_votes#325 as bigint))], output=[product_category#332, sum#440L])
         +- *(1) FileScan parquet [total_votes#325,product_category#332] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Volumes/HUDIDATA/input-data/amazon-reviews], PartitionCount: 43, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<total_votes:int>


@vinothchandar commented Jul 18, 2020

select sum(total_votes), product_category from amazon_reviews where review_date > '2007' and review_date < '2009' group by product_category 

takes 17 seconds.
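
To check whether the review_date predicate is actually pushed down to the Parquet scan, the plan for this query can be printed directly from spark-shell; a minimal sketch using the same table:

// Extended explain prints the parsed/analyzed/optimized/physical plans; the PushedFilters
// entry on the FileScan node shows whether the review_date predicate reaches the Parquet reader.
spark.sql("""
  select sum(total_votes), product_category
  from amazon_reviews
  where review_date > '2007' and review_date < '2009'
  group by product_category
""").explain(true)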


@vinothchandar commented Jul 18, 2020

Next, let's sort by review_date and ask the same question (no partitions).

takes 10 seconds.

val outputPath = "file:///Volumes/HUDIDATA/input-data/amazon-reviews-sorted-by-date"
df.sort("review_date").write.mode("overwrite").parquet(outputPath)
val df2 = spark.read.parquet(outputPath)
df2.registerTempTable("amazon_reviews_sorted_by_date")

select sum(total_votes), product_category from amazon_reviews_sorted_by_date where review_date > '2007' and review_date < '2009' group by product_category 
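
One way to see why the sorted copy helps: after sorting by review_date, each Parquet file covers a narrow review_date range, so file and row-group min/max statistics let most of the data be skipped for this predicate. A small sketch to inspect the per-file ranges (input_file_name() is a built-in Spark function; the path is the one written above):

import org.apache.spark.sql.functions._

spark.read.parquet("file:///Volumes/HUDIDATA/input-data/amazon-reviews-sorted-by-date")
  .withColumn("file", input_file_name())                     // physical Parquet file for each row
  .groupBy("file")
  .agg(min("review_date") as "min_date", max("review_date") as "max_date")
  .orderBy("min_date")
  .show(20, false)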


@vinothchandar commented Jul 18, 2020

Next, let's sort by product_category, review_date and ask the same question (no partitions). Let's see if the pruning efficiency holds even though we are primarily sorting by product_category. This should still pack review_dates close to each other within each category.

val outputPath = "file:///Volumes/HUDIDATA/input-data/amazon-reviews-sorted-by-category-date"
df.sort("product_category", "review_date").write.mode("overwrite").parquet(outputPath)
val df2 = spark.read.parquet(outputPath)
df2.registerTempTable("amazon_reviews_sorted_by_category_date")

select sum(total_votes), product_category from amazon_reviews_sorted_by_category_date where review_date > '2007' and review_date < '2009' group by product_category 
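
The same per-file check applies here; if the hypothesis holds, each file should contain only one or two product_category values and still a narrow review_date range. A sketch:

import org.apache.spark.sql.functions._

spark.read.parquet("file:///Volumes/HUDIDATA/input-data/amazon-reviews-sorted-by-category-date")
  .withColumn("file", input_file_name())
  .groupBy("file")
  .agg(countDistinct("product_category") as "categories",    // ideally 1-2 categories per file
       min("review_date") as "min_date", max("review_date") as "max_date")
  .show(20, false)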


@vinothchandar

Conclusion: We already lay out data by arrival order, i.e., ts (time), and that is already saving a lot of compute for temporal queries.

Further clustering by a secondary field, like product_category or city_id, will yield good gains.
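
For Hudi tables, this secondary-field layout can be maintained via clustering at write time; a hedged sketch of what that configuration could look like (option names are the standard Hudi clustering configs, and the output path here is illustrative only):

// Sketch: inline clustering that periodically rewrites files sorted by
// (product_category, review_date). Verify option names against the Hudi version in use.
df.write.format("hudi").
  option("hoodie.table.name", "amzn_reviews").
  option("hoodie.datasource.write.partitionpath.field", "product_category").
  option("hoodie.clustering.inline", "true").
  option("hoodie.clustering.inline.max.commits", "4").
  option("hoodie.clustering.plan.strategy.sort.columns", "product_category,review_date").
  mode("append").
  save("file:///Volumes/HUDIDATA/output-data/amazon_reviews_hudi_clustered")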

@FelixKJose

@vinothchandar great. When you say clustering by secondary fields, does that mean sort(ts, product_category, city_id)? How about Z-Ordering? Does Hudi plan to have anything like Z-Ordering?

@vinothchandar

Clustering is similar to Z-Ordering. You can see the RFC here.

https://cwiki.apache.org/confluence/display/HUDI/RFC+-+19+Clustering+data+for+speed+and+query+performance

Please keep the discussion on the mailing list, so everyone else can chime in as well!

@vinothchandar commented Dec 10, 2024

Creating tables from input data

spark-shell \
  --jars $JAR_LOCATION --executor-memory 16g --driver-memory 16g \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
  --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
  --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'

val df = spark.read.format("parquet").load("file:///.../amazon-reviews-1000-parts")
df.write.format("hudi").
  option("hoodie.datasource.write.partitionpath.field", "product_category").
  option("hoodie.table.name", "amzn_reviews").
  save("file:////.../amazon_reviews_hudi_ds_v100")
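
A quick way to confirm the write produced the expected partition layout (a sketch; the elided path is the same one passed to save above):

import org.apache.spark.sql.functions._

// Read the Hudi table back and count rows per product_category partition.
val hudiDf = spark.read.format("hudi").load("file:////.../amazon_reviews_hudi_ds_v100")
hudiDf.groupBy("product_category").count().orderBy(desc("count")).show(50, false)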

@vinothchandar commented Dec 10, 2024

1.0 - query using index on review_date

spark-sql (default)> create table hudi_table_external using hudi location 'file:///Volumes/HUDIDATA/output-data/amazon_reviews_hudi_ds_v100';
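
The session settings that let Hudi use the metadata table and its column stats for data skipping on review_date are not shown here; a hedged sketch of the usual configs (names per the Hudi docs, so treat them as assumptions), equivalently issued through spark.sql from spark-shell:

// Sketch: enable metadata-table based data skipping for this session. The warning
// printed in the "Without index" run below lists exactly these kinds of flags.
spark.sql("set hoodie.metadata.enable=true")
spark.sql("set hoodie.enable.data.skipping=true")
spark.sql("set hoodie.metadata.index.column.stats.enable=true")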

spark-sql (default)> describe table hudi_table_external;
_hoodie_commit_time 	string
_hoodie_commit_seqno	string
_hoodie_record_key  	string
_hoodie_partition_path	string
_hoodie_file_name   	string
marketplace         	string
customer_id         	string
review_id           	string
product_id          	string
product_parent      	string
product_title       	string
star_rating         	int
helpful_votes       	int
total_votes         	int
vine                	string
verified_purchase   	string
review_headline     	string
review_body         	string
review_date         	date
year                	int
product_category    	string
# Partition Information
# col_name          	data_type           	comment
product_category    	string


-- verify partition pruning works 
spark-sql (default)> select count(*) as cnt from hudi_table_external where product_category = "Books";
20726160
Time taken: 12.774 seconds, Fetched 1 row(s)
spark-sql (default)> select count(*) as cnt from hudi_table_external where product_category = "Books" or product_category = "Home";
26954727
Time taken: 16.142 seconds, Fetched 1 row(s)
spark-sql (default)> select count(*) as cnt from hudi_table_external where product_category = "Home";
6228567
Time taken: 3.918 seconds, Fetched 1 row(s)
spark-sql (default)>

Without index

spark-sql (default)> select count(*) as cnt, customer_id from hudi_table_external group by customer_id order by cnt desc limit 100;

3155	53075795
3152	52042479
3090	19380211
3089	32038204
3072	52048190
3052	36008820
3046	50027179
3043	37337835
3019	52228204
2987	33631101
2978	36952575
2974	17407953
2943	49939297
2923	52562189
2914	52500542
2899	45473710
2897	53084107
2884	52824002
2873	52215985
2867	25527589
2829	49447323

spark-sql (default)> select count(*) as cnt, customer_id from hudi_table_external where customer_id in ("36952575","52500542","49447323") group by customer_id;
24/12/10 06:58:14 WARN HoodieFileIndex: Data skipping requires Metadata Table and at least one of the indices to be enabled! (isMetadataTableEnabled = true, isColumnStatsIndexEnabled = false, isRecordIndexApplicable = false, isExpressionIndexEnabled = false, isBucketIndexEnable = false, isPartitionStatsIndexEnabled = false), isBloomFiltersIndexEnabled = false)
2829	49447323
2914	52500542
2978	36952575
Time taken: 151.666 seconds, Fetched 3 row(s)


With index
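
The index creation step between the two runs is not shown; with Hudi 1.0, a secondary index on customer_id can be declared through SQL. A hedged sketch (exact syntax per the Hudi 1.0 docs, so treat it as an assumption; the index name is made up):

// Sketch: build a secondary index on customer_id in the metadata table so the
// IN-list lookup below can prune files that contain none of the requested keys.
spark.sql("create index idx_customer_id on hudi_table_external using secondary_index(customer_id)")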

spark-sql (default)> select count(*) as cnt, customer_id from hudi_table_external where customer_id in ("36952575","52500542","49447323") group by customer_id;
2829	49447323
2914	52500542
2978	36952575
Time taken: 152.59 seconds, Fetched 3 row(s)
spark-sql (default)>



@vinothchandar

Lookup of a single customer_id
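
A sketch of the point lookup being measured, using one of the customer_id values from the top-customers list above:

// Single-key lookup; with a secondary index on customer_id this should only read
// the files that actually contain the key.
spark.sql("select * from hudi_table_external where customer_id = '53075795'").show(false)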

