Skip to content

Commit

Permalink
Merge branch 'main' into not-removing-only-c-files-and-their-dirs
Browse files Browse the repository at this point in the history
  • Loading branch information
tamirdavid1 authored Aug 13, 2024
2 parents 50d70b6 + 7c8d508 commit 6af7dc3
Show file tree
Hide file tree
Showing 54 changed files with 2,695 additions and 240 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ jobs:
- name: Verify Components Image Ready
run: |
declare -a REPOS=("odigos-autoscaler" "odigos-scheduler" "odigos-instrumentor" "odigos-odiglet" "odigos-collector" "odigos-enterprise-odiglet", "odigos-ui")
declare -a REPOS=("odigos-autoscaler" "odigos-scheduler" "odigos-instrumentor" "odigos-odiglet" "odigos-collector" "odigos-enterprise-odiglet" "odigos-ui")
TAG_TO_CHECK=${{ env.TAG }}
for REPO in "${REPOS[@]}"; do
Expand Down
72 changes: 40 additions & 32 deletions agents/python/configurator/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,40 +20,48 @@ def _configure(self, **kwargs):


def _initialize_components():
trace_exporters, metric_exporters, log_exporters = sdk_config._import_exporters(
sdk_config._get_exporter_names("traces"),
sdk_config._get_exporter_names("metrics"),
sdk_config._get_exporter_names("logs"),
)

auto_resource = {
"telemetry.distro.name": "odigos",
"telemetry.distro.version": VERSION,
}

resource_attributes_event = threading.Event()
client = start_opamp_client(resource_attributes_event)
resource_attributes_event.wait(timeout=30) # Wait for the resource attributes to be received for 30 seconds

received_value = client.resource_attributes
client = None

if received_value:
auto_resource.update(received_value)

resource = Resource.create(auto_resource) \
.merge(OTELResourceDetector().detect()) \
.merge(ProcessResourceDetector().detect())

initialize_traces_if_enabled(trace_exporters, resource)
initialize_metrics_if_enabled(metric_exporters, resource)
initialize_logging_if_enabled(log_exporters, resource)

try:

client = start_opamp_client(resource_attributes_event)
resource_attributes_event.wait(timeout=30) # Wait for the resource attributes to be received for 30 seconds

# Reorder the python sys.path to ensure that the user application's dependencies take precedence over the agent's dependencies.
# This is necessary because the user application's dependencies may be incompatible with those used by the agent.
reorder_python_path()
# Reload distro modules to ensure the new path is used.
reload_distro_modules()
received_value = client.resource_attributes

if received_value:
trace_exporters, metric_exporters, log_exporters = sdk_config._import_exporters(
sdk_config._get_exporter_names("traces"),
sdk_config._get_exporter_names("metrics"),
sdk_config._get_exporter_names("logs"),
)

auto_resource = {
"telemetry.distro.name": "odigos",
"telemetry.distro.version": VERSION,
}

auto_resource.update(received_value)

resource = Resource.create(auto_resource) \
.merge(OTELResourceDetector().detect()) \
.merge(ProcessResourceDetector().detect())

initialize_traces_if_enabled(trace_exporters, resource)
initialize_metrics_if_enabled(metric_exporters, resource)
initialize_logging_if_enabled(log_exporters, resource)

# Reorder the python sys.path to ensure that the user application's dependencies take precedence over the agent's dependencies.
# This is necessary because the user application's dependencies may be incompatible with those used by the agent.
reorder_python_path()
# Reload distro modules to ensure the new path is used.
reload_distro_modules()

except Exception as e:
if client is not None:
client.shutdown(custom_failure_message=str(e))


def initialize_traces_if_enabled(trace_exporters, resource):
traces_enabled = os.getenv(sdk_config.OTEL_TRACES_EXPORTER, "none").strip().lower()
Expand All @@ -78,7 +86,7 @@ def start_opamp_client(event):
client = OpAMPHTTPClient(event, condition)

python_version_supported = is_supported_python_version()

client.start(python_version_supported)

def shutdown():
Expand Down
3 changes: 2 additions & 1 deletion agents/python/opamp/health_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ class AgentHealthStatus(str, Enum):
HEALTHY = "Healthy"
STARTING = "Starting"
UNSUPPORTED_RUNTIME_VERSION = "UnsupportedRuntimeVersion"
TERMINATED = "ProcessTerminated"
TERMINATED = "ProcessTerminated"
AGENT_FAILURE = "AgentFailure"
43 changes: 33 additions & 10 deletions agents/python/opamp/http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,36 @@ def start(self, python_version_supported: bool = None):
self.client_thread.start()

def run(self):
if not self.mandatory_env_vars_set():
try:
if not self.mandatory_env_vars_set():
self.event.set()
return

self.send_first_message_with_retry()
self.event.set()
return

self.send_first_message_with_retry()

self.event.set()

self.worker()

except Exception as e:
opamp_logger.error(f"Error running OpAMP client: {e}")
failure_message = self.get_agent_failure_disconnect_message(error_message=str(e))
self.send_agent_to_server_message(failure_message)

# Set the event to notify the main thread, then exit the OpAMP thread
self.event.set()
sys.exit()

def get_agent_failure_disconnect_message(self, error_message: str) -> None:
agent_failure_message = opamp_pb2.AgentToServer()

self.worker()
agent_disconnect = self.get_agent_disconnect()
agent_failure_message.agent_disconnect.CopyFrom(agent_disconnect)

agent_health = self.get_agent_health(component_health=False, last_error=error_message, status=AgentHealthStatus.AGENT_FAILURE.value)
agent_failure_message.health.CopyFrom(agent_health)

return agent_failure_message

def send_unsupported_version_disconnect_message(self, error_message: str) -> None:
first_disconnect_message = opamp_pb2.AgentToServer()

Expand Down Expand Up @@ -220,11 +240,14 @@ def mandatory_env_vars_set(self):

return True

def shutdown(self):
def shutdown(self, custom_failure_message: str = None):
self.running = False
opamp_logger.info("Sending agent disconnect message to OpAMP server...")
agent_health = self.get_agent_health(component_health=False, last_error="Python runtime is exiting", status=AgentHealthStatus.TERMINATED.value)
disconnect_message = opamp_pb2.AgentToServer(agent_disconnect=opamp_pb2.AgentDisconnect(), health=agent_health)
if custom_failure_message:
disconnect_message = self.get_agent_failure_disconnect_message(error_message=custom_failure_message)
else:
agent_health = self.get_agent_health(component_health=False, last_error="Python runtime is exiting", status=AgentHealthStatus.TERMINATED.value)
disconnect_message = opamp_pb2.AgentToServer(agent_disconnect=opamp_pb2.AgentDisconnect(), health=agent_health)

with self.condition:
self.condition.notify_all()
Expand Down
6 changes: 5 additions & 1 deletion api/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.22.0

