Cheat sheet for Spark Dataframes (using Python)
# A simple cheat sheet of Spark Dataframe syntax
# Current for Spark 1.6.1
# import statements
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
# creating dataframes
df = sqlContext.createDataFrame([(1, 4), (2, 5), (3, 6)], ["A", "B"]) # from manual data
df = sqlContext.read.format('com.databricks.spark.csv') \
    .options(delimiter=';', header='true', inferschema='true', mode="FAILFAST") \
    .load('csv_file_name_or_*_reference')
# adding columns and keeping existing ones
df.withColumn('zero', lit(0)) # add constant column (lit comes from pyspark.sql.functions)
df.withColumn('A_times_two', df.A * 2)
# selecting columns, and creating new ones
df.select(
    'A' # most of the time it's sufficient to just use the column name
    , col('A').alias('new_name_for_A') # in other cases the col function is handy for referring to columns without having to repeat the dataframe name
    , ( col('B') > 0 ).alias('is_B_greater_than_zero')
    , unix_timestamp('A', 'dd.MM.yyyy HH:mm:ss').alias('A_in_unix_time') # convert text to unix time
)
# filtering
df.filter('A_in_unix_time > 946684800')
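# filter also accepts Column expressions; combine conditions with & and |, wrapping each in parentheses
# (the column names below assume the select() example above)
df.filter( (col('A_in_unix_time') > 946684800) & col('is_B_greater_than_zero') )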
# grouping and aggregating
df.groupBy("A").agg(
    first("B").alias("my first")
    , last("B").alias("my last")
    , sum("B").alias("my everything")
)
# pivoting
df.groupBy('A', 'B').pivot('C').agg(first('D')).orderBy(['A', 'B']) # first could be any aggregate function
# inspecting dataframes
display(df) # renders a table (Databricks notebooks)
df.show()   # text table
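# a few other handy inspection calls
df.printSchema()     # column names, types and nullability
df.dtypes            # list of (column_name, type) tuples
df.describe().show() # basic summary statistics for numeric columns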
######################################### Date time manipulation ################################
# Casting to timestamp from string with format 2015-01-01 23:59:59
df.select( df.start_time.cast("timestamp").alias("start_time") )
# Get all records whose start_time and end_time fall on the same day, and where end_time is at most 1 hour after start_time.
condition = \
    (to_date(df.start_time) == to_date(df.end_time)) & \
    (df.start_time + expr("INTERVAL 1 HOUR") >= df.end_time)
df.filter(condition).show()
############### WRITING TO AMAZON REDSHIFT ###############
REDSHIFT_JDBC_URL = "jdbc:redshift://%s:5439/%s" % (REDSHIFT_SERVER, DATABASE)
df.write \
    .format("com.databricks.spark.redshift") \
    .option("url", REDSHIFT_JDBC_URL) \
    .option("dbtable", TABLE_NAME) \
    .option("tempdir", "s3n://%s:%s@%s" % (ACCESS_KEY, SECRET, S3_BUCKET_PATH)) \
    .mode("overwrite") \
    .save()
######################### REFERENCE #########################
# recast a single column (c is a column name)
df.select(df[c], df[c].cast('int'))
# recast many columns using a dict of column name -> target type
dtype_dict = {'Player': StringType, 'Pos': StringType, 'Tm': StringType, 'Age': IntegerType, 'G': IntegerType, 'GS': IntegerType, 'yr': IntegerType}
df2 = df.fillna('0')
for c in df2.schema.names[6:]:
    dtype = dtype_dict.get(c, DoubleType)
    df2 = df2.withColumn(c, df2[c].cast(dtype()))
## UDF
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
maturity_udf = udf(lambda age: "adult" if age >= 18 else "child", StringType())
df = sqlContext.createDataFrame([{'name': 'Alice', 'age': 1}])
df.withColumn("maturity", maturity_udf(df.age))
##
# aggregate functions
approxCountDistinct, avg, count, countDistinct, first, last, max, mean, min, sum, sumDistinct
# window functions
cumeDist, denseRank, lag, lead, ntile, percentRank, rank, rowNumber
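# window functions need a window spec before they can be used; a minimal sketch
# (reuses columns A and B from the toy dataframe above):
from pyspark.sql.window import Window
w = Window.partitionBy('A').orderBy('B')
df.select('A', 'B',
          rowNumber().over(w).alias('row_in_group'),
          lag('B', 1).over(w).alias('previous_B'))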
# string functions
ascii, base64, concat, concat_ws, decode, encode, format_number, format_string, get_json_object, initcap, instr, length, levenshtein, locate, lower, lpad, ltrim, printf, regexp_extract, regexp_replace, repeat, reverse, rpad, rtrim, soundex, space, split, substring, substring_index, translate, trim, unbase64, upper
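# quick sketch of a few string functions (assumes A and B are string columns):
df.select(
    concat_ws('_', 'A', 'B').alias('A_and_B'),
    regexp_replace('A', '[^0-9]', '').alias('digits_only'),
    substring('A', 1, 3).alias('first_three_chars'),
    lower(trim(col('A'))).alias('normalised_A')
)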
# null and nan functions
isnan, isnull (functions); col.isNull(), col.isNotNull() (Column methods)
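# usage sketch (reusing the toy columns A and B from above):
df.filter( isnull(df.A) | isnan(df.B) ) # rows where A is null or B is NaN
df.filter( df.A.isNotNull() )           # Column-method equivalent
df.na.fill(0)                           # replace nulls with a default value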
# misc functions
array, bitwiseNOT, callUDF, coalesce, crc32, greatest, if, inputFileName, least, lit, md5, monotonicallyIncreasingId, nanvl, negate, not, rand, randn, sha, sha1, sparkPartitionId, struct, when
# datetime
current_date, current_timestamp, trunc, date_format
datediff, date_add, date_sub, add_months, last_day, next_day, months_between
year, month, dayofmonth, hour, minute, second
unix_timestamp, from_unixtime, to_date, quarter, day, dayofyear, weekofyear, from_utc_timestamp, to_utc_timestamp
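# usage sketch for a few datetime functions (assumes start_time and end_time are timestamp columns, as above):
df.select(
    date_format('start_time', 'yyyy-MM-dd').alias('start_day'),
    datediff('end_time', 'start_time').alias('days_between'),
    date_add(to_date('start_time'), 7).alias('one_week_later'),
    months_between('end_time', 'start_time').alias('months_between')
)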