Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(scheduler): Add tracing support #129

Merged
merged 3 commits into from
Oct 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 35 additions & 2 deletions scheduler/cmd/scheduler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,54 @@ limitations under the License.
package main

import (
"context"
"fmt"
"log"
"os"
"time"

"github.com/keptn-sandbox/lifecycle-controller/scheduler/pkg/klcpermit"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/component-base/cli"
"k8s.io/kubernetes/cmd/kube-scheduler/app"
"os"
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/stdout"
"go.opentelemetry.io/otel/propagation"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

// main initialises tracing, runs the kube-scheduler command with the Keptn
// permit plugin registered, and exits with the code the command returned.
func main() {
	tracerProvider := initOTel()

	rand.Seed(time.Now().UnixNano())
	schedulerCmd := app.NewSchedulerCommand(app.WithPlugin(klcpermit.Name, klcpermit.New))
	exitCode := cli.Run(schedulerCmd)

	// os.Exit does not run deferred functions, so the tracer provider is shut
	// down explicitly before the process terminates.
	if err := tracerProvider.Shutdown(context.TODO()); err != nil {
		fmt.Println(err)
	}
	os.Exit(exitCode)
}

// initOTel creates a tracer provider that samples every span and hands each
// one to a pretty-printing stdout exporter through a simple span processor.
// The provider is installed as the global tracer provider, a composite
// TraceContext + Baggage propagator is installed as the global text-map
// propagator, and the provider is returned so the caller can shut it down.
//
// It panics (via log.Panicf) if the stdout exporter cannot be created.
func initOTel() *sdktrace.TracerProvider {
	exporter, err := stdout.NewExporter(stdout.WithPrettyPrint())
	if err != nil {
		log.Panicf("failed to initialize stdout exporter %v\n", err)
	}

	provider := sdktrace.NewTracerProvider(
		sdktrace.WithSampler(sdktrace.AlwaysSample()),
		sdktrace.WithSpanProcessor(sdktrace.NewSimpleSpanProcessor(exporter)),
	)
	otel.SetTracerProvider(provider)

	propagator := propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{})
	otel.SetTextMapPropagator(propagator)

	return provider
}
7 changes: 4 additions & 3 deletions scheduler/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ module github.com/keptn-sandbox/lifecycle-controller/scheduler
go 1.18

