@paulmwatson
Created November 23, 2011 17:49
Records DataSift Twitter interaction stream to Google Fusion Table
# Description: Consume a DataSift stream and save Twitter interactions to a Google Fusion Table
# Author: Paul M. Watson <[email protected]>
# Date: 2011/11/28
# Usage:
#   ruby ds_to_gft.rb <DataSift stream id hash>
# config.yml should contain:
#   datasift:
#     username: datasift username
#     api_key: datasift api key
#   googlefusiontables:
#     username: google account login
#     password: google account password
# Code credit:
#   datasift: https://github.com/datasift/datasift-ruby/blob/master/examples/consume-stream.rb
#   tokumine: https://github.com/tokumine/fusion_tables/blob/master/examples/boris_bikes.rb
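require 'rubygems'
require 'yaml' # YAML.load / YAML.load_file for config.yml
require 'time' # Time.parse for the created_at timestamps
require 'fusion_tables'
require 'json'
require 'datasift'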
if ARGV.empty?
  warn 'ERR: Please specify the DataSift stream hash ID to consume'
  exit 1
end
streamhashid = ARGV[0]
config = YAML.load_file(File.join(File.dirname(__FILE__), 'config.yml'))
# Setup Google Fusion Table
puts 'Google Fusion Tables: Creating...'
ft = GData::Client::FusionTables.new
ft.clientlogin(config['googlefusiontables']['username'], config['googlefusiontables']['password'])
table_name = 'datasift_stream_' + streamhashid
cols = [
  {:name => 'screen_name', :type => 'string'},
  {:name => 'content', :type => 'string'},
  {:name => 'created_at', :type => 'datetime'},
  {:name => 'id', :type => 'number'},
  {:name => 'location_user', :type => 'location'},
  {:name => 'location_tweet', :type => 'location'}
]
tables = ft.show_tables
# Reuse the table for this stream if it already exists, otherwise create it
table = tables.find { |t| t.name == table_name }
table ||= ft.create_table(table_name, cols)
# Consume DataSift stream
puts 'DataSift: Authenticating...'
user = DataSift::User.new(config['datasift']['username'], config['datasift']['api_key'])
consumer = user.getConsumer(DataSift::StreamConsumer::TYPE_HTTP, streamhashid)
consumer.onStopped do |reason|
  puts "DataSift: Stopped: #{reason}"
end
puts 'DataSift: Consuming...'
rowcounter = 0
batchrows = 10
data = []
consumer.consume(true) do |interaction|
  # Only Twitter interactions are recorded; other interaction types are skipped
  if interaction && interaction['interaction']['type'] == 'twitter'
    screen_name = interaction['twitter']['user']['screen_name']
    content = interaction['interaction']['content']
    created_at = interaction['interaction']['created_at']
    id = interaction['twitter']['id']
    location_user = interaction['twitter']['user']['location']
    # Tweet-level location is only present for geotagged tweets
    location_tweet = ''
    if interaction['twitter']['geo']
      location_tweet = "#{interaction['twitter']['geo']['latitude']} #{interaction['twitter']['geo']['longitude']}"
    end
    puts "#{id} >> @#{screen_name} at #{created_at}"
    puts interaction.to_json
    data << {
      "screen_name" => screen_name,
      "created_at" => Time.parse(created_at),
      "id" => id,
      "content" => content,
      "location_user" => location_user,
      "location_tweet" => location_tweet
    }
    rowcounter += 1
    # Insert once batchrows rows are queued (">" would wait for batchrows + 1)
    if rowcounter >= batchrows
      puts "Google Fusion Tables: Inserting rows"
      rowcounter = 0
      table.insert data
      data = []
    end
  end
end
# Insert any rows left over from a partially filled batch
table.insert data unless data.empty?
puts 'DataSift: Finished consuming'
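
The row batching in the consume loop could also be pulled out into a small buffer object. The following is only a minimal sketch of that idea: `BatchBuffer` is a hypothetical helper, not part of the fusion_tables or datasift gems, and the block passed to it stands in for `table.insert`, so the example runs without credentials.

```ruby
# Hypothetical helper (an assumption for illustration, not a gem API):
# collects rows and hands them to a sink block in fixed-size batches.
class BatchBuffer
  def initialize(batch_size, &sink)
    raise ArgumentError, 'batch_size must be positive' unless batch_size > 0
    raise ArgumentError, 'a sink block is required' unless sink
    @batch_size = batch_size
    @sink = sink
    @rows = []
  end

  # Queue one row; pass a full batch to the sink as soon as it fills up.
  def <<(row)
    @rows << row
    flush if @rows.size >= @batch_size
    self
  end

  # Send whatever is queued, e.g. when the stream stops mid-batch.
  def flush
    return if @rows.empty?
    batch = @rows
    @rows = []
    @sink.call(batch)
  end
end

# Stand-in sink that records batches instead of calling table.insert.
inserted = []
buffer = BatchBuffer.new(10) { |batch| inserted << batch }
25.times { |i| buffer << { 'id' => i } }
buffer.flush # pick up the 5 rows left after two full batches
puts inserted.map(&:size).inspect # prints [10, 10, 5]
```

In the script, the sink would be something like `{ |batch| table.insert batch }`, with `buffer.flush` called once consuming finishes so a partial batch is not lost.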