Last active
July 6, 2023 17:13
-
-
Save leonoel/c5e32b65ec7b6ab4b5b45772082a3d85 to your computer and use it in GitHub Desktop.
functional reactive petroleum
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;; missionary solution to the petrol pump example from "Functional Reactive Programming" by Stephen Blackheath and Anthony Jones, ISBN 978-1633430105
;; huanhulan's demo : [live](https://huanhulan.github.io/petrol_pump), [code](https://github.com/huanhulan/petrol_pump)
(ns pump | |
(:refer-clojure :exclude [first]) | |
(:require [missionary.core :as m]) | |
(:import missionary.Cancelled)) | |
(defn rising
  "A stateful transducer that outputs `nil` each time the input switches from
  logical false to logical true.

  The remembered previous input starts out as `true`, so a truthy first input
  is not reported as a rising edge. Inputs are compared by truthiness only."
  [rf]
  (let [prev (volatile! true)]
    (fn
      ;; init arity: delegate to the wrapped reducing function, as the
      ;; transducer contract requires.
      ([] (rf))
      ;; completion arity: no buffered state to flush.
      ([r] (rf r))
      ([r curr]
       (let [was @prev]
         ;; remember the current input before deciding, so every input
         ;; updates the state whether or not an edge is emitted.
         (vreset! prev curr)
         (if (and (not was) curr)
           (rf r nil)
           r))))))
(defn falling
  "A stateful transducer that outputs `nil` each time the input switches from
  logical true to logical false.

  The remembered previous input starts out as `false`, so a falsey first input
  is not reported as a falling edge. Inputs are compared by truthiness only."
  [rf]
  (let [prev (volatile! false)]
    (fn
      ;; init arity: delegate to the wrapped reducing function, as the
      ;; transducer contract requires.
      ([] (rf))
      ;; completion arity: no buffered state to flush.
      ([r] (rf r))
      ([r curr]
       (let [was @prev]
         ;; remember the current input before deciding, so every input
         ;; updates the state whether or not an edge is emitted.
         (vreset! prev curr)
         (if (and was (not curr))
           (rf r nil)
           r))))))
(def first
  "Given a flow, builds a task that completes with the first value the flow
  produces, or with `nil` when the flow produces nothing."
  (partial m/reduce
           ;; ignore the accumulator and stop the reduction at the first item
           (fn [_ x] (reduced x))
           nil))
(defn delivery-request
  "Builds a task that completes with `{:price <current price> :slot slot}` at
  the first rising edge of the fuel's `<nozzle` flow. The price is sampled
  from `<price` at that moment."
  [slot {:keys [<price <nozzle]}]
  (let [>picked-up (m/eduction rising <nozzle)
        >requests  (m/sample (fn [price _] {:price price :slot slot})
                             <price
                             >picked-up)]
    (first >requests)))
(defn delivery
  "A continuous flow of the current delivery request.

  It starts at `nil`, then repeats a two-step cycle: emit the result of the
  first `delivery-request` to complete among all `fuels`, then emit the next
  value produced by `>clear-sale`. That value is presumably `nil`, meaning
  \"not delivering\" (confirm against the caller)."
  [>clear-sale fuels]
  (let [next-request (apply m/race (map-indexed delivery-request fuels))
        next-clear   (first >clear-sale)]
    (m/relieve {}
      (m/ap
        (m/amb nil
          (loop []
            (m/amb
              (m/? next-request)
              (m/? next-clear)
              (recur))))))))
(defn sale-quantity
  "A continuous flow of the fuel quantity accumulated during the current
  delivery. It is `0` while `<delivery` is `nil`; otherwise it is the running
  sum of each fuel pulse multiplied by the calibration sampled at that pulse."
  [<delivery <calibration >fuel-pulses]
  (let [<running-total (->> (m/sample * <calibration >fuel-pulses)
                            (m/reductions +)
                            (m/relieve {}))]
    (m/cp
      (if (nil? (m/?< <delivery))
        0
        (try
          (m/?< <running-total)
          ;; the inner flow is cancelled when <delivery switches; yield 0 then
          (catch Cancelled _ 0))))))
