diff --git a/exporter/rocketpool.go b/exporter/rocketpool.go index 05eabb370f..42893782ce 100644 --- a/exporter/rocketpool.go +++ b/exporter/rocketpool.go @@ -21,6 +21,7 @@ import ( _ "github.com/jackc/pgx/v4/stdlib" "github.com/jmoiron/sqlx" "github.com/klauspost/compress/zstd" + "github.com/lib/pq" rpDAO "github.com/rocket-pool/rocketpool-go/dao" rpDAOTrustedNode "github.com/rocket-pool/rocketpool-go/dao/trustednode" "github.com/rocket-pool/rocketpool-go/minipool" @@ -1117,6 +1118,7 @@ func (rp *RocketpoolExporter) SaveDAOMembers() error { valueStrings := make([]string, 0, batchSize) valueArgs := make([]interface{}, 0, batchSize*nArgs) + addresses := make([][]byte, 0, batchSize) for i, d := range data[start:end] { for j := 0; j < nArgs; j++ { valueStringsArgs[j] = i*nArgs + j + 1 @@ -1130,12 +1132,39 @@ func (rp *RocketpoolExporter) SaveDAOMembers() error { valueArgs = append(valueArgs, d.LastProposalTime) valueArgs = append(valueArgs, d.RPLBondAmount.String()) valueArgs = append(valueArgs, d.UnbondedValidatorCount) + addresses = append(addresses, d.Address) } - stmt := fmt.Sprintf(`insert into rocketpool_dao_members (rocketpool_storage_address, address, id, url, joined_time, last_proposal_time, rpl_bond_amount, unbonded_validator_count) values %s on conflict (rocketpool_storage_address, address) do update set id = excluded.id, url = excluded.url, joined_time = excluded.joined_time, last_proposal_time = excluded.last_proposal_time, rpl_bond_amount = excluded.rpl_bond_amount, unbonded_validator_count = excluded.unbonded_validator_count`, strings.Join(valueStrings, ",")) + stmt := fmt.Sprintf(` + INSERT INTO rocketpool_dao_members ( + rocketpool_storage_address, + address, + id, + url, + joined_time, + last_proposal_time, + rpl_bond_amount, + unbonded_validator_count + ) + values %s + on conflict (rocketpool_storage_address, address) do update set + id = excluded.id, + url = excluded.url, + joined_time = excluded.joined_time, + last_proposal_time = excluded.last_proposal_time, + rpl_bond_amount = excluded.rpl_bond_amount, + unbonded_validator_count = excluded.unbonded_validator_count + `, strings.Join(valueStrings, ",")) _, err := tx.Exec(stmt, valueArgs...) if err != nil { return fmt.Errorf("error inserting into rocketpool_dao_members: %w", err) } + + _, err = tx.Exec(` + DELETE FROM rocketpool_dao_members + WHERE NOT address = ANY($1)`, pq.ByteaArray(addresses)) + if err != nil { + return fmt.Errorf("error deleting from rocketpool_dao_members: %w", err) + } } return tx.Commit()