➡️ leverage Immutability
val
➡️ still mutable when it points to a mutable data structure
// `val` only fixes the reference: the list it points to is a MutableList, so its contents can still change
val numbers = mutableListOf(1, 2, 3)
// Same `val`, but listOf gives a read-only List, so the contents cannot be modified through this reference
val numbers = listOf(1, 2, 3)
Atomic<A>
➡️ Concurrent safe Reference
Mutex
➡️ Mutual exclusion for coroutines
Semaphore
➡️ Counting semaphore for coroutines
Channel
➡️ Communication between coroutines
BlockingQueue
in Java, but with suspending operations
import arrow.fx.coroutines.Atomic
/**
 * Tours the basic `Atomic` operations: snapshot access with a setter, get/set,
 * the two update variants, and a lens-derived view over the same value.
 */
suspend fun main() {
    val counter = Atomic(15)

    // access() yields a snapshot of the current value paired with a setter for updating it
    println(counter.access())
    val (_, assign) = counter.access()
    assign(20)
    println(counter.get()) // 20

    counter.set(15)
    println(counter.get()) // 15

    // getAndUpdate returns the previous value (15) and then stores 15 * 2
    val previous = counter.getAndUpdate { it * 2 }
    println(previous)
    println(counter.get()) // 30

    counter.update { it * 2 }
    println(counter.get()) // 60

    // Derive a view over the same value: reads pass through `get`, writes pass through `set`
    val shifted = counter.lens(
        get = { it + 2 },
        set = { _, newValue -> newValue + 3 }
    )
    println(shifted.get()) // 60 + 2 = 62
    shifted.set(15) // stores 15 + 3 = 18
    println(shifted.get()) // 18 + 2 = 20
}
🔍 Asynchronous & Coroutines
🔍 Dependency Injection
cancel()
in coroutines
catch
captures also CancellationException
acquire
and release
need to be NonCancellable
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.withContext
import other.model.SuspendFile
/**
 * Hand-written acquire / use / release of a [SuspendFile] without any Arrow helper.
 *
 * The file is closed in all three exit cases (normal completion, failure and cancellation),
 * and whatever ended the `use` step is rethrown instead of being swallowed, so cancellation
 * still propagates to the caller.
 */
suspend fun main() {
    // file open, readContent, and close are suspended ops
    val file = withContext(NonCancellable) { // acquire needs to be NonCancellable
        SuspendFile("FP_note.txt").open()
    }
    try {
        println(file.readContent())
    } finally {
        // `finally` runs on success, on failure and on CancellationException alike;
        // release needs to be NonCancellable so the close itself cannot be interrupted
        withContext(NonCancellable) { file.close() }
    }
}
bracket
& bracketCase
Resource
guarantee
& guaranteeCase
onCancel
CircuitBreaker
🔍 Acquire, use, and release resources regardless of how and from where they are used.
🔍 Supports any other Arrow Fx operators
➡️ suspend
!
A resource should be released in 3 cases
Declare acquire
, use
, and release
/**
 * Declares the three steps of working with a resource: acquire it, use it, release it.
 *
 * @param acquire suspending action that obtains the resource
 * @param use suspending action that receives the acquired resource and produces the result
 * @param release suspending action that receives the acquired resource so it can be cleaned up
 * @return the value produced by [use]
 */
suspend fun <A, B> bracket(
acquire: suspend () -> A,
use: suspend (A) -> B,
release: suspend (A) -> Unit
): B
Same as bracket
, but release
by ExitCase
/**
 * Describes how the guarded action ended, so a release step or finalizer can react to it.
 */
sealed class ExitCase {
    /** The action finished normally. */
    object Completed : ExitCase()

    /** The action was cancelled; [exception] is the cancellation that ended it. */
    data class Cancelled(val exception: CancellationException) : ExitCase()

    /** The action threw; [failure] is the error that ended it. */
    data class Failure(val failure: Throwable) : ExitCase()
}
/**
 * Same three steps as `bracket`, but the release step is also told how `use` ended.
 *
 * @param acquire suspending action that obtains the resource
 * @param use suspending action that receives the acquired resource and produces the result
 * @param release suspending action that receives the acquired resource together with the [ExitCase]
 * @return the value produced by [use]
 */
suspend fun <A, B> bracketCase(
acquire: suspend () -> A,
use: suspend (A) -> B,
release: suspend (A, ExitCase) -> Unit
): B
bracket
and bracketCase
will rethrow exceptions
Either.catch {}
import arrow.core.Either
import arrow.fx.coroutines.ExitCase
import arrow.fx.coroutines.bracketCase
import other.model.SuspendFile
/**
 * Demonstrates `bracketCase` when the `use` step throws: release is invoked with
 * `ExitCase.Failure`, and the exception is rethrown into the surrounding `Either.catch`.
 */
suspend fun main() {
    val outcome = Either.catch {
        bracketCase(
            acquire = { SuspendFile("FP_note.txt").open() },
            use = { throw RuntimeException("Boom!") },
            release = { opened, exit ->
                val message = when (exit) {
                    is ExitCase.Completed -> "Release with Completed"
                    is ExitCase.Cancelled -> "Release with Cancelled"
                    is ExitCase.Failure -> "Release with Failure" // the branch taken here
                }
                println(message)
                opened.close()
            }
        )
    }
    println(outcome) // Either.Left(java.lang.RuntimeException: Boom!)
}
Release when the coroutine is cancelled
import arrow.fx.coroutines.ExitCase
import arrow.fx.coroutines.bracketCase
import arrow.fx.coroutines.never
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import other.model.SuspendFile
/**
 * Demonstrates `bracketCase` when the coroutine using the resource is cancelled:
 * release is invoked with `ExitCase.Cancelled`.
 */
suspend fun main() {
    coroutineScope {
        // Start undispatched so the body is already executing when `cancel()` is called.
        // With the default start mode, a job cancelled before its first dispatch never runs at all,
        // so nothing would be acquired and none of the output below would appear.
        val job = async(start = kotlinx.coroutines.CoroutineStart.UNDISPATCHED) {
            bracketCase(
                acquire = { SuspendFile("FP_note.txt").open() },
                use = { never<Unit>() },
                release = { file, exitCase ->
                    when (exitCase) {
                        is ExitCase.Completed -> println("Release with Completed")
                        is ExitCase.Cancelled -> println("Release with Cancelled") // will run
                        is ExitCase.Failure -> println("Release with Failure")
                    }
                    file.close()
                }
            )
        }
        job.cancel()
    }
    // File [FP_note.txt] opened
    // Release with Cancelled
    // File [FP_note.txt] closed
}
If you fail or cancel on acquire
, then release
is not called
➡️ Can’t release something we never acquired
bracket
and bracketCase
tie us to deciding how to use the resource at the moment we acquire
it
➡️ Can we separate acquire
and use
?
/**
 * `bracket` signature repeated to show the coupling: `use` has to be supplied
 * in the same call as `acquire` and `release`.
 */
suspend fun <A, B> bracket(
acquire: suspend () -> A,
use: suspend (A) -> B, // `use` is fixed in the same call, so the acquire/release pair is hard to reuse
release: suspend (A) -> Unit
): B
/**
 * `bracketCase` signature repeated to show the same coupling: `use` has to be supplied
 * in the same call as `acquire` and `release`.
 */
suspend fun <A, B> bracketCase(
acquire: suspend () -> A,
use: suspend (A) -> B, // `use` is fixed in the same call, so the acquire/release pair is hard to reuse
release: suspend (A, ExitCase) -> Unit
): B
acquire
and release
Either
, Option
map
, flatMap
, traverse
, …
suspend
operations
import arrow.fx.coroutines.ExitCase
import arrow.fx.coroutines.Resource
import other.model.SuspendFile
/**
 * Demonstrates that a `Resource` separates acquire/release from use:
 * the same description is used twice, and each `use` call opens and closes the file again.
 */
suspend fun main() {
    val noteFile = Resource(
        acquire = { SuspendFile("FP_note.txt").open() },
        release = { opened, exit ->
            val message = when (exit) {
                is ExitCase.Completed -> "Release with Completed"
                is ExitCase.Cancelled -> "Release with Cancelled"
                is ExitCase.Failure -> "Release with Failure"
            }
            println(message)
            opened.close()
        }
    )

    // First use: read the content
    noteFile.use { opened -> println(opened.readContent()) }
    println("---")
    // Second use of the very same Resource: only look at the file name
    noteFile.use { opened -> println("Just want to print the file name: ${opened.fileName}") }

    // File [FP_note.txt] opened
    // The content of [FP_note.txt]
    // Release with Completed
    // File [FP_note.txt] closed
    // ---
    // File [FP_note.txt] opened
    // Just want to print the file name: FP_note.txt
    // Release with Completed
    // File [FP_note.txt] closed
}
⚠️ Can be inefficient to acquire and release every time if we have many operations to perform over the same resource
import arrow.fx.coroutines.ExitCase
import arrow.fx.coroutines.Resource
import other.model.SuspendFile
/**
 * Illustrates the cost of calling `use` repeatedly: every iteration acquires
 * and releases the file again.
 */
suspend fun main() {
    val noteFile = Resource(
        acquire = { SuspendFile("FP_note.txt").open() },
        release = { opened, exit ->
            val message = when (exit) {
                is ExitCase.Completed -> "Release with Completed"
                is ExitCase.Cancelled -> "Release with Cancelled"
                is ExitCase.Failure -> "Release with Failure"
            }
            println(message)
            opened.close()
        }
    )

    (1..5).forEach { idx ->
        // a fresh acquire and release happens on every pass
        noteFile.use { opened -> println("Use the same Resource($idx/5): ${opened.readContent()}") }
        println("---")
    }
}
use
lambda
🔍 Resources guarantee that their release finalizers are always invoked in the correct order when Cancelled
and Failure
import arrow.core.Either
import arrow.fx.coroutines.ExitCase
import arrow.fx.coroutines.Resource
import other.model.SuspendFile
/**
 * Shared release step: prints which [ExitCase] triggered the release, then closes [file].
 *
 * @param file the opened file to close
 * @param exitCase how the use of the file ended
 */
suspend fun printExitCaseThenClose(file: SuspendFile, exitCase: ExitCase) {
    val message = when (exitCase) {
        is ExitCase.Completed -> "Release with Completed"
        is ExitCase.Cancelled -> "Release with Cancelled"
        is ExitCase.Failure -> "Release with Failure"
    }
    println(message)
    file.close()
}
/**
 * Zips three file resources into one and fails inside `use` to show the order
 * in which the release steps run (see the expected output at the end).
 */
suspend fun main() {
    val notes = Resource(
        acquire = { SuspendFile("FP_note.txt").open() },
        release = { opened, exit -> printExitCaseThenClose(opened, exit) }
    )
    val book = Resource(
        acquire = { SuspendFile("Domain Modeling Made Functional.pdf").open() },
        release = { opened, exit -> printExitCaseThenClose(opened, exit) }
    )
    val song = Resource(
        acquire = { SuspendFile("end of a life.mp3").open() },
        release = { opened, exit -> printExitCaseThenClose(opened, exit) }
    )

    val resources = notes.zip(book, song) { file1, file2, file3 ->
        println("Zip Resources [${file1.fileName}], [${file2.fileName}], [${file3.fileName}]")
        Triple(file1, file2, file3)
    }

    Either.catch {
        // the failure here triggers all three releases; the order is guaranteed
        resources.use { throw RuntimeException("Boom!") }
    }
    // Release with Failure
    // File [end of a life.mp3] closed
    // Release with Failure
    // File [Domain Modeling Made Functional.pdf] closed
    // Release with Failure
    // File [FP_note.txt] closed
}
If you fail or cancel on acquire
, then release
is not called
➡️ Can’t release something we never acquired
🔍 acquire
& release
steps are NonCancellable
use
will rethrow exceptions
Either.catch {}
import arrow.core.Either
import arrow.fx.coroutines.ExitCase
import arrow.fx.coroutines.Resource
import other.model.SuspendFile
/**
 * Demonstrates `Resource.use` when the use lambda throws: release is invoked with
 * `ExitCase.Failure`, and the exception is rethrown into the surrounding `Either.catch`.
 */
suspend fun main() {
    val noteFile = Resource(
        acquire = { SuspendFile("FP_note.txt").open() },
        release = { opened, exit ->
            val message = when (exit) {
                is ExitCase.Completed -> "Release with Completed"
                is ExitCase.Cancelled -> "Release with Cancelled"
                is ExitCase.Failure -> "Release with Failure" // the branch taken here
            }
            println(message)
            opened.close()
        }
    )

    // `use` rethrows, so the failure is captured as a Left here
    val outcome = Either.catch {
        noteFile.use { throw RuntimeException("Boom!") }
    }
    // File [FP_note.txt] opened
    // Release with Failure
    // File [FP_note.txt] closed
}
Release when the coroutine is cancelled
import arrow.fx.coroutines.ExitCase
import arrow.fx.coroutines.Resource
import arrow.fx.coroutines.never
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import other.model.SuspendFile
/**
 * Demonstrates `Resource.use` when the coroutine using the resource is cancelled:
 * release is invoked with `ExitCase.Cancelled`.
 */
suspend fun main() {
    coroutineScope {
        val resource = Resource(
            acquire = { SuspendFile("FP_note.txt").open() },
            release = { file, exitCase ->
                when (exitCase) {
                    is ExitCase.Completed -> println("Release with Completed")
                    is ExitCase.Cancelled -> println("Release with Cancelled") // will run
                    is ExitCase.Failure -> println("Release with Failure")
                }
                file.close()
            }
        )
        // Start undispatched so the body is already executing when `cancel()` is called.
        // With the default start mode, a job cancelled before its first dispatch never runs at all,
        // so nothing would be acquired and none of the output below would appear.
        val job = async(start = kotlinx.coroutines.CoroutineStart.UNDISPATCHED) {
            resource.use { never<Unit>() }
        }
        job.cancel()
    }
    // File [FP_note.txt] opened
    // Release with Cancelled
    // File [FP_note.txt] closed
}
finalizer
after fa
regardless of success, error or cancellation
suspend fun <A> guarantee(
fa: suspend () -> A,
finalizer: suspend () -> Unit
): A
Same as guarantee
, but finalizer
with ExitCase
/**
 * Same as `guarantee`, but the finalizer is also told how [fa] ended.
 *
 * @param fa the suspending action to run
 * @param finalizer suspending action that receives the [ExitCase] describing how [fa] ended
 * @return the value produced by [fa]
 */
suspend fun <A> guaranteeCase(
fa: suspend () -> A,
finalizer: suspend (ExitCase) -> Unit
): A
finalizer
if there is an error while running the effect
guarantee
and guaranteeCase
will rethrow exceptions after running finalizer
Either.catch {}
import arrow.core.Either
import arrow.fx.coroutines.ExitCase
import arrow.fx.coroutines.guaranteeCase
/**
 * Demonstrates `guaranteeCase` when the action throws: the finalizer is invoked with
 * `ExitCase.Failure`, and the exception is rethrown into the surrounding `Either.catch`.
 */
suspend fun main() {
    val outcome = Either.catch {
        guaranteeCase(
            fa = { throw RuntimeException("Boom!") },
            finalizer = { exit ->
                val message = when (exit) {
                    is ExitCase.Completed -> "Finalizer with Completed"
                    is ExitCase.Cancelled -> "Finalizer with Cancelled"
                    is ExitCase.Failure -> "Finalizer with Failure" // the branch taken here
                }
                println(message)
            }
        )
    }
    println(outcome) // Either.Left(java.lang.RuntimeException: Boom!)
}
Run finalizer
when the coroutine is cancelled
import arrow.fx.coroutines.ExitCase
import arrow.fx.coroutines.guaranteeCase
import arrow.fx.coroutines.never
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
/**
 * Demonstrates `guaranteeCase` when the coroutine running the action is cancelled:
 * the finalizer is invoked with `ExitCase.Cancelled`.
 */
suspend fun main() {
    coroutineScope {
        // Start undispatched so the body is already executing when `cancel()` is called.
        // With the default start mode, a job cancelled before its first dispatch never runs at all,
        // so `guaranteeCase` would never be entered and the finalizer would not be invoked.
        val job = async(start = kotlinx.coroutines.CoroutineStart.UNDISPATCHED) {
            guaranteeCase(
                fa = { never<Unit>() },
                finalizer = { exitCase ->
                    when (exitCase) {
                        is ExitCase.Completed -> println("Finalizer with Completed")
                        is ExitCase.Cancelled -> println("Finalizer with Cancelled") // will run
                        is ExitCase.Failure -> println("Finalizer with Failure")
                    }
                }
            )
        }
        job.cancel()
    }
}
Register an onCancel
handler after fa
➡️ Only invoked when the coroutine is cancelled
/**
 * Registers a handler that is only invoked when the coroutine running [fa] is cancelled.
 *
 * @param fa the suspending action to run
 * @param onCancel suspending handler for the cancellation case
 * @return the value produced by [fa]
 */
suspend fun <A> onCancel(
fa: suspend () -> A,
onCancel: suspend () -> Unit
): A // passes `fa` to `guaranteeCase` and only invokes `onCancel` for `ExitCase.Cancelled`
import arrow.fx.coroutines.never
import arrow.fx.coroutines.onCancel
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
/**
 * Demonstrates `onCancel`: the handler runs because the coroutine is cancelled
 * while the action is suspended in `never()`.
 */
suspend fun main() {
    coroutineScope {
        // Start undispatched so the body is already executing when `cancel()` is called.
        // With the default start mode, a job cancelled before its first dispatch never runs at all,
        // so the handler would never be registered and "Cancelled!" would not be printed.
        val job = async(start = kotlinx.coroutines.CoroutineStart.UNDISPATCHED) {
            onCancel(
                fa = { never<Unit>() },
                onCancel = { println("Cancelled!") } // will run
            )
        }
        job.cancel()
    }
}
🔍 Detect failures and prevent a failure from constantly recurring
➡️ Protect resources or services from being overloaded
CircuitBreaker
has 3 CircuitBreaker.State
Closed
Open
HalfOpen
import arrow.core.Either
import arrow.fx.coroutines.CircuitBreaker
import kotlinx.coroutines.delay
import kotlin.time.Duration
import kotlin.time.ExperimentalTime
/**
 * Walks a `CircuitBreaker` through its states: Closed, then Open after two failures,
 * then HalfOpen once the reset timeout has passed, and back to Closed after a successful test request.
 */
@ExperimentalTime
suspend fun main() {
    val breaker = CircuitBreaker.of(
        maxFailures = 2,
        resetTimeout = Duration.seconds(2),
        exponentialBackoffFactor = 1.2,
        maxResetTimeout = Duration.seconds(60),
    )

    val closedReport = breaker.protectOrThrow { "I am in Closed: ${breaker.state()}" }
    println(closedReport)

    println("Service getting overloaded...")
    // When an exception occurs it increments the failure counter
    // A successful request will reset the failure counter to zero
    repeat(2) {
        val attempt = Either.catch { breaker.protectOrThrow { throw RuntimeException("Service overloaded") } }
        println(attempt)
    }
    println("Reach the maxFailures threshold = 2")

    val rejected = breaker.protectEither { }
    println("I am Open and short-circuit with ${rejected}. ${breaker.state()}")

    println("Service recovering...")
    delay(2000)
    println("After 2 seconds resetTimeout passed, allowing one request to go through as a test")
    val halfOpenReport = breaker.protectOrThrow { "I am running test-request in HalfOpen: ${breaker.state()}" }
    println(halfOpenReport)
    println("I am back to normal state closed ${breaker.state()}")

    // I am in Closed: Closed(failures=0)
    // Service getting overloaded...
    // Either.Left(java.lang.RuntimeException: Service overloaded)
    // Either.Left(java.lang.RuntimeException: Service overloaded)
    // Reach the maxFailures threshold = 2
    // I am Open and short-circuit with Either.Left(arrow.fx.coroutines.CircuitBreaker$ExecutionRejected). CircuitBreaker.State.Open(startedAt=1635710570531, resetTimeoutNanos=2.0E9, expiresAt=1635710572531)
    // Service recovering...
    // After 2 seconds resetTimeout passed, allowing one request to go through as a test
    // I am running test-request in HalfOpen: HalfOpen(resetTimeoutNanos=2.0E9)
    // I am back to normal state closed Closed(failures=0)
}
Atomic
, Mutex
, Semaphore
, Channel
, …
bracket
& bracketCase
➡️ acquire
, use
, release
Resource
➡️ acquire
, release
Cancelled
or Failure
guarantee
& guaranteeCase
➡️ fa
, finalizer
onCancel
➡️ fa
, onCancel
CircuitBreaker
➡️ Closed
, Open
, HalfOpen