diff --git a/scheduler/cmd/scheduler/main.go b/scheduler/cmd/scheduler/main.go index 700bae1f8b..4d40664d2a 100644 --- a/scheduler/cmd/scheduler/main.go +++ b/scheduler/cmd/scheduler/main.go @@ -17,21 +17,54 @@ limitations under the License. package main import ( + "context" + "fmt" + "log" + "os" + "time" + "github.com/keptn-sandbox/lifecycle-controller/scheduler/pkg/klcpermit" "k8s.io/apimachinery/pkg/util/rand" "k8s.io/component-base/cli" "k8s.io/kubernetes/cmd/kube-scheduler/app" - "os" - "time" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/stdout" + "go.opentelemetry.io/otel/propagation" + sdktrace "go.opentelemetry.io/otel/sdk/trace" ) func main() { + tp := initOTel() + rand.Seed(time.Now().UnixNano()) command := app.NewSchedulerCommand( app.WithPlugin(klcpermit.Name, klcpermit.New), ) code := cli.Run(command) + + err := tp.Shutdown(context.TODO()) + if err != nil { + fmt.Println(err) + } os.Exit(code) + +} + +func initOTel() *sdktrace.TracerProvider { + var err error + exp, err := stdout.NewExporter(stdout.WithPrettyPrint()) + if err != nil { + log.Panicf("failed to initialize stdout exporter %v\n", err) + } + bsp := sdktrace.NewSimpleSpanProcessor(exp) + tp := sdktrace.NewTracerProvider( + sdktrace.WithSampler(sdktrace.AlwaysSample()), + sdktrace.WithSpanProcessor(bsp), + ) + otel.SetTracerProvider(tp) + otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{})) + return tp } diff --git a/scheduler/go.mod b/scheduler/go.mod index 69253b6398..0b5b3f8e0d 100644 --- a/scheduler/go.mod +++ b/scheduler/go.mod @@ -3,6 +3,10 @@ module github.com/keptn-sandbox/lifecycle-controller/scheduler go 1.18 require ( + go.opentelemetry.io/otel v0.20.0 + go.opentelemetry.io/otel/exporters/stdout v0.20.0 + go.opentelemetry.io/otel/sdk v0.20.0 + go.opentelemetry.io/otel/trace v0.20.0 k8s.io/api v0.24.3 k8s.io/apimachinery v0.24.3 k8s.io/client-go v0.24.3 @@ -67,13 +71,10 @@ require ( go.opentelemetry.io/contrib v0.20.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.20.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.20.0 // indirect - go.opentelemetry.io/otel v0.20.0 // indirect go.opentelemetry.io/otel/exporters/otlp v0.20.0 // indirect go.opentelemetry.io/otel/metric v0.20.0 // indirect - go.opentelemetry.io/otel/sdk v0.20.0 // indirect go.opentelemetry.io/otel/sdk/export/metric v0.20.0 // indirect go.opentelemetry.io/otel/sdk/metric v0.20.0 // indirect - go.opentelemetry.io/otel/trace v0.20.0 // indirect go.opentelemetry.io/proto/otlp v0.7.0 // indirect go.uber.org/atomic v1.7.0 // indirect go.uber.org/goleak v1.1.12 // indirect diff --git a/scheduler/go.sum b/scheduler/go.sum index 5cefa1970e..5b8c81b103 100644 --- a/scheduler/go.sum +++ b/scheduler/go.sum @@ -661,6 +661,8 @@ go.opentelemetry.io/otel v0.20.0 h1:eaP0Fqu7SXHwvjiqDq83zImeehOHX8doTvU9AwXON8g= go.opentelemetry.io/otel v0.20.0/go.mod h1:Y3ugLH2oa81t5QO+Lty+zXf8zC9L26ax4Nzoxm/dooo= go.opentelemetry.io/otel/exporters/otlp v0.20.0 h1:PTNgq9MRmQqqJY0REVbZFvwkYOA85vbdQU/nVfxDyqg= go.opentelemetry.io/otel/exporters/otlp v0.20.0/go.mod h1:YIieizyaN77rtLJra0buKiNBOm9XQfkPEKBeuhoMwAM= +go.opentelemetry.io/otel/exporters/stdout v0.20.0 h1:NXKkOWV7Np9myYrQE0wqRS3SbwzbupHu07rDONKubMo= +go.opentelemetry.io/otel/exporters/stdout v0.20.0/go.mod h1:t9LUU3JvYlmoPA61abhvsXxKh58xdyi3nMtI6JiR8v0= go.opentelemetry.io/otel/metric v0.20.0 h1:4kzhXFP+btKm4jwxpjIqjs41A7MakRFUS86bqLHTIw8= go.opentelemetry.io/otel/metric v0.20.0/go.mod h1:598I5tYlH1vzBjn+BTuhzTCSb/9debfNp6R3s7Pr1eU= go.opentelemetry.io/otel/oteltest v0.20.0 h1:HiITxCawalo5vQzdHfKeZurV8x7ljcqAgiWzF6Vaeaw= diff --git a/scheduler/pkg/klcpermit/workflow_manager.go b/scheduler/pkg/klcpermit/workflow_manager.go index 10e89b7162..4dd4480396 100644 --- a/scheduler/pkg/klcpermit/workflow_manager.go +++ b/scheduler/pkg/klcpermit/workflow_manager.go @@ -2,13 +2,17 @@ package klcpermit import ( "context" - + "github.com/keptn-sandbox/lifecycle-controller/scheduler/pkg/tracing" + "go.opentelemetry.io/otel/codes" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" "k8s.io/klog/v2" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/trace" ) var workloadInstanceResource = schema.GroupVersionResource{Group: "lifecycle.keptn.sh", Version: "v1alpha1", Resource: "keptnworkloadinstances"} @@ -39,41 +43,53 @@ type Manager interface { type WorkloadManager struct { dynamicClient dynamic.Interface + Tracer trace.Tracer } func NewWorkloadManager(d dynamic.Interface) *WorkloadManager { sMgr := &WorkloadManager{ dynamicClient: d, + Tracer: otel.Tracer("keptn/scheduler"), } return sMgr } +var bindCRDSpan = make(map[string]trace.Span, 100) + func (sMgr *WorkloadManager) Permit(ctx context.Context, pod *corev1.Pod) Status { //List workloadInstance run CRDs - name := GetCRDName(pod) + name := getCRDName(pod) crd, err := sMgr.GetCRD(ctx, pod.Namespace, name) if err != nil { klog.Infof("[Keptn Permit Plugin] could not find workloadInstance crd %s, err:%s", name, err.Error()) return WorkloadInstanceNotFound } + + ctx, span := sMgr.getSpan(ctx, crd, pod) + //check CRD status phase, found, err := unstructured.NestedString(crd.UnstructuredContent(), "status", "preDeploymentStatus") klog.Infof("[Keptn Permit Plugin] workloadInstance crd %s, found %s with phase %s ", crd, found, phase) if err == nil && found { + span.AddEvent("StatusEvaluation", trace.WithAttributes(tracing.Status.String(phase))) switch KeptnState(phase) { - case StatePending: - return Wait case StateFailed: + span.SetStatus(codes.Error, "Failed") + span.End() + unbindSpan(pod) return Failure case StateSucceeded: + span.End() + unbindSpan(pod) return Success + case StatePending: + return Wait case StateRunning: return Wait case StateUnknown: return Wait } - } return WorkloadInstanceStatusNotSpecified } @@ -84,9 +100,25 @@ func (sMgr *WorkloadManager) GetCRD(ctx context.Context, namespace string, name return sMgr.dynamicClient.Resource(workloadInstanceResource).Namespace(namespace).Get(ctx, name, metav1.GetOptions{}) } -func GetCRDName(pod *corev1.Pod) string { +func (sMgr *WorkloadManager) getSpan(ctx context.Context, crd *unstructured.Unstructured, pod *corev1.Pod) (context.Context, trace.Span) { + name := getCRDName(pod) + if span, ok := bindCRDSpan[name]; ok { + return ctx, span + } + ctx, span := tracing.CreateSpan(ctx, crd, sMgr.Tracer, pod.Namespace) + //TODO store only sampled one and cap it + bindCRDSpan[name] = span + return ctx, span +} + +func getCRDName(pod *corev1.Pod) string { application := pod.Annotations["keptn.sh/app"] workloadInstance := pod.Annotations["keptn.sh/workload"] version := pod.Annotations["keptn.sh/version"] return application + "-" + workloadInstance + "-" + version } + +func unbindSpan(pod *corev1.Pod) { + name := getCRDName(pod) + delete(bindCRDSpan, name) +} diff --git a/scheduler/pkg/tracing/carrier.go b/scheduler/pkg/tracing/carrier.go new file mode 100644 index 0000000000..d479d29d38 --- /dev/null +++ b/scheduler/pkg/tracing/carrier.go @@ -0,0 +1,25 @@ +package tracing + +import "fmt" + +// KeptnCarrier carries the TraceContext +type KeptnCarrier map[string]interface{} + +// Get returns the value associated with the passed key. +func (kc KeptnCarrier) Get(key string) string { + return fmt.Sprintf("%v", kc[key]) +} + +// Set stores the key-value pair. +func (kc KeptnCarrier) Set(key string, value string) { + kc[key] = value +} + +// Keys lists the keys stored in this carrier. +func (kc KeptnCarrier) Keys() []string { + keys := make([]string, 0, len(kc)) + for k := range kc { + keys = append(keys, k) + } + return keys +} diff --git a/scheduler/pkg/tracing/semconv.go b/scheduler/pkg/tracing/semconv.go new file mode 100644 index 0000000000..0be5ffad6b --- /dev/null +++ b/scheduler/pkg/tracing/semconv.go @@ -0,0 +1,41 @@ +package tracing + +import ( + "context" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" +) + +const ( + ApplicationName attribute.Key = attribute.Key("keptn.deployment.app_name") + Workload attribute.Key = attribute.Key("keptn.deployment.workload") + Version attribute.Key = attribute.Key("keptn.deployment.version") + Namespace attribute.Key = attribute.Key("keptn.deployment.namespace") + Status attribute.Key = attribute.Key("keptn.deployment.status") +) + +func CreateSpan(ctx context.Context, crd *unstructured.Unstructured, t trace.Tracer, ns string) (context.Context, trace.Span) { + // search for annotations + annotations, found, _ := unstructured.NestedMap(crd.UnstructuredContent(), "metadata", "annotations") + if found { + ctx = otel.GetTextMapPropagator().Extract(ctx, KeptnCarrier(annotations)) + } + ctx, span := t.Start(ctx, "schedule") + + appName, found, _ := unstructured.NestedString(crd.UnstructuredContent(), "spec", "app") + if found { + span.SetAttributes(ApplicationName.String(appName)) + } + workload, found, _ := unstructured.NestedString(crd.UnstructuredContent(), "spec", "workloadName") + if found { + span.SetAttributes(Workload.String(workload)) + } + version, found, _ := unstructured.NestedString(crd.UnstructuredContent(), "spec", "version") + if found { + span.SetAttributes(Version.String(version)) + } + span.SetAttributes(Namespace.String(ns)) + return ctx, span +}