Created
December 4, 2019 01:04
-
-
Save mathias-brandewinder/329c1021575858e8247f3dbeebae8c7b to your computer and use it in GitHub Desktop.
Santa's Mailbox
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// This is the code sample for my #fsAdvent 2019 contribution.
// Check the corresponding blog post for some explanations :)
// https://brandewinder.com/2019/12/04/santas-mailbox/
open System | |
open System.Threading.Tasks | |
open System.Diagnostics | |
open System.Collections.Generic | |
/// Decomposes `x` into its prime factors by trial division.
///
/// The result lists the factors from largest to smallest (with repeats),
/// always ending with the seed value 1: `factorize 12 = [3; 2; 2; 1]`.
/// Any input below 2 yields `[1]`.
let factorize x =
    // `found` accumulates factors, `candidate` is the divisor being tried,
    // and `remaining` is what is left of the number once the factors found
    // so far have been divided out.
    let rec search found candidate remaining =
        if candidate > remaining then
            found
        elif remaining % candidate = 0 then
            // Keep the same candidate so that repeated factors are captured.
            search (candidate :: found) candidate (remaining / candidate)
        else
            search found (candidate + 1) remaining
    search [ 1 ] 2 x
/// A unit of work: one number to factorize, tagged with where it came from.
type Job =
    {
        /// Index of the batch the job was submitted in.
        Batch: int
        /// Position of the job inside its batch.
        Number: int
        /// The number that will be passed to `factorize`.
        Value: int
    }
/// Console logging funnelled through a single mailbox, so that lines posted
/// from different threads are printed one at a time.
module Log =

    /// Agent that prints every string it receives on its own line.
    let logger =
        MailboxProcessor<string>.Start(fun inbox ->
            async {
                while true do
                    let! line = inbox.Receive ()
                    printfn "%s" line
            })

    /// Prefixes `msg` with the current local time and queues it for printing.
    let log msg =
        logger.Post (sprintf "%A|%s" DateTime.Now msg)
/// Tracks the observed throughput per concurrency level and picks the level
/// to use next.
module Throughput =

    /// Tuning parameters for the controller.
    type Config = {
        /// Weight given to the newest measurement in the moving average
        /// (the previous estimate keeps `1 - LearningRate`).
        LearningRate: float
        /// Probability of moving to a neighbouring level at random instead
        /// of following the current estimates.
        ExplorationRate: float
        }

    /// Folds one observation into the throughput estimates.
    ///
    /// `concurrency` is the level the observation was made at and `elapsed`
    /// the time it took; the measure recorded is `concurrency / elapsed`.
    /// The first observation for a level is stored as-is, later ones are
    /// blended in as an exponential moving average.
    ///
    /// Observations whose `elapsed` is not strictly positive (zero, negative
    /// or NaN) are ignored: they would produce an infinite, negative or NaN
    /// measure, and an infinite or NaN value can never be averaged back out.
    let update config (concurrency, elapsed) (throughput: Map<int, float>) =
        if not (elapsed > 0.0)
        then throughput
        else
            let measure = float concurrency / elapsed
            let updated =
                match throughput |> Map.tryFind concurrency with
                | None -> measure
                | Some value ->
                    (1.0 - config.LearningRate) * value + config.LearningRate * measure
            throughput.Add (concurrency, updated)

    // Fixed seed: the sequence of exploration decisions is reproducible.
    let rng = Random 0

    /// Chooses the next concurrency level, starting from `level`.
    ///
    /// With probability `ExplorationRate` the level moves one step up or
    /// down at random. Otherwise it moves to a neighbouring level only if
    /// that neighbour has a recorded throughput strictly better than the
    /// current one; with no estimate for `level` itself it stays put.
    /// A downward move never goes below 1, in either branch.
    let setLevel config (level: int) (throughput: Map<int, float>) =
        let explore = rng.NextDouble() < config.ExplorationRate
        // Clamp downward moves so the greedy branch obeys the same floor as
        // the exploration branch.
        let down = max 1 (level - 1)
        let up = level + 1
        if explore
        then
            if rng.NextDouble() < 0.5 then down else up
        else
            match throughput |> Map.tryFind level with
            | None -> level
            | Some current ->
                let lower = throughput |> Map.tryFind (level - 1)
                let higher = throughput |> Map.tryFind (level + 1)
                match lower, higher with
                | None, None -> level
                | None, Some high ->
                    if high > current then up else level
                | Some low, None ->
                    if low > current then down else level
                | Some low, Some high ->
                    if low > current && low > high then down
                    elif high > current then up
                    else level
/// Consumer side: a mailbox that queues incoming jobs and runs them on
/// background tasks, adapting how many run at once to the observed
/// throughput.
module Queued =

    /// An item together with the time it was put in the queue.
    type Timed<'T> = {
        Arrival: DateTime
        Item: 'T
        }

    /// Messages handled by the consumer mailbox.
    type Message =
        /// New jobs to add to the queue.
        | Batch of Job []
        /// A job has finished running and its slot is free again.
        | Completed of Timed<Job>

    /// Runs one job on a new task and reports back to `inbox` when done.
    ///
    /// The task logs the start, factorizes the job's value (the result is
    /// discarded), and logs the processing time and the total time since
    /// arrival. `Completed` is posted in a `finally` so the mailbox always
    /// gets its slot back; if the job body throws, the failure is logged
    /// instead of being lost as an unobserved task exception.
    let processJob (inbox: MailboxProcessor<Message>) (timedJob: Timed<Job>) =
        let job = timedJob.Item
        let task = new Task(fun () ->
            try
                try
                    sprintf "Started batch %i job %i" job.Batch job.Number
                    |> Log.log
                    let timer = Stopwatch.StartNew ()
                    factorize job.Value |> ignore
                    // Whole seconds only: integer division truncates.
                    let elapsed = timer.ElapsedMilliseconds / 1000L
                    let total = (DateTime.Now - timedJob.Arrival).TotalSeconds
                    sprintf "Completed batch %i job %i in %i secs (total %.0f)" job.Batch job.Number elapsed total
                    |> Log.log
                with ex ->
                    sprintf "Failed batch %i job %i: %s" job.Batch job.Number ex.Message
                    |> Log.log
            finally
                Completed timedJob |> inbox.Post
            )
        task.Start()

    /// Starts the consumer mailbox.
    ///
    /// State: a FIFO queue of pending jobs, the number of jobs in flight,
    /// the throughput estimates and the current parallelism (initially 1).
    /// Every message triggers the same cycle: record the message, pick a
    /// new parallelism level, start queued jobs while there is spare
    /// capacity, and log the resulting state.
    let mailbox (config: Throughput.Config) =
        MailboxProcessor<Message>.Start(fun inbox ->
            // Only touched from the mailbox loop; tasks report back through
            // `Completed` messages rather than mutating it themselves.
            let mutable inFlight = 0
            let queue = Queue<Timed<Job>> ()

            // Stamps each job with its arrival time and queues it.
            let enqueue (jobs: Job []) =
                for job in jobs do
                    { Arrival = DateTime.Now; Item = job }
                    |> queue.Enqueue

            // Starts queued jobs until the parallelism limit is reached or
            // the queue is empty.
            let startPending parallelism =
                while inFlight < parallelism && queue.Count > 0 do
                    let next = queue.Dequeue ()
                    inFlight <- inFlight + 1
                    processJob inbox next

            let rec loop (throughput, parallelism) =
                async {
                    let! msg = inbox.Receive ()
                    let throughput =
                        match msg with
                        | Batch jobs ->
                            enqueue jobs
                            throughput
                        | Completed job ->
                            // Time since the job arrived, so it includes the
                            // time spent waiting in the queue.
                            let elapsed = (DateTime.Now - job.Arrival).TotalSeconds
                            // Record against the concurrency the job ran
                            // under, i.e. before its slot is released.
                            let updated =
                                throughput |> Throughput.update config (inFlight, elapsed)
                            inFlight <- inFlight - 1
                            updated
                    // adjust level of parallelism
                    let parallelism =
                        throughput |> Throughput.setLevel config parallelism
                    startPending parallelism
                    sprintf "Queue: %i, In Flight: %i, Parallelism: %i" queue.Count inFlight parallelism
                    |> Log.log
                    return! loop (throughput, parallelism)
                }

            loop (Map.empty<int,float>, 1)
            )
/// Producer side: periodically sends batches of jobs to a consumer mailbox.
module Producer =

    /// Producer settings.
    type Config = {
        /// Largest number of jobs a single batch may contain.
        MaxTasks: int
        /// Pause between two batches, in seconds.
        Interval: int
        }

    /// Starts an endless background loop that posts job batches to `consumer`.
    ///
    /// Each round draws a batch size between 0 and `MaxTasks` (inclusive)
    /// from a generator seeded with 0, logs it, posts the batch, then sleeps
    /// for `Interval` seconds. Batches are numbered from 0.
    let start (config: Config) (consumer: MailboxProcessor<Queued.Message>) =
        let rng = Random 0
        // Every job carries the same value to factorize.
        let makeJob batch number =
            { Batch = batch; Number = number; Value = 2130093701 }
        let rec produce batch =
            async {
                // The upper bound of Random.Next is exclusive, hence the + 1.
                let size = rng.Next(0, config.MaxTasks + 1)
                Log.log (sprintf "New jobs arriving: batch %i, %i jobs" batch size)
                let jobs = Array.init size (makeJob batch)
                consumer.Post (Queued.Batch jobs)
                do! Async.Sleep (config.Interval * 1000)
                return! produce (batch + 1)
            }
        Async.Start (produce 0)
// Wire everything together: start the consumer mailbox, then a producer
// that feeds it a batch of up to 10 jobs every 5 seconds.
let consumerConfig : Throughput.Config =
    { LearningRate = 0.2; ExplorationRate = 0.2 }

let consumer = Queued.mailbox consumerConfig

let producerConfig : Producer.Config =
    { MaxTasks = 10; Interval = 5 }

consumer |> Producer.start producerConfig
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment