Skip to content

Commit

Permalink
Merge pull request ethereum#60 from obscuren/quorum-private-logs
Browse files Browse the repository at this point in the history
core, eth: support private state log and bloom filtering
  • Loading branch information
jpmsam authored Mar 3, 2017
2 parents e6282c2 + 9d5d5dd commit 8c47c29
Show file tree
Hide file tree
Showing 11 changed files with 102 additions and 98 deletions.
16 changes: 11 additions & 5 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -931,13 +931,14 @@ func (self *BlockChain) InsertChain(chain types.Blocks) (int, error) {
return i, err
}
// Process block using the parent state as reference point.
receipts, logs, usedGas, err := self.processor.Process(block, self.publicStateCache, self.privateStateCache, self.config.VmConfig)
publicReceipts, privateReceipts, logs, usedGas, err := self.processor.Process(block, self.publicStateCache, self.privateStateCache, self.config.VmConfig)
if err != nil {
reportBlock(block, err)
return i, err
}

// Validate the state using the default validator
err = self.Validator().ValidateState(block, self.GetBlock(block.ParentHash(), block.NumberU64()-1), self.publicStateCache, receipts, usedGas)
err = self.Validator().ValidateState(block, self.GetBlock(block.ParentHash(), block.NumberU64()-1), self.publicStateCache, publicReceipts, usedGas)
if err != nil {
reportBlock(block, err)
return i, err
Expand All @@ -960,7 +961,8 @@ func (self *BlockChain) InsertChain(chain types.Blocks) (int, error) {
// coalesce logs for later processing
coalescedLogs = append(coalescedLogs, logs...)

if err := WriteBlockReceipts(self.chainDb, block.Hash(), block.NumberU64(), receipts); err != nil {
allReceipts := append(publicReceipts, privateReceipts...)
if err := WriteBlockReceipts(self.chainDb, block.Hash(), block.NumberU64(), allReceipts); err != nil {
return i, err
}

Expand All @@ -983,11 +985,15 @@ func (self *BlockChain) InsertChain(chain types.Blocks) (int, error) {
return i, err
}
// store the receipts
if err := WriteReceipts(self.chainDb, receipts); err != nil {
if err := WriteReceipts(self.chainDb, allReceipts); err != nil {
return i, err
}
// Write map map bloom filters
if err := WriteMipmapBloom(self.chainDb, block.NumberU64(), receipts); err != nil {
if err := WriteMipmapBloom(self.chainDb, block.NumberU64(), allReceipts); err != nil {
return i, err
}
// Write private block bloom
if err := WritePrivateBlockBloom(self.chainDb, block.NumberU64(), privateReceipts); err != nil {
return i, err
}
case SideStatTy:
Expand Down
6 changes: 3 additions & 3 deletions core/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func testBlockChainImport(chain types.Blocks, blockchain *BlockChain) error {
if err != nil {
return err
}
receipts, _, usedGas, err := blockchain.Processor().Process(block, statedb, statedb, vm.Config{})
receipts, _, _, usedGas, err := blockchain.Processor().Process(block, statedb, statedb, vm.Config{})
if err != nil {
reportBlock(block, err)
return err
Expand Down Expand Up @@ -434,8 +434,8 @@ func (bproc) ValidateHeader(ethdb.Database, *types.Header, *types.Header) error
func (bproc) ValidateState(block, parent *types.Block, state *state.StateDB, receipts types.Receipts, usedGas *big.Int) error {
return nil
}
func (bproc) Process(block *types.Block, statedb, state2 *state.StateDB, cfg vm.Config) (types.Receipts, vm.Logs, *big.Int, error) {
return nil, nil, new(big.Int), nil
func (bproc) Process(block *types.Block, statedb, state2 *state.StateDB, cfg vm.Config) (types.Receipts, types.Receipts, vm.Logs, *big.Int, error) {
return nil, nil, nil, new(big.Int), nil
}

func makeHeaderChainWithDiff(genesis *types.Block, d []int, seed byte) []*types.Header {
Expand Down
2 changes: 2 additions & 0 deletions core/call_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ func (cg *callHelper) MakeCall(private bool, key *ecdsa.PrivateKey, to common.Ad
if !private {
privateState = publicState
}

cg.header.Number = new(big.Int)
_, _, err = ApplyMessage(NewEnv(publicState, privateState, &ChainConfig{}, nil, ptx.Transaction, &cg.header, vm.Config{}), ptx, cg.gp)
if err != nil {
return err
Expand Down
34 changes: 27 additions & 7 deletions core/database_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,16 @@ var (
headBlockKey = []byte("LastBlock")
headFastKey = []byte("LastFast")

headerPrefix = []byte("h") // headerPrefix + num (uint64 big endian) + hash -> header
tdSuffix = []byte("t") // headerPrefix + num (uint64 big endian) + hash + tdSuffix -> td
numSuffix = []byte("n") // headerPrefix + num (uint64 big endian) + numSuffix -> hash
blockHashPrefix = []byte("H") // blockHashPrefix + hash -> num (uint64 big endian)
bodyPrefix = []byte("b") // bodyPrefix + num (uint64 big endian) + hash -> block body
blockReceiptsPrefix = []byte("r") // blockReceiptsPrefix + num (uint64 big endian) + hash -> block receipts
privateRootPrefix = []byte("P") // rootPrefix + block public root -> hash
headerPrefix = []byte("h") // headerPrefix + num (uint64 big endian) + hash -> header
tdSuffix = []byte("t") // headerPrefix + num (uint64 big endian) + hash + tdSuffix -> td
numSuffix = []byte("n") // headerPrefix + num (uint64 big endian) + numSuffix -> hash
blockHashPrefix = []byte("H") // blockHashPrefix + hash -> num (uint64 big endian)
bodyPrefix = []byte("b") // bodyPrefix + num (uint64 big endian) + hash -> block body
blockReceiptsPrefix = []byte("r") // blockReceiptsPrefix + num (uint64 big endian) + hash -> block receipts
privateRootPrefix = []byte("P") // rootPrefix + block public root -> hash
privateblockReceiptsPrefix = []byte("Pr") // blockReceiptsPrefix + num (uint64 big endian) + hash -> block receipts
privateReceiptPrefix = []byte("Prs")
privateBloomPrefix = []byte("Pb")

txMetaSuffix = []byte{0x01}
receiptsPrefix = []byte("receipts-")
Expand All @@ -62,6 +65,22 @@ var (
oldBlockHashPrefix = []byte("block-hash-") // [deprecated by the header/block split, remove eventually]
)

// WritePrivateBlockBloom creates a bloom filter for the given receipts and saves it to the database
// with the number given as identifier (i.e. block number).
func WritePrivateBlockBloom(db ethdb.Database, number uint64, receipts types.Receipts) error {
rbloom := types.CreateBloom(receipts)
return db.Put(append(privateBloomPrefix, encodeBlockNumber(number)...), rbloom[:])
}

// GetPrivateBlockBloom retrieves the private bloom associated with the given number.
func GetPrivateBlockBloom(db ethdb.Database, number uint64) (bloom types.Bloom) {
data, _ := db.Get(append(privateBloomPrefix, encodeBlockNumber(number)...))
if len(data) > 0 {
bloom = types.BytesToBloom(data)
}
return bloom
}

// encodeBlockNumber encodes a block number as big endian uint64
func encodeBlockNumber(number uint64) []byte {
enc := make([]byte, 8)
Expand Down Expand Up @@ -396,6 +415,7 @@ func WriteBlockReceipts(db ethdb.Database, hash common.Hash, number uint64, rece
if err != nil {
return err
}

// Store the flattened receipt slice
key := append(append(blockReceiptsPrefix, encodeBlockNumber(number)...), hash.Bytes()...)
if err := db.Put(key, bytes); err != nil {
Expand Down
14 changes: 5 additions & 9 deletions core/private_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,9 @@ func TestPrivateTransaction(t *testing.T) {

prvContractAddr := common.Address{1}
pubContractAddr := common.Address{2}
/* gllc
asm {
PUSH1 10
PUSH1 0
SSTORE
}
*/
privateState.SetCode(prvContractAddr, common.Hex2Bytes("600a60005500"))
privateState.SetCode(prvContractAddr, common.Hex2Bytes("600a600055600060006001a1"))
privateState.SetState(prvContractAddr, common.Hash{}, common.Hash{9})
publicState.SetCode(pubContractAddr, common.Hex2Bytes("601460005500"))
publicState.SetCode(pubContractAddr, common.Hex2Bytes("6014600055"))
publicState.SetState(pubContractAddr, common.Hash{}, common.Hash{19})

// Private transaction 1
Expand All @@ -74,6 +67,9 @@ func TestPrivateTransaction(t *testing.T) {
if stateEntry.Cmp(big.NewInt(10)) != 0 {
t.Error("expected state to have 10, got", stateEntry)
}
if len(privateState.Logs()) != 1 {
t.Error("expected private state to have 1 log, got", len(privateState.Logs()))
}

// Public transaction 1
err = helper.MakeCall(false, key, pubContractAddr, nil)
Expand Down
6 changes: 3 additions & 3 deletions core/quorum/block_maker.go
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,17 @@ func (ps *pendingState) applyTransaction(tx *types.Transaction, bc *core.BlockCh
}
config.ForceJit = false // disable forcing jit

receipt, logs, _, err := core.ApplyTransaction(cc, bc, ps.gp, ps.publicState, ps.privateState, ps.header, tx, ps.header.GasUsed, config)
publicReceipt, _, _, err := core.ApplyTransaction(cc, bc, ps.gp, ps.publicState, ps.privateState, ps.header, tx, ps.header.GasUsed, config)
if err != nil {
ps.publicState.RevertToSnapshot(publicSnaphot)
ps.privateState.RevertToSnapshot(privateSnapshot)

return err, nil
}
ps.txs = append(ps.txs, tx)
ps.receipts = append(ps.receipts, receipt)
ps.receipts = append(ps.receipts, publicReceipt)

return nil, logs
return nil, publicReceipt.Logs
}

func (ps *pendingState) applyTransactions(txs *types.TransactionsByPriorityAndNonce, mux *event.TypeMux, bc *core.BlockChain, cc *core.ChainConfig) (types.Transactions, types.Transactions) {
Expand Down
65 changes: 45 additions & 20 deletions core/state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,36 +55,46 @@ func NewStateProcessor(config *ChainConfig, bc *BlockChain) *StateProcessor {
// Process returns the receipts and logs accumulated during the process and
// returns the amount of gas that was used in the process. If any of the
// transactions failed to execute due to insufficient gas it will return an error.
func (p *StateProcessor) Process(block *types.Block, publicState, privateState *state.StateDB, cfg vm.Config) (types.Receipts, vm.Logs, *big.Int, error) {
func (p *StateProcessor) Process(block *types.Block, publicState, privateState *state.StateDB, cfg vm.Config) (types.Receipts, types.Receipts, vm.Logs, *big.Int, error) {
var (
receipts types.Receipts
totalUsedGas = big.NewInt(0)
err error
header = block.Header()
allLogs vm.Logs
gp = new(GasPool).AddGas(block.GasLimit())
publicReceipts types.Receipts
privateReceipts types.Receipts
totalUsedGas = big.NewInt(0)
err error
header = block.Header()
allLogs vm.Logs
gp = new(GasPool).AddGas(block.GasLimit())
)

for i, tx := range block.Transactions() {
publicState.StartRecord(tx.Hash(), block.Hash(), i)
receipt, logs, _, err := ApplyTransaction(p.config, p.bc, gp, publicState, privateState, header, tx, totalUsedGas, cfg)
privateState.StartRecord(tx.Hash(), block.Hash(), i)

publicReceipt, privateReceipt, _, err := ApplyTransaction(p.config, p.bc, gp, publicState, privateState, header, tx, totalUsedGas, cfg)
if err != nil {
return nil, nil, totalUsedGas, err
return nil, nil, nil, totalUsedGas, err
}
publicReceipts = append(publicReceipts, publicReceipt)
allLogs = append(allLogs, publicReceipt.Logs...)

// if the private receipt is nil this means the tx was public
// and we do not need to apply the additional logic.
if privateReceipt != nil {
privateReceipts = append(privateReceipts, privateReceipt)
allLogs = append(allLogs, privateReceipt.Logs...)
}
receipts = append(receipts, receipt)
allLogs = append(allLogs, logs...)
}
AccumulateRewards(publicState, header, block.Uncles())

return receipts, allLogs, totalUsedGas, err
return publicReceipts, privateReceipts, allLogs, totalUsedGas, err
}

// ApplyTransaction attempts to apply a transaction to the given state database
// and uses the input parameters for its environment.
//
// ApplyTransactions returns the generated receipts and vm logs during the
// execution of the state transition phase.
func ApplyTransaction(config *ChainConfig, bc *BlockChain, gp *GasPool, publicState, privateState *state.StateDB, header *types.Header, tx *types.Transaction, usedGas *big.Int, cfg vm.Config) (*types.Receipt, vm.Logs, *big.Int, error) {
func ApplyTransaction(config *ChainConfig, bc *BlockChain, gp *GasPool, publicState, privateState *state.StateDB, header *types.Header, tx *types.Transaction, usedGas *big.Int, cfg vm.Config) (*types.Receipt, *types.Receipt, *big.Int, error) {
if !tx.IsPrivate() {
privateState = publicState
}
Expand All @@ -100,19 +110,34 @@ func ApplyTransaction(config *ChainConfig, bc *BlockChain, gp *GasPool, publicSt

// Update the state with pending changes
usedGas.Add(usedGas, gas)
receipt := types.NewReceipt(publicState.IntermediateRoot().Bytes(), usedGas)
receipt.TxHash = tx.Hash()
receipt.GasUsed = new(big.Int).Set(gas)
publicReceipt := types.NewReceipt(publicState.IntermediateRoot().Bytes(), usedGas)
publicReceipt.TxHash = tx.Hash()
publicReceipt.GasUsed = new(big.Int).Set(gas)
if MessageCreatesContract(tx) {
from, _ := tx.From()
receipt.ContractAddress = crypto.CreateAddress(from, tx.Nonce())
publicReceipt.ContractAddress = crypto.CreateAddress(from, tx.Nonce())
}

logs := publicState.GetLogs(tx.Hash())
receipt.Logs = logs
receipt.Bloom = types.CreateBloom(types.Receipts{receipt})
publicReceipt.Logs = logs
publicReceipt.Bloom = types.CreateBloom(types.Receipts{publicReceipt})

var privateReceipt *types.Receipt
if tx.IsPrivate() {
privateReceipt = types.NewReceipt(privateState.IntermediateRoot().Bytes(), usedGas)
privateReceipt.TxHash = tx.Hash()
privateReceipt.GasUsed = new(big.Int).Set(gas)
if MessageCreatesContract(tx) {
from, _ := tx.From()
privateReceipt.ContractAddress = crypto.CreateAddress(from, tx.Nonce())
}

logs := privateState.GetLogs(tx.Hash())
privateReceipt.Logs = logs
privateReceipt.Bloom = types.CreateBloom(types.Receipts{privateReceipt})
}

return receipt, logs, gas, err
return publicReceipt, privateReceipt, gas, err
}

// AccumulateRewards credits the coinbase of the given block with the
Expand Down
2 changes: 1 addition & 1 deletion core/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type HeaderValidator interface {
// of gas used in the process and return an error if any of the internal rules
// failed.
type Processor interface {
Process(block *types.Block, publicState, privateState *state.StateDB, cfg vm.Config) (types.Receipts, vm.Logs, *big.Int, error)
Process(block *types.Block, publicState, privateState *state.StateDB, cfg vm.Config) (types.Receipts, types.Receipts, vm.Logs, *big.Int, error)
}

// Finiliser is an interface which finilises blocks.
Expand Down
45 changes: 0 additions & 45 deletions core/vm/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,40 +77,6 @@ func (evm *EVM) Run(contract *Contract, input []byte) (ret []byte, err error) {
if codehash == (common.Hash{}) {
codehash = crypto.Keccak256Hash(contract.Code)
}
var program *Program
if false {
// JIT disabled due to JIT not being Homestead gas reprice ready.

// If the JIT is enabled check the status of the JIT program,
// if it doesn't exist compile a new program in a separate
// goroutine or wait for compilation to finish if the JIT is
// forced.
switch GetProgramStatus(codehash) {
case progReady:
return RunProgram(GetProgram(codehash), evm.env, contract, input)
case progUnknown:
if evm.cfg.ForceJit {
// Create and compile program
program = NewProgram(contract.Code)
perr := CompileProgram(program)
if perr == nil {
return RunProgram(program, evm.env, contract, input)
}
glog.V(logger.Info).Infoln("error compiling program", err)
} else {
// create and compile the program. Compilation
// is done in a separate goroutine
program = NewProgram(contract.Code)
go func() {
err := CompileProgram(program)
if err != nil {
glog.V(logger.Info).Infoln("error compiling program", err)
return
}
}()
}
}
}

var (
caller = contract.caller
Expand Down Expand Up @@ -159,17 +125,6 @@ func (evm *EVM) Run(contract *Contract, input []byte) (ret []byte, err error) {
}

for ; ; instrCount++ {
/*
if EnableJit && it%100 == 0 {
if program != nil && progStatus(atomic.LoadInt32(&program.status)) == progReady {
// move execution
fmt.Println("moved", it)
glog.V(logger.Info).Infoln("Moved execution to JIT")
return runProgram(program, pc, mem, stack, evm.env, contract, input)
}
}
*/

// Get the memory location of pc
op = contract.GetOp(pc)
if evm.env.ReadOnly() && op.isMutating() {
Expand Down
2 changes: 1 addition & 1 deletion eth/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ func (api *PrivateDebugAPI) traceBlock(block *types.Block, logConfig *vm.LogConf
return false, structLogger.StructLogs(), err
}

receipts, _, usedGas, err := processor.Process(block, publicStateDb, privateStateDb, config)
receipts, _, _, usedGas, err := processor.Process(block, publicStateDb, privateStateDb, config)
if err != nil {
return false, structLogger.StructLogs(), err
}
Expand Down
8 changes: 4 additions & 4 deletions eth/filters/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (f *Filter) getLogs(start, end uint64) (logs []Log) {

// Use bloom filtering to see if this block is interesting given the
// current parameters
if f.bloomFilter(block) {
if f.bloomFilter(block.Bloom()) || f.bloomFilter(core.GetPrivateBlockBloom(f.db, block.NumberU64())) {
// Get the logs of the block
var (
receipts = core.GetBlockReceipts(f.db, block.Hash(), i)
Expand Down Expand Up @@ -207,11 +207,11 @@ Logs:
return ret
}

func (f *Filter) bloomFilter(block *types.Block) bool {
func (f *Filter) bloomFilter(bloom types.Bloom) bool {
if len(f.addresses) > 0 {
var included bool
for _, addr := range f.addresses {
if types.BloomLookup(block.Bloom(), addr) {
if types.BloomLookup(bloom, addr) {
included = true
break
}
Expand All @@ -225,7 +225,7 @@ func (f *Filter) bloomFilter(block *types.Block) bool {
for _, sub := range f.topics {
var included bool
for _, topic := range sub {
if (topic == common.Hash{}) || types.BloomLookup(block.Bloom(), topic) {
if (topic == common.Hash{}) || types.BloomLookup(bloom, topic) {
included = true
break
}
Expand Down

0 comments on commit 8c47c29

Please sign in to comment.