Skip to content

Commit

Permalink
Merge #28362
Browse files Browse the repository at this point in the history
28362: engine: create new OpLoggerBatch to log logical MVCC ops r=nvanbenschoten a=nvanbenschoten

This change creates a new Batch implementation that logs logical
MVCC operations. This allows a Replica to selectively decide when
a request's logical MVCC operations should be attached to a raft
command. This is important for rangefeed because it operates off
of logical operations, not physical ones.

With this change, a Replica will never decide to turn on logical op
logging, so it doesn't change any behavior yet. A future change will
include logic to conditionally log and replicate logical MVCC ops.
This turns out to be fairly involved, so I figured it would be preferable
to split this into its own PR. A description of the tentative plan for that
is included in `replica.go`. Suggestions are very welcome!

Release note: None

Co-authored-by: Nathan VanBenschoten <[email protected]>
  • Loading branch information
craig[bot] and nvanbenschoten committed Aug 9, 2018
2 parents 7f48c63 + 05903a8 commit 9ed0086
Show file tree
Hide file tree
Showing 14 changed files with 785 additions and 99 deletions.
19 changes: 16 additions & 3 deletions pkg/storage/batcheval/result/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,10 @@ func (lResult *LocalResult) DetachEndTxns(alwaysOnly bool) []EndTxnIntents {
// c) data which isn't sent to the followers but the proposer needs for tasks
// it must run when the command has applied (such as resolving intents).
type Result struct {
Local LocalResult
Replicated storagebase.ReplicatedEvalResult
WriteBatch *storagebase.WriteBatch
Local LocalResult
Replicated storagebase.ReplicatedEvalResult
WriteBatch *storagebase.WriteBatch
LogicalOpLog *storagebase.LogicalOpLog
}

// IsZero reports whether p is the zero value.
Expand All @@ -153,6 +154,9 @@ func (p *Result) IsZero() bool {
if p.WriteBatch != nil {
return false
}
if p.LogicalOpLog != nil {
return false
}
return true
}

Expand Down Expand Up @@ -333,6 +337,15 @@ func (p *Result) MergeAndDestroy(q Result) error {
}
q.Local.UpdatedTxns = nil

if q.LogicalOpLog != nil {
if p.LogicalOpLog == nil {
p.LogicalOpLog = q.LogicalOpLog
} else {
p.LogicalOpLog.Ops = append(p.LogicalOpLog.Ops, q.LogicalOpLog.Ops...)
}
}
q.LogicalOpLog = nil

if !q.IsZero() {
log.Fatalf(context.TODO(), "unhandled EvalResult: %s", pretty.Diff(q, Result{}))
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/storage/batcheval/result/result_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ func TestEvalResultIsZero(t *testing.T) {
case **storagebase.WriteBatch:
*f = new(storagebase.WriteBatch)
defer func() { *f = nil }()
case **storagebase.LogicalOpLog:
*f = new(storagebase.LogicalOpLog)
defer func() { *f = nil }()
default:
tf := v.Type().Field(i)
t.Fatalf("unknown field %s of type %s on %T", tf.Name, tf.Type, p)
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,10 @@ type Writer interface {
// sstables). Currently only used for performance testing of appending to the
// RocksDB WAL.
LogData(data []byte) error
// LogLogicalOp logs the specified logical mvcc operation with the provided
// details to the writer, if it has logical op logging enabled. For most
// Writer implementations, this is a no-op.
LogLogicalOp(op MVCCLogicalOpType, details MVCCLogicalOpDetails)
}

// ReadWriter is the read/write interface to an engine's data.
Expand Down
15 changes: 14 additions & 1 deletion pkg/storage/engine/enginepb/mvcc3.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@

package enginepb

import proto "github.com/gogo/protobuf/proto"
import (
"fmt"

proto "github.com/gogo/protobuf/proto"
)

// ToStats converts the receiver to an MVCCStats.
func (ms *MVCCStatsDelta) ToStats() MVCCStats {
Expand Down Expand Up @@ -47,3 +51,12 @@ var isolationTypeLowerCase = map[int32]string{
func (x IsolationType) ToLowerCaseString() string {
return proto.EnumName(isolationTypeLowerCase, int32(x))
}

// MustSetValue clears the receiver and then stores value as its variant via
// SetValue. Unlike SetValue, it panics when SetValue reports failure, which
// happens when value is not a valid variant type.
func (op *MVCCLogicalOp) MustSetValue(value interface{}) {
	op.Reset()
	if ok := op.SetValue(value); ok {
		return
	}
	panic(fmt.Sprintf("%T excludes %T", op, value))
}
49 changes: 49 additions & 0 deletions pkg/storage/engine/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1197,9 +1197,23 @@ func mvccPutInternal(
if ms != nil {
updateStatsForInline(ms, key, origMetaKeySize, origMetaValSize, metaKeySize, metaValSize)
}
if err == nil {
engine.LogLogicalOp(MVCCWriteValueOpType, MVCCLogicalOpDetails{
Key: key,
Value: roachpb.Value{RawBytes: value},
Safe: true,
})
}
return err
}

// Determine what the logical operation is. Are we writing an intent
// or a value directly?
logicalOp := MVCCWriteValueOpType
if txn != nil {
logicalOp = MVCCWriteIntentOpType
}

var meta *enginepb.MVCCMetadata
var maybeTooOldErr error
var prevValSize int64
Expand Down Expand Up @@ -1236,6 +1250,7 @@ func mvccPutInternal(
// the same timestamp (see comments in else block) we can
// overwrite the existing intent; otherwise we must manually
// delete the old intent, taking care with MVCC stats.
logicalOp = MVCCUpdateIntentOpType
if metaTimestamp.Less(timestamp) {
{
// If the older write intent has a version underneath it, we need to
Expand Down Expand Up @@ -1361,6 +1376,20 @@ func mvccPutInternal(
metaKeySize, metaValSize, meta, newMeta))
}

// Log the logical MVCC operation.
logicalOpDetails := MVCCLogicalOpDetails{
Key: key,
Value: roachpb.Value{
Timestamp: timestamp,
RawBytes: value,
},
Safe: true,
}
if buf.newMeta.Txn != nil {
logicalOpDetails.Txn = *buf.newMeta.Txn
}
engine.LogLogicalOp(logicalOp, logicalOpDetails)

return maybeTooOldErr
}

Expand Down Expand Up @@ -2317,6 +2346,20 @@ func mvccResolveWriteIntent(
return false, err
}
}

// Log the logical MVCC operation.
logicalOp := MVCCCommitIntentOpType
if pushed {
logicalOp = MVCCUpdateIntentOpType
}
engine.LogLogicalOp(logicalOp, MVCCLogicalOpDetails{
Txn: intent.Txn,
Key: intent.Key,
Value: roachpb.Value{
Timestamp: intent.Txn.Timestamp,
},
})

return true, nil
}

Expand Down Expand Up @@ -2347,6 +2390,12 @@ func mvccResolveWriteIntent(
return false, err
}

// Log the logical MVCC operation.
engine.LogLogicalOp(MVCCAbortIntentOpType, MVCCLogicalOpDetails{
Txn: intent.Txn,
Key: intent.Key,
})

unsafeNextKey, unsafeNextValue, ok, err := unsafeNextVersion(iter, latestKey)
if err != nil {
return false, err
Expand Down
183 changes: 183 additions & 0 deletions pkg/storage/engine/mvcc_logical_ops.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
// Copyright 2018 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package engine

import (
"fmt"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/bufalloc"
)

// MVCCLogicalOpType is an enum with values corresponding to each of the
// enginepb.MVCCLogicalOp variants.
//
// LogLogicalOp takes an MVCCLogicalOpType and a corresponding
// MVCCLogicalOpDetails instead of an enginepb.MVCCLogicalOp variant for two
// reasons. First, it serves as a form of abstraction so that callers of the
// method don't need to construct protos themselves. More importantly, it also
// avoids allocations in the common case where Writer.LogLogicalOp is a no-op.
// This makes LogLogicalOp essentially free for cases where logical op logging
// is disabled.
//
// The values are assigned with iota, so the zero value of the type is
// MVCCWriteValueOpType.
type MVCCLogicalOpType int

const (
// MVCCWriteValueOpType corresponds to the MVCCWriteValueOp variant. The
// logger records the key, timestamp, and raw value bytes for this type.
MVCCWriteValueOpType MVCCLogicalOpType = iota
// MVCCWriteIntentOpType corresponds to the MVCCWriteIntentOp variant. The
// logger records the transaction ID, transaction key, and timestamp.
MVCCWriteIntentOpType
// MVCCUpdateIntentOpType corresponds to the MVCCUpdateIntentOp variant. The
// logger records the transaction ID and timestamp.
MVCCUpdateIntentOpType
// MVCCCommitIntentOpType corresponds to the MVCCCommitIntentOp variant. The
// logger records the transaction ID, key, and timestamp.
MVCCCommitIntentOpType
// MVCCAbortIntentOpType corresponds to the MVCCAbortIntentOp variant. The
// logger records only the transaction ID.
MVCCAbortIntentOpType
)

// MVCCLogicalOpDetails contains details about the occurrence of an MVCC logical
// operation. Which fields are consulted depends on the MVCCLogicalOpType that
// accompanies the details; fields that an op type does not use are ignored.
type MVCCLogicalOpDetails struct {
// Txn is the transaction metadata for the operation. The op logger reads
// its ID for all intent ops and its Key for intent writes.
Txn enginepb.TxnMeta
// Key is the key the operation applies to.
Key roachpb.Key
// Value carries the operation's timestamp and, for value writes, the raw
// value bytes.
Value roachpb.Value

// Safe indicates that the values in this struct will never be invalidated
// at a later point. If the details object cannot promise that its values
// will never be invalidated, an OpLoggerBatch will make a copy of the
// references it retains before adding the operation to the log.
// TestMVCCOpLogWriter fails without this.
Safe bool
}

// OpLoggerBatch records a log of logical MVCC operations. It wraps another
// Batch: logical ops passed to LogLogicalOp are recorded and then forwarded
// to the wrapped batch, and ops logged through the ReadWriter returned by
// Distinct are recorded in the same log.
type OpLoggerBatch struct {
// Batch is the wrapped batch. Methods not overridden by OpLoggerBatch are
// promoted from it through embedding.
Batch
// distinct is the wrapper handed out by Distinct. It is stored inline and
// reused across Distinct calls.
distinct distinctOpLoggerBatch
// distinctOpen is true from a call to Distinct until the returned
// ReadWriter is closed.
distinctOpen bool

// ops is the log of recorded operations, in the order they were logged.
ops []enginepb.MVCCLogicalOp
// opsAlloc provides the memory for copies of key and value bytes taken
// from details that are not marked Safe.
opsAlloc bufalloc.ByteAllocator
}

// NewOpLoggerBatch returns an OpLoggerBatch that wraps b and records every
// logical mvcc operation logged through it. The embedded distinct wrapper is
// pointed back at the new logger so that ops logged via Distinct land in the
// same log.
func NewOpLoggerBatch(b Batch) *OpLoggerBatch {
	logger := new(OpLoggerBatch)
	logger.Batch = b
	logger.distinct.parent = logger
	return logger
}

// Compile-time assertion that *OpLoggerBatch satisfies the Batch interface.
var _ Batch = (*OpLoggerBatch)(nil)

// LogLogicalOp implements the Writer interface. The operation is appended to
// this logger's own log and then forwarded to the wrapped batch. It panics if
// a distinct batch obtained from Distinct is still open.
func (ol *OpLoggerBatch) LogLogicalOp(op MVCCLogicalOpType, details MVCCLogicalOpDetails) {
	if open := ol.distinctOpen; open {
		panic("distinct batch already open")
	}

	// Record locally first, then hand the op to the wrapped batch.
	ol.logLogicalOp(op, details)
	wrapped := ol.Batch
	wrapped.LogLogicalOp(op, details)
}

// logLogicalOp converts the (op, details) pair into the matching
// enginepb.MVCCLogicalOp variant and appends it to the log. Unless
// details.Safe is set, the byte slices retained by the recorded variant are
// first copied into the logger's allocator. It panics on an op type it does
// not recognize.
func (ol *OpLoggerBatch) logLogicalOp(op MVCCLogicalOpType, details MVCCLogicalOpDetails) {
	// details is received by value, so repointing its fields at copied bytes
	// below never affects the caller's struct.
	mustCopy := !details.Safe
	txnID := details.Txn.ID
	ts := details.Value.Timestamp

	var variant interface{}
	switch op {
	case MVCCWriteValueOpType:
		if mustCopy {
			ol.opsAlloc, details.Key = ol.opsAlloc.Copy(details.Key, 0)
			ol.opsAlloc, details.Value.RawBytes = ol.opsAlloc.Copy(details.Value.RawBytes, 0)
		}
		variant = &enginepb.MVCCWriteValueOp{
			Key:       details.Key,
			Timestamp: ts,
			Value:     details.Value.RawBytes,
		}

	case MVCCWriteIntentOpType:
		if mustCopy {
			ol.opsAlloc, details.Txn.Key = ol.opsAlloc.Copy(details.Txn.Key, 0)
		}
		variant = &enginepb.MVCCWriteIntentOp{
			TxnID:     txnID,
			TxnKey:    details.Txn.Key,
			Timestamp: ts,
		}

	case MVCCUpdateIntentOpType:
		// No byte slices are retained, so nothing needs copying.
		variant = &enginepb.MVCCUpdateIntentOp{
			TxnID:     txnID,
			Timestamp: ts,
		}

	case MVCCCommitIntentOpType:
		if mustCopy {
			ol.opsAlloc, details.Key = ol.opsAlloc.Copy(details.Key, 0)
		}
		variant = &enginepb.MVCCCommitIntentOp{
			TxnID:     txnID,
			Key:       details.Key,
			Timestamp: ts,
		}

	case MVCCAbortIntentOpType:
		// Only the transaction ID is recorded for aborts.
		variant = &enginepb.MVCCAbortIntentOp{
			TxnID: txnID,
		}

	default:
		panic(fmt.Sprintf("unexpected op type %v", op))
	}

	ol.recordOp(variant)
}

// recordOp grows the log by one empty MVCCLogicalOp and sets op as that
// entry's variant in place. MustSetValue panics if op is not a valid variant.
func (ol *OpLoggerBatch) recordOp(op interface{}) {
	idx := len(ol.ops)
	ol.ops = append(ol.ops, enginepb.MVCCLogicalOp{})
	slot := &ol.ops[idx]
	slot.MustSetValue(op)
}

// LogicalOps returns every logical MVCC operation recorded by the logger so
// far. It is safe to call on a nil *OpLoggerBatch, in which case it returns
// nil.
func (ol *OpLoggerBatch) LogicalOps() []enginepb.MVCCLogicalOp {
	if ol != nil {
		return ol.ops
	}
	return nil
}

// Distinct implements the Batch interface. It opens a distinct batch on the
// wrapped Batch and returns it behind a wrapper that records logical ops in
// this logger's log. Only one distinct batch may be open at a time; calling
// Distinct again before the previous one is closed panics.
func (ol *OpLoggerBatch) Distinct() ReadWriter {
	if ol.distinctOpen {
		panic("distinct batch already open")
	}

	d := &ol.distinct
	// Mark the distinct batch open before asking the wrapped batch for it.
	ol.distinctOpen = true
	d.ReadWriter = ol.Batch.Distinct()
	return d
}

// distinctOpLoggerBatch is the ReadWriter returned by OpLoggerBatch.Distinct.
// It forwards to the wrapped batch's distinct ReadWriter while recording
// logical ops in the parent's log.
type distinctOpLoggerBatch struct {
// ReadWriter is the distinct ReadWriter obtained from the wrapped batch.
ReadWriter
// parent is the OpLoggerBatch that owns this wrapper and holds the op log.
parent *OpLoggerBatch
}

// LogLogicalOp implements the Writer interface. The operation is recorded in
// the parent logger's log and then forwarded to the wrapped distinct
// ReadWriter.
func (dlw *distinctOpLoggerBatch) LogLogicalOp(op MVCCLogicalOpType, details MVCCLogicalOpDetails) {
	parent, inner := dlw.parent, dlw.ReadWriter
	parent.logLogicalOp(op, details)
	inner.LogLogicalOp(op, details)
}

// Close implements the Reader interface. It marks the parent's distinct batch
// as closed and then closes the wrapped distinct ReadWriter. It panics if no
// distinct batch is currently open.
func (dlw *distinctOpLoggerBatch) Close() {
	p := dlw.parent
	if !p.distinctOpen {
		panic("distinct batch not open")
	}
	p.distinctOpen = false
	dlw.ReadWriter.Close()
}
Loading

0 comments on commit 9ed0086

Please sign in to comment.