Skip to content

Commit

Permalink
Short timeout for cloud instance ID fetcher (#1050)
Browse files Browse the repository at this point in the history
  • Loading branch information
mariomac authored Jul 23, 2024
1 parent 437e171 commit 0ab3dc5
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 16 deletions.
11 changes: 11 additions & 0 deletions pkg/beyla/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ var DefaultConfig = Config{
Enable: kubeflags.EnabledDefault,
InformersSyncTimeout: 30 * time.Second,
},
HostID: HostIDConfig{
FetchTimeout: 500 * time.Millisecond,
},
},
Routes: &transform.RoutesConfig{Unmatch: transform.UnmatchHeuristic},
NetworkFlows: defaultNetworkConfig,
Expand Down Expand Up @@ -192,6 +195,14 @@ type Attributes struct {
Kubernetes transform.KubernetesDecorator `yaml:"kubernetes"`
InstanceID traces.InstanceIDConfig `yaml:"instance_id"`
Select attributes.Selection `yaml:"select"`
HostID HostIDConfig `yaml:"host_id"`
}

type HostIDConfig struct {
// Override allows overriding the reported host.id in Beyla
Override string `yaml:"override" env:"BEYLA_HOST_ID"`
// HostIDFetchTimeout specifies the timeout for trying to fetch the HostID from diverse Cloud Providers
FetchTimeout time.Duration `yaml:"fetch_timeout" env:"BEYLA_HOST_ID_FETCH_TIMEOUT"`
}

