[Simple timestamp aggregation in Spark] #spark
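The script below generates one timestamp per minute for a full day (1440 rows), registers the resulting DataFrame as a temporary view, and groups the timestamps into 5-minute and 1-hour buckets with Spark SQL's window() function, keeping each window's start time.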
import datetime as dt

from pyspark.sql.types import StructField, StructType, TimestampType

from context import initialize  # local helper that builds the Spark context

MY_SCHEMA = StructType([
    StructField('ts', TimestampType(), True),
])

if __name__ == '__main__':
    # One timestamp per minute for the whole of 2017-09-01: 1440 rows.
    data = [
        (dt.datetime(2017, 9, 1, hour, minute),)
        for hour in range(24)
        for minute in range(60)
    ]
    sc, sqlContext = initialize()
    rdd = sc.parallelize(data)
    df = sqlContext.createDataFrame(rdd, schema=MY_SCHEMA)
    df.createOrReplaceTempView('df')

    # Bucket the timestamps into 5-minute windows; window(...).start is
    # the beginning of each bucket.
    agg5m = sqlContext.sql("""
        select window(ts, '5 minutes').start as ts_5min
        from df
        group by window(ts, '5 minutes')
    """)

    # Same aggregation with 1-hour buckets.
    agg1h = sqlContext.sql("""
        select window(ts, '1 hour').start as ts_1hour
        from df
        group by window(ts, '1 hour')
    """)

    rows5m = agg5m.sort(agg5m.ts_5min).collect()
    rows1h = agg1h.sort(agg1h.ts_1hour).collect()
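The script depends on a local context module whose initialize() helper returns a (SparkContext, SQLContext) pair; that module is not part of Spark. On Spark 2.x and later, the same aggregation can be written against the SparkSession entry point with no local helper. Below is a minimal, self-contained sketch; the app name is arbitrary, and the DataFrame API is used here in place of the SQL view:

import datetime as dt

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

if __name__ == '__main__':
    # Session setup replaces the local initialize() helper; the app name
    # ('ts-agg-sketch') is an arbitrary placeholder.
    spark = SparkSession.builder.appName('ts-agg-sketch').getOrCreate()

    # Same input as above: one timestamp per minute for 2017-09-01.
    data = [
        (dt.datetime(2017, 9, 1, hour, minute),)
        for hour in range(24)
        for minute in range(60)
    ]
    df = spark.createDataFrame(data, ['ts'])

    # Group into 5-minute buckets and keep each bucket's start time.
    agg5m = (
        df.groupBy(F.window('ts', '5 minutes'))
          .count()
          .select(F.col('window.start').alias('ts_5min'))
    )
    rows5m = agg5m.sort('ts_5min').collect()

    spark.stop()

Both forms produce the same buckets: window(ts, '5 minutes') in SQL and F.window('ts', '5 minutes') in the DataFrame API compile to the same time-window aggregation, with the window struct exposing start and end fields.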