Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: concurrent checkTx #141

Merged
merged 5 commits into from
Apr 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 28 additions & 25 deletions baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,33 +213,38 @@ func (app *BaseApp) EndBlock(req abci.RequestEndBlock) (res abci.ResponseEndBloc
func (app *BaseApp) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx {
defer telemetry.MeasureSince(time.Now(), "abci", "check_tx")

var mode runTxMode

switch {
case req.Type == abci.CheckTxType_New:
mode = runTxModeCheck

case req.Type == abci.CheckTxType_Recheck:
mode = runTxModeReCheck
tx, err := app.txDecoder(req.Tx)
if err != nil {
return sdkerrors.ResponseCheckTx(err, 0, 0, app.trace)
}

default:
if req.Type != abci.CheckTxType_New && req.Type != abci.CheckTxType_Recheck {
panic(fmt.Sprintf("unknown RequestCheckTx type: %s", req.Type))
}

gInfo, result, err := app.runTx(mode, req.Tx)
gInfo, err := app.checkTx(req.Tx, tx, req.Type == abci.CheckTxType_Recheck)
if err != nil {
return sdkerrors.ResponseCheckTx(err, gInfo.GasWanted, gInfo.GasUsed, app.trace)
}

return abci.ResponseCheckTx{
GasWanted: int64(gInfo.GasWanted), // TODO: Should type accept unsigned ints?
GasUsed: int64(gInfo.GasUsed), // TODO: Should type accept unsigned ints?
Log: result.Log,
Data: result.Data,
Events: sdk.MarkEventsToIndex(result.Events, app.indexEvents),
}
}

// BeginRecheckTx implements the ABCI interface and set the check state based on the given header
func (app *BaseApp) BeginRecheckTx(req abci.RequestBeginRecheckTx) abci.ResponseBeginRecheckTx {
// NOTE: This is safe because Ostracon holds a lock on the mempool for Rechecking.
app.setCheckState(req.Header)
return abci.ResponseBeginRecheckTx{Code: abci.CodeTypeOK}
}

// EndRecheckTx implements the ABCI interface.
func (app *BaseApp) EndRecheckTx(req abci.RequestEndRecheckTx) abci.ResponseEndRecheckTx {
return abci.ResponseEndRecheckTx{Code: abci.CodeTypeOK}
}

// DeliverTx implements the ABCI interface and executes a tx in DeliverTx mode.
// State only gets persisted if all messages are valid and get executed successfully.
// Otherwise, the ResponseDeliverTx will contain releveant error information.
Expand All @@ -258,7 +263,12 @@ func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx
telemetry.SetGauge(float32(gInfo.GasWanted), "tx", "gas", "wanted")
}()

gInfo, result, err := app.runTx(runTxModeDeliver, req.Tx)
tx, err := app.txDecoder(req.Tx)
if err != nil {
return sdkerrors.ResponseDeliverTx(err, 0, 0, app.trace)
}

gInfo, result, err := app.runTx(req.Tx, tx, false)
if err != nil {
resultStr = "failed"
return sdkerrors.ResponseDeliverTx(err, gInfo.GasWanted, gInfo.GasUsed, app.trace)
Expand All @@ -275,11 +285,10 @@ func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx

// Commit implements the ABCI interface. It will commit all state that exists in
// the deliver state's multi-store and includes the resulting commit ID in the
// returned abci.ResponseCommit. Commit will set the check state based on the
// latest header and reset the deliver state. Also, if a non-zero halt height is
// defined in config, Commit will execute a deferred function call to check
// against that height and gracefully halt if it matches the latest committed
// height.
// returned abci.ResponseCommit. Commit will reset the deliver state.
// Also, if a non-zero halt height is defined in config, Commit will execute
// a deferred function call to check against that height and gracefully halt if
// it matches the latest committed height.
func (app *BaseApp) Commit() (res abci.ResponseCommit) {
defer telemetry.MeasureSince(time.Now(), "abci", "commit")

Expand All @@ -293,12 +302,6 @@ func (app *BaseApp) Commit() (res abci.ResponseCommit) {
commitID := app.cms.Commit()
app.logger.Info("commit synced", "commit", fmt.Sprintf("%X", commitID))

// Reset the Check state to the latest committed.
//
// NOTE: This is safe because Tendermint holds a lock on the mempool for
// Commit. Use the header from this latest block.
app.setCheckState(header)

// empty/reset the deliver state
app.deliverState = nil

Expand Down
88 changes: 88 additions & 0 deletions baseapp/accountlock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package baseapp

import (
"encoding/binary"
"sort"
"sync"

sdk "github.com/line/lbm-sdk/v2/types"
)

// NOTE should 1 <= sampleBytes <= 4. If modify it, you should revise `getAddressKey()` as well
const sampleBytes = 2

type AccountLock struct {
accMtx [1 << (sampleBytes * 8)]sync.Mutex
}

func (al *AccountLock) Lock(ctx sdk.Context, tx sdk.Tx) []uint32 {
if !ctx.IsCheckTx() || ctx.IsReCheckTx() {
return nil
}

signers := getSigners(tx)
accKeys := getUniqSortedAddressKey(signers)

for _, key := range accKeys {
al.accMtx[key].Lock()
}

return accKeys
}

func (al *AccountLock) Unlock(accKeys []uint32) {
// NOTE reverse order
for i, length := 0, len(accKeys); i < length; i++ {
key := accKeys[length-1-i]
al.accMtx[key].Unlock()
}
}

func getSigners(tx sdk.Tx) []sdk.AccAddress {
seen := map[string]bool{}
var signers []sdk.AccAddress
for _, msg := range tx.GetMsgs() {
for _, addr := range msg.GetSigners() {
if !seen[addr.String()] {
signers = append(signers, addr)
seen[addr.String()] = true
}
}
}
return signers
}

func getUniqSortedAddressKey(addrs []sdk.AccAddress) []uint32 {
accKeys := make([]uint32, 0, len(addrs))
for _, addr := range addrs {
accKeys = append(accKeys, getAddressKey(addr))
}

accKeys = uniq(accKeys)
sort.Sort(uint32Slice(accKeys))

return accKeys
}

func getAddressKey(addr sdk.AccAddress) uint32 {
return uint32(binary.BigEndian.Uint16(addr))
}

func uniq(u []uint32) []uint32 {
seen := map[uint32]bool{}
var ret []uint32
for _, v := range u {
if !seen[v] {
ret = append(ret, v)
seen[v] = true
}
}
return ret
}

// Uint32Slice attaches the methods of Interface to []uint32, sorting in increasing order.
type uint32Slice []uint32

func (p uint32Slice) Len() int { return len(p) }
func (p uint32Slice) Less(i, j int) bool { return p[i] < p[j] }
func (p uint32Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
119 changes: 119 additions & 0 deletions baseapp/accountlock_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package baseapp

import (
"reflect"
"sort"
"sync"
"testing"

"github.com/stretchr/testify/require"

ostproto "github.com/line/ostracon/proto/ostracon/types"

"github.com/line/lbm-sdk/v2/crypto/keys/secp256k1"
"github.com/line/lbm-sdk/v2/testutil/testdata"
sdk "github.com/line/lbm-sdk/v2/types"
)

func TestAccountLock(t *testing.T) {
app := setupBaseApp(t)
ctx := app.NewContext(true, ostproto.Header{})

privs := newTestPrivKeys(3)
tx := newTestTx(privs)

accKeys := app.accountLock.Lock(ctx, tx)

for _, accKey := range accKeys {
require.True(t, isMutexLock(&app.accountLock.accMtx[accKey]))
}

app.accountLock.Unlock(accKeys)

for _, accKey := range accKeys {
require.False(t, isMutexLock(&app.accountLock.accMtx[accKey]))
}
}

func TestUnlockDoNothingWithNil(t *testing.T) {
app := setupBaseApp(t)
require.NotPanics(t, func() { app.accountLock.Unlock(nil) })
}

func TestGetSigner(t *testing.T) {
privs := newTestPrivKeys(3)
tx := newTestTx(privs)
signers := getSigners(tx)

require.Equal(t, getAddrs(privs), signers)
}

func TestGetUniqSortedAddressKey(t *testing.T) {
privs := newTestPrivKeys(3)

addrs := getAddrs(privs)
addrs = append(addrs, addrs[1], addrs[0])
require.Equal(t, 5, len(addrs))

accKeys := getUniqSortedAddressKey(addrs)

// length should be reduced because `duplicated` is removed
require.Less(t, len(accKeys), len(addrs))

// check uniqueness
for i, iv := range accKeys {
for j, jv := range accKeys {
if i != j {
require.True(t, iv != jv)
}
}
}

// should be sorted
require.True(t, sort.IsSorted(uint32Slice(accKeys)))
}

type AccountLockTestTx struct {
Msgs []sdk.Msg
}

var _ sdk.Tx = AccountLockTestTx{}

func (tx AccountLockTestTx) GetMsgs() []sdk.Msg {
return tx.Msgs
}

func (tx AccountLockTestTx) ValidateBasic() error {
return nil
}

func newTestPrivKeys(num int) []*secp256k1.PrivKey {
privs := make([]*secp256k1.PrivKey, 0, num)
for i := 0; i < num; i++ {
privs = append(privs, secp256k1.GenPrivKey())
}
return privs
}

func getAddrs(privs []*secp256k1.PrivKey) []sdk.AccAddress {
addrs := make([]sdk.AccAddress, 0, len(privs))
for _, priv := range privs {
addrs = append(addrs, sdk.AccAddress(priv.PubKey().Address()))
}
return addrs
}

func newTestTx(privs []*secp256k1.PrivKey) sdk.Tx {
addrs := getAddrs(privs)
msgs := make([]sdk.Msg, len(addrs))
for i, addr := range addrs {
msgs[i] = testdata.NewTestMsg(addr)
}
return AccountLockTestTx{Msgs: msgs}
}

// Hack (too slow)
func isMutexLock(mtx *sync.Mutex) bool {
state := reflect.ValueOf(mtx).Elem().FieldByName("state")
return state.Int() == 1
}
Loading