Created August 15, 2012 23:19
Save ajcronk/3364621 to your computer and use it in GitHub Desktop.
Python CSV batch import example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import dateutil.parser | |
import optparse | |
from Queue import Queue | |
import tempodb | |
from threading import Thread | |
class Worker(Thread):
    """Daemon thread that executes tasks pulled from a shared queue.

    Each queue item must be a ``(func, args, kargs)`` tuple; the worker calls
    ``func(*args, **kargs)`` and then marks the item done. The thread starts
    itself as soon as it is constructed.
    """

    def __init__(self, tasks):
        """Bind the worker to *tasks* and start it immediately.

        :param tasks: queue of ``(func, args, kargs)`` tuples shared with the
            producer that calls ``put`` on it.
        """
        Thread.__init__(self)
        self.tasks = tasks
        # Daemon threads do not block interpreter exit, so idle workers stuck
        # in tasks.get() never keep the process alive.
        self.daemon = True
        self.start()

    def run(self):
        """Consume tasks forever; a failing task is reported, not fatal."""
        while True:
            func, args, kargs = self.tasks.get()
            try:
                func(*args, **kargs)
            except Exception as e:
                # Best-effort: report the failure and keep consuming.
                print(e)
            finally:
                # Always acknowledge the item, even if the task raised
                # something that is not an Exception; otherwise a join() on
                # the queue would block forever.
                self.tasks.task_done()
class ThreadPool(object):
    """Fixed-size pool of ``Worker`` threads consuming one shared task queue."""

    def __init__(self, num_threads):
        """Start *num_threads* daemon workers.

        The task queue is bounded to *num_threads* pending items, so
        ``add_task`` blocks while that many tasks are waiting (presumably to
        throttle the producer rather than buffer all input in memory).

        :param num_threads: number of worker threads; must be >= 1.
        :raises ValueError: if *num_threads* is less than 1. With no workers,
            nothing would ever drain the queue and ``wait_completion`` would
            block forever.
        """
        if num_threads < 1:
            raise ValueError("num_threads must be at least 1, got %r" % (num_threads,))
        self.tasks = Queue(num_threads)
        # Keep references to the workers so callers can inspect them.
        self.workers = [Worker(self.tasks) for _ in range(num_threads)]

    def add_task(self, func, *args, **kargs):
        """Queue ``func(*args, **kargs)`` for a worker; blocks if the queue is full."""
        self.tasks.put((func, args, kargs))

    def wait_completion(self):
        """Block until every queued task has been processed."""
        self.tasks.join()
