Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ticketvote: Add cache fsck #1531

Merged
merged 9 commits into from
Oct 12, 2021
5 changes: 5 additions & 0 deletions politeiad/backendv2/tstorebe/plugins/ticketvote/inventory.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ func (p *ticketVotePlugin) invPath() string {
return filepath.Join(p.dataDir, filenameInventory)
}

// invRemove removes the ticketvote inventory from its respective path.
func (p *ticketVotePlugin) invRemove() error {
return os.RemoveAll(p.invPath())
}

// invGetLocked retrieves the inventory from disk. A new inventory is returned
// if one does not exist yet.
//
Expand Down
10 changes: 10 additions & 0 deletions politeiad/backendv2/tstorebe/plugins/ticketvote/submissions.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@ func (p *ticketVotePlugin) submissionsCachePath(token []byte) (string, error) {
return filepath.Join(p.dataDir, fn), nil
}

// submissionsCacheRemove removes the cache from its path for the provided
// token.
func (p *ticketVotePlugin) submissionsCacheRemove(token []byte) error {
path, err := p.submissionsCachePath(token)
if err != nil {
return nil
}
return os.RemoveAll(path)
}

// submissionsCacheWithLock return the submissions list for a record token. If
// a submissions list does not exist for the token then an empty list will be
// returned.
Expand Down
273 changes: 248 additions & 25 deletions politeiad/backendv2/tstorebe/plugins/ticketvote/ticketvote.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
package ticketvote

