Skip to content

Commit

Permalink
Fix compilation warnings by replacing usages of deprecated methods (#…
Browse files Browse the repository at this point in the history
…2742)

* Fix compilation warnings by replacing usages of deprecated methods
* Update Kotlin doc and then generate the example code via Knit tool

Co-authored-by: wojda <[email protected]>
Co-authored-by: Imran Malic Settuba <[email protected]>
  • Loading branch information
3 people authored Jun 16, 2022
1 parent 39b6446 commit f26930f
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ import kotlin.math.roundToInt
import kotlin.random.Random
import kotlin.time.Duration
import kotlin.time.ExperimentalTime
import kotlin.time.nanoseconds
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.retry
import kotlin.time.Duration.Companion.nanoseconds
import kotlin.time.DurationUnit.NANOSECONDS

/**
* # Retrying and repeating effects
Expand All @@ -48,14 +49,14 @@ import kotlinx.coroutines.flow.retry
* A more complex schedule
*
* ```kotlin
* import kotlin.time.seconds
* import kotlin.time.milliseconds
* import kotlin.time.Duration.Companion.milliseconds
* import kotlin.time.Duration.Companion.seconds
* import kotlin.time.ExperimentalTime
* import arrow.fx.coroutines.*
*
* @ExperimentalTime
* fun <A> complexPolicy(): Schedule<A, List<A>> =
* Schedule.exponential<A>(10.milliseconds).whileOutput { it.inNanoseconds < 60.seconds.inNanoseconds }
* Schedule.exponential<A>(10.milliseconds).whileOutput { it < 60.seconds }
* .andThen(Schedule.spaced<A>(60.seconds) and Schedule.recurs(100)).jittered()
* .zipRight(Schedule.identity<A>().collect())
* ```
Expand Down Expand Up @@ -273,7 +274,7 @@ public sealed class Schedule<Input, Output> {
zipDuration: (duration: Duration, otherDuration: Duration) -> Duration,
zip: (Output, B) -> C
): Schedule<A, C> =
combineNanos(other, zipContinue, { a, b -> zipDuration(a.nanoseconds, b.nanoseconds).inNanoseconds }, zip)
combineNanos(other, zipContinue, { a, b -> zipDuration(a.nanoseconds, b.nanoseconds).toDouble(NANOSECONDS) }, zip)

/**
* Combines with another schedule by combining the result and the delay of the [Decision] with the functions [zipContinue], [zipDuration] and a [zip] function
Expand Down Expand Up @@ -301,7 +302,7 @@ public sealed class Schedule<Input, Output> {
*/
@ExperimentalTime
public fun modify(f: suspend (Output, Duration) -> Duration): Schedule<Input, Output> =
modifyNanos { output, d -> f(output, d.nanoseconds).inNanoseconds }
modifyNanos { output, d -> f(output, d.nanoseconds).toDouble(NANOSECONDS) }

public abstract fun modifyNanos(f: suspend (Output, Double) -> Double): Schedule<Input, Output>

