diff --git a/CHANGELOG.md b/CHANGELOG.md index 5ffb4a56077e..0aee98e93b82 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,8 @@ Main (unreleased) - Set permissions on the `Grafana Agent [Flow]` folder when installing via the windows installer rather than relying on the parent folder permissions. (@erikbaranowski) +- Fix an issue where the import config node would not run after a config reload. (@wildum) + - Fix an issue where Loki could reject a batch of logs when structured metadata feature is used. (@thampiotr) v0.40.1 (2024-02-27) diff --git a/internal/flow/internal/controller/block_node.go b/internal/flow/internal/controller/block_node.go index 6d3aca5bbb03..bab884f2abf9 100644 --- a/internal/flow/internal/controller/block_node.go +++ b/internal/flow/internal/controller/block_node.go @@ -11,14 +11,15 @@ import ( type BlockNode interface { dag.Node - // Block returns the current block of the managed config node. + // Block returns the current block managed by the node. Block() *ast.BlockStmt - // Evaluate updates the arguments for the managed component - // by re-evaluating its River block with the provided scope. The managed component - // will be built the first time Evaluate is called. + // Evaluate updates the arguments by re-evaluating the River block with the provided scope. // // Evaluate will return an error if the River block cannot be evaluated or if // decoding to arguments fails. Evaluate(scope *vm.Scope) error + + // UpdateBlock updates the River block used to construct arguments. + UpdateBlock(b *ast.BlockStmt) } diff --git a/internal/flow/internal/controller/component_node.go b/internal/flow/internal/controller/component_node.go index f3d9cdcaa4a4..1deb18df353a 100644 --- a/internal/flow/internal/controller/component_node.go +++ b/internal/flow/internal/controller/component_node.go @@ -2,7 +2,6 @@ package controller import ( "github.com/grafana/agent/internal/component" - "github.com/grafana/river/ast" ) // ComponentNode is a generic representation of a Flow component. @@ -27,9 +26,6 @@ type ComponentNode interface { // ID returns the component ID of the managed component from its River block. ID() ComponentID - // UpdateBlock updates the River block used to construct arguments for the managed component. - UpdateBlock(b *ast.BlockStmt) - // ModuleIDs returns the current list of modules managed by the component. ModuleIDs() []string } diff --git a/internal/flow/internal/controller/loader.go b/internal/flow/internal/controller/loader.go index 74d4b344b26e..43e102963ed7 100644 --- a/internal/flow/internal/controller/loader.go +++ b/internal/flow/internal/controller/loader.go @@ -338,9 +338,15 @@ func (l *Loader) splitComponentBlocks(blocks []*ast.BlockStmt) (componentBlocks, } func (l *Loader) populateDeclareNodes(g *dag.Graph, declareBlocks []*ast.BlockStmt) diag.Diagnostics { - var diags diag.Diagnostics + var ( + diags diag.Diagnostics + node *DeclareNode + blockMap = make(map[string]*ast.BlockStmt, len(declareBlocks)) + ) l.declareNodes = map[string]*DeclareNode{} for _, declareBlock := range declareBlocks { + id := BlockComponentID(declareBlock).String() + if declareBlock.Label == declareType { diags.Add(diag.Diagnostic{ Severity: diag.SeverityLevelError, @@ -350,16 +356,18 @@ func (l *Loader) populateDeclareNodes(g *dag.Graph, declareBlocks []*ast.BlockSt }) continue } - // TODO: if node already exists in the graph, update the block - // instead of copying it. - node := NewDeclareNode(declareBlock) - if g.GetByID(node.NodeID()) != nil { - diags.Add(diag.Diagnostic{ - Severity: diag.SeverityLevelError, - Message: fmt.Sprintf("cannot add declare node %q; node with same ID already exists", node.NodeID()), - }) + + if diag, defined := blockAlreadyDefined(blockMap, id, declareBlock); defined { + diags = append(diags, diag) continue } + + if exist := l.graph.GetByID(id); exist != nil { + node = exist.(*DeclareNode) + node.UpdateBlock(declareBlock) + } else { + node = NewDeclareNode(declareBlock) + } l.componentNodeManager.customComponentReg.registerDeclare(declareBlock) l.declareNodes[node.label] = node g.Add(node) @@ -367,6 +375,21 @@ func (l *Loader) populateDeclareNodes(g *dag.Graph, declareBlocks []*ast.BlockSt return diags } +// blockAlreadyDefined returns (diag, true) if the given id is already in the provided blockMap. +// else it adds the block to the map and returns (empty diag, false). +func blockAlreadyDefined(blockMap map[string]*ast.BlockStmt, id string, block *ast.BlockStmt) (diag.Diagnostic, bool) { + if orig, redefined := blockMap[id]; redefined { + return diag.Diagnostic{ + Severity: diag.SeverityLevelError, + Message: fmt.Sprintf("block %s already declared at %s", id, ast.StartPos(orig).Position()), + StartPos: block.NamePos.Position(), + EndPos: block.NamePos.Add(len(id) - 1).Position(), + }, true + } + blockMap[id] = block + return diag.Diagnostic{}, false +} + // populateServiceNodes adds service nodes to the graph. func (l *Loader) populateServiceNodes(g *dag.Graph, serviceBlocks []*ast.BlockStmt) diag.Diagnostics { var diags diag.Diagnostics @@ -441,25 +464,33 @@ func (l *Loader) populateServiceNodes(g *dag.Graph, serviceBlocks []*ast.BlockSt // populateConfigBlockNodes adds any config blocks to the graph. func (l *Loader) populateConfigBlockNodes(args map[string]any, g *dag.Graph, configBlocks []*ast.BlockStmt) diag.Diagnostics { var ( - diags diag.Diagnostics - nodeMap = NewConfigNodeMap() + diags diag.Diagnostics + nodeMap = NewConfigNodeMap() + blockMap = make(map[string]*ast.BlockStmt, len(configBlocks)) ) for _, block := range configBlocks { - node, newConfigNodeDiags := NewConfigNode(block, l.globals) - diags = append(diags, newConfigNodeDiags...) - - if g.GetByID(node.NodeID()) != nil { - configBlockStartPos := ast.StartPos(block).Position() - diags.Add(diag.Diagnostic{ - Severity: diag.SeverityLevelError, - Message: fmt.Sprintf("%q block already declared at %s", node.NodeID(), configBlockStartPos), - StartPos: configBlockStartPos, - EndPos: ast.EndPos(block).Position(), - }) - + var ( + node BlockNode + newConfigNodeDiags diag.Diagnostics + ) + id := BlockComponentID(block).String() + if diag, defined := blockAlreadyDefined(blockMap, id, block); defined { + diags = append(diags, diag) continue } + // Check the graph from the previous call to Load to see we can copy an + // existing instance of BlockNode. + if exist := l.graph.GetByID(id); exist != nil { + node = exist.(BlockNode) + node.UpdateBlock(block) + } else { + node, newConfigNodeDiags = NewConfigNode(block, l.globals) + diags = append(diags, newConfigNodeDiags...) + if diags.HasErrors() { + continue + } + } nodeMapDiags := nodeMap.Append(node) diags = append(diags, nodeMapDiags...) @@ -502,18 +533,10 @@ func (l *Loader) populateComponentNodes(g *dag.Graph, componentBlocks []*ast.Blo ) for _, block := range componentBlocks { id := BlockComponentID(block).String() - - if orig, redefined := blockMap[id]; redefined { - diags.Add(diag.Diagnostic{ - Severity: diag.SeverityLevelError, - Message: fmt.Sprintf("Component %s already declared at %s", id, ast.StartPos(orig).Position()), - StartPos: block.NamePos.Position(), - EndPos: block.NamePos.Add(len(id) - 1).Position(), - }) + if diag, defined := blockAlreadyDefined(blockMap, id, block); defined { + diags = append(diags, diag) continue } - blockMap[id] = block - // Check the graph from the previous call to Load to see if we can copy an // existing instance of ComponentNode. if exist := l.graph.GetByID(id); exist != nil { diff --git a/internal/flow/internal/controller/loader_test.go b/internal/flow/internal/controller/loader_test.go index 5105587ba1f2..672e2dce672a 100644 --- a/internal/flow/internal/controller/loader_test.go +++ b/internal/flow/internal/controller/loader_test.go @@ -90,14 +90,30 @@ func TestLoader(t *testing.T) { t.Run("New Graph", func(t *testing.T) { l := controller.NewLoader(newLoaderOptions()) - diags := applyFromContent(t, l, []byte(testFile), []byte(testConfig)) + diags := applyFromContent(t, l, []byte(testFile), []byte(testConfig), nil) require.NoError(t, diags.ErrorOrNil()) requireGraph(t, l.Graph(), testGraphDefinition) }) + t.Run("Reload Graph New Config", func(t *testing.T) { + l := controller.NewLoader(newLoaderOptions()) + diags := applyFromContent(t, l, []byte(testFile), []byte(testConfig), nil) + require.NoError(t, diags.ErrorOrNil()) + requireGraph(t, l.Graph(), testGraphDefinition) + updatedTestConfig := ` + tracing { + sampling_fraction = 2 + } + ` + diags = applyFromContent(t, l, []byte(testFile), []byte(updatedTestConfig), nil) + require.NoError(t, diags.ErrorOrNil()) + // Expect the same graph because tracing is still there and logging will be added by default. + requireGraph(t, l.Graph(), testGraphDefinition) + }) + t.Run("New Graph No Config", func(t *testing.T) { l := controller.NewLoader(newLoaderOptions()) - diags := applyFromContent(t, l, []byte(testFile), nil) + diags := applyFromContent(t, l, []byte(testFile), nil, nil) require.NoError(t, diags.ErrorOrNil()) requireGraph(t, l.Graph(), testGraphDefinition) }) @@ -115,11 +131,11 @@ func TestLoader(t *testing.T) { } ` l := controller.NewLoader(newLoaderOptions()) - diags := applyFromContent(t, l, []byte(startFile), []byte(testConfig)) + diags := applyFromContent(t, l, []byte(startFile), []byte(testConfig), nil) origGraph := l.Graph() require.NoError(t, diags.ErrorOrNil()) - diags = applyFromContent(t, l, []byte(testFile), []byte(testConfig)) + diags = applyFromContent(t, l, []byte(testFile), []byte(testConfig), nil) require.NoError(t, diags.ErrorOrNil()) newGraph := l.Graph() @@ -134,7 +150,7 @@ func TestLoader(t *testing.T) { } ` l := controller.NewLoader(newLoaderOptions()) - diags := applyFromContent(t, l, []byte(invalidFile), nil) + diags := applyFromContent(t, l, []byte(invalidFile), nil, nil) require.ErrorContains(t, diags.ErrorOrNil(), `cannot find the definition of component name "doesnotexist`) }) @@ -145,25 +161,25 @@ func TestLoader(t *testing.T) { } ` l := controller.NewLoader(newLoaderOptions()) - diags := applyFromContent(t, l, []byte(invalidFile), nil) + diags := applyFromContent(t, l, []byte(invalidFile), nil, nil) require.ErrorContains(t, diags.ErrorOrNil(), `component "testcomponents.tick" must have a label`) }) t.Run("Load with correct stability level", func(t *testing.T) { l := controller.NewLoader(newLoaderOptionsWithStability(featuregate.StabilityBeta)) - diags := applyFromContent(t, l, []byte(testFile), nil) + diags := applyFromContent(t, l, []byte(testFile), nil, nil) require.NoError(t, diags.ErrorOrNil()) }) t.Run("Load with below minimum stability level", func(t *testing.T) { l := controller.NewLoader(newLoaderOptionsWithStability(featuregate.StabilityStable)) - diags := applyFromContent(t, l, []byte(testFile), nil) + diags := applyFromContent(t, l, []byte(testFile), nil, nil) require.ErrorContains(t, diags.ErrorOrNil(), "component \"testcomponents.tick\" is at stability level \"beta\", which is below the minimum allowed stability level \"stable\"") }) t.Run("Load with undefined minimum stability level", func(t *testing.T) { l := controller.NewLoader(newLoaderOptionsWithStability(featuregate.StabilityUndefined)) - diags := applyFromContent(t, l, []byte(testFile), nil) + diags := applyFromContent(t, l, []byte(testFile), nil, nil) require.ErrorContains(t, diags.ErrorOrNil(), "stability levels must be defined: got \"beta\" as stability of component \"testcomponents.tick\" and as the minimum stability level") }) @@ -182,7 +198,7 @@ func TestLoader(t *testing.T) { } ` l := controller.NewLoader(newLoaderOptions()) - diags := applyFromContent(t, l, []byte(invalidFile), nil) + diags := applyFromContent(t, l, []byte(invalidFile), nil, nil) require.Error(t, diags.ErrorOrNil()) requireGraph(t, l.Graph(), graphDefinition{ @@ -210,9 +226,94 @@ func TestLoader(t *testing.T) { } ` l := controller.NewLoader(newLoaderOptions()) - diags := applyFromContent(t, l, []byte(invalidFile), nil) + diags := applyFromContent(t, l, []byte(invalidFile), nil, nil) require.Error(t, diags.ErrorOrNil()) }) + + t.Run("Config block redefined", func(t *testing.T) { + invalidFile := ` + logging {} + logging {} + ` + l := controller.NewLoader(newLoaderOptions()) + diags := applyFromContent(t, l, nil, []byte(invalidFile), nil) + require.ErrorContains(t, diags.ErrorOrNil(), `block logging already declared at TestLoader/Config_block_redefined:2:4`) + }) + + t.Run("Config block redefined after reload", func(t *testing.T) { + file := ` + logging {} + ` + l := controller.NewLoader(newLoaderOptions()) + diags := applyFromContent(t, l, nil, []byte(file), nil) + require.NoError(t, diags.ErrorOrNil()) + invalidFile := ` + logging {} + logging {} + ` + diags = applyFromContent(t, l, nil, []byte(invalidFile), nil) + require.ErrorContains(t, diags.ErrorOrNil(), `block logging already declared at TestLoader/Config_block_redefined_after_reload:2:4`) + }) + + t.Run("Component block redefined", func(t *testing.T) { + invalidFile := ` + testcomponents.tick "ticker" { + frequency = "1s" + } + testcomponents.tick "ticker" { + frequency = "1s" + } + ` + l := controller.NewLoader(newLoaderOptions()) + diags := applyFromContent(t, l, []byte(invalidFile), nil, nil) + require.ErrorContains(t, diags.ErrorOrNil(), `block testcomponents.tick.ticker already declared at TestLoader/Component_block_redefined:2:4`) + }) + + t.Run("Component block redefined after reload", func(t *testing.T) { + file := ` + testcomponents.tick "ticker" { + frequency = "1s" + } + ` + l := controller.NewLoader(newLoaderOptions()) + diags := applyFromContent(t, l, []byte(file), nil, nil) + require.NoError(t, diags.ErrorOrNil()) + invalidFile := ` + testcomponents.tick "ticker" { + frequency = "1s" + } + testcomponents.tick "ticker" { + frequency = "1s" + } + ` + diags = applyFromContent(t, l, []byte(invalidFile), nil, nil) + require.ErrorContains(t, diags.ErrorOrNil(), `block testcomponents.tick.ticker already declared at TestLoader/Component_block_redefined_after_reload:2:4`) + }) + + t.Run("Declare block redefined", func(t *testing.T) { + invalidFile := ` + declare "a" {} + declare "a" {} + ` + l := controller.NewLoader(newLoaderOptions()) + diags := applyFromContent(t, l, nil, nil, []byte(invalidFile)) + require.ErrorContains(t, diags.ErrorOrNil(), `block declare.a already declared at TestLoader/Declare_block_redefined:2:4`) + }) + + t.Run("Declare block redefined after reload", func(t *testing.T) { + file := ` + declare "a" {} + ` + l := controller.NewLoader(newLoaderOptions()) + diags := applyFromContent(t, l, nil, nil, []byte(file)) + require.NoError(t, diags.ErrorOrNil()) + invalidFile := ` + declare "a" {} + declare "a" {} + ` + diags = applyFromContent(t, l, nil, nil, []byte(invalidFile)) + require.ErrorContains(t, diags.ErrorOrNil(), `block declare.a already declared at TestLoader/Declare_block_redefined_after_reload:2:4`) + }) } // TestScopeWithFailingComponent is used to ensure that the scope is filled out, even if the component @@ -253,13 +354,13 @@ func TestScopeWithFailingComponent(t *testing.T) { } l := controller.NewLoader(newLoaderOptions()) - diags := applyFromContent(t, l, []byte(testFile), nil) + diags := applyFromContent(t, l, []byte(testFile), nil, nil) require.Error(t, diags.ErrorOrNil()) require.Len(t, diags, 1) require.True(t, strings.Contains(diags.Error(), `unrecognized attribute name "frequenc"`)) } -func applyFromContent(t *testing.T, l *controller.Loader, componentBytes []byte, configBytes []byte) diag.Diagnostics { +func applyFromContent(t *testing.T, l *controller.Loader, componentBytes []byte, configBytes []byte, declareBytes []byte) diag.Diagnostics { t.Helper() var ( @@ -281,6 +382,13 @@ func applyFromContent(t *testing.T, l *controller.Loader, componentBytes []byte, } } + if string(declareBytes) != "" { + declareBlocks, diags = fileToBlock(t, declareBytes) + if diags.HasErrors() { + return diags + } + } + applyOptions := controller.ApplyOptions{ ComponentBlocks: componentBlocks, ConfigBlocks: configBlocks, diff --git a/internal/flow/internal/controller/node_config_argument.go b/internal/flow/internal/controller/node_config_argument.go index 5b979503f805..23c57987cce0 100644 --- a/internal/flow/internal/controller/node_config_argument.go +++ b/internal/flow/internal/controller/node_config_argument.go @@ -2,6 +2,7 @@ package controller import ( "fmt" + "strings" "sync" "github.com/grafana/river/ast" @@ -85,3 +86,19 @@ func (cn *ArgumentConfigNode) Block() *ast.BlockStmt { // NodeID implements dag.Node and returns the unique ID for the config node. func (cn *ArgumentConfigNode) NodeID() string { return cn.nodeID } + +// UpdateBlock updates the River block used to construct arguments. +// The new block isn't used until the next time Evaluate is invoked. +// +// UpdateBlock will panic if the block does not match the component ID of the +// ArgumentConfigNode. +func (cn *ArgumentConfigNode) UpdateBlock(b *ast.BlockStmt) { + if !BlockComponentID(b).Equals(strings.Split(cn.nodeID, ".")) { + panic("UpdateBlock called with an River block with a different ID") + } + + cn.mut.Lock() + defer cn.mut.Unlock() + cn.block = b + cn.eval = vm.New(b.Body) +} diff --git a/internal/flow/internal/controller/node_config_export.go b/internal/flow/internal/controller/node_config_export.go index 027335e629b4..314e9a3dd983 100644 --- a/internal/flow/internal/controller/node_config_export.go +++ b/internal/flow/internal/controller/node_config_export.go @@ -2,6 +2,7 @@ package controller import ( "fmt" + "strings" "sync" "github.com/grafana/river/ast" @@ -74,3 +75,19 @@ func (cn *ExportConfigNode) Block() *ast.BlockStmt { // NodeID implements dag.Node and returns the unique ID for the config node. func (cn *ExportConfigNode) NodeID() string { return cn.nodeID } + +// UpdateBlock updates the River block used to construct arguments. +// The new block isn't used until the next time Evaluate is invoked. +// +// UpdateBlock will panic if the block does not match the component ID of the +// ExportConfigNode. +func (cn *ExportConfigNode) UpdateBlock(b *ast.BlockStmt) { + if !BlockComponentID(b).Equals(strings.Split(cn.nodeID, ".")) { + panic("UpdateBlock called with an River block with a different ID") + } + + cn.mut.Lock() + defer cn.mut.Unlock() + cn.block = b + cn.eval = vm.New(b.Body) +} diff --git a/internal/flow/internal/controller/node_config_import.go b/internal/flow/internal/controller/node_config_import.go index fdaf4e3b3757..ba640ab72df9 100644 --- a/internal/flow/internal/controller/node_config_import.go +++ b/internal/flow/internal/controller/node_config_import.go @@ -385,6 +385,22 @@ func (cn *ImportConfigNode) run(errChan chan error, updateTasks func() error) er } } +// UpdateBlock updates the River block used to construct arguments. +// The new block isn't used until the next time Evaluate is invoked. +// +// UpdateBlock will panic if the block does not match the component ID of the +// ImportConfigNode. +func (cn *ImportConfigNode) UpdateBlock(b *ast.BlockStmt) { + if !BlockComponentID(b).Equals(strings.Split(cn.nodeID, ".")) { + panic("UpdateBlock called with an River block with a different ID") + } + + cn.mut.Lock() + defer cn.mut.Unlock() + cn.block = b + cn.source.SetEval(vm.New(b.Body)) +} + func (cn *ImportConfigNode) Label() string { return cn.label } // Block implements BlockNode and returns the current block of the managed config node. diff --git a/internal/flow/internal/controller/node_config_logging.go b/internal/flow/internal/controller/node_config_logging.go index f5757138bdf2..1a17212e245d 100644 --- a/internal/flow/internal/controller/node_config_logging.go +++ b/internal/flow/internal/controller/node_config_logging.go @@ -2,6 +2,7 @@ package controller import ( "fmt" + "strings" "sync" "github.com/go-kit/log" @@ -80,3 +81,19 @@ func (cn *LoggingConfigNode) Block() *ast.BlockStmt { // NodeID implements dag.Node and returns the unique ID for the config node. func (cn *LoggingConfigNode) NodeID() string { return cn.nodeID } + +// UpdateBlock updates the River block used to construct arguments. +// The new block isn't used until the next time Evaluate is invoked. +// +// UpdateBlock will panic if the block does not match the component ID of the +// LoggingConfigNode. +func (cn *LoggingConfigNode) UpdateBlock(b *ast.BlockStmt) { + if !BlockComponentID(b).Equals(strings.Split(cn.nodeID, ".")) { + panic("UpdateBlock called with an River block with a different ID") + } + + cn.mut.Lock() + defer cn.mut.Unlock() + cn.block = b + cn.eval = vm.New(b.Body) +} diff --git a/internal/flow/internal/controller/node_config_tracing.go b/internal/flow/internal/controller/node_config_tracing.go index c8d77b46b8d0..893ade78266f 100644 --- a/internal/flow/internal/controller/node_config_tracing.go +++ b/internal/flow/internal/controller/node_config_tracing.go @@ -2,6 +2,7 @@ package controller import ( "fmt" + "strings" "sync" "github.com/grafana/agent/internal/flow/tracing" @@ -84,3 +85,19 @@ func (cn *TracingConfigNode) Block() *ast.BlockStmt { // NodeID implements dag.Node and returns the unique ID for the config node. func (cn *TracingConfigNode) NodeID() string { return cn.nodeID } + +// UpdateBlock updates the River block used to construct arguments. +// The new block isn't used until the next time Evaluate is invoked. +// +// UpdateBlock will panic if the block does not match the component ID of the +// LoggingConfigNode. +func (cn *TracingConfigNode) UpdateBlock(b *ast.BlockStmt) { + if !BlockComponentID(b).Equals(strings.Split(cn.nodeID, ".")) { + panic("UpdateBlock called with an River block with a different ID") + } + + cn.mut.Lock() + defer cn.mut.Unlock() + cn.block = b + cn.eval = vm.New(b.Body) +} diff --git a/internal/flow/internal/controller/node_declare.go b/internal/flow/internal/controller/node_declare.go index b67350eb5540..0dec60283908 100644 --- a/internal/flow/internal/controller/node_declare.go +++ b/internal/flow/internal/controller/node_declare.go @@ -1,6 +1,9 @@ package controller import ( + "strings" + "sync" + "github.com/grafana/river/ast" "github.com/grafana/river/vm" ) @@ -10,6 +13,7 @@ type DeclareNode struct { label string nodeID string componentName string + mut sync.RWMutex block *ast.BlockStmt } @@ -42,3 +46,17 @@ func (cn *DeclareNode) Block() *ast.BlockStmt { // NodeID implements dag.Node and returns the unique ID for the config node. func (cn *DeclareNode) NodeID() string { return cn.nodeID } + +// UpdateBlock updates the managed River block. +// +// UpdateBlock will panic if the block does not match the component ID of the +// DeclareNode. +func (cn *DeclareNode) UpdateBlock(b *ast.BlockStmt) { + if !BlockComponentID(b).Equals(strings.Split(cn.nodeID, ".")) { + panic("UpdateBlock called with an River block with a different ID") + } + + cn.mut.Lock() + defer cn.mut.Unlock() + cn.block = b +} diff --git a/internal/flow/internal/controller/scheduler_test.go b/internal/flow/internal/controller/scheduler_test.go index 9f9b3f136a75..ad5ebc23ca11 100644 --- a/internal/flow/internal/controller/scheduler_test.go +++ b/internal/flow/internal/controller/scheduler_test.go @@ -99,6 +99,7 @@ func (fr fakeRunnable) NodeID() string { return fr.ID } func (fr fakeRunnable) Run(ctx context.Context) error { return fr.Component.Run(ctx) } func (fr fakeRunnable) Block() *ast.BlockStmt { return nil } func (fr fakeRunnable) Evaluate(scope *vm.Scope) error { return nil } +func (fr fakeRunnable) UpdateBlock(b *ast.BlockStmt) {} type mockComponent struct { RunFunc func(ctx context.Context) error diff --git a/internal/flow/internal/importsource/import_file.go b/internal/flow/internal/importsource/import_file.go index bc6690967e17..8936e147a8e8 100644 --- a/internal/flow/internal/importsource/import_file.go +++ b/internal/flow/internal/importsource/import_file.go @@ -96,3 +96,8 @@ func (im *ImportFile) Run(ctx context.Context) error { func (im *ImportFile) CurrentHealth() component.Health { return im.fileComponent.CurrentHealth() } + +// Update the evaluator. +func (im *ImportFile) SetEval(eval *vm.Evaluator) { + im.eval = eval +} diff --git a/internal/flow/internal/importsource/import_git.go b/internal/flow/internal/importsource/import_git.go index 6c3490eb9b6f..95c5f31bb07e 100644 --- a/internal/flow/internal/importsource/import_git.go +++ b/internal/flow/internal/importsource/import_git.go @@ -254,3 +254,8 @@ func (im *ImportGit) CurrentHealth() component.Health { defer im.healthMut.RUnlock() return im.health } + +// Update the evaluator. +func (im *ImportGit) SetEval(eval *vm.Evaluator) { + im.eval = eval +} diff --git a/internal/flow/internal/importsource/import_http.go b/internal/flow/internal/importsource/import_http.go index 959f24fe8c74..6050338b086d 100644 --- a/internal/flow/internal/importsource/import_http.go +++ b/internal/flow/internal/importsource/import_http.go @@ -101,3 +101,8 @@ func (im *ImportHTTP) Run(ctx context.Context) error { func (im *ImportHTTP) CurrentHealth() component.Health { return im.managedRemoteHTTP.CurrentHealth() } + +// Update the evaluator. +func (im *ImportHTTP) SetEval(eval *vm.Evaluator) { + im.eval = eval +} diff --git a/internal/flow/internal/importsource/import_source.go b/internal/flow/internal/importsource/import_source.go index 9f4569771321..a23b1506ecca 100644 --- a/internal/flow/internal/importsource/import_source.go +++ b/internal/flow/internal/importsource/import_source.go @@ -32,6 +32,8 @@ type ImportSource interface { Run(ctx context.Context) error // CurrentHealth returns the current Health status of the running source. CurrentHealth() component.Health + // Update evaluator + SetEval(eval *vm.Evaluator) } // NewImportSource creates a new ImportSource depending on the type. diff --git a/internal/flow/internal/importsource/import_string.go b/internal/flow/internal/importsource/import_string.go index fc07583627a8..4bdac32e55e2 100644 --- a/internal/flow/internal/importsource/import_string.go +++ b/internal/flow/internal/importsource/import_string.go @@ -58,3 +58,8 @@ func (im *ImportString) CurrentHealth() component.Health { Health: component.HealthTypeHealthy, } } + +// Update the evaluator. +func (im *ImportString) SetEval(eval *vm.Evaluator) { + im.eval = eval +} diff --git a/internal/flow/module_eval_test.go b/internal/flow/module_eval_test.go index 94b811b4234d..86965bdf996f 100644 --- a/internal/flow/module_eval_test.go +++ b/internal/flow/module_eval_test.go @@ -210,6 +210,111 @@ func TestUpdates_TwoModules_SameCompNames(t *testing.T) { }, 3*time.Second, 10*time.Millisecond) } +func TestUpdates_ReloadConfig(t *testing.T) { + // We use this module in a Flow config below. + module := ` + argument "input" { + optional = false + } + + testcomponents.passthrough "pt" { + input = argument.input.value + lag = "1ms" + } + + export "output" { + value = testcomponents.passthrough.pt.output + } +` + + // We send the count increments via module and to the summation component and verify that the updates propagate. + config := ` + testcomponents.count "inc" { + frequency = "10ms" + max = 10 + } + + module.string "test" { + content = ` + strconv.Quote(module) + ` + arguments { + input = testcomponents.count.inc.count + } + } + + testcomponents.summation "sum" { + input = module.string.test.exports.output + } +` + + ctrl := flow.New(testOptions(t)) + f, err := flow.ParseSource(t.Name(), []byte(config)) + require.NoError(t, err) + require.NotNil(t, f) + + err = ctrl.LoadSource(f, nil) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { + ctrl.Run(ctx) + close(done) + }() + defer func() { + cancel() + <-done + }() + + require.Eventually(t, func() bool { + export := getExport[testcomponents.SummationExports](t, ctrl, "", "testcomponents.summation.sum") + return export.LastAdded == 10 + }, 3*time.Second, 10*time.Millisecond) + + // Reload with a new export. + module = ` + argument "input" { + optional = false + } + + testcomponents.passthrough "pt" { + input = argument.input.value + lag = "1ms" + } + + export "output" { + value = -10 + } +` + config = ` + testcomponents.count "inc" { + frequency = "10ms" + max = 10 + } + + module.string "test" { + content = ` + strconv.Quote(module) + ` + arguments { + input = testcomponents.count.inc.count + } + } + + testcomponents.summation "sum" { + input = module.string.test.exports.output + } +` + f, err = flow.ParseSource(t.Name(), []byte(config)) + require.NoError(t, err) + require.NotNil(t, f) + + err = ctrl.LoadSource(f, nil) + require.NoError(t, err) + + require.Eventually(t, func() bool { + export := getExport[testcomponents.SummationExports](t, ctrl, "", "testcomponents.summation.sum") + return export.LastAdded == -10 + }, 3*time.Second, 10*time.Millisecond) +} + func testOptions(t *testing.T) flow.Options { t.Helper() s, err := logging.New(os.Stderr, logging.DefaultOptions) diff --git a/internal/flow/module_test.go b/internal/flow/module_test.go index 79661491c472..663d058757e5 100644 --- a/internal/flow/module_test.go +++ b/internal/flow/module_test.go @@ -103,13 +103,13 @@ func TestModule(t *testing.T) { name: "Duplicate argument config", argumentModuleContent: argumentConfig + argumentConfig, exportModuleContent: exportStringConfig, - expectedErrorContains: "\"argument.username\" block already declared", + expectedErrorContains: "block argument.username already declared at t1:2:2", }, { name: "Duplicate export config", argumentModuleContent: argumentConfig, exportModuleContent: exportStringConfig + exportStringConfig, - expectedErrorContains: "\"export.username\" block already declared", + expectedErrorContains: "block export.username already declared at t1:7:2", }, { name: "Multiple exports but none are used but still exported",