Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[docs] Update docs to clarify Flush and Buffer in Storage Engine #2511

Merged
merged 7 commits into from
Dec 2, 2020
Merged
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 27 additions & 13 deletions site/content/m3db/architecture/engine.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,17 @@ The in-memory portion of M3DB is implemented via a hierarchy of objects:
1. A `database` of which there is only one per M3DB process. The `database` owns multiple `namespace`s.
2. A `namespace` is similar to a table in other databases. Each `namespace` has a unique name and a set of configuration options, such as data retention and block size (which we will discuss in more detail later). A namespace owns multiple `shard`s.
3. A [`shard`](/docs/m3db/architecture/sharding) is effectively the same as a "virtual shard" in Cassandra in that it provides an arbitrary distribution of time series data via a simple hash of the series ID. A shard owns multiple `series`.
4. A `series` represents a sequence of time series datapoints. For example, the CPU utilization for a host could be represented as a series with the ID "host1.system.cpu.utilization" and a vector of (TIMESTAMP, CPU_LEVEL) tuples. Visualizing this example in a graph, there would a single line with time on the x-axis and CPU utilization on the y-axis. A `series` owns a `buffer` and any cached `block`s.
5. The `buffer` is where all data that has yet to be written to disk gets stored in memory. This includes both new writes to M3DB and data obtained through bootstrapping. More details on the [buffer](#buffer) is explained below. Upon [flushing](#flushing), the buffer creates a `block` of its data to be persisted to disk.
6. A `block` represents a stream of compressed time series data for a pre-configured block size, for example, a block could hold data for 6-8PM (block size of two hours). A `block` can arrive directly into the series only as a result of getting cached after a read request. Since blocks are in a compressed format, individual datapoints cannot be read from it. In other words, in order to read a single datapoint, the entire block up to that datapoint needs to be decompressed beforehand.
4. A `series` represents a sequence of time series datapoints. For example, the CPU utilization for a host could be represented as a series with the ID "host1.system.cpu.utilization" (or "__name__=system_cpu_utilization,host=host1"), and a vector of (TIMESTAMP, CPU_LEVEL) tuples. Visualizing this example in a graph, there would a single line with time on the x-axis and CPU utilization on the y-axis. A `series` owns a `buffer` and any cached `block`s.
5. The `buffer` is where all data that has yet to be written to disk gets stored in memory, for a specific series. This includes both new writes to M3DB and data obtained through [bootstrapping](/docs/operational_guide/bootstrapping_crash_recovery). More details on the [buffer](/docs/m3db/architecture/engine#buffer) is explained below. Upon [flushing](/docs/m3db/architecture/engine#flushing), the buffer creates a `block` of its data to be persisted to disk.
6. A `block` represents a stream of compressed single time series data for a pre-configured block size, for example, a block could hold data for 6-8PM (block size of two hours). A `block` can arrive directly into the series only as a result of getting cached after a read request. Since blocks are in a compressed format, individual datapoints cannot be read from it. In other words, in order to read a single datapoint, the entire block up to that datapoint needs to be decompressed beforehand.

### Persistent storage

While in-memory databases can be useful (and M3DB supports operating in a memory-only mode), some form of persistence is required for durability. In other words, without a persistence strategy, it would be impossible for M3DB to restart (or recover from a crash) without losing all of its data.

In addition, with large volumes of data, it becomes prohibitively expensive to keep all of the data in memory. This is especially true for monitoring workloads which often follow a "write-once, read-never" pattern where less than a few percent of all the data that's stored is ever read. With that type of workload, it's wasteful to keep all of that data in memory when it could be persisted on disk and retrieved when required.
In addition, with large volumes of data, it becomes prohibitively expensive to keep all the data in memory. This is especially true for monitoring workloads which often follow a "write-once, read-never" pattern where less than a few percent of all the data that's stored is ever read. With that type of workload, it's wasteful to keep all of that data in memory when it could be persisted on disk and retrieved when required.

M3DB takes a two-pronged approach to persistant storage that involves combining a [commit log](/docs/m3db/architecture/commitlogs) for disaster recovery with periodic flushing (writing [fileset files](/docs/m3db/architecture/storage) to disk) for efficient retrieval:
M3DB takes a two-pronged approach to persistent storage that involves combining a [commit log](/docs/m3db/architecture/commitlogs) for disaster recovery with periodic flushing (writing [fileset files](/docs/m3db/architecture/storage) to disk) for efficient retrieval:

1. All writes are persisted to a commit log (the commit log can be configured to fsync every write, or optionally batch writes together which is much faster but leaves open the possibility of small amounts of data loss in the case of a catastrophic failure). The commit log is completely uncompressed and exists only to recover unflushed data in the case of a database shutdown (intentional or not) and is never used to satisfy a read request.
2. Periodically (based on the configured block size), all data in the buffer is flushed to disk as immutable [fileset files](/docs/m3db/architecture/storage). These files are highly compressed and can be indexed into via their complementary index files. Check out the [flushing section](#flushing) to learn more about the background flushing process.
Expand All @@ -101,7 +101,13 @@ M3DB will consult the database object to check if the namespace exists, and if i

At the same time, the write will be appended to the commit log, which is periodically compacted via a snapshot process. Details of this is outlined in the [commit log](/docs/m3db/architecture/commitlogs) page.

**Note:** Regardless of the success or failure of the write in a single node, the client will return a success or failure to the caller for the write based on the configured [consistency level](/docs/m3db/architecture/consistencylevels).
{{% notice warning %}}
Regardless of the success or failure of the write in a single node, the client will return a success or failure to the caller for the write based on the configured [consistency level](/docs/m3db/architecture/consistencylevels).
{{% /notice %}}

{{% notice warning %}}
M3DB "default" write is writeTagged which also accepts list of pairs (tag name, tag value), which it then uses to update the reverse index of the namespace. Work to document that is TBD.
{{% /notice %}}

## Read Path

Expand Down Expand Up @@ -148,9 +154,13 @@ Once M3DB has retrieved the three blocks from their respective locations in memo

## Buffer

Each series object contains a buffer, which is in charge of handling all data that has yet to be flushed - new writes and bootstrapped data. To accomplish this, it keeps mutable "buckets" of encoders (for new writes) and immutable blocks (for bootstrapped data). M3TSZ, the database's encoding scheme, is designed for compressing time series data in which each datapoint has a timestamp that is larger than the last encoded datapoint. For metrics workloads this works very well because every subsequent datapoint is almost always after the previous one. However, out of order writes will occasionally be received, for example due to clock skew. When this happens, M3DB will allocate a new encoder for the out of order datapoints. These encoders are contained in a bucket along with any blocks that got bootstrapped.
Each series object contains a buffer, which is in charge of handling all data that has yet to be flushed - new writes and bootstrapped data (used to initialize a shard). To accomplish this, it keeps a list of "buckets" per block start-time, which contains a list of mutable encoders (for new writes) and immutable blocks (for bootstrapped data). M3TSZ, the database's encoding scheme, is designed for compressing time series data in which each datapoint has a timestamp that is larger than the last encoded datapoint. For metrics workloads this works very well because every subsequent datapoint is almost always after the previous one. However, out of order writes will occasionally be received, for example due to clock skew. When this happens, M3DB will allocate a new encoder for the out of order datapoints. These encoders are contained in a bucket along with any blocks that got bootstrapped.

When datapoint is writen to a buffer, a list of buckets is selected based on the block start-time the timestamp of the datapoint belongs to (e.g. 16:23:20 belongs to the 16:00 block). In that list, only one bucket is the active bucket (version = 0) and it will be written into an encoder in that bucket.

Supporting out-of-order writes entailed defining a time window, ending at now, called the Buffer Past: (now - bufferPast, now). It's a namespace configuration option, which dictates the way this datapoint will get flushed to disk: Warm or Cold. A write of a datapoint which its timestamp is inside the Buffer Past time window is classified as a Warm Write, while before that is a Cold Write. That classification is named Write Type. For every Write Type there exists a single active bucket, which contains the mutable encoders, where new writes for that block start-time are written to.

Upon a flush (discussed further below), all data within a bucket gets merged and its version gets incremented - the specific version it gets set to depends on the number of times this block has previously been flushed. This bucket versioning allows the buffer to know which data has been flushed so that subsequent flushes will not try to flush it again. It also indicates to the clean up process (also discussed below) that that data can be evicted.
Upon a flush (discussed further below), all data within a bucket gets merged and its version gets incremented - the new version depends on the number of times this block has previously been flushed. This bucket versioning allows the buffer to know which data has been flushed so that subsequent flushes will not try to flush it again. It also indicates to the cleanup process (also discussed below) that that data can be evicted.

Given this complex, concurrent logic, this has been [modeled in TLA](https://github.com/m3db/m3/blob/master/specs/dbnode/flush/FlushVersion.tla).

Expand Down Expand Up @@ -209,15 +219,19 @@ M3DB has a variety of processes that run in the background during normal operati

As discussed in the [architecture](#architecture) section, writes are actively buffered / compressed in memory and the commit log is continuously being written to, but eventually data needs to be flushed to disk in the form of [fileset files](/docs/m3db/architecture/storage) to facilitate efficient storage and retrieval.

This is where the configurable "block size" comes into play. The block size is simply a duration of time that dictates how long new writes will be compressed (in a streaming manner) in memory before being flushed to disk. Let's use a block size of two hours as an example.
This is where the configurable "block size" comes into play. The block size is simply a duration of time dictating how long new writes will be compressed (in a streaming manner) in memory before being flushed to disk. Let's use a block size of two hours as an example.

If the block size is set to two hours, then all writes for all series for a given shard will be buffered in memory for two hours at a time. At the end of the two hour period all of the fileset files will be generated, written to disk, and then the in-memory objects can be released and replaced with new ones for the new block. The old objects will be removed from memory in the subsequent tick.

If a flush happens for a namespace/shard/series/block for which there is already a fileset, in-memory data will get merged with data on disk from the fileset. The resultant merged data will then be flushed as a separate fileset.

There are two types of flushes: Warm and Cold. Warm is the first flush that will happen to a shard in a given block start-time, thus it's the one generating a file set from the given buffers of all series in a shard. A cold flush will only happen after a warm flush has been executed (i.e. a file-set exists on disk). A warm flush writes all un-flushed Warm Write type buckets, which a Cold Flush merges all un-flushed Cold Write type buckets with existing file-set and creates a new one instead.

A Cold Flush occurs when ever there exists Cold Write buckets in memory, and this check is runs every Tick interval (defined below)

### Ticking

The ticking process runs continously in the background and is responsible for a variety of tasks:
The ticking process runs continuously in the background and is responsible for a variety of tasks:

1. Merging all encoders for a given series / block start combination
2. Removing expired / flushed series and blocks from memory
Expand All @@ -236,13 +250,13 @@ Depending on the configured [caching policy](/docs/m3db/architecture/caching), t
Fileset files can become no longer necessary for two reasons:

1. The fileset files for a block that has fallen out of retention
2. A flush occurred for a block that already has a fileset file. The new fileset will be a superset of the existing fileset with any new data that for that block, hence, the existing fileset is no longer required
2. A flush occurred for a block that already has a fileset file. The new fileset will be a superset of the existing fileset with any new data that for that block, hence, the existing fileset is no longer required.

During the clean up process, these fileset files will get deleted.
During the cleanup process, these fileset files will get deleted.

## Caveats / Limitations

1. Currently M3DB does not support deletes.
2. M3DB does not support storing data with an indefinite retention period, every namespace in M3DB is required to have a retention policy which specifies how long data in that namespace will be retained for. While there is no upper bound on that value, it's still required and generally speaking M3DB is optimized for workloads with a well-defined [TTL](https://en.wikipedia.org/wiki/Time_to_live).
3. M3DB does not support either background data repair or Cassandra-style [read repairs](https://docs.datastax.com/en/cassandra/2.1/cassandra/operations/opsRepairNodesReadRepair.html). Future versions of M3DB will support automatic repairs of data as an ongoing background process.
4. M3DB does not support writing far into the future. Support for this will be added in future.
4. M3DB does not support writing far into the future. Support for this will be added in the future.