Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
100539: sql: add telemetry for UDFs with RETURNS TABLE r=mgartner a=mgartner

Informs #100226

Release note: None

100554: kv: initialize consistencyLimiter during Store construction, before Start r=aliher1911 a=nvanbenschoten

Fixes #96231.

This commit attempts to fix #96231. It moves the initialization of `Store.consistencyLimiter` up from the bottom of `Store.Start` into `NewStore`. It is unsafe to initialize this variable after the call to `Store.processRaft`, which starts Raft processing. Beyond that point, incoming Raft traffic is permitted and calls to `computeChecksumPostApply` are possible.

The two stacktraces we have in that issue conclusively point to the `Store.consistencyLimiter` being nil during a call to `(*Replica).computeChecksumPostApply`. This startup-time race is the only possible explanation I could come up with.

Release note (bug fix): Fixed a rare startup race that could cause an `invalid memory address or nil pointer dereference` error.

100592: storage: remove unnecessary version override in unit test r=nicktrav a=jbowens

Remove an unnecessary use of cluster.MakeTestingClusterSettingsWithVersions in favor of cluster.MakeTestingClusterSettings.

Epic: None
Informs: #100552
Release note: None

Co-authored-by: Marcus Gartner <[email protected]>
Co-authored-by: Nathan VanBenschoten <[email protected]>
Co-authored-by: Jackson Owens <[email protected]>
  • Loading branch information
4 people committed Apr 4, 2023
4 parents 734d52c + 255121a + 35c9283 + c573620 commit ef357d1
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 20 deletions.
22 changes: 11 additions & 11 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1343,6 +1343,17 @@ func NewStore(
s.renewableLeasesSignal = make(chan struct{}, 1)
}

s.consistencyLimiter = quotapool.NewRateLimiter(
"ConsistencyQueue",
quotapool.Limit(consistencyCheckRate.Get(&cfg.Settings.SV)),
consistencyCheckRate.Get(&cfg.Settings.SV)*consistencyCheckRateBurstFactor,
quotapool.WithMinimumWait(consistencyCheckRateMinWait))

consistencyCheckRate.SetOnChange(&cfg.Settings.SV, func(ctx context.Context) {
rate := consistencyCheckRate.Get(&cfg.Settings.SV)
s.consistencyLimiter.UpdateLimit(quotapool.Limit(rate), rate*consistencyCheckRateBurstFactor)
})

s.limiters.BulkIOWriteRate = rate.NewLimiter(rate.Limit(bulkIOWriteLimit.Get(&cfg.Settings.SV)), kvserverbase.BulkIOWriteBurst)
bulkIOWriteLimit.SetOnChange(&cfg.Settings.SV, func(ctx context.Context) {
s.limiters.BulkIOWriteRate.SetLimit(rate.Limit(bulkIOWriteLimit.Get(&cfg.Settings.SV)))
Expand Down Expand Up @@ -2057,17 +2068,6 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error {
s.storeRebalancer.Start(ctx, s.stopper)
}

s.consistencyLimiter = quotapool.NewRateLimiter(
"ConsistencyQueue",
quotapool.Limit(consistencyCheckRate.Get(&s.ClusterSettings().SV)),
consistencyCheckRate.Get(&s.ClusterSettings().SV)*consistencyCheckRateBurstFactor,
quotapool.WithMinimumWait(consistencyCheckRateMinWait))

consistencyCheckRate.SetOnChange(&s.ClusterSettings().SV, func(ctx context.Context) {
rate := consistencyCheckRate.Get(&s.ClusterSettings().SV)
s.consistencyLimiter.UpdateLimit(quotapool.Limit(rate), rate*consistencyCheckRateBurstFactor)
})

// Set the started flag (for unittests).
atomic.StoreInt32(&s.started, 1)

Expand Down
17 changes: 11 additions & 6 deletions pkg/sql/parser/sql.y
Original file line number Diff line number Diff line change
Expand Up @@ -1642,7 +1642,7 @@ func (u *sqlSymUnion) showCreateFormatOption() tree.ShowCreateFormatOption {
%type <privilege.TargetObjectType> target_object_type

// User defined function relevant components.
%type <bool> opt_or_replace opt_return_set opt_no
%type <bool> opt_or_replace opt_return_table opt_return_set opt_no
%type <str> param_name func_as
%type <tree.FuncParams> opt_func_param_with_default_list func_param_with_default_list func_params func_params_list
%type <tree.FuncParam> func_param_with_default func_param
Expand Down Expand Up @@ -4500,7 +4500,8 @@ create_extension_stmt:
// } ...
// %SeeAlso: WEBDOCS/create-function.html
create_func_stmt:
CREATE opt_or_replace FUNCTION func_create_name '(' opt_func_param_with_default_list ')' RETURNS opt_return_set func_return_type
CREATE opt_or_replace FUNCTION func_create_name '(' opt_func_param_with_default_list ')'
RETURNS opt_return_table opt_return_set func_return_type
opt_create_func_opt_list opt_routine_body
{
name := $4.unresolvedObjectName().ToFunctionName()
Expand All @@ -4510,11 +4511,11 @@ create_func_stmt:
FuncName: name,
Params: $6.functionParams(),
ReturnType: tree.FuncReturnType{
Type: $10.typeReference(),
IsSet: $9.bool(),
Type: $11.typeReference(),
IsSet: $10.bool(),
},
Options: $11.functionOptions(),
RoutineBody: $12.routineBody(),
Options: $12.functionOptions(),
RoutineBody: $13.routineBody(),
}
}
| CREATE opt_or_replace FUNCTION error // SHOW HELP: CREATE FUNCTION
Expand All @@ -4523,6 +4524,10 @@ opt_or_replace:
OR REPLACE { $$.val = true }
| /* EMPTY */ { $$.val = false }

opt_return_table:
TABLE { return unimplementedWithIssueDetail(sqllex, 100226, "UDF returning TABLE") }
| /* EMPTY */ { $$.val = false }

opt_return_set:
SETOF { $$.val = true}
| /* EMPTY */ { $$.val = false }
Expand Down
10 changes: 10 additions & 0 deletions pkg/sql/parser/testdata/create_function
Original file line number Diff line number Diff line change
Expand Up @@ -555,3 +555,13 @@ CREATE FUNCTION _()
RETURNS INT8
LANGUAGE plpgsql
AS $$_$$ -- identifiers removed

error
CREATE FUNCTION f() RETURNS TABLE 'SELECT 1' LANGUAGE SQL
----
at or near "table": syntax error: unimplemented: this syntax
DETAIL: source SQL:
CREATE FUNCTION f() RETURNS TABLE 'SELECT 1' LANGUAGE SQL
^
HINT: You have attempted to use a feature that is not yet implemented.
See: https://go.crdb.dev/issue-v/100226/
4 changes: 1 addition & 3 deletions pkg/storage/pebble_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"strings"
"testing"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand Down Expand Up @@ -95,8 +94,7 @@ func randStr(fill []byte, rng *rand.Rand) {
func TestPebbleIterator_ExternalCorruption(t *testing.T) {
defer leaktest.AfterTest(t)()

version := clusterversion.ByKey(clusterversion.V22_2)
st := cluster.MakeTestingClusterSettingsWithVersions(version, version, true)
st := cluster.MakeTestingClusterSettings()
ctx := context.Background()
rng := rand.New(rand.NewSource(timeutil.Now().UnixNano()))
var f bytes.Buffer
Expand Down

0 comments on commit ef357d1

Please sign in to comment.