Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Short timeout for cloud instance ID fetcher #1050

Merged
merged 1 commit into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions pkg/beyla/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ var DefaultConfig = Config{
Enable: kubeflags.EnabledDefault,
InformersSyncTimeout: 30 * time.Second,
},
HostID: HostIDConfig{
FetchTimeout: 500 * time.Millisecond,
},
},
Routes: &transform.RoutesConfig{Unmatch: transform.UnmatchHeuristic},
NetworkFlows: defaultNetworkConfig,
Expand Down Expand Up @@ -192,6 +195,14 @@ type Attributes struct {
Kubernetes transform.KubernetesDecorator `yaml:"kubernetes"`
InstanceID traces.InstanceIDConfig `yaml:"instance_id"`
Select attributes.Selection `yaml:"select"`
HostID HostIDConfig `yaml:"host_id"`
}

type HostIDConfig struct {
// Override allows overriding the reported host.id in Beyla
Override string `yaml:"override" env:"BEYLA_HOST_ID"`
// HostIDFetchTimeout specifies the timeout for trying to fetch the HostID from diverse Cloud Providers
FetchTimeout time.Duration `yaml:"fetch_timeout" env:"BEYLA_HOST_ID_FETCH_TIMEOUT"`
}

type ConfigError string
Expand Down
7 changes: 7 additions & 0 deletions pkg/beyla/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ attributes:
informers_sync_timeout: 30s
instance_id:
dns: true
host_id:
override: the-host-id
fetch_timeout: 4s
select:
beyla.network.flow:
include: ["foo", "bar"]
Expand Down Expand Up @@ -166,6 +169,10 @@ network:
Enable: kubeflags.EnabledTrue,
InformersSyncTimeout: 30 * time.Second,
},
HostID: HostIDConfig{
Override: "the-host-id",
FetchTimeout: 4 * time.Second,
},
Select: attributes.Selection{
attributes.BeylaNetworkFlow.Section: attributes.InclusionLists{
Include: []string{"foo", "bar"},
Expand Down
6 changes: 5 additions & 1 deletion pkg/components/beyla.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,11 @@ func buildCommonContextInfo(

attributeGroups(config, ctxInfo)

ctxInfo.FetchHostID(ctx)
if config.Attributes.HostID.Override == "" {
ctxInfo.FetchHostID(ctx, config.Attributes.HostID.FetchTimeout)
} else {
ctxInfo.HostID = config.Attributes.HostID.Override
}

return ctxInfo
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/internal/pipe/global/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import (
// ContextInfo stores some context information that must be shared across some nodes of the
// processing graph.
type ContextInfo struct {
// HostID of the host running Beyla
// HostID of the host running Beyla. Unless testing environments, this value must be
// automatically set after invoking FetchHostID
HostID string
// AppO11y stores context information that is only required for application observability.
// Its values must be initialized by the App O11y code and shouldn't be accessed from the
Expand Down
46 changes: 32 additions & 14 deletions pkg/internal/pipe/global/host_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"log/slog"
"os"
"time"

"go.opentelemetry.io/contrib/detectors/aws/ec2"
"go.opentelemetry.io/contrib/detectors/azure/azurevm"
Expand All @@ -16,7 +17,7 @@ import (
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

type hostIDFetcher func(context.Context) (string, error)
type hostIDFetcher func(context.Context, time.Duration) (string, error)

type fetcher struct {
name string
Expand All @@ -35,7 +36,7 @@ func cilog() *slog.Logger {
// This process is known to fail when Beyla runs inside a Kubernetes Pod out of the cloud providers
// mentioned in (1). In that case, the host.id will be later set to the full hostname.
// This method must be invoked once the ContextInfo object is completely initialized
func (ci *ContextInfo) FetchHostID(ctx context.Context) {
func (ci *ContextInfo) FetchHostID(ctx context.Context, timeout time.Duration) {
log := cilog().With("func", "fetchHostID")
fetchers := []fetcher{
{name: "AWS", fetch: ec2HostIDFetcher},
Expand All @@ -50,7 +51,7 @@ func (ci *ContextInfo) FetchHostID(ctx context.Context) {
log := log.With("fetcher", f.name)
log.Debug("trying to fetch host ID")
var id string
if id, err = f.fetch(ctx); err == nil {
if id, err = f.fetch(ctx, timeout); err == nil {
log.Info("got host ID", "hostID", id)
ci.HostID = id
return
Expand All @@ -64,22 +65,39 @@ func (ci *ContextInfo) FetchHostID(ctx context.Context) {
}
}

func azureHostIDFetcher(ctx context.Context) (string, error) {
return detectHostID(ctx, azurevm.New())
func azureHostIDFetcher(ctx context.Context, timeout time.Duration) (string, error) {
return detectHostID(ctx, timeout, azurevm.New())
}

func gcpHostIDFetcher(ctx context.Context) (string, error) {
return detectHostID(ctx, gcp.NewDetector())
func gcpHostIDFetcher(ctx context.Context, timeout time.Duration) (string, error) {
return detectHostID(ctx, timeout, gcp.NewDetector())
}

func ec2HostIDFetcher(ctx context.Context) (string, error) {
return detectHostID(ctx, ec2.NewResourceDetector())
func ec2HostIDFetcher(ctx context.Context, timeout time.Duration) (string, error) {
return detectHostID(ctx, timeout, ec2.NewResourceDetector())
}

func detectHostID(ctx context.Context, detector resource.Detector) (string, error) {
res, err := detector.Detect(ctx)
if err != nil {
func detectHostID(ctx context.Context, timeout time.Duration, detector resource.Detector) (string, error) {
// passing a cancellable context to the detector.Detect(ctx) does not always
// end the connection prematurely, so we wrap its invocation into a goroutine
cctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
resCh := make(chan *resource.Resource, 1)
errCh := make(chan error, 1)
go func() {
if res, err := detector.Detect(ctx); err != nil {
errCh <- err
} else {
resCh <- res
}
}()
var res *resource.Resource
select {
case res = <-resCh: // continue!
case err := <-errCh:
return "", err
case <-cctx.Done():
return "", errors.New("timed out waiting for host ID connection")
}
for _, attr := range res.Attributes() {
if attr.Key == semconv.HostIDKey {
Expand All @@ -89,7 +107,7 @@ func detectHostID(ctx context.Context, detector resource.Detector) (string, erro
return "", fmt.Errorf("can't find host.id in %v", res.Attributes())
}

func (ci *ContextInfo) kubeNodeFetcher(ctx context.Context) (string, error) {
func (ci *ContextInfo) kubeNodeFetcher(ctx context.Context, _ time.Duration) (string, error) {
if !ci.K8sInformer.IsKubeEnabled() {
return "", errors.New("kubernetes is not enabled")
}
Expand Down Expand Up @@ -130,7 +148,7 @@ func (ci *ContextInfo) kubeNodeFetcher(ctx context.Context) (string, error) {
return nodes.Items[0].Status.NodeInfo.MachineID, nil
}

func linuxLocalMachineIDFetcher(_ context.Context) (string, error) {
func linuxLocalMachineIDFetcher(_ context.Context, _ time.Duration) (string, error) {
if result, err := os.ReadFile("/etc/machine-id"); err == nil || len(result) == 0 {
return string(bytes.TrimSpace(result)), nil
}
Expand Down
18 changes: 18 additions & 0 deletions pkg/internal/pipe/global/host_id_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package global

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestFetchTimeout(t *testing.T) {
ctxInfo := ContextInfo{}
start := time.Now()
ctxInfo.FetchHostID(context.Background(), time.Millisecond)
elapsed := time.Since(start)

assert.Less(t, elapsed, time.Second)
}
Loading