From c08fcad750288a500c2903e2aacf5cc53cc6b9e1 Mon Sep 17 00:00:00 2001 From: George Kudrayvtsev Date: Thu, 2 Jun 2022 13:39:27 -0700 Subject: [PATCH 01/14] Append over Write to deal w/ parallel writes: According to S/O, parallel writes are thread-safe despite it not being an explicit guarantee. This might be OS-specific, though? Cross that bridge if we ever get there... --- exp/lighthorizon/index/file.go | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/exp/lighthorizon/index/file.go b/exp/lighthorizon/index/file.go index 20f5e03970..482a29b007 100644 --- a/exp/lighthorizon/index/file.go +++ b/exp/lighthorizon/index/file.go @@ -7,6 +7,7 @@ import ( "path/filepath" "strings" + "github.com/stellar/go/support/errors" "github.com/stellar/go/support/log" ) @@ -37,8 +38,23 @@ func (s *FileBackend) Flush(indexes map[string]map[string]*CheckpointIndex) erro func (s *FileBackend) FlushAccounts(accounts []string) error { path := filepath.Join(s.dir, "accounts") - accountsString := strings.Join(accounts, "\n") - return os.WriteFile(path, []byte(accountsString), fs.ModeDir|0755) + f, err := os.OpenFile(path, os.O_CREATE| + os.O_APPEND| // crucial! since we might flush from various sources + os.O_WRONLY, + 0664) // rw-rw-r-- + + if err != nil { + return errors.Wrapf(err, "failed to open account file at %s", path) + } + + defer f.Close() + + accountsString := strings.Join(accounts, "\n") + "\n" // trailing newline + if _, err := f.Write([]byte(accountsString)); err != nil { + return errors.Wrapf(err, "writing to %s failed", path) + } + + return nil } func (s *FileBackend) writeBatch(b *batch) error { From 895a775210ab2d094f298af580fbe790a2bf25ce Mon Sep 17 00:00:00 2001 From: George Kudrayvtsev Date: Thu, 2 Jun 2022 13:23:17 -0700 Subject: [PATCH 02/14] Add ability to read account list from file backend --- exp/lighthorizon/index/file.go | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/exp/lighthorizon/index/file.go b/exp/lighthorizon/index/file.go index 482a29b007..0c39b53ef9 100644 --- a/exp/lighthorizon/index/file.go +++ b/exp/lighthorizon/index/file.go @@ -2,6 +2,7 @@ package index import ( "compress/gzip" + "fmt" "io/fs" "os" "path/filepath" @@ -138,7 +139,21 @@ func (s *FileBackend) Read(account string) (map[string]*CheckpointIndex, error) } func (s *FileBackend) ReadAccounts() ([]string, error) { - panic("TODO") + path := filepath.Join(s.dir, "accounts") + log.Debugf("Opening accounts list at %s", path) + + // This file probably isn't insurmountably large (TODO: Confirm that), so we + // can probably read it all in one go. + buffer, err := os.ReadFile(path) + if os.IsNotExist(err) { + return nil, err + } else if err != nil { + return nil, errors.Wrapf(err, "failed to read %s", path) + } else if len(buffer) == 0 { + return nil, fmt.Errorf("account list at %s is empty", path) + } + + return strings.Split(string(buffer), "\n"), nil } func (s *FileBackend) ReadTransactions(prefix string) (*TrieIndex, error) { From db65892a415354dfa5c021814f0ddba660f97f1d Mon Sep 17 00:00:00 2001 From: George Kudrayvtsev Date: Thu, 2 Jun 2022 16:16:40 -0700 Subject: [PATCH 03/14] Use XDR for accounts in the "list" file --- exp/lighthorizon/index/file.go | 58 ++++++++++++++++++++++++++++++---- 1 file changed, 51 insertions(+), 7 deletions(-) diff --git a/exp/lighthorizon/index/file.go b/exp/lighthorizon/index/file.go index 0c39b53ef9..9cd0fd20cd 100644 --- a/exp/lighthorizon/index/file.go +++ b/exp/lighthorizon/index/file.go @@ -1,15 +1,17 @@ package index import ( + "bytes" "compress/gzip" "fmt" "io/fs" "os" "path/filepath" - "strings" + xdr3 "github.com/stellar/go-xdr/xdr3" "github.com/stellar/go/support/errors" "github.com/stellar/go/support/log" + "github.com/stellar/go/xdr" ) type FileBackend struct { @@ -50,9 +52,21 @@ func (s *FileBackend) FlushAccounts(accounts []string) error { defer f.Close() - accountsString := strings.Join(accounts, "\n") + "\n" // trailing newline - if _, err := f.Write([]byte(accountsString)); err != nil { - return errors.Wrapf(err, "writing to %s failed", path) + for _, account := range accounts { + muxed := xdr.MuxedAccount{} + if err := muxed.SetAddress(account); err != nil { + return errors.Wrapf(err, "failed to encode %s", account) + } + + raw, err := muxed.MarshalBinary() + if err != nil { + return errors.Wrapf(err, "failed to marshal %s", account) + } + + _, err = f.Write(raw) + if err != nil { + return errors.Wrapf(err, "failed to write to %s", path) + } } return nil @@ -142,18 +156,48 @@ func (s *FileBackend) ReadAccounts() ([]string, error) { path := filepath.Join(s.dir, "accounts") log.Debugf("Opening accounts list at %s", path) - // This file probably isn't insurmountably large (TODO: Confirm that), so we - // can probably read it all in one go. + // The entirety of pubnet accounts in a single file will consume ~400MB + // (according to Hubble + napkin math). That should never be done on a + // single machine anyway. Thus, if this file is insurmountably large to fit + // into memory, you should be splitting the work better. That means we can + // safely read it all in one go. buffer, err := os.ReadFile(path) if os.IsNotExist(err) { return nil, err } else if err != nil { return nil, errors.Wrapf(err, "failed to read %s", path) } else if len(buffer) == 0 { + // This is probably an error... We could also return os.ErrNotExist? return nil, fmt.Errorf("account list at %s is empty", path) } - return strings.Split(string(buffer), "\n"), nil + // The capacity here is ballparked based on all of the values being + // G-addresses (32 public key bytes) plus the key type (4 bytes). Note that + // this means it's never too large, but might be too small. + accounts := make([]string, 0, len(buffer)/36) + + // We don't use UnmarshalBinary here because we need to know how much of + // the file was read for each account. + reader := bytes.NewReader(buffer) + d := xdr3.NewDecoder(reader) + + for i := 0; i < len(buffer); { + muxed := xdr.MuxedAccount{} + readBytes, err := muxed.DecodeFrom(d) + if err != nil { + return nil, errors.Wrap(err, "failed to decode account") + } + + account, err := muxed.GetAddress() + if err != nil { + return nil, errors.Wrap(err, "failed to get strkey") + } + + accounts = append(accounts, account) + i += readBytes + } + + return accounts, nil } func (s *FileBackend) ReadTransactions(prefix string) (*TrieIndex, error) { From b4d7f484dece09d8d82ddb16983ef2a81e570d3f Mon Sep 17 00:00:00 2001 From: George Kudrayvtsev Date: Mon, 6 Jun 2022 10:15:52 -0700 Subject: [PATCH 04/14] Use buffered reads w/ clean binary boundaries --- exp/lighthorizon/index/file.go | 78 ++++++++++++++++++++++------------ 1 file changed, 50 insertions(+), 28 deletions(-) diff --git a/exp/lighthorizon/index/file.go b/exp/lighthorizon/index/file.go index 9cd0fd20cd..c4d339ed96 100644 --- a/exp/lighthorizon/index/file.go +++ b/exp/lighthorizon/index/file.go @@ -3,7 +3,7 @@ package index import ( "bytes" "compress/gzip" - "fmt" + "io" "io/fs" "os" "path/filepath" @@ -156,45 +156,67 @@ func (s *FileBackend) ReadAccounts() ([]string, error) { path := filepath.Join(s.dir, "accounts") log.Debugf("Opening accounts list at %s", path) - // The entirety of pubnet accounts in a single file will consume ~400MB - // (according to Hubble + napkin math). That should never be done on a - // single machine anyway. Thus, if this file is insurmountably large to fit - // into memory, you should be splitting the work better. That means we can - // safely read it all in one go. - buffer, err := os.ReadFile(path) + // We read the file in chunks with guarantees that we will always read on an + // account boundary: + // + // Accounts w/o IDs are always 36 bytes (4-byte type, 32-byte pubkey) + // Muxed accounts with IDs are always 44 bytes (36 + 8-byte ID) + // + // If we read 36*44=1584 bytes at a time, we are guaranteed to have a + // complete set of accounts (no partial buffer). Then, we bump this by 4 to + // read a sizeable amount into memory (the built-in buffered reader does + // 4096 bytes at a time). + // + // This keeps minimal file data in memory. + const chunkSize = 4 * 36 * 44 + + f, err := os.Open(path) if os.IsNotExist(err) { return nil, err } else if err != nil { return nil, errors.Wrapf(err, "failed to read %s", path) - } else if len(buffer) == 0 { - // This is probably an error... We could also return os.ErrNotExist? - return nil, fmt.Errorf("account list at %s is empty", path) } // The capacity here is ballparked based on all of the values being - // G-addresses (32 public key bytes) plus the key type (4 bytes). Note that - // this means it's never too large, but might be too small. - accounts := make([]string, 0, len(buffer)/36) + // G-addresses (32 public key bytes) plus the key type (4 bytes). + preallocationSize := chunkSize / 36 + info, err := os.Stat(path) + if err == nil { // we can still safely continue w/ errors + // Note that this will never be too large, but may be too small. + preallocationSize = int(info.Size()) / 36 + } + accounts := make([]string, 0, preallocationSize) - // We don't use UnmarshalBinary here because we need to know how much of - // the file was read for each account. - reader := bytes.NewReader(buffer) - d := xdr3.NewDecoder(reader) + for { + buffer := [chunkSize]byte{} + readBytes, err := f.Read(buffer[:]) - for i := 0; i < len(buffer); { - muxed := xdr.MuxedAccount{} - readBytes, err := muxed.DecodeFrom(d) - if err != nil { - return nil, errors.Wrap(err, "failed to decode account") + if err == io.EOF || readBytes <= 0 { + break + } else if err != nil { + return nil, errors.Wrapf(err, "failed reading %s", path) } - account, err := muxed.GetAddress() - if err != nil { - return nil, errors.Wrap(err, "failed to get strkey") + // We don't use UnmarshalBinary here because we need to know how much of + // the buffer was read for each account. + reader := bytes.NewReader(buffer[:readBytes]) + d := xdr3.NewDecoder(reader) + + for i := 0; i < readBytes; { + muxed := xdr.MuxedAccount{} + xdrBytesRead, err := muxed.DecodeFrom(d) + if err != nil { + return nil, errors.Wrap(err, "failed to decode account") + } + + account, err := muxed.GetAddress() + if err != nil { + return nil, errors.Wrap(err, "failed to get strkey") + } + + accounts = append(accounts, account) + i += xdrBytesRead } - - accounts = append(accounts, account) - i += readBytes } return accounts, nil From 06df7c126f908320240a868d729ef15d386527f9 Mon Sep 17 00:00:00 2001 From: George Kudrayvtsev Date: Mon, 6 Jun 2022 10:49:04 -0700 Subject: [PATCH 05/14] Make returned account list unique --- exp/lighthorizon/index/file.go | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/exp/lighthorizon/index/file.go b/exp/lighthorizon/index/file.go index c4d339ed96..b3ba0e3d25 100644 --- a/exp/lighthorizon/index/file.go +++ b/exp/lighthorizon/index/file.go @@ -219,7 +219,19 @@ func (s *FileBackend) ReadAccounts() ([]string, error) { } } - return accounts, nil + // The account list is very unlikely to be unique (especially if it was made + // w/ parallel flushes), so let's ensure that that's the case. + count := 0 + accountMap := make(map[string]struct{}, len(accounts)) + for _, account := range accounts { + if _, ok := accountMap[account]; !ok { + accountMap[account] = struct{}{} + accounts[count] = account // save memory: shove uniques to front + count++ + } + } + + return accounts[:count], nil } func (s *FileBackend) ReadTransactions(prefix string) (*TrieIndex, error) { From 3a749c083c913d551f8ed56767d167365b425167 Mon Sep 17 00:00:00 2001 From: George Kudrayvtsev Date: Mon, 6 Jun 2022 10:49:23 -0700 Subject: [PATCH 06/14] Add test for account list on file backend --- exp/lighthorizon/index/file_test.go | 43 +++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) create mode 100644 exp/lighthorizon/index/file_test.go diff --git a/exp/lighthorizon/index/file_test.go b/exp/lighthorizon/index/file_test.go new file mode 100644 index 0000000000..85abd4e5e8 --- /dev/null +++ b/exp/lighthorizon/index/file_test.go @@ -0,0 +1,43 @@ +package index + +import ( + "math/rand" + "testing" + + "github.com/stellar/go/keypair" + "github.com/stellar/go/xdr" + "github.com/stretchr/testify/require" +) + +func TestSimpleFileStore(t *testing.T) { + tmpDir := t.TempDir() + + // Create a large (beyond a single chunk) list of arbitrary accounts, some + // regular and some muxed. + accountList := make([]string, 123) + for i, _ := range accountList { + var err error + var muxed xdr.MuxedAccount + address := keypair.MustRandom().Address() + + if rand.Intn(2) == 1 { + muxed, err = xdr.MuxedAccountFromAccountId(address, 12345678) + require.NoErrorf(t, err, "shouldn't happen") + } else { + muxed = xdr.MustMuxedAddress(address) + } + + accountList[i] = muxed.Address() + } + + require.Len(t, accountList, 123) + + file, err := NewFileBackend(tmpDir, 1) + require.NoError(t, err) + + require.NoError(t, file.FlushAccounts(accountList)) + + accounts, err := file.ReadAccounts() + require.NoError(t, err) + require.Equal(t, accountList, accounts) +} From 6a1de6969d6c9aa12a08a96c022ec633cf0f4d98 Mon Sep 17 00:00:00 2001 From: George Kudrayvtsev Date: Mon, 6 Jun 2022 12:34:55 -0700 Subject: [PATCH 07/14] Use built-in buffered reader + peeks to check EOFs --- exp/lighthorizon/index/file.go | 57 ++++++++++++++-------------------- 1 file changed, 24 insertions(+), 33 deletions(-) diff --git a/exp/lighthorizon/index/file.go b/exp/lighthorizon/index/file.go index b3ba0e3d25..49ddc1934e 100644 --- a/exp/lighthorizon/index/file.go +++ b/exp/lighthorizon/index/file.go @@ -1,7 +1,7 @@ package index import ( - "bytes" + "bufio" "compress/gzip" "io" "io/fs" @@ -162,13 +162,9 @@ func (s *FileBackend) ReadAccounts() ([]string, error) { // Accounts w/o IDs are always 36 bytes (4-byte type, 32-byte pubkey) // Muxed accounts with IDs are always 44 bytes (36 + 8-byte ID) // - // If we read 36*44=1584 bytes at a time, we are guaranteed to have a - // complete set of accounts (no partial buffer). Then, we bump this by 4 to - // read a sizeable amount into memory (the built-in buffered reader does - // 4096 bytes at a time). - // - // This keeps minimal file data in memory. - const chunkSize = 4 * 36 * 44 + // Thus, if we read 36*44=1584 bytes at a time, we are guaranteed to have a + // complete set of accounts (no partial ones). + const chunkSize = 36 * 44 * 4 // times 4 to make it a reasonable buffer f, err := os.Open(path) if os.IsNotExist(err) { @@ -177,8 +173,7 @@ func (s *FileBackend) ReadAccounts() ([]string, error) { return nil, errors.Wrapf(err, "failed to read %s", path) } - // The capacity here is ballparked based on all of the values being - // G-addresses (32 public key bytes) plus the key type (4 bytes). + // We ballpark the capacity assuming all of the values being G-addresses. preallocationSize := chunkSize / 36 info, err := os.Stat(path) if err == nil { // we can still safely continue w/ errors @@ -187,36 +182,32 @@ func (s *FileBackend) ReadAccounts() ([]string, error) { } accounts := make([]string, 0, preallocationSize) - for { - buffer := [chunkSize]byte{} - readBytes, err := f.Read(buffer[:]) + // We don't use UnmarshalBinary here because we need to know how much of + // the buffer was read for each account. + reader := bufio.NewReaderSize(f, chunkSize) + decoder := xdr3.NewDecoder(reader) - if err == io.EOF || readBytes <= 0 { + for { + // Since we can't decode an EOF error from the below `muxed.DecodeFrom` + // call, we peek on the reader itself to see if we've reached EOF. + if _, err := reader.Peek(1); err == io.EOF { break } else if err != nil { return nil, errors.Wrapf(err, "failed reading %s", path) } - // We don't use UnmarshalBinary here because we need to know how much of - // the buffer was read for each account. - reader := bytes.NewReader(buffer[:readBytes]) - d := xdr3.NewDecoder(reader) - - for i := 0; i < readBytes; { - muxed := xdr.MuxedAccount{} - xdrBytesRead, err := muxed.DecodeFrom(d) - if err != nil { - return nil, errors.Wrap(err, "failed to decode account") - } - - account, err := muxed.GetAddress() - if err != nil { - return nil, errors.Wrap(err, "failed to get strkey") - } - - accounts = append(accounts, account) - i += xdrBytesRead + muxed := xdr.MuxedAccount{} + _, err := muxed.DecodeFrom(decoder) + if err != nil { + return nil, errors.Wrap(err, "failed to decode account") } + + account, err := muxed.GetAddress() + if err != nil { + return nil, errors.Wrap(err, "failed to get strkey") + } + + accounts = append(accounts, account) } // The account list is very unlikely to be unique (especially if it was made From 478f9259f8787da16683cbabf43fdfedd4526688 Mon Sep 17 00:00:00 2001 From: George Kudrayvtsev Date: Mon, 6 Jun 2022 12:38:08 -0700 Subject: [PATCH 08/14] Ignore non-EOFs --- exp/lighthorizon/index/file.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/exp/lighthorizon/index/file.go b/exp/lighthorizon/index/file.go index 49ddc1934e..26ef60df4f 100644 --- a/exp/lighthorizon/index/file.go +++ b/exp/lighthorizon/index/file.go @@ -192,9 +192,7 @@ func (s *FileBackend) ReadAccounts() ([]string, error) { // call, we peek on the reader itself to see if we've reached EOF. if _, err := reader.Peek(1); err == io.EOF { break - } else if err != nil { - return nil, errors.Wrapf(err, "failed reading %s", path) - } + } // let later calls bubble up other errors muxed := xdr.MuxedAccount{} _, err := muxed.DecodeFrom(decoder) From 34b9ba2cc82fba745b44d50908ffbf49a85724b5 Mon Sep 17 00:00:00 2001 From: George Kudrayvtsev Date: Mon, 6 Jun 2022 14:02:06 -0700 Subject: [PATCH 09/14] Do a set union w/ existing accounts on a flush --- exp/lighthorizon/index/file.go | 30 ++++++++++++++++++++++-------- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/exp/lighthorizon/index/file.go b/exp/lighthorizon/index/file.go index 26ef60df4f..fed6253b8d 100644 --- a/exp/lighthorizon/index/file.go +++ b/exp/lighthorizon/index/file.go @@ -38,21 +38,35 @@ func (s *FileBackend) Flush(indexes map[string]map[string]*CheckpointIndex) erro return parallelFlush(s.parallel, indexes, s.writeBatch) } +// FlushAccounts does a set union with the passed-in accounts and the existing +// on-disk accounts. func (s *FileBackend) FlushAccounts(accounts []string) error { - path := filepath.Join(s.dir, "accounts") - - f, err := os.OpenFile(path, os.O_CREATE| - os.O_APPEND| // crucial! since we might flush from various sources - os.O_WRONLY, - 0664) // rw-rw-r-- + existingAccounts, err := s.ReadAccounts() + if err != nil && !os.IsNotExist(err) { + return errors.Wrap(err, "failed to read existing accounts") + } + path := filepath.Join(s.dir, "accounts") + f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY, 0664) // rw-rw-r-- if err != nil { return errors.Wrapf(err, "failed to open account file at %s", path) } - defer f.Close() - for _, account := range accounts { + allAccounts := map[string]struct{}{} // keep a unique list + for i := 0; i < len(existingAccounts)+len(accounts); i++ { + var account string + if i < len(existingAccounts) { + account = existingAccounts[i] + } else { + account = accounts[i-len(existingAccounts)] + } + + if _, ok := allAccounts[account]; ok { + continue + } + allAccounts[account] = struct{}{} + muxed := xdr.MuxedAccount{} if err := muxed.SetAddress(account); err != nil { return errors.Wrapf(err, "failed to encode %s", account) From a59b4c81c96a13eac28efdee513b714e119fa58b Mon Sep 17 00:00:00 2001 From: George Kudrayvtsev Date: Mon, 6 Jun 2022 14:02:12 -0700 Subject: [PATCH 10/14] Revert "Do a set union w/ existing accounts on a flush" This reverts commit 34b9ba2cc82fba745b44d50908ffbf49a85724b5. --- exp/lighthorizon/index/file.go | 30 ++++++++---------------------- 1 file changed, 8 insertions(+), 22 deletions(-) diff --git a/exp/lighthorizon/index/file.go b/exp/lighthorizon/index/file.go index fed6253b8d..26ef60df4f 100644 --- a/exp/lighthorizon/index/file.go +++ b/exp/lighthorizon/index/file.go @@ -38,35 +38,21 @@ func (s *FileBackend) Flush(indexes map[string]map[string]*CheckpointIndex) erro return parallelFlush(s.parallel, indexes, s.writeBatch) } -// FlushAccounts does a set union with the passed-in accounts and the existing -// on-disk accounts. func (s *FileBackend) FlushAccounts(accounts []string) error { - existingAccounts, err := s.ReadAccounts() - if err != nil && !os.IsNotExist(err) { - return errors.Wrap(err, "failed to read existing accounts") - } - path := filepath.Join(s.dir, "accounts") - f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY, 0664) // rw-rw-r-- + + f, err := os.OpenFile(path, os.O_CREATE| + os.O_APPEND| // crucial! since we might flush from various sources + os.O_WRONLY, + 0664) // rw-rw-r-- + if err != nil { return errors.Wrapf(err, "failed to open account file at %s", path) } - defer f.Close() - allAccounts := map[string]struct{}{} // keep a unique list - for i := 0; i < len(existingAccounts)+len(accounts); i++ { - var account string - if i < len(existingAccounts) { - account = existingAccounts[i] - } else { - account = accounts[i-len(existingAccounts)] - } - - if _, ok := allAccounts[account]; ok { - continue - } - allAccounts[account] = struct{}{} + defer f.Close() + for _, account := range accounts { muxed := xdr.MuxedAccount{} if err := muxed.SetAddress(account); err != nil { return errors.Wrapf(err, "failed to encode %s", account) From d8219958d02d7a2b6be067a1ddd673065948befa Mon Sep 17 00:00:00 2001 From: George Kudrayvtsev Date: Tue, 7 Jun 2022 11:34:49 -0700 Subject: [PATCH 11/14] Okay yeah I think I just confused myself --- .../index/cmd/batch/reduce/main.go | 2 +- exp/lighthorizon/index/file.go | 58 ++++--------------- 2 files changed, 12 insertions(+), 48 deletions(-) diff --git a/exp/lighthorizon/index/cmd/batch/reduce/main.go b/exp/lighthorizon/index/cmd/batch/reduce/main.go index 5c96d071b0..0fa871d38d 100644 --- a/exp/lighthorizon/index/cmd/batch/reduce/main.go +++ b/exp/lighthorizon/index/cmd/batch/reduce/main.go @@ -278,7 +278,7 @@ func mergeAllIndices(finalIndexStore index.Store, config *ReduceConfig) error { if os.IsNotExist(err) { continue } else if err != nil { - logger.WithError(err).Error("Error reading tx prefix %s", prefix) + logger.WithError(err).Errorf("Error reading tx prefix %s", prefix) panic(err) } diff --git a/exp/lighthorizon/index/file.go b/exp/lighthorizon/index/file.go index 26ef60df4f..80278be86d 100644 --- a/exp/lighthorizon/index/file.go +++ b/exp/lighthorizon/index/file.go @@ -8,10 +8,8 @@ import ( "os" "path/filepath" - xdr3 "github.com/stellar/go-xdr/xdr3" "github.com/stellar/go/support/errors" "github.com/stellar/go/support/log" - "github.com/stellar/go/xdr" ) type FileBackend struct { @@ -52,21 +50,11 @@ func (s *FileBackend) FlushAccounts(accounts []string) error { defer f.Close() + // We write one account at a time because writes that occur within a single + // `write()` syscall are thread-safe. A larger write might be split into + // many calls and thus get interleaved, so we play it safe. for _, account := range accounts { - muxed := xdr.MuxedAccount{} - if err := muxed.SetAddress(account); err != nil { - return errors.Wrapf(err, "failed to encode %s", account) - } - - raw, err := muxed.MarshalBinary() - if err != nil { - return errors.Wrapf(err, "failed to marshal %s", account) - } - - _, err = f.Write(raw) - if err != nil { - return errors.Wrapf(err, "failed to write to %s", path) - } + f.Write([]byte(account + "\n")) } return nil @@ -156,16 +144,6 @@ func (s *FileBackend) ReadAccounts() ([]string, error) { path := filepath.Join(s.dir, "accounts") log.Debugf("Opening accounts list at %s", path) - // We read the file in chunks with guarantees that we will always read on an - // account boundary: - // - // Accounts w/o IDs are always 36 bytes (4-byte type, 32-byte pubkey) - // Muxed accounts with IDs are always 44 bytes (36 + 8-byte ID) - // - // Thus, if we read 36*44=1584 bytes at a time, we are guaranteed to have a - // complete set of accounts (no partial ones). - const chunkSize = 36 * 44 * 4 // times 4 to make it a reasonable buffer - f, err := os.Open(path) if os.IsNotExist(err) { return nil, err @@ -174,7 +152,7 @@ func (s *FileBackend) ReadAccounts() ([]string, error) { } // We ballpark the capacity assuming all of the values being G-addresses. - preallocationSize := chunkSize / 36 + preallocationSize := 56 * 100 // default to 100 lines info, err := os.Stat(path) if err == nil { // we can still safely continue w/ errors // Note that this will never be too large, but may be too small. @@ -182,30 +160,16 @@ func (s *FileBackend) ReadAccounts() ([]string, error) { } accounts := make([]string, 0, preallocationSize) - // We don't use UnmarshalBinary here because we need to know how much of - // the buffer was read for each account. - reader := bufio.NewReaderSize(f, chunkSize) - decoder := xdr3.NewDecoder(reader) - + // // We don't use UnmarshalBinary here because we need to know how much of + // // the buffer was read for each account. + reader := bufio.NewReaderSize(f, 56*10) for { - // Since we can't decode an EOF error from the below `muxed.DecodeFrom` - // call, we peek on the reader itself to see if we've reached EOF. - if _, err := reader.Peek(1); err == io.EOF { + line, err := reader.ReadString(byte('\n')) + if err == io.EOF { break - } // let later calls bubble up other errors - - muxed := xdr.MuxedAccount{} - _, err := muxed.DecodeFrom(decoder) - if err != nil { - return nil, errors.Wrap(err, "failed to decode account") - } - - account, err := muxed.GetAddress() - if err != nil { - return nil, errors.Wrap(err, "failed to get strkey") } - accounts = append(accounts, account) + accounts = append(accounts, line[:len(line)-1]) // trim newline } // The account list is very unlikely to be unique (especially if it was made From 35dc3b5c94c45fae0bf8766e387ad9ff6251eb7f Mon Sep 17 00:00:00 2001 From: George Kudrayvtsev Date: Tue, 7 Jun 2022 11:41:24 -0700 Subject: [PATCH 12/14] Use a constant --- exp/lighthorizon/index/file.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/exp/lighthorizon/index/file.go b/exp/lighthorizon/index/file.go index 80278be86d..60027a1c76 100644 --- a/exp/lighthorizon/index/file.go +++ b/exp/lighthorizon/index/file.go @@ -152,17 +152,18 @@ func (s *FileBackend) ReadAccounts() ([]string, error) { } // We ballpark the capacity assuming all of the values being G-addresses. - preallocationSize := 56 * 100 // default to 100 lines + const gAddressSize = 56 + preallocationSize := 100 * gAddressSize // default to 100 lines info, err := os.Stat(path) if err == nil { // we can still safely continue w/ errors // Note that this will never be too large, but may be too small. - preallocationSize = int(info.Size()) / 36 + preallocationSize = int(info.Size()) / gAddressSize } accounts := make([]string, 0, preallocationSize) // // We don't use UnmarshalBinary here because we need to know how much of // // the buffer was read for each account. - reader := bufio.NewReaderSize(f, 56*10) + reader := bufio.NewReaderSize(f, 100*gAddressSize) // reasonable buffer size for { line, err := reader.ReadString(byte('\n')) if err == io.EOF { From 5eb47bd12be742429d096bda6903e9acc7f10f86 Mon Sep 17 00:00:00 2001 From: George Kudrayvtsev Date: Tue, 7 Jun 2022 15:04:32 -0700 Subject: [PATCH 13/14] Fix error handling, off-by-one --- exp/lighthorizon/index/file.go | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/exp/lighthorizon/index/file.go b/exp/lighthorizon/index/file.go index 60027a1c76..acb2dc414c 100644 --- a/exp/lighthorizon/index/file.go +++ b/exp/lighthorizon/index/file.go @@ -145,29 +145,30 @@ func (s *FileBackend) ReadAccounts() ([]string, error) { log.Debugf("Opening accounts list at %s", path) f, err := os.Open(path) - if os.IsNotExist(err) { - return nil, err - } else if err != nil { - return nil, errors.Wrapf(err, "failed to read %s", path) + if err != nil { + return nil, errors.Wrapf(err, "failed to open %s", path) } - // We ballpark the capacity assuming all of the values being G-addresses. const gAddressSize = 56 + + // We ballpark the capacity assuming all of the values being G-addresses. preallocationSize := 100 * gAddressSize // default to 100 lines info, err := os.Stat(path) if err == nil { // we can still safely continue w/ errors // Note that this will never be too large, but may be too small. - preallocationSize = int(info.Size()) / gAddressSize + preallocationSize = int(info.Size()) / (gAddressSize + 1) // +1 for \n } accounts := make([]string, 0, preallocationSize) - // // We don't use UnmarshalBinary here because we need to know how much of - // // the buffer was read for each account. + // We don't use UnmarshalBinary here because we need to know how much of the + // buffer was read for each account. reader := bufio.NewReaderSize(f, 100*gAddressSize) // reasonable buffer size for { line, err := reader.ReadString(byte('\n')) if err == io.EOF { break + } else if err != nil { + return accounts, errors.Wrapf(err, "failed to read %s", path) } accounts = append(accounts, line[:len(line)-1]) // trim newline From 91468eccecbb40824a6dc305c8263963e4f89461 Mon Sep 17 00:00:00 2001 From: George Kudrayvtsev Date: Thu, 9 Jun 2022 16:54:26 -0700 Subject: [PATCH 14/14] Combine building unique list while reading :brain: --- exp/lighthorizon/index/file.go | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/exp/lighthorizon/index/file.go b/exp/lighthorizon/index/file.go index acb2dc414c..530c089582 100644 --- a/exp/lighthorizon/index/file.go +++ b/exp/lighthorizon/index/file.go @@ -158,6 +158,7 @@ func (s *FileBackend) ReadAccounts() ([]string, error) { // Note that this will never be too large, but may be too small. preallocationSize = int(info.Size()) / (gAddressSize + 1) // +1 for \n } + accountMap := make(map[string]struct{}, preallocationSize) accounts := make([]string, 0, preallocationSize) // We don't use UnmarshalBinary here because we need to know how much of the @@ -171,22 +172,17 @@ func (s *FileBackend) ReadAccounts() ([]string, error) { return accounts, errors.Wrapf(err, "failed to read %s", path) } - accounts = append(accounts, line[:len(line)-1]) // trim newline - } + account := line[:len(line)-1] // trim newline - // The account list is very unlikely to be unique (especially if it was made - // w/ parallel flushes), so let's ensure that that's the case. - count := 0 - accountMap := make(map[string]struct{}, len(accounts)) - for _, account := range accounts { + // The account list is very unlikely to be unique (especially if it was made + // w/ parallel flushes), so let's ensure that that's the case. if _, ok := accountMap[account]; !ok { accountMap[account] = struct{}{} - accounts[count] = account // save memory: shove uniques to front - count++ + accounts = append(accounts, account) } } - return accounts[:count], nil + return accounts, nil } func (s *FileBackend) ReadTransactions(prefix string) (*TrieIndex, error) {