From c8ddf08d521e8bc4aaa5eba390c8adf4504cd3cf Mon Sep 17 00:00:00 2001 From: Michael Butler Date: Mon, 10 Apr 2023 17:19:58 -0400 Subject: [PATCH 1/3] roachtest: prevent shared mutable state across c2c roachtest runs Previously, all `c2c/*` roachtests run with `--count` would provide incomprehensible results because multiple roachtest runs of the same test would override each other's state. Specifically, the latest call of `test_spec.Run()`, would override the `test.Test` harness, and `syncedCluster.Cluster` used by all other tests with the same registration. This patch fixes this problem by moving all fields in `replicationSpec` that are set during test execution (i.e. a `test_spec.Run` call), to a new `replicationDriver` struct. Now, `replicationSpec` gets defined during test registration and is shared across test runs, while `replicationDriver` gets set within a test run. Epic: None Release note: None --- pkg/cmd/roachtest/tests/cluster_to_cluster.go | 467 +++++++++--------- 1 file changed, 243 insertions(+), 224 deletions(-) diff --git a/pkg/cmd/roachtest/tests/cluster_to_cluster.go b/pkg/cmd/roachtest/tests/cluster_to_cluster.go index 3c7bea787f56..ff892dc503a1 100644 --- a/pkg/cmd/roachtest/tests/cluster_to_cluster.go +++ b/pkg/cmd/roachtest/tests/cluster_to_cluster.go @@ -65,9 +65,14 @@ type clusterInfo struct { } type c2cSetup struct { - src *clusterInfo - dst *clusterInfo + src *clusterInfo + dst *clusterInfo + + // workloadNode identifies the node in the roachprod cluster that runs the workload. workloadNode option.NodeListOption + + // gatewayNodes identify the nodes in the source cluster to connect the main workload to. + gatewayNodes option.NodeListOption promCfg *prometheus.Config } @@ -194,7 +199,7 @@ type streamingWorkload interface { func defaultWorkloadDriver( workloadCtx context.Context, setup *c2cSetup, c cluster.Cluster, workload streamingWorkload, ) error { - return c.RunE(workloadCtx, setup.workloadNode, workload.sourceRunCmd(setup.src.name, setup.src.nodes)) + return c.RunE(workloadCtx, setup.workloadNode, workload.sourceRunCmd(setup.src.name, setup.gatewayNodes)) } type replicateTPCC struct { @@ -269,7 +274,9 @@ func (bo replicateBulkOps) runDriver( return nil } -type replicationTestSpec struct { +// replicationSpec are inputs to a c2c roachtest set during roachtest +// registration, and can not be modified during roachtest execution. +type replicationSpec struct { // name specifies the name of the roachtest name string @@ -303,25 +310,39 @@ type replicationTestSpec struct { // If non-empty, the test will be skipped with the supplied reason. skip string - // replicationStartHook is called as soon as the replication job begins, - // when there are no other roachtest connections to the database. - replicationStartHook func(ctx context.Context, sp *replicationTestSpec) - // tags are used to categorize the test. tags map[string]struct{} +} + +// replicationDriver manages c2c roachtest execution. +type replicationDriver struct { + rs replicationSpec + + // beforeWorkloadHook is called before the main workload begins. + beforeWorkloadHook func() + + // replicationStartHook is called as soon as the replication job begins. 
+ replicationStartHook func(ctx context.Context, sp *replicationDriver) - // fields below are instantiated at runtime setup *c2cSetup t test.Test c cluster.Cluster metrics *c2cMetrics } -func (sp *replicationTestSpec) setupC2C(ctx context.Context, t test.Test, c cluster.Cluster) { +func makeReplicationDriver(t test.Test, c cluster.Cluster, rs replicationSpec) replicationDriver { + return replicationDriver{ + t: t, + c: c, + rs: rs, + } +} + +func (rd *replicationDriver) setupC2C(ctx context.Context, t test.Test, c cluster.Cluster) { c.Put(ctx, t.Cockroach(), "./cockroach") - srcCluster := c.Range(1, sp.srcNodes) - dstCluster := c.Range(sp.srcNodes+1, sp.srcNodes+sp.dstNodes) - workloadNode := c.Node(sp.srcNodes + sp.dstNodes + 1) + srcCluster := c.Range(1, rd.rs.srcNodes) + dstCluster := c.Range(rd.rs.srcNodes+1, rd.rs.srcNodes+rd.rs.dstNodes) + workloadNode := c.Node(rd.rs.srcNodes + rd.rs.dstNodes + 1) c.Put(ctx, t.DeprecatedWorkload(), "./workload", workloadNode) // TODO(msbutler): allow for backups once this test stabilizes a bit more. @@ -332,7 +353,7 @@ func (sp *replicationTestSpec) setupC2C(ctx context.Context, t test.Test, c clus // TODO(msbutler): allow for backups once this test stabilizes a bit more. dstStartOps := option.DefaultStartOptsNoBackups() - dstStartOps.RoachprodOpts.InitTarget = sp.srcNodes + 1 + dstStartOps.RoachprodOpts.InitTarget = rd.rs.srcNodes + 1 dstClusterSetting := install.MakeClusterSettings(install.SecureOption(true)) c.Start(ctx, t.L(), dstStartOps, dstClusterSetting, dstCluster) @@ -376,95 +397,99 @@ func (sp *replicationTestSpec) setupC2C(ctx context.Context, t test.Test, c clus db: destDB, nodes: dstCluster} - sp.setup = &c2cSetup{ + rd.setup = &c2cSetup{ src: &srcTenantInfo, dst: &destTenantInfo, - workloadNode: workloadNode} - sp.t = t - sp.c = c - sp.metrics = &c2cMetrics{} - sp.replicationStartHook = func(ctx context.Context, sp *replicationTestSpec) {} + workloadNode: workloadNode, + gatewayNodes: srcTenantInfo.nodes} + + rd.t = t + rd.c = c + rd.metrics = &c2cMetrics{} + rd.replicationStartHook = func(ctx context.Context, sp *replicationDriver) {} + rd.beforeWorkloadHook = func() {} if !c.IsLocal() { // TODO(msbutler): pass a proper cluster replication dashboard and figure out why we need to // pass a grafana dashboard for this to work - sp.setup.promCfg = (&prometheus.Config{}). - WithPrometheusNode(sp.setup.workloadNode.InstallNodes()[0]). - WithCluster(sp.setup.dst.nodes.InstallNodes()). - WithNodeExporter(sp.setup.dst.nodes.InstallNodes()). + rd.setup.promCfg = (&prometheus.Config{}). + WithPrometheusNode(rd.setup.workloadNode.InstallNodes()[0]). + WithCluster(rd.setup.dst.nodes.InstallNodes()). + WithNodeExporter(rd.setup.dst.nodes.InstallNodes()). 
WithGrafanaDashboard("https://go.crdb.dev/p/changefeed-roachtest-grafana-dashboard") - require.NoError(sp.t, sp.c.StartGrafana(ctx, sp.t.L(), sp.setup.promCfg)) - sp.t.L().Printf("Prom has started") + require.NoError(rd.t, rd.c.StartGrafana(ctx, rd.t.L(), rd.setup.promCfg)) + rd.t.L().Printf("Prom has started") } } -func (sp *replicationTestSpec) crdbNodes() option.NodeListOption { - return sp.setup.src.nodes.Merge(sp.setup.dst.nodes) +func (rd *replicationDriver) crdbNodes() option.NodeListOption { + return rd.setup.src.nodes.Merge(rd.setup.dst.nodes) } -func (sp *replicationTestSpec) newMonitor(ctx context.Context) cluster.Monitor { - m := sp.c.NewMonitor(ctx, sp.crdbNodes()) - m.ExpectDeaths(sp.expectedNodeDeaths) +func (rd *replicationDriver) newMonitor(ctx context.Context) cluster.Monitor { + m := rd.c.NewMonitor(ctx, rd.crdbNodes()) + m.ExpectDeaths(rd.rs.expectedNodeDeaths) return m } -func (sp *replicationTestSpec) startStatsCollection( +func (rd *replicationDriver) startStatsCollection( ctx context.Context, ) func(time.Time) map[string]float64 { - if sp.c.IsLocal() { - sp.t.L().Printf("Local test. Don't setup grafana") + if rd.c.IsLocal() { + rd.t.L().Printf("Local test. Don't setup grafana") // Grafana does not run locally. return func(snapTime time.Time) map[string]float64 { return map[string]float64{} } } - client, err := clusterstats.SetupCollectorPromClient(ctx, sp.c, sp.t.L(), sp.setup.promCfg) - require.NoError(sp.t, err, "error creating prometheus client for stats collector") + client, err := clusterstats.SetupCollectorPromClient(ctx, rd.c, rd.t.L(), rd.setup.promCfg) + require.NoError(rd.t, err, "error creating prometheus client for stats collector") collector := clusterstats.NewStatsCollector(ctx, client) return func(snapTime time.Time) map[string]float64 { metricSnap := make(map[string]float64) for name, stat := range c2cPromMetrics { - point, err := collector.CollectPoint(ctx, sp.t.L(), snapTime, stat.Query) + point, err := collector.CollectPoint(ctx, rd.t.L(), snapTime, stat.Query) if err != nil { - sp.t.L().Errorf("Could not query prom %s", err.Error()) + rd.t.L().Errorf("Could not query prom %s", err.Error()) } // TODO(msbutler): update the CollectPoint api to conduct the sum in Prom instead. 
metricSnap[name] = sumOverLabel(point, stat.LabelName) - sp.t.L().Printf("%s: %.2f", name, metricSnap[name]) + rd.t.L().Printf("%s: %.2f", name, metricSnap[name]) } return metricSnap } } -func (sp *replicationTestSpec) preStreamingWorkload(ctx context.Context) { - if initCmd := sp.workload.sourceInitCmd(sp.setup.src.name, sp.setup.src.nodes); initCmd != "" { - sp.t.Status("populating source cluster before replication") +func (rd *replicationDriver) preStreamingWorkload(ctx context.Context) { + if initCmd := rd.rs.workload.sourceInitCmd(rd.setup.src.name, rd.setup.src.nodes); initCmd != "" { + rd.t.Status("populating source cluster before replication") initStart := timeutil.Now() - sp.c.Run(ctx, sp.setup.workloadNode, initCmd) - sp.t.L().Printf("src cluster workload initialization took %s minutes", + rd.c.Run(ctx, rd.setup.workloadNode, initCmd) + rd.t.L().Printf("src cluster workload initialization took %s minutes", timeutil.Since(initStart).Minutes()) } } -func (sp *replicationTestSpec) startReplicationStream(ctx context.Context) int { +func (rd *replicationDriver) startReplicationStream(ctx context.Context) int { streamReplStmt := fmt.Sprintf("CREATE TENANT %q FROM REPLICATION OF %q ON '%s'", - sp.setup.dst.name, sp.setup.src.name, sp.setup.src.pgURL) - sp.setup.dst.sysSQL.Exec(sp.t, streamReplStmt) - sp.replicationStartHook(ctx, sp) - return getIngestionJobID(sp.t, sp.setup.dst.sysSQL, sp.setup.dst.name) + rd.setup.dst.name, rd.setup.src.name, rd.setup.src.pgURL) + rd.setup.dst.sysSQL.Exec(rd.t, streamReplStmt) + rd.replicationStartHook(ctx, rd) + return getIngestionJobID(rd.t, rd.setup.dst.sysSQL, rd.setup.dst.name) } -func (sp *replicationTestSpec) runWorkload(ctx context.Context) error { - return sp.workload.runDriver(ctx, sp.c, sp.t, sp.setup) +func (rd *replicationDriver) runWorkload(ctx context.Context) error { + rd.beforeWorkloadHook() + return rd.rs.workload.runDriver(ctx, rd.c, rd.t, rd.setup) } -func (sp *replicationTestSpec) waitForHighWatermark(ingestionJobID int, wait time.Duration) { - testutils.SucceedsWithin(sp.t, func() error { - info, err := getStreamIngestionJobInfo(sp.setup.dst.db, ingestionJobID) +func (rd *replicationDriver) waitForHighWatermark(ingestionJobID int, wait time.Duration) { + testutils.SucceedsWithin(rd.t, func() error { + info, err := getStreamIngestionJobInfo(rd.setup.dst.db, ingestionJobID) if err != nil { return err } @@ -475,49 +500,49 @@ func (sp *replicationTestSpec) waitForHighWatermark(ingestionJobID int, wait tim }, wait) } -func (sp *replicationTestSpec) getWorkloadTimeout() time.Duration { - if sp.additionalDuration != 0 { - return sp.additionalDuration +func (rd *replicationDriver) getWorkloadTimeout() time.Duration { + if rd.rs.additionalDuration != 0 { + return rd.rs.additionalDuration } - return sp.timeout + return rd.rs.timeout } // getReplicationRetainedTime returns the `retained_time` of the replication // job. 
-func (sp *replicationTestSpec) getReplicationRetainedTime() time.Time { +func (rd *replicationDriver) getReplicationRetainedTime() time.Time { var retainedTime time.Time - sp.setup.dst.sysSQL.QueryRow(sp.t, + rd.setup.dst.sysSQL.QueryRow(rd.t, `SELECT retained_time FROM [SHOW TENANT $1 WITH REPLICATION STATUS]`, - roachpb.TenantName(sp.setup.dst.name)).Scan(&retainedTime) + roachpb.TenantName(rd.setup.dst.name)).Scan(&retainedTime) return retainedTime } -func (sp *replicationTestSpec) stopReplicationStream(ingestionJob int, cutoverTime time.Time) { - sp.setup.dst.sysSQL.Exec(sp.t, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`, sp.setup.dst.name, cutoverTime) +func (rd *replicationDriver) stopReplicationStream(ingestionJob int, cutoverTime time.Time) { + rd.setup.dst.sysSQL.Exec(rd.t, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`, rd.setup.dst.name, cutoverTime) err := retry.ForDuration(time.Minute*5, func() error { var status string var payloadBytes []byte - sp.setup.dst.sysSQL.QueryRow(sp.t, `SELECT status, payload FROM crdb_internal.system_jobs WHERE id = $1`, + rd.setup.dst.sysSQL.QueryRow(rd.t, `SELECT status, payload FROM crdb_internal.system_jobs WHERE id = $1`, ingestionJob).Scan(&status, &payloadBytes) if jobs.Status(status) == jobs.StatusFailed { payload := &jobspb.Payload{} if err := protoutil.Unmarshal(payloadBytes, payload); err == nil { - sp.t.Fatalf("job failed: %s", payload.Error) + rd.t.Fatalf("job failed: %s", payload.Error) } - sp.t.Fatalf("job failed") + rd.t.Fatalf("job failed") } if e, a := jobs.StatusSucceeded, jobs.Status(status); e != a { return errors.Errorf("expected job status %s, but got %s", e, a) } return nil }) - require.NoError(sp.t, err) + require.NoError(rd.t, err) } -func (sp *replicationTestSpec) compareTenantFingerprintsAtTimestamp( +func (rd *replicationDriver) compareTenantFingerprintsAtTimestamp( ctx context.Context, startTime, endTime time.Time, ) { - sp.t.Status(fmt.Sprintf("comparing tenant fingerprints between start time %s and end time %s", + rd.t.Status(fmt.Sprintf("comparing tenant fingerprints between start time %s and end time %s", startTime.UTC(), endTime.UTC())) // TODO(adityamaru,lidorcarmel): Once we agree on the format and precision we @@ -533,33 +558,33 @@ FROM crdb_internal.fingerprint(crdb_internal.tenant_span($1::INT), '%s'::TIMESTA AS OF SYSTEM TIME '%s'`, startTimeStr, aost) var srcFingerprint int64 - fingerPrintMonitor := sp.newMonitor(ctx) + fingerPrintMonitor := rd.newMonitor(ctx) fingerPrintMonitor.Go(func(ctx context.Context) error { - sp.setup.src.sysSQL.QueryRow(sp.t, fingerprintQuery, sp.setup.src.ID).Scan(&srcFingerprint) + rd.setup.src.sysSQL.QueryRow(rd.t, fingerprintQuery, rd.setup.src.ID).Scan(&srcFingerprint) return nil }) var destFingerprint int64 fingerPrintMonitor.Go(func(ctx context.Context) error { // TODO(adityamaru): Measure and record fingerprinting throughput. 
- sp.metrics.fingerprintingStart = timeutil.Now() - sp.setup.dst.sysSQL.QueryRow(sp.t, fingerprintQuery, sp.setup.dst.ID).Scan(&destFingerprint) - sp.metrics.fingerprintingEnd = timeutil.Now() - fingerprintingDuration := sp.metrics.fingerprintingEnd.Sub(sp.metrics.fingerprintingStart).String() - sp.t.L().Printf("fingerprinting the destination tenant took %s", fingerprintingDuration) + rd.metrics.fingerprintingStart = timeutil.Now() + rd.setup.dst.sysSQL.QueryRow(rd.t, fingerprintQuery, rd.setup.dst.ID).Scan(&destFingerprint) + rd.metrics.fingerprintingEnd = timeutil.Now() + fingerprintingDuration := rd.metrics.fingerprintingEnd.Sub(rd.metrics.fingerprintingStart).String() + rd.t.L().Printf("fingerprinting the destination tenant took %s", fingerprintingDuration) return nil }) // If the goroutine gets cancelled or fataled, return before comparing fingerprints. - require.NoError(sp.t, fingerPrintMonitor.WaitE()) - require.Equal(sp.t, srcFingerprint, destFingerprint) + require.NoError(rd.t, fingerPrintMonitor.WaitE()) + require.Equal(rd.t, srcFingerprint, destFingerprint) } -func (sp *replicationTestSpec) main(ctx context.Context, t test.Test, c cluster.Cluster) { - metricSnapper := sp.startStatsCollection(ctx) - sp.preStreamingWorkload(ctx) +func (rd *replicationDriver) main(ctx context.Context) { + metricSnapper := rd.startStatsCollection(ctx) + rd.preStreamingWorkload(ctx) - t.L().Printf("begin workload on src cluster") - m := sp.newMonitor(ctx) + rd.t.L().Printf("begin workload on src cluster") + m := rd.newMonitor(ctx) // The roachtest driver can use the workloadCtx to cancel the workload. workloadCtx, workloadCancel := context.WithCancel(ctx) defer workloadCancel() @@ -567,76 +592,76 @@ func (sp *replicationTestSpec) main(ctx context.Context, t test.Test, c cluster. workloadDoneCh := make(chan struct{}) m.Go(func(ctx context.Context) error { defer close(workloadDoneCh) - err := sp.runWorkload(workloadCtx) + err := rd.runWorkload(workloadCtx) // The workload should only return an error if the roachtest driver cancels the - // workloadCtx after sp.additionalDuration has elapsed after the initial scan completes. + // workloadCtx after rd.additionalDuration has elapsed after the initial scan completes. if err != nil && workloadCtx.Err() == nil { // Implies the workload context was not cancelled and the workload cmd returned a // different error. return errors.Wrapf(err, `Workload context was not cancelled. 
Error returned by workload cmd`) } - t.L().Printf("workload successfully finished") + rd.t.L().Printf("workload successfully finished") return nil }) - t.Status("starting replication stream") - sp.metrics.initalScanStart = newMetricSnapshot(metricSnapper, timeutil.Now()) - ingestionJobID := sp.startReplicationStream(ctx) + rd.t.Status("starting replication stream") + rd.metrics.initalScanStart = newMetricSnapshot(metricSnapper, timeutil.Now()) + ingestionJobID := rd.startReplicationStream(ctx) - removeTenantRateLimiters(t, sp.setup.dst.sysSQL, sp.setup.dst.name) + removeTenantRateLimiters(rd.t, rd.setup.dst.sysSQL, rd.setup.dst.name) - lv := makeLatencyVerifier("stream-ingestion", 0, 2*time.Minute, t.L(), getStreamIngestionJobInfo, t.Status, false) + lv := makeLatencyVerifier("stream-ingestion", 0, 2*time.Minute, rd.t.L(), getStreamIngestionJobInfo, rd.t.Status, false) defer lv.maybeLogLatencyHist() m.Go(func(ctx context.Context) error { - return lv.pollLatency(ctx, sp.setup.dst.db, ingestionJobID, time.Second, workloadDoneCh) + return lv.pollLatency(ctx, rd.setup.dst.db, ingestionJobID, time.Second, workloadDoneCh) }) - t.L().Printf("waiting for replication stream to finish ingesting initial scan") - sp.waitForHighWatermark(ingestionJobID, sp.timeout/2) - sp.metrics.initialScanEnd = newMetricSnapshot(metricSnapper, timeutil.Now()) - t.Status(fmt.Sprintf(`initial scan complete. run workload and repl. stream for another %s minutes`, - sp.additionalDuration)) + rd.t.L().Printf("waiting for replication stream to finish ingesting initial scan") + rd.waitForHighWatermark(ingestionJobID, rd.rs.timeout/2) + rd.metrics.initialScanEnd = newMetricSnapshot(metricSnapper, timeutil.Now()) + rd.t.Status(fmt.Sprintf(`initial scan complete. run workload and repl. stream for another %s minutes`, + rd.rs.additionalDuration)) select { case <-workloadDoneCh: - t.L().Printf("workload finished on its own") - case <-time.After(sp.getWorkloadTimeout()): + rd.t.L().Printf("workload finished on its own") + case <-time.After(rd.getWorkloadTimeout()): workloadCancel() - t.L().Printf("workload has cancelled after %s", sp.additionalDuration) + rd.t.L().Printf("workload has cancelled after %s", rd.rs.additionalDuration) case <-ctx.Done(): - t.L().Printf(`roachtest context cancelled while waiting for workload duration to complete`) + rd.t.L().Printf(`roachtest context cancelled while waiting for workload duration to complete`) return } var currentTime time.Time - sp.setup.dst.sysSQL.QueryRow(sp.t, "SELECT clock_timestamp()").Scan(¤tTime) - cutoverTime := currentTime.Add(-sp.cutover) - sp.t.Status("cutover time chosen: ", cutoverTime.String()) + rd.setup.dst.sysSQL.QueryRow(rd.t, "SELECT clock_timestamp()").Scan(¤tTime) + cutoverTime := currentTime.Add(-rd.rs.cutover) + rd.t.Status("cutover time chosen: ", cutoverTime.String()) - retainedTime := sp.getReplicationRetainedTime() + retainedTime := rd.getReplicationRetainedTime() - sp.metrics.cutoverTo = newMetricSnapshot(metricSnapper, cutoverTime) - sp.metrics.cutoverStart = newMetricSnapshot(metricSnapper, timeutil.Now()) + rd.metrics.cutoverTo = newMetricSnapshot(metricSnapper, cutoverTime) + rd.metrics.cutoverStart = newMetricSnapshot(metricSnapper, timeutil.Now()) - sp.t.Status(fmt.Sprintf("waiting for replication stream to cutover to %s", + rd.t.Status(fmt.Sprintf("waiting for replication stream to cutover to %s", cutoverTime.String())) - sp.stopReplicationStream(ingestionJobID, cutoverTime) - sp.metrics.cutoverEnd = newMetricSnapshot(metricSnapper, timeutil.Now()) + 
rd.stopReplicationStream(ingestionJobID, cutoverTime) + rd.metrics.cutoverEnd = newMetricSnapshot(metricSnapper, timeutil.Now()) - sp.metrics.export(sp.t, len(sp.setup.src.nodes)) + rd.metrics.export(rd.t, len(rd.setup.src.nodes)) - t.Status("comparing fingerprints") - sp.compareTenantFingerprintsAtTimestamp( + rd.t.Status("comparing fingerprints") + rd.compareTenantFingerprintsAtTimestamp( ctx, retainedTime, cutoverTime, ) - lv.assertValid(t) + lv.assertValid(rd.t) } func c2cRegisterWrapper( r registry.Registry, - sp replicationTestSpec, + sp replicationSpec, run func(ctx context.Context, t test.Test, c cluster.Cluster), ) { @@ -664,7 +689,7 @@ func runAcceptanceClusterReplication(ctx context.Context, t test.Test, c cluster if !c.IsLocal() { t.Skip("c2c/acceptance is only meant to run on a local cluster") } - sp := replicationTestSpec{ + sp := replicationSpec{ srcNodes: 1, dstNodes: 1, // The timeout field ensures the c2c roachtest driver behaves properly. @@ -673,12 +698,13 @@ func runAcceptanceClusterReplication(ctx context.Context, t test.Test, c cluster additionalDuration: 0 * time.Minute, cutover: 30 * time.Second, } - sp.setupC2C(ctx, t, c) - sp.main(ctx, t, c) + rd := makeReplicationDriver(t, c, sp) + rd.setupC2C(ctx, t, c) + rd.main(ctx) } func registerClusterToCluster(r registry.Registry) { - for _, sp := range []replicationTestSpec{ + for _, sp := range []replicationSpec{ { name: "c2c/tpcc/warehouses=500/duration=10/cutover=5", srcNodes: 4, @@ -748,17 +774,18 @@ func registerClusterToCluster(r registry.Registry) { sp := sp c2cRegisterWrapper(r, sp, func(ctx context.Context, t test.Test, c cluster.Cluster) { - sp.setupC2C(ctx, t, c) + rd := makeReplicationDriver(t, c, sp) + rd.setupC2C(ctx, t, c) m := c.NewMonitor(ctx) - hc := roachtestutil.NewHealthChecker(t, c, sp.crdbNodes()) + hc := roachtestutil.NewHealthChecker(t, c, rd.crdbNodes()) m.Go(func(ctx context.Context) error { require.NoError(t, hc.Runner(ctx)) return nil }) defer hc.Done() - sp.main(ctx, t, c) + rd.main(ctx) }) } } @@ -784,18 +811,14 @@ func (c c2cPhase) String() string { } } +// replResilienceSpec defines inputs to the replication resilience tests, set +// during roachtest registration. This can not be modified during roachtest +// execution. type replResilienceSpec struct { + replicationSpec + onSrc bool onCoordinator bool - // phase indicates the c2c phase a node shutdown will occur. - phase c2cPhase - spec *replicationTestSpec - - // the fields below are gathered after the replication stream has started - srcJobID jobspb.JobID - dstJobID jobspb.JobID - shutdownNode int - watcherNode int } func (rsp *replResilienceSpec) name() string { @@ -813,47 +836,77 @@ func (rsp *replResilienceSpec) name() string { } return builder.String() } -func (rsp *replResilienceSpec) getJobIDs(ctx context.Context) { - setup := rsp.spec.setup + +// replResilienceDriver manages the execution of the replication resilience tests. +type replResilienceDriver struct { + replicationDriver + rsp replResilienceSpec + + // phase indicates the c2c phase a node shutdown will occur. 
+ phase c2cPhase + + // the fields below are gathered after the replication stream has started + srcJobID jobspb.JobID + dstJobID jobspb.JobID + shutdownNode int + watcherNode int +} + +func makeReplResilienceDriver( + t test.Test, c cluster.Cluster, rsp replResilienceSpec, +) replResilienceDriver { + rd := makeReplicationDriver(t, c, rsp.replicationSpec) + return replResilienceDriver{ + replicationDriver: rd, + // TODO(msbutler): randomly select a state to shutdown in. + phase: steadyState, + } +} + +func (rrd *replResilienceDriver) getJobIDs(ctx context.Context) { jobIDQuery := `SELECT job_id FROM [SHOW JOBS] WHERE job_type = '%s'` - testutils.SucceedsWithin(rsp.spec.t, func() error { - if err := setup.dst.db.QueryRowContext(ctx, fmt.Sprintf(jobIDQuery, - `STREAM INGESTION`)).Scan(&rsp.dstJobID); err != nil { + testutils.SucceedsWithin(rrd.t, func() error { + if err := rrd.setup.dst.db.QueryRowContext(ctx, fmt.Sprintf(jobIDQuery, + `STREAM INGESTION`)).Scan(&rrd.dstJobID); err != nil { return err } - if err := setup.src.db.QueryRowContext(ctx, fmt.Sprintf(jobIDQuery, - `STREAM REPLICATION`)).Scan(&rsp.srcJobID); err != nil { + if err := rrd.setup.src.db.QueryRowContext(ctx, fmt.Sprintf(jobIDQuery, + `STREAM REPLICATION`)).Scan(&rrd.srcJobID); err != nil { return err } return nil }, time.Minute) } -func (rsp *replResilienceSpec) getTargetInfo() (*clusterInfo, jobspb.JobID, option.NodeListOption) { - if rsp.onSrc { - return rsp.spec.setup.src, rsp.srcJobID, rsp.spec.setup.src.nodes +func (rrd *replResilienceDriver) getTargetInfo() ( + *clusterInfo, + jobspb.JobID, + option.NodeListOption, +) { + if rrd.rsp.onSrc { + return rrd.setup.src, rrd.srcJobID, rrd.setup.src.nodes } - return rsp.spec.setup.dst, rsp.dstJobID, rsp.spec.setup.dst.nodes + return rrd.setup.dst, rrd.dstJobID, rrd.setup.dst.nodes } -func (rsp *replResilienceSpec) getTargetAndWatcherNodes(ctx context.Context) { +func (rrd *replResilienceDriver) getTargetAndWatcherNodes(ctx context.Context) { var coordinatorNode int - info, jobID, nodes := rsp.getTargetInfo() + info, jobID, nodes := rrd.getTargetInfo() // To populate the coordinator_id field, a node needs to claim the job. // Give the job system a minute. - testutils.SucceedsWithin(rsp.spec.t, func() error { + testutils.SucceedsWithin(rrd.t, func() error { return info.db.QueryRowContext(ctx, `SELECT coordinator_id FROM crdb_internal.jobs WHERE job_id = $1`, jobID).Scan(&coordinatorNode) }, time.Minute) - if !rsp.onSrc { + if !rrd.rsp.onSrc { // From the destination cluster's perspective, node ids range from 1 to // num_dest_nodes, but from roachprod's perspective they range from // num_source_nodes+1 to num_crdb_roachprod nodes. We need to adjust for // this to shut down the right node. Example: if the coordinator node on the // dest cluster is 1, and there are 4 src cluster nodes, then // shut down roachprod node 5. 
- coordinatorNode += rsp.spec.srcNodes + coordinatorNode += rrd.rsp.srcNodes } var targetNode int @@ -866,17 +919,17 @@ func (rsp *replResilienceSpec) getTargetAndWatcherNodes(ctx context.Context) { } } } - if rsp.onCoordinator { + if rrd.rsp.onCoordinator { targetNode = coordinatorNode } else { targetNode = findAnotherNode(coordinatorNode) } - rsp.shutdownNode = targetNode - rsp.watcherNode = findAnotherNode(targetNode) + rrd.shutdownNode = targetNode + rrd.watcherNode = findAnotherNode(targetNode) } -func (rsp *replResilienceSpec) getPhase() c2cPhase { - progress := getJobProgress(rsp.spec.t, rsp.spec.setup.dst.sysSQL, rsp.dstJobID) +func (rrd *replResilienceDriver) getPhase() c2cPhase { + progress := getJobProgress(rrd.t, rrd.setup.dst.sysSQL, rrd.dstJobID) streamIngestProgress := progress.GetStreamIngest() highWater := progress.GetHighWater() @@ -890,14 +943,14 @@ func (rsp *replResilienceSpec) getPhase() c2cPhase { return phaseCutover } -func (rsp *replResilienceSpec) waitForTargetPhase() error { +func (rrd *replResilienceDriver) waitForTargetPhase() error { for { - currentPhase := rsp.getPhase() - rsp.spec.t.L().Printf("Current Phase %s", currentPhase.String()) + currentPhase := rrd.getPhase() + rrd.t.L().Printf("Current Phase %s", currentPhase.String()) switch { - case currentPhase < rsp.phase: + case currentPhase < rrd.phase: time.Sleep(5 * time.Second) - case currentPhase == rsp.phase: + case currentPhase == rrd.phase: return nil default: return errors.New("c2c job past target phase") @@ -905,113 +958,79 @@ func (rsp *replResilienceSpec) waitForTargetPhase() error { } } -type c2cResilienceKV struct { - // gatewayNodeCh will contain the crdb node that should act as a gateway for the workload. - // If 0 is sent, then all src cluster nodes can be gateway nodes. - gatewayNodeCh chan int -} - -func (rkv c2cResilienceKV) sourceInitCmd(tenantName string, nodes option.NodeListOption) string { - // TODO(msbutler): add an initial workload to test initial scan shutdown. - return "" -} - -func (rkv c2cResilienceKV) sourceRunCmd(tenantName string, nodes option.NodeListOption) string { - // added --tolerate-errors flags to prevent test from flaking due to a transaction retry error - return fmt.Sprintf(`./workload run kv --tolerate-errors --init --read-percent 0 {pgurl%s:%s}`, - nodes, - tenantName) -} - -func (rkv c2cResilienceKV) runDriver( - workloadCtx context.Context, c cluster.Cluster, t test.Test, setup *c2cSetup, -) error { - // The workload waits to begin until a watcher node is found after the c2c job - // is set up. If a non-zero workload is received, only connect the workload to - // that node, else connect workload to all nodes in the src cluster. - gatewayNodes := setup.src.nodes - if gatewayNodeOverride := <-rkv.gatewayNodeCh; gatewayNodeOverride != 0 { - gatewayNodes = c.Node(gatewayNodeOverride) - } - t.L().Printf("Resilience Gateway Nodes Chosen: %s", gatewayNodes.String()) - return c.RunE(workloadCtx, setup.workloadNode, rkv.sourceRunCmd(setup.src.name, gatewayNodes)) -} - func registerClusterReplicationResilience(r registry.Registry) { for _, rsp := range []replResilienceSpec{ { onSrc: true, onCoordinator: true, - // TODO(msbutler): instead of hardcoding shutdowns to occcur during the main c2c phase, - // randomly select a phase. 
- phase: steadyState, }, { onSrc: true, onCoordinator: false, - phase: steadyState, }, { onSrc: false, onCoordinator: true, - phase: steadyState, }, { onSrc: false, onCoordinator: false, - phase: steadyState, }, } { - gatewayNodeCh := make(chan int) rsp := rsp - rsp.spec = &replicationTestSpec{ + + rsp.replicationSpec = replicationSpec{ name: rsp.name(), srcNodes: 4, dstNodes: 4, cpus: 8, - workload: c2cResilienceKV{gatewayNodeCh: gatewayNodeCh}, + workload: replicateKV{readPercent: 0}, timeout: 20 * time.Minute, additionalDuration: 5 * time.Minute, cutover: 4 * time.Minute, expectedNodeDeaths: 1, } - c2cRegisterWrapper(r, *rsp.spec, + c2cRegisterWrapper(r, rsp.replicationSpec, func(ctx context.Context, t test.Test, c cluster.Cluster) { - sp := rsp.spec - sp.setupC2C(ctx, t, c) + rrd := makeReplResilienceDriver(t, c, rsp) + rrd.setupC2C(ctx, t, c) shutdownSetupDone := make(chan struct{}) - rsp.spec.replicationStartHook = func(ctx context.Context, sp *replicationTestSpec) { + + rrd.beforeWorkloadHook = func() { + // Ensure the workload begins after c2c jobs have been set up. + <-shutdownSetupDone + } + + rrd.replicationStartHook = func(ctx context.Context, rd *replicationDriver) { + // Once the C2C job is set up, we need to modify some configs to + // ensure the shutdown doesn't bother the underlying c2c job and the + // foreground workload. The shutdownSetupDone channel prevents other + // goroutines from reading the configs concurrently. + defer close(shutdownSetupDone) - rsp.getJobIDs(ctx) - rsp.getTargetAndWatcherNodes(ctx) + rrd.getJobIDs(ctx) + rrd.getTargetAndWatcherNodes(ctx) // To prevent sql connections from connecting to the shutdown node, // ensure roachtest process connections to cluster use watcher node // from now on. - watcherDB := c.Conn(ctx, sp.t.L(), rsp.watcherNode) + watcherDB := c.Conn(ctx, rd.t.L(), rrd.watcherNode) watcherSQL := sqlutils.MakeSQLRunner(watcherDB) if rsp.onSrc { - sp.setup.src.db = watcherDB - sp.setup.src.sysSQL = watcherSQL - - // Only connect the foreground workload to the watcher node. - gatewayNodeCh <- rsp.watcherNode - + rd.setup.src.db = watcherDB + rd.setup.src.sysSQL = watcherSQL + rd.setup.gatewayNodes = c.Node(rrd.watcherNode) } else { - sp.setup.dst.db = watcherDB - sp.setup.dst.sysSQL = watcherSQL - - // Indicates all src cluster nodes can connect to the workload. - gatewayNodeCh <- 0 + rd.setup.dst.db = watcherDB + rd.setup.dst.sysSQL = watcherSQL } } - m := sp.newMonitor(ctx) + m := rrd.newMonitor(ctx) m.Go(func(ctx context.Context) error { - // start the replication job - sp.main(ctx, t, c) + rrd.main(ctx) return nil }) @@ -1025,17 +1044,17 @@ func registerClusterReplicationResilience(r registry.Registry) { // successful c2c replication execution. 
shutdownStarter := func() jobStarter { return func(c cluster.Cluster, t test.Test) (string, error) { - return fmt.Sprintf("%d", rsp.dstJobID), rsp.waitForTargetPhase() + return fmt.Sprintf("%d", rrd.dstJobID), rrd.waitForTargetPhase() } } - destinationWatcherNode := rsp.watcherNode + destinationWatcherNode := rrd.watcherNode if rsp.onSrc { - destinationWatcherNode = rsp.spec.setup.dst.nodes[0] + destinationWatcherNode = rrd.setup.dst.nodes[0] } shutdownCfg := nodeShutdownConfig{ - shutdownNode: rsp.shutdownNode, + shutdownNode: rrd.shutdownNode, watcherNode: destinationWatcherNode, - crdbNodes: rsp.spec.crdbNodes(), + crdbNodes: rrd.crdbNodes(), restartSettings: []install.ClusterSettingOption{install.SecureOption(true)}, } executeNodeShutdown(ctx, t, c, shutdownCfg, shutdownStarter()) From 5cdb6258eb0130d49947c81c4e3a9c8a91ef55d2 Mon Sep 17 00:00:00 2001 From: Shivam Saraf Date: Wed, 15 Mar 2023 11:02:43 -0400 Subject: [PATCH 2/3] sql: enabling forward indexes and ORDERBY on JSONB columns Currently, #97928 outlines the scheme for JSONB encoding and decoding for forward indexes. However, the PR doesn't enable this feature to our users. This current PR aims to allow forward indexes on JSONB columns. The presence of a lexicographical ordering, as described in #97928, shall now allow primary and secondary indexes on JSONB columns along with the ability to use ORDER BY filter in their queries. Additionally, JSON values consist of decimal numbers and containers, such as Arrays and Objects, which can contain these decimal numbers. In order to preserve the values after the decimal, JSONB columns are now required to be composite in nature. This shall enable such values to be stored in both the key and the value side of a K/V pair in hopes of receiving the exact value. Fixes: #35706 Release note (sql change): This PR adds support for enabling forward indexes and ordering on JSON values. 
Epic: CRDB-24501 --- .../tests/3node-tenant/generated_test.go | 7 + pkg/sql/catalog/colinfo/col_type_info.go | 5 +- .../catalog/colinfo/column_type_properties.go | 2 +- .../colinfo/column_type_properties_test.go | 2 +- pkg/sql/catalog/table_col_set.go | 3 + pkg/sql/catalog/tabledesc/structured.go | 12 +- pkg/sql/colenc/encode.go | 3 + pkg/sql/colenc/key.go | 15 + pkg/sql/colencoding/key_encoding.go | 21 +- .../testdata/logic_test/distsql_stats | 4 +- .../testdata/logic_test/expression_index | 183 ++++++- .../testdata/logic_test/inverted_index | 10 - .../logic_test/inverted_index_multi_column | 95 +++- pkg/sql/logictest/testdata/logic_test/json | 4 +- .../logictest/testdata/logic_test/json_index | 486 ++++++++++++++++++ .../logictest/testdata/logic_test/pg_catalog | 54 +- .../tests/fakedist-disk/generated_test.go | 7 + .../tests/fakedist-vec-off/generated_test.go | 7 + .../tests/fakedist/generated_test.go | 7 + .../generated_test.go | 7 + .../local-mixed-22.2-23.1/generated_test.go | 7 + .../tests/local-vec-off/generated_test.go | 7 + .../logictest/tests/local/generated_test.go | 7 + pkg/sql/opt/exec/execbuilder/testdata/json | 408 +++++++++++++++ pkg/sql/opt/exec/execbuilder/testdata/stats | 4 + .../execbuilder/tests/local/generated_test.go | 7 + pkg/sql/opt/indexrec/candidate.go | 16 +- pkg/sql/opt/indexrec/hypothetical_table.go | 5 +- pkg/sql/opt/norm/testdata/rules/inline | 8 +- pkg/sql/opt/norm/testdata/rules/project | 4 +- pkg/sql/opt/optbuilder/orderby.go | 2 - pkg/sql/opt/xform/testdata/external/customer | 5 +- pkg/sql/rowenc/index_encoding.go | 2 +- pkg/sql/rowenc/index_encoding_test.go | 96 +++- pkg/sql/sem/tree/datum.go | 39 ++ pkg/sql/sem/tree/eval.go | 2 + pkg/sql/ttl/ttljob/ttljob_test.go | 5 +- 37 files changed, 1471 insertions(+), 87 deletions(-) create mode 100644 pkg/sql/logictest/testdata/logic_test/json_index create mode 100644 pkg/sql/opt/exec/execbuilder/testdata/json diff --git a/pkg/ccl/logictestccl/tests/3node-tenant/generated_test.go b/pkg/ccl/logictestccl/tests/3node-tenant/generated_test.go index a6b4400ccd1d..4935a2779e72 100644 --- a/pkg/ccl/logictestccl/tests/3node-tenant/generated_test.go +++ b/pkg/ccl/logictestccl/tests/3node-tenant/generated_test.go @@ -1088,6 +1088,13 @@ func TestTenantLogic_json_builtins( runLogicTest(t, "json_builtins") } +func TestTenantLogic_json_index( + t *testing.T, +) { + defer leaktest.AfterTest(t)() + runLogicTest(t, "json_index") +} + func TestTenantLogic_kv_builtin_functions_tenant( t *testing.T, ) { diff --git a/pkg/sql/catalog/colinfo/col_type_info.go b/pkg/sql/catalog/colinfo/col_type_info.go index 98d21abdbc5e..ec400e70b3fd 100644 --- a/pkg/sql/catalog/colinfo/col_type_info.go +++ b/pkg/sql/catalog/colinfo/col_type_info.go @@ -146,7 +146,7 @@ func ColumnTypeIsIndexable(t *types.T) bool { // using an inverted index. 
func ColumnTypeIsInvertedIndexable(t *types.T) bool { switch t.Family() { - case types.ArrayFamily, types.StringFamily: + case types.JsonFamily, types.ArrayFamily, types.StringFamily: return true } return ColumnTypeIsOnlyInvertedIndexable(t) @@ -162,7 +162,6 @@ func ColumnTypeIsOnlyInvertedIndexable(t *types.T) bool { t = t.ArrayContents() } switch t.Family() { - case types.JsonFamily: case types.GeographyFamily: case types.GeometryFamily: case types.TSVectorFamily: @@ -183,7 +182,7 @@ func MustBeValueEncoded(semanticType *types.T) bool { default: return MustBeValueEncoded(semanticType.ArrayContents()) } - case types.JsonFamily, types.TupleFamily, types.GeographyFamily, types.GeometryFamily: + case types.TupleFamily, types.GeographyFamily, types.GeometryFamily: return true case types.TSVectorFamily, types.TSQueryFamily: return true diff --git a/pkg/sql/catalog/colinfo/column_type_properties.go b/pkg/sql/catalog/colinfo/column_type_properties.go index 7eeeddbb3cea..96ed30d5a633 100644 --- a/pkg/sql/catalog/colinfo/column_type_properties.go +++ b/pkg/sql/catalog/colinfo/column_type_properties.go @@ -52,6 +52,7 @@ func CanHaveCompositeKeyEncoding(typ *types.T) bool { switch typ.Family() { case types.FloatFamily, types.DecimalFamily, + types.JsonFamily, types.CollatedStringFamily: return true case types.ArrayFamily: @@ -75,7 +76,6 @@ func CanHaveCompositeKeyEncoding(typ *types.T) bool { types.UuidFamily, types.INetFamily, types.TimeFamily, - types.JsonFamily, types.TimeTZFamily, types.BitFamily, types.GeometryFamily, diff --git a/pkg/sql/catalog/colinfo/column_type_properties_test.go b/pkg/sql/catalog/colinfo/column_type_properties_test.go index fb985eaee505..9a1d1a0310df 100644 --- a/pkg/sql/catalog/colinfo/column_type_properties_test.go +++ b/pkg/sql/catalog/colinfo/column_type_properties_test.go @@ -52,7 +52,7 @@ func TestCanHaveCompositeKeyEncoding(t *testing.T) { {types.IntArray, false}, {types.Interval, false}, {types.IntervalArray, false}, - {types.Jsonb, false}, + {types.Jsonb, true}, {types.Name, false}, {types.Oid, false}, {types.String, false}, diff --git a/pkg/sql/catalog/table_col_set.go b/pkg/sql/catalog/table_col_set.go index 1476afb6a97a..166a0f5e8a5a 100644 --- a/pkg/sql/catalog/table_col_set.go +++ b/pkg/sql/catalog/table_col_set.go @@ -56,6 +56,9 @@ func (s TableColSet) ForEach(f func(col descpb.ColumnID)) { s.set.ForEach(func(i int) { f(descpb.ColumnID(i)) }) } +// Copy returns a copy of s which can be modified independently. +func (s TableColSet) Copy() TableColSet { return TableColSet{set: s.set.Copy()} } + // SubsetOf returns true if s is a subset of other. func (s TableColSet) SubsetOf(other TableColSet) bool { return s.set.SubsetOf(other.set) diff --git a/pkg/sql/catalog/tabledesc/structured.go b/pkg/sql/catalog/tabledesc/structured.go index 9037cc825dc5..bf705cd0bbae 100644 --- a/pkg/sql/catalog/tabledesc/structured.go +++ b/pkg/sql/catalog/tabledesc/structured.go @@ -714,6 +714,14 @@ func (desc *Mutable) allocateIndexIDs(columnNames map[string]descpb.ColumnID) er colIDs = idx.CollectKeyColumnIDs() } + // Inverted indexes don't store composite values in the individual + // paths present. The composite values will be encoded in + // the primary index itself. + compositeColIDsLocal := compositeColIDs.Copy() + if isInverted { + compositeColIDsLocal.Remove(invID) + } + // StoreColumnIDs are derived from StoreColumnNames just like KeyColumnIDs // derives from KeyColumnNames. 
// For primary indexes this set of columns is typically defined as the set @@ -755,12 +763,12 @@ func (desc *Mutable) allocateIndexIDs(columnNames map[string]descpb.ColumnID) er // or in the primary key whose type has a composite encoding, like DECIMAL // for instance. for _, colID := range idx.IndexDesc().KeyColumnIDs { - if compositeColIDs.Contains(colID) { + if compositeColIDsLocal.Contains(colID) { idx.IndexDesc().CompositeColumnIDs = append(idx.IndexDesc().CompositeColumnIDs, colID) } } for _, colID := range idx.IndexDesc().KeySuffixColumnIDs { - if compositeColIDs.Contains(colID) { + if compositeColIDsLocal.Contains(colID) { idx.IndexDesc().CompositeColumnIDs = append(idx.IndexDesc().CompositeColumnIDs, colID) } } diff --git a/pkg/sql/colenc/encode.go b/pkg/sql/colenc/encode.go index b1a35cb5c4b4..f52ae26703f5 100644 --- a/pkg/sql/colenc/encode.go +++ b/pkg/sql/colenc/encode.go @@ -707,6 +707,9 @@ func isComposite(vec coldata.Vec, row int) bool { case types.DecimalFamily: d := tree.DDecimal{Decimal: vec.Decimal()[row]} return d.IsComposite() + case types.JsonFamily: + j := tree.DJSON{JSON: vec.JSON().Get(row)} + return j.IsComposite() default: d := vec.Datum().Get(row) if cdatum, ok := d.(tree.CompositeDatum); ok { diff --git a/pkg/sql/colenc/key.go b/pkg/sql/colenc/key.go index f8f057926a5d..598cc708d051 100644 --- a/pkg/sql/colenc/key.go +++ b/pkg/sql/colenc/key.go @@ -180,6 +180,21 @@ func encodeKeys[T []byte | roachpb.Key]( } kys[r] = b } + case types.JsonFamily: + jsonVector := vec.JSON() + for r := 0; r < count; r++ { + b := kys[r] + if partialIndexAndNullCheck(kys, r, start, nulls, dir) { + continue + } + var err error + jsonObj := jsonVector.Get(r + start) + b, err = jsonObj.EncodeForwardIndex(b, dir) + if err != nil { + return err + } + kys[r] = b + } default: if buildutil.CrdbTestBuild { if typeconv.TypeFamilyToCanonicalTypeFamily(typ.Family()) != typeconv.DatumVecCanonicalTypeFamily { diff --git a/pkg/sql/colencoding/key_encoding.go b/pkg/sql/colencoding/key_encoding.go index 32f6641a3532..10e72b385b99 100644 --- a/pkg/sql/colencoding/key_encoding.go +++ b/pkg/sql/colencoding/key_encoding.go @@ -187,12 +187,21 @@ func decodeTableKeyToCol( } vecs.IntervalCols[colIdx][rowIdx] = d case types.JsonFamily: - // Don't attempt to decode the JSON value. Instead, just return the - // remaining bytes of the key. - var jsonLen int - jsonLen, err = encoding.PeekLength(key) - vecs.JSONCols[colIdx].Bytes.Set(rowIdx, key[:jsonLen]) - rkey = key[jsonLen:] + // Decode the JSON, and then store the bytes in the + // vector in the value-encoded format. + // TODO (shivam): Make it possible for the vector to store + // key-encoded JSONs instead of value-encoded JSONs. + var d tree.Datum + encDir := encoding.Ascending + if dir == catenumpb.IndexColumn_DESC { + encDir = encoding.Descending + } + d, rkey, err = keyside.Decode(da, valType, key, encDir) + json, ok := d.(*tree.DJSON) + if !ok { + return nil, false, scratch, errors.AssertionFailedf("Could not type assert into DJSON") + } + vecs.JSONCols[colIdx].Set(rowIdx, json.JSON) case types.EncodedKeyFamily: // Don't attempt to decode the inverted key. 
keyLen, err := encoding.PeekLength(key) diff --git a/pkg/sql/logictest/testdata/logic_test/distsql_stats b/pkg/sql/logictest/testdata/logic_test/distsql_stats index 5737db76796d..0fd7e84b4f4b 100644 --- a/pkg/sql/logictest/testdata/logic_test/distsql_stats +++ b/pkg/sql/logictest/testdata/logic_test/distsql_stats @@ -1257,7 +1257,7 @@ ORDER BY statistics_name column_names row_count null_count has_histogram s {a} 3 0 true s {b} 3 0 true -s {j} 3 0 false +s {j} 3 0 true s {rowid} 3 0 true # Test that non-index columns have histograms collected for them, with @@ -2348,7 +2348,7 @@ SHOW STATISTICS USING JSON FOR TABLE j; statement ok ALTER TABLE j INJECT STATISTICS '$j_stats' -statement error pq: cannot create partial statistics on an inverted index column +statement error pq: table j does not contain a non-partial forward index with j as a prefix column CREATE STATISTICS j_partial ON j FROM j USING EXTREMES; statement ok diff --git a/pkg/sql/logictest/testdata/logic_test/expression_index b/pkg/sql/logictest/testdata/logic_test/expression_index index 3f242f89f0e0..21206bdf7746 100644 --- a/pkg/sql/logictest/testdata/logic_test/expression_index +++ b/pkg/sql/logictest/testdata/logic_test/expression_index @@ -111,15 +111,9 @@ CREATE TABLE err (a INT, INDEX (a, (NULL))) statement ok CREATE TABLE t_null_cast (a INT, INDEX (a, (NULL::TEXT))) -statement error index element j->'a' of type jsonb is not indexable -CREATE TABLE err (a INT, j JSON, INDEX (a, (j->'a'))) - statement error index element \(a, b\) of type record is not indexable CREATE TABLE err (a INT, b INT, INDEX (a, (row(a, b)))) -statement error index element j->'a' of type jsonb is not allowed as a prefix column in an inverted index.*\nHINT: see the documentation for more information about inverted indexes: https://www.cockroachlabs.com/docs/.*/inverted-indexes.html -CREATE TABLE err (a INT, j JSON, INVERTED INDEX ((j->'a'), j)) - statement error index element a \+ b of type int is not allowed as the last column in an inverted index.*\nHINT: see the documentation for more information about inverted indexes: https://www.cockroachlabs.com/docs/.*/inverted-indexes.html CREATE TABLE err (a INT, b INT, INVERTED INDEX (a, (a + b))) @@ -340,15 +334,21 @@ CREATE INDEX err ON t (a, (NULL), b) statement ok CREATE INDEX t_cast_idx ON t (a, (NULL::TEXT), b) -statement error index element j->'a' of type jsonb is not indexable +statement ok CREATE INDEX err ON t (a, (j->'a')); +statement ok +DROP INDEX err + statement error index element \(a, b\) of type record is not indexable CREATE INDEX err ON t (a, (row(a, b))); -statement error index element j->'a' of type jsonb is not allowed as a prefix column in an inverted index.*\nHINT: see the documentation for more information about inverted indexes: https://www.cockroachlabs.com/docs/.*/inverted-indexes.html +statement ok CREATE INVERTED INDEX err ON t ((j->'a'), j); +statement ok +DROP INDEX err + statement error index element a \+ b of type int is not allowed as the last column in an inverted index.*\nHINT: see the documentation for more information about inverted indexes: https://www.cockroachlabs.com/docs/.*/inverted-indexes.html CREATE INVERTED INDEX err ON t (a, (a + b)); @@ -639,6 +639,68 @@ SELECT k, a, b, c, comp FROM t@t_lower_c_a_plus_b_idx WHERE lower(c) = 'foo' AND k a b c comp 2 20 200 FOO 30 +statement ok +CREATE TABLE d (a INT, j JSON); +CREATE INDEX json_expr_index on d ((j->'a')) + +statement ok +INSERT INTO d VALUES + (1, '{"a": "hello"}'), + (2, '{"a": "b"}'), + (3, '{"a": "bye"}'), 
+ (4, '{"a": "json"}') + +query IT +SELECT a, j from d@json_expr_index where j->'a' = '"b"' ORDER BY a +---- +2 {"a": "b"} + +query IT +SELECT a, j from d@json_expr_index where j->'a' = '"b"' OR j->'a' = '"bye"' ORDER BY a +---- +2 {"a": "b"} +3 {"a": "bye"} + +query IT +SELECT a, j from d@json_expr_index where j->'a' > '"a"' ORDER BY a +---- +1 {"a": "hello"} +2 {"a": "b"} +3 {"a": "bye"} +4 {"a": "json"} + +query IT +SELECT a, j from d@json_expr_index where j->'a' <= '"hello"' ORDER BY a +---- +1 {"a": "hello"} +2 {"a": "b"} +3 {"a": "bye"} + + +statement ok +INSERT INTO d VALUES + (5, '{"a": "forward", "json": "inverted"}'), + (6, '{"a": "c", "b": "d"}') + + +statement ok +CREATE INVERTED INDEX json_inv on d ((j->'a'), j) + +query IT +SELECT a, j from d@json_inv where j->'a' = '"forward"' AND j->'json' = '"inverted"' ORDER BY a +---- +5 {"a": "forward", "json": "inverted"} + +query IT +SELECT a, j from d@json_inv where j->'a' = '"c"' AND j->'json' = '"inverted"' ORDER BY a +---- + +query IT +SELECT a, j from d@json_inv where j->'a' = '"c"' AND j->'b' = '"d"' ORDER BY a +---- +6 {"a": "c", "b": "d"} + + # Backfilling expression indexes. statement ok @@ -931,6 +993,97 @@ SELECT i, j FROM inv@i_plus_100_j_a WHERE i+100 = 101 AND j->'a' @> '"x"' ORDER 1 {"a": ["x", "y", "z"]} 1 {"a": [1, "x"]} +# Backfilling JSON expression indexes + +statement ok +CREATE TABLE json_backfill ( + k INT PRIMARY KEY, + j JSON, + INDEX forward_expr ((j->'a')), + INDEX forward (j) +) + +statement ok +INSERT INTO json_backfill VALUES + (1, '[1, 2, 3]'), + (2, '{"a": [1, 2, 3], "b": [4, 5, 6]}'), + (3, '{"a": {"a": "b"}, "d": {"e": [1, 2, 3]}}'), + (4, '{"a": [4, 5]}') + +query T +SELECT j from json_backfill@forward_expr where j->'a' IN ('[1, 2, 3]', '[4,5]') ORDER BY k +---- +{"a": [1, 2, 3], "b": [4, 5, 6]} +{"a": [4, 5]} + +query T +SELECT j from json_backfill@forward_expr where j->'a' = '{"a": "b"}' ORDER BY k +---- +{"a": {"a": "b"}, "d": {"e": [1, 2, 3]}} + +query T +SELECT j from json_backfill@forward_expr where j->'a' > '{"a": "b"}' ORDER BY k +---- + +query T +SELECT j from json_backfill@forward_expr where j->'a' < '{"a": "b"}' ORDER BY k +---- +{"a": [1, 2, 3], "b": [4, 5, 6]} +{"a": [4, 5]} + +query T +SELECT j from json_backfill@forward where j = '[1, 2, 3]' ORDER BY k +---- +[1, 2, 3] + +query T +SELECT j from json_backfill@forward where j = '{"a": [4, 5]}' OR j = '[1, 2, 3]' ORDER BY k +---- +[1, 2, 3] +{"a": [4, 5]} + + +statement ok +DROP INDEX forward_expr; +DROP INDEX forward; + +statement ok +CREATE INDEX forward_expr on json_backfill ((j->'a')); +CREATE INDEX forward on json_backfill (j); + +query T +SELECT j from json_backfill@forward_expr where j->'a' IN ('[1, 2, 3]', '[4,5]') ORDER BY k +---- +{"a": [1, 2, 3], "b": [4, 5, 6]} +{"a": [4, 5]} + +query T +SELECT j from json_backfill@forward_expr where j->'a' = '{"a": "b"}' ORDER BY k +---- +{"a": {"a": "b"}, "d": {"e": [1, 2, 3]}} + +query T +SELECT j from json_backfill@forward_expr where j->'a' > '{"a": "b"}' ORDER BY k +---- + +query T +SELECT j from json_backfill@forward_expr where j->'a' < '{"a": "b"}' ORDER BY k +---- +{"a": [1, 2, 3], "b": [4, 5, 6]} +{"a": [4, 5]} + +query T +SELECT j from json_backfill@forward where j = '[1, 2, 3]' ORDER BY k +---- +[1, 2, 3] + +query T +SELECT j from json_backfill@forward where j = '{"a": [4, 5]}' OR j = '[1, 2, 3]' ORDER BY k +---- +[1, 2, 3] +{"a": [4, 5]} + + # Unique expression indexes. 
statement ok @@ -1009,3 +1162,17 @@ BEGIN; CREATE INDEX t_a_times_three_idx ON t ((a * 3)); SELECT crdb_internal.force_retry('10ms'); COMMIT + +# JSON Expression Unique Indexes + +statement ok +CREATE TABLE uniq_json ( + k INT PRIMARY KEY, + j JSON, + UNIQUE INDEX ((j->'a')) +) + +statement error duplicate key value violates unique constraint "uniq_json_expr_key" +INSERT INTO uniq_json VALUES + (1, '{"a": "b"}'), + (2, '{"a": "b"}') diff --git a/pkg/sql/logictest/testdata/logic_test/inverted_index b/pkg/sql/logictest/testdata/logic_test/inverted_index index 81b3a410b980..67655c3299e9 100644 --- a/pkg/sql/logictest/testdata/logic_test/inverted_index +++ b/pkg/sql/logictest/testdata/logic_test/inverted_index @@ -73,14 +73,6 @@ CREATE INVERTED INDEX ON c (foo DESC) statement ok CREATE INVERTED INDEX ON c("qUuX") -statement error column foo of type jsonb is only allowed as the last column in an inverted index\nHINT: see the documentation for more information about inverted indexes -CREATE TABLE d ( - id INT PRIMARY KEY, - foo JSONB, - bar JSONB, - INVERTED INDEX (foo, bar) -) - statement error column foo of type int is not allowed as the last column in an inverted index\nHINT: see the documentation for more information about inverted indexes CREATE TABLE d ( id INT PRIMARY KEY, @@ -1834,8 +1826,6 @@ SELECT j FROM cb2@i WHERE j <@ '[[1], [2]]' ORDER BY k [[2]] [[1], [2]] -statement error pq: index element j->'some_key' of type jsonb is not indexable in a non-inverted index\nHINT: you may want to create an inverted index instead. See the documentation for inverted indexes: https://www.cockroachlabs.com/docs/.*/inverted-indexes.html -CREATE TABLE t2 (j JSON, INDEX ((j->'some_key'))) statement ok CREATE TABLE table_desc_inverted_index ( diff --git a/pkg/sql/logictest/testdata/logic_test/inverted_index_multi_column b/pkg/sql/logictest/testdata/logic_test/inverted_index_multi_column index ae0efdeee95d..b398ca90b69d 100644 --- a/pkg/sql/logictest/testdata/logic_test/inverted_index_multi_column +++ b/pkg/sql/logictest/testdata/logic_test/inverted_index_multi_column @@ -5,12 +5,9 @@ statement error column b of type int is not allowed as the last column in an inv CREATE TABLE m_err (k INT PRIMARY KEY, a INT, b INT, geom GEOMETRY, INVERTED INDEX (a, b)) # Err if a non-last column is not a non-invertable type. -statement error column j of type jsonb is only allowed as the last column in an inverted index\nHINT: see the documentation for more information about inverted indexes: https://www.cockroachlabs.com/docs/.*/inverted-indexes.html -CREATE TABLE m_err (k INT PRIMARY KEY, j JSON, geom GEOMETRY, INVERTED INDEX (j, geom)) +statement error column geom1 of type geometry is only allowed as the last column in an inverted index\nHINT: see the documentation for more information about inverted indexes: https://www.cockroachlabs.com/docs/.*/inverted-indexes.html +CREATE TABLE m_err (k INT PRIMARY KEY, geom1 GEOMETRY , geom GEOMETRY, INVERTED INDEX (geom1, geom)) -# Err if a non-last column is not a non-invertable type. 
-statement error column j of type jsonb is only allowed as the last column in an inverted index\nHINT: see the documentation for more information about inverted indexes: https://www.cockroachlabs.com/docs/.*/inverted-indexes.html -CREATE TABLE m_err (k INT PRIMARY KEY, a INT, j JSON, geom GEOMETRY, INVERTED INDEX (a, j, geom)) statement ok CREATE TABLE l (k INT PRIMARY KEY, a INT, j JSON, INVERTED INDEX (a, j ASC)) @@ -323,3 +320,91 @@ query ITT SELECT * FROM backfill_d@idx WHERE i IN (1, 2, 3, 4) AND s = 'bar' AND j @> '7'::JSON ---- 2 bar [7, 0, 7] + +# Test selecting, inserting, updating, and deleting on a table with a +# multi-column JSON inverted index. +statement ok +CREATE TABLE d ( + id INT PRIMARY KEY, + foo JSONB, + bar JSONB, + INVERTED INDEX idx (foo, bar) +); + +# Testing inserting +statement ok +INSERT into d VALUES + (1, '"foo"', '[7]'), + (2, '"bar"', '[7, 0, 7]'), + (3, '"baz"', '{"a": "b"}'), + (4, '"foo"', '[7, 8, 9, 10]'), + (5, '"foo"', '[[0], [7, 8, 9, 10]]') + +query ITT +SELECT id, foo, bar FROM d@idx where foo = '"foo"' AND bar->0 = '7' ORDER BY id +---- +1 "foo" [7] +4 "foo" [7, 8, 9, 10] + +query ITT +SELECT id, foo, bar FROM d@idx where foo = '"foo"' AND bar->1 = '0' ORDER BY id +---- + +query ITT +SELECT id, foo, bar FROM d@idx where foo = '"foo"' AND bar->1 = '8' ORDER BY id +---- +4 "foo" [7, 8, 9, 10] + +query ITT +SELECT id, foo, bar FROM d@idx where foo = '"foo"' AND bar->0 @> '[0]' ORDER BY id +---- +5 "foo" [[0], [7, 8, 9, 10]] + +query ITT +SELECT id, foo, bar FROM d@idx where foo = '"foo"' AND bar->0 <@ '[0]' ORDER BY id +---- +5 "foo" [[0], [7, 8, 9, 10]] + +# Testing deleting +statement ok +DELETE FROM d WHERE id = 5 + +query ITT +SELECT id, foo, bar FROM d@idx where foo = '"foo"' AND bar->0 <@ '[0]' ORDER BY id +---- + +# Testing updating +statement ok +UPDATE d SET foo = '"updated"' WHERE id = 2 + +query ITT +SELECT id, foo, bar FROM d@idx where foo = '"updated"' AND bar->0 @> '7' ORDER BY id +---- +2 "updated" [7, 0, 7] + +# Backfill this multi-column inverted index. + +statement ok +DROP INDEX d@idx + +statement ok +INSERT into d VALUES + (6, '"backfilling"', '[[0], [1], 2, 3]'), + (7, '"q"', '[[0], [1], [2], []]'), + (8, '"backfilling"', '[[0], [1], [2], []]') + + +statement ok +CREATE INVERTED INDEX idx on d (foo, bar) + +query ITT +SELECT id, foo, bar FROM d@idx where foo = '"backfilling"' AND bar->2 @> '2' ORDER BY id +---- +6 "backfilling" [[0], [1], 2, 3] +8 "backfilling" [[0], [1], [2], []] + +query ITT +SELECT id, foo, bar FROM d@idx where foo = '"foo"' AND bar->0 = '7' ORDER BY id +---- +1 "foo" [7] +4 "foo" [7, 8, 9, 10] diff --git a/pkg/sql/logictest/testdata/logic_test/json b/pkg/sql/logictest/testdata/logic_test/json index 6f645ebb22af..9aede9c08c56 100644 --- a/pkg/sql/logictest/testdata/logic_test/json +++ b/pkg/sql/logictest/testdata/logic_test/json @@ -179,10 +179,10 @@ SELECT bar FROM foo WHERE bar->'a' = '"b"'::JSON ---- {"a": "b"} -statement error pgcode 0A000 can't order by column type jsonb.*\nHINT.*\n.*35706 +statement ok SELECT bar FROM foo ORDER BY bar -statement error pgcode 0A000 column k is of type jsonb and thus is not indexable +statement ok CREATE TABLE pk (k JSON PRIMARY KEY) query T rowsort diff --git a/pkg/sql/logictest/testdata/logic_test/json_index b/pkg/sql/logictest/testdata/logic_test/json_index new file mode 100644 index 000000000000..7fd4f6d1f4fe --- /dev/null +++ b/pkg/sql/logictest/testdata/logic_test/json_index @@ -0,0 +1,486 @@ +# Add JSON columns as primary index. 
+statement ok +CREATE TABLE t (x JSONB PRIMARY KEY) + +# Adding JSON primitive types. +statement ok +INSERT INTO t VALUES + ('"a"'::JSONB), + ('"b"'::JSONB), + ('"aa"'::JSONB), + ('"abcdefghi"'::JSONB), + ('100'::JSONB), + ('1'::JSONB), + ('{"a": "b"}'), + ('[]') + +# Ensuring the ordering is intact. +query T +SELECT x FROM t ORDER BY x +---- +"a" +"aa" +"abcdefghi" +"b" +1 +100 +[] +{"a": "b"} + + +# Test that unique indexes reject bad inserts. +statement error pq: duplicate key value violates unique constraint "t_pkey" +INSERT INTO t VALUES + ('"a"'::JSONB) + +query T +SELECT x FROM t@t_pkey ORDER BY x +---- +"a" +"aa" +"abcdefghi" +"b" +1 +100 +[] +{"a": "b"} + +# Use the index for point lookups. +query T +SELECT x FROM t@t_pkey WHERE x = '"a"' +---- +"a" + +query T +SELECT x FROM t@t_pkey WHERE x = '"aa"' +---- +"aa" + +query T +SELECT x FROM t@t_pkey WHERE x = '100' +---- +100 + +query T +SELECT x FROM t@t_pkey WHERE x = '12' +---- + +query T +SELECT x FROM t@t_pkey WHERE x = '{"a": "b"}' +---- +{"a": "b"} + +# Using the index for bounded scans. +query T +SELECT x FROM t@t_pkey WHERE x > '1' ORDER BY x +---- +100 +[] +{"a": "b"} + +query T +SELECT x FROM t@t_pkey WHERE x < '1' ORDER BY x +---- +"a" +"aa" +"abcdefghi" +"b" + + +query T +SELECT x FROM t@t_pkey WHERE x > '1' OR x < '1' ORDER BY x +---- +"a" +"aa" +"abcdefghi" +"b" +100 +[] +{"a": "b"} + +query T +SELECT x FROM t@t_pkey WHERE x > '1' AND x < '1' ORDER BY x +---- + +# Trying to order by in a descending fashion. +query T +SELECT x FROM t@t_pkey WHERE x > '1' OR x < '1' ORDER BY x DESC +---- +{"a": "b"} +[] +100 +"b" +"abcdefghi" +"aa" +"a" + +# Adding more primitive JSON values. +statement ok +INSERT INTO t VALUES + ('true'), + ('false'), + ('null'), + ('"aaaaaaayouube"'), + ('"testing spaces"'), + ('"Testing Punctuation?!."') + +query T +SELECT x FROM t@t_pkey ORDER BY x +---- +null +"Testing Punctuation?!." +"a" +"aa" +"aaaaaaayouube" +"abcdefghi" +"b" +"testing spaces" +1 +100 +false +true +[] +{"a": "b"} + +query T +SELECT x FROM t@t_pkey WHERE x > 'true' ORDER BY x +---- +[] +{"a": "b"} + +query T +SELECT x FROM t@t_pkey WHERE x < 'false' ORDER BY x +---- +null +"Testing Punctuation?!." +"a" +"aa" +"aaaaaaayouube" +"abcdefghi" +"b" +"testing spaces" +1 +100 + +# Testing JSON Arrays. +statement ok +DROP TABLE IF EXISTS t; +CREATE TABLE t (x JSONB PRIMARY KEY) + +statement ok +INSERT INTO t VALUES + ('[]'), + ('[null]'), + ('[1]'), + ('[null, null, false, true, "a", 1]'), + ('[{"a":"b"}]'), + ('[{"a":"b", "c": [1, 2, 3, 4, 5]}]') + +query T +SELECT x FROM t@t_pkey ORDER BY x +---- +[] +[null] +[1] +[{"a": "b"}] +[{"a": "b", "c": [1, 2, 3, 4, 5]}] +[null, null, false, true, "a", 1] + +query T +SELECT x FROM t@t_pkey where x = '[1]' ORDER BY x +---- +[1] + +query T +SELECT x FROM t@t_pkey where x >= '[1]' ORDER BY x +---- +[1] +[{"a": "b"}] +[{"a": "b", "c": [1, 2, 3, 4, 5]}] +[null, null, false, true, "a", 1] + +query T +SELECT x FROM t@t_pkey where x <= '[1]' ORDER BY x +---- +[] +[null] +[1] + +query T +SELECT x FROM t@t_pkey where x >= '[1]' AND x <= '{"a": "b"}' ORDER BY x +---- +[1] +[{"a": "b"}] +[{"a": "b", "c": [1, 2, 3, 4, 5]}] +[null, null, false, true, "a", 1] + +# Nested JSON Arrays. +statement ok +INSERT INTO t VALUES + ('[1, [2, 3]]'), + ('[1, [2, [3, [4]]]]') + +query T +SELECT x FROM t@t_pkey WHERE x = '[1, [2, 3]]' ORDER BY x +---- +[1, [2, 3]] + +query T +SELECT x FROM t@t_pkey WHERE x = '[1, [2, [3, [4]]]]' ORDER BY x +---- +[1, [2, [3, [4]]]] + +# Testing the ordering again. 
+query T +SELECT x FROM t@t_pkey ORDER BY x +---- +[] +[null] +[1] +[{"a": "b"}] +[{"a": "b", "c": [1, 2, 3, 4, 5]}] +[1, [2, 3]] +[1, [2, [3, [4]]]] +[null, null, false, true, "a", 1] + +# Testing the scans with nested arrays. +query T +SELECT x FROM t@t_pkey WHERE x < '[1, [2, [3, [4]]]]' ORDER BY X +---- +[] +[null] +[1] +[{"a": "b"}] +[{"a": "b", "c": [1, 2, 3, 4, 5]}] +[1, [2, 3]] + +# Testing JSON Objects. +statement ok +DROP TABLE IF EXISTS t; +CREATE TABLE t (x JSONB PRIMARY KEY) + +statement ok +INSERT INTO t VALUES + ('{}'), + ('{"a": 1}'), + ('{"a": "sh", "b": 1}'), + ('{"a": ["1"]}'), + ('{"a": [{"b":"c"}]}'), + ('{"c": true, "d": null, "newkey": "newvalue"}'), + ('{"e": {"f": {"g": 1}}, "f": [1, 2, 3]}'), + ('{ "aa": 1, "c": 1}'), + ('{"b": 1, "d": 1}') + +# Testing the ordering again. +query T +SELECT x FROM t@t_pkey ORDER BY x +---- +{} +{"a": 1} +{"a": ["1"]} +{"a": [{"b": "c"}]} +{"a": "sh", "b": 1} +{"aa": 1, "c": 1} +{"b": 1, "d": 1} +{"e": {"f": {"g": 1}}, "f": [1, 2, 3]} +{"c": true, "d": null, "newkey": "newvalue"} + +query T +SELECT x FROM t@t_pkey WHERE x >= '{}' ORDER BY x; +---- +{} +{"a": 1} +{"a": ["1"]} +{"a": [{"b": "c"}]} +{"a": "sh", "b": 1} +{"aa": 1, "c": 1} +{"b": 1, "d": 1} +{"e": {"f": {"g": 1}}, "f": [1, 2, 3]} +{"c": true, "d": null, "newkey": "newvalue"} + +query T +SELECT x FROM t@t_pkey WHERE x < '{}' ORDER BY x; +---- + +query T +SELECT x FROM t@t_pkey WHERE x = '{"e": {"f": {"g": 1}}, "f": [1, 2, 3]}' ORDER BY x; +---- +{"e": {"f": {"g": 1}}, "f": [1, 2, 3]} + +# Ensure that we can order by JSONs without any indexes. +statement ok +DROP TABLE t; +CREATE TABLE t (x JSONB); +INSERT INTO t VALUES + ('{}'), + ('[]'), + ('true'), + ('false'), + ('null'), + ('"crdb"'), + ('[1, 2, 3]'), + ('1'), + ('{"a": "b", "c": "d"}'), + (NULL) + + +query T +SELECT x FROM t@t_pkey ORDER BY x +---- +NULL +null +"crdb" +1 +false +true +[] +[1, 2, 3] +{} +{"a": "b", "c": "d"} + +query T +SELECT x FROM t@t_pkey ORDER BY x DESC +---- +{"a": "b", "c": "d"} +{} +[1, 2, 3] +[] +true +false +1 +"crdb" +null +NULL + +# Test to show JSON Null is different from NULL. +query T +SELECT x FROM t@t_pkey WHERE x IS NOT NULL ORDER BY x +---- +null +"crdb" +1 +false +true +[] +[1, 2, 3] +{} +{"a": "b", "c": "d"} + + +# Test JSONs of composite types without an index. +statement ok +CREATE TABLE tjson(x JSONB); +INSERT INTO tjson VALUES + ('1.250'), + ('1.0'), + ('1.000'), + ('1.111111'), + ('10'), + ('[1, 2.0, 1.21, 1.00]'), + ('{"a": [1, 1.1], "b": 1.0000, "c": 10.0}') + +# Ensure these are round tripped correctly. +query T +SELECT x FROM tjson ORDER BY x, rowid +---- +1.0 +1.000 +1.111111 +1.250 +10 +[1, 2.0, 1.21, 1.00] +{"a": [1, 1.1], "b": 1.0000, "c": 10.0} + +# Test JSONs of composite types with an index. +statement ok +CREATE TABLE y(x JSONB PRIMARY KEY); +INSERT INTO y VALUES + ('1.00'), + ('1.250'), + ('10'), + ('[1, 2.0, 1.21, 1.00]'), + ('{"a": [1, 1.1], "b": 1.0000, "c": 10.0}') + +# Ensure these are round tripped correctly. +query T +SELECT x FROM y ORDER BY x +---- +1.00 +1.250 +10 +[1, 2.0, 1.21, 1.00] +{"a": [1, 1.1], "b": 1.0000, "c": 10.0} + +# Test that unique indexes reject bad inserts for composite types. +statement error pq: duplicate key value violates unique constraint "y_pkey" +INSERT INTO y VALUES + ('1.0000') + +# Testing with the descending designation. 
+statement ok +DROP TABLE t; +CREATE TABLE t (x JSONB) + +statement ok +CREATE INDEX i ON t(x DESC) + +statement ok +INSERT INTO t VALUES + ('{}'), + ('[]'), + ('true'), + ('false'), + ('null'), + ('"crdb"'), + ('[1, 2, 3]'), + ('1'), + ('{"a": "b", "c": "d"}'), + ('[null]'), + ('[1]'), + ('[null, null, false, true, "a", 1]'), + ('[{"a":"b"}]'), + ('[{"a":"b", "c": [1, 2, 3, 4, 5]}]') + +query T +SELECT x FROM t@i ORDER BY x; +---- +null +"crdb" +1 +false +true +[] +[null] +[1] +[{"a": "b"}] +[{"a": "b", "c": [1, 2, 3, 4, 5]}] +[1, 2, 3] +[null, null, false, true, "a", 1] +{} +{"a": "b", "c": "d"} + + +# Testing different joins on indexed JSONs. +statement ok +DROP TABLE IF EXISTS t1, t2 CASCADE; +CREATE TABLE t1 (x JSONB PRIMARY KEY); +CREATE TABLE t2 (x JSONB PRIMARY KEY); +INSERT INTO t1 VALUES + ('[1, [2, 3]]'), + ('[1, [2, [3, [4]]]]'); +INSERT INTO t2 VALUES + ('[1, [2, 3]]'), + ('{}'), + ('[1, [2, 4]]') + + +query T rowsort +SELECT t1.x FROM t1 INNER MERGE JOIN t2 ON t1.x = t2.x +---- +[1, [2, 3]] + +query T rowsort +SELECT t1.x FROM t1 INNER LOOKUP JOIN t2 ON t1.x = t2.x +---- +[1, [2, 3]] + diff --git a/pkg/sql/logictest/testdata/logic_test/pg_catalog b/pkg/sql/logictest/testdata/logic_test/pg_catalog index 6b00acfc6a80..d134ae090358 100644 --- a/pkg/sql/logictest/testdata/logic_test/pg_catalog +++ b/pkg/sql/logictest/testdata/logic_test/pg_catalog @@ -3378,29 +3378,30 @@ JOIN pg_operator c ON c.oprname = '>' AND b.proargtypes[0] = c.oprleft AND b.pro WHERE (b.proname = 'max' OR b.proname = 'bool_or') AND c.oid = a.aggsortop; ---- oid oprname aggsortop -1224236426 > 1224236426 -3636536082 > 3636536082 3636536082 > 3636536082 -2948286002 > 2948286002 -3234851498 > 3234851498 -2318307066 > 2318307066 1737252658 > 1737252658 1737252658 > 1737252658 -1383827510 > 1383827510 +1224236426 > 1224236426 +3636536082 > 3636536082 264553706 > 264553706 -2105536758 > 2105536758 -1928531314 > 1928531314 -3421685890 > 3421685890 883535762 > 883535762 +1383827510 > 1383827510 +2318307066 > 2318307066 +3234851498 > 3234851498 +256681770 > 256681770 530358714 > 530358714 -3802002898 > 3802002898 +2105536758 > 2105536758 +1928531314 > 1928531314 1737252658 > 1737252658 1737252658 > 1737252658 -1064453514 > 1064453514 -1778355034 > 1778355034 -256681770 > 256681770 +2948286002 > 2948286002 2139039570 > 2139039570 +3802002898 > 3802002898 3457382662 > 3457382662 +3421685890 > 3421685890 +1064453514 > 1064453514 +1778355034 > 1778355034 +3944320082 > 3944320082 1385359122 > 1385359122 # Check whether correct operator's oid is set for min, bool_and and every. 
@@ -3411,30 +3412,31 @@ JOIN pg_operator c ON c.oprname = '<' AND b.proargtypes[0] = c.oprleft AND b.pro WHERE (b.proname = 'min' OR b.proname = 'bool_and' OR b.proname = 'every') AND c.oid = a.aggsortop; ---- oid oprname aggsortop -3859576864 < 3859576864 -2134593616 < 2134593616 2134593616 < 2134593616 2134593616 < 2134593616 -1446343536 < 1446343536 -2457977576 < 2457977576 -2790955336 < 2790955336 235310192 < 235310192 235310192 < 235310192 -2011297100 < 2011297100 +3859576864 < 3859576864 +2134593616 < 2134593616 3269496816 < 3269496816 -2104629996 < 2104629996 -3942776496 < 3942776496 -4132205728 < 4132205728 3676560592 < 3676560592 +2011297100 < 2011297100 +2790955336 < 2790955336 +2457977576 < 2457977576 +426663592 < 426663592 1494969736 < 1494969736 -3842027408 < 3842027408 +2104629996 < 2104629996 +3942776496 < 3942776496 235310192 < 235310192 235310192 < 235310192 -2300570720 < 2300570720 -3675947880 < 3675947880 -426663592 < 426663592 +1446343536 < 1446343536 2699108304 < 2699108304 +3842027408 < 3842027408 2897050084 < 2897050084 +4132205728 < 4132205728 +2300570720 < 2300570720 +3675947880 < 3675947880 +3575809104 < 3575809104 1579888144 < 1579888144 subtest collated_string_type diff --git a/pkg/sql/logictest/tests/fakedist-disk/generated_test.go b/pkg/sql/logictest/tests/fakedist-disk/generated_test.go index 67b20ec7f1c2..e669955b4ded 100644 --- a/pkg/sql/logictest/tests/fakedist-disk/generated_test.go +++ b/pkg/sql/logictest/tests/fakedist-disk/generated_test.go @@ -1059,6 +1059,13 @@ func TestLogic_json_builtins( runLogicTest(t, "json_builtins") } +func TestLogic_json_index( + t *testing.T, +) { + defer leaktest.AfterTest(t)() + runLogicTest(t, "json_index") +} + func TestLogic_kv_builtin_functions( t *testing.T, ) { diff --git a/pkg/sql/logictest/tests/fakedist-vec-off/generated_test.go b/pkg/sql/logictest/tests/fakedist-vec-off/generated_test.go index d897cdf6cc52..c73e6b466458 100644 --- a/pkg/sql/logictest/tests/fakedist-vec-off/generated_test.go +++ b/pkg/sql/logictest/tests/fakedist-vec-off/generated_test.go @@ -1059,6 +1059,13 @@ func TestLogic_json_builtins( runLogicTest(t, "json_builtins") } +func TestLogic_json_index( + t *testing.T, +) { + defer leaktest.AfterTest(t)() + runLogicTest(t, "json_index") +} + func TestLogic_kv_builtin_functions( t *testing.T, ) { diff --git a/pkg/sql/logictest/tests/fakedist/generated_test.go b/pkg/sql/logictest/tests/fakedist/generated_test.go index b5ab2ecb7404..30a9826e6359 100644 --- a/pkg/sql/logictest/tests/fakedist/generated_test.go +++ b/pkg/sql/logictest/tests/fakedist/generated_test.go @@ -1066,6 +1066,13 @@ func TestLogic_json_builtins( runLogicTest(t, "json_builtins") } +func TestLogic_json_index( + t *testing.T, +) { + defer leaktest.AfterTest(t)() + runLogicTest(t, "json_index") +} + func TestLogic_kv_builtin_functions( t *testing.T, ) { diff --git a/pkg/sql/logictest/tests/local-legacy-schema-changer/generated_test.go b/pkg/sql/logictest/tests/local-legacy-schema-changer/generated_test.go index 20a4224814e8..978fea60dd22 100644 --- a/pkg/sql/logictest/tests/local-legacy-schema-changer/generated_test.go +++ b/pkg/sql/logictest/tests/local-legacy-schema-changer/generated_test.go @@ -1038,6 +1038,13 @@ func TestLogic_json_builtins( runLogicTest(t, "json_builtins") } +func TestLogic_json_index( + t *testing.T, +) { + defer leaktest.AfterTest(t)() + runLogicTest(t, "json_index") +} + func TestLogic_kv_builtin_functions( t *testing.T, ) { diff --git a/pkg/sql/logictest/tests/local-mixed-22.2-23.1/generated_test.go 
b/pkg/sql/logictest/tests/local-mixed-22.2-23.1/generated_test.go index 6a996d2f1d58..88afcc2436b6 100644 --- a/pkg/sql/logictest/tests/local-mixed-22.2-23.1/generated_test.go +++ b/pkg/sql/logictest/tests/local-mixed-22.2-23.1/generated_test.go @@ -1052,6 +1052,13 @@ func TestLogic_json_builtins( runLogicTest(t, "json_builtins") } +func TestLogic_json_index( + t *testing.T, +) { + defer leaktest.AfterTest(t)() + runLogicTest(t, "json_index") +} + func TestLogic_kv_builtin_functions( t *testing.T, ) { diff --git a/pkg/sql/logictest/tests/local-vec-off/generated_test.go b/pkg/sql/logictest/tests/local-vec-off/generated_test.go index 860b4ca1cf5f..f698fd7082b8 100644 --- a/pkg/sql/logictest/tests/local-vec-off/generated_test.go +++ b/pkg/sql/logictest/tests/local-vec-off/generated_test.go @@ -1066,6 +1066,13 @@ func TestLogic_json_builtins( runLogicTest(t, "json_builtins") } +func TestLogic_json_index( + t *testing.T, +) { + defer leaktest.AfterTest(t)() + runLogicTest(t, "json_index") +} + func TestLogic_kv_builtin_functions( t *testing.T, ) { diff --git a/pkg/sql/logictest/tests/local/generated_test.go b/pkg/sql/logictest/tests/local/generated_test.go index d9220d202593..1d3c3dfef086 100644 --- a/pkg/sql/logictest/tests/local/generated_test.go +++ b/pkg/sql/logictest/tests/local/generated_test.go @@ -1157,6 +1157,13 @@ func TestLogic_json_builtins( runLogicTest(t, "json_builtins") } +func TestLogic_json_index( + t *testing.T, +) { + defer leaktest.AfterTest(t)() + runLogicTest(t, "json_index") +} + func TestLogic_kv_builtin_functions( t *testing.T, ) { diff --git a/pkg/sql/opt/exec/execbuilder/testdata/json b/pkg/sql/opt/exec/execbuilder/testdata/json new file mode 100644 index 000000000000..4d0adf966e3c --- /dev/null +++ b/pkg/sql/opt/exec/execbuilder/testdata/json @@ -0,0 +1,408 @@ +# LogicTest: local + +statement ok +CREATE TABLE t (x JSONB PRIMARY KEY) + +# Testing range scans on forward indexes. 
+ +query T +EXPLAIN SELECT x FROM t WHERE x = 'null'::JSONB +---- +distribution: local +vectorized: true +· +• scan + missing stats + table: t@t_pkey + spans: [/'null' - /'null'] + +query T +EXPLAIN SELECT x FROM t WHERE x = '"a"'::JSONB +---- +distribution: local +vectorized: true +· +• scan + missing stats + table: t@t_pkey + spans: [/'"a"' - /'"a"'] + +query T +EXPLAIN SELECT x FROM t WHERE x = '1'::JSONB +---- +distribution: local +vectorized: true +· +• scan + missing stats + table: t@t_pkey + spans: [/'1' - /'1'] + +query T +EXPLAIN SELECT x FROM t WHERE x = 'false'::JSONB +---- +distribution: local +vectorized: true +· +• scan + missing stats + table: t@t_pkey + spans: [/'false' - /'false'] + +query T +EXPLAIN SELECT x FROM t WHERE x = 'true'::JSONB +---- +distribution: local +vectorized: true +· +• scan + missing stats + table: t@t_pkey + spans: [/'true' - /'true'] + +query T +EXPLAIN SELECT x from t WHERE x = '[1, 2, 3]'::JSONB +---- +distribution: local +vectorized: true +· +• scan + missing stats + table: t@t_pkey + spans: [/'[1, 2, 3]' - /'[1, 2, 3]'] + + +query T +EXPLAIN SELECT x from t WHERE x = '{"a": [1, 2, 3], "b": [1, 2]}'::JSONB +---- +distribution: local +vectorized: true +· +• scan + missing stats + table: t@t_pkey + spans: [/'{"a": [1, 2, 3], "b": [1, 2]}' - /'{"a": [1, 2, 3], "b": [1, 2]}'] + + +query T +EXPLAIN SELECT x FROM t WHERE x < '1'::JSONB +---- +distribution: local +vectorized: true +· +• scan + missing stats + table: t@t_pkey + spans: [ - /'1') + +query T +EXPLAIN SELECT x FROM t WHERE x < '"ABCD"'::JSONB +---- +distribution: local +vectorized: true +· +• scan + missing stats + table: t@t_pkey + spans: [ - /'"ABCD"') + +query T +EXPLAIN SELECT x FROM t WHERE x < '[1, 2, 3]'::JSONB +---- +distribution: local +vectorized: true +· +• scan + missing stats + table: t@t_pkey + spans: [ - /'[1, 2, 3]') + +query T +EXPLAIN SELECT x FROM t WHERE x <= '[]'::JSONB +---- +distribution: local +vectorized: true +· +• scan + missing stats + table: t@t_pkey + spans: [ - /'[]'] + +query T +EXPLAIN SELECT x FROM t WHERE x > '{"a": "b"}'::JSONB +---- +distribution: local +vectorized: true +· +• scan + missing stats + table: t@t_pkey + spans: (/'{"a": "b"}' - ] + +query T +EXPLAIN SELECT x FROM t WHERE x > '{"a": "b"}'::JSONB +---- +distribution: local +vectorized: true +· +• scan + missing stats + table: t@t_pkey + spans: (/'{"a": "b"}' - ] + +query T +EXPLAIN SELECT x FROM t WHERE x > '{"a": "b"}'::JSONB AND x < '[1, 2, 3]'::JSONB +---- +distribution: local +vectorized: true +· +• norows + +query T +EXPLAIN SELECT x FROM t WHERE x <= '{"a": "b"}'::JSONB AND x >= '[1, 2, 3]'::JSONB +---- +distribution: local +vectorized: true +· +• scan + missing stats + table: t@t_pkey + spans: [/'[1, 2, 3]' - /'{"a": "b"}'] + +query T +EXPLAIN SELECT x FROM t WHERE x <= '"a"'::JSONB OR x >= 'null'::JSONB +---- +distribution: local +vectorized: true +· +• filter +│ filter: (x <= '"a"') OR (x >= 'null') +│ +└── • scan + missing stats + table: t@t_pkey + spans: FULL SCAN + +# Tests to show multiple spans are generated with an IN operator. 
+ +query T +EXPLAIN SELECT x FROM t WHERE x IN ('1', '"a"', '[1, 2, 3]', '{"a": "b"}') +---- +distribution: local +vectorized: true +· +• scan + missing stats + table: t@t_pkey + spans: [/'"a"' - /'"a"'] [/'1' - /'1'] [/'[1, 2, 3]' - /'[1, 2, 3]'] [/'{"a": "b"}' - /'{"a": "b"}'] + +query T +EXPLAIN SELECT x FROM t WHERE x IN ('null', '{}', '[]', '""') +---- +distribution: local +vectorized: true +· +• scan + missing stats + table: t@t_pkey + spans: [/'null' - /'null'] [/'""' - /'""'] [/'[]' - /'[]'] [/'{}' - /'{}'] + +# Multicolumn index, including JSONB + +statement ok +CREATE TABLE s (x INT, y JSONB, z INT, INDEX i (x, y, z)) + + +query T +EXPLAIN SELECT x, y, z FROM s WHERE x = 2 AND y < '[1, 2, 3]'::JSONB AND z = 100 +---- +distribution: local +vectorized: true +· +• filter +│ filter: z = 100 +│ +└── • scan + missing stats + table: s@i + spans: (/2/NULL - /2/'[1, 2, 3]') + +query T +EXPLAIN SELECT x, y, z FROM s WHERE y >= '"a"'::JSONB +---- +distribution: local +vectorized: true +· +• filter +│ filter: y >= '"a"' +│ +└── • scan + missing stats + table: s@s_pkey + spans: FULL SCAN + +# Ensuring that the presence of composite values results in +# encoding in the valueside as well for a given K/V pair. +statement ok +CREATE TABLE composite (k INT PRIMARY KEY, j JSONB, FAMILY (k, j)); +CREATE INDEX on composite (j); + +query T kvtrace +INSERT INTO composite VALUES (1, '1.00'::JSONB), (2, '1'::JSONB), (3, '2'::JSONB), + (4, '3.0'::JSONB), (5, '"a"'::JSONB) +---- +CPut /Table/108/1/1/0 -> /TUPLE/ +InitPut /Table/108/2/"G*\x02\x00\x00\x89\x88" -> /BYTES/0x2f0f0c200000002000000403348964 +CPut /Table/108/1/2/0 -> /TUPLE/ +InitPut /Table/108/2/"G*\x02\x00\x00\x8a\x88" -> /BYTES/ +CPut /Table/108/1/3/0 -> /TUPLE/ +InitPut /Table/108/2/"G*\x04\x00\x00\x8b\x88" -> /BYTES/ +CPut /Table/108/1/4/0 -> /TUPLE/ +InitPut /Table/108/2/"G*\x06\x00\x00\x8c\x88" -> /BYTES/0x2f0f0c20000000200000040334891e +CPut /Table/108/1/5/0 -> /TUPLE/ +InitPut /Table/108/2/"F\x12a\x00\x01\x00\x8d\x88" -> /BYTES/ + +query T kvtrace +SELECT j FROM composite where j = '1.00'::JSONB +---- +Scan /Table/108/2/"G*\x02\x00\x0{0"-1"} + +query T +SELECT j FROM composite ORDER BY j; +---- +"a" +1.00 +1 +2 +3.0 + +# JSON Expression Indexes. +statement ok +CREATE TABLE d (a INT, j JSON); +CREATE INDEX json_expr_index on d ((j->'a')) + +statement ok +INSERT INTO d VALUES + (1, '{"a": "hello"}'), + (2, '{"a": "b"}'), + (3, '{"a": "bye"}'), + (4, '{"a": "json"}') + + +query T +EXPLAIN SELECT j from d where j->'a' = '"b"' +---- +distribution: local +vectorized: true +· +• index join +│ table: d@d_pkey +│ +└── • scan + missing stats + table: d@json_expr_index + spans: [/'"b"' - /'"b"'] + + +query T +EXPLAIN SELECT j from d where j->'a' = '"b"' OR j->'a' = '"bye"' +---- +distribution: local +vectorized: true +· +• index join +│ table: d@d_pkey +│ +└── • scan + missing stats + table: d@json_expr_index + spans: [/'"b"' - /'"b"'] [/'"bye"' - /'"bye"'] + +# The expression index is not used for this query. 
+query T +EXPLAIN SELECT j from d where j > '{"a":"b"}' +---- +distribution: local +vectorized: true +· +• filter +│ filter: j > '{"a": "b"}' +│ +└── • scan + missing stats + table: d@d_pkey + spans: FULL SCAN + +query T +EXPLAIN SELECT j from d where j->'a' = '1' +---- +distribution: local +vectorized: true +· +• index join +│ table: d@d_pkey +│ +└── • scan + missing stats + table: d@json_expr_index + spans: [/'1' - /'1'] + +query T +EXPLAIN SELECT j from d where j->'a' = 'null' +---- +distribution: local +vectorized: true +· +• index join +│ table: d@d_pkey +│ +└── • scan + missing stats + table: d@json_expr_index + spans: [/'null' - /'null'] + +# Inverted Indexes where JSON is also forward indexed. +statement ok +INSERT INTO d VALUES + (5, '{"a": "forward", "json": "inverted"}'), + (6, '{"a": "c", "b": "d"}') + + +statement ok +CREATE INVERTED INDEX json_inv on d ((j->'a'), j) + + +query T +EXPLAIN SELECT j from d where j->'a' = '"forward"' AND j->'json' = '"inverted"' +---- +distribution: local +vectorized: true +· +• index join +│ table: d@d_pkey +│ +└── • scan + missing stats + table: d@json_inv + spans: 1 span + +query T +EXPLAIN SELECT j from d where j->'a' = '[1, 2, 3]' AND j->'json' = '{}' +---- +distribution: local +vectorized: true +· +• filter +│ filter: (j->'json') = '{}' +│ +└── • index join + │ table: d@d_pkey + │ + └── • inverted filter + │ inverted column: j_inverted_key + │ num spans: 2 + │ + └── • scan + missing stats + table: d@json_inv + spans: 2 spans diff --git a/pkg/sql/opt/exec/execbuilder/testdata/stats b/pkg/sql/opt/exec/execbuilder/testdata/stats index 8a924bb8d80d..3dddc5bee8c9 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/stats +++ b/pkg/sql/opt/exec/execbuilder/testdata/stats @@ -355,6 +355,8 @@ limit │ ├── columns: j:1 │ ├── immutable │ ├── stats: [rows=1, distinct(1)=1, null(1)=1] + │ │ histogram(1)= 0 1 + │ │ <--- NULL │ ├── cost: 23.775 │ ├── fd: ()-->(1) │ ├── limit hint: 1.00 @@ -362,6 +364,8 @@ limit │ ├── scan tj │ │ ├── columns: j:1 │ │ ├── stats: [rows=5, distinct(1)=4, null(1)=1] + │ │ │ histogram(1)= 0 1 0 1 2 1 + │ │ │ <--- NULL --- '1' --- '{}' │ │ ├── cost: 23.695 │ │ ├── limit hint: 5.00 │ │ ├── distribution: test diff --git a/pkg/sql/opt/exec/execbuilder/tests/local/generated_test.go b/pkg/sql/opt/exec/execbuilder/tests/local/generated_test.go index ced9ea9bef2b..83fb197ec1cc 100644 --- a/pkg/sql/opt/exec/execbuilder/tests/local/generated_test.go +++ b/pkg/sql/opt/exec/execbuilder/tests/local/generated_test.go @@ -343,6 +343,13 @@ func TestExecBuild_join_order( runExecBuildLogicTest(t, "join_order") } +func TestExecBuild_json( + t *testing.T, +) { + defer leaktest.AfterTest(t)() + runExecBuildLogicTest(t, "json") +} + func TestExecBuild_limit( t *testing.T, ) { diff --git a/pkg/sql/opt/indexrec/candidate.go b/pkg/sql/opt/indexrec/candidate.go index d94b5f265476..327fbc58dfcb 100644 --- a/pkg/sql/opt/indexrec/candidate.go +++ b/pkg/sql/opt/indexrec/candidate.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/opt" "github.com/cockroachdb/cockroach/pkg/sql/opt/cat" "github.com/cockroachdb/cockroach/pkg/sql/opt/memo" + "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/intsets" ) @@ -223,7 +224,10 @@ func (ics indexCandidateSet) addOrderingIndex(ordering opt.Ordering) { func (ics *indexCandidateSet) addJoinIndexes(expr memo.FiltersExpr) { outerCols := expr.OuterCols().ToList() for _, col := range outerCols { - if colinfo.ColumnTypeIsIndexable(ics.md.ColumnMeta(col).Type) { + // 
TODO (Shivam): Index recommendations should not only allow JSON columns + // to be part of inverted indexes since they are also forward indexable. + if colinfo.ColumnTypeIsIndexable(ics.md.ColumnMeta(col).Type) && + ics.md.ColumnMeta(col).Type.Family() != types.JsonFamily { ics.addSingleColumnIndex(col, false /* desc */, ics.joinCandidates) } else { ics.addSingleColumnIndex(col, false /* desc */, ics.invertedCandidates) @@ -309,7 +313,10 @@ func (ics *indexCandidateSet) addVariableExprIndex( switch expr := expr.(type) { case *memo.VariableExpr: col := expr.Col - if colinfo.ColumnTypeIsIndexable(ics.md.ColumnMeta(col).Type) { + // TODO (Shivam): Index recommendations should not only allow JSON columns + // to be part of inverted indexes since they are also forward indexable. + if colinfo.ColumnTypeIsIndexable(ics.md.ColumnMeta(col).Type) && + ics.md.ColumnMeta(col).Type.Family() != types.JsonFamily { ics.addSingleColumnIndex(col, false /* desc */, indexCandidates) } else { ics.addSingleColumnIndex(col, false /* desc */, ics.invertedCandidates) @@ -339,7 +346,10 @@ func (ics *indexCandidateSet) addMultiColumnIndex( index := make([]cat.IndexColumn, 0, len(tableToCols[currTable])) for _, colSlice := range tableToCols[currTable] { indexCol := colSlice[0] - if colinfo.ColumnTypeIsIndexable(indexCol.Column.DatumType()) { + // TODO (Shivam): Index recommendations should not only allow JSON columns + // to be part of inverted indexes since they are also forward indexable. + if indexCol.Column.DatumType().Family() != types.JsonFamily && + colinfo.ColumnTypeIsIndexable(indexCol.Column.DatumType()) { index = append(index, indexCol) } } diff --git a/pkg/sql/opt/indexrec/hypothetical_table.go b/pkg/sql/opt/indexrec/hypothetical_table.go index 0037badc452c..5eb4324dcc2e 100644 --- a/pkg/sql/opt/indexrec/hypothetical_table.go +++ b/pkg/sql/opt/indexrec/hypothetical_table.go @@ -42,7 +42,10 @@ func BuildOptAndHypTableMaps( for _, indexCols := range indexes { indexOrd := hypTable.Table.IndexCount() + len(hypIndexes) lastKeyCol := indexCols[len(indexCols)-1] - inverted := !colinfo.ColumnTypeIsIndexable(lastKeyCol.DatumType()) + // TODO (Shivam): Index recommendations should not only allow JSON columns + // to be part of inverted indexes since they are also forward indexable. 
+ inverted := !colinfo.ColumnTypeIsIndexable(lastKeyCol.DatumType()) || + lastKeyCol.DatumType().Family() == types.JsonFamily if inverted { invertedCol := hypTable.addInvertedCol(lastKeyCol.Column) indexCols[len(indexCols)-1] = cat.IndexColumn{Column: invertedCol} diff --git a/pkg/sql/opt/norm/testdata/rules/inline b/pkg/sql/opt/norm/testdata/rules/inline index eac039b21cab..084b6efd5aa6 100644 --- a/pkg/sql/opt/norm/testdata/rules/inline +++ b/pkg/sql/opt/norm/testdata/rules/inline @@ -740,7 +740,7 @@ project ├── columns: k:1!null i:2 s:3 j:4 v:5 x:6 ├── immutable ├── key: (1) - ├── fd: (1)-->(2-4), (3)-->(5), (4)-->(6) + ├── fd: (1)-->(2-4,6), (3)-->(5) ├── select │ ├── columns: k:1!null i:2 s:3 j:4 │ ├── immutable @@ -769,7 +769,7 @@ project ├── columns: k:1!null i:2 s:3 j:4 v:5 x:6 ├── immutable ├── key: (1) - ├── fd: (1)-->(2-4), (3)-->(5), (4)-->(6) + ├── fd: (1)-->(2-4,6), (3)-->(5) ├── select │ ├── columns: k:1!null i:2 s:3 j:4 │ ├── immutable @@ -835,12 +835,12 @@ semi-join (hash) ├── columns: k:1!null i:2 s:3 j:4 v:5 x:6 ├── immutable ├── key: (1) - ├── fd: (1)-->(2-4), (3)-->(5), (4)-->(6) + ├── fd: (1)-->(2-4,6), (3)-->(5) ├── project │ ├── columns: v:5 x:6 k:1!null i:2 s:3 j:4 │ ├── immutable │ ├── key: (1) - │ ├── fd: (1)-->(2-4), (3)-->(5), (4)-->(6) + │ ├── fd: (1)-->(2-4,6), (3)-->(5) │ ├── scan virt │ │ ├── columns: k:1!null i:2 s:3 j:4 │ │ ├── computed column expressions diff --git a/pkg/sql/opt/norm/testdata/rules/project b/pkg/sql/opt/norm/testdata/rules/project index f0bd44b41e09..ce57a23bcb20 100644 --- a/pkg/sql/opt/norm/testdata/rules/project +++ b/pkg/sql/opt/norm/testdata/rules/project @@ -1345,7 +1345,7 @@ ON True project ├── columns: x:1!null z:2 j:3 x:7 "?column?":8!null ├── immutable - ├── fd: (1)-->(2,3), (3)-->(7) + ├── fd: (1)-->(2,3) ├── inner-join (cross) │ ├── columns: b.x:1!null z:2 j:3 "?column?":8!null │ ├── multiplicity: left-rows(one-or-more), right-rows(zero-or-more) @@ -1390,7 +1390,6 @@ project │ ├── project │ │ ├── columns: "?column?":7 x:1!null │ │ ├── immutable - │ │ ├── fd: (1)-->(7) │ │ ├── inner-join (cross) │ │ │ ├── columns: x:1!null j:3 │ │ │ ├── multiplicity: left-rows(one-or-more), right-rows(zero-or-more) @@ -1516,7 +1515,6 @@ project ├── columns: x:2 j:1!null ├── cardinality: [2 - 2] ├── immutable - ├── fd: (1)-->(2) ├── values │ ├── columns: column1:1!null │ ├── cardinality: [2 - 2] diff --git a/pkg/sql/opt/optbuilder/orderby.go b/pkg/sql/opt/optbuilder/orderby.go index 014807e8aeec..bfb049f386ce 100644 --- a/pkg/sql/opt/optbuilder/orderby.go +++ b/pkg/sql/opt/optbuilder/orderby.go @@ -297,8 +297,6 @@ func ensureColumnOrderable(e tree.TypedExpr) { typ = typ.ArrayContents() } switch typ.Family() { - case types.JsonFamily: - panic(unimplementedWithIssueDetailf(35706, "", "can't order by column type jsonb")) case types.TSQueryFamily, types.TSVectorFamily: panic(unimplementedWithIssueDetailf(92165, "", "can't order by column type %s", typ.SQLString())) } diff --git a/pkg/sql/opt/xform/testdata/external/customer b/pkg/sql/opt/xform/testdata/external/customer index c7a737a871b4..c98bba84bb14 100644 --- a/pkg/sql/opt/xform/testdata/external/customer +++ b/pkg/sql/opt/xform/testdata/external/customer @@ -544,17 +544,16 @@ project │ ├── key columns: [1] = [1] │ ├── lookup columns are key │ ├── immutable - │ ├── fd: (1)-->(2,3), (6)-->(7), (2)==(7), (7)==(2) + │ ├── fd: (1)-->(2,3), (2)==(7), (7)==(2) │ ├── inner-join (lookup idtable@secondary_id) │ │ ├── columns: primary_id:1!null idtable.secondary_id:2!null value:6!null column7:7!null │ │ 
├── key columns: [7] = [2] │ │ ├── immutable - │ │ ├── fd: (6)-->(7), (1)-->(2), (2)==(7), (7)==(2) + │ │ ├── fd: (1)-->(2), (2)==(7), (7)==(2) │ │ ├── project │ │ │ ├── columns: column7:7 value:6!null │ │ │ ├── cardinality: [2 - 2] │ │ │ ├── immutable - │ │ │ ├── fd: (6)-->(7) │ │ │ ├── values │ │ │ │ ├── columns: value:6!null │ │ │ │ ├── cardinality: [2 - 2] diff --git a/pkg/sql/rowenc/index_encoding.go b/pkg/sql/rowenc/index_encoding.go index 2abddbf8a37e..c05c187c7130 100644 --- a/pkg/sql/rowenc/index_encoding.go +++ b/pkg/sql/rowenc/index_encoding.go @@ -1457,7 +1457,7 @@ func GetValueColumns(index catalog.Index) []ValueEncodedColumn { id := index.GetCompositeColumnID(i) // Inverted indexes on a composite type (i.e. an array of composite types) // should not add the indexed column to the value. - if index.GetType() == descpb.IndexDescriptor_INVERTED && id == index.GetKeyColumnID(0) { + if index.GetType() == descpb.IndexDescriptor_INVERTED && id == index.InvertedColumnID() { continue } cols = append(cols, ValueEncodedColumn{ColID: id, IsComposite: true}) diff --git a/pkg/sql/rowenc/index_encoding_test.go b/pkg/sql/rowenc/index_encoding_test.go index d1a969eb1041..ef2b1176e61e 100644 --- a/pkg/sql/rowenc/index_encoding_test.go +++ b/pkg/sql/rowenc/index_encoding_test.go @@ -46,7 +46,9 @@ type indexKeyTest struct { secondaryValues []tree.Datum } -func makeTableDescForTest(test indexKeyTest) (catalog.TableDescriptor, catalog.TableColMap) { +func makeTableDescForTest( + test indexKeyTest, isSecondaryIndexForward bool, +) (catalog.TableDescriptor, catalog.TableColMap) { primaryColumnIDs := make([]descpb.ColumnID, len(test.primaryValues)) secondaryColumnIDs := make([]descpb.ColumnID, len(test.secondaryValues)) columns := make([]descpb.ColumnDescriptor, len(test.primaryValues)+len(test.secondaryValues)) @@ -65,6 +67,9 @@ func makeTableDescForTest(test indexKeyTest) (catalog.TableDescriptor, catalog.T if colinfo.ColumnTypeIsInvertedIndexable(columns[i].Type) { secondaryType = descpb.IndexDescriptor_INVERTED } + if isSecondaryIndexForward && columns[i].Type.Family() == types.JsonFamily { + secondaryType = descpb.IndexDescriptor_FORWARD + } secondaryColumnIDs[i-len(test.primaryValues)] = columns[i].ID } } @@ -116,6 +121,14 @@ func decodeIndex( } func TestIndexKey(t *testing.T) { + parseJSON := func(s string) *tree.DJSON { + j, err := json.ParseJSON(s) + if err != nil { + t.Fatalf("Failed to parse %s: %v", s, err) + } + return tree.NewDJSON(j) + } + rng, _ := randutil.NewTestRand() var a tree.DatumAlloc @@ -155,6 +168,82 @@ func TestIndexKey(t *testing.T) { []tree.Datum{tree.NewDInt(10), tree.NewDInt(11), tree.NewDInt(12)}, []tree.Datum{tree.NewDInt(20), tree.NewDInt(21), tree.NewDInt(22)}, }, + // Testing JSON in primary indexes. 
+ { + tableID: 50, + primaryValues: []tree.Datum{parseJSON(`"a"`)}, + secondaryValues: []tree.Datum{tree.NewDInt(20)}, + }, + { + tableID: 50, + primaryValues: []tree.Datum{parseJSON(`1`)}, + secondaryValues: []tree.Datum{tree.NewDInt(20)}, + }, + { + tableID: 50, + primaryValues: []tree.Datum{parseJSON(`"a"`), parseJSON(`[1, 2, 3]`)}, + secondaryValues: []tree.Datum{tree.NewDInt(20)}, + }, + { + tableID: 50, + primaryValues: []tree.Datum{parseJSON(`{"a": "b"}`)}, + secondaryValues: []tree.Datum{tree.NewDInt(20)}, + }, + { + tableID: 50, + primaryValues: []tree.Datum{parseJSON(`{"a": "b", "c": "d"}`)}, + secondaryValues: []tree.Datum{tree.NewDInt(20)}, + }, + { + tableID: 50, + primaryValues: []tree.Datum{parseJSON(`[1, "a", {"a": "b"}]`)}, + secondaryValues: []tree.Datum{tree.NewDInt(20)}, + }, + { + tableID: 50, + primaryValues: []tree.Datum{parseJSON(`null`), parseJSON(`[]`), parseJSON(`{}`), + parseJSON(`""`)}, + secondaryValues: []tree.Datum{tree.NewDInt(20)}, + }, + // Testing JSON in secondary indexes. + { + tableID: 50, + primaryValues: []tree.Datum{tree.NewDInt(20)}, + secondaryValues: []tree.Datum{parseJSON(`{"a": "b"}`)}, + }, + { + tableID: 50, + primaryValues: []tree.Datum{tree.NewDInt(20), tree.NewDInt(50)}, + secondaryValues: []tree.Datum{parseJSON(`{"a": "b"}`), parseJSON(`[1, "a", {"a": "b"}]`)}, + }, + { + tableID: 50, + primaryValues: []tree.Datum{tree.NewDInt(20), tree.NewDInt(50)}, + secondaryValues: []tree.Datum{parseJSON(`1`)}, + }, + { + tableID: 50, + primaryValues: []tree.Datum{tree.NewDInt(20), tree.NewDInt(50)}, + secondaryValues: []tree.Datum{parseJSON(`"b"`)}, + }, + { + tableID: 50, + primaryValues: []tree.Datum{tree.NewDInt(20), tree.NewDInt(50)}, + secondaryValues: []tree.Datum{parseJSON(`null`), parseJSON(`[]`), parseJSON(`{}`), + parseJSON(`""`)}, + }, + // Testing JSON in both primary and secondary indexes. + { + tableID: 50, + primaryValues: []tree.Datum{parseJSON(`"a"`)}, + secondaryValues: []tree.Datum{parseJSON(`"b"`)}, + }, + { + tableID: 50, + primaryValues: []tree.Datum{parseJSON(`{"a": "b"}`), parseJSON(`[1, "a", {"a": "b"}]`)}, + secondaryValues: []tree.Datum{parseJSON(`null`), parseJSON(`[]`), parseJSON(`{}`), + parseJSON(`""`)}, + }, } for i := 0; i < 1000; i++ { @@ -178,7 +267,7 @@ func TestIndexKey(t *testing.T) { for i, test := range tests { evalCtx := eval.NewTestingEvalContext(cluster.MakeTestingClusterSettings()) defer evalCtx.Stop(context.Background()) - tableDesc, colMap := makeTableDescForTest(test) + tableDesc, colMap := makeTableDescForTest(test, true /* isSecondaryIndexForward */) // Add the default family to each test, since secondary indexes support column families. 
var ( colNames []string @@ -348,7 +437,8 @@ func TestInvertedIndexKey(t *testing.T) { runTest := func(value tree.Datum, expectedKeys int, version descpb.IndexDescriptorVersion) { primaryValues := []tree.Datum{tree.NewDInt(10)} secondaryValues := []tree.Datum{value} - tableDesc, colMap := makeTableDescForTest(indexKeyTest{50, primaryValues, secondaryValues}) + tableDesc, colMap := makeTableDescForTest(indexKeyTest{50, primaryValues, secondaryValues}, + false /* isSecondaryIndexForward */) for _, idx := range tableDesc.PublicNonPrimaryIndexes() { idx.IndexDesc().Version = version } diff --git a/pkg/sql/sem/tree/datum.go b/pkg/sql/sem/tree/datum.go index ef1bb8c1eacc..4ab3a74dab5f 100644 --- a/pkg/sql/sem/tree/datum.go +++ b/pkg/sql/sem/tree/datum.go @@ -3622,6 +3622,45 @@ func NewDJSON(j json.JSON) *DJSON { return &DJSON{j} } +// IsComposite implements the CompositeDatum interface. +func (d *DJSON) IsComposite() bool { + switch d.JSON.Type() { + case json.NumberJSONType: + dec, ok := d.JSON.AsDecimal() + if !ok { + panic(errors.AssertionFailedf("could not convert into JSON Decimal")) + } + DDec := DDecimal{Decimal: *dec} + return DDec.IsComposite() + case json.ArrayJSONType: + jsonArray, ok := d.AsArray() + if !ok { + panic(errors.AssertionFailedf("could not extract the JSON Array")) + } + for _, elem := range jsonArray { + dJsonVal := DJSON{elem} + if dJsonVal.IsComposite() { + return true + } + } + case json.ObjectJSONType: + if it, err := d.ObjectIter(); it != nil && err == nil { + // Assumption: no collated strings are present as JSON keys. + // Thus, JSON keys are not being checked if they are + // composite or not. + for it.Next() { + valDJSON := NewDJSON(it.Value()) + if valDJSON.IsComposite() { + return true + } + } + } else if err != nil { + panic(errors.NewAssertionErrorWithWrappedErrf(err, "could not receive an ObjectKeyIterator")) + } + } + return false +} + // ParseDJSON takes a string of JSON and returns a DJSON value. func ParseDJSON(s string) (Datum, error) { j, err := json.ParseJSON(s) diff --git a/pkg/sql/sem/tree/eval.go b/pkg/sql/sem/tree/eval.go index 9a4123adbfd7..752b85b34a46 100644 --- a/pkg/sql/sem/tree/eval.go +++ b/pkg/sql/sem/tree/eval.go @@ -1578,6 +1578,7 @@ var CmpOps = cmpOpFixups(map[treecmp.ComparisonOperatorSymbol]*CmpOpOverloads{ makeLtFn(types.TimestampTZ, types.TimestampTZ, volatility.Leakproof), makeLtFn(types.Uuid, types.Uuid, volatility.Leakproof), makeLtFn(types.VarBit, types.VarBit, volatility.Leakproof), + makeLtFn(types.Jsonb, types.Jsonb, volatility.Immutable), // Mixed-type comparisons. makeLtFn(types.Date, types.Timestamp, volatility.Immutable), @@ -1634,6 +1635,7 @@ var CmpOps = cmpOpFixups(map[treecmp.ComparisonOperatorSymbol]*CmpOpOverloads{ makeLeFn(types.TimestampTZ, types.TimestampTZ, volatility.Leakproof), makeLeFn(types.Uuid, types.Uuid, volatility.Leakproof), makeLeFn(types.VarBit, types.VarBit, volatility.Leakproof), + makeLeFn(types.Jsonb, types.Jsonb, volatility.Immutable), // Mixed-type comparisons. makeLeFn(types.Date, types.Timestamp, volatility.Immutable), diff --git a/pkg/sql/ttl/ttljob/ttljob_test.go b/pkg/sql/ttl/ttljob/ttljob_test.go index 54b9003a66a8..d452dabdedf3 100644 --- a/pkg/sql/ttl/ttljob/ttljob_test.go +++ b/pkg/sql/ttl/ttljob/ttljob_test.go @@ -582,7 +582,10 @@ func TestRowLevelTTLJobRandomEntries(t *testing.T) { var indexableTyps []*types.T for _, typ := range types.Scalar { // TODO(#76419): DateFamily has a broken `-infinity` case. 
- if colinfo.ColumnTypeIsIndexable(typ) && typ.Family() != types.DateFamily { + // TODO(#99432): JsonFamily has broken cases. This is because the test is wrapping JSON + // objects in multiple single quotes which causes parsing errors. + if colinfo.ColumnTypeIsIndexable(typ) && typ.Family() != types.DateFamily && + typ.Family() != types.JsonFamily { indexableTyps = append(indexableTyps, typ) } } From 9fc0ef669f77a3bef85a0af8a4bc54ce2def9b87 Mon Sep 17 00:00:00 2001 From: Mira Radeva Date: Fri, 7 Apr 2023 14:10:58 -0400 Subject: [PATCH 3/3] kvserver: add metrics to track snapshot queue size Previously, we had metrics to track the number of snapshots waiting in the snapshot queue; however, snapshots may be of different sizes, so it is also helpful to track the size of all snapshots in the queue. This change adds the following metrics to track the total size of all snapshots waiting in the queue: range.snapshots.send-queue-bytes range.snapshots.recv-queue-bytes Informs: #85528 Release note (ops change): Added two new metrics, range.snapshots.(send|recv)-queue-bytes, to track the total size of all snapshots waiting in the snapshot queue. --- pkg/kv/kvserver/metrics.go | 16 ++++++++++ pkg/kv/kvserver/store_snapshot.go | 53 +++++++++++++++++++++++-------- pkg/kv/kvserver/store_test.go | 14 ++++++-- pkg/ts/catalog/chart_catalog.go | 7 ++++ 4 files changed, 74 insertions(+), 16 deletions(-) diff --git a/pkg/kv/kvserver/metrics.go b/pkg/kv/kvserver/metrics.go index 496f9ccce673..d9ea8aae8584 100644 --- a/pkg/kv/kvserver/metrics.go +++ b/pkg/kv/kvserver/metrics.go @@ -899,6 +899,18 @@ var ( Measurement: "Snapshots", Unit: metric.Unit_COUNT, } + metaRangeSnapshotSendQueueSize = metric.Metadata{ + Name: "range.snapshots.send-queue-bytes", + Help: "Total size of all snapshots in the snapshot send queue", + Measurement: "Bytes", + Unit: metric.Unit_BYTES, + } + metaRangeSnapshotRecvQueueSize = metric.Metadata{ + Name: "range.snapshots.recv-queue-bytes", + Help: "Total size of all snapshots in the snapshot receive queue", + Measurement: "Bytes", + Unit: metric.Unit_BYTES, + } metaRangeRaftLeaderTransfers = metric.Metadata{ Name: "range.raftleadertransfers", @@ -2046,6 +2058,8 @@ type StoreMetrics struct { RangeSnapshotRecvInProgress *metric.Gauge RangeSnapshotSendTotalInProgress *metric.Gauge RangeSnapshotRecvTotalInProgress *metric.Gauge + RangeSnapshotSendQueueSize *metric.Gauge + RangeSnapshotRecvQueueSize *metric.Gauge // Delegate snapshot metrics. These don't count self-delegated snapshots. 
DelegateSnapshotSendBytes *metric.Counter @@ -2645,6 +2659,8 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics { RangeSnapshotRecvInProgress: metric.NewGauge(metaRangeSnapshotRecvInProgress), RangeSnapshotSendTotalInProgress: metric.NewGauge(metaRangeSnapshotSendTotalInProgress), RangeSnapshotRecvTotalInProgress: metric.NewGauge(metaRangeSnapshotRecvTotalInProgress), + RangeSnapshotSendQueueSize: metric.NewGauge(metaRangeSnapshotSendQueueSize), + RangeSnapshotRecvQueueSize: metric.NewGauge(metaRangeSnapshotRecvQueueSize), RangeRaftLeaderTransfers: metric.NewCounter(metaRangeRaftLeaderTransfers), RangeLossOfQuorumRecoveries: metric.NewCounter(metaRangeLossOfQuorumRecoveries), DelegateSnapshotSendBytes: metric.NewCounter(metaDelegateSnapshotSendBytes), diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index 72f68241c430..335d9df08e9b 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -77,6 +77,15 @@ var snapshotPrioritizationEnabled = settings.RegisterBoolSetting( true, ) +// snapshotMetrics contains metrics on the number and size of snapshots in +// progress or in the snapshot queue. +type snapshotMetrics struct { + QueueLen *metric.Gauge + QueueSize *metric.Gauge + InProgress *metric.Gauge + TotalInProgress *metric.Gauge +} + // incomingSnapshotStream is the minimal interface on a GRPC stream required // to receive a snapshot over the network. type incomingSnapshotStream interface { @@ -678,13 +687,19 @@ func (s *Store) reserveReceiveSnapshot( ctx, sp := tracing.EnsureChildSpan(ctx, s.cfg.Tracer(), "reserveReceiveSnapshot") defer sp.Finish() - return s.throttleSnapshot(ctx, s.snapshotApplyQueue, - int(header.SenderQueueName), header.SenderQueuePriority, + return s.throttleSnapshot(ctx, + s.snapshotApplyQueue, + int(header.SenderQueueName), + header.SenderQueuePriority, -1, header.RangeSize, header.RaftMessageRequest.RangeID, - s.metrics.RangeSnapshotRecvQueueLength, - s.metrics.RangeSnapshotRecvInProgress, s.metrics.RangeSnapshotRecvTotalInProgress, + snapshotMetrics{ + s.metrics.RangeSnapshotRecvQueueLength, + s.metrics.RangeSnapshotRecvQueueSize, + s.metrics.RangeSnapshotRecvInProgress, + s.metrics.RangeSnapshotRecvTotalInProgress, + }, ) } @@ -698,14 +713,19 @@ func (s *Store) reserveSendSnapshot( fn() } - return s.throttleSnapshot(ctx, s.snapshotSendQueue, + return s.throttleSnapshot(ctx, + s.snapshotSendQueue, int(req.SenderQueueName), req.SenderQueuePriority, req.QueueOnDelegateLen, rangeSize, req.RangeID, - s.metrics.RangeSnapshotSendQueueLength, - s.metrics.RangeSnapshotSendInProgress, s.metrics.RangeSnapshotSendTotalInProgress, + snapshotMetrics{ + s.metrics.RangeSnapshotSendQueueLength, + s.metrics.RangeSnapshotSendQueueSize, + s.metrics.RangeSnapshotSendInProgress, + s.metrics.RangeSnapshotSendTotalInProgress, + }, ) } @@ -720,7 +740,7 @@ func (s *Store) throttleSnapshot( maxQueueLength int64, rangeSize int64, rangeID roachpb.RangeID, - waitingSnapshotMetric, inProgressSnapshotMetric, totalInProgressSnapshotMetric *metric.Gauge, + snapshotMetrics snapshotMetrics, ) (cleanup func(), funcErr error) { tBegin := timeutil.Now() @@ -742,8 +762,13 @@ func (s *Store) throttleSnapshot( } }() - waitingSnapshotMetric.Inc(1) - defer waitingSnapshotMetric.Dec(1) + // Total bytes of snapshots waiting in the snapshot queue + snapshotMetrics.QueueSize.Inc(rangeSize) + defer snapshotMetrics.QueueSize.Dec(rangeSize) + // Total number of snapshots waiting in the snapshot queue + 
snapshotMetrics.QueueLen.Inc(1)
+ defer snapshotMetrics.QueueLen.Dec(1)
+
 queueCtx := ctx
 if deadline, ok := queueCtx.Deadline(); ok {
 // Enforce a more strict timeout for acquiring the snapshot reservation to
@@ -778,10 +803,10 @@ func (s *Store) throttleSnapshot(
 }
 // Counts non-empty in-progress snapshots.
- inProgressSnapshotMetric.Inc(1)
+ snapshotMetrics.InProgress.Inc(1)
 }
 // Counts all in-progress snapshots.
- totalInProgressSnapshotMetric.Inc(1)
+ snapshotMetrics.TotalInProgress.Inc(1)
 // The choice here is essentially arbitrary, but with a default range size of 128mb-512mb and the
 // Raft snapshot rate limiting of 32mb/s, we expect to spend less than 16s per snapshot.
@@ -804,10 +829,10 @@ func (s *Store) throttleSnapshot(
 return func() {
 s.metrics.ReservedReplicaCount.Dec(1)
 s.metrics.Reserved.Dec(rangeSize)
- totalInProgressSnapshotMetric.Dec(1)
+ snapshotMetrics.TotalInProgress.Dec(1)
 if rangeSize != 0 || s.cfg.TestingKnobs.ThrottleEmptySnapshots {
- inProgressSnapshotMetric.Dec(1)
+ snapshotMetrics.InProgress.Dec(1)
 snapshotQueue.Release(permit)
 }
 }, nil
diff --git a/pkg/kv/kvserver/store_test.go b/pkg/kv/kvserver/store_test.go
index 086efddccf2f..fe80c9015a35 100644
--- a/pkg/kv/kvserver/store_test.go
+++ b/pkg/kv/kvserver/store_test.go
@@ -3157,7 +3157,7 @@ func TestReserveSnapshotThrottling(t *testing.T) {
 s := tc.store
 cleanupNonEmpty1, err := s.reserveReceiveSnapshot(ctx, &kvserverpb.SnapshotRequest_Header{
- RangeSize: 1,
+ RangeSize: 10,
 })
 if err != nil {
 t.Fatal(err)
 }
@@ -3167,6 +3167,8 @@ func TestReserveSnapshotThrottling(t *testing.T) {
 }
 require.Equal(t, int64(0), s.Metrics().RangeSnapshotRecvQueueLength.Value(),
 "unexpected snapshot queue length")
+ require.Equal(t, int64(0), s.Metrics().RangeSnapshotRecvQueueSize.Value(),
+ "unexpected snapshot queue size")
 require.Equal(t, int64(1), s.Metrics().RangeSnapshotRecvInProgress.Value(),
 "unexpected snapshots in progress")
 require.Equal(t, int64(1), s.Metrics().RangeSnapshotRecvTotalInProgress.Value(),
@@ -3184,6 +3186,8 @@ func TestReserveSnapshotThrottling(t *testing.T) {
 }
 require.Equal(t, int64(0), s.Metrics().RangeSnapshotRecvQueueLength.Value(),
 "unexpected snapshot queue length")
+ require.Equal(t, int64(0), s.Metrics().RangeSnapshotRecvQueueSize.Value(),
+ "unexpected snapshot queue size")
 require.Equal(t, int64(1), s.Metrics().RangeSnapshotRecvInProgress.Value(),
 "unexpected snapshots in progress")
 require.Equal(t, int64(2), s.Metrics().RangeSnapshotRecvTotalInProgress.Value(),
@@ -3205,6 +3209,10 @@ func TestReserveSnapshotThrottling(t *testing.T) {
 t.Errorf("unexpected snapshot queue length; expected: %d, got: %d", 1,
 s.Metrics().RangeSnapshotRecvQueueLength.Value())
 }
+ if s.Metrics().RangeSnapshotRecvQueueSize.Value() != int64(10) {
+ t.Errorf("unexpected snapshot queue size; expected: %d, got: %d", 10,
+ s.Metrics().RangeSnapshotRecvQueueSize.Value())
+ }
 if s.Metrics().RangeSnapshotRecvInProgress.Value() != int64(1) {
 t.Errorf("unexpected snapshots in progress; expected: %d, got: %d", 1,
 s.Metrics().RangeSnapshotRecvInProgress.Value())
@@ -3216,7 +3224,7 @@ func TestReserveSnapshotThrottling(t *testing.T) {
 }()
 cleanupNonEmpty3, err := s.reserveReceiveSnapshot(ctx, &kvserverpb.SnapshotRequest_Header{
- RangeSize: 1,
+ RangeSize: 10,
 })
 if err != nil {
 t.Fatal(err)
 }
 atomic.StoreInt32(&boom, 1)
 require.Equal(t, int64(0), s.Metrics().RangeSnapshotRecvQueueLength.Value(),
 "unexpected snapshot queue length")
+ require.Equal(t, int64(0),
s.Metrics().RangeSnapshotRecvQueueSize.Value(), + "unexpected snapshot queue size") require.Equal(t, int64(1), s.Metrics().RangeSnapshotRecvInProgress.Value(), "unexpected snapshots in progress") require.Equal(t, int64(1), s.Metrics().RangeSnapshotRecvTotalInProgress.Value(), diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go index 342df09ab129..f3cb6436dd92 100644 --- a/pkg/ts/catalog/chart_catalog.go +++ b/pkg/ts/catalog/chart_catalog.go @@ -666,6 +666,13 @@ var charts = []sectionDescription{ "range.snapshots.delegate.sent-bytes", }, }, + { + Title: "Snapshot Queue Bytes", + Metrics: []string{ + "range.snapshots.send-queue-bytes", + "range.snapshots.recv-queue-bytes", + }, + }, }, }, {
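
Note on the queue-size accounting introduced in the store_snapshot.go hunk above: the new range.snapshots.(send|recv)-queue-bytes gauges are incremented by the snapshot's range size before the reservation wait in throttleSnapshot and decremented via defer once the wait ends, mirroring the existing queue-length gauges. The standalone Go sketch below illustrates that pairing; the gauge type, snapshotQueueMetrics struct, and admit helper are hypothetical stand-ins for CockroachDB's metric.Gauge and Store.throttleSnapshot, kept dependency-free so only the pattern is shown, not the actual implementation.

package main

import (
	"fmt"
	"sync/atomic"
)

// gauge is a minimal stand-in for the metric.Gauge counters used by the
// store metrics (hypothetical; the real code uses pkg/util/metric).
type gauge struct{ v int64 }

func (g *gauge) Inc(n int64) { atomic.AddInt64(&g.v, n) }
func (g *gauge) Dec(n int64) { atomic.AddInt64(&g.v, -n) }
func (g *gauge) Value() int64 { return atomic.LoadInt64(&g.v) }

// snapshotQueueMetrics mirrors the shape of the snapshotMetrics struct in the
// patch: a count of waiting snapshots plus their total size in bytes.
type snapshotQueueMetrics struct {
	queueLen  gauge
	queueSize gauge
}

// admit models the bookkeeping around waiting for a queue reservation: both
// gauges are bumped before the wait and released when the wait ends, whether
// or not admission succeeded, so they only ever reflect snapshots still queued.
func (m *snapshotQueueMetrics) admit(rangeSize int64, wait func() error) error {
	m.queueLen.Inc(1)
	m.queueSize.Inc(rangeSize)
	defer m.queueLen.Dec(1)
	defer m.queueSize.Dec(rangeSize)
	return wait()
}

func main() {
	var m snapshotQueueMetrics
	_ = m.admit(512<<20, func() error {
		// While waiting, the gauges reflect the queued snapshot.
		fmt.Println("queued:", m.queueLen.Value(), "snapshot,", m.queueSize.Value(), "bytes")
		return nil
	})
	// After the wait completes, both gauges drop back to zero.
	fmt.Println("after:", m.queueLen.Value(), "snapshots,", m.queueSize.Value(), "bytes")
}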