def main():
    """Batch-import a CSV of thermostat readings into TempoDB.

    Each non-blank input line is ``timestamp, temperature, solar_radiation,
    humidity``. Lines are grouped into batches (20 by default). Each batch is
    written to three series, one per measurement, via ``client.write_key``
    on a small thread pool.
    """
    # NOTE(review): the author states the input is assumed to be sorted
    # (presumably by timestamp); nothing below checks or enforces that.
    parser = optparse.OptionParser(usage="usage: %prog [options] filename", version="%prog 0.1")
    parser.add_option("-i", "--input", dest="filename", help="read data from FILENAME")
    parser.add_option("-k", "--key", dest="key", help="tempodb database key")
    parser.add_option("-s", "--secret", dest="secret", help="tempodb database secret")
    parser.add_option("-H", "--host", dest="host", default="api.tempo-db.com", help="tempodb host")
    parser.add_option("-P", "--port", dest="port", default=443, help="tempodb port")
    parser.add_option("-S", "--secure", action="store_true", dest="secure", default=True, help="tempodb secure")
    # -S can never turn the flag off (its default is already True), so offer
    # an explicit opt-out that writes to the same destination.
    parser.add_option("--insecure", action="store_false", dest="secure",
                      help="pass secure=False to the tempodb client")
    parser.add_option("-b", "--batch-size", dest="batch_size", type="int", default=20,
                      help="lines per write batch (default 20)")
    parser.add_option("-t", "--threads", dest="threads", type="int", default=3,
                      help="number of writer threads (default 3)")
    (options, args) = parser.parse_args()

    # The usage string advertises a positional filename; accept it when -i is absent.
    if not options.filename and args:
        options.filename = args[0]
    if not options.filename:
        parser.error("Enter a file to read from.")
    if options.batch_size < 1:
        parser.error("--batch-size must be at least 1.")
    if options.threads < 1:
        parser.error("--threads must be at least 1.")

    client = tempodb.Client(options.key, options.secret, options.host, int(options.port), options.secure)

    temperature_key = "thermostat.1.temperature"
    solar_radiation_key = "thermostat.1.solar_radiation"
    humidity_key = "thermostat.1.humidity"

    pool = ThreadPool(options.threads)

    def flush(temperature_batch, solar_radiation_batch, humidity_batch):
        """Queue one write per series for the given batch."""
        pool.add_task(client.write_key, temperature_key, temperature_batch)
        pool.add_task(client.write_key, solar_radiation_key, solar_radiation_batch)
        pool.add_task(client.write_key, humidity_key, humidity_batch)

    temperature_data = []
    solar_radiation_data = []
    humidity_data = []

    with open(options.filename) as source_file:
        for line in source_file:
            if not line.strip():
                # Skip blank lines (e.g. a trailing newline) instead of
                # failing on the 4-way unpack below.
                continue
            timestamp, temperature, solar_radiation, humidity = line.split(',')
            input_date = dateutil.parser.parse(timestamp)
            temperature_data.append(tempodb.DataPoint(input_date, float(temperature)))
            solar_radiation_data.append(tempodb.DataPoint(input_date, float(solar_radiation)))
            humidity_data.append(tempodb.DataPoint(input_date, float(humidity)))

            if len(temperature_data) >= options.batch_size:
                flush(temperature_data, solar_radiation_data, humidity_data)
                # Rebind all three lists: the queued tasks still reference the
                # old ones, which must not be appended to while a worker
                # thread may be sending them.
                temperature_data = []
                solar_radiation_data = []
                humidity_data = []

    # Pick up any partial final batch.
    if temperature_data:
        flush(temperature_data, solar_radiation_data, humidity_data)

    # Wait for all queued writes to finish.
    pool.wait_completion()
if __name__ == '__main__':
    # Run the importer only when executed as a script, not on import.
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
2012-04-10T19:43:17.000+0600, 52.113, 950.3, 25.23
2012-04-10T19:44:17.000+0600, 47.234, 923.8, 25.01
2012-04-10T19:45:17.000+0600, 49.133, 940.7, 24.45
2012-04-10T19:46:17.000+0600, 50.021, 923.3, 24.87
2012-04-10T19:47:17.000+0600, 52.113, 950.3, 25.23
2012-04-10T19:48:17.000+0600, 47.234, 923.8, 25.01
2012-04-10T19:49:17.000+0600, 49.133, 940.7, 24.45
2012-04-10T19:50:17.000+0600, 50.021, 923.3, 24.87
2012-04-10T19:51:17.000+0600, 52.113, 950.3, 25.23
2012-04-10T19:52:17.000+0600, 47.234, 923.8, 25.01
2012-04-10T19:53:17.000+0600, 49.133, 940.7, 24.45
2012-04-10T19:54:17.000+0600, 50.021, 923.3, 24.87
2012-04-10T19:55:17.000+0600, 52.113, 950.3, 25.23
2012-04-10T19:56:17.000+0600, 47.234, 923.8, 25.01
2012-04-10T19:57:17.000+0600, 49.133, 940.7, 24.45
2012-04-10T19:58:17.000+0600, 50.021, 923.3, 24.87
2012-04-10T19:59:17.000+0600, 52.113, 950.3, 25.23
2012-04-10T20:00:17.000+0600, 47.234, 923.8, 25.01
2012-04-10T20:01:17.000+0600, 47.234, 923.8, 25.01
2012-04-10T20:02:17.000+0600, 47.234, 923.8, 25.01
2012-04-10T20:03:17.000+0600, 47.234, 923.8, 25.01
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.