require (
go.opentelemetry.io/otel v0.20.0
go.opentelemetry.io/otel/exporters/stdout v0.20.0
go.opentelemetry.io/otel/sdk v0.20.0
go.opentelemetry.io/otel/trace v0.20.0
k8s.io/api v0.24.3
k8s.io/apimachinery v0.24.3
k8s.io/client-go v0.24.3
Expand Down Expand Up @@ -67,13 +71,10 @@ require (
go.opentelemetry.io/contrib v0.20.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.20.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.20.0 // indirect
go.opentelemetry.io/otel v0.20.0 // indirect
go.opentelemetry.io/otel/exporters/otlp v0.20.0 // indirect
go.opentelemetry.io/otel/metric v0.20.0 // indirect
go.opentelemetry.io/otel/sdk v0.20.0 // indirect
go.opentelemetry.io/otel/sdk/export/metric v0.20.0 // indirect
go.opentelemetry.io/otel/sdk/metric v0.20.0 // indirect
go.opentelemetry.io/otel/trace v0.20.0 // indirect
go.opentelemetry.io/proto/otlp v0.7.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/goleak v1.1.12 // indirect
Expand Down
2 changes: 2 additions & 0 deletions scheduler/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,8 @@ go.opentelemetry.io/otel v0.20.0 h1:eaP0Fqu7SXHwvjiqDq83zImeehOHX8doTvU9AwXON8g=
go.opentelemetry.io/otel v0.20.0/go.mod h1:Y3ugLH2oa81t5QO+Lty+zXf8zC9L26ax4Nzoxm/dooo=
go.opentelemetry.io/otel/exporters/otlp v0.20.0 h1:PTNgq9MRmQqqJY0REVbZFvwkYOA85vbdQU/nVfxDyqg=
go.opentelemetry.io/otel/exporters/otlp v0.20.0/go.mod h1:YIieizyaN77rtLJra0buKiNBOm9XQfkPEKBeuhoMwAM=
go.opentelemetry.io/otel/exporters/stdout v0.20.0 h1:NXKkOWV7Np9myYrQE0wqRS3SbwzbupHu07rDONKubMo=
go.opentelemetry.io/otel/exporters/stdout v0.20.0/go.mod h1:t9LUU3JvYlmoPA61abhvsXxKh58xdyi3nMtI6JiR8v0=
go.opentelemetry.io/otel/metric v0.20.0 h1:4kzhXFP+btKm4jwxpjIqjs41A7MakRFUS86bqLHTIw8=
go.opentelemetry.io/otel/metric v0.20.0/go.mod h1:598I5tYlH1vzBjn+BTuhzTCSb/9debfNp6R3s7Pr1eU=
go.opentelemetry.io/otel/oteltest v0.20.0 h1:HiITxCawalo5vQzdHfKeZurV8x7ljcqAgiWzF6Vaeaw=
Expand Down
44 changes: 38 additions & 6 deletions scheduler/pkg/klcpermit/workflow_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,17 @@ package klcpermit

import (
"context"

"github.com/keptn-sandbox/lifecycle-controller/scheduler/pkg/tracing"
"go.opentelemetry.io/otel/codes"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/klog/v2"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
)

var workloadInstanceResource = schema.GroupVersionResource{Group: "lifecycle.keptn.sh", Version: "v1alpha1", Resource: "keptnworkloadinstances"}
Expand Down Expand Up @@ -39,41 +43,53 @@ type Manager interface {

// WorkloadManager evaluates KeptnWorkloadInstance resources on behalf of the
// permit plugin and records the evaluation in trace spans.
type WorkloadManager struct {
	// dynamicClient is used to fetch workload instance resources.
	dynamicClient dynamic.Interface
	// Tracer starts the spans created for workload instances.
	Tracer trace.Tracer
}

// NewWorkloadManager returns a WorkloadManager that reads resources through d
// and obtains its tracer from the global provider under the name
// "keptn/scheduler".
func NewWorkloadManager(d dynamic.Interface) *WorkloadManager {
	return &WorkloadManager{
		dynamicClient: d,
		Tracer:        otel.Tracer("keptn/scheduler"),
	}
}

// bindCRDSpan maps the name produced by getCRDName to the span started for
// that workload instance, so repeated Permit calls reuse one span until it is
// ended and removed again.
// NOTE(review): the map is read and written without any locking — confirm
// that Permit is never invoked from more than one goroutine at a time.
var bindCRDSpan = make(map[string]trace.Span, 100)

// Permit decides whether pod may proceed, based on the pre-deployment status
// of the workload instance resource the pod belongs to.
//
// It returns:
//   - WorkloadInstanceNotFound when the workload instance cannot be fetched;
//   - Failure or Success when the pre-deployment phase has finished (the
//     tracking span is ended and unregistered in both cases);
//   - Wait while the phase is pending, running or unknown;
//   - WorkloadInstanceStatusNotSpecified when "status.preDeploymentStatus" is
//     missing, not a string, or holds a phase that is not handled here.
func (sMgr *WorkloadManager) Permit(ctx context.Context, pod *corev1.Pod) Status {
	// Look up the workload instance that corresponds to this pod.
	name := getCRDName(pod)
	crd, err := sMgr.GetCRD(ctx, pod.Namespace, name)
	if err != nil {
		klog.Infof("[Keptn Permit Plugin] could not find workloadInstance crd %s, err:%s", name, err.Error())
		return WorkloadInstanceNotFound
	}

	// The derived context is not needed here; only the span is used.
	_, span := sMgr.getSpan(ctx, crd, pod)

	// Read the pre-deployment phase from the resource status.
	phase, found, err := unstructured.NestedString(crd.UnstructuredContent(), "status", "preDeploymentStatus")
	// found is a bool and the resource itself is not a string, so log the
	// resource name and use %t rather than %s for both.
	klog.Infof("[Keptn Permit Plugin] workloadInstance crd %s, found %t with phase %s ", name, found, phase)
	if err != nil || !found {
		return WorkloadInstanceStatusNotSpecified
	}

	span.AddEvent("StatusEvaluation", trace.WithAttributes(tracing.Status.String(phase)))
	switch KeptnState(phase) {
	case StateFailed:
		span.SetStatus(codes.Error, "Failed")
		span.End()
		unbindSpan(pod)
		return Failure
	case StateSucceeded:
		span.End()
		unbindSpan(pod)
		return Success
	case StatePending, StateRunning, StateUnknown:
		// Still in progress: keep the span registered for the next call.
		return Wait
	}
	return WorkloadInstanceStatusNotSpecified
}
Expand All @@ -84,9 +100,25 @@ func (sMgr *WorkloadManager) GetCRD(ctx context.Context, namespace string, name
return sMgr.dynamicClient.Resource(workloadInstanceResource).Namespace(namespace).Get(ctx, name, metav1.GetOptions{})
}

func GetCRDName(pod *corev1.Pod) string {
// getSpan returns the span that tracks the workload instance pod belongs to.
//
// If a span is already registered in bindCRDSpan under the pod's workload
// instance name, it is returned together with the incoming ctx unchanged.
// Otherwise a new span is created with tracing.CreateSpan, registered, and
// returned together with the context CreateSpan produced.
func (sMgr *WorkloadManager) getSpan(ctx context.Context, crd *unstructured.Unstructured, pod *corev1.Pod) (context.Context, trace.Span) {
	key := getCRDName(pod)
	existing, known := bindCRDSpan[key]
	if known {
		return ctx, existing
	}

	spanCtx, created := tracing.CreateSpan(ctx, crd, sMgr.Tracer, pod.Namespace)
	//TODO store only sampled one and cap it
	bindCRDSpan[key] = created
	return spanCtx, created
}

// getCRDName builds the workload instance name for pod by joining the values
// of its "keptn.sh/app", "keptn.sh/workload" and "keptn.sh/version"
// annotations with dashes. A missing annotation contributes an empty segment.
func getCRDName(pod *corev1.Pod) string {
	meta := pod.Annotations
	return meta["keptn.sh/app"] + "-" + meta["keptn.sh/workload"] + "-" + meta["keptn.sh/version"]
}

// unbindSpan removes the entry registered in bindCRDSpan for pod's workload
// instance. It does not end the span; callers do that themselves.
func unbindSpan(pod *corev1.Pod) {
	delete(bindCRDSpan, getCRDName(pod))
}
25 changes: 25 additions & 0 deletions scheduler/pkg/tracing/carrier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package tracing

import "fmt"

// KeptnCarrier exposes a generic map (such as the annotations of an
// unstructured resource) through Get, Set and Keys methods so that a text-map
// propagator can read trace context from it and write trace context to it.
type KeptnCarrier map[string]interface{}

// Get returns the value associated with the passed key.
//
// A missing key or a nil value yields the empty string, so callers can tell
// "absent" apart from real content. String values are returned unchanged;
// any other value is rendered with its default fmt formatting.
func (kc KeptnCarrier) Get(key string) string {
	value, ok := kc[key]
	if !ok || value == nil {
		// fmt would render this as the literal text "<nil>".
		return ""
	}
	if s, isString := value.(string); isString {
		return s
	}
	return fmt.Sprintf("%v", value)
}

// Set stores the key-value pair, replacing any previous value for key.
// The carrier must be non-nil: writing to a nil map panics.
func (kc KeptnCarrier) Set(key string, value string) {
	kc[key] = value
}

// Keys lists the keys stored in this carrier, in unspecified order.
func (kc KeptnCarrier) Keys() []string {
	keys := make([]string, 0, len(kc))
	for k := range kc {
		keys = append(keys, k)
	}
	return keys
}
41 changes: 41 additions & 0 deletions scheduler/pkg/tracing/semconv.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package tracing

import (
"context"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

// Attribute keys used on the spans and span events emitted by the scheduler.
const (
	// ApplicationName is the key for the application a workload belongs to.
	ApplicationName = attribute.Key("keptn.deployment.app_name")
	// Workload is the key for the workload name.
	Workload = attribute.Key("keptn.deployment.workload")
	// Version is the key for the workload version.
	Version = attribute.Key("keptn.deployment.version")
	// Namespace is the key for the namespace the workload is deployed to.
	Namespace = attribute.Key("keptn.deployment.namespace")
	// Status is the key for the evaluated deployment status.
	Status = attribute.Key("keptn.deployment.status")
)

// CreateSpan starts a span named "schedule" for the given workload instance
// resource and returns it with the context that carries it.
//
// If the resource has a "metadata.annotations" map, the global text-map
// propagator first extracts any trace context from it into ctx. The span is
// then tagged with the "spec.app", "spec.workloadName" and "spec.version"
// fields that are present, and always with the namespace ns. Lookup errors
// are ignored; only fields reported as found are used.
func CreateSpan(ctx context.Context, crd *unstructured.Unstructured, t trace.Tracer, ns string) (context.Context, trace.Span) {
	content := crd.UnstructuredContent()

	// search for annotations
	if annotations, ok, _ := unstructured.NestedMap(content, "metadata", "annotations"); ok {
		ctx = otel.GetTextMapPropagator().Extract(ctx, KeptnCarrier(annotations))
	}
	ctx, span := t.Start(ctx, "schedule")

	// Spec fields copied onto the span, in the order they are applied.
	specAttributes := []struct {
		key   attribute.Key
		field string
	}{
		{ApplicationName, "app"},
		{Workload, "workloadName"},
		{Version, "version"},
	}
	for _, entry := range specAttributes {
		if value, ok, _ := unstructured.NestedString(content, "spec", entry.field); ok {
			span.SetAttributes(entry.key.String(value))
		}
	}

	span.SetAttributes(Namespace.String(ns))
	return ctx, span
}