Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Problem trying to parse response from gRPC endpoint #2065

Closed
anirudhr95 opened this issue Sep 11, 2019 · 14 comments · Fixed by #2066
Closed

Problem trying to parse response from gRPC endpoint #2065

anirudhr95 opened this issue Sep 11, 2019 · 14 comments · Fixed by #2066
Labels
Milestone

Comments

@anirudhr95
Copy link

anirudhr95 commented Sep 11, 2019

Hello, I have a gRPC server, trying to serve a few things from an array blocking queue. That works fine (As far as I can tell).

My proto files -

syntax = "proto3";

package queue;

option java_package = "com.sink.queue";
option java_outer_classname = "queue";
option java_multiple_files = true;
option optimize_for = SPEED;

/*
    * Assuming Producer Response can be 1 file or more (In byte format)
*/

message ResponseFromQueueSource {
    repeated bytes file = 1;
}

message RequestFromSink {

}

service GetMessageFromQueue {
    rpc getItem(RequestFromSink) returns (stream ResponseFromQueueSource) {}
}

In my client, I am creating a thread pool and I am hitting gRPC the endpoint continuously.

  public static void main(String[] args) throws InterruptedException {

        ExecutorService executorService = Executors.newFixedThreadPool(idealNumberOfThreads);

        for(int i = 0; i < Runtime.getRuntime().availableProcessors() + 1; i++) {
            executorService.execute(new PerformRequestAndSaveFile(commandLine.getOptionValue("e")));
        }

        executorService.shutdown();

        while( ! executorService.isTerminated() ) {

        }

       log.info("Total time taken - " + (System.nanoTime() - startTime) + " ns");
    }

Like so.

The code for hitting the endpoint is -

public class PerformRequestAndSaveFile implements Runnable {

	private static final Logger log = LoggerFactory.getLogger(PerformRequestAndSaveFile.class);

	private GetMessageFromQueueGrpc.GetMessageFromQueueBlockingStub getMessageFromQueueStub;

	private RequestFromSink request;

	private StreamObserver<ResponseFromQueueSource> responseStream;

	PerformRequestAndSaveFile(String endpointURL) {

		this.getMessageFromQueueStub = getClientForEndPoint(endpointURL);

		this.request = getRequestObject();
		isQueueEmpty = false;

	}

	private RequestFromSink getRequestObject() {
		return RequestFromSink.newBuilder().build();
	}

	private GetMessageFromQueueBlockingStub getClientForEndPoint(String endpointURL) {
		return Clients.newClient("gproto+http://" + endpointURL, GetMessageFromQueueBlockingStub.class);
	}

	@Override
	public void run() {

		Iterator<ResponseFromQueueSource> responseIterator = getIteratedResponseFromGRPCServer();
		** log.info("Got response - {}", responseIterator.hasNext());*

		while (responseIterator.hasNext()) {

			ResponseFromQueueSource responseFromQueueSource = responseIterator.next();
			List<ByteString> fileList = responseFromQueueSource.getFileList();

			if (fileList == null || fileList.size() == 0) {
				isQueueEmpty = true;
				continue;
			}

			for (ByteString file : fileList) {
				ResponseJsonPojo responseJsonPojo = null;
				try {
					responseJsonPojo = objectMapper.readValue(file.toStringUtf8(), ResponseJsonPojo.class);
					objectMapper.writeValue(new File(responseJsonPojo.getMessageId() + ".json"), responseJsonPojo);
				} catch (Exception e) {
					log.error("Error while parsing JSON / Writing to file for JSON - ", file.toStringUtf8(), e);
				}

			}

		}

	}

	private Iterator<ResponseFromQueueSource> getIteratedResponseFromGRPCServer() {
		return this.getMessageFromQueueStub.getItem(request);
	}

log.info("Got response - {}", responseIterator.hasNext()); is actually commented.

When I try this normally, I get nothing :| (Especially in the run method)

However, if I debug, I get the output:
Screenshot 2019-09-11 at 6 52 55 PM

It cannot be due to the delay right? This is blocking stub after all (and not asynchronous).

	private Iterator<ResponseFromQueueSource> getIteratedResponseFromGRPCServer() {
		return this.getMessageFromQueueStub.getItem(request);
	}

should be a synchronous call no?

(Additional question on top of this: If I use server-side streaming, can I use async stub in the client? )

@anuraaga
Copy link
Collaborator

Thanks for the report! This seems to be a bug in our client implementation which should be fixed in #2066.

As for FutureStub, that doesn't seem to support server streamed responses (probably since a future can only be completed once). For async streaming, you will want to use a normal streaming stub (i.e., GetMessageFromQueueStub.

@trustin trustin added the defect label Sep 11, 2019
@trustin trustin added this to the 0.92.0 milestone Sep 11, 2019
@trustin
Copy link
Member

trustin commented Sep 11, 2019

That was fast! Thanks, @anuraaga !

@anirudhr95
Copy link
Author

Thanks for the quick response @anuraaga @trustin , This issue page has been really helpful in general for Armeria. Is there any workaround I can do temporarily to get this going? Without server-side streaming, I have a working version. Just wondering if I can get server-side streaming working. I can always pull the next version from maven if that PR gets merged :)

