Example of a generic Spark setup that uses Avro for schemas and the Parquet file format for storage.
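The snippets below assume Spark, Avro, and parquet-avro are on the classpath. A rough build.sbt sketch follows; the coordinates and versions are assumptions rather than something taken from the gist (the pre-Apache parquet-avro artifact matches the parquet.* package names used in the code):

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"   % "1.2.0",  // illustrative version
  "org.apache.avro"  %  "avro"         % "1.7.7",  // illustrative version
  "com.twitter"      %  "parquet-avro" % "1.6.0"   // pre-Apache Parquet coordinates; illustrative
)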
@namespace("com.example.avro.parquet.spark")
protocol HTTP {
  record Header {
    string name;
    string value;
  }
  record Request {
    string method;
    string path;
    string query;
    array<Header> headers;
    bytes body;
  }
  record Response {
    int status;
    array<Header> headers;
    bytes body;
  }
}
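The Scala code further down assumes that the Avro compiler has turned this IDL into SpecificRecord classes (Header, Request, Response) with no-arg constructors and builder APIs; those generated classes are not included in the gist. A hypothetical construction of a Request with the generated builders might look like this:

import java.nio.ByteBuffer
import java.util.Collections

import com.example.avro.parquet.spark.{Header, Request}

// Assumes the classes the Avro compiler would generate from the IDL above.
val request: Request = Request.newBuilder()
  .setMethod("GET")
  .setPath("/index.html")
  .setQuery("")
  .setHeaders(Collections.singletonList(
    Header.newBuilder().setName("Host").setValue("example.com").build()))
  .setBody(ByteBuffer.wrap(Array.empty[Byte]))
  .build()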
// A SparquetContext can read and write Parquet files as RDDs of different Avro schema types.
val sc = new SparquetContext("Example")

val requests: RDD[Request] = sc.parquetFile[Request]("/http/requests.parquet")
val responses: RDD[Response] = requests.map(intoResponse)

// Calling saveAsParquetFile directly on the RDD assumes an implicit enrichment not shown in the
// gist; the method actually defined on SparquetContext below is saveAsParquetFile(records, path).
responses.saveAsParquetFile("/http/responses.parquet")
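intoResponse is referenced above but not defined in the gist; a minimal placeholder, assuming the generated Avro builder API, could be:

import java.util.Collections

// Hypothetical stand-in for intoResponse: builds a Response from a Request.
def intoResponse(request: Request): Response =
  Response.newBuilder()
    .setStatus(200)
    .setHeaders(Collections.emptyList[Header]())
    .setBody(request.getBody)
    .build()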
package com.example.avro.parquet.spark

import org.apache.avro.generic.IndexedRecord
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import parquet.avro.{AvroParquetOutputFormat, AvroWriteSupport, AvroReadSupport}
import parquet.hadoop.{ParquetOutputFormat, ParquetInputFormat}
import scala.reflect.ClassTag

class SparquetContext(name: String, config: Map[String, String]) {

  def this(name: String) = this(name, Map())

  var spark: SparkContext = _

  val conf = new SparkConf()
    .setAppName(name)
    .setAll(config)
    .set("spark.serializer", classOf[org.apache.spark.serializer.KryoSerializer].getName)
    .set("spark.kryo.registrator", classOf[com.example.avro.parquet.spark.KryoProtocolRegistrator].getName)

  spark = new SparkContext(conf)

  // Read an Avro-backed Parquet file into an RDD[T] via ParquetInputFormat with AvroReadSupport.
  def parquetFile[T](path: String)(implicit m: ClassTag[T]): RDD[T] = {
    val job = Job.getInstance(spark.hadoopConfiguration)
    ParquetInputFormat.setReadSupportClass(job, classOf[AvroReadSupport[T]])
    val file = spark.newAPIHadoopFile(
      path,
      classOf[ParquetInputFormat[T]],
      classOf[Void],
      m.runtimeClass.asInstanceOf[Class[T]],
      job.getConfiguration)
    // Parquet yields (Void, record) pairs; keep only the records.
    file.map({ case (void, record) => record })
  }

  // Write an RDD of Avro records to a Parquet file, deriving the Avro schema from a
  // default instance of T (so T must be a generated record class with a no-arg constructor).
  def saveAsParquetFile[T <: IndexedRecord](records: RDD[T], path: String)(implicit m: ClassTag[T]) = {
    val keyedRecords: RDD[(Void, T)] = records.map(record => (null, record))
    spark.hadoopConfiguration.setBoolean("parquet.enable.summary-metadata", false)
    val job = Job.getInstance(spark.hadoopConfiguration)
    ParquetOutputFormat.setWriteSupportClass(job, classOf[AvroWriteSupport])
    AvroParquetOutputFormat.setSchema(job, m.runtimeClass.newInstance().asInstanceOf[IndexedRecord].getSchema())
    keyedRecords.saveAsNewAPIHadoopFile(
      path,
      classOf[Void],
      m.runtimeClass.asInstanceOf[Class[T]],
      classOf[ParquetOutputFormat[T]],
      job.getConfiguration
    )
  }

  def stop() = spark.stop()
}
Note: ParquetOutputFormat.setWriteSupportClass(job, classOf[AvroWriteSupport]) should be ParquetOutputFormat.setWriteSupportClass(job, classOf[AvroWriteSupport[T]]) in parquet-avro releases where AvroWriteSupport is generic.
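SparquetContext also references a KryoProtocolRegistrator that is not included in the gist. A minimal sketch, assuming it only needs to register the generated Avro record classes with Kryo, might be:

package com.example.avro.parquet.spark

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

// Registers the generated Avro record classes for Kryo serialization;
// the registrator the author actually used is not shown in the gist.
class KryoProtocolRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[Header])
    kryo.register(classOf[Request])
    kryo.register(classOf[Response])
  }
}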