From 3d2915f8c37be8db3d3302d053cc21ed78f4963d Mon Sep 17 00:00:00 2001 From: nate Date: Thu, 17 Sep 2020 13:03:48 -0400 Subject: [PATCH] [dbnode] Make caching after block retrieval a configuration option (#2613) --- src/cmd/services/m3dbnode/config/config.go | 4 + .../dtest/docker/harness/resources/harness.go | 19 +- .../generated/proto/namespace/namespace.pb.go | 179 ++++++++++++------ .../generated/proto/namespace/namespace.proto | 23 +-- src/dbnode/namespace/config.go | 22 ++- src/dbnode/namespace/convert.go | 23 ++- src/dbnode/namespace/convert_test.go | 29 ++- src/dbnode/namespace/namespace_mock.go | 28 +++ src/dbnode/namespace/options.go | 60 +++--- src/dbnode/namespace/types.go | 8 + src/dbnode/persist/fs/retriever.go | 10 +- src/dbnode/persist/fs/retriever_options.go | 13 ++ src/dbnode/persist/fs/retriever_test.go | 111 ++++++++++- src/dbnode/persist/fs/types.go | 6 + src/dbnode/server/server.go | 3 + .../api/v1/handler/database/create_test.go | 6 + .../api/v1/handler/namespace/add_test.go | 11 +- .../api/v1/handler/namespace/get_test.go | 26 +-- .../api/v1/handler/namespace/update_test.go | 36 ++-- 19 files changed, 446 insertions(+), 171 deletions(-) diff --git a/src/cmd/services/m3dbnode/config/config.go b/src/cmd/services/m3dbnode/config/config.go index 82b7e58797..5c0f4aee33 100644 --- a/src/cmd/services/m3dbnode/config/config.go +++ b/src/cmd/services/m3dbnode/config/config.go @@ -261,6 +261,10 @@ type BlockRetrievePolicy struct { // FetchConcurrency is the concurrency to fetch blocks from disk. For // spinning disks it is highly recommended to set this value to 1. FetchConcurrency int `yaml:"fetchConcurrency" validate:"min=0"` + + // CacheBlocksOnRetrieve globally enables/disables callbacks used to cache blocks fetched + // from disk. + CacheBlocksOnRetrieve *bool `yaml:"cacheBlocksOnRetrieve"` } // CommitLogPolicy is the commit log policy. diff --git a/src/cmd/tools/dtest/docker/harness/resources/harness.go b/src/cmd/tools/dtest/docker/harness/resources/harness.go index 728d91e129..31850e241d 100644 --- a/src/cmd/tools/dtest/docker/harness/resources/harness.go +++ b/src/cmd/tools/dtest/docker/harness/resources/harness.go @@ -27,7 +27,9 @@ import ( "github.com/m3db/m3/src/query/generated/proto/admin" xerrors "github.com/m3db/m3/src/x/errors" "github.com/m3db/m3/src/x/instrument" - dockertest "github.com/ory/dockertest" + + protobuftypes "github.com/gogo/protobuf/types" + "github.com/ory/dockertest" "go.uber.org/zap" ) @@ -150,13 +152,14 @@ func SetupSingleM3DBNode() (DockerResources, error) { coldWriteNamespace = admin.NamespaceAddRequest{ Name: ColdWriteNsName, Options: &namespace.NamespaceOptions{ - BootstrapEnabled: true, - FlushEnabled: true, - WritesToCommitLog: true, - CleanupEnabled: true, - SnapshotEnabled: true, - RepairEnabled: true, - ColdWritesEnabled: true, + BootstrapEnabled: true, + FlushEnabled: true, + WritesToCommitLog: true, + CleanupEnabled: true, + SnapshotEnabled: true, + RepairEnabled: true, + ColdWritesEnabled: true, + CacheBlocksOnRetrieve: &protobuftypes.BoolValue{Value: true}, RetentionOptions: &namespace.RetentionOptions{ RetentionPeriodNanos: int64(4 * time.Hour), BlockSizeNanos: int64(time.Hour), diff --git a/src/dbnode/generated/proto/namespace/namespace.pb.go b/src/dbnode/generated/proto/namespace/namespace.pb.go index 8d4ff1f104..8cb1d8aa68 100644 --- a/src/dbnode/generated/proto/namespace/namespace.pb.go +++ b/src/dbnode/generated/proto/namespace/namespace.pb.go @@ -147,17 +147,18 @@ func (m *IndexOptions) GetBlockSizeNanos() int64 { } type NamespaceOptions struct { - BootstrapEnabled bool `protobuf:"varint,1,opt,name=bootstrapEnabled,proto3" json:"bootstrapEnabled,omitempty"` - FlushEnabled bool `protobuf:"varint,2,opt,name=flushEnabled,proto3" json:"flushEnabled,omitempty"` - WritesToCommitLog bool `protobuf:"varint,3,opt,name=writesToCommitLog,proto3" json:"writesToCommitLog,omitempty"` - CleanupEnabled bool `protobuf:"varint,4,opt,name=cleanupEnabled,proto3" json:"cleanupEnabled,omitempty"` - RepairEnabled bool `protobuf:"varint,5,opt,name=repairEnabled,proto3" json:"repairEnabled,omitempty"` - RetentionOptions *RetentionOptions `protobuf:"bytes,6,opt,name=retentionOptions" json:"retentionOptions,omitempty"` - SnapshotEnabled bool `protobuf:"varint,7,opt,name=snapshotEnabled,proto3" json:"snapshotEnabled,omitempty"` - IndexOptions *IndexOptions `protobuf:"bytes,8,opt,name=indexOptions" json:"indexOptions,omitempty"` - SchemaOptions *SchemaOptions `protobuf:"bytes,9,opt,name=schemaOptions" json:"schemaOptions,omitempty"` - ColdWritesEnabled bool `protobuf:"varint,10,opt,name=coldWritesEnabled,proto3" json:"coldWritesEnabled,omitempty"` - RuntimeOptions *NamespaceRuntimeOptions `protobuf:"bytes,11,opt,name=runtimeOptions" json:"runtimeOptions,omitempty"` + BootstrapEnabled bool `protobuf:"varint,1,opt,name=bootstrapEnabled,proto3" json:"bootstrapEnabled,omitempty"` + FlushEnabled bool `protobuf:"varint,2,opt,name=flushEnabled,proto3" json:"flushEnabled,omitempty"` + WritesToCommitLog bool `protobuf:"varint,3,opt,name=writesToCommitLog,proto3" json:"writesToCommitLog,omitempty"` + CleanupEnabled bool `protobuf:"varint,4,opt,name=cleanupEnabled,proto3" json:"cleanupEnabled,omitempty"` + RepairEnabled bool `protobuf:"varint,5,opt,name=repairEnabled,proto3" json:"repairEnabled,omitempty"` + RetentionOptions *RetentionOptions `protobuf:"bytes,6,opt,name=retentionOptions" json:"retentionOptions,omitempty"` + SnapshotEnabled bool `protobuf:"varint,7,opt,name=snapshotEnabled,proto3" json:"snapshotEnabled,omitempty"` + IndexOptions *IndexOptions `protobuf:"bytes,8,opt,name=indexOptions" json:"indexOptions,omitempty"` + SchemaOptions *SchemaOptions `protobuf:"bytes,9,opt,name=schemaOptions" json:"schemaOptions,omitempty"` + ColdWritesEnabled bool `protobuf:"varint,10,opt,name=coldWritesEnabled,proto3" json:"coldWritesEnabled,omitempty"` + RuntimeOptions *NamespaceRuntimeOptions `protobuf:"bytes,11,opt,name=runtimeOptions" json:"runtimeOptions,omitempty"` + CacheBlocksOnRetrieve *google_protobuf.BoolValue `protobuf:"bytes,12,opt,name=cacheBlocksOnRetrieve" json:"cacheBlocksOnRetrieve,omitempty"` } func (m *NamespaceOptions) Reset() { *m = NamespaceOptions{} } @@ -242,6 +243,13 @@ func (m *NamespaceOptions) GetRuntimeOptions() *NamespaceRuntimeOptions { return nil } +func (m *NamespaceOptions) GetCacheBlocksOnRetrieve() *google_protobuf.BoolValue { + if m != nil { + return m.CacheBlocksOnRetrieve + } + return nil +} + type Registry struct { Namespaces map[string]*NamespaceOptions `protobuf:"bytes,1,rep,name=namespaces" json:"namespaces,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value"` } @@ -505,6 +513,16 @@ func (m *NamespaceOptions) MarshalTo(dAtA []byte) (int, error) { } i += n4 } + if m.CacheBlocksOnRetrieve != nil { + dAtA[i] = 0x62 + i++ + i = encodeVarintNamespace(dAtA, i, uint64(m.CacheBlocksOnRetrieve.Size())) + n5, err := m.CacheBlocksOnRetrieve.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n5 + } return i, nil } @@ -543,11 +561,11 @@ func (m *Registry) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x12 i++ i = encodeVarintNamespace(dAtA, i, uint64(v.Size())) - n5, err := v.MarshalTo(dAtA[i:]) + n6, err := v.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n5 + i += n6 } } } @@ -573,21 +591,21 @@ func (m *NamespaceRuntimeOptions) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0xa i++ i = encodeVarintNamespace(dAtA, i, uint64(m.WriteIndexingPerCPUConcurrency.Size())) - n6, err := m.WriteIndexingPerCPUConcurrency.MarshalTo(dAtA[i:]) + n7, err := m.WriteIndexingPerCPUConcurrency.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n6 + i += n7 } if m.FlushIndexingPerCPUConcurrency != nil { dAtA[i] = 0x12 i++ i = encodeVarintNamespace(dAtA, i, uint64(m.FlushIndexingPerCPUConcurrency.Size())) - n7, err := m.FlushIndexingPerCPUConcurrency.MarshalTo(dAtA[i:]) + n8, err := m.FlushIndexingPerCPUConcurrency.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n7 + i += n8 } return i, nil } @@ -680,6 +698,10 @@ func (m *NamespaceOptions) Size() (n int) { l = m.RuntimeOptions.Size() n += 1 + l + sovNamespace(uint64(l)) } + if m.CacheBlocksOnRetrieve != nil { + l = m.CacheBlocksOnRetrieve.Size() + n += 1 + l + sovNamespace(uint64(l)) + } return n } @@ -1303,6 +1325,39 @@ func (m *NamespaceOptions) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 12: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field CacheBlocksOnRetrieve", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowNamespace + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthNamespace + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.CacheBlocksOnRetrieve == nil { + m.CacheBlocksOnRetrieve = &google_protobuf.BoolValue{} + } + if err := m.CacheBlocksOnRetrieve.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipNamespace(dAtA[iNdEx:]) @@ -1723,48 +1778,50 @@ func init() { } var fileDescriptorNamespace = []byte{ - // 679 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x54, 0x5d, 0x6e, 0xd3, 0x4c, - 0x14, 0xfd, 0x9c, 0xf4, 0x27, 0x9d, 0xf4, 0x27, 0xdf, 0x08, 0xa9, 0x51, 0x40, 0x51, 0x65, 0x10, - 0x8a, 0x10, 0x8a, 0x45, 0xfa, 0x82, 0x40, 0xaa, 0x54, 0xd2, 0x52, 0x81, 0x50, 0x89, 0xa6, 0xfc, - 0x48, 0x7d, 0x1b, 0xdb, 0x37, 0x8e, 0x55, 0x7b, 0xc6, 0x9a, 0x19, 0xd3, 0x86, 0x35, 0xf0, 0xc0, - 0x3e, 0xd8, 0x48, 0x1f, 0x59, 0x02, 0x2a, 0x62, 0x1f, 0xc8, 0x33, 0x38, 0xb5, 0x9d, 0xb6, 0x54, - 0xbc, 0x44, 0xce, 0xb9, 0xe7, 0xde, 0x63, 0xdf, 0x73, 0x66, 0xd0, 0x41, 0x10, 0xaa, 0x49, 0xea, - 0xf6, 0x3d, 0x1e, 0x3b, 0xf1, 0xb6, 0xef, 0x3a, 0xf1, 0xb6, 0x23, 0x85, 0xe7, 0xf8, 0x2e, 0xe3, - 0x3e, 0x38, 0x01, 0x30, 0x10, 0x54, 0x81, 0xef, 0x24, 0x82, 0x2b, 0xee, 0x30, 0x1a, 0x83, 0x4c, - 0xa8, 0x07, 0x97, 0x4f, 0x7d, 0x5d, 0xc1, 0x2b, 0x33, 0xa0, 0xd3, 0x0d, 0x38, 0x0f, 0x22, 0x30, - 0x2d, 0x6e, 0x3a, 0x76, 0x4e, 0x05, 0x4d, 0x12, 0x10, 0xd2, 0x50, 0x3b, 0x7b, 0xff, 0xaa, 0x29, - 0xbd, 0x09, 0xc4, 0xd4, 0x4c, 0xb1, 0xbf, 0xd4, 0x51, 0x8b, 0x80, 0x02, 0xa6, 0x42, 0xce, 0xde, - 0x26, 0xd9, 0xaf, 0xc4, 0x03, 0x74, 0x47, 0xe4, 0xd8, 0x08, 0x44, 0xc8, 0xfd, 0x43, 0xca, 0xb8, - 0x6c, 0x5b, 0x5b, 0x56, 0xaf, 0x4e, 0xae, 0xac, 0xe1, 0x87, 0x68, 0xdd, 0x8d, 0xb8, 0x77, 0x72, - 0x14, 0x7e, 0x06, 0xc3, 0xae, 0x69, 0x76, 0x05, 0xc5, 0x8f, 0xd1, 0xff, 0x6e, 0x3a, 0x1e, 0x83, - 0x78, 0x99, 0xaa, 0x54, 0xfc, 0xa1, 0xd6, 0x35, 0x75, 0xbe, 0x80, 0x7b, 0x68, 0xc3, 0x80, 0x23, - 0x2a, 0x95, 0xe1, 0x2e, 0x68, 0x6e, 0x15, 0xd6, 0xcc, 0x4c, 0x69, 0x8f, 0x2a, 0xba, 0x7f, 0x96, - 0x84, 0x62, 0xda, 0x5e, 0xdc, 0xb2, 0x7a, 0x0d, 0x52, 0x85, 0xf1, 0x31, 0xea, 0x55, 0xa0, 0xdd, - 0xb1, 0x02, 0x71, 0xc8, 0xd5, 0xae, 0xe7, 0x81, 0x94, 0xc5, 0x2f, 0x5e, 0xd2, 0x62, 0xb7, 0xe6, - 0xe3, 0x1d, 0xd4, 0x19, 0xeb, 0xd7, 0x27, 0x57, 0xed, 0x6f, 0x59, 0x4f, 0xbb, 0x81, 0x61, 0x8f, - 0xd0, 0xea, 0x2b, 0xe6, 0xc3, 0x59, 0xee, 0x44, 0x1b, 0x2d, 0x03, 0xa3, 0x6e, 0x04, 0xbe, 0x5e, - 0x7e, 0x83, 0xe4, 0x7f, 0x6f, 0xbb, 0x6f, 0xfb, 0x7c, 0x01, 0xb5, 0x0e, 0x73, 0xef, 0xf3, 0xb1, - 0x8f, 0x50, 0xcb, 0xe5, 0x5c, 0x49, 0x25, 0x68, 0xb2, 0x5f, 0x9a, 0x3f, 0x87, 0x63, 0x1b, 0xad, - 0x8e, 0xa3, 0x54, 0x4e, 0x72, 0x5e, 0x4d, 0xf3, 0x4a, 0x58, 0x66, 0xea, 0xa9, 0x08, 0x15, 0xc8, - 0x77, 0x7c, 0xc8, 0xe3, 0x38, 0x54, 0x6f, 0x78, 0xa0, 0x4d, 0x6d, 0x90, 0xf9, 0x42, 0xf6, 0xea, - 0x5e, 0x04, 0x94, 0xa5, 0x33, 0xed, 0x05, 0x4d, 0xad, 0xa0, 0xf8, 0x01, 0x5a, 0x13, 0x90, 0xd0, - 0x50, 0xe4, 0x34, 0x63, 0x68, 0x19, 0xc4, 0x07, 0xa8, 0x25, 0x2a, 0x01, 0xd6, 0xb6, 0x35, 0x07, - 0x77, 0xfb, 0x97, 0xc7, 0xab, 0x9a, 0x71, 0x32, 0xd7, 0x94, 0x25, 0x48, 0x32, 0x9a, 0xc8, 0x09, - 0x57, 0xb9, 0xe0, 0xb2, 0x49, 0x50, 0x05, 0xc6, 0xcf, 0xd1, 0x6a, 0x58, 0x70, 0xa9, 0xdd, 0xd0, - 0x72, 0x9b, 0x05, 0xb9, 0xa2, 0x89, 0xa4, 0x44, 0xc6, 0x3b, 0x68, 0xcd, 0x9c, 0xc0, 0xbc, 0x7b, - 0x45, 0x77, 0xb7, 0x0b, 0xdd, 0x47, 0xc5, 0x3a, 0x29, 0xd3, 0xb3, 0x5d, 0x7b, 0x3c, 0xf2, 0x3f, - 0xea, 0xb5, 0xe6, 0x2f, 0x8a, 0xcc, 0xae, 0xe7, 0x0a, 0xf8, 0x35, 0x5a, 0x17, 0x29, 0x53, 0x61, - 0x9c, 0x7b, 0xdf, 0x6e, 0x6a, 0x39, 0xbb, 0x20, 0x37, 0x8b, 0x07, 0x29, 0x31, 0x49, 0xa5, 0xd3, - 0xfe, 0x66, 0xa1, 0x06, 0x81, 0x20, 0x94, 0x4a, 0x4c, 0xf1, 0x10, 0xa1, 0xd9, 0x84, 0xec, 0x66, - 0xa8, 0xf7, 0x9a, 0x83, 0xfb, 0xa5, 0x85, 0x1b, 0xe2, 0xe5, 0x74, 0xb9, 0xcf, 0x94, 0x98, 0x92, - 0x42, 0x5b, 0xe7, 0x18, 0x6d, 0x54, 0xca, 0xb8, 0x85, 0xea, 0x27, 0x30, 0xd5, 0x69, 0x5c, 0x21, - 0xd9, 0x23, 0x7e, 0x82, 0x16, 0x3f, 0xd1, 0x28, 0x05, 0x9d, 0xbc, 0xb2, 0xab, 0xd5, 0x60, 0x13, - 0xc3, 0x7c, 0x56, 0x7b, 0x6a, 0xd9, 0xbf, 0x2c, 0xb4, 0x79, 0xcd, 0x97, 0x61, 0x1f, 0x75, 0x75, - 0x2c, 0xb5, 0x4d, 0x21, 0x0b, 0x46, 0x20, 0x86, 0xa3, 0xf7, 0x43, 0xce, 0xbc, 0x54, 0x08, 0x60, - 0x9e, 0xd1, 0x6f, 0x0e, 0xee, 0xf5, 0xcd, 0x25, 0xdc, 0xcf, 0x2f, 0xe1, 0xfe, 0x1e, 0x4f, 0xdd, - 0x08, 0x3e, 0x64, 0x2a, 0xe4, 0x2f, 0x33, 0x32, 0x15, 0x7d, 0x4a, 0xae, 0x57, 0xa9, 0xdd, 0x46, - 0xe5, 0xe6, 0x19, 0x2f, 0x5a, 0xe7, 0x17, 0x5d, 0xeb, 0xfb, 0x45, 0xd7, 0xfa, 0x71, 0xd1, 0xb5, - 0xbe, 0xfe, 0xec, 0xfe, 0xe7, 0x2e, 0xe9, 0x39, 0xdb, 0xbf, 0x03, 0x00, 0x00, 0xff, 0xff, 0x18, - 0x2e, 0x5a, 0xd3, 0x96, 0x06, 0x00, 0x00, + // 715 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x54, 0xdd, 0x6a, 0xdb, 0x4a, + 0x10, 0x3e, 0xb2, 0xf3, 0xe3, 0xac, 0x9d, 0xc4, 0x67, 0x39, 0x87, 0x18, 0x9f, 0x83, 0x09, 0x6e, + 0x29, 0xa6, 0x14, 0x8b, 0x3a, 0x37, 0xa5, 0x85, 0x40, 0xe2, 0xa4, 0xa1, 0xa5, 0x24, 0x66, 0xd3, + 0x1f, 0xc8, 0xdd, 0x4a, 0x1a, 0xcb, 0x22, 0xd2, 0xae, 0xd8, 0x5d, 0x25, 0x71, 0x9f, 0xa1, 0x17, + 0xbd, 0xee, 0x2b, 0xf4, 0x45, 0x7a, 0xd9, 0x47, 0x28, 0x29, 0x7d, 0x8f, 0xa2, 0xdd, 0xca, 0x91, + 0xe4, 0x24, 0x0d, 0xbd, 0x31, 0xeb, 0x6f, 0xbe, 0x99, 0x6f, 0x34, 0xdf, 0xec, 0xa2, 0x03, 0x3f, + 0x50, 0x93, 0xc4, 0xe9, 0xbb, 0x3c, 0xb2, 0xa3, 0x2d, 0xcf, 0xb1, 0xa3, 0x2d, 0x5b, 0x0a, 0xd7, + 0xf6, 0x1c, 0xc6, 0x3d, 0xb0, 0x7d, 0x60, 0x20, 0xa8, 0x02, 0xcf, 0x8e, 0x05, 0x57, 0xdc, 0x66, + 0x34, 0x02, 0x19, 0x53, 0x17, 0xae, 0x4e, 0x7d, 0x1d, 0xc1, 0x2b, 0x33, 0xa0, 0xdd, 0xf1, 0x39, + 0xf7, 0x43, 0x30, 0x29, 0x4e, 0x32, 0xb6, 0xcf, 0x05, 0x8d, 0x63, 0x10, 0xd2, 0x50, 0xdb, 0x7b, + 0x7f, 0xaa, 0x29, 0xdd, 0x09, 0x44, 0xd4, 0x54, 0xe9, 0x7e, 0xa8, 0xa2, 0x26, 0x01, 0x05, 0x4c, + 0x05, 0x9c, 0x1d, 0xc5, 0xe9, 0xaf, 0xc4, 0x03, 0xf4, 0x8f, 0xc8, 0xb0, 0x11, 0x88, 0x80, 0x7b, + 0x87, 0x94, 0x71, 0xd9, 0xb2, 0x36, 0xad, 0x5e, 0x95, 0x5c, 0x1b, 0xc3, 0x0f, 0xd0, 0x9a, 0x13, + 0x72, 0xf7, 0xf4, 0x38, 0x78, 0x0f, 0x86, 0x5d, 0xd1, 0xec, 0x12, 0x8a, 0x1f, 0xa1, 0xbf, 0x9d, + 0x64, 0x3c, 0x06, 0xf1, 0x3c, 0x51, 0x89, 0xf8, 0x45, 0xad, 0x6a, 0xea, 0x7c, 0x00, 0xf7, 0xd0, + 0xba, 0x01, 0x47, 0x54, 0x2a, 0xc3, 0x5d, 0xd0, 0xdc, 0x32, 0xac, 0x99, 0xa9, 0xd2, 0x1e, 0x55, + 0x74, 0xff, 0x22, 0x0e, 0xc4, 0xb4, 0xb5, 0xb8, 0x69, 0xf5, 0x6a, 0xa4, 0x0c, 0xe3, 0x13, 0xd4, + 0x2b, 0x41, 0x3b, 0x63, 0x05, 0xe2, 0x90, 0xab, 0x1d, 0xd7, 0x05, 0x29, 0xf3, 0x5f, 0xbc, 0xa4, + 0xc5, 0xee, 0xcc, 0xc7, 0xdb, 0xa8, 0x3d, 0xd6, 0xed, 0x93, 0xeb, 0xe6, 0xb7, 0xac, 0xab, 0xdd, + 0xc2, 0xe8, 0x8e, 0x50, 0xe3, 0x05, 0xf3, 0xe0, 0x22, 0x73, 0xa2, 0x85, 0x96, 0x81, 0x51, 0x27, + 0x04, 0x4f, 0x0f, 0xbf, 0x46, 0xb2, 0xbf, 0x77, 0x9d, 0x77, 0xf7, 0xd3, 0x22, 0x6a, 0x1e, 0x66, + 0xde, 0x67, 0x65, 0x1f, 0xa2, 0xa6, 0xc3, 0xb9, 0x92, 0x4a, 0xd0, 0x78, 0xbf, 0x50, 0x7f, 0x0e, + 0xc7, 0x5d, 0xd4, 0x18, 0x87, 0x89, 0x9c, 0x64, 0xbc, 0x8a, 0xe6, 0x15, 0xb0, 0xd4, 0xd4, 0x73, + 0x11, 0x28, 0x90, 0xaf, 0xf9, 0x90, 0x47, 0x51, 0xa0, 0x5e, 0x71, 0x5f, 0x9b, 0x5a, 0x23, 0xf3, + 0x81, 0xb4, 0x75, 0x37, 0x04, 0xca, 0x92, 0x99, 0xf6, 0x82, 0xa6, 0x96, 0x50, 0x7c, 0x1f, 0xad, + 0x0a, 0x88, 0x69, 0x20, 0x32, 0x9a, 0x31, 0xb4, 0x08, 0xe2, 0x03, 0xd4, 0x14, 0xa5, 0x05, 0xd6, + 0xb6, 0xd5, 0x07, 0xff, 0xf5, 0xaf, 0xae, 0x57, 0x79, 0xc7, 0xc9, 0x5c, 0x52, 0xba, 0x41, 0x92, + 0xd1, 0x58, 0x4e, 0xb8, 0xca, 0x04, 0x97, 0xcd, 0x06, 0x95, 0x60, 0xfc, 0x0c, 0x35, 0x82, 0x9c, + 0x4b, 0xad, 0x9a, 0x96, 0xdb, 0xc8, 0xc9, 0xe5, 0x4d, 0x24, 0x05, 0x32, 0xde, 0x46, 0xab, 0xe6, + 0x06, 0x66, 0xd9, 0x2b, 0x3a, 0xbb, 0x95, 0xcb, 0x3e, 0xce, 0xc7, 0x49, 0x91, 0x9e, 0xce, 0xda, + 0xe5, 0xa1, 0xf7, 0x4e, 0x8f, 0x35, 0x6b, 0x14, 0x99, 0x59, 0xcf, 0x05, 0xf0, 0x4b, 0xb4, 0x26, + 0x12, 0xa6, 0x82, 0x28, 0xf3, 0xbe, 0x55, 0xd7, 0x72, 0xdd, 0x9c, 0xdc, 0x6c, 0x3d, 0x48, 0x81, + 0x49, 0x4a, 0x99, 0x78, 0x84, 0xfe, 0x75, 0xa9, 0x3b, 0x81, 0xdd, 0x74, 0xc3, 0xe4, 0x11, 0x23, + 0xa0, 0x44, 0x00, 0x67, 0xd0, 0x6a, 0xe8, 0x92, 0xed, 0xbe, 0x79, 0xb1, 0xfa, 0xd9, 0x8b, 0xd5, + 0xdf, 0xe5, 0x3c, 0x7c, 0x4b, 0xc3, 0x04, 0xc8, 0xf5, 0x89, 0xdd, 0xcf, 0x16, 0xaa, 0x11, 0xf0, + 0x03, 0xa9, 0xc4, 0x14, 0x0f, 0x11, 0x9a, 0xf5, 0x94, 0xbe, 0x35, 0xd5, 0x5e, 0x7d, 0x70, 0xaf, + 0x60, 0xa1, 0x21, 0x5e, 0xf5, 0x2b, 0xf7, 0x99, 0x12, 0x53, 0x92, 0x4b, 0x6b, 0x9f, 0xa0, 0xf5, + 0x52, 0x18, 0x37, 0x51, 0xf5, 0x14, 0xa6, 0x7a, 0xbf, 0x57, 0x48, 0x7a, 0xc4, 0x8f, 0xd1, 0xe2, + 0x59, 0xda, 0x96, 0xde, 0xe5, 0xe2, 0x9e, 0x94, 0xaf, 0x0a, 0x31, 0xcc, 0xa7, 0x95, 0x27, 0x56, + 0xf7, 0x87, 0x85, 0x36, 0x6e, 0x98, 0x15, 0xf6, 0x50, 0x47, 0x2f, 0xba, 0x36, 0x3e, 0x60, 0xfe, + 0x08, 0xc4, 0x70, 0xf4, 0x66, 0xc8, 0x99, 0x9b, 0x08, 0x01, 0xcc, 0x35, 0xfa, 0xf5, 0xc1, 0xff, + 0x73, 0x43, 0xda, 0xe3, 0x89, 0x13, 0x82, 0x19, 0xd3, 0x6f, 0x6a, 0xa4, 0x2a, 0xfa, 0xde, 0xdd, + 0xac, 0x52, 0xb9, 0x8b, 0xca, 0xed, 0x35, 0x76, 0x9b, 0x5f, 0x2e, 0x3b, 0xd6, 0xd7, 0xcb, 0x8e, + 0xf5, 0xed, 0xb2, 0x63, 0x7d, 0xfc, 0xde, 0xf9, 0xcb, 0x59, 0xd2, 0x75, 0xb6, 0x7e, 0x06, 0x00, + 0x00, 0xff, 0xff, 0xd5, 0x7d, 0x11, 0xd6, 0xe8, 0x06, 0x00, 0x00, } diff --git a/src/dbnode/generated/proto/namespace/namespace.proto b/src/dbnode/generated/proto/namespace/namespace.proto index 17e82a57dd..45410b450a 100644 --- a/src/dbnode/generated/proto/namespace/namespace.proto +++ b/src/dbnode/generated/proto/namespace/namespace.proto @@ -21,17 +21,18 @@ message IndexOptions { } message NamespaceOptions { - bool bootstrapEnabled = 1; - bool flushEnabled = 2; - bool writesToCommitLog = 3; - bool cleanupEnabled = 4; - bool repairEnabled = 5; - RetentionOptions retentionOptions = 6; - bool snapshotEnabled = 7; - IndexOptions indexOptions = 8; - SchemaOptions schemaOptions = 9; - bool coldWritesEnabled = 10; - NamespaceRuntimeOptions runtimeOptions = 11; + bool bootstrapEnabled = 1; + bool flushEnabled = 2; + bool writesToCommitLog = 3; + bool cleanupEnabled = 4; + bool repairEnabled = 5; + RetentionOptions retentionOptions = 6; + bool snapshotEnabled = 7; + IndexOptions indexOptions = 8; + SchemaOptions schemaOptions = 9; + bool coldWritesEnabled = 10; + NamespaceRuntimeOptions runtimeOptions = 11; + google.protobuf.BoolValue cacheBlocksOnRetrieve = 12; } message Registry { diff --git a/src/dbnode/namespace/config.go b/src/dbnode/namespace/config.go index 5374e02888..82115b8eb3 100644 --- a/src/dbnode/namespace/config.go +++ b/src/dbnode/namespace/config.go @@ -48,15 +48,16 @@ func (m *MapConfiguration) Map() (Map, error) { // MetadataConfiguration is the configuration for a single namespace type MetadataConfiguration struct { - ID string `yaml:"id" validate:"nonzero"` - BootstrapEnabled *bool `yaml:"bootstrapEnabled"` - FlushEnabled *bool `yaml:"flushEnabled"` - WritesToCommitLog *bool `yaml:"writesToCommitLog"` - CleanupEnabled *bool `yaml:"cleanupEnabled"` - RepairEnabled *bool `yaml:"repairEnabled"` - ColdWritesEnabled *bool `yaml:"coldWritesEnabled"` - Retention retention.Configuration `yaml:"retention" validate:"nonzero"` - Index IndexConfiguration `yaml:"index"` + ID string `yaml:"id" validate:"nonzero"` + BootstrapEnabled *bool `yaml:"bootstrapEnabled"` + FlushEnabled *bool `yaml:"flushEnabled"` + WritesToCommitLog *bool `yaml:"writesToCommitLog"` + CleanupEnabled *bool `yaml:"cleanupEnabled"` + RepairEnabled *bool `yaml:"repairEnabled"` + ColdWritesEnabled *bool `yaml:"coldWritesEnabled"` + CacheBlocksOnRetrieve *bool `yaml:"cacheBlocksOnRetrieve"` + Retention retention.Configuration `yaml:"retention" validate:"nonzero"` + Index IndexConfiguration `yaml:"index"` } // Metadata returns a Metadata corresponding to the receiver struct @@ -84,6 +85,9 @@ func (mc *MetadataConfiguration) Metadata() (Metadata, error) { if v := mc.ColdWritesEnabled; v != nil { opts = opts.SetColdWritesEnabled(*v) } + if v := mc.CacheBlocksOnRetrieve; v != nil { + opts = opts.SetCacheBlocksOnRetrieve(*v) + } return NewMetadata(ident.StringID(mc.ID), opts) } diff --git a/src/dbnode/namespace/convert.go b/src/dbnode/namespace/convert.go index 853485a8bc..8746eed342 100644 --- a/src/dbnode/namespace/convert.go +++ b/src/dbnode/namespace/convert.go @@ -110,12 +110,12 @@ func ToMetadata( return nil, errNamespaceNil } - ropts, err := ToRetention(opts.RetentionOptions) + rOpts, err := ToRetention(opts.RetentionOptions) if err != nil { return nil, err } - iopts, err := ToIndexOptions(opts.IndexOptions) + iOpts, err := ToIndexOptions(opts.IndexOptions) if err != nil { return nil, err } @@ -130,7 +130,7 @@ func ToMetadata( return nil, err } - mopts := NewOptions(). + mOpts := NewOptions(). SetBootstrapEnabled(opts.BootstrapEnabled). SetFlushEnabled(opts.FlushEnabled). SetCleanupEnabled(opts.CleanupEnabled). @@ -138,16 +138,20 @@ func ToMetadata( SetWritesToCommitLog(opts.WritesToCommitLog). SetSnapshotEnabled(opts.SnapshotEnabled). SetSchemaHistory(sr). - SetRetentionOptions(ropts). - SetIndexOptions(iopts). + SetRetentionOptions(rOpts). + SetIndexOptions(iOpts). SetColdWritesEnabled(opts.ColdWritesEnabled). SetRuntimeOptions(runtimeOpts) - if err := mopts.Validate(); err != nil { + if opts.CacheBlocksOnRetrieve != nil { + mOpts = mOpts.SetCacheBlocksOnRetrieve(opts.CacheBlocksOnRetrieve.Value) + } + + if err := mOpts.Validate(); err != nil { return nil, err } - return NewMetadata(ident.StringID(id), mopts) + return NewMetadata(ident.StringID(id), mOpts) } // ToProto converts Map to nsproto.Registry @@ -202,8 +206,9 @@ func OptionsToProto(opts Options) *nsproto.NamespaceOptions { Enabled: iopts.Enabled(), BlockSizeNanos: iopts.BlockSize().Nanoseconds(), }, - ColdWritesEnabled: opts.ColdWritesEnabled(), - RuntimeOptions: toRuntimeOptions(opts.RuntimeOptions()), + ColdWritesEnabled: opts.ColdWritesEnabled(), + RuntimeOptions: toRuntimeOptions(opts.RuntimeOptions()), + CacheBlocksOnRetrieve: &protobuftypes.BoolValue{Value: opts.CacheBlocksOnRetrieve()}, } } diff --git a/src/dbnode/namespace/convert_test.go b/src/dbnode/namespace/convert_test.go index e0914a369a..f38bf2ef43 100644 --- a/src/dbnode/namespace/convert_test.go +++ b/src/dbnode/namespace/convert_test.go @@ -25,10 +25,11 @@ import ( "time" nsproto "github.com/m3db/m3/src/dbnode/generated/proto/namespace" - "github.com/m3db/m3/src/dbnode/retention" "github.com/m3db/m3/src/dbnode/namespace" + "github.com/m3db/m3/src/dbnode/retention" "github.com/m3db/m3/src/x/ident" + protobuftypes "github.com/gogo/protobuf/types" "github.com/stretchr/testify/require" ) @@ -55,13 +56,14 @@ var ( validNamespaceOpts = []nsproto.NamespaceOptions{ nsproto.NamespaceOptions{ - BootstrapEnabled: true, - FlushEnabled: true, - WritesToCommitLog: true, - CleanupEnabled: true, - RepairEnabled: true, - RetentionOptions: &validRetentionOpts, - SchemaOptions: testSchemaOptions, + BootstrapEnabled: true, + FlushEnabled: true, + WritesToCommitLog: true, + CleanupEnabled: true, + RepairEnabled: true, + CacheBlocksOnRetrieve: &protobuftypes.BoolValue{Value: false}, + RetentionOptions: &validRetentionOpts, + SchemaOptions: testSchemaOptions, }, nsproto.NamespaceOptions{ BootstrapEnabled: true, @@ -69,8 +71,9 @@ var ( WritesToCommitLog: true, CleanupEnabled: true, RepairEnabled: true, - RetentionOptions: &validRetentionOpts, - IndexOptions: &validIndexOpts, + // Explicitly not setting CacheBlocksOnRetrieve here to test defaulting to true when not set. + RetentionOptions: &validRetentionOpts, + IndexOptions: &validIndexOpts, }, } @@ -275,11 +278,17 @@ func assertEqualMetadata(t *testing.T, name string, expected nsproto.NamespaceOp require.Equal(t, name, observed.ID().String()) opts := observed.Options() + expectedCacheBlocksOnRetrieve := true + if expected.CacheBlocksOnRetrieve != nil { + expectedCacheBlocksOnRetrieve = expected.CacheBlocksOnRetrieve.Value + } + require.Equal(t, expected.BootstrapEnabled, opts.BootstrapEnabled()) require.Equal(t, expected.FlushEnabled, opts.FlushEnabled()) require.Equal(t, expected.WritesToCommitLog, opts.WritesToCommitLog()) require.Equal(t, expected.CleanupEnabled, opts.CleanupEnabled()) require.Equal(t, expected.RepairEnabled, opts.RepairEnabled()) + require.Equal(t, expectedCacheBlocksOnRetrieve, opts.CacheBlocksOnRetrieve()) expectedSchemaReg, err := namespace.LoadSchemaHistory(expected.SchemaOptions) require.NoError(t, err) require.NotNil(t, expectedSchemaReg) diff --git a/src/dbnode/namespace/namespace_mock.go b/src/dbnode/namespace/namespace_mock.go index e8d41e1dce..0a9d45196e 100644 --- a/src/dbnode/namespace/namespace_mock.go +++ b/src/dbnode/namespace/namespace_mock.go @@ -284,6 +284,34 @@ func (mr *MockOptionsMockRecorder) ColdWritesEnabled() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ColdWritesEnabled", reflect.TypeOf((*MockOptions)(nil).ColdWritesEnabled)) } +// SetCacheBlocksOnRetrieve mocks base method +func (m *MockOptions) SetCacheBlocksOnRetrieve(value bool) Options { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetCacheBlocksOnRetrieve", value) + ret0, _ := ret[0].(Options) + return ret0 +} + +// SetCacheBlocksOnRetrieve indicates an expected call of SetCacheBlocksOnRetrieve +func (mr *MockOptionsMockRecorder) SetCacheBlocksOnRetrieve(value interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetCacheBlocksOnRetrieve", reflect.TypeOf((*MockOptions)(nil).SetCacheBlocksOnRetrieve), value) +} + +// CacheBlocksOnRetrieve mocks base method +func (m *MockOptions) CacheBlocksOnRetrieve() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CacheBlocksOnRetrieve") + ret0, _ := ret[0].(bool) + return ret0 +} + +// CacheBlocksOnRetrieve indicates an expected call of CacheBlocksOnRetrieve +func (mr *MockOptionsMockRecorder) CacheBlocksOnRetrieve() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CacheBlocksOnRetrieve", reflect.TypeOf((*MockOptions)(nil).CacheBlocksOnRetrieve)) +} + // SetRetentionOptions mocks base method func (m *MockOptions) SetRetentionOptions(value retention.Options) Options { m.ctrl.T.Helper() diff --git a/src/dbnode/namespace/options.go b/src/dbnode/namespace/options.go index f8cc25f312..7816381e6a 100644 --- a/src/dbnode/namespace/options.go +++ b/src/dbnode/namespace/options.go @@ -47,6 +47,9 @@ const ( // Namespace with cold writes disabled by default. defaultColdWritesEnabled = false + + // Namespace caches retrieved blocks by default. + defaultCacheBlocksOnRetrieve = true ) var ( @@ -57,17 +60,18 @@ var ( ) type options struct { - bootstrapEnabled bool - flushEnabled bool - snapshotEnabled bool - writesToCommitLog bool - cleanupEnabled bool - repairEnabled bool - coldWritesEnabled bool - retentionOpts retention.Options - indexOpts IndexOptions - schemaHis SchemaHistory - runtimeOpts RuntimeOptions + bootstrapEnabled bool + flushEnabled bool + snapshotEnabled bool + writesToCommitLog bool + cleanupEnabled bool + repairEnabled bool + coldWritesEnabled bool + cacheBlocksOnRetrieve bool + retentionOpts retention.Options + indexOpts IndexOptions + schemaHis SchemaHistory + runtimeOpts RuntimeOptions } // NewSchemaHistory returns an empty schema history. @@ -78,17 +82,18 @@ func NewSchemaHistory() SchemaHistory { // NewOptions creates a new namespace options func NewOptions() Options { return &options{ - bootstrapEnabled: defaultBootstrapEnabled, - flushEnabled: defaultFlushEnabled, - snapshotEnabled: defaultSnapshotEnabled, - writesToCommitLog: defaultWritesToCommitLog, - cleanupEnabled: defaultCleanupEnabled, - repairEnabled: defaultRepairEnabled, - coldWritesEnabled: defaultColdWritesEnabled, - retentionOpts: retention.NewOptions(), - indexOpts: NewIndexOptions(), - schemaHis: NewSchemaHistory(), - runtimeOpts: NewRuntimeOptions(), + bootstrapEnabled: defaultBootstrapEnabled, + flushEnabled: defaultFlushEnabled, + snapshotEnabled: defaultSnapshotEnabled, + writesToCommitLog: defaultWritesToCommitLog, + cleanupEnabled: defaultCleanupEnabled, + repairEnabled: defaultRepairEnabled, + coldWritesEnabled: defaultColdWritesEnabled, + cacheBlocksOnRetrieve: defaultCacheBlocksOnRetrieve, + retentionOpts: retention.NewOptions(), + indexOpts: NewIndexOptions(), + schemaHis: NewSchemaHistory(), + runtimeOpts: NewRuntimeOptions(), } } @@ -128,6 +133,7 @@ func (o *options) Equal(value Options) bool { o.cleanupEnabled == value.CleanupEnabled() && o.repairEnabled == value.RepairEnabled() && o.coldWritesEnabled == value.ColdWritesEnabled() && + o.cacheBlocksOnRetrieve == value.CacheBlocksOnRetrieve() && o.retentionOpts.Equal(value.RetentionOptions()) && o.indexOpts.Equal(value.IndexOptions()) && o.schemaHis.Equal(value.SchemaHistory()) && @@ -204,6 +210,16 @@ func (o *options) ColdWritesEnabled() bool { return o.coldWritesEnabled } +func (o *options) SetCacheBlocksOnRetrieve(value bool) Options { + opts := *o + opts.cacheBlocksOnRetrieve = value + return &opts +} + +func (o *options) CacheBlocksOnRetrieve() bool { + return o.cacheBlocksOnRetrieve +} + func (o *options) SetRetentionOptions(value retention.Options) Options { opts := *o opts.retentionOpts = value diff --git a/src/dbnode/namespace/types.go b/src/dbnode/namespace/types.go index acd61360fe..ff5177c000 100644 --- a/src/dbnode/namespace/types.go +++ b/src/dbnode/namespace/types.go @@ -80,6 +80,14 @@ type Options interface { // ColdWritesEnabled returns whether cold writes are enabled for this namespace. ColdWritesEnabled() bool + // SetCacheBlocksOnRetrieve sets whether to cache blocks from this namespace when retrieved. + // If global CacheBlocksOnRetrieve option in config.BlockRetrievePolicy is set to false, + // then that will override any namespace-specific CacheBlocksOnRetrieve options set to true. + SetCacheBlocksOnRetrieve(value bool) Options + + // CacheBlocksOnRetrieve returns whether to cache blocks from this namespace when retrieved. + CacheBlocksOnRetrieve() bool + // SetRetentionOptions sets the retention options for this namespace SetRetentionOptions(value retention.Options) Options diff --git a/src/dbnode/persist/fs/retriever.go b/src/dbnode/persist/fs/retriever.go index 7332e4705a..d678fd58dd 100644 --- a/src/dbnode/persist/fs/retriever.go +++ b/src/dbnode/persist/fs/retriever.go @@ -93,7 +93,8 @@ type blockRetriever struct { idPool ident.Pool nsMetadata namespace.Metadata - blockSize time.Duration + blockSize time.Duration + nsCacheBlocksOnRetrieve bool status blockRetrieverStatus reqsByShardIdx []*shardRetrieveRequests @@ -151,8 +152,9 @@ func (r *blockRetriever) Open( r.status = blockRetrieverOpen r.seekerMgr = seekerMgr - // Cache blockSize result + // Cache blockSize result and namespace specific block caching option r.blockSize = ns.Options().RetentionOptions().BlockSize() + r.nsCacheBlocksOnRetrieve = ns.Options().CacheBlocksOnRetrieve() for i := 0; i < r.opts.FetchConcurrency(); i++ { go r.fetchLoop(seekerMgr) @@ -339,6 +341,8 @@ func (r *blockRetriever) fetchBatch( tagDecoderPool := r.fsOpts.TagDecoderPool() + blockCachingEnabled := r.opts.CacheBlocksOnRetrieve() && r.nsCacheBlocksOnRetrieve + // Seek and execute all requests for _, req := range reqs { var ( @@ -366,7 +370,7 @@ func (r *blockRetriever) fetchBatch( } // We don't need to call onRetrieve.OnRetrieveBlock if the ID was not found. - callOnRetrieve := req.onRetrieve != nil && req.foundAndHasNoError() + callOnRetrieve := blockCachingEnabled && req.onRetrieve != nil && req.foundAndHasNoError() if callOnRetrieve { // NB(r): Need to also trigger callback with a copy of the data. // This is used by the database to cache the in memory data for diff --git a/src/dbnode/persist/fs/retriever_options.go b/src/dbnode/persist/fs/retriever_options.go index 8e0d9920b2..5c29cbd5ec 100644 --- a/src/dbnode/persist/fs/retriever_options.go +++ b/src/dbnode/persist/fs/retriever_options.go @@ -34,6 +34,7 @@ import ( var ( // Allow max concurrency to match available CPUs. defaultFetchConcurrency = runtime.NumCPU() + defaultCacheOnRetrieve = true errBlockLeaseManagerNotSet = errors.New("block lease manager is not set") ) @@ -42,6 +43,7 @@ type blockRetrieverOptions struct { requestPool RetrieveRequestPool bytesPool pool.CheckedBytesPool fetchConcurrency int + cacheOnRetrieve bool identifierPool ident.Pool blockLeaseManager block.LeaseManager queryLimits limits.QueryLimits @@ -66,6 +68,7 @@ func NewBlockRetrieverOptions() BlockRetrieverOptions { requestPool: requestPool, bytesPool: bytesPool, fetchConcurrency: defaultFetchConcurrency, + cacheOnRetrieve: defaultCacheOnRetrieve, identifierPool: ident.NewPool(bytesPool, ident.PoolOptions{}), queryLimits: limits.NoOpQueryLimits(), } @@ -110,6 +113,16 @@ func (o *blockRetrieverOptions) FetchConcurrency() int { return o.fetchConcurrency } +func (o *blockRetrieverOptions) SetCacheBlocksOnRetrieve(value bool) BlockRetrieverOptions { + opts := *o + opts.cacheOnRetrieve = value + return &opts +} + +func (o *blockRetrieverOptions) CacheBlocksOnRetrieve() bool { + return o.cacheOnRetrieve +} + func (o *blockRetrieverOptions) SetIdentifierPool(value ident.Pool) BlockRetrieverOptions { opts := *o opts.identifierPool = value diff --git a/src/dbnode/persist/fs/retriever_test.go b/src/dbnode/persist/fs/retriever_test.go index b470e2af7f..f176e53323 100644 --- a/src/dbnode/persist/fs/retriever_test.go +++ b/src/dbnode/persist/fs/retriever_test.go @@ -37,6 +37,7 @@ import ( "github.com/m3db/m3/src/cluster/shard" "github.com/m3db/m3/src/dbnode/digest" "github.com/m3db/m3/src/dbnode/persist" + "github.com/m3db/m3/src/dbnode/retention" "github.com/m3db/m3/src/dbnode/sharding" "github.com/m3db/m3/src/dbnode/storage/block" "github.com/m3db/m3/src/dbnode/storage/index/convert" @@ -67,6 +68,7 @@ type testCleanupFn func() func newOpenTestBlockRetriever( t *testing.T, + md namespace.Metadata, opts testBlockRetrieverOptions, ) (*blockRetriever, testCleanupFn) { require.NotNil(t, opts.retrieverOpts) @@ -88,7 +90,7 @@ func newOpenTestBlockRetriever( nsPath := NamespaceDataDirPath(opts.fsOpts.FilePathPrefix(), testNs1ID) require.NoError(t, os.MkdirAll(nsPath, opts.fsOpts.NewDirectoryMode())) - require.NoError(t, retriever.Open(testNs1Metadata(t), shardSet)) + require.NoError(t, retriever.Open(md, shardSet)) return retriever, func() { require.NoError(t, retriever.Close()) @@ -211,7 +213,7 @@ func testBlockRetrieverHighConcurrentSeeks(t *testing.T, shouldCacheShardIndices shards: shards, } - retriever, cleanup := newOpenTestBlockRetriever(t, opts) + retriever, cleanup := newOpenTestBlockRetriever(t, testNs1Metadata(t), opts) defer cleanup() // Setup the open seeker function to fail sometimes to exercise that code path. @@ -553,7 +555,7 @@ func TestBlockRetrieverIDDoesNotExist(t *testing.T) { fsOpts: fsOpts, shards: []uint32{shard}, } - retriever, cleanup := newOpenTestBlockRetriever(t, opts) + retriever, cleanup := newOpenTestBlockRetriever(t, testNs1Metadata(t), opts) defer cleanup() // Write out a test file @@ -602,7 +604,7 @@ func TestBlockRetrieverOnlyCreatesTagItersIfTagsExists(t *testing.T) { fsOpts: fsOpts, shards: []uint32{shard}, } - retriever, cleanup := newOpenTestBlockRetriever(t, opts) + retriever, cleanup := newOpenTestBlockRetriever(t, testNs1Metadata(t), opts) defer cleanup() // Write out a test file. @@ -674,6 +676,105 @@ func TestBlockRetrieverOnlyCreatesTagItersIfTagsExists(t *testing.T) { require.NoError(t, err) } +// TestBlockRetrieverDoesNotInvokeOnRetrieveWithGlobalFlag verifies that the block retriever +// does not invoke the OnRetrieve block if the global CacheBlocksOnRetrieve is not enabled. +func TestBlockRetrieverDoesNotInvokeOnRetrieveWithGlobalFlag(t *testing.T) { + testBlockRetrieverOnRetrieve(t, false, true) +} + +// TestBlockRetrieverDoesNotInvokeOnRetrieveWithNamespacesFlag verifies that the block retriever +// does not invoke the OnRetrieve block if the namespace-specific CacheBlocksOnRetrieve is not enabled. +func TestBlockRetrieverDoesNotInvokeOnRetrieveWithNamespaceFlag(t *testing.T) { + testBlockRetrieverOnRetrieve(t, true, false) +} + +func TestBlockRetrieverDoesNotInvokeOnRetrieve(t *testing.T) { + testBlockRetrieverOnRetrieve(t, false, false) +} + +func TestBlockRetrieverDoesInvokeOnRetrieve(t *testing.T) { + testBlockRetrieverOnRetrieve(t, true, true) +} + +func testBlockRetrieverOnRetrieve(t *testing.T, globalFlag bool, nsFlag bool) { + // Make sure reader/writer are looking at the same test directory. + dir, err := ioutil.TempDir("", "testdb") + require.NoError(t, err) + defer os.RemoveAll(dir) + filePathPrefix := filepath.Join(dir, "") + + // Setup constants and config. + md, err := namespace.NewMetadata(testNs1ID, namespace.NewOptions(). + SetCacheBlocksOnRetrieve(nsFlag). + SetRetentionOptions(retention.NewOptions().SetBlockSize(testBlockSize)). + SetIndexOptions(namespace.NewIndexOptions().SetEnabled(true).SetBlockSize(testBlockSize))) + require.NoError(t, err) + + fsOpts := testDefaultOpts.SetFilePathPrefix(filePathPrefix) + rOpts := md.Options().RetentionOptions() + nsCtx := namespace.NewContextFrom(md) + shard := uint32(0) + blockStart := time.Now().Truncate(rOpts.BlockSize()) + + // Setup the reader. + opts := testBlockRetrieverOptions{ + retrieverOpts: defaultTestBlockRetrieverOptions.SetCacheBlocksOnRetrieve(globalFlag), + fsOpts: fsOpts, + shards: []uint32{shard}, + } + retriever, cleanup := newOpenTestBlockRetriever(t, md, opts) + defer cleanup() + + // Write out a test file. + var ( + w, closer = newOpenTestWriter(t, fsOpts, shard, blockStart, 0) + tag = ident.Tag{ + Name: ident.StringID("name"), + Value: ident.StringID("value"), + } + tags = ident.NewTags(tag) + id = "foo" + ) + data := checked.NewBytes([]byte("Hello world!"), nil) + data.IncRef() + defer data.DecRef() + + metadata := persist.NewMetadataFromIDAndTags(ident.StringID(id), tags, + persist.MetadataOptions{}) + err = w.Write(metadata, data, digest.Checksum(data.Bytes())) + require.NoError(t, err) + closer() + + // Make sure we return the correct error if the ID does not exist + ctx := context.NewContext() + defer ctx.Close() + + onRetrieveCalled := false + retrieveFn := block.OnRetrieveBlockFn(func( + id ident.ID, + tagsIter ident.TagIterator, + startTime time.Time, + segment ts.Segment, + nsCtx namespace.Context, + ) { + onRetrieveCalled = true + }) + + segmentReader, err := retriever.Stream(ctx, shard, + ident.StringID("foo"), blockStart, retrieveFn, nsCtx) + + _, err = segmentReader.Segment() + require.NoError(t, err) + + if globalFlag && nsFlag { + require.True(t, onRetrieveCalled) + } else { + require.False(t, onRetrieveCalled) + } + + require.NoError(t, err) +} + // TestBlockRetrieverHandlesErrors verifies the behavior of the Stream() method // on the retriever in the case where the SeekIndexEntry function returns an // error. @@ -742,7 +843,7 @@ func testBlockRetrieverHandlesSeekErrors(t *testing.T, ctrl *gomock.Controller, newSeekerMgrFn: newSeekerMgr, shards: []uint32{shard}, } - retriever, cleanup := newOpenTestBlockRetriever(t, opts) + retriever, cleanup := newOpenTestBlockRetriever(t, testNs1Metadata(t), opts) defer cleanup() // Make sure we return the correct error. diff --git a/src/dbnode/persist/fs/types.go b/src/dbnode/persist/fs/types.go index 6997b35c17..a9828cfe8f 100644 --- a/src/dbnode/persist/fs/types.go +++ b/src/dbnode/persist/fs/types.go @@ -551,6 +551,12 @@ type BlockRetrieverOptions interface { // FetchConcurrency returns the fetch concurrency. FetchConcurrency() int + // SetCacheBlocksOnRetrieve sets whether to cache blocks after retrieval at a global level. + SetCacheBlocksOnRetrieve(value bool) BlockRetrieverOptions + + // CacheBlocksOnRetrieve returns whether to cache blocks after retrieval at a global level. + CacheBlocksOnRetrieve() bool + // SetIdentifierPool sets the identifierPool. SetIdentifierPool(value ident.Pool) BlockRetrieverOptions diff --git a/src/dbnode/server/server.go b/src/dbnode/server/server.go index 4b9279f837..9fde570a20 100644 --- a/src/dbnode/server/server.go +++ b/src/dbnode/server/server.go @@ -556,6 +556,9 @@ func Run(runOpts RunOptions) { if blockRetrieveCfg := cfg.BlockRetrieve; blockRetrieveCfg != nil { retrieverOpts = retrieverOpts. SetFetchConcurrency(blockRetrieveCfg.FetchConcurrency) + if blockRetrieveCfg.CacheBlocksOnRetrieve != nil { + retrieverOpts = retrieverOpts.SetCacheBlocksOnRetrieve(*blockRetrieveCfg.CacheBlocksOnRetrieve) + } } blockRetrieverMgr := block.NewDatabaseBlockRetrieverManager( func(md namespace.Metadata, shardSet sharding.ShardSet) (block.DatabaseBlockRetriever, error) { diff --git a/src/query/api/v1/handler/database/create_test.go b/src/query/api/v1/handler/database/create_test.go index 15ef14d6ff..c2a9b41083 100644 --- a/src/query/api/v1/handler/database/create_test.go +++ b/src/query/api/v1/handler/database/create_test.go @@ -152,6 +152,7 @@ func testLocalType(t *testing.T, providedType string, placementExists bool) { "namespaces": { "testNamespace": { "bootstrapEnabled": true, + "cacheBlocksOnRetrieve": true, "flushEnabled": true, "writesToCommitLog": true, "cleanupEnabled": true, @@ -315,6 +316,7 @@ func TestLocalTypeWithNumShards(t *testing.T) { "namespaces": { "testNamespace": { "bootstrapEnabled": true, + "cacheBlocksOnRetrieve": true, "flushEnabled": true, "writesToCommitLog": true, "cleanupEnabled": true, @@ -430,6 +432,7 @@ func TestLocalWithBlockSizeNanos(t *testing.T) { "namespaces": { "testNamespace": { "bootstrapEnabled": true, + "cacheBlocksOnRetrieve": true, "flushEnabled": true, "writesToCommitLog": true, "cleanupEnabled": true, @@ -551,6 +554,7 @@ func TestLocalWithBlockSizeExpectedSeriesDatapointsPerHour(t *testing.T) { "namespaces": { "testNamespace": { "bootstrapEnabled": true, + "cacheBlocksOnRetrieve": true, "flushEnabled": true, "writesToCommitLog": true, "cleanupEnabled": true, @@ -802,6 +806,7 @@ func testClusterTypeHosts(t *testing.T, placementExists bool) { "namespaces": { "testNamespace": { "bootstrapEnabled": true, + "cacheBlocksOnRetrieve": true, "flushEnabled": true, "writesToCommitLog": true, "cleanupEnabled": true, @@ -946,6 +951,7 @@ func TestClusterTypeHostsWithIsolationGroup(t *testing.T) { "namespaces": { "testNamespace": { "bootstrapEnabled": true, + "cacheBlocksOnRetrieve": true, "flushEnabled": true, "writesToCommitLog": true, "cleanupEnabled": true, diff --git a/src/query/api/v1/handler/namespace/add_test.go b/src/query/api/v1/handler/namespace/add_test.go index 3655083719..a58eee35a7 100644 --- a/src/query/api/v1/handler/namespace/add_test.go +++ b/src/query/api/v1/handler/namespace/add_test.go @@ -112,11 +112,12 @@ func TestNamespaceAddHandler(t *testing.T) { "registry": xjson.Map{ "namespaces": xjson.Map{ "testNamespace": xjson.Map{ - "bootstrapEnabled": true, - "flushEnabled": true, - "writesToCommitLog": true, - "cleanupEnabled": true, - "repairEnabled": true, + "bootstrapEnabled": true, + "cacheBlocksOnRetrieve": true, + "flushEnabled": true, + "writesToCommitLog": true, + "cleanupEnabled": true, + "repairEnabled": true, "retentionOptions": xjson.Map{ "retentionPeriodNanos": "172800000000000", "blockSizeNanos": "7200000000000", diff --git a/src/query/api/v1/handler/namespace/get_test.go b/src/query/api/v1/handler/namespace/get_test.go index 33890d1d31..f05a01fc48 100644 --- a/src/query/api/v1/handler/namespace/get_test.go +++ b/src/query/api/v1/handler/namespace/get_test.go @@ -118,12 +118,13 @@ func TestNamespaceGetHandler(t *testing.T) { "registry": xjson.Map{ "namespaces": xjson.Map{ "test": xjson.Map{ - "bootstrapEnabled": true, - "cleanupEnabled": false, - "coldWritesEnabled": false, - "flushEnabled": true, - "indexOptions": nil, - "repairEnabled": false, + "bootstrapEnabled": true, + "cacheBlocksOnRetrieve": nil, + "cleanupEnabled": false, + "coldWritesEnabled": false, + "flushEnabled": true, + "indexOptions": nil, + "repairEnabled": false, "retentionOptions": xjson.Map{ "blockDataExpiry": true, "blockDataExpiryAfterNotAccessPeriodNanos": "3600000000000", @@ -198,12 +199,13 @@ func TestNamespaceGetHandlerWithDebug(t *testing.T) { "registry": xjson.Map{ "namespaces": xjson.Map{ "test": xjson.Map{ - "bootstrapEnabled": true, - "cleanupEnabled": false, - "coldWritesEnabled": false, - "flushEnabled": true, - "indexOptions": nil, - "repairEnabled": false, + "bootstrapEnabled": true, + "cacheBlocksOnRetrieve": nil, + "cleanupEnabled": false, + "coldWritesEnabled": false, + "flushEnabled": true, + "indexOptions": nil, + "repairEnabled": false, "retentionOptions": xjson.Map{ "blockDataExpiry": true, "blockDataExpiryAfterNotAccessPeriodDuration": "1h0m0s", diff --git a/src/query/api/v1/handler/namespace/update_test.go b/src/query/api/v1/handler/namespace/update_test.go index 3ffd063f59..584561abff 100644 --- a/src/query/api/v1/handler/namespace/update_test.go +++ b/src/query/api/v1/handler/namespace/update_test.go @@ -36,6 +36,7 @@ import ( xjson "github.com/m3db/m3/src/x/json" xtest "github.com/m3db/m3/src/x/test" + "github.com/gogo/protobuf/types" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -101,12 +102,13 @@ func TestNamespaceUpdateHandler(t *testing.T) { registry := nsproto.Registry{ Namespaces: map[string]*nsproto.NamespaceOptions{ "testNamespace": { - BootstrapEnabled: true, - FlushEnabled: true, - SnapshotEnabled: true, - WritesToCommitLog: true, - CleanupEnabled: false, - RepairEnabled: false, + BootstrapEnabled: true, + CacheBlocksOnRetrieve: &types.BoolValue{Value: true}, + FlushEnabled: true, + SnapshotEnabled: true, + WritesToCommitLog: true, + CleanupEnabled: false, + RepairEnabled: false, RetentionOptions: &nsproto.RetentionOptions{ RetentionPeriodNanos: 172800000000000, BlockSizeNanos: 7200000000000, @@ -136,11 +138,12 @@ func TestNamespaceUpdateHandler(t *testing.T) { "registry": xjson.Map{ "namespaces": xjson.Map{ "testNamespace": xjson.Map{ - "bootstrapEnabled": true, - "flushEnabled": true, - "writesToCommitLog": true, - "cleanupEnabled": false, - "repairEnabled": false, + "bootstrapEnabled": true, + "cacheBlocksOnRetrieve": true, + "flushEnabled": true, + "writesToCommitLog": true, + "cleanupEnabled": false, + "repairEnabled": false, "retentionOptions": xjson.Map{ "retentionPeriodNanos": "345600000000000", "blockSizeNanos": "7200000000000", @@ -190,11 +193,12 @@ func TestNamespaceUpdateHandler(t *testing.T) { "registry": xjson.Map{ "namespaces": xjson.Map{ "testNamespace": xjson.Map{ - "bootstrapEnabled": true, - "flushEnabled": true, - "writesToCommitLog": true, - "cleanupEnabled": false, - "repairEnabled": false, + "bootstrapEnabled": true, + "cacheBlocksOnRetrieve": true, + "flushEnabled": true, + "writesToCommitLog": true, + "cleanupEnabled": false, + "repairEnabled": false, "retentionOptions": xjson.Map{ "retentionPeriodNanos": "172800000000000", "blockSizeNanos": "7200000000000",