Skip to content

Instantly share code, notes, and snippets.

@hsivonen
Created September 20, 2013 14:14
Show Gist options
  • Save hsivonen/6638222 to your computer and use it in GitHub Desktop.
Save hsivonen/6638222 to your computer and use it in GitHub Desktop.
Script for uploading photos and videos to Flickr. Written in 2006 using a 2006 version of flickrapi.py. flickrapi.py has since then gotten more official and more complicated. I've never had a reason to update the a newer version of the API.
#!/usr/bin/python
#
# Flickr API implementation
#
# Inspired largely by Michele Campeotto's flickrclient and Aaron Swartz'
# xmltramp... but I wanted to get a better idea of how python worked in
# those regards, so I mostly worked those components out for myself.
#
# http://micampe.it/things/flickrclient
# http://www.aaronsw.com/2002/xmltramp/
#
# Release 1: initial release
# Release 2: added upload functionality
# Release 3: code cleanup, convert to doc strings
# Release 4: better permission support
# Release 5: converted into fuller-featured "flickrapi"
# Release 6: fix upload sig bug (thanks Deepak Jois), encode test output
# Release 7: fix path construction, Manish Rai Jain's improvements, exceptions
# Release 8: change API endpoint to "api.flickr.com"
# Release 9: change to MIT license
# Release 10: fix horrid \r\n bug on final boundary
# Release 10.hsivonen: Fix UTF-8 issues and print auth url
#
# Work by (or inspired by) Manish Rai Jain <[email protected]>:
#
# improved error reporting, proper multipart MIME boundary creation,
# use of urllib2 to allow uploads through a proxy, upload accepts
# raw data as well as a filename
#
# Copyright (c) 2006 Brian "Beej Jorgensen" Hall
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#
# Certain previous versions of this API were granted to the public
# domain. You're free to use those as you please.
#
# Beej Jorgensen, Maintainer, November 2006
# [email protected]
#
# Modified by Henri Sivonen, [email protected]
import sys
import md5
import string
import urllib
import urllib2
import mimetools
import httplib
import os.path
import xml.dom.minidom
from types import UnicodeType
########################################################################
# Exceptions
########################################################################
class UploadException(Exception):
pass
########################################################################
# XML functionality
########################################################################
#-----------------------------------------------------------------------
class XMLNode:
"""XMLNode -- generic class for holding an XML node
xmlStr = \"\"\"<xml foo="32">
<name bar="10">Name0</name>
<name bar="11" baz="12">Name1</name>
</xml>\"\"\"
f = XMLNode.parseXML(xmlStr)
print f.elementName # xml
print f['foo'] # 32
print f.name # [<name XMLNode>, <name XMLNode>]
print f.name[0].elementName # name
print f.name[0]["bar"] # 10
print f.name[0].elementText # Name0
print f.name[1].elementName # name
print f.name[1]["bar"] # 11
print f.name[1]["baz"] # 12
"""
def __init__(self):
"""Construct an empty XML node."""
self.elementName=""
self.elementText=""
self.attrib={}
self.xml=""
def __setitem__(self, key, item):
"""Store a node's attribute in the attrib hash."""
self.attrib[key] = item
def __getitem__(self, key):
"""Retrieve a node's attribute from the attrib hash."""
return self.attrib[key]
#-----------------------------------------------------------------------
@classmethod
def parseXML(cls, xmlStr, storeXML=False):
"""Convert an XML string into a nice instance tree of XMLNodes.
xmlStr -- the XML to parse
storeXML -- if True, stores the XML string in the root XMLNode.xml
"""
def __parseXMLElement(element, thisNode):
"""Recursive call to process this XMLNode."""
thisNode.elementName = element.nodeName
#print element.nodeName
# add element attributes as attributes to this node
for i in range(element.attributes.length):
an = element.attributes.item(i)
thisNode[an.name] = an.nodeValue
for a in element.childNodes:
if a.nodeType == xml.dom.Node.ELEMENT_NODE:
child = XMLNode()
try:
list = getattr(thisNode, a.nodeName)
except AttributeError:
setattr(thisNode, a.nodeName, [])
# add the child node as an attrib to this node
list = getattr(thisNode, a.nodeName);
#print "appending child: %s to %s" % (a.nodeName, thisNode.elementName)
list.append(child);
__parseXMLElement(a, child)
elif a.nodeType == xml.dom.Node.TEXT_NODE:
thisNode.elementText += a.nodeValue
return thisNode
dom = xml.dom.minidom.parseString(xmlStr)
# get the root
rootNode = XMLNode()
if storeXML: rootNode.xml = xmlStr
return __parseXMLElement(dom.firstChild, rootNode)
########################################################################
# Flickr functionality
########################################################################
#-----------------------------------------------------------------------
class FlickrAPI:
"""Encapsulated flickr functionality.
Example usage:
flickr = FlickrAPI(flickrAPIKey, flickrSecret)
rsp = flickr.auth_checkToken(api_key=flickrAPIKey, auth_token=token)
"""
flickrHost = "api.flickr.com"
flickrRESTForm = "/services/rest/"
flickrAuthForm = "/services/auth/"
flickrUploadForm = "/services/upload/"
#-------------------------------------------------------------------
def __init__(self, apiKey, secret):
"""Construct a new FlickrAPI instance for a given API key and secret."""
self.apiKey = apiKey
self.secret = secret
self.__handlerCache={}
#-------------------------------------------------------------------
def __sign(self, data):
"""Calculate the flickr signature for a set of params.
data -- a hash of all the params and values to be hashed, e.g.
{"api_key":"AAAA", "auth_token":"TTTT"}
"""
dataName = self.secret
keys = data.keys()
keys.sort()
for a in keys:
datum = data[a]
if isinstance(datum, UnicodeType):
datum = datum.encode('utf-8')
dataName += (a + datum)
#print dataName
hash = md5.new()
hash.update(dataName)
return hash.hexdigest()
#-------------------------------------------------------------------
def __getattr__(self, method, **arg):
"""Handle all the flickr API calls.
This is Michele Campeotto's cleverness, wherein he writes a
general handler for methods not defined, and assumes they are
flickr methods. He then converts them to a form to be passed as
the method= parameter, and goes from there.
http://micampe.it/things/flickrclient
My variant is the same basic thing, except it tracks if it has
already created a handler for a specific call or not.
example usage:
flickr.auth_getFrob(api_key="AAAAAA")
rsp = flickr.favorites_getList(api_key=flickrAPIKey, \\
auth_token=token)
"""
if not self.__handlerCache.has_key(method):
def handler(_self = self, _method = method, **arg):
_method = "flickr." + _method.replace("_", ".")
url = "http://" + FlickrAPI.flickrHost + \
FlickrAPI.flickrRESTForm
arg["method"] = _method
argEncoded = {}
for k in arg.keys():
argEncoded[k] = arg[k].encode("utf-8")
postData = urllib.urlencode(argEncoded) + "&api_sig=" + \
_self.__sign(argEncoded)
#print "--url---------------------------------------------"
#print url
#print "--postData----------------------------------------"
#print postData
f = urllib.urlopen(url, postData)
data = f.read()
#print "--response----------------------------------------"
#print data
f.close()
return XMLNode.parseXML(data, True)
self.__handlerCache[method] = handler;
return self.__handlerCache[method]
#-------------------------------------------------------------------
def __getAuthURL(self, perms, frob):
"""Return the authorization URL to get a token.
This is the URL the app will launch a browser toward if it
needs a new token.
perms -- "read", "write", or "delete"
frob -- picked up from an earlier call to FlickrAPI.auth_getFrob()
"""
data = {"api_key": self.apiKey, "frob": frob, "perms": perms}
data["api_sig"] = self.__sign(data)
return "http://%s%s?%s" % (FlickrAPI.flickrHost, \
FlickrAPI.flickrAuthForm, urllib.urlencode(data))
#-------------------------------------------------------------------
def upload(self, filename=None, jpegData=None, **arg):
"""Upload a file to flickr.
Be extra careful you spell the parameters correctly, or you will
get a rather cryptic "Invalid Signature" error on the upload!
Supported parameters:
One of filename or jpegData must be specified by name when
calling this method:
filename -- name of a file to upload
jpegData -- array of jpeg data to upload
api_key
auth_token
title
description
tags -- space-delimited list of tags, "tag1 tag2 tag3"
is_public -- "1" or "0"
is_friend -- "1" or "0"
is_family -- "1" or "0"
"""
if filename == None and jpegData == None or \
filename != None and jpegData != None:
raise UploadException("filename OR jpegData must be specified")
# verify key names
for a in arg.keys():
if a != "api_key" and a != "auth_token" and a != "title" and \
a != "description" and a != "tags" and a != "is_public" and \
a != "is_friend" and a != "is_family":
sys.stderr.write("FlickrAPI: warning: unknown parameter " \
"\"%s\" sent to FlickrAPI.upload\n" % (a))
arg["api_sig"] = self.__sign(arg)
url = "http://" + FlickrAPI.flickrHost + FlickrAPI.flickrUploadForm
# construct POST data
boundary = mimetools.choose_boundary()
body = ""
# required params
for a in ('api_key', 'auth_token', 'api_sig'):
body += "--%s\r\n" % (boundary)
body += "Content-Disposition: form-data; name=\""+a+"\"\r\n\r\n"
body += "%s\r\n" % (arg[a])
# optional params
for a in ('title', 'description', 'tags', 'is_public', \
'is_friend', 'is_family'):
if arg.has_key(a):
body += "--%s\r\n" % (boundary)
body += "Content-Disposition: form-data; name=\""+a+"\"\r\n\r\n"
body += "%s\r\n" % (arg[a])
body += "--%s\r\n" % (boundary)
body += "Content-Disposition: form-data; name=\"photo\";"
body += " filename=\"%s\"\r\n" % filename
body += "Content-Type: image/jpeg\r\n\r\n"
#print body
if filename != None:
fp = file(filename, "rb")
data = fp.read()
fp.close()
else:
data = jpegData
postData = body.encode("utf_8") + data + \
("\r\n--%s--" % (boundary)).encode("utf_8")
request = urllib2.Request(url)
request.add_data(postData)
request.add_header("Content-Type", \
"multipart/form-data; boundary=%s" % boundary)
response = urllib2.urlopen(request)
rspXML = response.read()
return XMLNode.parseXML(rspXML)
#-----------------------------------------------------------------------
@classmethod
def testFailure(cls, rsp, exit=True):
"""Exit app if the rsp XMLNode indicates failure."""
if rsp['stat'] == "fail":
sys.stderr.write("%s\n" % (cls.getPrintableError(rsp)))
if exit: sys.exit(1)
#-----------------------------------------------------------------------
@classmethod
def getPrintableError(cls, rsp):
"""Return a printed error message string."""
return "%s: error %s: %s" % (rsp.elementName, \
cls.getRspErrorCode(rsp), cls.getRspErrorMsg(rsp))
#-----------------------------------------------------------------------
@classmethod
def getRspErrorCode(cls, rsp):
"""Return the error code of a response, or 0 if no error."""
if rsp['stat'] == "fail":
return rsp.err[0]['code']
return 0
#-----------------------------------------------------------------------
@classmethod
def getRspErrorMsg(cls, rsp):
"""Return the error message of a response, or "Success" if no error."""
if rsp['stat'] == "fail":
return rsp.err[0]['msg']
return "Success"
#-----------------------------------------------------------------------
def __getCachedTokenPath(self):
"""Return the directory holding the app data."""
return os.path.expanduser(os.path.sep.join(["~", ".flickr", \
self.apiKey]))
#-----------------------------------------------------------------------
def __getCachedTokenFilename(self):
"""Return the full pathname of the cached token file."""
return os.path.sep.join([self.__getCachedTokenPath(), "auth.xml"])
#-----------------------------------------------------------------------
def __getCachedToken(self):
"""Read and return a cached token, or None if not found.
The token is read from the cached token file, which is basically the
entire RSP response containing the auth element.
"""
try:
f = file(self.__getCachedTokenFilename(), "r")
data = f.read()
f.close()
rsp = XMLNode.parseXML(data)
return rsp.auth[0].token[0].elementText
except IOError:
return None
#-----------------------------------------------------------------------
def __setCachedToken(self, xml):
"""Cache a token for later use.
The cached tag is stored by simply saving the entire RSP response
containing the auth element.
"""
path = self.__getCachedTokenPath()
if not os.path.exists(path):
os.makedirs(path)
f = file(self.__getCachedTokenFilename(), "w")
f.write(xml)
f.close()
#-----------------------------------------------------------------------
def getToken(self, perms="read", browser="lynx"):
"""Get a token either from the cache, or make a new one from the
frob.
This first attempts to find a token in the user's token cache on
disk.
If that fails (or if the token is no longer valid based on
flickr.auth.checkToken) a new frob is acquired. The frob is
validated by having the user log into flickr (with lynx), and
subsequently a valid token is retrieved.
The newly minted token is then cached locally for the next run.
perms--"read", "write", or "delete"
browser--whatever browser should be used in the system() call
"""
# see if we have a saved token
token = self.__getCachedToken()
# see if it's valid
if token != None:
rsp = self.auth_checkToken(api_key=self.apiKey, auth_token=token)
if rsp['stat'] != "ok":
token = None
else:
# see if we have enough permissions
tokenPerms = rsp.auth[0].perms[0].elementText
if tokenPerms == "read" and perms != "read": token = None
elif tokenPerms == "write" and perms == "delete": token = None
# get a new token if we need one
if token == None:
# get the frob
rsp = self.auth_getFrob(api_key=self.apiKey)
self.testFailure(rsp)
frob = rsp.frob[0].elementText
# validate online
#os.system("%s '%s'" % (browser, self.__getAuthURL(perms, frob)))
print self.__getAuthURL(perms, frob)
raw_input()
# get a token
rsp = self.auth_getToken(api_key=self.apiKey, frob=frob)
self.testFailure(rsp)
token = rsp.auth[0].token[0].elementText
# store the auth info for next time
self.__setCachedToken(rsp.xml)
return token
########################################################################
# App functionality
########################################################################
def main(argv):
# flickr auth information:
flickrAPIKey = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" # API key
flickrSecret = "yyyyyyyyyyyyyyyy" # shared "secret"
# make a new FlickrAPI instance
fapi = FlickrAPI(flickrAPIKey, flickrSecret)
# do the whole whatever-it-takes to get a valid token:
token = fapi.getToken(browser="firefox")
# get my favorites
rsp = fapi.favorites_getList(api_key=flickrAPIKey,auth_token=token)
fapi.testFailure(rsp)
# and print them
for a in rsp.photos[0].photo:
print "%10s: %s" % (a['id'], a['title'].encode("ascii", "replace"))
# upload the file foo.jpg
#rsp = fapi.upload(filename="foo.jpg", \
# api_key=flickrAPIKey, auth_token=token, \
# title="This is the title", description="This is the description", \
# tags="tag1 tag2 tag3", is_public="1")
#if rsp == None:
# sys.stderr.write("can't find file\n")
#else:
# fapi.testFailure(rsp)
return 0
# run the main if we're not being imported:
if __name__ == "__main__": sys.exit(main(sys.argv))
#!/usr/bin/python
# -*- coding: UTF-8 -*-
# Copyright (c) 2006-2008 Henri Sivonen
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the "Software"),
# to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense,
# and/or sell copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
# DEALINGS IN THE SOFTWARE.
import sys
from flickrapi import FlickrAPI
import codecs
import re
import os
import os.path
import time
# flickr auth information:
flickrAPIKey = "REDACTED" # API key
flickrSecret = "REDACTED" # shared "secret"
publicLicense = "0" # the license to use for public photos that aren't tagged with a non-public tag
setsByTag = {
"turska":"REDACTED",
}
groupsByTag = {
"turska":"82907218@N00",
}
privateTags = [
"people",
]
junkSet = "REDACTED"
argv = sys.argv[1:]
if len(argv) < 3:
print 'must have three arguments: visibility (p, f, y, ff or -), title and tags'
sys.exit(1)
aPermissions = argv[0].decode('utf-8')
aTitle = argv[1].decode('utf-8')
aTags = argv[2].decode('utf-8')
argv = argv[3:]
# make a new FlickrAPI instance
fapi = FlickrAPI(flickrAPIKey, flickrSecret)
# do the whole whatever-it-takes to get a valid token:
token = fapi.getToken(perms="write")
namemap = {}
idList = []
for path in argv:
title = aTitle
filename = os.path.basename(path)
desc = "(%s)" % filename
tags = aTags
friend = "0"
family = "0"
public = "0"
license = publicLicense
junk = 0
isvideo = filename.lower().endswith(".avi") or filename.lower().endswith(".mp4") or filename.lower().endswith(".mov")
if aPermissions == "p":
public = "1"
if aPermissions == "f" or aPermissions == "ff":
friend = "1"
if aPermissions == "y" or aPermissions == "ff":
family = "1"
if isvideo:
tags = tags + u" video"
if filename.find("people") != -1:
tags = tags + u" people"
if peoplePat.search(tags):
if int(public):
public = "0"
friend = "1"
if filename.find("_s.") != -1:
junk = 1
friend = "0"
family = "0"
public = "0"
if not int(public):
license = "0"
print ("Uploading %s" % filename)
rsp = fapi.upload(filename=path, api_key=flickrAPIKey, auth_token=token, \
title=title, description=desc, \
tags=tags,\
is_friend=friend, is_public=public, is_family=family)
fapi.testFailure(rsp)
photoid = rsp.photoid[0].elementText
fapi.photos_licenses_setLicense(api_key=flickrAPIKey, auth_token=token, photo_id=photoid, license_id=license)
for tag in tags.split():
if setsByTag.has_key(tag):
fapi.photosets_addPhoto(api_key=flickrAPIKey, auth_token=token, photo_id=photoid, photoset_id=setsByTag[tag])
if groupsByTag.has_key(tag):
fapi.groups_pools_add(api_key=flickrAPIKey, auth_token=token, photo_id=photoid, group_id=groupsByTag[tag])
if junk:
fapi.photosets_addPhoto(api_key=flickrAPIKey, auth_token=token, photo_id=photoid, photoset_id=junkSet)
else:
idList.append(photoid)
if isvideo:
cdate = os.path.getctime(path)
formatted = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(cdate))
fapi.photos_setDates(api_key=flickrAPIKey, auth_token=token, photo_id=photoid, date_taken=formatted, date_taken_granularity="0")
rsp = fapi.photosets_create(api_key=flickrAPIKey, auth_token=token, title=aTitle, primary_photo_id=idList[0])
setId = rsp.photoset[0]['id']
fapi.photosets_editPhotos(api_key=flickrAPIKey, auth_token=token, photoset_id=setId, primary_photo_id=idList[0], photo_ids=",".join(idList))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment