Grim's Scythe
Marcus Griep

Writings on software engineering.


Hopac: Getting Started with Jobs

Concurrency is a pretty hot topic these days. Processor speeds have generally plateaued over the last decade, and multiple cores are now the default. Writing software to take advantage of these cores requires a different approach than the single-threaded default. Hopac is a unique library that offers lightweight threading along with a host of other valuable concurrency constructs, all of which make it easier to write highly-concurrent software.

Comparing Job<'a>, Async<'a>, and Task<T>

Similar to Async<'a> in F# and Task<T> from the .NET Base Class Libraries, Hopac uses a Job<'a> to define a concurrent thread of execution. Each of these three constructs uses a subtly different method of running jobs [1].

The BCL's Task Parallel Library schedules its jobs using threads from the .NET thread pool by default. Longer-running tasks can be scheduled on their own threads by using TaskCreationOptions.LongRunning. The default generally makes Task<T>s better suited for CPU-bound operations, as blocking IO operations performed within a job can prevent that thread from being reused by the thread pool for other ready work.

A significant advance has been made in the C# space with the introduction of async/await. Under the covers, the C# compiler decomposes an async method into a state machine, which is better able to release threads back to the thread pool when blocking operations occur. This results in better utilization of the thread pool threads and does a better job of enabling concurrency than the raw TPL.

F#'s Async<'a> served as the inspiration for C#'s async/await functionality, but it uses an async{} computation expression and takes advantage of continuations to power its concurrency construct. The F# compiler doesn't need any special knowledge of the Async<'a> construct. Instead, the computation expression is decomposed into a series of continuations. This is a bit less efficient than the state machines that C# generates, which makes Async<'a> less suited for CPU-bound operations.
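To make the continuation idea a little more concrete, here is a hand-written approximation of how a small async{} block maps onto calls on the builder. The names sugared and desugared are mine, and the real compiler output differs in detail, so treat this as a sketch rather than the exact translation.

```fsharp
// A small workflow written with computation-expression syntax.
let sugared = async {
  let! x = async { return 20 }
  return x + 1
}

// Roughly the same workflow written against the builder directly.
// Everything after the let! becomes the continuation passed to Bind.
let desugared =
  async.Delay(fun () ->
    async.Bind(async.Return(20), fun x -> async.Return(x + 1)))

printfn "%i %i" (Async.RunSynchronously sugared) (Async.RunSynchronously desugared)
// prints "21 21"
```

No compiler support beyond the general computation-expression translation is involved here; the builder's Bind, Return, and Delay members do all of the work.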

For a more comprehensive look at the differences between the TPL, async/await, and Async<'a>, I recommend Tomas Petricek's Asynchronous C# and F# series.

Hopac's Job<'a> has more in common with F#'s Async<'a> than the Task<T> based model of C#. A Task<T> represents a computation in progress, while a Job<'a> or Async<'a> represents a potential computation that can be invoked. Hopac also provides a job{} computation builder similar to async{}, but with several nice additions and a few idiosyncrasies.
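The difference between a computation in progress and a potential computation can be seen without Hopac at all. The following sketch uses only Async<'a> and Task<T>; the counters and the names cold and hot are my own, introduced purely for illustration.

```fsharp
open System
open System.Threading.Tasks

// Count how many times each computation body actually executes.
let coldRuns = ref 0
let hotRuns = ref 0

// An Async<int> is only a description; defining it runs nothing.
let cold = async {
  coldRuns.Value <- coldRuns.Value + 1
  return 4
}

// The work that the task will perform.
let work () =
  hotRuns.Value <- hotRuns.Value + 1
  4

// A Task<int> created with Task.Run starts executing right away.
let hot = Task.Run(Func<int>(fun () -> work ()))

// Each invocation of the async executes its body again...
let a = Async.RunSynchronously cold
let b = Async.RunSynchronously cold
// ...while reading the task's result twice observes a single execution.
let c = hot.Result
let d = hot.Result

printfn "async body ran %i times, task body ran %i time" coldRuns.Value hotRuns.Value
// prints "async body ran 2 times, task body ran 1 time"
```

A Job<'a> behaves like the Async<'a> here: it does nothing until something runs it, and running it again repeats the work.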

The biggest difference with Async<'a> is that Hopac runs its jobs on threads dedicated to Hopac. Hopac pre-allocates one Hopac thread per processor, and these threads are managed directly by Hopac rather than being a part of a general .NET thread pool. The Hopac scheduler takes care of managing jobs, keeping track of which jobs are ready for execution and handling when a thread switches between jobs. Hopac is heavily optimized to minimize the overhead related to its management of jobs, providing better throughput and CPU utilization than Async<'a> and the TPL under workloads with many concurrent tasks.

Dealing with the garbage

When you first start working with Hopac, you are likely to see the following warning when you execute the first job.

WARNING: You are using single-threaded workstation garbage collection, which means that parallel programs cannot scale. Please configure your program to use server garbage collection.

This warning relates to the fact that workstation garbage collection executes on the thread that triggered the collection. If this thread happens to be one of the Hopac threads, the suspended job and any other jobs that may be waiting for that job will be blocked until the collection completes. With server garbage collection, each CPU receives its own heap and has a dedicated thread on which collections are executed. There are some instances where you may choose to ignore this warning, but blocking a Hopac worker thread can have unintended consequences across other Hopac threads due to its synchronous messaging design.

An application can request server garbage collection by adding the gcServer element to its app.config:

<configuration>
  <runtime>
    <gcServer enabled="true"/>
  </runtime>
</configuration>

For more information on GC settings, see the MSDN documentation.
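If you want to confirm which collector a process actually ended up with, the runtime exposes that through System.Runtime.GCSettings. The check below is a small sketch of my own, not something Hopac requires, and gcMode is just an illustrative name.

```fsharp
open System.Runtime

// GCSettings.IsServerGC reports whether server garbage collection is active.
let gcMode = if GCSettings.IsServerGC then "server" else "workstation"

printfn "Running with %s garbage collection" gcMode
```

Running this from the same host that runs your jobs is a quick way to see whether the configuration change took effect.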

Hopac is written to handle a very large number of jobs (e.g., millions) concurrently. Each job is very lightweight, taking only a few bytes of memory for itself. A Job<'a> is a simple .NET object requiring no disposal or finalization (as opposed to the MailboxProcessor<'Msg>, which is disposable). This means that when a job no longer has any references keeping it alive, it can be readily garbage collected and no special kill protocol is required for recursive jobs (servers/actors).
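As a rough illustration of how cheap jobs are, the sketch below creates a large number of trivial jobs and runs them all concurrently. It assumes a recent Hopac package pulled in through an F# script reference and uses Job.conCollect to gather the results; jobCount and tinyJobs are names I made up for the example.

```fsharp
#r "nuget: Hopac"
open Hopac

// Build a lazy sequence of 100,000 tiny jobs; nothing runs yet.
let jobCount = 100000
let tinyJobs = Seq.init jobCount (fun i -> job { return i })

// Job.conCollect starts the jobs concurrently and collects their results.
let results = tinyJobs |> Job.conCollect |> run

printfn "Completed %i jobs" results.Count
// prints "Completed 100000 jobs"
```

None of these jobs needs to be disposed or explicitly stopped; once the results have been collected, the job objects are ordinary garbage.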

The job{} computation expression

For users experienced with F#'s async{} computation expression, job{} will feel very familiar. The form is exactly the same as async{} with the added benefit that the bind operation is overloaded to accept Job<'a>, Async<'a>, Task<T>, and IObservable<T>. Here's an example of the computation expression in use:

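open Hopac

let doAThing (getThingAsync : Async<_>) delay = job {
  // An Async<'a> can be bound directly; no conversion is needed
  let! result = getThingAsync
  // asJob pins delay to a Job so that the right bind overload is chosen
  do! delay |> asJob
  return result
}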

The first thing to note is that getThingAsync works just fine as the direct target of the bind operation (let!). In the second bind operation (do!), asJob was used when binding delay. The asJob function is a no-op that tells the compiler that delay needs to be a Job<'a> without providing an explicit type or upcast. Without it, the compiler can't infer which of the several available bind overloads should be used. For those who prefer explicit function signatures, the following is equivalent:

let doAThing2 (getThingAsync : Async<_>) (delay : Job<unit>) = job {
  let! result = getThingAsync
  do! delay
  return result
}

Jobs can be started by using run, start, or queue, which are similar to their Async counterparts: RunSynchronously, StartImmediate, and Start. As is the advice for Async, run should only be used in a root context. If you ever find yourself using run from inside of a job{} or async{}, you are probably doing something wrong. If Hopac detects that you are executing run on a Hopac thread, it will emit a warning because this can result in deadlocks.

let myResult = async { return 4 }
let delay = timeOutMillis 10
doAThing myResult delay |> run |> printfn "The result was: %i"

The result was: 4

Looking at this snippet, you may have noticed [2] that timeOutMillis returns an Alt<'a>. We will go into what that means in more depth in later posts, but in essence, an Alt<'a> is a Job<'a> whose completion can be waited on in combination with other Alt<'a>s, with the result being whichever Alt<'a> becomes ready first.
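To give a small taste of "whichever becomes ready first", here is a sketch that races two timeouts against each other. It assumes a recent Hopac package and uses Alt.afterFun and Alt.choose from its Alt module; the names fast, slow, and winner are mine.

```fsharp
#r "nuget: Hopac"
open Hopac

// Two timeouts, each mapped to a label once it becomes available.
let fast = timeOutMillis 10 |> Alt.afterFun (fun () -> "fast")
let slow = timeOutMillis 500 |> Alt.afterFun (fun () -> "slow")

// Alt.choose commits to whichever alternative becomes available first.
let winner = run (Alt.choose [fast; slow])

printfn "The winner was: %s" winner
// prints "The winner was: fast"
```

Because an Alt<'a> is also a Job<'a>, the combined alternative can be handed to run like any other job.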

Memoization

Like Async<'a>, the same Job<'a> can be passed into a function and executed multiple times to get the intended effects. Hopac also offers a Promise<'a> type that can be used to memoize a job when you only want the computation—and its side-effects—to be executed once. Think of a promise as the concurrent alternative to Lazy<T>. Promise<'a> derives from Alt<'a>, so like Alt<'a>, it can be used in the same contexts as any other Job<'a>.

As an example, compare the output of these two snippets which both use the job defined by sideEffect.

let sideEffect = job {
  printfn "> Side-effect!"
  return 4
}
printfn "Without memoization"
run sideEffect |> printfn "The result was: %i"
run sideEffect |> printfn "The result was: %i"

Without memoization
> Side-effect!
The result was: 4
> Side-effect!
The result was: 4

let memoized = memo sideEffect
printfn "With memoization"
run memoized |> printfn "The result was: %i"
run memoized |> printfn "The result was: %i"

With memoization
> Side-effect!
The result was: 4
The result was: 4

As you can see, once memoized, the side-effect occurred only once. After that, the promise is considered fulfilled and all future requests to read from the promise will immediately return the result already computed.

In future posts, I'll introduce Hopac's messaging channels and the star of the show: alternatives. We'll also delve into synchronous messaging to reproduce an actor using Hopac's concurrency primitives and look at how Hopac can tame live, push-based data streams and turn them into more manageable pull-based streams. In the meantime, you can learn more about Hopac from the programming docs in the project's GitHub docs folder and from the Hopac API documentation page.

  1. For the purposes of this post, I will refer to the concurrent threads of execution as jobs.
  2. You can hover over most of the identifiers in the F# snippets to get tooltips with type information and XML documentation.