Last active
November 10, 2021 16:16
-
-
Save hadoopsters/8577cf5c4ce565d1af06a6c713779589 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package tv.spotx.scala.monitoring.listeners | |
import org.apache.spark.streaming.kafka010.OffsetRange | |
import org.apache.spark.streaming.scheduler._ | |
import org.joda.time.DateTime | |
import tv.spotx.scala.dbutils.{ConnectionPool, InfluxDBWriter, MySQLConnection} | |
/**
  * :: SpotXSparkStreamingListener ::
  * A simple StreamingListener that logs summary statistics across Spark Streaming batches.
  * After every completed batch it writes the processing time, scheduling delay and record
  * counts to Influx, and the last consumed offset of every topic partition to MySQL.
  *
  * @param influxHost        Hostname of the Influx service
  * @param influxDB          Database name in Influx to write to
  * @param influxMeasurement Measurement name in Influx to write to
  * @param mySQLHost         Hostname of the MySQL service
  * @param mySQLDB           Database name in MySQL to write to
  * @param mySQLTable        Table name in MySQL to write to
  * @param mySQLConsumer     Unique name for tracking offsets across streaming apps
  * @param mySQLUser         Username for authentication in MySQL
  * @param mySQLPwd          Password for authentication in MySQL
  */
class SpotXSparkStreamingListener (influxHost: String,
                                   influxDB: String,
                                   influxMeasurement: String,
                                   mySQLHost: String,
                                   mySQLDB: String,
                                   mySQLTable: String,
                                   mySQLConsumer: String,
                                   mySQLUser: String,
                                   mySQLPwd: String) extends StreamingListener {

  // Establish Database Connections (lazily, on first use)
  @transient lazy val influx = InfluxDBWriter.create(influxHost)
  @transient lazy val mysql = MySQLConnection(host = mySQLHost, table = mySQLTable, username = mySQLUser, password = mySQLPwd, database = mySQLDB)
  // NOTE(review): despite its name this holds the single object returned by getConnection,
  // which is then reused for every batch -- confirm that is the intent.
  @transient lazy val mySQLConnectionPool = ConnectionPool(mysql.toString).getConnection

  // ====================
  // onBatch_ Methods
  // ====================

  /**
    * This method executes when a Spark Streaming batch completes.
    *
    * @param batchCompleted Class having information on the completed batch
    */
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    // write performance metrics to influx
    writeBatchSchedulingStatsToInflux(batchCompleted)
    // write offsets (state) to mysql and per-topic counts to influx
    writeBatchOffsetsAndCounts(batchCompleted)
  }

  /**
    * This method executes when a Spark Streaming batch is submitted to the scheduler for execution.
    * Intentionally a no-op.
    *
    * @param batchSubmitted Class having information on the submitted batch
    */
  override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {
  }

  /**
    * This method executes when a Spark Streaming batch starts.
    * Intentionally a no-op.
    *
    * @param batchStarted Class having information on the started batch
    */
  override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {
  }

  // ====================
  // onReceiver_ Methods
  // ====================

  /**
    * This method executes when a Spark Streaming receiver has started.
    * Intentionally a no-op.
    *
    * @param receiverStarted Class having information on the receiver (e.g. errors, executor ids, etc)
    */
  override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = {
  }

  /**
    * This method executes when a Spark Streaming receiver encounters an error.
    * Intentionally a no-op.
    *
    * @param receiverError Class having information on the receiver (e.g. errors, executor ids, etc)
    */
  override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {
  }

  /**
    * This method executes when a Spark Streaming receiver stops working.
    * Intentionally a no-op.
    *
    * @param receiverStopped Class having information on the receiver (e.g. errors, executor ids, etc)
    */
  override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped): Unit = {
  }

  // =======================================================================
  // Convenience Methods (for use in onBatch_ methods)
  // =======================================================================

  /**
    * Pulls, parses, and logs the key performance metrics of the Streaming app and logs them to Influx.
    * Processing Time: How many whole seconds were needed to complete this batch (i.e. duration).
    * Scheduling Delay: How many whole seconds the start time of this batch was delayed.
    * Num Records: The total number of input records from a live stream consumed this batch.
    *
    * @param batch Class having information on the completed batch
    */
  def writeBatchSchedulingStatsToInflux(batch: StreamingListenerBatchCompleted): Unit = {
    val info = batch.batchInfo
    // Delays are divided by 1000 with integer division, so sub-second values become 0.
    // A delay that is not reported is logged as 0.
    val processingTime: Long = info.processingDelay.fold(0L)(_ / 1000)
    // Non-positive scheduling delays are logged as 0 as well
    val schedulingDelay: Long = info.schedulingDelay.filter(_ > 0).fold(0L)(_ / 1000)
    // Store the total record count for this batch
    val numRecords: Long = info.numRecords
    // Log all three (3) metrics to Influx
    influx.write(influxDB, influxMeasurement, Seq(), Seq(("processingTime", processingTime)))
    influx.write(influxDB, influxMeasurement, Seq(), Seq(("schedulingDelay", schedulingDelay)))
    influx.write(influxDB, influxMeasurement, Seq(), Seq(("numRecords", numRecords)))
  }

  /**
    * A combination method that will handle both influx writes and MySQL offsets.
    * This is effectively a convenience method of writeBatchOffsetsToMySQL + writeBatchCountsToInflux.
    *
    * @param batch Class having information on the completed batch
    */
  def writeBatchOffsetsAndCounts(batch: StreamingListenerBatchCompleted): Unit = {
    // for each stream topic consumed this batch...
    batch.batchInfo.streamIdToInputInfo.foreach { topic =>
      // write offsets for this topic to mysql
      writeTopicOffsetsToMySQL(topic)
      // write record count for this topic this batch
      writeTopicCountToInflux(topic)
    }
  }

  /**
    * A convenience method for writing offsets to MySQL for each topic being consumed.
    *
    * @param batch Class having information on the completed batch
    */
  def writeBatchOffsetsToMySQL(batch: StreamingListenerBatchCompleted): Unit = {
    // for each stream topic consumed this batch, write its offsets to mysql
    batch.batchInfo.streamIdToInputInfo.foreach(writeTopicOffsetsToMySQL)
  }

  /**
    * A convenience method for writing topic counts to Influx for each topic being consumed.
    *
    * @param batch Class having information on the completed batch
    */
  def writeBatchCountsToInflux(batch: StreamingListenerBatchCompleted): Unit = {
    // for each stream topic consumed this batch, write its record count to influx
    batch.batchInfo.streamIdToInputInfo.foreach(writeTopicCountToInflux)
  }

  // =======================================================================
  // Topic Methods (designed for use inside of convenience methods)
  // =======================================================================

  /**
    * Extracts the offset ranges stored under the "offsets" key of a stream's metadata.
    * Returns an empty list when the key is absent or does not hold a list, so that streams
    * without offset metadata are skipped instead of raising an exception.
    *
    * @param info Input information of one stream for one batch
    * @return The offset ranges found in the metadata, possibly empty
    */
  private def offsetRangesOf(info: StreamInputInfo): List[OffsetRange] =
    info.metadata.get("offsets") match {
      case Some(ranges: List[_]) => ranges.collect { case range: OffsetRange => range }
      case _ => Nil
    }

  /**
    * Takes a topic object and writes the max offset for each partition it contains this batch to MySQL.
    * Does nothing when the stream carries no offset metadata.
    *
    * @param topic A topic object within a Batch's StreamIdToInputInfo
    */
  def writeTopicOffsetsToMySQL(topic: Tuple2[Int, StreamInputInfo]): Unit = {
    // for every partition's range of offsets
    offsetRangesOf(topic._2).foreach { offsetRange =>
      // the value persisted to the state db is one less than the range's untilOffset
      val maxOffset = offsetRange.untilOffset - 1
      // create a now() timestamp
      val now = new DateTime().toString("YYYY-MM-dd HH:mm:ss")
      // form the sql
      val sql =
        s"""INSERT INTO $mySQLDB.$mySQLTable (consumer, topic, partition_id, offset, offset_ts, batch_size)
           |VALUES
           |('$mySQLConsumer', "${offsetRange.topic}", ${offsetRange.partition}, '$maxOffset', '$now', ${offsetRange.count})
           |ON DUPLICATE KEY UPDATE offset_ts = VALUES(offset_ts), offset = VALUES(offset),
           |batch_size = VALUES(batch_size)
           |""".stripMargin
      // execute the sql to offload offsets to the table; always release the statement,
      // even when execution fails
      val st = mySQLConnectionPool.createStatement
      try st.execute(sql)
      finally st.close()
    }
  }

  /**
    * Takes a topic object and writes the number of records for said topic this batch to Influx.
    * The field is named after the topic of the first offset range; nothing is written when the
    * stream carries no offset ranges, because no topic name can be derived.
    *
    * @param topic A topic object within a Batch's StreamIdToInputInfo
    */
  def writeTopicCountToInflux(topic: Tuple2[Int, StreamInputInfo]): Unit = {
    // store the individual record count for this topic
    val numRecords = topic._2.numRecords
    // write record count for this topic this batch, keyed by the topic name
    offsetRangesOf(topic._2).headOption.map(_.topic).foreach { topicName =>
      influx.write(influxDB, influxMeasurement, Seq(), Seq(("numRecords_" + topicName, numRecords)))
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment