Skip to content

Commit

Permalink
Ported from original by @ylibrach in #206
Browse files Browse the repository at this point in the history
  • Loading branch information
ylibrach authored and bartelink committed Oct 2, 2020
1 parent b4fd445 commit a9f5bc3
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 43 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
### Added

- `Cosmos`: Support Serverless Account Mode in `eqx init`; default RU/s to 400 if unspecified [#244](https://github.com/jet/equinox/pull/244) :pray: [@OmnipotentOwl](https://github.com/OmnipotentOwl)
- `Cosmos`: Added ability to turn off compression of Unfolds [#248](https://github.com/jet/equinox/pull/246) :pray: [@ylibrach](https://github.com/ylibrach)

### Changed

Expand Down
55 changes: 30 additions & 25 deletions src/Equinox.CosmosStore/CosmosStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -89,45 +89,50 @@ type Unfold =
/// The Case (Event Type) of this compaction/snapshot, used to drive deserialization
c: string // required

/// Event body - Json -> UTF-8 -> Deflate -> Base64
[<JsonConverter(typeof<Base64DeflateUtf8JsonConverter>)>]
/// UTF-8 JSON OR Event body - Json -> UTF-8 -> Deflate -> Base64
[<JsonConverter(typeof<Base64MaybeDeflateUtf8JsonConverter>)>]
d: byte[] // required

/// Optional metadata, same encoding as `d` (can be null; not written if missing)
[<JsonConverter(typeof<Base64DeflateUtf8JsonConverter>)>]
[<JsonConverter(typeof<Base64MaybeDeflateUtf8JsonConverter>)>]
[<JsonProperty(Required=Required.Default, NullValueHandling=NullValueHandling.Ignore)>]
m: byte[] } // optional

/// Manages zipping of the UTF-8 json bytes to make the index record minimal from the perspective of the writer stored proc
/// Only applied to snapshots in the Tip
and Base64DeflateUtf8JsonConverter() =
/// Transparently encodes/decodes fields that can optionally by compressed by
/// 1. Writing outgoing JSON string values (which may be JSON string, JSON object, or null) from a UTF-8 JSON array representation as per VerbatimUtf8Converter
/// 2a. Decoding incoming JSON string values by Decompressing it to a UTF-8 JSON array representation
/// 2b. Decoding incoming JSON non-string values by reading the raw value directly into a UTF-8 JSON array as per VerbatimUtf8Converter
and Base64MaybeDeflateUtf8JsonConverter() =
inherit JsonConverter()
let pickle (input : byte[]) : string =
if input = null then null else

use output = new System.IO.MemoryStream()
use compressor = new System.IO.Compression.DeflateStream(output, System.IO.Compression.CompressionLevel.Optimal)
compressor.Write(input,0,input.Length)
compressor.Close()
System.Convert.ToBase64String(output.ToArray())
let unpickle str : byte[] =
if str = null then null else

let inflate str : byte[] =
let compressedBytes = System.Convert.FromBase64String str
use input = new System.IO.MemoryStream(compressedBytes)
use decompressor = new System.IO.Compression.DeflateStream(input, System.IO.Compression.CompressionMode.Decompress)
use output = new System.IO.MemoryStream()
decompressor.CopyTo(output)
output.ToArray()

static member Compress (input : byte[]) : byte[] =
if input = null || input.Length = 0 then null else

use output = new System.IO.MemoryStream()
use compressor = new System.IO.Compression.DeflateStream(output, System.IO.Compression.CompressionLevel.Optimal)
compressor.Write(input,0,input.Length)
compressor.Close()
String.Concat("\"", System.Convert.ToBase64String(output.ToArray()), "\"")
|> System.Text.Encoding.UTF8.GetBytes

override __.CanConvert(objectType) =
typeof<byte[]>.Equals(objectType)
override __.ReadJson(reader, _, _, serializer) =
//( if reader.TokenType = JsonToken.Null then null else
serializer.Deserialize(reader, typedefof<string>) :?> string |> unpickle |> box
match reader.TokenType with
| JsonToken.Null -> null
| JsonToken.String -> serializer.Deserialize(reader, typedefof<string>) :?> string |> inflate |> box
| _ -> Newtonsoft.Json.Linq.JToken.Load reader |> string |> System.Text.Encoding.UTF8.GetBytes |> box
override __.WriteJson(writer, value, serializer) =
let pickled = value |> unbox |> pickle
serializer.Serialize(writer, pickled)
let array = value :?> byte[]
if array = null || array.Length = 0 then serializer.Serialize(writer, null)
else array |> System.Text.Encoding.UTF8.GetString |> writer.WriteRawValue

/// The special-case 'Pending' Batch Format used to read the currently active (and mutable) document
/// Stored representation has the following diffs vs a 'normal' (frozen/completed) Batch: a) `id` = `-1` b) contains unfolds (`u`)
Expand Down Expand Up @@ -538,13 +543,13 @@ module internal Sync =
let mkBatch (stream: string) (events: IEventData<_>[]) unfolds : Tip =
{ p = stream; id = Tip.WellKnownDocumentId; n = -1L(*Server-managed*); i = -1L(*Server-managed*); _etag = null
e = Array.map mkEvent events; u = Array.ofSeq unfolds }
let mkUnfold baseIndex (unfolds: IEventData<_> seq) : Unfold seq =
let mkUnfold compressor baseIndex (unfolds: IEventData<_> seq) : Unfold seq =
unfolds
|> Seq.mapi (fun offset x ->
{ i = baseIndex + int64 offset
c = x.EventType
d = x.Data
m = x.Meta
d = compressor x.Data
m = compressor x.Meta
t = DateTimeOffset.UtcNow
} : Unfold)

Expand Down Expand Up @@ -1101,7 +1106,7 @@ type internal Category<'event, 'state, 'context>(store : StoreClient, codec : IE
let events', unfolds = transmute events state'
SyncExp.Etag (defaultArg pos.etag null), events', Seq.map encode events' |> Array.ofSeq, Seq.map encode unfolds
let baseIndex = pos.index + int64 (List.length events)
let projections = Sync.mkUnfold baseIndex projectionsEncoded
let projections = Sync.mkUnfold Base64MaybeDeflateUtf8JsonConverter.Compress baseIndex projectionsEncoded
let batch = Sync.mkBatch stream eventsEncoded projections
match! store.Sync(log, stream, exp, batch) with
| InternalSyncResult.Conflict (pos', tipEvents) -> return SyncResult.Conflict (cat.Reload(log, token, state, fold, isOrigin, (pos', tipEvents)))
Expand Down
43 changes: 25 additions & 18 deletions tests/Equinox.CosmosStore.Integration/JsonConverterTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,38 @@ let defaultSettings = FsCodec.NewtonsoftJson.Settings.CreateDefault()
type Base64ZipUtf8Tests() =
let eventCodec = FsCodec.NewtonsoftJson.Codec.Create(defaultSettings)

[<Fact>]
let ``serializes, achieving compression`` () =
let encoded = eventCodec.Encode(None,A { embed = String('x',5000) })
let ser eventType data =
let e : Core.Unfold =
{ i = 42L
c = encoded.EventType
d = encoded.Data
c = eventType
d = data
m = null
t = DateTimeOffset.MinValue }
let res = JsonConvert.SerializeObject e
test <@ res.Contains("\"d\":\"") && res.Length < 128 @>
JsonConvert.SerializeObject e

[<Fact>]
let ``serializes, achieving expected compression`` () =
let encoded = eventCodec.Encode(None,A { embed = String('x',5000) })
let res = ser encoded.EventType (Core.Base64MaybeDeflateUtf8JsonConverter.Compress encoded.Data)
test <@ res.Contains("\"d\":\"") && res.Length < 138 @>

[<Property>]
let roundtrips value =
let encoded = eventCodec.Encode(None,value)
let e : Core.Unfold =
{ i = 42L
c = encoded.EventType
d = encoded.Data
m = null
t = DateTimeOffset.MinValue }
let ser = JsonConvert.SerializeObject(e)
test <@ ser.Contains("\"d\":\"") @>
System.Diagnostics.Trace.WriteLine ser
let roundtrips compress value =
let encoded = eventCodec.Encode(None, value)
let maybeCompressor = if compress then Core.Base64MaybeDeflateUtf8JsonConverter.Compress else id
let actualData = maybeCompressor encoded.Data
let ser = ser encoded.EventType actualData
test <@ if compress then ser.Contains("\"d\":\"")
else ser.Contains("\"d\":{") @>
let des = JsonConvert.DeserializeObject<Core.Unfold>(ser)
let d = FsCodec.Core.TimelineEvent.Create(-1L, des.c, des.d)
let decoded = eventCodec.TryDecode d |> Option.get
test <@ value = decoded @>

[<Theory; InlineData false; InlineData true>]
let handlesNulls compress =
let maybeCompressor = if compress then Core.Base64MaybeDeflateUtf8JsonConverter.Compress else id
let maybeCompressed = maybeCompressor null
let ser = ser "AnEventType" maybeCompressed
let des = JsonConvert.DeserializeObject<Core.Unfold>(ser)
test <@ null = des.d @>

0 comments on commit a9f5bc3

Please sign in to comment.