Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Explicit input/output handling #66

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions bench-tool/bench-tool.cabal
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
cabal-version: 1.12

-- This file has been generated from package.yaml by hpack version 0.34.4.
-- This file has been generated from package.yaml by hpack version 0.37.0.
--
-- see: https://github.com/sol/hpack
--
-- hash: 55d227f1000a57af28adf509eeccdb465720fbf0a9766ae0ae4f8a897276d7d1

name: bench-tool
version: 0.4.0.1
Expand Down Expand Up @@ -40,7 +38,7 @@ library
async >=2.2.1 && <3
, base >=4.10 && <5
, binary >=0.8.5 && <0.9
, bytestring >=0.10.8 && <0.13
, bytestring >=0.10.8 && <0.11
, case-insensitive >=1.2.0 && <1.3
, containers
, http-types ==0.12.*
Expand Down Expand Up @@ -75,7 +73,7 @@ executable bench-tool-exe
, base >=4.10 && <5
, bench-tool
, binary >=0.8.5 && <0.9
, bytestring >=0.10.8 && <0.13
, bytestring >=0.10.8 && <0.11
, case-insensitive >=1.2.0 && <1.3
, containers
, http-types ==0.12.*
Expand Down
3 changes: 2 additions & 1 deletion bench-tool/src/BenchTool.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE PackageImports #-}

module BenchTool where

