From e43ee1259a7f146e44a9fb49949813122a30dbe9 Mon Sep 17 00:00:00 2001 From: Jesse Gumz Date: Wed, 8 Nov 2023 09:22:32 -0800 Subject: [PATCH] [apm] standardize peer tag aggregation (#20550) * add new config flag for peer tag aggregation, add peer tag defaults * add peer_tags_aggregation config * update aggregation to work with just peer tags, remove dependency on specific peer.service field * add tests for instantiating concentrator with different peer tag related configurations * fix tests for client stats aggregator * correct inconsistent naming for peer tags aggregation variable * fix configuration of peer tags aggregation flag * Update pkg/config/config_template.yaml Co-authored-by: May Lee * Update pkg/config/config_template.yaml Co-authored-by: May Lee * Update pkg/config/config_template.yaml Co-authored-by: May Lee * Update pkg/config/config_template.yaml Co-authored-by: May Lee * add _dd.base_service to default peer tags * add split_payload field for client stats payload protobuf * Revert "add split_payload field for client stats payload protobuf" This reverts commit fc01656278ede63ca5032b599316f842cedd18a0. * add splitPayload field to StatsPayload struct * mark StatsPayloads with splitPayload field * add test cases for instantiation of client stats aggregator * make peer_service a reserved field * remove old PeerService field references * remove PeerService field references in tests * add new benchmark * revamp benchmark to account for peer tags * correct logic for marking payloads as split * appease linter * add db.system to defaults for peer tags * allow either peer service aggregation or peer tags aggregation flag to enable all default peer tags * ensure that we only retrieve peer tag when its value is non-empty * update default peer tags * additional updates for peer tags list * revise documentation for flags related to peer tags aggregation * add release notes --------- Co-authored-by: May Lee --- comp/trace/config/setup.go | 4 + pkg/config/apm.go | 1 + pkg/config/config_template.yaml | 25 +- pkg/config/config_test.go | 29 +++ pkg/flare/envvars.go | 1 + pkg/proto/datadog/trace/stats.proto | 9 +- pkg/proto/pbgo/trace/stats.pb.go | 188 ++++++++------- pkg/proto/pbgo/trace/stats_gen.go | 70 +++--- pkg/proto/pbgo/trace/stats_vtproto.pb.go | 76 +++--- pkg/trace/config/config.go | 5 +- pkg/trace/stats/aggregation.go | 18 +- pkg/trace/stats/aggregation_test.go | 107 +++------ pkg/trace/stats/client_stats_aggregator.go | 48 ++-- .../stats/client_stats_aggregator_test.go | 208 ++++++---------- pkg/trace/stats/concentrator.go | 51 +++- pkg/trace/stats/concentrator_test.go | 174 +++++++++----- pkg/trace/stats/statsraw.go | 5 +- pkg/trace/stats/statsraw_test.go | 226 ++++-------------- pkg/trace/writer/stats.go | 3 + pkg/trace/writer/stats_test.go | 2 + ...tags-for-aggregation-3d708e776c0eb05c.yaml | 11 + 21 files changed, 585 insertions(+), 676 deletions(-) create mode 100644 releasenotes/notes/add-default-peer-tags-for-aggregation-3d708e776c0eb05c.yaml diff --git a/comp/trace/config/setup.go b/comp/trace/config/setup.go index 678bee09c44e0..a3a949b9945ab 100644 --- a/comp/trace/config/setup.go +++ b/comp/trace/config/setup.go @@ -223,6 +223,10 @@ func applyDatadogConfig(c *config.AgentConfig, core corecompcfg.Component) error c.ConnectionLimit = core.GetInt("apm_config.connection_limit") } c.PeerServiceAggregation = core.GetBool("apm_config.peer_service_aggregation") + if c.PeerServiceAggregation { + log.Warn("`apm_config.peer_service_aggregation` is deprecated, please use `apm_config.peer_tags_aggregation` instead") + } + c.PeerTagsAggregation = core.GetBool("apm_config.peer_tags_aggregation") c.ComputeStatsBySpanKind = core.GetBool("apm_config.compute_stats_by_span_kind") if core.IsSet("apm_config.peer_tags") { c.PeerTags = core.GetStringSlice("apm_config.peer_tags") diff --git a/pkg/config/apm.go b/pkg/config/apm.go index d33c09412e672..f527bd048a488 100644 --- a/pkg/config/apm.go +++ b/pkg/config/apm.go @@ -73,6 +73,7 @@ func setupAPM(config Config) { config.BindEnvAndSetDefault("apm_config.windows_pipe_security_descriptor", "D:AI(A;;GA;;;WD)", "DD_APM_WINDOWS_PIPE_SECURITY_DESCRIPTOR") //nolint:errcheck config.BindEnvAndSetDefault("apm_config.remote_tagger", true, "DD_APM_REMOTE_TAGGER") //nolint:errcheck config.BindEnvAndSetDefault("apm_config.peer_service_aggregation", false, "DD_APM_PEER_SERVICE_AGGREGATION") //nolint:errcheck + config.BindEnvAndSetDefault("apm_config.peer_tags_aggregation", false, "DD_APM_PEER_TAGS_AGGREGATION") //nolint:errcheck config.BindEnvAndSetDefault("apm_config.compute_stats_by_span_kind", false, "DD_APM_COMPUTE_STATS_BY_SPAN_KIND") //nolint:errcheck config.BindEnvAndSetDefault("apm_config.instrumentation.enabled", false, "DD_APM_INSTRUMENTATION_ENABLED") config.BindEnvAndSetDefault("apm_config.instrumentation.enabled_namespaces", []string{}, "DD_APM_INSTRUMENTATION_ENABLED_NAMESPACES") diff --git a/pkg/config/config_template.yaml b/pkg/config/config_template.yaml index e4e18fed67ccc..c68eff4a07660 100644 --- a/pkg/config/config_template.yaml +++ b/pkg/config/config_template.yaml @@ -1502,19 +1502,26 @@ api_key: ## @param peer_service_aggregation - bool - default: false ## @env DD_APM_PEER_SERVICE_AGGREGATION - bool - default: false - ## [BETA] Enables `peer.service` aggregation in the agent. If disabled, aggregated trace stats will not include `peer.service` as a dimension. - ## For the best experience with `peer.service`, it is recommended to also enable `compute_stats_by_span_kind`. - ## If enabling both causes the Agent to consume too many resources, try disabling `compute_stats_by_span_kind` first. - ## If the overhead remains high, it will be due to a high cardinality of `peer.service` values from the traces. You may need to check your instrumentation. - ## NOTE: If you are using an OTel tracer it's best to have both enabled because client/producer spans with a `peer.service` value - ## may not be marked by the Agent as top-level spans. + ## DEPRECATED - please use `peer_tags_aggregation` instead. # peer_service_aggregation: false + ## @param peer_tags_aggregation - bool - default: false + ## @env DD_APM_PEER_TAGS_AGGREGATION - bool - default: false + ## [BETA] Enables aggregation of peer related tags (e.g., `peer.service`, `db.instance`, etc.) in the Agent. + ## If disabled, aggregated trace stats will not include these tags as dimensions on trace metrics. + ## For the best experience with peer tags, Datadog also recommends enabling `compute_stats_by_span_kind`. + ## If you are using an OTel tracer, it's best to have both enabled because client/producer spans with relevant peer tags + ## may not be marked by the Agent as top-level spans. + ## If enabling both causes the Agent to consume too many resources, try disabling `compute_stats_by_span_kind` first. + ## A high cardinality of peer tags or APM resources can also contribute to higher CPU and memory consumption. + ## You can check for the cardinality of these fields by making trace search queries in the Datadog UI. + ## The default list of peer tags can be found in pkg/trace/stats/concentrator.go. + # peer_tags_aggregation: false + ## @param peer_tags - list of strings - optional ## @env DD_APM_PEER_TAGS - list of strings - optional - ## [BETA] Enables additional tags related to peer.service. Will not be used unless peer_service_aggregation is also set. - ## NOTE: This option is recommended to get the most granular tagging alongside `peer.service`, but it will - ## also increase the computational overhead of aggregation. This list can be omitted if that cost is too high for your agent. + ## [BETA] Optional list of supplementary peer tags that go beyond the defaults. The Datadog backend validates all tags + ## and will drop ones that are unapproved. # peer_tags: [] ## @param features - list of strings - optional diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 38f9e157a6038..4a059af936283 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -870,6 +870,26 @@ apm_config: require.False(t, testConfig.GetBool("apm_config.peer_service_aggregation")) } +func TestEnablePeerTagsAggregationYAML(t *testing.T) { + datadogYaml := ` +apm_config: + peer_tags_aggregation: true +` + testConfig := SetupConfFromYAML(datadogYaml) + err := setupFipsEndpoints(testConfig) + require.NoError(t, err) + require.True(t, testConfig.GetBool("apm_config.peer_tags_aggregation")) + + datadogYaml = ` +apm_config: + peer_tags_aggregation: false +` + testConfig = SetupConfFromYAML(datadogYaml) + err = setupFipsEndpoints(testConfig) + require.NoError(t, err) + require.False(t, testConfig.GetBool("apm_config.peer_tags_aggregation")) +} + func TestEnablePeerServiceStatsAggregationEnv(t *testing.T) { t.Setenv("DD_APM_PEER_SERVICE_AGGREGATION", "true") testConfig := SetupConfFromYAML("") @@ -879,6 +899,15 @@ func TestEnablePeerServiceStatsAggregationEnv(t *testing.T) { require.False(t, testConfig.GetBool("apm_config.peer_service_aggregation")) } +func TestEnablePeerTagsAggregationEnv(t *testing.T) { + t.Setenv("DD_APM_PEER_TAGS_AGGREGATION", "true") + testConfig := SetupConfFromYAML("") + require.True(t, testConfig.GetBool("apm_config.peer_tags_aggregation")) + t.Setenv("DD_APM_PEER_TAGS_AGGREGATION", "false") + testConfig = SetupConfFromYAML("") + require.False(t, testConfig.GetBool("apm_config.peer_tags_aggregation")) +} + func TestEnableStatsComputationBySpanKindYAML(t *testing.T) { datadogYaml := ` apm_config: diff --git a/pkg/flare/envvars.go b/pkg/flare/envvars.go index edd2780d8776b..88bfc42164981 100644 --- a/pkg/flare/envvars.go +++ b/pkg/flare/envvars.go @@ -71,6 +71,7 @@ var allowedEnvvarNames = []string{ "DD_APM_REMOTE_TAGGER", "DD_APM_PEER_SERVICE_AGGREGATION", "DD_APM_COMPUTE_STATS_BY_SPAN_KIND", + "DD_APM_PEER_TAGS_AGGREGATION", "DD_APM_PEER_TAGS", "DD_APM_MAX_CATALOG_SERVICES", "DD_APM_RECEIVER_TIMEOUT", diff --git a/pkg/proto/datadog/trace/stats.proto b/pkg/proto/datadog/trace/stats.proto index 250cfca0c4884..2732825fd8e6e 100644 --- a/pkg/proto/datadog/trace/stats.proto +++ b/pkg/proto/datadog/trace/stats.proto @@ -14,6 +14,9 @@ message StatsPayload { repeated ClientStatsPayload stats = 3; string agentVersion = 4; bool clientComputed = 5; + // splitPayload indicates if the payload is actually one of several payloads split out from a larger payload. + // This field can be used in the backend to signal if re-aggregation is necessary. + bool splitPayload = 6; } // ClientStatsPayload is the first layer of span stats aggregation. It is also @@ -70,9 +73,9 @@ message ClientGroupedStats { bytes errorSummary = 11; // ddsketch summary of error spans latencies encoded in protobuf bool synthetics = 12; // set to true on spans generated by synthetics traffic uint64 topLevelHits = 13; // count of top level spans aggregated in the groupedstats - string peer_service = 14; // name of the remote service that the `service` communicated with + reserved 14; // peer_service has been deprecated string span_kind = 15; // value of the span.kind tag on the span - // peer_tags are supplementary tags that further describe a peer_service - // E.g., `aws.s3.bucket` gives a specific bucket for `peer_service = aws-s3` + // peer_tags are supplementary tags that further describe a peer entity + // E.g., `grpc.target` to describe the name of a gRPC peer, or `db.hostname` to describe the name of peer DB repeated string peer_tags = 16; } diff --git a/pkg/proto/pbgo/trace/stats.pb.go b/pkg/proto/pbgo/trace/stats.pb.go index c9c51aba5a1a2..1bfab210824c5 100644 --- a/pkg/proto/pbgo/trace/stats.pb.go +++ b/pkg/proto/pbgo/trace/stats.pb.go @@ -32,6 +32,9 @@ type StatsPayload struct { Stats []*ClientStatsPayload `protobuf:"bytes,3,rep,name=stats,proto3" json:"stats,omitempty" msg:"Stats,omitempty"` AgentVersion string `protobuf:"bytes,4,opt,name=agentVersion,proto3" json:"agentVersion,omitempty"` ClientComputed bool `protobuf:"varint,5,opt,name=clientComputed,proto3" json:"clientComputed,omitempty"` + // splitPayload indicates if the payload is actually one of several payloads split out from a larger payload. + // This field can be used in the backend to signal if re-aggregation is necessary. + SplitPayload bool `protobuf:"varint,6,opt,name=splitPayload,proto3" json:"splitPayload,omitempty"` } func (x *StatsPayload) Reset() { @@ -101,6 +104,13 @@ func (x *StatsPayload) GetClientComputed() bool { return false } +func (x *StatsPayload) GetSplitPayload() bool { + if x != nil { + return x.SplitPayload + } + return false +} + // ClientStatsPayload is the first layer of span stats aggregation. It is also // the payload sent by tracers to the agent when stats in tracer are enabled. type ClientStatsPayload struct { @@ -330,22 +340,23 @@ type ClientGroupedStats struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Service string `protobuf:"bytes,1,opt,name=service,proto3" json:"service,omitempty"` - Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"` - Resource string `protobuf:"bytes,3,opt,name=resource,proto3" json:"resource,omitempty"` - HTTPStatusCode uint32 `protobuf:"varint,4,opt,name=HTTP_status_code,json=HTTPStatusCode,proto3" json:"HTTP_status_code,omitempty"` - Type string `protobuf:"bytes,5,opt,name=type,proto3" json:"type,omitempty"` - DBType string `protobuf:"bytes,6,opt,name=DB_type,json=DBType,proto3" json:"DB_type,omitempty"` // db_type might be used in the future to help in the obfuscation step - Hits uint64 `protobuf:"varint,7,opt,name=hits,proto3" json:"hits,omitempty"` // count of all spans aggregated in the groupedstats - Errors uint64 `protobuf:"varint,8,opt,name=errors,proto3" json:"errors,omitempty"` // count of error spans aggregated in the groupedstats - Duration uint64 `protobuf:"varint,9,opt,name=duration,proto3" json:"duration,omitempty"` // total duration in nanoseconds of spans aggregated in the bucket - OkSummary []byte `protobuf:"bytes,10,opt,name=okSummary,proto3" json:"okSummary,omitempty"` // ddsketch summary of ok spans latencies encoded in protobuf - ErrorSummary []byte `protobuf:"bytes,11,opt,name=errorSummary,proto3" json:"errorSummary,omitempty"` // ddsketch summary of error spans latencies encoded in protobuf - Synthetics bool `protobuf:"varint,12,opt,name=synthetics,proto3" json:"synthetics,omitempty"` // set to true on spans generated by synthetics traffic - TopLevelHits uint64 `protobuf:"varint,13,opt,name=topLevelHits,proto3" json:"topLevelHits,omitempty"` // count of top level spans aggregated in the groupedstats - PeerService string `protobuf:"bytes,14,opt,name=peer_service,json=peerService,proto3" json:"peer_service,omitempty"` // name of the remote service that the `service` communicated with - SpanKind string `protobuf:"bytes,15,opt,name=span_kind,json=spanKind,proto3" json:"span_kind,omitempty"` // value of the span.kind tag on the span - PeerTags []string `protobuf:"bytes,16,rep,name=peer_tags,json=peerTags,proto3" json:"peer_tags,omitempty"` // supplementary tags that further describe peer_service + Service string `protobuf:"bytes,1,opt,name=service,proto3" json:"service,omitempty"` + Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"` + Resource string `protobuf:"bytes,3,opt,name=resource,proto3" json:"resource,omitempty"` + HTTPStatusCode uint32 `protobuf:"varint,4,opt,name=HTTP_status_code,json=HTTPStatusCode,proto3" json:"HTTP_status_code,omitempty"` + Type string `protobuf:"bytes,5,opt,name=type,proto3" json:"type,omitempty"` + DBType string `protobuf:"bytes,6,opt,name=DB_type,json=DBType,proto3" json:"DB_type,omitempty"` // db_type might be used in the future to help in the obfuscation step + Hits uint64 `protobuf:"varint,7,opt,name=hits,proto3" json:"hits,omitempty"` // count of all spans aggregated in the groupedstats + Errors uint64 `protobuf:"varint,8,opt,name=errors,proto3" json:"errors,omitempty"` // count of error spans aggregated in the groupedstats + Duration uint64 `protobuf:"varint,9,opt,name=duration,proto3" json:"duration,omitempty"` // total duration in nanoseconds of spans aggregated in the bucket + OkSummary []byte `protobuf:"bytes,10,opt,name=okSummary,proto3" json:"okSummary,omitempty"` // ddsketch summary of ok spans latencies encoded in protobuf + ErrorSummary []byte `protobuf:"bytes,11,opt,name=errorSummary,proto3" json:"errorSummary,omitempty"` // ddsketch summary of error spans latencies encoded in protobuf + Synthetics bool `protobuf:"varint,12,opt,name=synthetics,proto3" json:"synthetics,omitempty"` // set to true on spans generated by synthetics traffic + TopLevelHits uint64 `protobuf:"varint,13,opt,name=topLevelHits,proto3" json:"topLevelHits,omitempty"` // count of top level spans aggregated in the groupedstats + SpanKind string `protobuf:"bytes,15,opt,name=span_kind,json=spanKind,proto3" json:"span_kind,omitempty"` // value of the span.kind tag on the span + // peer_tags are supplementary tags that further describe a peer entity + // E.g., `grpc.target` to describe the name of a gRPC peer, or `db.hostname` to describe the name of peer DB + PeerTags []string `protobuf:"bytes,16,rep,name=peer_tags,json=peerTags,proto3" json:"peer_tags,omitempty"` } func (x *ClientGroupedStats) Reset() { @@ -471,13 +482,6 @@ func (x *ClientGroupedStats) GetTopLevelHits() uint64 { return 0 } -func (x *ClientGroupedStats) GetPeerService() string { - if x != nil { - return x.PeerService - } - return "" -} - func (x *ClientGroupedStats) GetSpanKind() string { if x != nil { return x.SpanKind @@ -497,7 +501,7 @@ var File_datadog_trace_stats_proto protoreflect.FileDescriptor var file_datadog_trace_stats_proto_rawDesc = []byte{ 0x0a, 0x19, 0x64, 0x61, 0x74, 0x61, 0x64, 0x6f, 0x67, 0x2f, 0x74, 0x72, 0x61, 0x63, 0x65, 0x2f, 0x73, 0x74, 0x61, 0x74, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0d, 0x64, 0x61, 0x74, - 0x61, 0x64, 0x6f, 0x67, 0x2e, 0x74, 0x72, 0x61, 0x63, 0x65, 0x22, 0xd5, 0x01, 0x0a, 0x0c, 0x53, + 0x61, 0x64, 0x6f, 0x67, 0x2e, 0x74, 0x72, 0x61, 0x63, 0x65, 0x22, 0xf9, 0x01, 0x0a, 0x0c, 0x53, 0x74, 0x61, 0x74, 0x73, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x24, 0x0a, 0x0d, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x48, 0x6f, 0x73, 0x74, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x48, 0x6f, 0x73, 0x74, 0x6e, 0x61, 0x6d, @@ -511,74 +515,74 @@ var file_datadog_trace_stats_proto_rawDesc = []byte{ 0x65, 0x6e, 0x74, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x26, 0x0a, 0x0e, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x43, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x65, 0x64, 0x18, 0x05, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0e, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x43, 0x6f, 0x6d, 0x70, 0x75, 0x74, - 0x65, 0x64, 0x22, 0x84, 0x03, 0x0a, 0x12, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, - 0x74, 0x73, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x1a, 0x0a, 0x08, 0x68, 0x6f, 0x73, - 0x74, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x68, 0x6f, 0x73, - 0x74, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x65, 0x6e, 0x76, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x03, 0x65, 0x6e, 0x76, 0x12, 0x18, 0x0a, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, - 0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, - 0x6e, 0x12, 0x36, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x0b, - 0x32, 0x20, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x64, 0x6f, 0x67, 0x2e, 0x74, 0x72, 0x61, 0x63, 0x65, - 0x2e, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x73, 0x42, 0x75, 0x63, 0x6b, - 0x65, 0x74, 0x52, 0x05, 0x73, 0x74, 0x61, 0x74, 0x73, 0x12, 0x12, 0x0a, 0x04, 0x6c, 0x61, 0x6e, - 0x67, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6c, 0x61, 0x6e, 0x67, 0x12, 0x24, 0x0a, - 0x0d, 0x74, 0x72, 0x61, 0x63, 0x65, 0x72, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x06, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x74, 0x72, 0x61, 0x63, 0x65, 0x72, 0x56, 0x65, 0x72, 0x73, - 0x69, 0x6f, 0x6e, 0x12, 0x1c, 0x0a, 0x09, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x49, 0x44, - 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x49, - 0x44, 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x18, 0x08, 0x20, - 0x01, 0x28, 0x04, 0x52, 0x08, 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x12, 0x2a, 0x0a, - 0x10, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x41, 0x67, 0x67, 0x72, 0x65, 0x67, 0x61, 0x74, 0x69, 0x6f, - 0x6e, 0x18, 0x09, 0x20, 0x01, 0x28, 0x09, 0x52, 0x10, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x41, 0x67, - 0x67, 0x72, 0x65, 0x67, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x65, 0x72, - 0x76, 0x69, 0x63, 0x65, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x73, 0x65, 0x72, 0x76, - 0x69, 0x63, 0x65, 0x12, 0x20, 0x0a, 0x0b, 0x63, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, - 0x49, 0x44, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x63, 0x6f, 0x6e, 0x74, 0x61, 0x69, - 0x6e, 0x65, 0x72, 0x49, 0x44, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x61, 0x67, 0x73, 0x18, 0x0c, 0x20, - 0x03, 0x28, 0x09, 0x52, 0x04, 0x74, 0x61, 0x67, 0x73, 0x22, 0xa6, 0x01, 0x0a, 0x11, 0x43, 0x6c, - 0x69, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x73, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x12, - 0x14, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x72, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x05, - 0x73, 0x74, 0x61, 0x72, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, - 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x08, 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, - 0x6e, 0x12, 0x37, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, - 0x32, 0x21, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x64, 0x6f, 0x67, 0x2e, 0x74, 0x72, 0x61, 0x63, 0x65, - 0x2e, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x65, 0x64, 0x53, 0x74, - 0x61, 0x74, 0x73, 0x52, 0x05, 0x73, 0x74, 0x61, 0x74, 0x73, 0x12, 0x26, 0x0a, 0x0e, 0x61, 0x67, - 0x65, 0x6e, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x53, 0x68, 0x69, 0x66, 0x74, 0x18, 0x04, 0x20, 0x01, - 0x28, 0x03, 0x52, 0x0e, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x53, 0x68, 0x69, - 0x66, 0x74, 0x22, 0xe0, 0x03, 0x0a, 0x12, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x47, 0x72, 0x6f, - 0x75, 0x70, 0x65, 0x64, 0x53, 0x74, 0x61, 0x74, 0x73, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x65, 0x72, - 0x76, 0x69, 0x63, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x73, 0x65, 0x72, 0x76, - 0x69, 0x63, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x72, 0x65, 0x73, 0x6f, 0x75, - 0x72, 0x63, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x72, 0x65, 0x73, 0x6f, 0x75, - 0x72, 0x63, 0x65, 0x12, 0x28, 0x0a, 0x10, 0x48, 0x54, 0x54, 0x50, 0x5f, 0x73, 0x74, 0x61, 0x74, - 0x75, 0x73, 0x5f, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0e, 0x48, - 0x54, 0x54, 0x50, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x43, 0x6f, 0x64, 0x65, 0x12, 0x12, 0x0a, - 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, - 0x65, 0x12, 0x17, 0x0a, 0x07, 0x44, 0x42, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x06, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x06, 0x44, 0x42, 0x54, 0x79, 0x70, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x68, 0x69, - 0x74, 0x73, 0x18, 0x07, 0x20, 0x01, 0x28, 0x04, 0x52, 0x04, 0x68, 0x69, 0x74, 0x73, 0x12, 0x16, - 0x0a, 0x06, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x73, 0x18, 0x08, 0x20, 0x01, 0x28, 0x04, 0x52, 0x06, - 0x65, 0x72, 0x72, 0x6f, 0x72, 0x73, 0x12, 0x1a, 0x0a, 0x08, 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, - 0x6f, 0x6e, 0x18, 0x09, 0x20, 0x01, 0x28, 0x04, 0x52, 0x08, 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, - 0x6f, 0x6e, 0x12, 0x1c, 0x0a, 0x09, 0x6f, 0x6b, 0x53, 0x75, 0x6d, 0x6d, 0x61, 0x72, 0x79, 0x18, - 0x0a, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, 0x6f, 0x6b, 0x53, 0x75, 0x6d, 0x6d, 0x61, 0x72, 0x79, - 0x12, 0x22, 0x0a, 0x0c, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x53, 0x75, 0x6d, 0x6d, 0x61, 0x72, 0x79, - 0x18, 0x0b, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0c, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x53, 0x75, 0x6d, - 0x6d, 0x61, 0x72, 0x79, 0x12, 0x1e, 0x0a, 0x0a, 0x73, 0x79, 0x6e, 0x74, 0x68, 0x65, 0x74, 0x69, - 0x63, 0x73, 0x18, 0x0c, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0a, 0x73, 0x79, 0x6e, 0x74, 0x68, 0x65, - 0x74, 0x69, 0x63, 0x73, 0x12, 0x22, 0x0a, 0x0c, 0x74, 0x6f, 0x70, 0x4c, 0x65, 0x76, 0x65, 0x6c, - 0x48, 0x69, 0x74, 0x73, 0x18, 0x0d, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0c, 0x74, 0x6f, 0x70, 0x4c, - 0x65, 0x76, 0x65, 0x6c, 0x48, 0x69, 0x74, 0x73, 0x12, 0x21, 0x0a, 0x0c, 0x70, 0x65, 0x65, 0x72, - 0x5f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x18, 0x0e, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, - 0x70, 0x65, 0x65, 0x72, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x1b, 0x0a, 0x09, 0x73, - 0x70, 0x61, 0x6e, 0x5f, 0x6b, 0x69, 0x6e, 0x64, 0x18, 0x0f, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, - 0x73, 0x70, 0x61, 0x6e, 0x4b, 0x69, 0x6e, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x70, 0x65, 0x65, 0x72, - 0x5f, 0x74, 0x61, 0x67, 0x73, 0x18, 0x10, 0x20, 0x03, 0x28, 0x09, 0x52, 0x08, 0x70, 0x65, 0x65, - 0x72, 0x54, 0x61, 0x67, 0x73, 0x42, 0x16, 0x5a, 0x14, 0x70, 0x6b, 0x67, 0x2f, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x2f, 0x70, 0x62, 0x67, 0x6f, 0x2f, 0x74, 0x72, 0x61, 0x63, 0x65, 0x62, 0x06, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x65, 0x64, 0x12, 0x22, 0x0a, 0x0c, 0x73, 0x70, 0x6c, 0x69, 0x74, 0x50, 0x61, 0x79, 0x6c, 0x6f, + 0x61, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0c, 0x73, 0x70, 0x6c, 0x69, 0x74, 0x50, + 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x84, 0x03, 0x0a, 0x12, 0x43, 0x6c, 0x69, 0x65, 0x6e, + 0x74, 0x53, 0x74, 0x61, 0x74, 0x73, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x1a, 0x0a, + 0x08, 0x68, 0x6f, 0x73, 0x74, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x08, 0x68, 0x6f, 0x73, 0x74, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x65, 0x6e, 0x76, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x65, 0x6e, 0x76, 0x12, 0x18, 0x0a, 0x07, 0x76, + 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x76, 0x65, + 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x36, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x73, 0x18, 0x04, + 0x20, 0x03, 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x64, 0x6f, 0x67, 0x2e, 0x74, + 0x72, 0x61, 0x63, 0x65, 0x2e, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x73, + 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x52, 0x05, 0x73, 0x74, 0x61, 0x74, 0x73, 0x12, 0x12, 0x0a, + 0x04, 0x6c, 0x61, 0x6e, 0x67, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6c, 0x61, 0x6e, + 0x67, 0x12, 0x24, 0x0a, 0x0d, 0x74, 0x72, 0x61, 0x63, 0x65, 0x72, 0x56, 0x65, 0x72, 0x73, 0x69, + 0x6f, 0x6e, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x74, 0x72, 0x61, 0x63, 0x65, 0x72, + 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x1c, 0x0a, 0x09, 0x72, 0x75, 0x6e, 0x74, 0x69, + 0x6d, 0x65, 0x49, 0x44, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x75, 0x6e, 0x74, + 0x69, 0x6d, 0x65, 0x49, 0x44, 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, + 0x65, 0x18, 0x08, 0x20, 0x01, 0x28, 0x04, 0x52, 0x08, 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, + 0x65, 0x12, 0x2a, 0x0a, 0x10, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x41, 0x67, 0x67, 0x72, 0x65, 0x67, + 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x09, 0x20, 0x01, 0x28, 0x09, 0x52, 0x10, 0x61, 0x67, 0x65, + 0x6e, 0x74, 0x41, 0x67, 0x67, 0x72, 0x65, 0x67, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x18, 0x0a, + 0x07, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, + 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x20, 0x0a, 0x0b, 0x63, 0x6f, 0x6e, 0x74, 0x61, + 0x69, 0x6e, 0x65, 0x72, 0x49, 0x44, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x63, 0x6f, + 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x49, 0x44, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x61, 0x67, + 0x73, 0x18, 0x0c, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x74, 0x61, 0x67, 0x73, 0x22, 0xa6, 0x01, + 0x0a, 0x11, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x73, 0x42, 0x75, 0x63, + 0x6b, 0x65, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x72, 0x74, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x04, 0x52, 0x05, 0x73, 0x74, 0x61, 0x72, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x64, 0x75, 0x72, + 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x08, 0x64, 0x75, 0x72, + 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x37, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x73, 0x18, 0x03, + 0x20, 0x03, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x64, 0x6f, 0x67, 0x2e, 0x74, + 0x72, 0x61, 0x63, 0x65, 0x2e, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x47, 0x72, 0x6f, 0x75, 0x70, + 0x65, 0x64, 0x53, 0x74, 0x61, 0x74, 0x73, 0x52, 0x05, 0x73, 0x74, 0x61, 0x74, 0x73, 0x12, 0x26, + 0x0a, 0x0e, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x53, 0x68, 0x69, 0x66, 0x74, + 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0e, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x54, 0x69, 0x6d, + 0x65, 0x53, 0x68, 0x69, 0x66, 0x74, 0x22, 0xc3, 0x03, 0x0a, 0x12, 0x43, 0x6c, 0x69, 0x65, 0x6e, + 0x74, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x65, 0x64, 0x53, 0x74, 0x61, 0x74, 0x73, 0x12, 0x18, 0x0a, + 0x07, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, + 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x72, + 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x72, + 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x28, 0x0a, 0x10, 0x48, 0x54, 0x54, 0x50, 0x5f, + 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x5f, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, + 0x0d, 0x52, 0x0e, 0x48, 0x54, 0x54, 0x50, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x43, 0x6f, 0x64, + 0x65, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x17, 0x0a, 0x07, 0x44, 0x42, 0x5f, 0x74, 0x79, 0x70, 0x65, + 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x44, 0x42, 0x54, 0x79, 0x70, 0x65, 0x12, 0x12, + 0x0a, 0x04, 0x68, 0x69, 0x74, 0x73, 0x18, 0x07, 0x20, 0x01, 0x28, 0x04, 0x52, 0x04, 0x68, 0x69, + 0x74, 0x73, 0x12, 0x16, 0x0a, 0x06, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x73, 0x18, 0x08, 0x20, 0x01, + 0x28, 0x04, 0x52, 0x06, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x73, 0x12, 0x1a, 0x0a, 0x08, 0x64, 0x75, + 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x09, 0x20, 0x01, 0x28, 0x04, 0x52, 0x08, 0x64, 0x75, + 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x1c, 0x0a, 0x09, 0x6f, 0x6b, 0x53, 0x75, 0x6d, 0x6d, + 0x61, 0x72, 0x79, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, 0x6f, 0x6b, 0x53, 0x75, 0x6d, + 0x6d, 0x61, 0x72, 0x79, 0x12, 0x22, 0x0a, 0x0c, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x53, 0x75, 0x6d, + 0x6d, 0x61, 0x72, 0x79, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0c, 0x65, 0x72, 0x72, 0x6f, + 0x72, 0x53, 0x75, 0x6d, 0x6d, 0x61, 0x72, 0x79, 0x12, 0x1e, 0x0a, 0x0a, 0x73, 0x79, 0x6e, 0x74, + 0x68, 0x65, 0x74, 0x69, 0x63, 0x73, 0x18, 0x0c, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0a, 0x73, 0x79, + 0x6e, 0x74, 0x68, 0x65, 0x74, 0x69, 0x63, 0x73, 0x12, 0x22, 0x0a, 0x0c, 0x74, 0x6f, 0x70, 0x4c, + 0x65, 0x76, 0x65, 0x6c, 0x48, 0x69, 0x74, 0x73, 0x18, 0x0d, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0c, + 0x74, 0x6f, 0x70, 0x4c, 0x65, 0x76, 0x65, 0x6c, 0x48, 0x69, 0x74, 0x73, 0x12, 0x1b, 0x0a, 0x09, + 0x73, 0x70, 0x61, 0x6e, 0x5f, 0x6b, 0x69, 0x6e, 0x64, 0x18, 0x0f, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x08, 0x73, 0x70, 0x61, 0x6e, 0x4b, 0x69, 0x6e, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x70, 0x65, 0x65, + 0x72, 0x5f, 0x74, 0x61, 0x67, 0x73, 0x18, 0x10, 0x20, 0x03, 0x28, 0x09, 0x52, 0x08, 0x70, 0x65, + 0x65, 0x72, 0x54, 0x61, 0x67, 0x73, 0x4a, 0x04, 0x08, 0x0e, 0x10, 0x0f, 0x42, 0x16, 0x5a, 0x14, + 0x70, 0x6b, 0x67, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x70, 0x62, 0x67, 0x6f, 0x2f, 0x74, + 0x72, 0x61, 0x63, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/pkg/proto/pbgo/trace/stats_gen.go b/pkg/proto/pbgo/trace/stats_gen.go index e6d757b58cb51..0cb617b131962 100644 --- a/pkg/proto/pbgo/trace/stats_gen.go +++ b/pkg/proto/pbgo/trace/stats_gen.go @@ -102,12 +102,6 @@ func (z *ClientGroupedStats) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "TopLevelHits") return } - case "PeerService": - z.PeerService, err = dc.ReadString() - if err != nil { - err = msgp.WrapError(err, "PeerService") - return - } case "SpanKind": z.SpanKind, err = dc.ReadString() if err != nil { @@ -146,9 +140,9 @@ func (z *ClientGroupedStats) DecodeMsg(dc *msgp.Reader) (err error) { // EncodeMsg implements msgp.Encodable func (z *ClientGroupedStats) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 16 + // map header, size 15 // write "Service" - err = en.Append(0xde, 0x0, 0x10, 0xa7, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65) + err = en.Append(0x8f, 0xa7, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65) if err != nil { return } @@ -277,16 +271,6 @@ func (z *ClientGroupedStats) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "TopLevelHits") return } - // write "PeerService" - err = en.Append(0xab, 0x50, 0x65, 0x65, 0x72, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65) - if err != nil { - return - } - err = en.WriteString(z.PeerService) - if err != nil { - err = msgp.WrapError(err, "PeerService") - return - } // write "SpanKind" err = en.Append(0xa8, 0x53, 0x70, 0x61, 0x6e, 0x4b, 0x69, 0x6e, 0x64) if err != nil { @@ -320,9 +304,9 @@ func (z *ClientGroupedStats) EncodeMsg(en *msgp.Writer) (err error) { // MarshalMsg implements msgp.Marshaler func (z *ClientGroupedStats) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // map header, size 16 + // map header, size 15 // string "Service" - o = append(o, 0xde, 0x0, 0x10, 0xa7, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65) + o = append(o, 0x8f, 0xa7, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65) o = msgp.AppendString(o, z.Service) // string "Name" o = append(o, 0xa4, 0x4e, 0x61, 0x6d, 0x65) @@ -360,9 +344,6 @@ func (z *ClientGroupedStats) MarshalMsg(b []byte) (o []byte, err error) { // string "TopLevelHits" o = append(o, 0xac, 0x54, 0x6f, 0x70, 0x4c, 0x65, 0x76, 0x65, 0x6c, 0x48, 0x69, 0x74, 0x73) o = msgp.AppendUint64(o, z.TopLevelHits) - // string "PeerService" - o = append(o, 0xab, 0x50, 0x65, 0x65, 0x72, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65) - o = msgp.AppendString(o, z.PeerService) // string "SpanKind" o = append(o, 0xa8, 0x53, 0x70, 0x61, 0x6e, 0x4b, 0x69, 0x6e, 0x64) o = msgp.AppendString(o, z.SpanKind) @@ -471,12 +452,6 @@ func (z *ClientGroupedStats) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "TopLevelHits") return } - case "PeerService": - z.PeerService, bts, err = msgp.ReadStringBytes(bts) - if err != nil { - err = msgp.WrapError(err, "PeerService") - return - } case "SpanKind": z.SpanKind, bts, err = msgp.ReadStringBytes(bts) if err != nil { @@ -516,7 +491,7 @@ func (z *ClientGroupedStats) UnmarshalMsg(bts []byte) (o []byte, err error) { // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message func (z *ClientGroupedStats) Msgsize() (s int) { - s = 3 + 8 + msgp.StringPrefixSize + len(z.Service) + 5 + msgp.StringPrefixSize + len(z.Name) + 9 + msgp.StringPrefixSize + len(z.Resource) + 15 + msgp.Uint32Size + 5 + msgp.StringPrefixSize + len(z.Type) + 7 + msgp.StringPrefixSize + len(z.DBType) + 5 + msgp.Uint64Size + 7 + msgp.Uint64Size + 9 + msgp.Uint64Size + 10 + msgp.BytesPrefixSize + len(z.OkSummary) + 13 + msgp.BytesPrefixSize + len(z.ErrorSummary) + 11 + msgp.BoolSize + 13 + msgp.Uint64Size + 12 + msgp.StringPrefixSize + len(z.PeerService) + 9 + msgp.StringPrefixSize + len(z.SpanKind) + 9 + msgp.ArrayHeaderSize + s = 1 + 8 + msgp.StringPrefixSize + len(z.Service) + 5 + msgp.StringPrefixSize + len(z.Name) + 9 + msgp.StringPrefixSize + len(z.Resource) + 15 + msgp.Uint32Size + 5 + msgp.StringPrefixSize + len(z.Type) + 7 + msgp.StringPrefixSize + len(z.DBType) + 5 + msgp.Uint64Size + 7 + msgp.Uint64Size + 9 + msgp.Uint64Size + 10 + msgp.BytesPrefixSize + len(z.OkSummary) + 13 + msgp.BytesPrefixSize + len(z.ErrorSummary) + 11 + msgp.BoolSize + 13 + msgp.Uint64Size + 9 + msgp.StringPrefixSize + len(z.SpanKind) + 9 + msgp.ArrayHeaderSize for za0001 := range z.PeerTags { s += msgp.StringPrefixSize + len(z.PeerTags[za0001]) } @@ -1413,6 +1388,12 @@ func (z *StatsPayload) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "ClientComputed") return } + case "SplitPayload": + z.SplitPayload, err = dc.ReadBool() + if err != nil { + err = msgp.WrapError(err, "SplitPayload") + return + } default: err = dc.Skip() if err != nil { @@ -1427,8 +1408,8 @@ func (z *StatsPayload) DecodeMsg(dc *msgp.Reader) (err error) { // EncodeMsg implements msgp.Encodable func (z *StatsPayload) EncodeMsg(en *msgp.Writer) (err error) { // omitempty: check for empty values - zb0001Len := uint32(5) - var zb0001Mask uint8 /* 5 bits */ + zb0001Len := uint32(6) + var zb0001Mask uint8 /* 6 bits */ if z.Stats == nil { zb0001Len-- zb0001Mask |= 0x4 @@ -1507,6 +1488,16 @@ func (z *StatsPayload) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "ClientComputed") return } + // write "SplitPayload" + err = en.Append(0xac, 0x53, 0x70, 0x6c, 0x69, 0x74, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64) + if err != nil { + return + } + err = en.WriteBool(z.SplitPayload) + if err != nil { + err = msgp.WrapError(err, "SplitPayload") + return + } return } @@ -1514,8 +1505,8 @@ func (z *StatsPayload) EncodeMsg(en *msgp.Writer) (err error) { func (z *StatsPayload) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) // omitempty: check for empty values - zb0001Len := uint32(5) - var zb0001Mask uint8 /* 5 bits */ + zb0001Len := uint32(6) + var zb0001Mask uint8 /* 6 bits */ if z.Stats == nil { zb0001Len-- zb0001Mask |= 0x4 @@ -1553,6 +1544,9 @@ func (z *StatsPayload) MarshalMsg(b []byte) (o []byte, err error) { // string "ClientComputed" o = append(o, 0xae, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x43, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x65, 0x64) o = msgp.AppendBool(o, z.ClientComputed) + // string "SplitPayload" + o = append(o, 0xac, 0x53, 0x70, 0x6c, 0x69, 0x74, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64) + o = msgp.AppendBool(o, z.SplitPayload) return } @@ -1628,6 +1622,12 @@ func (z *StatsPayload) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "ClientComputed") return } + case "SplitPayload": + z.SplitPayload, bts, err = msgp.ReadBoolBytes(bts) + if err != nil { + err = msgp.WrapError(err, "SplitPayload") + return + } default: bts, err = msgp.Skip(bts) if err != nil { @@ -1650,6 +1650,6 @@ func (z *StatsPayload) Msgsize() (s int) { s += z.Stats[za0001].Msgsize() } } - s += 13 + msgp.StringPrefixSize + len(z.AgentVersion) + 15 + msgp.BoolSize + s += 13 + msgp.StringPrefixSize + len(z.AgentVersion) + 15 + msgp.BoolSize + 13 + msgp.BoolSize return } diff --git a/pkg/proto/pbgo/trace/stats_vtproto.pb.go b/pkg/proto/pbgo/trace/stats_vtproto.pb.go index 8815beafaf602..a144ea7254707 100644 --- a/pkg/proto/pbgo/trace/stats_vtproto.pb.go +++ b/pkg/proto/pbgo/trace/stats_vtproto.pb.go @@ -47,6 +47,16 @@ func (m *StatsPayload) MarshalToSizedBufferVT(dAtA []byte) (int, error) { i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } + if m.SplitPayload { + i-- + if m.SplitPayload { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i-- + dAtA[i] = 0x30 + } if m.ClientComputed { i-- if m.ClientComputed { @@ -323,13 +333,6 @@ func (m *ClientGroupedStats) MarshalToSizedBufferVT(dAtA []byte) (int, error) { i-- dAtA[i] = 0x7a } - if len(m.PeerService) > 0 { - i -= len(m.PeerService) - copy(dAtA[i:], m.PeerService) - i = encodeVarint(dAtA, i, uint64(len(m.PeerService))) - i-- - dAtA[i] = 0x72 - } if m.TopLevelHits != 0 { i = encodeVarint(dAtA, i, uint64(m.TopLevelHits)) i-- @@ -444,6 +447,9 @@ func (m *StatsPayload) SizeVT() (n int) { if m.ClientComputed { n += 2 } + if m.SplitPayload { + n += 2 + } n += len(m.unknownFields) return n } @@ -586,10 +592,6 @@ func (m *ClientGroupedStats) SizeVT() (n int) { if m.TopLevelHits != 0 { n += 1 + sov(uint64(m.TopLevelHits)) } - l = len(m.PeerService) - if l > 0 { - n += 1 + l + sov(uint64(l)) - } l = len(m.SpanKind) if l > 0 { n += 1 + l + sov(uint64(l)) @@ -783,6 +785,26 @@ func (m *StatsPayload) UnmarshalVT(dAtA []byte) error { } } m.ClientComputed = bool(v != 0) + case 6: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field SplitPayload", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.SplitPayload = bool(v != 0) default: iNdEx = preIndex skippy, err := skip(dAtA[iNdEx:]) @@ -1743,38 +1765,6 @@ func (m *ClientGroupedStats) UnmarshalVT(dAtA []byte) error { break } } - case 14: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field PeerService", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - stringLen |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - intStringLen := int(stringLen) - if intStringLen < 0 { - return ErrInvalidLength - } - postIndex := iNdEx + intStringLen - if postIndex < 0 { - return ErrInvalidLength - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.PeerService = string(dAtA[iNdEx:postIndex]) - iNdEx = postIndex case 15: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field SpanKind", wireType) diff --git a/pkg/trace/config/config.go b/pkg/trace/config/config.go index 2f631973ec891..0f40e0739f304 100644 --- a/pkg/trace/config/config.go +++ b/pkg/trace/config/config.go @@ -281,9 +281,10 @@ type AgentConfig struct { // Concentrator BucketInterval time.Duration // the size of our pre-aggregation per bucket ExtraAggregators []string // DEPRECATED - PeerServiceAggregation bool // enables/disables stats aggregation for peer.service, used by Concentrator and ClientStatsAggregator + PeerServiceAggregation bool // TO BE DEPRECATED - enables/disables stats aggregation for peer.service, used by Concentrator and ClientStatsAggregator + PeerTagsAggregation bool // enables/disables stats aggregation for peer entity tags, used by Concentrator and ClientStatsAggregator ComputeStatsBySpanKind bool // enables/disables the computing of stats based on a span's `span.kind` field - PeerTags []string // additional tags to use for peer.service-related stats aggregation + PeerTags []string // additional tags to use for peer entity stats aggregation // Sampler configuration ExtraSampleRate float64 diff --git a/pkg/trace/stats/aggregation.go b/pkg/trace/stats/aggregation.go index 25c4008477bff..a69c18a7de742 100644 --- a/pkg/trace/stats/aggregation.go +++ b/pkg/trace/stats/aggregation.go @@ -17,10 +17,9 @@ import ( ) const ( - tagStatusCode = "http.status_code" - tagSynthetics = "synthetics" - tagPeerService = "peer.service" - tagSpanKind = "span.kind" + tagStatusCode = "http.status_code" + tagSynthetics = "synthetics" + tagSpanKind = "span.kind" ) // Aggregation contains all the dimension on which we aggregate statistics. @@ -33,7 +32,6 @@ type Aggregation struct { type BucketsAggregationKey struct { Service string Name string - PeerService string Resource string Type string SpanKind string @@ -74,7 +72,7 @@ func clientOrProducer(spanKind string) bool { } // NewAggregationFromSpan creates a new aggregation from the provided span and env -func NewAggregationFromSpan(s *pb.Span, origin string, aggKey PayloadAggregationKey, enablePeerSvcAgg bool, peerTagKeys []string) (Aggregation, []string) { +func NewAggregationFromSpan(s *pb.Span, origin string, aggKey PayloadAggregationKey, enablePeerTagsAgg bool, peerTagKeys []string) (Aggregation, []string) { synthetics := strings.HasPrefix(origin, tagSynthetics) agg := Aggregation{ PayloadAggregationKey: aggKey, @@ -89,10 +87,7 @@ func NewAggregationFromSpan(s *pb.Span, origin string, aggKey PayloadAggregation }, } var peerTags []string - if clientOrProducer(agg.SpanKind) { - if enablePeerSvcAgg { - agg.PeerService = s.Meta[tagPeerService] - } + if clientOrProducer(agg.SpanKind) && enablePeerTagsAgg { peerTags = matchingPeerTags(s, peerTagKeys) agg.PeerTagsHash = peerTagsHash(peerTags) } @@ -105,7 +100,7 @@ func matchingPeerTags(s *pb.Span, peerTagKeys []string) []string { } var pt []string for _, t := range peerTagKeys { - if v, ok := s.Meta[t]; ok { + if v, ok := s.Meta[t]; ok && v != "" { pt = append(pt, t+":"+v) } } @@ -135,7 +130,6 @@ func NewAggregationFromGroup(g *pb.ClientGroupedStats) Aggregation { BucketsAggregationKey: BucketsAggregationKey{ Resource: g.Resource, Service: g.Service, - PeerService: g.PeerService, Name: g.Name, SpanKind: g.SpanKind, StatusCode: g.HTTPStatusCode, diff --git a/pkg/trace/stats/aggregation_test.go b/pkg/trace/stats/aggregation_test.go index 9c324f512789b..2dda81a7e2e74 100644 --- a/pkg/trace/stats/aggregation_test.go +++ b/pkg/trace/stats/aggregation_test.go @@ -54,25 +54,32 @@ func TestGetStatusCode(t *testing.T) { } func TestNewAggregation(t *testing.T) { + peerTags := []string{"db.instance", "db.system", "peer.service"} + peerSvcOnlyHash := uint64(3430395298086625290) + peerTagsHash := uint64(9894752672193411515) for _, tt := range []struct { - in *pb.Span - enablePeerSvcAgg bool - resAgg Aggregation - resPeerTags []string + name string + in *pb.Span + enablePeerTagsAgg bool + resAgg Aggregation + resPeerTags []string }{ { + "nil case, peer tag aggregation disabled", &pb.Span{}, false, Aggregation{}, nil, }, { + "nil case, peer tag aggregation enabled", &pb.Span{}, true, Aggregation{}, nil, }, { + "peer tag aggregation disabled even though peer.service is present", &pb.Span{ Service: "a", Meta: map[string]string{"span.kind": "client", "peer.service": "remote-service"}, @@ -81,8 +88,8 @@ func TestNewAggregation(t *testing.T) { Aggregation{BucketsAggregationKey: BucketsAggregationKey{Service: "a", SpanKind: "client"}}, nil, }, - // peer.service stats aggregation is enabled, but span.kind != (client, producer). { + "peer tags aggregation enabled, but span.kind != (client, producer)", &pb.Span{ Service: "a", Meta: map[string]string{"span.kind": "", "peer.service": "remote-service"}, @@ -92,103 +99,59 @@ func TestNewAggregation(t *testing.T) { nil, }, { + "peer tags aggregation enabled, span.kind == client", &pb.Span{ Service: "a", Meta: map[string]string{"span.kind": "client", "peer.service": "remote-service"}, }, true, - Aggregation{BucketsAggregationKey: BucketsAggregationKey{Service: "a", SpanKind: "client", PeerService: "remote-service"}}, - nil, + Aggregation{BucketsAggregationKey: BucketsAggregationKey{Service: "a", SpanKind: "client", PeerTagsHash: peerSvcOnlyHash}}, + []string{"peer.service:remote-service"}, }, { + "peer tags aggregation enabled, span.kind == producer", &pb.Span{ Service: "a", Meta: map[string]string{"span.kind": "producer", "peer.service": "remote-service"}, }, true, - Aggregation{BucketsAggregationKey: BucketsAggregationKey{Service: "a", SpanKind: "producer", PeerService: "remote-service"}}, - nil, - }, - { - &pb.Span{ - Service: "service", - Name: "operation", - Resource: "resource", - Meta: map[string]string{ - "span.kind": "client", - "peer.service": "remote-service", - "http.status_code": "200", - }, - }, - true, - Aggregation{ - BucketsAggregationKey: BucketsAggregationKey{ - Service: "service", - Name: "operation", - PeerService: "remote-service", - Resource: "resource", - SpanKind: "client", - StatusCode: 200, - Synthetics: false, - }, - }, - nil, - }, - } { - agg, et := NewAggregationFromSpan(tt.in, "", PayloadAggregationKey{}, tt.enablePeerSvcAgg, nil) - assert.Equal(t, tt.resAgg, agg) - assert.Equal(t, tt.resPeerTags, et) - } -} - -func TestNewAggregationPeerTags(t *testing.T) { - peerTags := []string{"db.instance", "db.system"} - for _, tt := range []struct { - in *pb.Span - resAgg Aggregation - resPeerTags []string - }{ - { - &pb.Span{}, - Aggregation{}, - nil, + Aggregation{BucketsAggregationKey: BucketsAggregationKey{Service: "a", SpanKind: "producer", PeerTagsHash: peerSvcOnlyHash}}, + []string{"peer.service:remote-service"}, }, { + "peer tags aggregation enabled and multiple peer tags match", &pb.Span{ Service: "a", - Meta: map[string]string{"span.kind": "", "field1": "val1", "db.instance": "i-1234", "db.system": "postgres"}, + Meta: map[string]string{"span.kind": "client", "field1": "val1", "peer.service": "remote-service", "db.instance": "i-1234", "db.system": "postgres"}, }, - Aggregation{BucketsAggregationKey: BucketsAggregationKey{Service: "a", PeerTagsHash: 0}}, - nil, + true, + Aggregation{BucketsAggregationKey: BucketsAggregationKey{Service: "a", SpanKind: "client", PeerTagsHash: peerTagsHash}}, + []string{"db.instance:i-1234", "db.system:postgres", "peer.service:remote-service"}, }, { + "peer tags aggregation enabled but all peer tags are empty", &pb.Span{ Service: "a", - Meta: map[string]string{"span.kind": "server", "field1": "val1", "db.instance": "i-1234", "db.system": "postgres"}, + Meta: map[string]string{"span.kind": "client", "field1": "val1", "peer.service": "", "db.instance": "", "db.system": ""}, }, - Aggregation{BucketsAggregationKey: BucketsAggregationKey{Service: "a", SpanKind: "server", PeerTagsHash: 0}}, + true, + Aggregation{BucketsAggregationKey: BucketsAggregationKey{Service: "a", SpanKind: "client", PeerTagsHash: 0}}, nil, }, { + "peer tags aggregation enabled but some peer tags are empty", &pb.Span{ Service: "a", - Meta: map[string]string{"span.kind": "client", "field1": "val1", "db.instance": "i-1234", "db.system": "postgres"}, + Meta: map[string]string{"span.kind": "client", "field1": "val1", "peer.service": "remote-service", "db.instance": "", "db.system": ""}, }, - Aggregation{BucketsAggregationKey: BucketsAggregationKey{Service: "a", SpanKind: "client", PeerTagsHash: 17292111254139093926}}, - []string{"db.instance:i-1234", "db.system:postgres"}, - }, - { - &pb.Span{ - Service: "a", - Meta: map[string]string{"span.kind": "producer", "field1": "val1", "db.instance": "i-1234", "db.system": "postgres"}, - }, - Aggregation{BucketsAggregationKey: BucketsAggregationKey{Service: "a", SpanKind: "producer", PeerTagsHash: 17292111254139093926}}, - []string{"db.instance:i-1234", "db.system:postgres"}, + true, + Aggregation{BucketsAggregationKey: BucketsAggregationKey{Service: "a", SpanKind: "client", PeerTagsHash: peerSvcOnlyHash}}, + []string{"peer.service:remote-service"}, }, } { - agg, et := NewAggregationFromSpan(tt.in, "", PayloadAggregationKey{}, true, peerTags) - assert.Equal(t, tt.resAgg, agg) - assert.Equal(t, tt.resPeerTags, et) + agg, et := NewAggregationFromSpan(tt.in, "", PayloadAggregationKey{}, tt.enablePeerTagsAgg, peerTags) + assert.Equal(t, tt.resAgg, agg, tt.name) + assert.Equal(t, tt.resPeerTags, et, tt.name) } } diff --git a/pkg/trace/stats/client_stats_aggregator.go b/pkg/trace/stats/client_stats_aggregator.go index 8a083c4c10285..2a5035a68ecf5 100644 --- a/pkg/trace/stats/client_stats_aggregator.go +++ b/pkg/trace/stats/client_stats_aggregator.go @@ -39,12 +39,12 @@ type ClientStatsAggregator struct { out chan *pb.StatsPayload buckets map[int64]*bucket // buckets used to aggregate client stats - flushTicker *time.Ticker - oldestTs time.Time - agentEnv string - agentHostname string - agentVersion string - peerSvcAggregation bool // flag to enable peer.service aggregation + flushTicker *time.Ticker + oldestTs time.Time + agentEnv string + agentHostname string + agentVersion string + peerTagsAggregation bool // flag to enable aggregation over peer tags exit chan struct{} done chan struct{} @@ -53,17 +53,17 @@ type ClientStatsAggregator struct { // NewClientStatsAggregator initializes a new aggregator ready to be started func NewClientStatsAggregator(conf *config.AgentConfig, out chan *pb.StatsPayload) *ClientStatsAggregator { c := &ClientStatsAggregator{ - flushTicker: time.NewTicker(time.Second), - In: make(chan *pb.ClientStatsPayload, 10), - buckets: make(map[int64]*bucket, 20), - out: out, - agentEnv: conf.DefaultEnv, - agentHostname: conf.Hostname, - agentVersion: conf.AgentVersion, - peerSvcAggregation: conf.PeerServiceAggregation, - oldestTs: alignAggTs(time.Now().Add(bucketDuration - oldestBucketStart)), - exit: make(chan struct{}), - done: make(chan struct{}), + flushTicker: time.NewTicker(time.Second), + In: make(chan *pb.ClientStatsPayload, 10), + buckets: make(map[int64]*bucket, 20), + out: out, + agentEnv: conf.DefaultEnv, + agentHostname: conf.Hostname, + agentVersion: conf.AgentVersion, + peerTagsAggregation: conf.PeerServiceAggregation || conf.PeerTagsAggregation, + oldestTs: alignAggTs(time.Now().Add(bucketDuration - oldestBucketStart)), + exit: make(chan struct{}), + done: make(chan struct{}), } return c } @@ -138,7 +138,7 @@ func (a *ClientStatsAggregator) add(now time.Time, p *pb.ClientStatsPayload) { a.buckets[ts.Unix()] = b } p.Stats = []*pb.ClientStatsBucket{clientBucket} - a.flush(b.add(p, a.peerSvcAggregation)) + a.flush(b.add(p, a.peerTagsAggregation)) } } @@ -209,7 +209,7 @@ func (b *bucket) add(p *pb.ClientStatsPayload, enablePeerSvcAgg bool) []*pb.Clie return []*pb.ClientStatsPayload{trimCounts(p)} } -func (b *bucket) aggregateCounts(p *pb.ClientStatsPayload, enablePeerSvcAgg bool) { +func (b *bucket) aggregateCounts(p *pb.ClientStatsPayload, enablePeerTagsAgg bool) { payloadAggKey := newPayloadAggregationKey(p.Env, p.Hostname, p.Version, p.ContainerID) payloadAgg, ok := b.agg[payloadAggKey] if !ok { @@ -225,12 +225,12 @@ func (b *bucket) aggregateCounts(p *pb.ClientStatsPayload, enablePeerSvcAgg bool if sb == nil { continue } - aggKey := newBucketAggregationKey(sb, enablePeerSvcAgg) + aggKey := newBucketAggregationKey(sb, enablePeerTagsAgg) agg, ok := payloadAgg[aggKey] if !ok { agg = &aggregatedCounts{} payloadAgg[aggKey] = agg - if enablePeerSvcAgg { + if enablePeerTagsAgg { agg.peerTags = sb.PeerTags } } @@ -255,7 +255,6 @@ func (b *bucket) aggregationToPayloads() []*pb.ClientStatsPayload { for aggrKey, counts := range aggrCounts { stats = append(stats, &pb.ClientGroupedStats{ Service: aggrKey.Service, - PeerService: aggrKey.PeerService, Name: aggrKey.Name, SpanKind: aggrKey.SpanKind, Resource: aggrKey.Resource, @@ -289,7 +288,7 @@ func newPayloadAggregationKey(env, hostname, version, cid string) PayloadAggrega return PayloadAggregationKey{Env: env, Hostname: hostname, Version: version, ContainerID: cid} } -func newBucketAggregationKey(b *pb.ClientGroupedStats, enablePeerSvcAgg bool) BucketsAggregationKey { +func newBucketAggregationKey(b *pb.ClientGroupedStats, enablePeerTagsAgg bool) BucketsAggregationKey { k := BucketsAggregationKey{ Service: b.Service, Name: b.Name, @@ -299,8 +298,7 @@ func newBucketAggregationKey(b *pb.ClientGroupedStats, enablePeerSvcAgg bool) Bu Synthetics: b.Synthetics, StatusCode: b.HTTPStatusCode, } - if enablePeerSvcAgg { - k.PeerService = b.PeerService + if enablePeerTagsAgg { k.PeerTagsHash = peerTagsHash(b.GetPeerTags()) } return k diff --git a/pkg/trace/stats/client_stats_aggregator_test.go b/pkg/trace/stats/client_stats_aggregator_test.go index 2e7e8645067f6..43cb03d60aa36 100644 --- a/pkg/trace/stats/client_stats_aggregator_test.go +++ b/pkg/trace/stats/client_stats_aggregator_test.go @@ -54,7 +54,6 @@ func payloadWithCounts(ts time.Time, k BucketsAggregationKey, hits, errors, dura Stats: []*proto.ClientGroupedStats{ { Service: k.Service, - PeerService: k.PeerService, Name: k.Name, SpanKind: k.SpanKind, Resource: k.Resource, @@ -244,8 +243,8 @@ func TestFuzzCountFields(t *testing.T) { assert := assert.New(t) for i := 0; i < 30; i++ { a := newTestAggregator() - // Ensure that peer.service aggregation is on. Some tests may expect non-empty values for peer.service. - a.peerSvcAggregation = true + // Ensure that peer tags aggregation is on. Some tests may expect non-empty values the peer tags. + a.peerTagsAggregation = true payloadTime := time.Now().Truncate(bucketDuration) merge1 := getTestStatsWithStart(payloadTime) @@ -267,13 +266,13 @@ func TestFuzzCountFields(t *testing.T) { if s == nil { continue } - actual = append(actual, s) + expected = append(expected, s) } for _, s := range aggCounts.Stats[0].Stats[0].Stats { if s == nil { continue } - expected = append(expected, s) + actual = append(actual, s) } assert.ElementsMatch(pb.PbToStringSlice(expected), pb.PbToStringSlice(actual)) @@ -367,142 +366,35 @@ func TestCountAggregation(t *testing.T) { } } -func TestCountAggregationPeerService(t *testing.T) { - assert := assert.New(t) - type tt struct { - k BucketsAggregationKey - res *proto.ClientGroupedStats - name string - enablePeerSvcAgg bool - } - tts := []tt{ - { - BucketsAggregationKey{Service: "s"}, - &proto.ClientGroupedStats{Service: "s"}, - "service", - false, - }, - { - BucketsAggregationKey{Name: "n"}, - &proto.ClientGroupedStats{Name: "n"}, - "name", - false, - }, - { - BucketsAggregationKey{Resource: "r"}, - &proto.ClientGroupedStats{Resource: "r"}, - "resource", - false, - }, - { - BucketsAggregationKey{Type: "t"}, - &proto.ClientGroupedStats{Type: "t"}, - "resource", - false, - }, - { - BucketsAggregationKey{Synthetics: true}, - &proto.ClientGroupedStats{Synthetics: true}, - "synthetics", - false, - }, - { - BucketsAggregationKey{StatusCode: 10}, - &proto.ClientGroupedStats{HTTPStatusCode: 10}, - "status", - false, - }, - { - BucketsAggregationKey{Service: "s", PeerService: "remote-service"}, - &proto.ClientGroupedStats{Service: "s", PeerService: ""}, - "peer.service disabled", - false, - }, - { - BucketsAggregationKey{Service: "s", PeerService: "remote-service"}, - &proto.ClientGroupedStats{Service: "s", PeerService: "remote-service"}, - "peer.service enabled", - true, - }, - { - BucketsAggregationKey{SpanKind: "client"}, - &proto.ClientGroupedStats{SpanKind: "client"}, - "span.kind", - false, - }, - } - for _, tc := range tts { - t.Run(tc.name, func(t *testing.T) { - a := newTestAggregator() - a.peerSvcAggregation = tc.enablePeerSvcAgg - testTime := time.Unix(time.Now().Unix(), 0) - - c1 := payloadWithCounts(testTime, tc.k, 11, 7, 100) - c2 := payloadWithCounts(testTime, tc.k, 27, 2, 300) - c3 := payloadWithCounts(testTime, tc.k, 5, 10, 3) - keyDefault := BucketsAggregationKey{} - cDefault := payloadWithCounts(testTime, keyDefault, 0, 2, 4) - - assert.Len(a.out, 0) - a.add(testTime, deepCopy(c1)) - a.add(testTime, deepCopy(c2)) - a.add(testTime, deepCopy(c3)) - a.add(testTime, deepCopy(cDefault)) - assert.Len(a.out, 3) - a.flushOnTime(testTime.Add(oldestBucketStart + time.Nanosecond)) - assert.Len(a.out, 4) - - assertDistribPayload(t, wrapPayloads([]*proto.ClientStatsPayload{c1, c2}), <-a.out) - assertDistribPayload(t, wrapPayload(c3), <-a.out) - assertDistribPayload(t, wrapPayload(cDefault), <-a.out) - aggCounts := <-a.out - assertAggCountsPayload(t, aggCounts) - - tc.res.Hits = 43 - tc.res.Errors = 19 - tc.res.Duration = 403 - assert.ElementsMatch(aggCounts.Stats[0].Stats[0].Stats, []*proto.ClientGroupedStats{ - tc.res, - // Additional grouped stat object that corresponds to the keyDefault/cDefault. - // We do not expect this to be aggregated with the non-default key in the test. - { - Hits: 0, - Errors: 2, - Duration: 4, - }, - }) - assert.Len(a.buckets, 0) - }) - } -} - func TestCountAggregationPeerTags(t *testing.T) { - assert := assert.New(t) - peerTags := []string{"db.instance:a", "db.system:b"} + peerTags := []string{"db.instance:a", "db.system:b", "peer.service:remote-service"} type tt struct { - k BucketsAggregationKey - res *proto.ClientGroupedStats - name string - enablePeerSvcAgg bool + k BucketsAggregationKey + res *proto.ClientGroupedStats + name string + enablePeerTagsAgg bool } + // The fnv64a hash of the peerTags var. + peerTagsHash := uint64(8580633704111928789) tts := []tt{ { - BucketsAggregationKey{Service: "s", PeerService: "remote-service"}, - &proto.ClientGroupedStats{Service: "s", PeerService: ""}, - "peer.service aggregation disabled", + BucketsAggregationKey{Service: "s", Name: "test.op"}, + &proto.ClientGroupedStats{Service: "s", Name: "test.op"}, + "peer tags aggregation disabled", false, }, { - BucketsAggregationKey{Service: "s", PeerService: "remote-service"}, - &proto.ClientGroupedStats{Service: "s", PeerService: "remote-service", PeerTags: peerTags}, - "peer.service aggregation enabled", + BucketsAggregationKey{Service: "s", PeerTagsHash: peerTagsHash}, + &proto.ClientGroupedStats{Service: "s", PeerTags: peerTags}, + "peer tags aggregation enabled", true, }, } for _, tc := range tts { t.Run(tc.name, func(t *testing.T) { + assert := assert.New(t) a := newTestAggregator() - a.peerSvcAggregation = tc.enablePeerSvcAgg + a.peerTagsAggregation = tc.enablePeerTagsAgg testTime := time.Unix(time.Now().Unix(), 0) c1 := payloadWithCounts(testTime, tc.k, 11, 7, 100) @@ -547,16 +439,18 @@ func TestCountAggregationPeerTags(t *testing.T) { } } -func TestNewBucketAggregationKeyPeerService(t *testing.T) { +func TestNewBucketAggregationKeyPeerTags(t *testing.T) { + // The hash of "peer.service:remote-service". + peerTagsHash := uint64(3430395298086625290) t.Run("disabled", func(t *testing.T) { assert := assert.New(t) - r := newBucketAggregationKey(&proto.ClientGroupedStats{Service: "a", PeerService: "remote-test"}, false) + r := newBucketAggregationKey(&proto.ClientGroupedStats{Service: "a", PeerTags: []string{"peer.service:remote-service"}}, false) assert.Equal(BucketsAggregationKey{Service: "a"}, r) }) t.Run("enabled", func(t *testing.T) { assert := assert.New(t) - r := newBucketAggregationKey(&proto.ClientGroupedStats{Service: "a", PeerService: "remote-test"}, true) - assert.Equal(BucketsAggregationKey{Service: "a", PeerService: "remote-test"}, r) + r := newBucketAggregationKey(&proto.ClientGroupedStats{Service: "a", PeerTags: []string{"peer.service:remote-service"}}, true) + assert.Equal(BucketsAggregationKey{Service: "a", PeerTagsHash: peerTagsHash}, r) }) } @@ -617,7 +511,6 @@ func deepCopyGroupedStats(s []*proto.ClientGroupedStats) []*proto.ClientGroupedS Duration: b.GetDuration(), Synthetics: b.GetSynthetics(), TopLevelHits: b.GetTopLevelHits(), - PeerService: b.GetPeerService(), SpanKind: b.GetSpanKind(), PeerTags: b.GetPeerTags(), } @@ -632,3 +525,54 @@ func deepCopyGroupedStats(s []*proto.ClientGroupedStats) []*proto.ClientGroupedS } return new } + +func TestNewClientStatsAggregatorPeerAggregation(t *testing.T) { + t.Run("nothing enabled", func(t *testing.T) { + assert := assert.New(t) + cfg := config.AgentConfig{ + BucketInterval: time.Duration(testBucketInterval), + AgentVersion: "0.99.0", + DefaultEnv: "env", + Hostname: "hostname", + } + a := NewClientStatsAggregator(&cfg, nil) + assert.False(a.peerTagsAggregation) + }) + t.Run("deprecated peer service flag set", func(t *testing.T) { + assert := assert.New(t) + cfg := config.AgentConfig{ + BucketInterval: time.Duration(testBucketInterval), + AgentVersion: "0.99.0", + DefaultEnv: "env", + Hostname: "hostname", + PeerServiceAggregation: true, + } + a := NewClientStatsAggregator(&cfg, nil) + assert.True(a.peerTagsAggregation) + }) + t.Run("peer tags aggregation flag", func(t *testing.T) { + assert := assert.New(t) + cfg := config.AgentConfig{ + BucketInterval: time.Duration(testBucketInterval), + AgentVersion: "0.99.0", + DefaultEnv: "env", + Hostname: "hostname", + PeerTagsAggregation: true, + } + a := NewClientStatsAggregator(&cfg, nil) + assert.True(a.peerTagsAggregation) + }) + t.Run("deprecated peer service flag set + new peer tags aggregation flag", func(t *testing.T) { + assert := assert.New(t) + cfg := config.AgentConfig{ + BucketInterval: time.Duration(testBucketInterval), + AgentVersion: "0.99.0", + DefaultEnv: "env", + Hostname: "hostname", + PeerServiceAggregation: true, + PeerTagsAggregation: true, + } + a := NewClientStatsAggregator(&cfg, nil) + assert.True(a.peerTagsAggregation) + }) +} diff --git a/pkg/trace/stats/concentrator.go b/pkg/trace/stats/concentrator.go index f45fb0c1bdac8..2ed26a42678fe 100644 --- a/pkg/trace/stats/concentrator.go +++ b/pkg/trace/stats/concentrator.go @@ -47,20 +47,52 @@ type Concentrator struct { agentEnv string agentHostname string agentVersion string - peerSvcAggregation bool // flag to enable peer.service aggregation + peerTagsAggregation bool // flag to enable aggregation of peer tags computeStatsBySpanKind bool // flag to enable computation of stats through checking the span.kind field peerTagKeys []string // keys for supplementary tags that describe peer.service entities } -func prepareTagKeys(tags ...string) []string { +var defaultPeerTags = []string{ + "_dd.base_service", + "amqp.destination", + "amqp.exchange", + "amqp.queue", + "aws.queue.name", + "bucketname", + "cassandra.cluster", + "db.cassandra.contact.points", + "db.couchbase.seed.nodes", + "db.hostname", + "db.instance", + "db.name", + "db.system", + "hazelcast.instance", + "messaging.kafka.bootstrap.servers", + "mongodb.db", + "msmq.queue.path", + "net.peer.name", + "network.destination.name", + "peer.hostname", + "peer.service", + "queuename", + "rpc.service", + "rulename", + "server.address", + "statemachinename", + "streamname", + "tablename", + "topicname", +} + +func preparePeerTags(tags ...string) []string { if len(tags) == 0 { return nil } var deduped []string - seen := make(map[string]bool) + seen := make(map[string]struct{}) for _, t := range tags { - if !seen[t] { - seen[t] = true + if _, ok := seen[t]; !ok { + seen[t] = struct{}{} deduped = append(deduped, t) } } @@ -85,11 +117,12 @@ func NewConcentrator(conf *config.AgentConfig, out chan *pb.StatsPayload, now ti agentEnv: conf.DefaultEnv, agentHostname: conf.Hostname, agentVersion: conf.AgentVersion, - peerSvcAggregation: conf.PeerServiceAggregation, + peerTagsAggregation: conf.PeerServiceAggregation || conf.PeerTagsAggregation, computeStatsBySpanKind: conf.ComputeStatsBySpanKind, } - if conf.PeerServiceAggregation { - c.peerTagKeys = prepareTagKeys(conf.PeerTags...) + // NOTE: maintain backwards-compatibility with old peer service flag that will eventually be deprecated. + if conf.PeerServiceAggregation || conf.PeerTagsAggregation { + c.peerTagKeys = preparePeerTags(append(defaultPeerTags, conf.PeerTags...)...) } return &c } @@ -219,7 +252,7 @@ func (c *Concentrator) addNow(pt *traceutil.ProcessedTrace, containerID string) b = NewRawBucket(uint64(btime), uint64(c.bsize)) c.buckets[btime] = b } - b.HandleSpan(s, weight, isTop, pt.TraceChunk.Origin, aggKey, c.peerSvcAggregation, c.peerTagKeys) + b.HandleSpan(s, weight, isTop, pt.TraceChunk.Origin, aggKey, c.peerTagsAggregation, c.peerTagKeys) } } diff --git a/pkg/trace/stats/concentrator_test.go b/pkg/trace/stats/concentrator_test.go index 8fcf84b9d8541..c54366d2a5949 100644 --- a/pkg/trace/stats/concentrator_test.go +++ b/pkg/trace/stats/concentrator_test.go @@ -8,6 +8,7 @@ package stats import ( "fmt" "math/rand" + "sort" "testing" "time" @@ -95,6 +96,104 @@ func assertCountsEqual(t *testing.T, expected []*pb.ClientGroupedStats, actual [ assert.Equal(t, expectedM, actualM) } +func TestNewConcentratorPeerTags(t *testing.T) { + t.Run("nothing enabled", func(t *testing.T) { + assert := assert.New(t) + cfg := config.AgentConfig{ + BucketInterval: time.Duration(testBucketInterval), + AgentVersion: "0.99.0", + DefaultEnv: "env", + Hostname: "hostname", + } + c := NewConcentrator(&cfg, nil, time.Now()) + assert.False(c.peerTagsAggregation) + assert.Nil(c.peerTagKeys) + }) + t.Run("deprecated peer service flag set", func(t *testing.T) { + assert := assert.New(t) + cfg := config.AgentConfig{ + BucketInterval: time.Duration(testBucketInterval), + AgentVersion: "0.99.0", + DefaultEnv: "env", + Hostname: "hostname", + PeerServiceAggregation: true, + } + c := NewConcentrator(&cfg, nil, time.Now()) + assert.True(c.peerTagsAggregation) + assert.Equal(defaultPeerTags, c.peerTagKeys) + }) + t.Run("deprecated peer service flag set + peer tags", func(t *testing.T) { + assert := assert.New(t) + cfg := config.AgentConfig{ + BucketInterval: time.Duration(testBucketInterval), + AgentVersion: "0.99.0", + DefaultEnv: "env", + Hostname: "hostname", + PeerServiceAggregation: true, + PeerTags: []string{"zz_tag"}, + } + c := NewConcentrator(&cfg, nil, time.Now()) + assert.True(c.peerTagsAggregation) + assert.Equal(append(defaultPeerTags, "zz_tag"), c.peerTagKeys) + }) + t.Run("deprecated peer service flag set + new peer tags aggregation flag", func(t *testing.T) { + assert := assert.New(t) + cfg := config.AgentConfig{ + BucketInterval: time.Duration(testBucketInterval), + AgentVersion: "0.99.0", + DefaultEnv: "env", + Hostname: "hostname", + PeerServiceAggregation: true, + PeerTagsAggregation: true, + } + c := NewConcentrator(&cfg, nil, time.Now()) + assert.True(c.peerTagsAggregation) + assert.Equal(defaultPeerTags, c.peerTagKeys) + }) + t.Run("deprecated peer service flag set + new peer tags aggregation flag + peer tags", func(t *testing.T) { + assert := assert.New(t) + cfg := config.AgentConfig{ + BucketInterval: time.Duration(testBucketInterval), + AgentVersion: "0.99.0", + DefaultEnv: "env", + Hostname: "hostname", + PeerServiceAggregation: true, + PeerTagsAggregation: true, + PeerTags: []string{"zz_tag"}, + } + c := NewConcentrator(&cfg, nil, time.Now()) + assert.True(c.peerTagsAggregation) + assert.Equal(append(defaultPeerTags, "zz_tag"), c.peerTagKeys) + }) + t.Run("new peer tags aggregation flag", func(t *testing.T) { + assert := assert.New(t) + cfg := config.AgentConfig{ + BucketInterval: time.Duration(testBucketInterval), + AgentVersion: "0.99.0", + DefaultEnv: "env", + Hostname: "hostname", + PeerTagsAggregation: true, + } + c := NewConcentrator(&cfg, nil, time.Now()) + assert.True(c.peerTagsAggregation) + assert.Equal(defaultPeerTags, c.peerTagKeys) + }) + t.Run("new peer tags aggregation flag + peer tags", func(t *testing.T) { + assert := assert.New(t) + cfg := config.AgentConfig{ + BucketInterval: time.Duration(testBucketInterval), + AgentVersion: "0.99.0", + DefaultEnv: "env", + Hostname: "hostname", + PeerTagsAggregation: true, + PeerTags: []string{"zz_tag"}, + } + c := NewConcentrator(&cfg, nil, time.Now()) + assert.True(c.peerTagsAggregation) + assert.Equal(append(defaultPeerTags, "zz_tag"), c.peerTagKeys) + }) +} + // TestTracerHostname tests if `Concentrator` uses the tracer hostname rather than agent hostname, if there is one. func TestTracerHostname(t *testing.T) { assert := assert.New(t) @@ -577,61 +676,6 @@ func TestForceFlush(t *testing.T) { assert.Len(stats.GetStats(), 1) } -// TestPeerServiceStats tests that if peer.service is present in the span's meta, we will generate stats with it as an additional field. -func TestPeerServiceStats(t *testing.T) { - assert := assert.New(t) - now := time.Now() - sp := &pb.Span{ - ParentID: 0, - SpanID: 1, - Service: "myservice", - Name: "http.server.request", - Resource: "GET /users", - Duration: 100, - Meta: map[string]string{"span.kind": "server"}, - } - peerSvcSp := &pb.Span{ - ParentID: sp.SpanID, - SpanID: 2, - Service: "myservice", - Name: "postgres.query", - Resource: "SELECT user_id from users WHERE user_name = ?", - Duration: 75, - Metrics: map[string]float64{"_dd.measured": 1.0}, - Meta: map[string]string{"span.kind": "client", "peer.service": "users-db"}, - } - t.Run("enabled", func(t *testing.T) { - spans := []*pb.Span{sp, peerSvcSp} - traceutil.ComputeTopLevel(spans) - testTrace := toProcessedTrace(spans, "none", "") - c := NewTestConcentrator(now) - c.peerSvcAggregation = true - c.addNow(testTrace, "") - stats := c.flushNow(now.UnixNano()+int64(c.bufferLen)*testBucketInterval, false) - assert.Len(stats.Stats[0].Stats[0].Stats, 2) - for _, st := range stats.Stats[0].Stats[0].Stats { - if st.Name == "postgres.query" { - assert.Equal("users-db", st.PeerService) - } else { - assert.Equal("", st.PeerService) - } - } - }) - t.Run("disabled", func(t *testing.T) { - spans := []*pb.Span{sp, peerSvcSp} - traceutil.ComputeTopLevel(spans) - testTrace := toProcessedTrace(spans, "none", "") - c := NewTestConcentrator(now) - c.peerSvcAggregation = false - c.addNow(testTrace, "") - stats := c.flushNow(now.UnixNano()+int64(c.bufferLen)*testBucketInterval, false) - assert.Len(stats.Stats[0].Stats[0].Stats, 2) - for _, st := range stats.Stats[0].Stats[0].Stats { - assert.Equal("", st.PeerService) - } - }) -} - func TestPeerTags(t *testing.T) { assert := assert.New(t) now := time.Now() @@ -651,7 +695,7 @@ func TestPeerTags(t *testing.T) { Name: "postgres.query", Resource: "SELECT user_id from users WHERE user_name = ?", Duration: 75, - Meta: map[string]string{"span.kind": "client", "region": "us1"}, + Meta: map[string]string{"span.kind": "client", "db.instance": "i-1234", "db.system": "postgres", "region": "us1"}, Metrics: map[string]float64{"_dd.measured": 1.0}, } t.Run("not configured", func(t *testing.T) { @@ -671,13 +715,14 @@ func TestPeerTags(t *testing.T) { traceutil.ComputeTopLevel(spans) testTrace := toProcessedTrace(spans, "none", "") c := NewTestConcentrator(now) - c.peerTagKeys = []string{"region"} + c.peerTagKeys = []string{"db.instance", "db.system", "peer.service"} + c.peerTagsAggregation = true c.addNow(testTrace, "") stats := c.flushNow(now.UnixNano()+int64(c.bufferLen)*testBucketInterval, false) assert.Len(stats.Stats[0].Stats[0].Stats, 2) for _, st := range stats.Stats[0].Stats[0].Stats { if st.Name == "postgres.query" { - assert.Equal([]string{"region:us1"}, st.PeerTags) + assert.Equal([]string{"db.instance:i-1234", "db.system:postgres"}, st.PeerTags) } else { assert.Nil(st.PeerTags) } @@ -851,7 +896,7 @@ func TestComputeStatsForSpanKind(t *testing.T) { } } -func TestPrepareTagKeys(t *testing.T) { +func TestPreparePeerTags(t *testing.T) { type testCase struct { input []string output []string @@ -867,14 +912,15 @@ func TestPrepareTagKeys(t *testing.T) { output: nil, }, { - input: []string{"a", "b"}, - output: []string{"a", "b"}, + input: []string{"zz_tag", "peer.service", "some.other.tag", "db.name", "db.instance"}, + output: []string{"db.name", "db.instance", "peer.service", "some.other.tag", "zz_tag"}, }, { - input: []string{"a", "a", "b"}, - output: []string{"a", "b"}, + input: append([]string{"zz_tag"}, defaultPeerTags...), + output: append(defaultPeerTags, "zz_tag"), }, } { - assert.Equal(t, tc.output, prepareTagKeys(tc.input...)) + sort.Strings(tc.output) + assert.Equal(t, tc.output, preparePeerTags(tc.input...)) } } diff --git a/pkg/trace/stats/statsraw.go b/pkg/trace/stats/statsraw.go index 6e275b4afbde7..8ed20f505b5e8 100644 --- a/pkg/trace/stats/statsraw.go +++ b/pkg/trace/stats/statsraw.go @@ -74,7 +74,6 @@ func (s *groupedStats) export(a Aggregation) (*pb.ClientGroupedStats, error) { OkSummary: okSummary, ErrorSummary: errSummary, Synthetics: a.Synthetics, - PeerService: a.PeerService, SpanKind: a.SpanKind, PeerTags: s.peerTags, }, nil @@ -149,11 +148,11 @@ func (sb *RawBucket) Export() map[PayloadAggregationKey]*pb.ClientStatsBucket { } // HandleSpan adds the span to this bucket stats, aggregated with the finest grain matching given aggregators -func (sb *RawBucket) HandleSpan(s *pb.Span, weight float64, isTop bool, origin string, aggKey PayloadAggregationKey, enablePeerSvcAgg bool, peerTagKeys []string) { +func (sb *RawBucket) HandleSpan(s *pb.Span, weight float64, isTop bool, origin string, aggKey PayloadAggregationKey, enablePeerTagsAgg bool, peerTagKeys []string) { if aggKey.Env == "" { panic("env should never be empty") } - aggr, peerTags := NewAggregationFromSpan(s, origin, aggKey, enablePeerSvcAgg, peerTagKeys) + aggr, peerTags := NewAggregationFromSpan(s, origin, aggKey, enablePeerTagsAgg, peerTagKeys) sb.add(s, weight, isTop, aggr, peerTags) } diff --git a/pkg/trace/stats/statsraw_test.go b/pkg/trace/stats/statsraw_test.go index 527ea579f2259..3e538c372ed34 100644 --- a/pkg/trace/stats/statsraw_test.go +++ b/pkg/trace/stats/statsraw_test.go @@ -37,55 +37,6 @@ func TestGrain(t *testing.T) { }, aggr) } -func TestGrainWithPeerService(t *testing.T) { - t.Run("disabled", func(t *testing.T) { - assert := assert.New(t) - s := pb.Span{Service: "thing", Name: "other", Resource: "yo", Meta: map[string]string{"span.kind": "client", "peer.service": "remote-service"}} - aggr, _ := NewAggregationFromSpan(&s, "", PayloadAggregationKey{ - Env: "default", - Hostname: "default", - ContainerID: "cid", - }, false, nil) - assert.Equal(Aggregation{ - PayloadAggregationKey: PayloadAggregationKey{ - Env: "default", - Hostname: "default", - ContainerID: "cid", - }, - BucketsAggregationKey: BucketsAggregationKey{ - Service: "thing", - SpanKind: "client", - Name: "other", - Resource: "yo", - PeerService: "", - }, - }, aggr) - }) - t.Run("enabled", func(t *testing.T) { - assert := assert.New(t) - s := pb.Span{Service: "thing", Name: "other", Resource: "yo", Meta: map[string]string{"span.kind": "client", "peer.service": "remote-service"}} - aggr, _ := NewAggregationFromSpan(&s, "", PayloadAggregationKey{ - Env: "default", - Hostname: "default", - ContainerID: "cid", - }, true, nil) - assert.Equal(Aggregation{ - PayloadAggregationKey: PayloadAggregationKey{ - Env: "default", - Hostname: "default", - ContainerID: "cid", - }, - BucketsAggregationKey: BucketsAggregationKey{ - Service: "thing", - SpanKind: "client", - Name: "other", - Resource: "yo", - PeerService: "remote-service", - }, - }, aggr) - }) -} - func TestGrainWithPeerTags(t *testing.T) { t.Run("none present", func(t *testing.T) { assert := assert.New(t) @@ -93,13 +44,13 @@ func TestGrainWithPeerTags(t *testing.T) { Service: "thing", Name: "other", Resource: "yo", - Meta: map[string]string{"span.kind": "client", "peer.service": "aws-s3"}, + Meta: map[string]string{"span.kind": "client"}, } aggr, et := NewAggregationFromSpan(&s, "", PayloadAggregationKey{ Env: "default", Hostname: "default", ContainerID: "cid", - }, true, []string{"aws.s3.bucket", "db.instance", "db.system"}) + }, true, []string{"aws.s3.bucket", "db.instance", "db.system", "peer.service"}) assert.Equal(Aggregation{ PayloadAggregationKey: PayloadAggregationKey{ @@ -108,11 +59,10 @@ func TestGrainWithPeerTags(t *testing.T) { ContainerID: "cid", }, BucketsAggregationKey: BucketsAggregationKey{ - Service: "thing", - SpanKind: "client", - Name: "other", - Resource: "yo", - PeerService: "aws-s3", + Service: "thing", + SpanKind: "client", + Name: "other", + Resource: "yo", }, }, aggr) assert.Nil(et) @@ -129,7 +79,7 @@ func TestGrainWithPeerTags(t *testing.T) { Env: "default", Hostname: "default", ContainerID: "cid", - }, true, []string{"aws.s3.bucket", "db.instance", "db.system"}) + }, true, []string{"aws.s3.bucket", "db.instance", "db.system", "peer.service"}) assert.Equal(Aggregation{ PayloadAggregationKey: PayloadAggregationKey{ @@ -142,11 +92,10 @@ func TestGrainWithPeerTags(t *testing.T) { SpanKind: "client", Name: "other", Resource: "yo", - PeerService: "aws-s3", - PeerTagsHash: 6956349601612342508, + PeerTagsHash: 13698082192712149795, }, }, aggr) - assert.Equal([]string{"aws.s3.bucket:bucket-a"}, et) + assert.Equal([]string{"aws.s3.bucket:bucket-a", "peer.service:aws-s3"}, et) }) t.Run("all present", func(t *testing.T) { assert := assert.New(t) @@ -160,7 +109,7 @@ func TestGrainWithPeerTags(t *testing.T) { Env: "default", Hostname: "default", ContainerID: "cid", - }, true, []string{"db.instance", "db.system"}) + }, true, []string{"db.instance", "db.system", "peer.service"}) assert.Equal(Aggregation{ PayloadAggregationKey: PayloadAggregationKey{ @@ -173,11 +122,10 @@ func TestGrainWithPeerTags(t *testing.T) { SpanKind: "client", Name: "other", Resource: "yo", - PeerService: "aws-dynamodb", - PeerTagsHash: 7368490161962389668, + PeerTagsHash: 5537613849774405073, }, }, aggr) - assert.Equal([]string{"db.instance:dynamo.test.us1", "db.system:dynamodb"}, et) + assert.Equal([]string{"db.instance:dynamo.test.us1", "db.system:dynamodb", "peer.service:aws-dynamodb"}, et) }) } @@ -208,14 +156,26 @@ func TestGrainWithSynthetics(t *testing.T) { } func BenchmarkHandleSpanRandom(b *testing.B) { - sb := NewRawBucket(0, 1e9) - b.ResetTimer() - b.ReportAllocs() - for i := 0; i < b.N; i++ { - for _, span := range benchSpans { - sb.HandleSpan(span, 1, true, "", PayloadAggregationKey{"a", "b", "c", "d"}, true, nil) + b.Run("no_peer_tags", func(b *testing.B) { + sb := NewRawBucket(0, 1e9) + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + for _, span := range benchSpans { + sb.HandleSpan(span, 1, true, "", PayloadAggregationKey{"a", "b", "c", "d"}, false, nil) + } } - } + }) + b.Run("peer_tags", func(b *testing.B) { + sb := NewRawBucket(0, 1e9) + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + for _, span := range benchSpans { + sb.HandleSpan(span, 1, true, "", PayloadAggregationKey{"a", "b", "c", "d"}, true, defaultPeerTags) + } + } + }) } var benchSpans = []*pb.Span{ @@ -229,9 +189,9 @@ var benchSpans = []*pb.Span{ Start: 1548931840954169000, Duration: 100000000, Error: 403, - Meta: map[string]string{"query": "SELECT id\n FROM ddsuperuser\n WHERE id = %(id)s", "in.host": "2a01:e35:2ee1:7160:f66d:4ff:fe71:b690", "out.host": "/dev/null", "in.section": "dogdataprod"}, + Meta: map[string]string{"query": "SELECT id\n FROM ddsuperuser\n WHERE id = %(id)s", "db.hostname": "db.host.us1.prod", "db.name": "postgres"}, Metrics: map[string]float64{"rowcount": 0.5066325669281033}, - Type: "redis", + Type: "", }, { Service: "pg-master", @@ -243,7 +203,7 @@ var benchSpans = []*pb.Span{ Start: 1548931841019932928, Duration: 19844796, Error: 400, - Meta: map[string]string{"user": "leo"}, + Meta: map[string]string{"user": "leo", "db.hostname:": "db.host.us1.prod", "db.name": "postgres"}, Metrics: map[string]float64{"size": 0.47564235466940796, "rowcount": 0.12453347154800333}, Type: "lamar", }, @@ -257,7 +217,7 @@ var benchSpans = []*pb.Span{ Start: 1548931840963747104, Duration: 3566171, Error: 0, - Meta: map[string]string{"in.host": "8.8.8.8", "query": "GET beaker:c76db4c3af90410197cf88b0afba4942:session"}, + Meta: map[string]string{"in.host": "8.8.8.8", "query": "GET beaker:c76db4c3af90410197cf88b0afba4942:session", "db.hostname:": "db.host.us1.prod", "db.name": "postgres"}, Metrics: map[string]float64{"rowcount": 0.276209049435507, "size": 0.18889910131880996}, Type: "redis", }, @@ -271,7 +231,7 @@ var benchSpans = []*pb.Span{ Start: 1548931840954371301, Duration: 259245, Error: 502, - Meta: map[string]string{"in.host": "", "out.host": "/dev/null", "query": "\n -- get_contexts_sub_query[[org:9543 query_id:a135e15e7d batch:1]]\n WITH sub_contexts as (\n \n -- \n --\n SELECT key,\n host_name,\n device_name,\n tags,\n org_id\n FROM vs9543.dim_context c\n WHERE key = ANY(%(key)s)\n \n \n \n \n \n )\n \n -- \n --\n SELECT key,\n host_name,\n device_name,\n tags\n FROM sub_contexts c\n WHERE (c.org_id = %(org_id)s AND c.tags @> %(yes_tags0)s)\n OR (c.org_id = %(org_id)s AND c.tags @> %(yes_tags1)s)\n OR (c.org_id = %(org_id)s AND c.tags @> %(yes_tags2)s)\n OR (c.org_id = %(org_id)s AND c.tags @> %(yes_tags3)s)\n OR (c.org_id = %(org_id)s AND c.tags @> %(yes_tags4)s)\n OR (c.org_id = %(org_id)s AND c.tags @> %(yes_tags5)s)\n OR (c.org_id = %(org_id)s AND c.tags @> %(yes_tags6)s)\n OR (c.org_id = %(org_id)s AND c.tags @> %(yes_tags7)s)\n OR (c.org_id = %(org_id)s AND c.tags @> %(yes_tags8)s)\n OR (c.org_id = %(org_id)s AND c.tags @> %(yes_tags9)s)\n OR (c.org_id = %(org_id)s AND c.tags @> %(yes_tags10)s)\n OR (c.org_id = %(org_id)s AND c.tags @> %(yes_tags11)s)\n OR (c.org_id = %(org_id)s AND c.tags @> %(yes_tags12)s)\n OR (c.org_id = %(org_id)s AND c.tags @> %(yes_tags13)s)\n OR (c.org_id = %(org_id)s AND c.tags @> %(yes_tags14)s)\n OR (c.org_id = %(org_id)s AND c.tags @> %(yes_tags15)s)\n \n \n \n \n \n "}, + Meta: map[string]string{"db.hostname:": "db.host.us1.prod", "db.name": "postgres", "query": "\n -- get_contexts_sub_query[[org:9543 query_id:a135e15e7d batch:1]]\n WITH sub_contexts as (\n \n -- \n --\n SELECT key,\n host_name,\n device_name,\n tags,\n org_id\n FROM vs9543.dim_context c\n WHERE key = ANY(%(key)s)\n \n \n \n \n \n )\n \n -- \n --\n SELECT key,\n host_name,\n device_name,\n tags\n FROM sub_contexts c\n WHERE (c.org_id = %(org_id)s AND c.tags @> %(yes_tags0)s)\n OR (c.org_id = %(org_id)s AND c.tags @> %(yes_tags1)s)\n OR (c.org_id = %(org_id)s AND c.tags @> %(yes_tags2)s)\n OR (c.org_id = %(org_id)s AND c.tags @> %(yes_tags3)s)\n OR (c.org_id = %(org_id)s AND c.tags @> %(yes_tags4)s)\n OR (c.org_id = %(org_id)s AND c.tags @> %(yes_tags5)s)\n OR (c.org_id = %(org_id)s AND c.tags @> %(yes_tags6)s)\n OR (c.org_id = %(org_id)s AND c.tags @> %(yes_tags7)s)\n OR (c.org_id = %(org_id)s AND c.tags @> %(yes_tags8)s)\n OR (c.org_id = %(org_id)s AND c.tags @> %(yes_tags9)s)\n OR (c.org_id = %(org_id)s AND c.tags @> %(yes_tags10)s)\n OR (c.org_id = %(org_id)s AND c.tags @> %(yes_tags11)s)\n OR (c.org_id = %(org_id)s AND c.tags @> %(yes_tags12)s)\n OR (c.org_id = %(org_id)s AND c.tags @> %(yes_tags13)s)\n OR (c.org_id = %(org_id)s AND c.tags @> %(yes_tags14)s)\n OR (c.org_id = %(org_id)s AND c.tags @> %(yes_tags15)s)\n \n \n \n \n \n "}, Metrics: map[string]float64{"rowcount": 0.5543063276573277, "size": 0.6196504333337066, "payloads": 0.9689311094466356}, Type: "lamar", }, @@ -285,7 +245,7 @@ var benchSpans = []*pb.Span{ Start: 1548931840954749862, Duration: 161372, Error: 0, - Meta: map[string]string{"out.section": "-"}, + Meta: map[string]string{"out.section": "-", "db.hostname:": "db.host.us1.prod", "db.name": "postgres"}, Metrics: map[string]float64{"rowcount": 0.2646545763337349}, Type: "lamar", }, @@ -299,7 +259,7 @@ var benchSpans = []*pb.Span{ Start: 1548931840954191909, Duration: 9908, Error: 0, - Meta: map[string]string{"in.section": "replica"}, + Meta: map[string]string{"peer.service": "foo", "net.peer.name": "foo.us1", "network.destination.name": "foo.us1.12345"}, Metrics: map[string]float64{"rowcount": 0.7800384694533715, "payloads": 0.24585482170573683, "loops": 0.3119738365111953, "size": 0.6693070719377765}, Type: "sql", }, @@ -313,7 +273,7 @@ var benchSpans = []*pb.Span{ Start: 1548931840954175872, Duration: 2635, Error: 400, - Meta: map[string]string{"user": "benjamin", "query": "GET beaker:c76db4c3af90410197cf88b0afba4942:session", "out.section": "proxy-XXX"}, + Meta: map[string]string{"user": "benjamin", "query": "GET beaker:c76db4c3af90410197cf88b0afba4942:session", "db.hostname:": "db.host.us1.prod", "db.name": "postgres"}, Metrics: map[string]float64{"payloads": 0.5207323287655542, "loops": 0.4731462684058845, "heap_allocated": 0.5386526456622786, "size": 0.9438291624690298, "rowcount": 0.14536182482282964}, Type: "lamar", }, @@ -327,13 +287,13 @@ var benchSpans = []*pb.Span{ Start: 1548931840954169013, Duration: 370, Error: 400, - Meta: map[string]string{"in.host": "", "out.host": "/dev/null", "user": "leo", "query": "SELECT id\n FROM ddsuperuser\n WHERE id = %(id)s"}, + Meta: map[string]string{"db.hostname:": "db.host.us1.prod", "db.name": "postgres", "user": "leo", "query": "SELECT id\n FROM ddsuperuser\n WHERE id = %(id)s"}, Metrics: map[string]float64{}, Type: "lamar", }, { Service: "django", - Name: "web.query", + Name: "grpc.client.request", Resource: "events.buckets", TraceID: 0x5df0afd382d351de, SpanID: 0x3a51491c82d0b322, @@ -341,107 +301,23 @@ var benchSpans = []*pb.Span{ Start: 1548931840954198336, Duration: 2474, Error: 1, - Meta: map[string]string{"out.section": "8080"}, + Meta: map[string]string{"rpc.service": "buckets", "out.host": "baz", "net.peer.name": "baz.us1", "network.destination.name": "baz.us1.12345"}, Metrics: map[string]float64{"rowcount": 0.9895177718616301}, Type: "lamar", }, { - Service: "pg-master", - Name: "pylons.controller", - Resource: "GET cache|xxx", - TraceID: 0x5df0afd382d351de, - SpanID: 0x3482d8abba36420f, - ParentID: 0x69ff3ac466831715, - Start: 1548931840954192800, - Duration: 19, - Error: 1, - Meta: map[string]string{"out.host": "datadoghq.com", "in.section": "22"}, - Metrics: map[string]float64{"rowcount": 0.12186970474265321, "size": 0.4352687905570856}, - Type: "redis", - }, - { - Service: "web-billing", - Name: "web.template", - Resource: "GET /url/test/fixture/resource/42", - TraceID: 0x5df0afd382d351de, - SpanID: 0x4c233dc8bfa40958, - ParentID: 0x69ff3ac466831715, - Start: 1548931840954191934, - Duration: 70, - Error: 400, - Meta: map[string]string{"user": "bartek"}, - Metrics: map[string]float64{"rowcount": 0.3501786556194641}, - Type: "lamar", - }, - { - Service: "pg-master", - Name: "postgres.query", - Resource: "データの犬", - TraceID: 0x5df0afd382d351de, - SpanID: 0x41546750dfa40643, - ParentID: 0x61973c4d43bd8f04, - Start: 1548931840964093798, - Duration: 2700058, - Error: 2, - Meta: map[string]string{"query": "\n -- get_contexts_sub_query[[org:9543 query_id:a135e15e7d batch:1]]\n WITH sub_contexts as (\n \n -- \n --\n SELECT key,\n host_name,\n device_name,\n tags,\n org_id\n FROM vs9543.dim_context c\n WHERE key = ANY(%(key)s)\n \n \n \n \n \n )\n \n -- \n --\n SELECT key,\n host_name,\n device_name,\n tags\n FROM sub_contexts c\n WHERE (c.org_id = %(org_id)s AND c.tags @> %(yes_tags0)s)\n OR (c.org_id = %(org_id)s AND c.tags @> %(yes_tags1)s)\n OR (c.org_id = %(org_id)s AND c.tags @> %(yes_tags2)s)\n OR (c.org_id = %(org_id)s AND c.tags @> %(yes_tags3)s)\n OR (c.org_id = %(org_id)s AND c.tags @> %(yes_tags4)s)\n OR (c.org_id = %(org_id)s AND c.tags @> %(yes_tags5)s)\n OR (c.org_id = %(org_id)s AND c.tags @> %(yes_tags6)s)\n OR (c.org_id = %(org_id)s AND c.tags @> %(yes_tags7)s)\n OR (c.org_id = %(org_id)s AND c.tags @> %(yes_tags8)s)\n OR (c.org_id = %(org_id)s AND c.tags @> %(yes_tags9)s)\n OR (c.org_id = %(org_id)s AND c.tags @> %(yes_tags10)s)\n OR (c.org_id = %(org_id)s AND c.tags @> %(yes_tags11)s)\n OR (c.org_id = %(org_id)s AND c.tags @> %(yes_tags12)s)\n OR (c.org_id = %(org_id)s AND c.tags @> %(yes_tags13)s)\n OR (c.org_id = %(org_id)s AND c.tags @> %(yes_tags14)s)\n OR (c.org_id = %(org_id)s AND c.tags @> %(yes_tags15)s)\n \n \n \n \n \n ", "in.host": "2a01:e35:2ee1:7160:f66d:4ff:fe71:b690"}, - Metrics: map[string]float64{"payloads": 0.737550948148184, "size": 0.5683740489852795, "rowcount": 0.4318616362850698}, - Type: "lamar", - }, - { - Service: "rails", - Name: "web.template", - Resource: "events.buckets", - TraceID: 0x5df0afd382d351de, - SpanID: 0x18e45b850b3c1e39, - ParentID: 0x273710f0da9967a7, - Start: 1548931840954781284, - Duration: 126835, - Error: 0, - Meta: map[string]string{"user": "bartek", "query": "SELECT id\n FROM ddsuperuser\n WHERE id = %(id)s", "in.host": "postgres.service.consul", "out.host": "/dev/null"}, - Metrics: map[string]float64{}, - Type: "redis", - }, - { - Service: "pylons", + Service: "django", Name: "postgres.query", - Resource: "SELECT user.handle AS user_handle, user.id AS user_id, user.org_id AS user_org_id, user.password AS user_password, user.email AS user_email, user.name AS user_name, user.role AS user_role, user.team AS user_team, user.support AS user_support, user.is_admin AS user_is_admin, user.github_username AS user_github_username, user.github_token AS user_github_token, user.disabled AS user_disabled, user.verified AS user_verified, user.bot AS user_bot, user.created AS user_created, user.modified AS user_modified, user.time_zone AS user_time_zone, user.password_modified AS user_password_modified FROM user WHERE user.id = ? AND user.org_id = ? LIMIT ?", + Resource: "SELECT id FROM table;", TraceID: 0x5df0afd382d351de, - SpanID: 0x4f3f65e058ddbbfc, - ParentID: 0x273710f0da9967a7, - Start: 1548931840954752840, - Duration: 103, + SpanID: 0x3fd1ce2fbc1dde9e, + ParentID: 0x3a51491c82d0b322, + Start: 1548931840954169000, + Duration: 100000000, Error: 403, - Meta: map[string]string{"in.section": "22", "out.section": "standby", "user": "bartek", "in.host": "", "out.host": "138.195.130.42"}, - Metrics: map[string]float64{"payloads": 0.37210733159614523, "rowcount": 0.5264465848403574, "size": 0.025720650418526562}, - Type: "http", - }, - { - Service: "web-billing", - Name: "postgres.query", - Resource: "GET /url/test/fixture/resource/42", - TraceID: 0x5df0afd382d351de, - SpanID: 0x7b566c818866ef8b, - ParentID: 0x273710f0da9967a7, - Start: 1548931840954749879, - Duration: 11, - Error: 400, - Meta: map[string]string{"in.host": "postgres.service.consul", "out.host": "datadoghq.com", "user": "bartek", "query": "SELECT id\n FROM ddsuperuser\n WHERE id = %(id)s"}, - Metrics: map[string]float64{"rowcount": 0.805619107635167}, - Type: "redis", - }, - { - Service: "pg-master", - Name: "web.query", - Resource: "SELECT user.handle AS user_handle, user.id AS user_id, user.org_id AS user_org_id, user.password AS user_password, user.email AS user_email, user.name AS user_name, user.role AS user_role, user.team AS user_team, user.support AS user_support, user.is_admin AS user_is_admin, user.github_username AS user_github_username, user.github_token AS user_github_token, user.disabled AS user_disabled, user.verified AS user_verified, user.bot AS user_bot, user.created AS user_created, user.modified AS user_modified, user.time_zone AS user_time_zone, user.password_modified AS user_password_modified FROM user WHERE user.id = ? AND user.org_id = ? LIMIT ?", - TraceID: 0x5df0afd382d351de, - SpanID: 0x429768d2d13e8697, - ParentID: 0x4c233dc8bfa40958, - Start: 1548931840954191942, - Duration: 37, - Error: 400, - Meta: map[string]string{"out.host": "datadoghq.com", "in.section": "replica", "query": "GET beaker:c76db4c3af90410197cf88b0afba4942:session", "in.host": "8.8.8.8"}, - Metrics: map[string]float64{"payloads": 0.3779600143407876, "loops": 0.20498295768971775, "size": 0.7947128947983215, "rowcount": 0.7478115781577667}, - Type: "lamar", + Meta: map[string]string{"query": "SELECT id\n FROM ddsuperuser\n WHERE id = %(id)s", "db.hostname": "db.host.us1.prod", "db.name": "postgres"}, + Metrics: map[string]float64{"rowcount": 0.5066325669281033}, + Type: "db", }, } diff --git a/pkg/trace/writer/stats.go b/pkg/trace/writer/stats.go index 32bdbfc9ad937..7caa72423d283 100644 --- a/pkg/trace/writer/stats.go +++ b/pkg/trace/writer/stats.go @@ -216,6 +216,9 @@ func (w *StatsWriter) buildPayloads(sp *pb.StatsPayload, maxEntriesPerPayload in } if len(grouped) > 1 { w.stats.Splits.Inc() + for _, g := range grouped { + g.SplitPayload = true + } } return grouped } diff --git a/pkg/trace/writer/stats_test.go b/pkg/trace/writer/stats_test.go index afe3f917e2ba9..ac02efca49281 100644 --- a/pkg/trace/writer/stats_test.go +++ b/pkg/trace/writer/stats_test.go @@ -162,6 +162,7 @@ func TestStatsWriter(t *testing.T) { } assert.Equal(extractCounts([]*pb.StatsPayload{stats}), extractCounts(payloads)) for _, p := range payloads { + assert.True(p.SplitPayload) assert.Equal("agentenv", p.AgentEnv) assert.Equal("agenthost", p.AgentHostname) assert.Equal("agent-version", p.AgentVersion) @@ -189,6 +190,7 @@ func TestStatsWriter(t *testing.T) { payloads := sw.buildPayloads(&pb.StatsPayload{Stats: []*pb.ClientStatsPayload{stats}}, 1337) assert.Equal(1, len(payloads)) s := payloads[0].Stats + assert.False(payloads[0].SplitPayload) assert.Equal(3, len(s[0].Stats)) assert.Equal(5, len(s[0].Stats[0].Stats)) assert.Equal(5, len(s[0].Stats[1].Stats)) diff --git a/releasenotes/notes/add-default-peer-tags-for-aggregation-3d708e776c0eb05c.yaml b/releasenotes/notes/add-default-peer-tags-for-aggregation-3d708e776c0eb05c.yaml new file mode 100644 index 0000000000000..774527a4bc4b9 --- /dev/null +++ b/releasenotes/notes/add-default-peer-tags-for-aggregation-3d708e776c0eb05c.yaml @@ -0,0 +1,11 @@ +# Each section from every release note are combined when the +# CHANGELOG.rst is rendered. So the text needs to be worded so that +# it does not depend on any information only available in another +# section. This may mean repeating some details, but each section +# must be readable independently of the other. +# +# Each section note must be formatted as reStructuredText. +--- +enhancements: + - | + Added default peer tags for APM stats aggregation which can be enabled through a new flag (`peer_tags_aggregation`).