-
Notifications
You must be signed in to change notification settings - Fork 499
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
exp/lighthorizon: Add thread-safe support for reading account list from FileBackend
.
#4422
Changes from 11 commits
c08fcad
895a775
db65892
b4d7f48
06df7c1
3a749c0
6a1de69
478f925
78181a0
34b9ba2
a59b4c8
d821995
35dc3b5
5eb47bd
91468ec
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,13 +1,17 @@ | ||
package index | ||
|
||
import ( | ||
"bufio" | ||
"compress/gzip" | ||
"io" | ||
"io/fs" | ||
"os" | ||
"path/filepath" | ||
"strings" | ||
|
||
xdr3 "github.com/stellar/go-xdr/xdr3" | ||
"github.com/stellar/go/support/errors" | ||
"github.com/stellar/go/support/log" | ||
"github.com/stellar/go/xdr" | ||
) | ||
|
||
type FileBackend struct { | ||
|
@@ -37,8 +41,35 @@ func (s *FileBackend) Flush(indexes map[string]map[string]*CheckpointIndex) erro | |
func (s *FileBackend) FlushAccounts(accounts []string) error { | ||
path := filepath.Join(s.dir, "accounts") | ||
|
||
accountsString := strings.Join(accounts, "\n") | ||
return os.WriteFile(path, []byte(accountsString), fs.ModeDir|0755) | ||
f, err := os.OpenFile(path, os.O_CREATE| | ||
os.O_APPEND| // crucial! since we might flush from various sources | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is
Not sure I understand this. Why can't we just write There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure I understand it either 😆 after revisiting the old code, I don't think I can explain it correctly. It's possible that writes were getting interleaved (or just sloppy coding on my part), but I'm not sure. Have a look at d821995: it works just fine and the file looks normal (one address per line, clean). This whole PR history is nonsense 🤦♂️ This S/O thread suggests that Golang file writes are thread-safe, but we can make further guarantees by ensuring each write happens in a single |
||
os.O_WRONLY, | ||
0664) // rw-rw-r-- | ||
|
||
if err != nil { | ||
return errors.Wrapf(err, "failed to open account file at %s", path) | ||
} | ||
|
||
defer f.Close() | ||
|
||
for _, account := range accounts { | ||
muxed := xdr.MuxedAccount{} | ||
if err := muxed.SetAddress(account); err != nil { | ||
return errors.Wrapf(err, "failed to encode %s", account) | ||
} | ||
|
||
raw, err := muxed.MarshalBinary() | ||
if err != nil { | ||
return errors.Wrapf(err, "failed to marshal %s", account) | ||
} | ||
|
||
_, err = f.Write(raw) | ||
if err != nil { | ||
return errors.Wrapf(err, "failed to write to %s", path) | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (s *FileBackend) writeBatch(b *batch) error { | ||
|
@@ -122,7 +153,74 @@ func (s *FileBackend) Read(account string) (map[string]*CheckpointIndex, error) | |
} | ||
|
||
func (s *FileBackend) ReadAccounts() ([]string, error) { | ||
panic("TODO") | ||
path := filepath.Join(s.dir, "accounts") | ||
log.Debugf("Opening accounts list at %s", path) | ||
|
||
// We read the file in chunks with guarantees that we will always read on an | ||
// account boundary: | ||
// | ||
// Accounts w/o IDs are always 36 bytes (4-byte type, 32-byte pubkey) | ||
// Muxed accounts with IDs are always 44 bytes (36 + 8-byte ID) | ||
// | ||
	// Note that a fixed chunk size only lands on record boundaries if every | ||
	// record has the same size; with a mix of 36- and 44-byte records no | ||
	// fixed chunk can guarantee that. The buffered reader below handles | ||
	// records that span chunk boundaries, so this is just a buffer-size hint. | ||
const chunkSize = 36 * 44 * 4 // times 4 to make it a reasonable buffer | ||
|
||
f, err := os.Open(path) | ||
if os.IsNotExist(err) { | ||
return nil, err | ||
} else if err != nil { | ||
return nil, errors.Wrapf(err, "failed to read %s", path) | ||
} | ||
|
||
	// We ballpark the capacity, assuming all of the values are G-addresses. | ||
preallocationSize := chunkSize / 36 | ||
info, err := os.Stat(path) | ||
if err == nil { // we can still safely continue w/ errors | ||
// Note that this will never be too large, but may be too small. | ||
preallocationSize = int(info.Size()) / 36 | ||
} | ||
accounts := make([]string, 0, preallocationSize) | ||
|
||
// We don't use UnmarshalBinary here because we need to know how much of | ||
// the buffer was read for each account. | ||
reader := bufio.NewReaderSize(f, chunkSize) | ||
decoder := xdr3.NewDecoder(reader) | ||
|
||
for { | ||
// Since we can't decode an EOF error from the below `muxed.DecodeFrom` | ||
// call, we peek on the reader itself to see if we've reached EOF. | ||
if _, err := reader.Peek(1); err == io.EOF { | ||
break | ||
} // let later calls bubble up other errors | ||
|
||
muxed := xdr.MuxedAccount{} | ||
_, err := muxed.DecodeFrom(decoder) | ||
if err != nil { | ||
return nil, errors.Wrap(err, "failed to decode account") | ||
} | ||
|
||
account, err := muxed.GetAddress() | ||
if err != nil { | ||
return nil, errors.Wrap(err, "failed to get strkey") | ||
} | ||
|
||
accounts = append(accounts, account) | ||
} | ||
|
||
	// The account list is very likely to contain duplicates (especially if | ||
	// it was made w/ parallel flushes), so de-duplicate it here. | ||
count := 0 | ||
accountMap := make(map[string]struct{}, len(accounts)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. could the intermediate full size array of account strings be avoided by just put'ing to this map on ln 174 instead then iterate the de-dup'd map keys into string slice based on map len for a bit less thrash? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Absolutely brilliant point - done in 91468ec. |
||
for _, account := range accounts { | ||
if _, ok := accountMap[account]; !ok { | ||
accountMap[account] = struct{}{} | ||
accounts[count] = account // save memory: shove uniques to front | ||
count++ | ||
} | ||
} | ||
|
||
return accounts[:count], nil | ||
} | ||
|
||
func (s *FileBackend) ReadTransactions(prefix string) (*TrieIndex, error) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
package index | ||
|
||
import ( | ||
"math/rand" | ||
"testing" | ||
|
||
"github.com/stellar/go/keypair" | ||
"github.com/stellar/go/xdr" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestSimpleFileStore(t *testing.T) { | ||
tmpDir := t.TempDir() | ||
|
||
// Create a large (beyond a single chunk) list of arbitrary accounts, some | ||
// regular and some muxed. | ||
accountList := make([]string, 123) | ||
for i, _ := range accountList { | ||
var err error | ||
var muxed xdr.MuxedAccount | ||
address := keypair.MustRandom().Address() | ||
|
||
if rand.Intn(2) == 1 { | ||
muxed, err = xdr.MuxedAccountFromAccountId(address, 12345678) | ||
require.NoErrorf(t, err, "shouldn't happen") | ||
} else { | ||
muxed = xdr.MustMuxedAddress(address) | ||
} | ||
|
||
accountList[i] = muxed.Address() | ||
} | ||
|
||
require.Len(t, accountList, 123) | ||
|
||
file, err := NewFileBackend(tmpDir, 1) | ||
require.NoError(t, err) | ||
|
||
require.NoError(t, file.FlushAccounts(accountList)) | ||
|
||
accounts, err := file.ReadAccounts() | ||
require.NoError(t, err) | ||
require.Equal(t, accountList, accounts) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doesn't this mean the file will only grow, and contain duplicates?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's true, but how else do we coordinate parallel (and partial) flushes? If you decide to flush indices to the filesystem every 512 accounts, say, then there is no way to guarantee a lack of dupes nor that you're done writing.
I added some code to return a unique list in 06df7c1, but I'm not sure how to prevent it from being generated in the first place. We could do something dumb like periodically read/unique-ify/flush it, or maybe store the list in memory at all times? But unsure. Ideas?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FlushAccounts()
could also do a "set union" instead of a direct append, e.g. Write(Union(ReadAccounts(), CurrentAccounts()))
. This would slow down flushes but would ensure a unique and correct set of accounts on-disk.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tried it out (and reverted it) in 34b9ba2 if you wanna have a look at that.