Skip to content

Commit

Permalink
Loader reuse existing nodes on reload (grafana#6288)
Browse files Browse the repository at this point in the history
* Move UpdateBlock from componentNode interface to blockNode interface.

This change means that all blockNodes have now the possibility to update their managed block with the update River block content.

* Update the loader to update the managed block of a config node  on reload if it already existed in the graph.
With this optimization, we re-use existing nodes and update them instead of creating a new node.
This is especially useful for modules.

* add two new tests to check that on reload the config nodes behave as expected

* add updateblock to declare node

* update loader logic to detect duplicated blocks and reuse already defined blocks

* add updateblock to import config node

* update changelog

* move function and remove unnecessary check
  • Loading branch information
wildum authored Mar 5, 2024
1 parent 2df4296 commit 2e9d5a2
Show file tree
Hide file tree
Showing 19 changed files with 416 additions and 56 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ Main (unreleased)
- Set permissions on the `Grafana Agent [Flow]` folder when installing via the
windows installer rather than relying on the parent folder permissions. (@erikbaranowski)

- Fix an issue where the import config node would not run after a config reload. (@wildum)

- Fix an issue where Loki could reject a batch of logs when structured metadata feature is used. (@thampiotr)

v0.40.1 (2024-02-27)
Expand Down
9 changes: 5 additions & 4 deletions internal/flow/internal/controller/block_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@ import (
type BlockNode interface {
dag.Node

// Block returns the current block of the managed config node.
// Block returns the current block managed by the node.
Block() *ast.BlockStmt

// Evaluate updates the arguments for the managed component
// by re-evaluating its River block with the provided scope. The managed component
// will be built the first time Evaluate is called.
// Evaluate updates the arguments by re-evaluating the River block with the provided scope.
//
// Evaluate will return an error if the River block cannot be evaluated or if
// decoding to arguments fails.
Evaluate(scope *vm.Scope) error

// UpdateBlock updates the River block used to construct arguments.
UpdateBlock(b *ast.BlockStmt)
}
4 changes: 0 additions & 4 deletions internal/flow/internal/controller/component_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package controller

import (
"github.com/grafana/agent/internal/component"
"github.com/grafana/river/ast"
)

// ComponentNode is a generic representation of a Flow component.
Expand All @@ -27,9 +26,6 @@ type ComponentNode interface {
// ID returns the component ID of the managed component from its River block.
ID() ComponentID

// UpdateBlock updates the River block used to construct arguments for the managed component.
UpdateBlock(b *ast.BlockStmt)

// ModuleIDs returns the current list of modules managed by the component.
ModuleIDs() []string
}
89 changes: 56 additions & 33 deletions internal/flow/internal/controller/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,9 +338,15 @@ func (l *Loader) splitComponentBlocks(blocks []*ast.BlockStmt) (componentBlocks,
}

func (l *Loader) populateDeclareNodes(g *dag.Graph, declareBlocks []*ast.BlockStmt) diag.Diagnostics {
var diags diag.Diagnostics
var (
diags diag.Diagnostics
node *DeclareNode
blockMap = make(map[string]*ast.BlockStmt, len(declareBlocks))
)
l.declareNodes = map[string]*DeclareNode{}
for _, declareBlock := range declareBlocks {
id := BlockComponentID(declareBlock).String()

if declareBlock.Label == declareType {
diags.Add(diag.Diagnostic{
Severity: diag.SeverityLevelError,
Expand All @@ -350,23 +356,40 @@ func (l *Loader) populateDeclareNodes(g *dag.Graph, declareBlocks []*ast.BlockSt
})
continue
}
// TODO: if node already exists in the graph, update the block
// instead of copying it.
node := NewDeclareNode(declareBlock)
if g.GetByID(node.NodeID()) != nil {
diags.Add(diag.Diagnostic{
Severity: diag.SeverityLevelError,
Message: fmt.Sprintf("cannot add declare node %q; node with same ID already exists", node.NodeID()),
})

if diag, defined := blockAlreadyDefined(blockMap, id, declareBlock); defined {
diags = append(diags, diag)
continue
}

if exist := l.graph.GetByID(id); exist != nil {
node = exist.(*DeclareNode)
node.UpdateBlock(declareBlock)
} else {
node = NewDeclareNode(declareBlock)
}
l.componentNodeManager.customComponentReg.registerDeclare(declareBlock)
l.declareNodes[node.label] = node
g.Add(node)
}
return diags
}

// blockAlreadyDefined returns (diag, true) if the given id is already in the provided blockMap.
// else it adds the block to the map and returns (empty diag, false).
func blockAlreadyDefined(blockMap map[string]*ast.BlockStmt, id string, block *ast.BlockStmt) (diag.Diagnostic, bool) {
if orig, redefined := blockMap[id]; redefined {
return diag.Diagnostic{
Severity: diag.SeverityLevelError,
Message: fmt.Sprintf("block %s already declared at %s", id, ast.StartPos(orig).Position()),
StartPos: block.NamePos.Position(),
EndPos: block.NamePos.Add(len(id) - 1).Position(),
}, true
}
blockMap[id] = block
return diag.Diagnostic{}, false
}

// populateServiceNodes adds service nodes to the graph.
func (l *Loader) populateServiceNodes(g *dag.Graph, serviceBlocks []*ast.BlockStmt) diag.Diagnostics {
var diags diag.Diagnostics
Expand Down Expand Up @@ -441,25 +464,33 @@ func (l *Loader) populateServiceNodes(g *dag.Graph, serviceBlocks []*ast.BlockSt
// populateConfigBlockNodes adds any config blocks to the graph.
func (l *Loader) populateConfigBlockNodes(args map[string]any, g *dag.Graph, configBlocks []*ast.BlockStmt) diag.Diagnostics {
var (
diags diag.Diagnostics
nodeMap = NewConfigNodeMap()
diags diag.Diagnostics
nodeMap = NewConfigNodeMap()
blockMap = make(map[string]*ast.BlockStmt, len(configBlocks))
)

for _, block := range configBlocks {
node, newConfigNodeDiags := NewConfigNode(block, l.globals)
diags = append(diags, newConfigNodeDiags...)

if g.GetByID(node.NodeID()) != nil {
configBlockStartPos := ast.StartPos(block).Position()
diags.Add(diag.Diagnostic{
Severity: diag.SeverityLevelError,
Message: fmt.Sprintf("%q block already declared at %s", node.NodeID(), configBlockStartPos),
StartPos: configBlockStartPos,
EndPos: ast.EndPos(block).Position(),
})

var (
node BlockNode
newConfigNodeDiags diag.Diagnostics
)
id := BlockComponentID(block).String()
if diag, defined := blockAlreadyDefined(blockMap, id, block); defined {
diags = append(diags, diag)
continue
}
// Check the graph from the previous call to Load to see we can copy an
// existing instance of BlockNode.
if exist := l.graph.GetByID(id); exist != nil {
node = exist.(BlockNode)
node.UpdateBlock(block)
} else {
node, newConfigNodeDiags = NewConfigNode(block, l.globals)
diags = append(diags, newConfigNodeDiags...)
if diags.HasErrors() {
continue
}
}

nodeMapDiags := nodeMap.Append(node)
diags = append(diags, nodeMapDiags...)
Expand Down Expand Up @@ -502,18 +533,10 @@ func (l *Loader) populateComponentNodes(g *dag.Graph, componentBlocks []*ast.Blo
)
for _, block := range componentBlocks {
id := BlockComponentID(block).String()

if orig, redefined := blockMap[id]; redefined {
diags.Add(diag.Diagnostic{
Severity: diag.SeverityLevelError,
Message: fmt.Sprintf("Component %s already declared at %s", id, ast.StartPos(orig).Position()),
StartPos: block.NamePos.Position(),
EndPos: block.NamePos.Add(len(id) - 1).Position(),
})
if diag, defined := blockAlreadyDefined(blockMap, id, block); defined {
diags = append(diags, diag)
continue
}
blockMap[id] = block

// Check the graph from the previous call to Load to see if we can copy an
// existing instance of ComponentNode.
if exist := l.graph.GetByID(id); exist != nil {
Expand Down
Loading

0 comments on commit 2e9d5a2

Please sign in to comment.