From 9552a087d7d0edb634d8e8fbb8c8db22f347dd0b Mon Sep 17 00:00:00 2001 From: Christopher Manning Date: Thu, 18 Nov 2021 16:53:45 -0600 Subject: [PATCH] codifying peer forwarder local client (#626) Signed-off-by: Christopher Manning --- .../prepper/peerforwarder/PeerForwarder.java | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/data-prepper-plugins/peer-forwarder/src/main/java/com/amazon/dataprepper/plugins/prepper/peerforwarder/PeerForwarder.java b/data-prepper-plugins/peer-forwarder/src/main/java/com/amazon/dataprepper/plugins/prepper/peerforwarder/PeerForwarder.java index d2c6a21ea8..536906ad19 100644 --- a/data-prepper-plugins/peer-forwarder/src/main/java/com/amazon/dataprepper/plugins/prepper/peerforwarder/PeerForwarder.java +++ b/data-prepper-plugins/peer-forwarder/src/main/java/com/amazon/dataprepper/plugins/prepper/peerforwarder/PeerForwarder.java @@ -47,6 +47,8 @@ public class PeerForwarder extends AbstractPrepper> doExecute(final Collection> forwardedRequestFutures = new ArrayList<>(); for (final Map.Entry> entry : groupedRS.entrySet()) { - final TraceServiceGrpc.TraceServiceBlockingStub client; - if (isAddressDefinedLocally(entry.getKey())) { - client = null; - } else { - client = peerClientPool.getClient(entry.getKey()); - } + final TraceServiceGrpc.TraceServiceBlockingStub client = getClient(entry.getKey()); // Create ExportTraceRequest for storing single batch of spans ExportTraceServiceRequest.Builder currRequestBuilder = ExportTraceServiceRequest.newBuilder(); @@ -124,7 +121,7 @@ public List> doExecute(final Collection= maxNumSpansPerRequest) { final ExportTraceServiceRequest currRequest = currRequestBuilder.build(); - if (client == null) { + if (isLocalClient(client)) { recordsToProcessLocally.add(new Record<>(currRequest)); } else { forwardedRequestFutures.add(processRequest(client, currRequest)); @@ -191,6 +188,14 @@ private CompletableFuture processRequest(final TraceServiceGrpc.TraceSer return callFuture; } + private TraceServiceGrpc.TraceServiceBlockingStub getClient(final String address) { + return isAddressDefinedLocally(address) ? LOCAL_CLIENT : peerClientPool.getClient(address); + } + + private boolean isLocalClient(final TraceServiceGrpc.TraceServiceBlockingStub client) { + return client == LOCAL_CLIENT; + } + private boolean isAddressDefinedLocally(final String address) { final InetAddress inetAddress; try {