Python client receives out-of-order stream #611

Open
luan-xiaokun opened this issue Mar 19, 2024 · 10 comments

@luan-xiaokun

Hi all, I have a server implemented in Scala using zio-grpc, and a client in Python.

I found that the stream received by the client is sometimes out of order. For example, the sequence of data sent by the server is [1, 2, 3, 4, 5], but the client receives [1, 2, 4, 3, 5]. Though I'm new to gRPC, I don't think this is expected behavior.

Here is a minimal example: https://drive.google.com/file/d/1Eew2sOhjSt2tCBEupE1glo6PALYkB0t1/view

import io.grpc.StatusException
import scalapb.zio_grpc.ServerMain
import scalapb.zio_grpc.ServiceList
import zio.{ZIO, stream}
import zio.stream.ZStream
import example.demo._

class DemoService extends ZioDemo.Demo {
  def foo(request: FooRequest): stream.Stream[StatusException, FooResponse] = {
    ZStream.fromIterable(getStream)
  }

  private def getStream: List[FooResponse] = List.range(0, 100).map(i => FooResponse(i.toString))
}


object Main extends ServerMain {
  override def port: Int = 8981

  override def services: ServiceList[Any] =
    ServiceList.add(new DemoService)
}

import grpc
import demo_pb2
import demo_pb2_grpc


def get_streaming_response(stub):
    responses = list(stub.Foo(demo_pb2.FooRequest(data="hello")))
    if any(r.data != str(i) for i, r in enumerate(responses)):
        print("Expected:", list(range(len(responses))))
        print("Got:     ", [int(r.data) for r in responses])
        print("Diff:    ", [i for i, r in enumerate(responses) if r.data != str(i)])
        raise ValueError("Out of order response")


if __name__ == "__main__":
    with grpc.insecure_channel("localhost:8981") as channel:
        demo_stub = demo_pb2_grpc.DemoStub(channel)

        n = 10000
        for _ in range(n):
            get_streaming_response(demo_stub)

#### output looks like:
# Expected: [0, 1, 2, ...]
# Got:      [0, 1, 2, ...]
# Diff:     [x, y, ...]

According to my testing, any single call rarely returns an out-of-order stream, but with n=10000 iterations the error is triggered almost every run, and usually only one or two pairs of adjacent items are swapped.

I'm not sure whether the problem is in zio-grpc or in Python's grpcio; below is some relevant information:
OS: Ubuntu 22.04
Python version: 3.10.13
grpcio version: 1.51.1 (couldn't find 1.50.1)
grpcio-tools version: 1.51.1
grpc version: 4.25.3
Scala version: 2.13.13
grpc-netty version: 1.50.1

@thesamet
Contributor

I was able to reproduce this - it looks like it's fairly common for messages to go out of order when streaming. Testing with ZIO_GRPC_BACKPRESSURE_QUEUE_SIZE=1000 makes the problem go away. Testing with ZIO_GRPC_BACKPRESSURE_QUEUE_SIZE=-1 results in the Python client receiving an error. This suggests the backpressure logic needs to be fixed. @regiskuckaertz @cipriansofronia @ghostdogpr - would one of you have time to look into this?

@ghostdogpr
Contributor

The main difference between ZIO_GRPC_BACKPRESSURE_QUEUE_SIZE being -1 and 1000 is that with 1000 we call stream.buffer, which loses the chunking. With -1 we keep the chunking, which is quite large here since ZStream.fromIterable uses DefaultChunkSize = 4096.
When we keep the chunking, we do chunk.foreach(call.call.sendMessage) in a single IO, whereas when we lose it we do an individual call.call.sendMessage per IO, which is much slower but, I guess, helps preserve the order. I am a little surprised that sendMessage doesn't guarantee the order if we call it very quickly 🤔
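
To make the two send paths concrete, here is a rough sketch (my own simplification, not the actual zio-grpc code; sendMessage below is a plain stand-in for call.call.sendMessage):

import zio._

object SendPaths {
  // Stand-in for the underlying grpc-java call.sendMessage.
  def sendMessage(msg: String): Unit = println(s"send $msg")

  // Chunking kept (queue size -1): the whole chunk is sent inside a single IO.
  def sendChunk(chunk: Chunk[String]): Task[Unit] =
    ZIO.attempt(chunk.foreach(sendMessage))

  // Chunking lost (stream.buffer): one IO per message, so the runtime schedules
  // many small effects instead of one bulk send.
  def sendOneByOne(chunk: Chunk[String]): Task[Unit] =
    ZIO.foreachDiscard(chunk)(msg => ZIO.attempt(sendMessage(msg)))
}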

@regiskuckaertz
Contributor

I've been playing with it for a while and it is hard to reproduce on my end. But what I noticed is that this is definitely not a bug in this library. To check this I changed the handler to:

ZChannel.readWithCause(
  xs =>
    ZChannel.fromZIO(GIO.attempt {
      println("--")
      println(xs.mkString(","))
      xs.foreach(call.call.sendMessage)
    }) *> ...

and then the user code stops when finding a reproducer:

ZioTestservice.GreeterClient
  .sayHelloStreaming(HelloRequest(request = Some(Hello(name = "Testing streaming"))))
  .runCollect
  .repeatUntilZIO { ys =>
    if (ys.map(_.i) != (0 until 100)) {
      println(ys.map(_.i).mkString(", "))
      ZIO.succeed(true)
    } else ref.updateAndGet(_ - 1).map(_ == 0)
  }

What I observed is that:

  • xs always has size 1, which is surprising to me but fair enough
  • invariably, whenever there's a reproducer, the successive xs are also inverted, e.g.
0, 1, ..., 84, 83, ..., 99

and

--
HelloReply(84, UnknownFieldSet(Map()))
--
HelloReply(83, UnknownFieldSet(Map()))

I also note that the bug only happens (at least for me) when using ZStream.fromIterable(0 until 100); I was unable to observe it with ZStream.iterate(0)(_ + 1).take(100), for instance. That doesn't mean it can't happen for someone else, though.
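
For what it's worth, the two constructions chunk their output differently, which may be why only one of them triggers it. A quick probe (my own sketch, assuming ZIO 2's default chunking behaviour; the object name is made up):

import zio._
import zio.stream._

object ChunkingProbe extends ZIOAppDefault {
  // fromIterable packs the whole range into a single chunk (up to DefaultChunkSize = 4096),
  // while iterate, as far as I can tell, emits single-element chunks.
  def run =
    ZStream.fromIterable(0 until 100).chunks.map(_.size).runCollect.debug("fromIterable chunk sizes") *>
      ZStream.iterate(0)(_ + 1).take(100).chunks.map(_.size).runCollect.debug("iterate chunk sizes")
}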

Anyway, this appears to be a bug in the stream runtime and it will be fun to find it 🤠

@regiskuckaertz
Contributor

> xs always has size 1, which is surprising to me but fair enough

that's because map rechunks in ZStream.fromIterable(0 until 100).map(HelloReply(_))

@ghostdogpr
Contributor

> xs always has size 1, which is surprising to me but fair enough
>
> that's because map rechunks in ZStream.fromIterable(0 until 100).map(HelloReply(_))

Isn't it because of buffer that destroys chunking rather than map?

@ghostdogpr
Contributor

Small reproducer using zio only:

import zio.*
import zio.stream.*

object Test extends ZIOAppDefault {
  val expected = Chunk.fromIterable(0 until 100)
  val s        = ZStream.fromChunk(expected).buffer(16)

  def run = s.runCollect.map(_ == expected).debug.repeatWhile(identity)
}

It seems to work without buffer. Will open an issue on the zio repo.

@ghostdogpr
Contributor

zio/zio#8699

@regiskuckaertz
Contributor

> xs always has size 1, which is surprising to me but fair enough
>
> that's because map rechunks in ZStream.fromIterable(0 until 100).map(HelloReply(_))
>
> Isn't it because of buffer that destroys chunking rather than map?

I had added a rechunk and forgotten about it 😅

@regiskuckaertz
Contributor

Speaking of buffer, I thought about using bufferChunks instead, which would avoid the rechunking, though one would need to change the size of the queue. Thoughts?
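
For illustration, here is a variant of the reproducer above using bufferChunks (my sketch of the idea, not the actual change); note the capacity now counts chunks rather than elements, so 16 means something different than it does for buffer(16):

import zio.*
import zio.stream.*

object TestBufferChunks extends ZIOAppDefault {
  val expected = Chunk.fromIterable(0 until 100)
  // bufferChunks enqueues whole chunks, so the original chunk structure
  // (and with it the element order) should be preserved across the queue.
  val s = ZStream.fromChunk(expected).bufferChunks(16)

  def run = s.runCollect.map(_ == expected).debug.repeatWhile(identity)
}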

@ghostdogpr
Contributor

I actually added #578 to not buffer at all 😆 But yeah bufferChunks sounds better than buffer.
