Skip to content

Instantly share code, notes, and snippets.

@j-bennet
Created September 15, 2017 22:25
Show Gist options
  • Save j-bennet/9e27869029b0cdcc2405d851c736fe0e to your computer and use it in GitHub Desktop.
Save j-bennet/9e27869029b0cdcc2405d851c736fe0e to your computer and use it in GitHub Desktop.
[Simple timestamp aggregation in Spark] #spark
import datetime as dt
from pyspark.sql.types import *
from context import initialize
# Schema for the demo DataFrame: a single nullable timestamp column "ts",
# which is the column the window() aggregations group on.
MY_SCHEMA = StructType(
    [StructField('ts', TimestampType(), nullable=True)]
)
def build_sample_data(day=dt.date(2017, 9, 1)):
    """Return one row per minute of ``day``, shaped to match ``MY_SCHEMA``.

    Each row is a 1-tuple ``(datetime,)`` because ``MY_SCHEMA`` has a single
    ``ts`` column. The datetimes are naive (no tzinfo).

    :param day: date to generate minutes for; defaults to 2017-09-01, the
        date the demo originally hard-coded.
    :return: list of 24 * 60 = 1440 tuples ordered from 00:00 to 23:59.
    """
    return [
        (dt.datetime(day.year, day.month, day.day, hour, minute),)
        for hour in range(24)
        for minute in range(60)
    ]


def window_starts(sql_context, duration, alias, view='df'):
    """Return the distinct tumbling-window start times over ``view.ts``.

    Runs a Spark SQL ``GROUP BY window(ts, duration)`` with no aggregate
    function. The result therefore has one row per non-empty window and a
    single column, ``alias``, holding that window's start timestamp.

    NOTE(review): Spark aligns window boundaries to the UTC epoch. The naive
    datetimes from build_sample_data() are interpreted in the local/session
    time zone. In a zone with a non-whole-hour offset, the 1-hour windows
    will therefore not start on the hour. Confirm the time zone if exact
    alignment matters.

    :param sql_context: object exposing ``.sql(query)``, such as the SQL
        context returned by ``initialize()``.
    :param duration: Spark interval string such as ``'5 minutes'``.
    :param alias: name of the output column.
    :param view: registered temp view to query; must have a timestamp
        column named ``ts``.
    :return: an unsorted Spark DataFrame.
    """
    # The arguments are pasted into the SQL text, so pass only trusted
    # literals here, never user input.
    query = (
        "select window(ts, '{duration}').start as {alias} "
        "from {view} "
        "group by window(ts, '{duration}')"
    ).format(duration=duration, alias=alias, view=view)
    return sql_context.sql(query)


def main():
    """Aggregate one day of per-minute timestamps into 5-minute and 1-hour windows.

    Prints how many windows of each width were found, along with the first
    and last window start. Returns the sorted collected rows as
    ``(rows5m, rows1h)``.
    """
    sc, sqlContext = initialize()
    try:
        rdd = sc.parallelize(build_sample_data())
        df = sqlContext.createDataFrame(rdd, schema=MY_SCHEMA)
        # Register the DataFrame so the SQL in window_starts() can refer to it as "df".
        df.createOrReplaceTempView('df')

        agg5m = window_starts(sqlContext, '5 minutes', 'ts_5min')
        agg1h = window_starts(sqlContext, '1 hour', 'ts_1hour')

        # GROUP BY gives no ordering guarantee, so sort before collecting.
        rows5m = agg5m.sort(agg5m.ts_5min).collect()
        rows1h = agg1h.sort(agg1h.ts_1hour).collect()
    finally:
        # Release the Spark context even if a query fails.
        sc.stop()

    for label, rows, column in (
        ('5-minute', rows5m, 'ts_5min'),
        ('1-hour', rows1h, 'ts_1hour'),
    ):
        print('{} {} windows'.format(len(rows), label))
        if rows:
            print('  first start: {}'.format(rows[0][column]))
            print('  last start:  {}'.format(rows[-1][column]))
    return rows5m, rows1h


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment