diff --git a/pkg/beyla/config.go b/pkg/beyla/config.go index f90492348..30e8f6fd5 100644 --- a/pkg/beyla/config.go +++ b/pkg/beyla/config.go @@ -109,6 +109,9 @@ var DefaultConfig = Config{ Enable: kubeflags.EnabledDefault, InformersSyncTimeout: 30 * time.Second, }, + HostID: HostIDConfig{ + FetchTimeout: 500 * time.Millisecond, + }, }, Routes: &transform.RoutesConfig{Unmatch: transform.UnmatchHeuristic}, NetworkFlows: defaultNetworkConfig, @@ -192,6 +195,14 @@ type Attributes struct { Kubernetes transform.KubernetesDecorator `yaml:"kubernetes"` InstanceID traces.InstanceIDConfig `yaml:"instance_id"` Select attributes.Selection `yaml:"select"` + HostID HostIDConfig `yaml:"host_id"` +} + +type HostIDConfig struct { + // Override allows overriding the reported host.id in Beyla + Override string `yaml:"override" env:"BEYLA_HOST_ID"` + // HostIDFetchTimeout specifies the timeout for trying to fetch the HostID from diverse Cloud Providers + FetchTimeout time.Duration `yaml:"fetch_timeout" env:"BEYLA_HOST_ID_FETCH_TIMEOUT"` } type ConfigError string diff --git a/pkg/beyla/config_test.go b/pkg/beyla/config_test.go index 3bf2339ab..e280d6179 100644 --- a/pkg/beyla/config_test.go +++ b/pkg/beyla/config_test.go @@ -46,6 +46,9 @@ attributes: informers_sync_timeout: 30s instance_id: dns: true + host_id: + override: the-host-id + fetch_timeout: 4s select: beyla.network.flow: include: ["foo", "bar"] @@ -166,6 +169,10 @@ network: Enable: kubeflags.EnabledTrue, InformersSyncTimeout: 30 * time.Second, }, + HostID: HostIDConfig{ + Override: "the-host-id", + FetchTimeout: 4 * time.Second, + }, Select: attributes.Selection{ attributes.BeylaNetworkFlow.Section: attributes.InclusionLists{ Include: []string{"foo", "bar"}, diff --git a/pkg/components/beyla.go b/pkg/components/beyla.go index bd4b232aa..93704623e 100644 --- a/pkg/components/beyla.go +++ b/pkg/components/beyla.go @@ -131,7 +131,11 @@ func buildCommonContextInfo( attributeGroups(config, ctxInfo) - ctxInfo.FetchHostID(ctx) + if config.Attributes.HostID.Override == "" { + ctxInfo.FetchHostID(ctx, config.Attributes.HostID.FetchTimeout) + } else { + ctxInfo.HostID = config.Attributes.HostID.Override + } return ctxInfo } diff --git a/pkg/internal/pipe/global/context.go b/pkg/internal/pipe/global/context.go index 5e526fc74..a1a7a077c 100644 --- a/pkg/internal/pipe/global/context.go +++ b/pkg/internal/pipe/global/context.go @@ -11,7 +11,8 @@ import ( // ContextInfo stores some context information that must be shared across some nodes of the // processing graph. type ContextInfo struct { - // HostID of the host running Beyla + // HostID of the host running Beyla. Unless testing environments, this value must be + // automatically set after invoking FetchHostID HostID string // AppO11y stores context information that is only required for application observability. // Its values must be initialized by the App O11y code and shouldn't be accessed from the diff --git a/pkg/internal/pipe/global/host_id.go b/pkg/internal/pipe/global/host_id.go index e3e91631e..2821d3ed8 100644 --- a/pkg/internal/pipe/global/host_id.go +++ b/pkg/internal/pipe/global/host_id.go @@ -7,6 +7,7 @@ import ( "fmt" "log/slog" "os" + "time" "go.opentelemetry.io/contrib/detectors/aws/ec2" "go.opentelemetry.io/contrib/detectors/azure/azurevm" @@ -16,7 +17,7 @@ import ( v1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -type hostIDFetcher func(context.Context) (string, error) +type hostIDFetcher func(context.Context, time.Duration) (string, error) type fetcher struct { name string @@ -35,7 +36,7 @@ func cilog() *slog.Logger { // This process is known to fail when Beyla runs inside a Kubernetes Pod out of the cloud providers // mentioned in (1). In that case, the host.id will be later set to the full hostname. // This method must be invoked once the ContextInfo object is completely initialized -func (ci *ContextInfo) FetchHostID(ctx context.Context) { +func (ci *ContextInfo) FetchHostID(ctx context.Context, timeout time.Duration) { log := cilog().With("func", "fetchHostID") fetchers := []fetcher{ {name: "AWS", fetch: ec2HostIDFetcher}, @@ -50,7 +51,7 @@ func (ci *ContextInfo) FetchHostID(ctx context.Context) { log := log.With("fetcher", f.name) log.Debug("trying to fetch host ID") var id string - if id, err = f.fetch(ctx); err == nil { + if id, err = f.fetch(ctx, timeout); err == nil { log.Info("got host ID", "hostID", id) ci.HostID = id return @@ -64,22 +65,39 @@ func (ci *ContextInfo) FetchHostID(ctx context.Context) { } } -func azureHostIDFetcher(ctx context.Context) (string, error) { - return detectHostID(ctx, azurevm.New()) +func azureHostIDFetcher(ctx context.Context, timeout time.Duration) (string, error) { + return detectHostID(ctx, timeout, azurevm.New()) } -func gcpHostIDFetcher(ctx context.Context) (string, error) { - return detectHostID(ctx, gcp.NewDetector()) +func gcpHostIDFetcher(ctx context.Context, timeout time.Duration) (string, error) { + return detectHostID(ctx, timeout, gcp.NewDetector()) } -func ec2HostIDFetcher(ctx context.Context) (string, error) { - return detectHostID(ctx, ec2.NewResourceDetector()) +func ec2HostIDFetcher(ctx context.Context, timeout time.Duration) (string, error) { + return detectHostID(ctx, timeout, ec2.NewResourceDetector()) } -func detectHostID(ctx context.Context, detector resource.Detector) (string, error) { - res, err := detector.Detect(ctx) - if err != nil { +func detectHostID(ctx context.Context, timeout time.Duration, detector resource.Detector) (string, error) { + // passing a cancellable context to the detector.Detect(ctx) does not always + // end the connection prematurely, so we wrap its invocation into a goroutine + cctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + resCh := make(chan *resource.Resource, 1) + errCh := make(chan error, 1) + go func() { + if res, err := detector.Detect(ctx); err != nil { + errCh <- err + } else { + resCh <- res + } + }() + var res *resource.Resource + select { + case res = <-resCh: // continue! + case err := <-errCh: return "", err + case <-cctx.Done(): + return "", errors.New("timed out waiting for host ID connection") } for _, attr := range res.Attributes() { if attr.Key == semconv.HostIDKey { @@ -89,7 +107,7 @@ func detectHostID(ctx context.Context, detector resource.Detector) (string, erro return "", fmt.Errorf("can't find host.id in %v", res.Attributes()) } -func (ci *ContextInfo) kubeNodeFetcher(ctx context.Context) (string, error) { +func (ci *ContextInfo) kubeNodeFetcher(ctx context.Context, _ time.Duration) (string, error) { if !ci.K8sInformer.IsKubeEnabled() { return "", errors.New("kubernetes is not enabled") } @@ -130,7 +148,7 @@ func (ci *ContextInfo) kubeNodeFetcher(ctx context.Context) (string, error) { return nodes.Items[0].Status.NodeInfo.MachineID, nil } -func linuxLocalMachineIDFetcher(_ context.Context) (string, error) { +func linuxLocalMachineIDFetcher(_ context.Context, _ time.Duration) (string, error) { if result, err := os.ReadFile("/etc/machine-id"); err == nil || len(result) == 0 { return string(bytes.TrimSpace(result)), nil } diff --git a/pkg/internal/pipe/global/host_id_test.go b/pkg/internal/pipe/global/host_id_test.go new file mode 100644 index 000000000..fe47f65b6 --- /dev/null +++ b/pkg/internal/pipe/global/host_id_test.go @@ -0,0 +1,18 @@ +package global + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestFetchTimeout(t *testing.T) { + ctxInfo := ContextInfo{} + start := time.Now() + ctxInfo.FetchHostID(context.Background(), time.Millisecond) + elapsed := time.Since(start) + + assert.Less(t, elapsed, time.Second) +}