Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

first MVCC support #209

Merged
merged 5 commits into from
Sep 21, 2015
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
language: go

go:
- 1.4.2
- 1.5

script: make
6 changes: 3 additions & 3 deletions kv/index_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,15 +190,15 @@ func (c *kvIndex) Delete(txn Transaction, indexedValues []interface{}, h int64)
}

func hasPrefix(prefix []byte) FnKeyCmp {
return func(k []byte) bool {
return bytes.HasPrefix(k, prefix)
return func(k Key) bool {
return bytes.HasPrefix([]byte(k), prefix)
}
}

// Drop removes the KV index from store.
func (c *kvIndex) Drop(txn Transaction) error {
prefix := []byte(c.prefix)
it, err := txn.Seek(prefix, hasPrefix(prefix))
it, err := txn.Seek(Key(prefix), hasPrefix(prefix))
if err != nil {
return err
}
Expand Down
57 changes: 49 additions & 8 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,47 @@

package kv

import "bytes"

// EncodedKey represents encoded key in low-level storage engine.
type EncodedKey []byte
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here why not using Key type but define a EncodedKey?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to distinguish these two types. Key cannot be use as EncodedKey, although they are both []byte.


// Key represents high-level Key type
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is better to use comment end of .

type Key []byte

// Next returns the next key in byte-order
func (k Key) Next() Key {
// add \x0 to the end of key
return append(append([]byte(nil), []byte(k)...), 0)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should allocate slice twice?

First allocing a slice with same size with k, then appending 0 also re-allocs another slice with size len(k) + 1 ?

buf := make([]byte, len(k) + 1)
copy(buf, k)
return buf

maybe more efficient, I don't know.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense.

}

// Cmp returns the comparison result of two key, if A > B returns 1, A = B
// returns 0, if A < B returns -1.
func (k Key) Cmp(another Key) int {
return bytes.Compare(k, another)
}

// Cmp returns the comparison result of two key, if A > B returns 1, A = B
// returns 0, if A < B returns -1.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if A > B returns 1, else if A = B returns 0, else returns -1. seems better.

func (k EncodedKey) Cmp(another EncodedKey) int {
return bytes.Compare(k, another)
}

// Next returns the next key in byte-order
func (k EncodedKey) Next() EncodedKey {
return EncodedKey(bytes.Join([][]byte{k, Key{0}}, nil))
}

// VersionProvider provides increasing IDs.
type VersionProvider interface {
GetCurrentVer() (Version, error)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps GetCurVersion or GetCurrVersion is better.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer CurrentVersion :)

}

// Version is the wrapper of KV's version.
type Version struct {
Ver uint64
}

// DecodeFn is a function that decode data after fetch from store.
type DecodeFn func(raw interface{}) (interface{}, error)

Expand All @@ -23,29 +64,29 @@ type EncodeFn func(raw interface{}) (interface{}, error)
// This is not thread safe.
type Transaction interface {
// Get gets the value for key k from KV store.
Get(k []byte) ([]byte, error)
Get(k Key) ([]byte, error)
// Set sets the value for key k as v into KV store.
Set(k []byte, v []byte) error
Set(k Key, v []byte) error
// Seek searches for the entry with key k in KV store.
Seek(k []byte, fnKeyCmp func(key []byte) bool) (Iterator, error)
Seek(k Key, fnKeyCmp func(key Key) bool) (Iterator, error)
// Inc increases the value for key k in KV store by step.
Inc(k []byte, step int64) (int64, error)
Inc(k Key, step int64) (int64, error)
// Deletes removes the entry for key k from KV store.
Delete(k []byte) error
Delete(k Key) error
// Commit commites the transaction operations to KV store.
Commit() error
// Rollback undoes the transaction operations to KV store.
Rollback() error
// String implements Stringer.String() interface.
String() string
// LockKeys tries to lock the entries with the keys in KV store.
LockKeys(keys ...[]byte) error
LockKeys(keys ...Key) error
}

// Snapshot defines the interface for the snapshot fetched from KV store.
type Snapshot interface {
// Get gets the value for key k from snapshot.
Get(k []byte) ([]byte, error)
Get(k Key) ([]byte, error)
// NewIterator gets a new iterator on the snapshot.
NewIterator(param interface{}) Iterator
// Release releases the snapshot to store.
Expand All @@ -71,7 +112,7 @@ type Storage interface {
}

// FnKeyCmp is the function for iterator the keys
type FnKeyCmp func(key []byte) bool
type FnKeyCmp func(key Key) bool

// Iterator is the interface for a interator on KV store.
type Iterator interface {
Expand Down
37 changes: 24 additions & 13 deletions store/localstore/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,14 @@
package localstore

import (
"bytes"
"sync"
"sync/atomic"
"time"

"github.com/juju/errors"
"github.com/ngaut/log"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/localstore/engine"
"github.com/pingcap/tidb/util/codec"
"github.com/twinj/uuid"
)

Expand All @@ -34,8 +33,8 @@ type dbStore struct {
mu sync.Mutex
db engine.DB

txns map[int64]*dbTxn
keysLocked map[string]int64
txns map[uint64]*dbTxn
keysLocked map[string]uint64
uuid string
path string
}
Expand All @@ -46,12 +45,14 @@ type storeCache struct {
}

var (
globalID int64
mc storeCache
globalID int64
globalVerProvider kv.VersionProvider
mc storeCache
)

func init() {
mc.cache = make(map[string]*dbStore)
globalVerProvider = &LocalVersioProvider{}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LocalVersioProvider -> LocalVersionProvider

}

// Driver implements kv.Driver interface.
Expand All @@ -77,8 +78,8 @@ func (d Driver) Open(schema string) (kv.Storage, error) {

log.Info("New store", schema)
s := &dbStore{
txns: make(map[int64]*dbTxn),
keysLocked: make(map[string]int64),
txns: make(map[uint64]*dbTxn),
keysLocked: make(map[string]uint64),
uuid: uuid.NewV4().String(),
path: schema,
db: db,
Expand All @@ -103,9 +104,10 @@ func (s *dbStore) Begin() (kv.Transaction, error) {
return nil, err
}

beginVer, _ := globalVerProvider.GetCurrentVer()
txn := &dbTxn{
startTs: time.Now(),
tID: atomic.AddInt64(&globalID, 1),
tID: beginVer.Ver,
valid: true,
store: s,
snapshotVals: make(map[string][]byte),
Expand Down Expand Up @@ -143,21 +145,30 @@ func (s *dbStore) newBatch() engine.Batch {
}

// Both lock and unlock are used for simulating scenario of percolator papers

func (s *dbStore) tryConditionLockKey(tID int64, key string, snapshotVal []byte) error {
func (s *dbStore) tryConditionLockKey(tID uint64, key string, snapshotVal []byte) error {
s.mu.Lock()
defer s.mu.Unlock()

if _, ok := s.keysLocked[key]; ok {
return errors.Trace(kv.ErrLockConflict)
}

currValue, err := s.db.Get([]byte(key))
metaKey := codec.EncodeBytes(nil, []byte(key))
currValue, err := s.db.Get(metaKey)
if err == kv.ErrNotExist || currValue == nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

error2.ErrorEqual

// If it's a new key, we won't need to check its version
return nil
}
if err != nil {
return errors.Trace(err)
}
_, ver, err := codec.DecodeUint(currValue)
if err != nil {
return errors.Trace(err)
}

if !bytes.Equal(currValue, snapshotVal) {
// If there's newer version of this key, returns error.
if ver > tID {
log.Warnf("txn:%d, tryLockKey condition not match for key %s, currValue:%q, snapshotVal:%q", tID, key, currValue, snapshotVal)
return errors.Trace(kv.ErrConditionNotMatch)
}
Expand Down
18 changes: 14 additions & 4 deletions store/localstore/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"sync/atomic"
"testing"

"github.com/ngaut/log"
. "github.com/pingcap/check"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/localstore/goleveldb"
Expand Down Expand Up @@ -76,12 +77,12 @@ func mustDel(c *C, txn kv.Transaction) {
}

func encodeInt(n int) []byte {
return []byte(fmt.Sprintf("%10d", n))
return []byte(fmt.Sprintf("%010d", n))
}

func decodeInt(s []byte) int {
var n int
fmt.Sscanf(string(s), "%10d", &n)
fmt.Sscanf(string(s), "%010d", &n)
return n
}

Expand Down Expand Up @@ -168,6 +169,12 @@ func (s *testKVSuite) TestSeek(c *C) {

insertData(c, txn)

ss, _ := s.s.(*dbStore).db.GetSnapshot()
i := ss.NewIterator(nil)
for i.Next() {
log.Warn("!!!!", i.Key(), i.Value())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this line?

}

checkSeek(c, txn)

// Check transaction results
Expand Down Expand Up @@ -197,7 +204,6 @@ func (s *testKVSuite) TestInc(c *C) {

txn, err = s.s.Begin()
c.Assert(err, IsNil)
defer txn.Commit()

n, err = txn.Inc(key, -200)
c.Assert(err, IsNil)
Expand All @@ -210,8 +216,13 @@ func (s *testKVSuite) TestInc(c *C) {
c.Assert(err, IsNil)
c.Assert(n, Equals, int64(100))

log.Info(key)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this line.

err = txn.Delete(key)
c.Assert(err, IsNil)

err = txn.Commit()
c.Assert(err, IsNil)

}

func (s *testKVSuite) TestDelete(c *C) {
Expand Down Expand Up @@ -271,7 +282,6 @@ func (s *testKVSuite) TestDelete2(c *C) {
it, err = it.Next(nil)
c.Assert(err, IsNil)
}

txn.Commit()

txn, err = s.s.Begin()
Expand Down
29 changes: 29 additions & 0 deletions store/localstore/local_version_provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package localstore

import (
"sync"
"time"

"github.com/pingcap/tidb/kv"
)

// LocalVersioProvider uses local timestamp for version.
type LocalVersioProvider struct {
mu sync.Mutex
lastTimeStampTs int64
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lastTimeStamp is enough for this field ?

n int64
}

// GetCurrentVer implements the VersionProvider's GetCurrentVer interface.
func (l *LocalVersioProvider) GetCurrentVer() (kv.Version, error) {
l.mu.Lock()
defer l.mu.Unlock()
ts := (time.Now().UnixNano() / int64(time.Millisecond)) << 18
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why 18 here?
using a const name may be better.

if l.lastTimeStampTs == ts {
l.n++
return kv.Version{uint64(ts + l.n)}, nil
}
l.lastTimeStampTs = ts
l.n = 0
return kv.Version{uint64(ts)}, nil
}
47 changes: 47 additions & 0 deletions store/localstore/mvcc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package localstore

import (
"bytes"

"github.com/juju/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/util/codec"
)

var tombstone = []byte{'\xde', '\xad'}

func isTombstone(v []byte) bool {
return bytes.Compare(v, tombstone) == 0
}

// MvccEncodeVersionKey returns the encoded key
func MvccEncodeVersionKey(key kv.Key, ver kv.Version) kv.EncodedKey {
b := codec.EncodeBytes(nil, key)
ret := codec.EncodeUintDesc(b, ver.Ver)
return ret
}

// MvccDecode parses the origin key and version of an encoded key, if the encoded key is a meta key,
// just returns the origin key
func MvccDecode(encodedKey kv.EncodedKey) (kv.Key, kv.Version, error) {
// Skip DataPrefix
remainBytes, key, err := codec.DecodeBytes([]byte(encodedKey))
if err != nil {
// should never happen
return nil, kv.Version{}, errors.Trace(err)
}
// if it's meta key
if len(remainBytes) == 0 {
return key, kv.Version{}, nil
}
var ver uint64
remainBytes, ver, err = codec.DecodeUintDesc(remainBytes)
if err != nil {
// should never happen
return nil, kv.Version{}, errors.Trace(err)
}
if len(remainBytes) != 0 {
return nil, kv.Version{}, errors.New("invalid encoded key")
}
return key, kv.Version{ver}, nil
}
Loading