Skip to content

Instantly share code, notes, and snippets.

@CmdrDats
Last active January 2, 2016 04:44
Show Gist options
  • Save CmdrDats/3dfb93c712ab76124bda to your computer and use it in GitHub Desktop.
Save CmdrDats/3dfb93c712ab76124bda to your computer and use it in GitHub Desktop.
Simple foray into Clojure CQRS/ES example. Now with Datomic.
(ns server.db.cqrs
(:use
[datomic-schema.schema :only [schema fields part]])
(:require
[datomic.api :as d]
[datomic.function :as df]
[datomic-schema.schema :as s]))
;; Some simple helper functions:
(defn error
  "Aborts the current operation by throwing a RuntimeException whose message
  is `msg`. Never returns normally."
  [^String msg]
  (let [failure (RuntimeException. msg)]
    (throw failure)))
(defmacro defdbfn
  "Define a Datomic database function called `name`.

  The body is stored in Datomic and runs there, so every Datomic API call in
  it must be fully qualified (datomic.api/...) and it cannot reference your
  own namespaces. The first parameter must always be `db`.

  A local function with the same name and body is defined as well, which is
  useful for testing. The data needed to install the database function is
  attached to that local function as metadata; commit it with
  (d/transact conn [(meta my-fn)]).

  `code` may consist of several forms; they are evaluated in order and the
  value of the last one is returned, both locally and inside Datomic."
  [name params & code]
  `(def ~name
     (with-meta
       (fn ~name [~@params]
         ~@code)
       {:db/id (d/tempid :db.part/user)
        :db/ident ~(keyword name)
        :db/fn (df/construct
                 {:lang "clojure"
                  :params '~params
                  ;; Wrap the body in `do` before quoting: `quote` takes a
                  ;; single form, so splicing several body forms straight into
                  ;; it lost everything after the first one.
                  :code '(do ~@code)})})))
;; Database function: expands to a transaction that adds `value` to the
;; current value of `attr` on the entity identified by `entid`.
;; Returns an empty transaction when `entid` does not resolve to an entity.
;; An attribute that has never been asserted is treated as 0, so the first
;; addition does not fail on (+ nil value).
(defdbfn add-value [db entid attr value]
  (let [ent (datomic.api/entity db entid)]
    (if ent
      [{:db/id entid attr (+ (or (get ent attr) 0) value)}]
      [])))
;; Let's setup a datomic in-memory database..
;; URI of the in-memory ("mem") Datomic database used by this example.
(def uri "datomic:mem://cqrs")
;; Create the database behind `uri`; this runs as soon as the namespace loads.
(d/create-database uri)
;; Shared connection used by the transactions and database reads in this file.
(def conn (d/connect uri))
;; Get the schema going (using datomic-schema: https://github.com/Yuppiechef/datomic-schema)
;; Attribute definitions written in the datomic-schema DSL. schema-list turns
;; them into transaction data with s/generate-schema.
(def db-schema
[(schema
base
;; Used elsewhere in this file as :base/<field>; :base/uuid is the attribute
;; the commands and events use in lookup refs such as [:base/uuid uuid].
(fields
[type :keyword :indexed]
[version :long]
[uuid :uuid :unique-identity]))
(schema
item
;; Used elsewhere in this file as :item/<field>; the status values appear as
;; :item.status/live and :item.status/deactivated.
(fields
[name :string]
[amount :long]
[status :enum [:live :deactivated]]))])
(defn schema-list
  "Returns the transaction data that prepares the database: the metadata of
  the add-value database function followed by the attributes generated from
  db-schema."
  []
  (let [db-fns [(meta add-value)]
        attributes (s/generate-schema d/tempid db-schema)]
    (concat db-fns attributes)))
(defn setup-datomic
  "Transacts the schema and the add-value database function over `conn`.

  Blocks until the transaction has completed, so code that runs afterwards
  always sees the installed schema, and a failed install throws here instead
  of being lost in an unread future. Returns the (already realized)
  transaction future, so callers that deref the result keep working."
  []
  (doto (d/transact conn (schema-list))
    deref))
;; Transact the schema and database function, waiting for it to finish.
(setup-datomic)
;; Faux event and command stores. In a real system these should be written to a durable
;; database - direct DynamoDB or something similar.
(def events
  "Vector of every event passed to handle-event, oldest first."
  (atom []))

(def commands
  "Vector of every command passed to handle-command, oldest first."
  (atom []))

(def listeners
  "Collection of [event-type callback] pairs consulted by handle-event.
  Starts out empty; setup-listeners replaces it."
  (atom (list)))
;; The command and event 'bus' machinery. Basically just a multi-method.
;; I'm eschewing the idea of aggregate roots, since we're in functional land where verbs can exist
;; unescorted and have the ability to affect nouns.
;;
;; http://steve-yegge.blogspot.com/2006/03/execution-in-kingdom-of-nouns.html
;; The event-persist is just supposed to expand to a datomic transaction (simply a list of facts, really)
(defmulti event-persist
  "Expands an event into Datomic transaction data. Takes a database value and
  an event vector, and dispatches on the event's first element (its type)."
  (fn [_db event]
    (first event)))
;; The command-expand is responsible for doing validation and expanding to a list of events (which
;; it returns).
(defmulti command-expand
  "Validates a command and expands it into a list of events. Takes a database
  value and a command vector, and dispatches on the command's first element
  (its type)."
  (fn [_db command]
    (first command)))
(defn handle-event
  "Records event `e`, persists it, and notifies interested listeners.

  The event is appended to `events` first. Its transaction data is then
  obtained from event-persist; if that is non-empty it is transacted over
  `conn`, and every listener registered for the event's type is called with
  the database value after the transaction and the event itself.
  Returns nil."
  [db [etype & _ :as e]]
  (swap! events conj e)
  (when-let [tx (seq (event-persist db e))]
    (let [db-after (:db-after @(d/transact conn tx))]
      (doseq [[listener-type callback] @listeners
              :when (= etype listener-type)]
        (callback db-after e)))))
(defn handle-command
  "Records command `c` in `commands`, expands it into events with
  command-expand, and passes each event to handle-event in order, using the
  same `db` value throughout. Returns nil."
  [db c]
  (swap! commands conj c)
  (doseq [evt (command-expand db c)]
    (handle-event db evt)))
;; Domain specific Command and Event pairs
;; Create Item
;; A newly created item starts with an empty name, zero stock and live status.
(defmethod event-persist :inventory-item-created [db [_ uuid]]
  (let [defaults {:item/amount 0
                  :item/name ""
                  :item/status :item.status/live}]
    [(assoc defaults
            :db/id (d/tempid :db.part/user)
            :base/uuid uuid)]))
;; Rejects the command when an entity with this uuid already exists.
(defmethod command-expand :create-inventory-item [db [_ uuid]]
  (when (d/entity db [:base/uuid uuid])
    (error "Item already created"))
  [[:inventory-item-created uuid]])
;; Rename Item
;; Sets the name of the item identified by the [:base/uuid uuid] lookup ref.
(defmethod event-persist :inventory-item-renamed [db [_ uuid name]]
  (let [item-ref [:base/uuid uuid]]
    [{:db/id item-ref
      :item/name name}]))
;; Only an existing item can be renamed.
(defmethod command-expand :rename-inventory-item [db [_ uuid name]]
  (if (d/entity db [:base/uuid uuid])
    [[:inventory-item-renamed uuid name]]
    (error "Item not found")))
;; Remove Item amount
;; Removal is an :add-value call with the amount negated.
(defmethod event-persist :items-removed-from-inventory [db [_ uuid amount]]
  (let [delta (- 0 amount)]
    [[:add-value [:base/uuid uuid] :item/amount delta]]))
;; Checks, in order: the item exists, the amount is not negative, and the
;; removal would not take the stock below zero.
(defmethod command-expand :remove-items-from-inventory [db [_ uuid amount]]
  (let [item (d/entity db [:base/uuid uuid])]
    (when-not item
      (error "Item not found"))
    (when (neg? amount)
      (error "Can't remove negative amount from inventory"))
    (when (> amount (:item/amount item))
      (error "Can't go into negative inventory amount"))
    [[:items-removed-from-inventory uuid amount]]))
;; Add item amount
;; Check-in is an :add-value call with the amount as given.
(defmethod event-persist :items-checked-into-inventory [db [_ uuid amount]]
  (let [item-ref [:base/uuid uuid]]
    [[:add-value item-ref :item/amount amount]]))
;; Checks, in order: the item exists and the amount is strictly positive.
(defmethod command-expand :add-items-to-inventory [db [_ uuid amount]]
  (let [item (d/entity db [:base/uuid uuid])]
    (when-not item
      (error "Item not found"))
    (when (>= 0 amount)
      (error "Must have positive amount to check into inventory"))
    [[:items-checked-into-inventory uuid amount]]))
;; Deactivate item
;; Switches the item's status to deactivated.
(defmethod event-persist :inventory-item-deactivated [db [_ uuid]]
  (let [item-ref [:base/uuid uuid]]
    [{:db/id item-ref
      :item/status :item.status/deactivated}]))
;; Checks, in order: the item exists and it is not already deactivated.
(defmethod command-expand :deactivate-inventory-item [db [_ uuid]]
  (let [item (d/entity db [:base/uuid uuid])]
    (when-not item
      (error "Item not found"))
    (when (= :item.status/deactivated (:item/status item))
      (error "Item already deactivated"))
    [[:inventory-item-deactivated uuid]]))
;; Some listeners to events - these should(?) be able to produce new commands, but in this simple
;; example printing will suffice.
(defn find-aggregate
  "Pairs event `e` with the entity it refers to. The event's second element
  is taken as the uuid and looked up in `db` through [:base/uuid uuid].
  Returns [e entity]."
  [db e]
  (let [uuid (second e)
        item (d/entity db [:base/uuid uuid])]
    [e item]))
;; Yes, I should probably use a map with lists instead of just a straight filtered list of
;; listeners.. Not the focus of this exercise.
(defn setup-listeners
  "Replaces the contents of `listeners` with one printing listener per event
  type. Each listener receives (db event), resolves the event's entity via
  find-aggregate, and prints a one-line summary. Returns the new listener
  vector."
  []
  (letfn [(with-aggregate [report]
            ;; Look the entity up first, then hand [event entity] to `report`.
            (comp report #'find-aggregate))]
    (reset!
      listeners
      [[:inventory-item-created
        (with-aggregate
          (fn [[_ item]]
            (println (:db/id item) ": New Item Created:" (:base/uuid item))))]
       [:inventory-item-renamed
        (with-aggregate
          (fn [[_ item]]
            (println (:db/id item) ": Item renamed to:" (:item/name item))))]
       [:items-removed-from-inventory
        (with-aggregate
          (fn [[[_ _ amount] item]]
            (println (:db/id item) ": Removed" amount "from" (:item/name item) ", balance:" (:item/amount item))))]
       [:items-checked-into-inventory
        (with-aggregate
          (fn [[[_ _ amount] item]]
            (println (:db/id item) ": Added" amount "to" (:item/name item) ", balance:" (:item/amount item))))]
       [:inventory-item-deactivated
        (with-aggregate
          (fn [[_ item]]
            (println (:db/id item) ": Deactivated" (:item/name item))))]])))
;; Swap in our listeners
(setup-listeners)
;; Run a bunch of commands:
;; Two items, identified by freshly generated squuids, are driven through a
;; fixed script of commands. Each command sees the latest database value.
(let [i1 (d/squuid)
      i2 (d/squuid)
      script [[:create-inventory-item i1]
              [:create-inventory-item i2]
              [:rename-inventory-item i1 "Mdoe Fridge Monkey"]
              [:rename-inventory-item i1 "Mode Fridge Monkey"]
              [:add-items-to-inventory i2 10]
              [:add-items-to-inventory i1 4]
              [:add-items-to-inventory i1 5]
              [:rename-inventory-item i2 "Kenwood Mixer"]
              [:remove-items-from-inventory i2 3]
              [:remove-items-from-inventory i1 3]
              [:add-items-to-inventory i2 10]
              [:deactivate-inventory-item i2]]]
  (doseq [c script]
    (handle-command (d/db conn) c)))
;; Test the final state:
;; Every item's amount, name and status ident must match what the command
;; script above should have produced.
(let [expected #{[17 "Kenwood Mixer" :item.status/deactivated]
                 [6 "Mode Fridge Monkey" :item.status/live]}
      actual (d/q
               '[:find ?a ?n ?sn
                 :where
                 [?e :item/name ?n]
                 [?e :item/amount ?a]
                 [?e :item/status ?s]
                 [?s :db/ident ?sn]]
               (d/db conn))]
  (assert (= actual expected)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment