Skip to content

Commit

Permalink
problem: doesn't get head updates from all available heads
Browse files Browse the repository at this point in the history
solution: ensure all heads are started on multihead start
rel: #67
  • Loading branch information
splix committed Mar 27, 2021
1 parent a26cc96 commit 3ae5203
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 1 deletion.
7 changes: 7 additions & 0 deletions src/main/kotlin/io/emeraldpay/dshackle/upstream/MergedHead.kt
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,18 @@ class MergedHead(
}

override fun start() {
sources.forEach { head ->
if (head is Lifecycle && !head.isRunning) {
head.start()
}
}
subscription?.dispose()
subscription = super.follow(Flux.merge(sources.map { it.getFlux() }))
}

override fun stop() {
subscription?.dispose()
subscription = null
}

override fun setCaches(caches: Caches) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ open class EthereumMultistream(
upstream.setLag(0)
upstream.getHead()
} else {
val newHead = MergedHead(upstreams.map { it.getHead() }).apply {
val heads = upstreams.map { it.getHead() }
val newHead = MergedHead(heads).apply {
this.start()
}
val lagObserver = EthereumHeadLagObserver(newHead, upstreams as Collection<Upstream>).apply {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class EthereumWsHead(
}

override fun start() {
this.subscription?.dispose()
this.subscription = super.follow(ws.getFlux())
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/**
* Copyright (c) 2021 EmeraldPay, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.emeraldpay.dshackle.upstream

import org.springframework.context.Lifecycle
import reactor.core.publisher.Flux
import spock.lang.Specification

class MergedHeadSpec extends Specification {

def "ensures that heads are running on start"() {
setup:
def head1 = Stub(TestHead1) {
_ * getFlux() >> Flux.empty()
}
def head2 = Mock(TestHead2) {
_ * isRunning() >> true
_ * getFlux() >> Flux.empty()
}
def head3 = Mock(TestHead2) {
_ * isRunning() >> false
_ * getFlux() >> Flux.empty()
}

when:
def merged = new MergedHead([head1, head2, head3])
merged.start()

then:
1 * head3.start()
}

class TestHead1 extends AbstractHead {

}

class TestHead2 extends AbstractHead implements Lifecycle {

@Override
void start() {

}

@Override
void stop() {

}

@Override
boolean isRunning() {
return false
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,27 @@ class DispatchSpec extends Specification {
calls1.size() + calls2.size() == 100
}

def "multiple calls routed roughly equal to upstreams - for latest block"() {
when:
def calls1before = ProxyClient.forOriginal(18545).execute("eth_call", [[to: "0x0123456789abcdef0123456789abcdef00000002"]]).result as List
def calls2before = ProxyClient.forOriginal(18546).execute("eth_call", [[to: "0x0123456789abcdef0123456789abcdef00000002"]]).result as List

100.times {
client.execute("eth_call", [[to: "0x0123456789abcdef0123456789abcdef00000001", data: "0x00000000" + Integer.toString(it, 16)], "0x100001"])
}

def calls1after = ProxyClient.forOriginal(18545).execute("eth_call", [[to: "0x0123456789abcdef0123456789abcdef00000002"]]).result as List
def calls2after = ProxyClient.forOriginal(18546).execute("eth_call", [[to: "0x0123456789abcdef0123456789abcdef00000002"]]).result as List

def calls1 = onlyNew(calls1before, calls1after)
def calls2 = onlyNew(calls2before, calls2after)

then:
calls1.size() >= 48
calls2.size() >= 48
calls1.size() + calls2.size() == 100
}

private List onlyNew(List before, List after) {
return after.findAll { a ->
!before.any { b ->
Expand Down

0 comments on commit 3ae5203

Please sign in to comment.