Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
87652: kv: Handle invalid lease transfer checks r=andrewbaptist a=andrewbaptist

Previously it would be a programming error when checking if the leaseholder move was valid if the leaseholder wasn't one of the replicas. This check was overly strict.

Release note: None

88379: pgwire: add encoding for JSON arguments r=ZhouXing19 a=rafiss

fixes #88355

This adds pgwire decoding support for JSON (oid=114). This does not add
oid=114 to the OidToType map in types.go since that introduces many
other assumptions about the type; namely that the type can be used as
the type of a column descriptor. That change would mean that existing
tables that were defined with JSON would be inconsistent with new tables
defined with a JSON column.

Release note (sql change): The pgwire protocol implementation can
now accept arguments of the JSON type (oid=114). Previously, it could
only accept JSONB (oid=3802). Internally, JSON and JSONB values are
still identical, so this change only affects how the values are received
over the wire protocol.

88485: xform: make FoldDivOne an essential normalization r=msirek a=msirek

Fixes #86790

This fixes sqlsmith errors caused by differences in the number of
displayed digits right of the decimal place for `expression / 1`
by making the folding of this expression essential.

Release note: None

88528: scripts/bench: update benchstat install instructions r=tbg a=michae2

As far as I can tell, we're not relying on the JSON output in github.com/cockroachdb/benchstat so change the instructions here to install the original benchstat instead of the fork.

Release note: None

Co-authored-by: Andrew Baptist <[email protected]>
Co-authored-by: Rafi Shamim <[email protected]>
Co-authored-by: Mark Sirek <[email protected]>
Co-authored-by: Michael Erickson <[email protected]>
  • Loading branch information
5 people committed Sep 23, 2022
5 parents 975701e + bfbf3c7 + 1fe4c77 + 956948e + 734a878 commit 14f11eb
Show file tree
Hide file tree
Showing 17 changed files with 230 additions and 68 deletions.
1 change: 0 additions & 1 deletion pkg/cmd/roachtest/tests/query_comparison_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,6 @@ func runOneRoundQueryComparison(
sqlsmith.UnlikelyRandomNulls(), sqlsmith.DisableCrossJoins(),
sqlsmith.DisableIndexHints(), sqlsmith.DisableWith(),
sqlsmith.LowProbabilityWhereClauseWithJoinTables(),
sqlsmith.DisableDivision(),
sqlsmith.SetComplexity(.3),
sqlsmith.SetScalarComplexity(.1),
)
Expand Down
10 changes: 6 additions & 4 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1619,9 +1619,6 @@ func (a *Allocator) ValidLeaseTargets(
// leaseholderShouldMoveDueToPreferences returns true if the current leaseholder
// is in violation of lease preferences _that can otherwise be satisfied_ by
// some existing replica.
//
// INVARIANT: This method should only be called with an `allExistingReplicas`
// slice that contains `leaseRepl`.
func (a *Allocator) leaseholderShouldMoveDueToPreferences(
ctx context.Context,
conf roachpb.SpanConfig,
Expand All @@ -1641,8 +1638,13 @@ func (a *Allocator) leaseholderShouldMoveDueToPreferences(
break
}
}
// If the leaseholder is not in the descriptor, then we should not move the
// lease since we don't know who the leaseholder is. This normally doesn't
// happen, but can occasionally since the loading of the leaseholder and of
// the existing replicas aren't always under a consistent lock.
if !leaseholderInExisting {
log.KvDistribution.Errorf(ctx, "programming error: expected leaseholder store to be in the slice of existing replicas")
log.KvDistribution.Info(ctx, "expected leaseholder store to be in the slice of existing replicas")
return false
}

// Exclude suspect/draining/dead stores.
Expand Down
26 changes: 13 additions & 13 deletions pkg/kv/kvserver/replica_range_lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -1483,25 +1483,25 @@ func (r *Replica) maybeExtendLeaseAsyncLocked(ctx context.Context, st kvserverpb
_ = r.requestLeaseLocked(ctx, st)
}

// checkLeaseRespectsPreferences checks if current replica owns the lease and
// if it respects the lease preferences defined in the span config. If there are no
// preferences defined then it will return true and consider that to be in-conformance.
func (r *Replica) checkLeaseRespectsPreferences(ctx context.Context) (bool, error) {
if !r.OwnsValidLease(ctx, r.store.cfg.Clock.NowAsClockTimestamp()) {
return false, errors.Errorf("replica %s is not the leaseholder, cannot check lease preferences", r)
// leaseViolatesPreferences checks if current replica owns the lease and if it
// violates the lease preferences defined in the span config. If there is an
// error or no preferences defined then it will return false and consider that
// to be in-conformance.
func (r *Replica) leaseViolatesPreferences(ctx context.Context) bool {
storeDesc, err := r.store.Descriptor(ctx, false /* useCached */)
if err != nil {
log.Infof(ctx, "Unable to load the descriptor %v: cannot check if lease violates preference", err)
return false
}
conf := r.SpanConfig()
if len(conf.LeasePreferences) == 0 {
return true, nil
}
storeDesc, err := r.store.Descriptor(ctx, false /* useCached */)
if err != nil {
return false, err
return false
}
for _, preference := range conf.LeasePreferences {
if constraint.ConjunctionsCheck(*storeDesc, preference.Constraints) {
return true, nil
return false
}
}
return false, nil
// We have at lease one preference set up, but we don't satisfy any.
return true
}
26 changes: 12 additions & 14 deletions pkg/kv/kvserver/replicate_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,9 +637,7 @@ func (rq *replicateQueue) shouldQueue(
}

// If the lease is valid, check to see if we should transfer it.
status := repl.LeaseStatusAt(ctx, now)
if status.IsValid() &&
rq.canTransferLeaseFrom(ctx, repl) &&
if rq.canTransferLeaseFrom(ctx, repl) &&
rq.allocator.ShouldTransferLease(
ctx,
conf,
Expand All @@ -650,7 +648,7 @@ func (rq *replicateQueue) shouldQueue(
log.KvDistribution.VEventf(ctx, 2, "lease transfer needed, enqueuing")
return true, 0
}
if !status.IsValid() {
if !repl.LeaseStatusAt(ctx, now).IsValid() {
// The lease for this range is currently invalid, if this replica is
// the raft leader then it is necessary that it acquires the lease. We
// enqueue it regardless of being a leader or follower, where the
Expand Down Expand Up @@ -679,9 +677,7 @@ func (rq *replicateQueue) process(
// usually signaling that a rebalancing reservation could not be made with the
// selected target.
for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
requeue, err := rq.processOneChangeWithTracing(
ctx, repl, rq.canTransferLeaseFrom, false /* scatter */, false, /* dryRun */
)
requeue, err := rq.processOneChangeWithTracing(ctx, repl)
if isSnapshotError(err) {
// If ChangeReplicas failed because the snapshot failed, we attempt to
// retry the operation. The most likely causes of the snapshot failing
Expand Down Expand Up @@ -783,17 +779,16 @@ func filterTracingSpans(rec tracingpb.Recording, opNamesToFilter ...string) trac
// logging the resulting traces to the DEV channel in the case of errors or
// when the configured log traces threshold is exceeded.
func (rq *replicateQueue) processOneChangeWithTracing(
ctx context.Context,
repl *Replica,
canTransferLeaseFrom func(ctx context.Context, repl *Replica) bool,
scatter, dryRun bool,
ctx context.Context, repl *Replica,
) (requeue bool, _ error) {
processStart := timeutil.Now()
ctx, sp := tracing.EnsureChildSpan(ctx, rq.Tracer, "process replica",
tracing.WithRecording(tracingpb.RecordingVerbose))
defer sp.Finish()

requeue, err := rq.processOneChange(ctx, repl, canTransferLeaseFrom, scatter, dryRun)
requeue, err := rq.processOneChange(ctx, repl, rq.canTransferLeaseFrom,
false /* scatter */, false, /* dryRun */
)

// Utilize a new background context (properly annotated) to avoid writing
// traces from a child context into its parent.
Expand Down Expand Up @@ -1923,11 +1918,14 @@ func (rq *replicateQueue) changeReplicas(
// replica. It considers two factors if the replica is in -conformance with
// lease preferences and the last time a transfer occurred to avoid thrashing.
func (rq *replicateQueue) canTransferLeaseFrom(ctx context.Context, repl *Replica) bool {
if !repl.OwnsValidLease(ctx, repl.store.cfg.Clock.NowAsClockTimestamp()) {
// This replica is not the leaseholder, so it can't transfer the lease.
return false
}
// Do a best effort check to see if this replica conforms to the configured
// lease preferences (if any), if it does not we want to encourage more
// aggressive lease movement and not delay it.
respectsLeasePreferences, err := repl.checkLeaseRespectsPreferences(ctx)
if err == nil && !respectsLeasePreferences {
if repl.leaseViolatesPreferences(ctx) {
return true
}
if lastLeaseTransfer := rq.lastLeaseTransfer.Load(); lastLeaseTransfer != nil {
Expand Down
15 changes: 11 additions & 4 deletions pkg/sql/conn_executor_prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,10 +406,17 @@ func (ex *connExecutor) execBind(
} else {
typ, ok := types.OidToType[t]
if !ok {
var err error
typ, err = ex.planner.ResolveTypeByOID(ctx, t)
if err != nil {
return err
if t == oid.T_json {
// This special case is here so we can support decoding parameters
// with oid=json without adding full support for the JSON type.
// TODO(sql-exp): Remove this if we support JSON.
typ = types.Json
} else {
var err error
typ, err = ex.planner.ResolveTypeByOID(ctx, t)
if err != nil {
return err
}
}
}
d, err := pgwirebase.DecodeDatum(
Expand Down
24 changes: 24 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/decimal
Original file line number Diff line number Diff line change
Expand Up @@ -584,3 +584,27 @@ SELECT attname, atttypmod FROM pg_attribute WHERE attrelid = 't71926'::regclass:
no_typmod -1
precision 327684
precision_and_width 327687

# Regression test for #86790
statement ok
CREATE TABLE t86790 (x INT8 NOT NULL)

statement ok
INSERT INTO t86790 VALUES (-4429772553778622992)

query R
SELECT (x / 1)::DECIMAL FROM t86790
----
-4429772553778622992

statement ok
SET testing_optimizer_disable_rule_probability = 1

# The results should be the same as the previous SELECT.
query R
SELECT (x / 1)::DECIMAL FROM t86790
----
-4429772553778622992

statement ok
RESET testing_optimizer_disable_rule_probability
2 changes: 2 additions & 0 deletions pkg/sql/opt/xform/optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1015,6 +1015,8 @@ func (o *Optimizer) disableRulesRandom(probability float64) {
int(opt.FoldNullBinaryLeft),
int(opt.FoldNullComparisonRight),
int(opt.FoldNullComparisonLeft),
// FoldDivOne is needed for consistent formatting, so tests won't fail.
int(opt.FoldDivOne),
// Without PruneAggCols, it's common to receive
// "optimizer factory constructor call stack exceeded max depth of 10000"
int(opt.PruneAggCols),
Expand Down
7 changes: 7 additions & 0 deletions pkg/sql/pgwire/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -958,6 +958,13 @@ func (c *conn) handleParse(
sqlTypeHints[i] = nil
continue
}
if t == oid.T_json {
// This special case is here so we can support decoding parameters
// with oid=json without adding full support for the JSON type.
// TODO(sql-exp): Remove this if we support JSON.
sqlTypeHints[i] = types.Json
continue
}
v, ok := types.OidToType[t]
if !ok {
err := pgwirebase.NewProtocolViolationErrorf("unknown oid type: %v", t)
Expand Down
7 changes: 6 additions & 1 deletion pkg/sql/pgwire/pgwirebase/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ func DecodeDatum(
}
}
return out, nil
case oid.T_jsonb:
case oid.T_jsonb, oid.T_json:
if err := validateStringBytes(b); err != nil {
return nil, err
}
Expand Down Expand Up @@ -734,6 +734,11 @@ func DecodeDatum(
return nil, err
}
return tree.NewDIPAddr(tree.DIPAddr{IPAddr: ipAddr}), nil
case oid.T_json:
if err := validateStringBytes(b); err != nil {
return nil, err
}
return tree.ParseDJSON(string(b))
case oid.T_jsonb:
if len(b) < 1 {
return nil, NewProtocolViolationErrorf("no data to decode")
Expand Down
89 changes: 89 additions & 0 deletions pkg/sql/pgwire/testdata/pgtest/json
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,92 @@ ReadyForQuery
----
{"Type":"ErrorResponse","Code":"08P01"}
{"Type":"ReadyForQuery","TxStatus":"I"}

# Test binary output encoding for JSONB.
send
Bind {"ParameterFormatCodes": [0], "ResultFormatCodes": [1], "Parameters": [{"text":"{\"key\": \"val\"}"}]}
Execute
Sync
----

until
ReadyForQuery
----
{"Type":"BindComplete"}
{"Type":"DataRow","Values":[{"binary":"017b226b6579223a202276616c227d"}]}
{"Type":"CommandComplete","CommandTag":"SELECT 1"}
{"Type":"ReadyForQuery","TxStatus":"I"}


# Check that we can handle JSON parameters.
send
Parse {"Query": "SELECT $1::JSON", "ParameterOIDs": [114]}
Describe {"ObjectType": "S"}
Bind {"ParameterFormatCodes": [0], "Parameters": [{"text":"\"\""}]}
Execute
Sync
----

until noncrdb_only
ReadyForQuery
----
{"Type":"ParseComplete"}
{"Type":"ParameterDescription","ParameterOIDs":[114]}
{"Type":"RowDescription","Fields":[{"Name":"json","TableOID":0,"TableAttributeNumber":0,"DataTypeOID":114,"DataTypeSize":-1,"TypeModifier":-1,"Format":0}]}
{"Type":"BindComplete"}
{"Type":"DataRow","Values":[{"text":"\"\""}]}
{"Type":"CommandComplete","CommandTag":"SELECT 1"}
{"Type":"ReadyForQuery","TxStatus":"I"}

# CRDB currently only supports decoding JSON values. Encoding will always be in
# JSONB format.
until crdb_only
ReadyForQuery
----
{"Type":"ParseComplete"}
{"Type":"ParameterDescription","ParameterOIDs":[114]}
{"Type":"RowDescription","Fields":[{"Name":"jsonb","TableOID":0,"TableAttributeNumber":0,"DataTypeOID":3802,"DataTypeSize":-1,"TypeModifier":-1,"Format":0}]}
{"Type":"BindComplete"}
{"Type":"DataRow","Values":[{"text":"\"\""}]}
{"Type":"CommandComplete","CommandTag":"SELECT 1"}
{"Type":"ReadyForQuery","TxStatus":"I"}

# JSON with two double quotes (ASCII 0x22) is a valid JSON string.
send
Bind {"ParameterFormatCodes": [1], "Parameters": [{"binary":"2222"}]}
Execute
Sync
----

until
ReadyForQuery
----
{"Type":"BindComplete"}
{"Type":"DataRow","Values":[{"text":"\"\""}]}
{"Type":"CommandComplete","CommandTag":"SELECT 1"}
{"Type":"ReadyForQuery","TxStatus":"I"}

# Test binary output encoding for JSON.
send
Bind {"ParameterFormatCodes": [0], "ResultFormatCodes": [1], "Parameters": [{"text":"{\"key\": \"val\"}"}]}
Execute
Sync
----

until noncrdb_only
ReadyForQuery
----
{"Type":"BindComplete"}
{"Type":"DataRow","Values":[{"text":"{\"key\": \"val\"}"}]}
{"Type":"CommandComplete","CommandTag":"SELECT 1"}
{"Type":"ReadyForQuery","TxStatus":"I"}

# CRDB currently only supports decoding JSON values. Encoding will always be in
# JSONB format.
until crdb_only
ReadyForQuery
----
{"Type":"BindComplete"}
{"Type":"DataRow","Values":[{"binary":"017b226b6579223a202276616c227d"}]}
{"Type":"CommandComplete","CommandTag":"SELECT 1"}
{"Type":"ReadyForQuery","TxStatus":"I"}
16 changes: 10 additions & 6 deletions pkg/sql/pgwire/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,11 +519,15 @@ func writeBinaryInterval(b *writeBuffer, v duration.Duration) {
b.putInt32(int32(v.Months))
}

func writeBinaryJSON(b *writeBuffer, v json.JSON) {
func writeBinaryJSON(b *writeBuffer, v json.JSON, t *types.T) {
s := v.String()
b.putInt32(int32(len(s) + 1))
// Postgres version number, as of writing, `1` is the only valid value.
b.writeByte(1)
if t.Oid() == oid.T_jsonb {
b.putInt32(int32(len(s) + 1))
// Postgres version number, as of writing, `1` is the only valid value.
b.writeByte(1)
} else {
b.putInt32(int32(len(s)))
}
b.writeString(s)
}

Expand Down Expand Up @@ -740,7 +744,7 @@ func writeBinaryDatumNotNull(
b.putInt32AtIndex(initialLen /* index to write at */, int32(lengthToWrite))

case *tree.DJSON:
writeBinaryJSON(b, v.JSON)
writeBinaryJSON(b, v.JSON, t)

case *tree.DOid:
b.putInt32(4)
Expand Down Expand Up @@ -802,7 +806,7 @@ func (b *writeBuffer) writeBinaryColumnarElement(
writeBinaryInterval(b, vecs.IntervalCols[colIdx].Get(rowIdx))

case types.JsonFamily:
writeBinaryJSON(b, vecs.JSONCols[colIdx].Get(rowIdx))
writeBinaryJSON(b, vecs.JSONCols[colIdx].Get(rowIdx), typ)

default:
// All other types are represented via the datum-backed vector.
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/sem/builtins/pg_builtins.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ var typeBuiltinsHaveUnderscore = map[oid.Oid]struct{}{
types.TimeTZ.Oid(): {},
types.Decimal.Oid(): {},
types.Interval.Oid(): {},
types.Json.Oid(): {},
types.Jsonb.Oid(): {},
types.Uuid.Oid(): {},
types.VarBit.Oid(): {},
Expand Down
7 changes: 7 additions & 0 deletions pkg/sql/session_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,13 @@ func (p *planner) DeserializeSessionState(state *tree.DBytes) (*tree.DBool, erro
placeholderTypes[i] = nil
continue
}
if t == oid.T_json {
// This special case is here so we can support decoding parameters
// with oid=json without adding full support for the JSON type.
// TODO(sql-exp): Remove this if we support JSON.
placeholderTypes[i] = types.Json
continue
}
v, ok := types.OidToType[t]
if !ok {
err := pgwirebase.NewProtocolViolationErrorf("unknown oid type: %v", t)
Expand Down
Loading

0 comments on commit 14f11eb

Please sign in to comment.