Created
January 17, 2020 13:56
-
-
Save realFranco/2ffece4e5de503f5b5d55b2232d35bbd to your computer and use it in GitHub Desktop.
python - AWS DynamoDB Client
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Dev: Franco Gil. | |
DynamoDB Documentation: | |
boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb | |
docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/DynamoDB.html | |
** The Front End are check the Optional & Non - Optional data requeriments. ** | |
""" | |
import json | |
import argparse | |
import time | |
from time import strftime, gmtime | |
import re | |
import boto3 | |
from boto3.dynamodb.conditions import Key, Attr | |
# TODO: Create a class for the 'Data Model' and keep this class as a template.
class dynamoInterface(object):
    """Thin convenience wrapper around a single DynamoDB table.

    Call :meth:`connect` before any other method; it creates the boto3
    resource and table handles (``self.dynamodb`` / ``self.table``) that the
    remaining methods use.

    Rows are keyed by ``pkid`` (table hash key) and are looked up through the
    global secondary index named by :attr:`INDEX_NAME`, whose hash key is
    ``gsidataportion``.
    """

    # Name of the global secondary index used by ``query_items``.
    INDEX_NAME = 'gsidataportion-index'

    # Seconds to pause before every ``put_item`` issued by ``insert``.
    # NOTE(review): the reason for the pause is not visible in this file
    # (presumably write throttling) -- confirm before changing the default.
    INSERT_DELAY_SECONDS = 1

    def __init__(self, table_name):
        """Remember the table name; no AWS call is made here."""
        print('Dynamo Interface')
        self.table_name = table_name

    def connect(self):
        """Create the boto3 DynamoDB resource and the table handle."""
        self.dynamodb = boto3.resource('dynamodb')
        self.table = self.dynamodb.Table(self.table_name)

    def create(self):
        """Create the table (on-demand billing) and wait until it exists.

        The table uses ``pkid`` as its hash key. The global secondary index
        ``INDEX_NAME`` (hash key ``gsidataportion``) is created as well,
        because ``query_items`` always queries through that index.
        """
        table = self.dynamodb.create_table(
            AttributeDefinitions=[
                {'AttributeName': 'pkid', 'AttributeType': 'S'},
                {'AttributeName': 'gsidataportion', 'AttributeType': 'S'},
            ],
            TableName=self.table_name,
            KeySchema=[
                {'AttributeName': 'pkid', 'KeyType': 'HASH'},
            ],
            GlobalSecondaryIndexes=[
                {
                    'IndexName': self.INDEX_NAME,
                    'KeySchema': [
                        {'AttributeName': 'gsidataportion', 'KeyType': 'HASH'},
                    ],
                    'Projection': {'ProjectionType': 'ALL'},
                },
            ],
            BillingMode='PAY_PER_REQUEST',
        )

        # Wait until the table exists.
        table.meta.client.get_waiter('table_exists').wait(
            TableName=self.table_name
        )

        # Print out some data about the table.
        print(table)

    def insert(self, item):
        """Store ``item`` with ``put_item``, without checking for duplicates.

        :item, dict. Attributes to store, e.g.::

            {
                'pkid': 'key',
                'attr1': 'v_attr1',
                'attr_n': 'v_attr_n',
            }

        ``item`` is updated in place with ``gsidataportion`` (always set to
        ``'applicant'``) and the ``time`` / ``date`` stamps before it is
        written.

        Returns the ``ResponseMetadata`` of the ``put_item`` call, or ``None``
        when ``item`` is empty.
        """
        if not item:
            return None

        time.sleep(self.INSERT_DELAY_SECONDS)

        # Take one clock snapshot so both stamps describe the same instant.
        now = gmtime()

        # NOTE(review): 'time' holds the calendar date and 'date' holds the
        # clock time. The labels look swapped, but they are kept as-is
        # because already-stored rows use this layout.
        item.update({
            'gsidataportion': 'applicant',
            'time': strftime("%d-%b-%Y", now),
            'date': strftime('%H:%M:%S', now),
        })

        response = self.table.put_item(Item=item)
        return response['ResponseMetadata']

    def insert_unique(self, item, debug=False):
        """Insert ``item``, or update the stored row if its ``pkid`` exists.

        When a row with the same ``pkid`` is found, the row is updated with
        the attributes of ``item`` plus a ``try`` counter: the stored counter
        plus one, or ``2`` when the stored row has no counter yet.

        :item, dict. Data to store; must contain ``'pkid'``. The caller's
            dict is not modified.
        :debug, bool. Forwarded to :meth:`update` to print the request
            attributes. Defaults to False.

        Returns the response of :meth:`update` or :meth:`insert`, or ``None``
        when ``item`` is empty.

        Raises ``KeyError`` when ``item`` has no ``'pkid'``.
        """
        if not item:
            print('Empty items.')
            return None

        # Work on a copy: the key is split off below and the caller's dict
        # must keep it.
        record = dict(item)
        partition = {'gsidataportion': 'applicant'}  # Static key - value.
        record.update(partition)
        pkid = {'pkid': record.pop('pkid')}  # Divide key & attributes.

        stored = self.query_items(arguments=partition, equal=pkid)

        if stored['Count'] > 0:
            # Row found: bump the retry counter and update the old data.
            print('Data Founded')
            try:
                tries = int(stored['Items'][0]['try']) + 1
            except KeyError:
                # No counter stored yet, so this is the second attempt.
                tries = 2
            record['try'] = tries
            return self.update(args=record, key=pkid, debug=debug)

        # Nothing stored under this key: insert new data.
        record.update(pkid)
        return self.insert(item=record)

    def query_items(self, arguments: dict, attr_exist=None, equal=None,
                    debug_query=False):
        """Query the table through the ``INDEX_NAME`` index.

        :arguments, dict. The FIRST key/value pair is the index key condition
            (``key = value``) and is not optional; every following pair
            becomes a ``contains(key, value)`` filter::

                {
                    'gsidataportion': 'applicant',
                    'key': 'value', ..., 'key_n': 'value_n'
                }

        :attr_exist, list. Attribute names that must exist on a returned row,
            e.g. ``['attr', 'attr2']``.
        :equal, dict. Attribute names whose stored value must be exactly
            equal to the given value, for the cases where the default
            ``contains`` (substring) comparison is too loose.
        :debug_query, bool. Print the expressions passed to DynamoDB.

        All result pages are fetched (``LastEvaluatedKey`` is followed until
        it is absent) and merged.

        Returns ``{"Items": [...], "Count": int}`` for the merged pages.

        Raises ``ValueError`` when ``arguments`` is empty.
        """
        if not arguments:
            raise ValueError(
                "'arguments' must contain at least the index key/value pair."
            )
        attr_exist = attr_exist or []
        equal = equal or {}

        keys = list(arguments)
        values = list(arguments.values())
        keys_ce = [':v' + name for name in keys]
        ea = dict(zip(keys_ce, values))

        # The first argument is the index (GSI) key condition.
        key_ce = '%s=%s' % (keys[0], keys_ce[0])

        clauses = [
            'contains(%s, %s)' % (name, placeholder)
            for name, placeholder in zip(keys[1:], keys_ce[1:])
        ]
        clauses.extend('attribute_exists(%s)' % name for name in attr_exist)
        for name, value in equal.items():
            # A separate ':e' prefix keeps an equality value from overwriting
            # the ':v' placeholder of an argument with the same name.
            placeholder = ':e' + name
            ea[placeholder] = value
            clauses.append('%s = %s' % (name, placeholder))
        fe = ' and '.join(clauses)

        if debug_query:
            print('keys', keys)
            print('values', values)
            print('keys cond. expr.', keys_ce)
            print("\nExpression Atribute: %s\n" % (ea))
            print("Key Condition Expression: %s\n" % (key_ce))
            print("Filter Expression: %s\n\n" % (fe))

        query_kwargs = {
            'IndexName': self.INDEX_NAME,
            'ExpressionAttributeValues': ea,
            'KeyConditionExpression': key_ce,
        }
        if fe:
            query_kwargs['FilterExpression'] = fe

        items, count = [], 0
        while True:
            response = self.table.query(**query_kwargs)
            items.extend(response['Items'])
            count += response['Count']
            last_key = response.get('LastEvaluatedKey')
            if not last_key:
                break
            # More pages remain: resume right after the last evaluated row.
            query_kwargs['ExclusiveStartKey'] = last_key

        return {"Items": items, "Count": count}

    def update(self, args: dict, key: dict, debug=False):
        """Set the attributes in ``args`` on the row identified by ``key``.

        :args, dict. Attribute name -> new value; must not be empty.
        :key, dict. Primary key of the row, e.g. ``{'pkid': 'value'}``.
        :debug, bool. Print the expressions passed to DynamoDB.

        Returns the raw ``update_item`` response (``ReturnValues`` is
        ``'UPDATED_NEW'``).

        Raises ``ValueError`` when ``args`` is empty.

        NOTE(review): attribute names are written into the update expression
        verbatim, so names that DynamoDB reserves will be rejected by the
        service; ``ExpressionAttributeNames`` would be needed for those.
        """
        if not args:
            raise ValueError("'args' must contain at least one attribute.")

        eav = {':v' + name: value for name, value in args.items()}
        ue = 'SET ' + ', '.join(
            "{} = {}".format(name, ':v' + name) for name in args
        )

        if debug:
            print("Expression Attributes"); print(eav); print('\n')
            print("Key"); print(key); print('\n')
            print("Update Expression"); print(ue); print('\n')

        return self.table.update_item(
            ExpressionAttributeValues=eav,
            Key=key,
            UpdateExpression=ue,
            ReturnValues='UPDATED_NEW',
        )

    def delete(self, key):
        """Delete the row identified by its primary key.

        :key, dict. ``{"pk_name": "pk_value"}``.

        Returns the ``ResponseMetadata`` of the ``delete_item`` call.
        """
        response = self.table.delete_item(Key=key)
        return response["ResponseMetadata"]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I forgot the python's docstrings, sorry PEP 257.