diff --git a/go/vt/vttablet/tabletserver/messager/message_manager.go b/go/vt/vttablet/tabletserver/messager/message_manager.go index 5719c2fd4c2..5a5d889da90 100644 --- a/go/vt/vttablet/tabletserver/messager/message_manager.go +++ b/go/vt/vttablet/tabletserver/messager/message_manager.go @@ -17,6 +17,7 @@ limitations under the License. package messager import ( + "bytes" "fmt" "io" "sync" @@ -255,17 +256,54 @@ func newMessageManager(tsv TabletService, vs VStreamer, table *schema.Table, pos mm.purgeQuery = sqlparser.BuildParsedQuery( "delete from %v where time_acked < %a limit 500", mm.name, ":time_acked") - // if a maxBackoff is set, incorporate it into the update statement - if mm.maxBackoff > 0 { - mm.postponeQuery = sqlparser.BuildParsedQuery( - "update %v set time_next = %a+if(%a< %a, %a, %a< %%a, %%a + %%a, %%a + %s)", jitteredBackoff, jitteredBackoff)) + // jitteredBackoff > :max_backoff + args = append(args, ":min_backoff", ":time_now", ":max_backoff") + // if it is greater, then use :time_now + :max_backoff + args = append(args, ":time_now", ":max_backoff") + // otherwise just use :time_now + jitteredBackoff + args = append(args, ":time_now", ":min_backoff", ":time_now") } - return mm + + // close the if statement + buf.WriteString(")") + + // now that we've identified time_next, finish the statement + buf.WriteString(", epoch = ifnull(epoch, 0)+1 where id in %a and time_acked is null") + args = append(args, "::ids") + + return sqlparser.BuildParsedQuery(buf.String(), args...) } // buildSelectColumnList is a convenience function that diff --git a/go/vt/vttablet/tabletserver/messager/message_manager_test.go b/go/vt/vttablet/tabletserver/messager/message_manager_test.go index 0f6bbd44def..7156c26cdb2 100644 --- a/go/vt/vttablet/tabletserver/messager/message_manager_test.go +++ b/go/vt/vttablet/tabletserver/messager/message_manager_test.go @@ -753,7 +753,7 @@ func TestMMGenerate(t *testing.T) { utils.MustMatch(t, wantids, gotids, "did not match") query, bv = mm.GeneratePostponeQuery([]string{"1", "2"}) - wantQuery = "update foo set time_next = :time_now+(:min_backoff<