Skip to content

Commit

Permalink
Revert "remove ServerDiscoverySelector from DruidLeaderClient (#9481)" (
Browse files Browse the repository at this point in the history
#9702)

* Revert "remove ServerDiscoverySelector from DruidLeaderClient (#9481)"

This reverts commit 072bbe2.

* fix build
  • Loading branch information
jihoonson authored Apr 15, 2020
1 parent cda9f41 commit b8f7128
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.client.selector.DiscoverySelector;
import org.apache.druid.client.selector.Server;
import org.apache.druid.concurrent.LifecycleLock;
import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.ISE;
Expand Down Expand Up @@ -70,6 +72,9 @@ public class DruidLeaderClient

private final String leaderRequestPath;

//Note: This is kept for back compatibility with pre 0.11.0 releases and should be removed in future.
private final DiscoverySelector<Server> serverDiscoverySelector;

private LifecycleLock lifecycleLock = new LifecycleLock();
private DruidNodeDiscovery druidNodeDiscovery;
private AtomicReference<String> currentKnownLeader = new AtomicReference<>();
Expand All @@ -78,13 +83,15 @@ public DruidLeaderClient(
HttpClient httpClient,
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider,
NodeRole nodeRoleToWatch,
String leaderRequestPath
String leaderRequestPath,
DiscoverySelector<Server> serverDiscoverySelector
)
{
this.httpClient = httpClient;
this.druidNodeDiscoveryProvider = druidNodeDiscoveryProvider;
this.nodeRoleToWatch = nodeRoleToWatch;
this.leaderRequestPath = leaderRequestPath;
this.serverDiscoverySelector = serverDiscoverySelector;
}

@LifecycleStart
Expand Down Expand Up @@ -296,6 +303,16 @@ private String getCurrentKnownLeader(final boolean cached) throws IOException
@Nullable
private String pickOneHost()
{
Server server = serverDiscoverySelector.pick();
if (server != null) {
return StringUtils.format(
"%s://%s:%s",
server.getScheme(),
server.getAddress(),
server.getPort()
);
}

Iterator<DiscoveryDruidNode> iter = druidNodeDiscovery.getAllNodes().iterator();
if (iter.hasNext()) {
DiscoveryDruidNode node = iter.next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import com.google.inject.Provides;
import org.apache.druid.client.coordinator.Coordinator;
import org.apache.druid.client.coordinator.CoordinatorSelectorConfig;
import org.apache.druid.curator.discovery.ServerDiscoveryFactory;
import org.apache.druid.curator.discovery.ServerDiscoverySelector;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.NodeRole;
Expand All @@ -40,19 +42,32 @@ public void configure(Binder binder)
JsonConfigProvider.bind(binder, "druid.selectors.coordinator", CoordinatorSelectorConfig.class);
}

@Provides
@Coordinator
@ManageLifecycle
public ServerDiscoverySelector getServiceProvider(
CoordinatorSelectorConfig config,
ServerDiscoveryFactory serverDiscoveryFactory
)
{
return serverDiscoveryFactory.createSelector(config.getServiceName());
}

@Provides
@Coordinator
@ManageLifecycle
public DruidLeaderClient getLeaderHttpClient(
@EscalatedGlobal HttpClient httpClient,
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider,
@Coordinator ServerDiscoverySelector serverDiscoverySelector
)
{
return new DruidLeaderClient(
httpClient,
druidNodeDiscoveryProvider,
NodeRole.COORDINATOR,
"/druid/coordinator/v1/leader"
"/druid/coordinator/v1/leader",
serverDiscoverySelector
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import com.google.inject.Provides;
import org.apache.druid.client.indexing.IndexingService;
import org.apache.druid.client.indexing.IndexingServiceSelectorConfig;
import org.apache.druid.curator.discovery.ServerDiscoveryFactory;
import org.apache.druid.curator.discovery.ServerDiscoverySelector;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.NodeRole;
Expand All @@ -40,19 +42,32 @@ public void configure(Binder binder)
JsonConfigProvider.bind(binder, "druid.selectors.indexing", IndexingServiceSelectorConfig.class);
}

@Provides
@IndexingService
@ManageLifecycle
public ServerDiscoverySelector getServiceProvider(
IndexingServiceSelectorConfig config,
ServerDiscoveryFactory serverDiscoveryFactory
)
{
return serverDiscoveryFactory.createSelector(config.getServiceName());
}

@Provides
@IndexingService
@ManageLifecycle
public DruidLeaderClient getLeaderHttpClient(
@EscalatedGlobal HttpClient httpClient,
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider,
@IndexingService ServerDiscoverySelector serverDiscoverySelector
)
{
return new DruidLeaderClient(
httpClient,
druidNodeDiscoveryProvider,
NodeRole.OVERLORD,
"/druid/indexer/v1/leader"
"/druid/indexer/v1/leader",
serverDiscoverySelector
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.google.inject.name.Named;
import com.google.inject.name.Names;
import com.google.inject.servlet.GuiceFilter;
import org.apache.druid.curator.discovery.ServerDiscoverySelector;
import org.apache.druid.guice.GuiceInjectors;
import org.apache.druid.guice.Jerseys;
import org.apache.druid.guice.JsonConfigProvider;
Expand Down Expand Up @@ -122,7 +123,8 @@ public void testSimple() throws Exception
httpClient,
druidNodeDiscoveryProvider,
NodeRole.PEON,
"/simple/leader"
"/simple/leader",
EasyMock.createNiceMock(ServerDiscoverySelector.class)
);
druidLeaderClient.start();

Expand All @@ -146,7 +148,8 @@ public void testNoLeaderFound() throws Exception
httpClient,
druidNodeDiscoveryProvider,
NodeRole.PEON,
"/simple/leader"
"/simple/leader",
EasyMock.createNiceMock(ServerDiscoverySelector.class)
);
druidLeaderClient.start();

Expand All @@ -172,7 +175,8 @@ public void testRedirection() throws Exception
httpClient,
druidNodeDiscoveryProvider,
NodeRole.PEON,
"/simple/leader"
"/simple/leader",
EasyMock.createNiceMock(ServerDiscoverySelector.class)
);
druidLeaderClient.start();

Expand All @@ -184,6 +188,9 @@ public void testRedirection() throws Exception
@Test
public void testServerFailureAndRedirect() throws Exception
{
ServerDiscoverySelector serverDiscoverySelector = EasyMock.createMock(ServerDiscoverySelector.class);
EasyMock.expect(serverDiscoverySelector.pick()).andReturn(null).anyTimes();

DruidNodeDiscovery druidNodeDiscovery = EasyMock.createMock(DruidNodeDiscovery.class);
DiscoveryDruidNode dummyNode = new DiscoveryDruidNode(
new DruidNode("test", "dummyhost", false, 64231, null, true, false),
Expand All @@ -196,13 +203,14 @@ public void testServerFailureAndRedirect() throws Exception
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeRole(NodeRole.PEON)).andReturn(druidNodeDiscovery).anyTimes();

EasyMock.replay(druidNodeDiscovery, druidNodeDiscoveryProvider);
EasyMock.replay(serverDiscoverySelector, druidNodeDiscovery, druidNodeDiscoveryProvider);

DruidLeaderClient druidLeaderClient = new DruidLeaderClient(
httpClient,
druidNodeDiscoveryProvider,
NodeRole.PEON,
"/simple/leader"
"/simple/leader",
serverDiscoverySelector
);
druidLeaderClient.start();

Expand All @@ -228,7 +236,8 @@ public void testFindCurrentLeader()
httpClient,
druidNodeDiscoveryProvider,
NodeRole.PEON,
"/simple/leader"
"/simple/leader",
EasyMock.createNiceMock(ServerDiscoverySelector.class)
);
druidLeaderClient.start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ private class TestDruidLeaderClient extends DruidLeaderClient

private TestDruidLeaderClient(ObjectMapper jsonMapper)
{
super(null, new TestNodeDiscoveryProvider(), null, null);
super(null, new TestNodeDiscoveryProvider(), null, null, null);
this.jsonMapper = jsonMapper;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -773,7 +773,10 @@ NodeRole.COORDINATOR, new FakeDruidNodeDiscovery(ImmutableMap.of(NodeRole.COORDI
new FakeHttpClient(),
provider,
NodeRole.COORDINATOR,
"/simple/leader"
"/simple/leader",
() -> {
throw new UnsupportedOperationException();
}
);

return new SystemSchema(
Expand Down

0 comments on commit b8f7128

Please sign in to comment.