colexec: make sorts and joins actually spill to disk #45318

Merged · 4 commits · Feb 27, 2020 · Changes from all commits
2 changes: 1 addition & 1 deletion pkg/col/colserde/arrowbatchconverter.go
@@ -55,7 +55,7 @@ type ArrowBatchConverter struct {
 func NewArrowBatchConverter(typs []coltypes.T) (*ArrowBatchConverter, error) {
 	for _, t := range typs {
 		if _, supported := supportedTypes[t]; !supported {
-			return nil, errors.Errorf("unsupported type %v", t.String())
+			return nil, errors.Errorf("arrowbatchconverter unsupported type %v", t.String())
 		}
 	}
 	c := &ArrowBatchConverter{typs: typs}
8 changes: 7 additions & 1 deletion pkg/server/server.go
@@ -91,6 +91,7 @@ import (
 	"github.com/cockroachdb/logtags"
 	raven "github.com/getsentry/raven-go"
 	gwruntime "github.com/grpc-ecosystem/grpc-gateway/runtime"
+	"github.com/marusama/semaphore"
 	"github.com/opentracing/opentracing-go"
 	"github.com/pkg/errors"
 	"google.golang.org/grpc"
@@ -636,7 +637,12 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
 		TempStorage: tempEngine,
 		TempStoragePath: s.cfg.TempStorageConfig.Path,
 		TempFS: tempFS,
-		DiskMonitor: s.cfg.TempStorageConfig.Mon,
+		// COCKROACH_VEC_MAX_OPEN_FDS specifies the maximum number of open file
+		// descriptors that the vectorized execution engine may have open at any
+		// one time. This limit is implemented as a weighted semaphore acquired
+		// before opening files.
+		VecFDSemaphore: semaphore.New(envutil.EnvOrDefaultInt("COCKROACH_VEC_MAX_OPEN_FDS", 256)),
+		DiskMonitor: s.cfg.TempStorageConfig.Mon,

 		ParentMemoryMonitor: &rootSQLMemoryMonitor,
 		BulkAdder: func(
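The VecFDSemaphore added here caps how many spill-file descriptors the vectorized engine can hold open at once; the limit comes from COCKROACH_VEC_MAX_OPEN_FDS (default 256) and is enforced by acquiring from the semaphore before opening a file and releasing once the descriptor is closed. Below is a minimal sketch of that acquire/release pattern using github.com/marusama/semaphore; the openSpillFile helper and the file path are illustrative only and not code from this PR.

package main

import (
	"context"
	"fmt"
	"os"

	"github.com/marusama/semaphore"
)

// openSpillFile acquires one slot from the file-descriptor semaphore before
// opening a spill file, so the number of descriptors held by disk-spilling
// operators stays under the configured limit.
func openSpillFile(ctx context.Context, sem semaphore.Semaphore, path string) (*os.File, error) {
	// Block until a slot is free (or ctx is canceled).
	if err := sem.Acquire(ctx, 1); err != nil {
		return nil, err
	}
	f, err := os.Open(path)
	if err != nil {
		// Return the slot if the open fails.
		sem.Release(1)
		return nil, err
	}
	return f, nil
}

func main() {
	// Mirrors the server wiring: a weighted semaphore with 256 slots.
	sem := semaphore.New(256)
	f, err := openSpillFile(context.Background(), sem, "/tmp/spill-000000.bin")
	if err != nil {
		fmt.Println("open failed:", err)
		return
	}
	defer func() {
		f.Close()
		// Release only after the descriptor is actually closed.
		sem.Release(1)
	}()
	fmt.Println("opened", f.Name())
}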
36 changes: 24 additions & 12 deletions pkg/sql/colcontainer/diskqueue.go
@@ -143,10 +143,8 @@ func (w *diskQueueWriter) numBytesBuffered() int {
 // writes the same amount of buffer space.
 // NOTE: We could reuse the memory used to buffer uncompressed writes to buffer
 // uncompressed reads, but this would only work with the limitation that all
-// writes happen before all reads.
-// TODO(asubiotto): The improvement mentioned above might be worth it once we
-// ensure that we only use DiskQueues for the write-everything, read-everything
-// pattern.
+// writes happen before all reads, which is not the case for the
+// colexec.HashRouter.
 type diskQueue struct {
 	// dirName is the directory in cfg.Path that holds this queue's files.
 	dirName string
@@ -194,6 +192,9 @@ type Queue interface {
 	// to. If an error is returned, the batch and boolean returned are
 	// meaningless.
 	Dequeue(coldata.Batch) (bool, error)
+	// CloseRead closes the read file descriptor. If Dequeued, the file may be
+	// reopened.
+	CloseRead() error
 	// Close closes any resources associated with the Queue.
 	Close() error
 }
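With CloseRead in the interface, an owner can hand back the read file descriptor between uses and a later Dequeue simply reopens the file. The sketch below walks through the strict write-everything-then-read-everything lifecycle that disk-backed sorts follow; the colexec.HashRouter mentioned in the comment above instead interleaves Enqueue and Dequeue, which is why the write buffer cannot be reused for reads. Everything other than the Queue methods and coldata.Batch is an assumption about the surrounding code, not part of this PR (in particular, coldata.ZeroBatch is assumed to be the canonical zero-length batch, and the exact Dequeue end-of-data contract is simplified).

package spillsketch

import (
	"github.com/cockroachdb/cockroach/pkg/col/coldata"
	"github.com/cockroachdb/cockroach/pkg/sql/colcontainer"
)

// spillAndReplay writes all input batches to the queue, signals end of data
// with a zero-length batch, and then drains the queue into scratch. It is a
// sketch of the write-everything, read-everything pattern, not real colexec
// code.
func spillAndReplay(q colcontainer.Queue, inputs []coldata.Batch, scratch coldata.Batch) error {
	// Write phase: the zero-length batch flushes buffered data and writes the
	// file footer, after which nothing more may be enqueued.
	for _, b := range inputs {
		if err := q.Enqueue(b); err != nil {
			return err
		}
	}
	if err := q.Enqueue(coldata.ZeroBatch); err != nil {
		return err
	}

	// Read phase: drain everything that was spilled.
	for {
		ok, err := q.Dequeue(scratch)
		if err != nil {
			return err
		}
		if !ok || scratch.Length() == 0 {
			break
		}
		// ... consume scratch here ...
	}

	// Hand back the read file descriptor; a later Dequeue would reopen it.
	if err := q.CloseRead(); err != nil {
		return err
	}
	// Close releases remaining resources and deletes the queue's files.
	return q.Close()
}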
@@ -268,6 +269,16 @@ func NewDiskQueue(typs []coltypes.T, cfg DiskQueueCfg) (Queue, error) {
 	return d, d.rotateFile()
 }

+func (d *diskQueue) CloseRead() error {
+	if d.readFile != nil {
+		if err := d.readFile.Close(); err != nil {
+			return err
+		}
+		d.readFile = nil
+	}
+	return nil
+}
+
 func (d *diskQueue) Close() error {
 	if d.serializer != nil {
 		if err := d.writeFooterAndFlush(); err != nil {
@@ -287,14 +298,11 @@ func (d *diskQueue) Close() error {
 		}
 		d.writeFile = nil
 	}
-	if d.readFile != nil {
-		if err := d.readFile.Close(); err != nil {
-			return err
-		}
-		d.readFile = nil
-	}
-	// The readFile will be removed below in RemoveAll.
+	// The readFile will be removed below in DeleteDirAndFiles.
+	if err := d.CloseRead(); err != nil {
+		return err
+	}
-	if err := d.cfg.FS.DeleteDir(filepath.Join(d.cfg.Path, d.dirName)); err != nil {
+	if err := d.cfg.FS.DeleteDirAndFiles(filepath.Join(d.cfg.Path, d.dirName)); err != nil {
 		return err
 	}
 	return nil
@@ -366,6 +374,10 @@ func (d *diskQueue) writeFooterAndFlush() error {

 func (d *diskQueue) Enqueue(b coldata.Batch) error {
 	if b.Length() == 0 {
+		if d.done {
+			// Already done.
+			return nil
+		}
 		if err := d.writeFooterAndFlush(); err != nil {
 			return err
 		}
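The new d.done check makes the end-of-data signal idempotent: only the first zero-length Enqueue flushes buffered data, writes the footer, and closes the write file, while any later one returns immediately instead of flushing again. A toy caller that can safely signal completion from more than one path might look like this (hypothetical code, same imports as the sketch above, not from this PR):

// finishSpill signals that no more data will be enqueued. Because Enqueue
// treats a zero-length batch as a no-op once the queue is already done, it is
// safe to call this both on normal completion and again from a defensive
// cleanup path.
func finishSpill(q colcontainer.Queue) error {
	return q.Enqueue(coldata.ZeroBatch)
}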
@@ -421,7 +433,7 @@ func (d *diskQueue) maybeInitDeserializer() (bool, error) {
 	// writer has rotated to a new file.
 	if fileToRead.finishedWriting {
 		// Close and remove current file.
-		if err := d.readFile.Close(); err != nil {
+		if err := d.CloseRead(); err != nil {
 			return false, err
 		}
 		if err := d.cfg.FS.DeleteFile(d.files[d.readFileIdx].name); err != nil {