Skip to content

Commit

Permalink
update sql aggregate to handle multiple indexes on sharded analytics
Browse files Browse the repository at this point in the history
  • Loading branch information
sredny buitrago authored and sredny buitrago committed Nov 21, 2024
1 parent a372054 commit 1b36031
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 7 deletions.
10 changes: 5 additions & 5 deletions pumps/sql_aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,15 +160,15 @@ func (c *SQLAggregatePump) ensureIndex(tableName string, background bool) error
c.log.Info("omit_index_creation set to true, omitting index creation..")
return nil
}

if !c.db.Migrator().HasIndex(tableName, newAggregatedIndexName) {
indexName := fmt.Sprintf("%s_%s", tableName, newAggregatedIndexName)
if !c.db.Migrator().HasIndex(tableName, indexName) {
createIndexFn := func(c *SQLAggregatePump) error {
option := ""
if c.dbType == "postgres" {
option = "CONCURRENTLY"
}

err := c.db.Table(tableName).Exec(fmt.Sprintf("CREATE INDEX %s IF NOT EXISTS %s ON %s (dimension, timestamp, org_id, dimension_value)", option, newAggregatedIndexName, tableName)).Error
err := c.db.Table(tableName).Exec(fmt.Sprintf("CREATE INDEX %s IF NOT EXISTS %s ON %s (dimension, timestamp, org_id, dimension_value)", option, indexName, tableName)).Error
if err != nil {
c.log.Errorf("error creating index for table %s : %s", tableName, err.Error())
return err
Expand All @@ -178,7 +178,7 @@ func (c *SQLAggregatePump) ensureIndex(tableName string, background bool) error
c.backgroundIndexCreated <- true
}

c.log.Info("Index ", newAggregatedIndexName, " for table ", tableName, " created successfully")
c.log.Info("Index ", indexName, " for table ", tableName, " created successfully")

return nil
}
Expand All @@ -198,7 +198,7 @@ func (c *SQLAggregatePump) ensureIndex(tableName string, background bool) error
c.log.Info("Creating index for table ", tableName, "...")
return createIndexFn(c)
}
c.log.Info(newAggregatedIndexName, " already exists.")
c.log.Info(indexName, " already exists.")

return nil
}
Expand Down
45 changes: 43 additions & 2 deletions pumps/sql_aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pumps
import (
"context"
"errors"
"fmt"
"net/http"
"testing"
"time"
Expand Down Expand Up @@ -31,7 +32,8 @@ func TestSQLAggregateInit(t *testing.T) {
assert.Equal(t, "sqlite", pmp.db.Dialector.Name())
assert.Equal(t, true, pmp.db.Migrator().HasTable(analytics.AggregateSQLTable))

assert.Equal(t, true, pmp.db.Migrator().HasIndex(analytics.AggregateSQLTable, newAggregatedIndexName))
indexName := fmt.Sprintf("%s_%s", analytics.AggregateSQLTable, newAggregatedIndexName)
assert.Equal(t, true, pmp.db.Migrator().HasIndex(analytics.AggregateSQLTable, indexName))

// Checking with invalid type
cfg["type"] = "invalid"
Expand Down Expand Up @@ -419,6 +421,44 @@ func TestEnsureIndex(t *testing.T) {
expectedErr: nil,
shouldHaveIndex: true,
},
{
testName: "index created correctly, background on sharded pump",
pmpSetupFn: func(tableName string) *SQLAggregatePump {
pmp := &SQLAggregatePump{}
cfg := &SQLAggregatePumpConf{}
cfg.Type = "sqlite"
cfg.TableSharding = true
cfg.ConnectionString = ""
pmp.SQLConf = cfg

pmp.log = log.WithField("prefix", "sql-aggregate-pump")
dialect, errDialect := Dialect(&pmp.SQLConf.SQLConf)
if errDialect != nil {
return nil
}
db, err := gorm.Open(dialect, &gorm.Config{
AutoEmbedd: true,
UseJSONTags: true,
Logger: logger.Default.LogMode(logger.Info),
})
if err != nil {
return nil
}
pmp.db = db

pmp.backgroundIndexCreated = make(chan bool, 1)

if err := pmp.ensureTable(tableName); err != nil {
return nil
}

return pmp
},
givenTableName: "test2",
givenRunInBackground: true,
expectedErr: nil,
shouldHaveIndex: true,
},
{
testName: "index created on non existing table, not background",
pmpSetupFn: func(tableName string) *SQLAggregatePump {
Expand Down Expand Up @@ -499,7 +539,8 @@ func TestEnsureIndex(t *testing.T) {
// wait for the background index creation to finish
<-pmp.backgroundIndexCreated
} else {
hasIndex := pmp.db.Table(tc.givenTableName).Migrator().HasIndex(tc.givenTableName, newAggregatedIndexName)
indexName := fmt.Sprintf("%s_%s", tc.givenTableName, newAggregatedIndexName)
hasIndex := pmp.db.Table(tc.givenTableName).Migrator().HasIndex(tc.givenTableName, indexName)
assert.Equal(t, tc.shouldHaveIndex, hasIndex)
}
} else {
Expand Down

0 comments on commit 1b36031

Please sign in to comment.