How to compose task values with helper functions
Use the helper modules when you want to build a pipeline without writing every step as a computation expression. This is useful when you already have functions that return task-like values and want to combine them with normal pipeline operators.
This page uses CancellableTask because it has the richest helper set. The same naming pattern is available on the other helper modules where the operation exists.
Start with functions that return CancellableTask
Each operation is cold and cancellable: it does not start until a caller supplies a CancellationToken.
type CustomerId = CustomerId of int
type Customer = { Id: CustomerId; Name: string }
type Order = {
Id: int
CustomerId: CustomerId
Total: decimal
}
type CustomerSummary = {
Customer: Customer
Orders: Order array
Total: decimal
}
let loadCustomer (customerId: CustomerId) : CancellableTask<Customer> =
cancellableTask {
let! cancellationToken = CancellableTask.getCancellationToken ()
do! Task.Delay(1, cancellationToken)
let (CustomerId id) = customerId
return {
Id = customerId
Name = $"Customer {id}"
}
}
let loadOrders (customerId: CustomerId) : CancellableTask<Order array> =
cancellableTask {
let! cancellationToken = CancellableTask.getCancellationToken ()
do! Task.Delay(1, cancellationToken)
return [|
{
Id = 1
CustomerId = customerId
Total = 12.50M
}
{
Id = 2
CustomerId = customerId
Total = 7.25M
}
|]
}
let summarize customer orders = {
Customer = customer
Orders = orders
Total =
orders
|> Array.sumBy _.Total
}
Transform one result with map
map means "transform the successful result without starting another async operation".
Use it when your next step is a plain function.
let loadCustomerName customerId =
customerId
|> loadCustomer
|> CancellableTask.map _.Name
Continue with another async operation using bind
bind means "wait for this result, then choose the next task-like operation".
Use it when the next step also returns a CancellableTask.
let loadCustomerSummary customerId =
customerId
|> loadCustomer
|> CancellableTask.bind (fun customer ->
customer.Id
|> loadOrders
|> CancellableTask.map (summarize customer)
)
Apply a task-wrapped function with apply
apply means "combine a task-like function with a task-like value".
In pipeline terms, it lets you keep both halves in the CancellableTask world.
let loadCustomerSummaryWithApply customerId =
let summaryBuilder =
customerId
|> loadCustomer
|> CancellableTask.map summarize
customerId
|> loadOrders
|> CancellableTask.apply summaryBuilder
Combine two operations with zip or parallelZip
zip starts the left operation, waits for it, then starts the right operation.
parallelZip starts both operations with the same cancellation token before awaiting either result.
Use zip when the order or side effects matter. Use parallelZip when the operations are independent.
let loadCustomerAndOrdersSerially customerId =
CancellableTask.zip (loadCustomer customerId) (loadOrders customerId)
let loadCustomerAndOrdersConcurrently customerId =
CancellableTask.parallelZip (loadCustomer customerId) (loadOrders customerId)
Run a collection of operations
whenAll starts every operation and waits for all results.
whenAllThrottled does the same with a maximum degree of parallelism.
sequential runs one operation at a time.
let loadSummaries customerIds =
customerIds
|> Seq.map loadCustomerSummary
|> CancellableTask.whenAllThrottled 4
let loadSummariesSequentially customerIds =
customerIds
|> Seq.map loadCustomerSummary
|> CancellableTask.sequential
Convert between generic and unit-returning tasks
ofUnit turns a non-generic CancellableTask into CancellableTask<unit>.
toUnit discards the result of CancellableTask<'T> and returns a non-generic CancellableTask.
let writeAuditEvent (summary: CustomerSummary) : CancellableTask = fun _ -> Task.CompletedTask
let saveSummary (summary: CustomerSummary) : CancellableTask<int> =
cancellableTask {
do! writeAuditEvent summary
return summary.Orders.Length
}
let saveSummaryWithoutResult summary =
summary
|> saveSummary
|> CancellableTask.toUnit
let auditAsGenericTask summary =
summary
|> writeAuditEvent
|> CancellableTask.ofUnit
Execute the pipeline
Call the composed CancellableTask with a token at the boundary of your application.
let sampleIds = [
CustomerId 1
CustomerId 2
CustomerId 3
]
let sampleSummaries =
loadSummaries sampleIds CancellationToken.None
|> Async.AwaitTask
|> Async.RunSynchronously
Helper availability by module
Helper module |
|
|
|
|
|
|
|
|
|
|
|---|---|---|---|---|---|---|---|---|---|---|
|
Yes |
Yes |
Yes |
Yes |
Yes |
Yes |
Yes |
Yes |
Yes |
Yes |
|
Yes |
Yes |
Yes |
Yes |
Yes |
Yes |
No |
No |
No |
Yes |
|
Yes |
Yes |
Yes |
Yes |
Yes |
No |
No |
No |
No |
Yes |
|
Yes |
Yes |
Yes |
Yes |
Yes |
Yes |
No |
No |
No |
Yes |
The .NET 6+ pooling builders expose the same ValueTask and CancellableValueTask helper names for their task shapes.
union case CustomerId.CustomerId: int -> CustomerId
--------------------
type CustomerId = | CustomerId of int
val int: value: 'T -> int (requires member op_Explicit)
--------------------
type int = int32
--------------------
type int<'Measure> = int
val string: value: 'T -> string
--------------------
type string = System.String
val decimal: value: 'T -> decimal (requires member op_Explicit)
--------------------
type decimal = System.Decimal
--------------------
type decimal<'Measure> = decimal
module CancellableTask from IcedTasks.CancellableTasks.CancellableTasks
<summary> Contains functional helper functions for composing and converting CancellableTask values. </summary>
--------------------
type CancellableTask = CancellationToken -> Task
<summary> CancellationToken -> Task </summary>
--------------------
type CancellableTask<'T> = CancellationToken -> Task<'T>
<summary> CancellationToken -> Task<'T> </summary>
<summary> Builds a cancellableTask using computation expression syntax. </summary>
<summary>Gets the default cancellation token for executing computations.</summary>
<returns>The default CancellationToken.</returns>
<category index="3">Cancellation and Exceptions</category>
<example id="default-cancellation-token-1"><code lang="F#"> use tokenSource = new CancellationTokenSource() let primes = [ 2; 3; 5; 7; 11 ] for i in primes do let computation = cancellableTask { let! cancellationToken = CancellableTask.getCancellationToken() do! Task.Delay(i * 1000, cancellationToken) printfn $"{i}" } computation tokenSource.Token |> ignore Thread.Sleep(6000) tokenSource.Cancel() printfn "Tasks Finished" </code> This will print "2" 2 seconds from start, "3" 3 seconds from start, "5" 5 seconds from start, cease computation and then followed by "Tasks Finished". </example>
type Task = interface IAsyncResult interface IDisposable new: action: Action -> unit + 7 overloads member ConfigureAwait: continueOnCapturedContext: bool -> ConfiguredTaskAwaitable + 1 overload member ContinueWith: continuationAction: Action<Task,obj> * state: obj -> Task + 19 overloads member Dispose: unit -> unit member GetAwaiter: unit -> TaskAwaiter member RunSynchronously: unit -> unit + 1 overload member Start: unit -> unit + 1 overload member Wait: unit -> unit + 5 overloads ...
<summary>Represents an asynchronous operation.</summary>
--------------------
type Task<'TResult> = inherit Task new: ``function`` : Func<obj,'TResult> * state: obj -> unit + 7 overloads member ConfigureAwait: continueOnCapturedContext: bool -> ConfiguredTaskAwaitable<'TResult> + 1 overload member ContinueWith: continuationAction: Action<Task<'TResult>,obj> * state: obj -> Task + 19 overloads member GetAwaiter: unit -> TaskAwaiter<'TResult> member WaitAsync: cancellationToken: CancellationToken -> Task<'TResult> + 4 overloads member Result: 'TResult static member Factory: TaskFactory<'TResult>
<summary>Represents an asynchronous operation that can return a value.</summary>
<typeparam name="TResult">The type of the result produced by this <see cref="T:System.Threading.Tasks.Task`1" />.</typeparam>
--------------------
Task(action: System.Action) : Task
Task(action: System.Action, cancellationToken: CancellationToken) : Task
Task(action: System.Action, creationOptions: TaskCreationOptions) : Task
Task(action: System.Action<obj>, state: obj) : Task
Task(action: System.Action, cancellationToken: CancellationToken, creationOptions: TaskCreationOptions) : Task
Task(action: System.Action<obj>, state: obj, cancellationToken: CancellationToken) : Task
Task(action: System.Action<obj>, state: obj, creationOptions: TaskCreationOptions) : Task
Task(action: System.Action<obj>, state: obj, cancellationToken: CancellationToken, creationOptions: TaskCreationOptions) : Task
--------------------
Task(``function`` : System.Func<'TResult>) : Task<'TResult>
Task(``function`` : System.Func<obj,'TResult>, state: obj) : Task<'TResult>
Task(``function`` : System.Func<'TResult>, cancellationToken: CancellationToken) : Task<'TResult>
Task(``function`` : System.Func<'TResult>, creationOptions: TaskCreationOptions) : Task<'TResult>
Task(``function`` : System.Func<obj,'TResult>, state: obj, cancellationToken: CancellationToken) : Task<'TResult>
Task(``function`` : System.Func<obj,'TResult>, state: obj, creationOptions: TaskCreationOptions) : Task<'TResult>
Task(``function`` : System.Func<'TResult>, cancellationToken: CancellationToken, creationOptions: TaskCreationOptions) : Task<'TResult>
Task(``function`` : System.Func<obj,'TResult>, state: obj, cancellationToken: CancellationToken, creationOptions: TaskCreationOptions) : Task<'TResult>
Task.Delay(millisecondsDelay: int) : Task
Task.Delay(delay: System.TimeSpan, timeProvider: System.TimeProvider) : Task
Task.Delay(delay: System.TimeSpan, cancellationToken: CancellationToken) : Task
Task.Delay(millisecondsDelay: int, cancellationToken: CancellationToken) : Task
Task.Delay(delay: System.TimeSpan, timeProvider: System.TimeProvider, cancellationToken: CancellationToken) : Task
<summary>Allows chaining of CancellableTasks.</summary>
<param name="mapper">The continuation.</param>
<param name="cTask">The value.</param>
<returns>The result of the mapper wrapped in a CancellableTasks.</returns>
<summary>Allows chaining of CancellableTasks.</summary>
<param name="binder">The continuation.</param>
<param name="cTask">The value.</param>
<returns>The result of the binder.</returns>
<summary>Allows chaining of CancellableTasks.</summary>
<param name="applicable">A function wrapped in a CancellableTasks</param>
<param name="cTask">The value.</param>
<returns>The result of the applicable.</returns>
<summary>Takes two CancellableTasks, starts them serially in order of left to right, and returns a tuple of the pair.</summary>
<param name="left">The left value.</param>
<param name="right">The right value.</param>
<returns>A tuple of the parameters passed in</returns>
<summary>Takes two CancellableTasks, starts them concurrently with the same cancellation token, and returns a tuple of the pair.</summary>
<param name="left">The left value.</param>
<param name="right">The right value.</param>
<returns>A tuple of the parameters passed in.</returns>
<example id="cancellable-task-parallel-zip-1"><code lang="F#"> let both = CancellableTask.parallelZip firstOperation secondOperation let! left, right = both cancellationToken </code></example>
<summary>Creates a task that runs the supplied CancellableTasks with a maximum degree of parallelism and completes when all have completed.</summary>
<param name="tasks">The tasks to wait on for completion</param>
<param name="maxDegreeOfParallelism">The maximum number of tasks to run concurrently.</param>
<returns>A CancellableTask that represents the completion of all of the supplied tasks.</returns>
<exception cref="T:System.ArgumentNullException">The <paramref name="tasks" /> argument was <see langword="null" />.</exception>
<exception cref="T:System.ArgumentException">The <paramref name="tasks" /> collection contained a <see langword="null" /> task.</exception>
<example id="cancellable-task-when-all-throttled-1"><code lang="F#"> let loadAll ids = ids |> Seq.map loadOne |> CancellableTask.whenAllThrottled 4 </code></example>
<summary>Creates a <see cref="T:IcedTasks.CancellableTasks.CancellableTask`1" /> that will complete when all of the <see cref="T:IcedTasks.CancellableTasks.CancellableTask`1" />s in an enumerable collection have completed sequentially.</summary>
<param name="tasks">The tasks to wait on for completion</param>
<returns>A CancellableTask that represents the completion of all of the supplied tasks.</returns>
<summary>Gets a task that has already completed successfully.</summary>
<returns>The successfully completed task.</returns>
<summary>Gets the total number of elements in all the dimensions of the <see cref="T:System.Array" />.</summary>
<exception cref="T:System.OverflowException">The array is multidimensional and contains more than <see cref="F:System.Int32.MaxValue">Int32.MaxValue</see> elements.</exception>
<returns>The total number of elements in all the dimensions of the <see cref="T:System.Array" />; zero if there are no elements in the array.</returns>
<summary>Converts a CancellableTask\<_> to a non-generic CancellableTask by discarding the result.</summary>
<param name="ctask">The CancellableTask to convert.</param>
<returns>A non-generic CancellableTask that completes when <paramref name="ctask" /> completes.</returns>
<summary>Converts a non-generic CancellableTask to a CancellableTask\<unit\>.</summary>
<param name="unitCancellableTask">The CancellableTask to convert.</param>
<returns>A CancellableTask\<unit\> that completes when <paramref name="unitCancellableTask" /> completes.</returns>
[<Struct>] type CancellationToken = new: canceled: bool -> unit member Equals: other: obj -> bool + 1 overload member GetHashCode: unit -> int member Register: callback: Action -> CancellationTokenRegistration + 4 overloads member ThrowIfCancellationRequested: unit -> unit member UnsafeRegister: callback: Action<obj,CancellationToken> * state: obj -> CancellationTokenRegistration + 1 overload static member (<>) : left: CancellationToken * right: CancellationToken -> bool static member (=) : left: CancellationToken * right: CancellationToken -> bool member CanBeCanceled: bool member IsCancellationRequested: bool ...
<summary>Propagates notification that operations should be canceled.</summary>
--------------------
CancellationToken ()
CancellationToken(canceled: bool) : CancellationToken
<summary>Returns an empty <see cref="T:System.Threading.CancellationToken" /> value.</summary>
<returns>An empty cancellation token.</returns>
type Async = static member AsBeginEnd: computation: ('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit) static member AwaitEvent: event: IEvent<'Del,'T> * ?cancelAction: (unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate) static member AwaitIAsyncResult: iar: IAsyncResult * ?millisecondsTimeout: int -> Async<bool> static member AwaitTask: task: Task<'T> -> Async<'T> + 1 overload static member AwaitWaitHandle: waitHandle: WaitHandle * ?millisecondsTimeout: int -> Async<bool> static member CancelDefaultToken: unit -> unit static member Catch: computation: Async<'T> -> Async<Choice<'T,exn>> static member Choice: computations: Async<'T option> seq -> Async<'T option> static member FromBeginEnd: beginAction: (AsyncCallback * obj -> IAsyncResult) * endAction: (IAsyncResult -> 'T) * ?cancelAction: (unit -> unit) -> Async<'T> + 3 overloads static member FromContinuations: callback: (('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T> ...
--------------------
type Async<'T>
static member Async.AwaitTask: task: Task<'T> -> Async<'T>
IcedTasks