Skip to content

Commit

Permalink
Merge pull request #209 from pingcap/c4pt0r/MVCC-support
Browse files Browse the repository at this point in the history
first MVCC support
  • Loading branch information
c4pt0r committed Sep 21, 2015
2 parents 6b7c3bf + 20180eb commit aa78175
Show file tree
Hide file tree
Showing 11 changed files with 439 additions and 68 deletions.
1 change: 0 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
language: go

go:
- 1.4.2
- 1.5

script: make
6 changes: 3 additions & 3 deletions kv/index_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,15 +190,15 @@ func (c *kvIndex) Delete(txn Transaction, indexedValues []interface{}, h int64)
}

func hasPrefix(prefix []byte) FnKeyCmp {
return func(k []byte) bool {
return bytes.HasPrefix(k, prefix)
return func(k Key) bool {
return bytes.HasPrefix([]byte(k), prefix)
}
}

// Drop removes the KV index from store.
func (c *kvIndex) Drop(txn Transaction) error {
prefix := []byte(c.prefix)
it, err := txn.Seek(prefix, hasPrefix(prefix))
it, err := txn.Seek(Key(prefix), hasPrefix(prefix))
if err != nil {
return err
}
Expand Down
61 changes: 52 additions & 9 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,39 +13,82 @@

package kv

import "bytes"

// EncodedKey represents encoded key in low-level storage engine.
type EncodedKey []byte

// Key represents high-level Key type.
type Key []byte

// Next returns the next key in byte-order.
func (k Key) Next() Key {
// add \x0 to the end of key
buf := make([]byte, len([]byte(k))+1)
copy(buf, []byte(k))
return buf
}

// Cmp returns the comparison result of two key.
// The result will be 0 if a==b, -1 if a < b, and +1 if a > b.
func (k Key) Cmp(another Key) int {
return bytes.Compare(k, another)
}

// Cmp returns the comparison result of two key.
// The result will be 0 if a==b, -1 if a < b, and +1 if a > b.
func (k EncodedKey) Cmp(another EncodedKey) int {
return bytes.Compare(k, another)
}

// Next returns the next key in byte-order.
func (k EncodedKey) Next() EncodedKey {
return EncodedKey(bytes.Join([][]byte{k, Key{0}}, nil))
}

// VersionProvider provides increasing IDs.
type VersionProvider interface {
CurrentVersion() (Version, error)
}

// Version is the wrapper of KV's version.
type Version struct {
Ver uint64
}

// DecodeFn is a function that decode data after fetch from store.
type DecodeFn func(raw interface{}) (interface{}, error)

// EncodeFn is a function that encode data before put into store
// EncodeFn is a function that encode data before put into store.
type EncodeFn func(raw interface{}) (interface{}, error)

// Transaction defines the interface for operations inside a Transaction.
// This is not thread safe.
type Transaction interface {
// Get gets the value for key k from KV store.
Get(k []byte) ([]byte, error)
Get(k Key) ([]byte, error)
// Set sets the value for key k as v into KV store.
Set(k []byte, v []byte) error
Set(k Key, v []byte) error
// Seek searches for the entry with key k in KV store.
Seek(k []byte, fnKeyCmp func(key []byte) bool) (Iterator, error)
Seek(k Key, fnKeyCmp func(key Key) bool) (Iterator, error)
// Inc increases the value for key k in KV store by step.
Inc(k []byte, step int64) (int64, error)
Inc(k Key, step int64) (int64, error)
// Deletes removes the entry for key k from KV store.
Delete(k []byte) error
Delete(k Key) error
// Commit commites the transaction operations to KV store.
Commit() error
// Rollback undoes the transaction operations to KV store.
Rollback() error
// String implements Stringer.String() interface.
String() string
// LockKeys tries to lock the entries with the keys in KV store.
LockKeys(keys ...[]byte) error
LockKeys(keys ...Key) error
}

// Snapshot defines the interface for the snapshot fetched from KV store.
type Snapshot interface {
// Get gets the value for key k from snapshot.
Get(k []byte) ([]byte, error)
Get(k Key) ([]byte, error)
// NewIterator gets a new iterator on the snapshot.
NewIterator(param interface{}) Iterator
// Release releases the snapshot to store.
Expand All @@ -71,7 +114,7 @@ type Storage interface {
}

// FnKeyCmp is the function for iterator the keys
type FnKeyCmp func(key []byte) bool
type FnKeyCmp func(key Key) bool

// Iterator is the interface for a interator on KV store.
type Iterator interface {
Expand Down
41 changes: 28 additions & 13 deletions store/localstore/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@
package localstore

import (
"bytes"
"sync"
"sync/atomic"
"time"

"github.com/juju/errors"
"github.com/ngaut/log"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/localstore/engine"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/errors2"
"github.com/twinj/uuid"
)

Expand All @@ -34,8 +34,8 @@ type dbStore struct {
mu sync.Mutex
db engine.DB

txns map[int64]*dbTxn
keysLocked map[string]int64
txns map[uint64]*dbTxn
keysLocked map[string]uint64
uuid string
path string
}
Expand All @@ -46,12 +46,14 @@ type storeCache struct {
}

var (
globalID int64
mc storeCache
globalID int64
globalVerProvider kv.VersionProvider
mc storeCache
)

func init() {
mc.cache = make(map[string]*dbStore)
globalVerProvider = &LocalVersioProvider{}
}

// Driver implements kv.Driver interface.
Expand All @@ -77,8 +79,8 @@ func (d Driver) Open(schema string) (kv.Storage, error) {

log.Info("New store", schema)
s := &dbStore{
txns: make(map[int64]*dbTxn),
keysLocked: make(map[string]int64),
txns: make(map[uint64]*dbTxn),
keysLocked: make(map[string]uint64),
uuid: uuid.NewV4().String(),
path: schema,
db: db,
Expand All @@ -103,9 +105,13 @@ func (s *dbStore) Begin() (kv.Transaction, error) {
return nil, err
}

beginVer, err := globalVerProvider.CurrentVersion()
if err != nil {
return nil, err
}
txn := &dbTxn{
startTs: time.Now(),
tID: atomic.AddInt64(&globalID, 1),
tID: beginVer.Ver,
valid: true,
store: s,
snapshotVals: make(map[string][]byte),
Expand Down Expand Up @@ -143,21 +149,30 @@ func (s *dbStore) newBatch() engine.Batch {
}

// Both lock and unlock are used for simulating scenario of percolator papers

func (s *dbStore) tryConditionLockKey(tID int64, key string, snapshotVal []byte) error {
func (s *dbStore) tryConditionLockKey(tID uint64, key string, snapshotVal []byte) error {
s.mu.Lock()
defer s.mu.Unlock()

if _, ok := s.keysLocked[key]; ok {
return errors.Trace(kv.ErrLockConflict)
}

currValue, err := s.db.Get([]byte(key))
metaKey := codec.EncodeBytes(nil, []byte(key))
currValue, err := s.db.Get(metaKey)
if errors2.ErrorEqual(err, kv.ErrNotExist) || currValue == nil {
// If it's a new key, we won't need to check its version
return nil
}
if err != nil {
return errors.Trace(err)
}
_, ver, err := codec.DecodeUint(currValue)
if err != nil {
return errors.Trace(err)
}

if !bytes.Equal(currValue, snapshotVal) {
// If there's newer version of this key, returns error.
if ver > tID {
log.Warnf("txn:%d, tryLockKey condition not match for key %s, currValue:%q, snapshotVal:%q", tID, key, currValue, snapshotVal)
return errors.Trace(kv.ErrConditionNotMatch)
}
Expand Down
10 changes: 5 additions & 5 deletions store/localstore/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,12 @@ func mustDel(c *C, txn kv.Transaction) {
}

func encodeInt(n int) []byte {
return []byte(fmt.Sprintf("%10d", n))
return []byte(fmt.Sprintf("%010d", n))
}

func decodeInt(s []byte) int {
var n int
fmt.Sscanf(string(s), "%10d", &n)
fmt.Sscanf(string(s), "%010d", &n)
return n
}

Expand Down Expand Up @@ -167,7 +167,6 @@ func (s *testKVSuite) TestSeek(c *C) {
c.Assert(err, IsNil)

insertData(c, txn)

checkSeek(c, txn)

// Check transaction results
Expand Down Expand Up @@ -197,7 +196,6 @@ func (s *testKVSuite) TestInc(c *C) {

txn, err = s.s.Begin()
c.Assert(err, IsNil)
defer txn.Commit()

n, err = txn.Inc(key, -200)
c.Assert(err, IsNil)
Expand All @@ -212,6 +210,9 @@ func (s *testKVSuite) TestInc(c *C) {

err = txn.Delete(key)
c.Assert(err, IsNil)

err = txn.Commit()
c.Assert(err, IsNil)
}

func (s *testKVSuite) TestDelete(c *C) {
Expand Down Expand Up @@ -271,7 +272,6 @@ func (s *testKVSuite) TestDelete2(c *C) {
it, err = it.Next(nil)
c.Assert(err, IsNil)
}

txn.Commit()

txn, err = s.s.Begin()
Expand Down
43 changes: 43 additions & 0 deletions store/localstore/local_version_provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package localstore

import (
"errors"
"sync"
"time"

"github.com/pingcap/tidb/kv"
)

// ErrOverflow is the error returned by CurrentVersion, it describes if
// there're too many versions allocations in a very short period of time, ID
// may conflict.
var ErrOverflow = errors.New("overflow when allocating new version")

// LocalVersioProvider uses local timestamp for version.
type LocalVersioProvider struct {
mu sync.Mutex
lastTimeStampTs uint64
n uint64
}

const (
timePrecisionOffset = 18
)

// CurrentVersion implements the VersionProvider's GetCurrentVer interface.
func (l *LocalVersioProvider) CurrentVersion() (kv.Version, error) {
l.mu.Lock()
defer l.mu.Unlock()
var ts uint64
ts = uint64((time.Now().UnixNano() / int64(time.Millisecond)) << timePrecisionOffset)
if l.lastTimeStampTs == uint64(ts) {
l.n++
if l.n >= 1<<timePrecisionOffset {
return kv.Version{}, ErrOverflow
}
return kv.Version{ts + l.n}, nil
}
l.lastTimeStampTs = ts
l.n = 0
return kv.Version{ts}, nil
}
46 changes: 46 additions & 0 deletions store/localstore/mvcc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package localstore

import (
"github.com/juju/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/util/codec"
)

// ErrInvalidEncodedKey describes parsing an invalid format of EncodedKey
var ErrInvalidEncodedKey = errors.New("invalid encoded key")

func isTombstone(v []byte) bool {
return len(v) == 0
}

// MvccEncodeVersionKey returns the encoded key
func MvccEncodeVersionKey(key kv.Key, ver kv.Version) kv.EncodedKey {
b := codec.EncodeBytes(nil, key)
ret := codec.EncodeUintDesc(b, ver.Ver)
return ret
}

// MvccDecode parses the origin key and version of an encoded key, if the encoded key is a meta key,
// just returns the origin key
func MvccDecode(encodedKey kv.EncodedKey) (kv.Key, kv.Version, error) {
// Skip DataPrefix
remainBytes, key, err := codec.DecodeBytes([]byte(encodedKey))
if err != nil {
// should never happen
return nil, kv.Version{}, errors.Trace(err)
}
// if it's meta key
if len(remainBytes) == 0 {
return key, kv.Version{}, nil
}
var ver uint64
remainBytes, ver, err = codec.DecodeUintDesc(remainBytes)
if err != nil {
// should never happen
return nil, kv.Version{}, errors.Trace(err)
}
if len(remainBytes) != 0 {
return nil, kv.Version{}, ErrInvalidEncodedKey
}
return key, kv.Version{ver}, nil
}
Loading

0 comments on commit aa78175

Please sign in to comment.