namespace Hopac
module Infixes from Hopac
val always : x:'a -> _arg1:'b -> 'a Full name: Index.always
val x : 'a
val run : Job<'x> -> 'x Full name: Hopac.TopLevel.run
type Job<'T> = Full name: Hopac.Job<_>
val unit : unit -> Job<unit> Full name: Hopac.Job.unit
val delayValueAsync : prior:Async<'a> -> Async<'a> Full name: Index.delayValueAsync
val prior : Async<'a>
val async : AsyncBuilder Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
Multiple items type Async static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit) static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate) static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool> static member AwaitTask : task:Task -> Async<unit> static member AwaitTask : task:Task<'T> -> Async<'T> static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool> static member CancelDefaultToken : unit -> unit static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>> static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T> static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T> static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T> static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T> static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T> static member Ignore : computation:Async<'T> -> Async<unit> static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable> static member Parallel : computations:seq<Async<'T>> -> Async<'T []> static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T static member Sleep : millisecondsDueTime:int -> Async<unit> static 
member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T> static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>> static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>> static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit> static member SwitchToNewThread : unit -> Async<unit> static member SwitchToThreadPool : unit -> Async<unit> static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T> static member CancellationToken : Async<CancellationToken> static member DefaultCancellationToken : CancellationToken Full name: Microsoft.FSharp.Control.Async -------------------- type Async<'T> Full name: Microsoft.FSharp.Control.Async<_>
static member Async.Sleep : millisecondsDueTime:int -> Async<unit>
val printAsync : x:int -> Async<unit> Full name: Index.printAsync
val x : int
val printfn : format:Printf.TextWriterFormat<'T> -> 'T Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.printfn
static member Async.Start : computation:Async<unit> * ?cancellationToken:System.Threading.CancellationToken -> unit
static member Async.StartImmediate : computation:Async<unit> * ?cancellationToken:System.Threading.CancellationToken -> unit
static member Async.RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:System.Threading.CancellationToken -> 'T
member AsyncBuilder.Return : value:'T -> Async<'T>
val delayValueJob : prior:Job<'a> -> Job<'a> Full name: Index.delayValueJob
val prior : Job<'a>
val job : JobBuilder Full name: Hopac.TopLevel.job
val asJob : Job<'x> -> Job<'x> Full name: Hopac.TopLevel.asJob
val printJob : x:int -> Job<unit> Full name: Index.printJob
val queue : Job<unit> -> unit Full name: Hopac.TopLevel.queue
val timeOutMillis : int -> Alt<unit> Full name: Hopac.TopLevel.timeOutMillis
val start : Job<unit> -> unit Full name: Hopac.TopLevel.start
val result : 'x -> Job<'x> Full name: Hopac.Job.result
type CellMsg = | SetValue of string | GetValue of IVar<string> Full name: Index.CellMsg
union case CellMsg.SetValue: string -> CellMsg
Multiple items val string : value:'T -> string Full name: Microsoft.FSharp.Core.Operators.string -------------------- type string = System.String Full name: Microsoft.FSharp.Core.string
union case CellMsg.GetValue: IVar<string> -> CellMsg
Multiple items type IVar<'T> = inherit Promise<'T> new : unit -> IVar<'T> + 2 overloads nested type Fill nested type FillFailure nested type TryFill Full name: Hopac.IVar<_> -------------------- IVar() : unit IVar(t: 'T) : unit IVar(e: exn) : unit
val spawnCellActor : initialValue:string -> Job<Ch<CellMsg>> Full name: Index.spawnCellActor
val initialValue : string
Multiple items type Ch<'T> = inherit Alt<'T> new : unit -> Ch<'T> Full name: Hopac.Ch<_> -------------------- Ch() : unit
val commCh : Ch<CellMsg>
val iterateServer : 'x -> ('x -> #Job<'x>) -> Job<unit> Full name: Hopac.Job.iterateServer
val x : string
val msg : CellMsg
val take : Ch<'x> -> Alt<'x> Full name: Hopac.Ch.take
val v : string
val reply : IVar<string>
val fill : IVar<'x> -> 'x -> Job<unit> Full name: Hopac.IVar.fill
val send : mbx:Ch<'a> -> msg:'a -> Job<unit> Full name: Index.send
val mbx : Ch<'a>
val msg : 'a
val send : Ch<'x> -> 'x -> Job<unit> Full name: Hopac.Ch.send
val sendAndReceive : mbx:Ch<'a> -> toMsg:(IVar<'b> -> 'a) -> Job<'b> Full name: Index.sendAndReceive
val toMsg : (IVar<'b> -> 'a)
val reply : IVar<'b>
val read : IVar<'x> -> Alt<'x> Full name: Hopac.IVar.read
val cell : Ch<CellMsg>
val n1 : string
val n2 : string
type Msg = | NetworkMessage of string | Timeout Full name: Index.Msg
union case Msg.NetworkMessage: string -> Msg
union case Msg.Timeout: Msg
val networkComm : Alt<string> Full name: Index.networkComm
type Alt<'T> = inherit Job<'T> Full name: Hopac.Alt<_>
val choose : seq<#Alt<'x>> -> Alt<'x> Full name: Hopac.Alt.choose
val afterFun : ('x -> 'y) -> Alt<'x> -> Alt<'y> Full name: Hopac.Alt.afterFun
val alwaysWithPrepare : x:int -> Alt<int> Full name: Index.alwaysWithPrepare
val prepareFun : (unit -> #Alt<'x>) -> Alt<'x> Full name: Hopac.Alt.prepareFun
val always : 'x -> Alt<'x> Full name: Hopac.Alt.always
val resultPerf : Alt<int> Full name: Index.resultPerf
val map : ('x -> 'y) -> Job<'x> -> Job<'y> Full name: Hopac.Job.map
type Cell<'a> = {getCh: Ch<'a>; putCh: Ch<'a>;} Full name: Index.Cell<_>
Cell.getCh: Ch<'a>
Cell.putCh: Ch<'a>
val cell : x:'a -> Job<Cell<'a>> Full name: Index.cell
val c : Cell<'a>
val give : Ch<'x> -> 'x -> Alt<unit> Full name: Hopac.Ch.give
val put : c:Cell<'a> -> v:'a -> Alt<unit> Full name: Index.put
val v : 'a
val get : c:Cell<'a> -> Alt<'a> Full name: Index.get
val myCell : Cell<string>
type CellWithReply<'a> = {getCh: Ch<'a>; putCh: Ch<'a * IVar<'a>>;} Full name: Index.CellWithReply<_>
CellWithReply.getCh: Ch<'a>
CellWithReply.putCh: Ch<'a * IVar<'a>>
val handlePut : oldVal:'a -> newVal:'b * reply:IVar<'a> -> Job<'b> Full name: Index.handlePut
val oldVal : 'a
val newVal : 'b
val reply : IVar<'a>
val cell : x:'a -> Job<CellWithReply<'a>> Full name: Index.cell
val c : CellWithReply<'a>
val afterJob : ('x -> #Job<'y>) -> Alt<'x> -> Alt<'y> Full name: Hopac.Alt.afterJob
val get : c:CellWithReply<'a> -> Alt<'a> Full name: Index.get
val put : c:CellWithReply<'a> -> v:'a -> Alt<'a> Full name: Index.put
val putSyncOnReply : c:CellWithReply<'a> -> v:'a -> Alt<'a> Full name: Index.putSyncOnReply
val prepareJob : (unit -> #Job<'c>) -> Alt<'x> (requires 'c :> Alt<'x>) Full name: Hopac.Alt.prepareJob
val myCell : CellWithReply<string>
val value : string
type CellWithNack<'a> = {getCh: Ch<'a>; putCh: Ch<'a * Promise<unit> * Ch<'a>>;} Full name: Index.CellWithNack<_>
CellWithNack.getCh: Ch<'a>
CellWithNack.putCh: Ch<'a * Promise<unit> * Ch<'a>>
Multiple items type Promise<'T> = inherit Alt<'T> new : tJ:Job<'T> -> Promise<'T> + 2 overloads member Full : bool Full name: Hopac.Promise<_> -------------------- Promise(tJ: Job<'T>) : unit Promise(value: 'T) : unit Promise(e: exn) : unit
type unit = Unit Full name: Microsoft.FSharp.Core.unit
val handlePut : oldVal:'a -> newVal:'a * nack:Promise<'b> * replyCh:Ch<'a> -> Alt<'a> Full name: Index.handlePut
val newVal : 'a
val nack : Promise<'b>
val replyCh : Ch<'a>
val read : Promise<'x> -> Alt<'x> Full name: Hopac.Promise.read
val cell : x:'a -> Job<CellWithNack<'a>> Full name: Index.cell
val c : CellWithNack<'a>
val get : c:CellWithNack<'a> -> Alt<'a> Full name: Index.get
val putWithNack : c:CellWithNack<'a> -> v:'a -> Alt<'a> Full name: Index.putWithNack
val withNackJob : (Promise<unit> -> #Job<'c>) -> Alt<'x> (requires 'c :> Alt<'x>) Full name: Hopac.Alt.withNackJob
val nack : Promise<unit>
val myCell : CellWithNack<string>
val handlePut : oldVal:'a -> newVal:'a * nack:Alt<'b> * replyCh:Ch<'a> -> Alt<'a> Full name: Index.handlePut
val nack : Alt<'b>
val failableServer : ch:Ch<int> -> Job<'a> Full name: Index.failableServer
val ch : Ch<int>
val forever : Job<unit> -> Job<'b> Full name: Hopac.Job.forever
val value : int
val failwith : message:string -> 'T Full name: Microsoft.FSharp.Core.Operators.failwith
val withSupervisor : supervise:('a -> 'b) -> job:'a -> 'b Full name: Index.withSupervisor
val supervise : ('a -> 'b)
val job : 'a
val restartOnException : onRestart:Job<unit> -> server:Job<unit> -> Job<unit> Full name: Index.restartOnException
val onRestart : Job<unit>
val server : Job<unit>
Multiple items val exn : exn -------------------- type exn = System.Exception Full name: Microsoft.FSharp.Core.exn
System.Exception.GetType() : System.Type
property System.Exception.Message: string
val evilCh : Ch<int>
val supervisor : (Job<unit> -> Job<unit>)
val start : Job<unit> -> Job<unit> Full name: Hopac.Job.start
Hopac
Powering Concurrency with Synchronous Messaging
Marcus Griep
0:45
Introduce self
Lead software engineer with Cimpress
Lead a squad of engineers
The Reactive Manifesto
Responsive
Resilient
Elastic
Message-Driven
2:00
Model for systems architecture
Millisecond response times
100% uptime/fault-tolerance
Highly-scalable systems
Evolve over time
Message-Driven
Reactive Systems rely on asynchronous message-passing to establish a boundary between
components that ensures loose coupling, isolation and location transparency. This boundary also provides the means to
delegate failures as messages. Employing explicit message-passing enables load management, elasticity, and flow control
by shaping and monitoring the message queues in the system and applying back-pressure when necessary. Location
transparent messaging as a means of communication makes it possible for the management of failure to work with the same
constructs and semantics across a cluster or within a single host. Non-blocking communication
allows recipients to only consume resources while active, leading to less system overhead.
3:00
Cut out cruft and focus
Async & non-blocking
What reasons does the Reactive Manifesto give for async and non-blocking?
Asynchronous message-passing: establishes component boundaries
Non-blocking: threads don't consume resources while idle
Loopholes!
Synchronous messaging inside a component!
Asynchronous message-passing
Establishes component boundaries
Non-blocking communication
Threads don't consume resources while idle
Note: A "thread" refers to a thread of execution, not an operating system thread.
The Reactive Manifesto
Still not a law!
Asynchronous message-passing
Postal System
Unbounded Mailboxes
Processes send
messages
At-most-once delivery
Better guarantees, at a cost
Implemented by actor systems:
5:00
Characterized by:
Recipients have unbounded mailboxes (or bounded boxes that overflow as dead letters)
Once sent, sender doesn't have view of the message status
Getting better guarantees requires tracking of some sort, either by the recipient or system
Existing frameworks: Akka/.Net, Orleans
Synchronous message-passing
Rendezvous Point
Channels
Processes give
and take
messages
Exactly-once delivery
Examples include:
Concurrent ML / Go / Racket
Hopac / Clojure's core.async
6:00
No buffer
Communication occurs over a channel and requires both parties to a communication to be there
Offers to give or take can be withdrawn if another communication alternative is available
Don't want to wait for the counterparty to receive your message? That's ok. Spawn a process to wait for you instead.
I will refer to it as synchronous rendezvous
Why synchronous rendezvous?
Sender and recipient share common knowledge of transmission
Failure mode is typically deadlock
Semantics of local and distributed interactions are different
10:00
Async has low overhead, but no sync info to sender
I don't like deadlock!!
Alternative: Error detection delayed until buffer exhausted (or memory)
With local comm, client can know if server accepted request
Because async is a weaker mechanism, it reduces local case to distributed case
Some think this is an argument in favor of async
Should provide abstraction for interacting with server
Semantics of that abstraction are different from distributed case
Dirty little secret: Akka/.net "optimizes" local communication
Within a process, Akka.net optimizes to directly delivering messages, avoiding the postal system, but that's
like going to the postbox and dropping off a letter for your next-door neighbor. The postman optimizes the process,
bringing the letter directly to your neighbor's mailbox. Meanwhile, your neighbor has been standing next to his
mailbox eagerly awaiting a new message. As soon as the postman drops off the letter, your neighbor springs into
action to process the message. If you are counting on a reply, you've included your own self-addressed stamped
envelope, and your neighbor repeats the process to send you the reply while you stand next to your mailbox.
Local communication with asynchronous messaging
What if you need a reply?
Local communication with synchronous rendezvous
What if you need a reply?
F# library (not a framework)
Inspired by Concurrent ML
Uses synchronous rendezvous
Provides composable primitives
Cares about scalability
1:
2:
3:
WARNING: You are using single-threaded workstation garbage
collection, which means that parallel programs cannot scale.
Please configure your program to use server garbage collection.
12:00
Now let's move on and talk about Hopac.
Comparison to F# Async
F# Async
Run on threadpool threads
Default threadpool scheduler
Hopac
Lightweight threads: Job<'x>
Run on dedicated Hopac threads
Cooperative scheduler
One Hopac thread per logical processor
Using async{}
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
let delayValueAsync prior = async {
let! x = prior
do! Async . Sleep 1
return x
}
let printAsync x = async {
printfn " %i " x
}
Async . Start <| Async . Sleep 1
Async . StartImmediate <| printAsync 23
Async . RunSynchronously <| delayValueAsync (async . Return 4 )
Using job{}
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
let delayValueJob prior = job {
let! x = prior |> asJob
do! Async . Sleep 1
return x
}
let printJob x = job {
printfn " %i " x
}
queue <| timeOutMillis 1
start <| printJob 23
run <| delayValueJob (Job . result 4 )
13:00
Note use of `asJob`
Provides bind for `async`, `job`, and `Task`
Actor look-a-like
1:
2:
3:
type CellMsg =
| SetValue of string
| GetValue of IVar < string >
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
let spawnCellActor initialValue : Job < Ch < CellMsg > > = job {
let commCh = Ch ()
do! Job . iterateServer initialValue <| fun x -> job {
let! msg = Ch . take commCh
match msg with
| SetValue v -> return v
| GetValue reply ->
do! IVar . fill reply x
return x
}
return commCh
}
Actor look-a-like
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
// MailboxProcessor.Post
let send mbx msg = job {
return! Ch . send mbx msg
}
// MailboxProcessor.PostAndAsyncReply
let sendAndReceive mbx toMsg = job {
let reply = IVar ()
do! Ch . send mbx (toMsg reply )
return! IVar . read reply
}
Actor look-a-like
Test it out
1:
2:
3:
4:
5:
6:
7:
8:
run <| job {
let! cell = spawnCellActor "Henry"
let! n1 = sendAndReceive cell GetValue
do! send cell (SetValue "Todd" )
let! n2 = sendAndReceive cell GetValue
printfn "Name at 1: %s " n1
printfn "Name at 2: %s " n2
}
Output:
Name at 1: Henry
Name at 2: Todd
Alternatives
Composable communication primitive: Alt<'x>
Alt
is where the magic happens
Quantum superposition of waits
1:
2:
3:
Alt . choose
[ networkComm |> Alt . afterFun NetworkMessage
timeOutMillis 250 |> Alt . afterFun (always Timeout ) ]
15:00
Synchronous rendezvous demands a way to select between communication alternatives
Alternatives provide a way to wait for different possible events
Produce more `Alt`s when composed
Alternatives give ability to select program flow based on concurrent state
Will be trying to avoid some of the infixes in this presentation, but they can increase expressiveness
Alternatives
Selection in three stages
Instantiated: Alt.prepareJob
/ guard
Available
Committed or not
Alt.afterJob
/ wrap
16:00
Conceptually, an alternative operation is *instantiated* by a consumer and then may
become available. When an alternative becomes available, a consumer can *commit* to
the result of that communication.
An alternative also knows when it has not been selected, important for negative acknowledgements
Alternatives in Hopac
Emphasizes performance over fairness
Lazy alternative instantiation
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
let alwaysWithPrepare x =
Alt . prepareFun <| fun () ->
printfn "Instantiated %i " x
Alt . always x
let resultPerf =
Alt . choose
[ alwaysWithPrepare 0
alwaysWithPrepare 1 ]
run (resultPerf |> Job . map (printfn "Selected case # %i " ))
Output:
Instantiated 0
Selected case #0
The second case never gets instantiated.
Concurrent ML eagerly instantiates all alternatives and uses a fair choice
Hopac emphasizes performance over fairness, and uses lazy alternative instantiation
So if we run this 3 times
Servers
Like actors, but more powerful!
Servers
Reference Cell
1:
2:
3:
type Cell < ' a > =
{ getCh : Ch < ' a >
putCh : Ch < ' a > }
Ch<'a>
: two-way communication: give
/take
Servers
The server implementation
1:
2:
3:
4:
5:
6:
7:
8:
let cell x = job {
let c = {getCh = Ch (); putCh = Ch ()}
do! Job . iterateServer x <| fun x ->
Alt . choose
[ Ch . take c . putCh
Ch . give c . getCh x |> Alt . afterFun (always x ) ]
return c
}
Abstract the protocol
1:
2:
let put c v = Ch . give c . putCh v
let get c = Ch . take c . getCh
Servers
Test it out
1:
2:
3:
4:
5:
6:
7:
8:
run <| job {
let! myCell = cell "Henry"
let! n1 = get myCell
do! put myCell "Todd"
let! n2 = get myCell
printfn "Name at 1: %s " n1
printfn "Name at 2: %s " n2
}
Output:
Name at 1: Henry
Name at 2: Todd
Handling Replies
Talking back to your clients
Handling Replies
Create a cell that can handle replies
1:
2:
3:
type CellWithReply < ' a > =
{ getCh : Ch < ' a >
putCh : Ch < ' a * IVar < ' a > > }
IVar<'a>
: write-once channel: fill
/read
Handling Replies
Update the server
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
let handlePut oldVal (newVal , reply ) =
IVar . fill reply oldVal |> Job . map (always newVal )
let cell x = job {
let c = {getCh = Ch (); putCh = Ch ()}
do! Job . iterateServer x <| fun x ->
Alt . choose
[ Ch . take c . putCh |> Alt . afterJob (handlePut x )
Ch . give c . getCh x |> Alt . afterFun (always x ) ]
return c
}
Added an IVar to the put channel, allows us to send a reply back to the client
Create this handlePut function that will fill the reply with the old value
and then return the new state.
IVar is optimized for this use case.
Handling Replies
Update the protocol abstraction
1:
2:
3:
4:
5:
6:
let get c = Ch . take c . getCh
let put c v =
Alt . prepareFun <| fun () ->
let reply = IVar ()
Ch . give c . putCh (v ,reply )
|> Alt . afterJob (fun () -> IVar . read reply )
Commit on give
Use prepareFun because now we need to create the IVar that we give to the
server. This is similar to the F# AsyncReplyChannel<'a>
Synchronization point is when we give the new value to the reference cell
But we want to synchronize on the server's reply instead
In other words, we want the server to synchronize committing to the change
when the client commits to receiving the reply.
Rework the synchronization point
1:
2:
3:
4:
5:
let putSyncOnReply c v =
Alt . prepareJob <| fun () ->
let reply = IVar ()
Ch . send c . putCh (v ,reply )
|> Job . map (fun () -> IVar . read reply )
Commit on read
So we switch to prepareJob, and asynchronously send the message to the server,
then return an alternative that synchronizes on the reply.
So that's good. We can now have a choice
Handling Replies
Test it out
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
run <| job {
let! myCell = cell "Henry"
do!
Alt . choose
[ putSyncOnReply myCell "Todd"
|> Alt . afterFun (printfn "Old value: %s " )
timeOutMillis 0
|> Alt . afterFun (fun () -> printfn "Timed out…" ) ]
let! value = get myCell
printfn "Current value: %s " value
}
Output
Timed out…
Current value: Todd
This is not the semantic we want
What happens if the server has been busy and we hit the timeout? We've already sent the
message. The server has still committed to the change when it receives the message. This
is similar to the way that a message is effectively committed to once sent in the actor model.
But we can do better. We can inform the server in the event we don't commit to action.
Negative Acknowledgements
Keeping your options open
Negative Acknowledgements
Create a new cell type
1:
2:
3:
type CellWithNack < ' a > =
{ getCh : Ch < ' a >
putCh : Ch < ' a * Promise < unit > * Ch < ' a > > }
Promise<'a>
: offer to provide a value at a future time: start
/read
Negative Acknowledgements
Update the server so that it commits when the client accepts the reply or sends a nack
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
let handlePut oldVal (newVal , nack , replyCh ) =
Alt . choose
[ Ch . give replyCh oldVal |> Alt . afterFun (always newVal )
Promise . read nack |> Alt . afterFun (always oldVal ) ]
let cell x = job {
let c = {getCh = Ch (); putCh = Ch ()}
do! Job . iterateServer x <| fun x ->
Alt . choose
[ Ch . take c . putCh |> Alt . afterJob (handlePut x )
Ch . give c . getCh x |> Alt . afterFun (always x ) ]
return c
}
Negative Acknowledgements
Update the client abstraction
1:
2:
3:
4:
5:
6:
7:
let get c = Ch . take c . getCh
let putWithNack c v =
Alt . withNackJob <| fun nack -> job {
let replyCh = Ch ()
do! Ch . send c . putCh (v ,nack ,replyCh )
return Ch . take replyCh
}
Negative Acknowledgements
Test it out
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
run <| job {
let! myCell = cell "Henry"
do!
Alt . choose
[ putWithNack myCell "Todd"
|> Alt . afterFun (printfn "Old value: %s " )
timeOutMillis 0
|> Alt . afterFun (fun () -> printfn "Timed out…" ) ]
let! value = get myCell
printfn "Current value: %s " value
}
Output:
Timed out…
Current value: Henry
This has the semantic we want
Negative Acknowledgements
Allow the client and server to collaborate
Enable the client to explore multiple options
Very complicated to implement with asynchronous messaging
Infix Operators
Warning: Crazy custom operators inbound
Infix Operators
*<-
Ch.give
*<+
Ch.send
*<=
IVar.fill
*<=!
IVar.fillFailure
*<<=
MVar.fill
*<<+
Mailbox.send
*<+->=
Nack Job
*<+->-
Nack Fun
*<-=>=
IVar Job^
*<-=>-
IVar Fun^
*<+=>=
IVar Job+
*<+=>-
IVar Fun+
^=>
Alt.afterJob
^->
Alt.afterFun
^=>.
Alt.afterJobi
^->.
Alt.afterFuni
^->!
Alt.raises
<|>
Alt.choose
<~>
Alt.chooser
<|>*
Alt.choose*
<&>
Pair sequential
<*>
Pair parallel
<+>
Pair alts
>>=
Job.bind
>>=*
Job.bind*
>>-
Job.map
>>-*
Job.map*
>>=.
Job.bindi
>>=*.
Job.bindi,*
>>-.
Job.mapi
>>-*.
Job.mapi,*
>=>
Compose
>=>*
Compose*
>->
Compose/Map
>->*
Compose/Map*
Infix Operators
Server
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
let handlePut oldVal (newVal , nack , replyCh ) =
replyCh *<- oldVal ^->. newVal
<|> nack ^->. oldVal
let cell x = job {
let c = {getCh = Ch (); putCh = Ch ()}
do! Job . iterateServer x <| fun x ->
c . putCh ^=> handlePut x
<|> c . getCh *<- x ^->. x
return c
}
Client
1:
2:
3:
let get c = Ch . take c . getCh
let putWithNack c v =
c . putCh *<+->- fun replyCh nack -> (v , nack , replyCh )
Infix Operators
Test it out
1:
2:
3:
4:
5:
6:
7:
8:
9:
run <| job {
let! myCell = cell "Henry"
do!
putWithNack myCell "Todd" ^-> printfn "Old value: %s "
<|> timeOutMillis 0 ^->. printfn "Timed out…"
let! value = get myCell
printfn "Current value: %s " value
}
Output:
Timed out…
Current value: Henry
Supervision
But actor systems come with supervisors!
Simple supervision
Server that can fail
1:
2:
3:
4:
5:
6:
7:
8:
9:
let failableServer ch =
Job . forever <| job {
printfn "Ready for next message"
let! value = Ch . take ch
if value < 0 then
failwith "Negative values make me sad"
else
printfn "Received %i " value
}
This server will throw an exception if it receives negative values
Simple supervision
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
let withSupervisor supervise job =
supervise job
let rec restartOnException onRestart server = job {
try
do! server |> asJob
with exn ->
printfn "Child had exception; restarting: %s : %s "
(exn . GetType (). Name )
exn . Message
do! onRestart |> asJob
return! restartOnException onRestart server
}
Simple supervision
Test it out
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
run <| job {
let evilCh = Ch ()
let supervisor =
restartOnException <| timeOutMillis 10
let server =
failableServer evilCh
|> withSupervisor supervisor
do! Job . start server
do! Ch . give evilCh 10
do! Ch . give evilCh - 10
do! Ch . give evilCh 20
}
Output:
Ready for next message
Received 10
Ready for next message
Child had exception; restarting: Exception: Negative values make me sad
Ready for next message
Received 20
Ready for next message
Distribution
No story here
Hopac is not designed for this use case
Look elsewhere for your distribution story
Pre-emption
Hopac's scheduler is cooperative
It will not pre-empt threads like the OS scheduler
You can do uncooperative things that starve threads
The Reactive Manifesto
Synchronous rendezvous is not at odds with the Reactive Manifesto
The Reactive Manifesto focuses on systems architecture problems
Inside a component, synchronous rendezvous is fine
Hopac's Power
Lightweight threads
Garbage collection friendly
Minimal overhead to server–client interactions
Context switches between threads are highly optimized
Composable primitives
Ch<'a>
is an Alt<'a>
IVar<'a>
is a Promise<'a>
is an Alt<'a>
MVar<'a>
is an Alt<'a>
Mailbox<'a>
is an Alt<'a>
Alt<'a>
s compose to Alt<'a>
Hopac Streams
Non-deterministic stream of values (choice stream)
Similar to Rx observable sequences
Handle same operations as ordinary lazy streams: provides Stream.foldBack
and Stream.groupByFun
Consistent: every consumer gets the exact same sequence of values
No need for replay
Pull-based: puts the consumer of the stream in control