Skip to content

Commit

Permalink
exp/lighthorizon, xdr: Rename CheckpointIndex to better reflect its…
Browse files Browse the repository at this point in the history
… capabilty. (#4510)

* Rename NextActive -> NextActiveBit to be descriptive
  • Loading branch information
Shaptic authored Aug 8, 2022
1 parent 4a82936 commit 828fc46
Show file tree
Hide file tree
Showing 10 changed files with 185 additions and 185 deletions.
2 changes: 1 addition & 1 deletion exp/lighthorizon/index/backend/gzip.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func readGzippedFrom(r io.Reader) (types.NamedIndices, int64, error) {
return nil, n, err
}

ind, err := types.NewCheckpointIndex(buf.Bytes())
ind, err := types.NewBitmapIndex(buf.Bytes())
if err != nil {
return nil, n, err
}
Expand Down
4 changes: 2 additions & 2 deletions exp/lighthorizon/index/backend/gzip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (
)

func TestGzipRoundtrip(t *testing.T) {
index := &types.CheckpointIndex{}
anotherIndex := &types.CheckpointIndex{}
index := &types.BitmapIndex{}
anotherIndex := &types.BitmapIndex{}
for i := 0; i < 100+rand.Intn(1000); i++ {
index.SetActive(uint32(rand.Intn(10_000)))
anotherIndex.SetActive(uint32(rand.Intn(10_000)))
Expand Down
2 changes: 1 addition & 1 deletion exp/lighthorizon/index/backend/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (s *S3Backend) Read(account string) (types.NamedIndices, error) {
if n == 0 {
return nil, os.ErrNotExist
}
var indexes map[string]*types.CheckpointIndex
var indexes map[string]*types.BitmapIndex
indexes, _, err = readGzippedFrom(bytes.NewReader(b.Bytes()))
if err != nil {
log.Errorf("Unable to parse %s: %v", account, err)
Expand Down
2 changes: 1 addition & 1 deletion exp/lighthorizon/index/cmd/batch/reduce/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ func (cfg *ReduceConfig) shouldProcessTx(txPrefix byte, routineIndex uint32) boo

// For every index that exists in `dest`, finds the corresponding index in
// `source` and merges it into `dest`'s version.
func mergeIndices(dest, source map[string]*types.CheckpointIndex) error {
func mergeIndices(dest, source map[string]*types.BitmapIndex) error {
for name, index := range dest {
// The source doesn't contain this particular index.
//
Expand Down
10 changes: 5 additions & 5 deletions exp/lighthorizon/index/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,12 +227,12 @@ func (s *store) AddParticipantsToIndexesNoBackend(checkpoint uint32, index strin
var err error
for _, participant := range participants {
if _, ok := s.indexes[participant]; !ok {
s.indexes[participant] = map[string]*types.CheckpointIndex{}
s.indexes[participant] = map[string]*types.BitmapIndex{}
}

ind, ok := s.indexes[participant][index]
if !ok {
ind = &types.CheckpointIndex{}
ind = &types.BitmapIndex{}
s.indexes[participant][index] = ind
}

Expand Down Expand Up @@ -269,7 +269,7 @@ func (s *store) AddParticipantsToIndexes(checkpoint uint32, index string, partic
return nil
}

func (s *store) getCreateIndex(account, id string) (*types.CheckpointIndex, error) {
func (s *store) getCreateIndex(account, id string) (*types.BitmapIndex, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
defer s.approximateWorkingSet()
Expand All @@ -295,7 +295,7 @@ func (s *store) getCreateIndex(account, id string) (*types.CheckpointIndex, erro
ind, ok = accountIndexes[id]
if !ok {
// Not found anywhere, make a new one.
ind = &types.CheckpointIndex{}
ind = &types.BitmapIndex{}
accountIndexes[id] = ind
}

Expand Down Expand Up @@ -323,7 +323,7 @@ func (s *store) NextActive(account, indexId string, afterCheckpoint uint32) (uin
if err != nil {
return 0, err
}
return ind.NextActive(afterCheckpoint)
return ind.NextActiveBit(afterCheckpoint)
}

func (s *store) getCreateTrieIndex(prefix string) (*types.TrieIndex, error) {
Expand Down
164 changes: 82 additions & 82 deletions exp/lighthorizon/index/types/bitmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,104 +8,104 @@ import (
"github.com/stellar/go/xdr"
)

const CheckpointIndexVersion = 1
const BitmapIndexVersion = 1

type CheckpointIndex struct {
mutex sync.RWMutex
bitmap []byte
firstCheckpoint uint32
lastCheckpoint uint32
type BitmapIndex struct {
mutex sync.RWMutex
bitmap []byte
firstBit uint32
lastBit uint32
}

type NamedIndices map[string]*CheckpointIndex
type NamedIndices map[string]*BitmapIndex

func NewCheckpointIndex(b []byte) (*CheckpointIndex, error) {
xdrCheckpoint := xdr.CheckpointIndex{}
err := xdrCheckpoint.UnmarshalBinary(b)
func NewBitmapIndex(b []byte) (*BitmapIndex, error) {
xdrBitmap := xdr.BitmapIndex{}
err := xdrBitmap.UnmarshalBinary(b)
if err != nil {
return nil, err
}

return NewCheckpointIndexFromXDR(xdrCheckpoint), nil
return NewBitmapIndexFromXDR(xdrBitmap), nil
}

func NewCheckpointIndexFromXDR(index xdr.CheckpointIndex) *CheckpointIndex {
return &CheckpointIndex{
bitmap: index.Bitmap[:],
firstCheckpoint: uint32(index.FirstCheckpoint),
lastCheckpoint: uint32(index.LastCheckpoint),
func NewBitmapIndexFromXDR(index xdr.BitmapIndex) *BitmapIndex {
return &BitmapIndex{
bitmap: index.Bitmap[:],
firstBit: uint32(index.FirstBit),
lastBit: uint32(index.LastBit),
}
}

func (i *CheckpointIndex) Size() int {
func (i *BitmapIndex) Size() int {
return len(i.bitmap)
}

func (i *CheckpointIndex) SetActive(checkpoint uint32) error {
func (i *BitmapIndex) SetActive(index uint32) error {
i.mutex.Lock()
defer i.mutex.Unlock()
return i.setActive(checkpoint)
return i.setActive(index)
}

func bitShiftLeft(checkpoint uint32) byte {
if checkpoint%8 == 0 {
func bitShiftLeft(index uint32) byte {
if index%8 == 0 {
return 1
} else {
return byte(1) << (8 - checkpoint%8)
return byte(1) << (8 - index%8)
}
}

func (i *CheckpointIndex) rangeFirstCheckpoint() uint32 {
return (i.firstCheckpoint-1)/8*8 + 1
func (i *BitmapIndex) rangeFirstBit() uint32 {
return (i.firstBit-1)/8*8 + 1
}

func (i *CheckpointIndex) rangeLastCheckpoint() uint32 {
return i.rangeFirstCheckpoint() + uint32(len(i.bitmap))*8 - 1
func (i *BitmapIndex) rangeLastBit() uint32 {
return i.rangeFirstBit() + uint32(len(i.bitmap))*8 - 1
}

func (i *CheckpointIndex) setActive(checkpoint uint32) error {
if i.firstCheckpoint == 0 {
i.firstCheckpoint = checkpoint
i.lastCheckpoint = checkpoint
b := bitShiftLeft(checkpoint)
func (i *BitmapIndex) setActive(index uint32) error {
if i.firstBit == 0 {
i.firstBit = index
i.lastBit = index
b := bitShiftLeft(index)
i.bitmap = []byte{b}
} else {
if checkpoint >= i.rangeFirstCheckpoint() && checkpoint <= i.rangeLastCheckpoint() {
if index >= i.rangeFirstBit() && index <= i.rangeLastBit() {
// Update the bit in existing range
b := bitShiftLeft(checkpoint)
loc := (checkpoint - i.rangeFirstCheckpoint()) / 8
b := bitShiftLeft(index)
loc := (index - i.rangeFirstBit()) / 8
i.bitmap[loc] = i.bitmap[loc] | b

if checkpoint < i.firstCheckpoint {
i.firstCheckpoint = checkpoint
if index < i.firstBit {
i.firstBit = index
}
if checkpoint > i.lastCheckpoint {
i.lastCheckpoint = checkpoint
if index > i.lastBit {
i.lastBit = index
}
} else {
// Expand the bitmap
if checkpoint < i.rangeFirstCheckpoint() {
if index < i.rangeFirstBit() {
// ...to the left
c := (i.rangeFirstCheckpoint() - checkpoint) / 8
if (i.rangeFirstCheckpoint()-checkpoint)%8 != 0 {
c := (i.rangeFirstBit() - index) / 8
if (i.rangeFirstBit()-index)%8 != 0 {
c++
}
newBytes := make([]byte, c)
i.bitmap = append(newBytes, i.bitmap...)

b := bitShiftLeft(checkpoint)
b := bitShiftLeft(index)
i.bitmap[0] = i.bitmap[0] | b

i.firstCheckpoint = checkpoint
} else if checkpoint > i.rangeLastCheckpoint() {
i.firstBit = index
} else if index > i.rangeLastBit() {
// ... to the right
newBytes := make([]byte, (checkpoint-i.rangeLastCheckpoint())/8+1)
newBytes := make([]byte, (index-i.rangeLastBit())/8+1)
i.bitmap = append(i.bitmap, newBytes...)
b := bitShiftLeft(checkpoint)
loc := (checkpoint - i.rangeFirstCheckpoint()) / 8
b := bitShiftLeft(index)
loc := (index - i.rangeFirstBit()) / 8
i.bitmap[loc] = i.bitmap[loc] | b

i.lastCheckpoint = checkpoint
i.lastBit = index
}
}
}
Expand All @@ -114,30 +114,30 @@ func (i *CheckpointIndex) setActive(checkpoint uint32) error {
}

//lint:ignore U1000 Ignore unused function temporarily
func (i *CheckpointIndex) isActive(checkpoint uint32) bool {
if checkpoint >= i.firstCheckpoint && checkpoint <= i.lastCheckpoint {
b := bitShiftLeft(checkpoint)
loc := (checkpoint - i.rangeFirstCheckpoint()) / 8
func (i *BitmapIndex) isActive(index uint32) bool {
if index >= i.firstBit && index <= i.lastBit {
b := bitShiftLeft(index)
loc := (index - i.rangeFirstBit()) / 8
return i.bitmap[loc]&b != 0
} else {
return false
}
}

func (i *CheckpointIndex) iterate(f func(checkpoint uint32)) error {
func (i *BitmapIndex) iterate(f func(index uint32)) error {
i.mutex.RLock()
defer i.mutex.RUnlock()

if i.firstCheckpoint == 0 {
if i.firstBit == 0 {
return nil
}

f(i.firstCheckpoint)
curr := i.firstCheckpoint
f(i.firstBit)
curr := i.firstBit

for {
var err error
curr, err = i.nextActive(curr + 1)
curr, err = i.nextActiveBit(curr + 1)
if err != nil {
if err == io.EOF {
break
Expand All @@ -151,55 +151,55 @@ func (i *CheckpointIndex) iterate(f func(checkpoint uint32)) error {
return nil
}

func (i *CheckpointIndex) Merge(other *CheckpointIndex) error {
func (i *BitmapIndex) Merge(other *BitmapIndex) error {
i.mutex.Lock()
defer i.mutex.Unlock()

var err error
other.iterate(func(checkpoint uint32) {
other.iterate(func(index uint32) {
if err != nil {
return
}
err = i.setActive(checkpoint)
err = i.setActive(index)
})

return err
}

// NextActive returns the next checkpoint (inclusive) where this index is
// active. "Inclusive" means that if the index is active at `checkpoint`, this
// returns `checkpoint`.
func (i *CheckpointIndex) NextActive(checkpoint uint32) (uint32, error) {
// NextActiveBit returns the next bit position (inclusive) where this index is
// active. "Inclusive" means that if it's already active at `position`, this
// returns `position`.
func (i *BitmapIndex) NextActiveBit(position uint32) (uint32, error) {
i.mutex.RLock()
defer i.mutex.RUnlock()
return i.nextActive(checkpoint)
return i.nextActiveBit(position)
}

func (i *CheckpointIndex) nextActive(checkpoint uint32) (uint32, error) {
if i.firstCheckpoint == 0 || checkpoint > i.lastCheckpoint {
func (i *BitmapIndex) nextActiveBit(position uint32) (uint32, error) {
if i.firstBit == 0 || position > i.lastBit {
// We're past the end.
// TODO: Should this be an error? or how should we signal NONE here?
return 0, io.EOF
}

if checkpoint < i.firstCheckpoint {
checkpoint = i.firstCheckpoint
if position < i.firstBit {
position = i.firstBit
}

// Must be within the range, find the first non-zero after our start
loc := (checkpoint - i.rangeFirstCheckpoint()) / 8
loc := (position - i.rangeFirstBit()) / 8

// Is it in the same byte?
if shift, ok := maxBitAfter(i.bitmap[loc], (checkpoint-1)%8); ok {
return i.rangeFirstCheckpoint() + (loc * 8) + shift, nil
if shift, ok := maxBitAfter(i.bitmap[loc], (position-1)%8); ok {
return i.rangeFirstBit() + (loc * 8) + shift, nil
}

// Scan bytes after
loc++
for ; loc < uint32(len(i.bitmap)); loc++ {
// Find the offset of the set bit
if shift, ok := maxBitAfter(i.bitmap[loc], 0); ok {
return i.rangeFirstCheckpoint() + (loc * 8) + shift, nil
return i.rangeFirstBit() + (loc * 8) + shift, nil
}
}

Expand All @@ -223,30 +223,30 @@ func maxBitAfter(b byte, after uint32) (uint32, bool) {
return 0, false
}

func (i *CheckpointIndex) ToXDR() xdr.CheckpointIndex {
func (i *BitmapIndex) ToXDR() xdr.BitmapIndex {
i.mutex.RLock()
defer i.mutex.RUnlock()

return xdr.CheckpointIndex{
FirstCheckpoint: xdr.Uint32(i.firstCheckpoint),
LastCheckpoint: xdr.Uint32(i.lastCheckpoint),
Bitmap: i.bitmap,
return xdr.BitmapIndex{
FirstBit: xdr.Uint32(i.firstBit),
LastBit: xdr.Uint32(i.lastBit),
Bitmap: i.bitmap,
}
}

func (i *CheckpointIndex) Buffer() *bytes.Buffer {
func (i *BitmapIndex) Buffer() *bytes.Buffer {
i.mutex.RLock()
defer i.mutex.RUnlock()

xdrCheckpoint := i.ToXDR()
b, err := xdrCheckpoint.MarshalBinary()
xdrBitmap := i.ToXDR()
b, err := xdrBitmap.MarshalBinary()
if err != nil {
panic(err)
}
return bytes.NewBuffer(b)
}

// Flush flushes the index data to byte slice in index format.
func (i *CheckpointIndex) Flush() []byte {
func (i *BitmapIndex) Flush() []byte {
return i.Buffer().Bytes()
}
Loading

0 comments on commit 828fc46

Please sign in to comment.