diff --git a/.changeset/cold-cougars-pretend.md b/.changeset/cold-cougars-pretend.md new file mode 100644 index 0000000000..df705b7505 --- /dev/null +++ b/.changeset/cold-cougars-pretend.md @@ -0,0 +1,5 @@ +--- +"effect": minor +--- + +add TSubscriptionRef diff --git a/.changeset/shiny-squids-sell.md b/.changeset/shiny-squids-sell.md new file mode 100644 index 0000000000..0e68f8a95f --- /dev/null +++ b/.changeset/shiny-squids-sell.md @@ -0,0 +1,5 @@ +--- +"effect": minor +--- + +add Stream.fromTQueue & Stream.fromTPubSub diff --git a/packages/effect/src/Stream.ts b/packages/effect/src/Stream.ts index 9a4dc08439..bbcf0bd702 100644 --- a/packages/effect/src/Stream.ts +++ b/packages/effect/src/Stream.ts @@ -29,6 +29,8 @@ import type * as Sink from "./Sink.js" import type * as Emit from "./StreamEmit.js" import type * as HaltStrategy from "./StreamHaltStrategy.js" import type * as Take from "./Take.js" +import type { TPubSub } from "./TPubSub.js" +import type { TDequeue } from "./TQueue.js" import type * as Tracer from "./Tracer.js" import type { Covariant, NoInfer, TupleOf } from "./Types.js" import type * as Unify from "./Unify.js" @@ -2013,6 +2015,14 @@ export const fromPubSub: { ): Stream } = internal.fromPubSub +/** + * Creates a stream from a subscription to a `TPubSub`. + * + * @since 3.10.0 + * @category constructors + */ +export const fromTPubSub: (pubsub: TPubSub) => Stream = internal.fromTPubSub + /** * Creates a new `Stream` from an iterable collection of values. * @@ -2094,6 +2104,14 @@ export const fromQueue: ( } ) => Stream = internal.fromQueue +/** + * Creates a stream from a TQueue of values + * + * @since 3.10.0 + * @category constructors + */ +export const fromTQueue: (queue: TDequeue) => Stream = internal.fromTQueue + /** * Creates a stream from a `ReadableStream`. * diff --git a/packages/effect/src/TPubSub.ts b/packages/effect/src/TPubSub.ts index 1a15cfa1b6..f6b7b045ce 100644 --- a/packages/effect/src/TPubSub.ts +++ b/packages/effect/src/TPubSub.ts @@ -107,6 +107,15 @@ export const isEmpty: (self: TPubSub) => STM.STM = internal.isEmp */ export const isFull: (self: TPubSub) => STM.STM = internal.isFull +/** + * Interrupts any fibers that are suspended on `offer` or `take`. Future calls + * to `offer*` and `take*` will be interrupted immediately. + * + * @since 2.0.0 + * @category utils + */ +export const shutdown: (self: TPubSub) => STM.STM = internal.shutdown + /** * Returns `true` if `shutdown` has been called, otherwise returns `false`. * diff --git a/packages/effect/src/TQueue.ts b/packages/effect/src/TQueue.ts index 800555b2c8..e8b9b465fa 100644 --- a/packages/effect/src/TQueue.ts +++ b/packages/effect/src/TQueue.ts @@ -206,7 +206,7 @@ export const isTEnqueue: (u: unknown) => u is TEnqueue = internal.isTEn * @since 2.0.0 * @category mutations */ -export const awaitShutdown: (self: TQueue) => STM.STM = internal.awaitShutdown +export const awaitShutdown: (self: TDequeue | TEnqueue) => STM.STM = internal.awaitShutdown /** * Creates a bounded queue with the back pressure strategy. The queue will @@ -226,7 +226,7 @@ export const bounded: (requestedCapacity: number) => STM.STM> = int * @since 2.0.0 * @category getters */ -export const capacity: (self: TQueue) => number = internal.capacity +export const capacity: (self: TDequeue | TEnqueue) => number = internal.capacity /** * Creates a bounded queue with the dropping strategy. The queue will drop new @@ -245,7 +245,7 @@ export const dropping: (requestedCapacity: number) => STM.STM> = in * @since 2.0.0 * @category getters */ -export const isEmpty: (self: TQueue) => STM.STM = internal.isEmpty +export const isEmpty: (self: TDequeue | TEnqueue) => STM.STM = internal.isEmpty /** * Returns `true` if the `TQueue` contains at least one element, `false` @@ -254,7 +254,7 @@ export const isEmpty: (self: TQueue) => STM.STM = internal.isEmpt * @since 2.0.0 * @category getters */ -export const isFull: (self: TQueue) => STM.STM = internal.isFull +export const isFull: (self: TDequeue | TEnqueue) => STM.STM = internal.isFull /** * Returns `true` if `shutdown` has been called, otherwise returns `false`. @@ -262,7 +262,7 @@ export const isFull: (self: TQueue) => STM.STM = internal.isFull * @since 2.0.0 * @category getters */ -export const isShutdown: (self: TQueue) => STM.STM = internal.isShutdown +export const isShutdown: (self: TDequeue | TEnqueue) => STM.STM = internal.isShutdown /** * Places one value in the queue. @@ -345,7 +345,7 @@ export const seek: { * @since 2.0.0 * @category mutations */ -export const shutdown: (self: TQueue) => STM.STM = internal.shutdown +export const shutdown: (self: TDequeue | TEnqueue) => STM.STM = internal.shutdown /** * Retrieves the size of the queue, which is equal to the number of elements @@ -355,7 +355,7 @@ export const shutdown: (self: TQueue) => STM.STM = internal.shutdown * @since 2.0.0 * @category getters */ -export const size: (self: TQueue) => STM.STM = internal.size +export const size: (self: TDequeue | TEnqueue) => STM.STM = internal.size /** * Creates a bounded queue with the sliding strategy. The queue will add new diff --git a/packages/effect/src/TRef.ts b/packages/effect/src/TRef.ts index 5b98a7c653..1dd83e9c4e 100644 --- a/packages/effect/src/TRef.ts +++ b/packages/effect/src/TRef.ts @@ -7,6 +7,7 @@ import type * as TxnId from "./internal/stm/stm/txnId.js" import type * as Versioned from "./internal/stm/stm/versioned.js" import * as internal from "./internal/stm/tRef.js" import type * as Option from "./Option.js" +import type { Pipeable } from "./Pipeable.js" import type * as STM from "./STM.js" import type * as Types from "./Types.js" @@ -34,7 +35,7 @@ export type TRefTypeId = typeof TRefTypeId * @since 2.0.0 * @category models */ -export interface TRef extends TRef.Variance { +export interface TRef extends TRef.Variance, Pipeable { /** * Note: the method is unbound, exposed only for potential extensions. */ diff --git a/packages/effect/src/TSubscriptionRef.ts b/packages/effect/src/TSubscriptionRef.ts new file mode 100644 index 0000000000..dfc6ccb5ff --- /dev/null +++ b/packages/effect/src/TSubscriptionRef.ts @@ -0,0 +1,192 @@ +/** + * @since 3.10.0 + */ +import type * as Effect from "./Effect.js" +import * as internal from "./internal/stm/tSubscriptionRef.js" +import type * as Option from "./Option.js" +import type * as Scope from "./Scope.js" +import type * as STM from "./STM.js" +import type * as Stream from "./Stream.js" +import type * as TPubSub from "./TPubSub.js" +import type * as TQueue from "./TQueue.js" +import type * as TRef from "./TRef.js" +import type * as Types from "./Types.js" + +/** + * @since 3.10.0 + * @category symbols + */ +export const TSubscriptionRefTypeId: unique symbol = internal.TSubscriptionRefTypeId + +/** + * @since 3.10.0 + * @category symbols + */ +export type TSubscriptionRefTypeId = typeof TSubscriptionRefTypeId + +/** + * A `TSubscriptionRef` is a `TRef` that can be subscribed to in order to + * receive a `TDequeue` of the current value and all committed changes to the value. + * + * @since 3.10.0 + * @category models + */ +export interface TSubscriptionRef extends TSubscriptionRef.Variance, TRef.TRef { + /** @internal */ + readonly ref: TRef.TRef + /** @internal */ + readonly pubsub: TPubSub.TPubSub + /** @internal */ + modify(f: (a: A) => readonly [B, A]): STM.STM + + /** + * A TDequeue containing the current value of the `Ref` as well as all changes + * to that value. + */ + readonly changes: STM.STM> +} + +/** + * @since 3.10.0 + */ +export declare namespace TSubscriptionRef { + /** + * @since 3.10.0 + * @category models + */ + export interface Variance { + readonly [TSubscriptionRefTypeId]: { + readonly _A: Types.Invariant + } + } +} + +/** + * @since 3.10.0 + * @category mutations + */ +export const get: (self: TSubscriptionRef) => STM.STM = internal.get + +/** + * @since 3.10.0 + * @category mutations + */ +export const getAndSet: { + (value: A): (self: TSubscriptionRef) => STM.STM + (self: TSubscriptionRef, value: A): STM.STM +} = internal.getAndSet + +/** + * @since 3.10.0 + * @category mutations + */ +export const getAndUpdate: { + (f: (a: A) => A): (self: TSubscriptionRef) => STM.STM + (self: TSubscriptionRef, f: (a: A) => A): STM.STM +} = internal.getAndUpdate + +/** + * @since 3.10.0 + * @category mutations + */ +export const getAndUpdateSome: { + (f: (a: A) => Option.Option): (self: TSubscriptionRef) => STM.STM + (self: TSubscriptionRef, f: (a: A) => Option.Option): STM.STM +} = internal.getAndUpdateSome + +/** + * @since 3.10.0 + * @category constructors + */ +export const make: (value: A) => STM.STM> = internal.make + +/** + * @since 3.10.0 + * @category mutations + */ +export const modify: { + (f: (a: A) => readonly [B, A]): (self: TSubscriptionRef) => STM.STM + (self: TSubscriptionRef, f: (a: A) => readonly [B, A]): STM.STM +} = internal.modify + +/** + * @since 3.10.0 + * @category mutations + */ +export const modifySome: { + (fallback: B, f: (a: A) => Option.Option): (self: TSubscriptionRef) => STM.STM + (self: TSubscriptionRef, fallback: B, f: (a: A) => Option.Option): STM.STM +} = internal.modifySome + +/** + * @since 3.10.0 + * @category mutations + */ +export const set: { + (value: A): (self: TSubscriptionRef) => STM.STM + (self: TSubscriptionRef, value: A): STM.STM +} = internal.set + +/** + * @since 3.10.0 + * @category mutations + */ +export const setAndGet: { + (value: A): (self: TSubscriptionRef) => STM.STM + (self: TSubscriptionRef, value: A): STM.STM +} = internal.setAndGet + +/** + * @since 3.10.0 + * @category mutations + */ +export const update: { + (f: (a: A) => A): (self: TSubscriptionRef) => STM.STM + (self: TSubscriptionRef, f: (a: A) => A): STM.STM +} = internal.update + +/** + * @since 3.10.0 + * @category mutations + */ +export const updateAndGet: { + (f: (a: A) => A): (self: TSubscriptionRef) => STM.STM + (self: TSubscriptionRef, f: (a: A) => A): STM.STM +} = internal.updateAndGet + +/** + * @since 3.10.0 + * @category mutations + */ +export const updateSome: { + (f: (a: A) => Option.Option): (self: TSubscriptionRef) => STM.STM + (self: TSubscriptionRef, f: (a: A) => Option.Option): STM.STM +} = internal.updateSome + +/** + * @since 3.10.0 + * @category mutations + */ +export const updateSomeAndGet: { + (f: (a: A) => Option.Option): (self: TSubscriptionRef) => STM.STM + (self: TSubscriptionRef, f: (a: A) => Option.Option): STM.STM +} = internal.updateSomeAndGet + +/** + * @since 3.10.0 + * @category mutations + */ +export const changesScoped: (self: TSubscriptionRef) => Effect.Effect, never, Scope.Scope> = + internal.changesScoped + +/** + * @since 3.10.0 + * @category mutations + */ +export const changesStream: (self: TSubscriptionRef) => Stream.Stream = internal.changesStream + +/** + * @since 3.10.0 + * @category mutations + */ +export const changes: (self: TSubscriptionRef) => STM.STM> = (self) => self.changes diff --git a/packages/effect/src/index.ts b/packages/effect/src/index.ts index cb90a4ac72..a57bd513ed 100644 --- a/packages/effect/src/index.ts +++ b/packages/effect/src/index.ts @@ -895,6 +895,11 @@ export * as TSemaphore from "./TSemaphore.js" */ export * as TSet from "./TSet.js" +/** + * @since 3.10.0 + */ +export * as TSubscriptionRef from "./TSubscriptionRef.js" + /** * @since 2.0.0 */ diff --git a/packages/effect/src/internal/stm/core.ts b/packages/effect/src/internal/stm/core.ts index 7c0f558715..1203090752 100644 --- a/packages/effect/src/internal/stm/core.ts +++ b/packages/effect/src/internal/stm/core.ts @@ -15,11 +15,10 @@ import { pipeArguments } from "../../Pipeable.js" import { hasProperty } from "../../Predicate.js" import type * as Scheduler from "../../Scheduler.js" import type * as STM from "../../STM.js" -import { StreamTypeId } from "../../Stream.js" import { YieldWrap } from "../../Utils.js" import { ChannelTypeId } from "../core-stream.js" import { withFiberRuntime } from "../core.js" -import { effectVariance } from "../effectable.js" +import { effectVariance, StreamTypeId } from "../effectable.js" import { OP_COMMIT } from "../opCodes/effect.js" import { SingleShotGen } from "../singleShotGen.js" import { SinkTypeId } from "../sink.js" diff --git a/packages/effect/src/internal/stm/tPubSub.ts b/packages/effect/src/internal/stm/tPubSub.ts index 089be12ce5..b838ddb90a 100644 --- a/packages/effect/src/internal/stm/tPubSub.ts +++ b/packages/effect/src/internal/stm/tPubSub.ts @@ -201,6 +201,7 @@ class TPubSubSubscriptionImpl implements TQueue.TDequeue { capacity(): number { return this.requestedCapacity } + size: STM.STM = core.withSTMRuntime((runtime) => { let currentSubscriberHead = tRef.unsafeGet(this.subscriberHead, runtime.journal) if (currentSubscriberHead === undefined) { diff --git a/packages/effect/src/internal/stm/tQueue.ts b/packages/effect/src/internal/stm/tQueue.ts index 90039374f3..8ca27b9865 100644 --- a/packages/effect/src/internal/stm/tQueue.ts +++ b/packages/effect/src/internal/stm/tQueue.ts @@ -3,7 +3,7 @@ import * as Chunk from "../../Chunk.js" import { dual, pipe } from "../../Function.js" import * as Option from "../../Option.js" import { hasProperty, type Predicate } from "../../Predicate.js" -import * as STM from "../../STM.js" +import type * as STM from "../../STM.js" import type * as TQueue from "../../TQueue.js" import type * as TRef from "../../TRef.js" import * as core from "./core.js" @@ -99,7 +99,7 @@ class TQueueImpl implements TQueue.TQueue { size: STM.STM = core.withSTMRuntime((runtime) => { const queue = tRef.unsafeGet(this.ref, runtime.journal) if (queue === undefined) { - return STM.interruptAs(runtime.fiberId) + return core.interruptAs(runtime.fiberId) } return core.succeed(queue.length) }) diff --git a/packages/effect/src/internal/stm/tRef.ts b/packages/effect/src/internal/stm/tRef.ts index c780509360..3162fc252b 100644 --- a/packages/effect/src/internal/stm/tRef.ts +++ b/packages/effect/src/internal/stm/tRef.ts @@ -1,5 +1,7 @@ import { dual } from "../../Function.js" import * as Option from "../../Option.js" +import type { Pipeable } from "../../Pipeable.js" +import { pipeArguments } from "../../Pipeable.js" import type * as STM from "../../STM.js" import type * as TRef from "../../TRef.js" import * as core from "./core.js" @@ -16,13 +18,13 @@ export const TRefTypeId: TRef.TRefTypeId = Symbol.for( TRefSymbolKey ) as TRef.TRefTypeId -const tRefVariance = { +export const tRefVariance = { /* c8 ignore next */ _A: (_: any) => _ } /** @internal */ -export class TRefImpl implements TRef.TRef { +export class TRefImpl implements TRef.TRef, Pipeable { readonly [TRefTypeId] = tRefVariance /** @internal */ todos: Map @@ -40,6 +42,9 @@ export class TRefImpl implements TRef.TRef { return retValue }) } + pipe() { + return pipeArguments(this, arguments) + } } /** @internal */ diff --git a/packages/effect/src/internal/stm/tSubscriptionRef.ts b/packages/effect/src/internal/stm/tSubscriptionRef.ts new file mode 100644 index 0000000000..94a4924055 --- /dev/null +++ b/packages/effect/src/internal/stm/tSubscriptionRef.ts @@ -0,0 +1,286 @@ +import * as Effect from "../../Effect.js" +import { dual, pipe } from "../../Function.js" +import * as Option from "../../Option.js" +import { pipeArguments } from "../../Pipeable.js" +import * as STM from "../../STM.js" +import * as TPubSub from "../../TPubSub.js" +import * as TQueue from "../../TQueue.js" +import * as TRef from "../../TRef.js" +import type * as TSubscriptionRef from "../../TSubscriptionRef.js" +import * as stream from "../stream.js" +import { tDequeueVariance } from "./tQueue.js" +import { tRefVariance } from "./tRef.js" + +/** @internal */ +const TSubscriptionRefSymbolKey = "effect/TSubscriptionRef" + +/** @internal */ +export const TSubscriptionRefTypeId: TSubscriptionRef.TSubscriptionRefTypeId = Symbol.for( + TSubscriptionRefSymbolKey +) as TSubscriptionRef.TSubscriptionRefTypeId + +const TSubscriptionRefVariance = { + /* c8 ignore next */ + _A: (_: any) => _ +} + +class TDequeueMerge implements TQueue.TDequeue { + [TQueue.TDequeueTypeId] = tDequeueVariance + + constructor( + readonly first: TQueue.TDequeue, + readonly second: TQueue.TDequeue + ) {} + + peek: STM.STM = STM.gen(this, function*() { + const first = yield* this.peekOption + if (first._tag === "Some") { + return first.value + } + return yield* STM.retry + }) + + peekOption: STM.STM> = STM.gen(this, function*() { + const first = yield* this.first.peekOption + if (first._tag === "Some") { + return first + } + const second = yield* this.second.peekOption + if (second._tag === "Some") { + return second + } + return Option.none() + }) + + take: STM.STM = STM.gen(this, function*() { + if (!(yield* this.first.isEmpty)) { + return yield* this.first.take + } + if (!(yield* this.second.isEmpty)) { + return yield* this.second.take + } + return yield* STM.retry + }) + + takeAll: STM.STM> = STM.gen(this, function*() { + return [...yield* this.first.takeAll, ...yield* this.second.takeAll] + }) + + takeUpTo(max: number): STM.STM> { + return STM.gen(this, function*() { + const first = yield* this.first.takeUpTo(max) + if (first.length >= max) { + return first + } + return [...first, ...yield* this.second.takeUpTo(max - first.length)] + }) + } + + capacity(): number { + return this.first.capacity() + this.second.capacity() + } + + size: STM.STM = STM.gen(this, function*() { + return (yield* this.first.size) + (yield* this.second.size) + }) + + isFull: STM.STM = STM.gen(this, function*() { + return (yield* this.first.isFull) && (yield* this.second.isFull) + }) + + isEmpty: STM.STM = STM.gen(this, function*() { + return (yield* this.first.isEmpty) && (yield* this.second.isEmpty) + }) + + shutdown: STM.STM = STM.gen(this, function*() { + yield* this.first.shutdown + yield* this.second.shutdown + }) + + isShutdown: STM.STM = STM.gen(this, function*() { + return (yield* this.first.isShutdown) && (yield* this.second.isShutdown) + }) + + awaitShutdown: STM.STM = STM.gen(this, function*() { + yield* this.first.awaitShutdown + yield* this.second.awaitShutdown + }) +} + +/** @internal */ +class TSubscriptionRefImpl implements TSubscriptionRef.TSubscriptionRef { + readonly [TSubscriptionRefTypeId] = TSubscriptionRefVariance + readonly [TRef.TRefTypeId] = tRefVariance + + constructor( + readonly ref: TRef.TRef, + readonly pubsub: TPubSub.TPubSub + ) {} + + get todos() { + return this.ref.todos + } + + get versioned() { + return this.ref.versioned + } + + pipe() { + return pipeArguments(this, arguments) + } + + get changes(): STM.STM> { + return STM.gen(this, function*() { + const first = yield* TQueue.unbounded() + yield* TQueue.offer(first, yield* TRef.get(this.ref)) + return new TDequeueMerge(first, yield* TPubSub.subscribe(this.pubsub)) + }) + } + + modify(f: (a: A) => readonly [B, A]): STM.STM { + return pipe( + TRef.get(this.ref), + STM.map(f), + STM.flatMap(([b, a]) => + pipe( + TRef.set(this.ref, a), + STM.as(b), + STM.zipLeft(TPubSub.publish(this.pubsub, a)) + ) + ) + ) + } +} + +/** @internal */ +export const make = (value: A): STM.STM> => + pipe( + STM.all([ + TPubSub.unbounded(), + TRef.make(value) + ]), + STM.map(([pubsub, ref]) => new TSubscriptionRefImpl(ref, pubsub)) + ) + +/** @internal */ +export const get = (self: TSubscriptionRef.TSubscriptionRef) => TRef.get(self.ref) + +/** @internal */ +export const set = dual< + (value: A) => (self: TSubscriptionRef.TSubscriptionRef) => STM.STM, + (self: TSubscriptionRef.TSubscriptionRef, value: A) => STM.STM +>( + 2, + (self: TSubscriptionRef.TSubscriptionRef, value: A): STM.STM => + self.modify((): [void, A] => [void 0, value]) +) + +/** @internal */ +export const getAndSet = dual< + (value: A) => (self: TSubscriptionRef.TSubscriptionRef) => STM.STM, + (self: TSubscriptionRef.TSubscriptionRef, value: A) => STM.STM +>(2, (self, value) => self.modify((a) => [a, value])) + +/** @internal */ +export const getAndUpdate = dual< + (f: (a: A) => A) => (self: TSubscriptionRef.TSubscriptionRef) => STM.STM, + (self: TSubscriptionRef.TSubscriptionRef, f: (a: A) => A) => STM.STM +>(2, (self, f) => self.modify((a) => [a, f(a)])) + +/** @internal */ +export const getAndUpdateSome = dual< + (f: (a: A) => Option.Option) => (self: TSubscriptionRef.TSubscriptionRef) => STM.STM, + (self: TSubscriptionRef.TSubscriptionRef, f: (a: A) => Option.Option) => STM.STM +>(2, (self, f) => + self.modify((a) => + Option.match(f(a), { + onNone: () => [a, a], + onSome: (b) => [a, b] + }) + )) + +/** @internal */ +export const setAndGet = dual< + (value: A) => (self: TSubscriptionRef.TSubscriptionRef) => STM.STM, + (self: TSubscriptionRef.TSubscriptionRef, value: A) => STM.STM +>(2, (self, value) => self.modify(() => [value, value])) + +/** @internal */ +export const modify = dual< + (f: (a: A) => readonly [B, A]) => (self: TSubscriptionRef.TSubscriptionRef) => STM.STM, + (self: TSubscriptionRef.TSubscriptionRef, f: (a: A) => readonly [B, A]) => STM.STM +>(2, (self, f) => self.modify(f)) + +/** @internal */ +export const modifySome = dual< + ( + fallback: B, + f: (a: A) => Option.Option + ) => (self: TSubscriptionRef.TSubscriptionRef) => STM.STM, + ( + self: TSubscriptionRef.TSubscriptionRef, + fallback: B, + f: (a: A) => Option.Option + ) => STM.STM +>(3, (self, fallback, f) => + self.modify((a) => + Option.match(f(a), { + onNone: () => [fallback, a], + onSome: (b) => b + }) + )) + +/** @internal */ +export const update = dual< + (f: (a: A) => A) => (self: TSubscriptionRef.TSubscriptionRef) => STM.STM, + (self: TSubscriptionRef.TSubscriptionRef, f: (a: A) => A) => STM.STM +>(2, (self, f) => self.modify((a) => [void 0, f(a)])) + +/** @internal */ +export const updateAndGet = dual< + (f: (a: A) => A) => (self: TSubscriptionRef.TSubscriptionRef) => STM.STM, + (self: TSubscriptionRef.TSubscriptionRef, f: (a: A) => A) => STM.STM +>(2, (self, f) => + self.modify((a) => { + const b = f(a) + return [b, b] + })) + +/** @internal */ +export const updateSome = dual< + (f: (a: A) => Option.Option) => (self: TSubscriptionRef.TSubscriptionRef) => STM.STM, + (self: TSubscriptionRef.TSubscriptionRef, f: (a: A) => Option.Option) => STM.STM +>( + 2, + (self, f) => + self.modify((a) => [ + void 0, + Option.match(f(a), { + onNone: () => a, + onSome: (b) => b + }) + ]) +) + +/** @internal */ +export const updateSomeAndGet = dual< + (f: (a: A) => Option.Option) => (self: TSubscriptionRef.TSubscriptionRef) => STM.STM, + (self: TSubscriptionRef.TSubscriptionRef, f: (a: A) => Option.Option) => STM.STM +>( + 2, + (self, f) => + self.modify((a) => + Option.match(f(a), { + onNone: () => [a, a], + onSome: (b) => [b, b] + }) + ) +) + +/** @internal */ +export const changesScoped = (self: TSubscriptionRef.TSubscriptionRef) => + Effect.acquireRelease(self.changes, TQueue.shutdown) + +/** @internal */ +export const changesStream = (self: TSubscriptionRef.TSubscriptionRef) => + stream.unwrap(Effect.map(self.changes, stream.fromTQueue)) diff --git a/packages/effect/src/internal/stream.ts b/packages/effect/src/internal/stream.ts index 3e677bfac9..7680583983 100644 --- a/packages/effect/src/internal/stream.ts +++ b/packages/effect/src/internal/stream.ts @@ -31,6 +31,8 @@ import type * as Stream from "../Stream.js" import type * as Emit from "../StreamEmit.js" import * as HaltStrategy from "../StreamHaltStrategy.js" import type * as Take from "../Take.js" +import * as TPubSub from "../TPubSub.js" +import * as TQueue from "../TQueue.js" import type * as Tracer from "../Tracer.js" import * as Tuple from "../Tuple.js" import type { NoInfer, TupleOf } from "../Types.js" @@ -3133,6 +3135,14 @@ export const fromPubSub: { return options?.shutdown ? ensuring(stream, PubSub.shutdown(pubsub)) : stream } +/** @internal */ +export const fromTPubSub = (pubsub: TPubSub.TPubSub): Stream.Stream => { + return unwrapScoped(Effect.map( + TPubSub.subscribeScoped(pubsub), + (queue) => fromTQueue(queue) + )) +} + /** @internal */ export const fromIterable = (iterable: Iterable): Stream.Stream => suspend(() => @@ -3224,6 +3234,24 @@ export const fromQueue = ( options?.shutdown ? ensuring(Queue.shutdown(queue)) : identity ) +/** @internal */ +export const fromTQueue = (queue: TQueue.TDequeue): Stream.Stream => + pipe( + TQueue.take(queue), + Effect.map(Chunk.of), + Effect.catchAllCause((cause) => + pipe( + TQueue.isShutdown(queue), + Effect.flatMap((isShutdown) => + isShutdown && Cause.isInterrupted(cause) ? + pull.end() : + pull.failCause(cause) + ) + ) + ), + repeatEffectChunkOption + ) + /** @internal */ export const fromSchedule = (schedule: Schedule.Schedule): Stream.Stream => pipe( diff --git a/packages/effect/test/TSubscriptionRef.test.ts b/packages/effect/test/TSubscriptionRef.test.ts new file mode 100644 index 0000000000..2cc54e89b9 --- /dev/null +++ b/packages/effect/test/TSubscriptionRef.test.ts @@ -0,0 +1,148 @@ +import { Chunk, Deferred, Effect, Equal, Exit, Fiber, pipe, Random, STM, Stream } from "effect" +import * as Number from "effect/Number" +import * as it from "effect/test/utils/extend" +import * as TSubscriptionRef from "effect/TSubscriptionRef" +import { assert, describe } from "vitest" + +describe.concurrent("TSubscriptionRef", () => { + it.effect("only emits comitted values", () => + Effect.gen(function*($) { + const subscriptionRef = yield* $(TSubscriptionRef.make(0)) + + const transaction = pipe( + TSubscriptionRef.update(subscriptionRef, (n) => n + 1), + STM.tap(() => TSubscriptionRef.update(subscriptionRef, (n) => n + 1)) + ) + + const subscriber = yield* $(pipe( + TSubscriptionRef.changesStream(subscriptionRef), + Stream.take(1), + Stream.runCollect, + Effect.fork + )) + // stream doesn't work properly without a yield, it will drop values + yield* $(Effect.yieldNow()) + yield* $(STM.commit(transaction)) + yield* $(Effect.yieldNow()) + const result = yield* $(Fiber.join(subscriber)) + + assert.deepStrictEqual(Array.from(result), [2]) + })) + + it.effect("emits every comitted value", () => + Effect.gen(function*($) { + const subscriptionRef = yield* $(TSubscriptionRef.make(0)) + + const transaction = pipe( + TSubscriptionRef.update(subscriptionRef, (n) => n + 1), + STM.commit, + // stream doesn't work properly without a yield, it will drop the first value without this + Effect.tap(() => Effect.yieldNow()), + Effect.flatMap(() => TSubscriptionRef.update(subscriptionRef, (n) => n + 1)) + ) + + const subscriber = yield* $(pipe( + TSubscriptionRef.changesStream(subscriptionRef), + Stream.take(2), + Stream.runCollect, + Effect.fork + )) + // stream doesn't work properly without a yield, it will drop the first value without this + yield* $(Effect.yieldNow()) + yield* $(transaction) + const result = yield* $(Fiber.join(subscriber)) + + assert.deepStrictEqual(Array.from(result), [1, 2]) + })) + + it.effect("multiple subscribers can receive committed values", () => + Effect.gen(function*($) { + const subscriptionRef = yield* $(TSubscriptionRef.make(0)) + const deferred1 = yield* $(Deferred.make()) + const deferred2 = yield* $(Deferred.make()) + const subscriber1 = yield* $(pipe( + TSubscriptionRef.changesStream(subscriptionRef), + Stream.tap(() => Deferred.succeed(deferred1, void 0)), + Stream.take(3), + Stream.runCollect, + Effect.fork + )) + yield* $(Deferred.await(deferred1)) + yield* $(TSubscriptionRef.update(subscriptionRef, (n) => n + 1)) + const subscriber2 = yield* $(pipe( + TSubscriptionRef.changesStream(subscriptionRef), + Stream.tap(() => Deferred.succeed(deferred2, void 0)), + Stream.take(2), + Stream.runCollect, + Effect.fork + )) + yield* $(Deferred.await(deferred2)) + yield* $(TSubscriptionRef.update(subscriptionRef, (n) => n + 1)) + const result1 = yield* $(Fiber.join(subscriber1)) + const result2 = yield* $(Fiber.join(subscriber2)) + assert.deepStrictEqual(Array.from(result1), [0, 1, 2]) + assert.deepStrictEqual(Array.from(result2), [1, 2]) + })) + + it.effect("subscriptions are interruptible", () => + Effect.gen(function*($) { + const ref = yield* $(TSubscriptionRef.make(0)) + const deferred1 = yield* $(Deferred.make()) + const deferred2 = yield* $(Deferred.make()) + const subscriber1 = yield* $( + TSubscriptionRef.changesStream(ref), + Stream.tap(() => Deferred.succeed(deferred1, void 0)), + Stream.take(5), + Stream.runCollect, + Effect.fork + ) + yield* $(Deferred.await(deferred1)) + yield* $(TSubscriptionRef.update(ref, (n) => n + 1)) + const subscriber2 = yield* $( + TSubscriptionRef.changesStream(ref), + Stream.tap(() => Deferred.succeed(deferred2, void 0)), + Stream.take(2), + Stream.runCollect, + Effect.fork + ) + yield* $(Deferred.await(deferred2)) + yield* $(TSubscriptionRef.update(ref, (n) => n + 1)) + const result1 = yield* $(Fiber.interrupt(subscriber1)) + const result2 = yield* $(Fiber.join(subscriber2)) + assert.isTrue(Exit.isInterrupted(result1)) + assert.deepStrictEqual(Array.from(result2), [1, 2]) + })) + + it.effect("concurrent subscribes and unsubscribes are handled correctly", () => + Effect.gen(function*($) { + const subscriber = (subscriptionRef: TSubscriptionRef.TSubscriptionRef) => + pipe( + Random.nextIntBetween(0, 200), + Effect.flatMap((n) => + pipe( + TSubscriptionRef.changesStream(subscriptionRef), + Stream.take(n), + Stream.runCollect + ) + ) + ) + const ref = yield* $(TSubscriptionRef.make(0)) + const fiber = yield* $( + TSubscriptionRef.update(ref, (n) => n + 1), + Effect.forever, + Effect.fork + ) + const result = yield* $( + Effect.map( + Effect.all( + Array.from({ length: 2 }, () => subscriber(ref)), + { concurrency: 2 } + ), + Chunk.unsafeFromArray + ) + ) + yield* $(Fiber.interrupt(fiber)) + const isSorted = Chunk.every(result, (chunk) => Equal.equals(chunk, Chunk.sort(chunk, Number.Order))) + assert.isTrue(isSorted) + })) +})