Last active
January 2, 2016 04:44
-
-
Save CmdrDats/3dfb93c712ab76124bda to your computer and use it in GitHub Desktop.
Simple foray into Clojure CQRS/ES example. Now with Datomic.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns server.db.cqrs | |
(:use | |
[datomic-schema.schema :only [schema fields part]]) | |
(:require | |
[datomic.api :as d] | |
[datomic.function :as df] | |
[datomic-schema.schema :as s])) | |
;; Some simple helper functions: | |
(defn error
  "Aborts the current operation by throwing a RuntimeException whose
  message is `msg`. Never returns normally."
  [^String msg]
  (throw (new RuntimeException msg)))
(defmacro defdbfn
  "Define a Datomic database function.

  Expands to a `def` of a local function called `name` (handy for testing)
  whose metadata is the transaction map that installs the same code in
  Datomic under the ident `(keyword name)`.

  Rules for the body:
    - All calls to Datomic APIs must be fully qualified (datomic.api/...).
    - You cannot use your own namespaces, because the code runs inside
      Datomic.
    - The first parameter must always be `db`.

  Bodies made of several forms are wrapped in a single `do` before being
  stored, so every form becomes part of the database function; a
  single-form body is stored unchanged.

  Install the function by transacting its metadata:
    (d/transact conn [(meta myfn)])"
  [name params & code]
  ;; `(quote a b)` silently ignores `b`, so a multi-form body has to be
  ;; collapsed into one form before it is quoted.
  (let [db-code (if (next code)
                  (cons 'do code)
                  (first code))]
    `(def ~name
       (with-meta
         (fn ~name [~@params]
           ~@code)
         {:db/id (d/tempid :db.part/user)
          :db/ident ~(keyword name)
          :db/fn (df/construct
                   {:lang "clojure"
                    :params '~params
                    :code '~db-code})}))))
;; Database function: adds `value` to the number currently stored under
;; `attr` on the entity `entid`, returning the transaction data for it.
;; Returns an empty transaction when `entid` does not resolve to an entity.
;; An entity that has no value for `attr` yet is treated as holding 0, so
;; the first increment does not fail on (+ nil value).
(defdbfn add-value [db entid attr value]
  (let [ent (datomic.api/entity db entid)]
    (if ent
      [{:db/id entid attr (+ (or (get ent attr) 0) value)}]
      [])))
;; Let's setup a datomic in-memory database.. | |
(def uri
  "URI of the in-memory Datomic database used by this example."
  "datomic:mem://cqrs")

;; Create the database first, then open the connection to it.
(d/create-database uri)

(def conn
  "Datomic connection to the database at `uri`."
  (d/connect uri))
;; Get the schema going (using datomic-schema: https://github.com/Yuppiechef/datomic-schema)
(def db-schema
  "datomic-schema definitions for the two attribute namespaces used here:
  `base` (type, version, uuid) and `item` (name, amount, status)."
  [(schema base
     (fields
       [type :keyword :indexed]
       [version :long]
       [uuid :uuid :unique-identity]))
   (schema item
     (fields
       [name :string]
       [amount :long]
       [status :enum [:live :deactivated]]))])
(defn schema-list
  "Returns the transaction data that prepares the database: the install
  map of the `add-value` database function followed by the attributes
  generated from `db-schema`."
  []
  (let [db-fns     [(meta add-value)]
        attributes (s/generate-schema d/tempid db-schema)]
    (concat db-fns attributes)))
(defn setup-datomic
  "Transacts the schema and the database function into `conn`.

  Blocks until the transaction has completed, so a failed schema install
  throws here instead of being lost in a future nobody dereferences.
  Returns the (now realized) transaction future."
  []
  (let [tx-future (d/transact conn (schema-list))]
    @tx-future
    tx-future))
;; Transact the schema and database function.
(setup-datomic)
;; Faux event and command stores. In a real system these should be written to a durable
;; database - direct DynamoDB access, for example.
(def events
  "Atom holding a vector of every event handed to `handle-event`, in order."
  (atom []))

(def commands
  "Atom holding a vector of every command handed to `handle-command`, in order."
  (atom []))

(def listeners
  "Atom holding the registered [event-type listener-fn] pairs."
  (atom ()))
;; The command and event 'bus' machinery. Basically just a multi-method. | |
;; I'm eschewing the idea of aggregate roots, since we're in functional land where verbs can exist | |
;; unescorted and have the ability to affect nouns.
;; | |
;; http://steve-yegge.blogspot.com/2006/03/execution-in-kingdom-of-nouns.html | |
;; The event-persist is just supposed to expand to a datomic transaction (simply a list of facts, really) | |
(defmulti event-persist
  "Expands an event into Datomic transaction data.
  Called as (event-persist db event); dispatches on the first element of
  the event vector."
  (fn [_db [kind & _]] kind))
;; The command-expand is responsible for doing validation and expanding to a list of events (which | |
;; it returns). | |
(defmulti command-expand
  "Validates a command and returns the list of events it expands to.
  Called as (command-expand db command); dispatches on the first element
  of the command vector."
  (fn [_db [kind & _]] kind))
(defn handle-event
  "Records event `e` in the `events` store, then persists it.

  The event is expanded to transaction data via `event-persist`. When that
  data is non-empty it is transacted on `conn`, and every listener
  registered for the event's type is called with the post-transaction
  database value and the event. Returns nil."
  [db [etype & _ :as e]]
  (swap! events conj e)
  (when-let [tx-data (seq (event-persist db e))]
    (let [tx-result @(d/transact conn tx-data)
          db-after  (:db-after tx-result)]
      (doseq [[listener-type notify] @listeners
              :when (= etype listener-type)]
        (notify db-after e)))))
(defn handle-command
  "Records command `c` in the `commands` store, expands it into events
  with `command-expand`, and passes each event, in order, to
  `handle-event` together with the same `db` value. Returns nil."
  [db c]
  (swap! commands conj c)
  (doseq [event (command-expand db c)]
    (handle-event db event)))
;; Domain specific Command and Event pairs | |
;; Create Item | |
;; A created item is asserted as a new entity with a zero amount, an empty
;; name and the :live status.
(defmethod event-persist :inventory-item-created [_ [_ item-uuid]]
  (let [new-item {:db/id       (d/tempid :db.part/user)
                  :base/uuid   item-uuid
                  :item/amount 0
                  :item/name   ""
                  :item/status :item.status/live}]
    [new-item]))
;; Creating is only allowed while no entity carries the given uuid yet.
(defmethod command-expand :create-inventory-item [db [_ item-uuid]]
  (if (d/entity db [:base/uuid item-uuid])
    (error "Item already created")
    [[:inventory-item-created item-uuid]]))
;; Rename Item | |
;; Renaming overwrites :item/name on the entity addressed by its uuid.
(defmethod event-persist :inventory-item-renamed [_ [_ item-uuid new-name]]
  (let [item-ref [:base/uuid item-uuid]]
    [{:db/id     item-ref
      :item/name new-name}]))
;; Renaming requires the item to exist.
(defmethod command-expand :rename-inventory-item [db [_ item-uuid new-name]]
  (if (d/entity db [:base/uuid item-uuid])
    [[:inventory-item-renamed item-uuid new-name]]
    (error "Item not found")))
;; Remove Item amount | |
;; Removal is expressed as adding the negated amount through the
;; :add-value database function.
(defmethod event-persist :items-removed-from-inventory [_ [_ item-uuid amount]]
  (let [delta (- 0 amount)]
    [[:add-value [:base/uuid item-uuid] :item/amount delta]]))
;; Guards run in order: the item must exist, the amount must not be
;; negative, and the removal must not exceed the current stock.
(defmethod command-expand :remove-items-from-inventory [db [_ item-uuid amount]]
  (let [item (d/entity db [:base/uuid item-uuid])]
    (when-not item
      (error "Item not found"))
    (when (neg? amount)
      (error "Can't remove negative amount from inventory"))
    (when (< (:item/amount item) amount)
      (error "Can't go into negative inventory amount"))
    [[:items-removed-from-inventory item-uuid amount]]))
;; Add item amount | |
;; Checking items in adds the amount through the :add-value database
;; function.
(defmethod event-persist :items-checked-into-inventory [_ [_ item-uuid amount]]
  (let [item-ref [:base/uuid item-uuid]]
    [[:add-value item-ref :item/amount amount]]))
;; Guards run in order: the item must exist and the amount must be
;; strictly positive.
(defmethod command-expand :add-items-to-inventory [db [_ item-uuid amount]]
  (let [item (d/entity db [:base/uuid item-uuid])]
    (when-not item
      (error "Item not found"))
    (when (<= amount 0)
      (error "Must have positive amount to check into inventory"))
    [[:items-checked-into-inventory item-uuid amount]]))
;; Deactivate item | |
;; Deactivation switches :item/status to the :deactivated enum value.
(defmethod event-persist :inventory-item-deactivated [_ [_ item-uuid]]
  (let [item-ref [:base/uuid item-uuid]]
    [{:db/id       item-ref
      :item/status :item.status/deactivated}]))
;; Guards run in order: the item must exist and must not already be
;; deactivated.
(defmethod command-expand :deactivate-inventory-item [db [_ item-uuid]]
  (let [item (d/entity db [:base/uuid item-uuid])]
    (when-not item
      (error "Item not found"))
    (when (= (:item/status item) :item.status/deactivated)
      (error "Item already deactivated"))
    [[:inventory-item-deactivated item-uuid]]))
;; Some listeners to events - these should(?) be able to produce new commands, but in this simple | |
;; example printing will suffice. | |
(defn find-aggregate
  "Pairs event `e` with the entity it refers to. The second element of the
  event is taken as the entity's :base/uuid and looked up in `db`.
  Returns [e entity]."
  [db e]
  (let [item-uuid (second e)
        item      (d/entity db [:base/uuid item-uuid])]
    [e item]))
;; Yes, I should probably use a map with lists instead of just a straight filtered list of | |
;; listeners.. Not the focus of this exercise. | |
(defn setup-listeners
  "Replaces the contents of `listeners` with one printing listener per
  event type. Each listener receives (db event), resolves the affected
  entity through `find-aggregate`, and prints a one-line report."
  []
  (let [on-aggregate (fn [report] (comp report #'find-aggregate))]
    (reset!
      listeners
      [[:inventory-item-created
        (on-aggregate
          (fn [[_ item]]
            (println (:db/id item) ": New Item Created:" (:base/uuid item))))]
       [:inventory-item-renamed
        (on-aggregate
          (fn [[_ item]]
            (println (:db/id item) ": Item renamed to:" (:item/name item))))]
       [:items-removed-from-inventory
        (on-aggregate
          (fn [[[_ _ amount] item]]
            (println (:db/id item) ": Removed" amount "from" (:item/name item) ", balance:" (:item/amount item))))]
       [:items-checked-into-inventory
        (on-aggregate
          (fn [[[_ _ amount] item]]
            (println (:db/id item) ": Added" amount "to" (:item/name item) ", balance:" (:item/amount item))))]
       [:inventory-item-deactivated
        (on-aggregate
          (fn [[_ item]]
            (println (:db/id item) ": Deactivated" (:item/name item))))]])))
;; Swap in our listeners
(setup-listeners)
;; Run a bunch of commands: | |
;; Two items are created and then renamed, restocked, depleted and (for
;; the second one) deactivated. Each command is handled against a fresh
;; database value.
(let [i1     (d/squuid)
      i2     (d/squuid)
      script [[:create-inventory-item i1]
              [:create-inventory-item i2]
              [:rename-inventory-item i1 "Mdoe Fridge Monkey"]
              [:rename-inventory-item i1 "Mode Fridge Monkey"]
              [:add-items-to-inventory i2 10]
              [:add-items-to-inventory i1 4]
              [:add-items-to-inventory i1 5]
              [:rename-inventory-item i2 "Kenwood Mixer"]
              [:remove-items-from-inventory i2 3]
              [:remove-items-from-inventory i1 3]
              [:add-items-to-inventory i2 10]
              [:deactivate-inventory-item i2]]]
  (doseq [command script]
    (handle-command (d/db conn) command)))
;; Test the final state: | |
;; Query amount, name and status ident of every item and compare the
;; result with the state the command script above should have produced.
(let [item-state-query '[:find ?a ?n ?sn
                         :where
                         [?e :item/name ?n]
                         [?e :item/amount ?a]
                         [?e :item/status ?s]
                         [?s :db/ident ?sn]]
      f (d/q item-state-query (d/db conn))]
  (assert (= f #{[17 "Kenwood Mixer" :item.status/deactivated] [6 "Mode Fridge Monkey" :item.status/live]})))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment