Skip to content
This repository has been archived by the owner on Nov 20, 2020. It is now read-only.

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Oct 22, 2018
1 parent 612a083 commit 23078ec
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 1 deletion.
2 changes: 1 addition & 1 deletion tests/CallPolly.Acceptance/Scenarios.fs
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ type Scenarios(output : Xunit.Abstractions.ITestOutputHelper) =
test <@ between 0.3 2.5 (let t = time.Elapsed in t.TotalSeconds) @>
}

let [<Fact(Skip="Fails atm")>] ``LimitBy - BulkheadMulti functionality`` () = async {
let [<Fact>] ``LimitBy - BulkheadMulti functionality`` () = async {
let policy = Parser.parse(policy).CreatePolicy log
let sut = Sut(log, policy)
let act i =
Expand Down
79 changes: 79 additions & 0 deletions tests/CallPolly.Tests/RulesTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,85 @@ type Limit(output : Xunit.Abstractions.ITestOutputHelper) =
&& between 2 3 (Array.length delayed) // Typically, 3 should get delayed, but depending on scheduling, only 2 get logged as such, and we don't want a flickering test
&& 1 = Array.length shed @> }

type LimitBy(output : Xunit.Abstractions.ITestOutputHelper) =
let log, buffer = LogHooks.createLoggerWithCapture output

let defs = """{ "services": { "default": {
"calls": {},
"defaultPolicy": "def",
"policies": {
"def": [
{ "rule": "LimitBy", "maxParallel": 2, "maxQueue": 3, "tag": "clientIp", "dryRun": false }
]
}
}}}"""
let dryRunDefs = defs.Replace("false","true")
let expectedRules : Rules.TaggedBulkheadConfig = { dop = 2; queue = 3; tag="clientIp"; dryRun = false }

let runError = async {
do! Async.Sleep (s 1)
return raise (System.TimeoutException()) }
let runOk = async {
do! Async.Sleep (s 1)
return 42 }

//let [<Fact>] ``dryRun mode does not inhibit processing`` () = async {
// let pol = Parser.parse(dryRunDefs).CreatePolicy(log)
// let ap = pol.Find("default","any")
// test <@ not ap.Policy.isolate
// && [{ expectedRules with dryRun = true }] = ap.Policy.taggedLimits @>
// let! time, results =
// seq { for x in 0..5 -> if x % 2 = 0 then runError else runOk }
// |> Seq.mapi (fun i f -> async {
// // Stagger the starts - the dryRun mode does not force any waiting so wait before we ask for the start so we
// // get it into a state where at least 1 start shows queuing would normally take place
// do! Async.Sleep(ms (200 * i))
// // Catch inside so the first throw doesnt cancel the overall execution
// return! ap.Execute f |> Async.Catch })
// |> Async.Parallel
// |> Stopwatch.Time
// let oks, errs = Choice.partition results
// test <@ 3 = Array.length oks
// && time.Elapsed < ms 2500 // 1s+5*200+ 500ms fudge factor
// && 3 = Array.length errs
// && errs |> Seq.forall (fun x -> x.GetType() = typedefof<TimeoutException>) @>
// let evnts = buffer.Take()
// let queuedOrShed = function
// | Call ("(default)",QueuingDryRun) as x -> Choice1Of2 x
// | Call ("(default)",SheddingDryRun) as x -> Choice2Of2 x
// | x -> failwithf "Unexpected event %A" x
// let queued,shed = evnts |> Seq.map queuedOrShed |> Choice.partition
// test <@ 1 <= Array.length queued // should be = 3, but if there's a delay spinning up threads, sometimes fewer get queued
// && 1 = Array.length shed @> }

let [<Fact>] ``when active, sheds load above limit`` () = async {
let pol = Parser.parse(defs).CreatePolicy log
let ap = pol.Find("default","any")
test <@ not ap.Policy.isolate && [expectedRules] = ap.Policy.taggedLimits @>
let! time, results =
Seq.replicate 6 runOk
// Catch inside so the first throw doesnt cancel the overall execution
|> Seq.map(ap.Execute >> Async.Catch)
|> Async.Parallel
|> Stopwatch.Time
let oks, errs = Choice.partition results
test <@ 5 = Array.length oks
&& time.Elapsed > s 2 && time.Elapsed < s 4
&& 1 = Array.length errs
&& match Seq.exactlyOne errs with :? Polly.Bulkhead.BulkheadRejectedException -> true | _ -> false @>
let evnts = buffer.Take()
let queuedOrShed = function
| Call ("(default)",MaybeQueuing) as x -> Choice1Of3 x
| Deferred ("default","(default)","def",delay) as x -> Choice2Of3 delay
| Shed ("default","(default)","def") as x -> Choice3Of3 x
| x -> failwithf "Unexpected event %A" x
let queued,waited,shed = evnts |> Seq.map queuedOrShed |> Choice.partition3
let delayed = waited |> Array.filter (fun x -> x > ms 500)
// while we'll be pretty clear about shedding, we might discard some queuing notifications depending on the scheduling, and the shed one can also look like it will be queued
test <@ 4 >= Array.length queued
&& between 2 3 (Array.length delayed) // Typically, 3 should get delayed, but depending on scheduling, only 2 get logged as such, and we don't want a flickering test
&& 1 = Array.length shed @> }

type Cutoff(output : Xunit.Abstractions.ITestOutputHelper) =
let log, buffer = LogHooks.createLoggerWithCapture output

Expand Down

0 comments on commit 23078ec

Please sign in to comment.