Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add write batch #137

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/statebased/statebasedimpl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestAddOrg(t *testing.T) {

// bad role type
err = ep.AddOrgs("unknown", "Org1")
assert.Equal(t, &statebased.RoleTypeDoesNotExistError{RoleType: statebased.RoleType("unknown")}, err)
assert.Equal(t, &statebased.RoleTypeDoesNotExistError{RoleType: "unknown"}, err)
assert.EqualError(t, err, "role type unknown does not exist")

epBytes, err := ep.Policy()
Expand Down
3 changes: 1 addition & 2 deletions shim/chaincodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,12 @@ import (

"github.com/hyperledger/fabric-chaincode-go/v2/shim/internal"
"github.com/hyperledger/fabric-protos-go-apiv2/peer"

"google.golang.org/grpc/keepalive"
)

// TLSProperties passed to ChaincodeServer
type TLSProperties struct {
//Disabled forces default to be TLS enabled
// Disabled forces default to be TLS enabled
Disabled bool
Key []byte
Cert []byte
Expand Down
200 changes: 199 additions & 1 deletion shim/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ const (
created state = "created" // start state
established state = "established" // connection established
ready state = "ready" // ready for requests

defaultMaxSizeWriteBatch = 100
prefixMetaDataWriteBatch = "m"
prefixStateDataWriteBatch = "s"
)

// PeerChaincodeStream is the common stream interface for Peer - chaincode communication.
Expand Down Expand Up @@ -46,6 +50,13 @@ type Handler struct {
cc Chaincode
// state holds the current state of this handler.
state state
// if you can send the changes in batches.
usePeerWriteBatch bool
maxSizeWriteBatch uint32
batchMutex sync.RWMutex
batch map[string]map[string]*peer.WriteRecord
startWriteBatchMutex sync.RWMutex
startWriteBatch map[string]bool

// Multiple queries (and one transaction) with different txids can be executing in parallel for this chaincode
// responseChannels is the channel on which responses are communicated by the shim to the chaincodeStub.
Expand Down Expand Up @@ -150,6 +161,8 @@ func newChaincodeHandler(peerChatStream PeerChaincodeStream, chaincode Chaincode
cc: chaincode,
responseChannels: map[string]chan *peer.ChaincodeMessage{},
state: created,
batch: map[string]map[string]*peer.WriteRecord{},
startWriteBatch: map[string]bool{},
}
}

Expand Down Expand Up @@ -188,6 +201,11 @@ func (h *Handler) handleInit(msg *peer.ChaincodeMessage) (*peer.ChaincodeMessage
return nil, fmt.Errorf("failed to marshal response: %s", err)
}

err = h.sendBatch(msg.ChannelId, msg.Txid)
if err != nil {
return nil, fmt.Errorf("failed send batch: %s", err)
}

return &peer.ChaincodeMessage{Type: peer.ChaincodeMessage_COMPLETED, Payload: resBytes, Txid: msg.Txid, ChaincodeEvent: stub.chaincodeEvent, ChannelId: stub.ChannelID}, nil
}

Expand All @@ -214,6 +232,11 @@ func (h *Handler) handleTransaction(msg *peer.ChaincodeMessage) (*peer.Chaincode
return nil, fmt.Errorf("failed to marshal response: %s", err)
}

err = h.sendBatch(msg.ChannelId, msg.Txid)
if err != nil {
return nil, fmt.Errorf("failed send batch: %s", err)
}

return &peer.ChaincodeMessage{Type: peer.ChaincodeMessage_COMPLETED, Payload: resBytes, Txid: msg.Txid, ChaincodeEvent: stub.chaincodeEvent, ChannelId: stub.ChannelID}, nil
}