type ConfigError string
Expand Down
7 changes: 7 additions & 0 deletions pkg/beyla/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ attributes:
informers_sync_timeout: 30s
instance_id:
dns: true
host_id:
override: the-host-id
fetch_timeout: 4s
select:
beyla.network.flow:
include: ["foo", "bar"]
Expand Down Expand Up @@ -166,6 +169,10 @@ network:
Enable: kubeflags.EnabledTrue,
InformersSyncTimeout: 30 * time.Second,
},
HostID: HostIDConfig{
Override: "the-host-id",
FetchTimeout: 4 * time.Second,
},
Select: attributes.Selection{
attributes.BeylaNetworkFlow.Section: attributes.InclusionLists{
Include: []string{"foo", "bar"},
Expand Down
6 changes: 5 additions & 1 deletion pkg/components/beyla.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,11 @@ func buildCommonContextInfo(

attributeGroups(config, ctxInfo)

ctxInfo.FetchHostID(ctx)
if config.Attributes.HostID.Override == "" {
ctxInfo.FetchHostID(ctx, config.Attributes.HostID.FetchTimeout)
} else {
ctxInfo.HostID = config.Attributes.HostID.Override
}

return ctxInfo
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/internal/pipe/global/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import (
// ContextInfo stores some context information that must be shared across some nodes of the
// processing graph.
type ContextInfo struct {
// HostID of the host running Beyla
// HostID of the host running Beyla. Unless testing environments, this value must be
// automatically set after invoking FetchHostID
HostID string
// AppO11y stores context information that is only required for application observability.
// Its values must be initialized by the App O11y code and shouldn't be accessed from the
Expand Down
46 changes: 32 additions & 14 deletions pkg/internal/pipe/global/host_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"log/slog"
"os"
"time"

"go.opentelemetry.io/contrib/detectors/aws/ec2"
"go.opentelemetry.io/contrib/detectors/azure/azurevm"
Expand All @@ -16,7 +17,7 @@ import (
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

type hostIDFetcher func(context.Context) (string, error)
type hostIDFetcher func(context.Context, time.Duration) (string, error)

type fetcher struct {
name string
Expand All @@ -35,7 +36,7 @@ func cilog() *slog.Logger {
// This process is known to fail when Beyla runs inside a Kubernetes Pod out of the cloud providers
// mentioned in (1). In that case, the host.id will be later set to the full hostname.
// This method must be invoked once the ContextInfo object is completely initialized
func (ci *ContextInfo) FetchHostID(ctx context.Context) {
func (ci *ContextInfo) FetchHostID(ctx context.Context, timeout time.Duration) {
log := cilog().With("func", "fetchHostID")
fetchers := []fetcher{
{name: "AWS", fetch: ec2HostIDFetcher},
Expand All @@ -50,7 +51,7 @@ func (ci *ContextInfo) FetchHostID(ctx context.Context) {
log := log.With("fetcher", f.name)
log.Debug("trying to fetch host ID")
var id string
if id, err = f.fetch(ctx); err == nil {
if id, err = f.fetch(ctx, timeout); err == nil {
log.Info("got host ID", "hostID", id)
ci.HostID = id
return
Expand All @@ -64,22 +65,39 @@ func (ci *ContextInfo) FetchHostID(ctx context.Context) {
}
}

func azureHostIDFetcher(ctx context.Context) (string, error) {
return detectHostID(ctx, azurevm.New())
func azureHostIDFetcher(ctx context.Context, timeout time.Duration) (string, error) {
return detectHostID(ctx, timeout, azurevm.New())
}

func gcpHostIDFetcher(ctx context.Context) (string, error) {
return detectHostID(ctx, gcp.NewDetector())
func gcpHostIDFetcher(ctx context.Context, timeout time.Duration) (string, error) {
return detectHostID(ctx, timeout, gcp.NewDetector())
}

func ec2HostIDFetcher(ctx context.Context) (string, error) {
return detectHostID(ctx, ec2.NewResourceDetector())
func ec2HostIDFetcher(ctx context.Context, timeout time.Duration) (string, error) {
return detectHostID(ctx, timeout, ec2.NewResourceDetector())
}

func detectHostID(ctx context.Context, detector resource.Detector) (string, error) {
res, err := detector.Detect(ctx)
if err != nil {
func detectHostID(ctx context.Context, timeout time.Duration, detector resource.Detector) (string, error) {
// passing a cancellable context to the detector.Detect(ctx) does not always
// end the connection prematurely, so we wrap its invocation into a goroutine
cctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
resCh := make(chan *resource.Resource, 1)
errCh := make(chan error, 1)
go func() {
if res, err := detector.Detect(ctx); err != nil {
errCh <- err
} else {
resCh <- res
}
}()
var res *resource.Resource
select {
case res = <-resCh: // continue!
case err := <-errCh:
return "", err
case <-cctx.Done():
return "", errors.New("timed out waiting for host ID connection")
}
for _, attr := range res.Attributes() {
if attr.Key == semconv.HostIDKey {
Expand All @@ -89,7 +107,7 @@ func detectHostID(ctx context.Context, detector resource.Detector) (string, erro
return "", fmt.Errorf("can't find host.id in %v", res.Attributes())
}

func (ci *ContextInfo) kubeNodeFetcher(ctx context.Context) (string, error) {
func (ci *ContextInfo) kubeNodeFetcher(ctx context.Context, _ time.Duration) (string, error) {
if !ci.K8sInformer.IsKubeEnabled() {
return "", errors.New("kubernetes is not enabled")
}
Expand Down Expand Up @@ -130,7 +148,7 @@ func (ci *ContextInfo) kubeNodeFetcher(ctx context.Context) (string, error) {
return nodes.Items[0].Status.NodeInfo.MachineID, nil
}

func linuxLocalMachineIDFetcher(_ context.Context) (string, error) {
func linuxLocalMachineIDFetcher(_ context.Context, _ time.Duration) (string, error) {
if result, err := os.ReadFile("/etc/machine-id"); err == nil || len(result) == 0 {
return string(bytes.TrimSpace(result)), nil
}
Expand Down
18 changes: 18 additions & 0 deletions pkg/internal/pipe/global/host_id_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package global

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestFetchTimeout(t *testing.T) {
ctxInfo := ContextInfo{}
start := time.Now()
ctxInfo.FetchHostID(context.Background(), time.Millisecond)
elapsed := time.Since(start)

assert.Less(t, elapsed, time.Second)
}

0 comments on commit 0ab3dc5

Please sign in to comment.