(def delivery-cost
  "Given a delivery flow and a quantity flow, builds a continuous flow of the
  cost of the current delivery: the quantity times the delivery's `:price`,
  or `0` while the delivery is `nil`."
  (partial m/latest
           (fn [delivery quantity]
             (if-some [{:keys [price]} delivery]
               (* quantity price)
               0))))
(defn add-digit
  "Reducing function implementing the keypad state machine.

  - no args: the initial state, `0`.
  - one arg: returns the state unchanged.
  - state `r` and input `d`: a `nil` input resets the state to `0`; any other
    input is appended as a decimal digit, unless the result would exceed 999,
    in which case the state is left as it was."
  ([] 0)
  ([r] r)
  ([r d]
   (if (nil? d)
     0
     (let [candidate (+ (* r 10) d)]
       (if (> candidate 999)
         r
         candidate)))))
(defn preset
  "A continuous flow of the current keypad state: the values of `>clear-sale`
  and `>keypad` are merged into one flow and folded with `add-digit`."
  [>clear-sale >keypad]
  (->> (m/ap (m/?> (m/amb= >clear-sale >keypad)))
       (m/reductions add-digit)
       (m/relieve {})))
(defn finalize-delivery
  "Returns the `delivery` map with `:sale-quantity` and `:sale-cost` added."
  [sale-quantity sale-cost delivery]
  (-> delivery
      (assoc :sale-quantity sale-quantity)
      (assoc :sale-cost sale-cost)))
(defn sale-complete
  "A discrete flow of delivery reports. Each cycle waits for a non-nil value
  of `<delivery`, then for a falling edge on the `:<nozzle` flow of the fuel
  at that delivery's `:slot`, then emits the delivery combined with the
  sampled `<sale-quantity` and `<sale-cost` via `finalize-delivery`."
  [<delivery <sale-quantity <sale-cost fuels]
  (let [next-delivery (first (m/eduction (filter some?) <delivery))
        nozzle-down   (fn [{:keys [slot]}]
                        (first (m/eduction falling (:<nozzle (nth fuels slot)))))
        >finished     (m/ap
                        (loop []
                          (let [d (m/? next-delivery)]
                            (m/? (nozzle-down d))
                            (m/amb d (recur)))))]
    (m/sample finalize-delivery <sale-quantity <sale-cost >finished)))
(defn sale-price
  "A continuous flow of the price to show to the user for the fuel in slot `s`:
  the delivery price if this fuel is being delivered, `nil` if another fuel is
  being delivered, or the current value of `<price` if nothing is being
  delivered."
  [s <price <delivery]
  (m/cp
    (let [current (m/?< <delivery)]
      (if (nil? current)
        (try
          (m/?< <price)
          ;; cancelled when <delivery switches; yield nil
          (catch Cancelled _ nil))
        (let [{:keys [slot price]} current]
          (when (= s slot)
            price))))))
(defn pump!
  "The constructor for the pump logic.

  Takes a map of input flows (`:>clear-sale`, `:>fuel-pulses`, `:>keypad`,
  `:<calibration`) and `:fuels`, a sequence of maps each holding `:<price` and
  `:<nozzle` flows. Registers the derived flows with `m/signal!` / `m/stream!`
  and returns them in a map; `:sale-prices` is a vector with one signal per
  fuel, in the order of `fuels`."
  [{:keys [>clear-sale >fuel-pulses >keypad <calibration fuels]}]
  ;; bindings are kept in dependency order: each one uses only earlier ones
  (let [<preset        (-> (preset >clear-sale >keypad) m/signal!)
        <delivery      (-> (delivery >clear-sale fuels) m/signal!)
        <sale-quantity (-> (sale-quantity <delivery <calibration >fuel-pulses) m/signal!)
        <sale-cost     (-> (delivery-cost <delivery <sale-quantity) m/signal!)
        >sale-complete (-> (sale-complete <delivery <sale-quantity <sale-cost fuels) m/stream!)
        price-signal   (fn [slot fuel]
                         (m/signal! (sale-price slot (:<price fuel) <delivery)))
        sale-prices    (vec (map-indexed price-signal fuels))]
    {:<preset        <preset
     :<delivery      <delivery
     :<sale-quantity <sale-quantity
     :<sale-cost     <sale-cost
     :>sale-complete >sale-complete
     :sale-prices    sale-prices}))
