Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CosmosStore: Enable uncompressed unfolds as in v2 #249 #248

Merged
merged 3 commits into from
Oct 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
### Added

- `Cosmos`: Support Serverless Account Mode in `eqx init`; default RU/s to 400 if unspecified [#244](https://github.com/jet/equinox/pull/244) :pray: [@OmnipotentOwl](https://github.com/OmnipotentOwl)
- `Cosmos`: Added ability to turn off compression of Unfolds [#249](https://github.com/jet/equinox/pull/249) :pray: [@ylibrach](https://github.com/ylibrach)

### Changed

Expand Down
68 changes: 39 additions & 29 deletions src/Equinox.CosmosStore/CosmosStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -89,45 +89,48 @@ type Unfold =
/// The Case (Event Type) of this compaction/snapshot, used to drive deserialization
c: string // required

/// Event body - Json -> UTF-8 -> Deflate -> Base64
[<JsonConverter(typeof<Base64DeflateUtf8JsonConverter>)>]
/// Event body - either verbatim UTF-8 JSON, OR Json -> UTF-8 -> Deflate -> Base64
[<JsonConverter(typeof<Base64MaybeDeflateUtf8JsonConverter>)>]
d: byte[] // required

/// Optional metadata, same encoding as `d` (can be null; not written if missing)
[<JsonConverter(typeof<Base64DeflateUtf8JsonConverter>)>]
[<JsonConverter(typeof<Base64MaybeDeflateUtf8JsonConverter>)>]
[<JsonProperty(Required=Required.Default, NullValueHandling=NullValueHandling.Ignore)>]
m: byte[] } // optional

/// Manages zipping of the UTF-8 json bytes to make the index record minimal from the perspective of the writer stored proc
/// Only applied to snapshots in the Tip
and Base64DeflateUtf8JsonConverter() =
/// Transparently encodes/decodes fields that can optionally be compressed by
/// 1. Writing outgoing values (which may be JSON string, JSON object, or null) from a UTF-8 JSON array representation as per VerbatimUtf8Converter
/// 2a. Decoding incoming JSON String values by Decompressing it to a UTF-8 JSON array representation
/// 2b. Decoding incoming JSON non-string values by reading the raw value directly into a UTF-8 JSON array as per VerbatimUtf8Converter
and Base64MaybeDeflateUtf8JsonConverter() =
inherit JsonConverter()
let pickle (input : byte[]) : string =
if input = null then null else

use output = new System.IO.MemoryStream()
use compressor = new System.IO.Compression.DeflateStream(output, System.IO.Compression.CompressionLevel.Optimal)
compressor.Write(input,0,input.Length)
compressor.Close()
System.Convert.ToBase64String(output.ToArray())
let unpickle str : byte[] =
if str = null then null else

let inflate str : byte[] =
let compressedBytes = System.Convert.FromBase64String str
use input = new System.IO.MemoryStream(compressedBytes)
use decompressor = new System.IO.Compression.DeflateStream(input, System.IO.Compression.CompressionMode.Decompress)
use output = new System.IO.MemoryStream()
decompressor.CopyTo(output)
output.ToArray()
/// Deflate-compresses `input`, Base64-encodes the compressed bytes, wraps that text in double quotes
/// (i.e. renders it as a JSON string literal) and returns the UTF-8 bytes of the quoted text.
/// Returns null when `input` is null or empty.
static member Compress(input : byte[]) : byte[] =
if input = null || input.Length = 0 then null else

use output = new System.IO.MemoryStream()
use compressor = new System.IO.Compression.DeflateStream(output, System.IO.Compression.CompressionLevel.Optimal)
compressor.Write(input,0,input.Length)
// Close the compressor before reading `output`, so any remaining compressed data has been written to it
compressor.Close()
// Surround the Base64 text with quotes so the resulting bytes form a JSON string token
String.Concat("\"", System.Convert.ToBase64String(output.ToArray()), "\"")
|> System.Text.Encoding.UTF8.GetBytes
override __.CanConvert(objectType) =
typeof<byte[]>.Equals(objectType)
override __.ReadJson(reader, _, _, serializer) =
//( if reader.TokenType = JsonToken.Null then null else
serializer.Deserialize(reader, typedefof<string>) :?> string |> unpickle |> box
match reader.TokenType with
| JsonToken.Null -> null
| JsonToken.String -> serializer.Deserialize(reader, typedefof<string>) :?> string |> inflate |> box
| _ -> Newtonsoft.Json.Linq.JToken.Load reader |> string |> System.Text.Encoding.UTF8.GetBytes |> box
override __.WriteJson(writer, value, serializer) =
let pickled = value |> unbox |> pickle
serializer.Serialize(writer, pickled)
let array = value :?> byte[]
if array = null || array.Length = 0 then serializer.Serialize(writer, null)
else System.Text.Encoding.UTF8.GetString array |> writer.WriteRawValue

/// The special-case 'Pending' Batch Format used to read the currently active (and mutable) document
/// Stored representation has the following diffs vs a 'normal' (frozen/completed) Batch: a) `id` = `-1` b) contains unfolds (`u`)
Expand Down Expand Up @@ -538,13 +541,13 @@ module internal Sync =
let mkBatch (stream: string) (events: IEventData<_>[]) unfolds : Tip =
{ p = stream; id = Tip.WellKnownDocumentId; n = -1L(*Server-managed*); i = -1L(*Server-managed*); _etag = null
e = Array.map mkEvent events; u = Array.ofSeq unfolds }
let mkUnfold baseIndex (unfolds: IEventData<_> seq) : Unfold seq =
let mkUnfold compressor baseIndex (unfolds: IEventData<_> seq) : Unfold seq =
unfolds
|> Seq.mapi (fun offset x ->
{ i = baseIndex + int64 offset
c = x.EventType
d = x.Data
m = x.Meta
d = compressor x.Data
m = compressor x.Meta
t = DateTimeOffset.UtcNow
} : Unfold)

Expand Down Expand Up @@ -1089,7 +1092,7 @@ type internal Category<'event, 'state, 'context>(store : StoreClient, codec : IE
match! store.Reload(log, (stream, pos), (codec.TryDecode,isOrigin), ?preview = preloaded) with
| LoadFromTokenResult.Unchanged -> return streamToken, state
| LoadFromTokenResult.Found (token', events) -> return token', fold state events }
member cat.Sync(log, token, state, events, mapUnfolds, fold, isOrigin, context): Async<SyncResult<'state>> = async {
member cat.Sync(log, token, state, events, mapUnfolds, fold, isOrigin, context, compressUnfolds): Async<SyncResult<'state>> = async {
let state' = fold state (Seq.ofList events)
let encode e = codec.Encode(context, e)
let (Token.Unpack (stream,pos)) = token
Expand All @@ -1101,7 +1104,8 @@ type internal Category<'event, 'state, 'context>(store : StoreClient, codec : IE
let events', unfolds = transmute events state'
SyncExp.Etag (defaultArg pos.etag null), events', Seq.map encode events' |> Array.ofSeq, Seq.map encode unfolds
let baseIndex = pos.index + int64 (List.length events)
let projections = Sync.mkUnfold baseIndex projectionsEncoded
let compressor = if compressUnfolds then Base64MaybeDeflateUtf8JsonConverter.Compress else id
let projections = Sync.mkUnfold compressor baseIndex projectionsEncoded
let batch = Sync.mkBatch stream eventsEncoded projections
match! store.Sync(log, stream, exp, batch) with
| InternalSyncResult.Conflict (pos', tipEvents) -> return SyncResult.Conflict (cat.Reload(log, token, state, fold, isOrigin, (pos', tipEvents)))
Expand Down Expand Up @@ -1143,6 +1147,7 @@ type internal Folder<'event, 'state, 'context>
( category: Category<'event, 'state, 'context>, fold: 'state -> 'event seq -> 'state, initial: 'state,
isOrigin: 'event -> bool,
mapUnfolds: Choice<unit,('event list -> 'state -> 'event seq),('event list -> 'state -> 'event list * 'event list)>,
compressUnfolds,
?readCache) =
let inspectUnfolds = match mapUnfolds with Choice1Of3 () -> false | _ -> true
let batched log stream = category.Load(log, stream, initial, inspectUnfolds, fold, isOrigin)
Expand All @@ -1157,7 +1162,7 @@ type internal Folder<'event, 'state, 'context>
| Some (token, state) -> return! category.Reload(log, token, state, fold, isOrigin) }
member __.TrySync(log : ILogger, streamToken, state, events : 'event list, context)
: Async<SyncResult<'state>> = async {
match! category.Sync(log, streamToken, state, events, mapUnfolds, fold, isOrigin, context) with
match! category.Sync(log, streamToken, state, events, mapUnfolds, fold, isOrigin, context, compressUnfolds) with
| SyncResult.Conflict resync -> return SyncResult.Conflict resync
| SyncResult.Written (token',state') -> return SyncResult.Written (token',state') }

Expand Down Expand Up @@ -1278,7 +1283,12 @@ type AccessStrategy<'event,'state> =
/// </remarks>
| Custom of isOrigin: ('event -> bool) * transmute: ('event list -> 'state -> 'event list*'event list)

type CosmosStoreCategory<'event, 'state, 'context>(context : CosmosStoreContext, codec, fold, initial, caching, access) =
type CosmosStoreCategory<'event, 'state, 'context>
( context : CosmosStoreContext, codec, fold, initial, caching, access,
/// Compress Unfolds in Tip. Default: <c>true</c>.
/// NOTE when set to <c>false</c>, requires Equinox.Cosmos / Equinox.CosmosStore Version >= 2.3.0 to be able to read the uncompressed Unfolds
?compressUnfolds) =
let compressUnfolds = defaultArg compressUnfolds true
let readCacheOption =
match caching with
| CachingStrategy.NoCaching -> None
Expand All @@ -1295,7 +1305,7 @@ type CosmosStoreCategory<'event, 'state, 'context>(context : CosmosStoreContext,
let resolveCategory (categoryName, container) =
let createCategory _name =
let cosmosCat = Category<'event, 'state, 'context>(container, codec)
let folder = Folder<'event, 'state, 'context>(cosmosCat, fold, initial, isOrigin, mapUnfolds, ?readCache = readCacheOption)
let folder = Folder<'event, 'state, 'context>(cosmosCat, fold, initial, isOrigin, mapUnfolds, compressUnfolds, ?readCache = readCacheOption)
match caching with
| CachingStrategy.NoCaching -> folder :> ICategory<_, _, string, 'context>
| CachingStrategy.SlidingWindow(cache, window) -> Caching.applyCacheUpdatesWithSlidingExpiration cache null window folder
Expand Down
43 changes: 25 additions & 18 deletions tests/Equinox.CosmosStore.Integration/JsonConverterTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,38 @@ let defaultSettings = FsCodec.NewtonsoftJson.Settings.CreateDefault()
type Base64ZipUtf8Tests() =
let eventCodec = FsCodec.NewtonsoftJson.Codec.Create(defaultSettings)

[<Fact>]
let ``serializes, achieving compression`` () =
let encoded = eventCodec.Encode(None,A { embed = String('x',5000) })
let ser eventType data =
let e : Core.Unfold =
{ i = 42L
c = encoded.EventType
d = encoded.Data
c = eventType
d = data
m = null
t = DateTimeOffset.MinValue }
let res = JsonConvert.SerializeObject e
test <@ res.Contains("\"d\":\"") && res.Length < 128 @>
JsonConvert.SerializeObject e

[<Fact>]
let ``serializes, achieving expected compression`` () =
let encoded = eventCodec.Encode(None,A { embed = String('x',5000) })
let res = ser encoded.EventType (Core.Base64MaybeDeflateUtf8JsonConverter.Compress encoded.Data)
test <@ res.Contains("\"d\":\"") && res.Length < 138 @>

[<Property>]
let roundtrips value =
let encoded = eventCodec.Encode(None,value)
let e : Core.Unfold =
{ i = 42L
c = encoded.EventType
d = encoded.Data
m = null
t = DateTimeOffset.MinValue }
let ser = JsonConvert.SerializeObject(e)
test <@ ser.Contains("\"d\":\"") @>
System.Diagnostics.Trace.WriteLine ser
let roundtrips compress value =
let encoded = eventCodec.Encode(None, value)
let maybeCompressor = if compress then Core.Base64MaybeDeflateUtf8JsonConverter.Compress else id
let actualData = maybeCompressor encoded.Data
let ser = ser encoded.EventType actualData
test <@ if compress then ser.Contains("\"d\":\"")
else ser.Contains("\"d\":{") @>
let des = JsonConvert.DeserializeObject<Core.Unfold>(ser)
let d = FsCodec.Core.TimelineEvent.Create(-1L, des.c, des.d)
let decoded = eventCodec.TryDecode d |> Option.get
test <@ value = decoded @>

// Checks that a null `d` payload survives a serialize/deserialize round trip as null,
// both when it is first passed through Compress (compress = true) and when it is left untouched (compress = false)
[<Theory; InlineData false; InlineData true>]
let handlesNulls compress =
let maybeCompressor = if compress then Core.Base64MaybeDeflateUtf8JsonConverter.Compress else id
// Compress returns null for null input, so both variants feed a null body to the serializer
let maybeCompressed = maybeCompressor null
let ser = ser "AnEventType" maybeCompressed
let des = JsonConvert.DeserializeObject<Core.Unfold>(ser)
test <@ null = des.d @>