Skip to content

Commit

Permalink
cherry pick the latest WS improvements into v0.4.0 (#2800)
Browse files Browse the repository at this point in the history
  • Loading branch information
tclemos committed Nov 22, 2023
1 parent fc399ef commit 64228d9
Show file tree
Hide file tree
Showing 8 changed files with 497 additions and 171 deletions.
315 changes: 218 additions & 97 deletions jsonrpc/endpoints_eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ const (
// to communicate with the state for eth_EstimateGas and eth_Call when
// the From field is not specified because it is optional
DefaultSenderAddress = "0x1111111111111111111111111111111111111111"

// maxTopics is the max number of topics a log can have
maxTopics = 4
)

// EthEndpoints contains implementations for the "eth" RPC endpoints
Expand Down Expand Up @@ -1057,6 +1060,7 @@ func (e *EthEndpoints) uninstallFilterByWSConn(wsConn *concurrentWsConn) error {
// onNewL2Block is triggered when the state triggers the event for a new l2 block
func (e *EthEndpoints) onNewL2Block(event state.NewL2BlockEvent) {
log.Debugf("[onNewL2Block] new l2 block event detected for block %v", event.Block.NumberU64())
start := time.Now()
wg := sync.WaitGroup{}

wg.Add(1)
Expand All @@ -1066,127 +1070,244 @@ func (e *EthEndpoints) onNewL2Block(event state.NewL2BlockEvent) {
go e.notifyNewLogs(&wg, event)

wg.Wait()
log.Debugf("[onNewL2Block] new l2 block %v took %v to send the messages to all ws connections", event.Block.NumberU64(), time.Since(start))
}

func (e *EthEndpoints) notifyNewHeads(wg *sync.WaitGroup, event state.NewL2BlockEvent) {
defer wg.Done()
start := time.Now()
blockFilters, err := e.storage.GetAllBlockFiltersWithWSConn()

b, err := types.NewBlock(&event.Block, nil, false, false)
if err != nil {
log.Errorf("failed to get all block filters with web sockets connections: %v", err)
} else {
b, err := types.NewBlock(&event.Block, nil, false, false)
if err != nil {
log.Errorf("failed to build block response to subscription: %v", err)
return
}
data, err := json.Marshal(b)
if err != nil {
log.Errorf("failed to marshal block response to subscription: %v", err)
return
}
for _, filter := range blockFilters {
filter.EnqueueSubscriptionDataToBeSent(data)
}
log.Errorf("failed to build block response to subscription: %v", err)
return
}
data, err := json.Marshal(b)
if err != nil {
log.Errorf("failed to marshal block response to subscription: %v", err)
return
}

filters := e.storage.GetAllBlockFiltersWithWSConn()
log.Debugf("[notifyNewHeads] took %v to get block filters with ws connections", time.Since(start))

const maxWorkers = 32
parallelize(maxWorkers, filters, func(worker int, filters []*Filter) {
for _, filter := range filters {
f := filter
start := time.Now()
f.EnqueueSubscriptionDataToBeSent(data)
log.Debugf("[notifyNewHeads] took %v to enqueue new l2 block messages", time.Since(start))
}
})

log.Debugf("[notifyNewHeads] new l2 block event for block %v took %v to send all the messages for block filters", event.Block.NumberU64(), time.Since(start))
}

func (e *EthEndpoints) notifyNewLogs(wg *sync.WaitGroup, event state.NewL2BlockEvent) {
defer wg.Done()
start := time.Now()
logFilters, err := e.storage.GetAllLogFiltersWithWSConn()
if err != nil {
log.Errorf("failed to get all log filters with web sockets connections: %v", err)
} else {
for _, filter := range logFilters {
filterParameters := filter.Parameters.(LogFilter)
bn := types.BlockNumber(event.Block.NumberU64())

if filterParameters.BlockHash != nil {
// if the filter block hash is set, we check if the block is the
// one with the expected hash, otherwise we ignore the filter
bh := *filterParameters.BlockHash
if bh.String() != event.Block.Hash().String() {
continue
}
} else if filterParameters.FromBlock == nil && filterParameters.ToBlock == nil {
// in case the block hash is nil and also from and to blocks are nil, set it
// to the current block to make the query faster
filterParameters.FromBlock = &bn
filterParameters.ToBlock = &bn
} else {
// if the filter has a fromBlock value set
// and the event block number is smaller than the
// from block, skip this filter
if filterParameters.FromBlock != nil {
fromBlock, rpcErr := filterParameters.FromBlock.GetNumericBlockNumber(context.Background(), e.state, e.etherman, nil)
if rpcErr != nil {
log.Errorf(rpcErr.Error(), filter.ID, err)
continue
}
// if the block number is smaller than the fromBlock value
// this means this block is out of the block range for this
// filter, so we skip it
if event.Block.NumberU64() < fromBlock {
continue
}
// otherwise set the from block to a fixed number
// to avoid querying it again in the next step
fixedFromBlock := types.BlockNumber(event.Block.NumberU64())
filterParameters.FromBlock = &fixedFromBlock
}
filters := e.storage.GetAllLogFiltersWithWSConn()
log.Debugf("[notifyNewLogs] took %v to get log filters with ws connections", time.Since(start))

// if the filter has a toBlock value set
// and the event block number is greater than the
// to block, skip this filter
if filterParameters.ToBlock != nil {
toBlock, rpcErr := filterParameters.ToBlock.GetNumericBlockNumber(context.Background(), e.state, e.etherman, nil)
if rpcErr != nil {
log.Errorf(rpcErr.Error(), filter.ID, err)
continue
}
// if the block number is greater than the toBlock value
// this means this block is out of the block range for this
// filter, so we skip it
if event.Block.NumberU64() > toBlock {
continue
}
// otherwise set the to block to a fixed number
// to avoid querying it again in the next step
fixedToBlock := types.BlockNumber(event.Block.NumberU64())
filterParameters.ToBlock = &fixedToBlock
}
const maxWorkers = 32
parallelize(maxWorkers, filters, func(worker int, filters []*Filter) {
for _, filter := range filters {
f := filter
start := time.Now()
if e.shouldSkipLogFilter(event, filter) {
return
}
log.Debugf("[notifyNewLogs] took %v to check if should skip log filter", time.Since(start))

start = time.Now()
// get new logs for this specific filter
changes, err := e.internalGetLogs(context.Background(), nil, filterParameters)
if errors.Is(err, state.ErrMaxLogsCountLimitExceeded) {
log.Infof("failed to get filters changes for filter %v, the filter seems to be returning more results than allowed and was removed: %v", filter.ID, err)
err := e.storage.UninstallFilter(filter.ID)
if !errors.Is(err, ErrNotFound) && err != nil {
log.Errorf("failed to automatically uninstall filter %v: %v", filter.ID, err)
} else {
log.Infof("Filter %v automatically uninstalled", filter.ID)
logs := filterLogs(event.Logs, filter)
log.Debugf("[notifyNewLogs] took %v to filter logs", time.Since(start))

start = time.Now()
for _, l := range logs {
data, err := json.Marshal(l)
if err != nil {
log.Errorf("failed to marshal ethLog response to subscription: %v", err)
}
continue
} else if err != nil {
log.Errorf("failed to get filters changes for filter %v with web sockets connections: %v", filter.ID, err)
f.EnqueueSubscriptionDataToBeSent(data)
}
log.Debugf("[notifyNewLogs] took %v to enqueue log messages", time.Since(start))
}
})

log.Debugf("[notifyNewLogs] new l2 block event for block %v took %v to send all the messages for log filters", event.Block.NumberU64(), time.Since(start))
}

// shouldSkipLogFilter checks if the log filter can be skipped while notifying new logs.
// it checks the log filter information against the block in the event to decide if the
// information in the event is required by the filter or can be ignored to save resources.
func (e *EthEndpoints) shouldSkipLogFilter(event state.NewL2BlockEvent, filter *Filter) bool {
logFilter := filter.Parameters.(LogFilter)

if logFilter.BlockHash != nil {
// if the filter block hash is set, we check if the block is the
// one with the expected hash, otherwise we ignore the filter
bh := *logFilter.BlockHash
if bh.String() != event.Block.Hash().String() {
return true
}
} else {
// if the filter has a fromBlock value set
// and the event block number is smaller than the
// from block, skip this filter
if logFilter.FromBlock != nil {
fromBlock, rpcErr := logFilter.FromBlock.GetNumericBlockNumber(context.Background(), e.state, e.etherman, nil)
if rpcErr != nil {
log.Errorf("failed to get numeric block number for FromBlock field for filter %v: %v", filter.ID, rpcErr)
return true
}
// if the block number is smaller than the fromBlock value
// this means this block is out of the block range for this
// filter, so we skip it
if event.Block.NumberU64() < fromBlock {
return true
}
}

// if the filter has a toBlock value set
// and the event block number is greater than the
// to block, skip this filter
if logFilter.ToBlock != nil {
toBlock, rpcErr := logFilter.ToBlock.GetNumericBlockNumber(context.Background(), e.state, e.etherman, nil)
if rpcErr != nil {
log.Errorf("failed to get numeric block number for ToBlock field for filter %v: %v", filter.ID, rpcErr)
return true
}
// if the block number is greater than the toBlock value
// this means this block is out of the block range for this
// filter, so we skip it
if event.Block.NumberU64() > toBlock {
return true
}
}
}
return false
}

// filterLogs will filter the provided logsToFilter accordingly to the filters provided
func filterLogs(logsToFilter []*ethTypes.Log, filter *Filter) []types.Log {
logFilter := filter.Parameters.(LogFilter)

logs := make([]types.Log, 0)
for _, l := range logsToFilter {
// check address filter
if len(logFilter.Addresses) > 0 {
// if the log address doesn't match any address in the filter, skip this log
if !contains(logFilter.Addresses, l.Address) {
continue
}
}

// if there are new logs for the filter, send it
if changes != nil {
ethLogs := changes.([]types.Log)
for _, ethLog := range ethLogs {
data, err := json.Marshal(ethLog)
if err != nil {
log.Errorf("failed to marshal ethLog response to subscription: %v", err)
}
filter.EnqueueSubscriptionDataToBeSent(data)
// check topics
match := true
if len(logFilter.Topics) > 0 {
out:
// check all topics
for i := 0; i < maxTopics; i++ {
// check if the filter contains information
// to filter this topic position
checkTopic := len(logFilter.Topics) > i
if !checkTopic {
// if we shouldn't check this topic, we can assume
// no more topics needs to be checked, because there
// will be no more topic filters, so we can break out
break out
}

// check if the topic filter allows any topic
acceptAnyTopic := len(logFilter.Topics[i]) == 0
if acceptAnyTopic {
// since any topic is allowed, we continue to the next topic filters
continue
}

// check if the log has the required topic set
logHasTopic := len(l.Topics) > i
if !logHasTopic {
// if the log doesn't have the required topic set, skip this log
match = false
break out
}

// check if the any topic in the filter matches the log topic
if !contains(logFilter.Topics[i], l.Topics[i]) {
match = false
// if the log topic doesn't match any topic in the filter, skip this log
break out
}
}
}
if match {
logs = append(logs, types.NewLog(*l))
}
}
log.Debugf("[notifyNewLogs] new l2 block event for block %v took %v to send all the messages for log filters", event.Block.NumberU64(), time.Since(start))
return logs
}

// contains check if the item can be found in the items
func contains[T comparable](items []T, itemsToFind T) bool {
for _, item := range items {
if item == itemsToFind {
return true
}
}
return false
}

// parallelize split the items into workers accordingly
// to the max number of workers and the number of items,
// allowing the fn to be executed in concurrently for different
// chunks of items.
func parallelize[T any](maxWorkers int, items []T, fn func(worker int, items []T)) {
if len(items) == 0 {
return
}

var workersCount = maxWorkers
if workersCount > len(items) {
workersCount = len(items)
}

var jobSize = len(items) / workersCount
var rest = len(items) % workersCount
if rest > 0 {
jobSize++
}

wg := sync.WaitGroup{}
for worker := 0; worker < workersCount; worker++ {
rangeStart := worker * jobSize
rangeEnd := ((worker + 1) * jobSize)

if rangeStart > len(items) {
continue
}

if rangeEnd > len(items) {
rangeEnd = len(items)
}

jobItems := items[rangeStart:rangeEnd]

wg.Add(1)
go func(worker int, filteredItems []T, fn func(worker int, items []T)) {
defer func() {
wg.Done()
err := recover()
if err != nil {
fmt.Println(err)
}
}()
fn(worker, filteredItems)
}(worker, jobItems, fn)
}
wg.Wait()
}
Loading

0 comments on commit 64228d9

Please sign in to comment.