From def09e03935f91051b1ad7a61e2cac73d78b8053 Mon Sep 17 00:00:00 2001
From: Jackson Owens
Date: Thu, 16 Mar 2023 11:27:16 -0400
Subject: [PATCH 1/4] storage: unify storage/fs.FS and pebble/vfs.FS
The storage/fs.FS had largely the same interface as vfs.FS. The storage/fs.FS
interface was intended as a temporary stepping stone to using pebble's vfs.FS
interface throughout Cockroach for all filesystem access. This commit unifies
the two.
Epic: None
Release note: None
---
pkg/ccl/cliccl/ear_test.go | 2 +
pkg/kv/kvserver/BUILD.bazel | 1 +
pkg/kv/kvserver/client_replica_gc_test.go | 3 +-
pkg/kv/kvserver/kvserverbase/BUILD.bazel | 2 +-
pkg/kv/kvserver/kvserverbase/syncing_write.go | 4 +-
pkg/kv/kvserver/logstore/sideload_disk.go | 3 +-
pkg/kv/kvserver/replica_consistency.go | 3 +-
pkg/kv/kvserver/replica_corruption.go | 3 +-
pkg/kv/kvserver/replica_proposal.go | 3 +-
.../kvserver/replica_sst_snapshot_storage.go | 8 +-
pkg/kv/kvserver/store.go | 3 +-
pkg/sql/colcontainer/BUILD.bazel | 3 +-
pkg/sql/colcontainer/diskqueue.go | 14 ++--
pkg/sql/colcontainer/partitionedqueue_test.go | 19 ++---
pkg/sql/colflow/vectorized_flow.go | 3 +-
pkg/sql/colflow/vectorized_flow_test.go | 5 +-
pkg/sql/execinfra/BUILD.bazel | 2 +-
pkg/sql/execinfra/server_config.go | 4 +-
pkg/storage/bench_test.go | 2 +-
pkg/storage/engine.go | 4 +-
pkg/storage/engine_test.go | 25 +++----
pkg/storage/fs/fs.go | 69 ++++--------------
pkg/storage/pebble.go | 73 ++-----------------
pkg/storage/temp_engine.go | 8 +-
.../colcontainerutils/diskqueuecfg.go | 3 +-
25 files changed, 85 insertions(+), 184 deletions(-)
diff --git a/pkg/ccl/cliccl/ear_test.go b/pkg/ccl/cliccl/ear_test.go
index b29a06cc1ac5..9b2970867f4d 100644
--- a/pkg/ccl/cliccl/ear_test.go
+++ b/pkg/ccl/cliccl/ear_test.go
@@ -14,6 +14,7 @@ import (
"crypto/rand"
"fmt"
"path/filepath"
+ "sort"
"strings"
"testing"
@@ -56,6 +57,7 @@ func TestDecrypt(t *testing.T) {
// Find a manifest file to check.
files, err := p.List(dir)
require.NoError(t, err)
+ sort.Strings(files)
var manifestPath string
for _, basename := range files {
if strings.HasPrefix(basename, "MANIFEST-") {
diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel
index c244d53fe9e2..0524d6dbe822 100644
--- a/pkg/kv/kvserver/BUILD.bazel
+++ b/pkg/kv/kvserver/BUILD.bazel
@@ -212,6 +212,7 @@ go_library(
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_cockroachdb_pebble//:pebble",
"@com_github_cockroachdb_pebble//objstorage",
+ "@com_github_cockroachdb_pebble//vfs",
"@com_github_cockroachdb_redact//:redact",
"@com_github_gogo_protobuf//proto",
"@com_github_google_btree//:btree",
diff --git a/pkg/kv/kvserver/client_replica_gc_test.go b/pkg/kv/kvserver/client_replica_gc_test.go
index 53c9e04e9d21..2b1dde8d0621 100644
--- a/pkg/kv/kvserver/client_replica_gc_test.go
+++ b/pkg/kv/kvserver/client_replica_gc_test.go
@@ -12,6 +12,7 @@ package kvserver_test
import (
"context"
+ "os"
"path/filepath"
"testing"
"time"
@@ -112,7 +113,7 @@ func TestReplicaGCQueueDropReplicaDirect(t *testing.T) {
if dir == "" {
t.Fatal("no sideloaded directory")
}
- if err := eng.MkdirAll(dir); err != nil {
+ if err := eng.MkdirAll(dir, os.ModePerm); err != nil {
t.Fatal(err)
}
if err := fs.WriteFile(eng, filepath.Join(dir, "i1000000.t100000"), []byte("foo")); err != nil {
diff --git a/pkg/kv/kvserver/kvserverbase/BUILD.bazel b/pkg/kv/kvserver/kvserverbase/BUILD.bazel
index f4a7f74ddcc7..1bb735f64459 100644
--- a/pkg/kv/kvserver/kvserverbase/BUILD.bazel
+++ b/pkg/kv/kvserver/kvserverbase/BUILD.bazel
@@ -21,7 +21,6 @@ go_library(
"//pkg/roachpb",
"//pkg/settings",
"//pkg/settings/cluster",
- "//pkg/storage/fs",
"//pkg/util/errorutil",
"//pkg/util/hlc",
"//pkg/util/humanizeutil",
@@ -30,6 +29,7 @@ go_library(
"//pkg/util/timeutil",
"//pkg/util/tracing/tracingpb",
"@com_github_cockroachdb_errors//:errors",
+ "@com_github_cockroachdb_pebble//vfs",
"@com_github_cockroachdb_redact//:redact",
"@org_golang_x_time//rate",
],
diff --git a/pkg/kv/kvserver/kvserverbase/syncing_write.go b/pkg/kv/kvserver/kvserverbase/syncing_write.go
index d44e865394b9..6231252b9627 100644
--- a/pkg/kv/kvserver/kvserverbase/syncing_write.go
+++ b/pkg/kv/kvserver/kvserverbase/syncing_write.go
@@ -19,10 +19,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
- "github.com/cockroachdb/cockroach/pkg/storage/fs"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
+ "github.com/cockroachdb/pebble/vfs"
"golang.org/x/time/rate"
)
@@ -82,7 +82,7 @@ func WriteFileSyncing(
ctx context.Context,
filename string,
data []byte,
- fs fs.FS,
+ fs vfs.FS,
perm os.FileMode,
settings *cluster.Settings,
limiter *rate.Limiter,
diff --git a/pkg/kv/kvserver/logstore/sideload_disk.go b/pkg/kv/kvserver/logstore/sideload_disk.go
index 81496e0de454..e2bfc6765581 100644
--- a/pkg/kv/kvserver/logstore/sideload_disk.go
+++ b/pkg/kv/kvserver/logstore/sideload_disk.go
@@ -13,6 +13,7 @@ package logstore
import (
"context"
"fmt"
+ "os"
"path/filepath"
"strconv"
"strings"
@@ -75,7 +76,7 @@ func NewDiskSideloadStorage(
}
func (ss *DiskSideloadStorage) createDir() error {
- err := ss.eng.MkdirAll(ss.dir)
+ err := ss.eng.MkdirAll(ss.dir, os.ModePerm)
ss.dirCreated = ss.dirCreated || err == nil
return err
}
diff --git a/pkg/kv/kvserver/replica_consistency.go b/pkg/kv/kvserver/replica_consistency.go
index d4f4c9ad103b..99786884b2d4 100644
--- a/pkg/kv/kvserver/replica_consistency.go
+++ b/pkg/kv/kvserver/replica_consistency.go
@@ -15,6 +15,7 @@ import (
"crypto/sha512"
"encoding/binary"
"fmt"
+ "os"
"sync"
"time"
@@ -741,7 +742,7 @@ func (r *Replica) computeChecksumPostApply(
// certain of completing the check. Since we're already in a goroutine
// that's about to end, just sleep for a few seconds and then terminate.
auxDir := r.store.TODOEngine().GetAuxiliaryDir()
- _ = r.store.TODOEngine().MkdirAll(auxDir)
+ _ = r.store.TODOEngine().MkdirAll(auxDir, os.ModePerm)
path := base.PreventedStartupFile(auxDir)
const attentionFmt = `ATTENTION:
diff --git a/pkg/kv/kvserver/replica_corruption.go b/pkg/kv/kvserver/replica_corruption.go
index c1d500bae05a..68fc59145fd4 100644
--- a/pkg/kv/kvserver/replica_corruption.go
+++ b/pkg/kv/kvserver/replica_corruption.go
@@ -13,6 +13,7 @@ package kvserver
import (
"context"
"fmt"
+ "os"
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
@@ -50,7 +51,7 @@ func (r *Replica) setCorruptRaftMuLocked(
r.mu.destroyStatus.Set(cErr, destroyReasonRemoved)
auxDir := r.store.TODOEngine().GetAuxiliaryDir()
- _ = r.store.TODOEngine().MkdirAll(auxDir)
+ _ = r.store.TODOEngine().MkdirAll(auxDir, os.ModePerm)
path := base.PreventedStartupFile(auxDir)
preventStartupMsg := fmt.Sprintf(`ATTENTION:
diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go
index 5f8548109c45..d1496ff42d05 100644
--- a/pkg/kv/kvserver/replica_proposal.go
+++ b/pkg/kv/kvserver/replica_proposal.go
@@ -12,6 +12,7 @@ package kvserver
import (
"context"
+ "os"
"path/filepath"
"time"
"unsafe"
@@ -561,7 +562,7 @@ func addSSTablePreApply(
// TODO(tschottdorf): remove this once sideloaded storage guarantees its
// existence.
- if err := eng.MkdirAll(filepath.Dir(ingestPath)); err != nil {
+ if err := eng.MkdirAll(filepath.Dir(ingestPath), os.ModePerm); err != nil {
panic(err)
}
if _, err := eng.Stat(ingestPath); err == nil {
diff --git a/pkg/kv/kvserver/replica_sst_snapshot_storage.go b/pkg/kv/kvserver/replica_sst_snapshot_storage.go
index b53a0cbf6eb3..d7ce8c6cc12f 100644
--- a/pkg/kv/kvserver/replica_sst_snapshot_storage.go
+++ b/pkg/kv/kvserver/replica_sst_snapshot_storage.go
@@ -13,6 +13,7 @@ package kvserver
import (
"context"
"fmt"
+ "os"
"path/filepath"
"strconv"
@@ -24,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/objstorage"
+ "github.com/cockroachdb/pebble/vfs"
"golang.org/x/time/rate"
)
@@ -113,7 +115,7 @@ func (s *SSTSnapshotStorageScratch) filename(id int) string {
}
func (s *SSTSnapshotStorageScratch) createDir() error {
- err := s.storage.engine.MkdirAll(s.snapDir)
+ err := s.storage.engine.MkdirAll(s.snapDir, os.ModePerm)
s.dirCreated = s.dirCreated || err == nil
return err
}
@@ -182,7 +184,7 @@ func (s *SSTSnapshotStorageScratch) Close() error {
type SSTSnapshotStorageFile struct {
scratch *SSTSnapshotStorageScratch
created bool
- file fs.File
+ file vfs.File
filename string
ctx context.Context
bytesPerSync int64
@@ -207,7 +209,7 @@ func (f *SSTSnapshotStorageFile) ensureFile() error {
}
var err error
if f.bytesPerSync > 0 {
- f.file, err = f.scratch.storage.engine.CreateWithSync(f.filename, int(f.bytesPerSync))
+ f.file, err = fs.CreateWithSync(f.scratch.storage.engine, f.filename, int(f.bytesPerSync))
} else {
f.file, err = f.scratch.storage.engine.Create(f.filename)
}
diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go
index 35622c3285e3..21e89bb7551b 100644
--- a/pkg/kv/kvserver/store.go
+++ b/pkg/kv/kvserver/store.go
@@ -15,6 +15,7 @@ import (
"context"
"fmt"
"math"
+ "os"
"path/filepath"
"runtime"
"sort"
@@ -3083,7 +3084,7 @@ func (s *Store) checkpointSpans(desc *roachpb.RangeDescriptor) []roachpb.Span {
// the provided key spans. If spans is empty, it includes the entire store.
func (s *Store) checkpoint(tag string, spans []roachpb.Span) (string, error) {
checkpointBase := s.checkpointsDir()
- _ = s.TODOEngine().MkdirAll(checkpointBase)
+ _ = s.TODOEngine().MkdirAll(checkpointBase, os.ModePerm)
// Create the checkpoint in a "pending" directory first. If we fail midway, it
// should be clear that the directory contains an incomplete checkpoint.
pendingDir := filepath.Join(checkpointBase, tag+"_pending")
diff --git a/pkg/sql/colcontainer/BUILD.bazel b/pkg/sql/colcontainer/BUILD.bazel
index 193404fb093d..767aa256c692 100644
--- a/pkg/sql/colcontainer/BUILD.bazel
+++ b/pkg/sql/colcontainer/BUILD.bazel
@@ -20,6 +20,7 @@ go_library(
"//pkg/util/mon",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
+ "@com_github_cockroachdb_pebble//vfs",
"@com_github_golang_snappy//:snappy",
"@com_github_marusama_semaphore//:semaphore",
],
@@ -46,7 +47,6 @@ go_test(
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/types",
- "//pkg/storage/fs",
"//pkg/testutils/colcontainerutils",
"//pkg/testutils/skip",
"//pkg/util/humanizeutil",
@@ -54,6 +54,7 @@ go_test(
"//pkg/util/log",
"//pkg/util/mon",
"//pkg/util/randutil",
+ "@com_github_cockroachdb_pebble//vfs",
"@com_github_marusama_semaphore//:semaphore",
"@com_github_stretchr_testify//require",
],
diff --git a/pkg/sql/colcontainer/diskqueue.go b/pkg/sql/colcontainer/diskqueue.go
index 5e0a000f062b..6170b45754c8 100644
--- a/pkg/sql/colcontainer/diskqueue.go
+++ b/pkg/sql/colcontainer/diskqueue.go
@@ -14,6 +14,7 @@ import (
"bytes"
"context"
"io"
+ "os"
"path/filepath"
"strconv"
@@ -26,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
+ "github.com/cockroachdb/pebble/vfs"
"github.com/golang/snappy"
)
@@ -181,7 +183,7 @@ type diskQueue struct {
// written before a compress and flush.
writeBufferLimit int
writeFileIdx int
- writeFile fs.File
+ writeFile vfs.File
deserializerState struct {
*colserde.FileDeserializer
curBatch int
@@ -189,7 +191,7 @@ type diskQueue struct {
// readFileIdx is an index into the current file in files the deserializer is
// reading from.
readFileIdx int
- readFile fs.File
+ readFile vfs.File
scratchDecompressedReadBytes []byte
diskAcc *mon.BoundAccount
@@ -298,7 +300,7 @@ func GetPatherFunc(f func(ctx context.Context) string) GetPather {
// DiskQueueCfg is a struct holding the configuration options for a DiskQueue.
type DiskQueueCfg struct {
// FS is the filesystem interface to use.
- FS fs.FS
+ FS vfs.FS
// GetPather returns where the temporary directory that will contain this
// DiskQueue's files has been created. The directory name will be a UUID.
// Note that the directory is created lazily on the first call to GetPath.
@@ -411,7 +413,7 @@ func newDiskQueue(
if d.cfg.CacheMode != DiskQueueCacheModeIntertwinedCalls {
d.writeBufferLimit = d.cfg.BufferSizeBytes / 2
}
- if err := cfg.FS.MkdirAll(filepath.Join(cfg.GetPather.GetPath(ctx), d.dirName)); err != nil {
+ if err := cfg.FS.MkdirAll(filepath.Join(cfg.GetPather.GetPath(ctx), d.dirName), os.ModePerm); err != nil {
return nil, err
}
// rotateFile will create a new file to write to.
@@ -492,7 +494,7 @@ func (d *diskQueue) Close(ctx context.Context) error {
// to write to.
func (d *diskQueue) rotateFile(ctx context.Context) error {
fName := filepath.Join(d.cfg.GetPather.GetPath(ctx), d.dirName, strconv.Itoa(d.seqNo))
- f, err := d.cfg.FS.CreateWithSync(fName, bytesPerSync)
+ f, err := fs.CreateWithSync(d.cfg.FS, fName, bytesPerSync)
if err != nil {
return err
}
@@ -527,7 +529,7 @@ func (d *diskQueue) rotateFile(ctx context.Context) error {
return nil
}
-func (d *diskQueue) resetWriters(f fs.File) error {
+func (d *diskQueue) resetWriters(f vfs.File) error {
d.writer.reset(f)
return d.serializer.Reset(d.writer)
}
diff --git a/pkg/sql/colcontainer/partitionedqueue_test.go b/pkg/sql/colcontainer/partitionedqueue_test.go
index 8538a931ab80..3efbe2d9d6de 100644
--- a/pkg/sql/colcontainer/partitionedqueue_test.go
+++ b/pkg/sql/colcontainer/partitionedqueue_test.go
@@ -19,16 +19,16 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/colcontainer"
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
"github.com/cockroachdb/cockroach/pkg/sql/types"
- "github.com/cockroachdb/cockroach/pkg/storage/fs"
"github.com/cockroachdb/cockroach/pkg/testutils/colcontainerutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
+ "github.com/cockroachdb/pebble/vfs"
"github.com/marusama/semaphore"
"github.com/stretchr/testify/require"
)
type fdCountingFSFile struct {
- fs.File
+ vfs.File
onCloseCb func()
}
@@ -41,7 +41,7 @@ func (f *fdCountingFSFile) Close() error {
}
type fdCountingFS struct {
- fs.FS
+ vfs.FS
writeFDs int
readFDs int
}
@@ -58,7 +58,7 @@ func (f *fdCountingFS) assertOpenFDs(
require.Equal(t, expectedReadFDs, f.readFDs)
}
-func (f *fdCountingFS) Create(name string) (fs.File, error) {
+func (f *fdCountingFS) Create(name string) (vfs.File, error) {
file, err := f.FS.Create(name)
if err != nil {
return nil, err
@@ -67,16 +67,7 @@ func (f *fdCountingFS) Create(name string) (fs.File, error) {
return &fdCountingFSFile{File: file, onCloseCb: func() { f.writeFDs-- }}, nil
}
-func (f *fdCountingFS) CreateWithSync(name string, bytesPerSync int) (fs.File, error) {
- file, err := f.FS.CreateWithSync(name, bytesPerSync)
- if err != nil {
- return nil, err
- }
- f.writeFDs++
- return &fdCountingFSFile{File: file, onCloseCb: func() { f.writeFDs-- }}, nil
-}
-
-func (f *fdCountingFS) Open(name string) (fs.File, error) {
+func (f *fdCountingFS) Open(name string, opts ...vfs.OpenOption) (vfs.File, error) {
file, err := f.FS.Open(name)
if err != nil {
return nil, err
diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go
index 7100bb9b1b43..bf0988e126b9 100644
--- a/pkg/sql/colflow/vectorized_flow.go
+++ b/pkg/sql/colflow/vectorized_flow.go
@@ -12,6 +12,7 @@ package colflow
import (
"context"
+ "os"
"path/filepath"
"sync"
"sync/atomic"
@@ -326,7 +327,7 @@ func (f *vectorizedFlow) GetPath(ctx context.Context) string {
tempDirName := f.GetID().String()
f.tempStorage.path = filepath.Join(f.Cfg.TempStoragePath, tempDirName)
log.VEventf(ctx, 1, "flow %s spilled to disk, stack trace: %s", f.ID, util.GetSmallTrace(2))
- if err := f.Cfg.TempFS.MkdirAll(f.tempStorage.path); err != nil {
+ if err := f.Cfg.TempFS.MkdirAll(f.tempStorage.path, os.ModePerm); err != nil {
colexecerror.InternalError(errors.Wrap(err, "unable to create temporary storage directory"))
}
// We have just created the temporary directory which will be used for all
diff --git a/pkg/sql/colflow/vectorized_flow_test.go b/pkg/sql/colflow/vectorized_flow_test.go
index 47dfc4d4af73..534da28773a7 100644
--- a/pkg/sql/colflow/vectorized_flow_test.go
+++ b/pkg/sql/colflow/vectorized_flow_test.go
@@ -12,6 +12,7 @@ package colflow
import (
"context"
+ "os"
"path/filepath"
"sync"
"testing"
@@ -364,12 +365,12 @@ func TestVectorizedFlowTempDirectory(t *testing.T) {
errCh := make(chan error)
go func() {
createTempDir(ctx)
- errCh <- ngn.MkdirAll(filepath.Join(vf.GetPath(ctx), "async"))
+ errCh <- ngn.MkdirAll(filepath.Join(vf.GetPath(ctx), "async"), os.ModePerm)
}()
createTempDir(ctx)
// Both goroutines should be able to create their subdirectories within the
// flow's temporary directory.
- require.NoError(t, ngn.MkdirAll(filepath.Join(vf.GetPath(ctx), "main_goroutine")))
+ require.NoError(t, ngn.MkdirAll(filepath.Join(vf.GetPath(ctx), "main_goroutine"), os.ModePerm))
require.NoError(t, <-errCh)
vf.Cleanup(ctx)
checkDirs(t, 0)
diff --git a/pkg/sql/execinfra/BUILD.bazel b/pkg/sql/execinfra/BUILD.bazel
index 37292614e0fa..6e318108c23e 100644
--- a/pkg/sql/execinfra/BUILD.bazel
+++ b/pkg/sql/execinfra/BUILD.bazel
@@ -60,7 +60,6 @@ go_library(
"//pkg/sql/sqlliveness",
"//pkg/sql/stats",
"//pkg/sql/types",
- "//pkg/storage/fs",
"//pkg/util/admission",
"//pkg/util/buildutil",
"//pkg/util/intsets",
@@ -77,6 +76,7 @@ go_library(
"//pkg/util/tracing/tracingpb",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
+ "@com_github_cockroachdb_pebble//vfs",
"@com_github_cockroachdb_redact//:redact",
"@com_github_marusama_semaphore//:semaphore",
"@io_opentelemetry_go_otel//attribute",
diff --git a/pkg/sql/execinfra/server_config.go b/pkg/sql/execinfra/server_config.go
index 4f907f7da134..e33fd9f3533a 100644
--- a/pkg/sql/execinfra/server_config.go
+++ b/pkg/sql/execinfra/server_config.go
@@ -37,7 +37,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
- "github.com/cockroachdb/cockroach/pkg/storage/fs"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -45,6 +44,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
+ "github.com/cockroachdb/pebble/vfs"
"github.com/marusama/semaphore"
)
@@ -95,7 +95,7 @@ type ServerConfig struct {
// TempFS is used by the vectorized execution engine to store columns when the
// working set is larger than can be stored in memory.
- TempFS fs.FS
+ TempFS vfs.FS
// VecFDSemaphore is a weighted semaphore that restricts the number of open
// file descriptors in the vectorized engine.
diff --git a/pkg/storage/bench_test.go b/pkg/storage/bench_test.go
index d747464f79b6..496563bc3020 100644
--- a/pkg/storage/bench_test.go
+++ b/pkg/storage/bench_test.go
@@ -1927,7 +1927,7 @@ func BenchmarkMVCCScannerWithIntentsAndVersions(b *testing.B) {
return cmp < 0
})
sstFileName := fmt.Sprintf("tmp-ingest-%d", i)
- sstFile, err := eng.fs.Create(sstFileName)
+ sstFile, err := eng.Create(sstFileName)
require.NoError(b, err)
// No improvement with v3 since the multiple versions are in different
// files.
diff --git a/pkg/storage/engine.go b/pkg/storage/engine.go
index d4d621a2d0df..08e5f4af6c90 100644
--- a/pkg/storage/engine.go
+++ b/pkg/storage/engine.go
@@ -20,7 +20,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
- "github.com/cockroachdb/cockroach/pkg/storage/fs"
"github.com/cockroachdb/cockroach/pkg/storage/pebbleiter"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -31,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble"
+ "github.com/cockroachdb/pebble/vfs"
prometheusgo "github.com/prometheus/client_model/go"
)
@@ -941,7 +941,7 @@ type Engine interface {
// be invoked while holding mutexes).
RegisterFlushCompletedCallback(cb func())
// Filesystem functionality.
- fs.FS
+ vfs.FS
// CreateCheckpoint creates a checkpoint of the engine in the given directory,
// which must not exist. The directory should be on the same file system so
// that hard links can be used. If spans is not empty, the checkpoint excludes
diff --git a/pkg/storage/engine_test.go b/pkg/storage/engine_test.go
index c53cb0470700..9e2de5259227 100644
--- a/pkg/storage/engine_test.go
+++ b/pkg/storage/engine_test.go
@@ -18,6 +18,7 @@ import (
"io"
"math"
"math/rand"
+ "os"
"path/filepath"
"reflect"
"sort"
@@ -40,6 +41,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/cockroachdb/errors/oserror"
"github.com/cockroachdb/pebble"
+ "github.com/cockroachdb/pebble/vfs"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
@@ -1056,12 +1058,6 @@ func TestIngestDelayLimit(t *testing.T) {
}
}
-type stringSorter []string
-
-func (s stringSorter) Len() int { return len(s) }
-func (s stringSorter) Swap(i int, j int) { s[i], s[j] = s[j], s[i] }
-func (s stringSorter) Less(i int, j int) bool { return strings.Compare(s[i], s[j]) < 0 }
-
func TestEngineFS(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
@@ -1114,7 +1110,7 @@ func TestEngineFS(t *testing.T) {
"9h: delete /dir1",
}
- var f fs.File
+ var f vfs.File
for _, tc := range testCases {
s := strings.Split(tc, " ")[1:]
@@ -1132,14 +1128,14 @@ func TestEngineFS(t *testing.T) {
}
var (
- g fs.File
+ g vfs.File
err error
)
switch s[0] {
case "create":
g, err = e.Create(s[1])
case "create-with-sync":
- g, err = e.CreateWithSync(s[1], 1)
+ g, err = fs.CreateWithSync(e, s[1], 1)
case "link":
err = e.Link(s[1], s[2])
case "open":
@@ -1151,13 +1147,13 @@ func TestEngineFS(t *testing.T) {
case "rename":
err = e.Rename(s[1], s[2])
case "create-dir":
- err = e.MkdirAll(s[1])
+ err = e.MkdirAll(s[1], os.ModePerm)
case "list-dir":
result, err := e.List(s[1])
if err != nil {
break
}
- sort.Sort(stringSorter(result))
+ sort.Strings(result)
got := strings.Join(result, ",")
want := s[3]
if got != want {
@@ -1304,6 +1300,7 @@ func TestFS(t *testing.T) {
t.Helper()
got, err := fs.List(dir)
+ sort.Strings(got)
require.NoError(t, err)
if !reflect.DeepEqual(got, want) {
t.Fatalf("fs.List(%q) = %#v, want %#v", dir, got, want)
@@ -1311,7 +1308,7 @@ func TestFS(t *testing.T) {
}
// Create a/ and assert that it's empty.
- require.NoError(t, fs.MkdirAll(path("a")))
+ require.NoError(t, fs.MkdirAll(path("a"), os.ModePerm))
expectLS(path("a"), []string{})
if _, err := fs.Stat(path("a/b/c")); !oserror.IsNotExist(err) {
t.Fatal(`fs.Stat("a/b/c") should not exist`)
@@ -1319,8 +1316,8 @@ func TestFS(t *testing.T) {
// Create a/b/ and a/b/c/ in a single MkdirAll call.
// Then ensure that a duplicate call returns a nil error.
- require.NoError(t, fs.MkdirAll(path("a/b/c")))
- require.NoError(t, fs.MkdirAll(path("a/b/c")))
+ require.NoError(t, fs.MkdirAll(path("a/b/c"), os.ModePerm))
+ require.NoError(t, fs.MkdirAll(path("a/b/c"), os.ModePerm))
expectLS(path("a"), []string{"b"})
expectLS(path("a/b"), []string{"c"})
expectLS(path("a/b/c"), []string{})
diff --git a/pkg/storage/fs/fs.go b/pkg/storage/fs/fs.go
index 3d626c2215e4..a64ad6ed54e7 100644
--- a/pkg/storage/fs/fs.go
+++ b/pkg/storage/fs/fs.go
@@ -12,66 +12,23 @@ package fs
import (
"io"
- "os"
-)
-
-// File and FS are a partial attempt at offering the Pebble vfs.FS interface. Given the constraints
-// of the RocksDB Env interface we've chosen to only include what is easy to implement. Additionally,
-// it does not try to subsume all the file related functionality already in the Engine interface.
-// It seems preferable to do a final cleanup only when the implementation can simply use Pebble's
-// implementation of vfs.FS. At that point the following interface will become a superset of vfs.FS.
-type File interface {
- io.ReadWriteCloser
- io.ReaderAt
- Sync() error
-}
-
-// FS provides a filesystem interface.
-type FS interface {
- // Create creates the named file for writing, removing the file at
- // the provided path if one already exists.
- Create(name string) (File, error)
-
- // CreateWithSync is similar to Create, but the file is periodically
- // synced whenever more than bytesPerSync bytes accumulate. This syncing
- // does not provide any persistency guarantees, but can prevent latency
- // spikes.
- CreateWithSync(name string, bytesPerSync int) (File, error)
-
- // Link creates newname as a hard link to the oldname file.
- Link(oldname, newname string) error
-
- // Open opens the named file for reading.
- Open(name string) (File, error)
-
- // OpenDir opens the named directory for syncing.
- OpenDir(name string) (File, error)
- // Remove removes the named file. If the file with given name doesn't
- // exist, return an error that returns true from oserror.IsNotExist().
- Remove(name string) error
-
- // Rename renames a file. It overwrites the file at newname if one exists,
- // the same as os.Rename.
- Rename(oldname, newname string) error
-
- // MkdirAll creates the named dir and its parents. Does nothing if the
- // directory already exists.
- MkdirAll(name string) error
-
- // RemoveAll deletes the path and any children it contains.
- RemoveAll(dir string) error
-
- // List returns a listing of the given directory. The names returned are
- // relative to the directory.
- List(name string) ([]string, error)
+ "github.com/cockroachdb/pebble/vfs"
+)
- // Stat returns a FileInfo describing the named file.
- Stat(name string) (os.FileInfo, error)
+// CreateWithSync creates a file wrapped with logic to periodically sync
+// whenever more than bytesPerSync bytes accumulate. This syncing does not
+// provide any persistency guarantees, but can prevent latency spikes.
+func CreateWithSync(fs vfs.FS, name string, bytesPerSync int) (vfs.File, error) {
+ f, err := fs.Create(name)
+ if err != nil {
+ return nil, err
+ }
+ return vfs.NewSyncingFile(f, vfs.SyncingFileOptions{BytesPerSync: bytesPerSync}), nil
}
// WriteFile writes data to a file named by filename.
-func WriteFile(fs FS, filename string, data []byte) error {
+func WriteFile(fs vfs.FS, filename string, data []byte) error {
f, err := fs.Create(filename)
if err != nil {
return err
@@ -84,7 +41,7 @@ func WriteFile(fs FS, filename string, data []byte) error {
}
// ReadFile reads data from a file named by filename.
-func ReadFile(fs FS, filename string) ([]byte, error) {
+func ReadFile(fs vfs.FS, filename string) ([]byte, error) {
file, err := fs.Open(filename)
if err != nil {
return nil, err
diff --git a/pkg/storage/pebble.go b/pkg/storage/pebble.go
index d24762cbb81a..7894cae53d6a 100644
--- a/pkg/storage/pebble.go
+++ b/pkg/storage/pebble.go
@@ -19,7 +19,6 @@ import (
"math"
"os"
"path/filepath"
- "sort"
"strconv"
"strings"
"sync"
@@ -731,6 +730,8 @@ type EncryptionStatsHandler interface {
// Pebble is a wrapper around a Pebble database instance.
type Pebble struct {
+ vfs.FS
+
atomic struct {
// compactionConcurrency is the current compaction concurrency set on
// the Pebble store. The compactionConcurrency option in the Pebble
@@ -768,7 +769,6 @@ type Pebble struct {
sharedBytesWritten int64
// Relevant options copied over from pebble.Options.
- fs vfs.FS
unencryptedFS vfs.FS
logCtx context.Context
logger pebble.LoggerAndTracer
@@ -1012,6 +1012,7 @@ func NewPebble(ctx context.Context, cfg PebbleConfig) (p *Pebble, err error) {
storeProps := computeStoreProperties(ctx, cfg.Dir, opts.ReadOnly, encryptionEnv != nil /* encryptionEnabled */)
p = &Pebble{
+ FS: opts.FS,
readOnly: opts.ReadOnly,
path: cfg.Dir,
auxDir: auxDir,
@@ -1023,7 +1024,6 @@ func NewPebble(ctx context.Context, cfg PebbleConfig) (p *Pebble, err error) {
settings: cfg.Settings,
encryption: encryptionEnv,
fileRegistry: fileRegistry,
- fs: opts.FS,
unencryptedFS: unencryptedFS,
logger: opts.LoggerAndTracer,
logCtx: logCtx,
@@ -1812,7 +1812,7 @@ func (p *Pebble) GetEnvStats() (*EnvStats, error) {
}
stats.ActiveKeyFiles++
- filename := p.fs.PathBase(filePath)
+ filename := p.FS.PathBase(filePath)
numStr := strings.TrimSuffix(filename, ".sst")
if len(numStr) == len(filename) {
continue // not a sstable
@@ -1909,68 +1909,7 @@ func (p *Pebble) RegisterFlushCompletedCallback(cb func()) {
p.mu.Unlock()
}
-// Remove implements the FS interface.
-func (p *Pebble) Remove(filename string) error {
- return p.fs.Remove(filename)
-}
-
-// RemoveAll implements the Engine interface.
-func (p *Pebble) RemoveAll(dir string) error {
- return p.fs.RemoveAll(dir)
-}
-
-// Link implements the FS interface.
-func (p *Pebble) Link(oldname, newname string) error {
- return p.fs.Link(oldname, newname)
-}
-
-var _ fs.FS = &Pebble{}
-
-// Create implements the FS interface.
-func (p *Pebble) Create(name string) (fs.File, error) {
- return p.fs.Create(name)
-}
-
-// CreateWithSync implements the FS interface.
-func (p *Pebble) CreateWithSync(name string, bytesPerSync int) (fs.File, error) {
- f, err := p.fs.Create(name)
- if err != nil {
- return nil, err
- }
- return vfs.NewSyncingFile(f, vfs.SyncingFileOptions{BytesPerSync: bytesPerSync}), nil
-}
-
-// Open implements the FS interface.
-func (p *Pebble) Open(name string) (fs.File, error) {
- return p.fs.Open(name)
-}
-
-// OpenDir implements the FS interface.
-func (p *Pebble) OpenDir(name string) (fs.File, error) {
- return p.fs.OpenDir(name)
-}
-
-// Rename implements the FS interface.
-func (p *Pebble) Rename(oldname, newname string) error {
- return p.fs.Rename(oldname, newname)
-}
-
-// MkdirAll implements the FS interface.
-func (p *Pebble) MkdirAll(name string) error {
- return p.fs.MkdirAll(name, 0755)
-}
-
-// List implements the FS interface.
-func (p *Pebble) List(name string) ([]string, error) {
- dirents, err := p.fs.List(name)
- sort.Strings(dirents)
- return dirents, err
-}
-
-// Stat implements the FS interface.
-func (p *Pebble) Stat(name string) (os.FileInfo, error) {
- return p.fs.Stat(name)
-}
+var _ vfs.FS = &Pebble{}
func checkpointSpansNote(spans []roachpb.Span) []byte {
note := "CRDB spans:\n"
@@ -2004,7 +1943,7 @@ func (p *Pebble) CreateCheckpoint(dir string, spans []roachpb.Span) error {
// TODO(#90543, cockroachdb/pebble#2285): move spans info to Pebble manifest.
if len(spans) > 0 {
if err := fs.SafeWriteToFile(
- p.fs, dir, p.fs.PathJoin(dir, "checkpoint.txt"),
+ p.FS, dir, p.FS.PathJoin(dir, "checkpoint.txt"),
checkpointSpansNote(spans),
); err != nil {
return err
diff --git a/pkg/storage/temp_engine.go b/pkg/storage/temp_engine.go
index c9545c82dc8a..acc375d37892 100644
--- a/pkg/storage/temp_engine.go
+++ b/pkg/storage/temp_engine.go
@@ -16,16 +16,16 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/diskmap"
- "github.com/cockroachdb/cockroach/pkg/storage/fs"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/pebble"
+ "github.com/cockroachdb/pebble/vfs"
)
// NewTempEngine creates a new engine for DistSQL processors to use when
// the working set is larger than can be stored in memory.
func NewTempEngine(
ctx context.Context, tempStorage base.TempStorageConfig, storeSpec base.StoreSpec,
-) (diskmap.Factory, fs.FS, error) {
+) (diskmap.Factory, vfs.FS, error) {
return NewPebbleTempEngine(ctx, tempStorage, storeSpec)
}
@@ -58,13 +58,13 @@ func (r *pebbleTempEngine) NewSortedDiskMultiMap() diskmap.SortedDiskMap {
// when the working set is larger than can be stored in memory.
func NewPebbleTempEngine(
ctx context.Context, tempStorage base.TempStorageConfig, storeSpec base.StoreSpec,
-) (diskmap.Factory, fs.FS, error) {
+) (diskmap.Factory, vfs.FS, error) {
return newPebbleTempEngine(ctx, tempStorage, storeSpec)
}
func newPebbleTempEngine(
ctx context.Context, tempStorage base.TempStorageConfig, storeSpec base.StoreSpec,
-) (*pebbleTempEngine, fs.FS, error) {
+) (*pebbleTempEngine, vfs.FS, error) {
var loc Location
var cacheSize int64 = 128 << 20 // 128 MiB, arbitrary, but not "too big"
if tempStorage.InMemory {
diff --git a/pkg/testutils/colcontainerutils/diskqueuecfg.go b/pkg/testutils/colcontainerutils/diskqueuecfg.go
index 488032af484a..deaabb7afc72 100644
--- a/pkg/testutils/colcontainerutils/diskqueuecfg.go
+++ b/pkg/testutils/colcontainerutils/diskqueuecfg.go
@@ -12,6 +12,7 @@ package colcontainerutils
import (
"context"
+ "os"
"testing"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -54,7 +55,7 @@ func NewTestingDiskQueueCfg(t testing.TB, inMem bool) (colcontainer.DiskQueueCfg
}
if inMem {
- if err := ngn.MkdirAll(inMemDirName); err != nil {
+ if err := ngn.MkdirAll(inMemDirName, os.ModePerm); err != nil {
t.Fatal(err)
}
}
From f1ce8cf6d16b11abf5a5ae8c317b1d43fbdc5f34 Mon Sep 17 00:00:00 2001
From: Jordan Lewis
Date: Mon, 27 Feb 2023 00:13:32 -0500
Subject: [PATCH 2/4] tsearch: add ts_rank functionality
This commit adds ts_rank, the family of builtins that allow ranking of
text search results. The function takes a tsquery and a tsvector and
returns a float that indicates how good the match is. The function can
be modified by passing in a custom array of weights that matches the
text search weights A, B, C, and D, and a bitmask that controls the
ranking behavior in various detailed ways.
See the excellent Postgres documentation here for details:
https://www.postgresql.org/docs/current/textsearch-controls.html
Release note (sql change): add the ts_rank function for ranking text
search query results
---
docs/generated/sql/functions.md | 10 +-
.../logictest/testdata/logic_test/tsvector | 36 +++
pkg/sql/sem/builtins/builtins.go | 1 -
pkg/sql/sem/builtins/fixed_oids.go | 4 +
pkg/sql/sem/builtins/tsearch_builtins.go | 113 +++++++
pkg/util/tsearch/BUILD.bazel | 2 +
pkg/util/tsearch/rank.go | 286 ++++++++++++++++++
pkg/util/tsearch/rank_test.go | 46 +++
pkg/util/tsearch/tsquery.go | 18 +-
pkg/util/tsearch/tsvector.go | 29 +-
10 files changed, 528 insertions(+), 17 deletions(-)
create mode 100644 pkg/util/tsearch/rank.go
create mode 100644 pkg/util/tsearch/rank_test.go
diff --git a/docs/generated/sql/functions.md b/docs/generated/sql/functions.md
index 20d0f653770f..e4a8cdc76c12 100644
--- a/docs/generated/sql/functions.md
+++ b/docs/generated/sql/functions.md
@@ -905,7 +905,15 @@ available replica will error.
to_tsvector(text: string) → tsvector | Converts text to a tsvector, normalizing words according to the default configuration. Position information is included in the result.
| Stable |
ts_parse(parser_name: string, document: string) → tuple{int AS tokid, string AS token} | ts_parse parses the given document and returns a series of records, one for each token produced by parsing. Each record includes a tokid showing the assigned token type and a token which is the text of the token.
- | Stable |
+Stable |
+ts_rank(vector: tsvector, query: tsquery) → float4 | Ranks vectors based on the frequency of their matching lexemes.
+ | Immutable |
+ts_rank(vector: tsvector, query: tsquery, normalization: int) → float4 | Ranks vectors based on the frequency of their matching lexemes.
+ | Immutable |
+ts_rank(weights: float[], vector: tsvector, query: tsquery) → float4 | Ranks vectors based on the frequency of their matching lexemes.
+ | Immutable |
+ts_rank(weights: float[], vector: tsvector, query: tsquery, normalization: int) → float4 | Ranks vectors based on the frequency of their matching lexemes.
+ | Immutable |
### Fuzzy String Matching functions
diff --git a/pkg/sql/logictest/testdata/logic_test/tsvector b/pkg/sql/logictest/testdata/logic_test/tsvector
index 33d8a851d593..14c17947e0ad 100644
--- a/pkg/sql/logictest/testdata/logic_test/tsvector
+++ b/pkg/sql/logictest/testdata/logic_test/tsvector
@@ -298,3 +298,39 @@ query T
SELECT to_tsvector('Hello I am a potato')
----
'am':3 'hell':1 'i':2 'potat':5
+
+query TT
+SELECT to_tsvector('english', ''), to_tsvector('english', 'and the')
+----
+· ·
+
+statement error doesn't contain lexemes
+SELECT to_tsquery('english', 'the')
+
+statement ok
+CREATE TABLE sentences (sentence text, v TSVECTOR AS (to_tsvector('english', sentence)) STORED, INVERTED INDEX (v));
+INSERT INTO sentences VALUES
+ ('Future users of large data banks must be protected from having to know how the data is organized in the machine (the internal representation).'),
+ ('A prompting service which supplies such information is not a satisfactory solution.'),
+ ('Activities of users at terminals and most application programs should remain unaffected when the internal representation of data is changed and even when some aspects of the external representation
+ are changed.'),
+ ('Changes in data representation will often be needed as a result of changes in query, update, and report traffic and natural growth in the types of stored information.'),
+ ('Existing noninferential, formatted data systems provide users with tree-structured files or slightly more general network models of the data.'),
+ ('In Section 1, inadequacies of these models are discussed.'),
+ ('A model based on n-ary relations, a normal form for data base relations, and the concept of a universal data sublanguage are introduced.'),
+ ('In Section 2, certain operations on relations (other than logical inference) are discussed and applied to the problems of redundancy and consistency in the user’s model.')
+
+query FFFFT
+SELECT
+ts_rank(v, query) AS rank,
+ts_rank(ARRAY[0.2, 0.3, 0.5, 0.9]:::FLOAT[], v, query) AS wrank,
+ts_rank(v, query, 2|8) AS nrank,
+ts_rank(ARRAY[0.3, 0.4, 0.6, 0.95]:::FLOAT[], v, query, 1|2|4|8|16|32) AS wnrank,
+v
+FROM sentences, to_tsquery('english', 'relation') query
+WHERE query @@ v
+ORDER BY rank DESC
+LIMIT 10
+----
+0.075990885 0.15198177 0.00042217158 8.555783e-05 'ari':6 'base':3,13 'concept':17 'data':12,21 'form':10 'introduc':24 'model':2 'n':5 'normal':9 'relat':7,14 'sublanguag':22 'univers':20
+0.06079271 0.12158542 0.0003101669 6.095758e-05 '2':3 'appli':15 'certain':4 'consist':22 'discuss':13 'infer':11 'logic':10 'model':27 'oper':5 'problem':18 'redund':20 'relat':7 'section':2 'user':25
diff --git a/pkg/sql/sem/builtins/builtins.go b/pkg/sql/sem/builtins/builtins.go
index d3100ea03f49..34b2a1094b6b 100644
--- a/pkg/sql/sem/builtins/builtins.go
+++ b/pkg/sql/sem/builtins/builtins.go
@@ -3765,7 +3765,6 @@ value if you rely on the HLC for accuracy.`,
"jsonb_to_tsvector": makeBuiltin(tree.FunctionProperties{UnsupportedWithIssue: 7821, Category: builtinconstants.CategoryFullTextSearch}),
"ts_delete": makeBuiltin(tree.FunctionProperties{UnsupportedWithIssue: 7821, Category: builtinconstants.CategoryFullTextSearch}),
"ts_filter": makeBuiltin(tree.FunctionProperties{UnsupportedWithIssue: 7821, Category: builtinconstants.CategoryFullTextSearch}),
- "ts_rank": makeBuiltin(tree.FunctionProperties{UnsupportedWithIssue: 7821, Category: builtinconstants.CategoryFullTextSearch}),
"ts_rank_cd": makeBuiltin(tree.FunctionProperties{UnsupportedWithIssue: 7821, Category: builtinconstants.CategoryFullTextSearch}),
"ts_rewrite": makeBuiltin(tree.FunctionProperties{UnsupportedWithIssue: 7821, Category: builtinconstants.CategoryFullTextSearch}),
"tsquery_phrase": makeBuiltin(tree.FunctionProperties{UnsupportedWithIssue: 7821, Category: builtinconstants.CategoryFullTextSearch}),
diff --git a/pkg/sql/sem/builtins/fixed_oids.go b/pkg/sql/sem/builtins/fixed_oids.go
index 435b16d75e9c..680d96e4a513 100644
--- a/pkg/sql/sem/builtins/fixed_oids.go
+++ b/pkg/sql/sem/builtins/fixed_oids.go
@@ -2373,6 +2373,10 @@ var builtinOidsArray = []string{
2399: `to_tsvector(text: string) -> tsvector`,
2400: `phraseto_tsquery(text: string) -> tsquery`,
2401: `plainto_tsquery(text: string) -> tsquery`,
+ 2402: `ts_rank(weights: float[], vector: tsvector, query: tsquery, normalization: int) -> float4`,
+ 2403: `ts_rank(vector: tsvector, query: tsquery, normalization: int) -> float4`,
+ 2404: `ts_rank(vector: tsvector, query: tsquery) -> float4`,
+ 2405: `ts_rank(weights: float[], vector: tsvector, query: tsquery) -> float4`,
}
var builtinOidsBySignature map[string]oid.Oid
diff --git a/pkg/sql/sem/builtins/tsearch_builtins.go b/pkg/sql/sem/builtins/tsearch_builtins.go
index e5e3a4633c79..627f1897b9b3 100644
--- a/pkg/sql/sem/builtins/tsearch_builtins.go
+++ b/pkg/sql/sem/builtins/tsearch_builtins.go
@@ -231,4 +231,117 @@ var tsearchBuiltins = map[string]builtinDefinition{
Volatility: volatility.Stable,
},
),
+ "ts_rank": makeBuiltin(
+ tree.FunctionProperties{},
+ tree.Overload{
+ Types: tree.ParamTypes{
+ {Name: "weights", Typ: types.FloatArray},
+ {Name: "vector", Typ: types.TSVector},
+ {Name: "query", Typ: types.TSQuery},
+ {Name: "normalization", Typ: types.Int},
+ },
+ ReturnType: tree.FixedReturnType(types.Float4),
+ Fn: func(_ context.Context, evalCtx *eval.Context, args tree.Datums) (tree.Datum, error) {
+ weights, err := getWeights(tree.MustBeDArray(args[0]))
+ if err != nil {
+ return nil, err
+ }
+ rank, err := tsearch.Rank(
+ weights,
+ tree.MustBeDTSVector(args[1]).TSVector,
+ tree.MustBeDTSQuery(args[2]).TSQuery,
+ int(tree.MustBeDInt(args[3])),
+ )
+ if err != nil {
+ return nil, err
+ }
+ return tree.NewDFloat(tree.DFloat(rank)), nil
+ },
+ Info: "Ranks vectors based on the frequency of their matching lexemes.",
+ Volatility: volatility.Immutable,
+ },
+ tree.Overload{
+ Types: tree.ParamTypes{
+ {Name: "weights", Typ: types.FloatArray},
+ {Name: "vector", Typ: types.TSVector},
+ {Name: "query", Typ: types.TSQuery},
+ },
+ ReturnType: tree.FixedReturnType(types.Float4),
+ Fn: func(_ context.Context, evalCtx *eval.Context, args tree.Datums) (tree.Datum, error) {
+ weights, err := getWeights(tree.MustBeDArray(args[0]))
+ if err != nil {
+ return nil, err
+ }
+ rank, err := tsearch.Rank(
+ weights,
+ tree.MustBeDTSVector(args[1]).TSVector,
+ tree.MustBeDTSQuery(args[2]).TSQuery,
+ 0,
+ )
+ if err != nil {
+ return nil, err
+ }
+ return tree.NewDFloat(tree.DFloat(rank)), nil
+ },
+ Info: "Ranks vectors based on the frequency of their matching lexemes.",
+ Volatility: volatility.Immutable,
+ },
+ tree.Overload{
+ Types: tree.ParamTypes{
+ {Name: "vector", Typ: types.TSVector},
+ {Name: "query", Typ: types.TSQuery},
+ {Name: "normalization", Typ: types.Int},
+ },
+ ReturnType: tree.FixedReturnType(types.Float4),
+ Fn: func(_ context.Context, evalCtx *eval.Context, args tree.Datums) (tree.Datum, error) {
+ rank, err := tsearch.Rank(
+ nil, /* weights */
+ tree.MustBeDTSVector(args[0]).TSVector,
+ tree.MustBeDTSQuery(args[1]).TSQuery,
+ int(tree.MustBeDInt(args[2])),
+ )
+ if err != nil {
+ return nil, err
+ }
+ return tree.NewDFloat(tree.DFloat(rank)), nil
+ },
+ Info: "Ranks vectors based on the frequency of their matching lexemes.",
+ Volatility: volatility.Immutable,
+ },
+ tree.Overload{
+ Types: tree.ParamTypes{
+ {Name: "vector", Typ: types.TSVector},
+ {Name: "query", Typ: types.TSQuery},
+ },
+ ReturnType: tree.FixedReturnType(types.Float4),
+ Fn: func(_ context.Context, evalCtx *eval.Context, args tree.Datums) (tree.Datum, error) {
+ rank, err := tsearch.Rank(
+ nil, /* weights */
+ tree.MustBeDTSVector(args[0]).TSVector,
+ tree.MustBeDTSQuery(args[1]).TSQuery,
+ 0, /* method */
+ )
+ if err != nil {
+ return nil, err
+ }
+ return tree.NewDFloat(tree.DFloat(rank)), nil
+ },
+ Info: "Ranks vectors based on the frequency of their matching lexemes.",
+ Volatility: volatility.Immutable,
+ },
+ ),
+}
+
+func getWeights(arr *tree.DArray) ([]float32, error) {
+ ret := make([]float32, 4)
+ if arr.Len() < len(ret) {
+ return ret, pgerror.New(pgcode.ArraySubscript, "array of weight is too short (must be at least 4)")
+ }
+ for i, d := range arr.Array {
+ if d == tree.DNull {
+ return ret, pgerror.New(pgcode.NullValueNotAllowed, "array of weight must not contain null")
+ }
+ ret[i] = float32(tree.MustBeDFloat(d))
+ }
+ return ret, nil
}
diff --git a/pkg/util/tsearch/BUILD.bazel b/pkg/util/tsearch/BUILD.bazel
index cc7cc235500e..01e25c3c2780 100644
--- a/pkg/util/tsearch/BUILD.bazel
+++ b/pkg/util/tsearch/BUILD.bazel
@@ -9,6 +9,7 @@ go_library(
"eval.go",
"lex.go",
"random.go",
+ "rank.go",
"snowball.go",
"stopwords.go",
"tsquery.go",
@@ -63,6 +64,7 @@ go_test(
srcs = [
"encoding_test.go",
"eval_test.go",
+ "rank_test.go",
"tsquery_test.go",
"tsvector_test.go",
],
diff --git a/pkg/util/tsearch/rank.go b/pkg/util/tsearch/rank.go
new file mode 100644
index 000000000000..757f736af6ec
--- /dev/null
+++ b/pkg/util/tsearch/rank.go
@@ -0,0 +1,286 @@
+// Copyright 2023 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package tsearch
+
+import (
+ "math"
+ "sort"
+ "strings"
+)
+
+// defaultWeights is the default list of weights corresponding to the tsvector
+// lexeme weights D, C, B, and A.
+var defaultWeights = [4]float32{0.1, 0.2, 0.4, 1.0}
+
+// Bitmask for the normalization integer. These define different ranking
+// behaviors. They're defined in Postgres in tsrank.c.
+// 0, the default, ignores the document length.
+// 1 devides the rank by 1 + the logarithm of the document length.
+// 2 divides the rank by the document length.
+// 4 divides the rank by the mean harmonic distance between extents.
+//
+// NOTE: This is only implemented by ts_rank_cd, which is currently not
+// implemented by CockroachDB. This constant is left for consistency with
+// the original PostgreSQL source code.
+//
+// 8 divides the rank by the number of unique words in document.
+// 16 divides the rank by 1 + the logarithm of the number of unique words in document.
+// 32 divides the rank by itself + 1.
+type rankBehavior int
+
+const (
+ // rankNoNorm is the default. It ignores the document length.
+ rankNoNorm rankBehavior = 0x0
+ // rankNormLoglength divides the rank by 1 + the logarithm of the document length.
+ rankNormLoglength = 0x01
+ // rankNormLength divides the rank by the document length.
+ rankNormLength = 0x02
+ // rankNormExtdist divides the rank by the mean harmonic distance between extents.
+ // Note, this is only implemented by ts_rank_cd, which is not currently implemented
+ // by CockroachDB. The constant is kept for consistency with Postgres.
+ rankNormExtdist = 0x04
+ // rankNormUniq divides the rank by the number of unique words in document.
+ rankNormUniq = 0x08
+ // rankNormLoguniq divides the rank by 1 + the logarithm of the number of unique words in document.
+ rankNormLoguniq = 0x10
+ // rankNormRdivrplus1 divides the rank by itself + 1.
+ rankNormRdivrplus1 = 0x20
+)
+
+// Defeat the unused linter.
+var _ = rankNoNorm
+var _ = rankNormExtdist
+
+// cntLen returns the count of represented lexemes in a tsvector, including
+// the number of repeated lexemes in the vector.
+func cntLen(v TSVector) int {
+ var ret int
+ for i := range v {
+ posLen := len(v[i].positions)
+ if posLen > 0 {
+ ret += posLen
+ } else {
+ ret += 1
+ }
+ }
+ return ret
+}
+
+// Rank implements the ts_rank functionality, which ranks a tsvector against a
+// tsquery. The weights parameter is a list of weights corresponding to the
+// tsvector lexeme weights D, C, B, and A. The method parameter is a bitmask
+// defining different ranking behaviors, defined in the rankBehavior type
+// above in this file. The default ranking behavior is 0, which doesn't perform
+// any normalization based on the document length.
+//
+// N.B.: this function is directly translated from the calc_rank function in
+// tsrank.c, which contains almost no comments. As of this time, I am unable
+// to sufficiently explain how this ranker works, but I'm confident that the
+// implementation is at least compatible with Postgres.
+// https://github.com/postgres/postgres/blob/765f5df726918bcdcfd16bcc5418e48663d1dd59/src/backend/utils/adt/tsrank.c#L357
+func Rank(weights []float32, v TSVector, q TSQuery, method int) (float32, error) {
+ w := defaultWeights
+ if weights != nil {
+ copy(w[:4], weights[:4])
+ }
+ if len(v) == 0 || q.root == nil {
+ return 0, nil
+ }
+ var res float32
+ if q.root.op == and || q.root.op == followedby {
+ res = rankAnd(w, v, q)
+ } else {
+ res = rankOr(w, v, q)
+ }
+ if res < 0 {
+ // This constant is taken from the Postgres source code, unfortunately I
+ // don't understand its meaning.
+ res = 1e-20
+ }
+ if method&rankNormLoglength > 0 {
+ res /= float32(math.Log(float64(cntLen(v)+1)) / math.Log(2.0))
+ }
+
+ if method&rankNormLength > 0 {
+ l := cntLen(v)
+ if l > 0 {
+ res /= float32(l)
+ }
+ }
+ // rankNormExtDist is not applicable - it's only used for ts_rank_cd.
+
+ if method&rankNormUniq > 0 {
+ res /= float32(len(v))
+ }
+
+ if method&rankNormLoguniq > 0 {
+ res /= float32(math.Log(float64(len(v)+1)) / math.Log(2.0))
+ }
+
+ if method&rankNormRdivrplus1 > 0 {
+ res /= res + 1
+ }
+
+ return res, nil
+}
+
+func sortAndDistinctQueryTerms(q TSQuery) []*tsNode {
+ // Extract all leaf nodes from the query tree.
+ leafNodes := make([]*tsNode, 0)
+ var extractTerms func(q *tsNode)
+ extractTerms = func(q *tsNode) {
+ if q == nil {
+ return
+ }
+ if q.op != invalid {
+ extractTerms(q.l)
+ extractTerms(q.r)
+ } else {
+ leafNodes = append(leafNodes, q)
+ }
+ }
+ extractTerms(q.root)
+ // Sort the terms.
+ sort.Slice(leafNodes, func(i, j int) bool {
+ return leafNodes[i].term.lexeme < leafNodes[j].term.lexeme
+ })
+ // Then distinct: (wouldn't it be nice if Go had generics?)
+ lastUniqueIdx := 0
+ for j := 1; j < len(leafNodes); j++ {
+ if leafNodes[j].term.lexeme != leafNodes[lastUniqueIdx].term.lexeme {
+ // We found a unique entry, at index i. The last unique entry in the array
+ // was at lastUniqueIdx, so set the entry after that one to our new unique
+ // entry, and bump lastUniqueIdx for the next loop iteration.
+ lastUniqueIdx++
+ leafNodes[lastUniqueIdx] = leafNodes[j]
+ }
+ }
+ leafNodes = leafNodes[:lastUniqueIdx+1]
+ return leafNodes
+}
+
+// findRankMatches finds all matches for a given query term in a tsvector,
+// regardless of the expected query weight.
+// query is the term being matched. v is the tsvector being searched.
+// matches is a slice of matches to append to, to save on allocations as this
+// function is called in a loop.
+func findRankMatches(query *tsNode, v TSVector, matches [][]tsPosition) [][]tsPosition {
+ target := query.term.lexeme
+ i := sort.Search(len(v), func(i int) bool {
+ return v[i].lexeme >= target
+ })
+ if i >= len(v) {
+ return matches
+ }
+ if query.term.isPrefixMatch() {
+ for j := i; j < len(v); j++ {
+ t := v[j]
+ if !strings.HasPrefix(t.lexeme, target) {
+ break
+ }
+ matches = append(matches, t.positions)
+ }
+ } else if v[i].lexeme == target {
+ matches = append(matches, v[i].positions)
+ }
+ return matches
+}
+
+// rankOr computes the rank for a query with an OR operator at its root.
+// It takes the same parameters as TSRank.
+func rankOr(weights [4]float32, v TSVector, q TSQuery) float32 {
+ queryLeaves := sortAndDistinctQueryTerms(q)
+ var matches = make([][]tsPosition, 0)
+ var res float32
+ for i := range queryLeaves {
+ matches = matches[:0]
+ matches = findRankMatches(queryLeaves[i], v, matches)
+ if len(matches) == 0 {
+ continue
+ }
+ resj := float32(0.0)
+ wjm := float32(-1.0)
+ jm := 0
+ for _, innerMatches := range matches {
+ for j, pos := range innerMatches {
+ termWeight := pos.weight.val()
+ weight := weights[termWeight]
+ resj = resj + weight/float32((j+1)*(j+1))
+ if weight > wjm {
+ wjm = weight
+ jm = j
+ }
+ }
+ }
+ // Explanation from Postgres tsrank.c:
+ // limit (sum(1/i^2),i=1,inf) = pi^2/6
+ // resj = sum(wi/i^2),i=1,noccurence,
+ // wi - should be sorted desc,
+ // don't sort for now, just choose maximum weight. This should be corrected
+ // Oleg Bartunov
+ res = res + (wjm+resj-wjm/float32((jm+1)*(jm+1)))/1.64493406685
+ }
+ if len(queryLeaves) > 0 {
+ res /= float32(len(queryLeaves))
+ }
+ return res
+}
+
+// rankAnd computes the rank for a query with an AND or followed-by operator at
+// its root. It takes the same parameters as TSRank.
+func rankAnd(weights [4]float32, v TSVector, q TSQuery) float32 {
+ queryLeaves := sortAndDistinctQueryTerms(q)
+ if len(queryLeaves) < 2 {
+ return rankOr(weights, v, q)
+ }
+ pos := make([][]tsPosition, len(queryLeaves))
+ res := float32(-1)
+ var matches = make([][]tsPosition, 0)
+ for i := range queryLeaves {
+ matches = matches[:0]
+ matches = findRankMatches(queryLeaves[i], v, matches)
+ for _, innerMatches := range matches {
+ pos[i] = innerMatches
+ // Loop back through the earlier position matches
+ for k := 0; k < i; k++ {
+ if pos[k] == nil {
+ continue
+ }
+ for l := range pos[i] {
+ // For each of the earlier matches
+ for p := range pos[k] {
+ dist := int(pos[i][l].position) - int(pos[k][p].position)
+ if dist < 0 {
+ dist = -dist
+ }
+ if dist != 0 {
+ curw := float32(math.Sqrt(float64(weights[pos[i][l].weight.val()] * weights[pos[k][p].weight.val()] * wordDistance(dist))))
+ if res < 0 {
+ res = curw
+ } else {
+ res = 1.0 - (1.0-res)*(1.0-curw)
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ return res
+}
+
+// Returns a weight of a word collocation. See Postgres tsrank.c.
+func wordDistance(dist int) float32 {
+ if dist > 100 {
+ return 1e-30
+ }
+ return float32(1.0 / (1.005 + 0.05*math.Exp(float64(float32(dist)/1.5-2))))
+}
diff --git a/pkg/util/tsearch/rank_test.go b/pkg/util/tsearch/rank_test.go
new file mode 100644
index 000000000000..900f6e13100f
--- /dev/null
+++ b/pkg/util/tsearch/rank_test.go
@@ -0,0 +1,46 @@
+// Copyright 2023 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package tsearch
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestRank(t *testing.T) {
+ tests := []struct {
+ weights []float32
+ v string
+ q string
+ method int
+ expected float32
+ }{
+ {v: "a:1 s:2C d g", q: "a | s", expected: 0.091189064},
+ {v: "a:1 sa:2C d g", q: "a | s", expected: 0.030396355},
+ {v: "a:1 sa:2C d g", q: "a | s:*", expected: 0.091189064},
+ {v: "a:1 sa:2C d g", q: "a | sa:*", expected: 0.091189064},
+ {v: "a:1 s:2B d g", q: "a | s", expected: 0.15198177},
+ {v: "a:1 s:2 d g", q: "a | s", expected: 0.06079271},
+ {v: "a:1 s:2C d g", q: "a & s", expected: 0.14015312},
+ {v: "a:1 s:2B d g", q: "a & s", expected: 0.19820644},
+ {v: "a:1 s:2 d g", q: "a & s", expected: 0.09910322},
+ }
+ for _, tt := range tests {
+ v, err := ParseTSVector(tt.v)
+ assert.NoError(t, err)
+ q, err := ParseTSQuery(tt.q)
+ assert.NoError(t, err)
+ actual, err := Rank(tt.weights, v, q, tt.method)
+ assert.NoError(t, err)
+ assert.Equalf(t, tt.expected, actual, "Rank(%v, %v, %v, %v)", tt.weights, tt.v, tt.q, tt.method)
+ }
+}
diff --git a/pkg/util/tsearch/tsquery.go b/pkg/util/tsearch/tsquery.go
index bc5c0a7ea76a..761860495911 100644
--- a/pkg/util/tsearch/tsquery.go
+++ b/pkg/util/tsearch/tsquery.go
@@ -499,12 +499,13 @@ func toTSQuery(config string, interpose tsOperator, input string) (TSQuery, erro
}
tokens = append(tokens, term)
}
- lexeme, ok, err := TSLexize(config, lexemeTokens[j])
+ lexeme, stopWord, err := TSLexize(config, lexemeTokens[j])
if err != nil {
return TSQuery{}, err
}
- if !ok {
+ if stopWord {
foundStopwords = true
+ //continue
}
tokens = append(tokens, tsTerm{lexeme: lexeme, positions: tok.positions})
}
@@ -518,17 +519,20 @@ func toTSQuery(config string, interpose tsOperator, input string) (TSQuery, erro
}
if foundStopwords {
- return cleanupStopwords(query)
+ query = cleanupStopwords(query)
+ if query.root == nil {
+ return query, pgerror.Newf(pgcode.Syntax, "text-search query doesn't contain lexemes: %s", input)
+ }
}
- return query, nil
+ return query, err
}
-func cleanupStopwords(query TSQuery) (TSQuery, error) {
+func cleanupStopwords(query TSQuery) TSQuery {
query.root, _, _ = cleanupStopword(query.root)
if query.root == nil {
- return TSQuery{}, nil
+ return TSQuery{}
}
- return query, nil
+ return query
}
// cleanupStopword cleans up a query tree by removing stop words and adjusting
diff --git a/pkg/util/tsearch/tsvector.go b/pkg/util/tsearch/tsvector.go
index f811f9d4144e..3417b5d99809 100644
--- a/pkg/util/tsearch/tsvector.go
+++ b/pkg/util/tsearch/tsvector.go
@@ -121,6 +121,14 @@ func (w tsWeight) TSVectorPGEncoding() (byte, error) {
return 0, errors.Errorf("invalid tsvector weight %d", w)
}
+func (w tsWeight) val() int {
+ b, err := w.TSVectorPGEncoding()
+ if err != nil {
+ panic(err)
+ }
+ return int(b)
+}
+
// matches returns true if the receiver is matched by the input tsquery weight.
func (w tsWeight) matches(queryWeight tsWeight) bool {
if queryWeight == weightAny {
@@ -287,6 +295,10 @@ func (t tsTerm) matchesWeight(targetWeight tsWeight) bool {
return false
}
+func (t tsTerm) isPrefixMatch() bool {
+ return len(t.positions) >= 1 && t.positions[0].weight&weightStar != 0
+}
+
// TSVector is a sorted list of terms, each of which is a lexeme that might have
// an associated position within an original document.
type TSVector []tsTerm
@@ -392,16 +404,16 @@ func TSParse(input string) []string {
// TSLexize implements the "dictionary" construct that's exposed via ts_lexize.
// It gets invoked once per input token to produce an output lexeme during
// routines like to_tsvector and to_tsquery.
-// It can return false in the second parameter to indicate a stopword was found.
-func TSLexize(config string, token string) (lexeme string, notAStopWord bool, err error) {
- stopwords, notAStopWord := stopwordsMap[config]
- if !notAStopWord {
+// It can return true in the second parameter to indicate a stopword was found.
+func TSLexize(config string, token string) (lexeme string, stopWord bool, err error) {
+ stopwords, ok := stopwordsMap[config]
+ if !ok {
return "", false, pgerror.Newf(pgcode.UndefinedObject, "text search configuration %q does not exist", config)
}
lower := strings.ToLower(token)
if _, ok := stopwords[lower]; ok {
- return "", false, nil
+ return "", true, nil
}
stemmer, err := getStemmer(config)
if err != nil {
@@ -409,7 +421,7 @@ func TSLexize(config string, token string) (lexeme string, notAStopWord bool, er
}
env := snowballstem.NewEnv(lower)
stemmer(env)
- return env.Current(), true, nil
+ return env.Current(), false, nil
}
// DocumentToTSVector parses an input document into lexemes, removes stop words,
@@ -419,17 +431,18 @@ func DocumentToTSVector(config string, input string) (TSVector, error) {
tokens := TSParse(input)
vector := make(TSVector, 0, len(tokens))
for i := range tokens {
- lexeme, ok, err := TSLexize(config, tokens[i])
+ lexeme, stopWord, err := TSLexize(config, tokens[i])
if err != nil {
return nil, err
}
- if !ok {
+ if stopWord {
continue
}
term := tsTerm{lexeme: lexeme}
pos := i + 1
if i > maxTSVectorPosition {
+ // Postgres silently truncates positions larger than 16383 to 16383.
pos = maxTSVectorPosition
}
term.positions = []tsPosition{{position: uint16(pos)}}
From 2cc4d0ff9b4fb21e465a8f269fd9ca04452e8ab4 Mon Sep 17 00:00:00 2001
From: Pavel Kalinnikov
Date: Tue, 21 Mar 2023 12:51:52 +0000
Subject: [PATCH 3/4] kvserver: wait for checkpoint completion in tests
This commit fixes the race possible in TestCheckConsistencyInconsistent:
- Node 2 is corrupted.
- The second phase of runConsistency check times out on node 1, and returns
early when only nodes 2 and 3 have created the storage checkpoint.
- Node 1 haven't created the checkpoint, but has started doing so.
- Node 2 "fatals" (mocked out in the test) shortly after the check is complete.
- Node 1 is still creating its checkpoint, but has probably created the
directory by now.
- Hence, the test assumes that the checkpoint has been created, and proceeds to
open it and compute the checksum of the range.
This commit makes the test wait for the moment when all the checkpoint are
known to be fully populated.
Part of #81819
Epic: none
Release note: none
---
pkg/kv/kvserver/consistency_queue_test.go | 36 +++++++++++++++--------
1 file changed, 23 insertions(+), 13 deletions(-)
diff --git a/pkg/kv/kvserver/consistency_queue_test.go b/pkg/kv/kvserver/consistency_queue_test.go
index d793f9598df6..7b5767cb8faa 100644
--- a/pkg/kv/kvserver/consistency_queue_test.go
+++ b/pkg/kv/kvserver/consistency_queue_test.go
@@ -348,12 +348,12 @@ func TestCheckConsistencyInconsistent(t *testing.T) {
assert.Empty(t, onDiskCheckpointPaths(i))
}
- // Write some arbitrary data only to store 1. Inconsistent key "e"!
- store1 := tc.GetFirstStoreFromServer(t, 1)
+ // Write some arbitrary data only to store on n2. Inconsistent key "e"!
+ s2 := tc.GetFirstStoreFromServer(t, 1)
var val roachpb.Value
val.SetInt(42)
// Put an inconsistent key "e" to s2, and have s1 and s3 still agree.
- require.NoError(t, storage.MVCCPut(context.Background(), store1.TODOEngine(), nil,
+ require.NoError(t, storage.MVCCPut(context.Background(), s2.TODOEngine(), nil,
roachpb.Key("e"), tc.Server(0).Clock().Now(), hlc.ClockTimestamp{}, val, nil))
// Run consistency check again, this time it should find something.
@@ -369,15 +369,10 @@ func TestCheckConsistencyInconsistent(t *testing.T) {
assert.Contains(t, resp.Result[0].Detail, `[minority]`)
assert.Contains(t, resp.Result[0].Detail, `stats`)
- // Checkpoints should have been created on all stores.
- hashes := make([][]byte, numStores)
+ // Make sure that all the stores started creating a checkpoint. The metric
+ // measures the number of checkpoint directories, but a directory can
+ // represent an incomplete checkpoint that is still being populated.
for i := 0; i < numStores; i++ {
- cps := onDiskCheckpointPaths(i)
- require.Len(t, cps, 1)
- t.Logf("found a checkpoint at %s", cps[0])
- // The checkpoint must have been finalized.
- require.False(t, strings.HasSuffix(cps[0], "_pending"))
-
metric := tc.GetFirstStoreFromServer(t, i).Metrics().RdbCheckpoints
testutils.SucceedsSoon(t, func() error {
if got, want := metric.Value(), int64(1); got != want {
@@ -385,6 +380,21 @@ func TestCheckConsistencyInconsistent(t *testing.T) {
}
return nil
})
+ }
+ // As discussed in https://github.com/cockroachdb/cockroach/issues/81819, it
+ // is possible that the check completes while there are still checkpoints in
+ // flight. Waiting for the server termination makes sure that checkpoints are
+ // fully created.
+ tc.Stopper().Stop(context.Background())
+
+ // Checkpoints should have been created on all stores.
+ hashes := make([][]byte, numStores)
+ for i := 0; i < numStores; i++ {
+ cps := onDiskCheckpointPaths(i)
+ require.Len(t, cps, 1)
+ t.Logf("found a checkpoint at %s", cps[0])
+ // The checkpoint must have been finalized.
+ require.False(t, strings.HasSuffix(cps[0], "_pending"))
// Create a new store on top of checkpoint location inside existing in-mem
// VFS to verify its contents.
@@ -417,8 +427,8 @@ func TestCheckConsistencyInconsistent(t *testing.T) {
assert.Equal(t, hashes[0], hashes[2]) // s1 and s3 agree
assert.NotEqual(t, hashes[0], hashes[1]) // s2 diverged
- // A death rattle should have been written on s2 (store index 1).
- eng := store1.TODOEngine()
+ // A death rattle should have been written on s2.
+ eng := s2.TODOEngine()
f, err := eng.Open(base.PreventedStartupFile(eng.GetAuxiliaryDir()))
require.NoError(t, err)
b, err := io.ReadAll(f)
From 6372b6e141701741f8d72839cdec8e0a7512fcc6 Mon Sep 17 00:00:00 2001
From: Pavel Kalinnikov
Date: Tue, 21 Mar 2023 12:58:49 +0000
Subject: [PATCH 4/4] kvserver: unskip TestCheckConsistencyInconsistent
The test was skipped for a reason that no longer holds.
Epic: none
Release note: none
---
pkg/kv/kvserver/consistency_queue_test.go | 9 ---------
1 file changed, 9 deletions(-)
diff --git a/pkg/kv/kvserver/consistency_queue_test.go b/pkg/kv/kvserver/consistency_queue_test.go
index 7b5767cb8faa..609dc55318ed 100644
--- a/pkg/kv/kvserver/consistency_queue_test.go
+++ b/pkg/kv/kvserver/consistency_queue_test.go
@@ -35,7 +35,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/testutils"
- "github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -244,14 +243,6 @@ func TestCheckConsistencyInconsistent(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
- // TODO(pavelkalinnikov): not if we remove TestingSetRedactable below?
- skip.UnderRaceWithIssue(t, 81819, "slow test, and TestingSetRedactable triggers race detector")
-
- // This test prints a consistency checker diff, so it's
- // good to make sure we're overly redacting said diff.
- // TODO(pavelkalinnikov): remove this since we don't print diffs anymore?
- defer log.TestingSetRedactable(true)()
-
// Test expects simple MVCC value encoding.
storage.DisableMetamorphicSimpleValueEncoding(t)