Skip to content

Instantly share code, notes, and snippets.

@mathias-brandewinder
Created December 4, 2019 01:04
Show Gist options
  • Save mathias-brandewinder/329c1021575858e8247f3dbeebae8c7b to your computer and use it in GitHub Desktop.
Save mathias-brandewinder/329c1021575858e8247f3dbeebae8c7b to your computer and use it in GitHub Desktop.
Santa's Mailbox
// This is the code sample for my #fsAdvent 2019 contribution.
// Check the corresponding blog post for some explanations :)
// https://brandewinder.com/2019/12/04/santas-mailbox/
open System
open System.Threading.Tasks
open System.Diagnostics
open System.Collections.Generic
let factorize x =
let rec factors acc fact rest =
if fact > rest
then acc
else
if rest % fact = 0
then factors (fact :: acc) fact (rest / fact)
else factors acc (fact + 1) rest
factors [ 1 ] 2 x
type Job = {
Batch: int
Number: int
Value: int
}
module Log =
let logger = MailboxProcessor<string>.Start(fun inbox ->
let rec loop () =
async {
let! msg = inbox.Receive ()
printfn "%s" msg
return! loop ()
}
loop ()
)
let log msg =
sprintf "%A|%s" (DateTime.Now) msg
|> logger.Post
module Throughput =
type Config = {
LearningRate: float
ExplorationRate: float
}
let update config (concurrency, elapsed) (throughput: Map<int, float>) =
let measure = float concurrency / elapsed
match throughput |> Map.tryFind concurrency with
| None -> throughput.Add (concurrency, measure)
| Some value ->
let updated =
(1.0 - config.LearningRate) * value
+ config.LearningRate * measure
throughput.Add (concurrency, updated)
let rng = Random 0
let setLevel config (level: int) (throughput: Map<int, float>) =
let explore = rng.NextDouble () < config.ExplorationRate
// with a certain probability, we randomly explore
if explore
then
if rng.NextDouble () < 0.5
then (max 1 (level - 1))
else level + 1
// otherwise we adjust up or down if better
else
let current = throughput |> Map.tryFind level
let lower = throughput |> Map.tryFind (level - 1)
let higher = throughput |> Map.tryFind (level + 1)
match current with
| None -> level
| Some current ->
match lower, higher with
| None, None -> level
| None, Some high ->
if high > current then level + 1 else level
| Some low, None ->
if low > current then level - 1 else level
| Some low, Some high ->
if low > current && low > high then level - 1
elif high > current then level + 1
else level
module Queued =
type Timed<'T> = {
Arrival: DateTime
Item: 'T
}
type Message =
| Batch of Job []
| Completed of Timed<Job>
let processJob (inbox: MailboxProcessor<Message>) (timedJob: Timed<Job>) =
let task = new Task(fun _ ->
let job = timedJob.Item
sprintf "Started batch %i job %i" job.Batch job.Number
|> Log.log
let timer = Stopwatch ()
timer.Start ()
let _ = factorize job.Value
let elapsed = timer.ElapsedMilliseconds / 1000L
let total = (DateTime.Now - timedJob.Arrival).TotalSeconds
sprintf "Completed batch %i job %i in %i secs (total %.0f)" job.Batch job.Number elapsed total
|> Log.log
Completed timedJob |> inbox.Post
)
task.Start()
let mailbox (config: Throughput.Config)=
MailboxProcessor<Message>.Start(fun inbox ->
let mutable inFlight = 0
let queue = Queue<Timed<Job>> ()
let rec loop (throughput, parallelism) =
async {
let! msg = inbox.Receive ()
// update observed throughput
let throughput =
match msg with
| Batch _ -> throughput
| Completed job ->
let elapsed = (DateTime.Now - job.Arrival).TotalSeconds
throughput |> Throughput.update config (inFlight, elapsed)
// handle the work
match msg with
| Batch jobs ->
jobs
|> Array.iter (fun job ->
{ Arrival = DateTime.Now; Item = job }
|> queue.Enqueue
)
| Completed _ ->
inFlight <- inFlight - 1
// adjust level of parallelism
let parallelism =
throughput |> Throughput.setLevel config parallelism
let rec dequeue () =
if (inFlight < parallelism && queue.Count > 0)
then
let job = queue.Dequeue ()
inFlight <- inFlight + 1
processJob inbox job
dequeue ()
dequeue ()
sprintf "Queue: %i, In Flight: %i, Parallelism: %i" queue.Count inFlight parallelism
|> Log.log
return! loop (throughput, parallelism)
}
let throughput = Map.empty<int,float>
loop (throughput, 1)
)
module Producer =
type Config = {
MaxTasks: int
Interval: int
}
let start (config: Config) (consumer: MailboxProcessor<Queued.Message>)=
let rng = Random 0
let rec loop batch =
async {
let batchSize = rng.Next(0, config.MaxTasks + 1)
sprintf "New jobs arriving: batch %i, %i jobs" batch batchSize
|> Log.log
let jobs =
Array.init batchSize
(fun i ->
{
Batch = batch
Number = i
Value = 2130093701
}
)
Queued.Batch jobs
|> consumer.Post
do! Async.Sleep (config.Interval * 1000)
return! loop (batch + 1)
}
loop 0 |> Async.Start
// put it all together
let consumerConfig : Throughput.Config = {
LearningRate = 0.2
ExplorationRate = 0.2
}
let consumer = Queued.mailbox consumerConfig
let producerConfig : Producer.Config = {
Interval = 5
MaxTasks = 10
}
Producer.start producerConfig consumer
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment