Skip to content

Commit

Permalink
Use block lease manager to open correct volume in seeker
Browse files Browse the repository at this point in the history
  • Loading branch information
justinjc committed Jun 19, 2019
1 parent ead0499 commit 40f98f1
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 40 deletions.
58 changes: 35 additions & 23 deletions src/dbnode/persist/fs/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ const (
snapshotDirName = "snapshots"
commitLogsDirName = "commitlogs"

// The maximum number of delimeters ('-' or '.') that is expected in a
// (base) filename.
maxDelimNum = 4

// The volume index assigned to (legacy) filesets that don't have a volume
// number in their filename.
// NOTE: Since this index is the same as the index for the first
Expand Down Expand Up @@ -411,30 +415,14 @@ func (a fileSetFilesByTimeAndVolumeIndexAscending) Less(i, j int) bool {
return ti.Equal(tj) && ii < ij
}

func intComponentFromFilename(
baseFilename string,
componentPos int,
separatorIdx [4]int,
) (int64, error) {
var start int
if componentPos > 0 {
start = separatorIdx[componentPos-1] + 1
}
end := separatorIdx[componentPos]
if start > end || end > len(baseFilename)-1 || start < 0 {
return 0, fmt.Errorf(errUnexpectedFilename, baseFilename)
}

num, err := strconv.ParseInt(baseFilename[start:end], 10, 64)
if err != nil {
return 0, fmt.Errorf(errUnexpectedFilename, baseFilename)
}
return num, nil
}

func delimiterPositions(baseFilename string) ([4]int, int) {
// Returns the positions of filename delimiters ('-' and '.') and the number of
// delimeters found, to be used in conjunction with the intComponentFromFilename
// function to extract filename components. This function is deliberately
// optimized for speed and lack of allocationsm, since filename parsing is known
// to account for a significant proportion of sytstem resources.
func delimiterPositions(baseFilename string) ([maxDelimNum]int, int) {
var (
delimPos [4]int
delimPos [maxDelimNum]int
delimsFound int
)

Expand All @@ -453,6 +441,30 @@ func delimiterPositions(baseFilename string) ([4]int, int) {
return delimPos, delimsFound
}

// Returns the the specified component of a filename given delimeter positions.
// Our only use cases involve extracting numeric components, so this function
// assumes this and returns the component as an int64.
func intComponentFromFilename(
baseFilename string,
componentPos int,
delimPos [maxDelimNum]int,
) (int64, error) {
var start int
if componentPos > 0 {
start = delimPos[componentPos-1] + 1
}
end := delimPos[componentPos]
if start > end || end > len(baseFilename)-1 || start < 0 {
return 0, fmt.Errorf(errUnexpectedFilename, baseFilename)
}

num, err := strconv.ParseInt(baseFilename[start:end], 10, 64)
if err != nil {
return 0, fmt.Errorf(errUnexpectedFilename, baseFilename)
}
return num, nil
}

// TimeFromFileName extracts the block start time from file name.
func TimeFromFileName(fname string) (time.Time, error) {
base := filepath.Base(fname)
Expand Down
30 changes: 13 additions & 17 deletions src/dbnode/persist/fs/seek_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,9 +397,17 @@ func (m *seekerManager) newOpenSeeker(
shard uint32,
blockStart time.Time,
) (DataFileSetSeeker, error) {
// TODO(juchan): get the actual volume here.
vol := 0
exists, err := DataFileSetExists(m.filePathPrefix, m.namespace, shard, blockStart, vol)
blm := m.blockRetrieverOpts.BlockLeaseManager()
state, err := blm.OpenLatestLease(m, block.LeaseDescriptor{
Namespace: m.namespace,
Shard: shard,
BlockStart: blockStart,
})
if err != nil {
return nil, fmt.Errorf("err opening latest lease: %v", err)
}

exists, err := DataFileSetExists(m.filePathPrefix, m.namespace, shard, blockStart, state.Volume)
if err != nil {
return nil, err
}
Expand All @@ -426,20 +434,8 @@ func (m *seekerManager) newOpenSeeker(
// Set the unread buffer to reuse it amongst all seekers.
seeker.setUnreadBuffer(m.unreadBuf.value)

var (
resources = m.getSeekerResources()
blm = m.blockRetrieverOpts.BlockLeaseManager()
)
_, err = blm.OpenLatestLease(m, block.LeaseDescriptor{
Namespace: m.namespace,
Shard: shard,
BlockStart: blockStart,
})
if err != nil {
return nil, fmt.Errorf("err opening latest lease: %v", err)
}

err = seeker.Open(m.namespace, shard, blockStart, volume, resources)
resources := m.getSeekerResources()
err = seeker.Open(m.namespace, shard, blockStart, state.Volume, resources)
m.putSeekerResources(resources)
if err != nil {
return nil, err
Expand Down

0 comments on commit 40f98f1

Please sign in to comment.