forked from elastic/beats
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Cherry-pick elastic#19261 to 7.x: Introduce skeleton of memlog store (e…
…lastic#19691) Introduce skeleton of memlog store This changes introduces the skeleton and documentation of the memlog store. The addition of the statestore package is split up into multiple changeset to ease review. The final version of the package can be found [here](https://github.com/urso/beats/tree/fb-input-v2-combined/libbeat/statestore). Once finalized, the libbeat/statestore package contains: - The statestore frontend and interface for use within Beats - Interfaces for the store backend - A common set of tests store backends need to support - a storetest package for testing new features that require a store. The testing helpers use map[string]interface{} that can be initialized or queried after the test run for validation purposes. - The default memlog backend + tests This change introduces the skeleton and internal documentation of the memlog store for review and discussion, but not the full implementation yet to ease review. The final implementation and unit tests will be added later. The file doc.go documents how the final implementation works. (cherry picked from commit c0f1b75)
- Loading branch information
Steffen Siering
authored
Jul 7, 2020
1 parent
7edabf9
commit db27fe0
Showing
10 changed files
with
726 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,169 @@ | ||
// Licensed to Elasticsearch B.V. under one or more contributor | ||
// license agreements. See the NOTICE file distributed with | ||
// this work for additional information regarding copyright | ||
// ownership. Elasticsearch B.V. licenses this file to you under | ||
// the Apache License, Version 2.0 (the "License"); you may | ||
// not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
package memlog | ||
|
||
import ( | ||
"bufio" | ||
"os" | ||
"path/filepath" | ||
|
||
"github.com/elastic/beats/v7/libbeat/common" | ||
"github.com/elastic/beats/v7/libbeat/logp" | ||
) | ||
|
||
// diskstore manages the on-disk state of the memlog store. | ||
type diskstore struct { | ||
log *logp.Logger | ||
|
||
// store configuration | ||
checkpointPred CheckpointPredicate | ||
fileMode os.FileMode | ||
bufferSize int | ||
|
||
// on disk file tracking information | ||
home string // home path of the store | ||
logFilePath string // current log file | ||
dataFiles []dataFileInfo // set of data files found | ||
|
||
// txid is the sequential counter that tracks | ||
// all updates to the store. The txid is added to operation being logged | ||
// used as name for the data files. | ||
txid uint64 | ||
|
||
// log file access. The log file is updated using an in memory write buffer. | ||
logFile *os.File | ||
logBuf *bufio.Writer | ||
|
||
// internal state and metrics | ||
logFileSize uint64 | ||
logEntries uint | ||
logInvalid bool | ||
logNeedsTruncate bool | ||
} | ||
|
||
// dataFileInfo is used to track and sort on disk data files. | ||
// We should have only one data file on disk, but in case delete operations | ||
// have failed or not finished dataFileInfo is used to detect the ordering. | ||
type dataFileInfo struct { | ||
path string | ||
txid uint64 | ||
} | ||
|
||
// storeEntry is used to write entries to the checkpoint file only. | ||
type storeEntry struct { | ||
Key string `struct:"_key"` | ||
Fields common.MapStr `struct:",inline"` | ||
} | ||
|
||
// storeMeta is read from the meta file. | ||
type storeMeta struct { | ||
Version string `struct:"version"` | ||
} | ||
|
||
// logAction is prepended to each operation logged to the update file. | ||
// It contains the update ID, a sequential counter to track correctness, | ||
// and the action name. | ||
type logAction struct { | ||
Op string `json:"op"` | ||
ID uint64 `json:"id"` | ||
} | ||
|
||
const ( | ||
logFileName = "log.json" | ||
metaFileName = "meta.json" | ||
|
||
storeVersion = "1" | ||
|
||
keyField = "_key" | ||
) | ||
|
||
// newDiskStore initializes the disk store stucture only. The store must have | ||
// been opened already. It tries to open the update log file for append | ||
// operations. If opening the update log file fails, it is marked as | ||
// 'corrupted', triggering a checkpoint operation on the first update to the store. | ||
func newDiskStore( | ||
log *logp.Logger, | ||
home string, | ||
dataFiles []dataFileInfo, | ||
txid uint64, | ||
mode os.FileMode, | ||
entries uint, | ||
logInvalid bool, | ||
bufferSize uint, | ||
checkpointPred CheckpointPredicate, | ||
) *diskstore { | ||
s := &diskstore{ | ||
log: log.With("path", home), | ||
home: home, | ||
logFilePath: filepath.Join(home, logFileName), | ||
dataFiles: dataFiles, | ||
txid: txid, | ||
fileMode: mode, | ||
bufferSize: int(bufferSize), | ||
logFile: nil, | ||
logBuf: nil, | ||
logEntries: entries, | ||
logInvalid: logInvalid, | ||
logNeedsTruncate: false, // only truncate on next checkpoint | ||
checkpointPred: checkpointPred, | ||
} | ||
|
||
_ = s.tryOpenLog() | ||
return s | ||
} | ||
|
||
// tryOpenLog access the update log. The log file is truncated if a checkpoint operation has been | ||
// executed last. | ||
// The log file is marked as invalid if opening it failed. This will trigger a checkpoint operation | ||
// and another call to tryOpenLog in the future. | ||
func (s *diskstore) tryOpenLog() error { | ||
panic("TODO: implement me") | ||
} | ||
|
||
// mustCheckpoint returns true if the store is required to execute a checkpoint | ||
// operation, either by predicate or by some internal state detecting a problem | ||
// with the log file. | ||
func (s *diskstore) mustCheckpoint() bool { | ||
return s.logInvalid || s.checkpointPred(s.logFileSize) | ||
} | ||
|
||
func (s *diskstore) Close() error { | ||
panic("TODO: implement me") | ||
} | ||
|
||
// log operation adds another entry to the update log file. | ||
// The log file is marked as invalid if the write fails. This will trigger a | ||
// checkpoint operation in the future. | ||
func (s *diskstore) LogOperation(op op) error { | ||
panic("TODO: implement me") | ||
} | ||
|
||
// WriteCheckpoint serializes all state into a json file. The file contains an | ||
// array with all states known to the memory storage. | ||
// WriteCheckpoint first serializes all state to a temporary file, and finally | ||
// moves the temporary data file into the correct location. No files | ||
// are overwritten or replaced. Instead the change sequence number is used for | ||
// the filename, and older data files will be deleted after success. | ||
// | ||
// The active marker file is overwritten after all updates did succeed. The | ||
// marker file contains the filename of the current valid data-file. | ||
// NOTE: due to limitation on some Operating system or file systems, the active | ||
// marker is not a symlink, but an actual file. | ||
func (s *diskstore) WriteCheckpoint(state map[string]entry) error { | ||
panic("TODO: implement me") | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
// Licensed to Elasticsearch B.V. under one or more contributor | ||
// license agreements. See the NOTICE file distributed with | ||
// this work for additional information regarding copyright | ||
// ownership. Elasticsearch B.V. licenses this file to you under | ||
// the Apache License, Version 2.0 (the "License"); you may | ||
// not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
// Package memlog implements the memlog statestore backend. | ||
// The store provided by memlog is a in-memory key-value store | ||
// that logs all operations to an append only log file. | ||
// Once the log file is considered full the store executes a checkpoint | ||
// operation. The checkpoint operation serializes all state to a data file. | ||
// | ||
// The memory store in memlog holds all key-value pairs in a hashtable, with | ||
// value represented by map[string]interface{}. As the store must be 'valid' | ||
// based on the state of the last update operations (Set, Remove), it | ||
// guarantees that no references into data structures passed via Set are held. | ||
// Instead structured data is serialized/deserialized into a | ||
// map[string]interface{}. The serialized states contain only primitive types | ||
// like intX, uintX, float, bool, string, slices, or map[string]interface{} | ||
// itself. As a side effect this also guarantees that the internal can always | ||
// be serialized to disk after updating the in memory representation. | ||
// | ||
// On disk we have a meta file, an update log file, data files, and an active | ||
// marker file in the store directory. | ||
// | ||
// The meta file only contains the store version number. | ||
// | ||
// Normally all operations that update the store in memory state are appended | ||
// to the update log file. | ||
// The file stores all entries in JSON format. Each entry starts with an action | ||
// entry, followed by an data entry. | ||
// The action entry has the schema: `{"op": "<name>", id: <number>}`. Supporter | ||
// operations are 'set' or 'remove'. The `id` contains a sequential counter | ||
// that must always be increased by 1. | ||
// The data entry for the 'set' operation has the format: `{"K": "<key>", "V": { ... }}`. | ||
// The data entry for the 'remove' operation has the format: `{"K": "<key>"}`. | ||
// Updates to the log file are not synced to disk. Having all updates available | ||
// between restarts/crashes also depends on the capabilities of the operation | ||
// system and file system. When opening the store we read up until it is | ||
// possible, reconstructing a last known valid state the beat can continue | ||
// from. This can lead to duplicates if the machine/filesystem has had an | ||
// outage with state not yet fully synchronised to disk. Ordinary restarts | ||
// should not lead to any problems. | ||
// If any error is encountered when reading the log file, the next updates to the store | ||
// will trigger a checkpoint operation and reset the log file. | ||
// | ||
// The store might contain multiple data files, but only the last data file is | ||
// supposed to be valid. Older data files will continiously tried to be cleaned up | ||
// on checkpoint operations. | ||
// The data files filenames do include the change sequence number. Which allows | ||
// us to sort them by name. The checkpoint operation of memlog, writes the full | ||
// state into a new data file, that consists of an JSON array with all known | ||
// key-value pairs. Each JSON object in the array consists of the value | ||
// object, with memlog private fields added. Private fields start with `_`. At | ||
// the moment the only private field is `_key`, which is used to identify the | ||
// key-value pair. | ||
// NOTE: Creating a new file guarantees that Beats can progress when creating a | ||
// new checkpoint file. Some filesystems tend to block the | ||
// delete/replace operation when the file is accessed by another process | ||
// (e.g. common problem with AV Scanners on Windows). By creating a new | ||
// file we circumvent this problem. Failures in deleting old files is | ||
// ok, and we will try to delete old data files again in the future. | ||
// | ||
// The active marker file is not really used by the store. It is written for | ||
// debugging purposes and contains the filepath of the last written data file | ||
// that is supposed to be valid. | ||
// | ||
// When opening the store we first validate the meta file and read the "last" | ||
// data file into the in-memory hashtable. Older data files are ignored. The | ||
// filename with the update sequence number is used to sort data files. | ||
// NOTE: the active marker file is not used, as the checkpoint operation is | ||
// supposed to be an atomic operation that is finalized once the data | ||
// file is moved to its correct location. | ||
// | ||
// After loading the data file we loop over all operations in the log file. | ||
// Operations with a smaller sequence number are ignored when iterating the log | ||
// file. If any subsequent entries in the log file have a sequence number difference != | ||
// 1, we assume the log file to be corrupted and stop the loop. All processing | ||
// continues from the last known accumulated state. | ||
// | ||
// When closing the store we make a last attempt at fsyncing the log file (just | ||
// in case), close the log file and clear all in memory state. | ||
// | ||
// The store provided by memlog is threadsafe and uses a RWMutex. We allow only | ||
// one active writer, but multiple concurrent readers. | ||
package memlog |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
// Licensed to Elasticsearch B.V. under one or more contributor | ||
// license agreements. See the NOTICE file distributed with | ||
// this work for additional information regarding copyright | ||
// ownership. Elasticsearch B.V. licenses this file to you under | ||
// the Apache License, Version 2.0 (the "License"); you may | ||
// not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
package memlog | ||
|
||
import "errors" | ||
|
||
var ( | ||
errRegClosed = errors.New("registry has been closed") | ||
errLogInvalid = errors.New("can not add operation to log file, a checkpoint is required") | ||
errTxIDInvalid = errors.New("invalid update sequence number") | ||
errKeyUnknown = errors.New("key unknown") | ||
) |
Oops, something went wrong.