diff --git a/go/test/endtoend/utils/utils.go b/go/test/endtoend/utils/utils.go index e4b4310237f..d2f4255f2bf 100644 --- a/go/test/endtoend/utils/utils.go +++ b/go/test/endtoend/utils/utils.go @@ -300,6 +300,7 @@ func TimeoutAction(t *testing.T, timeout time.Duration, errMsg string, action fu select { case <-deadline: t.Error(errMsg) + return case <-time.After(1 * time.Second): ok = action() } diff --git a/go/vt/srvtopo/query.go b/go/vt/srvtopo/query.go index 098f5c77bc1..ec1ed50100a 100644 --- a/go/vt/srvtopo/query.go +++ b/go/vt/srvtopo/query.go @@ -86,7 +86,12 @@ func (q *resilientQuery) getCurrentValue(ctx context.Context, wkey fmt.Stringer, // If it is not time to check again, then return either the cached // value or the cached error but don't ask topo again. - if !shouldRefresh { + // Here we have to be careful with the part where we haven't gotten even the first result. + // In that case, a refresh is already in progress, but the cache is empty! So, we can't use the cache. + // We have to wait for the query's results. + // We know the query has run at least once if the insertionTime is non-zero, or if we have an error. + queryRanAtLeastOnce := !entry.insertionTime.IsZero() || entry.lastError != nil + if !shouldRefresh && queryRanAtLeastOnce { if cacheValid { return entry.value, nil } diff --git a/go/vt/srvtopo/query_test.go b/go/vt/srvtopo/query_test.go new file mode 100644 index 00000000000..2569a2ad420 --- /dev/null +++ b/go/vt/srvtopo/query_test.go @@ -0,0 +1,72 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package srvtopo + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "vitess.io/vitess/go/stats" +) + +// TestResilientQueryGetCurrentValueInitialization tests that the resilient query returns the correct results when it has been +// initialized. +func TestResilientQueryGetCurrentValueInitialization(t *testing.T) { + // Create a basic query, which doesn't do anything other than return the same cell it got as an input. + // The query however needs to simulate being slow, so we have a sleep in there. + query := func(ctx context.Context, entry *queryEntry) (any, error) { + time.Sleep(1 * time.Second) + cell := entry.key.(cellName) + return cell, nil + } + counts := stats.NewCountersWithSingleLabel("TestResilientQueryGetCurrentValue", "Test for resilient query", "type") + + // Create the resilient query + rq := &resilientQuery{ + query: query, + counts: counts, + cacheRefreshInterval: 5 * time.Second, + cacheTTL: 5 * time.Second, + entries: make(map[string]*queryEntry), + } + + // Create a context and a cell. + ctx := context.Background() + cell := cellName("cell-1") + + // Hammer the resilient query with multiple get requests just as it is created. + // We expect all of them to work. + wg := sync.WaitGroup{} + for i := 0; i < 10; i++ { + // To test with both stale and not-stale, we use the modulo of our index. + stale := i%2 == 0 + wg.Add(1) + go func() { + defer wg.Done() + res, err := rq.getCurrentValue(ctx, cell, stale) + // Assert that we don't have any error and the value matches what we want. + assert.NoError(t, err) + assert.EqualValues(t, cell, res) + }() + } + // Wait for the wait group to be empty, otherwise the test is marked a success before any of the go routines finish completion! + wg.Wait() +}