Skip to content

Commit

Permalink
Merged ServiceMethodRegistry into ServiceRegistry (#841)
Browse files Browse the repository at this point in the history
  • Loading branch information
artem-v authored Sep 3, 2023
1 parent 24dff0f commit 113886d
Show file tree
Hide file tree
Showing 11 changed files with 145 additions and 184 deletions.
27 changes: 6 additions & 21 deletions services-api/src/main/java/io/scalecube/services/ServiceCall.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import io.scalecube.services.exceptions.ServiceUnavailableException;
import io.scalecube.services.methods.MethodInfo;
import io.scalecube.services.methods.ServiceMethodInvoker;
import io.scalecube.services.methods.ServiceMethodRegistry;
import io.scalecube.services.registry.api.ServiceRegistry;
import io.scalecube.services.routing.Router;
import io.scalecube.services.routing.Routers;
Expand All @@ -34,7 +33,6 @@ public class ServiceCall {
private static final Logger LOGGER = LoggerFactory.getLogger(ServiceCall.class);

private ClientTransport transport;
private ServiceMethodRegistry methodRegistry;
private ServiceRegistry serviceRegistry;
private Router router;
private ServiceClientErrorMapper errorMapper = DefaultErrorMapper.INSTANCE;
Expand All @@ -45,7 +43,6 @@ public ServiceCall() {}

private ServiceCall(ServiceCall other) {
this.transport = other.transport;
this.methodRegistry = other.methodRegistry;
this.serviceRegistry = other.serviceRegistry;
this.router = other.router;
this.errorMapper = other.errorMapper;
Expand Down Expand Up @@ -77,18 +74,6 @@ public ServiceCall serviceRegistry(ServiceRegistry serviceRegistry) {
return target;
}

/**
* Setter for {@code methodRegistry}.
*
* @param methodRegistry method registry.
* @return new {@link ServiceCall} instance.
*/
public ServiceCall methodRegistry(ServiceMethodRegistry methodRegistry) {
ServiceCall target = new ServiceCall(this);
target.methodRegistry = methodRegistry;
return target;
}

/**
* Setter for {@code routerType}.
*
Expand Down Expand Up @@ -180,8 +165,8 @@ public Mono<ServiceMessage> requestOne(ServiceMessage request, Type responseType
return Mono.defer(
() -> {
ServiceMethodInvoker methodInvoker;
if (methodRegistry != null
&& (methodInvoker = methodRegistry.getInvoker(request.qualifier())) != null) {
if (serviceRegistry != null
&& (methodInvoker = serviceRegistry.getInvoker(request.qualifier())) != null) {
// local service
return methodInvoker.invokeOne(request).map(this::throwIfError);
} else {
Expand Down Expand Up @@ -219,8 +204,8 @@ public Flux<ServiceMessage> requestMany(ServiceMessage request, Type responseTyp
return Flux.defer(
() -> {
ServiceMethodInvoker methodInvoker;
if (methodRegistry != null
&& (methodInvoker = methodRegistry.getInvoker(request.qualifier())) != null) {
if (serviceRegistry != null
&& (methodInvoker = serviceRegistry.getInvoker(request.qualifier())) != null) {
// local service
return methodInvoker.invokeMany(request).map(this::throwIfError);
} else {
Expand Down Expand Up @@ -262,8 +247,8 @@ public Flux<ServiceMessage> requestBidirectional(
if (first.hasValue()) {
ServiceMessage request = first.get();
ServiceMethodInvoker methodInvoker;
if (methodRegistry != null
&& (methodInvoker = methodRegistry.getInvoker(request.qualifier())) != null) {
if (serviceRegistry != null
&& (methodInvoker = serviceRegistry.getInvoker(request.qualifier())) != null) {
// local service
return methodInvoker.invokeBidirectional(messages).map(this::throwIfError);
} else {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package io.scalecube.services.registry.api;

import io.scalecube.services.ServiceEndpoint;
import io.scalecube.services.ServiceInfo;
import io.scalecube.services.ServiceReference;
import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.methods.ServiceMethodInvoker;
import java.util.List;

/**
Expand All @@ -20,4 +22,10 @@ public interface ServiceRegistry {
boolean registerService(ServiceEndpoint serviceEndpoint);

ServiceEndpoint unregisterService(String endpointId);

void registerService(ServiceInfo serviceInfo);

List<ServiceInfo> listServices();

ServiceMethodInvoker getInvoker(String qualifier);
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.scalecube.services.transport.api;

import io.scalecube.services.methods.ServiceMethodRegistry;
import io.scalecube.services.registry.api.ServiceRegistry;

public interface ServiceTransport {

Expand All @@ -14,10 +14,10 @@ public interface ServiceTransport {
/**
* Provider for {@link ServerTransport}.
*
* @param methodRegistry methodRegistry
* @param serviceRegistry serviceRegistry
* @return {@code ServerTransport} instance
*/
ServerTransport serverTransport(ServiceMethodRegistry methodRegistry);
ServerTransport serverTransport(ServiceRegistry serviceRegistry);

/**
* Starts {@link ServiceTransport} instance.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import io.rsocket.transport.netty.server.CloseableChannel;
import io.scalecube.net.Address;
import io.scalecube.services.auth.Authenticator;
import io.scalecube.services.methods.ServiceMethodRegistry;
import io.scalecube.services.registry.api.ServiceRegistry;
import io.scalecube.services.transport.api.DataCodec;
import io.scalecube.services.transport.api.HeadersCodec;
import io.scalecube.services.transport.api.ServerTransport;
Expand All @@ -18,7 +18,7 @@ public class RSocketServerTransport implements ServerTransport {
private static final Logger LOGGER = LoggerFactory.getLogger(RSocketServerTransport.class);

private final Authenticator<Object> authenticator;
private final ServiceMethodRegistry methodRegistry;
private final ServiceRegistry serviceRegistry;
private final ConnectionSetupCodec connectionSetupCodec;
private final HeadersCodec headersCodec;
private final Collection<DataCodec> dataCodecs;
Expand All @@ -30,21 +30,21 @@ public class RSocketServerTransport implements ServerTransport {
* Constructor for this server transport.
*
* @param authenticator authenticator
* @param methodRegistry methodRegistry
* @param serviceRegistry serviceRegistry
* @param connectionSetupCodec connectionSetupCodec
* @param headersCodec headersCodec
* @param dataCodecs dataCodecs
* @param serverTransportFactory serverTransportFactory
*/
public RSocketServerTransport(
Authenticator<Object> authenticator,
ServiceMethodRegistry methodRegistry,
ServiceRegistry serviceRegistry,
ConnectionSetupCodec connectionSetupCodec,
HeadersCodec headersCodec,
Collection<DataCodec> dataCodecs,
RSocketServerTransportFactory serverTransportFactory) {
this.authenticator = authenticator;
this.methodRegistry = methodRegistry;
this.serviceRegistry = serviceRegistry;
this.connectionSetupCodec = connectionSetupCodec;
this.headersCodec = headersCodec;
this.dataCodecs = dataCodecs;
Expand All @@ -64,7 +64,7 @@ public ServerTransport bind() {
RSocketServer.create()
.acceptor(
new RSocketServiceAcceptor(
connectionSetupCodec, headersCodec, dataCodecs, authenticator, methodRegistry))
connectionSetupCodec, headersCodec, dataCodecs, authenticator, serviceRegistry))
.payloadDecoder(PayloadDecoder.DEFAULT)
.bind(serverTransportFactory.serverTransport())
.doOnSuccess(channel -> serverChannel = channel)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import io.scalecube.services.exceptions.ServiceUnavailableException;
import io.scalecube.services.exceptions.UnauthorizedException;
import io.scalecube.services.methods.ServiceMethodInvoker;
import io.scalecube.services.methods.ServiceMethodRegistry;
import io.scalecube.services.registry.api.ServiceRegistry;
import io.scalecube.services.transport.api.DataCodec;
import io.scalecube.services.transport.api.HeadersCodec;
import java.util.Collection;
Expand All @@ -36,7 +36,7 @@ public class RSocketServiceAcceptor implements SocketAcceptor {
private final HeadersCodec headersCodec;
private final Collection<DataCodec> dataCodecs;
private final Authenticator<Object> authenticator;
private final ServiceMethodRegistry methodRegistry;
private final ServiceRegistry serviceRegistry;

/**
* Constructor.
Expand All @@ -45,19 +45,19 @@ public class RSocketServiceAcceptor implements SocketAcceptor {
* @param headersCodec headersCodec
* @param dataCodecs dataCodecs
* @param authenticator authenticator
* @param methodRegistry methodRegistry
* @param serviceRegistry serviceRegistry
*/
public RSocketServiceAcceptor(
ConnectionSetupCodec connectionSetupCodec,
HeadersCodec headersCodec,
Collection<DataCodec> dataCodecs,
Authenticator<Object> authenticator,
ServiceMethodRegistry methodRegistry) {
ServiceRegistry serviceRegistry) {
this.connectionSetupCodec = connectionSetupCodec;
this.headersCodec = headersCodec;
this.dataCodecs = dataCodecs;
this.authenticator = authenticator;
this.methodRegistry = methodRegistry;
this.serviceRegistry = serviceRegistry;
}

@Override
Expand Down Expand Up @@ -99,7 +99,7 @@ private Mono<Object> authenticate(RSocket rsocket, ConnectionSetup connectionSet

private RSocket newRSocket(Object authData) {
return new RSocketImpl(
authData, new ServiceMessageCodec(headersCodec, dataCodecs), methodRegistry);
authData, new ServiceMessageCodec(headersCodec, dataCodecs), serviceRegistry);
}

private UnauthorizedException toUnauthorizedException(Throwable th) {
Expand All @@ -115,13 +115,13 @@ private static class RSocketImpl implements RSocket {

private final Object authData;
private final ServiceMessageCodec messageCodec;
private final ServiceMethodRegistry methodRegistry;
private final ServiceRegistry serviceRegistry;

private RSocketImpl(
Object authData, ServiceMessageCodec messageCodec, ServiceMethodRegistry methodRegistry) {
Object authData, ServiceMessageCodec messageCodec, ServiceRegistry serviceRegistry) {
this.authData = authData;
this.messageCodec = messageCodec;
this.methodRegistry = methodRegistry;
this.serviceRegistry = serviceRegistry;
}

@Override
Expand All @@ -130,7 +130,8 @@ public Mono<Payload> requestResponse(Payload payload) {
.doOnNext(this::validateRequest)
.flatMap(
message -> {
ServiceMethodInvoker methodInvoker = methodRegistry.getInvoker(message.qualifier());
ServiceMethodInvoker methodInvoker =
serviceRegistry.getInvoker(message.qualifier());
validateMethodInvoker(methodInvoker, message);
return methodInvoker
.invokeOne(message)
Expand All @@ -147,7 +148,8 @@ public Flux<Payload> requestStream(Payload payload) {
.doOnNext(this::validateRequest)
.flatMapMany(
message -> {
ServiceMethodInvoker methodInvoker = methodRegistry.getInvoker(message.qualifier());
ServiceMethodInvoker methodInvoker =
serviceRegistry.getInvoker(message.qualifier());
validateMethodInvoker(methodInvoker, message);
return methodInvoker
.invokeMany(message)
Expand All @@ -168,7 +170,7 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
ServiceMessage message = first.get();
validateRequest(message);
ServiceMethodInvoker methodInvoker =
methodRegistry.getInvoker(message.qualifier());
serviceRegistry.getInvoker(message.qualifier());
validateMethodInvoker(methodInvoker, message);
return methodInvoker
.invokeBidirectional(messages)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import io.scalecube.services.auth.Authenticator;
import io.scalecube.services.auth.CredentialsSupplier;
import io.scalecube.services.exceptions.ConnectionClosedException;
import io.scalecube.services.methods.ServiceMethodRegistry;
import io.scalecube.services.registry.api.ServiceRegistry;
import io.scalecube.services.transport.api.ClientTransport;
import io.scalecube.services.transport.api.DataCodec;
import io.scalecube.services.transport.api.HeadersCodec;
Expand Down Expand Up @@ -195,10 +195,10 @@ public ClientTransport clientTransport() {
}

@Override
public ServerTransport serverTransport(ServiceMethodRegistry methodRegistry) {
public ServerTransport serverTransport(ServiceRegistry serviceRegistry) {
return new RSocketServerTransport(
authenticator,
methodRegistry,
serviceRegistry,
connectionSetupCodec,
headersCodec,
dataCodecs,
Expand Down
21 changes: 3 additions & 18 deletions services/src/main/java/io/scalecube/services/Microservices.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
import io.scalecube.services.exceptions.ServiceProviderErrorMapper;
import io.scalecube.services.gateway.Gateway;
import io.scalecube.services.gateway.GatewayOptions;
import io.scalecube.services.methods.ServiceMethodRegistry;
import io.scalecube.services.methods.ServiceMethodRegistryImpl;
import io.scalecube.services.registry.ServiceRegistryImpl;
import io.scalecube.services.registry.api.ServiceRegistry;
import io.scalecube.services.routing.RoundRobinServiceRouter;
Expand Down Expand Up @@ -124,7 +122,6 @@ public final class Microservices implements AutoCloseable {
private final Map<String, String> tags;
private final List<ServiceProvider> serviceProviders;
private final ServiceRegistry serviceRegistry;
private final ServiceMethodRegistry methodRegistry;
private final Authenticator<Object> defaultAuthenticator;
private final ServiceTransportBootstrap transportBootstrap;
private final GatewayBootstrap gatewayBootstrap;
Expand All @@ -143,7 +140,6 @@ private Microservices(Builder builder) {
this.tags = Collections.unmodifiableMap(new HashMap<>(builder.tags));
this.serviceProviders = new ArrayList<>(builder.serviceProviders);
this.serviceRegistry = builder.serviceRegistry;
this.methodRegistry = builder.methodRegistry;
this.defaultAuthenticator = builder.defaultAuthenticator;
this.gatewayBootstrap = builder.gatewayBootstrap;
this.discoveryBootstrap = builder.discoveryBootstrap;
Expand Down Expand Up @@ -241,7 +237,7 @@ private Mono<ServiceDiscoveryBootstrap> concludeDiscovery(
}

private void registerService(ServiceInfo serviceInfo) {
methodRegistry.registerService(
serviceRegistry.registerService(
ServiceInfo.from(serviceInfo)
.errorMapperIfAbsent(defaultErrorMapper)
.dataDecoderIfAbsent(defaultDataDecoder)
Expand All @@ -258,7 +254,6 @@ public ServiceCall call() {
return new ServiceCall()
.transport(transportBootstrap.clientTransport)
.serviceRegistry(serviceRegistry)
.methodRegistry(methodRegistry)
.contentType(defaultContentType)
.errorMapper(DefaultErrorMapper.INSTANCE)
.router(Routers.getRouter(RoundRobinServiceRouter.class));
Expand Down Expand Up @@ -288,10 +283,6 @@ public ServiceRegistry serviceRegistry() {
return serviceRegistry;
}

public ServiceMethodRegistry methodRegistry() {
return methodRegistry;
}

public Address discoveryAddress() {
return discoveryBootstrap.serviceDiscovery != null
? discoveryBootstrap.serviceDiscovery.address()
Expand Down Expand Up @@ -350,7 +341,7 @@ private Mono<Void> applyBeforeDestroy() {
return Mono.defer(
() ->
Mono.whenDelayError(
methodRegistry.listServices().stream()
serviceRegistry.listServices().stream()
.map(ServiceInfo::serviceInstance)
.map(s -> Mono.fromRunnable(() -> Injector.processBeforeDestroy(this, s)))
.collect(Collectors.toList())));
Expand All @@ -370,7 +361,6 @@ public static final class Builder {
private Map<String, String> tags = new HashMap<>();
private final List<ServiceProvider> serviceProviders = new ArrayList<>();
private ServiceRegistry serviceRegistry = new ServiceRegistryImpl();
private ServiceMethodRegistry methodRegistry = new ServiceMethodRegistryImpl();
private Authenticator<Object> defaultAuthenticator = null;
private final ServiceDiscoveryBootstrap discoveryBootstrap = new ServiceDiscoveryBootstrap();
private ServiceTransportBootstrap transportBootstrap = new ServiceTransportBootstrap();
Expand Down Expand Up @@ -436,11 +426,6 @@ public Builder serviceRegistry(ServiceRegistry serviceRegistry) {
return this;
}

public Builder methodRegistry(ServiceMethodRegistry methodRegistry) {
this.methodRegistry = methodRegistry;
return this;
}

public Builder discovery(ServiceDiscoveryFactory discoveryFactory) {
this.discoveryBootstrap.operator(options -> options.discoveryFactory(discoveryFactory));
return this;
Expand Down Expand Up @@ -717,7 +702,7 @@ private ServiceTransportBootstrap start(Microservices microservices) {
try {
try {
serviceTransport = serviceTransport.start();
serverTransport = serviceTransport.serverTransport(microservices.methodRegistry).bind();
serverTransport = serviceTransport.serverTransport(microservices.serviceRegistry).bind();
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Loading

0 comments on commit 113886d

Please sign in to comment.