Expand Down Expand Up @@ -29,7 +30,7 @@ import qualified Network.GRPC.Client.Helpers as Client
import Network.GRPC.HTTP2.Encoding as Encoding
import qualified Network.GRPC.HTTP2.ProtoLens as ProtoLens
import Network.GRPC.Server as Server
import qualified Network.HTTP2.Client as Client
import qualified "http2-client" Network.HTTP2.Client as Client
import qualified Network.Wai.Handler.Warp as Warp
import qualified Network.Wai.Handler.WarpTLS as WarpTLS
import Options.Generic
Expand Down
127 changes: 85 additions & 42 deletions http2-client-grpc/src/Network/GRPC/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE PackageImports #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE PackageImports #-}
Expand Down Expand Up @@ -60,6 +61,7 @@ module Network.GRPC.Client (
, streamRequest
, steppedBiDiStream
, generalHandler
, explicitHandler
, CompressMode(..)
, StreamDone(..)
, BiDiStep(..)
Expand Down Expand Up @@ -99,6 +101,7 @@ import Network.HTTP2.Frame
import Network.HPACK
import "http2-client" Network.HTTP2.Client hiding (next)
import Network.HTTP2.Client.Helpers
import Data.IORef

type CIHeaderList = [(CI ByteString, ByteString)]

Expand Down Expand Up @@ -412,7 +415,7 @@ newtype InvalidParse = InvalidParse String deriving Show
instance Exception InvalidParse where

-- | An event for the incoming loop of 'generalHandler'.
data IncomingEvent o a =
data IncomingEvent o =
Headers HeaderList
-- ^ The server sent some initial metadata with the headers.
| RecvMessage o
Expand All @@ -423,7 +426,7 @@ data IncomingEvent o a =
-- ^ Something went wrong (the loop stops).

-- | An event for the outgoing loop of 'generalHandler'.
data OutgoingEvent i b =
data OutgoingEvent i =
Finalize
-- ^ The client is done with the RPC (the loop stops).
| SendMessage CompressMode i
Expand All @@ -440,64 +443,104 @@ data OutgoingEvent i b =
-- One loop accepts and chunks messages from the HTTP2 stream, then return events
-- and stops on Trailers or Invalid. The other loop waits for messages to send to
-- the server or finalize and returns.
generalHandler
generalHandler
:: (GRPCInput r i, GRPCOutput r o)
=> r
-- ^ RPC to call.
-> a
-- ^ An initial state for the incoming loop.
-> (a -> IncomingEvent o a -> ClientIO a)
-> (a -> IncomingEvent o -> ClientIO a)
-- ^ A state-passing function for the incoming loop.
-> b
-- ^ An initial state for the outgoing loop.
-> (b -> ClientIO (b, OutgoingEvent i b))
-> (b -> ClientIO (b, OutgoingEvent i))
-- ^ A state-passing function for the outgoing loop.
-> RPCCall r (a,b)
generalHandler rpc v0 handle w0 next = RPCCall rpc $ \conn stream isfc osfc encoding decoding ->
go conn stream isfc osfc encoding decoding
generalHandler rpc v0 handle w0 next =
explicitHandler rpc go
where
go conn stream isfc osfc encoding decoding =
concurrently (incomingLoop Nothing newDecoder v0) (outGoingLoop w0)
go getIncoming sendOutgoing =
concurrently (incomingLoop v0) (outGoingLoop w0)
where
ocfc = _outgoingFlowControl conn
newDecoder = decodeOutput rpc decompress
decompress = _getDecodingCompression decoding
outGoingLoop v1 = do
(v2, event) <- next v1
case event of
Finalize -> do
sendData conn stream setEndStream ""
sendOutgoing event
return v2
SendMessage doCompress msg -> do
SendMessage{} -> do
sendOutgoing event
outGoingLoop v2
incomingLoop v1 =
getIncoming >>= \case
Nothing -> return v1
Just event -> do
handle v1 event >>= incomingLoop

explicitHandler
:: (GRPCInput r i, GRPCOutput r o)
=> r
-- ^ RPC to call.
-> (ClientIO (Maybe (IncomingEvent o)) -> (OutgoingEvent i -> ClientIO ()) -> ClientIO a)
-> RPCCall r a
explicitHandler rpc act = RPCCall rpc go
where
go conn stream isfc osfc encoding decoding = do
receivedHeaders <- liftIO $ newIORef False
decoderVar <- liftIO $ newIORef newDecoder
gotTrailers <- liftIO $ newIORef False
act (getIncoming receivedHeaders gotTrailers decoderVar) sendOutgoing
where
ocfc = _outgoingFlowControl conn
newDecoder = decodeOutput rpc decompress
decompress = _getDecodingCompression decoding
sendOutgoing event = do
case event of
Finalize -> do
sendData conn stream setEndStream ""
SendMessage doCompress msg -> do
let compress = case doCompress of
Compressed -> _getEncodingCompression encoding
Uncompressed -> uncompressed
sendSingleMessage rpc msg (Encoding compress) id conn ocfc stream osfc
outGoingLoop v2
incomingLoop Nothing decode v1 =
_waitEvent stream >>= \case
StreamHeadersEvent _ hdrs ->
handle v1 (Headers hdrs) >>= incomingLoop (Just hdrs) decode
_ ->
handle v1 (Invalid $ toException $ InvalidState "no headers")
incomingLoop jhdrs decode v1 =
_waitEvent stream >>= \case
StreamHeadersEvent _ hdrs ->
handle v1 (Trailers hdrs)
StreamDataEvent _ dat -> do
liftIO $ _addCredit isfc (ByteString.length dat)
_ <- liftIO $ _consumeCredit isfc (ByteString.length dat)
_ <- _updateWindow isfc
case pushChunk decode dat of
Done unusedDat _ (Right val) ->
handle v1 (RecvMessage val) >>= incomingLoop jhdrs (pushChunk newDecoder unusedDat)
partial@(Partial _) ->
incomingLoop jhdrs partial v1
Done _ _ (Left err) ->
handle v1 (Invalid $ toException $ InvalidParse $ "invalid-done-parse: " ++ err)
Fail _ _ err ->
handle v1 (Invalid $ toException $ InvalidParse $ "invalid-parse: " ++ err)
StreamPushPromiseEvent {} ->
handle v1 (Invalid $ toException UnallowedPushPromiseReceived)
StreamErrorEvent {} ->
handle v1 (Invalid $ toException $ InvalidState "stream error")
getIncoming receivedHeaders gotTrailersVar decoderVar = do
gotTrailers <- liftIO $ readIORef gotTrailersVar
didReceive <- liftIO $ readIORef receivedHeaders
case (gotTrailers, didReceive) of
(True, _) -> return Nothing
(False, False) ->
_waitEvent stream >>= \case
StreamHeadersEvent _ hdrs -> do
liftIO $ writeIORef receivedHeaders True
return (Just (Headers hdrs))
_ ->
return (Just (Invalid $ toException $ InvalidState "no headers"))
(False, True) ->
_waitEvent stream >>= \case
StreamHeadersEvent _ hdrs -> do
liftIO $ writeIORef gotTrailersVar True
return (Just (Trailers hdrs))
StreamDataEvent _ dat -> do
liftIO $ _addCredit isfc (ByteString.length dat)
_ <- liftIO $ _consumeCredit isfc (ByteString.length dat)
_ <- _updateWindow isfc
decode <- liftIO $ readIORef decoderVar
case pushChunk decode dat of
Done unusedDat _ (Right val) -> do
liftIO $ writeIORef decoderVar (pushChunk newDecoder unusedDat)
return (Just (RecvMessage val))
partial@(Partial _) -> do
liftIO $ writeIORef decoderVar partial
getIncoming receivedHeaders gotTrailersVar decoderVar
Done _ _ (Left err) -> do
liftIO $ writeIORef gotTrailersVar True
return (Just (Invalid $ toException $ InvalidParse $ "invalid-done-parse: " ++ err))
Fail _ _ err -> do
liftIO $ writeIORef gotTrailersVar True
return (Just (Invalid $ toException $ InvalidParse $ "invalid-parse: " ++ err))
StreamPushPromiseEvent {} -> do
liftIO $ writeIORef gotTrailersVar True
return (Just (Invalid $ toException UnallowedPushPromiseReceived))
StreamErrorEvent {} -> do
liftIO $ writeIORef gotTrailersVar True
return (Just (Invalid $ toException $ InvalidState "stream error"))
9 changes: 5 additions & 4 deletions http2-client-grpc/src/Network/GRPC/Client/Helpers.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE PackageImports #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE PackageImports #-}

Expand Down Expand Up @@ -33,8 +34,8 @@ import Data.Monoid ((<>))
#endif

import "http2-client" Network.HTTP2.Client (frameHttp2RawConnection, ClientIO, ClientError, newHttp2FrameConnection, newHttp2Client, Http2Client(..), IncomingFlowControl(..), GoAwayHandler, FallBackFrameHandler, ignoreFallbackHandler, HostName, PortNumber, TooMuchConcurrency)
import Network.HTTP2.Client.Helpers (ping)
import Network.HTTP2.Client.RawConnection (newRawHttp2ConnectionSocket, newRawHttp2ConnectionUnix)
import "http2-client" Network.HTTP2.Client.Helpers (ping)
import "http2-client" Network.HTTP2.Client.RawConnection (newRawHttp2ConnectionSocket, newRawHttp2ConnectionUnix)
import Network.GRPC.Client
import Network.GRPC.HTTP2.Encoding
import qualified Network.Socket as Network
Expand Down Expand Up @@ -234,11 +235,11 @@ rawGeneralStream
-- ^ An initialized client.
-> a
-- ^ An initial state for the incoming loop.
-> (a -> IncomingEvent o a -> ClientIO a)
-> (a -> IncomingEvent o -> ClientIO a)
-- ^ A state-passing function for the incoming loop.
-> b
-- ^ An initial state for the outgoing loop.
-> (b -> ClientIO (b, OutgoingEvent i b))
-> (b -> ClientIO (b, OutgoingEvent i))
-- ^ A state-passing function for the ougoing loop.
-> ClientIO (Either TooMuchConcurrency (a,b))
rawGeneralStream rpc (GrpcClient client authority headers timeout compression _) v0 handler w0 next =
Expand Down
6 changes: 5 additions & 1 deletion http2-grpc-proto-lens/http2-grpc-proto-lens.cabal
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
cabal-version: 1.12

-- This file has been generated from package.yaml by hpack version 0.37.0.
--
-- see: https://github.com/sol/hpack

name: http2-grpc-proto-lens
version: 0.1.1.0
version: 0.1.0.0
synopsis: Encoders based on `proto-lens` for gRPC over HTTP2.
description: Please see the README on GitHub at <https://github.com/haskell-grpc-native/http2-grpc-haskell/blob/master/http2-grpc-proto-lens/README.md>
category: Network
Expand Down
6 changes: 2 additions & 4 deletions http2-grpc-proto3-wire/http2-grpc-proto3-wire.cabal
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
cabal-version: 1.12

-- This file has been generated from package.yaml by hpack version 0.34.4.
-- This file has been generated from package.yaml by hpack version 0.37.0.
--
-- see: https://github.com/sol/hpack
--
-- hash: 304a39ccf01d8fa7f4be733a59b8257ddf85c6d3141be2869e5d4a4c16380b37

name: http2-grpc-proto3-wire
version: 0.1.0.1
Expand Down Expand Up @@ -41,6 +39,6 @@ library
, bytestring >=0.10.8 && <0.12
, case-insensitive >=1.2.0 && <1.3
, http2-grpc-types
, proto3-wire >=1 && <1.5
, proto3-wire >=1 && <1.4
, zlib >=0.6.2 && <0.7
default-language: Haskell2010
8 changes: 4 additions & 4 deletions stack-18.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ packages:
- http2-client-grpc
- bench-tool
extra-deps:
- git: https://github.com/lucasdicioccio/http2-client
commit: 046b1f23ea547b75f1a3dc3d878603968bff4fb4
- proto-lens-0.7.1.1@sha256:8419e86fd7f521206a991c3ccb1777332a07a8250b380b06694f9f301b1ff38d,2972
- proto-lens-runtime-0.7.0.2@sha256:25d11fd08b56025c89e6be5da46f40a82bd845218b235b9e0114253b45caa074,3051
- http2-client-0.10.0.0
- proto-lens-0.7.0.0
- proto-lens-runtime-0.7.0.0
- proto3-wire-1.0.0
17 changes: 17 additions & 0 deletions stack-22.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
resolver: lts-22.34
allow-newer: true
packages:
- http2-grpc-types
- http2-grpc-proto-lens
- http2-grpc-proto3-wire
- warp-grpc
- http2-client-grpc
- bench-tool
extra-deps:
- http2-client-0.10.0.2
- proto3-wire-1.4.3
- word-compat-0.0.6
- recv-0.1.0
- http2-5.0.1
- warp-tls-3.4.6
- warp-3.3.31
19 changes: 12 additions & 7 deletions stack-nightly.yaml
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
resolver: nightly-2022-06-19
resolver: nightly-2024-09-26
allow-newer: true
packages:
- http2-client-grpc
- http2-grpc-types
- http2-grpc-proto-lens
- http2-grpc-proto3-wire
- http2-grpc-types
- warp-grpc
- http2-client-grpc
- bench-tool
extra-deps:
- git: https://github.com/lucasdicioccio/http2-client
commit: 046b1f23ea547b75f1a3dc3d878603968bff4fb4
- proto3-wire-1.3.0@sha256:45220b0c7ad6e62521bacc18a71dff5e8eb6b77cfdbead14176385e2a35e1c12,2811
- word-compat-0.0.2@sha256:c881977321de67d6f1d0cafe805e66d771a6e0614cafaa2104391f44cf4afd21,1202
- http2-client-0.10.0.2
- proto3-wire-1.4.3
- word-compat-0.0.6
- recv-0.1.0
- http2-5.0.1
- warp-tls-3.4.6
- warp-3.3.31
2 changes: 1 addition & 1 deletion stack.yaml
1 change: 1 addition & 0 deletions test-builds.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ stack build --stack-yaml=stack-16.yaml
stack build --stack-yaml=stack-17.yaml
stack build --stack-yaml=stack-18.yaml
stack build --stack-yaml=stack-19.yaml
stack build --stack-yaml=stack-22.yaml
11 changes: 10 additions & 1 deletion warp-grpc/src/Network/GRPC/Server/Handlers.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ module Network.GRPC.Server.Handlers (
, bidiStream
, H.GeneralStreamHandler, H.IncomingStream(..), H.OutgoingStream(..)
, generalStream
, H.ExplicitStreamHandler
, explicitStream
) where

import Network.GRPC.HTTP2.Encoding
Expand Down Expand Up @@ -48,4 +50,11 @@ generalStream
=> r
-> H.GeneralStreamHandler IO i o a b
-> ServiceHandler
generalStream = H.generalStream id
generalStream = H.generalStream id

explicitStream
:: (GRPCInput r i, GRPCOutput r o)
=> r
-> H.ExplicitStreamHandler IO i o
-> ServiceHandler
explicitStream = H.explicitStream id
Loading