From 86df42c9dc36ea2d57f164bc24cc9cd2fdf9a0d1 Mon Sep 17 00:00:00 2001 From: Fedor Partanskiy Date: Tue, 20 Aug 2024 04:03:51 +0300 Subject: [PATCH] add put state batch Signed-off-by: Fedor Partanskiy --- pkg/statebased/statebasedimpl_test.go | 2 +- shim/chaincodeserver.go | 3 +- shim/handler.go | 200 +++++++++++++++++++++++++- shim/handler_test.go | 29 ++-- shim/interfaces.go | 14 +- shim/internal/config.go | 2 +- shim/internal/server.go | 8 +- shim/shim.go | 8 +- shim/shim_test.go | 1 - shim/stub.go | 12 ++ shim/stub_test.go | 58 +++++++- 11 files changed, 300 insertions(+), 37 deletions(-) diff --git a/pkg/statebased/statebasedimpl_test.go b/pkg/statebased/statebasedimpl_test.go index fd0e8202..f6fb948c 100644 --- a/pkg/statebased/statebasedimpl_test.go +++ b/pkg/statebased/statebasedimpl_test.go @@ -22,7 +22,7 @@ func TestAddOrg(t *testing.T) { // bad role type err = ep.AddOrgs("unknown", "Org1") - assert.Equal(t, &statebased.RoleTypeDoesNotExistError{RoleType: statebased.RoleType("unknown")}, err) + assert.Equal(t, &statebased.RoleTypeDoesNotExistError{RoleType: "unknown"}, err) assert.EqualError(t, err, "role type unknown does not exist") epBytes, err := ep.Policy() diff --git a/shim/chaincodeserver.go b/shim/chaincodeserver.go index 455ec4a1..1bfb22d2 100644 --- a/shim/chaincodeserver.go +++ b/shim/chaincodeserver.go @@ -9,13 +9,12 @@ import ( "github.com/hyperledger/fabric-chaincode-go/v2/shim/internal" "github.com/hyperledger/fabric-protos-go-apiv2/peer" - "google.golang.org/grpc/keepalive" ) // TLSProperties passed to ChaincodeServer type TLSProperties struct { - //Disabled forces default to be TLS enabled + // Disabled forces default to be TLS enabled Disabled bool Key []byte Cert []byte diff --git a/shim/handler.go b/shim/handler.go index a465ae26..eed207d4 100644 --- a/shim/handler.go +++ b/shim/handler.go @@ -18,6 +18,10 @@ const ( created state = "created" // start state established state = "established" // connection established ready state = "ready" // ready for requests + + defaultMaxSizeWriteBatch = 100 + prefixMetaDataWriteBatch = "m" + prefixStateDataWriteBatch = "s" ) // PeerChaincodeStream is the common stream interface for Peer - chaincode communication. @@ -46,6 +50,13 @@ type Handler struct { cc Chaincode // state holds the current state of this handler. state state + // if you can send the changes in batches. + usePeerWriteBatch bool + maxSizeWriteBatch uint32 + batchMutex sync.RWMutex + batch map[string]map[string]*peer.WriteRecord + startWriteBatchMutex sync.RWMutex + startWriteBatch map[string]bool // Multiple queries (and one transaction) with different txids can be executing in parallel for this chaincode // responseChannels is the channel on which responses are communicated by the shim to the chaincodeStub. @@ -150,6 +161,8 @@ func newChaincodeHandler(peerChatStream PeerChaincodeStream, chaincode Chaincode cc: chaincode, responseChannels: map[string]chan *peer.ChaincodeMessage{}, state: created, + batch: map[string]map[string]*peer.WriteRecord{}, + startWriteBatch: map[string]bool{}, } } @@ -188,6 +201,11 @@ func (h *Handler) handleInit(msg *peer.ChaincodeMessage) (*peer.ChaincodeMessage return nil, fmt.Errorf("failed to marshal response: %s", err) } + err = h.sendBatch(msg.ChannelId, msg.Txid) + if err != nil { + return nil, fmt.Errorf("failed send batch: %s", err) + } + return &peer.ChaincodeMessage{Type: peer.ChaincodeMessage_COMPLETED, Payload: resBytes, Txid: msg.Txid, ChaincodeEvent: stub.chaincodeEvent, ChannelId: stub.ChannelID}, nil } @@ -214,6 +232,11 @@ func (h *Handler) handleTransaction(msg *peer.ChaincodeMessage) (*peer.Chaincode return nil, fmt.Errorf("failed to marshal response: %s", err) } + err = h.sendBatch(msg.ChannelId, msg.Txid) + if err != nil { + return nil, fmt.Errorf("failed send batch: %s", err) + } + return &peer.ChaincodeMessage{Type: peer.ChaincodeMessage_COMPLETED, Payload: resBytes, Txid: msg.Txid, ChaincodeEvent: stub.chaincodeEvent, ChannelId: stub.ChannelID}, nil } @@ -312,6 +335,17 @@ func (h *Handler) handleGetStateMetadata(collection string, key string, channelI // handlePutState communicates with the peer to put state information into the ledger. func (h *Handler) handlePutState(collection string, key string, value []byte, channelID string, txid string) error { + if h.usePeerWriteBatch && h.isStartWriteBatch(channelID, txid) { + st := h.batchByID(channelID, txid) + st[prefixStateDataWriteBatch+collection+key] = &peer.WriteRecord{ + Key: key, + Value: value, + Collection: collection, + Type: peer.WriteRecord_PUT_STATE, + } + return nil + } + // Construct payload for PUT_STATE payloadBytes := marshalOrPanic(&peer.PutState{Collection: collection, Key: key, Value: value}) @@ -340,6 +374,19 @@ func (h *Handler) handlePutState(collection string, key string, value []byte, ch func (h *Handler) handlePutStateMetadataEntry(collection string, key string, metakey string, metadata []byte, channelID string, txID string) error { // Construct payload for PUT_STATE_METADATA md := &peer.StateMetadata{Metakey: metakey, Value: metadata} + + if h.usePeerWriteBatch && h.isStartWriteBatch(channelID, txID) { + st := h.batchByID(channelID, txID) + st[prefixMetaDataWriteBatch+collection+key] = &peer.WriteRecord{ + Key: key, + Collection: collection, + Metadata: md, + Type: peer.WriteRecord_PUT_STATE_METADATA, + } + + return nil + } + payloadBytes := marshalOrPanic(&peer.PutStateMetadata{Collection: collection, Key: key, Metadata: md}) msg := &peer.ChaincodeMessage{Type: peer.ChaincodeMessage_PUT_STATE_METADATA, Payload: payloadBytes, Txid: txID, ChannelId: channelID} @@ -365,6 +412,16 @@ func (h *Handler) handlePutStateMetadataEntry(collection string, key string, met // handleDelState communicates with the peer to delete a key from the state in the ledger. func (h *Handler) handleDelState(collection string, key string, channelID string, txid string) error { + if h.usePeerWriteBatch && h.isStartWriteBatch(channelID, txid) { + st := h.batchByID(channelID, txid) + st[prefixStateDataWriteBatch+collection+key] = &peer.WriteRecord{ + Key: key, + Collection: collection, + Type: peer.WriteRecord_DEL_STATE, + } + return nil + } + payloadBytes := marshalOrPanic(&peer.DelState{Collection: collection, Key: key}) msg := &peer.ChaincodeMessage{Type: peer.ChaincodeMessage_DEL_STATE, Payload: payloadBytes, Txid: txid, ChannelId: channelID} // Execute the request and get response @@ -388,6 +445,16 @@ func (h *Handler) handleDelState(collection string, key string, channelID string // handlePurgeState communicates with the peer to purge a state from private data func (h *Handler) handlePurgeState(collection string, key string, channelID string, txid string) error { + if h.usePeerWriteBatch && h.isStartWriteBatch(channelID, txid) { + st := h.batchByID(channelID, txid) + st[prefixStateDataWriteBatch+collection+key] = &peer.WriteRecord{ + Key: key, + Collection: collection, + Type: peer.WriteRecord_PURGE_PRIVATE_DATA, + } + return nil + } + payloadBytes := marshalOrPanic(&peer.DelState{Collection: collection, Key: key}) msg := &peer.ChaincodeMessage{Type: peer.ChaincodeMessage_PURGE_PRIVATE_DATA, Payload: payloadBytes, Txid: txid, ChannelId: channelID} // Execute the request and get response @@ -409,6 +476,98 @@ func (h *Handler) handlePurgeState(collection string, key string, channelID stri return fmt.Errorf("[%s] incorrect chaincode message %s received. Expecting %s or %s", shorttxid(responseMsg.Txid), responseMsg.Type, peer.ChaincodeMessage_RESPONSE, peer.ChaincodeMessage_ERROR) } +// handleWriteBatch communicates with the peer to write batch to state all changes information into the ledger. +func (h *Handler) handleWriteBatch(batch *peer.WriteBatchState, channelID string, txid string) error { + // Construct payload for PUT_STATE_BATCH + payloadBytes := marshalOrPanic(batch) + + msg := &peer.ChaincodeMessage{Type: peer.ChaincodeMessage_WRITE_BATCH_STATE, Payload: payloadBytes, Txid: txid, ChannelId: channelID} + + // Execute the request and get response + responseMsg, err := h.callPeerWithChaincodeMsg(msg, channelID, txid) + if err != nil { + return fmt.Errorf("[%s] error sending %s: %s", msg.Txid, peer.ChaincodeMessage_WRITE_BATCH_STATE, err) + } + + if responseMsg.Type == peer.ChaincodeMessage_RESPONSE { + // Success response + return nil + } + + if responseMsg.Type == peer.ChaincodeMessage_ERROR { + // Error response + return fmt.Errorf("%s", responseMsg.Payload[:]) + } + + // Incorrect chaincode message received + return fmt.Errorf("[%s] incorrect chaincode message %s received. Expecting %s or %s", shorttxid(responseMsg.Txid), responseMsg.Type, peer.ChaincodeMessage_RESPONSE, peer.ChaincodeMessage_ERROR) +} + +func (h *Handler) sendBatch(channelID string, txid string) error { + if !h.usePeerWriteBatch || !h.isStartWriteBatch(channelID, txid) { + return nil + } + + st := h.batchByID(channelID, txid) + txCtxID := transactionContextID(channelID, txid) + + defer func() { + h.batchMutex.Lock() + h.startWriteBatchMutex.Lock() + + delete(h.batch, txCtxID) + delete(h.startWriteBatch, txCtxID) + + h.startWriteBatchMutex.Unlock() + h.batchMutex.Unlock() + }() + + batch := &peer.WriteBatchState{} + for _, kv := range st { + batch.Rec = append(batch.Rec, kv) + if len(batch.Rec) >= int(h.maxSizeWriteBatch) { + err := h.handleWriteBatch(batch, channelID, txid) + if err != nil { + return fmt.Errorf("failed send batch: %s", err) + } + batch.Rec = batch.Rec[:0] + } + } + + if len(batch.Rec) != 0 { + err := h.handleWriteBatch(batch, channelID, txid) + if err != nil { + return fmt.Errorf("failed send batch: %s", err) + } + } + + return nil +} + +func (h *Handler) handleStartWriteBatch(channelID string, txID string) { + if !h.usePeerWriteBatch { + return + } + + txCtxID := transactionContextID(channelID, txID) + h.startWriteBatchMutex.Lock() + defer h.startWriteBatchMutex.Unlock() + + h.startWriteBatch[txCtxID] = true +} + +func (h *Handler) handleFinishWriteBatch(channelID string, txID string) error { + return h.sendBatch(channelID, txID) +} + +func (h *Handler) isStartWriteBatch(channelID string, txID string) bool { + txCtxID := transactionContextID(channelID, txID) + h.startWriteBatchMutex.RLock() + defer h.startWriteBatchMutex.RUnlock() + + return h.startWriteBatch[txCtxID] +} + func (h *Handler) handleGetStateByRange(collection, startKey, endKey string, metadata []byte, channelID string, txid string) (*peer.QueryResponse, error) { // Send GET_STATE_BY_RANGE message to peer chaincode support @@ -655,6 +814,23 @@ func (h *Handler) handleEstablished(msg *peer.ChaincodeMessage) error { } h.state = ready + if len(msg.Payload) == 0 { + return nil + } + + ccAdditionalParams := &peer.ChaincodeAdditionalParams{} + err := proto.Unmarshal(msg.Payload, ccAdditionalParams) + if err != nil { + return nil + } + + h.usePeerWriteBatch = ccAdditionalParams.UseWriteBatch + h.maxSizeWriteBatch = ccAdditionalParams.MaxSizeWriteBatch + + if h.usePeerWriteBatch && h.maxSizeWriteBatch < defaultMaxSizeWriteBatch { + h.maxSizeWriteBatch = defaultMaxSizeWriteBatch + } + return nil } @@ -697,7 +873,29 @@ func (h *Handler) handleMessage(msg *peer.ChaincodeMessage, errc chan error) err return nil } -// marshalOrPanic attempts to marshal the provided protobbuf message but will panic +func (h *Handler) batchByID(channelID string, txID string) map[string]*peer.WriteRecord { + txCtxID := transactionContextID(channelID, txID) + + h.batchMutex.RLock() + st, ok := h.batch[txCtxID] + h.batchMutex.RUnlock() + if ok { + return st + } + + h.batchMutex.Lock() + defer h.batchMutex.Unlock() + st, ok = h.batch[txCtxID] + if ok { + return st + } + + st = make(map[string]*peer.WriteRecord) + h.batch[txCtxID] = st + return st +} + +// marshalOrPanic attempts to marshal the provided protobuf message but will panic // when marshaling fails instead of returning an error. func marshalOrPanic(msg proto.Message) []byte { bytes, err := proto.Marshal(msg) diff --git a/shim/handler_test.go b/shim/handler_test.go index f9d325c0..92e28b85 100644 --- a/shim/handler_test.go +++ b/shim/handler_test.go @@ -9,7 +9,6 @@ import ( "github.com/hyperledger/fabric-chaincode-go/v2/shim/internal/mock" "github.com/hyperledger/fabric-protos-go-apiv2/peer" - "github.com/stretchr/testify/assert" ) @@ -49,6 +48,8 @@ func TestNewHandler_CreatedState(t *testing.T) { cc: cc, responseChannels: map[string]chan *peer.ChaincodeMessage{}, state: created, + batch: map[string]map[string]*peer.WriteRecord{}, + startWriteBatch: map[string]bool{}, } handler := newChaincodeHandler(chatStream, cc) @@ -125,7 +126,7 @@ func TestHandlerState(t *testing.T) { } err := handler.handleMessage(test.msg, nil) if test.expectedErr != "" { - assert.Contains(t, err.Error(), test.expectedErr) + assert.ErrorContains(t, err, test.expectedErr) } else { assert.NoError(t, err) } @@ -217,7 +218,7 @@ func TestHandleMessage(t *testing.T) { err := handler.handleMessage(test.msg, nil) if test.expectedErr != "" { - assert.Contains(t, err.Error(), test.expectedErr) + assert.ErrorContains(t, err, test.expectedErr) } else { if err != nil { t.Fatalf("Unexpected error for '%s': %s", test.name, err) @@ -264,36 +265,36 @@ func TestHandlePeerCalls(t *testing.T) { // force error by removing responseChannels h.responseChannels = nil _, err = h.handleGetState("col", "key", "channel", "txid") - assert.Contains(t, err.Error(), "[txid] error sending GET_STATE") + assert.ErrorContains(t, err, "[txid] error sending GET_STATE") _, err = h.handleGetPrivateDataHash("col", "key", "channel", "txid") - assert.Contains(t, err.Error(), "[txid] error sending GET_PRIVATE_DATA_HASH") + assert.ErrorContains(t, err, "[txid] error sending GET_PRIVATE_DATA_HASH") _, err = h.handleGetStateMetadata("col", "key", "channel", "txid") - assert.Contains(t, err.Error(), "[txid] error sending GET_STATE_METADATA") + assert.ErrorContains(t, err, "[txid] error sending GET_STATE_METADATA") err = h.handlePutState("col", "key", []byte{}, "channel", "txid") - assert.Contains(t, err.Error(), "[txid] error sending PUT_STATE") + assert.ErrorContains(t, err, "[txid] error sending PUT_STATE") err = h.handlePutStateMetadataEntry("col", "key", "mkey", []byte{}, "channel", "txid") - assert.Contains(t, err.Error(), "[txid] error sending PUT_STATE_METADATA") + assert.ErrorContains(t, err, "[txid] error sending PUT_STATE_METADATA") err = h.handleDelState("col", "key", "channel", "txid") - assert.Contains(t, err.Error(), "[txid] error sending DEL_STATE") + assert.ErrorContains(t, err, "[txid] error sending DEL_STATE") _, err = h.handleGetStateByRange("col", "start", "end", []byte{}, "channel", "txid") - assert.Contains(t, err.Error(), "[txid] error sending GET_STATE_BY_RANGE") + assert.ErrorContains(t, err, "[txid] error sending GET_STATE_BY_RANGE") _, err = h.handleQueryStateNext("id", "channel", "txid") - assert.Contains(t, err.Error(), "cannot create response channel") + assert.ErrorContains(t, err, "cannot create response channel") _, err = h.handleQueryStateClose("id", "channel", "txid") - assert.Contains(t, err.Error(), "cannot create response channel") + assert.ErrorContains(t, err, "cannot create response channel") _, err = h.handleGetQueryResult("col", "query", []byte{}, "channel", "txid") - assert.Contains(t, err.Error(), "[txid] error sending GET_QUERY_RESULT") + assert.ErrorContains(t, err, "[txid] error sending GET_QUERY_RESULT") _, err = h.handleGetHistoryForKey("key", "channel", "txid") - assert.Contains(t, err.Error(), "cannot create response channel") + assert.ErrorContains(t, err, "cannot create response channel") } diff --git a/shim/interfaces.go b/shim/interfaces.go index 7e6a6431..e0965ab5 100644 --- a/shim/interfaces.go +++ b/shim/interfaces.go @@ -312,8 +312,8 @@ type ChaincodeStubInterface interface { // Call Close() on the returned StateQueryIteratorInterface object when done. // The query is re-executed during validation phase to ensure result set // has not changed since transaction endorsement (phantom reads detected). This function should be used only for - //a partial composite key. For a full composite key, an iter with empty response - //would be returned. + // a partial composite key. For a full composite key, an iter with empty response + // would be returned. GetPrivateDataByPartialCompositeKey(collection, objectType string, keys []string) (StateQueryIteratorInterface, error) // GetPrivateDataQueryResult performs a "rich" query against a given private @@ -370,6 +370,16 @@ type ChaincodeStubInterface interface { // from the outer-most invoked chaincode in chaincode-to-chaincode scenarios. // The marshaled ChaincodeEvent will be available in the transaction's ChaincodeAction.events field. SetEvent(name string, payload []byte) error + + // StartWriteBatch enables a mode where all changes are not immediately forwarded to the feast, + // but accumulate in the cache. The cache is sent in large batches either at the end of transaction + // execution or after the FinishWriteBatch call. + // IMPORTANT: in this mode, the expected order of transaction execution and expected errors can be changed. + StartWriteBatch() + + // FinishWriteBatch sends accumulated changes in large batches to the peer + // if StartWriteBatch has been called before it. + FinishWriteBatch() error } // CommonIteratorInterface allows a chaincode to check whether any more result diff --git a/shim/internal/config.go b/shim/internal/config.go index e7c2c4b5..d174494d 100644 --- a/shim/internal/config.go +++ b/shim/internal/config.go @@ -128,7 +128,7 @@ func LoadTLSConfig(isserver bool, key, cert, root []byte) (*tls.Config, error) { Certificates: []tls.Certificate{cccert}, } - //follow Peer's server default config properties + // follow Peer's server default config properties if isserver { tlscfg.ClientCAs = rootCertPool tlscfg.SessionTicketsDisabled = true diff --git a/shim/internal/server.go b/shim/internal/server.go index 89a31fc1..d4445e4d 100644 --- a/shim/internal/server.go +++ b/shim/internal/server.go @@ -58,13 +58,13 @@ func NewServer( return nil, errors.New("server listen address not provided") } - //create our listener + // create our listener listener, err := net.Listen("tcp", address) if err != nil { return nil, err } - //set up server options for keepalive and TLS + // set up server options for keepalive and TLS var serverOpts []grpc.ServerOption if srvKaOpts != nil { @@ -89,7 +89,7 @@ func NewServer( serverOpts = append(serverOpts, grpc.MaxSendMsgSize(maxSendMessageSize)) serverOpts = append(serverOpts, grpc.MaxRecvMsgSize(maxRecvMessageSize)) - //set enforcement policy + // set enforcement policy kep := keepalive.EnforcementPolicy{ MinTime: serverMinInterval, // allow keepalive w/o rpc @@ -97,7 +97,7 @@ func NewServer( } serverOpts = append(serverOpts, grpc.KeepaliveEnforcementPolicy(kep)) - //set default connection timeout + // set default connection timeout serverOpts = append(serverOpts, grpc.ConnectionTimeout(connectionTimeout)) server := grpc.NewServer(serverOpts...) diff --git a/shim/shim.go b/shim/shim.go index f75839d8..f9e263bd 100644 --- a/shim/shim.go +++ b/shim/shim.go @@ -19,8 +19,8 @@ import ( ) const ( - minUnicodeRuneValue = 0 //U+0000 - maxUnicodeRuneValue = utf8.MaxRune //U+10FFFF - maximum (and unallocated) code point + minUnicodeRuneValue = 0 // U+0000 + maxUnicodeRuneValue = utf8.MaxRune // U+10FFFF - maximum (and unallocated) code point compositeKeyNamespace = "\x00" emptyKeySubstitute = "\x01" ) @@ -62,7 +62,7 @@ func Start(cc Chaincode) error { return errors.New("'CORE_CHAINCODE_ID_NAME' must be set") } - //mock stream not set up ... get real stream + // mock stream not set up ... get real stream if streamGetter == nil { streamGetter = userChaincodeStreamGetter } @@ -85,7 +85,7 @@ func StartInProc(chaincodename string, stream ClientStream, cc Chaincode) error // this is the chat stream resulting from the chaincode-as-client model where the chaincode initiates connection func chaincodeAsClientChat(chaincodename string, stream ClientStream, cc Chaincode) error { - defer stream.CloseSend() //nolint:Errcheck + defer stream.CloseSend() //nolint:errcheck return chatWithPeer(chaincodename, stream, cc) } diff --git a/shim/shim_test.go b/shim/shim_test.go index d64c1bc9..15ef616a 100644 --- a/shim/shim_test.go +++ b/shim/shim_test.go @@ -11,7 +11,6 @@ import ( "github.com/hyperledger/fabric-chaincode-go/v2/shim/internal/mock" "github.com/hyperledger/fabric-protos-go-apiv2/peer" - "github.com/stretchr/testify/assert" ) diff --git a/shim/stub.go b/shim/stub.go index c4fd2a74..5436b115 100644 --- a/shim/stub.go +++ b/shim/stub.go @@ -579,6 +579,18 @@ func (s *ChaincodeStub) GetQueryResultWithPagination(query string, pageSize int3 return s.handleGetQueryResult(collection, query, metadata) } +// --------- Batch State functions ---------- + +// StartWriteBatch documentation can be found in interfaces.go +func (s *ChaincodeStub) StartWriteBatch() { + s.handler.handleStartWriteBatch(s.ChannelID, s.TxID) +} + +// FinishWriteBatch documentation can be found in interfaces.go +func (s *ChaincodeStub) FinishWriteBatch() error { + return s.handler.handleFinishWriteBatch(s.ChannelID, s.TxID) +} + // Next ... func (iter *StateQueryIterator) Next() (*queryresult.KV, error) { result, err := iter.nextResult(StateQueryResult) diff --git a/shim/stub_test.go b/shim/stub_test.go index 5f41d6e8..d25ae5af 100644 --- a/shim/stub_test.go +++ b/shim/stub_test.go @@ -6,7 +6,6 @@ package shim import ( "crypto/sha256" "encoding/binary" - "os" "testing" "github.com/hyperledger/fabric-chaincode-go/v2/shim/internal/mock" @@ -245,13 +244,11 @@ func TestGetMSPID(t *testing.T) { _, err := GetMSPID() assert.EqualError(t, err, "'CORE_PEER_LOCALMSPID' is not set") - os.Setenv("CORE_PEER_LOCALMSPID", "mspid") + t.Setenv("CORE_PEER_LOCALMSPID", "mspid") mspid, err := GetMSPID() assert.NoError(t, err) assert.Equal(t, "mspid", mspid) - - os.Unsetenv("CORE_PEER_LOCALMSPID") } func TestChaincodeStubHandlers(t *testing.T) { @@ -319,6 +316,41 @@ func TestChaincodeStubHandlers(t *testing.T) { }, }, + { + name: "Simple Response with WriteBatch", + resType: peer.ChaincodeMessage_RESPONSE, + payload: []byte("myvalue"), + testFunc: func(s *ChaincodeStub, h *Handler, t *testing.T, payload []byte) { + s.StartWriteBatch() + err := s.PutState("key", payload) + assert.NoError(t, err) + err = s.PutPrivateData("col", "key", payload) + assert.NoError(t, err) + err = s.SetStateValidationParameter("key", payload) + assert.NoError(t, err) + err = s.SetPrivateDataValidationParameter("col", "key", payload) + assert.NoError(t, err) + err = s.DelState("key") + assert.NoError(t, err) + err = s.DelPrivateData("col", "key") + assert.NoError(t, err) + err = s.PurgePrivateData("col", "key") + assert.NoError(t, err) + err = s.FinishWriteBatch() + assert.NoError(t, err) + + s.StartWriteBatch() + s.StartWriteBatch() + err = s.PutState("key", payload) + assert.NoError(t, err) + err = s.PutPrivateData("col", "key", payload) + assert.NoError(t, err) + err = s.FinishWriteBatch() + assert.NoError(t, err) + err = s.FinishWriteBatch() + assert.NoError(t, err) + }, + }, { name: "ValidationParameter", resType: peer.ChaincodeMessage_RESPONSE, @@ -577,6 +609,14 @@ func TestChaincodeStubHandlers(t *testing.T) { resp := s.InvokeChaincode("cc", [][]byte{}, "channel") assert.Equal(t, payload, resp.GetPayload()) + s.StartWriteBatch() + s.StartWriteBatch() + err = s.PutState("key", payload) + assert.NoError(t, err) + err = s.FinishWriteBatch() + assert.ErrorContains(t, err, string(payload)) + err = s.FinishWriteBatch() + assert.NoError(t, err) }, }, } @@ -587,9 +627,13 @@ func TestChaincodeStubHandlers(t *testing.T) { t.Parallel() handler := &Handler{ - cc: &mockChaincode{}, - responseChannels: map[string]chan *peer.ChaincodeMessage{}, - state: ready, + cc: &mockChaincode{}, + responseChannels: map[string]chan *peer.ChaincodeMessage{}, + state: ready, + batch: map[string]map[string]*peer.WriteRecord{}, + startWriteBatch: map[string]bool{}, + usePeerWriteBatch: true, + maxSizeWriteBatch: 100, } stub := &ChaincodeStub{ ChannelID: "channel",