Skip to content
This repository has been archived by the owner on Nov 3, 2023. It is now read-only.

Commit

Permalink
Publish cloud.instance.id ecs field with k8s nodes (#154)
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelKatsoulis authored May 9, 2023
1 parent cce94bd commit 9f89160
Show file tree
Hide file tree
Showing 7 changed files with 255 additions and 34 deletions.
12 changes: 12 additions & 0 deletions input/assets/internal/ecs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package internal

import (
"github.com/elastic/beats/v7/libbeat/beat"
)

func WithCloudInstanceId(instanceId string) AssetOption {
return func(e beat.Event) beat.Event {
e.Fields["cloud.instance.id"] = instanceId
return e
}
}
37 changes: 37 additions & 0 deletions input/assets/internal/ecs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package internal

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/inputrunner/input/testutil"
)

func TestECS_WithCloudInstanceId(t *testing.T) {
for _, tt := range []struct {
name string
assetOp AssetOption
expected beat.Event
}{
{
name: "instance Id provided",
assetOp: WithCloudInstanceId("i-0699b78f46f0fa248"),
expected: beat.Event{
Fields: mapstr.M{"cloud.instance.id": "i-0699b78f46f0fa248"},
Meta: mapstr.M{},
},
},
} {
t.Run(tt.name, func(t *testing.T) {
publisher := testutil.NewInMemoryPublisher()

Publish(publisher, tt.assetOp)

assert.Equal(t, 1, len(publisher.Events))
assert.Equal(t, tt.expected, publisher.Events[0])
})
}
}
3 changes: 1 addition & 2 deletions input/assets/internal/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,9 @@ func WithAssetMetadata(value mapstr.M) AssetOption {
}
}

func WithNodeData(name, providerId string, startTime *metav1.Time) AssetOption {
func WithNodeData(name string, startTime *metav1.Time) AssetOption {
return func(e beat.Event) beat.Event {
e.Fields["kubernetes.node.name"] = name
e.Fields["kubernetes.node.providerId"] = providerId
e.Fields["kubernetes.node.start_time"] = startTime
return e
}
Expand Down
3 changes: 1 addition & 2 deletions input/assets/internal/publish_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,10 @@ func TestPublish(t *testing.T) {
{
name: "with valid node data",
opts: []AssetOption{
WithNodeData("ip-172-31-29-242.us-east-2.compute.internal", "aws:///us-east-2b/i-0699b78f46f0fa248", &startTime),
WithNodeData("ip-172-31-29-242.us-east-2.compute.internal", &startTime),
},
expectedEvent: beat.Event{Fields: mapstr.M{
"kubernetes.node.name": "ip-172-31-29-242.us-east-2.compute.internal",
"kubernetes.node.providerId": "aws:///us-east-2b/i-0699b78f46f0fa248",
"kubernetes.node.start_time": &startTime,
}, Meta: mapstr.M{}},
},
Expand Down
34 changes: 19 additions & 15 deletions input/assets/k8s/k8s_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,16 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8sfake "k8s.io/client-go/kubernetes/fake"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/inputrunner/input/assets/internal"
"github.com/elastic/inputrunner/input/testutil"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8sfake "k8s.io/client-go/kubernetes/fake"
)

var startTime = metav1.Time{Time: time.Date(2021, 8, 15, 14, 30, 45, 100, time.Local)}
Expand Down Expand Up @@ -90,11 +91,12 @@ func TestPublishK8sNodeAsset(t *testing.T) {
name string
event beat.Event

assetName string
assetType string
assetID string
parents []string
children []string
assetName string
assetType string
assetID string
instanceID string
parents []string
children []string
}{
{
name: "publish node",
Expand All @@ -105,18 +107,19 @@ func TestPublishK8sNodeAsset(t *testing.T) {
"asset.ean": "k8s.node:60988eed-1885-4b63-9fa4-780206969deb",
"asset.parents": []string{},
"kubernetes.node.name": "ip-172-31-29-242.us-east-2.compute.internal",
"kubernetes.node.providerId": "aws:///us-east-2b/i-0699b78f46f0fa248",
"kubernetes.node.start_time": &startTime,
"cloud.instance.id": "i-0699b78f46f0fa248",
},
Meta: mapstr.M{
"index": "assets-k8s.node-default",
},
},

assetName: "ip-172-31-29-242.us-east-2.compute.internal",
assetType: "k8s.node",
assetID: "60988eed-1885-4b63-9fa4-780206969deb",
parents: []string{},
assetName: "ip-172-31-29-242.us-east-2.compute.internal",
assetType: "k8s.node",
assetID: "60988eed-1885-4b63-9fa4-780206969deb",
instanceID: "i-0699b78f46f0fa248",
parents: []string{},
},
} {
t.Run(tt.name, func(t *testing.T) {
Expand All @@ -125,8 +128,9 @@ func TestPublishK8sNodeAsset(t *testing.T) {
internal.Publish(publisher,
internal.WithAssetTypeAndID(tt.assetType, tt.assetID),
internal.WithAssetParents(tt.parents),
internal.WithNodeData(tt.assetName, "aws:///us-east-2b/i-0699b78f46f0fa248", &startTime),
internal.WithNodeData(tt.assetName, &startTime),
internal.WithIndex(tt.assetType, ""),
internal.WithCloudInstanceId(tt.instanceID),
)
assert.Equal(t, 1, len(publisher.Events))
assert.Equal(t, tt.event, publisher.Events[0])
Expand Down
63 changes: 53 additions & 10 deletions input/assets/k8s/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@ package k8s
import (
"context"
"fmt"
"strings"
"time"

"github.com/elastic/inputrunner/input/assets/internal"

stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless"
"github.com/elastic/elastic-agent-autodiscover/kubernetes"
kube "github.com/elastic/elastic-agent-autodiscover/kubernetes"
"github.com/elastic/inputrunner/input/assets/internal"

"github.com/elastic/elastic-agent-libs/logp"

Expand Down Expand Up @@ -112,23 +115,63 @@ func publishK8sNodes(ctx context.Context, log *logp.Logger, indexNamespace strin
for _, obj := range watcher.Store().List() {
o, ok := obj.(*kube.Node)
if ok {
log.Debug("Publish Node: %+v", o.Name)

assetProviderId := o.Spec.ProviderID
log.Debugf("Publish Node: %+v", o.Name)
instanceId := getInstanceId(o)
log.Debug("Node instance id: ", instanceId)
assetId := string(o.ObjectMeta.UID)
assetStartTime := o.ObjectMeta.CreationTimestamp
assetParents := []string{}

internal.Publish(publisher,
options := []internal.AssetOption{
internal.WithAssetTypeAndID(assetType, assetId),
internal.WithAssetParents(assetParents),
internal.WithNodeData(o.Name, assetProviderId, &assetStartTime),
internal.WithNodeData(o.Name, &assetStartTime),
internal.WithIndex(assetType, indexNamespace),
)
}
if instanceId != "" {
options = append(options, internal.WithCloudInstanceId(instanceId))
}
internal.Publish(publisher, options...)

} else {
log.Error("Publishing nodes assets failed. Type assertion of node object failed")
}

}

}

// getInstanceId returns the cloud instance id in case
// the node runs in one of [aws, gcp] csp.
// In case of aws the instance id is retrieved from providerId
// which is in the form of aws:///region/instanceId for not fargate nodes.
// In case of gcp it is retrieved by the annotation container.googleapis.com/instance_id
// In all other cases empty string is returned
func getInstanceId(node *kubernetes.Node) string {
providerId := node.Spec.ProviderID

switch csp := getCspFromProviderId(providerId); csp {
case "aws":
slice := strings.Split(providerId, "/")
// in case of fargate the slice length will be 6
if len(slice) == 5 {
return slice[4]
}
case "gcp":
annotations := node.GetAnnotations()
return annotations["container.googleapis.com/instance_id"]
default:
return ""
}
return ""
}

// getCspFromProviderId return the cps for a given providerId string.
// In case of aws providerId is in the form of aws:///region/instanceId
// In case of gcp providerId is in the form of gce://project/region/nodeName
func getCspFromProviderId(providerId string) string {
if strings.HasPrefix(providerId, "aws") {
return "aws"
}
if strings.HasPrefix(providerId, "gce") {
return "gcp"
}
return ""
}
137 changes: 132 additions & 5 deletions input/assets/k8s/nodes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,18 @@ package k8s
import (
"context"
"fmt"
"github.com/elastic/elastic-agent-autodiscover/kubernetes"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/inputrunner/input/testutil"
"testing"
"time"

"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8sfake "k8s.io/client-go/kubernetes/fake"
"testing"
"time"

"github.com/elastic/elastic-agent-autodiscover/kubernetes"
kube "github.com/elastic/elastic-agent-autodiscover/kubernetes"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/inputrunner/input/testutil"
)

func TestGetNodeWatcher(t *testing.T) {
Expand Down Expand Up @@ -142,3 +145,127 @@ func TestPublishK8sNodes(t *testing.T) {

assert.Equal(t, 1, len(publisher.Events))
}

func TestGetInstanceId(t *testing.T) {
for _, tt := range []struct {
name string
input kubernetes.Resource
output string
}{
{
name: "AWS node",
input: &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node1",
UID: "60988eed-1885-4b63-9fa4-780206969deb",
Labels: map[string]string{
"foo": "bar",
},
Annotations: map[string]string{
"key1": "value1",
"key2": "value2",
},
},
TypeMeta: metav1.TypeMeta{
Kind: "Node",
APIVersion: "v1",
},
Status: v1.NodeStatus{
Addresses: []v1.NodeAddress{{Type: v1.NodeHostName, Address: "node1"}},
},
Spec: v1.NodeSpec{
ProviderID: "aws:///us-east-2b/i-0699b78f46f0fa248",
},
},
output: "i-0699b78f46f0fa248",
},
{
name: "AWS node Fargate",
input: &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node1",
UID: "60988eed-1885-4b63-9fa4-780206969deb",
Labels: map[string]string{
"foo": "bar",
},
Annotations: map[string]string{
"key1": "value1",
"key2": "value2",
},
},
TypeMeta: metav1.TypeMeta{
Kind: "Node",
APIVersion: "v1",
},
Status: v1.NodeStatus{
Addresses: []v1.NodeAddress{{Type: v1.NodeHostName, Address: "node1"}},
},
Spec: v1.NodeSpec{
ProviderID: "aws:///us-east-2c/fa80a30ea9-6a2c0e0c771e4e8caa80f702f9821271/fargate-ip-192-168-104-15.us-east-2.compute.internal",
},
},
output: "",
},
{
name: "GCP node",
input: &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node1",
UID: "60988eed-1885-4b63-9fa4-780206969deb",
Labels: map[string]string{
"foo": "bar",
},
Annotations: map[string]string{
"key1": "value1",
"key2": "value2",
"container.googleapis.com/instance_id": "5445971517456914360",
},
},
TypeMeta: metav1.TypeMeta{
Kind: "Node",
APIVersion: "v1",
},
Status: v1.NodeStatus{
Addresses: []v1.NodeAddress{{Type: v1.NodeHostName, Address: "node1"}},
},
Spec: v1.NodeSpec{
ProviderID: "gce://elastic-observability/us-central1-c/gke-michaliskatsoulis-te-default-pool-41126842-55kg",
},
},
output: "5445971517456914360",
},
{
name: "No CSP Node (kind)",
input: &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node1",
UID: "60988eed-1885-4b63-9fa4-780206969deb",
Labels: map[string]string{
"foo": "bar",
},
Annotations: map[string]string{
"key1": "value1",
"key2": "value2",
},
},
TypeMeta: metav1.TypeMeta{
Kind: "Node",
APIVersion: "v1",
},
Status: v1.NodeStatus{
Addresses: []v1.NodeAddress{{Type: v1.NodeHostName, Address: "node1"}},
},
Spec: v1.NodeSpec{
ProviderID: "kind://docker/kind/kind-worker",
},
},
output: "",
},
} {
t.Run(tt.name, func(t *testing.T) {
n := tt.input.(*kube.Node)
providerId := getInstanceId(n)
assert.Equal(t, providerId, tt.output)
})
}
}

0 comments on commit 9f89160

Please sign in to comment.