Skip to content

Instantly share code, notes, and snippets.

@ajcronk
Created August 15, 2012 23:19
Show Gist options
  • Save ajcronk/3364621 to your computer and use it in GitHub Desktop.
Save ajcronk/3364621 to your computer and use it in GitHub Desktop.
Python CSV batch import example
import dateutil.parser
import optparse
from Queue import Queue
import tempodb
from threading import Thread
class Worker(Thread):
    """Thread executing tasks from a given tasks queue.

    Every item taken from the queue must be a ``(func, args, kargs)`` tuple;
    the worker calls ``func(*args, **kargs)`` and then marks the item as done.
    The thread is a daemon and is started as soon as it is constructed.
    """

    def __init__(self, tasks):
        """Bind the worker to ``tasks`` (a queue object) and start running."""
        Thread.__init__(self)
        self.tasks = tasks
        # Daemon threads do not keep the interpreter alive once main() returns.
        self.daemon = True
        self.start()

    def run(self):
        """Consume and execute tasks from the queue forever."""
        while True:
            func, args, kargs = self.tasks.get()
            try:
                func(*args, **kargs)
            except Exception as e:
                # A failing task must not kill the worker: report and move on.
                print(e)
            finally:
                # Always acknowledge the item, even if the task raised
                # something that is not an Exception subclass; otherwise a
                # join() on the queue would never return.
                self.tasks.task_done()
class ThreadPool:
    """A group of Worker threads that all drain one shared task queue."""

    def __init__(self, num_threads):
        """Create the task queue and spawn ``num_threads`` workers on it."""
        # The queue is created with num_threads as its maximum size.
        self.tasks = Queue(num_threads)
        for _worker_index in range(num_threads):
            # Each Worker starts itself in its constructor.
            Worker(self.tasks)

    def add_task(self, func, *args, **kargs):
        """Queue a call of ``func(*args, **kargs)`` for a worker to run."""
        task = (func, args, kargs)
        self.tasks.put(task)

    def wait_completion(self):
        """Block until every queued task has been marked as done."""
        self.tasks.join()
def main():
    """Batch-import a CSV file of thermostat readings into TempoDB.

    Each non-blank input line must have four comma-separated fields:
    timestamp, temperature, solar radiation and humidity. The timestamp is
    parsed with ``dateutil.parser.parse`` and the other three with ``float``.
    Readings are grouped into batches of 20 lines, and each batch is written
    with one ``client.write_key`` call per series on a pool of 3 threads.

    Command-line options supply the input filename, the database key and
    secret, and the host, port and secure flag passed to ``tempodb.Client``.
    A missing filename is reported through ``parser.error``.

    Raises:
        ValueError: if a non-blank line does not have exactly four fields or
            a value cannot be converted by ``float``.
    """
    # This script assumes that the input file is sorted by key
    parser = optparse.OptionParser(usage="usage: %prog [options] filename", version="%prog 0.1")
    parser.add_option("-i", "--input", dest="filename", help="read data from FILENAME")
    parser.add_option("-k", "--key", dest="key", help="tempodb database key")
    parser.add_option("-s", "--secret", dest="secret", help="tempodb database secret")
    parser.add_option("-H", "--host", dest="host", default="api.tempo-db.com", help="tempodb host")
    parser.add_option("-P", "--port", dest="port", default=443, help="tempodb port")
    parser.add_option("-S", "--secure", action="store_true", dest="secure", default=True, help="tempodb secure")

    options, _args = parser.parse_args()
    if not options.filename:
        parser.error("Enter a file to read from.")

    client = tempodb.Client(options.key, options.secret, options.host, int(options.port), options.secure)

    # Series keys, in the same order as the value columns of the input file.
    series_keys = (
        "thermostat.1.temperature",
        "thermostat.1.solar_radiation",
        "thermostat.1.humidity",
    )
    batch_size = 20  # number of input lines sent per write_key call

    batches = dict((key, []) for key in series_keys)
    count = 0

    # Init a Thread pool with the desired number of threads
    pool = ThreadPool(3)

    # "with" guarantees the file is closed even if a line fails to parse.
    with open(options.filename) as source_file:
        for line in source_file:
            if not line.strip():
                # Tolerate blank lines (e.g. a trailing newline at end of file).
                continue
            timestamp, temperature, solar_radiation, humidity = line.split(',')

            if count >= batch_size:
                for key in series_keys:
                    pool.add_task(client.write_key, key, batches[key])
                # Start fresh lists for EVERY series. The queued tasks still
                # hold the old lists, so they must never be appended to again.
                batches = dict((key, []) for key in series_keys)
                count = 0

            input_date = dateutil.parser.parse(timestamp)
            values = (temperature, solar_radiation, humidity)
            for key, raw_value in zip(series_keys, values):
                batches[key].append(tempodb.DataPoint(input_date, float(raw_value)))
            count += 1

    # pick up any scraps
    if count > 0:
        for key in series_keys:
            pool.add_task(client.write_key, key, batches[key])

    # Wait for completion
    pool.wait_completion()
# Run the import only when this file is executed as a script.
if __name__ == '__main__':
    main()
2012-04-10T19:43:17.000+0600, 52.113, 950.3, 25.23
2012-04-10T19:44:17.000+0600, 47.234, 923.8, 25.01
2012-04-10T19:45:17.000+0600, 49.133, 940.7, 24.45
2012-04-10T19:46:17.000+0600, 50.021, 923.3, 24.87
2012-04-10T19:47:17.000+0600, 52.113, 950.3, 25.23
2012-04-10T19:48:17.000+0600, 47.234, 923.8, 25.01
2012-04-10T19:49:17.000+0600, 49.133, 940.7, 24.45
2012-04-10T19:50:17.000+0600, 50.021, 923.3, 24.87
2012-04-10T19:51:17.000+0600, 52.113, 950.3, 25.23
2012-04-10T19:52:17.000+0600, 47.234, 923.8, 25.01
2012-04-10T19:53:17.000+0600, 49.133, 940.7, 24.45
2012-04-10T19:54:17.000+0600, 50.021, 923.3, 24.87
2012-04-10T19:55:17.000+0600, 52.113, 950.3, 25.23
2012-04-10T19:56:17.000+0600, 47.234, 923.8, 25.01
2012-04-10T19:57:17.000+0600, 49.133, 940.7, 24.45
2012-04-10T19:58:17.000+0600, 50.021, 923.3, 24.87
2012-04-10T19:59:17.000+0600, 52.113, 950.3, 25.23
2012-04-10T20:00:17.000+0600, 47.234, 923.8, 25.01
2012-04-10T20:01:17.000+0600, 47.234, 923.8, 25.01
2012-04-10T20:02:17.000+0600, 47.234, 923.8, 25.01
2012-04-10T20:03:17.000+0600, 47.234, 923.8, 25.01
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment