diff --git a/docs/generated/sql/functions.md b/docs/generated/sql/functions.md index 20d0f653770f..e4a8cdc76c12 100644 --- a/docs/generated/sql/functions.md +++ b/docs/generated/sql/functions.md @@ -905,7 +905,15 @@ available replica will error.

to_tsvector(text: string) → tsvector

Converts text to a tsvector, normalizing words according to the default configuration. Position information is included in the result.

Stable ts_parse(parser_name: string, document: string) → tuple{int AS tokid, string AS token}

ts_parse parses the given document and returns a series of records, one for each token produced by parsing. Each record includes a tokid showing the assigned token type and a token which is the text of the token.

-
Stable +Stable +ts_rank(vector: tsvector, query: tsquery) → float4

Ranks vectors based on the frequency of their matching lexemes.

+
Immutable +ts_rank(vector: tsvector, query: tsquery, normalization: int) → float4

Ranks vectors based on the frequency of their matching lexemes.

+
Immutable +ts_rank(weights: float[], vector: tsvector, query: tsquery) → float4

Ranks vectors based on the frequency of their matching lexemes.

+
Immutable +ts_rank(weights: float[], vector: tsvector, query: tsquery, normalization: int) → float4

Ranks vectors based on the frequency of their matching lexemes.

+
Immutable ### Fuzzy String Matching functions diff --git a/pkg/ccl/cliccl/ear_test.go b/pkg/ccl/cliccl/ear_test.go index b29a06cc1ac5..9b2970867f4d 100644 --- a/pkg/ccl/cliccl/ear_test.go +++ b/pkg/ccl/cliccl/ear_test.go @@ -14,6 +14,7 @@ import ( "crypto/rand" "fmt" "path/filepath" + "sort" "strings" "testing" @@ -56,6 +57,7 @@ func TestDecrypt(t *testing.T) { // Find a manifest file to check. files, err := p.List(dir) require.NoError(t, err) + sort.Strings(files) var manifestPath string for _, basename := range files { if strings.HasPrefix(basename, "MANIFEST-") { diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 9e1751d54b73..f13176a8a032 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -212,6 +212,7 @@ go_library( "@com_github_cockroachdb_logtags//:logtags", "@com_github_cockroachdb_pebble//:pebble", "@com_github_cockroachdb_pebble//objstorage", + "@com_github_cockroachdb_pebble//vfs", "@com_github_cockroachdb_redact//:redact", "@com_github_gogo_protobuf//proto", "@com_github_google_btree//:btree", diff --git a/pkg/kv/kvserver/client_replica_gc_test.go b/pkg/kv/kvserver/client_replica_gc_test.go index 53c9e04e9d21..2b1dde8d0621 100644 --- a/pkg/kv/kvserver/client_replica_gc_test.go +++ b/pkg/kv/kvserver/client_replica_gc_test.go @@ -12,6 +12,7 @@ package kvserver_test import ( "context" + "os" "path/filepath" "testing" "time" @@ -112,7 +113,7 @@ func TestReplicaGCQueueDropReplicaDirect(t *testing.T) { if dir == "" { t.Fatal("no sideloaded directory") } - if err := eng.MkdirAll(dir); err != nil { + if err := eng.MkdirAll(dir, os.ModePerm); err != nil { t.Fatal(err) } if err := fs.WriteFile(eng, filepath.Join(dir, "i1000000.t100000"), []byte("foo")); err != nil { diff --git a/pkg/kv/kvserver/consistency_queue_test.go b/pkg/kv/kvserver/consistency_queue_test.go index a7db3ab806ad..bd53ed544dc1 100644 --- a/pkg/kv/kvserver/consistency_queue_test.go +++ b/pkg/kv/kvserver/consistency_queue_test.go @@ -35,7 +35,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils" - "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -243,14 +242,6 @@ func TestCheckConsistencyInconsistent(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // TODO(pavelkalinnikov): not if we remove TestingSetRedactable below? - skip.UnderRaceWithIssue(t, 81819, "slow test, and TestingSetRedactable triggers race detector") - - // This test prints a consistency checker diff, so it's - // good to make sure we're overly redacting said diff. - // TODO(pavelkalinnikov): remove this since we don't print diffs anymore? - defer log.TestingSetRedactable(true)() - // Test expects simple MVCC value encoding. storage.DisableMetamorphicSimpleValueEncoding(t) @@ -347,12 +338,12 @@ func TestCheckConsistencyInconsistent(t *testing.T) { assert.Empty(t, onDiskCheckpointPaths(i)) } - // Write some arbitrary data only to store 1. Inconsistent key "e"! - store1 := tc.GetFirstStoreFromServer(t, 1) + // Write some arbitrary data only to store on n2. Inconsistent key "e"! + s2 := tc.GetFirstStoreFromServer(t, 1) var val roachpb.Value val.SetInt(42) // Put an inconsistent key "e" to s2, and have s1 and s3 still agree. - require.NoError(t, storage.MVCCPut(context.Background(), store1.TODOEngine(), nil, + require.NoError(t, storage.MVCCPut(context.Background(), s2.TODOEngine(), nil, roachpb.Key("e"), tc.Server(0).Clock().Now(), hlc.ClockTimestamp{}, val, nil)) // Run consistency check again, this time it should find something. @@ -368,15 +359,10 @@ func TestCheckConsistencyInconsistent(t *testing.T) { assert.Contains(t, resp.Result[0].Detail, `[minority]`) assert.Contains(t, resp.Result[0].Detail, `stats`) - // Checkpoints should have been created on all stores. - hashes := make([][]byte, numStores) + // Make sure that all the stores started creating a checkpoint. The metric + // measures the number of checkpoint directories, but a directory can + // represent an incomplete checkpoint that is still being populated. for i := 0; i < numStores; i++ { - cps := onDiskCheckpointPaths(i) - require.Len(t, cps, 1) - t.Logf("found a checkpoint at %s", cps[0]) - // The checkpoint must have been finalized. - require.False(t, strings.HasSuffix(cps[0], "_pending")) - metric := tc.GetFirstStoreFromServer(t, i).Metrics().RdbCheckpoints testutils.SucceedsSoon(t, func() error { if got, want := metric.Value(), int64(1); got != want { @@ -384,6 +370,21 @@ func TestCheckConsistencyInconsistent(t *testing.T) { } return nil }) + } + // As discussed in https://github.com/cockroachdb/cockroach/issues/81819, it + // is possible that the check completes while there are still checkpoints in + // flight. Waiting for the server termination makes sure that checkpoints are + // fully created. + tc.Stopper().Stop(context.Background()) + + // Checkpoints should have been created on all stores. + hashes := make([][]byte, numStores) + for i := 0; i < numStores; i++ { + cps := onDiskCheckpointPaths(i) + require.Len(t, cps, 1) + t.Logf("found a checkpoint at %s", cps[0]) + // The checkpoint must have been finalized. + require.False(t, strings.HasSuffix(cps[0], "_pending")) // Create a new store on top of checkpoint location inside existing in-mem // VFS to verify its contents. @@ -414,8 +415,8 @@ func TestCheckConsistencyInconsistent(t *testing.T) { assert.Equal(t, hashes[0], hashes[2]) // s1 and s3 agree assert.NotEqual(t, hashes[0], hashes[1]) // s2 diverged - // A death rattle should have been written on s2 (store index 1). - eng := store1.TODOEngine() + // A death rattle should have been written on s2. + eng := s2.TODOEngine() f, err := eng.Open(base.PreventedStartupFile(eng.GetAuxiliaryDir())) require.NoError(t, err) b, err := io.ReadAll(f) diff --git a/pkg/kv/kvserver/kvserverbase/BUILD.bazel b/pkg/kv/kvserver/kvserverbase/BUILD.bazel index f4a7f74ddcc7..1bb735f64459 100644 --- a/pkg/kv/kvserver/kvserverbase/BUILD.bazel +++ b/pkg/kv/kvserver/kvserverbase/BUILD.bazel @@ -21,7 +21,6 @@ go_library( "//pkg/roachpb", "//pkg/settings", "//pkg/settings/cluster", - "//pkg/storage/fs", "//pkg/util/errorutil", "//pkg/util/hlc", "//pkg/util/humanizeutil", @@ -30,6 +29,7 @@ go_library( "//pkg/util/timeutil", "//pkg/util/tracing/tracingpb", "@com_github_cockroachdb_errors//:errors", + "@com_github_cockroachdb_pebble//vfs", "@com_github_cockroachdb_redact//:redact", "@org_golang_x_time//rate", ], diff --git a/pkg/kv/kvserver/kvserverbase/syncing_write.go b/pkg/kv/kvserver/kvserverbase/syncing_write.go index d44e865394b9..6231252b9627 100644 --- a/pkg/kv/kvserver/kvserverbase/syncing_write.go +++ b/pkg/kv/kvserver/kvserverbase/syncing_write.go @@ -19,10 +19,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" - "github.com/cockroachdb/cockroach/pkg/storage/fs" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" + "github.com/cockroachdb/pebble/vfs" "golang.org/x/time/rate" ) @@ -82,7 +82,7 @@ func WriteFileSyncing( ctx context.Context, filename string, data []byte, - fs fs.FS, + fs vfs.FS, perm os.FileMode, settings *cluster.Settings, limiter *rate.Limiter, diff --git a/pkg/kv/kvserver/logstore/sideload_disk.go b/pkg/kv/kvserver/logstore/sideload_disk.go index 81496e0de454..e2bfc6765581 100644 --- a/pkg/kv/kvserver/logstore/sideload_disk.go +++ b/pkg/kv/kvserver/logstore/sideload_disk.go @@ -13,6 +13,7 @@ package logstore import ( "context" "fmt" + "os" "path/filepath" "strconv" "strings" @@ -75,7 +76,7 @@ func NewDiskSideloadStorage( } func (ss *DiskSideloadStorage) createDir() error { - err := ss.eng.MkdirAll(ss.dir) + err := ss.eng.MkdirAll(ss.dir, os.ModePerm) ss.dirCreated = ss.dirCreated || err == nil return err } diff --git a/pkg/kv/kvserver/replica_consistency.go b/pkg/kv/kvserver/replica_consistency.go index d4f4c9ad103b..99786884b2d4 100644 --- a/pkg/kv/kvserver/replica_consistency.go +++ b/pkg/kv/kvserver/replica_consistency.go @@ -15,6 +15,7 @@ import ( "crypto/sha512" "encoding/binary" "fmt" + "os" "sync" "time" @@ -741,7 +742,7 @@ func (r *Replica) computeChecksumPostApply( // certain of completing the check. Since we're already in a goroutine // that's about to end, just sleep for a few seconds and then terminate. auxDir := r.store.TODOEngine().GetAuxiliaryDir() - _ = r.store.TODOEngine().MkdirAll(auxDir) + _ = r.store.TODOEngine().MkdirAll(auxDir, os.ModePerm) path := base.PreventedStartupFile(auxDir) const attentionFmt = `ATTENTION: diff --git a/pkg/kv/kvserver/replica_corruption.go b/pkg/kv/kvserver/replica_corruption.go index c1d500bae05a..68fc59145fd4 100644 --- a/pkg/kv/kvserver/replica_corruption.go +++ b/pkg/kv/kvserver/replica_corruption.go @@ -13,6 +13,7 @@ package kvserver import ( "context" "fmt" + "os" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" @@ -50,7 +51,7 @@ func (r *Replica) setCorruptRaftMuLocked( r.mu.destroyStatus.Set(cErr, destroyReasonRemoved) auxDir := r.store.TODOEngine().GetAuxiliaryDir() - _ = r.store.TODOEngine().MkdirAll(auxDir) + _ = r.store.TODOEngine().MkdirAll(auxDir, os.ModePerm) path := base.PreventedStartupFile(auxDir) preventStartupMsg := fmt.Sprintf(`ATTENTION: diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go index 5f8548109c45..d1496ff42d05 100644 --- a/pkg/kv/kvserver/replica_proposal.go +++ b/pkg/kv/kvserver/replica_proposal.go @@ -12,6 +12,7 @@ package kvserver import ( "context" + "os" "path/filepath" "time" "unsafe" @@ -561,7 +562,7 @@ func addSSTablePreApply( // TODO(tschottdorf): remove this once sideloaded storage guarantees its // existence. - if err := eng.MkdirAll(filepath.Dir(ingestPath)); err != nil { + if err := eng.MkdirAll(filepath.Dir(ingestPath), os.ModePerm); err != nil { panic(err) } if _, err := eng.Stat(ingestPath); err == nil { diff --git a/pkg/kv/kvserver/replica_sst_snapshot_storage.go b/pkg/kv/kvserver/replica_sst_snapshot_storage.go index b53a0cbf6eb3..d7ce8c6cc12f 100644 --- a/pkg/kv/kvserver/replica_sst_snapshot_storage.go +++ b/pkg/kv/kvserver/replica_sst_snapshot_storage.go @@ -13,6 +13,7 @@ package kvserver import ( "context" "fmt" + "os" "path/filepath" "strconv" @@ -24,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/objstorage" + "github.com/cockroachdb/pebble/vfs" "golang.org/x/time/rate" ) @@ -113,7 +115,7 @@ func (s *SSTSnapshotStorageScratch) filename(id int) string { } func (s *SSTSnapshotStorageScratch) createDir() error { - err := s.storage.engine.MkdirAll(s.snapDir) + err := s.storage.engine.MkdirAll(s.snapDir, os.ModePerm) s.dirCreated = s.dirCreated || err == nil return err } @@ -182,7 +184,7 @@ func (s *SSTSnapshotStorageScratch) Close() error { type SSTSnapshotStorageFile struct { scratch *SSTSnapshotStorageScratch created bool - file fs.File + file vfs.File filename string ctx context.Context bytesPerSync int64 @@ -207,7 +209,7 @@ func (f *SSTSnapshotStorageFile) ensureFile() error { } var err error if f.bytesPerSync > 0 { - f.file, err = f.scratch.storage.engine.CreateWithSync(f.filename, int(f.bytesPerSync)) + f.file, err = fs.CreateWithSync(f.scratch.storage.engine, f.filename, int(f.bytesPerSync)) } else { f.file, err = f.scratch.storage.engine.Create(f.filename) } diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 35622c3285e3..21e89bb7551b 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -15,6 +15,7 @@ import ( "context" "fmt" "math" + "os" "path/filepath" "runtime" "sort" @@ -3083,7 +3084,7 @@ func (s *Store) checkpointSpans(desc *roachpb.RangeDescriptor) []roachpb.Span { // the provided key spans. If spans is empty, it includes the entire store. func (s *Store) checkpoint(tag string, spans []roachpb.Span) (string, error) { checkpointBase := s.checkpointsDir() - _ = s.TODOEngine().MkdirAll(checkpointBase) + _ = s.TODOEngine().MkdirAll(checkpointBase, os.ModePerm) // Create the checkpoint in a "pending" directory first. If we fail midway, it // should be clear that the directory contains an incomplete checkpoint. pendingDir := filepath.Join(checkpointBase, tag+"_pending") diff --git a/pkg/sql/colcontainer/BUILD.bazel b/pkg/sql/colcontainer/BUILD.bazel index 193404fb093d..767aa256c692 100644 --- a/pkg/sql/colcontainer/BUILD.bazel +++ b/pkg/sql/colcontainer/BUILD.bazel @@ -20,6 +20,7 @@ go_library( "//pkg/util/mon", "//pkg/util/uuid", "@com_github_cockroachdb_errors//:errors", + "@com_github_cockroachdb_pebble//vfs", "@com_github_golang_snappy//:snappy", "@com_github_marusama_semaphore//:semaphore", ], @@ -46,7 +47,6 @@ go_test( "//pkg/sql/pgwire/pgcode", "//pkg/sql/pgwire/pgerror", "//pkg/sql/types", - "//pkg/storage/fs", "//pkg/testutils/colcontainerutils", "//pkg/testutils/skip", "//pkg/util/humanizeutil", @@ -54,6 +54,7 @@ go_test( "//pkg/util/log", "//pkg/util/mon", "//pkg/util/randutil", + "@com_github_cockroachdb_pebble//vfs", "@com_github_marusama_semaphore//:semaphore", "@com_github_stretchr_testify//require", ], diff --git a/pkg/sql/colcontainer/diskqueue.go b/pkg/sql/colcontainer/diskqueue.go index 5e0a000f062b..6170b45754c8 100644 --- a/pkg/sql/colcontainer/diskqueue.go +++ b/pkg/sql/colcontainer/diskqueue.go @@ -14,6 +14,7 @@ import ( "bytes" "context" "io" + "os" "path/filepath" "strconv" @@ -26,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" + "github.com/cockroachdb/pebble/vfs" "github.com/golang/snappy" ) @@ -181,7 +183,7 @@ type diskQueue struct { // written before a compress and flush. writeBufferLimit int writeFileIdx int - writeFile fs.File + writeFile vfs.File deserializerState struct { *colserde.FileDeserializer curBatch int @@ -189,7 +191,7 @@ type diskQueue struct { // readFileIdx is an index into the current file in files the deserializer is // reading from. readFileIdx int - readFile fs.File + readFile vfs.File scratchDecompressedReadBytes []byte diskAcc *mon.BoundAccount @@ -298,7 +300,7 @@ func GetPatherFunc(f func(ctx context.Context) string) GetPather { // DiskQueueCfg is a struct holding the configuration options for a DiskQueue. type DiskQueueCfg struct { // FS is the filesystem interface to use. - FS fs.FS + FS vfs.FS // GetPather returns where the temporary directory that will contain this // DiskQueue's files has been created. The directory name will be a UUID. // Note that the directory is created lazily on the first call to GetPath. @@ -411,7 +413,7 @@ func newDiskQueue( if d.cfg.CacheMode != DiskQueueCacheModeIntertwinedCalls { d.writeBufferLimit = d.cfg.BufferSizeBytes / 2 } - if err := cfg.FS.MkdirAll(filepath.Join(cfg.GetPather.GetPath(ctx), d.dirName)); err != nil { + if err := cfg.FS.MkdirAll(filepath.Join(cfg.GetPather.GetPath(ctx), d.dirName), os.ModePerm); err != nil { return nil, err } // rotateFile will create a new file to write to. @@ -492,7 +494,7 @@ func (d *diskQueue) Close(ctx context.Context) error { // to write to. func (d *diskQueue) rotateFile(ctx context.Context) error { fName := filepath.Join(d.cfg.GetPather.GetPath(ctx), d.dirName, strconv.Itoa(d.seqNo)) - f, err := d.cfg.FS.CreateWithSync(fName, bytesPerSync) + f, err := fs.CreateWithSync(d.cfg.FS, fName, bytesPerSync) if err != nil { return err } @@ -527,7 +529,7 @@ func (d *diskQueue) rotateFile(ctx context.Context) error { return nil } -func (d *diskQueue) resetWriters(f fs.File) error { +func (d *diskQueue) resetWriters(f vfs.File) error { d.writer.reset(f) return d.serializer.Reset(d.writer) } diff --git a/pkg/sql/colcontainer/partitionedqueue_test.go b/pkg/sql/colcontainer/partitionedqueue_test.go index 8538a931ab80..3efbe2d9d6de 100644 --- a/pkg/sql/colcontainer/partitionedqueue_test.go +++ b/pkg/sql/colcontainer/partitionedqueue_test.go @@ -19,16 +19,16 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/colcontainer" "github.com/cockroachdb/cockroach/pkg/sql/colexecop" "github.com/cockroachdb/cockroach/pkg/sql/types" - "github.com/cockroachdb/cockroach/pkg/storage/fs" "github.com/cockroachdb/cockroach/pkg/testutils/colcontainerutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/randutil" + "github.com/cockroachdb/pebble/vfs" "github.com/marusama/semaphore" "github.com/stretchr/testify/require" ) type fdCountingFSFile struct { - fs.File + vfs.File onCloseCb func() } @@ -41,7 +41,7 @@ func (f *fdCountingFSFile) Close() error { } type fdCountingFS struct { - fs.FS + vfs.FS writeFDs int readFDs int } @@ -58,7 +58,7 @@ func (f *fdCountingFS) assertOpenFDs( require.Equal(t, expectedReadFDs, f.readFDs) } -func (f *fdCountingFS) Create(name string) (fs.File, error) { +func (f *fdCountingFS) Create(name string) (vfs.File, error) { file, err := f.FS.Create(name) if err != nil { return nil, err @@ -67,16 +67,7 @@ func (f *fdCountingFS) Create(name string) (fs.File, error) { return &fdCountingFSFile{File: file, onCloseCb: func() { f.writeFDs-- }}, nil } -func (f *fdCountingFS) CreateWithSync(name string, bytesPerSync int) (fs.File, error) { - file, err := f.FS.CreateWithSync(name, bytesPerSync) - if err != nil { - return nil, err - } - f.writeFDs++ - return &fdCountingFSFile{File: file, onCloseCb: func() { f.writeFDs-- }}, nil -} - -func (f *fdCountingFS) Open(name string) (fs.File, error) { +func (f *fdCountingFS) Open(name string, opts ...vfs.OpenOption) (vfs.File, error) { file, err := f.FS.Open(name) if err != nil { return nil, err diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go index 687cebf6753e..d3efa84ab050 100644 --- a/pkg/sql/colflow/vectorized_flow.go +++ b/pkg/sql/colflow/vectorized_flow.go @@ -12,6 +12,7 @@ package colflow import ( "context" + "os" "path/filepath" "sync" "sync/atomic" @@ -343,7 +344,7 @@ func (f *vectorizedFlow) GetPath(ctx context.Context) string { tempDirName := f.GetID().String() f.tempStorage.path = filepath.Join(f.Cfg.TempStoragePath, tempDirName) log.VEventf(ctx, 1, "flow %s spilled to disk, stack trace: %s", f.ID, util.GetSmallTrace(2)) - if err := f.Cfg.TempFS.MkdirAll(f.tempStorage.path); err != nil { + if err := f.Cfg.TempFS.MkdirAll(f.tempStorage.path, os.ModePerm); err != nil { colexecerror.InternalError(errors.Wrap(err, "unable to create temporary storage directory")) } // We have just created the temporary directory which will be used for all diff --git a/pkg/sql/colflow/vectorized_flow_test.go b/pkg/sql/colflow/vectorized_flow_test.go index 47dfc4d4af73..534da28773a7 100644 --- a/pkg/sql/colflow/vectorized_flow_test.go +++ b/pkg/sql/colflow/vectorized_flow_test.go @@ -12,6 +12,7 @@ package colflow import ( "context" + "os" "path/filepath" "sync" "testing" @@ -364,12 +365,12 @@ func TestVectorizedFlowTempDirectory(t *testing.T) { errCh := make(chan error) go func() { createTempDir(ctx) - errCh <- ngn.MkdirAll(filepath.Join(vf.GetPath(ctx), "async")) + errCh <- ngn.MkdirAll(filepath.Join(vf.GetPath(ctx), "async"), os.ModePerm) }() createTempDir(ctx) // Both goroutines should be able to create their subdirectories within the // flow's temporary directory. - require.NoError(t, ngn.MkdirAll(filepath.Join(vf.GetPath(ctx), "main_goroutine"))) + require.NoError(t, ngn.MkdirAll(filepath.Join(vf.GetPath(ctx), "main_goroutine"), os.ModePerm)) require.NoError(t, <-errCh) vf.Cleanup(ctx) checkDirs(t, 0) diff --git a/pkg/sql/execinfra/BUILD.bazel b/pkg/sql/execinfra/BUILD.bazel index 37292614e0fa..6e318108c23e 100644 --- a/pkg/sql/execinfra/BUILD.bazel +++ b/pkg/sql/execinfra/BUILD.bazel @@ -60,7 +60,6 @@ go_library( "//pkg/sql/sqlliveness", "//pkg/sql/stats", "//pkg/sql/types", - "//pkg/storage/fs", "//pkg/util/admission", "//pkg/util/buildutil", "//pkg/util/intsets", @@ -77,6 +76,7 @@ go_library( "//pkg/util/tracing/tracingpb", "//pkg/util/uuid", "@com_github_cockroachdb_errors//:errors", + "@com_github_cockroachdb_pebble//vfs", "@com_github_cockroachdb_redact//:redact", "@com_github_marusama_semaphore//:semaphore", "@io_opentelemetry_go_otel//attribute", diff --git a/pkg/sql/execinfra/server_config.go b/pkg/sql/execinfra/server_config.go index 4f907f7da134..e33fd9f3533a 100644 --- a/pkg/sql/execinfra/server_config.go +++ b/pkg/sql/execinfra/server_config.go @@ -37,7 +37,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" "github.com/cockroachdb/cockroach/pkg/sql/stats" - "github.com/cockroachdb/cockroach/pkg/storage/fs" "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/limit" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -45,6 +44,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" + "github.com/cockroachdb/pebble/vfs" "github.com/marusama/semaphore" ) @@ -95,7 +95,7 @@ type ServerConfig struct { // TempFS is used by the vectorized execution engine to store columns when the // working set is larger than can be stored in memory. - TempFS fs.FS + TempFS vfs.FS // VecFDSemaphore is a weighted semaphore that restricts the number of open // file descriptors in the vectorized engine. diff --git a/pkg/sql/logictest/testdata/logic_test/tsvector b/pkg/sql/logictest/testdata/logic_test/tsvector index 33d8a851d593..14c17947e0ad 100644 --- a/pkg/sql/logictest/testdata/logic_test/tsvector +++ b/pkg/sql/logictest/testdata/logic_test/tsvector @@ -298,3 +298,39 @@ query T SELECT to_tsvector('Hello I am a potato') ---- 'am':3 'hell':1 'i':2 'potat':5 + +query TT +SELECT to_tsvector('english', ''), to_tsvector('english', 'and the') +---- +· · + +statement error doesn't contain lexemes +SELECT to_tsquery('english', 'the') + +statement ok +CREATE TABLE sentences (sentence text, v TSVECTOR AS (to_tsvector('english', sentence)) STORED, INVERTED INDEX (v)); +INSERT INTO sentences VALUES + ('Future users of large data banks must be protected from having to know how the data is organized in the machine (the internal representation).'), + ('A prompting service which supplies such information is not a satisfactory solution.'), + ('Activities of users at terminals and most application programs should remain unaffected when the internal representation of data is changed and even when some aspects of the external representation + are changed.'), + ('Changes in data representation will often be needed as a result of changes in query, update, and report traffic and natural growth in the types of stored information.'), + ('Existing noninferential, formatted data systems provide users with tree-structured files or slightly more general network models of the data.'), + ('In Section 1, inadequacies of these models are discussed.'), + ('A model based on n-ary relations, a normal form for data base relations, and the concept of a universal data sublanguage are introduced.'), + ('In Section 2, certain operations on relations (other than logical inference) are discussed and applied to the problems of redundancy and consistency in the user’s model.') + +query FFFFT +SELECT +ts_rank(v, query) AS rank, +ts_rank(ARRAY[0.2, 0.3, 0.5, 0.9]:::FLOAT[], v, query) AS wrank, +ts_rank(v, query, 2|8) AS nrank, +ts_rank(ARRAY[0.3, 0.4, 0.6, 0.95]:::FLOAT[], v, query, 1|2|4|8|16|32) AS wnrank, +v +FROM sentences, to_tsquery('english', 'relation') query +WHERE query @@ v +ORDER BY rank DESC +LIMIT 10 +---- +0.075990885 0.15198177 0.00042217158 8.555783e-05 'ari':6 'base':3,13 'concept':17 'data':12,21 'form':10 'introduc':24 'model':2 'n':5 'normal':9 'relat':7,14 'sublanguag':22 'univers':20 +0.06079271 0.12158542 0.0003101669 6.095758e-05 '2':3 'appli':15 'certain':4 'consist':22 'discuss':13 'infer':11 'logic':10 'model':27 'oper':5 'problem':18 'redund':20 'relat':7 'section':2 'user':25 diff --git a/pkg/sql/sem/builtins/builtins.go b/pkg/sql/sem/builtins/builtins.go index d3100ea03f49..34b2a1094b6b 100644 --- a/pkg/sql/sem/builtins/builtins.go +++ b/pkg/sql/sem/builtins/builtins.go @@ -3765,7 +3765,6 @@ value if you rely on the HLC for accuracy.`, "jsonb_to_tsvector": makeBuiltin(tree.FunctionProperties{UnsupportedWithIssue: 7821, Category: builtinconstants.CategoryFullTextSearch}), "ts_delete": makeBuiltin(tree.FunctionProperties{UnsupportedWithIssue: 7821, Category: builtinconstants.CategoryFullTextSearch}), "ts_filter": makeBuiltin(tree.FunctionProperties{UnsupportedWithIssue: 7821, Category: builtinconstants.CategoryFullTextSearch}), - "ts_rank": makeBuiltin(tree.FunctionProperties{UnsupportedWithIssue: 7821, Category: builtinconstants.CategoryFullTextSearch}), "ts_rank_cd": makeBuiltin(tree.FunctionProperties{UnsupportedWithIssue: 7821, Category: builtinconstants.CategoryFullTextSearch}), "ts_rewrite": makeBuiltin(tree.FunctionProperties{UnsupportedWithIssue: 7821, Category: builtinconstants.CategoryFullTextSearch}), "tsquery_phrase": makeBuiltin(tree.FunctionProperties{UnsupportedWithIssue: 7821, Category: builtinconstants.CategoryFullTextSearch}), diff --git a/pkg/sql/sem/builtins/fixed_oids.go b/pkg/sql/sem/builtins/fixed_oids.go index 435b16d75e9c..680d96e4a513 100644 --- a/pkg/sql/sem/builtins/fixed_oids.go +++ b/pkg/sql/sem/builtins/fixed_oids.go @@ -2373,6 +2373,10 @@ var builtinOidsArray = []string{ 2399: `to_tsvector(text: string) -> tsvector`, 2400: `phraseto_tsquery(text: string) -> tsquery`, 2401: `plainto_tsquery(text: string) -> tsquery`, + 2402: `ts_rank(weights: float[], vector: tsvector, query: tsquery, normalization: int) -> float4`, + 2403: `ts_rank(vector: tsvector, query: tsquery, normalization: int) -> float4`, + 2404: `ts_rank(vector: tsvector, query: tsquery) -> float4`, + 2405: `ts_rank(weights: float[], vector: tsvector, query: tsquery) -> float4`, } var builtinOidsBySignature map[string]oid.Oid diff --git a/pkg/sql/sem/builtins/tsearch_builtins.go b/pkg/sql/sem/builtins/tsearch_builtins.go index e5e3a4633c79..627f1897b9b3 100644 --- a/pkg/sql/sem/builtins/tsearch_builtins.go +++ b/pkg/sql/sem/builtins/tsearch_builtins.go @@ -231,4 +231,117 @@ var tsearchBuiltins = map[string]builtinDefinition{ Volatility: volatility.Stable, }, ), + "ts_rank": makeBuiltin( + tree.FunctionProperties{}, + tree.Overload{ + Types: tree.ParamTypes{ + {Name: "weights", Typ: types.FloatArray}, + {Name: "vector", Typ: types.TSVector}, + {Name: "query", Typ: types.TSQuery}, + {Name: "normalization", Typ: types.Int}, + }, + ReturnType: tree.FixedReturnType(types.Float4), + Fn: func(_ context.Context, evalCtx *eval.Context, args tree.Datums) (tree.Datum, error) { + weights, err := getWeights(tree.MustBeDArray(args[0])) + if err != nil { + return nil, err + } + rank, err := tsearch.Rank( + weights, + tree.MustBeDTSVector(args[1]).TSVector, + tree.MustBeDTSQuery(args[2]).TSQuery, + int(tree.MustBeDInt(args[3])), + ) + if err != nil { + return nil, err + } + return tree.NewDFloat(tree.DFloat(rank)), nil + }, + Info: "Ranks vectors based on the frequency of their matching lexemes.", + Volatility: volatility.Immutable, + }, + tree.Overload{ + Types: tree.ParamTypes{ + {Name: "weights", Typ: types.FloatArray}, + {Name: "vector", Typ: types.TSVector}, + {Name: "query", Typ: types.TSQuery}, + }, + ReturnType: tree.FixedReturnType(types.Float4), + Fn: func(_ context.Context, evalCtx *eval.Context, args tree.Datums) (tree.Datum, error) { + weights, err := getWeights(tree.MustBeDArray(args[0])) + if err != nil { + return nil, err + } + rank, err := tsearch.Rank( + weights, + tree.MustBeDTSVector(args[1]).TSVector, + tree.MustBeDTSQuery(args[2]).TSQuery, + 0, + ) + if err != nil { + return nil, err + } + return tree.NewDFloat(tree.DFloat(rank)), nil + }, + Info: "Ranks vectors based on the frequency of their matching lexemes.", + Volatility: volatility.Immutable, + }, + tree.Overload{ + Types: tree.ParamTypes{ + {Name: "vector", Typ: types.TSVector}, + {Name: "query", Typ: types.TSQuery}, + {Name: "normalization", Typ: types.Int}, + }, + ReturnType: tree.FixedReturnType(types.Float4), + Fn: func(_ context.Context, evalCtx *eval.Context, args tree.Datums) (tree.Datum, error) { + rank, err := tsearch.Rank( + nil, /* weights */ + tree.MustBeDTSVector(args[0]).TSVector, + tree.MustBeDTSQuery(args[1]).TSQuery, + int(tree.MustBeDInt(args[2])), + ) + if err != nil { + return nil, err + } + return tree.NewDFloat(tree.DFloat(rank)), nil + }, + Info: "Ranks vectors based on the frequency of their matching lexemes.", + Volatility: volatility.Immutable, + }, + tree.Overload{ + Types: tree.ParamTypes{ + {Name: "vector", Typ: types.TSVector}, + {Name: "query", Typ: types.TSQuery}, + }, + ReturnType: tree.FixedReturnType(types.Float4), + Fn: func(_ context.Context, evalCtx *eval.Context, args tree.Datums) (tree.Datum, error) { + rank, err := tsearch.Rank( + nil, /* weights */ + tree.MustBeDTSVector(args[0]).TSVector, + tree.MustBeDTSQuery(args[1]).TSQuery, + 0, /* method */ + ) + if err != nil { + return nil, err + } + return tree.NewDFloat(tree.DFloat(rank)), nil + }, + Info: "Ranks vectors based on the frequency of their matching lexemes.", + Volatility: volatility.Immutable, + }, + ), +} + +func getWeights(arr *tree.DArray) ([]float32, error) { + ret := make([]float32, 4) + if arr.Len() < len(ret) { + return ret, pgerror.New(pgcode.ArraySubscript, "array of weight is too short (must be at least 4)") + } + for i, d := range arr.Array { + if d == tree.DNull { + return ret, pgerror.New(pgcode.NullValueNotAllowed, "array of weight must not contain null") + } + ret[i] = float32(tree.MustBeDFloat(d)) + } + return ret, nil } diff --git a/pkg/storage/bench_test.go b/pkg/storage/bench_test.go index d747464f79b6..496563bc3020 100644 --- a/pkg/storage/bench_test.go +++ b/pkg/storage/bench_test.go @@ -1927,7 +1927,7 @@ func BenchmarkMVCCScannerWithIntentsAndVersions(b *testing.B) { return cmp < 0 }) sstFileName := fmt.Sprintf("tmp-ingest-%d", i) - sstFile, err := eng.fs.Create(sstFileName) + sstFile, err := eng.Create(sstFileName) require.NoError(b, err) // No improvement with v3 since the multiple versions are in different // files. diff --git a/pkg/storage/engine.go b/pkg/storage/engine.go index d4d621a2d0df..08e5f4af6c90 100644 --- a/pkg/storage/engine.go +++ b/pkg/storage/engine.go @@ -20,7 +20,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" - "github.com/cockroachdb/cockroach/pkg/storage/fs" "github.com/cockroachdb/cockroach/pkg/storage/pebbleiter" "github.com/cockroachdb/cockroach/pkg/util/envutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -31,6 +30,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble" + "github.com/cockroachdb/pebble/vfs" prometheusgo "github.com/prometheus/client_model/go" ) @@ -941,7 +941,7 @@ type Engine interface { // be invoked while holding mutexes). RegisterFlushCompletedCallback(cb func()) // Filesystem functionality. - fs.FS + vfs.FS // CreateCheckpoint creates a checkpoint of the engine in the given directory, // which must not exist. The directory should be on the same file system so // that hard links can be used. If spans is not empty, the checkpoint excludes diff --git a/pkg/storage/engine_test.go b/pkg/storage/engine_test.go index c53cb0470700..9e2de5259227 100644 --- a/pkg/storage/engine_test.go +++ b/pkg/storage/engine_test.go @@ -18,6 +18,7 @@ import ( "io" "math" "math/rand" + "os" "path/filepath" "reflect" "sort" @@ -40,6 +41,7 @@ import ( "github.com/cockroachdb/errors" "github.com/cockroachdb/errors/oserror" "github.com/cockroachdb/pebble" + "github.com/cockroachdb/pebble/vfs" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" @@ -1056,12 +1058,6 @@ func TestIngestDelayLimit(t *testing.T) { } } -type stringSorter []string - -func (s stringSorter) Len() int { return len(s) } -func (s stringSorter) Swap(i int, j int) { s[i], s[j] = s[j], s[i] } -func (s stringSorter) Less(i int, j int) bool { return strings.Compare(s[i], s[j]) < 0 } - func TestEngineFS(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -1114,7 +1110,7 @@ func TestEngineFS(t *testing.T) { "9h: delete /dir1", } - var f fs.File + var f vfs.File for _, tc := range testCases { s := strings.Split(tc, " ")[1:] @@ -1132,14 +1128,14 @@ func TestEngineFS(t *testing.T) { } var ( - g fs.File + g vfs.File err error ) switch s[0] { case "create": g, err = e.Create(s[1]) case "create-with-sync": - g, err = e.CreateWithSync(s[1], 1) + g, err = fs.CreateWithSync(e, s[1], 1) case "link": err = e.Link(s[1], s[2]) case "open": @@ -1151,13 +1147,13 @@ func TestEngineFS(t *testing.T) { case "rename": err = e.Rename(s[1], s[2]) case "create-dir": - err = e.MkdirAll(s[1]) + err = e.MkdirAll(s[1], os.ModePerm) case "list-dir": result, err := e.List(s[1]) if err != nil { break } - sort.Sort(stringSorter(result)) + sort.Strings(result) got := strings.Join(result, ",") want := s[3] if got != want { @@ -1304,6 +1300,7 @@ func TestFS(t *testing.T) { t.Helper() got, err := fs.List(dir) + sort.Strings(got) require.NoError(t, err) if !reflect.DeepEqual(got, want) { t.Fatalf("fs.List(%q) = %#v, want %#v", dir, got, want) @@ -1311,7 +1308,7 @@ func TestFS(t *testing.T) { } // Create a/ and assert that it's empty. - require.NoError(t, fs.MkdirAll(path("a"))) + require.NoError(t, fs.MkdirAll(path("a"), os.ModePerm)) expectLS(path("a"), []string{}) if _, err := fs.Stat(path("a/b/c")); !oserror.IsNotExist(err) { t.Fatal(`fs.Stat("a/b/c") should not exist`) @@ -1319,8 +1316,8 @@ func TestFS(t *testing.T) { // Create a/b/ and a/b/c/ in a single MkdirAll call. // Then ensure that a duplicate call returns a nil error. - require.NoError(t, fs.MkdirAll(path("a/b/c"))) - require.NoError(t, fs.MkdirAll(path("a/b/c"))) + require.NoError(t, fs.MkdirAll(path("a/b/c"), os.ModePerm)) + require.NoError(t, fs.MkdirAll(path("a/b/c"), os.ModePerm)) expectLS(path("a"), []string{"b"}) expectLS(path("a/b"), []string{"c"}) expectLS(path("a/b/c"), []string{}) diff --git a/pkg/storage/fs/fs.go b/pkg/storage/fs/fs.go index 3d626c2215e4..a64ad6ed54e7 100644 --- a/pkg/storage/fs/fs.go +++ b/pkg/storage/fs/fs.go @@ -12,66 +12,23 @@ package fs import ( "io" - "os" -) - -// File and FS are a partial attempt at offering the Pebble vfs.FS interface. Given the constraints -// of the RocksDB Env interface we've chosen to only include what is easy to implement. Additionally, -// it does not try to subsume all the file related functionality already in the Engine interface. -// It seems preferable to do a final cleanup only when the implementation can simply use Pebble's -// implementation of vfs.FS. At that point the following interface will become a superset of vfs.FS. -type File interface { - io.ReadWriteCloser - io.ReaderAt - Sync() error -} - -// FS provides a filesystem interface. -type FS interface { - // Create creates the named file for writing, removing the file at - // the provided path if one already exists. - Create(name string) (File, error) - - // CreateWithSync is similar to Create, but the file is periodically - // synced whenever more than bytesPerSync bytes accumulate. This syncing - // does not provide any persistency guarantees, but can prevent latency - // spikes. - CreateWithSync(name string, bytesPerSync int) (File, error) - - // Link creates newname as a hard link to the oldname file. - Link(oldname, newname string) error - - // Open opens the named file for reading. - Open(name string) (File, error) - - // OpenDir opens the named directory for syncing. - OpenDir(name string) (File, error) - // Remove removes the named file. If the file with given name doesn't - // exist, return an error that returns true from oserror.IsNotExist(). - Remove(name string) error - - // Rename renames a file. It overwrites the file at newname if one exists, - // the same as os.Rename. - Rename(oldname, newname string) error - - // MkdirAll creates the named dir and its parents. Does nothing if the - // directory already exists. - MkdirAll(name string) error - - // RemoveAll deletes the path and any children it contains. - RemoveAll(dir string) error - - // List returns a listing of the given directory. The names returned are - // relative to the directory. - List(name string) ([]string, error) + "github.com/cockroachdb/pebble/vfs" +) - // Stat returns a FileInfo describing the named file. - Stat(name string) (os.FileInfo, error) +// CreateWithSync creates a file wrapped with logic to periodically sync +// whenever more than bytesPerSync bytes accumulate. This syncing does not +// provide any persistency guarantees, but can prevent latency spikes. +func CreateWithSync(fs vfs.FS, name string, bytesPerSync int) (vfs.File, error) { + f, err := fs.Create(name) + if err != nil { + return nil, err + } + return vfs.NewSyncingFile(f, vfs.SyncingFileOptions{BytesPerSync: bytesPerSync}), nil } // WriteFile writes data to a file named by filename. -func WriteFile(fs FS, filename string, data []byte) error { +func WriteFile(fs vfs.FS, filename string, data []byte) error { f, err := fs.Create(filename) if err != nil { return err @@ -84,7 +41,7 @@ func WriteFile(fs FS, filename string, data []byte) error { } // ReadFile reads data from a file named by filename. -func ReadFile(fs FS, filename string) ([]byte, error) { +func ReadFile(fs vfs.FS, filename string) ([]byte, error) { file, err := fs.Open(filename) if err != nil { return nil, err diff --git a/pkg/storage/pebble.go b/pkg/storage/pebble.go index 6218e5452513..3ada2534b01c 100644 --- a/pkg/storage/pebble.go +++ b/pkg/storage/pebble.go @@ -19,7 +19,6 @@ import ( "math" "os" "path/filepath" - "sort" "strconv" "strings" "sync" @@ -732,6 +731,8 @@ type EncryptionStatsHandler interface { // Pebble is a wrapper around a Pebble database instance. type Pebble struct { + vfs.FS + atomic struct { // compactionConcurrency is the current compaction concurrency set on // the Pebble store. The compactionConcurrency option in the Pebble @@ -769,7 +770,6 @@ type Pebble struct { sharedBytesWritten int64 // Relevant options copied over from pebble.Options. - fs vfs.FS unencryptedFS vfs.FS logCtx context.Context logger pebble.LoggerAndTracer @@ -1013,6 +1013,7 @@ func NewPebble(ctx context.Context, cfg PebbleConfig) (p *Pebble, err error) { storeProps := computeStoreProperties(ctx, cfg.Dir, opts.ReadOnly, encryptionEnv != nil /* encryptionEnabled */) p = &Pebble{ + FS: opts.FS, readOnly: opts.ReadOnly, path: cfg.Dir, auxDir: auxDir, @@ -1024,7 +1025,6 @@ func NewPebble(ctx context.Context, cfg PebbleConfig) (p *Pebble, err error) { settings: cfg.Settings, encryption: encryptionEnv, fileRegistry: fileRegistry, - fs: opts.FS, unencryptedFS: unencryptedFS, logger: opts.LoggerAndTracer, logCtx: logCtx, @@ -1813,7 +1813,7 @@ func (p *Pebble) GetEnvStats() (*EnvStats, error) { } stats.ActiveKeyFiles++ - filename := p.fs.PathBase(filePath) + filename := p.FS.PathBase(filePath) numStr := strings.TrimSuffix(filename, ".sst") if len(numStr) == len(filename) { continue // not a sstable @@ -1910,68 +1910,7 @@ func (p *Pebble) RegisterFlushCompletedCallback(cb func()) { p.mu.Unlock() } -// Remove implements the FS interface. -func (p *Pebble) Remove(filename string) error { - return p.fs.Remove(filename) -} - -// RemoveAll implements the Engine interface. -func (p *Pebble) RemoveAll(dir string) error { - return p.fs.RemoveAll(dir) -} - -// Link implements the FS interface. -func (p *Pebble) Link(oldname, newname string) error { - return p.fs.Link(oldname, newname) -} - -var _ fs.FS = &Pebble{} - -// Create implements the FS interface. -func (p *Pebble) Create(name string) (fs.File, error) { - return p.fs.Create(name) -} - -// CreateWithSync implements the FS interface. -func (p *Pebble) CreateWithSync(name string, bytesPerSync int) (fs.File, error) { - f, err := p.fs.Create(name) - if err != nil { - return nil, err - } - return vfs.NewSyncingFile(f, vfs.SyncingFileOptions{BytesPerSync: bytesPerSync}), nil -} - -// Open implements the FS interface. -func (p *Pebble) Open(name string) (fs.File, error) { - return p.fs.Open(name) -} - -// OpenDir implements the FS interface. -func (p *Pebble) OpenDir(name string) (fs.File, error) { - return p.fs.OpenDir(name) -} - -// Rename implements the FS interface. -func (p *Pebble) Rename(oldname, newname string) error { - return p.fs.Rename(oldname, newname) -} - -// MkdirAll implements the FS interface. -func (p *Pebble) MkdirAll(name string) error { - return p.fs.MkdirAll(name, 0755) -} - -// List implements the FS interface. -func (p *Pebble) List(name string) ([]string, error) { - dirents, err := p.fs.List(name) - sort.Strings(dirents) - return dirents, err -} - -// Stat implements the FS interface. -func (p *Pebble) Stat(name string) (os.FileInfo, error) { - return p.fs.Stat(name) -} +var _ vfs.FS = &Pebble{} func checkpointSpansNote(spans []roachpb.Span) []byte { note := "CRDB spans:\n" @@ -2005,7 +1944,7 @@ func (p *Pebble) CreateCheckpoint(dir string, spans []roachpb.Span) error { // TODO(#90543, cockroachdb/pebble#2285): move spans info to Pebble manifest. if len(spans) > 0 { if err := fs.SafeWriteToFile( - p.fs, dir, p.fs.PathJoin(dir, "checkpoint.txt"), + p.FS, dir, p.FS.PathJoin(dir, "checkpoint.txt"), checkpointSpansNote(spans), ); err != nil { return err diff --git a/pkg/storage/temp_engine.go b/pkg/storage/temp_engine.go index c9545c82dc8a..acc375d37892 100644 --- a/pkg/storage/temp_engine.go +++ b/pkg/storage/temp_engine.go @@ -16,16 +16,16 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/diskmap" - "github.com/cockroachdb/cockroach/pkg/storage/fs" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/pebble" + "github.com/cockroachdb/pebble/vfs" ) // NewTempEngine creates a new engine for DistSQL processors to use when // the working set is larger than can be stored in memory. func NewTempEngine( ctx context.Context, tempStorage base.TempStorageConfig, storeSpec base.StoreSpec, -) (diskmap.Factory, fs.FS, error) { +) (diskmap.Factory, vfs.FS, error) { return NewPebbleTempEngine(ctx, tempStorage, storeSpec) } @@ -58,13 +58,13 @@ func (r *pebbleTempEngine) NewSortedDiskMultiMap() diskmap.SortedDiskMap { // when the working set is larger than can be stored in memory. func NewPebbleTempEngine( ctx context.Context, tempStorage base.TempStorageConfig, storeSpec base.StoreSpec, -) (diskmap.Factory, fs.FS, error) { +) (diskmap.Factory, vfs.FS, error) { return newPebbleTempEngine(ctx, tempStorage, storeSpec) } func newPebbleTempEngine( ctx context.Context, tempStorage base.TempStorageConfig, storeSpec base.StoreSpec, -) (*pebbleTempEngine, fs.FS, error) { +) (*pebbleTempEngine, vfs.FS, error) { var loc Location var cacheSize int64 = 128 << 20 // 128 MiB, arbitrary, but not "too big" if tempStorage.InMemory { diff --git a/pkg/testutils/colcontainerutils/diskqueuecfg.go b/pkg/testutils/colcontainerutils/diskqueuecfg.go index 488032af484a..deaabb7afc72 100644 --- a/pkg/testutils/colcontainerutils/diskqueuecfg.go +++ b/pkg/testutils/colcontainerutils/diskqueuecfg.go @@ -12,6 +12,7 @@ package colcontainerutils import ( "context" + "os" "testing" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -54,7 +55,7 @@ func NewTestingDiskQueueCfg(t testing.TB, inMem bool) (colcontainer.DiskQueueCfg } if inMem { - if err := ngn.MkdirAll(inMemDirName); err != nil { + if err := ngn.MkdirAll(inMemDirName, os.ModePerm); err != nil { t.Fatal(err) } } diff --git a/pkg/util/tsearch/BUILD.bazel b/pkg/util/tsearch/BUILD.bazel index cc7cc235500e..01e25c3c2780 100644 --- a/pkg/util/tsearch/BUILD.bazel +++ b/pkg/util/tsearch/BUILD.bazel @@ -9,6 +9,7 @@ go_library( "eval.go", "lex.go", "random.go", + "rank.go", "snowball.go", "stopwords.go", "tsquery.go", @@ -63,6 +64,7 @@ go_test( srcs = [ "encoding_test.go", "eval_test.go", + "rank_test.go", "tsquery_test.go", "tsvector_test.go", ], diff --git a/pkg/util/tsearch/rank.go b/pkg/util/tsearch/rank.go new file mode 100644 index 000000000000..757f736af6ec --- /dev/null +++ b/pkg/util/tsearch/rank.go @@ -0,0 +1,286 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package tsearch + +import ( + "math" + "sort" + "strings" +) + +// defaultWeights is the default list of weights corresponding to the tsvector +// lexeme weights D, C, B, and A. +var defaultWeights = [4]float32{0.1, 0.2, 0.4, 1.0} + +// Bitmask for the normalization integer. These define different ranking +// behaviors. They're defined in Postgres in tsrank.c. +// 0, the default, ignores the document length. +// 1 devides the rank by 1 + the logarithm of the document length. +// 2 divides the rank by the document length. +// 4 divides the rank by the mean harmonic distance between extents. +// +// NOTE: This is only implemented by ts_rank_cd, which is currently not +// implemented by CockroachDB. This constant is left for consistency with +// the original PostgreSQL source code. +// +// 8 divides the rank by the number of unique words in document. +// 16 divides the rank by 1 + the logarithm of the number of unique words in document. +// 32 divides the rank by itself + 1. +type rankBehavior int + +const ( + // rankNoNorm is the default. It ignores the document length. + rankNoNorm rankBehavior = 0x0 + // rankNormLoglength divides the rank by 1 + the logarithm of the document length. + rankNormLoglength = 0x01 + // rankNormLength divides the rank by the document length. + rankNormLength = 0x02 + // rankNormExtdist divides the rank by the mean harmonic distance between extents. + // Note, this is only implemented by ts_rank_cd, which is not currently implemented + // by CockroachDB. The constant is kept for consistency with Postgres. + rankNormExtdist = 0x04 + // rankNormUniq divides the rank by the number of unique words in document. + rankNormUniq = 0x08 + // rankNormLoguniq divides the rank by 1 + the logarithm of the number of unique words in document. + rankNormLoguniq = 0x10 + // rankNormRdivrplus1 divides the rank by itself + 1. + rankNormRdivrplus1 = 0x20 +) + +// Defeat the unused linter. +var _ = rankNoNorm +var _ = rankNormExtdist + +// cntLen returns the count of represented lexemes in a tsvector, including +// the number of repeated lexemes in the vector. +func cntLen(v TSVector) int { + var ret int + for i := range v { + posLen := len(v[i].positions) + if posLen > 0 { + ret += posLen + } else { + ret += 1 + } + } + return ret +} + +// Rank implements the ts_rank functionality, which ranks a tsvector against a +// tsquery. The weights parameter is a list of weights corresponding to the +// tsvector lexeme weights D, C, B, and A. The method parameter is a bitmask +// defining different ranking behaviors, defined in the rankBehavior type +// above in this file. The default ranking behavior is 0, which doesn't perform +// any normalization based on the document length. +// +// N.B.: this function is directly translated from the calc_rank function in +// tsrank.c, which contains almost no comments. As of this time, I am unable +// to sufficiently explain how this ranker works, but I'm confident that the +// implementation is at least compatible with Postgres. +// https://github.com/postgres/postgres/blob/765f5df726918bcdcfd16bcc5418e48663d1dd59/src/backend/utils/adt/tsrank.c#L357 +func Rank(weights []float32, v TSVector, q TSQuery, method int) (float32, error) { + w := defaultWeights + if weights != nil { + copy(w[:4], weights[:4]) + } + if len(v) == 0 || q.root == nil { + return 0, nil + } + var res float32 + if q.root.op == and || q.root.op == followedby { + res = rankAnd(w, v, q) + } else { + res = rankOr(w, v, q) + } + if res < 0 { + // This constant is taken from the Postgres source code, unfortunately I + // don't understand its meaning. + res = 1e-20 + } + if method&rankNormLoglength > 0 { + res /= float32(math.Log(float64(cntLen(v)+1)) / math.Log(2.0)) + } + + if method&rankNormLength > 0 { + l := cntLen(v) + if l > 0 { + res /= float32(l) + } + } + // rankNormExtDist is not applicable - it's only used for ts_rank_cd. + + if method&rankNormUniq > 0 { + res /= float32(len(v)) + } + + if method&rankNormLoguniq > 0 { + res /= float32(math.Log(float64(len(v)+1)) / math.Log(2.0)) + } + + if method&rankNormRdivrplus1 > 0 { + res /= res + 1 + } + + return res, nil +} + +func sortAndDistinctQueryTerms(q TSQuery) []*tsNode { + // Extract all leaf nodes from the query tree. + leafNodes := make([]*tsNode, 0) + var extractTerms func(q *tsNode) + extractTerms = func(q *tsNode) { + if q == nil { + return + } + if q.op != invalid { + extractTerms(q.l) + extractTerms(q.r) + } else { + leafNodes = append(leafNodes, q) + } + } + extractTerms(q.root) + // Sort the terms. + sort.Slice(leafNodes, func(i, j int) bool { + return leafNodes[i].term.lexeme < leafNodes[j].term.lexeme + }) + // Then distinct: (wouldn't it be nice if Go had generics?) + lastUniqueIdx := 0 + for j := 1; j < len(leafNodes); j++ { + if leafNodes[j].term.lexeme != leafNodes[lastUniqueIdx].term.lexeme { + // We found a unique entry, at index i. The last unique entry in the array + // was at lastUniqueIdx, so set the entry after that one to our new unique + // entry, and bump lastUniqueIdx for the next loop iteration. + lastUniqueIdx++ + leafNodes[lastUniqueIdx] = leafNodes[j] + } + } + leafNodes = leafNodes[:lastUniqueIdx+1] + return leafNodes +} + +// findRankMatches finds all matches for a given query term in a tsvector, +// regardless of the expected query weight. +// query is the term being matched. v is the tsvector being searched. +// matches is a slice of matches to append to, to save on allocations as this +// function is called in a loop. +func findRankMatches(query *tsNode, v TSVector, matches [][]tsPosition) [][]tsPosition { + target := query.term.lexeme + i := sort.Search(len(v), func(i int) bool { + return v[i].lexeme >= target + }) + if i >= len(v) { + return matches + } + if query.term.isPrefixMatch() { + for j := i; j < len(v); j++ { + t := v[j] + if !strings.HasPrefix(t.lexeme, target) { + break + } + matches = append(matches, t.positions) + } + } else if v[i].lexeme == target { + matches = append(matches, v[i].positions) + } + return matches +} + +// rankOr computes the rank for a query with an OR operator at its root. +// It takes the same parameters as TSRank. +func rankOr(weights [4]float32, v TSVector, q TSQuery) float32 { + queryLeaves := sortAndDistinctQueryTerms(q) + var matches = make([][]tsPosition, 0) + var res float32 + for i := range queryLeaves { + matches = matches[:0] + matches = findRankMatches(queryLeaves[i], v, matches) + if len(matches) == 0 { + continue + } + resj := float32(0.0) + wjm := float32(-1.0) + jm := 0 + for _, innerMatches := range matches { + for j, pos := range innerMatches { + termWeight := pos.weight.val() + weight := weights[termWeight] + resj = resj + weight/float32((j+1)*(j+1)) + if weight > wjm { + wjm = weight + jm = j + } + } + } + // Explanation from Postgres tsrank.c: + // limit (sum(1/i^2),i=1,inf) = pi^2/6 + // resj = sum(wi/i^2),i=1,noccurence, + // wi - should be sorted desc, + // don't sort for now, just choose maximum weight. This should be corrected + // Oleg Bartunov + res = res + (wjm+resj-wjm/float32((jm+1)*(jm+1)))/1.64493406685 + } + if len(queryLeaves) > 0 { + res /= float32(len(queryLeaves)) + } + return res +} + +// rankAnd computes the rank for a query with an AND or followed-by operator at +// its root. It takes the same parameters as TSRank. +func rankAnd(weights [4]float32, v TSVector, q TSQuery) float32 { + queryLeaves := sortAndDistinctQueryTerms(q) + if len(queryLeaves) < 2 { + return rankOr(weights, v, q) + } + pos := make([][]tsPosition, len(queryLeaves)) + res := float32(-1) + var matches = make([][]tsPosition, 0) + for i := range queryLeaves { + matches = matches[:0] + matches = findRankMatches(queryLeaves[i], v, matches) + for _, innerMatches := range matches { + pos[i] = innerMatches + // Loop back through the earlier position matches + for k := 0; k < i; k++ { + if pos[k] == nil { + continue + } + for l := range pos[i] { + // For each of the earlier matches + for p := range pos[k] { + dist := int(pos[i][l].position) - int(pos[k][p].position) + if dist < 0 { + dist = -dist + } + if dist != 0 { + curw := float32(math.Sqrt(float64(weights[pos[i][l].weight.val()] * weights[pos[k][p].weight.val()] * wordDistance(dist)))) + if res < 0 { + res = curw + } else { + res = 1.0 - (1.0-res)*(1.0-curw) + } + } + } + } + } + } + } + return res +} + +// Returns a weight of a word collocation. See Postgres tsrank.c. +func wordDistance(dist int) float32 { + if dist > 100 { + return 1e-30 + } + return float32(1.0 / (1.005 + 0.05*math.Exp(float64(float32(dist)/1.5-2)))) +} diff --git a/pkg/util/tsearch/rank_test.go b/pkg/util/tsearch/rank_test.go new file mode 100644 index 000000000000..900f6e13100f --- /dev/null +++ b/pkg/util/tsearch/rank_test.go @@ -0,0 +1,46 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package tsearch + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestRank(t *testing.T) { + tests := []struct { + weights []float32 + v string + q string + method int + expected float32 + }{ + {v: "a:1 s:2C d g", q: "a | s", expected: 0.091189064}, + {v: "a:1 sa:2C d g", q: "a | s", expected: 0.030396355}, + {v: "a:1 sa:2C d g", q: "a | s:*", expected: 0.091189064}, + {v: "a:1 sa:2C d g", q: "a | sa:*", expected: 0.091189064}, + {v: "a:1 s:2B d g", q: "a | s", expected: 0.15198177}, + {v: "a:1 s:2 d g", q: "a | s", expected: 0.06079271}, + {v: "a:1 s:2C d g", q: "a & s", expected: 0.14015312}, + {v: "a:1 s:2B d g", q: "a & s", expected: 0.19820644}, + {v: "a:1 s:2 d g", q: "a & s", expected: 0.09910322}, + } + for _, tt := range tests { + v, err := ParseTSVector(tt.v) + assert.NoError(t, err) + q, err := ParseTSQuery(tt.q) + assert.NoError(t, err) + actual, err := Rank(tt.weights, v, q, tt.method) + assert.NoError(t, err) + assert.Equalf(t, tt.expected, actual, "Rank(%v, %v, %v, %v)", tt.weights, tt.v, tt.q, tt.method) + } +} diff --git a/pkg/util/tsearch/tsquery.go b/pkg/util/tsearch/tsquery.go index bc5c0a7ea76a..761860495911 100644 --- a/pkg/util/tsearch/tsquery.go +++ b/pkg/util/tsearch/tsquery.go @@ -499,12 +499,13 @@ func toTSQuery(config string, interpose tsOperator, input string) (TSQuery, erro } tokens = append(tokens, term) } - lexeme, ok, err := TSLexize(config, lexemeTokens[j]) + lexeme, stopWord, err := TSLexize(config, lexemeTokens[j]) if err != nil { return TSQuery{}, err } - if !ok { + if stopWord { foundStopwords = true + //continue } tokens = append(tokens, tsTerm{lexeme: lexeme, positions: tok.positions}) } @@ -518,17 +519,20 @@ func toTSQuery(config string, interpose tsOperator, input string) (TSQuery, erro } if foundStopwords { - return cleanupStopwords(query) + query = cleanupStopwords(query) + if query.root == nil { + return query, pgerror.Newf(pgcode.Syntax, "text-search query doesn't contain lexemes: %s", input) + } } - return query, nil + return query, err } -func cleanupStopwords(query TSQuery) (TSQuery, error) { +func cleanupStopwords(query TSQuery) TSQuery { query.root, _, _ = cleanupStopword(query.root) if query.root == nil { - return TSQuery{}, nil + return TSQuery{} } - return query, nil + return query } // cleanupStopword cleans up a query tree by removing stop words and adjusting diff --git a/pkg/util/tsearch/tsvector.go b/pkg/util/tsearch/tsvector.go index f811f9d4144e..3417b5d99809 100644 --- a/pkg/util/tsearch/tsvector.go +++ b/pkg/util/tsearch/tsvector.go @@ -121,6 +121,14 @@ func (w tsWeight) TSVectorPGEncoding() (byte, error) { return 0, errors.Errorf("invalid tsvector weight %d", w) } +func (w tsWeight) val() int { + b, err := w.TSVectorPGEncoding() + if err != nil { + panic(err) + } + return int(b) +} + // matches returns true if the receiver is matched by the input tsquery weight. func (w tsWeight) matches(queryWeight tsWeight) bool { if queryWeight == weightAny { @@ -287,6 +295,10 @@ func (t tsTerm) matchesWeight(targetWeight tsWeight) bool { return false } +func (t tsTerm) isPrefixMatch() bool { + return len(t.positions) >= 1 && t.positions[0].weight&weightStar != 0 +} + // TSVector is a sorted list of terms, each of which is a lexeme that might have // an associated position within an original document. type TSVector []tsTerm @@ -392,16 +404,16 @@ func TSParse(input string) []string { // TSLexize implements the "dictionary" construct that's exposed via ts_lexize. // It gets invoked once per input token to produce an output lexeme during // routines like to_tsvector and to_tsquery. -// It can return false in the second parameter to indicate a stopword was found. -func TSLexize(config string, token string) (lexeme string, notAStopWord bool, err error) { - stopwords, notAStopWord := stopwordsMap[config] - if !notAStopWord { +// It can return true in the second parameter to indicate a stopword was found. +func TSLexize(config string, token string) (lexeme string, stopWord bool, err error) { + stopwords, ok := stopwordsMap[config] + if !ok { return "", false, pgerror.Newf(pgcode.UndefinedObject, "text search configuration %q does not exist", config) } lower := strings.ToLower(token) if _, ok := stopwords[lower]; ok { - return "", false, nil + return "", true, nil } stemmer, err := getStemmer(config) if err != nil { @@ -409,7 +421,7 @@ func TSLexize(config string, token string) (lexeme string, notAStopWord bool, er } env := snowballstem.NewEnv(lower) stemmer(env) - return env.Current(), true, nil + return env.Current(), false, nil } // DocumentToTSVector parses an input document into lexemes, removes stop words, @@ -419,17 +431,18 @@ func DocumentToTSVector(config string, input string) (TSVector, error) { tokens := TSParse(input) vector := make(TSVector, 0, len(tokens)) for i := range tokens { - lexeme, ok, err := TSLexize(config, tokens[i]) + lexeme, stopWord, err := TSLexize(config, tokens[i]) if err != nil { return nil, err } - if !ok { + if stopWord { continue } term := tsTerm{lexeme: lexeme} pos := i + 1 if i > maxTSVectorPosition { + // Postgres silently truncates positions larger than 16383 to 16383. pos = maxTSVectorPosition } term.positions = []tsPosition{{position: uint16(pos)}}