(comment
  ;; ---- simulated inputs -------------------------------------------------
  (def fuels [:strawberry :lime :kerosene])

  ;; mailboxes carry discrete events, atoms carry continuous values
  (def keypad (m/mbx))
  (def clear-sale (m/mbx))
  (def fuel-pulses (m/mbx))
  (def calibration (atom 1))
  (def nozzles (into {} (map (fn [f] [f (atom false)])) fuels))
  (def prices (into {} (map (fn [f] [f (atom 1)])) fuels))

  (defn poll
    "A discrete flow that runs the given task over and over, emitting each result."
    [task]
    (m/ap
      (loop []
        (m/amb (m/? task) (recur)))))

  (def app
    "The pump simulation. Binds user inputs to pump logic and prints resulting events."
    (m/reactor
      ;; inputs are registered in the same order as the original map literal
      (let [>keypad      (m/stream! (poll keypad))
            >clear-sale  (m/stream! (poll clear-sale))
            >fuel-pulses (m/stream! (poll fuel-pulses))
            <calibration (m/signal! (m/watch calibration))
            fuel-inputs  (mapv (fn [f]
                                 {:<nozzle (m/signal! (m/watch (nozzles f)))
                                  :<price  (m/signal! (m/watch (prices f)))})
                               fuels)
            {:keys [<preset <sale-cost <sale-quantity >sale-complete sale-prices]}
            (pump! {:>keypad      >keypad
                    :>clear-sale  >clear-sale
                    :>fuel-pulses >fuel-pulses
                    :<calibration <calibration
                    :fuels        fuel-inputs})]
        (m/stream!
          (m/ap
            (m/amb=
              (println "preset :" (m/?> <preset))
              (println "sale cost :" (m/?> <sale-cost))
              (println "sale quantity :" (m/?> <sale-quantity))
              (println "sale complete :" (m/?> >sale-complete))
              ;; fork one branch per fuel, each printing that fuel's sale price
              (let [[fuel <sale-price] (m/?> (count fuels) (m/seed (zipmap fuels sale-prices)))]
                (println "sale price" (name fuel) ":" (m/?> <sale-price)))))))))

  (defn pst
    "Failure callback: prints the stack trace of the given throwable."
    [^Throwable e]
    (.printStackTrace e))

  ;; ---- REPL session -----------------------------------------------------
  (def cancel (app prn pst))            ;; start the pump

  (swap! (prices :lime) inc)            ;; increase lime price
  (swap! (prices :strawberry) inc)      ;; increase strawberry price

  (keypad 8)                            ;; typing digits on keypad
  (keypad 3)
  (keypad 5)
  (keypad 4)                            ;; typing has no effect after the third digit
  (keypad nil)                          ;; reset the keypad
  (keypad 3)                            ;; typing on keypad again
  (keypad 5)

  (reset! (nozzles :lime) true)         ;; put lime nozzle up
  (fuel-pulses 3)                       ;; flow some fuel
  (fuel-pulses 3)
  (swap! calibration * 2)               ;; change calibration
  (fuel-pulses 3)                       ;; flow some fuel again
  (fuel-pulses 3)
  (reset! (nozzles :lime) false)        ;; put nozzle down

  (clear-sale nil)                      ;; start a new cycle
  (cancel)                              ;; shut pump down
  )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment