namespace Hopac
module Infixes

from Hopac
val always : x:'a -> _arg1:'b -> 'a

Full name: Index.always
val x : 'a
val run : Job<'x> -> 'x

Full name: Hopac.TopLevel.run
type Job<'T> =

Full name: Hopac.Job<_>
val unit : unit -> Job<unit>

Full name: Hopac.Job.unit
val delayValueAsync : prior:Async<'a> -> Async<'a>

Full name: Index.delayValueAsync
val prior : Async<'a>
val async : AsyncBuilder

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
Multiple items
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task -> Async<unit>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken

Full name: Microsoft.FSharp.Control.Async

--------------------
type Async<'T>

Full name: Microsoft.FSharp.Control.Async<_>
static member Async.Sleep : millisecondsDueTime:int -> Async<unit>
val printAsync : x:int -> Async<unit>

Full name: Index.printAsync
val x : int
val printfn : format:Printf.TextWriterFormat<'T> -> 'T

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.printfn
static member Async.Start : computation:Async<unit> * ?cancellationToken:System.Threading.CancellationToken -> unit
static member Async.StartImmediate : computation:Async<unit> * ?cancellationToken:System.Threading.CancellationToken -> unit
static member Async.RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:System.Threading.CancellationToken -> 'T
member AsyncBuilder.Return : value:'T -> Async<'T>
val delayValueJob : prior:Job<'a> -> Job<'a>

Full name: Index.delayValueJob
val prior : Job<'a>
val job : JobBuilder

Full name: Hopac.TopLevel.job
val asJob : Job<'x> -> Job<'x>

Full name: Hopac.TopLevel.asJob
val printJob : x:int -> Job<unit>

Full name: Index.printJob
val queue : Job<unit> -> unit

Full name: Hopac.TopLevel.queue
val timeOutMillis : int -> Alt<unit>

Full name: Hopac.TopLevel.timeOutMillis
val start : Job<unit> -> unit

Full name: Hopac.TopLevel.start
val result : 'x -> Job<'x>

Full name: Hopac.Job.result
type CellMsg =
  | SetValue of string
  | GetValue of IVar<string>

Full name: Index.CellMsg
union case CellMsg.SetValue: string -> CellMsg
Multiple items
val string : value:'T -> string

Full name: Microsoft.FSharp.Core.Operators.string

--------------------
type string = System.String

Full name: Microsoft.FSharp.Core.string
union case CellMsg.GetValue: IVar<string> -> CellMsg
Multiple items
type IVar<'T> =
  inherit Promise<'T>
  new : unit -> IVar<'T> + 2 overloads
  nested type Fill
  nested type FillFailure
  nested type TryFill

Full name: Hopac.IVar<_>

