From 8df42a79ee0bf628258ea2a2502cf98ca9450f5b Mon Sep 17 00:00:00 2001 From: polebug Date: Fri, 25 Oct 2024 17:02:28 +0800 Subject: [PATCH] fix(monitor): avoid dirty data in indexes table --- .../dialer/postgres/client_partitioned.go | 39 +++++++++---------- 1 file changed, 18 insertions(+), 21 deletions(-) diff --git a/internal/database/dialer/postgres/client_partitioned.go b/internal/database/dialer/postgres/client_partitioned.go index 791ee131..5d871187 100644 --- a/internal/database/dialer/postgres/client_partitioned.go +++ b/internal/database/dialer/postgres/client_partitioned.go @@ -688,25 +688,15 @@ func (c *client) findFederatedIndexesPartitioned(ctx context.Context, query mode // deleteExpiredActivitiesPartitioned deletes expired activities. func (c *client) deleteExpiredActivitiesPartitioned(ctx context.Context, network network.Network, timestamp time.Time) error { var ( - batchSize = 1000 - dropActivitiesTables = make([]string, 0) - checkTablesTimestamp = []time.Time{timestamp} + batchSize = 1000 + dropActivitiesTableMap = make(map[string]struct{}, 0) + checkTablesTimestamp = []time.Time{timestamp} ) for i := 1; i <= 4; i++ { - dropActivitiesTables = append(dropActivitiesTables, c.buildActivitiesTableNames(network, timestamp.AddDate(0, -3*i, 0))) - checkTablesTimestamp = append(checkTablesTimestamp, timestamp.AddDate(0, -3*i, 0)) - } - - // Drop expired activities tables. - for _, name := range dropActivitiesTables { - zap.L().Info("dropping table", zap.String("table", name)) - - if err := c.database.WithContext(ctx).Exec(fmt.Sprintf(`DROP TABLE IF EXISTS "%s"`, name)).Error; err != nil { - zap.L().Error("failed to drop table", zap.Error(err), zap.String("table", name)) + dropActivitiesTableMap[c.buildActivitiesTableNames(network, timestamp.AddDate(0, -3*i, 0))] = struct{}{} - return fmt.Errorf("drop table: %w", err) - } + checkTablesTimestamp = append(checkTablesTimestamp, timestamp.AddDate(0, -3*i, 0)) } for _, checkTimestamp := range checkTablesTimestamp { @@ -714,11 +704,6 @@ func (c *client) deleteExpiredActivitiesPartitioned(ctx context.Context, network indexTable := c.buildIndexesTableNames(checkTimestamp) - activityTableExists, err := c.findPartitionTableExists(ctx, activityTable) - if err != nil { - return fmt.Errorf("find partition table exists: %w", err) - } - indexTableExists, err := c.findPartitionTableExists(ctx, indexTable) if err != nil { return fmt.Errorf("find partition table exists: %w", err) @@ -728,10 +713,12 @@ func (c *client) deleteExpiredActivitiesPartitioned(ctx context.Context, network continue } + _, dropActivity := dropActivitiesTableMap[activityTable] + zap.L().Info("deleting expired activities", zap.String("table", activityTable), zap.String("indexTable", indexTable)) for { - done, err := c.batchDeleteExpiredActivities(ctx, network, timestamp, batchSize, &indexTable, lo.Ternary(activityTableExists, &activityTable, nil)) + done, err := c.batchDeleteExpiredActivities(ctx, network, timestamp, batchSize, &indexTable, lo.Ternary(dropActivity, nil, &activityTable)) if err != nil { return fmt.Errorf("batch delete expired activities: %w", err) } @@ -740,6 +727,16 @@ func (c *client) deleteExpiredActivitiesPartitioned(ctx context.Context, network break } } + + if dropActivity { + zap.L().Info("dropping table", zap.String("table", activityTable)) + + if err := c.database.WithContext(ctx).Exec(fmt.Sprintf(`DROP TABLE IF EXISTS "%s"`, activityTable)).Error; err != nil { + zap.L().Error("failed to drop table", zap.Error(err), zap.String("table", activityTable)) + + return fmt.Errorf("drop table: %w", err) + } + } } return nil