Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
spanconfig: introduce the spanconfig.KVSubscriber
KVSubscriber presents a consistent[^1] snapshot of a spanconfig.StoreReader that's incrementally maintained with changes made to the global span configurations state. The maintenance happens transparently; callers can subscribe to learn about what key spans may have seen a configuration change. After learning about a span update, consulting the embedded StoreReader would retrieve an up-to-date[^2] config for it. When a callback is first installed, it's invoked with the [min,max) span -- a shorthand to indicate that callers should consult the StoreReader for all spans of interest. Subsequent updates are of the more incremental kind. It's possible that the span updates received are no-ops, i.e. consulting the StoreReader for the given span would retrieve the last config observed for the span[^2]. type KVSubscriber interface { StoreReader OnSpanConfigUpdate(func(updated roachpb.Span)) } Internally we maintain a rangefeed over the global store of span configurations (system.span_configurations), applying updates from it into an embedded spanconfig.Store. A read-only view of this data structure (spanconfig.StoreReader) is exposed as part of the KVSubscriber interface. Rangefeeds used as is don't offer any ordering guarantees with respect to updates made over non-overlapping keys, which is something we care about[^4]. For that reason we make use of a rangefeed buffer, accumulating raw rangefeed updates and flushing them out en-masse in timestamp order when the rangefeed frontier is bumped[^5]. If the buffer overflows (as dictated by the memory limit the KVSubscriber is instantiated with), the subscriber is wound down and an appropriate error is returned to the caller. When running into the errors above, it's safe for the caller to re-subscribe to effectively re-establish the underlying rangefeeds. When re-establishing a new rangefeed and populating a spanconfig.Store using the contents of the initial scan[^6], we wish to preserve the existing spanconfig.StoreReader. Discarding it would entail either blocking all external readers until a new spanconfig.StoreReader was fully populated, or presenting an inconsistent view of the spanconfig.Store that's currently being populated. For new rangefeeds what we do then is route all updates from the initial scan to a fresh spanconfig.Store, and once the initial scan is done, swap at the source for the exported spanconfig.StoreReader. During the initial scan, concurrent readers would continue to observe the last spanconfig.StoreReader if any. After the swap, it would observe the more up-to-date source instead. Future incremental updates will also target the new source. When this source swap occurs, we inform any handlers of the need to possibly refresh their view of all configs. This commit also wires up the KVSubscriber into KV stores, replacing the use of the gossiped system config span (possible given the StoreReader interface, only happens if a testing flag/env var is set). [^1]: The contents of the StoreReader at t1 corresponds exactly to the contents of the global span configuration state at t0 where t0 <= t1. If the StoreReader is read from at t2 where t2 > t1, it's guaranteed to observe a view of the global state at t >= t0. [^2]: For the canonical KVSubscriber implementation, this is typically the closed timestamp target duration. [^3]: The canonical KVSubscriber implementation internally re-establishes feeds when errors occur, possibly re-transmitting earlier updates (usually through a lazy [min,max) span) despite possibly not needing to. We could do a bit better and diff the two data structures, emitting only targeted updates. [^4]: For a given key k, it's config may be stored as part of a larger span S (where S.start <= k < S.end). It's possible for S to get deleted and replaced with sub-spans S1...SN in the same transaction if the span is getting split. When applying these updates, we need to make sure to process the deletion event for S before processing S1...SN. [^5]: In our example above deleting the config for S and adding configs for S1...SN, we want to make sure that we apply the full set of updates all at once -- lest we expose the intermediate state where the config for S was deleted but the configs for S1...SN were not yet applied. [^6]: When tearing down the subscriber due to underlying errors, we could also surface a checkpoint to use the next time the subscriber is established. That way we can avoid the full initial scan over the span configuration state and simply pick up where we left off with our existing spanconfig.Store. Release note: None
- Loading branch information