Skip to content

Commit

Permalink
messager: add 33% jitter to postpone backoff (#6092)
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Perkins <[email protected]>
  • Loading branch information
derekperkins authored Apr 20, 2020
1 parent 1397484 commit ab13a0f
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 11 deletions.
56 changes: 47 additions & 9 deletions go/vt/vttablet/tabletserver/messager/message_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package messager

import (
"bytes"
"fmt"
"io"
"sync"
Expand Down Expand Up @@ -255,17 +256,54 @@ func newMessageManager(tsv TabletService, vs VStreamer, table *schema.Table, pos
mm.purgeQuery = sqlparser.BuildParsedQuery(
"delete from %v where time_acked < %a limit 500", mm.name, ":time_acked")

// if a maxBackoff is set, incorporate it into the update statement
if mm.maxBackoff > 0 {
mm.postponeQuery = sqlparser.BuildParsedQuery(
"update %v set time_next = %a+if(%a<<ifnull(epoch, 0) > %a, %a, %a<<ifnull(epoch, 0)), epoch = ifnull(epoch, 0)+1 where id in %a and time_acked is null",
mm.name, ":time_now", ":min_backoff", ":max_backoff", ":max_backoff", ":min_backoff", "::ids")
mm.postponeQuery = buildPostponeQuery(mm.name, mm.minBackoff, mm.maxBackoff)

return mm
}

func buildPostponeQuery(name sqlparser.TableIdent, minBackoff, maxBackoff time.Duration) *sqlparser.ParsedQuery {
var args []interface{}

buf := bytes.NewBufferString("update %v set time_next = ")
args = append(args, name)

// have backoff be +/- 33%, seeded with :time_now to be consistent in multiple usages
// whenever this is injected, append (:min_backoff, :time_now)
jitteredBackoff := "FLOOR((%a<<ifnull(epoch, 0))*(.666666 + (RAND(%a) * .666666)))"

//
// if the jittered backoff is less than min_backoff, just set it to time_now + min_backoff
//
buf.WriteString(fmt.Sprintf("IF(%s < %%a, %%a + %%a, ", jitteredBackoff))
// jitteredBackoff < :min_backoff
args = append(args, ":min_backoff", ":time_now", ":min_backoff")
// if it is less, then use :time_now + :min_backoff
args = append(args, ":time_now", ":min_backoff")

// now we are setting the false case on the above IF statement
if maxBackoff == 0 {
// if there is no max_backoff, just use :time_now + jitteredBackoff
buf.WriteString(fmt.Sprintf("%%a + %s", jitteredBackoff))
args = append(args, ":time_now", ":min_backoff", ":time_now")
} else {
mm.postponeQuery = sqlparser.BuildParsedQuery(
"update %v set time_next = %a+(%a<<ifnull(epoch, 0)), epoch = ifnull(epoch, 0)+1 where id in %a and time_acked is null",
mm.name, ":time_now", ":min_backoff", "::ids")
// make sure that it doesn't exceed max_backoff
buf.WriteString(fmt.Sprintf("IF(%s > %%a, %%a + %%a, %%a + %s)", jitteredBackoff, jitteredBackoff))
// jitteredBackoff > :max_backoff
args = append(args, ":min_backoff", ":time_now", ":max_backoff")
// if it is greater, then use :time_now + :max_backoff
args = append(args, ":time_now", ":max_backoff")
// otherwise just use :time_now + jitteredBackoff
args = append(args, ":time_now", ":min_backoff", ":time_now")
}
return mm

// close the if statement
buf.WriteString(")")

// now that we've identified time_next, finish the statement
buf.WriteString(", epoch = ifnull(epoch, 0)+1 where id in %a and time_acked is null")
args = append(args, "::ids")

return sqlparser.BuildParsedQuery(buf.String(), args...)
}

// buildSelectColumnList is a convenience function that
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletserver/messager/message_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -753,7 +753,7 @@ func TestMMGenerate(t *testing.T) {
utils.MustMatch(t, wantids, gotids, "did not match")

query, bv = mm.GeneratePostponeQuery([]string{"1", "2"})
wantQuery = "update foo set time_next = :time_now+(:min_backoff<<ifnull(epoch, 0)), epoch = ifnull(epoch, 0)+1 where id in ::ids and time_acked is null"
wantQuery = "update foo set time_next = IF(FLOOR((:min_backoff<<ifnull(epoch, 0))*(.666666 + (RAND(:time_now) * .666666))) < :min_backoff, :time_now + :min_backoff, :time_now + FLOOR((:min_backoff<<ifnull(epoch, 0))*(.666666 + (RAND(:time_now) * .666666)))), epoch = ifnull(epoch, 0)+1 where id in ::ids and time_acked is null"
if query != wantQuery {
t.Errorf("GeneratePostponeQuery query: %s, want %s", query, wantQuery)
}
Expand Down Expand Up @@ -790,7 +790,7 @@ func TestMMGenerateWithBackoff(t *testing.T) {
wantids := sqltypes.TestBindVariable([]interface{}{"1", "2"})

query, bv := mm.GeneratePostponeQuery([]string{"1", "2"})
wantQuery := "update foo set time_next = :time_now+if(:min_backoff<<ifnull(epoch, 0) > :max_backoff, :max_backoff, :min_backoff<<ifnull(epoch, 0)), epoch = ifnull(epoch, 0)+1 where id in ::ids and time_acked is null"
wantQuery := "update foo set time_next = IF(FLOOR((:min_backoff<<ifnull(epoch, 0))*(.666666 + (RAND(:time_now) * .666666))) < :min_backoff, :time_now + :min_backoff, IF(FLOOR((:min_backoff<<ifnull(epoch, 0))*(.666666 + (RAND(:time_now) * .666666))) > :max_backoff, :time_now + :max_backoff, :time_now + FLOOR((:min_backoff<<ifnull(epoch, 0))*(.666666 + (RAND(:time_now) * .666666))))), epoch = ifnull(epoch, 0)+1 where id in ::ids and time_acked is null"
if query != wantQuery {
t.Errorf("GeneratePostponeQuery query: %s, want %s", query, wantQuery)
}
Expand Down

0 comments on commit ab13a0f

Please sign in to comment.