--------------------
IVar() : unit
IVar(t: 'T) : unit
IVar(e: exn) : unit
val spawnCellActor : initialValue:string -> Job<Ch<CellMsg>>

Full name: Index.spawnCellActor
val initialValue : string
Multiple items
type Ch<'T> =
  inherit Alt<'T>
  new : unit -> Ch<'T>

Full name: Hopac.Ch<_>

--------------------
Ch() : unit
val commCh : Ch<CellMsg>
val iterateServer : 'x -> ('x -> #Job<'x>) -> Job<unit>

Full name: Hopac.Job.iterateServer
val x : string
val msg : CellMsg
val take : Ch<'x> -> Alt<'x>

Full name: Hopac.Ch.take
val v : string
val reply : IVar<string>
val fill : IVar<'x> -> 'x -> Job<unit>

Full name: Hopac.IVar.fill
val send : mbx:Ch<'a> -> msg:'a -> Job<unit>

Full name: Index.send
val mbx : Ch<'a>
val msg : 'a
val send : Ch<'x> -> 'x -> Job<unit>

Full name: Hopac.Ch.send
val sendAndReceive : mbx:Ch<'a> -> toMsg:(IVar<'b> -> 'a) -> Job<'b>

Full name: Index.sendAndReceive
val toMsg : (IVar<'b> -> 'a)
val reply : IVar<'b>
val read : IVar<'x> -> Alt<'x>

Full name: Hopac.IVar.read
val cell : Ch<CellMsg>
val n1 : string
val n2 : string
type Msg =
  | NetworkMessage of string
  | Timeout

Full name: Index.Msg
union case Msg.NetworkMessage: string -> Msg
union case Msg.Timeout: Msg
val networkComm : Alt<string>

Full name: Index.networkComm
type Alt<'T> =
  inherit Job<'T>

Full name: Hopac.Alt<_>
val choose : seq<#Alt<'x>> -> Alt<'x>

Full name: Hopac.Alt.choose
val afterFun : ('x -> 'y) -> Alt<'x> -> Alt<'y>

Full name: Hopac.Alt.afterFun
val alwaysWithPrepare : x:int -> Alt<int>

Full name: Index.alwaysWithPrepare
val prepareFun : (unit -> #Alt<'x>) -> Alt<'x>

Full name: Hopac.Alt.prepareFun
val always : 'x -> Alt<'x>

Full name: Hopac.Alt.always
val resultPerf : Alt<int>

Full name: Index.resultPerf
val map : ('x -> 'y) -> Job<'x> -> Job<'y>

Full name: Hopac.Job.map
type Cell<'a> =
  {getCh: Ch<'a>;
   putCh: Ch<'a>;}

Full name: Index.Cell<_>
Cell.getCh: Ch<'a>
Cell.putCh: Ch<'a>
val cell : x:'a -> Job<Cell<'a>>

Full name: Index.cell
val c : Cell<'a>
val give : Ch<'x> -> 'x -> Alt<unit>

Full name: Hopac.Ch.give
val put : c:Cell<'a> -> v:'a -> Alt<unit>

Full name: Index.put
val v : 'a
val get : c:Cell<'a> -> Alt<'a>

Full name: Index.get
val myCell : Cell<string>
type CellWithReply<'a> =
  {getCh: Ch<'a>;
   putCh: Ch<'a * IVar<'a>>;}

Full name: Index.CellWithReply<_>
CellWithReply.getCh: Ch<'a>
CellWithReply.putCh: Ch<'a * IVar<'a>>
val handlePut : oldVal:'a -> newVal:'b * reply:IVar<'a> -> Job<'b>

Full name: Index.handlePut
val oldVal : 'a
val newVal : 'b
val reply : IVar<'a>
val cell : x:'a -> Job<CellWithReply<'a>>

Full name: Index.cell
val c : CellWithReply<'a>
val afterJob : ('x -> #Job<'y>) -> Alt<'x> -> Alt<'y>

Full name: Hopac.Alt.afterJob
val get : c:CellWithReply<'a> -> Alt<'a>

Full name: Index.get
val put : c:CellWithReply<'a> -> v:'a -> Alt<'a>

Full name: Index.put
val putSyncOnReply : c:CellWithReply<'a> -> v:'a -> Alt<'a>

Full name: Index.putSyncOnReply
val prepareJob : (unit -> #Job<'c>) -> Alt<'x> (requires 'c :> Alt<'x>)

Full name: Hopac.Alt.prepareJob
val myCell : CellWithReply<string>
val value : string
type CellWithNack<'a> =
  {getCh: Ch<'a>;
   putCh: Ch<'a * Promise<unit> * Ch<'a>>;}

Full name: Index.CellWithNack<_>
CellWithNack.getCh: Ch<'a>
CellWithNack.putCh: Ch<'a * Promise<unit> * Ch<'a>>
Multiple items
type Promise<'T> =
  inherit Alt<'T>
  new : tJ:Job<'T> -> Promise<'T> + 2 overloads
  member Full : bool

Full name: Hopac.Promise<_>

--------------------
Promise(tJ: Job<'T>) : unit
Promise(value: 'T) : unit
Promise(e: exn) : unit
type unit = Unit

Full name: Microsoft.FSharp.Core.unit
val handlePut : oldVal:'a -> newVal:'a * nack:Promise<'b> * replyCh:Ch<'a> -> Alt<'a>

Full name: Index.handlePut
val newVal : 'a
val nack : Promise<'b>
val replyCh : Ch<'a>
val read : Promise<'x> -> Alt<'x>

Full name: Hopac.Promise.read
val cell : x:'a -> Job<CellWithNack<'a>>

Full name: Index.cell
val c : CellWithNack<'a>
val get : c:CellWithNack<'a> -> Alt<'a>

Full name: Index.get
val putWithNack : c:CellWithNack<'a> -> v:'a -> Alt<'a>

Full name: Index.putWithNack
val withNackJob : (Promise<unit> -> #Job<'c>) -> Alt<'x> (requires 'c :> Alt<'x>)

Full name: Hopac.Alt.withNackJob
val nack : Promise<unit>
val myCell : CellWithNack<string>
val handlePut : oldVal:'a -> newVal:'a * nack:Alt<'b> * replyCh:Ch<'a> -> Alt<'a>

Full name: Index.handlePut
val nack : Alt<'b>
val failableServer : ch:Ch<int> -> Job<'a>

Full name: Index.failableServer
val ch : Ch<int>
val forever : Job<unit> -> Job<'b>

Full name: Hopac.Job.forever
val value : int
val failwith : message:string -> 'T

Full name: Microsoft.FSharp.Core.Operators.failwith
val withSupervisor : supervise:('a -> 'b) -> job:'a -> 'b

Full name: Index.withSupervisor
val supervise : ('a -> 'b)
val job : 'a
val restartOnException : onRestart:Job<unit> -> server:Job<unit> -> Job<unit>

Full name: Index.restartOnException
val onRestart : Job<unit>
val server : Job<unit>
Multiple items
val exn : exn

--------------------
type exn = System.Exception

Full name: Microsoft.FSharp.Core.exn
System.Exception.GetType() : System.Type
property System.Exception.Message: string
val evilCh : Ch<int>
val supervisor : (Job<unit> -> Job<unit>)
val start : Job<unit> -> Job<unit>

Full name: Hopac.Job.start

Hopac

Powering Concurrency with Synchronous Messaging



Marcus Griep

@neoeinstein

Slides online:
bit.ly/HopacLambdaConf2016

The Reactive Mainfesto

  • Responsive
  • Resilient
  • Elastic
  • Message-Driven

Message-Driven

Reactive Systems rely on asynchronous message-passing to establish a boundary between components that ensures loose coupling, isolation and location transparency. This boundary also provides the means to delegate failures as messages. Employing explicit message-passing enables load management, elasticity, and flow control by shaping and monitoring the message queues in the system and applying back-pressure when necessary. Location transparent messaging as a means of communication makes it possible for the management of failure to work with the same constructs and semantics across a cluster or within a single host. Non-blocking communication allows recipients to only consume resources while active, leading to less system overhead.

Loopholes!


Synchronous messaging inside a component!


Asynchronous message-passing
Establishes component boundaries

Non-blocking communication
Threads don't consume resources while idle
Note: A "thread" refers to a thread of execution,
not an operating system thread.

The Reactive Manifesto

Still not a law!

Asynchronous message-passing

Postal System

Mailbox
  • Unbounded Mailboxes
  • Processes send messages
  • At-most-once delivery
  • Better guarantees, at a cost
  • Implemented by actor systems:
    • Akka.net
    • Orleans

Synchronous message-passing

Rendezvous Point

Meeting Point
  • Channels
  • Processes give and take messages
  • Exactly-once delivery
  • Examples include:
    • Concurrent ML / Go / Racket
    • Hopac / Clojure's core.async

Why synchronous rendezvous?

  • Sender and recipient share common knowledge of transmission
  • Failure mode is typically deadlock
  • Semantics of local and distributed interactions are different

Local communication with asynchronous messaging

  • Drop off letter in postbox
  • Mail carrier recognizes the letter is local
    and goes to the address
  • Mail carrier hand-delivers it
    to your neighbor's mailbox
    while your neighbor watches
  • Your neighbor pulls the letter out of the mailbox

Local communication with synchronous rendezvous

  • You go to rendezvous location
    and hand the letter to your neighbor

Hopac

  • F# library (not a framework)
  • Inspired by Concurrent ML
  • Uses synchronous rendezvous
  • Provides composable primitives
  • Cares about scalabilty
1: 
2: 
3: 
WARNING: You are using single-threaded workstation garbage
collection, which means that parallel programs cannot scale.
Please configure your program to use server garbage collection.

Comparison to F# Async

F# Async
  • Run on threadpool threads
  • Default threadpool scheduler

Hopac
  • Lightweight threads: Job<'x>
  • Run on dedicated Hopac threads
  • Cooperative scheduler

Using async{}

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
let delayValueAsync prior = async {
  let! x = prior
  do! Async.Sleep 1
  return x
}
let printAsync x = async {
  printfn "%i" x
}

Async.Start <| Async.Sleep 1
Async.StartImmediate <| printAsync 23 
Async.RunSynchronously <| delayValueAsync (async.Return 4)

Using job{}

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
let delayValueJob prior = job {
  let! x = prior |> asJob
  do! Async.Sleep 1
  return x
}
let printJob x = job {
  printfn "%i" x
}

queue <| timeOutMillis 1
start <| printJob 23 
run <| delayValueJob (Job.result 4)

Actor look-a-like

1: 
2: 
3: 
type CellMsg =
  | SetValue of string
  | GetValue of IVar<string>
 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
let spawnCellActor initialValue : Job<Ch<CellMsg>> = job {
  let commCh = Ch ()
  do! Job.iterateServer initialValue <| fun x -> job {
    let! msg = Ch.take commCh
    match msg with
    | SetValue v -> return v
    | GetValue reply ->
      do! IVar.fill reply x
      return x
  }
  return commCh
}

Actor look-a-like

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
// MailboxProcessor.Post
let send mbx msg = job {
  return! Ch.send mbx msg
}
// MailboxProcessor.PostAndAsyncReply
let sendAndReceive mbx toMsg = job {
  let reply = IVar ()
  do! Ch.send mbx (toMsg reply)
  return! IVar.read reply
}

Actor look-a-like

Test it out

1: 
2: 
3: 
4: 
5: 
6: 
7: 
8: 
run <| job {
  let! cell = spawnCellActor "Henry"
  let! n1 = sendAndReceive cell GetValue
  do! send cell (SetValue "Todd")
  let! n2 = sendAndReceive cell GetValue
  printfn "Name at 1: %s" n1
  printfn "Name at 2: %s" n2
}
Output:
Name at 1: Henry
Name at 2: Todd

Alternatives

  • Composable communication primitive: Alt<'x>
  • Alt is where the magic happens
  • Quantum superposition of waits
  • 1: 
    2: 
    3: 
    
    Alt.choose
      [ networkComm |> Alt.afterFun NetworkMessage
        timeOutMillis 250 |> Alt.afterFun (always Timeout) ]
    
Superposition of waits

Alternatives

Selection in three stages

  • Instantiated
    Alt.prepareJob / guard
  • Available
  • Committed
    or not
    Alt.afterJob / wrap

Alternatives in Hopac

  • Emphasizes performance over fairness
  • Lazy alternative instantiation
 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
let alwaysWithPrepare x =
  Alt.prepareFun <| fun () ->
    printfn "Instantiated %i" x
    Alt.always x

let resultPerf =
  Alt.choose
    [ alwaysWithPrepare 0
      alwaysWithPrepare 1 ]

run (resultPerf |> Job.map (printfn "Selected case #%i"))
Output:
Instantiated 0
Selected case #0

The second case never gets instantiated.

Servers

Like actors, but more powerful!

Servers

Reference Cell
1: 
2: 
3: 
type Cell<'a> =
  { getCh: Ch<'a>
    putCh: Ch<'a> }
Ch<'a>: two-way communication: give/take

Servers

The server implementation

1: 
2: 
3: 
4: 
5: 
6: 
7: 
8: 
let cell x = job {
  let c = {getCh = Ch (); putCh = Ch ()}
  do! Job.iterateServer x <| fun x ->
        Alt.choose
          [ Ch.take c.putCh
            Ch.give c.getCh x |> Alt.afterFun (always x) ]
  return c
}
Abstract the protocol
1: 
2: 
let put c v = Ch.give c.putCh v
let get c = Ch.take c.getCh

Servers

Test it out

1: 
2: 
3: 
4: 
5: 
6: 
7: 
8: 
run <| job {
  let! myCell = cell "Henry"
  let! n1 = get myCell
  do! put myCell "Todd"
  let! n2 = get myCell
  printfn "Name at 1: %s" n1
  printfn "Name at 2: %s" n2
}
Output:
Name at 1: Henry
Name at 2: Todd

Handling Replies

Talking back to your clients

Handling Replies

Create a cell that can handle replies

1: 
2: 
3: 
type CellWithReply<'a> =
  { getCh: Ch<'a>
    putCh: Ch<'a * IVar<'a>> }
IVar<'a>: write-once channel: fill/read

Handling Replies

Update the server

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
let handlePut oldVal (newVal, reply) =
    IVar.fill reply oldVal |> Job.map (always newVal)

let cell x = job {
  let c = {getCh = Ch (); putCh = Ch ()}
  do! Job.iterateServer x <| fun x ->
        Alt.choose
          [ Ch.take c.putCh   |> Alt.afterJob (handlePut x)
            Ch.give c.getCh x |> Alt.afterFun (always x) ]
  return c
}

Handling Replies

Update the protocol abstraction

1: 
2: 
3: 
4: 
5: 
6: 
let get c = Ch.take c.getCh
let put c v =
  Alt.prepareFun <| fun () ->
    let reply = IVar ()
    Ch.give c.putCh (v,reply)
    |> Alt.afterJob (fun () -> IVar.read reply)
Commit on give
Rework the synchronization point
1: 
2: 
3: 
4: 
5: 
let putSyncOnReply c v =
  Alt.prepareJob <| fun () ->
    let reply = IVar ()
    Ch.send c.putCh (v,reply)
    |> Job.map (fun () -> IVar.read reply)
Commit on read

Handling Replies

Test it out

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
run <| job {
  let! myCell = cell "Henry"
  do!
    Alt.choose
      [ putSyncOnReply myCell "Todd"
          |> Alt.afterFun (printfn "Old value: %s")
        timeOutMillis 0
          |> Alt.afterFun (fun () -> printfn "Timed out…") ]

  let! value = get myCell
  printfn "Current value: %s" value
}
Output
Timed out…
Current value: Todd
This is not the semantic we want

Negative Acknowledgements

Keeping your options open

Negative Acknowledgements

Create a new cell type

1: 
2: 
3: 
type CellWithNack<'a> =
  { getCh: Ch<'a>
    putCh: Ch<'a * Promise<unit> * Ch<'a>> }
Promise<'a>: offer to provide a value at a future time: start/read

Negative Acknowledgements

Update the server so that it commits when
the client accepts the reply or sends a nack

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
let handlePut oldVal (newVal, nack, replyCh) =
  Alt.choose
    [ Ch.give replyCh oldVal |> Alt.afterFun (always newVal)
      Promise.read nack      |> Alt.afterFun (always oldVal) ]

let cell x = job {
  let c = {getCh = Ch (); putCh = Ch ()}
  do! Job.iterateServer x <| fun x ->
        Alt.choose
          [ Ch.take c.putCh   |> Alt.afterJob (handlePut x)
            Ch.give c.getCh x |> Alt.afterFun (always x) ]
  return c
}

Negative Acknowledgements

Update the client abstraction

1: 
2: 
3: 
4: 
5: 
6: 
7: 
let get c = Ch.take c.getCh
let putWithNack c v =
  Alt.withNackJob <| fun nack -> job {
    let replyCh = Ch ()
    do! Ch.send c.putCh (v,nack,replyCh)
    return Ch.take replyCh
  }

Negative Acknowledgements

Test it out

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
run <| job {
  let! myCell = cell "Henry"
  do!
    Alt.choose
      [ putWithNack myCell "Todd"
          |> Alt.afterFun (printfn "Old value: %s")
        timeOutMillis 0
          |> Alt.afterFun (fun () -> printfn "Timed out…") ]

  let! value = get myCell
  printfn "Current value: %s" value
}
Output:
Timed out…
Current value: Henry
This has the semantic we want

Negative Acknowledgements

Allow the client and server to collaborate
Enable the client to explore multiple options
Very complicated to implement with asynchronous messaging

Infix Operators

Warning: Crazy custom operators inbound

Infix Operators

Messaging
*<-
Ch.give
*<+
Ch.send
*<=
IVar.fill
*<=!
IVar.fillFailure
*<<=
MVar.fill
*<<+
Mailbox.send
Patterns
*<+->=
Nack Job
*<+->-
Nack Fun
*<-=>=
IVar Job^
*<-=>-
IVar Fun^
*<+=>=
IVar Job+
*<+=>-
IVar Fun+
Actions
^=>
Alt.afterJob
^->
Alt.afterFun
^=>.
Alt.afterJobi
^->.
Alt.afterFuni
^->!
Alt.raises
Choice
<|>
Alt.choose
<~>
Alt.chooser
<|>*
Alt.choose*
Combine
<&>
Pair sequential
<*>
Pair parallel
<+>
Pair alts
Sequencing
>>=
Job.bind
>>=*
Job.bind*
>>-
Job.map
>>-*
Job.map*
>>=.
Job.bindi
>>=*.
Job.bindi,*
>>-.
Job.mapi
>>-*.
Job.mapi,*
>=>
Compose
>=>*
Compose*
>->
Composs/Map
>->*
Compose/Map*
i: Ignore result of previous op
*: Memoize as Promise
^: Sync on giving message
+: Sync on reading reply

Infix Operators

Server
 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
let handlePut oldVal (newVal, nack, replyCh) =
      replyCh *<- oldVal ^->. newVal
  <|> nack               ^->. oldVal

let cell x = job {
  let c = {getCh = Ch (); putCh = Ch ()}
  do! Job.iterateServer x <| fun x ->
        c.putCh       ^=> handlePut x
    <|> c.getCh *<- x ^->. x
  return c
}
Client
1: 
2: 
3: 
let get c = Ch.take c.getCh
let putWithNack c v =
    c.putCh *<+->- fun replyCh nack -> (v, nack, replyCh)

Infix Operators

Test it out

1: 
2: 
3: 
4: 
5: 
6: 
7: 
8: 
9: 
run <| job {
  let! myCell = cell "Henry"
  do!
        putWithNack myCell "Todd" ^-> printfn "Old value: %s"
    <|> timeOutMillis 0 ^->. printfn "Timed out…"

  let! value = get myCell
  printfn "Current value: %s" value
}
Output:
Timed out…
Current value: Henry

Supervision

But actor systems come with supervisors!

Simple supervision

Server that can fail
1: 
2: 
3: 
4: 
5: 
6: 
7: 
8: 
9: 
let failableServer ch =
  Job.forever <| job {
    printfn "Ready for next message"
    let! value = Ch.take ch
    if value < 0 then
      failwith "Negative values make me sad"
    else
      printfn "Received %i" value
  }
This server will throw an exception if it receives negative values

Simple supervision

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
let withSupervisor supervise job =
  supervise job

let rec restartOnException onRestart server = job {
  try
    do! server |> asJob
  with exn ->
    printfn "Child had exception; restarting: %s: %s"
      (exn.GetType().Name)
      exn.Message
    do! onRestart |> asJob
    return! restartOnException onRestart server
}

Simple supervision

Test it out

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
run <| job {
  let evilCh = Ch ()
  let supervisor =
    restartOnException <| timeOutMillis 10
  let server =
    failableServer evilCh
    |> withSupervisor supervisor
  do! Job.start server
  do! Ch.give evilCh 10
  do! Ch.give evilCh -10
  do! Ch.give evilCh 20
}
Output:
Ready for next message
Received 10
Ready for next message
Child had exception; restarting: Exception: Negative values make me sad
Ready for next message
Received 20
Ready for next message

Distribution

No story here
Hopac is not designed for this use case
Look elsewhere for your distribution story

Pre-emption

Hopac's scheduler is cooperative
It will not pre-empt threads like the OS scheduler
You can do uncooperative things that starve threads

The Reactive Manifesto

Synchronous rendezvous is not at odds with the Rective Manifesto
The Reactive Manifesto focuses on systems architecture problems
Inside a component, synchronous rendezvous is fine

Hopac's Power

  • Lightweight threads
  • Garbage collection friendly
  • Minimal overhead to server–client interactions
  • Context switches between threads are highly optimized
  • Composable primitives
  • Ch<'a> is an Alt<'a>
  • IVar<'a> is a Promise<'a>
    is an Alt<'a>
  • MVar<'a> is an Alt<'a>
  • Mailbox<'a> is an Alt<'a>
Alt<'a>s compose to Alt<'a>

Hopac Streams

  • Non-deterministic stream of values
    (choice stream)
  • Similar to Rx observable sequences
  • Handle same operations as ordinary lazy streams
    Provides Stream.foldBack and Stream.groupByFun
  • Consistent
    Every consumer gets the exact same sequence of values
    No need for replay
  • Pull-based
    Puts the consumer of the stream in control

Resources

Marcus Griep
@neoeinstein
neoeinstein.github.io
neoeinstein@gmail.com