Skip to content

Commit

Permalink
refactor: dataset index calculation (#2512)
Browse files Browse the repository at this point in the history
  • Loading branch information
atzoum authored Oct 10, 2022
1 parent 810bb5d commit bfba9cf
Show file tree
Hide file tree
Showing 5 changed files with 287 additions and 227 deletions.
128 changes: 128 additions & 0 deletions jobsdb/internal/dsindex/dsindex.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package dsindex

import (
"fmt"
"strconv"
"strings"
)

// Index represents a sortable dataset index, e.g. 1 < 1_1 < 1_1_1 < 1_2 < 2
type Index struct {
segments []int
}

// MustBump returns the next index that is greater than the current one,
// but still less than the index provided in the parameter. Panics on error
func (idx *Index) MustBump(previous *Index) *Index {
res, err := idx.Bump(previous)
if err != nil {
panic(err)
}
return res
}

// Bump returns the next index that is greater than the current one,
// but still less than the index provided in the parameter.
//
// Bump doesn't increment the first, major segment of the index, it starts from the second segment instead.
func (idx *Index) Bump(before *Index) (*Index, error) {
if !idx.Less(before) {
return nil, fmt.Errorf("%s is not before %s", idx, before)
}
var segment int // never increasing the major segment (index 0)
for {
segment++
res, err := idx.Increment(segment)
if err != nil {
return nil, err
}
pl := idx.Less(res)
rl := res.Less(before)
if pl && rl {
return res, nil
}
}
}

// MustIncrement returns a new dataset index that is incremented by one in the specified segment, panics on error
func (idx *Index) MustIncrement(segment int) *Index {
res, err := idx.Increment(segment)
if err != nil {
panic(err)
}
return res
}

// Increment returns a new dataset index that is incremented by one in the specified segment
func (idx *Index) Increment(segment int) (*Index, error) {
if segment < 0 || segment > idx.Length() {
return nil, fmt.Errorf("cannot increment segment %d of %s", segment, idx)
}
var res *Index = &Index{}
for i := 0; i < segment; i++ {
toAppend := idx.segments[i]
res.segments = append(res.segments, toAppend)
}
var lastSegment int
if idx.Length() > segment {
lastSegment = idx.segments[segment]
}
res.segments = append(res.segments, lastSegment+1)
return res, nil
}

// Less returns true if this dataset index is Less than the other dataset index
func (idx *Index) Less(other *Index) bool {
for i, segment := range idx.segments {
if i >= other.Length() {
return false
}
if segment < other.segments[i] {
return true
}
if segment > other.segments[i] {
return false
}
}
return other.Length() > idx.Length()
}

// Length returns the number of segments in the dataset index
func (idx *Index) Length() int {
return len(idx.segments)
}

// String returns a string representation of the dataset index
func (idx *Index) String() string {
var result []string
for _, segment := range idx.segments {
result = append(result, strconv.Itoa(segment))
}
return strings.Join(result, "_")
}

// MustParse returns a dataset index from a string representation, panics on error
func MustParse(value string) *Index {
res, err := Parse(value)
if err != nil {
panic(err)
}
return res
}

// Parse returns a dataset index from a string representation
func Parse(value string) (*Index, error) {
var result Index
stringSegments := strings.Split(value, "_")
for i, stringSegment := range stringSegments {
segment, err := strconv.Atoi(stringSegment)
if err != nil {
return nil, fmt.Errorf("illegal value for segment %d: %s", i+1, value)
}
if i > 0 && segment <= 0 {
return nil, fmt.Errorf("value for segment %d cannot be less than or equal to zero: %s", i+1, value)
}
result.segments = append(result.segments, segment)
}
return &result, nil
}
104 changes: 104 additions & 0 deletions jobsdb/internal/dsindex/dsindex_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package dsindex_test

import (
"testing"

"github.com/rudderlabs/rudder-server/jobsdb/internal/dsindex"
"github.com/stretchr/testify/require"
)

func Test_Index_Parse(t *testing.T) {
t.Run("success scenarios", func(t *testing.T) {
idx, err := dsindex.Parse("0")
require.NoError(t, err)
require.Equal(t, "0", idx.String())
require.Equal(t, 1, idx.Length())

idx, err = dsindex.Parse("0_1")
require.NoError(t, err)
require.Equal(t, "0_1", idx.String())
require.Equal(t, 2, idx.Length())

idx, err = dsindex.Parse("1_2_3")
require.NoError(t, err)
require.Equal(t, "1_2_3", idx.String())
require.Equal(t, 3, idx.Length())
})

t.Run("error scenarios", func(t *testing.T) {
_, err := dsindex.Parse("")
require.Error(t, err)

_, err = dsindex.Parse("a")
require.Error(t, err)

_, err = dsindex.Parse("1a")
require.Error(t, err)

_, err = dsindex.Parse("1_a")
require.Error(t, err)

_, err = dsindex.Parse("1_0") // zero is allowed only as first segment
require.Error(t, err)

_, err = dsindex.Parse("1_-1")
require.Error(t, err)

require.Panics(t, func() { dsindex.MustParse("1_-1") })
})
}

func Test_Index_Less(t *testing.T) {
require.True(t, dsindex.MustParse("-10").Less(dsindex.MustParse("-9")))
require.True(t, dsindex.MustParse("0").Less(dsindex.MustParse("1")))
require.True(t, dsindex.MustParse("0_1").Less(dsindex.MustParse("1")))
require.True(t, dsindex.MustParse("0_1_2").Less(dsindex.MustParse("1")))
require.True(t, dsindex.MustParse("1_1").Less(dsindex.MustParse("1_2")))
require.True(t, dsindex.MustParse("1_1_1").Less(dsindex.MustParse("1_1_3")))
require.True(t, dsindex.MustParse("11").Less(dsindex.MustParse("11_1")))

require.False(t, dsindex.MustParse("1_1_1").Less(dsindex.MustParse("1_1")))
}

func Test_Index_Increment(t *testing.T) {
t.Run("success scenarios", func(t *testing.T) {
require.Equal(t, "-9", dsindex.MustParse("-10").MustIncrement(0).String())
require.Equal(t, "0_1", dsindex.MustParse("0").MustIncrement(1).String())
require.Equal(t, "0_2", dsindex.MustParse("0_1").MustIncrement(1).String())
require.Equal(t, "1", dsindex.MustParse("0_1").MustIncrement(0).String())
require.Equal(t, "0_2", dsindex.MustParse("0_1_2").MustIncrement(1).String())
})

t.Run("error scenarios", func(t *testing.T) {
_, err := dsindex.MustParse("0").Increment(-1)
require.Error(t, err)

_, err = dsindex.MustParse("0").Increment(2)
require.Error(t, err)

_, err = dsindex.MustParse("0").Increment(3)
require.Error(t, err)

require.Panics(t, func() { dsindex.MustParse("0").MustIncrement(3) })
})
}

func Test_Index_Bump(t *testing.T) {
t.Run("success scenarios", func(t *testing.T) {
require.Equal(t, "0_1", dsindex.MustParse("0").MustBump(dsindex.MustParse("1")).String())
require.Equal(t, "0_1", dsindex.MustParse("0").MustBump(dsindex.MustParse("2")).String())
require.Equal(t, "0_1", dsindex.MustParse("0").MustBump(dsindex.MustParse("3")).String())
require.Equal(t, "1_2_2", dsindex.MustParse("1_2_1").MustBump(dsindex.MustParse("1_2_3")).String())
require.Equal(t, "-11_1", dsindex.MustParse("-11").MustBump(dsindex.MustParse("-10")).String())
})

t.Run("error scenarios", func(t *testing.T) {
_, err := dsindex.MustParse("1_2_1").Bump(dsindex.MustParse("1_2_1"))
require.Error(t, err, "bump should fail if index is not less than other")

_, err = dsindex.MustParse("1_2").Bump(dsindex.MustParse("1_1"))
require.Error(t, err, "bump should fail if index is not less than other")

require.Panics(t, func() { dsindex.MustParse("1_2").MustBump(dsindex.MustParse("1_1")) })
})
}
116 changes: 23 additions & 93 deletions jobsdb/jobsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"golang.org/x/sync/errgroup"

"github.com/rudderlabs/rudder-server/admin"
"github.com/rudderlabs/rudder-server/jobsdb/internal/dsindex"
"github.com/rudderlabs/rudder-server/jobsdb/internal/lock"
"github.com/rudderlabs/rudder-server/jobsdb/prebackup"
"github.com/rudderlabs/rudder-server/utils/bytesize"
Expand Down Expand Up @@ -630,7 +631,6 @@ var (
backupRowsBatchSize int64
backupMaxTotalPayloadSize int64
pkgLogger logger.Logger
skipZeroAssertionForMultitenant bool
)

// Loads db config and migration related config from config file
Expand Down Expand Up @@ -665,7 +665,6 @@ func loadConfig() {
config.RegisterDurationConfigVariable(5, &backupCheckSleepDuration, true, time.Second, []string{"JobsDB.backupCheckSleepDuration", "JobsDB.backupCheckSleepDurationIns"}...)
config.RegisterDurationConfigVariable(60, &cacheExpiration, true, time.Minute, []string{"JobsDB.cacheExpiration"}...)
useJoinForUnprocessed = config.GetBool("JobsDB.useJoinForUnprocessed", true)
config.RegisterBoolConfigVariable(false, &skipZeroAssertionForMultitenant, true, "JobsDB.skipZeroAssertionForMultitenant")
}

func Init2() {
Expand Down Expand Up @@ -1184,15 +1183,17 @@ func (jd *HandleT) getTableSize(jobTable string) int64 {
}

func (jd *HandleT) checkIfFullDSInTx(tx *sql.Tx, ds dataSetT) (bool, error) {
var minJobCreatedAt sql.NullTime
sqlStatement := fmt.Sprintf(`SELECT MIN(created_at) FROM %q`, ds.JobTable)
row := tx.QueryRow(sqlStatement)
err := row.Scan(&minJobCreatedAt)
if err != nil && err != sql.ErrNoRows {
return false, err
}
if err == nil && minJobCreatedAt.Valid && time.Since(minJobCreatedAt.Time) > jd.MaxDSRetentionPeriod {
return true, nil
if jd.MaxDSRetentionPeriod > 0 {
var minJobCreatedAt sql.NullTime
sqlStatement := fmt.Sprintf(`SELECT MIN(created_at) FROM %q`, ds.JobTable)
row := tx.QueryRow(sqlStatement)
err := row.Scan(&minJobCreatedAt)
if err != nil && err != sql.ErrNoRows {
return false, err
}
if err == nil && minJobCreatedAt.Valid && time.Since(minJobCreatedAt.Time) > jd.MaxDSRetentionPeriod {
return true, nil
}
}

tableSize := jd.getTableSize(ds.JobTable)
Expand Down Expand Up @@ -1331,94 +1332,23 @@ func (jd *HandleT) doComputeNewIdxForAppend(dList []dataSetT) string {
return newDSIdx
}

// Tries to give a slice between before and after by incrementing last value in before. If the order doesn't maintain, it adds a level and recurses.
func computeInsertVals(before, after []string) ([]string, error) {
for {
if before == nil || after == nil {
return nil, fmt.Errorf("before or after is nil")
}
// Safe check: In the current jobsdb implementation, indices don't go more
// than 3 levels deep. Breaking out of the loop if the before is of size more than 4.
if len(before) > 4 {
return before, fmt.Errorf("can't compute insert index due to bad inputs. before: %v, after: %v", before, after)
}

calculatedVals := make([]string, len(before))
copy(calculatedVals, before)
lastVal, err := strconv.Atoi(calculatedVals[len(calculatedVals)-1])
if err != nil {
return calculatedVals, err
}
// Just increment the last value of the index as a possible candidate
calculatedVals[len(calculatedVals)-1] = fmt.Sprintf("%d", lastVal+1)
var equals bool
if len(calculatedVals) == len(after) {
equals = true
for k := 0; k < len(calculatedVals); k++ {
if calculatedVals[k] == after[k] {
continue
}
equals = false
}
}
if !equals {
comparison, err := dsComparitor(calculatedVals, after)
if err != nil {
return calculatedVals, err
}
if !comparison {
return calculatedVals, fmt.Errorf("computed index is invalid. before: %v, after: %v, calculatedVals: %v", before, after, calculatedVals)
}
}
// Only when the index starts with 0, we allow three levels and when we are using the legacy migration
// In all other cases, we allow only two levels
var comparatorBool bool
if skipZeroAssertionForMultitenant {
comparatorBool = len(calculatedVals) == 2
} else {
comparatorBool = (before[0] == "0" && len(calculatedVals) == 3) ||
(before[0] != "0" && len(calculatedVals) == 2)
}

if comparatorBool {
if equals {
return calculatedVals, fmt.Errorf("calculatedVals and after are same. computed index is invalid. before: %v, after: %v, calculatedVals: %v", before, after, calculatedVals)
} else {
return calculatedVals, nil
}
}

before = append(before, "0")
}
}

func computeInsertIdx(beforeIndex, afterIndex string) (string, error) {
comparison, err := dsComparitor(strings.Split(beforeIndex, "_"), strings.Split(afterIndex, "_"))
before, err := dsindex.Parse(beforeIndex)
if err != nil {
return "", fmt.Errorf("Error while comparing beforeIndex: %s and afterIndex: %s with error : %w", beforeIndex, afterIndex, err)
}
if !comparison {
return "", fmt.Errorf("Not a valid insert request between %s and %s", beforeIndex, afterIndex)
return "", fmt.Errorf("could not parse before index: %w", err)
}

// No dataset should have 0 as the index.
// 0_1, 0_2 are allowed.
if beforeIndex == "0" && !skipZeroAssertionForMultitenant {
return "", fmt.Errorf("Unsupported beforeIndex: %s", beforeIndex)
after, err := dsindex.Parse(afterIndex)
if err != nil {
return "", fmt.Errorf("could not parse after index: %w", err)
}

beforeVals := strings.Split(beforeIndex, "_")
afterVals := strings.Split(afterIndex, "_")
calculatedInsertVals, err := computeInsertVals(beforeVals, afterVals)
result, err := before.Bump(after)
if err != nil {
return "", fmt.Errorf("Failed to calculate InserVals with error: %w", err)
return "", fmt.Errorf("could not compute insert index: %w", err)
}
calculatedIdx := strings.Join(calculatedInsertVals, "_")
if len(calculatedInsertVals) > 3 {
return "", fmt.Errorf("We don't expect a ds to be computed to Level3. We got %s while trying to insert between %s and %s", calculatedIdx, beforeIndex, afterIndex)
if result.Length() > 2 {
return "", fmt.Errorf("unsupported resulting index %s level (3) between %s and %s", result, beforeIndex, afterIndex)
}

return calculatedIdx, nil
return result.String(), nil
}

func (jd *HandleT) computeNewIdxForIntraNodeMigration(l lock.LockToken, insertBeforeDS dataSetT) string { // Within the node
Expand Down Expand Up @@ -3708,7 +3638,7 @@ func (jd *HandleT) getBackupDSRange() *dataSetRangeT {
}
jd.statPreDropTableCount.Gauge(len(dnumList))

sortDnumList(jd, dnumList)
sortDnumList(dnumList)

backupDS = dataSetT{
JobTable: fmt.Sprintf("%s%s_jobs_%s", preDropTablePrefix, jd.tablePrefix, dnumList[0]),
Expand Down
Loading

0 comments on commit bfba9cf

Please sign in to comment.