require (
github.com/odigos-io/odigos/common v0.0.0
github.com/odigos-io/odigos/k8sutils v0.0.0
github.com/stretchr/testify v1.8.4
k8s.io/api v0.30.1
k8s.io/apimachinery v0.30.3
Expand Down Expand Up @@ -66,4 +67,7 @@ require (
sigs.k8s.io/yaml v1.3.0
)

replace github.com/odigos-io/odigos/common => ../common
replace (
github.com/odigos-io/odigos/common => ../common
github.com/odigos-io/odigos/k8sutils => ../k8sutils
)
7 changes: 4 additions & 3 deletions api/odigos/v1alpha1/collectorsgroup_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,16 @@ package v1alpha1

import (
"github.com/odigos-io/odigos/common"
k8sconsts "github.com/odigos-io/odigos/k8sutils/pkg/consts"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// +kubebuilder:validation:Enum=CLUSTER_GATEWAY;NODE_COLLECTOR
type CollectorsGroupRole string
type CollectorsGroupRole k8sconsts.CollectorRole

const (
CollectorsGroupRoleClusterGateway CollectorsGroupRole = "CLUSTER_GATEWAY"
CollectorsGroupRoleNodeCollector CollectorsGroupRole = "NODE_COLLECTOR"
CollectorsGroupRoleClusterGateway CollectorsGroupRole = CollectorsGroupRole(k8sconsts.CollectorsRoleClusterGateway)
CollectorsGroupRoleNodeCollector CollectorsGroupRole = CollectorsGroupRole(k8sconsts.CollectorsRoleNodeCollector)
)

// CollectorsGroupSpec defines the desired state of Collector
Expand Down
82 changes: 75 additions & 7 deletions autoscaler/controllers/datacollection/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ import (

"github.com/odigos-io/odigos/autoscaler/controllers/datacollection/custom"
"github.com/odigos-io/odigos/common"
"github.com/odigos-io/odigos/common/consts"

"github.com/ghodss/yaml"
odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1"
commonconf "github.com/odigos-io/odigos/autoscaler/controllers/common"
"github.com/odigos-io/odigos/common/config"
"github.com/odigos-io/odigos/k8sutils/pkg/consts"
constsK8s "github.com/odigos-io/odigos/k8sutils/pkg/consts"
"github.com/odigos-io/odigos/k8sutils/pkg/env"
v1 "k8s.io/api/core/v1"
Expand All @@ -24,6 +24,8 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"

semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
)

func SyncConfigMap(apps *odigosv1.InstrumentedApplicationList, dests *odigosv1.DestinationList, allProcessors *odigosv1.ProcessorList,
Expand All @@ -42,7 +44,7 @@ func SyncConfigMap(apps *odigosv1.InstrumentedApplicationList, dests *odigosv1.D
logger.Error(err, "failed to get desired config map")
return "", err
}
desiredData := desired.Data[consts.OdigosNodeCollectorConfigMapKey]
desiredData := desired.Data[constsK8s.OdigosNodeCollectorConfigMapKey]

existing := &v1.ConfigMap{}
if err := c.Get(ctx, client.ObjectKey{Namespace: datacollection.Namespace, Name: datacollection.Name}, existing); err != nil {
Expand Down Expand Up @@ -108,7 +110,7 @@ func getDesiredConfigMap(apps *odigosv1.InstrumentedApplicationList, dests *odig
Namespace: datacollection.Namespace,
},
Data: map[string]string{
consts.OdigosNodeCollectorConfigMapKey: cmData,
constsK8s.OdigosNodeCollectorConfigMapKey: cmData,
},
}

Expand Down Expand Up @@ -144,6 +146,23 @@ func calculateConfigMapData(apps *odigosv1.InstrumentedApplicationList, dests *o
}},
}
processorsCfg["resourcedetection"] = config.GenericMap{"detectors": []string{"ec2", "gcp", "azure"}}
processorsCfg["odigostrafficmetrics"] = config.GenericMap{
// Adding the following resource attributes to the metrics allows aggregating the metrics by source.
"res_attributes_keys": []string{
string(semconv.ServiceNameKey),
string(semconv.K8SNamespaceNameKey),
string(semconv.K8SDeploymentNameKey),
string(semconv.K8SStatefulSetNameKey),
string(semconv.K8SDaemonSetNameKey),
},
}
processorsCfg["resource/pod-name"] = config.GenericMap{
"attributes": []config.GenericMap{{
"key": "k8s.pod.name",
"value": "${POD_NAME}",
"action": "upsert",
}},
}

exporters := config.GenericMap{
"otlp/gateway": config.GenericMap{
Expand All @@ -152,6 +171,15 @@ func calculateConfigMapData(apps *odigosv1.InstrumentedApplicationList, dests *o
"insecure": true,
},
},
"otlp/odigos-own-telemetry-ui": config.GenericMap{
"endpoint": fmt.Sprintf("ui.%s:%d", env.GetCurrentNamespace(), consts.OTLPPort),
"tls": config.GenericMap{
"insecure": true,
},
"retry_on_failure": config.GenericMap{
"enabled": false,
},
},
}
tracesPipelineExporter := []string{"otlp/gateway"}

Expand All @@ -175,6 +203,28 @@ func calculateConfigMapData(apps *odigosv1.InstrumentedApplicationList, dests *o
},
},
},
"prometheus/self-metrics": config.GenericMap{
"config": config.GenericMap{
"scrape_configs": []config.GenericMap{
{
"job_name": "otelcol",
"scrape_interval": "10s",
"static_configs": []config.GenericMap{
{
"targets": []string{"127.0.0.1:8888"},
},
},
"metric_relabel_configs": []config.GenericMap{
{
"source_labels": []string{"__name__"},
"regex": "(.*odigos.*|^otelcol_processor_accepted.*)",
"action": "keep",
},
},
},
},
},
},
},
Exporters: exporters,
Processors: processorsCfg,
Expand All @@ -184,8 +234,26 @@ func calculateConfigMapData(apps *odigosv1.InstrumentedApplicationList, dests *o
},
},
Service: config.Service{
Pipelines: map[string]config.Pipeline{},
Pipelines: map[string]config.Pipeline{
"metrics/otelcol": {
Receivers: []string{"prometheus/self-metrics"},
Processors: []string{"resource/pod-name"},
Exporters: []string{"otlp/odigos-own-telemetry-ui"},
},
},
Extensions: []string{"health_check"},
Telemetry: config.Telemetry{
Metrics: config.GenericMap{
"address": "0.0.0.0:8888",
},
Resource: map[string]*string{
// The collector adds "otelcol" as a service name, so we need to remove it
// to avoid duplication, since we are interested in the instrumented services.
string(semconv.ServiceNameKey): nil,
// The collector adds its own version as a service version, which is not needed currently.
string(semconv.ServiceVersionKey): nil,
},
},
},
}