import (
"encoding/hex"
"encoding/json"
"fmt"
"os"
"path/filepath"
"sort"
"strconv"
"sync"

Expand Down Expand Up @@ -212,31 +214,252 @@ func (p *ticketVotePlugin) Hook(h plugins.HookT, payload string) error {
func (p *ticketVotePlugin) Fsck(tokens [][]byte) error {
log.Tracef("ticketvote Fsck")

// Verify the coherency of the summaries cache. This can be
// accomplished by simply calling the summary() method on each
// record.

// Verify the coherency of the submissions cache. All records
// that have the vote metadata LinkTo field set must be included
// in the parent record submissions list.

// Verify the coherency of the cached inventory. This is done in
// multi steps.
//
// 1. For each record, store the token, vote status, and timestamp
// of the most recent vote status change.
//
// 2. Sort the stored records into vote status groups where each
// vote status group is ordered from oldest to newest using the
// timestamp of their most recent vote status change.
//
// 3. Add all tokens to the inventory. The tokens MUST be added
// by vote status from oldest to newest. Ongoing votes MUST
// be added to the inventory then updated using the inventory
// method that updates an entry to the started vote status.

// Audit all finished votes. This verifies that all cast votes
// use eligible tickets and that there are no duplicate votes.
// invEntry is a struct used to insert an entry on the ticketvote inventory
// cache.
type invEntry struct {
data entry

// timestamp holds the last vote status change timestamp, which is used
// to sort the records from oldest to newest.
timestamp int64
}

// Group inventory entries by their vote statuses and build RFP submissions
// list for every RFP parent record. While traversing the tokens list, for
// each record token, verify the coherency of the summaries cache and audit
// all cast votes against its eligible tickets.
var (
unauthorized = make([]*invEntry, 0, len(tokens))
authorized = make([]*invEntry, 0, len(tokens))
started = make([]*invEntry, 0, len(tokens))
finished = make([]*invEntry, 0, len(tokens))
approved = make([]*invEntry, 0, len(tokens))
rejected = make([]*invEntry, 0, len(tokens))
ineligible = make([]*invEntry, 0, len(tokens))

// rfps holds the submissions of all RFP parents.
rfps = make(map[string][]string, len(tokens)) // [parentToken][]childTokens
)

thi4go marked this conversation as resolved.
Show resolved Hide resolved
log.Infof("Starting ticketvote fsck for %v records", len(tokens))

for _, t := range tokens {
// Get the partial record for each token.
r, err := p.tstore.RecordPartial(t, 0, nil, false)
if err != nil {
return err
}

// Skip ticketvote fsck if record state is unvetted.
if r.RecordMetadata.State == backend.StateUnvetted {
continue
}

// Decode vote metadata and build submissions map.
vmd, err := voteMetadataDecode(r.Files)
if err != nil {
return err
}
if vmd != nil && vmd.LinkTo != "" {
// Save RFP submissions to further check the coherency of the
// submissions cache of RFP parents.
rfps[vmd.LinkTo] = append(rfps[vmd.LinkTo],
hex.EncodeToString(t))
}

// Get best block for summary call.
bb, err := p.bestBlock()
if err != nil {
return err
}

// Get the vote summary for each record. The summary call checks if a
// cache entry exists for that record's vote summary and retrieves it.
// If it does not exist, it'll build the cache entry from scratch. This
// verifies the coherency of the summaries cache.
s, err := p.summary(t, bb)
if err != nil {
return err
}

// Create inventory entry for each record.
ie := &invEntry{
data: entry{
Token: hex.EncodeToString(t),
Status: s.Status,
EndHeight: s.EndBlockHeight,
},
}

// Set timestamp field and group tokens according to the record's vote
// status.
switch {
case s.Status == ticketvote.VoteStatusUnauthorized:
ie.timestamp = r.RecordMetadata.Timestamp
unauthorized = append(unauthorized, ie)
case s.Status == ticketvote.VoteStatusAuthorized:
// Get auth details blobs from tstore.
auths, err := p.auths(t)
if err != nil {
return err
}
// Search for latest authorize action timestamp.
for _, auth := range auths {
if ticketvote.AuthActionT(auth.Action) ==
ticketvote.AuthActionAuthorize {
ie.timestamp = auth.Timestamp
}
}
authorized = append(authorized, ie)
case s.Status == ticketvote.VoteStatusStarted:
ie.timestamp = int64(s.StartBlockHeight)
started = append(started, ie)
case s.Status == ticketvote.VoteStatusFinished:
ie.timestamp = int64(s.EndBlockHeight)
finished = append(finished, ie)
case s.Status == ticketvote.VoteStatusApproved:
ie.timestamp = int64(s.EndBlockHeight)
approved = append(approved, ie)
case s.Status == ticketvote.VoteStatusRejected:
ie.timestamp = int64(s.EndBlockHeight)
rejected = append(rejected, ie)
case s.Status == ticketvote.VoteStatusIneligible:
ie.timestamp = r.RecordMetadata.Timestamp
ineligible = append(ineligible, ie)
default:
return fmt.Errorf("invalid vote status for record %v",
ie.data.Token)
}

// Audit finished votes. This verifies that all cast votes use eligible
// tickets, and that no duplicate votes exist.

// Skip votes audit if record is unauthorized, authorized or ineligible.
if s.Status == ticketvote.VoteStatusUnauthorized ||
s.Status == ticketvote.VoteStatusAuthorized ||
s.Status == ticketvote.VoteStatusIneligible {
continue
}

// Get vote details for eligible tickets.
vd, err := p.voteDetails(t)
if err != nil {
return err
}

// Get vote results for all cast vote details.
vr, err := p.voteResults(t)
if err != nil {
return err
}

// Create map access for the eligible tickets.
eligibles := make(map[string]struct{}, len(vd.EligibleTickets))
for _, t := range vd.EligibleTickets {
thi4go marked this conversation as resolved.
Show resolved Hide resolved
eligibles[t] = struct{}{}
}

// Range through all cast votes and make sure it was cast by a eligible
// ticket.
for _, vote := range vr {
_, ok := eligibles[vote.Ticket]
if !ok {
return fmt.Errorf("vote was cast by a not eligible ticket %v"+
"on record %v", vote.Ticket, vote.Token)
}
}
}

log.Infof("%v records summaries cache verified", len(tokens))
log.Infof("%v records audited for eligible cast votes", len(tokens))

// Verify the coherency of the submissions cache.
for parentToken, submissions := range rfps {
bToken, err := hex.DecodeString(parentToken)
if err != nil {
return err
}
cache, err := p.submissionsCache(bToken)
if err != nil {
return err
}
// Check if every submission is contained in the cache.
bad := false
for _, s := range submissions {
_, ok := cache.Tokens[s]
if !ok {
bad = true
break
}
}
// Check if cache is bad and needs a rebuild.
if bad {
err := p.submissionsCacheRemove(bToken)
if err != nil {
return err
}
for _, s := range submissions {
err = p.submissionsCacheAdd(parentToken, s)
if err != nil {
return err
}
}
}
}

log.Infof("%v RFP parents submissions cache verified", len(rfps))

// Rebuild the ticketvote inventory cache.

// Sort each vote status group from oldest to newest.
thi4go marked this conversation as resolved.
Show resolved Hide resolved
sort.Slice(unauthorized, func(i, j int) bool {
return unauthorized[i].timestamp < unauthorized[j].timestamp
})
sort.Slice(authorized, func(i, j int) bool {
return authorized[i].timestamp < authorized[j].timestamp
})
sort.Slice(started, func(i, j int) bool {
return started[i].timestamp < started[j].timestamp
})
sort.Slice(finished, func(i, j int) bool {
return finished[i].timestamp < finished[j].timestamp
})
sort.Slice(approved, func(i, j int) bool {
return approved[i].timestamp < approved[j].timestamp
})
sort.Slice(rejected, func(i, j int) bool {
return rejected[i].timestamp < rejected[j].timestamp
})
sort.Slice(ineligible, func(i, j int) bool {
return ineligible[i].timestamp < ineligible[j].timestamp
})

// Delete ticketvote inventory cache before rebuilding.
err := p.invRemove()
if err != nil {
return err
}

// Add entries from all status groups to the ticketvote inventory.
entries := make([]*invEntry, 0, len(tokens))
entries = append(entries, unauthorized...)
entries = append(entries, authorized...)
entries = append(entries, started...)
entries = append(entries, finished...)
entries = append(entries, approved...)
entries = append(entries, rejected...)
entries = append(entries, ineligible...)
for _, entry := range entries {
if entry.data.Status == ticketvote.VoteStatusStarted {
p.inventoryAdd(entry.data.Token, ticketvote.VoteStatusAuthorized)
p.inventoryUpdateToStarted(entry.data.Token,
ticketvote.VoteStatusStarted, entry.data.EndHeight)
continue
}
p.inventoryAdd(entry.data.Token, entry.data.Status)
}

log.Infof("%v records added to the ticketvote inventory", len(entries))

return nil
}
Expand Down