Skip to content

Instantly share code, notes, and snippets.

@darkone23
Last active August 21, 2023 08:29
Show Gist options
  • Save darkone23/ce6c1ec5b0281bf96e34 to your computer and use it in GitHub Desktop.
Save darkone23/ce6c1ec5b0281bf96e34 to your computer and use it in GitHub Desktop.
Example generic Spark setup that uses Avro for schemas and the Parquet file format for storage.
// Avro IDL protocol describing a minimal HTTP exchange: a Header record plus
// Request and Response records that each carry a list of headers and a raw body.
// All three records are declared under the namespace given below.
@namespace("com.example.avro.parquet.spark")
protocol HTTP {
// A single HTTP header as a name/value pair of strings.
record Header {
string name;
string value;
}
// An HTTP request: method, path and query are plain strings; the body is raw bytes.
record Request {
string method;
string path;
string query;
array<Header> headers;
bytes body;
}
// An HTTP response: an integer status plus headers and a raw-bytes body.
record Response {
int status;
array<Header> headers;
bytes body;
}
}
// A SparquetContext reads and writes Parquet files as RDDs of Avro record types.
val sc = new SparquetContext("Example")

// Read: the type parameter selects the record class the file is read into.
val requests: RDD[Request] = sc.parquetFile[Request]("/http/requests.parquet")

// `intoResponse` is not defined in this snippet; it is assumed to be an
// application-supplied function of type Request => Response.
val responses: RDD[Response] = requests.map(intoResponse)

// Write: saveAsParquetFile is a method on the context (it takes the RDD and the
// destination path), not a method on RDD itself.
sc.saveAsParquetFile(responses, "/http/responses.parquet")
package com.example.avro.parquet.spark
import org.apache.avro.generic.IndexedRecord
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import parquet.avro.{AvroParquetOutputFormat, AvroWriteSupport, AvroReadSupport}
import parquet.hadoop.{ParquetOutputFormat, ParquetInputFormat}
import scala.reflect.ClassTag
/**
 * Wrapper around a [[SparkContext]] that reads and writes Parquet files as
 * RDDs of Avro records.
 *
 * The underlying Spark configuration always uses the Kryo serializer together
 * with `KryoProtocolRegistrator`; both are set after `config` is applied, so
 * they override any values for the same keys passed in by the caller.
 *
 * @param name   Spark application name
 * @param config additional Spark settings, applied before the Kryo settings
 */
class SparquetContext(name: String, config: Map[String, String]) {

  /** Creates a context with no additional Spark settings. */
  def this(name: String) = this(name, Map())

  /** Spark configuration the underlying context is created from. */
  val conf: SparkConf = new SparkConf()
    .setAppName(name)
    .setAll(config)
    .set("spark.serializer", classOf[org.apache.spark.serializer.KryoSerializer].getName)
    .set("spark.kryo.registrator", classOf[com.example.avro.parquet.spark.KryoProtocolRegistrator].getName)

  /**
   * The wrapped Spark context, created eagerly when this object is constructed.
   * Initialised at its declaration so it is never observable as `null`; it
   * remains a `var` only so that existing code assigning to it keeps compiling.
   */
  var spark: SparkContext = new SparkContext(conf)

  /**
   * Reads the Parquet file at `path` into an RDD of `T`.
   *
   * @param path location of the Parquet file
   * @param m    class tag used to recover the runtime class of `T`
   * @tparam T   record type the rows are read as
   * @return the records of the file, with the (unused) Hadoop key dropped
   */
  def parquetFile[T](path: String)(implicit m: ClassTag[T]): RDD[T] = {
    // Job.getInstance works on its own copy of the Hadoop configuration, so the
    // read-support setting below stays local to this read.
    val job = Job.getInstance(spark.hadoopConfiguration)
    ParquetInputFormat.setReadSupportClass(job, classOf[AvroReadSupport[T]])
    val recordClass = m.runtimeClass.asInstanceOf[Class[T]]
    spark
      .newAPIHadoopFile(
        path,
        classOf[ParquetInputFormat[T]],
        classOf[Void],
        recordClass,
        job.getConfiguration)
      .map(_._2) // entries are (Void, record) pairs; only the record is of interest
  }

  /**
   * Writes `records` to `path` as a Parquet file, using the Avro schema of `T`.
   *
   * The schema is obtained from a freshly constructed instance of `T`, so `T`
   * must have an accessible no-argument constructor.
   *
   * @param records records to write
   * @param path    destination of the Parquet output
   * @param m       class tag used to recover the runtime class of `T`
   * @tparam T      Avro record type being written
   */
  def saveAsParquetFile[T <: IndexedRecord](records: RDD[T], path: String)(implicit m: ClassTag[T]): Unit = {
    val recordClass = m.runtimeClass.asInstanceOf[Class[T]]
    val job = Job.getInstance(spark.hadoopConfiguration)
    // Disable summary metadata on the job's own configuration copy rather than
    // on the shared SparkContext configuration, so the setting does not leak
    // into unrelated jobs run on the same context.
    job.getConfiguration.setBoolean("parquet.enable.summary-metadata", false)
    ParquetOutputFormat.setWriteSupportClass(job, classOf[AvroWriteSupport])
    // Class.newInstance() is deprecated; go through the no-arg constructor instead.
    val schema = recordClass.getDeclaredConstructor().newInstance().getSchema
    AvroParquetOutputFormat.setSchema(job, schema)
    // The output format expects key/value pairs; the key is unused, hence null.
    val keyedRecords: RDD[(Void, T)] = records.map(record => (null, record))
    keyedRecords.saveAsNewAPIHadoopFile(
      path,
      classOf[Void],
      recordClass,
      classOf[ParquetOutputFormat[T]],
      job.getConfiguration
    )
  }

  /** Stops the wrapped Spark context. */
  def stop(): Unit = spark.stop()
}
@spicoflorin
Copy link

ParquetOutputFormat.setWriteSupportClass(job, classOf[AvroWriteSupport]) should be
ParquetOutputFormat.setWriteSupportClass(job, classOf[AvroWriteSupport[T]])

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment