feat(outputs.postgresql): Allow limiting of column name length #16041

Open
wants to merge 1 commit into base: master
19 changes: 19 additions & 0 deletions plugins/outputs/postgresql/README.md
@@ -124,6 +124,12 @@ to use them.
## Each entry consumes approximately 34 bytes of memory.
# tag_cache_size = 100000

## Truncate column names at the given length so they do not exceed
## PostgreSQL's 'identifier length' limit (default: no limit)
## (see https://www.postgresql.org/docs/current/limits.html)
## Be careful not to create duplicate column names!
# column_name_length_limit = 0

## Enable & set the log level for the Postgres driver.
# log_level = "warn" # trace, debug, info, warn, error, none
```
@@ -189,6 +195,19 @@ Documentation on how to write templates can be found [sqltemplate docs][1]

[1]: https://pkg.go.dev/github.com/influxdata/telegraf/plugins/outputs/postgresql/sqltemplate

## Long Column Names

PostgreSQL imposes a limit on the length of column identifiers, documented in
the [official docs](https://www.postgresql.org/docs/current/limits.html). By
default, Telegraf does not enforce this limit because it can be changed on the
server side. Furthermore, truncating column names can lead to collisions if the
names only differ after the cut-off point.

> [!WARNING]
> Make sure you do not cause column name collisions when setting
> `column_name_length_limit`! If in doubt, explicitly shorten the field and tag
> names beforehand, e.g. with the regexp processor (see the sketch below).
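
As an illustration only, and not part of this change, the following sketch
combines the output-side limit with an upstream rename. The connection string,
field-name pattern, and replacement are placeholders; verify the exact
`processors.regexp` options against that processor's documentation.

```toml
# Safety net: truncate anything that still exceeds PostgreSQL's default
# 63-character identifier limit (placeholder connection string)
[[outputs.postgresql]]
  connection = "host=localhost user=postgres sslmode=verify-full"
  column_name_length_limit = 63

# Preferred: shorten known long field names before they reach the output so
# the final names stay unique and readable
[[processors.regexp]]
  [[processors.regexp.field_rename]]
    ## Hypothetical prefix; adapt the pattern to your own field names
    pattern = "^a_field_with_a_very_long_prefix_(.*)$"
    replacement = "${1}"
```

Renaming upstream keeps the final column names unique, so the length limit in
the output acts only as a safety net.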

### Samples

#### TimescaleDB
1 change: 1 addition & 0 deletions plugins/outputs/postgresql/postgresql.go
@@ -51,6 +51,7 @@ type Postgresql struct {
Uint64Type string `toml:"uint64_type"`
RetryMaxBackoff config.Duration `toml:"retry_max_backoff"`
TagCacheSize int `toml:"tag_cache_size"`
ColumnNameLenLimit int `toml:"column_name_length_limit"`
LogLevel string `toml:"log_level"`
Logger telegraf.Logger `toml:"-"`

236 changes: 236 additions & 0 deletions plugins/outputs/postgresql/postgresql_test.go
@@ -976,3 +976,239 @@ func TestStressConcurrencyIntegration(t *testing.T) {
}
}
}

func TestLongColumnNamesErrorIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}

// Setup the plugin
p, err := newPostgresqlTest(t)
require.NoError(t, err)
require.NoError(t, p.Init())
require.NoError(t, p.Connect())

// Define the metrics to send
metrics := []telegraf.Metric{
metric.New(
t.Name(),
map[string]string{},
map[string]interface{}{
"a_field_with_a_some_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63": int64(0),
"value": 42,
},
time.Unix(0, 0).UTC(),
),
metric.New(
t.Name(),
map[string]string{},
map[string]interface{}{
"a_field_with_a_some_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63": int64(1),
"value": 43,
},
time.Unix(0, 1).UTC(),
),
metric.New(
t.Name(),
map[string]string{},
map[string]interface{}{
"a_field_with_a_some_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63": int64(2),
"value": 44,
},
time.Unix(0, 2).UTC(),
),
metric.New(
t.Name(),
map[string]string{},
map[string]interface{}{
"a_field_with_another_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63": int64(99),
"value": 45,
},
time.Unix(0, 9).UTC(),
),
}
require.NoError(t, p.Write(metrics))
require.NoError(t, p.Write(metrics))

// Check if the logging is restricted to once per field and all columns are
// mentioned
var longColLogErrs []string
for _, l := range p.Logger.logs {
msg := l.String()
if l.level == pgx.LogLevelError && strings.Contains(msg, "Column name too long") {
longColLogErrs = append(longColLogErrs, strings.TrimPrefix(msg, "error: Column name too long: "))
}
}
expectedLongColumns := []string{
`"a_field_with_a_some_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63"`,
`"a_field_with_another_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63"`,
}
require.ElementsMatch(t, expectedLongColumns, longColLogErrs)

// Define the expected data in the table
expected := []map[string]interface{}{
{"time": time.Unix(0, 0).Unix(), "value": int64(42)},
{"time": time.Unix(0, 1).Unix(), "value": int64(43)},
{"time": time.Unix(0, 2).Unix(), "value": int64(44)},
{"time": time.Unix(0, 9).Unix(), "value": int64(45)},
{"time": time.Unix(0, 0).Unix(), "value": int64(42)},
{"time": time.Unix(0, 1).Unix(), "value": int64(43)},
{"time": time.Unix(0, 2).Unix(), "value": int64(44)},
{"time": time.Unix(0, 9).Unix(), "value": int64(45)},
}

// Get the actual table data and convert the time to a timestamp for
// easier comparison
dump := dbTableDump(t, p.db, "")
require.Len(t, dump, len(expected))
for i, actual := range dump {
if raw, found := actual["time"]; found {
if t, ok := raw.(time.Time); ok {
actual["time"] = t.Unix()
}
}
require.EqualValues(t, expected[i], actual)
}
}

func TestLongColumnNamesClipIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}

// Setup the plugin
p, err := newPostgresqlTest(t)
require.NoError(t, err)
p.ColumnNameLenLimit = 63
require.NoError(t, p.Init())
require.NoError(t, p.Connect())

// Define the metrics to send
metrics := []telegraf.Metric{
metric.New(
t.Name(),
map[string]string{},
map[string]interface{}{
"a_field_with_a_some_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63": int64(0),
"value": 42,
},
time.Unix(0, 0).UTC(),
),
metric.New(
t.Name(),
map[string]string{},
map[string]interface{}{
"a_field_with_a_some_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63": int64(1),
"value": 43,
},
time.Unix(0, 1).UTC(),
),
metric.New(
t.Name(),
map[string]string{},
map[string]interface{}{
"a_field_with_a_some_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63": int64(2),
"value": 44,
},
time.Unix(0, 2).UTC(),
),
metric.New(
t.Name(),
map[string]string{},
map[string]interface{}{
"a_field_with_another_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63": int64(99),
"value": 45,
},
time.Unix(0, 9).UTC(),
),
}
require.NoError(t, p.Write(metrics))
require.NoError(t, p.Write(metrics))

// Check if the logging is restricted to once per field and all columns are mentioned
var longColLogWarns []string
var longColLogErrs []string
for _, l := range p.Logger.logs {
msg := l.String()
if l.level == pgx.LogLevelWarn && strings.Contains(msg, "Limiting too long column name") {
longColLogWarns = append(longColLogWarns, strings.TrimPrefix(msg, "warn: Limiting too long column name: "))
continue
}
if l.level == pgx.LogLevelError && strings.Contains(msg, "Column name too long") {
longColLogErrs = append(longColLogErrs, strings.TrimPrefix(msg, "error: Column name too long: "))
continue
}
}

expectedLongColumns := []string{
`"a_field_with_a_some_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63"`,
`"a_field_with_another_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63"`,
}
require.ElementsMatch(t, expectedLongColumns, longColLogWarns)
require.Empty(t, longColLogErrs)

// Define the expected data in the table
expected := []map[string]interface{}{
{
"time": time.Unix(0, 0).Unix(),
"a_field_with_a_some_very_long_name_exceeding_the_column_name_li": int64(0),
"a_field_with_another_very_long_name_exceeding_the_column_name_l": nil,
"value": int64(42),
},
{
"time": time.Unix(0, 1).Unix(),
"a_field_with_a_some_very_long_name_exceeding_the_column_name_li": int64(1),
"a_field_with_another_very_long_name_exceeding_the_column_name_l": nil,
"value": int64(43),
},
{
"time": time.Unix(0, 2).Unix(),
"a_field_with_a_some_very_long_name_exceeding_the_column_name_li": int64(2),
"a_field_with_another_very_long_name_exceeding_the_column_name_l": nil,
"value": int64(44),
},
{
"time": time.Unix(0, 9).Unix(),
"a_field_with_a_some_very_long_name_exceeding_the_column_name_li": nil,
"a_field_with_another_very_long_name_exceeding_the_column_name_l": int64(99),
"value": int64(45),
},
{
"time": time.Unix(0, 0).Unix(),
"a_field_with_a_some_very_long_name_exceeding_the_column_name_li": int64(0),
"a_field_with_another_very_long_name_exceeding_the_column_name_l": nil,
"value": int64(42),
},
{
"time": time.Unix(0, 1).Unix(),
"a_field_with_a_some_very_long_name_exceeding_the_column_name_li": int64(1),
"a_field_with_another_very_long_name_exceeding_the_column_name_l": nil,
"value": int64(43),
},
{
"time": time.Unix(0, 2).Unix(),
"a_field_with_a_some_very_long_name_exceeding_the_column_name_li": int64(2),
"a_field_with_another_very_long_name_exceeding_the_column_name_l": nil,
"value": int64(44),
},
{
"time": time.Unix(0, 9).Unix(),
"a_field_with_a_some_very_long_name_exceeding_the_column_name_li": nil,
"a_field_with_another_very_long_name_exceeding_the_column_name_l": int64(99),
"value": int64(45),
},
}

// Get the actual table data and convert the time to a timestamp for
// easier comparison
dump := dbTableDump(t, p.db, "")
require.Len(t, dump, len(expected))
for i, actual := range dump {
if raw, found := actual["time"]; found {
if t, ok := raw.(time.Time); ok {
actual["time"] = t.Unix()
}
}
require.EqualValues(t, expected[i], actual)
}
}
6 changes: 6 additions & 0 deletions plugins/outputs/postgresql/sample.conf
@@ -85,5 +85,11 @@
## Each entry consumes approximately 34 bytes of memory.
# tag_cache_size = 100000

## Truncate column names at the given length so they do not exceed
## PostgreSQL's 'identifier length' limit (default: no limit)
## (see https://www.postgresql.org/docs/current/limits.html)
## Be careful not to create duplicate column names!
# column_name_length_limit = 0

## Enable & set the log level for the Postgres driver.
# log_level = "warn" # trace, debug, info, warn, error, none
25 changes: 21 additions & 4 deletions plugins/outputs/postgresql/table_manager.go
@@ -27,14 +27,20 @@ type TableManager struct {
// map[tableName]map[columnName]utils.Column
tables map[string]*tableState
tablesMutex sync.Mutex

// Maps to track which column names have already been logged
loggedLongColumnWarn map[string]bool
loggedLongColumnErr map[string]bool
}

// NewTableManager returns an instance of the tables.Manager interface
// that can handle checking and updating the state of tables in the PG database.
func NewTableManager(postgresql *Postgresql) *TableManager {
return &TableManager{
Postgresql: postgresql,
tables: make(map[string]*tableState),
Postgresql: postgresql,
tables: make(map[string]*tableState),
loggedLongColumnWarn: make(map[string]bool),
loggedLongColumnErr: make(map[string]bool),
}
}

@@ -178,7 +184,15 @@ func (tm *TableManager) EnsureStructure(
// check that the missing columns are columns that can be added
addColumns := make([]utils.Column, 0, len(missingCols))
invalidColumns := make([]utils.Column, 0, len(missingCols))
for _, col := range missingCols {
for i, col := range missingCols {
if tm.ColumnNameLenLimit > 0 && len(col.Name) > tm.ColumnNameLenLimit {
if !tm.loggedLongColumnWarn[col.Name] {
tm.Postgresql.Logger.Warnf("Limiting too long column name: %q", col.Name)
tm.loggedLongColumnWarn[col.Name] = true
}
col.Name = col.Name[:tm.ColumnNameLenLimit]
missingCols[i] = col
}
if tm.validateColumnName(col.Name) {
addColumns = append(addColumns, col)
continue
@@ -187,7 +201,10 @@
if col.Role == utils.TagColType {
return nil, fmt.Errorf("column name too long: %q", col.Name)
}
tm.Postgresql.Logger.Errorf("Column name too long: %q", col.Name)
if !tm.loggedLongColumnErr[col.Name] {
tm.Postgresql.Logger.Errorf("Column name too long: %q", col.Name)
tm.loggedLongColumnErr[col.Name] = true
}
invalidColumns = append(invalidColumns, col)
}
