@kylefritz
Created October 18, 2010 22:06
Publish a message to Amazon SNS using Python or Ruby
require 'openssl'
require "hmac-sha2"
require "cgi"
require 'time'
require 'net/https'
require 'base64'
$AWS_ID='<set aws id>'
$AWS_KEY='<set aws key>'
$SNS_ENDPOINT='sns.us-east-1.amazonaws.com'
def url_encode(string)
  # don't encode: ~_-, spaces must be %20
  CGI.escape(string.to_s).gsub("%7E", "~").gsub("+", "%20")
end

def canonical_query_string(params)
  # sort params by byte order
  params.keys.sort.collect { |key| [url_encode(key), url_encode(params[key])].join("=") }.join("&")
end

def sign_request(params)
  # Signature Version 2 string to sign: verb, host, path, canonical query string
  hmac = HMAC::SHA256.new($AWS_KEY)
  hmac.update ['GET', $SNS_ENDPOINT, '/', canonical_query_string(params)].join("\n")
  Base64.encode64(hmac.digest).chomp
end

def publish(arn, subject, msg)
  params = {
    'Subject' => subject,
    'TopicArn' => arn,
    'Message' => msg,
    'Action' => 'Publish',
    'SignatureMethod' => 'HmacSHA256',
    'SignatureVersion' => 2,
    'Timestamp' => Time.now.utc.iso8601,
    'AWSAccessKeyId' => $AWS_ID
  }
  params['Signature'] = sign_request(params)
  query_string = canonical_query_string(params)
  response = Net::HTTP.get_response(URI.parse("http://#{$SNS_ENDPOINT}/?#{query_string}"))
  raise "SNS Publish failed #{response.code}\n#{response.body}" unless response.code == "200"
end

from time import strftime,gmtime,time
import urllib2
import hmac
import hashlib
import base64
import string
def publishAmazonSnsMsg(Subject,TopicArn,Message,AWSAccessKeyId,privatekey):
    #http://docs.amazonwebservices.com/AWSSimpleQueueService/2008-01-01/SQSDeveloperGuide/
    amzsnshost = 'sns.us-east-1.amazonaws.com'
    params = {'Subject' : Subject,
              'TopicArn' : TopicArn,
              'Message' : Message,
              'Timestamp' : strftime("%Y-%m-%dT%H:%M:%S.000Z", gmtime(time())),
              'AWSAccessKeyId' : AWSAccessKeyId,
              'Action' : 'Publish',
              'SignatureVersion' : '2',
              'SignatureMethod' : 'HmacSHA256',
              }
    cannqs=string.join(["%s=%s"%(urllib2.quote(key),urllib2.quote(params[key], safe='-_~')) \
                        for key in sorted(params.keys())],'&')
    string_to_sign=string.join(["GET",amzsnshost,"/",cannqs],'\n')
    sig=base64.b64encode(hmac.new(privatekey,string_to_sign,digestmod=hashlib.sha256).digest())
    url="http://%s/?%s&Signature=%s"%(amzsnshost,cannqs,urllib2.quote(sig))
    try:
        return urllib2.urlopen(url).read()
    except urllib2.HTTPError, exception:
        return "Error %s (%s):\n%s"%(exception.code,exception.msg,exception.read())

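The Python file above targets Python 2 (urllib2, string.join). As a rough illustration only, the signing step could be ported to Python 3 along the following lines. This is a minimal sketch, not the author's code: the function names canonical_query_string and sign_request are borrowed from the Ruby file, the secret_key and host parameters are assumptions, and no request is actually sent.

```python
import base64
import hashlib
import hmac
from urllib.parse import quote


def canonical_query_string(params):
    """Percent-encode keys and values, then join the pairs sorted by key.

    Values are passed through str() so that non-string values
    (for example an integer SignatureVersion) do not break quote().
    """
    return "&".join(
        "%s=%s" % (quote(str(key), safe="-_.~"), quote(str(params[key]), safe="-_.~"))
        for key in sorted(params)
    )


def sign_request(secret_key, host, params):
    """Return the base64-encoded HMAC-SHA256 signature for a GET to '/'.

    secret_key and host are hypothetical parameters standing in for the
    gist's privatekey and amzsnshost.
    """
    string_to_sign = "\n".join(["GET", host, "/", canonical_query_string(params)])
    digest = hmac.new(
        secret_key.encode("utf-8"),
        string_to_sign.encode("utf-8"),
        hashlib.sha256,
    ).digest()
    return base64.b64encode(digest).decode("ascii")
```

The resulting signature would still need to be percent-encoded and appended to the query string as the Signature parameter, as both files above do.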
@davidmarquis

As @kylefritz mentioned, better to use boto SNS client for this...

@sajnikanth

sajnikanth commented Jul 20, 2017

If you see the error "'dict' object has no attribute 'rstrip'", replace line 21 in publishAmazonSnsMsg.py with:

cannqs=string.join(["%s=%s"%(urllib2.quote(key),urllib2.quote(str(params[key]), safe='-_~')) for key in sorted(params.keys())],'&')

(convert params[key] to string)

As mentioned by others, boto3 is a nicer way of doing this. Here’s the code for that:

import boto3

def publish(topic_arn, message_json, region):
    client = boto3.client('sns', region_name = region)
    return client.publish(TopicArn=topic_arn, Message=message_json)

Your AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY can be set as environment variables; boto3 reads them automatically.
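For anyone who also wants the subject line from the original gist, the boto3 call can be extended roughly as below. This is only a sketch under a few assumptions: the optional client parameter is a hypothetical addition so the function can be exercised without AWS access, the default region is taken from the endpoint used in the gist, and non-string messages are serialized with json.dumps to avoid the dict problem described above.

```python
import json


def publish(topic_arn, subject, message, region="us-east-1", client=None):
    """Publish a message with a subject to an SNS topic.

    client is a hypothetical injection point: pass any object with a
    publish(**kwargs) method, or leave it as None to create a boto3 client.
    """
    if client is None:
        # Imported lazily so the function can be tried with a stand-in client
        # on machines where boto3 is not installed.
        import boto3

        client = boto3.client("sns", region_name=region)
    if not isinstance(message, str):
        # SNS expects the Message parameter to be a string.
        message = json.dumps(message)
    return client.publish(TopicArn=topic_arn, Subject=subject, Message=message)
```

With real credentials in the environment, calling publish(topic_arn, "subject", "body") should return the response dictionary from SNS, which includes a MessageId.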
