Skip to content
This repository has been archived by the owner on Nov 20, 2020. It is now read-only.

Commit

Permalink
Add support for executing inner tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Oct 14, 2018
1 parent 32af385 commit ade3636
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 7 deletions.
4 changes: 2 additions & 2 deletions Directory.Build.props
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<Project ToolsVersion="15.0">
<PropertyGroup>
<VersionPrefix>0.0.3</VersionPrefix>
<VersionPrefix>0.0.4</VersionPrefix>
<Authors>@jet @bartelink and contributors</Authors>
<Company>Jet.com</Company>
<Description>Apply systemwide resilience strategies consistently across subsystems, standing on Polly's shoulders</Description>
Expand All @@ -9,6 +9,6 @@
<RepositoryType>git</RepositoryType>
<PackageTags>polly bulkhead circuitbreaker timeout fsharp</PackageTags>
<PackageLicenseUrl>https://github.com/jet/CallPolly/blob/master/LICENSE</PackageLicenseUrl>
<Copyright>Copyright © 2018</Copyright>
<Copyright>Copyright © 2018</Copyright>
</PropertyGroup>
</Project>
28 changes: 23 additions & 5 deletions src/CallPolly/Rules.fs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ type PolicyConfig = { isolate: bool; cutoff: CutoffConfig option; limit: Bulkhea

type GovernorState = { circuitState : string option; bulkheadAvailable : int option; queueAvailable : int option }

[<NoComparison; NoEquality>]
[<RequireQualifiedAccess>]
type TaskOrAsync<'T> =
| Task of (CancellationToken -> Task<'T>)
| Async of (Async<'T>)

/// Translates a PolicyConfig's rules to a Polly IAsyncPolicy instance that gets held in the ActionPolicy
type Governor(log: Serilog.ILogger, buildFailurePolicy: unit -> Polly.PolicyBuilder, serviceName: string, callName: string, policyName: string, config: PolicyConfig) =
let logBreach sla interval = log |> Events.Log.cutoffSlaBreached (serviceName, callName, policyName) sla interval
Expand Down Expand Up @@ -115,9 +121,14 @@ type Governor(log: Serilog.ILogger, buildFailurePolicy: unit -> Polly.PolicyBuil
Some () // Compiler gets too clever if we never return Some

/// Execute and/or log failures regarding invocation of a function with the relevant policy applied
member __.Execute(inner : Async<'a>) : Async<'a> =
member __.Execute(inner: TaskOrAsync<'a>) : Async<'a> =
match asyncPolicy with
| None -> inner
| None ->
match inner with
| TaskOrAsync.Async a -> a
| TaskOrAsync.Task factory -> async {
let! ct = Async.CancellationToken
return! factory ct |> Async.AwaitTaskCorrect }
| Some polly -> async {
let mutable wasFull = false
// NB This logging is on a best-effort basis - obviously the guard here has an implied race condition
Expand Down Expand Up @@ -154,7 +165,10 @@ type Governor(log: Serilog.ILogger, buildFailurePolicy: unit -> Polly.PolicyBuil

// sic - cancellation of the inner computation needs to be controlled by Polly's chain of handlers
// for example, if a cutoff is configured, it's Polly that will be requesting the cancellation
Async.StartAsTask(inner, cancellationToken=pollyCt)
match inner with
| TaskOrAsync.Async a -> Async.StartAsTask(a, cancellationToken=pollyCt)
| TaskOrAsync.Task factory -> factory pollyCt

let execute = async {
let! ct = Async.CancellationToken // Grab async cancellation token of this `Execute` call, so cancellation gets propagated into the Polly [wrap]
try return! polly.ExecuteAsync(startInnerTask, ct) |> Async.AwaitTaskCorrect
Expand Down Expand Up @@ -209,9 +223,13 @@ type CallPolicy<'TConfig when 'TConfig: equality> (makeGoverner : CallConfig<'TC
member __.Policy = cfg.policy
member __.Config = cfg.config

/// Execute the call, apply the policy rules
/// Execute an inner Async computation, applying the policy rules
member __.Execute(inner : Async<'t>) =
governor.Execute inner
governor.Execute (TaskOrAsync.Async inner)

/// Execute an inner Task-Method, applying the policy rules
member __.ExecuteTask<'T>(inner : CancellationToken -> Task<'T>) =
governor.Execute (TaskOrAsync.Task inner)

/// Facilitates dumping for diagnostics
member __.InternalState =
Expand Down

0 comments on commit ade3636

Please sign in to comment.