Our goal here is to create a format and a library for data replication using state-of-the-art Replicated Data Types. Replicated Data interchange format (RDX) is like protobuf, but CRDT. Apart from RPC applications, one can use it for data storage, distributed and asynchronous data exchange and in other similar applications. RDX fully supports local-first, offline-first and peer-to-peer replication, with no central server required, as any two replicas can merge their data. By installing RDX data types as merge operators in an LSM database (leveldb, RocksDB, pebble, Cassandra, etc) one can effectively have a CRDT database (which Chotki basically is).
We will implement unified CRDTs able to synchronize using operations, full states or deltas. Types may imply causal consistency of updates in matters of performance, but their correctness does not depend on that. RDX data types are fully commutative, associative and idempotent. Hence, immune to reordering or duplication of updates.
The default syncing protocol (not described here) generally relies on version vectors. Do not confuse that with vector clocks used by Amazon Dynamo and similar systems. While there are strong parallels, inner workings of VV and VC are not identical.
Our objects can have fields of the following CRDT types. Each type is named by a letter.
- last-write-wins variables (
I
for int64,S
for string,F
is float64, andR
is id64) - counters,
N
increment-only uint64 andZ
two-way int64 - maps (M), like key-value maps, where keys and values are
FIRST
- sets (E), contain arbitrary
FIRST
elements - arrays (L) of arbitrary
FIRST
elements - version vectors (V)
- codegen
The format and the merge rules are as follows.
The last-write-wins register is the simplest data type to
implement. For each LWW field, we only need the latest "winner"
op containing the logical timestamp and the value per se. A
logical timestamp is a pair {rev, src}
where rev
is the
revision number and src
is the id of the author. For example,
let's see how a bare (no TLV envelope) I
int64 -11
would
look like, assuming it is the 4th revision of the register
autored by replica #5. The TLV would look like: 32 08 05 15
(hex) where 0x15
is a zig-zag encoded and zipped -11
,
while 32 08 05
is a tiny ToyTLV record for a zipped pair
of ints, 4 (signed, zig-zagged, so 08
) and 5 (unsigned, so
05
). If we add a ToyTLV envelope, that becomes 69 04 32 08 05 15
(type of record I
, length 4, then the bare part).
String S
values are simply UTF-8 strings. Int64 I
, float64
F
and id64 R
values get compressed using zip_int
routines. Overlong encodings are forbidden both for strings and
for zip-ints!
T
ops have a timestamp, but no value. That is the equivalent
of a nil
or void
value. Those are used as placeholders in
various cases.
The string value for FIRST
types is as follows:
F
the e-notation, JSON-likeI
signed integer notation,R
5-8-3 hex notation (e.g.c187-3a62-12
)S
double-quoted JSON-like, e.g. "Sarah O'Connor"T
null
Merge rules for LWW are straighforward:
- higher revision wins
- in case of a tie, higher value wins (like bytes.Compare())
- in case of a tie, who cares, but higher replica id wins
N
are increment-only counters. Their TLV state is a sequence
of T
records containing zipped uint64 pairs {val,src}, the
counter value and source replica id. As the counter is inc-only,
we may use the value itself as a revision number. The merge
operator is per-replica max
, as later versions are greater.
The native value is the sum of all replica values (sum of
contributions).
Z
are two-way counters (inc/dec). Their TLV format is a
sequence of I
records each having {rev,src}
metadata as
described in the FIRST
section. One record corresponds to one
source, per-source merge rules are same as LWW. The native value
is the sum of all I
values.
Generic sets containing any FIRST
elements. The TLV format is
a sequence of enveloped FIRST records. It can contain records
with negative revision numbers. Those are tombstones (deleted
entries). For example, I{4,5}-11
from the FIRST
example
would go as 69 04 32 08 05 15
. Then, if replica #3 would want
to remove that entry, it will issue a tombstone op I{-5,3}-11
or 69 04 32 09 03 15
. Here, the version number changes from
08
to 09
or 4 to -5, the author changes to 3.
Within a set, the ops are sorted in the value order. Namely,
if the type differs, they go in the alphabetical order (F
,
I
, R
, S
, T
). If the type is the same, they go in the
ascending order, as per strcmp
or bytes.Compare
. That way,
merging multiple versions of a set only requires one parallel
pass of those, no additional allocation or sorting, very much
like mergesort works.
The string value for a set is like {1,2,3}
where 1,2,3
are
FIRST
elements of the set.
Generic maps, mapping any FIRST
value to any other FIRST
value. The TLV format is a sequence of enveloped key-value op
pairs. Any update should also contain the affected key-value
pairs. Deleted entries might have T
values (the key is
present, the value is null) or the key might have a negative
revision (no such key present).
Pairs are sorted in the value-order of their keys. When merging
two pairs having an identical value of their keys, both the key
and the value ops are merged according to the LWW rules. As with
E
sets, this only requires one parallel pass of the versions.
The string value for a map is like {4:null, "key":"value"}
Generic arrays store any FIRST
elements. Internally, L
are
Causal Trees (also known as Replicated Growable Arrays, RGAs).
The TLV format is a sequence of enveloped FIRST ops. The
order of the sequence is a weave, i.e. ops go in the same
order as they appear(ed) in the resulting array. Deleted ops
change to tombstones, same as E.
The merging procedure follows the tree-traversal logic. Any
change to an array must have a form of subtrees, each one
arranged in the same weave order, each one prepended with a T
op specifying its attachment point in the edited tree.
Deletions look like T
ops with negative revision numbers. As
an example, suppose we have an array authored by #3 I{1,3}1 I{2,3}2 I{3,3}3
or [1,2,3]
and replica #4 wants to delete the
first entry. Then, it issues a patch T{1,3}T{-4,4}
that merges
to produce I{1,3}1 T{-4,4} I{2,3}2 I{3,3}3
or [2,3]
.
The string value for an array is like [1,2,3]
Version vector is a way to track dataset versions in a
causally ordered system. It is a vector of seq
numbers, where
each seq
is the version of the state as seen by each
respective replica. Alternatively, that is a map {src: seq}
,
where src
is the replica id
. It is assumed, that we received
updates from replica src
all the way up to seq
.
Bare TLV for a version vector is a sequence of V
records (yes,
V
nested in V
) each containing one id64 as a zipped seq-src
pair (see ZipUint64Pair). The sequence is sorted in the
ascenting order of record bytes, like bytes.Compare()
.
The merge algorithm for version vectors is simple: take the
maximum seq
for each src
. Note that seq=0
is distinct from
having no record.
To fully implement an RDT one has to implement these 10
functions. The function name starts with the type name letter,
here we imply I
last-write-wins int64.
// Xvalid verifies validity of a bare TLV record.
// Any other function may assume the input is valid.
func Ivalid(tlv []byte) bool
// Xstring converts a TLV representation into a string.
func Istring(tlv []byte) (txt string)
// Xparse converts a string back into bare TLV.
// Must round-trip with Xstring.
func Iparse(txt string) (tlv []byte)
// Xtlv converts the native type into a TLV, zero metadata.
func Itlv(i int64) (tlv []byte)
// Xnative converts TLV into the native value.
// Must round-trip with Xtlv.
func Inative(tlv []byte) int64
// Xdelta produces a TLV value that, once merged with
// the old TLV value using Xmerge, will produce the new
// native value using Xnative. Returns nil if none needed.
// This function we need to *save changes* from a native
// object/struct into RDX.
func Idelta(tlv []byte, new_val int64) (tlv_delta []byte)
// Xmerge CRDT-merges several bare TLV values into the
// resulting one. For example, given two I records
// {3,8}15 and {4,1}44 will return {4,1}44 as version 4 is
// newer than version 3.
func Imerge(tlvs [][]byte) (tlv []byte)
// Xdiff produces a TLV delta given a TLV value and a
// version vector of suspected changes (may skip this).
func Idiff(tlv []byte, vvdiff VV) (tlv []byte)
We use the ToyTLV format for enveloping/nesting all data. That is a bare-bones type-length-value format with zero semantics. What we put into ToyTLV envelopes is integers, strings, and floats. Strings are UTF-8, no surprises. Floats are taken as raw bits and treated same as integers. id64 is stored as a compressed pair of integers.
A note on integer compression. From the fact that protobuf has about ten integer types, one can guess that things can be complicated here. We use ZipInt routines to produce efficient varints in a TLV format (differently from protobuf which has a separate bit-level LEB128 coding for ints).
- ZipUint64 packs an integer skipping all leading zeroes
- ZipUint64Pair packs a pair of ints, each one taking 1,2,4 or 8 bytes
- ZipZagInt64 packs a signed integer using the zig-zag coding
- ZipFloat64 packs a float (integers and binary fractions pack well)
id64 and logical timestamps get packed as pairs of uint64s. All zip codings are little-endian.