Skip to content

Commit

Permalink
fix(monitor): avoid dirty data in indexes table (#605)
Browse files Browse the repository at this point in the history
  • Loading branch information
polebug authored Oct 28, 2024
1 parent 278f4e9 commit 60b86af
Showing 1 changed file with 18 additions and 21 deletions.
39 changes: 18 additions & 21 deletions internal/database/dialer/postgres/client_partitioned.go
Original file line number Diff line number Diff line change
Expand Up @@ -688,37 +688,22 @@ func (c *client) findFederatedIndexesPartitioned(ctx context.Context, query mode
// deleteExpiredActivitiesPartitioned deletes expired activities.
func (c *client) deleteExpiredActivitiesPartitioned(ctx context.Context, network network.Network, timestamp time.Time) error {
var (
batchSize = 1000
dropActivitiesTables = make([]string, 0)
checkTablesTimestamp = []time.Time{timestamp}
batchSize = 1000
dropActivitiesTableMap = make(map[string]struct{}, 0)
checkTablesTimestamp = []time.Time{timestamp}
)

for i := 1; i <= 4; i++ {
dropActivitiesTables = append(dropActivitiesTables, c.buildActivitiesTableNames(network, timestamp.AddDate(0, -3*i, 0)))
checkTablesTimestamp = append(checkTablesTimestamp, timestamp.AddDate(0, -3*i, 0))
}

// Drop expired activities tables.
for _, name := range dropActivitiesTables {
zap.L().Info("dropping table", zap.String("table", name))

if err := c.database.WithContext(ctx).Exec(fmt.Sprintf(`DROP TABLE IF EXISTS "%s"`, name)).Error; err != nil {
zap.L().Error("failed to drop table", zap.Error(err), zap.String("table", name))
dropActivitiesTableMap[c.buildActivitiesTableNames(network, timestamp.AddDate(0, -3*i, 0))] = struct{}{}

return fmt.Errorf("drop table: %w", err)
}
checkTablesTimestamp = append(checkTablesTimestamp, timestamp.AddDate(0, -3*i, 0))
}

for _, checkTimestamp := range checkTablesTimestamp {
activityTable := c.buildActivitiesTableNames(network, checkTimestamp)

indexTable := c.buildIndexesTableNames(checkTimestamp)

activityTableExists, err := c.findPartitionTableExists(ctx, activityTable)
if err != nil {
return fmt.Errorf("find partition table exists: %w", err)
}

indexTableExists, err := c.findPartitionTableExists(ctx, indexTable)
if err != nil {
return fmt.Errorf("find partition table exists: %w", err)
Expand All @@ -728,10 +713,12 @@ func (c *client) deleteExpiredActivitiesPartitioned(ctx context.Context, network
continue
}

_, dropActivity := dropActivitiesTableMap[activityTable]

zap.L().Info("deleting expired activities", zap.String("table", activityTable), zap.String("indexTable", indexTable))

for {
done, err := c.batchDeleteExpiredActivities(ctx, network, timestamp, batchSize, &indexTable, lo.Ternary(activityTableExists, &activityTable, nil))
done, err := c.batchDeleteExpiredActivities(ctx, network, timestamp, batchSize, &indexTable, lo.Ternary(dropActivity, nil, &activityTable))
if err != nil {
return fmt.Errorf("batch delete expired activities: %w", err)
}
Expand All @@ -740,6 +727,16 @@ func (c *client) deleteExpiredActivitiesPartitioned(ctx context.Context, network
break
}
}

if dropActivity {
zap.L().Info("dropping table", zap.String("table", activityTable))

if err := c.database.WithContext(ctx).Exec(fmt.Sprintf(`DROP TABLE IF EXISTS "%s"`, activityTable)).Error; err != nil {
zap.L().Error("failed to drop table", zap.Error(err), zap.String("table", activityTable))

return fmt.Errorf("drop table: %w", err)
}
}
}

return nil
Expand Down

0 comments on commit 60b86af

Please sign in to comment.