From 1952f8939680f85a4b3d111b57186e56b3fb3539 Mon Sep 17 00:00:00 2001 From: yuri-lipnesh Date: Sat, 28 Sep 2024 10:55:31 -0400 Subject: [PATCH] [usm] add SHOW operation to postgres monitor (#29469) --- pkg/network/encoding/marshal/usm_postgres.go | 4 +- .../protocols/postgres/debugging/debugging.go | 10 +- pkg/network/protocols/postgres/model_linux.go | 52 +++++++-- pkg/network/protocols/postgres/operations.go | 6 + .../protocols/postgres/stats_common.go | 8 +- pkg/network/protocols/postgres/statskeeper.go | 2 +- .../protocols/postgres/statskeeper_test.go | 10 +- pkg/network/protocols/postgres/telemetry.go | 2 +- pkg/network/usm/postgres_monitor_test.go | 106 +++++++++++++++++- 9 files changed, 165 insertions(+), 35 deletions(-) diff --git a/pkg/network/encoding/marshal/usm_postgres.go b/pkg/network/encoding/marshal/usm_postgres.go index ea7ed3a10659d2..c1b21b9e1dd685 100644 --- a/pkg/network/encoding/marshal/usm_postgres.go +++ b/pkg/network/encoding/marshal/usm_postgres.go @@ -62,7 +62,7 @@ func (e *postgresEncoder) encodeData(connectionData *USMConnectionData[postgres. staticTags |= stats.StaticTags e.postgresAggregationsBuilder.AddAggregations(func(builder *model.DatabaseStatsBuilder) { builder.SetPostgres(func(statsBuilder *model.PostgresStatsBuilder) { - statsBuilder.SetTableName(key.TableName) + statsBuilder.SetTableName(key.Parameters) statsBuilder.SetOperation(uint64(toPostgresModelOperation(key.Operation))) if latencies := stats.Latencies; latencies != nil { blob, _ := proto.Marshal(latencies.ToProto()) @@ -106,6 +106,8 @@ func toPostgresModelOperation(op postgres.Operation) model.PostgresOperation { return model.PostgresOperation_PostgresAlterOp case postgres.TruncateTableOP: return model.PostgresOperation_PostgresTruncateOp + case postgres.ShowOP: + return model.PostgresOperation_PostgresShowOp default: return model.PostgresOperation_PostgresUnknownOp } diff --git a/pkg/network/protocols/postgres/debugging/debugging.go b/pkg/network/protocols/postgres/debugging/debugging.go index 0522a2b2a7c231..0e98b7bdf824fc 100644 --- a/pkg/network/protocols/postgres/debugging/debugging.go +++ b/pkg/network/protocols/postgres/debugging/debugging.go @@ -20,11 +20,11 @@ type address struct { Port uint16 } -// key represents a (client, server, table name) tuple. +// key represents a (client, server, parameters: table name or runtime parameter) tuple. type key struct { - Client address - Server address - TableName string + Client address + Server address + Parameters string } // Stats consolidates request count and latency information for a certain status code @@ -58,7 +58,7 @@ func Postgres(stats map[postgres.Key]*postgres.RequestStat) []RequestSummary { IP: serverAddr.String(), Port: k.DstPort, }, - TableName: k.TableName, + Parameters: k.Parameters, } if _, ok := resMap[tempKey]; !ok { resMap[tempKey] = make(map[string]Stats) diff --git a/pkg/network/protocols/postgres/model_linux.go b/pkg/network/protocols/postgres/model_linux.go index 0221dbe144f6b5..45881f761204d5 100644 --- a/pkg/network/protocols/postgres/model_linux.go +++ b/pkg/network/protocols/postgres/model_linux.go @@ -21,16 +21,21 @@ import ( "github.com/DataDog/datadog-agent/pkg/util/log" ) +const ( + // EmptyParameters represents the case where the non-empty query has no parameters + EmptyParameters = "EMPTY_PARAMETERS" +) + // EventWrapper wraps an ebpf event and provides additional methods to extract information from it. // We use this wrapper to avoid recomputing the same values (operation and table name) multiple times. type EventWrapper struct { *ebpf.EbpfEvent - operationSet bool - operation Operation - tableNameSet bool - tableName string - normalizer *sqllexer.Normalizer + operationSet bool + operation Operation + parametersSet bool + parameters string + normalizer *sqllexer.Normalizer } // NewEventWrapper creates a new EventWrapper from an ebpf event. @@ -73,6 +78,25 @@ func (e *EventWrapper) Operation() Operation { return e.operation } +// extractParameters returns the string following the command +func (e *EventWrapper) extractParameters() string { + b := getFragment(&e.Tx) + idxParam := bytes.IndexByte(b, ' ') // trim the string to a space, it will give the parameter + if idxParam == -1 { + return EmptyParameters + } + idxParam++ + + idxEnd := bytes.IndexByte(b[idxParam:], '\x00') // trim trailing nulls + if idxEnd == 0 { + return EmptyParameters + } + if idxEnd != -1 { + return string(b[idxParam : idxParam+idxEnd]) + } + return string(b[idxParam:]) +} + var re = regexp.MustCompile(`(?i)if\s+exists`) // extractTableName extracts the table name from the query. @@ -97,14 +121,18 @@ func (e *EventWrapper) extractTableName() string { } -// TableName returns the name of the table the query is operating on. -func (e *EventWrapper) TableName() string { - if !e.tableNameSet { - e.tableName = e.extractTableName() - e.tableNameSet = true +// Parameters returns the table name or run-time parameter. +func (e *EventWrapper) Parameters() string { + if !e.parametersSet { + if e.operation == ShowOP { + e.parameters = e.extractParameters() + } else { + e.parameters = e.extractTableName() + } + e.parametersSet = true } - return e.tableName + return e.parameters } // RequestLatency returns the latency of the request in nanoseconds @@ -125,6 +153,6 @@ ebpfTx{ // String returns a string representation of the underlying event func (e *EventWrapper) String() string { var output strings.Builder - output.WriteString(fmt.Sprintf(template, e.Operation(), e.TableName(), e.RequestLatency())) + output.WriteString(fmt.Sprintf(template, e.Operation(), e.Parameters(), e.RequestLatency())) return output.String() } diff --git a/pkg/network/protocols/postgres/operations.go b/pkg/network/protocols/postgres/operations.go index 49ed0349bee3c6..41bc164fd2e205 100644 --- a/pkg/network/protocols/postgres/operations.go +++ b/pkg/network/protocols/postgres/operations.go @@ -29,6 +29,8 @@ const ( AlterTableOP // TruncateTableOP represents a TRUNCATE operation. TruncateTableOP + // ShowOP represents a command SHOW + ShowOP ) // String returns the string representation of the operation. @@ -50,6 +52,8 @@ func (op Operation) String() string { return "DELETE" case AlterTableOP: return "ALTER" + case ShowOP: + return "SHOW" default: return "UNKNOWN" } @@ -74,6 +78,8 @@ func FromString(op string) Operation { return DeleteTableOP case "ALTER": return AlterTableOP + case "SHOW": + return ShowOP default: return UnknownOP } diff --git a/pkg/network/protocols/postgres/stats_common.go b/pkg/network/protocols/postgres/stats_common.go index 073f0466b764ac..07f39b1dbaccc6 100644 --- a/pkg/network/protocols/postgres/stats_common.go +++ b/pkg/network/protocols/postgres/stats_common.go @@ -18,17 +18,17 @@ import ( // Key is an identifier for a group of Postgres transactions type Key struct { - Operation Operation - TableName string + Operation Operation + Parameters string types.ConnectionKey } // NewKey creates a new postgres key -func NewKey(saddr, daddr util.Address, sport, dport uint16, operation Operation, tableName string) Key { +func NewKey(saddr, daddr util.Address, sport, dport uint16, operation Operation, parameters string) Key { return Key{ ConnectionKey: types.NewConnectionKey(saddr, daddr, sport, dport), Operation: operation, - TableName: tableName, + Parameters: parameters, } } diff --git a/pkg/network/protocols/postgres/statskeeper.go b/pkg/network/protocols/postgres/statskeeper.go index 5b35e3dfe910bc..069b18c660d754 100644 --- a/pkg/network/protocols/postgres/statskeeper.go +++ b/pkg/network/protocols/postgres/statskeeper.go @@ -37,7 +37,7 @@ func (s *StatKeeper) Process(tx *EventWrapper) { key := Key{ Operation: tx.Operation(), - TableName: tx.TableName(), + Parameters: tx.Parameters(), ConnectionKey: tx.ConnTuple(), } requestStats, ok := s.stats[key] diff --git a/pkg/network/protocols/postgres/statskeeper_test.go b/pkg/network/protocols/postgres/statskeeper_test.go index 0acf4dc862f365..7510e2edeccf90 100644 --- a/pkg/network/protocols/postgres/statskeeper_test.go +++ b/pkg/network/protocols/postgres/statskeeper_test.go @@ -28,16 +28,16 @@ func TestStatKeeperProcess(t *testing.T) { Response_last_seen: 10, }, }, - operationSet: true, - operation: SelectOP, - tableNameSet: true, - tableName: "dummy", + operationSet: true, + operation: SelectOP, + parametersSet: true, + parameters: "dummy", }) } require.Equal(t, 1, len(s.stats)) for k, stat := range s.stats { - require.Equal(t, "dummy", k.TableName) + require.Equal(t, "dummy", k.Parameters) require.Equal(t, SelectOP, k.Operation) require.Equal(t, 20, stat.Count) require.Equal(t, float64(20), stat.Latencies.GetCount()) diff --git a/pkg/network/protocols/postgres/telemetry.go b/pkg/network/protocols/postgres/telemetry.go index 9d45855c312fba..349c8cee77c05a 100644 --- a/pkg/network/protocols/postgres/telemetry.go +++ b/pkg/network/protocols/postgres/telemetry.go @@ -156,7 +156,7 @@ func (t *Telemetry) Count(tx *ebpf.EbpfEvent, eventWrapper *EventWrapper) { t.failedOperationExtraction.Add(1) state = operationNotFound } - if eventWrapper.TableName() == "UNKNOWN" { + if eventWrapper.Parameters() == "UNKNOWN" { t.failedTableNameExtraction.Add(1) if state == operationNotFound { state = tableAndOpNotFound diff --git a/pkg/network/usm/postgres_monitor_test.go b/pkg/network/usm/postgres_monitor_test.go index a74ef203f3721a..94a188ab4d31b7 100644 --- a/pkg/network/usm/postgres_monitor_test.go +++ b/pkg/network/usm/postgres_monitor_test.go @@ -478,7 +478,6 @@ func testDecoding(t *testing.T, isTLS bool) { }, isTLS) }, }, - // This test validates that the SHOW command is currently not supported. { name: "show command", preMonitorSetup: func(t *testing.T, ctx pgTestContext) { @@ -497,8 +496,8 @@ func testDecoding(t *testing.T, isTLS bool) { }, validation: func(t *testing.T, _ pgTestContext, monitor *Monitor) { validatePostgres(t, monitor, map[string]map[postgres.Operation]int{ - "UNKNOWN": { - postgres.UnknownOP: adjustCount(1), + "search_path": { + postgres.ShowOP: adjustCount(1), }, }, isTLS) }, @@ -724,11 +723,106 @@ func validatePostgres(t *testing.T, monitor *Monitor, expectedStats map[string]m if hasTLSTag != tls { continue } - if _, ok := found[key.TableName]; !ok { - found[key.TableName] = make(map[postgres.Operation]int) + if _, ok := found[key.Parameters]; !ok { + found[key.Parameters] = make(map[postgres.Operation]int) } - found[key.TableName][key.Operation] += stats.Count + found[key.Parameters][key.Operation] += stats.Count } return reflect.DeepEqual(expectedStats, found) }, time.Second*5, time.Millisecond*100, "Expected to find a %v stats, instead captured %v", &expectedStats, &found) } + +func (s *postgresProtocolParsingSuite) TestExtractParameters() { + t := s.T() + + units := []struct { + name string + expected string + event ebpf.EbpfEvent + }{ + { + name: "query_size longer than the actual length of the content", + expected: "version and status", + event: ebpf.EbpfEvent{ + Tx: ebpf.EbpfTx{ + Request_fragment: createFragment([]byte("SHOW version and status")), + Original_query_size: 64, + }, + }, + }, + { + name: "query_size shorter than the actual length of the content", + expected: "param1 param2", + event: ebpf.EbpfEvent{ + Tx: ebpf.EbpfTx{ + Request_fragment: createFragment([]byte("SHOW param1 param2 param3")), + Original_query_size: 18, + }, + }, + }, + { + name: "the query has no parameters", + expected: postgres.EmptyParameters, + event: ebpf.EbpfEvent{ + Tx: ebpf.EbpfTx{ + Request_fragment: createFragment([]byte("SHOW ")), + Original_query_size: 10, + }, + }, + }, + { + name: "command has trailing zeros", + expected: "param", + event: ebpf.EbpfEvent{ + Tx: ebpf.EbpfTx{ + Request_fragment: [ebpf.BufferSize]byte{'S', 'H', 'O', 'W', ' ', 'p', 'a', 'r', 'a', 'm', 0, 0, 0}, + Original_query_size: 13, + }, + }, + }, + { + name: "malformed command with wrong query_size", + expected: postgres.EmptyParameters, + event: ebpf.EbpfEvent{ + Tx: ebpf.EbpfTx{ + Request_fragment: [ebpf.BufferSize]byte{'S', 'H', 'O', 'W', ' ', 0, 0, 'a', ' ', 'b', 'c', 0, 0, 0}, + Original_query_size: 14, + }, + }, + }, + { + name: "empty parameters with spaces and nils", + expected: postgres.EmptyParameters, + event: ebpf.EbpfEvent{ + Tx: ebpf.EbpfTx{ + Request_fragment: [ebpf.BufferSize]byte{'S', 'H', 'O', 'W', ' ', 0, ' ', 0, ' ', 0, 0, 0}, + Original_query_size: 12, + }, + }, + }, + { + name: "parameters with control codes only", + expected: "\x01\x02\x03\x04\x05", + event: ebpf.EbpfEvent{ + Tx: ebpf.EbpfTx{ + Request_fragment: [ebpf.BufferSize]byte{'S', 'H', 'O', 'W', ' ', 1, 2, 3, 4, 5}, + Original_query_size: 10, + }, + }, + }, + } + for _, unit := range units { + t.Run(unit.name, func(t *testing.T) { + e := postgres.NewEventWrapper(&unit.event) + require.NotNil(t, e) + e.Operation() + require.Equal(t, unit.expected, e.Parameters()) + }) + } +} + +func createFragment(fragment []byte) [ebpf.BufferSize]byte { + var b [ebpf.BufferSize]byte + copy(b[:], fragment) + return b +}