Skip to content

Commit

Permalink
codifying peer forwarder local client (opensearch-project#626)
Browse files Browse the repository at this point in the history
Signed-off-by: Christopher Manning <[email protected]>
  • Loading branch information
cmanning09 authored and sbayer55 committed Nov 23, 2021
1 parent 1295242 commit 9552a08
Showing 1 changed file with 12 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public class PeerForwarder extends AbstractPrepper<Record<ExportTraceServiceRequ
public static final String ERRORS = "errors";
public static final String DESTINATION = "destination";

private static final TraceServiceGrpc.TraceServiceBlockingStub LOCAL_CLIENT = null;

public static final int ASYNC_REQUEST_THREAD_COUNT = 200;

private static final Logger LOG = LoggerFactory.getLogger(PeerForwarder.class);
Expand Down Expand Up @@ -110,12 +112,7 @@ public List<Record<ExportTraceServiceRequest>> doExecute(final Collection<Record
final List<CompletableFuture<Record>> forwardedRequestFutures = new ArrayList<>();

for (final Map.Entry<String, List<ResourceSpans>> entry : groupedRS.entrySet()) {
final TraceServiceGrpc.TraceServiceBlockingStub client;
if (isAddressDefinedLocally(entry.getKey())) {
client = null;
} else {
client = peerClientPool.getClient(entry.getKey());
}
final TraceServiceGrpc.TraceServiceBlockingStub client = getClient(entry.getKey());

// Create ExportTraceRequest for storing single batch of spans
ExportTraceServiceRequest.Builder currRequestBuilder = ExportTraceServiceRequest.newBuilder();
Expand All @@ -124,7 +121,7 @@ public List<Record<ExportTraceServiceRequest>> doExecute(final Collection<Record
final int rsSize = PeerForwarderUtils.getResourceSpansSize(rs);
if (currSpansCount >= maxNumSpansPerRequest) {
final ExportTraceServiceRequest currRequest = currRequestBuilder.build();
if (client == null) {
if (isLocalClient(client)) {
recordsToProcessLocally.add(new Record<>(currRequest));
} else {
forwardedRequestFutures.add(processRequest(client, currRequest));
Expand Down Expand Up @@ -191,6 +188,14 @@ private CompletableFuture<Record> processRequest(final TraceServiceGrpc.TraceSer
return callFuture;
}

private TraceServiceGrpc.TraceServiceBlockingStub getClient(final String address) {
return isAddressDefinedLocally(address) ? LOCAL_CLIENT : peerClientPool.getClient(address);
}

private boolean isLocalClient(final TraceServiceGrpc.TraceServiceBlockingStub client) {
return client == LOCAL_CLIENT;
}

private boolean isAddressDefinedLocally(final String address) {
final InetAddress inetAddress;
try {
Expand Down

0 comments on commit 9552a08

Please sign in to comment.