Creates time based Glue partitions given time range.
Keep in mind that you don't need data to add partitions. So, you can create partitions for a whole year and add the data to S3 later.
import json | |
import boto3 | |
from dateutil import rrule | |
from datetime import datetime, timedelta | |
glue = boto3.client('glue', '--') # Update with your location | |
s3 = boto3.client('s3') | |
def get_current_schema(table_name, database_name):
    """Fetch the storage metadata of an existing Glue table.

    Args:
        table_name: name of the Glue table.
        database_name: Glue database that holds the table.

    Returns:
        Dict with the pieces needed to build partition inputs:
        'input_format', 'output_format', 'table_location',
        'serde_info' and 'partition_keys'.
    """
    response = glue.get_table(DatabaseName=database_name, Name=table_name)
    descriptor = response['Table']['StorageDescriptor']
    return {
        'input_format': descriptor['InputFormat'],
        'output_format': descriptor['OutputFormat'],
        'table_location': descriptor['Location'],
        'serde_info': descriptor['SerdeInfo'],
        'partition_keys': response['Table']['PartitionKeys'],
    }
def create_partitions(data, database_name, table_name):
    """Batch-register partition inputs on a Glue table.

    Glue's batch_create_partition accepts at most 100 partitions per
    call, so the input list is submitted in chunks of 100.

    Args:
        data: list of PartitionInput dicts (see generate_partition_input).
        database_name: Glue database that holds the table.
        table_name: Glue table the partitions are added to.
    """
    chunk_size = 100  # hard API limit of batch_create_partition
    for start in range(0, len(data), chunk_size):
        chunk = data[start:start + chunk_size]
        response = glue.batch_create_partition(
            DatabaseName=database_name,
            TableName=table_name,
            PartitionInputList=chunk
        )
        # Surface per-partition failures (e.g. AlreadyExistsException)
        # instead of silently discarding the response.
        errors = response.get('Errors')
        if errors:
            print(errors)
def generate_partition_input(year, month, day, hour, s3_input, table_data,
                             polygon_slug=None):
    """Build a single Glue PartitionInput dict for one hour of data.

    Note: the original signature placed a never-used ``polygon_slug``
    between ``hour`` and ``s3_input``, which broke the positional call
    in generate_partition_input_list; it is now an optional trailing
    parameter kept only for compatibility.

    Args:
        year, month, day, hour: partition values as strings.
        s3_input: base S3 path of the table data (no trailing slash).
        table_data: schema dict produced by get_current_schema().
        polygon_slug: unused legacy parameter; ignored.

    Returns:
        Dict suitable for glue.batch_create_partition's
        PartitionInputList.
    """
    # Hive-style partition layout: <base>/year=Y/month=M/day=D/hour=H
    part_location = "{}/year={}/month={}/day={}/hour={}".format(
        s3_input, year, month, day, hour)
    return {
        'Values': [
            year, month, day, hour
        ],
        'StorageDescriptor': {
            'Location': part_location,
            'InputFormat': table_data['input_format'],
            'OutputFormat': table_data['output_format'],
            'SerdeInfo': table_data['serde_info']
        }
    }
def generate_partition_input_list(start, end, s3_input, table_data,
                                  polygons=None):
    """Build PartitionInput dicts for every hour from start to end.

    Note: the original signature placed a never-used ``polygons``
    between ``s3_input`` and ``table_data``, which broke the positional
    call in lambda_handler; it is now an optional trailing parameter
    kept only for compatibility. The dateutil rrule iteration is
    replaced by an equivalent stdlib timedelta loop (both include
    ``end`` itself when it falls exactly on an hourly step).

    Args:
        start: datetime of the first partition hour.
        end: datetime bounding the range (inclusive).
        s3_input: base S3 path of the table data.
        table_data: schema dict produced by get_current_schema().
        polygons: unused legacy parameter; ignored.

    Returns:
        List of PartitionInput dicts, one per hour.
    """
    input_list = []
    current = start
    while current <= end:
        input_list.append(generate_partition_input(
            str(current.year), str(current.month), str(current.day),
            str(current.hour), s3_input, table_data))
        current += timedelta(hours=1)
    return input_list
def lambda_handler(event, context):
    """Create hourly Glue partitions covering the next two days.

    Reads the target table's schema, generates one PartitionInput per
    hour in the range, and batch-registers them. No data needs to exist
    in S3 yet; partitions can be created ahead of time.
    """
    # Glue table location
    database_name = 'test'  # update
    table_name = 'raw_data_test'  # update

    # S3 location of the table data
    s3_bucket_glue = '--'  # update
    s3_prefix_glue = '--'  # update
    s3_input_glue = 's3://' + s3_bucket_glue + '/' + s3_prefix_glue

    # Desired time range: now through two days from now
    start_time = datetime.now() + timedelta(days=0)
    end_time = datetime.now() + timedelta(days=2)

    # Glue table metadata -> partition dicts -> batch insert
    table_data = get_current_schema(table_name, database_name)
    data = generate_partition_input_list(
        start_time, end_time, s3_input_glue, table_data)
    create_partitions(data, database_name, table_name)
Hey, glad that you liked it. I just put it here for my personal use, that is why it is not so clear.
The polygon stuff is related to my specific use case, but you can replace polygon with id. In the function generate_partition_input you can see that polygon is used to map to an S3 path.
About your use case, maybe you don’t need partitions. What is the size of your data? If it is not that big, partitions are just going to annoy you.
If it is big, and you have three dates in your csv but you are partitioning by date, then you have to split that csv. Each part should go to a different S3 path with proper naming. There is no way around it. You can preprocess the csv using Lambdas or Batch.
Thanks for the reply.
I have smaller data as of now. It's small project for educational purpose. But trying to apply all best practices I can.
Your code is completely clear... I was just unclear about how you iterate "polygons" in the "generate_partition_input_list()" method, and what the value part in polygon['polygon_slug'] is!
I will probably take the route of splitting the .csv into three (multiple) files and creating Glue partitions using Lambda, so that I can avoid running the expensive Glue Crawler.
Thanks !
Thanks for sharing this code. It is quite helpful.
However, can you give some more detail about what the "read_polygon()" method does?
what is payload['polygons'] and polygon['polygon_slug'] ?
I am trying to implement similar thing. But I have .csv file in my S3 bucket.
I.e. Suppose I have single .csv file which comes in my S3 bucket everyday. I just need that data available to query via Athena. I already have a Glue catalog table. I will just add partition and put data into that partition.
The issue is, when I have 3 dates (in my .csv) file, it should go into three different partitions on S3.
Can you throw some light?
Thanks !