Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tip compression toggle through optional argument #206

Merged
merged 5 commits into from
Mar 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/Equinox.Core/Stream.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@
module Equinox.Core.Stream

/// Represents a specific stream in a ICategory
type private Stream<'event, 'state, 'streamId, 'context>(category : ICategory<'event, 'state, 'streamId, 'context>, streamId: 'streamId, opt, context) =
type private Stream<'event, 'state, 'streamId, 'context>(category : ICategory<'event, 'state, 'streamId, 'context>, streamId: 'streamId, opt, context, compress) =
interface IStream<'event, 'state> with
member __.Load log =
category.Load(log, streamId, opt)

member __.TrySync(log: Serilog.ILogger, token: StreamToken, originState: 'state, events: 'event list) =
category.TrySync(log, token, originState, events, context)
category.TrySync(log, token, originState, events, context, compress)

let create (category : ICategory<'event, 'state, 'streamId, 'context>) streamId opt context : IStream<'event, 'state> = Stream(category, streamId, opt, context) :> _
let create (category : ICategory<'event, 'state, 'streamId, 'context>) streamId opt context compress : IStream<'event, 'state> = Stream(category, streamId, opt, context, compress) :> _

/// Handles case where some earlier processing has loaded or determined the state of a stream, allowing us to avoid a read roundtrip
type private InitializedStream<'event, 'state>(inner : IStream<'event, 'state>, memento : StreamToken * 'state) =
Expand Down
2 changes: 1 addition & 1 deletion src/Equinox.Core/Types.fs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type ICategory<'event, 'state, 'streamId, 'context> =
/// - Conflict: signifies the sync failed, and the proposed decision hence needs to be reconsidered in light of the supplied conflicting Stream State
/// NB the central precondition upon which the sync is predicated is that the stream has not diverged from the `originState` represented by `token`
/// where the precondition is not met, the SyncResult.Conflict bears a [lazy] async result (in a specific manner optimal for the store)
abstract TrySync : log: ILogger * StreamToken * 'state * events: 'event list * 'context option -> Async<SyncResult<'state>>
abstract TrySync : log: ILogger * StreamToken * 'state * events: 'event list * 'context option * compress: bool -> Async<SyncResult<'state>>

/// Represents a time measurement of a computation that includes stopwatch tick metadata
[<NoEquality; NoComparison>]
Expand Down
78 changes: 51 additions & 27 deletions src/Equinox.Cosmos/Cosmos.fs
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,19 @@ type [<NoEquality; NoComparison>] // TODO for STJ v5: All fields required unless
type JsonCompressedBase64Converter() =
inherit JsonConverter<JsonElement>()

static member Compress (value: JsonElement) =
if value.ValueKind = JsonValueKind.Null || value.ValueKind = JsonValueKind.Undefined then
value
else
let input = System.Text.Encoding.UTF8.GetBytes(value.GetRawText())
use output = new MemoryStream()
use compressor = new System.IO.Compression.DeflateStream(output, System.IO.Compression.CompressionLevel.Optimal)
compressor.Write(input, 0, input.Length)
compressor.Close()
JsonDocument.Parse("\"" + System.Convert.ToBase64String(output.ToArray()) + "\"").RootElement
bartelink marked this conversation as resolved.
Show resolved Hide resolved

override __.Read (reader, _typeToConvert, options) =
if reader.TokenType = JsonTokenType.Null then
if reader.TokenType <> JsonTokenType.String then
JsonSerializer.Deserialize<JsonElement>(&reader, options)
else
let compressedBytes = reader.GetBytesFromBase64()
Expand All @@ -90,16 +101,8 @@ type JsonCompressedBase64Converter() =
decompressor.CopyTo(output)
JsonSerializer.Deserialize<JsonElement>(ReadOnlySpan.op_Implicit(output.ToArray()), options)

override __.Write (writer, value, _options) =
if value.ValueKind = JsonValueKind.Null || value.ValueKind = JsonValueKind.Undefined then
writer.WriteNullValue()
else
let input = System.Text.Encoding.UTF8.GetBytes(value.GetRawText())
use output = new MemoryStream()
use compressor = new System.IO.Compression.DeflateStream(output, System.IO.Compression.CompressionLevel.Optimal)
compressor.Write(input, 0, input.Length)
compressor.Close()
writer.WriteBase64StringValue(ReadOnlySpan.op_Implicit(output.ToArray()))
override __.Write (writer, value, options) =
JsonSerializer.Serialize<JsonElement>(writer, value, options)

type JsonCompressedBase64ConverterAttribute () =
inherit JsonConverterAttribute(typeof<JsonCompressedBase64Converter>)
Expand Down Expand Up @@ -536,12 +539,23 @@ module Sync =
let batch (log : ILogger) retryPolicy containerStream batch: Async<Result> =
let call = logged containerStream batch
Log.withLoggedRetries retryPolicy "writeAttempt" call log

let mkBatch (stream: string) (events: IEventData<_>[]) unfolds: Tip =
{ p = stream; id = Tip.WellKnownDocumentId; n = -1L(*Server-managed*); i = -1L(*Server-managed*); _etag = null
e = [| for e in events -> { t = e.Timestamp; c = e.EventType; d = e.Data; m = e.Meta; correlationId = e.CorrelationId; causationId = e.CausationId } |]
u = Array.ofSeq unfolds }
let mkUnfold baseIndex (unfolds: IEventData<_> seq) : Unfold seq =
unfolds |> Seq.mapi (fun offset x -> { i = baseIndex + int64 offset; c = x.EventType; d = x.Data; m = x.Meta; t = DateTimeOffset.UtcNow } : Unfold)

let mkUnfold compress baseIndex (unfolds: IEventData<_> seq) : Unfold seq =
let compressor = if compress then JsonCompressedBase64Converter.Compress else id
unfolds
|> Seq.mapi (fun offset x ->
{
i = baseIndex + int64 offset
c = x.EventType
d = compressor x.Data
m = compressor x.Meta
t = DateTimeOffset.UtcNow
} : Unfold)

module internal Tip =
let private get (container : EquinoxCosmosClient, stream : string) (maybePos: Position option) =
Expand Down Expand Up @@ -891,7 +905,7 @@ type private Category<'event, 'state, 'context>(gateway : Gateway, codec : IEven
match res with
| LoadFromTokenResult.Unchanged -> return current
| LoadFromTokenResult.Found (token', events') -> return token', fold state events' }
member __.Sync(Token.Unpack (container,stream,pos), state as current, events, mapUnfolds, fold, isOrigin, log, context): Async<SyncResult<'state>> = async {
member __.Sync(Token.Unpack (container,stream,pos), state as current, events, mapUnfolds, fold, isOrigin, compress, log, context): Async<SyncResult<'state>> = async {
let state' = fold state (Seq.ofList events)
let encode e = codec.Encode(context,e)
let exp,events,eventsEncoded,projectionsEncoded =
Expand All @@ -902,7 +916,7 @@ type private Category<'event, 'state, 'context>(gateway : Gateway, codec : IEven
let events', unfolds = transmute events state'
Sync.Exp.Etag (defaultArg pos.etag null), events', Seq.map encode events' |> Array.ofSeq, Seq.map encode unfolds
let baseIndex = pos.index + int64 (List.length events)
let projections = Sync.mkUnfold baseIndex projectionsEncoded
let projections = Sync.mkUnfold compress baseIndex projectionsEncoded
let batch = Sync.mkBatch stream eventsEncoded projections
let! res = gateway.Sync log (container,stream) (exp,batch)
match res with
Expand All @@ -922,9 +936,9 @@ module Caching =
interface ICategory<'event, 'state, EquinoxCosmosClient*string, 'context> with
member __.Load(log, (container,streamName), opt) : Async<StreamToken * 'state> =
loadAndIntercept (inner.Load(log, (container,streamName), opt)) streamName
member __.TrySync(log : ILogger, (Token.Unpack (_container,stream,_) as streamToken), state, events : 'event list, context)
member __.TrySync(log : ILogger, (Token.Unpack (_container,stream,_) as streamToken), state, events : 'event list, context, compress)
: Async<SyncResult<'state>> = async {
let! syncRes = inner.TrySync(log, streamToken, state, events, context)
let! syncRes = inner.TrySync(log, streamToken, state, events, context, compress)
match syncRes with
| SyncResult.Conflict resync -> return SyncResult.Conflict(loadAndIntercept resync stream)
| SyncResult.Written(token', state') ->
Expand Down Expand Up @@ -958,9 +972,9 @@ type private Folder<'event, 'state, 'context>
| None -> return! batched log (container,streamName)
| Some tokenAndState when opt = Some AllowStale -> return tokenAndState
| Some tokenAndState -> return! category.LoadFromToken tokenAndState fold isOrigin log }
member __.TrySync(log : ILogger, streamToken, state, events : 'event list, context)
member __.TrySync(log : ILogger, streamToken, state, events : 'event list, context, compress)
: Async<SyncResult<'state>> = async {
let! res = category.Sync((streamToken,state), events, mapUnfolds, fold, isOrigin, log, context)
let! res = category.Sync((streamToken,state), events, mapUnfolds, fold, isOrigin, compress, log, context)
match res with
| SyncResult.Conflict resync -> return SyncResult.Conflict resync
| SyncResult.Written (token',state') -> return SyncResult.Written (token',state') }
Expand Down Expand Up @@ -1109,28 +1123,38 @@ type Resolver<'event, 'state, 'context>(context : Context, codec, fold, initial,
| CachingStrategy.SlidingWindow(cache, window) ->
Caching.applyCacheUpdatesWithSlidingExpiration cache null window folder

let resolveStream (streamId, maybeContainerInitializationGate) opt context =
let resolveStream (streamId, maybeContainerInitializationGate) opt context compress =
{ new IStream<'event, 'state> with
member __.Load log = category.Load(log, streamId, opt)
member __.TrySync(log: ILogger, token: StreamToken, originState: 'state, events: 'event list) =
match maybeContainerInitializationGate with
| None -> category.TrySync(log, token, originState, events, context)
| None -> category.TrySync(log, token, originState, events, context, compress)
| Some init -> async {
do! init ()
return! category.TrySync(log, token, originState, events, context) } }
return! category.TrySync(log, token, originState, events, context, compress) } }

let resolveTarget = function
| StreamName.CategoryAndId (categoryName, streamId) -> context.ResolveContainerStream(categoryName, streamId)

member __.Resolve(streamName : StreamName, [<O; D null>]?option, [<O; D null>]?context) =
member __.Resolve
( streamName : StreamName,
[<O; D null>]?option,
[<O; D null>]?context,
/// Determines whether the data and metadata payloads of the `u`nfolds in the Tip document are base64 encoded and compressed; defaults to true
[<O; D true>]?compress) =
let compress = defaultArg compress true
match resolveTarget streamName, option with
| streamArgs,(None|Some AllowStale) -> resolveStream streamArgs option context
| streamArgs,(None|Some AllowStale) -> resolveStream streamArgs option context compress
| (containerStream,maybeInit),Some AssumeEmpty ->
Stream.ofMemento (Token.create containerStream Position.fromKnownEmpty,initial) (resolveStream (containerStream,maybeInit) option context)
Stream.ofMemento (Token.create containerStream Position.fromKnownEmpty,initial) (resolveStream (containerStream,maybeInit) option context compress)

member __.FromMemento(Token.Unpack (container,stream,_pos) as streamToken,state) =
member __.FromMemento
( Token.Unpack (container,stream,_pos) as streamToken,
state,
/// Determines whether the data and metadata payloads of the `u`nfolds in the Tip document are base64 encoded and compressed; defaults to true
[<O; D true>]?compress) =
let skipInitialization = None
Stream.ofMemento (streamToken,state) (resolveStream ((container,stream),skipInitialization) None None)
Stream.ofMemento (streamToken,state) (resolveStream ((container,stream),skipInitialization) None None (defaultArg compress true))

[<RequireQualifiedAccess; NoComparison>]
type Discovery =
Expand Down
12 changes: 6 additions & 6 deletions src/Equinox.EventStore/EventStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -517,8 +517,8 @@ module Caching =
member __.Load(log, streamName : string, opt) : Async<StreamToken * 'state> =
loadAndIntercept (inner.Load(log, streamName, opt)) streamName

member __.TrySync(log : ILogger, (Token.StreamPos (stream,_) as token), state, events : 'event list, context) : Async<SyncResult<'state>> = async {
let! syncRes = inner.TrySync(log, token, state, events, context)
member __.TrySync(log : ILogger, (Token.StreamPos (stream,_) as token), state, events : 'event list, context, compress) : Async<SyncResult<'state>> = async {
let! syncRes = inner.TrySync(log, token, state, events, context, compress)
match syncRes with
| SyncResult.Conflict resync -> return SyncResult.Conflict (loadAndIntercept resync stream.name)
| SyncResult.Written (token', state') ->
Expand Down Expand Up @@ -548,7 +548,7 @@ type private Folder<'event, 'state, 'context>(category : Category<'event, 'state
| Some tokenAndState when opt = Some AllowStale -> return tokenAndState
| Some (token, state) -> return! category.LoadFromToken fold state streamName token log }

member __.TrySync(log : ILogger, token, initialState, events : 'event list, context) : Async<SyncResult<'state>> = async {
member __.TrySync(log : ILogger, token, initialState, events : 'event list, context, _compress) : Async<SyncResult<'state>> = async {
let! syncRes = category.TrySync(log, fold, token, initialState, events, context)
match syncRes with
| SyncResult.Conflict resync -> return SyncResult.Conflict resync
Expand Down Expand Up @@ -596,12 +596,12 @@ type Resolver<'event, 'state, 'context>

member __.Resolve(streamName : FsCodec.StreamName, [<O; D null>] ?option, [<O; D null>] ?context) =
match FsCodec.StreamName.toString streamName, option with
| sn, (None|Some AllowStale) -> resolveStream sn option context
| sn, Some AssumeEmpty -> Stream.ofMemento (loadEmpty sn) (resolveStream sn option context)
| sn, (None|Some AllowStale) -> resolveStream sn option context true
| sn, Some AssumeEmpty -> Stream.ofMemento (loadEmpty sn) (resolveStream sn option context true)

/// Resolve from a Memento being used in a Continuation [based on position and state typically from Stream.CreateMemento]
member __.FromMemento(Token.Unpack token as streamToken, state, ?context) =
Stream.ofMemento (streamToken, state) (resolveStream token.stream.name context None)
Stream.ofMemento (streamToken, state) (resolveStream token.stream.name context None true)

type private SerilogAdapter(log : ILogger) =
interface EventStore.ClientAPI.ILogger with
Expand Down
4 changes: 2 additions & 2 deletions src/Equinox.MemoryStore/MemoryStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type Category<'event, 'state, 'context, 'Format>(store : VolatileStore<'Format>,
match store.TryLoad streamName with
| None -> return Token.ofEmpty streamName initial
| Some (Decode events) -> return Token.ofEventArray streamName fold initial events }
member __.TrySync(_log, Token.Unpack token, state, events : 'event list, context : 'context option) = async {
member __.TrySync(_log, Token.Unpack token, state, events : 'event list, context : 'context option, _compress) = async {
let inline map i (e : FsCodec.IEventData<'Format>) =
FsCodec.Core.TimelineEvent.Create(int64 i, e.EventType, e.Data, e.Meta, e.EventId, e.CorrelationId, e.CausationId, e.Timestamp)
let encoded : FsCodec.ITimelineEvent<_>[] = events |> Seq.mapi (fun i e -> map (token.streamVersion+i) (codec.Encode(context,e))) |> Array.ofSeq
Expand All @@ -82,7 +82,7 @@ type Category<'event, 'state, 'context, 'Format>(store : VolatileStore<'Format>,

type Resolver<'event, 'state, 'Format, 'context>(store : VolatileStore<'Format>, codec : FsCodec.IEventCodec<'event,'Format,'context>, fold, initial) =
let category = Category<'event, 'state, 'context, 'Format>(store, codec, fold, initial)
let resolveStream streamName context = Stream.create category streamName None context
let resolveStream streamName context = Stream.create category streamName None context true
member __.Resolve(streamName : FsCodec.StreamName, [<Optional; DefaultParameterValue null>] ?option, [<Optional; DefaultParameterValue null>] ?context : 'context) =
match FsCodec.StreamName.toString streamName, option with
| sn, (None|Some AllowStale) -> resolveStream sn context
Expand Down
12 changes: 6 additions & 6 deletions src/Equinox.SqlStreamStore/SqlStreamStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -474,8 +474,8 @@ module Caching =
interface ICategory<'event, 'state, string, 'context> with
member __.Load(log, streamName : string, opt) : Async<StreamToken * 'state> =
loadAndIntercept (inner.Load(log, streamName, opt)) streamName
member __.TrySync(log : ILogger, (Token.StreamPos (stream,_) as token), state, events : 'event list, context) : Async<SyncResult<'state>> = async {
let! syncRes = inner.TrySync(log, token, state, events, context)
member __.TrySync(log : ILogger, (Token.StreamPos (stream,_) as token), state, events : 'event list, context, compress) : Async<SyncResult<'state>> = async {
let! syncRes = inner.TrySync(log, token, state, events, context, compress)
match syncRes with
| SyncResult.Conflict resync -> return SyncResult.Conflict (loadAndIntercept resync stream.name)
| SyncResult.Written (token',state') ->
Expand Down Expand Up @@ -504,7 +504,7 @@ type private Folder<'event, 'state, 'context>(category : Category<'event, 'state
| None -> return! batched log streamName
| Some tokenAndState when opt = Some AllowStale -> return tokenAndState
| Some (token, state) -> return! category.LoadFromToken fold state streamName token log }
member __.TrySync(log : ILogger, token, initialState, events : 'event list, context) : Async<SyncResult<'state>> = async {
member __.TrySync(log : ILogger, token, initialState, events : 'event list, context, _compress) : Async<SyncResult<'state>> = async {
let! syncRes = category.TrySync(log, fold, token, initialState, events, context)
match syncRes with
| SyncResult.Conflict resync -> return SyncResult.Conflict resync
Expand Down Expand Up @@ -547,12 +547,12 @@ type Resolver<'event, 'state, 'context>
let loadEmpty sn = context.LoadEmpty sn,initial
member __.Resolve(streamName : FsCodec.StreamName, [<O; D null>]?option, [<O; D null>]?context) =
match FsCodec.StreamName.toString streamName, option with
| sn, (None|Some AllowStale) -> resolveStream sn option context
| sn, Some AssumeEmpty -> Stream.ofMemento (loadEmpty sn) (resolveStream sn option context)
| sn, (None|Some AllowStale) -> resolveStream sn option context true
| sn, Some AssumeEmpty -> Stream.ofMemento (loadEmpty sn) (resolveStream sn option context true)

/// Resolve from a Memento being used in a Continuation [based on position and state typically from Stream.CreateMemento]
member __.FromMemento(Token.Unpack token as streamToken, state, ?context) =
Stream.ofMemento (streamToken,state) (resolveStream token.stream.name context None)
Stream.ofMemento (streamToken,state) (resolveStream token.stream.name context None true)

[<AbstractClass>]
type ConnectorBase([<O; D(null)>]?readRetryPolicy, [<O; D(null)>]?writeRetryPolicy) =
Expand Down
Loading