Last active
May 13, 2019 16:39
-
-
Save nackjicholson/d558a2cdcb644dd692a877d6683f4df9 to your computer and use it in GitHub Desktop.
snippet of python to verify SNS signatures in flask app.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import logging | |
from base64 import b64decode | |
from datetime import datetime, timezone | |
from urllib.request import urlopen | |
from M2Crypto import X509 | |
from flask import Blueprint, request, jsonify | |
from werkzeug.exceptions import BadRequest, Unauthorized | |
logger = logging.getLogger(__name__) | |
bp = Blueprint("events", __name__, url_prefix="/api/events") | |
def get_aws_signing_cert(url): | |
with urlopen(url) as stream: | |
if stream.getcode() != 200: | |
raise RuntimeError("Could not load aws signing cert!") | |
return stream.read().decode() | |
def get_signing_keys(message): | |
message_type = message.get("Type") | |
if message_type == "Notification": | |
if "Subject" in message: | |
return ["Message", "MessageId", "Subject", "Timestamp", "TopicArn", "Type"] | |
else: | |
return ["Message", "MessageId", "Timestamp", "TopicArn", "Type"] | |
elif message_type in {"SubscriptionConfirmation", "UnsubscribeConfirmation"}: | |
return ["Message", "MessageId", "SubscribeURL", "Timestamp", "Token", "TopicArn", "Type"] | |
else: | |
raise Exception(f"Unknown message type {message_type}") | |
def build_signature_string(message): | |
lines = [] | |
for signing_key in get_signing_keys(message): | |
lines.append(signing_key) | |
lines.append(message[signing_key]) | |
return "\n".join(lines) + "\n" | |
def verify_sns_message_signature(message): | |
if message["SignatureVersion"] != "1": | |
raise Exception(f"Wrong signature version {message['SignatureVersion']}") | |
pem = get_aws_signing_cert(message["SigningCertURL"]) | |
sig_string = build_signature_string(message) | |
cert = X509.load_cert_string(pem) | |
pubkey = cert.get_pubkey() | |
pubkey.reset_context(md="sha1") | |
pubkey.verify_init() | |
pubkey.verify_update(sig_string.encode("utf-8")) | |
if pubkey.verify_final(b64decode(message["Signature"])) != 1: | |
raise Exception("Signature could not be verified") | |
@bp.route("/events", methods=["POST"]) | |
def blast_events_webhook(): | |
message_type_header = request.headers["X-Amz-Sns-Message-Type"] | |
if message_type_header not in {"SubscriptionConfirmation", "Notification"}: | |
raise BadRequest("Unknown message type headers.") | |
try: | |
message = json.loads(request.data.decode("utf-8")) | |
verify_sns_message_signature(message) | |
except Exception as e: | |
logger.warning(str(e)) | |
raise Unauthorized("Signature could not be verified") | |
return jsonify(message) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment