From b4fd4456846ab54336067bff15e1a6a9c1f3a04b Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Thu, 1 Oct 2020 22:47:31 +0100 Subject: [PATCH] Add tiered stream management docs (#226) --- DOCUMENTATION.md | 166 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 166 insertions(+) diff --git a/DOCUMENTATION.md b/DOCUMENTATION.md index 23b4859f6..6732af7ca 100755 --- a/DOCUMENTATION.md +++ b/DOCUMENTATION.md @@ -2045,6 +2045,172 @@ below the table](#access-strategy-glossary) for definition of terms) | `RollingState` | Read `Tip`
(can fall back to building from events as per `Snapshot` mode if nothing in Tip, but normally there are no events) | 1) produce `state'`
2) update `Tip` with `toSnapshot state'`
3) **no events are written**
4) Concurrency Control is based on the `_etag` of the Tip, not an expectedVersion / event count | | `Custom` | As per `Snapshot` or `MultiSnapshot`
1) see if any `unfold`s pass the `isOrigin` test
2) Otherwise, work backward until a _Reset Event_ or start of stream | 1) produce `state'`
2) use `transmute events state` to determine a) the `unfold`s (if any) to write b) the `events` _(if any)_ to emit
3) execute the insert and/or upsert operations, contingent on the `_etag` of the opening `state` | + +# Stream Management Policies + +The stores supported by Equinox are primarily intended to house _Domain Events_ (Facts) from an event-sourced model. Such events are retained indefinitely in an immutable form. + +Often, the management of _Ephemeral Events_ (that one might equivalently record on a bus, queue or a topic in systems such as Apache Kafka) involves needs that overlap significantly with those of managing Domain Events. However, there's a point at which maintaining equivalent levels of access to such data is of significantly lesser value than it is for Domain Events. + +In theory, it can be argued that events with an ephemeral aspect are not True Event-Sourcing Events, and as such should be considered entirely separately. + +In practice, for myriad reasons, stores such as EventStoreDB, CosmosDB and SqlStreamStore become candidates for and/or victims of the blurring of the divide between ephemeral events and Domain Events. + +For the above reasons, a key aspect of designing, maintaining and evolving an event-sourced system involves the management of the overall set of events comprising the system's state: +- grouping events into streams in accordance with the goals of the system as a whole (i.e. how one models the system in terms of aggregates), with consideration for how well a given structure aligns with the characteristics of a given Store +- implementing policies reflecting the relevance of a stream and/or its events over time via various mechanisms: from shifting them to lower performance storage, archiving them to a separated store that's not accessible from the current online system all the way to outright deletion +- drawing the line with regard to ephemeral events representing state that truly does not belong alongside your Domain Events + +## Aggregate streams + +While the store's capabilities and restrictions are where the rubber meets the road in your streams/events layout, it should not be the primary driver. + +When considering which events should be united in a given stream-category, some key factors are: + +- is there an invariant that the Aggregate is seeking to uphold? (remember, the stream is the principal unit of consistency control) +- do all the events relate to a meaningful atomic structure within your system? +- when making decisions based on grouping the events in a given way, is the resulting state a reasonable size? (this feeds into whether it's feasible to snapshot the state) +- is the state cohesive, or is it possible to partition the grouping even further? (think [SRP](https://en.wikipedia.org/wiki/Single-responsibility_principle) and [ISP](https://en.wikipedia.org/wiki/Interface_segregation_principle)) +- is there a natural characteristic of the aggregate that bounds the number of events that will occur over its lifetime? (e.g., "the appointments" vs splitting by day/month/year/facility) + +## Topic streams + +> _When you don't load and fold events to arrive at a state_. + +In some cases, a stream may not even have a meaningful state, invariant or a business process that it's supporting: + +- example: a stream is used to queue up commands and/or post outcomes as part of some process. In such a case, the 'state' boils down to checkpointing how far a given consumer has walked along the topic (as opposed to maintaining a rolling state derived primarily from the events that one queries, renders or uses to support a decision flow). + +Such topic-streams are not aggregates as such, and are not addressed as a primary use case in the Equinox [Programming Model](#programming-model). + +However, such topic-streams are nonetheless subject to almost identical considerations in terms of how we deal with managing the lifetime of the data. + +## Store-specific stream bounding / event retention considerations + +Across both Aggregate and Topic use cases, there are specific facilities afforded (and restrictions imposed) by the specific store you're using. For instance: + +- _Stream size limits - EventStoreDB_: EventStore does not impose any limitations on the maximum size or event count that a single stream can bear. This allows one to maintain a perpetual queue and/or an ordered sequence of events, with or without using a retention policy to control the trimming of expired/excess events. +- _Stream size limits - CosmosDB_: The total size of the events and _Tip-document_ of a stream must adhere to the CosmosDB [logical partition limit](https://docs.microsoft.com/en-us/azure/cosmos-db/concepts-limits) of 20GB. +- _Retention policies - EventStoreDB_: Streams can have retention policies defined via each stream's metadata stream. The server cluster manages the application of these rules. The scavenging process removes the events, compacting the data by rewriting chunks with deleted, extraneous or aged-out events elided. +- _Retention policies - CosmosDB_: the [CosmosDB TTL facility](https://docs.microsoft.com/en-us/azure/cosmos-db/time-to-live) allows one to define a TTL at the document level. CosmosDB removes expired items automatically (whenever residual RU capacity allows). + +_NOTE: Equinox does not presently expose specific controls to allow specification of either a CosmosDB TTL or EventStoreDB stream metadata_. + +## Mutation, archival and pruning of Events + +### Considerations regarding mutation or deletion of events + +> _You don't rewrite events or streams in a Store, for reasons_ + +For Domain Events in an event-sourced model, their permanence and immutability is typically considered axiomatic; readers expect to be able to cache them forever, rely on their index on a stream remaining fixed etc. Some (rare) corner cases where one might wish to deviate from such axioms in terms of Domain Events in a model include: + +- rewriting streams as an expedient solution to a bug etc: as with the rewriting in history in git, the first rule is: DONT. (But it's technically possible and in some cases this nuclear option can solve a problem) +- intentionally removing data: for GDPR or CCPA reasons, you may opt to mutate or remove events as part of addressing a need to conclusively end-of-life some data (many better solutions are available...) + +It should be noted with regard to such requirements: +- EventStoreDB does not present any APIs for mutation of events, though deleting events is a fully supported operation (although that can be restricted). Rewrites are typically approached by doing an offline database rebuild. +- `Equinox.Cosmos` and `Equinox.CosmosStore` include support for pruning events (only) from the head of a stream. Obviously, there's nothing stopping you deleting or altering the Batch documents out of band via the underlying CosmosDB APIs directly (Note however that the semantics of document ordering within a logical partition means its strongly advised not to mutate any event Batch documents as this will cause their ordering to become incorrect relative to other events, invalidating a key tenet that Change Feed Processors rely on). + +### Growth handling strategies + +> _No matter what the vendor tells you, it's literally not going to scale linearly..._ + +A more typical concern for an event-sourced model is managing what should happen when an Aggregate falls out of scope. For instance, a pick ticket entity in a warehouse is only of historical interest after a certain period of time (the customer's Order History maintains long-lived state pertaining to orders and/or associated returns etc.) + +With regard to such needs, here are some store-specific considerations: + +- EventStoreDB caches only in-use streams and events. Hosting streams that are no longer relevant is considered a completely normal use case: + - streams and/or regions of streams that are no longer relevant don't induce a major cost on the system + - each client maintains a single connection to the server cluster; there is no incremental cost in terms of the potential network or client process resource consumption related directly to the size of your dataset + - however, there is a non-zero cost; the overall dataset needs to be colocated and backed up as a whole (there are also internal index structures maintained alongside the event chunk files, with rebuild times directly related to the store's event count etc). + +- For CosmosDB, the costs and impacts of retaining events and/or streams that are no longer relevant are more direct; ways they manifest include: + - the billing model imposes a linear cost per GB that applies equally to all event-batch documents in your store, plus the size of the associated indexes (which strongly relate to the number of items stored). These costs are multiplied by the number of regions to which you replicate. + - the total size of your dataset affects the minimum number of nodes across which the data will spread. i.e. 1 TB of data will require at least 10,000 RU/s to be allocated to it regardless of traffic + - the more nodes you have, the more TCP connections and other related fixed resources each client instance requires + - the RU/s allocated to your container can only be spread _equally_ across all nodes. Thus, if you have 100GB spread over 5 nodes and allocate 10,000 RU/s to the Container, each node gets 2,000 RU/s and callers get 429s if there happen to be more than that incurred for that given node in that second (with significant latency impact as all such rate-limited clients need to back off for >= 1s for each rate-limited attempt). + - the cost of over-provisioning to ensure appropriate capacity for spikes in load and/or to handle hotspots (where one node happens to host a stream that's accessed disproportionately heavily relative to data on other nodes) is multiplied by the number of nodes. Example: if you have a single node with 5GB of data with 2,000 RU/s allocated and want to double the peak capacity, you simply assign it 4,000 RU/s; if you have 100GB over 5 nodes, you need to double your 5x2,000 to 5x4,000 to achieve the same effect + - there are significant jumps in cost for writes based on the [indexing cost](https://docs.microsoft.com/en-us/azure/cosmos-db/index-policy) as the number of items in a logical partition increases (empirically derived data; subject to change: for instance inserts of a a minimal (<100 bytes) event that initially costs ~20RU becomes > 40RU with 128 items, > 50RU with 1600 items, >60 at 2800 items and >110RU at 4900 items as snapshots or event sizes hit certain thresholds). + +There are myriad approaches to resolving these forces. Let's examine the trade-offs of some relevant ones... + +#### Database epochs + +> _Perhaps we can Just leave it all behind and switch to a new blank database?_ + +In some systems, where there's a relevant natural cycle in the domain, the answer to managing database growth may be simpler than you think. For instance: +- you may be able to start with a blank database for each trading day for the bulk of the events your system operates on. +- your domain may have a natural end of year business process that dictates a formal closing of accounts with selective migration of relevant summarized data to be carried forward into a successor epoch. In such instances, each closed year can be managed as a separated (effectively read-only) dataset. + +As a fresh epoch of data becomes the active dataset, other options open up: +- one might migrate the now-of-secondary-importance data to cheaper hardware or network resources +- one might archive the database once you've validated the transition has been effected completely + +#### Stream epochs + +> _Replace a perpetual stream with a series of finite epoch-streams, allowing superseded ones to be archived or deleted_ + +As covered above, long streams bring associated costs. A key one that hasn't been mentioned is that, because the unit of storage is a stream, there's no easy way to distinguish historic events from current ones. This has various effects on processing costs such as (for Aggregate streams), that of loading and folding the state (or generating a snapshot). + +Analogous to how data can be retired (as described in _Database epochs_), it may be possible to manage the growth cycle of continuous streams by having readers and writers coordinate the state of given stream cooperatively via the following elements: +- one _Series_ aggregate: maintains the current active _epoch id_ for the series +- many _Epoch_ streams: independent streams (sharing a root name), sufficed by the _epoch id_ +- having a deterministic way of coordinating to ensure each (independent) writer will recognize that a given epoch is _closed_ (e.g., based on event count, elapsed time since the epoch started, total event payload bytes, etc.) + +Depending on whether there's state associated with a given stream, the system periodically transitions the Series to a new Epoch by algorithms with mechanisms such as: +- Topic-stream: write a `Closed` event; have all writes be contingent on no such event preceding any write to an epoch-stream +- Aggregate stream: + 1. write a `Closed` event to the outgoing epoch-stream, followed by (as a separate action with idempotent semantics) ... + 2. write a `CarriedForward` event to open the new Epoch (again, all writers follow the same rules in order to be able to make writes idempotent even in the face of concurrent writers) + +The writing of the event to move the active Epoch id forward in the _Series_ aggregate can take place at any point after the `Closed` event has been written to the outgoing epoch-stream (including concurrently with the writing of the `CarriedForward` event). The reason for this is that the active epoch can be inferred by walking forward from any given epoch until one arrives at an epoch that's not Closed. + +[WIP implementation of a `dotnet new` template illustrating the Stream Epochs approach](https://github.com/jet/dotnet-templates/pull/40) + +#### Monitoring a primary dataset for Archival/Pruning + +> _Move or delete out-of-scope data from a primary (hot) dataset to a cheaper (warm/cold) stream_ + +As with 'Database epochs', once a given 'Stream epoch' has been marked active in a Series, we gain options as to what to do with the preceding ones: +- we may opt to retain them in order to enable replaying of projections for currently-unknown reasons +- if we intend to retain them for a significant period: we can replicate/sync/mirror/archive them to a secondary archive, then prune them from the primary dataset +- if they are only relevant to assist troubleshooting over some short term: we can delete them after a given period (without copying them anywhere) + +When writing to a secondary store, there's also an opportunity to vary the writing process from that forced by the constraints imposed when writing as part of normal online transaction processing: +- it will often make sense to have the archiver add a minimal placeholder to the secondary store regardless of whether a given stream is being archived, which can then be used to drive the walk of the primary instead of placing avoidable load on the primary by having to continually loop over all the data in order to re-assess archival criteria over time +- when copying from primary to secondary, there's an opportunity to optimally pack events into batches (for instance in `Equinox.CosmosStore`, batching writes means less documents, which reduces document count, per-document overhead, the overall data and index size in the container and hence query costs) +- when writing to warm secondary storage, it may make sense to compress the events (under normal circumstances, compressing event data is rarely considered a worthwhile tradeoff). +- where the nature of traffic on the system has peaks and troughs, there's an opportunity to shift the process of traversing the data for archival purposes to a window outside of the peak load period (although, in general, the impact of reads for the purposes of archival won't be significant enough to warrant optimizing this factor) + +### Archiver + Pruner roles + +> _Outlining the roles of the `proArchiver` and `proPruner` templates_ + +It's conceivable that one might establish a single service combining the activities of: +1. copying (archiving) to the secondary store in reaction to changes in the primary +2. pruning from the primary when the copying is complete +3. deleting immediately +4. continually visiting all streams in the primary in order to archive and/or prune streams that have fallen out of use + +However, splitting the work into two distinct facilities allows better delineation of responsibilities: +- clarifies the relative responsibilities (and allows them to be considered individually) +- allows the load (deletes can be costly in RU terms on CosmosDB) on the primary dataset to be more closely controlled + +#### Archiver + +An archiver tails a monitored store and bears the following responsibilities: +- minimizing the load on the source it's monitoring +- listens to all event writes (via `$all` in the case of EventStoreDB or a ChangeFeed Processor in the case of CosmosDB) +- ensuring the secondary becomes aware of all new streams (especially in the case of `Equinox.CosmosStore` streams in `AccessStrategy.RollingState` mode, which do not yield a new event-batch per write) + +#### Pruner + +The pruner cyclically (i.e., when it reaches the end, it loops back to the start) walks the secondary store: +- visiting each stream, identifying the current write position in the secondary +- uses that as input into a decision as to whether / how many events can be trimmed from the primary (deletion does not need to take place right away - Equinox will deal with events spread over a Primary/Secondary pair of Containers via the [Fallback mechanism](https://github.com/jet/equinox/pull/247) +- (for `Equinox.CosmosStore`) can optimize the packing of the events (e.g. if the most recent 4 events have arrived as 2 batches, the pruner can merge the two batches to minimize storage and index size). When writing to a primary collection, batches are never mutated for packing purposes both due to write costs and read amplification. +- (for `Equinox.CosmosStore`) can opt to delete from the primary if one or more full Batches have been copied to the secondary (note the unit of deletion is a Batch - mutating a Batch in order to remove an event would trigger a reordering of the document's position in the logical partition) + # Ideas ## Things that are incomplete and/or require work