Expand Down Expand Up @@ -312,6 +335,17 @@ func (h *Handler) handleGetStateMetadata(collection string, key string, channelI

// handlePutState communicates with the peer to put state information into the ledger.
func (h *Handler) handlePutState(collection string, key string, value []byte, channelID string, txid string) error {
if h.usePeerWriteBatch && h.isStartWriteBatch(channelID, txid) {
st := h.batchByID(channelID, txid)
st[prefixStateDataWriteBatch+collection+key] = &peer.WriteRecord{
Key: key,
Value: value,
Collection: collection,
Type: peer.WriteRecord_PUT_STATE,
}
return nil
}

// Construct payload for PUT_STATE
payloadBytes := marshalOrPanic(&peer.PutState{Collection: collection, Key: key, Value: value})

Expand Down Expand Up @@ -340,6 +374,19 @@ func (h *Handler) handlePutState(collection string, key string, value []byte, ch
func (h *Handler) handlePutStateMetadataEntry(collection string, key string, metakey string, metadata []byte, channelID string, txID string) error {
// Construct payload for PUT_STATE_METADATA
md := &peer.StateMetadata{Metakey: metakey, Value: metadata}

if h.usePeerWriteBatch && h.isStartWriteBatch(channelID, txID) {
st := h.batchByID(channelID, txID)
st[prefixMetaDataWriteBatch+collection+key] = &peer.WriteRecord{
Key: key,
Collection: collection,
Metadata: md,
Type: peer.WriteRecord_PUT_STATE_METADATA,
}

return nil
}

payloadBytes := marshalOrPanic(&peer.PutStateMetadata{Collection: collection, Key: key, Metadata: md})

msg := &peer.ChaincodeMessage{Type: peer.ChaincodeMessage_PUT_STATE_METADATA, Payload: payloadBytes, Txid: txID, ChannelId: channelID}
Expand All @@ -365,6 +412,16 @@ func (h *Handler) handlePutStateMetadataEntry(collection string, key string, met

// handleDelState communicates with the peer to delete a key from the state in the ledger.
func (h *Handler) handleDelState(collection string, key string, channelID string, txid string) error {
if h.usePeerWriteBatch && h.isStartWriteBatch(channelID, txid) {
st := h.batchByID(channelID, txid)
st[prefixStateDataWriteBatch+collection+key] = &peer.WriteRecord{
Key: key,
Collection: collection,
Type: peer.WriteRecord_DEL_STATE,
}
return nil
}

payloadBytes := marshalOrPanic(&peer.DelState{Collection: collection, Key: key})
msg := &peer.ChaincodeMessage{Type: peer.ChaincodeMessage_DEL_STATE, Payload: payloadBytes, Txid: txid, ChannelId: channelID}
// Execute the request and get response
Expand All @@ -388,6 +445,16 @@ func (h *Handler) handleDelState(collection string, key string, channelID string

// handlePurgeState communicates with the peer to purge a state from private data
func (h *Handler) handlePurgeState(collection string, key string, channelID string, txid string) error {
if h.usePeerWriteBatch && h.isStartWriteBatch(channelID, txid) {
st := h.batchByID(channelID, txid)
st[prefixStateDataWriteBatch+collection+key] = &peer.WriteRecord{
Key: key,
Collection: collection,
Type: peer.WriteRecord_PURGE_PRIVATE_DATA,
}
return nil
}

payloadBytes := marshalOrPanic(&peer.DelState{Collection: collection, Key: key})
msg := &peer.ChaincodeMessage{Type: peer.ChaincodeMessage_PURGE_PRIVATE_DATA, Payload: payloadBytes, Txid: txid, ChannelId: channelID}
// Execute the request and get response
Expand All @@ -409,6 +476,98 @@ func (h *Handler) handlePurgeState(collection string, key string, channelID stri
return fmt.Errorf("[%s] incorrect chaincode message %s received. Expecting %s or %s", shorttxid(responseMsg.Txid), responseMsg.Type, peer.ChaincodeMessage_RESPONSE, peer.ChaincodeMessage_ERROR)
}

// handleWriteBatch communicates with the peer to write batch to state all changes information into the ledger.
func (h *Handler) handleWriteBatch(batch *peer.WriteBatchState, channelID string, txid string) error {
// Construct payload for PUT_STATE_BATCH
payloadBytes := marshalOrPanic(batch)

msg := &peer.ChaincodeMessage{Type: peer.ChaincodeMessage_WRITE_BATCH_STATE, Payload: payloadBytes, Txid: txid, ChannelId: channelID}

// Execute the request and get response
responseMsg, err := h.callPeerWithChaincodeMsg(msg, channelID, txid)
if err != nil {
return fmt.Errorf("[%s] error sending %s: %s", msg.Txid, peer.ChaincodeMessage_WRITE_BATCH_STATE, err)
}

if responseMsg.Type == peer.ChaincodeMessage_RESPONSE {
// Success response
return nil
}

if responseMsg.Type == peer.ChaincodeMessage_ERROR {
// Error response
return fmt.Errorf("%s", responseMsg.Payload[:])
}

// Incorrect chaincode message received
return fmt.Errorf("[%s] incorrect chaincode message %s received. Expecting %s or %s", shorttxid(responseMsg.Txid), responseMsg.Type, peer.ChaincodeMessage_RESPONSE, peer.ChaincodeMessage_ERROR)
}

func (h *Handler) sendBatch(channelID string, txid string) error {
if !h.usePeerWriteBatch || !h.isStartWriteBatch(channelID, txid) {
return nil
}

st := h.batchByID(channelID, txid)
txCtxID := transactionContextID(channelID, txid)

defer func() {
h.batchMutex.Lock()
h.startWriteBatchMutex.Lock()

delete(h.batch, txCtxID)
delete(h.startWriteBatch, txCtxID)

h.startWriteBatchMutex.Unlock()
h.batchMutex.Unlock()
}()

batch := &peer.WriteBatchState{}
for _, kv := range st {
batch.Rec = append(batch.Rec, kv)
if len(batch.Rec) >= int(h.maxSizeWriteBatch) {
err := h.handleWriteBatch(batch, channelID, txid)
if err != nil {
return fmt.Errorf("failed send batch: %s", err)
}
batch.Rec = batch.Rec[:0]
}
}

if len(batch.Rec) != 0 {
err := h.handleWriteBatch(batch, channelID, txid)
if err != nil {
return fmt.Errorf("failed send batch: %s", err)
}
}

return nil
}

func (h *Handler) handleStartWriteBatch(channelID string, txID string) {
if !h.usePeerWriteBatch {
return
}

txCtxID := transactionContextID(channelID, txID)
h.startWriteBatchMutex.Lock()
defer h.startWriteBatchMutex.Unlock()

h.startWriteBatch[txCtxID] = true
}

func (h *Handler) handleFinishWriteBatch(channelID string, txID string) error {
return h.sendBatch(channelID, txID)
}

func (h *Handler) isStartWriteBatch(channelID string, txID string) bool {
txCtxID := transactionContextID(channelID, txID)
h.startWriteBatchMutex.RLock()
defer h.startWriteBatchMutex.RUnlock()

return h.startWriteBatch[txCtxID]
}

func (h *Handler) handleGetStateByRange(collection, startKey, endKey string, metadata []byte,
channelID string, txid string) (*peer.QueryResponse, error) {
// Send GET_STATE_BY_RANGE message to peer chaincode support
Expand Down Expand Up @@ -655,6 +814,23 @@ func (h *Handler) handleEstablished(msg *peer.ChaincodeMessage) error {
}

h.state = ready
if len(msg.Payload) == 0 {
return nil
}

ccAdditionalParams := &peer.ChaincodeAdditionalParams{}
err := proto.Unmarshal(msg.Payload, ccAdditionalParams)
if err != nil {
return nil
}

h.usePeerWriteBatch = ccAdditionalParams.UseWriteBatch
h.maxSizeWriteBatch = ccAdditionalParams.MaxSizeWriteBatch

if h.usePeerWriteBatch && h.maxSizeWriteBatch < defaultMaxSizeWriteBatch {
h.maxSizeWriteBatch = defaultMaxSizeWriteBatch
}

return nil
}

Expand Down Expand Up @@ -697,7 +873,29 @@ func (h *Handler) handleMessage(msg *peer.ChaincodeMessage, errc chan error) err
return nil
}

// marshalOrPanic attempts to marshal the provided protobbuf message but will panic
func (h *Handler) batchByID(channelID string, txID string) map[string]*peer.WriteRecord {
txCtxID := transactionContextID(channelID, txID)

h.batchMutex.RLock()
st, ok := h.batch[txCtxID]
h.batchMutex.RUnlock()
if ok {
return st
}

h.batchMutex.Lock()
defer h.batchMutex.Unlock()
st, ok = h.batch[txCtxID]
if ok {
return st
}

st = make(map[string]*peer.WriteRecord)
h.batch[txCtxID] = st
return st
}

// marshalOrPanic attempts to marshal the provided protobuf message but will panic
// when marshaling fails instead of returning an error.
func marshalOrPanic(msg proto.Message) []byte {
bytes, err := proto.Marshal(msg)
Expand Down
29 changes: 15 additions & 14 deletions shim/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/hyperledger/fabric-chaincode-go/v2/shim/internal/mock"
"github.com/hyperledger/fabric-protos-go-apiv2/peer"

"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -49,6 +48,8 @@ func TestNewHandler_CreatedState(t *testing.T) {
cc: cc,
responseChannels: map[string]chan *peer.ChaincodeMessage{},
state: created,
batch: map[string]map[string]*peer.WriteRecord{},
startWriteBatch: map[string]bool{},
}

handler := newChaincodeHandler(chatStream, cc)
Expand Down Expand Up @@ -125,7 +126,7 @@ func TestHandlerState(t *testing.T) {
}
err := handler.handleMessage(test.msg, nil)
if test.expectedErr != "" {
assert.Contains(t, err.Error(), test.expectedErr)
assert.ErrorContains(t, err, test.expectedErr)
} else {
assert.NoError(t, err)
}
Expand Down Expand Up @@ -217,7 +218,7 @@ func TestHandleMessage(t *testing.T) {

err := handler.handleMessage(test.msg, nil)
if test.expectedErr != "" {
assert.Contains(t, err.Error(), test.expectedErr)
assert.ErrorContains(t, err, test.expectedErr)
} else {
if err != nil {
t.Fatalf("Unexpected error for '%s': %s", test.name, err)
Expand Down Expand Up @@ -264,36 +265,36 @@ func TestHandlePeerCalls(t *testing.T) {
// force error by removing responseChannels
h.responseChannels = nil
_, err = h.handleGetState("col", "key", "channel", "txid")
assert.Contains(t, err.Error(), "[txid] error sending GET_STATE")
assert.ErrorContains(t, err, "[txid] error sending GET_STATE")

_, err = h.handleGetPrivateDataHash("col", "key", "channel", "txid")
assert.Contains(t, err.Error(), "[txid] error sending GET_PRIVATE_DATA_HASH")
assert.ErrorContains(t, err, "[txid] error sending GET_PRIVATE_DATA_HASH")

_, err = h.handleGetStateMetadata("col", "key", "channel", "txid")
assert.Contains(t, err.Error(), "[txid] error sending GET_STATE_METADATA")
assert.ErrorContains(t, err, "[txid] error sending GET_STATE_METADATA")

err = h.handlePutState("col", "key", []byte{}, "channel", "txid")
assert.Contains(t, err.Error(), "[txid] error sending PUT_STATE")
assert.ErrorContains(t, err, "[txid] error sending PUT_STATE")

err = h.handlePutStateMetadataEntry("col", "key", "mkey", []byte{}, "channel", "txid")
assert.Contains(t, err.Error(), "[txid] error sending PUT_STATE_METADATA")
assert.ErrorContains(t, err, "[txid] error sending PUT_STATE_METADATA")

err = h.handleDelState("col", "key", "channel", "txid")
assert.Contains(t, err.Error(), "[txid] error sending DEL_STATE")
assert.ErrorContains(t, err, "[txid] error sending DEL_STATE")

_, err = h.handleGetStateByRange("col", "start", "end", []byte{}, "channel", "txid")
assert.Contains(t, err.Error(), "[txid] error sending GET_STATE_BY_RANGE")
assert.ErrorContains(t, err, "[txid] error sending GET_STATE_BY_RANGE")

_, err = h.handleQueryStateNext("id", "channel", "txid")
assert.Contains(t, err.Error(), "cannot create response channel")
assert.ErrorContains(t, err, "cannot create response channel")

_, err = h.handleQueryStateClose("id", "channel", "txid")
assert.Contains(t, err.Error(), "cannot create response channel")
assert.ErrorContains(t, err, "cannot create response channel")

_, err = h.handleGetQueryResult("col", "query", []byte{}, "channel", "txid")
assert.Contains(t, err.Error(), "[txid] error sending GET_QUERY_RESULT")
assert.ErrorContains(t, err, "[txid] error sending GET_QUERY_RESULT")

_, err = h.handleGetHistoryForKey("key", "channel", "txid")
assert.Contains(t, err.Error(), "cannot create response channel")
assert.ErrorContains(t, err, "cannot create response channel")

}
Loading
Loading