Created
October 12, 2011 21:15
-
-
Save cgbystrom/1282615 to your computer and use it in GitHub Desktop.
Simple proof-of-concept implementing an activity stream on top of Cassandra
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Naive implementation of an activity stream service using Cassandra.
# Just a proof of concept, not intended for production use.
# Probably flawed in many ways, such as key design, write handling and features.
import datetime
import uuid

import pycassa

# Connection handle shared by every column family below.
# NOTE(review): 'Actstream' is presumably the keyspace name -- confirm
# against the cluster schema.
pool = pycassa.connect('Actstream')

# Column family handles, created in the order Subscribers, Streams,
# Activities:
#   subscribers -- rows keyed 'subscribers_<stream id>', one column per stream
#   streams     -- rows keyed 'streams_<stream id>', timestamp -> activity key
#   activities  -- rows keyed 'activity_<key>', holding a 'message' column
subscribers, streams, activities = (
    pycassa.ColumnFamily(pool, family_name)
    for family_name in ('Subscribers', 'Streams', 'Activities')
)
def _subscribers_row_key(stream_id):
    """Return the key of the row that lists the subscribers of ``stream_id``."""
    return 'subscribers_' + str(stream_id)


def subscribe(from_stream_id, to_stream_id):
    """
    Subscribe an activity stream to another activity stream.

    Example: subscribe('elvis', 'hector') will subscribe user 'elvis' to
    activities generated by 'hector'.

    The subscription is stored in the row of the stream being followed
    ('subscribers_<to_stream_id>') with the follower as the column name.
    That is the row get_subscribers() reads, so activities published by
    'hector' are fanned out to 'elvis'. Previously the edge was written the
    other way round and followers never received anything.
    """
    # Only the column name carries information; '\0' is a placeholder value.
    subscribers.insert(_subscribers_row_key(to_stream_id),
                       {str(from_stream_id): '\0'})


def unsubscribe(from_stream_id, to_stream_id):
    """
    Unsubscribe an activity stream from another activity stream.

    Example: unsubscribe('elvis', 'hector') will unsubscribe user 'elvis'
    from activities generated by 'hector'.

    Removes the follower's column from the followed stream's subscriber row,
    mirroring what subscribe() writes. Returns whatever the underlying
    ``remove`` call returns.
    """
    return subscribers.remove(_subscribers_row_key(to_stream_id),
                              [str(from_stream_id)])
def get_subscribers(stream_id):
    """
    Get all subscribers for a given activity stream.

    Example: get_subscribers('elvis') will get all the users subscribing to
    user 'elvis' activity stream.

    Reads the column names of row 'subscribers_<stream_id>' and returns them
    as a list. Numeric ids are returned as ``int`` (as before); non-numeric
    ids such as 'elvis' are returned as stored instead of raising ValueError.
    A stream without a subscriber row yields an empty list.
    """
    try:
        columns = subscribers.get('subscribers_' + str(stream_id))
    except pycassa.NotFoundException:
        # A missing row simply means nobody has subscribed yet.
        return []

    # NOTE(review): get() is called with pycassa's default column_count, so
    # very wide subscriber rows may be truncated -- confirm and page if needed.
    subscriber_ids = []
    for column_name in columns:
        try:
            subscriber_ids.append(int(column_name))
        except ValueError:
            # Non-numeric stream id (e.g. a user name): keep it unchanged.
            subscriber_ids.append(column_name)
    return subscriber_ids
def publish_activity(stream_id, message):
    """
    Publish an activity to a stream and to every stream subscribed to it.

    Example: publish_activity('hector', 'Ranked up to level 27') stores the
    activity and references it from the activity stream of user 'hector' as
    well as from the streams of all his subscribers.
    """
    # Store the activity itself once, under a random UUID key.
    activity_key = str(uuid.uuid4())
    activities.insert('activity_%s' % activity_key, {'message': message})

    subscriber_ids = get_subscribers(stream_id)

    # Every stream row receives the same column: UTC timestamp -> activity key.
    entry = {datetime.datetime.utcnow(): activity_key}
    fan_out = dict(('streams_%s' % sid, entry) for sid in subscriber_ids)
    fan_out['streams_%s' % stream_id] = entry

    streams.batch_insert(fan_out)
def get_stream(stream_id, limit):
    """
    Get an activity stream.

    Example: get_stream('elvis', 10) will get the ten latest activities in
    user 'elvis' activity stream.

    Reads up to ``limit`` columns of row 'streams_<stream_id>' in reversed
    column order, then fetches the referenced activities and returns their
    'message' values in that same order. Returns an empty list when the
    stream has no entries. Activity rows that cannot be found are skipped.
    """
    try:
        timeline = streams.get('streams_%s' % stream_id,
                               column_count=limit, column_reversed=True)
    except pycassa.NotFoundException:
        # No row yet: the stream is empty (or unknown).
        return []

    activity_keys = ['activity_%s' % activity_id
                     for activity_id in timeline.values()]
    rows = activities.multiget(activity_keys)

    # Index by the requested keys so the output order follows the stream,
    # independent of how the multiget result happens to be ordered.
    return [rows[key]['message'] for key in activity_keys if key in rows]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment