Skip to content

Commit

Permalink
using single crdt service in Standalone mode which slightly reduced t…
Browse files Browse the repository at this point in the history
…he running overhead
  • Loading branch information
popduke committed Sep 23, 2024
1 parent 72552ed commit c7c76d8
Showing 1 changed file with 18 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,7 @@ public class StandaloneStarter extends BaseEngineStarter<StandaloneConfig> {
private EventCollectorManager eventCollectorMgr;
private SettingProviderManager settingProviderMgr;
private IAgentHost agentHost;
private ICRDTService clientCrdtService;
private ICRDTService serverCrdtService;
private ICRDTService crdtService;
private IRPCServer sharedIORpcServer;
private IRPCServer sharedBaseKVRpcServer;
private ISessionDictClient sessionDictClient;
Expand Down Expand Up @@ -202,10 +201,8 @@ protected void init(StandaloneConfig config) {
agentHost = IAgentHost.newInstance(agentHostOptions);
agentHost.start();
log.debug("Agent host started");
clientCrdtService = ICRDTService.newInstance(CRDTServiceOptions.builder().build());
clientCrdtService.start(agentHost);
serverCrdtService = ICRDTService.newInstance(CRDTServiceOptions.builder().build());
serverCrdtService.start(agentHost);
crdtService = ICRDTService.newInstance(CRDTServiceOptions.builder().build());
crdtService.start(agentHost);
log.debug("CRDT service started");

EventLoopGroup rpcServerBossELG =
Expand All @@ -225,14 +222,14 @@ protected void init(StandaloneConfig config) {
.port(config.getRpcServerConfig().getPort())
.bossEventLoopGroup(rpcServerBossELG)
.workerEventLoopGroup(ioRPCWorkerELG)
.crdtService(serverCrdtService)
.crdtService(crdtService)
.executor(rpcServerExecutor);
RPCServerBuilder sharedBaseKVRPCServerBuilder = IRPCServer.newBuilder()
.host(config.getBaseKVServerConfig().getHost())
.port(config.getBaseKVServerConfig().getPort())
.bossEventLoopGroup(rpcServerBossELG)
.workerEventLoopGroup(kvRPCWorkerELG)
.crdtService(serverCrdtService)
.crdtService(crdtService)
.executor(baseKVServerExecutor);
if (config.getRpcServerConfig().isEnableSSL()) {
sharedIORPCServerBuilder.sslContext(buildRPCServerSslContext(config.getRpcServerConfig().getSslConfig()));
Expand All @@ -246,18 +243,18 @@ protected void init(StandaloneConfig config) {
SslContext baseKVClientSslContext = config.getBaseKVClientConfig().isEnableSSL()
? buildRPCClientSslContext(config.getBaseKVClientConfig().getSslConfig()) : null;
distClient = IDistClient.newBuilder()
.crdtService(clientCrdtService)
.crdtService(crdtService)
.executor(rpcClientExecutor)
.sslContext(rpcClientSslContext)
.build();
retainClient = IRetainClient.newBuilder()
.crdtService(clientCrdtService)
.crdtService(crdtService)
.executor(rpcClientExecutor)
.sslContext(rpcClientSslContext)
.build();
retainStoreClient = IBaseKVStoreClient.newBuilder()
.clusterId(IRetainStore.CLUSTER_NAME)
.crdtService(clientCrdtService)
.crdtService(crdtService)
.executor(baseKVClientExecutor)
.sslContext(baseKVClientSslContext)
.queryPipelinesPerStore(config
Expand All @@ -269,7 +266,7 @@ protected void init(StandaloneConfig config) {
.rpcServerBuilder(sharedBaseKVRPCServerBuilder)
.bootstrap(config.isBootstrap())
.agentHost(agentHost)
.crdtService(serverCrdtService)
.crdtService(crdtService)
.storeClient(retainStoreClient)
.queryExecutor(MoreExecutors.directExecutor())
.tickerThreads(config.getStateStoreConfig().getTickerThreads())
Expand All @@ -296,13 +293,13 @@ protected void init(StandaloneConfig config) {
.getWalEngineConfig(), "retain_wal")))
.build();
inboxClient = IInboxClient.newBuilder()
.crdtService(clientCrdtService)
.crdtService(crdtService)
.executor(rpcClientExecutor)
.sslContext(rpcClientSslContext)
.build();
inboxStoreClient = IBaseKVStoreClient.newBuilder()
.clusterId(IInboxStore.CLUSTER_NAME)
.crdtService(clientCrdtService)
.crdtService(crdtService)
.executor(baseKVClientExecutor)
.sslContext(baseKVClientSslContext)
.queryPipelinesPerStore(config.getStateStoreConfig().getInboxStoreConfig()
Expand All @@ -312,7 +309,7 @@ protected void init(StandaloneConfig config) {
.rpcServerBuilder(sharedBaseKVRPCServerBuilder)
.bootstrap(config.isBootstrap())
.agentHost(agentHost)
.crdtService(serverCrdtService)
.crdtService(crdtService)
.inboxClient(inboxClient)
.storeClient(inboxStoreClient)
.settingProvider(settingProviderMgr)
Expand Down Expand Up @@ -354,7 +351,7 @@ protected void init(StandaloneConfig config) {
.build();
distWorkerClient = IBaseKVStoreClient.newBuilder()
.clusterId(IDistWorker.CLUSTER_NAME)
.crdtService(clientCrdtService)
.crdtService(crdtService)
.executor(baseKVClientExecutor)
.sslContext(baseKVClientSslContext)
.queryPipelinesPerStore(config
Expand All @@ -363,15 +360,15 @@ protected void init(StandaloneConfig config) {
.getQueryPipelinePerStore())
.build();
mqttBrokerClient = IMqttBrokerClient.newBuilder()
.crdtService(clientCrdtService)
.crdtService(crdtService)
.executor(rpcClientExecutor)
.sslContext(rpcClientSslContext)
.build();

subBrokerManager = new SubBrokerManager(pluginMgr, mqttBrokerClient, inboxClient);

sessionDictClient = ISessionDictClient.newBuilder()
.crdtService(clientCrdtService)
.crdtService(crdtService)
.executor(rpcClientExecutor)
.sslContext(rpcClientSslContext)
.build();
Expand All @@ -390,7 +387,7 @@ protected void init(StandaloneConfig config) {
.rpcServerBuilder(sharedBaseKVRPCServerBuilder)
.bootstrap(config.isBootstrap())
.agentHost(agentHost)
.crdtService(serverCrdtService)
.crdtService(crdtService)
.eventCollector(eventCollectorMgr)
.resourceThrottler(resourceThrottlerMgr)
.distClient(distClient)
Expand Down Expand Up @@ -421,7 +418,7 @@ protected void init(StandaloneConfig config) {

distServer = IDistServer.nonStandaloneBuilder()
.rpcServerBuilder(sharedIORPCServerBuilder)
.crdtService(serverCrdtService)
.crdtService(crdtService)
.distWorkerClient(distWorkerClient)
.settingProvider(settingProviderMgr)
.eventCollector(eventCollectorMgr)
Expand Down Expand Up @@ -572,8 +569,7 @@ public void stop() {
sessionDictClient.stop();
sessionDictServer.shutdown();

clientCrdtService.stop();
serverCrdtService.stop();
crdtService.stop();

agentHost.shutdown();

Expand Down

0 comments on commit c7c76d8

Please sign in to comment.