From c6095ad8d36e798d3393992fd74a58681ec7989f Mon Sep 17 00:00:00 2001 From: Matt Jibson Date: Wed, 17 Oct 2018 15:37:24 -0400 Subject: [PATCH] importccl: re-enable job control tests I tracked down the problem. It was that after the CANCEL JOB was issued, sometimes the 2nd stage of the IMPORT (the shuffle) would have started, and sometimes it wouldn't have. If it did not start then RunJob would block forever trying to send on the allowResponse chan. Fix this by making a draining go routine after the first block. Closes #24623 Closes #24658 Release note: None --- pkg/ccl/importccl/import_stmt_test.go | 36 +++++++++++++++++++-------- 1 file changed, 25 insertions(+), 11 deletions(-) diff --git a/pkg/ccl/importccl/import_stmt_test.go b/pkg/ccl/importccl/import_stmt_test.go index 44b73ff15a19..a475199e71f1 100644 --- a/pkg/ccl/importccl/import_stmt_test.go +++ b/pkg/ccl/importccl/import_stmt_test.go @@ -22,6 +22,7 @@ import ( "path/filepath" "runtime" "strings" + "sync" "testing" "time" @@ -1498,7 +1499,6 @@ func BenchmarkConvertRecord(b *testing.B) { // work as intended on import jobs. func TestImportControlJob(t *testing.T) { defer leaktest.AfterTest(t)() - t.Skipf("#24658") defer func(oldInterval time.Duration) { jobs.DefaultAdoptInterval = oldInterval @@ -1527,15 +1527,33 @@ func TestImportControlJob(t *testing.T) { sqlDB := sqlutils.MakeSQLRunner(tc.Conns[0]) sqlDB.Exec(t, `CREATE DATABASE data`) - t.Run("cancel", func(t *testing.T) { - sqlDB.Exec(t, `CREATE DATABASE cancelimport`) - - srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + makeSrv := func() *httptest.Server { + var once sync.Once + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r.Method == "GET" { - <-allowResponse + // The following code correctly handles both the case where, after the + // CANCEL JOB is issued, the second stage of the IMPORT (the shuffle, + // after the sampling) may or may not be started. If it was started, then a + // second GET request is done. The once here will cause that request to not + // block. The draining for loop below will cause jobutils.RunJob's second send + // on allowResponse to succeed (which it does after issuing the CANCEL JOB). + once.Do(func() { + <-allowResponse + go func() { + for range allowResponse { + } + }() + }) + _, _ = w.Write([]byte(r.URL.Path[1:])) } })) + } + + t.Run("cancel", func(t *testing.T) { + sqlDB.Exec(t, `CREATE DATABASE cancelimport`) + + srv := makeSrv() defer srv.Close() var urls []string @@ -1567,11 +1585,7 @@ func TestImportControlJob(t *testing.T) { sqlDB.Exec(t, `CREATE DATABASE pauseimport`) - srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.Method == "GET" { - _, _ = w.Write([]byte(r.URL.Path[1:])) - } - })) + srv := makeSrv() defer srv.Close() count := 100