diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6dea62f8b..c1eb31869 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -11,6 +11,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
 ### Added

 - `Cosmos`: Support Serverless Account Mode in `eqx init`; default RU/s to 400 if unspecified [#244](https://github.com/jet/equinox/pull/244) :pray: [@OmnipotentOwl](https://github.com/OmnipotentOwl)
+- `Cosmos`: Added ability to turn off compression of Unfolds [#249](https://github.com/jet/equinox/pull/249) :pray: [@ylibrach](https://github.com/ylibrach)

 ### Changed

diff --git a/src/Equinox.CosmosStore/CosmosStore.fs b/src/Equinox.CosmosStore/CosmosStore.fs
index 3c2e0fa82..42ff44c2f 100644
--- a/src/Equinox.CosmosStore/CosmosStore.fs
+++ b/src/Equinox.CosmosStore/CosmosStore.fs
@@ -89,45 +89,48 @@ type Unfold =
         /// The Case (Event Type) of this compaction/snapshot, used to drive deserialization
         c: string // required

-        /// Event body - Json -> UTF-8 -> Deflate -> Base64
-        [<JsonConverter(typeof<Base64DeflateUtf8JsonConverter>)>]
+        /// UTF-8 JSON OR Event body - Json -> UTF-8 -> Deflate -> Base64
+        [<JsonConverter(typeof<Base64MaybeDeflateUtf8JsonConverter>)>]
         d: byte[] // required

         /// Optional metadata, same encoding as `d` (can be null; not written if missing)
-        [<JsonConverter(typeof<Base64DeflateUtf8JsonConverter>)>]
+        [<JsonConverter(typeof<Base64MaybeDeflateUtf8JsonConverter>)>]
         [<JsonProperty(Required=Required.Default, NullValueHandling=NullValueHandling.Ignore)>]
         m: byte[] } // optional

-/// Manages zipping of the UTF-8 json bytes to make the index record minimal from the perspective of the writer stored proc
-/// Only applied to snapshots in the Tip
-and Base64DeflateUtf8JsonConverter() =
+/// Transparently encodes/decodes fields that can optionally be compressed by
+/// 1. Writing outgoing values (which may be JSON string, JSON object, or null) from a UTF-8 JSON array representation as per VerbatimUtf8Converter
+/// 2a. Decoding incoming JSON String values by Decompressing it to a UTF-8 JSON array representation
+/// 2b. Decoding incoming JSON non-string values by reading the raw value directly into a UTF-8 JSON array as per VerbatimUtf8Converter
+and Base64MaybeDeflateUtf8JsonConverter() =
     inherit JsonConverter()
-    let pickle (input : byte[]) : string =
-        if input = null then null else
-
-        use output = new System.IO.MemoryStream()
-        use compressor = new System.IO.Compression.DeflateStream(output, System.IO.Compression.CompressionLevel.Optimal)
-        compressor.Write(input,0,input.Length)
-        compressor.Close()
-        System.Convert.ToBase64String(output.ToArray())
-    let unpickle str : byte[] =
-        if str = null then null else
-
+    let inflate str : byte[] =
         let compressedBytes = System.Convert.FromBase64String str
         use input = new System.IO.MemoryStream(compressedBytes)
         use decompressor = new System.IO.Compression.DeflateStream(input, System.IO.Compression.CompressionMode.Decompress)
         use output = new System.IO.MemoryStream()
         decompressor.CopyTo(output)
         output.ToArray()

+    static member Compress(input : byte[]) : byte[] =
+        if input = null || input.Length = 0 then null else
+        use output = new System.IO.MemoryStream()
+        use compressor = new System.IO.Compression.DeflateStream(output, System.IO.Compression.CompressionLevel.Optimal)
+        compressor.Write(input,0,input.Length)
+        compressor.Close()
+        String.Concat("\"", System.Convert.ToBase64String(output.ToArray()), "\"")
+        |> System.Text.Encoding.UTF8.GetBytes
     override __.CanConvert(objectType) =
         typeof<byte[]>.Equals(objectType)
     override __.ReadJson(reader, _, _, serializer) =
-        //( if reader.TokenType = JsonToken.Null then null else
-        serializer.Deserialize(reader, typedefof<string>) :?> string |> unpickle |> box
+        match reader.TokenType with
+        | JsonToken.Null -> null
+        | JsonToken.String -> serializer.Deserialize(reader, typedefof<string>) :?> string |> inflate |> box
+        | _ -> Newtonsoft.Json.Linq.JToken.Load reader |> string |> System.Text.Encoding.UTF8.GetBytes |> box
     override __.WriteJson(writer, value, serializer) =
-        let pickled = value |> unbox |> pickle
-        serializer.Serialize(writer, pickled)
+        let array = value :?> byte[]
+        if array = null || array.Length = 0 then serializer.Serialize(writer, null)
+        else System.Text.Encoding.UTF8.GetString array |> writer.WriteRawValue

 /// The special-case 'Pending' Batch Format used to read the currently active (and mutable) document
 /// Stored representation has the following diffs vs a 'normal' (frozen/completed) Batch: a) `id` = `-1` b) contains unfolds (`u`)
@@ -538,13 +541,13 @@ module internal Sync =
     let mkBatch (stream: string) (events: IEventData<_>[]) unfolds : Tip =
         {   p = stream; id = Tip.WellKnownDocumentId; n = -1L(*Server-managed*); i = -1L(*Server-managed*); _etag = null
             e = Array.map mkEvent events; u = Array.ofSeq unfolds }
-    let mkUnfold baseIndex (unfolds: IEventData<_> seq) : Unfold seq =
+    let mkUnfold compressor baseIndex (unfolds: IEventData<_> seq) : Unfold seq =
         unfolds
         |> Seq.mapi (fun offset x ->
             {   i = baseIndex + int64 offset
                 c = x.EventType
-                d = x.Data
-                m = x.Meta
+                d = compressor x.Data
+                m = compressor x.Meta
                 t = DateTimeOffset.UtcNow
             } : Unfold)

@@ -1089,7 +1092,7 @@ type internal Category<'event, 'state, 'context>(store : StoreClient, codec : IE
         match! store.Reload(log, (stream, pos), (codec.TryDecode,isOrigin), ?preview = preloaded) with
         | LoadFromTokenResult.Unchanged -> return streamToken, state
         | LoadFromTokenResult.Found (token', events) -> return token', fold state events }
-    member cat.Sync(log, token, state, events, mapUnfolds, fold, isOrigin, context): Async<SyncResult<'state>> = async {
+    member cat.Sync(log, token, state, events, mapUnfolds, fold, isOrigin, context, compressUnfolds): Async<SyncResult<'state>> = async {
         let state' = fold state (Seq.ofList events)
         let encode e = codec.Encode(context, e)
         let (Token.Unpack (stream,pos)) = token
@@ -1101,7 +1104,8 @@ type internal Category<'event, 'state, 'context>(store : StoreClient, codec : IE
             let events', unfolds = transmute events state'
             SyncExp.Etag (defaultArg pos.etag null), events', Seq.map encode events' |> Array.ofSeq, Seq.map encode unfolds
         let baseIndex = pos.index + int64 (List.length events)
-        let projections = Sync.mkUnfold baseIndex projectionsEncoded
+        let compressor = if compressUnfolds then Base64MaybeDeflateUtf8JsonConverter.Compress else id
+        let projections = Sync.mkUnfold compressor baseIndex projectionsEncoded
         let batch = Sync.mkBatch stream eventsEncoded projections
         match! store.Sync(log, stream, exp, batch) with
         | InternalSyncResult.Conflict (pos', tipEvents) -> return SyncResult.Conflict (cat.Reload(log, token, state, fold, isOrigin, (pos', tipEvents)))
@@ -1143,6 +1147,7 @@ type internal Folder<'event, 'state, 'context>
     (   category: Category<'event, 'state, 'context>, fold: 'state -> 'event seq -> 'state, initial: 'state, isOrigin: 'event -> bool,
         mapUnfolds: Choice<unit,('event list -> 'state -> 'event seq),('event list -> 'state -> 'event list * 'event list)>,
+        compressUnfolds,
         ?readCache) =
     let inspectUnfolds = match mapUnfolds with Choice1Of3 () -> false | _ -> true
     let batched log stream = category.Load(log, stream, initial, inspectUnfolds, fold, isOrigin)
@@ -1157,7 +1162,7 @@ type internal Folder<'event, 'state, 'context>
         | Some (token, state) -> return! category.Reload(log, token, state, fold, isOrigin) }
     member __.TrySync(log : ILogger, streamToken, state, events : 'event list, context)
        : Async<SyncResult<'state>> = async {
-        match! category.Sync(log, streamToken, state, events, mapUnfolds, fold, isOrigin, context) with
+        match! category.Sync(log, streamToken, state, events, mapUnfolds, fold, isOrigin, context, compressUnfolds) with
        | SyncResult.Conflict resync -> return SyncResult.Conflict resync
        | SyncResult.Written (token',state') -> return SyncResult.Written (token',state') }

@@ -1278,7 +1283,12 @@ type AccessStrategy<'event,'state> =
    ///
    | Custom of isOrigin: ('event -> bool) * transmute: ('event list -> 'state -> 'event list*'event list)

-type CosmosStoreCategory<'event, 'state, 'context>(context : CosmosStoreContext, codec, fold, initial, caching, access) =
+type CosmosStoreCategory<'event, 'state, 'context>
+    (   context : CosmosStoreContext, codec, fold, initial, caching, access,
+        /// Compress Unfolds in Tip. Default: true.
+        /// NOTE when set to false, requires Equinox.Cosmos / Equinox.CosmosStore Version >= 2.3.0 to be able to read
+        ?compressUnfolds) =
+    let compressUnfolds = defaultArg compressUnfolds true
     let readCacheOption =
         match caching with
         | CachingStrategy.NoCaching -> None
@@ -1295,7 +1305,7 @@ type CosmosStoreCategory<'event, 'state, 'context>(context : CosmosStoreContext,
     let resolveCategory (categoryName, container) =
         let createCategory _name =
             let cosmosCat = Category<'event, 'state, 'context>(container, codec)
-            let folder = Folder<'event, 'state, 'context>(cosmosCat, fold, initial, isOrigin, mapUnfolds, ?readCache = readCacheOption)
+            let folder = Folder<'event, 'state, 'context>(cosmosCat, fold, initial, isOrigin, mapUnfolds, compressUnfolds, ?readCache = readCacheOption)
             match caching with
             | CachingStrategy.NoCaching -> folder :> ICategory<_, _, string, 'context>
             | CachingStrategy.SlidingWindow(cache, window) -> Caching.applyCacheUpdatesWithSlidingExpiration cache null window folder

diff --git a/tests/Equinox.CosmosStore.Integration/JsonConverterTests.fs b/tests/Equinox.CosmosStore.Integration/JsonConverterTests.fs
index 67f5d2861..2eb8ca17a 100644
--- a/tests/Equinox.CosmosStore.Integration/JsonConverterTests.fs
+++ b/tests/Equinox.CosmosStore.Integration/JsonConverterTests.fs
@@ -18,31 +18,38 @@ let defaultSettings = FsCodec.NewtonsoftJson.Settings.CreateDefault()
 type Base64ZipUtf8Tests() =
     let eventCodec = FsCodec.NewtonsoftJson.Codec.Create(defaultSettings)

-    [<Fact>]
-    let ``serializes, achieving compression`` () =
-        let encoded = eventCodec.Encode(None,A { embed = String('x',5000) })
+    let ser eventType data =
         let e : Core.Unfold =
             {   i = 42L
-                c = encoded.EventType
-                d = encoded.Data
+                c = eventType
+                d = data
                 m = null
                 t = DateTimeOffset.MinValue }
-        let res = JsonConvert.SerializeObject e
-        test <@ res.Contains("\"d\":\"") && res.Length < 128 @>
+        JsonConvert.SerializeObject e
+
+    [<Fact>]
+    let ``serializes, achieving expected compression`` () =
+        let encoded = eventCodec.Encode(None,A { embed = String('x',5000) })
+        let res = ser encoded.EventType (Core.Base64MaybeDeflateUtf8JsonConverter.Compress encoded.Data)
+        test <@ res.Contains("\"d\":\"") && res.Length < 138 @>

     [<Property>]
-    let roundtrips value =
-        let encoded = eventCodec.Encode(None,value)
-        let e : Core.Unfold =
-            {   i = 42L
-                c = encoded.EventType
-                d = encoded.Data
-                m = null
-                t = DateTimeOffset.MinValue }
-        let ser = JsonConvert.SerializeObject(e)
-        test <@ ser.Contains("\"d\":\"") @>
-        System.Diagnostics.Trace.WriteLine ser
+    let roundtrips compress value =
+        let encoded = eventCodec.Encode(None, value)
+        let maybeCompressor = if compress then Core.Base64MaybeDeflateUtf8JsonConverter.Compress else id
+        let actualData = maybeCompressor encoded.Data
+        let ser = ser encoded.EventType actualData
+        test <@ if compress then ser.Contains("\"d\":\"")
+                else ser.Contains("\"d\":{") @>
         let des = JsonConvert.DeserializeObject<Core.Unfold>(ser)
         let d = FsCodec.Core.TimelineEvent.Create(-1L, des.c, des.d)
         let decoded = eventCodec.TryDecode d |> Option.get
         test <@ value = decoded @>
+
+    [<Theory; InlineData false; InlineData true>]
+    let handlesNulls compress =
+        let maybeCompressor = if compress then Core.Base64MaybeDeflateUtf8JsonConverter.Compress else id
+        let maybeCompressed = maybeCompressor null
+        let ser = ser "AnEventType" maybeCompressed
+        let des = JsonConvert.DeserializeObject<Core.Unfold>(ser)
+        test <@ null = des.d @>
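
For reference, a minimal consumer-side sketch (not part of the diff) of how the new optional `compressUnfolds` constructor argument on `CosmosStoreCategory` might be used to opt out of Unfold compression. The `Events`/`Fold` members, the codec, and the access/caching choices below are illustrative placeholders for an application's aggregate wiring; only `compressUnfolds` comes from this change.

// Hypothetical wiring sketch - Events.codec, Fold.fold, Fold.initial, Fold.isOrigin and Fold.toSnapshot
// are placeholder names for an aggregate's own definitions, not APIs introduced by this diff
open Equinox.CosmosStore

let createCategory (context : CosmosStoreContext) cache =
    let caching = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
    let access = AccessStrategy.Snapshot (Fold.isOrigin, Fold.toSnapshot)
    // compressUnfolds defaults to true; passing false writes Unfolds as raw JSON objects rather than
    // Base64/Deflate strings, which (per the NOTE in the constructor doc) requires all readers to be on >= 2.3.0
    CosmosStoreCategory(context, Events.codec, Fold.fold, Fold.initial, caching, access, compressUnfolds = false)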