@anuraaga
Copy link
Collaborator

I would use a normal streaming stub and a latch or similar construct for blocking similar to this pattern

https://github.com/grpc/grpc-java/blob/master/testing/src/main/java/io/grpc/testing/StreamRecorder.java

Actually I would probably do this not just as a workaround but as the recommended pattern. The Iterator gives a very simple way to iterate on a response but otherwise lacks some important functionality, for example allowing the client to cancel the request mid-stream. So if you can it's always better to use the stream observers. I get the sense upstream also doesn't really like this Iterator pattern

grpc/grpc-java#2409

@anirudhr95
Copy link
Author

Alright, thanks! I'm closing this issue now. I saw the implementation with count-down latch (Armeria examples have the same I believe) but I did not think that was necessary in my case. I'll try this and see what happens :)

@anirudhr95
Copy link
Author

Ok I hope I get some help even though I have closed this T_T

I have read through the code, and I do not understand how that might help me here (I might have totally missed the most obvious way in which it could but I want to clarify anyway :( ).

From what I understand, the countDownLatch goes in the main method, after which I can consider the program done (or part of the program I want i.e)

responseIterator.hasNext() returns false for me. How do I use countDownLatch to solve this problem here.

My use case: I am using > 10 concurrent request to my gRPC producer endpoint (Indefinitely) (using a threadpool). Basically I will have a while(true) around my current run method.

Can you please help me solve this case?

@anirudhr95
Copy link
Author

Also, IK this is not recommended, but I can technically copy over the changes from #2066 , Decompile existing JAR, build my JAR and use that in the classpath no? Like that should solve my problem right away without any code change? Again, this is a dirty dirty fix IK, but that should technically work right?

@anuraaga
Copy link
Collaborator

Sorry maybe the latch wasn't a good example for infinite streaming. In that case using a BlockingQueue should be fine - the stream observer adds any messages to it and your infinite loop in the main thread can take from the blocking queue.

@anirudhr95
Copy link
Author

I tried the fix from #2066 , still no luck @anuraaga :(

My JAR actually contains the fix i.e but the responseIterator.hasNext() still returns false :(

@anuraaga
Copy link
Collaborator

Well might need to look at the server implementation too, though there might be a chance the hotfix you applied isn't working correctly too. The test in that PR that failed without the change makes me fairly confident it's working ok.

I've also sent #2068 which adds an example using a BlockingQueue which I mentioned earlier - I'd recommend using that pattern over the Iterator since StreamObserver is a fully-featured API.

trustin pushed a commit that referenced this issue Sep 16, 2019
I had no idea blocking stubs supported querying server streaming endpoints like this. Turns out we need to notify a call's executor when messages are received so a blocking stub can read individual messages, not just the final one as we currently support.

Fixes #2065
@trustin
Copy link
Member

trustin commented Sep 16, 2019

Please feel free to reopen if this is still an issue, @anirudhr95. It was automatically closed while I merge #2066.

@anirudhr95
Copy link
Author

So sorry about the delay in getting back to this @trustin , was trying out a few other approaches. (And it was a weekend :| ). I will get back to you when I have something solid :)

@anirudhr95
Copy link
Author

Also @trustin , if you merged to master, does that mean I can pull the latest version of Armeria from mvncentral and it will have the working version? (Is the JAR published i.e)

@trustin
Copy link
Member

trustin commented Sep 16, 2019

We release like every two weeks, so I guess a new release will be out this Thursday. Thanks a lot for your patience!

eugene70 pushed a commit to eugene70/armeria that referenced this issue Oct 16, 2019
eugene70 pushed a commit to eugene70/armeria that referenced this issue Oct 16, 2019
I had no idea blocking stubs supported querying server streaming endpoints like this. Turns out we need to notify a call's executor when messages are received so a blocking stub can read individual messages, not just the final one as we currently support.

Fixes line#2065
fmguerreiro pushed a commit to fmguerreiro/armeria that referenced this issue Sep 19, 2020
fmguerreiro pushed a commit to fmguerreiro/armeria that referenced this issue Sep 19, 2020
I had no idea blocking stubs supported querying server streaming endpoints like this. Turns out we need to notify a call's executor when messages are received so a blocking stub can read individual messages, not just the final one as we currently support.

Fixes line#2065
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging a pull request may close this issue.

3 participants