Expand Down Expand Up @@ -245,15 +313,15 @@ func calculateConfigMapData(apps *odigosv1.InstrumentedApplicationList, dests *o

cfg.Service.Pipelines["logs"] = config.Pipeline{
Receivers: []string{"filelog"},
Processors: append([]string{"batch", "odigosresourcename", "resource", "resourcedetection"}, logsProcessors...),
Processors: append([]string{"batch", "odigosresourcename", "resource", "resourcedetection", "odigostrafficmetrics"}, logsProcessors...),
Exporters: []string{"otlp/gateway"},
}
}

if collectTraces {
cfg.Service.Pipelines["traces"] = config.Pipeline{
Receivers: []string{"otlp"},
Processors: append([]string{"batch", "odigosresourcename", "resource", "resourcedetection"}, tracesProcessors...),
Processors: append([]string{"batch", "odigosresourcename", "resource", "resourcedetection", "odigostrafficmetrics"}, tracesProcessors...),
Exporters: tracesPipelineExporter,
}
}
Expand All @@ -268,7 +336,7 @@ func calculateConfigMapData(apps *odigosv1.InstrumentedApplicationList, dests *o

cfg.Service.Pipelines["metrics"] = config.Pipeline{
Receivers: []string{"otlp", "kubeletstats"},
Processors: append([]string{"batch", "odigosresourcename", "resource", "resourcedetection"}, metricsProcessors...),
Processors: append([]string{"batch", "odigosresourcename", "resource", "resourcedetection", "odigostrafficmetrics"}, metricsProcessors...),
Exporters: []string{"otlp/gateway"},
}
}
Expand Down
Loading

0 comments on commit 6af7dc3

Please sign in to comment.