forked from elastic/beats
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Cursor input skeleton (elastic#19378)
This PR is part of introducing a new input architecture in filebeat. The current state of the full implementation can be seen [here](https://github.com/urso/beats/tree/fb-input-v2-combined/filebeat/input/v2) and [sample inputs based on the new API](https://github.com/urso/beats/tree/fb-input-v2-combined/filebeat/features/input). The full list of changes will include: - Introduce v2 API interfaces - Introduce [compatibility layer](https://github.com/urso/beats/tree/fb-input-v2-combined/filebeat/input/v2/compat) to integrate API with existing functionality - Introduce helpers for writing [stateless](https://github.com/urso/beats/blob/fb-input-v2-combined/filebeat/input/v2/input-stateless/stateless.go) inputs. - Introduce helpers for writing [inputs that store a state](https://github.com/urso/beats/tree/fb-input-v2-combined/filebeat/input/v2/input-cursor) between restarts. - Integrate new API with [existing inputs and modules](https://github.com/urso/beats/blob/fb-input-v2-combined/filebeat/beater/filebeat.go#L301) in filebeat. The change introduces the skeleton and documentation with details for cursor based inputs. Future updates will add the actual implementation and tests. (cherry picked from commit 13633ce)
- Loading branch information
Steffen Siering
committed
Jul 7, 2020
1 parent
db27fe0
commit 630a74c
Showing
7 changed files
with
611 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
// Licensed to Elasticsearch B.V. under one or more contributor | ||
// license agreements. See the NOTICE file distributed with | ||
// this work for additional information regarding copyright | ||
// ownership. Elasticsearch B.V. licenses this file to you under | ||
// the Apache License, Version 2.0 (the "License"); you may | ||
// not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
package cursor | ||
|
||
import ( | ||
"time" | ||
|
||
"github.com/elastic/beats/v7/libbeat/logp" | ||
"github.com/elastic/go-concert/unison" | ||
) | ||
|
||
// cleaner removes finished entries from the registry file. | ||
// Its only state is the logger; the store to clean and the cleanup | ||
// interval are passed to run. | ||
type cleaner struct { | ||
// log is the logger held by the cleaner. | ||
log *logp.Logger | ||
} | ||
|
||
// run starts a loop that tries to clean entries from the registry. | ||
// The cleaner locks the store, such that no new states can be created | ||
// during the cleanup phase. Only resources that are finished and whose TTL | ||
// (clean_timeout setting) has expired will be removed. | ||
// | ||
// Resources are considered "Finished" if they do not have a current owner (active input), and | ||
// if they have no pending updates that still need to be written to the registry file after associated | ||
// events have been ACKed by the outputs. | ||
// The event acquisition timestamp is used as reference to clean resources. If a resource was blocked | ||
// for a long time, and the life time has been exhausted, then the resource will be removed immediately | ||
// once the last event has been ACKed. | ||
// | ||
// NOTE: the text above describes the intended contract. The body is still a | ||
// placeholder and panics unconditionally when called. | ||
func (c *cleaner) run(canceler unison.Canceler, store *store, interval time.Duration) { | ||
panic("TODO: implement me") | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
// Licensed to Elasticsearch B.V. under one or more contributor | ||
// license agreements. See the NOTICE file distributed with | ||
// this work for additional information regarding copyright | ||
// ownership. Elasticsearch B.V. licenses this file to you under | ||
// the Apache License, Version 2.0 (the "License"); you may | ||
// not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
package cursor | ||
|
||
// Cursor allows the input to check if cursor status has been stored | ||
// in the past and unpack the status into a custom structure. | ||
type Cursor struct { | ||
// store is the store handle passed to makeCursor. | ||
store *store | ||
// resource is the entry that IsNew and Unpack delegate to. | ||
resource *resource | ||
} | ||
|
||
func makeCursor(store *store, res *resource) Cursor { | ||
return Cursor{store: store, resource: res} | ||
} | ||
|
||
// IsNew returns true if no cursor information has been stored | ||
// for the current Source. | ||
func (c Cursor) IsNew() bool { return c.resource.IsNew() } | ||
|
||
// Unpack deserialized the cursor state into to. Unpack fails if no pointer is | ||
// given, or if the structure to points to is not compatible with the document | ||
// stored. | ||
func (c Cursor) Unpack(to interface{}) error { | ||
if c.IsNew() { | ||
return nil | ||
} | ||
return c.resource.UnpackCursor(to) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
// Licensed to Elasticsearch B.V. under one or more contributor | ||
// license agreements. See the NOTICE file distributed with | ||
// this work for additional information regarding copyright | ||
// ownership. Elasticsearch B.V. licenses this file to you under | ||
// the Apache License, Version 2.0 (the "License"); you may | ||
// not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
// Package cursor provides an InputManager for use with the v2 API, that is | ||
// capable of storing an internal cursor state between restarts. | ||
// | ||
// The InputManager requires authors to implement a configuration function and | ||
// the cursor.Input interface. The configuration function returns a slice of | ||
// sources ([]Source) that it has read from the configuration object, and the | ||
// actual Input that will be used to collect events from each configured | ||
// source. | ||
// When Run is called, a go-routine is started per configured source. If two inputs have | ||
// configured the same source, only one will be active, while the other waits | ||
// for the resource to become free. | ||
// The manager keeps track of the state per source. When publishing an event a | ||
// new cursor value can be passed as well. Future instances of the input can | ||
// read the last published cursor state. | ||
// | ||
// For each source an in-memory and a persistent state are tracked. Internal | ||
// meta updates by the input manager can not be read by Inputs, and will be | ||
// written to the persistent store immediately. Cursor state updates are read | ||
// and updated by the input. Cursor updates are written to the persistent store | ||
// only after the events have been ACKed by the output. Internally the input | ||
// manager keeps track of already ACKed updates and pending ACKs. | ||
// In order to guarantee progress even if the publishing is slow or blocked, all cursor | ||
// updates are written to the in-memory state immediately. Sources without any | ||
// pending updates are in-sync (in-memory state == persistent state). All | ||
// updates are ordered, but we allow the in-memory state to be ahead of the | ||
// persistent state. | ||
// When an input is started, the cursor state is read from the in-memory state. | ||
// This way a new input instance can continue where other inputs have been | ||
// stopped, even if we still have in-flight events from older input instances. | ||
// The coordination between inputs guarantees that all updates are always in | ||
// order. | ||
// | ||
// When a shutdown signal is received, the publisher is directly disconnected | ||
// from the outputs. As all coordination is directly handled by the | ||
// InputManager, shutdown will be immediate (once the input itself has | ||
// returned), and can not be blocked by the outputs. | ||
// | ||
// An input that is about to collect a source that is already collected by | ||
// another input will wait until the other input has returned or the current | ||
// input has received a shutdown signal. | ||
package cursor |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
// Licensed to Elasticsearch B.V. under one or more contributor | ||
// license agreements. See the NOTICE file distributed with | ||
// this work for additional information regarding copyright | ||
// ownership. Elasticsearch B.V. licenses this file to you under | ||
// the Apache License, Version 2.0 (the "License"); you may | ||
// not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
package cursor | ||
|
||
import ( | ||
"fmt" | ||
"time" | ||
|
||
input "github.com/elastic/beats/v7/filebeat/input/v2" | ||
"github.com/elastic/beats/v7/libbeat/beat" | ||
) | ||
|
||
// Input interface for cursor based inputs. This interface must be implemented | ||
// by inputs that wish to use the InputManager in order to implement a stateful | ||
// input that can store state between restarts. | ||
type Input interface { | ||
// Name returns the input's name; managedInput.Name forwards this value unchanged. | ||
Name() string | ||
|
||
// Test checks the configuration and runs additional checks if the Input can | ||
// actually collect data for the given configuration (e.g. check if host/port or files are | ||
// accessible). | ||
// The input manager will call Test per configured source. | ||
Test(Source, input.TestContext) error | ||
|
||
// Run starts the data collection. Run must return an error only if the | ||
// error is fatal making it impossible for the input to recover. | ||
// The input manager is meant to call Run once per configured Source, each | ||
// call in its own go-routine. | ||
Run(input.Context, Source, Cursor, Publisher) error | ||
} | ||
|
||
// managedInput implements the v2.Input interface, integrating cursor Inputs | ||
// with the v2 input API. | ||
// The managedInput starts go-routines per configured source. | ||
// If a Run returns, the error is 'remembered', but active data collecting | ||
// continues. Only after all Run calls have returned will the managedInput be | ||
// done. | ||
type managedInput struct { | ||
// manager is the owning InputManager; its Type is the prefix of every source ID. | ||
manager *InputManager | ||
// userID is an optional ID that createSourceID inserts into the source ID when non-empty. | ||
userID string | ||
// sources lists the configured sources. | ||
sources []Source | ||
// input is the wrapped cursor Input; Name delegates to it. | ||
input Input | ||
// cleanTimeout presumably holds the clean timeout applied to this input's | ||
// resources (TODO confirm; it is not used by any code visible here). | ||
cleanTimeout time.Duration | ||
} | ||
|
||
// Name is required to implement the v2.Input interface | ||
func (inp *managedInput) Name() string { return inp.input.Name() } | ||
|
||
// Test runs the Test method for each configured source. | ||
// NOTE: this is the intended behavior; the current body is a placeholder that | ||
// panics unconditionally. | ||
func (inp *managedInput) Test(ctx input.TestContext) error { | ||
panic("TODO: implement me") | ||
} | ||
|
||
// Run creates a go-routine per source, waiting until all go-routines have | ||
// returned, either by error, or by shutdown signal. | ||
// If an input panics, we create an error value with stack trace to report the | ||
// issue, but not crash the whole process. | ||
// NOTE: the text above describes the intended behavior; the current body is a | ||
// placeholder that panics unconditionally. | ||
func (inp *managedInput) Run( | ||
ctx input.Context, | ||
pipeline beat.PipelineConnector, | ||
) (err error) { | ||
panic("TODO: implement me") | ||
} | ||
|
||
func (inp *managedInput) createSourceID(s Source) string { | ||
if inp.userID != "" { | ||
return fmt.Sprintf("%v::%v::%v", inp.manager.Type, inp.userID, s.Name()) | ||
} | ||
return fmt.Sprintf("%v::%v", inp.manager.Type, s.Name()) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
// Licensed to Elasticsearch B.V. under one or more contributor | ||
// license agreements. See the NOTICE file distributed with | ||
// this work for additional information regarding copyright | ||
// ownership. Elasticsearch B.V. licenses this file to you under | ||
// the Apache License, Version 2.0 (the "License"); you may | ||
// not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
package cursor | ||
|
||
import ( | ||
"time" | ||
|
||
input "github.com/elastic/beats/v7/filebeat/input/v2" | ||
v2 "github.com/elastic/beats/v7/filebeat/input/v2" | ||
"github.com/elastic/beats/v7/libbeat/common" | ||
"github.com/elastic/beats/v7/libbeat/logp" | ||
"github.com/elastic/beats/v7/libbeat/statestore" | ||
|
||
"github.com/elastic/go-concert/unison" | ||
) | ||
|
||
// InputManager is used to create, manage, and coordinate stateful inputs and | ||
// their persistent state. | ||
// The InputManager ensures that only one input can be active for a unique source. | ||
// If two inputs have overlapping sources, both can still collect data, but | ||
// only one input will collect from the common source. | ||
// | ||
// The InputManager automatically cleans up old entries without an active | ||
// input, and without any pending update operations for the persistent store. | ||
// | ||
// The Type field is used to create the key name in the persistent store. Users | ||
// are allowed to add a custom per input configuration ID using the `id` | ||
// setting, to collect the same source multiple times, but with different | ||
// state. The key name in the persistent store becomes <Type>::[<ID>::]<Source Name> | ||
type InputManager struct { | ||
// Logger is the logger made available to the manager. | ||
Logger *logp.Logger | ||
|
||
// StateStore gives the InputManager access to the persistent key value store. | ||
StateStore StateStore | ||
|
||
// Type must contain the name of the input type. It is used to create the key name | ||
// for all sources the inputs collect from. | ||
Type string | ||
|
||
// DefaultCleanTimeout configures the key/value garbage collection interval. | ||
// The InputManager will only collect keys for the configured 'Type'. | ||
DefaultCleanTimeout time.Duration | ||
|
||
// Configure returns an array of Sources, and a configured Input instance | ||
// that will be used to collect events from each source. | ||
Configure func(cfg *common.Config) ([]Source, Input, error) | ||
|
||
// store is the manager's internal store handle. It is not assigned by any | ||
// code visible here (presumably set up by Init; TODO confirm). | ||
store *store | ||
} | ||
|
||
// Source describes a source the input can collect data from. | ||
// The `Name` method must return a unique name, that will be used to identify | ||
// the source in the persistent state store. | ||
type Source interface { | ||
// Name returns the source's unique name; createSourceID uses it as the | ||
// last component of the store key. | ||
Name() string | ||
} | ||
|
||
// StateStore interface and configurations used to give the Manager access to the persistent store. | ||
type StateStore interface { | ||
// Access returns the statestore.Store to use, or an error if it cannot be provided. | ||
Access() (*statestore.Store, error) | ||
// CleanupInterval returns a duration; presumably the interval between | ||
// cleanup runs of the store (TODO confirm against the callers). | ||
CleanupInterval() time.Duration | ||
} | ||
|
||
// Init starts background processes for deleting old entries from the | ||
// persistent store if mode is ModeRun. | ||
// NOTE: this is the intended behavior; the current body is a placeholder that | ||
// panics unconditionally. | ||
func (cim *InputManager) Init(group unison.Group, mode v2.Mode) error { | ||
panic("TODO: implement me") | ||
} | ||
|
||
// Create builds a new v2.Input using the provided Configure function. | ||
// The Input will run a go-routine per source that has been configured. | ||
// NOTE: this is the intended behavior; the current body is a placeholder that | ||
// panics unconditionally. | ||
func (cim *InputManager) Create(config *common.Config) (input.Input, error) { | ||
panic("TODO: implement me") | ||
} | ||
|
||
func lockResource(log *logp.Logger, resource *resource, canceler input.Canceler) error { | ||
if !resource.lock.TryLock() { | ||
log.Infof("Resource '%v' currently in use, waiting...", resource.key) | ||
err := resource.lock.LockContext(canceler) | ||
if err != nil { | ||
log.Infof("Input for resource '%v' has been stopped while waiting", resource.key) | ||
return err | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
// releaseResource unlocks the resource's lock first and then calls Release on | ||
// the resource. It undoes a successful lockResource. | ||
// NOTE(review): Release presumably drops a reference held on the resource; | ||
// confirm against the resource type, which is not visible here. | ||
func releaseResource(resource *resource) { | ||
resource.lock.Unlock() | ||
resource.Release() | ||
} |
Oops, something went wrong.