Created
December 4, 2014 01:51
-
-
Save gjcourt/4f3d1db1763399cbf4a0 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import hashlib | |
import hmac | |
import pytz # pip install pytz | |
from collections import namedtuple | |
from datetime import datetime, timedelta | |
from itertools import izip | |
from urllib import quote | |
def utcaware(dt): | |
return dt.replace(tzinfo=pytz.utc) | |
def utcnow(): | |
return utcaware(datetime.utcnow()) | |
# AWS Version 4 signing example | |
# See: http://docs.aws.amazon.com/general/latest/gr/sigv4_signing.html | |
AmazonParameters = namedtuple('AmazonParameters', [ | |
'algorithm', | |
'amz_date', | |
'amz_request_id', | |
'amz_customer_id', | |
'amz_dta_version', | |
'signed_headers', | |
'credential_public_key', | |
'credential_date_stamp', | |
'signature', | |
]) | |
class Signer(object): | |
def __init__(self, request): | |
self.request = request | |
self.public_key = 'public' | |
self.secret_key = 'secret' | |
self.service = '' | |
self.region = '' | |
def sha256(self, msg): | |
return hashlib.sha256(msg.encode('utf-8')).hexdigest() | |
def sign(self, key, msg, hex=False): | |
if hex: | |
return hmac.new(key, msg.encode('utf-8'), hashlib.sha256).hexdigest() | |
else: | |
return hmac.new(key, msg.encode('utf-8'), hashlib.sha256).digest() | |
def get_amazon_params(self): | |
if not hasattr(self, '_amazon_parameters'): | |
amz_date = self.request.META['HTTP_X_AMZ_DATE'] | |
amz_request_id = self.request.META['HTTP_X_AMZ_REQUEST_ID'] | |
amz_customer_id = self.request.META['HTTP_X_AMZ_CUSTOMER_ID'] | |
amz_dta_version = self.request.META['HTTP_X_AMZ_DTA_VERSION'] | |
auth_header = self.request.META['HTTP_AUTHORIZATION'] | |
preamble, credential_protocol, signature_protocol = [l.strip() for l in auth_header.split(',')] | |
algorithm, signed_headers_protocol = preamble.split(' ') | |
_, signed_headers = signed_headers_protocol.split('=') | |
_, credential = credential_protocol.split('=') | |
_, signature = signature_protocol.split('=') | |
credential_public_key, credential_date_stamp = credential.split('/') | |
self._amazon_parameters = AmazonParameters( | |
algorithm, | |
amz_date, | |
amz_request_id, | |
amz_customer_id, | |
amz_dta_version, | |
signed_headers, | |
credential_public_key, | |
credential_date_stamp, | |
signature, | |
) | |
return self._amazon_parameters | |
def get_request_method(self): | |
return self.request.method | |
def get_canonical_uri(self): | |
return quote(self.request.path) | |
def get_canonical_querystring(self): | |
return self.request.META['QUERY_STRING'] # usually empty? | |
def get_canonical_headers(self): | |
# Step 4: Create the canonical headers. Header names and values | |
# must be trimmed and lowercase, and sorted in ASCII order. | |
# Note that there is a trailing \n. | |
params = self.get_amazon_params() | |
canonical_headers = params.signed_headers.split(';') | |
values = [self.request.META['HTTP_' + h.upper().replace('-', '_')] for h in canonical_headers] | |
return '\n'.join([h + ':' + v for h, v in izip(canonical_headers, values)]) + '\n' | |
def get_signed_headers(self): | |
# Step 5: Create the list of signed headers. This lists the headers | |
# in the canonical_headers list, delimited with ";" and in alpha order. | |
# Note: The request can include any headers; canonical_headers and | |
# signed_headers include those that you want to be included in the | |
# hash of the request. "Host" and "http-x-amz-date" are always required. | |
# For DynamoDB, content-type and http-x-amz-target are also required. | |
params = self.get_amazon_params() | |
return params.signed_headers | |
def get_payload_hash(self): | |
return self.sha256(self.request.body) | |
def get_canonical_request(self): | |
return self.get_request_method() + \ | |
'\n' + self.get_canonical_uri() + \ | |
'\n' + self.get_canonical_querystring() + \ | |
'\n' + self.get_canonical_headers() + \ | |
'\n' + self.get_signed_headers() + \ | |
'\n' + self.get_payload_hash() | |
def get_signing_key(self): | |
params = self.get_amazon_params() | |
date_stamp = params.credential_date_stamp | |
# date_key = self.sign(('AWS4' + self.secret_key).encode('utf-8'), date_stamp) | |
# region_key = self.sign(date_key, self.region) | |
# service_key = self.sign(region_key, self.service) | |
# signing_key = self.sign(service_key, 'aws4_request') | |
# return signing_key | |
return self.sign(self.secret_key, date_stamp) | |
def get_credential_scope(self): | |
# params = self.get_amazon_params() | |
# return params.credential_date_stamp + '/' + self.region + '/' + self.service + '/' + 'aws4_request' | |
return '' | |
def get_signing_string(self): | |
params = self.get_amazon_params() | |
credential_scope = self.get_credential_scope() | |
canonical_request = self.get_canonical_request() | |
return params.algorithm + \ | |
'\n' + params.amz_date + \ | |
'\n' + credential_scope + \ | |
'\n' + self.sha256(canonical_request) | |
def get_signature(self): | |
signing_key = self.get_signing_key() | |
signing_string = self.get_signing_string() | |
return self.sign(signing_key, signing_string, hex=True) | |
def verify_signature(self): | |
params = self.get_amazon_params() | |
amz_datetime = utcaware(datetime.strptime(params.amz_date, '%Y%m%dT%H%M%SZ')) | |
return utcnow() - amz_datetime < timedelta(minutes=45) \ | |
and params.credential_public_key == self.public_key \ | |
and params.signature == self.get_signature() | |
# Example: | |
# signer = Signer(request) | |
# signer.verify_signature() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment