From a9f5bc3ca933f877717888445e030f00556cfc84 Mon Sep 17 00:00:00 2001 From: Yaron Librach Date: Thu, 19 Mar 2020 11:11:34 -0400 Subject: [PATCH 1/3] Ported from original by @ylibrach in #206 --- CHANGELOG.md | 1 + src/Equinox.CosmosStore/CosmosStore.fs | 55 ++++++++++--------- .../JsonConverterTests.fs | 43 +++++++++------ 3 files changed, 56 insertions(+), 43 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6dea62f8b..2a592e95e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ The `Unreleased` section name is replaced by the expected version of next releas ### Added - `Cosmos`: Support Serverless Account Mode in `eqx init`; default RU/s to 400 if unspecified [#244](https://github.com/jet/equinox/pull/244) :pray: [@OmnipotentOwl](https://github.com/OmnipotentOwl) +- `Cosmos`: Added ability to turn off compression of Unfolds [#248](https://github.com/jet/equinox/pull/246) :pray: [@ylibrach](https://github.com/ylibrach) ### Changed diff --git a/src/Equinox.CosmosStore/CosmosStore.fs b/src/Equinox.CosmosStore/CosmosStore.fs index 3c2e0fa82..92c264fa9 100644 --- a/src/Equinox.CosmosStore/CosmosStore.fs +++ b/src/Equinox.CosmosStore/CosmosStore.fs @@ -89,30 +89,22 @@ type Unfold = /// The Case (Event Type) of this compaction/snapshot, used to drive deserialization c: string // required - /// Event body - Json -> UTF-8 -> Deflate -> Base64 - [)>] + /// UTF-8 JSON OR Event body - Json -> UTF-8 -> Deflate -> Base64 + [)>] d: byte[] // required /// Optional metadata, same encoding as `d` (can be null; not written if missing) - [)>] + [)>] [] m: byte[] } // optional -/// Manages zipping of the UTF-8 json bytes to make the index record minimal from the perspective of the writer stored proc -/// Only applied to snapshots in the Tip -and Base64DeflateUtf8JsonConverter() = +/// Transparently encodes/decodes fields that can optionally by compressed by +/// 1. Writing outgoing JSON string values (which may be JSON string, JSON object, or null) from a UTF-8 JSON array representation as per VerbatimUtf8Converter +/// 2a. Decoding incoming JSON string values by Decompressing it to a UTF-8 JSON array representation +/// 2b. Decoding incoming JSON non-string values by reading the raw value directly into a UTF-8 JSON array as per VerbatimUtf8Converter +and Base64MaybeDeflateUtf8JsonConverter() = inherit JsonConverter() - let pickle (input : byte[]) : string = - if input = null then null else - - use output = new System.IO.MemoryStream() - use compressor = new System.IO.Compression.DeflateStream(output, System.IO.Compression.CompressionLevel.Optimal) - compressor.Write(input,0,input.Length) - compressor.Close() - System.Convert.ToBase64String(output.ToArray()) - let unpickle str : byte[] = - if str = null then null else - + let inflate str : byte[] = let compressedBytes = System.Convert.FromBase64String str use input = new System.IO.MemoryStream(compressedBytes) use decompressor = new System.IO.Compression.DeflateStream(input, System.IO.Compression.CompressionMode.Decompress) @@ -120,14 +112,27 @@ and Base64DeflateUtf8JsonConverter() = decompressor.CopyTo(output) output.ToArray() + static member Compress (input : byte[]) : byte[] = + if input = null || input.Length = 0 then null else + + use output = new System.IO.MemoryStream() + use compressor = new System.IO.Compression.DeflateStream(output, System.IO.Compression.CompressionLevel.Optimal) + compressor.Write(input,0,input.Length) + compressor.Close() + String.Concat("\"", System.Convert.ToBase64String(output.ToArray()), "\"") + |> System.Text.Encoding.UTF8.GetBytes + override __.CanConvert(objectType) = typeof.Equals(objectType) override __.ReadJson(reader, _, _, serializer) = - //( if reader.TokenType = JsonToken.Null then null else - serializer.Deserialize(reader, typedefof) :?> string |> unpickle |> box + match reader.TokenType with + | JsonToken.Null -> null + | JsonToken.String -> serializer.Deserialize(reader, typedefof) :?> string |> inflate |> box + | _ -> Newtonsoft.Json.Linq.JToken.Load reader |> string |> System.Text.Encoding.UTF8.GetBytes |> box override __.WriteJson(writer, value, serializer) = - let pickled = value |> unbox |> pickle - serializer.Serialize(writer, pickled) + let array = value :?> byte[] + if array = null || array.Length = 0 then serializer.Serialize(writer, null) + else array |> System.Text.Encoding.UTF8.GetString |> writer.WriteRawValue /// The special-case 'Pending' Batch Format used to read the currently active (and mutable) document /// Stored representation has the following diffs vs a 'normal' (frozen/completed) Batch: a) `id` = `-1` b) contains unfolds (`u`) @@ -538,13 +543,13 @@ module internal Sync = let mkBatch (stream: string) (events: IEventData<_>[]) unfolds : Tip = { p = stream; id = Tip.WellKnownDocumentId; n = -1L(*Server-managed*); i = -1L(*Server-managed*); _etag = null e = Array.map mkEvent events; u = Array.ofSeq unfolds } - let mkUnfold baseIndex (unfolds: IEventData<_> seq) : Unfold seq = + let mkUnfold compressor baseIndex (unfolds: IEventData<_> seq) : Unfold seq = unfolds |> Seq.mapi (fun offset x -> { i = baseIndex + int64 offset c = x.EventType - d = x.Data - m = x.Meta + d = compressor x.Data + m = compressor x.Meta t = DateTimeOffset.UtcNow } : Unfold) @@ -1101,7 +1106,7 @@ type internal Category<'event, 'state, 'context>(store : StoreClient, codec : IE let events', unfolds = transmute events state' SyncExp.Etag (defaultArg pos.etag null), events', Seq.map encode events' |> Array.ofSeq, Seq.map encode unfolds let baseIndex = pos.index + int64 (List.length events) - let projections = Sync.mkUnfold baseIndex projectionsEncoded + let projections = Sync.mkUnfold Base64MaybeDeflateUtf8JsonConverter.Compress baseIndex projectionsEncoded let batch = Sync.mkBatch stream eventsEncoded projections match! store.Sync(log, stream, exp, batch) with | InternalSyncResult.Conflict (pos', tipEvents) -> return SyncResult.Conflict (cat.Reload(log, token, state, fold, isOrigin, (pos', tipEvents))) diff --git a/tests/Equinox.CosmosStore.Integration/JsonConverterTests.fs b/tests/Equinox.CosmosStore.Integration/JsonConverterTests.fs index 67f5d2861..2eb8ca17a 100644 --- a/tests/Equinox.CosmosStore.Integration/JsonConverterTests.fs +++ b/tests/Equinox.CosmosStore.Integration/JsonConverterTests.fs @@ -18,31 +18,38 @@ let defaultSettings = FsCodec.NewtonsoftJson.Settings.CreateDefault() type Base64ZipUtf8Tests() = let eventCodec = FsCodec.NewtonsoftJson.Codec.Create(defaultSettings) - [] - let ``serializes, achieving compression`` () = - let encoded = eventCodec.Encode(None,A { embed = String('x',5000) }) + let ser eventType data = let e : Core.Unfold = { i = 42L - c = encoded.EventType - d = encoded.Data + c = eventType + d = data m = null t = DateTimeOffset.MinValue } - let res = JsonConvert.SerializeObject e - test <@ res.Contains("\"d\":\"") && res.Length < 128 @> + JsonConvert.SerializeObject e + + [] + let ``serializes, achieving expected compression`` () = + let encoded = eventCodec.Encode(None,A { embed = String('x',5000) }) + let res = ser encoded.EventType (Core.Base64MaybeDeflateUtf8JsonConverter.Compress encoded.Data) + test <@ res.Contains("\"d\":\"") && res.Length < 138 @> [] - let roundtrips value = - let encoded = eventCodec.Encode(None,value) - let e : Core.Unfold = - { i = 42L - c = encoded.EventType - d = encoded.Data - m = null - t = DateTimeOffset.MinValue } - let ser = JsonConvert.SerializeObject(e) - test <@ ser.Contains("\"d\":\"") @> - System.Diagnostics.Trace.WriteLine ser + let roundtrips compress value = + let encoded = eventCodec.Encode(None, value) + let maybeCompressor = if compress then Core.Base64MaybeDeflateUtf8JsonConverter.Compress else id + let actualData = maybeCompressor encoded.Data + let ser = ser encoded.EventType actualData + test <@ if compress then ser.Contains("\"d\":\"") + else ser.Contains("\"d\":{") @> let des = JsonConvert.DeserializeObject(ser) let d = FsCodec.Core.TimelineEvent.Create(-1L, des.c, des.d) let decoded = eventCodec.TryDecode d |> Option.get test <@ value = decoded @> + + [] + let handlesNulls compress = + let maybeCompressor = if compress then Core.Base64MaybeDeflateUtf8JsonConverter.Compress else id + let maybeCompressed = maybeCompressor null + let ser = ser "AnEventType" maybeCompressed + let des = JsonConvert.DeserializeObject(ser) + test <@ null = des.d @> From b0345b4b26a82e99d16f851e813604cca18ccc79 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Fri, 2 Oct 2020 13:14:30 +0100 Subject: [PATCH 2/3] Expose option on CosmosCategory --- src/Equinox.CosmosStore/CosmosStore.fs | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/src/Equinox.CosmosStore/CosmosStore.fs b/src/Equinox.CosmosStore/CosmosStore.fs index 92c264fa9..27e9809ed 100644 --- a/src/Equinox.CosmosStore/CosmosStore.fs +++ b/src/Equinox.CosmosStore/CosmosStore.fs @@ -99,8 +99,8 @@ type Unfold = m: byte[] } // optional /// Transparently encodes/decodes fields that can optionally by compressed by -/// 1. Writing outgoing JSON string values (which may be JSON string, JSON object, or null) from a UTF-8 JSON array representation as per VerbatimUtf8Converter -/// 2a. Decoding incoming JSON string values by Decompressing it to a UTF-8 JSON array representation +/// 1. Writing outgoing values (which may be JSON string, JSON object, or null) from a UTF-8 JSON array representation as per VerbatimUtf8Converter +/// 2a. Decoding incoming JSON String values by Decompressing it to a UTF-8 JSON array representation /// 2b. Decoding incoming JSON non-string values by reading the raw value directly into a UTF-8 JSON array as per VerbatimUtf8Converter and Base64MaybeDeflateUtf8JsonConverter() = inherit JsonConverter() @@ -1094,7 +1094,7 @@ type internal Category<'event, 'state, 'context>(store : StoreClient, codec : IE match! store.Reload(log, (stream, pos), (codec.TryDecode,isOrigin), ?preview = preloaded) with | LoadFromTokenResult.Unchanged -> return streamToken, state | LoadFromTokenResult.Found (token', events) -> return token', fold state events } - member cat.Sync(log, token, state, events, mapUnfolds, fold, isOrigin, context): Async> = async { + member cat.Sync(log, token, state, events, mapUnfolds, fold, isOrigin, context, compressUnfolds): Async> = async { let state' = fold state (Seq.ofList events) let encode e = codec.Encode(context, e) let (Token.Unpack (stream,pos)) = token @@ -1106,7 +1106,8 @@ type internal Category<'event, 'state, 'context>(store : StoreClient, codec : IE let events', unfolds = transmute events state' SyncExp.Etag (defaultArg pos.etag null), events', Seq.map encode events' |> Array.ofSeq, Seq.map encode unfolds let baseIndex = pos.index + int64 (List.length events) - let projections = Sync.mkUnfold Base64MaybeDeflateUtf8JsonConverter.Compress baseIndex projectionsEncoded + let compressor = if compressUnfolds then Base64MaybeDeflateUtf8JsonConverter.Compress else id + let projections = Sync.mkUnfold compressor baseIndex projectionsEncoded let batch = Sync.mkBatch stream eventsEncoded projections match! store.Sync(log, stream, exp, batch) with | InternalSyncResult.Conflict (pos', tipEvents) -> return SyncResult.Conflict (cat.Reload(log, token, state, fold, isOrigin, (pos', tipEvents))) @@ -1148,6 +1149,7 @@ type internal Folder<'event, 'state, 'context> ( category: Category<'event, 'state, 'context>, fold: 'state -> 'event seq -> 'state, initial: 'state, isOrigin: 'event -> bool, mapUnfolds: Choice 'state -> 'event seq),('event list -> 'state -> 'event list * 'event list)>, + compressUnfolds, ?readCache) = let inspectUnfolds = match mapUnfolds with Choice1Of3 () -> false | _ -> true let batched log stream = category.Load(log, stream, initial, inspectUnfolds, fold, isOrigin) @@ -1162,7 +1164,7 @@ type internal Folder<'event, 'state, 'context> | Some (token, state) -> return! category.Reload(log, token, state, fold, isOrigin) } member __.TrySync(log : ILogger, streamToken, state, events : 'event list, context) : Async> = async { - match! category.Sync(log, streamToken, state, events, mapUnfolds, fold, isOrigin, context) with + match! category.Sync(log, streamToken, state, events, mapUnfolds, fold, isOrigin, context, compressUnfolds) with | SyncResult.Conflict resync -> return SyncResult.Conflict resync | SyncResult.Written (token',state') -> return SyncResult.Written (token',state') } @@ -1283,7 +1285,12 @@ type AccessStrategy<'event,'state> = /// | Custom of isOrigin: ('event -> bool) * transmute: ('event list -> 'state -> 'event list*'event list) -type CosmosStoreCategory<'event, 'state, 'context>(context : CosmosStoreContext, codec, fold, initial, caching, access) = +type CosmosStoreCategory<'event, 'state, 'context> + ( context : CosmosStoreContext, codec, fold, initial, caching, access, + /// Compress Unfolds in Tip. Default: true. + /// NOTE when set to false, requires Equinox.Cosmos / Equinox.CosmosStore Version >= 2.3.0 to be able to read + ?compressUnfolds) = + let compressUnfolds = defaultArg compressUnfolds true let readCacheOption = match caching with | CachingStrategy.NoCaching -> None @@ -1300,7 +1307,7 @@ type CosmosStoreCategory<'event, 'state, 'context>(context : CosmosStoreContext, let resolveCategory (categoryName, container) = let createCategory _name = let cosmosCat = Category<'event, 'state, 'context>(container, codec) - let folder = Folder<'event, 'state, 'context>(cosmosCat, fold, initial, isOrigin, mapUnfolds, ?readCache = readCacheOption) + let folder = Folder<'event, 'state, 'context>(cosmosCat, fold, initial, isOrigin, mapUnfolds, compressUnfolds, ?readCache = readCacheOption) match caching with | CachingStrategy.NoCaching -> folder :> ICategory<_, _, string, 'context> | CachingStrategy.SlidingWindow(cache, window) -> Caching.applyCacheUpdatesWithSlidingExpiration cache null window folder From 1164785f4592e88112589c7682a9d64743e1a2c1 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Fri, 2 Oct 2020 13:58:55 +0100 Subject: [PATCH 3/3] Tidy/sync with #249 --- CHANGELOG.md | 2 +- src/Equinox.CosmosStore/CosmosStore.fs | 14 ++++++-------- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2a592e95e..c1eb31869 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,7 +11,7 @@ The `Unreleased` section name is replaced by the expected version of next releas ### Added - `Cosmos`: Support Serverless Account Mode in `eqx init`; default RU/s to 400 if unspecified [#244](https://github.com/jet/equinox/pull/244) :pray: [@OmnipotentOwl](https://github.com/OmnipotentOwl) -- `Cosmos`: Added ability to turn off compression of Unfolds [#248](https://github.com/jet/equinox/pull/246) :pray: [@ylibrach](https://github.com/ylibrach) +- `Cosmos`: Added ability to turn off compression of Unfolds [#249](https://github.com/jet/equinox/pull/249) :pray: [@ylibrach](https://github.com/ylibrach) ### Changed diff --git a/src/Equinox.CosmosStore/CosmosStore.fs b/src/Equinox.CosmosStore/CosmosStore.fs index 27e9809ed..42ff44c2f 100644 --- a/src/Equinox.CosmosStore/CosmosStore.fs +++ b/src/Equinox.CosmosStore/CosmosStore.fs @@ -111,8 +111,7 @@ and Base64MaybeDeflateUtf8JsonConverter() = use output = new System.IO.MemoryStream() decompressor.CopyTo(output) output.ToArray() - - static member Compress (input : byte[]) : byte[] = + static member Compress(input : byte[]) : byte[] = if input = null || input.Length = 0 then null else use output = new System.IO.MemoryStream() @@ -121,7 +120,6 @@ and Base64MaybeDeflateUtf8JsonConverter() = compressor.Close() String.Concat("\"", System.Convert.ToBase64String(output.ToArray()), "\"") |> System.Text.Encoding.UTF8.GetBytes - override __.CanConvert(objectType) = typeof.Equals(objectType) override __.ReadJson(reader, _, _, serializer) = @@ -132,7 +130,7 @@ and Base64MaybeDeflateUtf8JsonConverter() = override __.WriteJson(writer, value, serializer) = let array = value :?> byte[] if array = null || array.Length = 0 then serializer.Serialize(writer, null) - else array |> System.Text.Encoding.UTF8.GetString |> writer.WriteRawValue + else System.Text.Encoding.UTF8.GetString array |> writer.WriteRawValue /// The special-case 'Pending' Batch Format used to read the currently active (and mutable) document /// Stored representation has the following diffs vs a 'normal' (frozen/completed) Batch: a) `id` = `-1` b) contains unfolds (`u`) @@ -1286,10 +1284,10 @@ type AccessStrategy<'event,'state> = | Custom of isOrigin: ('event -> bool) * transmute: ('event list -> 'state -> 'event list*'event list) type CosmosStoreCategory<'event, 'state, 'context> - ( context : CosmosStoreContext, codec, fold, initial, caching, access, - /// Compress Unfolds in Tip. Default: true. - /// NOTE when set to false, requires Equinox.Cosmos / Equinox.CosmosStore Version >= 2.3.0 to be able to read - ?compressUnfolds) = + ( context : CosmosStoreContext, codec, fold, initial, caching, access, + /// Compress Unfolds in Tip. Default: true. + /// NOTE when set to false, requires Equinox.Cosmos / Equinox.CosmosStore Version >= 2.3.0 to be able to read + ?compressUnfolds) = let compressUnfolds = defaultArg compressUnfolds true let readCacheOption = match caching with