Skip to content
This repository has been archived by the owner on Sep 13, 2018. It is now read-only.

Middleware & Stream Services #19

Closed
wants to merge 11 commits into from

Conversation

withoutboats
Copy link

Middleware

This PR adds a Middleware trait (different from the Middleware trait proposed in #17). This trait is intended to be the lowest common denominator of different middleware shapes: its literally isomorphic to FnOnce(Service) -> Service:

trait Middleware<S: Service> {
    type WrappedService: Service;
    fn wrap(self, service: S) -> Self::WrappedService;
}

This supports the following operations:

  • wrap (analogous to apply). Service -> Middleware -> Service. This is user supplied on each middleware trait, and Service contains a default method to do it as well.

  • chain (analogous to compose). Middleware -> Middleware -> Middleware. Combine two middleware to create a new middlware. This is provided through a default method on Middleware.

Users therefore have a fairly convenient API of middleware.chain(middleware).wrap(service) or service.wrap(middleware).wrap(middleware), depending on what values they have in context at that moment.

Streams

This also adds StreamService and StreamMiddleware, analogous to Service and Middleware except that they yield streams of responses instead of futures. Literally some trait bounds are the only difference between these traits and their non-stream analogs (note: use case for ConstraintKinds someday?).

Stream Reducers

The last piece of this is the StreamReduce trait, which is similar to a middleware except that it is a function of StreamService -> Service. This way you can deal with a stream at one layer of the stack & collapse it into a future at another layer.

Stream reducers can be composed with both StreamMiddleware and Middleware to produce another stream reducer.

Future extensions

Given that this has Service -> Service, StreamService -> StreamService, and StreamService -> Service, its clearly missing a Service -> StreamService (ServiceUnfold?). Time will tell if that's actually a useful trait.

The middleware definition here is the "LCD," probable we will want to explore different higher level abstractions on top of it for "before / after / around" middleware patterns. But this seems like the baseline that any of those will have to be built on top of.

A middleware is a function from Service to Service.

Two combinators are provided:

* Service::wrap (equivalent to function application)
* Middleware::chain (equivalent to function composition)
@carllerche
Copy link
Member

Looking good, could you elaborate more on StreamService as this is a new concept introduced here? What is the rational for introducing this?

@withoutboats
Copy link
Author

I suspect there may be other use cases (in protocols with a different messaging structure from HTTP), but my own use case comes from an MVC-like framework I'm experimenting with. It makes sense for the Index "controller" to return a Stream, rather than a Future, because it returns multiple domain objects. That would be a StreamService.

However the view layer which transforms that stream of domain objects into an http::Response needs to ultimately return a future (because that's contract a server implementation like Hyper would expect) which is why the StreamReduce trait is added

(Depending on the view layer, its possible that the body could still be streaming since an http Response's body can be streaming, but that;s just a method of reducing a stream to a future.)

@carllerche
Copy link
Member

carllerche commented Mar 28, 2017 via email

@withoutboats
Copy link
Author

@carllerche To make sure I understand, you mean something like how the Body type of hyper::Response is a Stream?

I'd like to be able to let users just write something like:

fn index() -> impl Stream<Item = T, Error = Error>

And then a StreamReduce type can transform that into a future with a stream body.

In addition to being easier for end applications, by distinguishing StreamMidldeware for Middleware it seems like it allows for better abstractions because you can define a middleware that operates over any stream, whereas you can't define a middleware that operates over any future that has a stream in it, since there's no clear abstraction between types that have that shape.

@carllerche
Copy link
Member

I don't have much context into why you want your index to return a stream, so I can't really common on whether or not it is a good design. However, I do feel very strongly that I want to keep the core abstraction as minimal as possible. I am very strongly leaning against adding a second core trait at this point and would rather explore ways to keep the current Service trait.

For example, would there be a way to use trait aliasing here?

It also could just be that your index fn is not a Service (it doesn't take a request), so whatever calls index ends up mapping it -> a future.

@withoutboats
Copy link
Author

withoutboats commented Mar 29, 2017

For example, would there be a way to use trait aliasing here?

What we'd need are what I've called "associated traits," which are sort of like a Haskell feature called "constraint kinds":

trait Service {
     type Request;
     type Response;
     type Error;
     trait Outcome<T, E>;

     fn call(&self, req: Self::Request) -> impl Self::Outcome<Self::Response, Self::Error>;
}

trait FutureService = Service<Outcome<T, E> = Future<Item = T, Error = E>>;
trait StreamService = Service<Outcome<T, E> = Stream<Item = T, Error = E>>;

This isn't in the near future.

I do feel very strongly that I want to keep the core abstraction as minimal as possible.

What does this mean to you and why do you feel it is very important?

@withoutboats
Copy link
Author

It also could just be that your index fn is not a Service (it doesn't take a request), so whatever calls index ends up mapping it -> a future.

The index actually takes an Environment argument for data middleware want to pass to the handler (which is a TypeMap right now). Certainly its triggered by a request to a particular route.

But a clearer example is probably fetching a to many relationship, e.g. /posts/{slug}/comments is a function of slug -> Stream<Comment>.

@withoutboats
Copy link
Author

withoutboats commented Apr 17, 2017

Okay I think I've been convinced that this is largely a better design for streaming services:

pub struct Response<H, B> {
    pub header: H,
    pub body: B,
}

pub trait StreamingService {
    type Request;
    type Header;
    type Member;
    type Error;
    type Body: Stream<Item = Self::Member, Error = Self::Error>;
    type Future: Future<Item = Response<Self::Header, Self::Body>, Error = Self::Error>;

    fn call(&self, req: Self::Request) -> Self::Future;
}

impl<S: StreamingService> Service for S {
    type Request = S::Request;
    type Response = Response<S::Header, S::Body>;
    type Error = S::Error;
    type Future = S::Future;

    fn call(&self, req: Self::Request) -> Self::Future {
        StreamingService::call(self, req)
    }
}

Advantages

  • All StreamingService also implement Service; its now a subset of a single trait.
  • As a consequence, you can have a single NewService and Middleware trait instead of splitting up the space.
  • Further, its easy for things that are agnostic as to whether or not the response is streaming.
  • Also, this definition allows the response to contain headers in advance of the stream, which is a common need.

Disadvantage

The primary disadvantage is that being agnostic about what the response is can be a footgun. Middleware can mistakenly assume that Service::Response is a fully evaluated response. I've looked at solutions to this that keep the advantage.

The most obvious is to define an auto trait like Final which is not implemented by futures and streams. However, this is unstable & unlikely to be stable soon, and currently those negative impls for future and stream would conflict (which is a whole other language issue).

However, with specialization, it should be possible for conscious middleware authors to write their middleware for S: Service and S: StreamingService (because the latter will be a specialization of the former). So once specialization lands we're just talking about a footgun & not actually closing on treating streaming services differently.


@carllerche @aturon thoughts?

@nielsle
Copy link

nielsle commented Apr 17, 2017

I really like the new version of wrap(). It is very simple and beautiful. It would be great to be able to use closures as well

my_service.wrap(| &service, request | {
    foo(service.call(bar(request)))
})

Regarding the reduce(), you can solve many cases at the future-stream level by applying combinators to the output stream. I wonder if reduce() is really necessary, but I may have missed something.

my_service.wrap(| &service, request | {
    service.call(request).and_then(|response| {
        // .. do stuff here
    })
})

The same goes for chain(). It is a really nice idea, but perhaps it should live in a separate library.

@nielsle
Copy link

nielsle commented Apr 17, 2017

AFAICS the following finagle issue corresponds to chain()
twitter/finagle#385

@carllerche
Copy link
Member

I still do not understand what is gained by special casing a streaming service. Could this be explained further?

@withoutboats
Copy link
Author

@carllerche Can we talk in terms of concrete trade offs? I don't agree with the framing that this is 'special casing.'

People are going to write streaming services - indeed, they already appear in tokio_proto for example, hyper can correctly handle if the service has a streaming body, etc. If we provide no abstraction, all of those streaming services will not share a protocol that a middleware author can use to manipulate them.

A service which has a streaming response is significantly different from one which has a fully evaluated response & some middleware will be incorrect if it can't make the distinction. Some middleware will want to manipulate the stream, and needs access to it. This makes it possible to write those middleware correctly and abstractly.

Meanwhile, in the most recent proposal they all implement Service, so things which want to be agnostic over whether or not the response is streaming can be.

@carllerche
Copy link
Member

I guess, I should clarify that I don't see how the pros of special casing streaming service come close to the cons.

So, w/ the StreamingService above, the biggest disadvantage would be that the Response struct is fixed. This means that hyper could not provide its own Response struct, which would be a non starter IMO.

Maybe we should start by listing out specific examples of middleware that need to know about the streaming body?

@withoutboats
Copy link
Author

withoutboats commented Apr 17, 2017

Maybe we should start by listing out specific examples of middleware that need to know about the streaming body?

  • Instrumentation middleware that cares about when a response 'finishes.' Same for timeout middleware depending on the semantics you want. Post about the problem in Rack
  • Any middleware which wraps a service which is dealing with domain objects (the items in the stream) instead of just a protocol. For these, they care about the StreamingService::Member type.

This means that hyper could not provide its own Response struct, which would be a non starter IMO.

The only difference in definition is that Hyper's response body is in an Option, which this should one should probably be as well. I don't see why hyper can't use a newtype wrapper around this struct. EDIT: Well I do see why & nominal typing but I think we can work that out.

@withoutboats
Copy link
Author

withoutboats commented Apr 17, 2017

Also if we just had ATC, there would be no Response struct:

pub trait StreamingService {
    type Request;
    type Header;
    type Member;
    type Error;
    type Body: Stream<Item = Self::Member, Error = Self::Error>;
    type Response<H, B>;
    type Future: Future<Item = Self::Response<Self::Header, Self::Body>, Error = Self::Error>;

    fn call(&self, req: Self::Request) -> Self::Future;
}

EDIT:

We can get the same effect by adding a StreamingResponse trait:

pub trait StreamingService {
    type Request;
    type Header;
    type Member;
    type Error;
    type Response: StreamingResponse<Self::Header, Self::Member, Self::Error>;
    type Future: Future<Item = Self::Response, Error = Self::Error>;

    fn call(&self, req: Self::Request) -> Self::Future;
}

Definitions:

First, IMO ideal, purely structural, with fields in traits:

pub trait StreamingResponse<H, M, E> {
    type Body: Stream<Item = M, Error = E>;
    struct {
        header: H,
        body: Option<Self::Body>,
    }
}

On stable, today:

pub trait StreamingResponse<H, M, E> {
    type Body: Stream<Item = M, Error = E>;
    fn new(headers: H, body: Option<Self::Body>) -> Self;
    fn members(self) -> (H, Option<Self::Body>);
    fn members_ref(&self) -> (&H, Option<&Self::Body>);
    fn members_mut(&self) -> (&mut H, Option<&mut Self::Body>);
}

EDIT2: For demonstration, implementation signature of StreamingResponse for hyper::Response.

impl StreamingResponse<MessageHead<StatusCode>, Chunk, Error> for Response {
    type Body = Body;
}

@carllerche
Copy link
Member

@withoutboats I am familiar w/ how this exists in rack and the discussion around it. However, I am trying to ask about a list of middleware to better understand the problem.

The example listed in the blog post is the only middleware that I can personally think of that needs to be a) generic b) hook into the end of the body stream. Most other middleware that I can think of doesn't care about the end of the body stream or can be implemented to specifically know about streaming bodies (are pretty concrete).

Also, if there is a response trait, why would a streaming service trait still be needed? Middleware could be implemented to take an upstream Service such that the response is bound by the streaming response trait.

@carllerche
Copy link
Member

In general, I still don't have a good sense of what the exact problem being targeted is vs. a vaguer "streaming middleware is hard" problem.

@withoutboats
Copy link
Author

Also, if there is a response trait, why would a streaming service trait still be needed? Middleware could be implemented to take an upstream Service such that the response is bound by the streaming response trait.

This is a good point, its probably enough to provide a trait for a streaming response.

@carllerche
Copy link
Member

I wonder if somehow there would be a way to avoid a StreamingResponse type in favor of using conversion traits in std...

something like:

trait StreamingResponse<Head, Body> = From<(Head, Option<Body>)> + Into<(Head, Option<Body>)>`

or whatever the trait alias syntax is... then types that are "streaming" can implement the conversions w/o needing any third party traits.

@withoutboats
Copy link
Author

We'd also want AsRef and AsMut, but they don't work in general because they return &(Head, Option<Body>) which you can't construct, you want (&Head, &Option<Body>).

@carllerche
Copy link
Member

Can you elaborate on why AsRef and AsMut are needed as part of the streaming req / resp API?

They could also be asked for separately (unrelated to the into "parts")

@carllerche
Copy link
Member

For example, the timing middleware case doesn't actually need AsRef / AsMut... It would split the response (using into()), wrap the body, then convert back to the response w/ from.

I would be interested in a more concrete case that would need both "splitting" + AsRef / AsMut.

@withoutboats
Copy link
Author

My comment was not based on a particular anticipated use case, just what it would mean to define isomorphy to (Head, Option<Body>).

@withoutboats
Copy link
Author

withoutboats commented Apr 18, 2017

I'm kinda coming around to your way of thinking about this in general.

I guess a full structural definition could be formed from:

trait StreamingResponse<Head, Body> = From<(Head, Option<Body>)> + Into<(Head, Option<Body>)> +
    AsRef<Head> + AsRef<Option<Body>> + AsMut<Head> + AsMut<Option<Body>>;

@carllerche
Copy link
Member

One unknown right now is exactly how to implement a piece of middleware that doesn't care if the upstream service is streaming or not (i.e. the timer middleware).

Using the above pattern, in the streaming case, the timer middleware would split the response, decorate the body to finish the timing logic once the body is done streaming, then take the new stream and recreate the response (using From). If the body is None, I would guess that the timer would stop timing immediately...

So, the question is then how does the timer middleware handle non streaming responses? Are non-streaming responses still expected to implement From & Into parts? It could be that not all responses even have a body component (or a field to store the body). In which case, what happens if you call try to do SomeResponse::from((head, Some(body))? I would guess that it would panic...

Anyway, I think we should try to actually implement the timer middleware to make sure it works for upstream services that have streaming bodies and don't have streaming bodies.

Hopefully that makes sense.

@withoutboats
Copy link
Author

@carllerche My intent was to solve that with specialization. The non-streaming response would be the default: S: Service, whereas the streaming response would specialize it S: Service, S::Response: StreamingResponse.

@withoutboats
Copy link
Author

Closing in favor of #21

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants