Skip to content

Instantly share code, notes, and snippets.

@leonoel
Last active July 6, 2023 17:13
Show Gist options
  • Save leonoel/c5e32b65ec7b6ab4b5b45772082a3d85 to your computer and use it in GitHub Desktop.
Save leonoel/c5e32b65ec7b6ab4b5b45772082a3d85 to your computer and use it in GitHub Desktop.
functional reactive petroleum
;; missionary solution to petrol pump example by Stephen Blackheath and Anthony Jones, ISBN 978-1633430105
;; huanhulan's demo : [live](https://huanhulan.github.io/petrol_pump), [code](https://github.com/huanhulan/petrol_pump)
(ns pump
(:refer-clojure :exclude [first])
(:require [missionary.core :as m])
(:import missionary.Cancelled))
(defn rising
  "A transducer that outputs `nil` when the input switches from logical false to logical true.

  The input is considered logical true before the first value arrives, so a leading
  truthy value does not count as a rising edge."
  [rf]
  ;; Holds the previously seen input; a volatile is the usual container for
  ;; per-reduction transducer state.
  (let [prev (volatile! true)]
    (fn
      ;; Init arity of the transducer contract: delegate to the downstream reducing function.
      ([] (rf))
      ;; Completion arity: nothing to flush, delegate downstream.
      ([r] (rf r))
      ;; Step arity: remember the current input, emit `nil` only on a false -> true switch.
      ([r curr]
       (let [was @prev]
         (vreset! prev curr)
         (if (and (not was) curr)
           (rf r nil)
           r))))))
(defn falling
  "A transducer that outputs `nil` when the input switches from logical true to logical false.

  The input is considered logical false before the first value arrives, so a leading
  falsy value does not count as a falling edge."
  [rf]
  ;; Holds the previously seen input; a volatile is the usual container for
  ;; per-reduction transducer state.
  (let [prev (volatile! false)]
    (fn
      ;; Init arity of the transducer contract: delegate to the downstream reducing function.
      ([] (rf))
      ;; Completion arity: nothing to flush, delegate downstream.
      ([r] (rf r))
      ;; Step arity: remember the current input, emit `nil` only on a true -> false switch.
      ([r curr]
       (let [was @prev]
         (vreset! prev curr)
         (if (and was (not curr))
           (rf r nil)
           r))))))
(def first
  "Given a flow, builds a task that completes with the first value the flow produces,
  or with `nil` when the flow ends without producing anything."
  ;; The reducing function ignores the accumulator and stops the reduction
  ;; immediately with the incoming value.
  (let [stop-at-first (fn [_ x] (reduced x))]
    (partial m/reduce stop-at-first nil)))
(defn delivery-request
  "Builds a task that completes, once the nozzle of the given fuel is picked up, with a map
  holding the fuel `:slot` and the `:price` of that fuel at that moment."
  [slot {:keys [<price <nozzle]}]
  (let [;; one `nil` event per false -> true switch of the nozzle state
        >picked-up (m/eduction rising <nozzle)
        ;; the event itself carries no information, only the sampled price matters
        ->request  (fn [price _] {:price price :slot slot})]
    (first (m/sample ->request <price >picked-up))))
(defn delivery
"A continuous flow of the current delivery request, or `nil` if not currently delivering.
`>clear-sale` is a discrete flow of clear-sale events, `fuels` is a sequential collection of maps
holding the `:<price` and `:<nozzle` flows of each fuel (the position in the collection is the slot)."
[>clear-sale fuels]
;; `{}` called with two arguments returns the second one, so the function given to `m/relieve`
;; always keeps the newer of two values.
(m/relieve {}
(m/ap
;; The first value is `nil` : nothing is being delivered when the flow starts.
(m/amb nil
(loop []
(m/amb
;; Race the delivery requests of all fuels and produce the winning request.
(m/? (apply m/race (map-indexed delivery-request fuels)))
;; Then wait for the next clear-sale event and produce its value.
;; NOTE(review): the produced value is whatever `>clear-sale` emitted, it is `nil` in the demo
;; at the bottom of this file - confirm that every producer of clear-sale events emits `nil`.
(m/? (first >clear-sale))
;; Then start over, waiting for the next delivery request.
(recur)))))))
(defn sale-quantity
  "A continuous flow of the fuel quantity accumulated during the current delivery.
  The value is zero whenever no delivery is in progress."
  [<delivery <calibration >fuel-pulses]
  (m/cp
    (if (some? (m/?< <delivery))
      ;; Delivering : each pulse is multiplied by the calibration sampled at that moment,
      ;; and the products are summed up.
      (try
        (m/?< (m/relieve {}
                (m/reductions +
                  (m/sample * <calibration >fuel-pulses))))
        ;; The running sum is cancelled when the delivery changes, fall back to zero.
        (catch Cancelled _ 0))
      ;; Not delivering.
      0)))
(def delivery-cost
  "Given the delivery flow and the sale quantity flow, builds a continuous flow of the fuel cost
  accumulated during the current delivery. The value is zero whenever no delivery is in progress."
  (partial m/latest
    (fn [request qty]
      (if (nil? request)
        0
        ;; cost = quantity delivered so far times the price fixed by the delivery request
        (* qty (:price request))))))
(defn add-digit
  "Reducing function implementing the keypad state machine.
  - no argument : the initial state, zero.
  - one argument : the state, unchanged.
  - state and input : a `nil` input resets the state to zero, a digit is appended to the state
    unless the result would exceed 999, in which case the state is left unchanged."
  ([] 0)
  ([r] r)
  ([r d]
   (if (nil? d)
     0
     (let [appended (+ (* r 10) d)]
       (if (> appended 999)
         r
         appended)))))
(defn preset
  "A continuous flow of the keypad state, obtained by running `add-digit` over the
  clear-sale events and the keypad inputs merged together."
  [>clear-sale >keypad]
  (->> (m/ap (m/?> (m/amb= >clear-sale >keypad)))
       (m/reductions add-digit)
       (m/relieve {})))
(defn finalize-delivery
  "Returns the initial delivery request extended with the final `:sale-cost` and `:sale-quantity`."
  [sale-quantity sale-cost delivery]
  (-> delivery
      (assoc :sale-cost sale-cost)
      (assoc :sale-quantity sale-quantity)))
(defn sale-complete
"A discrete flow of delivery reports, emitted when the delivered fuel nozzle is put down.
A report is the delivery request extended by `finalize-delivery` with the sale quantity and the sale cost."
[<delivery <sale-quantity <sale-cost fuels]
;; The `m/ap` flow below is the last argument of `m/sample` : each delivery request it produces is
;; passed to `finalize-delivery` after the values of `<sale-quantity` and `<sale-cost`.
(m/sample finalize-delivery <sale-quantity <sale-cost
(m/ap
(loop []
;; Wait for the first non-nil delivery request.
(let [d (m/? (first (m/eduction (remove nil?) <delivery)))]
;; Wait for the nozzle of the delivered fuel to switch from logical true to logical false.
(m/? (first (m/eduction falling (:<nozzle (nth fuels (:slot d))))))
;; Produce the request, then start over.
(m/amb d (recur)))))))
(defn sale-price
  "A continuous flow of the price to be shown to the user for the fuel in slot `s` :
  - the delivery price if this fuel is being delivered,
  - `nil` if another fuel is being delivered,
  - the current sale price if no fuel is being delivered."
  [s <price <delivery]
  (m/cp
    (let [request (m/?< <delivery)]
      (if (nil? request)
        ;; Not delivering : follow the sale price, yield `nil` when this branch is cancelled.
        (try (m/?< <price)
             (catch Cancelled _))
        ;; Delivering : only the delivered fuel shows a price.
        (when (= s (:slot request))
          (:price request))))))
(defn pump!
  "The constructor for the pump logic.
  Takes a map of the pump inputs (`:>clear-sale`, `:>fuel-pulses`, `:>keypad`, `:<calibration`
  and `:fuels`) and returns a map of the pump outputs : `:<preset`, `:<delivery`,
  `:<sale-quantity`, `:<sale-cost`, `:>sale-complete` and `:sale-prices`, the latter being
  a vector with one sale price signal per fuel, in the order of `fuels`."
  [{:keys [>clear-sale >fuel-pulses >keypad <calibration fuels]}]
  (let [keypad-state     (m/signal! (preset >clear-sale >keypad))
        current-delivery (m/signal! (delivery >clear-sale fuels))
        quantity         (m/signal! (sale-quantity current-delivery <calibration >fuel-pulses))
        cost             (m/signal! (delivery-cost current-delivery quantity))
        reports          (m/stream! (sale-complete current-delivery quantity cost fuels))
        ;; one displayed price signal per fuel, the index of the fuel being its slot
        publish-price    (fn [slot fuel]
                           (m/signal! (sale-price slot (:<price fuel) current-delivery)))
        displayed-prices (into [] (map-indexed publish-price) fuels)]
    {:<preset        keypad-state
     :<delivery      current-delivery
     :<sale-quantity quantity
     :<sale-cost     cost
     :>sale-complete reports
     :sale-prices    displayed-prices}))
(comment
;; REPL walkthrough of the pump. Nothing in this form is evaluated when the file is loaded,
;; the forms are meant to be evaluated one at a time, from top to bottom.

;; The available fuels, the position in the vector is the slot of the fuel.
(def fuels [:strawberry :lime :kerosene])
;; Mailboxes standing for the discrete user inputs : values are posted by calling the mailbox
;; with one argument (see the end of this form) and consumed through `poll`.
(def keypad (m/mbx))
(def clear-sale (m/mbx))
(def fuel-pulses (m/mbx))
;; Atoms standing for the continuous inputs, observed with `m/watch`.
(def calibration (atom 1))
;; One atom per fuel : the nozzle state (initially `false`) and the price (initially 1).
(def nozzles (zipmap fuels (repeatedly #(atom false))))
(def prices (zipmap fuels (repeatedly #(atom 1))))
(defn poll
"A discrete flow running given task repeatedly forever."
[task] (m/ap (loop [] (m/amb (m/? task) (recur)))))
(def app
"The pump simulation. Binds user inputs to pump logic and prints resulting events."
(m/reactor
(let [{:keys [<preset <sale-cost <sale-quantity >sale-complete sale-prices]}
(pump! {:>keypad (m/stream! (poll keypad))
:>clear-sale (m/stream! (poll clear-sale))
:>fuel-pulses (m/stream! (poll fuel-pulses))
:<calibration (m/signal! (m/watch calibration))
:fuels (mapv (fn [f]
{:<nozzle (m/signal! (m/watch (nozzles f)))
:<price (m/signal! (m/watch (prices f)))}) fuels)})]
;; A single flow printing every output of the pump, each branch of `m/amb=` follows one output.
(m/stream!
(m/ap
(m/amb=
(println "preset :" (m/?> <preset))
(println "sale cost :" (m/?> <sale-cost))
(println "sale quantity :" (m/?> <sale-quantity))
(println "sale complete :" (m/?> >sale-complete))
;; Pair each fuel with its sale price signal and follow all of them,
;; `(count fuels)` being passed to `m/?>` as the number of concurrent branches.
(let [[fuel <sale-price] (m/?> (count fuels) (m/seed (zipmap fuels sale-prices)))]
(println "sale price" (name fuel) ":" (m/?> <sale-price)))))))))
;; Failure callback of the simulation : prints the stack trace of the error.
(defn pst [^Throwable e]
(.printStackTrace e))
(def cancel (app prn pst)) ;; start the pump
(swap! (prices :lime) inc) ;; increase lime price
(swap! (prices :strawberry) inc) ;; increase strawberry price
(keypad 8) ;; typing digits on keypad
(keypad 3)
(keypad 5)
(keypad 4) ;; typing has no effect after the third digit
(keypad nil) ;; reset the keypad
(keypad 3) ;; typing on keypad again
(keypad 5)
(reset! (nozzles :lime) true) ;; put lime nozzle up
(fuel-pulses 3) ;; flow some fuel
(fuel-pulses 3)
(swap! calibration * 2) ;; change calibration
(fuel-pulses 3) ;; flow some fuel again
(fuel-pulses 3)
(reset! (nozzles :lime) false) ;; put nozzle down
(clear-sale nil) ;; start a new cycle
(cancel) ;; shut pump down
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment