diff --git a/src/dbnode/generated/mocks/generate.go b/src/dbnode/generated/mocks/generate.go index 0dc98bc96b..5f0a7bff07 100644 --- a/src/dbnode/generated/mocks/generate.go +++ b/src/dbnode/generated/mocks/generate.go @@ -20,7 +20,7 @@ // mockgen rules for generating mocks for exported interfaces (reflection mode) -//go:generate sh -c "mockgen -package=fs $PACKAGE/src/dbnode/persist/fs DataFileSetWriter,DataFileSetReader,DataFileSetSeeker,IndexFileSetWriter,IndexFileSetReader,IndexSegmentFileSetWriter,IndexSegmentFileSet,IndexSegmentFile,SnapshotMetadataFileWriter,DataFileSetSeekerManager,ConcurrentDataFileSetSeeker,MergeWith | genclean -pkg $PACKAGE/src/dbnode/persist/fs -out $GOPATH/src/$PACKAGE/src/dbnode/persist/fs/fs_mock.go" +//go:generate sh -c "mockgen -package=fs $PACKAGE/src/dbnode/persist/fs DataFileSetWriter,DataFileSetReader,DataFileSetSeeker,IndexFileSetWriter,IndexFileSetReader,IndexSegmentFileSetWriter,IndexSegmentFileSet,IndexSegmentFile,SnapshotMetadataFileWriter,DataFileSetSeekerManager,ConcurrentDataFileSetSeeker,MergeWith,CrossBlockReader | genclean -pkg $PACKAGE/src/dbnode/persist/fs -out $GOPATH/src/$PACKAGE/src/dbnode/persist/fs/fs_mock.go" //go:generate sh -c "mockgen -package=xio $PACKAGE/src/dbnode/x/xio SegmentReader,SegmentReaderPool | genclean -pkg $PACKAGE/src/dbnode/x/xio -out $GOPATH/src/$PACKAGE/src/dbnode/x/xio/io_mock.go" //go:generate sh -c "mockgen -package=digest -destination=$GOPATH/src/$PACKAGE/src/dbnode/digest/digest_mock.go $PACKAGE/src/dbnode/digest ReaderWithDigest" //go:generate sh -c "mockgen -package=series $PACKAGE/src/dbnode/storage/series DatabaseSeries,QueryableBlockRetriever | genclean -pkg $PACKAGE/src/dbnode/storage/series -out $GOPATH/src/$PACKAGE/src/dbnode/storage/series/series_mock.go" diff --git a/src/dbnode/generated/thrift/rpc.thrift b/src/dbnode/generated/thrift/rpc.thrift index 6263377cc6..9b3e6beaab 100644 --- a/src/dbnode/generated/thrift/rpc.thrift +++ b/src/dbnode/generated/thrift/rpc.thrift @@ -63,6 +63,8 @@ service Node { void repair() throws (1: Error err) TruncateResult truncate(1: TruncateRequest req) throws (1: Error err) + AggregateTilesResult aggregateTiles(1: AggregateTilesRequest req) throws (1: Error err) + // Management endpoints NodeHealthResult health() throws (1: Error err) // NB: bootstrapped is for use with cluster management tools like k8s. 
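For context, a minimal sketch of what calling the new endpoint looks like from the client side, using the generated TChanNode client added further down in this diff. The namespace names, step value, and time range are illustrative placeholders, and obtaining the TChanNode client and thrift.Context via tchannel is assumed to happen elsewhere — this is not part of the change itself:

```go
package example

import (
	"fmt"
	"time"

	"github.com/m3db/m3/src/dbnode/generated/thrift/rpc"
	"github.com/uber/tchannel-go/thrift"
)

// aggregateTilesExample issues an aggregateTiles call against an
// already-constructed TChanNode client (hypothetical helper, not part of this diff).
func aggregateTilesExample(tctx thrift.Context, node rpc.TChanNode) error {
	now := time.Now()
	req := &rpc.AggregateTilesRequest{
		SourceNamespace: "metrics_unaggregated", // placeholder namespace names
		TargetNamespace: "metrics_1h",
		RangeStart:      now.Add(-2 * time.Hour).Unix(), // rangeType defaults to UNIX_SECONDS
		RangeEnd:        now.Unix(),
		Step:            "1h", // tile size; a duration-style string is assumed here
		RemoveResets:    false,
	}

	result, err := node.AggregateTiles(tctx, req)
	if err != nil {
		return err
	}
	fmt.Println("processed blocks:", result.ProcessedBlockCount)
	return nil
}
```

The server side (nodeProcessorAggregateTiles / handleAggregateTiles) and the storage-level AggregateTiles call exercised by the new integration test below use the same request/response shapes.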
@@ -216,8 +218,8 @@ struct Block { } struct Tag { - 1: required string name - 2: required string value + 1: required string name + 2: required string value } struct FetchBlocksMetadataRawV2Request { @@ -455,41 +457,55 @@ struct QueryResultElement { } struct TermQuery { - 1: required string field - 2: required string term + 1: required string field + 2: required string term } struct RegexpQuery { - 1: required string field - 2: required string regexp + 1: required string field + 2: required string regexp } struct NegationQuery { - 1: required Query query + 1: required Query query } struct ConjunctionQuery { - 1: required list queries + 1: required list queries } struct DisjunctionQuery { - 1: required list queries + 1: required list queries } struct AllQuery {} struct FieldQuery { - 1: required string field + 1: required string field } struct Query { - 1: optional TermQuery term - 2: optional RegexpQuery regexp - 3: optional NegationQuery negation - 4: optional ConjunctionQuery conjunction - 5: optional DisjunctionQuery disjunction - 6: optional AllQuery all - 7: optional FieldQuery field + 1: optional TermQuery term + 2: optional RegexpQuery regexp + 3: optional NegationQuery negation + 4: optional ConjunctionQuery conjunction + 5: optional DisjunctionQuery disjunction + 6: optional AllQuery all + 7: optional FieldQuery field +} + +struct AggregateTilesRequest { + 1: required string sourceNamespace + 2: required string targetNamespace + 3: required i64 rangeStart + 4: required i64 rangeEnd + 5: required string step + 6: bool removeResets // FIXME: temporary, remove after metrics type metadata is available. + 7: optional TimeType rangeType = TimeType.UNIX_SECONDS +} + +struct AggregateTilesResult { + 1: required i64 processedBlockCount } struct DebugProfileStartRequest { diff --git a/src/dbnode/generated/thrift/rpc/rpc.go b/src/dbnode/generated/thrift/rpc/rpc.go index 811c33b172..7b6a783848 100644 --- a/src/dbnode/generated/thrift/rpc/rpc.go +++ b/src/dbnode/generated/thrift/rpc/rpc.go @@ -12906,6 +12906,445 @@ func (p *Query) String() string { return fmt.Sprintf("Query(%+v)", *p) } +// Attributes: +// - SourceNamespace +// - TargetNamespace +// - RangeStart +// - RangeEnd +// - Step +// - RemoveResets +// - RangeType +type AggregateTilesRequest struct { + SourceNamespace string `thrift:"sourceNamespace,1,required" db:"sourceNamespace" json:"sourceNamespace"` + TargetNamespace string `thrift:"targetNamespace,2,required" db:"targetNamespace" json:"targetNamespace"` + RangeStart int64 `thrift:"rangeStart,3,required" db:"rangeStart" json:"rangeStart"` + RangeEnd int64 `thrift:"rangeEnd,4,required" db:"rangeEnd" json:"rangeEnd"` + Step string `thrift:"step,5,required" db:"step" json:"step"` + RemoveResets bool `thrift:"removeResets,6" db:"removeResets" json:"removeResets"` + RangeType TimeType `thrift:"rangeType,7" db:"rangeType" json:"rangeType,omitempty"` +} + +func NewAggregateTilesRequest() *AggregateTilesRequest { + return &AggregateTilesRequest{ + RangeType: 0, + } +} + +func (p *AggregateTilesRequest) GetSourceNamespace() string { + return p.SourceNamespace +} + +func (p *AggregateTilesRequest) GetTargetNamespace() string { + return p.TargetNamespace +} + +func (p *AggregateTilesRequest) GetRangeStart() int64 { + return p.RangeStart +} + +func (p *AggregateTilesRequest) GetRangeEnd() int64 { + return p.RangeEnd +} + +func (p *AggregateTilesRequest) GetStep() string { + return p.Step +} + +func (p *AggregateTilesRequest) GetRemoveResets() bool { + return p.RemoveResets +} + +var 
AggregateTilesRequest_RangeType_DEFAULT TimeType = 0 + +func (p *AggregateTilesRequest) GetRangeType() TimeType { + return p.RangeType +} +func (p *AggregateTilesRequest) IsSetRangeType() bool { + return p.RangeType != AggregateTilesRequest_RangeType_DEFAULT +} + +func (p *AggregateTilesRequest) Read(iprot thrift.TProtocol) error { + if _, err := iprot.ReadStructBegin(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err) + } + + var issetSourceNamespace bool = false + var issetTargetNamespace bool = false + var issetRangeStart bool = false + var issetRangeEnd bool = false + var issetStep bool = false + + for { + _, fieldTypeId, fieldId, err := iprot.ReadFieldBegin() + if err != nil { + return thrift.PrependError(fmt.Sprintf("%T field %d read error: ", p, fieldId), err) + } + if fieldTypeId == thrift.STOP { + break + } + switch fieldId { + case 1: + if err := p.ReadField1(iprot); err != nil { + return err + } + issetSourceNamespace = true + case 2: + if err := p.ReadField2(iprot); err != nil { + return err + } + issetTargetNamespace = true + case 3: + if err := p.ReadField3(iprot); err != nil { + return err + } + issetRangeStart = true + case 4: + if err := p.ReadField4(iprot); err != nil { + return err + } + issetRangeEnd = true + case 5: + if err := p.ReadField5(iprot); err != nil { + return err + } + issetStep = true + case 6: + if err := p.ReadField6(iprot); err != nil { + return err + } + case 7: + if err := p.ReadField7(iprot); err != nil { + return err + } + default: + if err := iprot.Skip(fieldTypeId); err != nil { + return err + } + } + if err := iprot.ReadFieldEnd(); err != nil { + return err + } + } + if err := iprot.ReadStructEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T read struct end error: ", p), err) + } + if !issetSourceNamespace { + return thrift.NewTProtocolExceptionWithType(thrift.INVALID_DATA, fmt.Errorf("Required field SourceNamespace is not set")) + } + if !issetTargetNamespace { + return thrift.NewTProtocolExceptionWithType(thrift.INVALID_DATA, fmt.Errorf("Required field TargetNamespace is not set")) + } + if !issetRangeStart { + return thrift.NewTProtocolExceptionWithType(thrift.INVALID_DATA, fmt.Errorf("Required field RangeStart is not set")) + } + if !issetRangeEnd { + return thrift.NewTProtocolExceptionWithType(thrift.INVALID_DATA, fmt.Errorf("Required field RangeEnd is not set")) + } + if !issetStep { + return thrift.NewTProtocolExceptionWithType(thrift.INVALID_DATA, fmt.Errorf("Required field Step is not set")) + } + return nil +} + +func (p *AggregateTilesRequest) ReadField1(iprot thrift.TProtocol) error { + if v, err := iprot.ReadString(); err != nil { + return thrift.PrependError("error reading field 1: ", err) + } else { + p.SourceNamespace = v + } + return nil +} + +func (p *AggregateTilesRequest) ReadField2(iprot thrift.TProtocol) error { + if v, err := iprot.ReadString(); err != nil { + return thrift.PrependError("error reading field 2: ", err) + } else { + p.TargetNamespace = v + } + return nil +} + +func (p *AggregateTilesRequest) ReadField3(iprot thrift.TProtocol) error { + if v, err := iprot.ReadI64(); err != nil { + return thrift.PrependError("error reading field 3: ", err) + } else { + p.RangeStart = v + } + return nil +} + +func (p *AggregateTilesRequest) ReadField4(iprot thrift.TProtocol) error { + if v, err := iprot.ReadI64(); err != nil { + return thrift.PrependError("error reading field 4: ", err) + } else { + p.RangeEnd = v + } + return nil +} + +func (p *AggregateTilesRequest) 
ReadField5(iprot thrift.TProtocol) error { + if v, err := iprot.ReadString(); err != nil { + return thrift.PrependError("error reading field 5: ", err) + } else { + p.Step = v + } + return nil +} + +func (p *AggregateTilesRequest) ReadField6(iprot thrift.TProtocol) error { + if v, err := iprot.ReadBool(); err != nil { + return thrift.PrependError("error reading field 6: ", err) + } else { + p.RemoveResets = v + } + return nil +} + +func (p *AggregateTilesRequest) ReadField7(iprot thrift.TProtocol) error { + if v, err := iprot.ReadI32(); err != nil { + return thrift.PrependError("error reading field 7: ", err) + } else { + temp := TimeType(v) + p.RangeType = temp + } + return nil +} + +func (p *AggregateTilesRequest) Write(oprot thrift.TProtocol) error { + if err := oprot.WriteStructBegin("AggregateTilesRequest"); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) + } + if p != nil { + if err := p.writeField1(oprot); err != nil { + return err + } + if err := p.writeField2(oprot); err != nil { + return err + } + if err := p.writeField3(oprot); err != nil { + return err + } + if err := p.writeField4(oprot); err != nil { + return err + } + if err := p.writeField5(oprot); err != nil { + return err + } + if err := p.writeField6(oprot); err != nil { + return err + } + if err := p.writeField7(oprot); err != nil { + return err + } + } + if err := oprot.WriteFieldStop(); err != nil { + return thrift.PrependError("write field stop error: ", err) + } + if err := oprot.WriteStructEnd(); err != nil { + return thrift.PrependError("write struct stop error: ", err) + } + return nil +} + +func (p *AggregateTilesRequest) writeField1(oprot thrift.TProtocol) (err error) { + if err := oprot.WriteFieldBegin("sourceNamespace", thrift.STRING, 1); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 1:sourceNamespace: ", p), err) + } + if err := oprot.WriteString(string(p.SourceNamespace)); err != nil { + return thrift.PrependError(fmt.Sprintf("%T.sourceNamespace (1) field write error: ", p), err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 1:sourceNamespace: ", p), err) + } + return err +} + +func (p *AggregateTilesRequest) writeField2(oprot thrift.TProtocol) (err error) { + if err := oprot.WriteFieldBegin("targetNamespace", thrift.STRING, 2); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 2:targetNamespace: ", p), err) + } + if err := oprot.WriteString(string(p.TargetNamespace)); err != nil { + return thrift.PrependError(fmt.Sprintf("%T.targetNamespace (2) field write error: ", p), err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 2:targetNamespace: ", p), err) + } + return err +} + +func (p *AggregateTilesRequest) writeField3(oprot thrift.TProtocol) (err error) { + if err := oprot.WriteFieldBegin("rangeStart", thrift.I64, 3); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 3:rangeStart: ", p), err) + } + if err := oprot.WriteI64(int64(p.RangeStart)); err != nil { + return thrift.PrependError(fmt.Sprintf("%T.rangeStart (3) field write error: ", p), err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 3:rangeStart: ", p), err) + } + return err +} + +func (p *AggregateTilesRequest) writeField4(oprot thrift.TProtocol) (err error) { + if err := 
oprot.WriteFieldBegin("rangeEnd", thrift.I64, 4); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 4:rangeEnd: ", p), err) + } + if err := oprot.WriteI64(int64(p.RangeEnd)); err != nil { + return thrift.PrependError(fmt.Sprintf("%T.rangeEnd (4) field write error: ", p), err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 4:rangeEnd: ", p), err) + } + return err +} + +func (p *AggregateTilesRequest) writeField5(oprot thrift.TProtocol) (err error) { + if err := oprot.WriteFieldBegin("step", thrift.STRING, 5); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 5:step: ", p), err) + } + if err := oprot.WriteString(string(p.Step)); err != nil { + return thrift.PrependError(fmt.Sprintf("%T.step (5) field write error: ", p), err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 5:step: ", p), err) + } + return err +} + +func (p *AggregateTilesRequest) writeField6(oprot thrift.TProtocol) (err error) { + if err := oprot.WriteFieldBegin("removeResets", thrift.BOOL, 6); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 6:removeResets: ", p), err) + } + if err := oprot.WriteBool(bool(p.RemoveResets)); err != nil { + return thrift.PrependError(fmt.Sprintf("%T.removeResets (6) field write error: ", p), err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 6:removeResets: ", p), err) + } + return err +} + +func (p *AggregateTilesRequest) writeField7(oprot thrift.TProtocol) (err error) { + if p.IsSetRangeType() { + if err := oprot.WriteFieldBegin("rangeType", thrift.I32, 7); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 7:rangeType: ", p), err) + } + if err := oprot.WriteI32(int32(p.RangeType)); err != nil { + return thrift.PrependError(fmt.Sprintf("%T.rangeType (7) field write error: ", p), err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 7:rangeType: ", p), err) + } + } + return err +} + +func (p *AggregateTilesRequest) String() string { + if p == nil { + return "" + } + return fmt.Sprintf("AggregateTilesRequest(%+v)", *p) +} + +// Attributes: +// - ProcessedBlockCount +type AggregateTilesResult_ struct { + ProcessedBlockCount int64 `thrift:"processedBlockCount,1,required" db:"processedBlockCount" json:"processedBlockCount"` +} + +func NewAggregateTilesResult_() *AggregateTilesResult_ { + return &AggregateTilesResult_{} +} + +func (p *AggregateTilesResult_) GetProcessedBlockCount() int64 { + return p.ProcessedBlockCount +} +func (p *AggregateTilesResult_) Read(iprot thrift.TProtocol) error { + if _, err := iprot.ReadStructBegin(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err) + } + + var issetProcessedBlockCount bool = false + + for { + _, fieldTypeId, fieldId, err := iprot.ReadFieldBegin() + if err != nil { + return thrift.PrependError(fmt.Sprintf("%T field %d read error: ", p, fieldId), err) + } + if fieldTypeId == thrift.STOP { + break + } + switch fieldId { + case 1: + if err := p.ReadField1(iprot); err != nil { + return err + } + issetProcessedBlockCount = true + default: + if err := iprot.Skip(fieldTypeId); err != nil { + return err + } + } + if err := iprot.ReadFieldEnd(); err != nil { + return err + } + } + if err := iprot.ReadStructEnd(); err != 
nil { + return thrift.PrependError(fmt.Sprintf("%T read struct end error: ", p), err) + } + if !issetProcessedBlockCount { + return thrift.NewTProtocolExceptionWithType(thrift.INVALID_DATA, fmt.Errorf("Required field ProcessedBlockCount is not set")) + } + return nil +} + +func (p *AggregateTilesResult_) ReadField1(iprot thrift.TProtocol) error { + if v, err := iprot.ReadI64(); err != nil { + return thrift.PrependError("error reading field 1: ", err) + } else { + p.ProcessedBlockCount = v + } + return nil +} + +func (p *AggregateTilesResult_) Write(oprot thrift.TProtocol) error { + if err := oprot.WriteStructBegin("AggregateTilesResult"); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) + } + if p != nil { + if err := p.writeField1(oprot); err != nil { + return err + } + } + if err := oprot.WriteFieldStop(); err != nil { + return thrift.PrependError("write field stop error: ", err) + } + if err := oprot.WriteStructEnd(); err != nil { + return thrift.PrependError("write struct stop error: ", err) + } + return nil +} + +func (p *AggregateTilesResult_) writeField1(oprot thrift.TProtocol) (err error) { + if err := oprot.WriteFieldBegin("processedBlockCount", thrift.I64, 1); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 1:processedBlockCount: ", p), err) + } + if err := oprot.WriteI64(int64(p.ProcessedBlockCount)); err != nil { + return thrift.PrependError(fmt.Sprintf("%T.processedBlockCount (1) field write error: ", p), err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 1:processedBlockCount: ", p), err) + } + return err +} + +func (p *AggregateTilesResult_) String() string { + if p == nil { + return "" + } + return fmt.Sprintf("AggregateTilesResult_(%+v)", *p) +} + // Attributes: // - Name // - FilePathTemplate @@ -13734,6 +14173,9 @@ type Node interface { // Parameters: // - Req Truncate(req *TruncateRequest) (r *TruncateResult_, err error) + // Parameters: + // - Req + AggregateTiles(req *AggregateTilesRequest) (r *AggregateTilesResult_, err error) Health() (r *NodeHealthResult_, err error) Bootstrapped() (r *NodeBootstrappedResult_, err error) BootstrappedInPlacementOrNoPlacement() (r *NodeBootstrappedInPlacementOrNoPlacementResult_, err error) @@ -15156,6 +15598,87 @@ func (p *NodeClient) recvTruncate() (value *TruncateResult_, err error) { return } +// Parameters: +// - Req +func (p *NodeClient) AggregateTiles(req *AggregateTilesRequest) (r *AggregateTilesResult_, err error) { + if err = p.sendAggregateTiles(req); err != nil { + return + } + return p.recvAggregateTiles() +} + +func (p *NodeClient) sendAggregateTiles(req *AggregateTilesRequest) (err error) { + oprot := p.OutputProtocol + if oprot == nil { + oprot = p.ProtocolFactory.GetProtocol(p.Transport) + p.OutputProtocol = oprot + } + p.SeqId++ + if err = oprot.WriteMessageBegin("aggregateTiles", thrift.CALL, p.SeqId); err != nil { + return + } + args := NodeAggregateTilesArgs{ + Req: req, + } + if err = args.Write(oprot); err != nil { + return + } + if err = oprot.WriteMessageEnd(); err != nil { + return + } + return oprot.Flush() +} + +func (p *NodeClient) recvAggregateTiles() (value *AggregateTilesResult_, err error) { + iprot := p.InputProtocol + if iprot == nil { + iprot = p.ProtocolFactory.GetProtocol(p.Transport) + p.InputProtocol = iprot + } + method, mTypeId, seqId, err := iprot.ReadMessageBegin() + if err != nil { + return + } + if method != "aggregateTiles" { + err = 
thrift.NewTApplicationException(thrift.WRONG_METHOD_NAME, "aggregateTiles failed: wrong method name") + return + } + if p.SeqId != seqId { + err = thrift.NewTApplicationException(thrift.BAD_SEQUENCE_ID, "aggregateTiles failed: out of sequence response") + return + } + if mTypeId == thrift.EXCEPTION { + error69 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error70 error + error70, err = error69.Read(iprot) + if err != nil { + return + } + if err = iprot.ReadMessageEnd(); err != nil { + return + } + err = error70 + return + } + if mTypeId != thrift.REPLY { + err = thrift.NewTApplicationException(thrift.INVALID_MESSAGE_TYPE_EXCEPTION, "aggregateTiles failed: invalid message type") + return + } + result := NodeAggregateTilesResult{} + if err = result.Read(iprot); err != nil { + return + } + if err = iprot.ReadMessageEnd(); err != nil { + return + } + if result.Err != nil { + err = result.Err + return + } + value = result.GetSuccess() + return +} + func (p *NodeClient) Health() (r *NodeHealthResult_, err error) { if err = p.sendHealth(); err != nil { return @@ -15202,16 +15725,16 @@ func (p *NodeClient) recvHealth() (value *NodeHealthResult_, err error) { return } if mTypeId == thrift.EXCEPTION { - error69 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") - var error70 error - error70, err = error69.Read(iprot) + error71 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error72 error + error72, err = error71.Read(iprot) if err != nil { return } if err = iprot.ReadMessageEnd(); err != nil { return } - err = error70 + err = error72 return } if mTypeId != thrift.REPLY { @@ -15279,16 +15802,16 @@ func (p *NodeClient) recvBootstrapped() (value *NodeBootstrappedResult_, err err return } if mTypeId == thrift.EXCEPTION { - error71 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") - var error72 error - error72, err = error71.Read(iprot) + error73 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error74 error + error74, err = error73.Read(iprot) if err != nil { return } if err = iprot.ReadMessageEnd(); err != nil { return } - err = error72 + err = error74 return } if mTypeId != thrift.REPLY { @@ -15356,16 +15879,16 @@ func (p *NodeClient) recvBootstrappedInPlacementOrNoPlacement() (value *NodeBoot return } if mTypeId == thrift.EXCEPTION { - error73 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") - var error74 error - error74, err = error73.Read(iprot) + error75 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error76 error + error76, err = error75.Read(iprot) if err != nil { return } if err = iprot.ReadMessageEnd(); err != nil { return } - err = error74 + err = error76 return } if mTypeId != thrift.REPLY { @@ -15433,16 +15956,16 @@ func (p *NodeClient) recvGetPersistRateLimit() (value *NodePersistRateLimitResul return } if mTypeId == thrift.EXCEPTION { - error75 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") - var error76 error - error76, err = error75.Read(iprot) + error77 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error78 error + error78, err = error77.Read(iprot) if err != nil { return } if err = iprot.ReadMessageEnd(); err != nil { return } - err = error76 + err = 
error78 return } if mTypeId != thrift.REPLY { @@ -15514,16 +16037,16 @@ func (p *NodeClient) recvSetPersistRateLimit() (value *NodePersistRateLimitResul return } if mTypeId == thrift.EXCEPTION { - error77 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") - var error78 error - error78, err = error77.Read(iprot) + error79 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error80 error + error80, err = error79.Read(iprot) if err != nil { return } if err = iprot.ReadMessageEnd(); err != nil { return } - err = error78 + err = error80 return } if mTypeId != thrift.REPLY { @@ -15591,16 +16114,16 @@ func (p *NodeClient) recvGetWriteNewSeriesAsync() (value *NodeWriteNewSeriesAsyn return } if mTypeId == thrift.EXCEPTION { - error79 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") - var error80 error - error80, err = error79.Read(iprot) + error81 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error82 error + error82, err = error81.Read(iprot) if err != nil { return } if err = iprot.ReadMessageEnd(); err != nil { return } - err = error80 + err = error82 return } if mTypeId != thrift.REPLY { @@ -15672,16 +16195,16 @@ func (p *NodeClient) recvSetWriteNewSeriesAsync() (value *NodeWriteNewSeriesAsyn return } if mTypeId == thrift.EXCEPTION { - error81 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") - var error82 error - error82, err = error81.Read(iprot) + error83 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error84 error + error84, err = error83.Read(iprot) if err != nil { return } if err = iprot.ReadMessageEnd(); err != nil { return } - err = error82 + err = error84 return } if mTypeId != thrift.REPLY { @@ -15749,16 +16272,16 @@ func (p *NodeClient) recvGetWriteNewSeriesBackoffDuration() (value *NodeWriteNew return } if mTypeId == thrift.EXCEPTION { - error83 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") - var error84 error - error84, err = error83.Read(iprot) + error85 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error86 error + error86, err = error85.Read(iprot) if err != nil { return } if err = iprot.ReadMessageEnd(); err != nil { return } - err = error84 + err = error86 return } if mTypeId != thrift.REPLY { @@ -15830,16 +16353,16 @@ func (p *NodeClient) recvSetWriteNewSeriesBackoffDuration() (value *NodeWriteNew return } if mTypeId == thrift.EXCEPTION { - error85 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") - var error86 error - error86, err = error85.Read(iprot) + error87 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error88 error + error88, err = error87.Read(iprot) if err != nil { return } if err = iprot.ReadMessageEnd(); err != nil { return } - err = error86 + err = error88 return } if mTypeId != thrift.REPLY { @@ -15907,16 +16430,16 @@ func (p *NodeClient) recvGetWriteNewSeriesLimitPerShardPerSecond() (value *NodeW return } if mTypeId == thrift.EXCEPTION { - error87 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") - var error88 error - error88, err = error87.Read(iprot) + error89 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown 
Exception") + var error90 error + error90, err = error89.Read(iprot) if err != nil { return } if err = iprot.ReadMessageEnd(); err != nil { return } - err = error88 + err = error90 return } if mTypeId != thrift.REPLY { @@ -15988,16 +16511,16 @@ func (p *NodeClient) recvSetWriteNewSeriesLimitPerShardPerSecond() (value *NodeW return } if mTypeId == thrift.EXCEPTION { - error89 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") - var error90 error - error90, err = error89.Read(iprot) + error91 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error92 error + error92, err = error91.Read(iprot) if err != nil { return } if err = iprot.ReadMessageEnd(); err != nil { return } - err = error90 + err = error92 return } if mTypeId != thrift.REPLY { @@ -16069,16 +16592,16 @@ func (p *NodeClient) recvDebugProfileStart() (value *DebugProfileStartResult_, e return } if mTypeId == thrift.EXCEPTION { - error91 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") - var error92 error - error92, err = error91.Read(iprot) + error93 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error94 error + error94, err = error93.Read(iprot) if err != nil { return } if err = iprot.ReadMessageEnd(); err != nil { return } - err = error92 + err = error94 return } if mTypeId != thrift.REPLY { @@ -16150,16 +16673,16 @@ func (p *NodeClient) recvDebugProfileStop() (value *DebugProfileStopResult_, err return } if mTypeId == thrift.EXCEPTION { - error93 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") - var error94 error - error94, err = error93.Read(iprot) + error95 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error96 error + error96, err = error95.Read(iprot) if err != nil { return } if err = iprot.ReadMessageEnd(); err != nil { return } - err = error94 + err = error96 return } if mTypeId != thrift.REPLY { @@ -16231,16 +16754,16 @@ func (p *NodeClient) recvDebugIndexMemorySegments() (value *DebugIndexMemorySegm return } if mTypeId == thrift.EXCEPTION { - error95 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") - var error96 error - error96, err = error95.Read(iprot) + error97 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error98 error + error98, err = error97.Read(iprot) if err != nil { return } if err = iprot.ReadMessageEnd(); err != nil { return } - err = error96 + err = error98 return } if mTypeId != thrift.REPLY { @@ -16282,39 +16805,40 @@ func (p *NodeProcessor) ProcessorMap() map[string]thrift.TProcessorFunction { func NewNodeProcessor(handler Node) *NodeProcessor { - self97 := &NodeProcessor{handler: handler, processorMap: make(map[string]thrift.TProcessorFunction)} - self97.processorMap["query"] = &nodeProcessorQuery{handler: handler} - self97.processorMap["aggregate"] = &nodeProcessorAggregate{handler: handler} - self97.processorMap["fetch"] = &nodeProcessorFetch{handler: handler} - self97.processorMap["write"] = &nodeProcessorWrite{handler: handler} - self97.processorMap["writeTagged"] = &nodeProcessorWriteTagged{handler: handler} - self97.processorMap["fetchBatchRaw"] = &nodeProcessorFetchBatchRaw{handler: handler} - self97.processorMap["fetchBatchRawV2"] = &nodeProcessorFetchBatchRawV2{handler: handler} - 
self97.processorMap["fetchBlocksRaw"] = &nodeProcessorFetchBlocksRaw{handler: handler} - self97.processorMap["fetchTagged"] = &nodeProcessorFetchTagged{handler: handler} - self97.processorMap["aggregateRaw"] = &nodeProcessorAggregateRaw{handler: handler} - self97.processorMap["fetchBlocksMetadataRawV2"] = &nodeProcessorFetchBlocksMetadataRawV2{handler: handler} - self97.processorMap["writeBatchRaw"] = &nodeProcessorWriteBatchRaw{handler: handler} - self97.processorMap["writeBatchRawV2"] = &nodeProcessorWriteBatchRawV2{handler: handler} - self97.processorMap["writeTaggedBatchRaw"] = &nodeProcessorWriteTaggedBatchRaw{handler: handler} - self97.processorMap["writeTaggedBatchRawV2"] = &nodeProcessorWriteTaggedBatchRawV2{handler: handler} - self97.processorMap["repair"] = &nodeProcessorRepair{handler: handler} - self97.processorMap["truncate"] = &nodeProcessorTruncate{handler: handler} - self97.processorMap["health"] = &nodeProcessorHealth{handler: handler} - self97.processorMap["bootstrapped"] = &nodeProcessorBootstrapped{handler: handler} - self97.processorMap["bootstrappedInPlacementOrNoPlacement"] = &nodeProcessorBootstrappedInPlacementOrNoPlacement{handler: handler} - self97.processorMap["getPersistRateLimit"] = &nodeProcessorGetPersistRateLimit{handler: handler} - self97.processorMap["setPersistRateLimit"] = &nodeProcessorSetPersistRateLimit{handler: handler} - self97.processorMap["getWriteNewSeriesAsync"] = &nodeProcessorGetWriteNewSeriesAsync{handler: handler} - self97.processorMap["setWriteNewSeriesAsync"] = &nodeProcessorSetWriteNewSeriesAsync{handler: handler} - self97.processorMap["getWriteNewSeriesBackoffDuration"] = &nodeProcessorGetWriteNewSeriesBackoffDuration{handler: handler} - self97.processorMap["setWriteNewSeriesBackoffDuration"] = &nodeProcessorSetWriteNewSeriesBackoffDuration{handler: handler} - self97.processorMap["getWriteNewSeriesLimitPerShardPerSecond"] = &nodeProcessorGetWriteNewSeriesLimitPerShardPerSecond{handler: handler} - self97.processorMap["setWriteNewSeriesLimitPerShardPerSecond"] = &nodeProcessorSetWriteNewSeriesLimitPerShardPerSecond{handler: handler} - self97.processorMap["debugProfileStart"] = &nodeProcessorDebugProfileStart{handler: handler} - self97.processorMap["debugProfileStop"] = &nodeProcessorDebugProfileStop{handler: handler} - self97.processorMap["debugIndexMemorySegments"] = &nodeProcessorDebugIndexMemorySegments{handler: handler} - return self97 + self99 := &NodeProcessor{handler: handler, processorMap: make(map[string]thrift.TProcessorFunction)} + self99.processorMap["query"] = &nodeProcessorQuery{handler: handler} + self99.processorMap["aggregate"] = &nodeProcessorAggregate{handler: handler} + self99.processorMap["fetch"] = &nodeProcessorFetch{handler: handler} + self99.processorMap["write"] = &nodeProcessorWrite{handler: handler} + self99.processorMap["writeTagged"] = &nodeProcessorWriteTagged{handler: handler} + self99.processorMap["fetchBatchRaw"] = &nodeProcessorFetchBatchRaw{handler: handler} + self99.processorMap["fetchBatchRawV2"] = &nodeProcessorFetchBatchRawV2{handler: handler} + self99.processorMap["fetchBlocksRaw"] = &nodeProcessorFetchBlocksRaw{handler: handler} + self99.processorMap["fetchTagged"] = &nodeProcessorFetchTagged{handler: handler} + self99.processorMap["aggregateRaw"] = &nodeProcessorAggregateRaw{handler: handler} + self99.processorMap["fetchBlocksMetadataRawV2"] = &nodeProcessorFetchBlocksMetadataRawV2{handler: handler} + self99.processorMap["writeBatchRaw"] = &nodeProcessorWriteBatchRaw{handler: handler} + 
self99.processorMap["writeBatchRawV2"] = &nodeProcessorWriteBatchRawV2{handler: handler} + self99.processorMap["writeTaggedBatchRaw"] = &nodeProcessorWriteTaggedBatchRaw{handler: handler} + self99.processorMap["writeTaggedBatchRawV2"] = &nodeProcessorWriteTaggedBatchRawV2{handler: handler} + self99.processorMap["repair"] = &nodeProcessorRepair{handler: handler} + self99.processorMap["truncate"] = &nodeProcessorTruncate{handler: handler} + self99.processorMap["aggregateTiles"] = &nodeProcessorAggregateTiles{handler: handler} + self99.processorMap["health"] = &nodeProcessorHealth{handler: handler} + self99.processorMap["bootstrapped"] = &nodeProcessorBootstrapped{handler: handler} + self99.processorMap["bootstrappedInPlacementOrNoPlacement"] = &nodeProcessorBootstrappedInPlacementOrNoPlacement{handler: handler} + self99.processorMap["getPersistRateLimit"] = &nodeProcessorGetPersistRateLimit{handler: handler} + self99.processorMap["setPersistRateLimit"] = &nodeProcessorSetPersistRateLimit{handler: handler} + self99.processorMap["getWriteNewSeriesAsync"] = &nodeProcessorGetWriteNewSeriesAsync{handler: handler} + self99.processorMap["setWriteNewSeriesAsync"] = &nodeProcessorSetWriteNewSeriesAsync{handler: handler} + self99.processorMap["getWriteNewSeriesBackoffDuration"] = &nodeProcessorGetWriteNewSeriesBackoffDuration{handler: handler} + self99.processorMap["setWriteNewSeriesBackoffDuration"] = &nodeProcessorSetWriteNewSeriesBackoffDuration{handler: handler} + self99.processorMap["getWriteNewSeriesLimitPerShardPerSecond"] = &nodeProcessorGetWriteNewSeriesLimitPerShardPerSecond{handler: handler} + self99.processorMap["setWriteNewSeriesLimitPerShardPerSecond"] = &nodeProcessorSetWriteNewSeriesLimitPerShardPerSecond{handler: handler} + self99.processorMap["debugProfileStart"] = &nodeProcessorDebugProfileStart{handler: handler} + self99.processorMap["debugProfileStop"] = &nodeProcessorDebugProfileStop{handler: handler} + self99.processorMap["debugIndexMemorySegments"] = &nodeProcessorDebugIndexMemorySegments{handler: handler} + return self99 } func (p *NodeProcessor) Process(iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) { @@ -16327,12 +16851,12 @@ func (p *NodeProcessor) Process(iprot, oprot thrift.TProtocol) (success bool, er } iprot.Skip(thrift.STRUCT) iprot.ReadMessageEnd() - x98 := thrift.NewTApplicationException(thrift.UNKNOWN_METHOD, "Unknown function "+name) + x100 := thrift.NewTApplicationException(thrift.UNKNOWN_METHOD, "Unknown function "+name) oprot.WriteMessageBegin(name, thrift.EXCEPTION, seqId) - x98.Write(oprot) + x100.Write(oprot) oprot.WriteMessageEnd() oprot.Flush() - return false, x98 + return false, x100 } @@ -17145,7 +17669,60 @@ func (p *nodeProcessorRepair) Process(seqId int32, iprot, oprot thrift.TProtocol return true, err2 } } - if err2 = oprot.WriteMessageBegin("repair", thrift.REPLY, seqId); err2 != nil { + if err2 = oprot.WriteMessageBegin("repair", thrift.REPLY, seqId); err2 != nil { + err = err2 + } + if err2 = result.Write(oprot); err == nil && err2 != nil { + err = err2 + } + if err2 = oprot.WriteMessageEnd(); err == nil && err2 != nil { + err = err2 + } + if err2 = oprot.Flush(); err == nil && err2 != nil { + err = err2 + } + if err != nil { + return + } + return true, err +} + +type nodeProcessorTruncate struct { + handler Node +} + +func (p *nodeProcessorTruncate) Process(seqId int32, iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) { + args := NodeTruncateArgs{} + if err = args.Read(iprot); err != nil { + 
iprot.ReadMessageEnd() + x := thrift.NewTApplicationException(thrift.PROTOCOL_ERROR, err.Error()) + oprot.WriteMessageBegin("truncate", thrift.EXCEPTION, seqId) + x.Write(oprot) + oprot.WriteMessageEnd() + oprot.Flush() + return false, err + } + + iprot.ReadMessageEnd() + result := NodeTruncateResult{} + var retval *TruncateResult_ + var err2 error + if retval, err2 = p.handler.Truncate(args.Req); err2 != nil { + switch v := err2.(type) { + case *Error: + result.Err = v + default: + x := thrift.NewTApplicationException(thrift.INTERNAL_ERROR, "Internal error processing truncate: "+err2.Error()) + oprot.WriteMessageBegin("truncate", thrift.EXCEPTION, seqId) + x.Write(oprot) + oprot.WriteMessageEnd() + oprot.Flush() + return true, err2 + } + } else { + result.Success = retval + } + if err2 = oprot.WriteMessageBegin("truncate", thrift.REPLY, seqId); err2 != nil { err = err2 } if err2 = result.Write(oprot); err == nil && err2 != nil { @@ -17163,16 +17740,16 @@ func (p *nodeProcessorRepair) Process(seqId int32, iprot, oprot thrift.TProtocol return true, err } -type nodeProcessorTruncate struct { +type nodeProcessorAggregateTiles struct { handler Node } -func (p *nodeProcessorTruncate) Process(seqId int32, iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) { - args := NodeTruncateArgs{} +func (p *nodeProcessorAggregateTiles) Process(seqId int32, iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) { + args := NodeAggregateTilesArgs{} if err = args.Read(iprot); err != nil { iprot.ReadMessageEnd() x := thrift.NewTApplicationException(thrift.PROTOCOL_ERROR, err.Error()) - oprot.WriteMessageBegin("truncate", thrift.EXCEPTION, seqId) + oprot.WriteMessageBegin("aggregateTiles", thrift.EXCEPTION, seqId) x.Write(oprot) oprot.WriteMessageEnd() oprot.Flush() @@ -17180,16 +17757,16 @@ func (p *nodeProcessorTruncate) Process(seqId int32, iprot, oprot thrift.TProtoc } iprot.ReadMessageEnd() - result := NodeTruncateResult{} - var retval *TruncateResult_ + result := NodeAggregateTilesResult{} + var retval *AggregateTilesResult_ var err2 error - if retval, err2 = p.handler.Truncate(args.Req); err2 != nil { + if retval, err2 = p.handler.AggregateTiles(args.Req); err2 != nil { switch v := err2.(type) { case *Error: result.Err = v default: - x := thrift.NewTApplicationException(thrift.INTERNAL_ERROR, "Internal error processing truncate: "+err2.Error()) - oprot.WriteMessageBegin("truncate", thrift.EXCEPTION, seqId) + x := thrift.NewTApplicationException(thrift.INTERNAL_ERROR, "Internal error processing aggregateTiles: "+err2.Error()) + oprot.WriteMessageBegin("aggregateTiles", thrift.EXCEPTION, seqId) x.Write(oprot) oprot.WriteMessageEnd() oprot.Flush() @@ -17198,7 +17775,7 @@ func (p *nodeProcessorTruncate) Process(seqId int32, iprot, oprot thrift.TProtoc } else { result.Success = retval } - if err2 = oprot.WriteMessageBegin("truncate", thrift.REPLY, seqId); err2 != nil { + if err2 = oprot.WriteMessageBegin("aggregateTiles", thrift.REPLY, seqId); err2 != nil { err = err2 } if err2 = result.Write(oprot); err == nil && err2 != nil { @@ -21878,6 +22455,259 @@ func (p *NodeTruncateResult) String() string { return fmt.Sprintf("NodeTruncateResult(%+v)", *p) } +// Attributes: +// - Req +type NodeAggregateTilesArgs struct { + Req *AggregateTilesRequest `thrift:"req,1" db:"req" json:"req"` +} + +func NewNodeAggregateTilesArgs() *NodeAggregateTilesArgs { + return &NodeAggregateTilesArgs{} +} + +var NodeAggregateTilesArgs_Req_DEFAULT *AggregateTilesRequest + +func (p *NodeAggregateTilesArgs) 
GetReq() *AggregateTilesRequest { + if !p.IsSetReq() { + return NodeAggregateTilesArgs_Req_DEFAULT + } + return p.Req +} +func (p *NodeAggregateTilesArgs) IsSetReq() bool { + return p.Req != nil +} + +func (p *NodeAggregateTilesArgs) Read(iprot thrift.TProtocol) error { + if _, err := iprot.ReadStructBegin(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err) + } + + for { + _, fieldTypeId, fieldId, err := iprot.ReadFieldBegin() + if err != nil { + return thrift.PrependError(fmt.Sprintf("%T field %d read error: ", p, fieldId), err) + } + if fieldTypeId == thrift.STOP { + break + } + switch fieldId { + case 1: + if err := p.ReadField1(iprot); err != nil { + return err + } + default: + if err := iprot.Skip(fieldTypeId); err != nil { + return err + } + } + if err := iprot.ReadFieldEnd(); err != nil { + return err + } + } + if err := iprot.ReadStructEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T read struct end error: ", p), err) + } + return nil +} + +func (p *NodeAggregateTilesArgs) ReadField1(iprot thrift.TProtocol) error { + p.Req = &AggregateTilesRequest{ + RangeType: 0, + } + if err := p.Req.Read(iprot); err != nil { + return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Req), err) + } + return nil +} + +func (p *NodeAggregateTilesArgs) Write(oprot thrift.TProtocol) error { + if err := oprot.WriteStructBegin("aggregateTiles_args"); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) + } + if p != nil { + if err := p.writeField1(oprot); err != nil { + return err + } + } + if err := oprot.WriteFieldStop(); err != nil { + return thrift.PrependError("write field stop error: ", err) + } + if err := oprot.WriteStructEnd(); err != nil { + return thrift.PrependError("write struct stop error: ", err) + } + return nil +} + +func (p *NodeAggregateTilesArgs) writeField1(oprot thrift.TProtocol) (err error) { + if err := oprot.WriteFieldBegin("req", thrift.STRUCT, 1); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 1:req: ", p), err) + } + if err := p.Req.Write(oprot); err != nil { + return thrift.PrependError(fmt.Sprintf("%T error writing struct: ", p.Req), err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 1:req: ", p), err) + } + return err +} + +func (p *NodeAggregateTilesArgs) String() string { + if p == nil { + return "" + } + return fmt.Sprintf("NodeAggregateTilesArgs(%+v)", *p) +} + +// Attributes: +// - Success +// - Err +type NodeAggregateTilesResult struct { + Success *AggregateTilesResult_ `thrift:"success,0" db:"success" json:"success,omitempty"` + Err *Error `thrift:"err,1" db:"err" json:"err,omitempty"` +} + +func NewNodeAggregateTilesResult() *NodeAggregateTilesResult { + return &NodeAggregateTilesResult{} +} + +var NodeAggregateTilesResult_Success_DEFAULT *AggregateTilesResult_ + +func (p *NodeAggregateTilesResult) GetSuccess() *AggregateTilesResult_ { + if !p.IsSetSuccess() { + return NodeAggregateTilesResult_Success_DEFAULT + } + return p.Success +} + +var NodeAggregateTilesResult_Err_DEFAULT *Error + +func (p *NodeAggregateTilesResult) GetErr() *Error { + if !p.IsSetErr() { + return NodeAggregateTilesResult_Err_DEFAULT + } + return p.Err +} +func (p *NodeAggregateTilesResult) IsSetSuccess() bool { + return p.Success != nil +} + +func (p *NodeAggregateTilesResult) IsSetErr() bool { + return p.Err != nil +} + +func (p *NodeAggregateTilesResult) Read(iprot 
thrift.TProtocol) error { + if _, err := iprot.ReadStructBegin(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err) + } + + for { + _, fieldTypeId, fieldId, err := iprot.ReadFieldBegin() + if err != nil { + return thrift.PrependError(fmt.Sprintf("%T field %d read error: ", p, fieldId), err) + } + if fieldTypeId == thrift.STOP { + break + } + switch fieldId { + case 0: + if err := p.ReadField0(iprot); err != nil { + return err + } + case 1: + if err := p.ReadField1(iprot); err != nil { + return err + } + default: + if err := iprot.Skip(fieldTypeId); err != nil { + return err + } + } + if err := iprot.ReadFieldEnd(); err != nil { + return err + } + } + if err := iprot.ReadStructEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T read struct end error: ", p), err) + } + return nil +} + +func (p *NodeAggregateTilesResult) ReadField0(iprot thrift.TProtocol) error { + p.Success = &AggregateTilesResult_{} + if err := p.Success.Read(iprot); err != nil { + return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Success), err) + } + return nil +} + +func (p *NodeAggregateTilesResult) ReadField1(iprot thrift.TProtocol) error { + p.Err = &Error{ + Type: 0, + } + if err := p.Err.Read(iprot); err != nil { + return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Err), err) + } + return nil +} + +func (p *NodeAggregateTilesResult) Write(oprot thrift.TProtocol) error { + if err := oprot.WriteStructBegin("aggregateTiles_result"); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) + } + if p != nil { + if err := p.writeField0(oprot); err != nil { + return err + } + if err := p.writeField1(oprot); err != nil { + return err + } + } + if err := oprot.WriteFieldStop(); err != nil { + return thrift.PrependError("write field stop error: ", err) + } + if err := oprot.WriteStructEnd(); err != nil { + return thrift.PrependError("write struct stop error: ", err) + } + return nil +} + +func (p *NodeAggregateTilesResult) writeField0(oprot thrift.TProtocol) (err error) { + if p.IsSetSuccess() { + if err := oprot.WriteFieldBegin("success", thrift.STRUCT, 0); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 0:success: ", p), err) + } + if err := p.Success.Write(oprot); err != nil { + return thrift.PrependError(fmt.Sprintf("%T error writing struct: ", p.Success), err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 0:success: ", p), err) + } + } + return err +} + +func (p *NodeAggregateTilesResult) writeField1(oprot thrift.TProtocol) (err error) { + if p.IsSetErr() { + if err := oprot.WriteFieldBegin("err", thrift.STRUCT, 1); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 1:err: ", p), err) + } + if err := p.Err.Write(oprot); err != nil { + return thrift.PrependError(fmt.Sprintf("%T error writing struct: ", p.Err), err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 1:err: ", p), err) + } + } + return err +} + +func (p *NodeAggregateTilesResult) String() string { + if p == nil { + return "" + } + return fmt.Sprintf("NodeAggregateTilesResult(%+v)", *p) +} + type NodeHealthArgs struct { } @@ -25166,16 +25996,16 @@ func (p *ClusterClient) recvHealth() (value *HealthResult_, err error) { return } if mTypeId == thrift.EXCEPTION { - error237 := 
thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") - var error238 error - error238, err = error237.Read(iprot) + error245 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error246 error + error246, err = error245.Read(iprot) if err != nil { return } if err = iprot.ReadMessageEnd(); err != nil { return } - err = error238 + err = error246 return } if mTypeId != thrift.REPLY { @@ -25247,16 +26077,16 @@ func (p *ClusterClient) recvWrite() (err error) { return } if mTypeId == thrift.EXCEPTION { - error239 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") - var error240 error - error240, err = error239.Read(iprot) + error247 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error248 error + error248, err = error247.Read(iprot) if err != nil { return } if err = iprot.ReadMessageEnd(); err != nil { return } - err = error240 + err = error248 return } if mTypeId != thrift.REPLY { @@ -25327,16 +26157,16 @@ func (p *ClusterClient) recvWriteTagged() (err error) { return } if mTypeId == thrift.EXCEPTION { - error241 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") - var error242 error - error242, err = error241.Read(iprot) + error249 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error250 error + error250, err = error249.Read(iprot) if err != nil { return } if err = iprot.ReadMessageEnd(); err != nil { return } - err = error242 + err = error250 return } if mTypeId != thrift.REPLY { @@ -25407,16 +26237,16 @@ func (p *ClusterClient) recvQuery() (value *QueryResult_, err error) { return } if mTypeId == thrift.EXCEPTION { - error243 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") - var error244 error - error244, err = error243.Read(iprot) + error251 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error252 error + error252, err = error251.Read(iprot) if err != nil { return } if err = iprot.ReadMessageEnd(); err != nil { return } - err = error244 + err = error252 return } if mTypeId != thrift.REPLY { @@ -25488,16 +26318,16 @@ func (p *ClusterClient) recvAggregate() (value *AggregateQueryResult_, err error return } if mTypeId == thrift.EXCEPTION { - error245 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") - var error246 error - error246, err = error245.Read(iprot) + error253 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error254 error + error254, err = error253.Read(iprot) if err != nil { return } if err = iprot.ReadMessageEnd(); err != nil { return } - err = error246 + err = error254 return } if mTypeId != thrift.REPLY { @@ -25569,16 +26399,16 @@ func (p *ClusterClient) recvFetch() (value *FetchResult_, err error) { return } if mTypeId == thrift.EXCEPTION { - error247 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") - var error248 error - error248, err = error247.Read(iprot) + error255 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error256 error + error256, err = error255.Read(iprot) if err != nil { return } if err = iprot.ReadMessageEnd(); err != nil { return } - err = error248 + err = error256 return } if mTypeId != thrift.REPLY { @@ 
-25650,16 +26480,16 @@ func (p *ClusterClient) recvTruncate() (value *TruncateResult_, err error) { return } if mTypeId == thrift.EXCEPTION { - error249 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") - var error250 error - error250, err = error249.Read(iprot) + error257 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error258 error + error258, err = error257.Read(iprot) if err != nil { return } if err = iprot.ReadMessageEnd(); err != nil { return } - err = error250 + err = error258 return } if mTypeId != thrift.REPLY { @@ -25701,15 +26531,15 @@ func (p *ClusterProcessor) ProcessorMap() map[string]thrift.TProcessorFunction { func NewClusterProcessor(handler Cluster) *ClusterProcessor { - self251 := &ClusterProcessor{handler: handler, processorMap: make(map[string]thrift.TProcessorFunction)} - self251.processorMap["health"] = &clusterProcessorHealth{handler: handler} - self251.processorMap["write"] = &clusterProcessorWrite{handler: handler} - self251.processorMap["writeTagged"] = &clusterProcessorWriteTagged{handler: handler} - self251.processorMap["query"] = &clusterProcessorQuery{handler: handler} - self251.processorMap["aggregate"] = &clusterProcessorAggregate{handler: handler} - self251.processorMap["fetch"] = &clusterProcessorFetch{handler: handler} - self251.processorMap["truncate"] = &clusterProcessorTruncate{handler: handler} - return self251 + self259 := &ClusterProcessor{handler: handler, processorMap: make(map[string]thrift.TProcessorFunction)} + self259.processorMap["health"] = &clusterProcessorHealth{handler: handler} + self259.processorMap["write"] = &clusterProcessorWrite{handler: handler} + self259.processorMap["writeTagged"] = &clusterProcessorWriteTagged{handler: handler} + self259.processorMap["query"] = &clusterProcessorQuery{handler: handler} + self259.processorMap["aggregate"] = &clusterProcessorAggregate{handler: handler} + self259.processorMap["fetch"] = &clusterProcessorFetch{handler: handler} + self259.processorMap["truncate"] = &clusterProcessorTruncate{handler: handler} + return self259 } func (p *ClusterProcessor) Process(iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) { @@ -25722,12 +26552,12 @@ func (p *ClusterProcessor) Process(iprot, oprot thrift.TProtocol) (success bool, } iprot.Skip(thrift.STRUCT) iprot.ReadMessageEnd() - x252 := thrift.NewTApplicationException(thrift.UNKNOWN_METHOD, "Unknown function "+name) + x260 := thrift.NewTApplicationException(thrift.UNKNOWN_METHOD, "Unknown function "+name) oprot.WriteMessageBegin(name, thrift.EXCEPTION, seqId) - x252.Write(oprot) + x260.Write(oprot) oprot.WriteMessageEnd() oprot.Flush() - return false, x252 + return false, x260 } diff --git a/src/dbnode/generated/thrift/rpc/rpc_mock.go b/src/dbnode/generated/thrift/rpc/rpc_mock.go index 647e08992b..a7be5ca909 100644 --- a/src/dbnode/generated/thrift/rpc/rpc_mock.go +++ b/src/dbnode/generated/thrift/rpc/rpc_mock.go @@ -210,6 +210,21 @@ func (mr *MockTChanNodeMockRecorder) AggregateRaw(ctx, req interface{}) *gomock. 
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AggregateRaw", reflect.TypeOf((*MockTChanNode)(nil).AggregateRaw), ctx, req) } +// AggregateTiles mocks base method +func (m *MockTChanNode) AggregateTiles(ctx thrift.Context, req *AggregateTilesRequest) (*AggregateTilesResult_, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AggregateTiles", ctx, req) + ret0, _ := ret[0].(*AggregateTilesResult_) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// AggregateTiles indicates an expected call of AggregateTiles +func (mr *MockTChanNodeMockRecorder) AggregateTiles(ctx, req interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AggregateTiles", reflect.TypeOf((*MockTChanNode)(nil).AggregateTiles), ctx, req) +} + // Bootstrapped mocks base method func (m *MockTChanNode) Bootstrapped(ctx thrift.Context) (*NodeBootstrappedResult_, error) { m.ctrl.T.Helper() diff --git a/src/dbnode/generated/thrift/rpc/tchan-rpc.go b/src/dbnode/generated/thrift/rpc/tchan-rpc.go index 3d32c85d9e..cf6c94667a 100644 --- a/src/dbnode/generated/thrift/rpc/tchan-rpc.go +++ b/src/dbnode/generated/thrift/rpc/tchan-rpc.go @@ -47,6 +47,7 @@ type TChanCluster interface { type TChanNode interface { Aggregate(ctx thrift.Context, req *AggregateQueryRequest) (*AggregateQueryResult_, error) AggregateRaw(ctx thrift.Context, req *AggregateQueryRawRequest) (*AggregateQueryRawResult_, error) + AggregateTiles(ctx thrift.Context, req *AggregateTilesRequest) (*AggregateTilesResult_, error) Bootstrapped(ctx thrift.Context) (*NodeBootstrappedResult_, error) BootstrappedInPlacementOrNoPlacement(ctx thrift.Context) (*NodeBootstrappedInPlacementOrNoPlacementResult_, error) DebugIndexMemorySegments(ctx thrift.Context, req *DebugIndexMemorySegmentsRequest) (*DebugIndexMemorySegmentsResult_, error) @@ -518,6 +519,24 @@ func (c *tchanNodeClient) AggregateRaw(ctx thrift.Context, req *AggregateQueryRa return resp.GetSuccess(), err } +func (c *tchanNodeClient) AggregateTiles(ctx thrift.Context, req *AggregateTilesRequest) (*AggregateTilesResult_, error) { + var resp NodeAggregateTilesResult + args := NodeAggregateTilesArgs{ + Req: req, + } + success, err := c.client.Call(ctx, c.thriftService, "aggregateTiles", &args, &resp) + if err == nil && !success { + switch { + case resp.Err != nil: + err = resp.Err + default: + err = fmt.Errorf("received no result or unknown exception for aggregateTiles") + } + } + + return resp.GetSuccess(), err +} + func (c *tchanNodeClient) Bootstrapped(ctx thrift.Context) (*NodeBootstrappedResult_, error) { var resp NodeBootstrappedResult args := NodeBootstrappedArgs{} @@ -1044,6 +1063,7 @@ func (s *tchanNodeServer) Methods() []string { return []string{ "aggregate", "aggregateRaw", + "aggregateTiles", "bootstrapped", "bootstrappedInPlacementOrNoPlacement", "debugIndexMemorySegments", @@ -1082,6 +1102,8 @@ func (s *tchanNodeServer) Handle(ctx thrift.Context, methodName string, protocol return s.handleAggregate(ctx, protocol) case "aggregateRaw": return s.handleAggregateRaw(ctx, protocol) + case "aggregateTiles": + return s.handleAggregateTiles(ctx, protocol) case "bootstrapped": return s.handleBootstrapped(ctx, protocol) case "bootstrappedInPlacementOrNoPlacement": @@ -1202,6 +1224,34 @@ func (s *tchanNodeServer) handleAggregateRaw(ctx thrift.Context, protocol athrif return err == nil, &res, nil } +func (s *tchanNodeServer) handleAggregateTiles(ctx thrift.Context, protocol athrift.TProtocol) (bool, athrift.TStruct, error) { + var req NodeAggregateTilesArgs 
+ var res NodeAggregateTilesResult + + if err := req.Read(protocol); err != nil { + return false, nil, err + } + + r, err := + s.handler.AggregateTiles(ctx, req.Req) + + if err != nil { + switch v := err.(type) { + case *Error: + if v == nil { + return false, nil, fmt.Errorf("Handler for err returned non-nil error type *Error but nil value") + } + res.Err = v + default: + return false, nil, err + } + } else { + res.Success = r + } + + return err == nil, &res, nil +} + func (s *tchanNodeServer) handleBootstrapped(ctx thrift.Context, protocol athrift.TProtocol) (bool, athrift.TStruct, error) { var req NodeBootstrappedArgs var res NodeBootstrappedResult diff --git a/src/dbnode/integration/large_tiles_test.go b/src/dbnode/integration/large_tiles_test.go new file mode 100644 index 0000000000..54d49e9dc1 --- /dev/null +++ b/src/dbnode/integration/large_tiles_test.go @@ -0,0 +1,162 @@ +// +build integration + +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package integration + +import ( + "github.com/stretchr/testify/assert" + "testing" + "time" + + "github.com/m3db/m3/src/dbnode/namespace" + "github.com/m3db/m3/src/dbnode/retention" + "github.com/m3db/m3/src/dbnode/storage" + xmetrics "github.com/m3db/m3/src/dbnode/x/metrics" + xclock "github.com/m3db/m3/src/x/clock" + "github.com/m3db/m3/src/x/ident" + "github.com/m3db/m3/src/x/instrument" + xtime "github.com/m3db/m3/src/x/time" + "github.com/stretchr/testify/require" + "github.com/uber-go/tally" + "go.uber.org/zap" +) + +func TestReadAggregateWrite(t *testing.T) { + var ( + blockSize = 2 * time.Hour + indexBlockSize = 2 * blockSize + rOpts = retention.NewOptions().SetRetentionPeriod(24 * blockSize).SetBlockSize(blockSize) + idxOpts = namespace.NewIndexOptions().SetEnabled(true).SetBlockSize(indexBlockSize) + nsOpts = namespace.NewOptions(). + SetRetentionOptions(rOpts). + SetIndexOptions(idxOpts). + SetColdWritesEnabled(true) + ) + + srcNs, err := namespace.NewMetadata(testNamespaces[0], nsOpts) + require.NoError(t, err) + trgNs, err := namespace.NewMetadata(testNamespaces[1], nsOpts) + require.NoError(t, err) + + testOpts := NewTestOptions(t). + SetNamespaces([]namespace.Metadata{srcNs, trgNs}). 
+ SetWriteNewSeriesAsync(true) + + testSetup := newTestSetupWithCommitLogAndFilesystemBootstrapper(t, testOpts) + defer testSetup.Close() + + reporter := xmetrics.NewTestStatsReporter(xmetrics.NewTestStatsReporterOptions()) + scope, closer := tally.NewRootScope( + tally.ScopeOptions{Reporter: reporter}, time.Millisecond) + defer closer.Close() + testSetup.SetStorageOpts(testSetup.StorageOpts().SetInstrumentOptions( + instrument.NewOptions().SetMetricsScope(scope))) + + storageOpts := testSetup.StorageOpts() + + // Start the server. + log := storageOpts.InstrumentOptions().Logger() + require.NoError(t, testSetup.StartServer()) + + // Stop the server. + defer func() { + require.NoError(t, testSetup.StopServer()) + log.Debug("server is now down") + }() + + start := time.Now() + session, err := testSetup.M3DBClient().DefaultSession() + require.NoError(t, err) + nowFn := testSetup.NowFn() + + tags := []ident.Tag{ + ident.StringTag("__name__", "cpu"), + ident.StringTag("job", "job1"), + } + + dpTimeStart := nowFn().Truncate(indexBlockSize).Add(-3 * indexBlockSize) + dpTime := dpTimeStart + + // Write test data. + for a := 0.0; a < 20.0; a++ { + err = session.WriteTagged(srcNs.ID(), ident.StringID("foo"), ident.NewTagsIterator(ident.NewTags(tags...)), dpTime, 42.1+a, xtime.Second, nil) + require.NoError(t, err) + dpTime = dpTime.Add(10 * time.Minute) + } + log.Info("test data written", zap.Duration("took", time.Since(start))) + + log.Info("waiting till data is cold flushed") + start = time.Now() + expectedNumWrites := int64(20) + flushed := xclock.WaitUntil(func() bool { + counters := reporter.Counters() + flushes, _ := counters["database.flushIndex.success"] + writes, _ := counters["database.series.cold-writes"] + successFlushes, _ := counters["database.flushColdData.success"] + return flushes >= 1 && writes >= expectedNumWrites && successFlushes >= 4 }, time.Minute) + require.True(t, flushed) + log.Info("verified data has been cold flushed", zap.Duration("took", time.Since(start))) + + aggOpts, err := storage.NewAggregateTilesOptions(dpTimeStart, dpTimeStart.Add(blockSize), time.Hour, false) + require.NoError(t, err) + + // Retry aggregation as the persist manager could still be locked by cold writes. + // TODO: Remove the retry once a separate persist manager is implemented. + var processedBlockCount int64 + for retries := 0; retries < 10; retries++ { + processedBlockCount, err = testSetup.DB().AggregateTiles(storageOpts.ContextPool().Get(), srcNs.ID(), trgNs.ID(), aggOpts) + if err == nil { + break + } + time.Sleep(time.Second) + } + require.NoError(t, err) + assert.Equal(t, int64(1), processedBlockCount) + + log.Info("fetching aggregated data") + series, err := session.Fetch(trgNs.ID(), ident.StringID("foo"), dpTimeStart, nowFn()) + require.NoError(t, err) + + expectedDps := make(map[int64]float64) + // TODO: Replace with exact values once aggregation is implemented. + timestamp := dpTimeStart + // TODO: currently we aggregate only a single block, which is why we expect + // 12 items instead of 20. + for a := 0; a < 12; a++ { + expectedDps[timestamp.Unix()] = 42.1 + float64(a) + timestamp = timestamp.Add(10 * time.Minute) + } + + count := 0 + for series.Next() { + dp, _, _ := series.Current() + value, ok := expectedDps[dp.Timestamp.Unix()] + require.True(t, ok, + "didn't expect to find timestamp %v in aggregated result", + dp.Timestamp.Unix()) + require.Equal(t, value, dp.Value, + "value for timestamp %v doesn't match.
Expected %v but got %v", + dp.Timestamp.Unix(), value, dp.Value) + count++ + } + require.Equal(t, len(expectedDps), count) +} diff --git a/src/dbnode/network/server/tchannelthrift/node/service.go b/src/dbnode/network/server/tchannelthrift/node/service.go index d55983b80e..139da79017 100644 --- a/src/dbnode/network/server/tchannelthrift/node/service.go +++ b/src/dbnode/network/server/tchannelthrift/node/service.go @@ -538,6 +538,69 @@ func (s *service) query(ctx context.Context, db storage.Database, req *rpc.Query return result, nil } +func (s *service) AggregateTiles(tctx thrift.Context, req *rpc.AggregateTilesRequest) (*rpc.AggregateTilesResult_, error) { + db, err := s.startWriteRPCWithDB() + if err != nil { + return nil, err + } + defer s.writeRPCCompleted() + + ctx, sp, sampled := tchannelthrift.Context(tctx).StartSampledTraceSpan(tracepoint.AggregateTiles) + defer sp.Finish() + + if sampled { + sp.LogFields( + opentracinglog.String("sourceNamespace", req.SourceNamespace), + opentracinglog.String("targetNamespace", req.TargetNamespace), + xopentracing.Time("start", time.Unix(0, req.RangeStart)), + xopentracing.Time("end", time.Unix(0, req.RangeEnd)), + opentracinglog.String("step", req.Step), + ) + } + + processedBlockCount, err := s.aggregateTiles(ctx, db, req) + if err != nil { + sp.LogFields(opentracinglog.Error(err)) + } + + return &rpc.AggregateTilesResult_{ + ProcessedBlockCount: processedBlockCount, + }, err +} + +func (s *service) aggregateTiles( + ctx context.Context, + db storage.Database, + req *rpc.AggregateTilesRequest, +) (int64, error) { + start, err := convert.ToTime(req.RangeStart, req.RangeType) + if err != nil { + return 0, tterrors.NewBadRequestError(err) + } + end, err := convert.ToTime(req.RangeEnd, req.RangeType) + if err != nil { + return 0, tterrors.NewBadRequestError(err) + } + step, err := time.ParseDuration(req.Step) + if err != nil { + return 0, tterrors.NewBadRequestError(err) + } + opts, err := storage.NewAggregateTilesOptions(start, end, step, req.RemoveResets) + if err != nil { + return 0, tterrors.NewBadRequestError(err) + } + + sourceNsID := s.pools.id.GetStringID(ctx, req.SourceNamespace) + targetNsID := s.pools.id.GetStringID(ctx, req.TargetNamespace) + + processedBlockCount, err := db.AggregateTiles(ctx, sourceNsID, targetNsID, opts) + if err != nil { + return processedBlockCount, convert.ToRPCError(err) + } + + return processedBlockCount, nil +} + func (s *service) Fetch(tctx thrift.Context, req *rpc.FetchRequest) (*rpc.FetchResult_, error) { db, err := s.startReadRPCWithDB() if err != nil { diff --git a/src/dbnode/network/server/tchannelthrift/node/service_test.go b/src/dbnode/network/server/tchannelthrift/node/service_test.go index d4ee5a1025..81380cac6c 100644 --- a/src/dbnode/network/server/tchannelthrift/node/service_test.go +++ b/src/dbnode/network/server/tchannelthrift/node/service_test.go @@ -3107,3 +3107,49 @@ func TestServiceSetWriteNewSeriesLimitPerShardPerSecond(t *testing.T) { require.NoError(t, err) assert.Equal(t, int64(84), setResp.WriteNewSeriesLimitPerShardPerSecond) } + +func TestServiceAggregateTiles(t *testing.T) { + ctrl := xtest.NewController(t) + defer ctrl.Finish() + + mockDB := storage.NewMockDatabase(ctrl) + mockDB.EXPECT().Options().Return(testStorageOpts).AnyTimes() + mockDB.EXPECT().IsOverloaded().Return(false) + + service := NewService(mockDB, testTChannelThriftOptions).(*service) + + tctx, _ := tchannelthrift.NewContext(time.Minute) + ctx := tchannelthrift.Context(tctx) + defer ctx.Close() + + start := 
time.Now().Truncate(time.Hour).Add(-1 * time.Hour) + end := start.Add(time.Hour) + + start, end = start.Truncate(time.Second), end.Truncate(time.Second) + + step := "10m" + stepDuration, err := time.ParseDuration(step) + require.NoError(t, err) + + sourceNsID := "source" + targetNsID := "target" + + mockDB.EXPECT().AggregateTiles( + ctx, + ident.NewIDMatcher(sourceNsID), + ident.NewIDMatcher(targetNsID), + storage.AggregateTilesOptions{Start: start, End: end, Step: stepDuration, HandleCounterResets: true}, + ).Return(int64(4), nil) + + result, err := service.AggregateTiles(tctx, &rpc.AggregateTilesRequest{ + SourceNamespace: sourceNsID, + TargetNamespace: targetNsID, + RangeStart: start.Unix(), + RangeEnd: end.Unix(), + Step: step, + RemoveResets: true, + RangeType: rpc.TimeType_UNIX_SECONDS, + }) + require.NoError(t, err) + assert.Equal(t, int64(4), result.ProcessedBlockCount) +} diff --git a/src/dbnode/persist/fs/cross_block_reader.go b/src/dbnode/persist/fs/cross_block_reader.go new file mode 100644 index 0000000000..7642653861 --- /dev/null +++ b/src/dbnode/persist/fs/cross_block_reader.go @@ -0,0 +1,274 @@ +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package fs + +import ( + "bytes" + "container/heap" + "errors" + "fmt" + "io" + "time" + + "github.com/m3db/m3/src/x/checked" + xerrors "github.com/m3db/m3/src/x/errors" + "github.com/m3db/m3/src/x/ident" + "github.com/m3db/m3/src/x/instrument" + + "go.uber.org/zap" +) + +var ( + errReaderNotOrderedByIndex = errors.New("crossBlockReader can only use DataFileSetReaders ordered by index") + errEmptyReader = errors.New("trying to read from empty reader") + _ heap.Interface = (*minHeap)(nil) +) + +type crossBlockReader struct { + dataFileSetReaders []DataFileSetReader + id ident.ID + tags ident.TagIterator + records []BlockRecord + started bool + minHeap minHeap + err error + iOpts instrument.Options +} + +// NewCrossBlockReader constructs a new CrossBlockReader based on given DataFileSetReaders. +// DataFileSetReaders must be configured to return the data in the order of index, and must be +// provided in a slice sorted by block start time. +// Callers are responsible for closing the DataFileSetReaders. 
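// A minimal usage sketch (illustrative only; dfsReaders is assumed to be a slice of
// DataFileSetReaders opened with DataReaderOpenOptions{OrderedByIndex: true} and sorted by
// block start time, and iOpts an instrument.Options). It mirrors the release pattern used
// by the tests below:
//
//	reader, err := NewCrossBlockReader(dfsReaders, iOpts)
//	if err != nil {
//		return err
//	}
//	defer reader.Close()
//	for reader.Next() {
//		id, tags, records := reader.Current()
//		// Consume records (one entry per block, in temporal order), then release resources.
//		id.Finalize()
//		tags.Close()
//		for _, record := range records {
//			record.Data.DecRef()
//			record.Data.Finalize()
//		}
//	}
//	return reader.Err()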
+func NewCrossBlockReader(dataFileSetReaders []DataFileSetReader, iOpts instrument.Options) (CrossBlockReader, error) { + var previousStart time.Time + for _, dataFileSetReader := range dataFileSetReaders { + if !dataFileSetReader.OrderedByIndex() { + return nil, errReaderNotOrderedByIndex + } + currentStart := dataFileSetReader.Range().Start + if !currentStart.After(previousStart) { + return nil, fmt.Errorf("dataFileSetReaders are not ordered by time (%s followed by %s)", previousStart, currentStart) + } + previousStart = currentStart + } + + return &crossBlockReader{ + dataFileSetReaders: append(make([]DataFileSetReader, 0, len(dataFileSetReaders)), dataFileSetReaders...), + records: make([]BlockRecord, 0, len(dataFileSetReaders)), + iOpts: iOpts, + }, nil +} + +func (r *crossBlockReader) Next() bool { + if r.err != nil { + return false + } + + var emptyRecord BlockRecord + if !r.started { + if r.err = r.start(); r.err != nil { + return false + } + } else { + // use empty var in inner loop with "for i := range" to have compiler use memclr optimization + // see: https://codereview.appspot.com/137880043 + for i := range r.records { + r.records[i] = emptyRecord + } + } + + if len(r.minHeap) == 0 { + return false + } + + firstEntry, err := r.readOne() + if err != nil { + r.err = err + return false + } + + r.id = firstEntry.id + r.tags = firstEntry.tags + + r.records = r.records[:0] + r.records = append(r.records, BlockRecord{firstEntry.data, firstEntry.checksum}) + + for len(r.minHeap) > 0 && r.minHeap[0].id.Equal(firstEntry.id) { + nextEntry, err := r.readOne() + if err != nil { + // Close the resources that were already read but not returned to the consumer: + r.id.Finalize() + r.tags.Close() + for _, record := range r.records { + record.Data.DecRef() + record.Data.Finalize() + } + for i := range r.records { + r.records[i] = emptyRecord + } + r.records = r.records[:0] + r.err = err + return false + } + + // id and tags not needed for subsequent blocks because they are the same as in the first block + nextEntry.id.Finalize() + nextEntry.tags.Close() + + r.records = append(r.records, BlockRecord{nextEntry.data, nextEntry.checksum}) + } + + return true +} + +func (r *crossBlockReader) Current() (ident.ID, ident.TagIterator, []BlockRecord) { + return r.id, r.tags, r.records +} + +func (r *crossBlockReader) readOne() (*minHeapEntry, error) { + if len(r.minHeap) == 0 { + return nil, errEmptyReader + } + + entry := heap.Pop(&r.minHeap).(*minHeapEntry) + if r.dataFileSetReaders[entry.dataFileSetReaderIndex] != nil { + nextEntry, err := r.readFromDataFileSet(entry.dataFileSetReaderIndex) + if err == io.EOF { + // will no longer read from this one + r.dataFileSetReaders[entry.dataFileSetReaderIndex] = nil + } else if err != nil { + return nil, err + } else if bytes.Equal(nextEntry.id.Bytes(), entry.id.Bytes()) { + err := fmt.Errorf("duplicate id %s on block starting at %s", + entry.id, r.dataFileSetReaders[entry.dataFileSetReaderIndex].Range().Start) + + instrument.EmitAndLogInvariantViolation(r.iOpts, func(l *zap.Logger) { + l.Error(err.Error()) + }) + + return nil, err + } else { + heap.Push(&r.minHeap, nextEntry) + } + } + + return entry, nil +} + +func (r *crossBlockReader) start() error { + r.started = true + r.minHeap = make([]*minHeapEntry, 0, len(r.dataFileSetReaders)) + + for i := range r.dataFileSetReaders { + entry, err := r.readFromDataFileSet(i) + if err == io.EOF { + continue + } + if err != nil { + return err + } + r.minHeap = append(r.minHeap, entry) + } + + heap.Init(&r.minHeap) + + 
return nil +} + +func (r *crossBlockReader) readFromDataFileSet(index int) (*minHeapEntry, error) { + id, tags, data, checksum, err := r.dataFileSetReaders[index].Read() + + if err == io.EOF { + return nil, err + } + + if err != nil { + multiErr := xerrors.NewMultiError(). + Add(err). + Add(r.Close()) + return nil, multiErr.FinalError() + } + + return &minHeapEntry{ + dataFileSetReaderIndex: index, + id: id, + tags: tags, + data: data, + checksum: checksum, + }, nil +} + +func (r *crossBlockReader) Err() error { + return r.err +} + +func (r *crossBlockReader) Close() error { + // Close the resources that were buffered in minHeap: + for i, entry := range r.minHeap { + entry.id.Finalize() + entry.tags.Close() + entry.data.DecRef() + entry.data.Finalize() + r.minHeap[i] = nil + } + + r.minHeap = r.minHeap[:0] + return nil +} + +type minHeapEntry struct { + dataFileSetReaderIndex int + id ident.ID + tags ident.TagIterator + data checked.Bytes + checksum uint32 +} + +type minHeap []*minHeapEntry + +func (h minHeap) Len() int { + return len(h) +} + +func (h minHeap) Less(i, j int) bool { + idsCmp := bytes.Compare(h[i].id.Bytes(), h[j].id.Bytes()) + if idsCmp == 0 { + return h[i].dataFileSetReaderIndex < h[j].dataFileSetReaderIndex + } + return idsCmp < 0 +} + +func (h minHeap) Swap(i, j int) { + h[i], h[j] = h[j], h[i] +} + +func (h *minHeap) Push(x interface{}) { + *h = append(*h, x.(*minHeapEntry)) +} + +func (h *minHeap) Pop() interface{} { + old := *h + n := len(old) + x := old[n-1] + old[n-1] = nil + *h = old[0 : n-1] + return x +} diff --git a/src/dbnode/persist/fs/cross_block_reader_test.go b/src/dbnode/persist/fs/cross_block_reader_test.go new file mode 100644 index 0000000000..819740fee6 --- /dev/null +++ b/src/dbnode/persist/fs/cross_block_reader_test.go @@ -0,0 +1,221 @@ +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package fs + +import ( + "errors" + "fmt" + "io" + "strconv" + "strings" + "testing" + "time" + + "github.com/m3db/m3/src/x/ident" + "github.com/m3db/m3/src/x/instrument" + xtest "github.com/m3db/m3/src/x/test" + xtime "github.com/m3db/m3/src/x/time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +var expectedError = errors.New("expected error") + +func TestCrossBlockReaderRejectMisconfiguredInputs(t *testing.T) { + ctrl := xtest.NewController(t) + defer ctrl.Finish() + + dfsReader := NewMockDataFileSetReader(ctrl) + dfsReader.EXPECT().OrderedByIndex().Return(false) + + _, err := NewCrossBlockReader([]DataFileSetReader{dfsReader}, instrument.NewTestOptions(t)) + + assert.Equal(t, errReaderNotOrderedByIndex, err) +} + +func TestCrossBlockReaderRejectMisorderedInputs(t *testing.T) { + ctrl := xtest.NewController(t) + defer ctrl.Finish() + + now := time.Now().Truncate(time.Hour) + dfsReader1 := NewMockDataFileSetReader(ctrl) + dfsReader1.EXPECT().OrderedByIndex().Return(true) + dfsReader1.EXPECT().Range().Return(xtime.Range{Start: now}) + + later := now.Add(time.Hour) + dfsReader2 := NewMockDataFileSetReader(ctrl) + dfsReader2.EXPECT().OrderedByIndex().Return(true) + dfsReader2.EXPECT().Range().Return(xtime.Range{Start: later}) + + _, err := NewCrossBlockReader([]DataFileSetReader{dfsReader2, dfsReader1}, instrument.NewTestOptions(t)) + + expectedErr := fmt.Errorf("dataFileSetReaders are not ordered by time (%s followed by %s)", later, now) + + assert.Equal(t, expectedErr, err) +} + +func TestCrossBlockReader(t *testing.T) { + tests := []struct { + name string + blockSeriesIDs [][]string + expectedIDs []string + }{ + { + name: "no readers", + blockSeriesIDs: [][]string{}, + expectedIDs: []string{}, + }, + { + name: "empty readers", + blockSeriesIDs: [][]string{{}, {}, {}}, + expectedIDs: []string{}, + }, + { + name: "one reader, one series", + blockSeriesIDs: [][]string{{"id1"}}, + expectedIDs: []string{"id1"}, + }, + { + name: "one reader, many series", + blockSeriesIDs: [][]string{{"id1", "id2", "id3"}}, + expectedIDs: []string{"id1", "id2", "id3"}, + }, + { + name: "many readers with same series", + blockSeriesIDs: [][]string{{"id1"}, {"id1"}, {"id1"}}, + expectedIDs: []string{"id1"}, + }, + { + name: "many readers with different series", + blockSeriesIDs: [][]string{{"id1"}, {"id2"}, {"id3"}}, + expectedIDs: []string{"id1", "id2", "id3"}, + }, + { + name: "many readers with unordered series", + blockSeriesIDs: [][]string{{"id3"}, {"id1"}, {"id2"}}, + expectedIDs: []string{"id1", "id2", "id3"}, + }, + { + name: "complex case", + blockSeriesIDs: [][]string{{"id2", "id3", "id5"}, {"id1", "id2", "id4"}, {"id1", "id4"}}, + expectedIDs: []string{"id1", "id2", "id3", "id4", "id5"}, + }, + { + name: "duplicate ids within a reader", + blockSeriesIDs: [][]string{{"id1", "id2"}, {"id2", "id2"}}, + expectedIDs: []string{"id1"}, + }, + { + name: "immediate reader error", + blockSeriesIDs: [][]string{{"error"}}, + expectedIDs: []string{}, + }, + { + name: "reader error later", + blockSeriesIDs: [][]string{{"id1", "id2"}, {"id1", "error"}}, + expectedIDs: []string{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + testCrossBlockReader(t, tt.blockSeriesIDs, tt.expectedIDs) + }) + } +} + +func testCrossBlockReader(t *testing.T, blockSeriesIds [][]string, expectedIDs []string) { + ctrl := xtest.NewController(t) + defer ctrl.Finish() + + now := time.Now().Truncate(time.Hour) + var dfsReaders []DataFileSetReader + expectedBlockCount := 
0 + + for blockIndex, ids := range blockSeriesIds { + dfsReader := NewMockDataFileSetReader(ctrl) + dfsReader.EXPECT().OrderedByIndex().Return(true) + dfsReader.EXPECT().Range().Return(xtime.Range{Start: now.Add(time.Hour * time.Duration(blockIndex))}).AnyTimes() + + blockHasError := false + for j, id := range ids { + tags := ident.NewTags(ident.StringTag("foo", strconv.Itoa(j))) + data := checkedBytes([]byte{byte(j)}) + checksum := uint32(blockIndex) // somewhat hacky - using checksum to propagate block index value for assertions + if id == "error" { + dfsReader.EXPECT().Read().Return(nil, nil, nil, uint32(0), expectedError) + blockHasError = true + } else { + dfsReader.EXPECT().Read().Return(ident.StringID(id), ident.NewTagsIterator(tags), data, checksum, nil) + } + } + + if !blockHasError { + dfsReader.EXPECT().Read().Return(nil, nil, nil, uint32(0), io.EOF).MaxTimes(1) + } + + dfsReaders = append(dfsReaders, dfsReader) + expectedBlockCount += len(ids) + } + + cbReader, err := NewCrossBlockReader(dfsReaders, instrument.NewTestOptions(t)) + require.NoError(t, err) + defer cbReader.Close() + + blockCount := 0 + seriesCount := 0 + for cbReader.Next() { + id, tags, records := cbReader.Current() + + strId := id.String() + id.Finalize() + assert.Equal(t, expectedIDs[seriesCount], strId) + + assert.NotNil(t, tags) + tags.Close() + + previousBlockIndex := -1 + for _, record := range records { + blockIndex := int(record.DataChecksum) // see the comment above + assert.True(t, blockIndex > previousBlockIndex, "same id blocks must be read in temporal order") + previousBlockIndex = blockIndex + assert.NotNil(t, record.Data) + record.Data.DecRef() + record.Data.Finalize() + } + + blockCount += len(records) + seriesCount++ + } + + assert.Equal(t, len(expectedIDs), seriesCount, "count of series read") + + err = cbReader.Err() + if err == nil || (err.Error() != expectedError.Error() && !strings.HasPrefix(err.Error(), "duplicate id")) { + require.NoError(t, cbReader.Err()) + assert.Equal(t, expectedBlockCount, blockCount, "count of blocks read") + } + + for _, dfsReader := range dfsReaders { + assert.NotNil(t, dfsReader) + } +} diff --git a/src/dbnode/persist/fs/fs_mock.go b/src/dbnode/persist/fs/fs_mock.go index 084b8e2afc..583e801116 100644 --- a/src/dbnode/persist/fs/fs_mock.go +++ b/src/dbnode/persist/fs/fs_mock.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/m3db/m3/src/dbnode/persist/fs (interfaces: DataFileSetWriter,DataFileSetReader,DataFileSetSeeker,IndexFileSetWriter,IndexFileSetReader,IndexSegmentFileSetWriter,IndexSegmentFileSet,IndexSegmentFile,SnapshotMetadataFileWriter,DataFileSetSeekerManager,ConcurrentDataFileSetSeeker,MergeWith) +// Source: github.com/m3db/m3/src/dbnode/persist/fs (interfaces: DataFileSetWriter,DataFileSetReader,DataFileSetSeeker,IndexFileSetWriter,IndexFileSetReader,IndexSegmentFileSetWriter,IndexSegmentFileSet,IndexSegmentFile,SnapshotMetadataFileWriter,DataFileSetSeekerManager,ConcurrentDataFileSetSeeker,MergeWith,CrossBlockReader) // Copyright (c) 2020 Uber Technologies, Inc. 
// @@ -229,6 +229,20 @@ func (mr *MockDataFileSetReaderMockRecorder) Open(arg0 interface{}) *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Open", reflect.TypeOf((*MockDataFileSetReader)(nil).Open), arg0) } +// OrderedByIndex mocks base method +func (m *MockDataFileSetReader) OrderedByIndex() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "OrderedByIndex") + ret0, _ := ret[0].(bool) + return ret0 +} + +// OrderedByIndex indicates an expected call of OrderedByIndex +func (mr *MockDataFileSetReaderMockRecorder) OrderedByIndex() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OrderedByIndex", reflect.TypeOf((*MockDataFileSetReader)(nil).OrderedByIndex)) +} + // Range mocks base method func (m *MockDataFileSetReader) Range() time0.Range { m.ctrl.T.Helper() @@ -1262,3 +1276,84 @@ func (mr *MockMergeWithMockRecorder) Read(arg0, arg1, arg2, arg3 interface{}) *g mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Read", reflect.TypeOf((*MockMergeWith)(nil).Read), arg0, arg1, arg2, arg3) } + +// MockCrossBlockReader is a mock of CrossBlockReader interface +type MockCrossBlockReader struct { + ctrl *gomock.Controller + recorder *MockCrossBlockReaderMockRecorder +} + +// MockCrossBlockReaderMockRecorder is the mock recorder for MockCrossBlockReader +type MockCrossBlockReaderMockRecorder struct { + mock *MockCrossBlockReader +} + +// NewMockCrossBlockReader creates a new mock instance +func NewMockCrossBlockReader(ctrl *gomock.Controller) *MockCrossBlockReader { + mock := &MockCrossBlockReader{ctrl: ctrl} + mock.recorder = &MockCrossBlockReaderMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockCrossBlockReader) EXPECT() *MockCrossBlockReaderMockRecorder { + return m.recorder +} + +// Close mocks base method +func (m *MockCrossBlockReader) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close +func (mr *MockCrossBlockReaderMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockCrossBlockReader)(nil).Close)) +} + +// Current mocks base method +func (m *MockCrossBlockReader) Current() (ident.ID, ident.TagIterator, []BlockRecord) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Current") + ret0, _ := ret[0].(ident.ID) + ret1, _ := ret[1].(ident.TagIterator) + ret2, _ := ret[2].([]BlockRecord) + return ret0, ret1, ret2 +} + +// Current indicates an expected call of Current +func (mr *MockCrossBlockReaderMockRecorder) Current() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Current", reflect.TypeOf((*MockCrossBlockReader)(nil).Current)) +} + +// Err mocks base method +func (m *MockCrossBlockReader) Err() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Err") + ret0, _ := ret[0].(error) + return ret0 +} + +// Err indicates an expected call of Err +func (mr *MockCrossBlockReaderMockRecorder) Err() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Err", reflect.TypeOf((*MockCrossBlockReader)(nil).Err)) +} + +// Next mocks base method +func (m *MockCrossBlockReader) Next() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Next") + ret0, _ := ret[0].(bool) + return ret0 +} + +// Next indicates an expected call of Next +func (mr 
*MockCrossBlockReaderMockRecorder) Next() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Next", reflect.TypeOf((*MockCrossBlockReader)(nil).Next)) +} diff --git a/src/dbnode/persist/fs/index_read.go b/src/dbnode/persist/fs/index_read.go index 88712236dc..773c7b5291 100644 --- a/src/dbnode/persist/fs/index_read.go +++ b/src/dbnode/persist/fs/index_read.go @@ -207,7 +207,7 @@ func (r *indexReader) ReadSegmentFileSet() ( ) success := false defer func() { - // Do not close opened files if read finishes sucessfully. + // Do not close opened files if read finishes successfully. if success { return } diff --git a/src/dbnode/persist/fs/read.go b/src/dbnode/persist/fs/read.go index 68bef48ef8..69c36d28c4 100644 --- a/src/dbnode/persist/fs/read.go +++ b/src/dbnode/persist/fs/read.go @@ -51,6 +51,8 @@ var ( // errReadNotExpectedSize returned when the size of the next read does not match size specified by the index errReadNotExpectedSize = errors.New("next read not expected size") + errUnexpectedSortByOffset = errors.New("should not sort index by offsets when doing reads sorted by id") + // errReadMetadataOptimizedForRead returned when we optimized for only reading metadata but are attempting a regular read errReadMetadataOptimizedForRead = errors.New("read metadata optimized for regular read") ) @@ -102,6 +104,7 @@ type reader struct { shard uint32 volume int open bool + orderedByIndex bool // NB(bodu): Informs whether or not we optimize for only reading // metadata. We don't need to sort for reading metadata but sorting is // required if we are performing regulars reads. @@ -158,6 +161,8 @@ func (r *reader) Open(opts DataReaderOpenOptions) error { dataFilepath string ) + r.orderedByIndex = opts.OrderedByIndex + switch opts.FileSetType { case persist.FileSetSnapshotType: shardDir = ShardSnapshotsDirPath(r.filePathPrefix, namespace, shard) @@ -270,7 +275,9 @@ func (r *reader) Open(opts DataReaderOpenOptions) error { r.Close() return err } - if err := r.readIndexAndSortByOffsetAsc(); err != nil { + if opts.OrderedByIndex { + r.decoder.Reset(r.indexDecoderStream) + } else if err := r.readIndexAndSortByOffsetAsc(); err != nil { r.Close() return err } @@ -290,7 +297,7 @@ func (r *reader) Status() DataFileSetReaderStatus { Shard: r.shard, Volume: r.volume, BlockStart: r.start, - BlockSize: time.Duration(r.blockSize), + BlockSize: r.blockSize, } } @@ -337,6 +344,10 @@ func (r *reader) readInfo(size int) error { } func (r *reader) readIndexAndSortByOffsetAsc() error { + if r.orderedByIndex { + return errUnexpectedSortByOffset + } + r.decoder.Reset(r.indexDecoderStream) for i := 0; i < r.entries; i++ { entry, err := r.decoder.DecodeIndexEntry(nil) @@ -355,6 +366,56 @@ func (r *reader) readIndexAndSortByOffsetAsc() error { } func (r *reader) Read() (ident.ID, ident.TagIterator, checked.Bytes, uint32, error) { + if r.orderedByIndex { + return r.readInIndexedOrder() + } + return r.readInStoredOrder() +} + +func (r *reader) readInIndexedOrder() (ident.ID, ident.TagIterator, checked.Bytes, uint32, error) { + if r.entriesRead >= r.entries { + return nil, nil, nil, 0, io.EOF + } + + entry, err := r.decoder.DecodeIndexEntry(nil) + if err != nil { + return nil, nil, nil, 0, err + } + + var data checked.Bytes + if r.bytesPool != nil { + data = r.bytesPool.Get(int(entry.Size)) + data.IncRef() + defer data.DecRef() + } else { + data = checked.NewBytes(make([]byte, 0, entry.Size), nil) + data.IncRef() + defer data.DecRef() + } + + if entry.Offset+entry.Size > 
int64(len(r.dataMmap.Bytes)) { + return nil, nil, nil, 0, fmt.Errorf( + "attempt to read beyond data file size (offset=%d, size=%d, file size=%d)", + entry.Offset, entry.Size, len(r.dataMmap.Bytes)) + } + + data.AppendAll(r.dataMmap.Bytes[entry.Offset : entry.Offset+entry.Size]) + + // NB(r): _must_ check the checksum against known checksum as the data + // file might not have been verified if we haven't read through the file yet. + if entry.DataChecksum != int64(digest.Checksum(data.Bytes())) { + return nil, nil, nil, 0, errSeekChecksumMismatch + } + + id := r.entryClonedID(entry.ID) + tags := r.entryClonedEncodedTagsIter(entry.EncodedTags) + + r.entriesRead++ + + return id, tags, data, uint32(entry.DataChecksum), nil +} + +func (r *reader) readInStoredOrder() (ident.ID, ident.TagIterator, checked.Bytes, uint32, error) { // NB(bodu): We cannot perform regular reads if we're optimizing for only reading metadata. if r.optimizedReadMetadataOnly { return nil, nil, nil, 0, errReadMetadataOptimizedForRead @@ -401,6 +462,32 @@ func (r *reader) Read() (ident.ID, ident.TagIterator, checked.Bytes, uint32, err } func (r *reader) ReadMetadata() (ident.ID, ident.TagIterator, int, uint32, error) { + if r.orderedByIndex { + return r.readMetadataInIndexedOrder() + } + return r.readMetadataInStoredOrder() +} + +func (r *reader) readMetadataInIndexedOrder() (ident.ID, ident.TagIterator, int, uint32, error) { + if r.entriesRead >= r.entries { + return nil, nil, 0, 0, io.EOF + } + + entry, err := r.decoder.DecodeIndexEntry(nil) + if err != nil { + return nil, nil, 0, 0, err + } + + id := r.entryClonedID(entry.ID) + tags := r.entryClonedEncodedTagsIter(entry.EncodedTags) + length := int(entry.Size) + checksum := uint32(entry.DataChecksum) + + r.metadataRead++ + return id, tags, length, checksum, nil +} + +func (r *reader) readMetadataInStoredOrder() (ident.ID, ident.TagIterator, int, uint32, error) { if r.metadataRead >= r.entries { return nil, nil, 0, 0, io.EOF } @@ -501,6 +588,10 @@ func (r *reader) MetadataRead() int { return r.metadataRead } +func (r *reader) OrderedByIndex() bool { + return r.orderedByIndex +} + func (r *reader) Close() error { // Close and prepare resources that are to be reused multiErr := xerrors.NewMultiError() diff --git a/src/dbnode/persist/fs/read_test.go b/src/dbnode/persist/fs/read_test.go index 5f6c3ca6aa..15eb299eaa 100644 --- a/src/dbnode/persist/fs/read_test.go +++ b/src/dbnode/persist/fs/read_test.go @@ -144,7 +144,7 @@ func TestReadEmptyIndexUnreadData(t *testing.T) { assert.NoError(t, err) _, _, _, _, err = r.Read() - assert.Error(t, err) + assert.Equal(t, io.EOF, err) assert.NoError(t, r.Close()) } @@ -311,7 +311,7 @@ func testReadOpen(t *testing.T, fileData map[string][]byte) { BlockSize: testBlockSize, Identifier: FileSetFileIdentifier{ Namespace: testNs1ID, - Shard: uint32(shard), + Shard: shard, BlockStart: start, }, } @@ -350,11 +350,11 @@ func TestReadOpenDigestOfDigestMismatch(t *testing.T) { testReadOpen( t, map[string][]byte{ - infoFileSuffix: []byte{0x1}, - indexFileSuffix: []byte{0x2}, - dataFileSuffix: []byte{0x3}, - digestFileSuffix: []byte{0x2, 0x0, 0x2, 0x0, 0x3, 0x0, 0x3, 0x0, 0x4, 0x0, 0x4, 0x0}, - checkpointFileSuffix: []byte{0x12, 0x0, 0x7a, 0x0}, + infoFileSuffix: {0x1}, + indexFileSuffix: {0x2}, + dataFileSuffix: {0x3}, + digestFileSuffix: {0x2, 0x0, 0x2, 0x0, 0x3, 0x0, 0x3, 0x0, 0x4, 0x0, 0x4, 0x0}, + checkpointFileSuffix: {0x12, 0x0, 0x7a, 0x0}, }, ) } @@ -363,11 +363,11 @@ func TestReadOpenInfoDigestMismatch(t *testing.T) { testReadOpen( t, 
map[string][]byte{ - infoFileSuffix: []byte{0xa}, - indexFileSuffix: []byte{0x2}, - dataFileSuffix: []byte{0x3}, - digestFileSuffix: []byte{0x2, 0x0, 0x2, 0x0, 0x3, 0x0, 0x3, 0x0, 0x4, 0x0, 0x4, 0x0}, - checkpointFileSuffix: []byte{0x13, 0x0, 0x7a, 0x0}, + infoFileSuffix: {0xa}, + indexFileSuffix: {0x2}, + dataFileSuffix: {0x3}, + digestFileSuffix: {0x2, 0x0, 0x2, 0x0, 0x3, 0x0, 0x3, 0x0, 0x4, 0x0, 0x4, 0x0}, + checkpointFileSuffix: {0x13, 0x0, 0x7a, 0x0}, }, ) } @@ -388,8 +388,8 @@ func TestReadOpenIndexDigestMismatch(t *testing.T) { t, map[string][]byte{ infoFileSuffix: b, - indexFileSuffix: []byte{0xa}, - dataFileSuffix: []byte{0x3}, + indexFileSuffix: {0xa}, + dataFileSuffix: {0x3}, digestFileSuffix: digestOfDigest, checkpointFileSuffix: buf, }, diff --git a/src/dbnode/persist/fs/read_write_test.go b/src/dbnode/persist/fs/read_write_test.go index c931946172..e3681de8e5 100644 --- a/src/dbnode/persist/fs/read_write_test.go +++ b/src/dbnode/persist/fs/read_write_test.go @@ -75,6 +75,20 @@ func (e testEntry) Tags() ident.Tags { return tags } +type testEntries []testEntry + +func (e testEntries) Less(i, j int) bool { + return e[i].id < e[j].id +} + +func (e testEntries) Len() int { + return len(e) +} + +func (e testEntries) Swap(i, j int) { + e[i], e[j] = e[j], e[i] +} + func newTestWriter(t *testing.T, filePathPrefix string) DataFileSetWriter { writer, err := NewWriter(testDefaultOpts. SetFilePathPrefix(filePathPrefix). @@ -158,20 +172,37 @@ var readTestTypes = []readTestType{ readTestTypeMetadata, } +func readTestData(t *testing.T, r DataFileSetReader, shard uint32, timestamp time.Time, entries []testEntry) { + readTestDataWithOrderOpt(t, r, shard, timestamp, entries, false) + + sortedEntries := append(make(testEntries, 0, len(entries)), entries...) + sort.Sort(sortedEntries) + + readTestDataWithOrderOpt(t, r, shard, timestamp, sortedEntries, true) +} + // readTestData will test reading back the data matches what was written, // note that this test also tests reuse of the reader since it first reads // all the data then closes it, reopens and reads through again but just // reading the metadata the second time. // If it starts to fail during the pass that reads just the metadata it could // be a newly introduced reader reuse bug. -func readTestData(t *testing.T, r DataFileSetReader, shard uint32, timestamp time.Time, entries []testEntry) { +func readTestDataWithOrderOpt( + t *testing.T, + r DataFileSetReader, + shard uint32, + timestamp time.Time, + entries []testEntry, + orderByIndex bool, +) { for _, underTest := range readTestTypes { rOpenOpts := DataReaderOpenOptions{ Identifier: FileSetFileIdentifier{ Namespace: testNs1ID, - Shard: 0, + Shard: shard, BlockStart: timestamp, }, + OrderedByIndex: orderByIndex, } err := r.Open(rOpenOpts) require.NoError(t, err) @@ -220,6 +251,7 @@ func readTestData(t *testing.T, r DataFileSetReader, shard uint32, timestamp tim tags.Close() data.DecRef() data.Finalize() + case readTestTypeMetadata: id, tags, length, checksum, err := r.ReadMetadata() require.NoError(t, err) diff --git a/src/dbnode/persist/fs/seek_manager.go b/src/dbnode/persist/fs/seek_manager.go index 0e8db86594..049eb11c73 100644 --- a/src/dbnode/persist/fs/seek_manager.go +++ b/src/dbnode/persist/fs/seek_manager.go @@ -467,7 +467,7 @@ func (m *seekerManager) markBorrowedSeekerAsReturned(seekers *seekersAndBloom, s // 4. 
Every call to Return() for an "inactive" seeker will check if it's the last borrowed inactive seeker, // and if so, will close all the inactive seekers and call wg.Done() which will notify the goroutine // running the UpdateOpenlease() function that all inactive seekers have been returned and closed at -// which point the function will return sucessfully. +// which point the function will return successfully. func (m *seekerManager) UpdateOpenLease( descriptor block.LeaseDescriptor, state block.LeaseState, diff --git a/src/dbnode/persist/fs/types.go b/src/dbnode/persist/fs/types.go index 2c28c7e359..22f1860da1 100644 --- a/src/dbnode/persist/fs/types.go +++ b/src/dbnode/persist/fs/types.go @@ -120,8 +120,12 @@ type DataFileSetReaderStatus struct { // DataReaderOpenOptions is options struct for the reader open method. type DataReaderOpenOptions struct { + // Identifier allows to identify a FileSetFile. Identifier FileSetFileIdentifier + // FileSetType is the file set type. FileSetType persist.FileSetType + // OrderedByIndex enforces reading of series in the order of index (which is by series Id). + OrderedByIndex bool // NB(bodu): This option can inform the reader to optimize for reading // only metadata by not sorting index entries. Setting this option will // throw an error if a regular `Read()` is attempted. @@ -174,6 +178,10 @@ type DataFileSetReader interface { // MetadataRead returns the position of metadata read into the volume MetadataRead() int + + // OrderedByIndex returns true if the reader reads the data in the order of index. + // If false, the reader reads the data in the same order as it is stored in the data file. + OrderedByIndex() bool } // DataFileSetSeeker provides an out of order reader for a TSDB file set @@ -623,6 +631,31 @@ type Segments interface { BlockStart() time.Time } +// BlockRecord wraps together M3TSZ data bytes with their checksum. +type BlockRecord struct { + Data checked.Bytes + DataChecksum uint32 +} + +// CrossBlockReader allows reading data (encoded bytes) from multiple DataFileSetReaders of the same shard, +// ordered by series id first, and block start time next. +type CrossBlockReader interface { + io.Closer + + // Next advances to the next data record and returns true, or returns false if no more data exists. + Next() bool + + // Err returns the last error encountered (if any). + Err() error + + // Current returns distinct series id and tags, plus a slice with data and checksums from all blocks corresponding + // to that series (in temporal order). + // Note: make sure to finalize the ID, close the Tags and finalize the Data when done with + // them so they can be returned to their respective pools. Also, []BlockRecord slice and underlying data + // is being invalidated on each call to Next(). + Current() (ident.ID, ident.TagIterator, []BlockRecord) +} + // InfoFileResultsPerShard maps shards to info files. type InfoFileResultsPerShard map[uint32][]ReadInfoFileResult diff --git a/src/dbnode/persist/types.go b/src/dbnode/persist/types.go index 0568e0099c..a24d4040b1 100644 --- a/src/dbnode/persist/types.go +++ b/src/dbnode/persist/types.go @@ -218,7 +218,7 @@ type Preparer interface { } // FlushPreparer is a persist flush cycle, each shard and block start permutation needs -// to explicility be prepared. +// to explicitly be prepared. type FlushPreparer interface { Preparer @@ -227,7 +227,7 @@ type FlushPreparer interface { } // SnapshotPreparer is a persist snapshot cycle, each shard and block start permutation needs -// to explicility be prepared. 
+// to explicitly be prepared. type SnapshotPreparer interface { Preparer @@ -236,7 +236,7 @@ type SnapshotPreparer interface { } // IndexFlush is a persist flush cycle, each namespace, block combination needs -// to explicility be prepared. +// to explicitly be prepared. type IndexFlush interface { // Prepare prepares writing data for a given ns/blockStart, returning a // PreparedIndexPersist object and any error encountered during diff --git a/src/dbnode/storage/bootstrap.go b/src/dbnode/storage/bootstrap.go index 2c569aee3a..eda0e06ef7 100644 --- a/src/dbnode/storage/bootstrap.go +++ b/src/dbnode/storage/bootstrap.go @@ -62,6 +62,9 @@ var ( // errBootstrapEnqueued raised when trying to bootstrap and bootstrap becomes enqueued. errBootstrapEnqueued = errors.New("database bootstrapping enqueued bootstrap") + + // errColdWritesDisabled raised when trying to do large tiles aggregation with cold writes disabled. + errColdWritesDisabled = errors.New("cold writes are disabled") ) const ( diff --git a/src/dbnode/storage/database.go b/src/dbnode/storage/database.go index ab0e31a103..f1e2670951 100644 --- a/src/dbnode/storage/database.go +++ b/src/dbnode/storage/database.go @@ -1086,6 +1086,41 @@ func (d *db) OwnedNamespaces() ([]databaseNamespace, error) { return d.ownedNamespacesWithLock(), nil } +func (d *db) AggregateTiles( + ctx context.Context, + sourceNsID, + targetNsID ident.ID, + opts AggregateTilesOptions, +) (int64, error) { + ctx, sp, sampled := ctx.StartSampledTraceSpan(tracepoint.DBAggregateTiles) + if sampled { + sp.LogFields( + opentracinglog.String("sourceNamespace", sourceNsID.String()), + opentracinglog.String("targetNamespace", targetNsID.String()), + xopentracing.Time("start", opts.Start), + xopentracing.Time("end", opts.End), + xopentracing.Duration("step", opts.Step), + ) + } + defer sp.Finish() + + sourceNs, err := d.namespaceFor(sourceNsID) + if err != nil { + d.metrics.unknownNamespaceRead.Inc(1) + return 0, err + } + + targetNs, err := d.namespaceFor(targetNsID) + if err != nil { + d.metrics.unknownNamespaceRead.Inc(1) + return 0, err + } + + // TODO: Create and use a dedicated persist manager + pm := d.opts.PersistManager() + return targetNs.AggregateTiles(ctx, sourceNs, opts, pm) +} + func (d *db) nextIndex() uint64 { // Start with index at "1" so that a default "uniqueIndex" // with "0" is invalid (AddUint64 will return the new value). @@ -1129,3 +1164,25 @@ func (m metadatas) String() (string, error) { buf.WriteRune(']') return buf.String(), nil } + +// NewAggregateTilesOptions creates new AggregateTilesOptions. 
+func NewAggregateTilesOptions( + start, end time.Time, + step time.Duration, + handleCounterResets bool, +) (AggregateTilesOptions, error) { + if !end.After(start) { + return AggregateTilesOptions{}, fmt.Errorf("AggregateTilesOptions.End must be after Start, got %s - %s", start, end) + } + + if step <= 0 { + return AggregateTilesOptions{}, fmt.Errorf("AggregateTilesOptions.Step must be positive, got %s", step) + } + + return AggregateTilesOptions{ + Start: start, + End: end, + Step: step, + HandleCounterResets: handleCounterResets, + }, nil +} diff --git a/src/dbnode/storage/database_test.go b/src/dbnode/storage/database_test.go index 46527674d7..41149c936d 100644 --- a/src/dbnode/storage/database_test.go +++ b/src/dbnode/storage/database_test.go @@ -57,7 +57,7 @@ import ( "github.com/fortytw2/leaktest" "github.com/golang/mock/gomock" "github.com/m3db/m3/src/dbnode/testdata/prototest" - opentracing "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/mocktracer" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -68,9 +68,9 @@ var ( defaultTestNs1ID = ident.StringID("testns1") defaultTestNs2ID = ident.StringID("testns2") defaultTestRetentionOpts = retention.NewOptions().SetBufferFuture(10 * time.Minute).SetBufferPast(10 * time.Minute). - SetBlockSize(2 * time.Hour).SetRetentionPeriod(2 * 24 * time.Hour) + SetBlockSize(2 * time.Hour).SetRetentionPeriod(2 * 24 * time.Hour) defaultTestNs2RetentionOpts = retention.NewOptions().SetBufferFuture(10 * time.Minute).SetBufferPast(10 * time.Minute). - SetBlockSize(4 * time.Hour).SetRetentionPeriod(2 * 24 * time.Hour) + SetBlockSize(4 * time.Hour).SetRetentionPeriod(2 * 24 * time.Hour) defaultTestNs1Opts = namespace.NewOptions().SetRetentionOptions(defaultTestRetentionOpts) defaultTestNs2Opts = namespace.NewOptions().SetRetentionOptions(defaultTestNs2RetentionOpts) testSchemaHistory = prototest.NewSchemaHistory() @@ -1287,3 +1287,51 @@ func TestDatabaseIsOverloaded(t *testing.T) { mockCL.EXPECT().QueueLength().Return(int64(90)) require.Equal(t, true, d.IsOverloaded()) } + +func TestDatabaseAggregateTiles(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + d, mapCh, _ := defaultTestDatabase(t, ctrl, Bootstrapped) + defer func() { + close(mapCh) + }() + + var ( + sourceNsID = ident.StringID("source") + targetNsID = ident.StringID("target") + ctx = context.NewContext() + pm = d.opts.PersistManager() + start = time.Now().Truncate(time.Hour) + ) + + opts, err := NewAggregateTilesOptions(start, start.Add(-time.Second), time.Minute, true) + require.Error(t, err) + + sourceNs := dbAddNewMockNamespace(ctrl, d, sourceNsID.String()) + targetNs := dbAddNewMockNamespace(ctrl, d, targetNsID.String()) + targetNs.EXPECT().AggregateTiles(ctx, sourceNs, opts, pm).Return(int64(4), nil) + + processedBlockCount, err := d.AggregateTiles(ctx, sourceNsID, targetNsID, opts) + require.NoError(t, err) + assert.Equal(t, int64(4), processedBlockCount) +} + +func TestNewAggregateTilesOptions(t *testing.T) { + start := time.Now().Truncate(time.Hour) + + _, err := NewAggregateTilesOptions(start, start.Add(-time.Second), time.Minute, false) + assert.Error(t, err) + + _, err = NewAggregateTilesOptions(start, start, time.Minute, false) + assert.Error(t, err) + + _, err = NewAggregateTilesOptions(start, start.Add(time.Second), -time.Minute, false) + assert.Error(t, err) + + _, err = NewAggregateTilesOptions(start, start.Add(time.Second), 0, false) + assert.Error(t, 
err) + + _, err = NewAggregateTilesOptions(start, start.Add(time.Second), time.Minute, false) + assert.NoError(t, err) +} diff --git a/src/dbnode/storage/namespace.go b/src/dbnode/storage/namespace.go index 04f0459bd5..8224803041 100644 --- a/src/dbnode/storage/namespace.go +++ b/src/dbnode/storage/namespace.go @@ -158,6 +158,7 @@ type databaseNamespaceMetrics struct { snapshot instrument.MethodMetrics write instrument.MethodMetrics writeTagged instrument.MethodMetrics + aggregateTiles instrument.MethodMetrics read instrument.MethodMetrics fetchBlocks instrument.MethodMetrics fetchBlocksMetadata instrument.MethodMetrics @@ -243,6 +244,7 @@ func newDatabaseNamespaceMetrics( snapshot: instrument.NewMethodMetrics(scope, "snapshot", opts), write: instrument.NewMethodMetrics(scope, "write", opts), writeTagged: instrument.NewMethodMetrics(scope, "write-tagged", opts), + aggregateTiles: instrument.NewMethodMetrics(scope, "aggregate-tiles", opts), read: instrument.NewMethodMetrics(scope, "read", opts), fetchBlocks: instrument.NewMethodMetrics(scope, "fetchBlocks", opts), fetchBlocksMetadata: instrument.NewMethodMetrics(scope, "fetchBlocksMetadata", opts), @@ -1233,7 +1235,7 @@ func (n *dbNamespace) ColdFlush(flushPersist persist.FlushPreparer) error { // We go through this error checking process to allow for partially successful flushes. indexColdFlushError := onColdFlushNs.Done() if indexColdFlushError == nil && onColdFlushDone != nil { - // Only evict rotated cold mutable index segments if the index cold flush was sucessful + // Only evict rotated cold mutable index segments if the index cold flush was successful // or we will lose queryability of data that's still in mem. indexColdFlushError = onColdFlushDone() } @@ -1602,3 +1604,171 @@ func (n *dbNamespace) FlushState(shardID uint32, blockStart time.Time) (fileOpSt func (n *dbNamespace) nsContextWithRLock() namespace.Context { return namespace.Context{ID: n.id, Schema: n.schemaDescr} } + +func (n *dbNamespace) AggregateTiles( + ctx context.Context, + sourceNs databaseNamespace, + opts AggregateTilesOptions, + pm persist.Manager, +) (int64, error) { + callStart := n.nowFn() + processedBlockCount, err := n.aggregateTiles(ctx, sourceNs, opts, pm) + n.metrics.aggregateTiles.ReportSuccessOrError(err, n.nowFn().Sub(callStart)) + + return processedBlockCount, err +} + +func (n *dbNamespace) aggregateTiles( + ctx context.Context, + sourceNs databaseNamespace, + opts AggregateTilesOptions, + pm persist.Manager, +) (int64, error) { + targetBlockSize := n.Metadata().Options().RetentionOptions().BlockSize() + blockStart := opts.Start.Truncate(targetBlockSize) + if blockStart.Add(targetBlockSize).Before(opts.End) { + return 0, fmt.Errorf("tile aggregation must be done within a single target block (start=%s, end=%s, blockSize=%s)", + opts.Start, opts.End, targetBlockSize.String()) + } + + n.RLock() + if n.bootstrapState != Bootstrapped { + n.RUnlock() + return 0, errNamespaceNotBootstrapped + } + nsCtx := n.nsContextWithRLock() + n.RUnlock() + + targetShards := n.OwnedShards() + + // Note: Cold writes must be enabled for Large Tiles to work. 
+ if !n.nopts.ColdWritesEnabled() { + return 0, errColdWritesDisabled + } + + sourceBlockSize := sourceNs.Metadata().Options().RetentionOptions().BlockSize() + sourceBlockStart := opts.Start.Truncate(sourceBlockSize) + + sourceNsOpts := sourceNs.StorageOptions() + reader, err := fs.NewReader(sourceNsOpts.BytesPool(), sourceNsOpts.CommitLogOptions().FilesystemOptions()) + if err != nil { + return 0, err + } + + wOpts := series.WriteOptions{ + TruncateType: n.opts.TruncateType(), + SchemaDesc: nsCtx.Schema, + } + + resources, err := newColdFlushReuseableResources(n.opts) + if err != nil { + return 0, err + } + + // NB(bodu): Deferred targetShard cold flushes so that we can ensure that cold flush index data is + // persisted before persisting TSDB data to ensure crash consistency. + multiErr := xerrors.NewMultiError() + var processedBlockCount int64 + for _, targetShard := range targetShards { + sourceShard, _, err := sourceNs.readableShardAt(targetShard.ID()) + if err != nil { + detailedErr := fmt.Errorf("no matching shard in source namespace %s: %v", sourceNs.ID(), err) + multiErr = multiErr.Add(detailedErr) + continue + } + shardProcessedBlockCount, err := targetShard.AggregateTiles(ctx, reader, sourceNs.ID(), sourceBlockStart, sourceShard, opts, wOpts) + processedBlockCount += shardProcessedBlockCount + if err != nil { + detailedErr := fmt.Errorf("shard %d aggregation failed: %v", targetShard.ID(), err) + multiErr = multiErr.Add(detailedErr) + continue + } + + multiErr = n.coldFlushSingleShard(nsCtx, targetShard, pm, resources, multiErr) + } + + return processedBlockCount, multiErr.FinalError() +} + +func (n *dbNamespace) coldFlushSingleShard( + nsCtx namespace.Context, + shard databaseShard, + pm persist.Manager, + resources coldFlushReuseableResources, + multiErr xerrors.MultiError, +) xerrors.MultiError { + // NB(rartoul): This value can be used for emitting metrics, but should not be used + // for business logic. + callStart := n.nowFn() + + // NB(bodu): The in-mem index will lag behind the TSDB in terms of new series writes. For a period of + // time between when we rotate out the active cold mutable index segments (happens here) and when + // we actually cold flush the data to disk we will be making writes to the newly active mutable seg. + // This means that some series can live doubly in-mem and loaded from disk until the next cold flush + // where they will be evicted from the in-mem index. 
+ var ( + onColdFlushDone OnColdFlushDone + err error + ) + if n.reverseIndex != nil { + onColdFlushDone, err = n.reverseIndex.ColdFlush([]databaseShard{shard}) + if err != nil { + n.metrics.aggregateTiles.ReportError(n.nowFn().Sub(callStart)) + return multiErr.Add( + fmt.Errorf("error preparing to coldflush a reverse index for shard %d: %v", + shard.ID(), + err)) + } + } + + onColdFlushNs, err := n.opts.OnColdFlush().ColdFlushNamespace(n) + if err != nil { + n.metrics.aggregateTiles.ReportError(n.nowFn().Sub(callStart)) + return multiErr.Add( + fmt.Errorf("error preparing to coldflush a namespace for shard %d: %v", + shard.ID(), + err)) + } + + flushPersist, err := pm.StartFlushPersist() + if err != nil { + n.metrics.aggregateTiles.ReportError(n.nowFn().Sub(callStart)) + return multiErr.Add( + fmt.Errorf("error starting flush persist for shard %d: %v", + shard.ID(), + err)) + } + + localErrors := xerrors.NewMultiError() + shardColdFlush, err := shard.ColdFlush(flushPersist, resources, nsCtx, onColdFlushNs) + if err != nil { + detailedErr := fmt.Errorf("shard %d failed to compact: %v", shard.ID(), err) + localErrors = localErrors.Add(detailedErr) + } + + // We go through this error checking process to allow for partially successful flushes. + indexColdFlushError := onColdFlushNs.Done() + if indexColdFlushError == nil && onColdFlushDone != nil { + // Only evict rotated cold mutable index segments if the index cold flush was successful + // or we will lose queryability of data that's still in mem. + indexColdFlushError = onColdFlushDone() + } + if indexColdFlushError == nil { + // NB(bodu): We only want to complete data cold flushes if the index cold flush + // is successful. If index cold flush is successful, we want to attempt writing + // of checkpoint files to complete the cold data flush lifecycle for successful shards. 
+ localErrors = localErrors.Add(shardColdFlush.Done()) + } + localErrors = localErrors.Add(indexColdFlushError) + err = flushPersist.DoneFlush() + localErrors = localErrors.Add(err) + + res := localErrors.FinalError() + n.metrics.aggregateTiles.ReportSuccessOrError(res, n.nowFn().Sub(callStart)) + + for _, err := range localErrors.Errors() { + multiErr = multiErr.Add(err) + } + + return multiErr +} diff --git a/src/dbnode/storage/namespace_readers.go b/src/dbnode/storage/namespace_readers.go index df92ed0144..ab1ba4b089 100644 --- a/src/dbnode/storage/namespace_readers.go +++ b/src/dbnode/storage/namespace_readers.go @@ -73,6 +73,8 @@ type databaseNamespaceReaderManager interface { put(reader fs.DataFileSetReader) error + latestVolume(shard uint32, blockStart time.Time) (int, error) + assignShardSet(shardSet sharding.ShardSet) tick() diff --git a/src/dbnode/storage/namespace_test.go b/src/dbnode/storage/namespace_test.go index 6566db69f4..c4e7403412 100644 --- a/src/dbnode/storage/namespace_test.go +++ b/src/dbnode/storage/namespace_test.go @@ -30,6 +30,7 @@ import ( "github.com/m3db/m3/src/cluster/shard" "github.com/m3db/m3/src/dbnode/namespace" + "github.com/m3db/m3/src/dbnode/persist/fs" "github.com/m3db/m3/src/dbnode/retention" "github.com/m3db/m3/src/dbnode/runtime" "github.com/m3db/m3/src/dbnode/sharding" @@ -1300,6 +1301,110 @@ func TestNamespaceFlushState(t *testing.T) { require.Equal(t, expectedFlushState, flushState) } +func TestNamespaceAggregateTilesFailOnBootstrapping(t *testing.T) { + var ( + sourceNsID = ident.StringID("source") + targetNsID = ident.StringID("target") + ctx = context.NewContext() + pm, _ = fs.NewPersistManager(fs.NewOptions()) + start = time.Now().Truncate(time.Hour) + opts = AggregateTilesOptions{Start: start, End: start.Add(time.Hour)} + ) + + sourceNs, sourceCloser := newTestNamespaceWithIDOpts(t, sourceNsID, namespace.NewOptions()) + defer sourceCloser() + + targetNs, targetCloser := newTestNamespaceWithIDOpts(t, targetNsID, namespace.NewOptions()) + defer targetCloser() + targetNs.bootstrapState = Bootstrapping + + _, err := targetNs.AggregateTiles(ctx, sourceNs, opts, pm) + require.Equal(t, errNamespaceNotBootstrapped, err) +} + +func TestNamespaceAggregateTilesFailOnDisabledColdWrites(t *testing.T) { + var ( + sourceNsID = ident.StringID("source") + targetNsID = ident.StringID("target") + ctx = context.NewContext() + pm, _ = fs.NewPersistManager(fs.NewOptions()) + start = time.Now().Truncate(time.Hour) + opts = AggregateTilesOptions{Start: start, End: start.Add(time.Hour)} + ) + + sourceNs, sourceCloser := newTestNamespaceWithIDOpts(t, sourceNsID, namespace.NewOptions()) + defer sourceCloser() + + targetNs, targetCloser := newTestNamespaceWithIDOpts(t, targetNsID, namespace.NewOptions()) + defer targetCloser() + targetNs.bootstrapState = Bootstrapped + + _, err := targetNs.AggregateTiles(ctx, sourceNs, opts, pm) + require.Equal(t, errColdWritesDisabled, err) +} + +func TestNamespaceAggregateTiles(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + var ( + sourceNsID = ident.StringID("source") + targetNsID = ident.StringID("target") + ctx = context.NewContext() + pm, _ = fs.NewPersistManager(fs.NewOptions()) + start = time.Now().Truncate(2 * time.Hour) + opts = AggregateTilesOptions{Start: start, End: start.Add(time.Hour)} + ) + + sourceNs, sourceCloser := newTestNamespaceWithIDOpts(t, sourceNsID, namespace.NewOptions()) + defer sourceCloser() + + targetNs, targetCloser := newTestNamespaceWithIDOpts(t, targetNsID, 
namespace.NewOptions()) + defer targetCloser() + targetNs.bootstrapState = Bootstrapped + targetNs.nopts = targetNs.nopts.SetColdWritesEnabled(true) + + wOpts := series.WriteOptions{ + TruncateType: targetNs.opts.TruncateType(), + SchemaDesc: targetNs.Schema(), + } + + sourceShard0 := NewMockdatabaseShard(ctrl) + sourceShard1 := NewMockdatabaseShard(ctrl) + sourceNs.shards[0] = sourceShard0 + sourceNs.shards[1] = sourceShard1 + + sourceShard0.EXPECT().IsBootstrapped().Return(true) + sourceShard1.EXPECT().IsBootstrapped().Return(true) + + targetShard0 := NewMockdatabaseShard(ctrl) + targetShard1 := NewMockdatabaseShard(ctrl) + targetNs.shards[0] = targetShard0 + targetNs.shards[1] = targetShard1 + + targetShard0.EXPECT().ID().Return(uint32(0)) + targetShard1.EXPECT().ID().Return(uint32(1)) + + sourceNsIDMatcher := ident.NewIDMatcher(sourceNsID.String()) + targetShard0.EXPECT().AggregateTiles(ctx, gomock.Any(), sourceNsIDMatcher, start, sourceShard0, opts, wOpts).Return(int64(3), nil) + targetShard1.EXPECT().AggregateTiles(ctx, gomock.Any(), sourceNsIDMatcher, start, sourceShard1, opts, wOpts).Return(int64(2), nil) + + shardColdFlush0 := NewMockShardColdFlush(ctrl) + shardColdFlush0.EXPECT().Done().Return(nil) + shardColdFlush1 := NewMockShardColdFlush(ctrl) + shardColdFlush1.EXPECT().Done().Return(nil) + + nsCtx := targetNs.nsContextWithRLock() + onColdFlushNs, err := targetNs.opts.OnColdFlush().ColdFlushNamespace(targetNs) + require.NoError(t, err) + targetShard0.EXPECT().ColdFlush(gomock.Any(), gomock.Any(), nsCtx, onColdFlushNs).Return(shardColdFlush0, nil) + targetShard1.EXPECT().ColdFlush(gomock.Any(), gomock.Any(), nsCtx, onColdFlushNs).Return(shardColdFlush1, nil) + + processedBlockCount, err := targetNs.AggregateTiles(ctx, sourceNs, opts, pm) + require.NoError(t, err) + assert.Equal(t, int64(3+2), processedBlockCount) +} + func waitForStats( reporter xmetrics.TestStatsReporter, check func(xmetrics.TestStatsReporter) bool, diff --git a/src/dbnode/storage/shard.go b/src/dbnode/storage/shard.go index 00e70dc31d..13fb047f5f 100644 --- a/src/dbnode/storage/shard.go +++ b/src/dbnode/storage/shard.go @@ -21,6 +21,7 @@ package storage import ( + "bytes" "container/list" "errors" "fmt" @@ -30,6 +31,8 @@ import ( "time" "github.com/m3db/m3/src/dbnode/clock" + "github.com/m3db/m3/src/dbnode/encoding" + "github.com/m3db/m3/src/dbnode/encoding/m3tsz" "github.com/m3db/m3/src/dbnode/generated/proto/pagetoken" "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/persist" @@ -210,6 +213,7 @@ type dbShardMetrics struct { insertAsyncWriteInternalErrors tally.Counter insertAsyncWriteInvalidParamsErrors tally.Counter insertAsyncIndexErrors tally.Counter + largeTilesWriteErrors tally.Counter snapshotTotalLatency tally.Timer snapshotCheckNeedsSnapshotLatency tally.Timer snapshotPrepareLatency tally.Timer @@ -247,6 +251,10 @@ func newDatabaseShardMetrics(shardID uint32, scope tally.Scope) dbShardMetrics { "error_type": "reverse-index", "suberror_type": "write-batch-error", }).Counter(insertErrorName), + largeTilesWriteErrors: scope.Tagged(map[string]string{ + "error_type": "large-tiles", + "suberror_type": "write-error", + }).Counter(insertErrorName), snapshotTotalLatency: snapshotScope.Timer("total-latency"), snapshotCheckNeedsSnapshotLatency: snapshotScope.Timer("check-needs-snapshot-latency"), snapshotPrepareLatency: snapshotScope.Timer("prepare-latency"), @@ -2650,6 +2658,78 @@ func (s *dbShard) Repair( return repairer.Repair(ctx, nsCtx, nsMeta, tr, s) } +func (s *dbShard) 
AggregateTiles( + ctx context.Context, + reader fs.DataFileSetReader, + sourceNsID ident.ID, + sourceBlockStart time.Time, + sourceShard databaseShard, + opts AggregateTilesOptions, + wOpts series.WriteOptions, +) (int64, error) { + latestSourceVolume, err := sourceShard.latestVolume(sourceBlockStart) + if err != nil { + return 0, err + } + + openOpts := fs.DataReaderOpenOptions{ + Identifier: fs.FileSetFileIdentifier{ + Namespace: sourceNsID, + Shard: sourceShard.ID(), + BlockStart: sourceBlockStart, + VolumeIndex: latestSourceVolume, + }, + FileSetType: persist.FileSetFlushType, + //TODO add after streaming supported - OrderByIndex: true + } + if err := reader.Open(openOpts); err != nil { + return 0, err + } + defer reader.Close() + + encodingOpts := encoding.NewOptions().SetBytesPool(s.opts.BytesPool()) + bytesReader := bytes.NewReader(nil) + dataPointIter := m3tsz.NewReaderIterator(bytesReader, m3tsz.DefaultIntOptimizationEnabled, encodingOpts) + var lastWriteError error + var processedBlockCount int64 + + for { + id, tags, data, _, err := reader.Read() + if err == io.EOF { + break + } + if err != nil { + return processedBlockCount, err + } + + data.IncRef() + bytesReader.Reset(data.Bytes()) + dataPointIter.Reset(bytesReader, nil) + + for dataPointIter.Next() { + dp, unit, annot := dataPointIter.Current() + _, err = s.writeAndIndex(ctx, id, tags, dp.Timestamp, dp.Value, unit, annot, wOpts, true) + if err != nil { + s.metrics.largeTilesWriteErrors.Inc(1) + lastWriteError = err + } + } + + dataPointIter.Close() + + data.DecRef() + data.Finalize() + + processedBlockCount++ + } + + s.logger.Debug("finished aggregating tiles", + zap.Uint32("shard", s.ID()), + zap.Int64("processedBlocks", processedBlockCount)) + + return processedBlockCount, lastWriteError +} + func (s *dbShard) BootstrapState() BootstrapState { s.RLock() bs := s.bootstrapState @@ -2671,6 +2751,10 @@ func (s *dbShard) DocRef(id ident.ID) (doc.Document, bool, error) { return emptyDoc, false, err } +func (s *dbShard) latestVolume(blockStart time.Time) (int, error) { + return s.namespaceReaderMgr.latestVolume(s.shard, blockStart) +} + func (s *dbShard) logFlushResult(r dbShardFlushResult) { s.logger.Debug("shard flush outcome", zap.Uint32("shard", s.ID()), diff --git a/src/dbnode/storage/shard_test.go b/src/dbnode/storage/shard_test.go index 06396b8274..127ec80a38 100644 --- a/src/dbnode/storage/shard_test.go +++ b/src/dbnode/storage/shard_test.go @@ -23,6 +23,7 @@ package storage import ( "errors" "fmt" + "io" "io/ioutil" "os" "strconv" @@ -190,8 +191,8 @@ func TestShardBootstrapWithFlushVersion(t *testing.T) { fsOpts = opts.CommitLogOptions().FilesystemOptions(). SetFilePathPrefix(dir) newClOpts = opts. - CommitLogOptions(). - SetFilesystemOptions(fsOpts) + CommitLogOptions(). + SetFilesystemOptions(fsOpts) ) opts = opts. SetCommitLogOptions(newClOpts) @@ -268,8 +269,8 @@ func TestShardBootstrapWithFlushVersionNoCleanUp(t *testing.T) { fsOpts = opts.CommitLogOptions().FilesystemOptions(). SetFilePathPrefix(dir) newClOpts = opts. - CommitLogOptions(). - SetFilesystemOptions(fsOpts) + CommitLogOptions(). + SetFilesystemOptions(fsOpts) ) opts = opts. SetCommitLogOptions(newClOpts) @@ -326,8 +327,8 @@ func TestShardBootstrapWithCacheShardIndices(t *testing.T) { fsOpts = opts.CommitLogOptions().FilesystemOptions(). SetFilePathPrefix(dir) newClOpts = opts. - CommitLogOptions(). - SetFilesystemOptions(fsOpts) + CommitLogOptions(). 
+ SetFilesystemOptions(fsOpts) mockRetriever = block.NewMockDatabaseBlockRetriever(ctrl) ) opts = opts.SetCommitLogOptions(newClOpts) @@ -1758,3 +1759,45 @@ func TestShardIterateBatchSize(t *testing.T) { require.True(t, shardIterateBatchMinSize < iterateBatchSize(2000)) } + +func TestAggregateTiles(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + var ( + ctx = context.NewContext() + start = time.Now().Truncate(time.Hour) + opts = AggregateTilesOptions{Start: start, End: start.Add(time.Hour)} + bytes = checked.NewBytes([]byte{}, checked.NewBytesOptions()) + ) + + sourceShard := testDatabaseShard(t, DefaultTestOptions()) + defer sourceShard.Close() + + targetShard := testDatabaseShard(t, DefaultTestOptions()) + defer targetShard.Close() + + latestSourceVolume, err := sourceShard.latestVolume(opts.Start) + require.NoError(t, err) + + sourceNsID := sourceShard.namespace.ID() + readerOpenOpts := fs.DataReaderOpenOptions{ + Identifier: fs.FileSetFileIdentifier{ + Namespace: sourceNsID, + Shard: sourceShard.ID(), + BlockStart: opts.Start, + VolumeIndex: latestSourceVolume, + }, + FileSetType: persist.FileSetFlushType, + } + + reader := fs.NewMockDataFileSetReader(ctrl) + reader.EXPECT().Open(readerOpenOpts).Return(nil) + reader.EXPECT().Read().Return(ident.StringID("id1"), ident.EmptyTagIterator, bytes, uint32(11), nil) + reader.EXPECT().Read().Return(nil, nil, nil, uint32(0), io.EOF) + reader.EXPECT().Close() + + processedBlockCount, err := targetShard.AggregateTiles(ctx, reader, sourceNsID, start, sourceShard, opts, series.WriteOptions{}) + require.NoError(t, err) + assert.Equal(t, int64(1), processedBlockCount) +} diff --git a/src/dbnode/storage/storage_mock.go b/src/dbnode/storage/storage_mock.go index 7fdf6edb4c..7b0e5a34d9 100644 --- a/src/dbnode/storage/storage_mock.go +++ b/src/dbnode/storage/storage_mock.go @@ -488,6 +488,21 @@ func (mr *MockDatabaseMockRecorder) FlushState(namespace, shardID, blockStart in return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FlushState", reflect.TypeOf((*MockDatabase)(nil).FlushState), namespace, shardID, blockStart) } +// AggregateTiles mocks base method +func (m *MockDatabase) AggregateTiles(ctx context.Context, sourceNsID, targetNsID ident.ID, opts AggregateTilesOptions) (int64, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AggregateTiles", ctx, sourceNsID, targetNsID, opts) + ret0, _ := ret[0].(int64) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// AggregateTiles indicates an expected call of AggregateTiles +func (mr *MockDatabaseMockRecorder) AggregateTiles(ctx, sourceNsID, targetNsID, opts interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AggregateTiles", reflect.TypeOf((*MockDatabase)(nil).AggregateTiles), ctx, sourceNsID, targetNsID, opts) +} + // Mockdatabase is a mock of database interface type Mockdatabase struct { ctrl *gomock.Controller @@ -883,6 +898,21 @@ func (mr *MockdatabaseMockRecorder) FlushState(namespace, shardID, blockStart in return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FlushState", reflect.TypeOf((*Mockdatabase)(nil).FlushState), namespace, shardID, blockStart) } +// AggregateTiles mocks base method +func (m *Mockdatabase) AggregateTiles(ctx context.Context, sourceNsID, targetNsID ident.ID, opts AggregateTilesOptions) (int64, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AggregateTiles", ctx, sourceNsID, targetNsID, opts) + ret0, _ := ret[0].(int64) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// 
AggregateTiles indicates an expected call of AggregateTiles +func (mr *MockdatabaseMockRecorder) AggregateTiles(ctx, sourceNsID, targetNsID, opts interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AggregateTiles", reflect.TypeOf((*Mockdatabase)(nil).AggregateTiles), ctx, sourceNsID, targetNsID, opts) +} + // OwnedNamespaces mocks base method func (m *Mockdatabase) OwnedNamespaces() ([]databaseNamespace, error) { m.ctrl.T.Helper() @@ -1532,6 +1562,37 @@ func (mr *MockdatabaseNamespaceMockRecorder) WritePendingIndexInserts(pending in return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WritePendingIndexInserts", reflect.TypeOf((*MockdatabaseNamespace)(nil).WritePendingIndexInserts), pending) } +// AggregateTiles mocks base method +func (m *MockdatabaseNamespace) AggregateTiles(ctx context.Context, sourceNs databaseNamespace, opts AggregateTilesOptions, pm persist.Manager) (int64, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AggregateTiles", ctx, sourceNs, opts, pm) + ret0, _ := ret[0].(int64) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// AggregateTiles indicates an expected call of AggregateTiles +func (mr *MockdatabaseNamespaceMockRecorder) AggregateTiles(ctx, sourceNs, opts, pm interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AggregateTiles", reflect.TypeOf((*MockdatabaseNamespace)(nil).AggregateTiles), ctx, sourceNs, opts, pm) +} + +// readableShardAt mocks base method +func (m *MockdatabaseNamespace) readableShardAt(shardID uint32) (databaseShard, namespace.Context, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "readableShardAt", shardID) + ret0, _ := ret[0].(databaseShard) + ret1, _ := ret[1].(namespace.Context) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// readableShardAt indicates an expected call of readableShardAt +func (mr *MockdatabaseNamespaceMockRecorder) readableShardAt(shardID interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "readableShardAt", reflect.TypeOf((*MockdatabaseNamespace)(nil).readableShardAt), shardID) +} + // MockShard is a mock of Shard interface type MockShard struct { ctrl *gomock.Controller @@ -2009,6 +2070,36 @@ func (mr *MockdatabaseShardMockRecorder) DocRef(id interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DocRef", reflect.TypeOf((*MockdatabaseShard)(nil).DocRef), id) } +// AggregateTiles mocks base method +func (m *MockdatabaseShard) AggregateTiles(ctx context.Context, reader fs.DataFileSetReader, sourceNsID ident.ID, sourceBlockStart time.Time, sourceShard databaseShard, opts AggregateTilesOptions, wOpts series.WriteOptions) (int64, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AggregateTiles", ctx, reader, sourceNsID, sourceBlockStart, sourceShard, opts, wOpts) + ret0, _ := ret[0].(int64) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// AggregateTiles indicates an expected call of AggregateTiles +func (mr *MockdatabaseShardMockRecorder) AggregateTiles(ctx, reader, sourceNsID, sourceBlockStart, sourceShard, opts, wOpts interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AggregateTiles", reflect.TypeOf((*MockdatabaseShard)(nil).AggregateTiles), ctx, reader, sourceNsID, sourceBlockStart, sourceShard, opts, wOpts) +} + +// latestVolume mocks base method +func (m *MockdatabaseShard) latestVolume(blockStart time.Time) (int, 
error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "latestVolume", blockStart) + ret0, _ := ret[0].(int) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// latestVolume indicates an expected call of latestVolume +func (mr *MockdatabaseShardMockRecorder) latestVolume(blockStart interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "latestVolume", reflect.TypeOf((*MockdatabaseShard)(nil).latestVolume), blockStart) +} + // MockShardColdFlush is a mock of ShardColdFlush interface type MockShardColdFlush struct { ctrl *gomock.Controller diff --git a/src/dbnode/storage/types.go b/src/dbnode/storage/types.go index bc8e79d47a..eee15bf1f0 100644 --- a/src/dbnode/storage/types.go +++ b/src/dbnode/storage/types.go @@ -219,6 +219,9 @@ type Database interface { // FlushState returns the flush state for the specified shard and block start. FlushState(namespace ident.ID, shardID uint32, blockStart time.Time) (fileOpState, error) + + // AggregateTiles does large tile aggregation from source namespace to target namespace. + AggregateTiles(ctx context.Context, sourceNsID, targetNsID ident.ID, opts AggregateTilesOptions) (int64, error) } // database is the internal database interface. @@ -405,6 +408,16 @@ type databaseNamespace interface { // WritePendingIndexInserts will write any pending index inserts. WritePendingIndexInserts(pending []writes.PendingIndexInsert) error + + // AggregateTiles does large tile aggregation from source namespace into this namespace. + AggregateTiles( + ctx context.Context, + sourceNs databaseNamespace, + opts AggregateTilesOptions, + pm persist.Manager, + ) (int64, error) + + readableShardAt(shardID uint32) (databaseShard, namespace.Context, error) } // SeriesReadWriteRef is a read/write reference for a series, @@ -581,6 +594,19 @@ type databaseShard interface { // DocRef returns the doc if already present in a shard series. DocRef(id ident.ID) (doc.Document, bool, error) + + // AggregateTiles does large tile aggregation from the source shard into this shard. + AggregateTiles( + ctx context.Context, + reader fs.DataFileSetReader, + sourceNsID ident.ID, + sourceBlockStart time.Time, + sourceShard databaseShard, + opts AggregateTilesOptions, + wOpts series.WriteOptions, + ) (int64, error) + + latestVolume(blockStart time.Time) (int, error) } // ShardSnapshotResult is a result from a shard snapshot. @@ -1243,3 +1269,14 @@ type newFSMergeWithMemFn func( dirtySeries *dirtySeriesMap, dirtySeriesToWrite map[xtime.UnixNano]*idList, ) fs.MergeWith + +// AggregateTilesOptions is the options for large tile aggregation. +type AggregateTilesOptions struct { + // Start and End specify the aggregation window. + Start, End time.Time + // Step is the downsampling step. + Step time.Duration + // HandleCounterResets is temporarily used to force counter reset handling logic on the processed series. + // TODO: remove once we have metrics type stored in the metadata. + HandleCounterResets bool +} diff --git a/src/dbnode/tracepoint/tracepoint.go b/src/dbnode/tracepoint/tracepoint.go index 2a3dab06ce..300c14254c 100644 --- a/src/dbnode/tracepoint/tracepoint.go +++ b/src/dbnode/tracepoint/tracepoint.go @@ -46,6 +46,9 @@ const ( // FetchReadSegment is the operation name for the tchannelthrift FetchReadSegment path. FetchReadSegment = "tchannelthrift/node.service.FetchReadSegment" + // AggregateTiles is the operation name for the tchannelthrift AggregateTiles path. 
+ AggregateTiles = "tchannelthrift/node.service.AggregateTiles" + // DBQueryIDs is the operation name for the db QueryIDs path. DBQueryIDs = "storage.db.QueryIDs" @@ -64,6 +67,9 @@ const ( // DBWriteBatch is the operation name for the db WriteBatch path. DBWriteBatch = "storage.db.WriteBatch" + // DBAggregateTiles is the operation name for the db AggregateTiles path. + DBAggregateTiles = "storage.db.AggregateTiles" + // NSQueryIDs is the operation name for the dbNamespace QueryIDs path. NSQueryIDs = "storage.dbNamespace.QueryIDs"
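The change above threads AggregateTiles from the public Database interface (types.go) down through namespaces and shards. As an illustration only, and not part of this diff, the sketch below shows how a caller might invoke the new Database.AggregateTiles method with the AggregateTilesOptions struct it introduces; the package name, helper function, namespace IDs, block size, and step are assumptions made up for the example.

// Usage sketch (illustrative, not part of this change): driving the new
// Database.AggregateTiles API added to src/dbnode/storage/types.go.
package example

import (
	"time"

	"github.com/m3db/m3/src/dbnode/storage"
	"github.com/m3db/m3/src/x/context"
	"github.com/m3db/m3/src/x/ident"
)

// aggregateLastBlock is a hypothetical helper: it aggregates the most recently
// completed 2h block of a raw namespace into a downsampled namespace and
// returns the number of source blocks processed across all shards.
func aggregateLastBlock(db storage.Database) (int64, error) {
	ctx := context.NewContext()
	defer ctx.Close()

	// The 2h block size, 1m step and namespace IDs are example values only.
	start := time.Now().Truncate(2 * time.Hour).Add(-2 * time.Hour)
	opts := storage.AggregateTilesOptions{
		Start:               start,
		End:                 start.Add(2 * time.Hour),
		Step:                time.Minute,
		HandleCounterResets: true, // temporary flag, see the TODO on the field
	}

	return db.AggregateTiles(ctx,
		ident.StringID("metrics_raw"), // source namespace (hypothetical)
		ident.StringID("metrics_1m"),  // target namespace (hypothetical)
		opts)
}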
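Likewise, here is a minimal sketch of the decode loop that dbShard.AggregateTiles (shard.go above) runs per fileset entry. It is not part of the diff; it assumes the raw block bytes are already in hand and uses only the encoding calls that appear in the shard code, plus the iterator's Err method.

// Decode-loop sketch (illustrative, not part of this change): mirrors the way
// dbShard.AggregateTiles iterates the datapoints of an M3TSZ-encoded block.
package example

import (
	"bytes"

	"github.com/m3db/m3/src/dbnode/encoding"
	"github.com/m3db/m3/src/dbnode/encoding/m3tsz"
)

// decodeBlock counts the datapoints in raw M3TSZ block bytes (what data.Bytes()
// yields in the shard code); a real consumer would re-write each datapoint into
// the target shard instead of just counting.
func decodeBlock(blockData []byte) (int, error) {
	iter := m3tsz.NewReaderIterator(
		bytes.NewReader(blockData),
		m3tsz.DefaultIntOptimizationEnabled,
		encoding.NewOptions(),
	)
	defer iter.Close()

	count := 0
	for iter.Next() {
		dp, unit, annotation := iter.Current()
		_, _, _ = dp, unit, annotation // downsampling/re-writing would happen here
		count++
	}
	return count, iter.Err()
}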