Expand Down Expand Up @@ -424,7 +425,7 @@ public sealed class Schedule<Input, Output> {
public fun jittered(genRand: suspend () -> Duration): Schedule<Input, Output> =
modify { _, duration ->
val n = genRand.invoke()
duration.times(n.inNanoseconds)
duration.times(n.toDouble(NANOSECONDS))
}

/**
Expand Down Expand Up @@ -725,7 +726,7 @@ public sealed class Schedule<Input, Output> {
zip: (B, D) -> E
): Decision<Pair<A, C>, E> = Decision(
f(cont, other.cont),
g(delayInNanos.nanoseconds, other.delayInNanos.nanoseconds).inNanoseconds,
g(delayInNanos.nanoseconds, other.delayInNanos.nanoseconds).toDouble(NANOSECONDS),
Pair(state, other.state),
finish.flatMap { first -> other.finish.map { second -> zip(first, second) } }
)
Expand All @@ -746,11 +747,11 @@ public sealed class Schedule<Input, Output> {

@ExperimentalTime
public fun <A, B> cont(d: Duration, a: A, b: Eval<B>): Decision<A, B> =
cont(d.inNanoseconds, a, b)
cont(d.toDouble(NANOSECONDS), a, b)

@ExperimentalTime
public fun <A, B> done(d: Duration, a: A, b: Eval<B>): Decision<A, B> =
done(d.inNanoseconds, a, b)
done(d.toDouble(NANOSECONDS), a, b)
}
}

Expand Down Expand Up @@ -937,7 +938,7 @@ public sealed class Schedule<Input, Output> {
*/
@ExperimentalTime
public fun <A> spaced(interval: Duration): Schedule<A, Int> =
forever<A>().delayedNanos { d -> d + interval.inNanoseconds }
forever<A>().delayedNanos { d -> d + interval.toDouble(NANOSECONDS) }

/**
* Creates a Schedule that continues with increasing delay by adding the last two delays.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.withContext
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.minutes
import kotlin.time.DurationUnit.NANOSECONDS
import kotlin.time.ExperimentalTime
import kotlin.time.milliseconds
import kotlin.time.minutes

@ExperimentalTime
class CircuitBreakerTest : ArrowFxSpec(
Expand All @@ -24,7 +25,7 @@ class CircuitBreakerTest : ArrowFxSpec(
val maxTimeout = 600.milliseconds

"should work for successful async tasks" {
val cb = CircuitBreaker.of(maxFailures = maxFailures, resetTimeout = resetTimeout)!!
val cb = CircuitBreaker.of(maxFailures = maxFailures, resetTimeout = resetTimeout)
var effect = 0
Schedule.recurs<Unit>(10_000).repeat {
cb.protectOrThrow { withContext(Dispatchers.Default) { effect += 1 } }
Expand All @@ -33,7 +34,7 @@ class CircuitBreakerTest : ArrowFxSpec(
}

"should work for successful immediate tasks" {
val cb = CircuitBreaker.of(maxFailures = maxFailures, resetTimeout = resetTimeout)!!
val cb = CircuitBreaker.of(maxFailures = maxFailures, resetTimeout = resetTimeout)
var effect = 0
Schedule.recurs<Unit>(10_000).repeat {
cb.protectOrThrow { effect += 1 }
Expand All @@ -42,7 +43,7 @@ class CircuitBreakerTest : ArrowFxSpec(
}

"Circuit breaker stays closed after less than maxFailures" {
val cb = CircuitBreaker.of(maxFailures = maxFailures, resetTimeout = resetTimeout)!!
val cb = CircuitBreaker.of(maxFailures = maxFailures, resetTimeout = resetTimeout)

recurAndCollect<Either<Throwable, Unit>>(3).repeat {
Either.catch { cb.protectOrThrow { throw dummy } }
Expand All @@ -52,7 +53,7 @@ class CircuitBreakerTest : ArrowFxSpec(
}

"Closed circuit breaker resets failure count after success" {
val cb = CircuitBreaker.of(maxFailures = maxFailures, resetTimeout = resetTimeout)!!
val cb = CircuitBreaker.of(maxFailures = maxFailures, resetTimeout = resetTimeout)

recurAndCollect<Either<Throwable, Unit>>(3).repeat {
Either.catch { cb.protectOrThrow { throw dummy } }
Expand All @@ -66,7 +67,7 @@ class CircuitBreakerTest : ArrowFxSpec(
}

"Circuit breaker opens after max failures" {
val cb = CircuitBreaker.of(maxFailures = maxFailures, resetTimeout = resetTimeout)!!
val cb = CircuitBreaker.of(maxFailures = maxFailures, resetTimeout = resetTimeout)

recurAndCollect<Either<Throwable, Unit>>(3).repeat {
Either.catch { cb.protectOrThrow { throw dummy } }
Expand All @@ -78,7 +79,7 @@ class CircuitBreakerTest : ArrowFxSpec(

when (val s = cb.state()) {
is CircuitBreaker.State.Open -> {
s.resetTimeoutNanos shouldBe resetTimeout.inNanoseconds
s.resetTimeoutNanos shouldBe resetTimeout.toDouble(NANOSECONDS)
}
else -> fail("Invalid state: Expect CircuitBreaker.State.Open but found $s")
}
Expand All @@ -95,7 +96,7 @@ class CircuitBreakerTest : ArrowFxSpec(
resetTimeout = resetTimeout,
exponentialBackoffFactor = exponentialBackoffFactor,
maxResetTimeout = maxTimeout
)!!.doOnOpen { openedCount += 1 }
).doOnOpen { openedCount += 1 }
.doOnClosed { closedCount += 1 }
.doOnHalfOpen { halfOpenCount += 1 }
.doOnRejectedTask { rejectedCount += 1 }
Expand All @@ -105,7 +106,7 @@ class CircuitBreakerTest : ArrowFxSpec(

when (val s = cb.state()) {
is CircuitBreaker.State.Open -> {
s.resetTimeoutNanos shouldBe resetTimeout.inNanoseconds
s.resetTimeoutNanos shouldBe resetTimeout.toDouble(NANOSECONDS)
}
else -> fail("Invalid state: Expect CircuitBreaker.State.Open but found $s")
}
Expand All @@ -120,7 +121,7 @@ class CircuitBreakerTest : ArrowFxSpec(

when (val s = cb.state()) {
is CircuitBreaker.State.Open -> {
s.resetTimeoutNanos shouldBe resetTimeout.inNanoseconds
s.resetTimeoutNanos shouldBe resetTimeout.toDouble(NANOSECONDS)
}
else -> fail("Invalid state: Expect CircuitBreaker.State.Open but found $s")
}
Expand All @@ -129,6 +130,7 @@ class CircuitBreakerTest : ArrowFxSpec(
val delayProtectLatch = CompletableDeferred<Unit>()
val stateAssertionLatch = CompletableDeferred<Unit>()

@Suppress("DeferredResultUnused")
async { // Successful tasks puts circuit breaker back in HalfOpen
cb.protectOrThrow {
checkHalfOpen.complete(Unit)
Expand All @@ -141,7 +143,7 @@ class CircuitBreakerTest : ArrowFxSpec(

when (val s = cb.state()) {
is CircuitBreaker.State.HalfOpen -> {
s.resetTimeoutNanos shouldBe resetTimeout.inNanoseconds
s.resetTimeoutNanos shouldBe resetTimeout.toDouble(NANOSECONDS)
}
else -> fail("Invalid state: Expect CircuitBreaker.State.HalfOpen but found $s")
}
Expand All @@ -150,7 +152,7 @@ class CircuitBreakerTest : ArrowFxSpec(
shouldThrow<CircuitBreaker.ExecutionRejected> { cb.protectOrThrow { throw dummy } }
shouldThrow<CircuitBreaker.ExecutionRejected> { cb.protectOrThrow { throw dummy } }

// Once we complete `protect`, the circuitbreaker will go back to closer state
// Once we complete `protect`, the circuit breaker will go back to closer state
delayProtectLatch.complete(Unit)
stateAssertionLatch.await()

Expand All @@ -174,7 +176,7 @@ class CircuitBreakerTest : ArrowFxSpec(
resetTimeout = resetTimeout,
exponentialBackoffFactor = 2.0,
maxResetTimeout = maxTimeout
)!!.doOnOpen { openedCount += 1 }
).doOnOpen { openedCount += 1 }
.doOnClosed { closedCount += 1 }
.doOnHalfOpen { halfOpenCount += 1 }
.doOnRejectedTask { rejectedCount += 1 }
Expand All @@ -184,7 +186,7 @@ class CircuitBreakerTest : ArrowFxSpec(

when (val s = cb.state()) {
is CircuitBreaker.State.Open -> {
s.resetTimeoutNanos shouldBe resetTimeout.inNanoseconds
s.resetTimeoutNanos shouldBe resetTimeout.toDouble(NANOSECONDS)
}
else -> fail("Invalid state: Expect CircuitBreaker.State.Open but found $s")
}
Expand All @@ -199,7 +201,7 @@ class CircuitBreakerTest : ArrowFxSpec(

when (val s = cb.state()) {
is CircuitBreaker.State.Open -> {
s.resetTimeoutNanos shouldBe resetTimeout.inNanoseconds
s.resetTimeoutNanos shouldBe resetTimeout.toDouble(NANOSECONDS)
}
else -> fail("Invalid state: Expect CircuitBreaker.State.Open but found $s")
}
Expand All @@ -208,6 +210,7 @@ class CircuitBreakerTest : ArrowFxSpec(
val delayProtectLatch = CompletableDeferred<Unit>()
val stateAssertionLatch = CompletableDeferred<Unit>()

@Suppress("DeferredResultUnused")
async { // Successful tasks puts circuit breaker back in HalfOpen
// Delay protect, to inspect HalfOpen state.
Either.catch {
Expand All @@ -223,7 +226,7 @@ class CircuitBreakerTest : ArrowFxSpec(

when (val s = cb.state()) {
is CircuitBreaker.State.HalfOpen -> {
s.resetTimeoutNanos shouldBe resetTimeout.inNanoseconds
s.resetTimeoutNanos shouldBe resetTimeout.toDouble(NANOSECONDS)
}
else -> fail("Invalid state: Expect CircuitBreaker.State.HalfOpen but found $s")
}
Expand All @@ -232,15 +235,15 @@ class CircuitBreakerTest : ArrowFxSpec(
shouldThrow<CircuitBreaker.ExecutionRejected> { cb.protectOrThrow { throw dummy } }
shouldThrow<CircuitBreaker.ExecutionRejected> { cb.protectOrThrow { throw dummy } }

// Once we complete `protect`, the circuitbreaker will go back to closer state
// Once we complete `protect`, the circuit breaker will go back to closer state
delayProtectLatch.complete(Unit)
stateAssertionLatch.await()

// Circuit breaker should've stayed open on failure after timeOutReset
// resetTimeout should've applied
when (val s = cb.state()) {
is CircuitBreaker.State.Open -> {
s.resetTimeoutNanos shouldBe (resetTimeout * exponentialBackoffFactor).inNanoseconds
s.resetTimeoutNanos shouldBe (resetTimeout * exponentialBackoffFactor).toDouble(NANOSECONDS)
}
else -> fail("Invalid state: Expect CircuitBreaker.State.Open but found $s")
}
Expand All @@ -253,14 +256,14 @@ class CircuitBreakerTest : ArrowFxSpec(

"should be stack safe for successful async tasks" {
stackSafeSuspend(
CircuitBreaker.of(maxFailures = 5, resetTimeout = 1.minutes)!!,
CircuitBreaker.of(maxFailures = 5, resetTimeout = 1.minutes),
stackSafeIteration(), 0
) shouldBe stackSafeIteration()
}

"should be stack safe for successful immediate tasks" {
stackSafeImmediate(
CircuitBreaker.of(maxFailures = 5, resetTimeout = 1.minutes)!!,
CircuitBreaker.of(maxFailures = 5, resetTimeout = 1.minutes),
stackSafeIteration(), 0
) shouldBe stackSafeIteration()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
// This file was automatically generated from Schedule.kt by Knit tool. Do not edit.
package arrow.fx.coroutines.examples.exampleSchedule02

import kotlin.time.seconds
import kotlin.time.milliseconds
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds
import kotlin.time.ExperimentalTime
import arrow.fx.coroutines.*

@ExperimentalTime
fun <A> complexPolicy(): Schedule<A, List<A>> =
Schedule.exponential<A>(10.milliseconds).whileOutput { it.inNanoseconds < 60.seconds.inNanoseconds }
Schedule.exponential<A>(10.milliseconds).whileOutput { it < 60.seconds }
.andThen(Schedule.spaced<A>(60.seconds) and Schedule.recurs(100)).jittered()
.zipRight(Schedule.identity<A>().collect())

0 comments on commit f26930f

Please sign in to comment.