Skip to content

Commit

Permalink
Cheery pick tikv#4303
Browse files Browse the repository at this point in the history
Signed-off-by: Breezewish <[email protected]>
  • Loading branch information
breezewish committed Mar 14, 2019
1 parent 229c423 commit 87f4c2f
Show file tree
Hide file tree
Showing 7 changed files with 668 additions and 124 deletions.
199 changes: 95 additions & 104 deletions src/coprocessor/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,16 @@ impl<E: Engine> Endpoint<E> {

/// Parse the raw `Request` to create `RequestHandlerBuilder` and `ReqContext`.
/// Returns `Err` if fails.
fn try_parse_request(
fn parse_request(
&self,
mut req: coppb::Request,
peer: Option<String>,
is_streaming: bool,
) -> Result<(RequestHandlerBuilder<E::Snap>, ReqContext)> {
fail_point!("coprocessor_parse_request", |_| Err(box_err!(
"unsupported tp (failpoint)"
)));

let (context, data, ranges) = (
req.take_context(),
req.take_data(),
Expand Down Expand Up @@ -163,34 +167,6 @@ impl<E: Engine> Endpoint<E> {
Ok((builder, req_ctx))
}

/// Parse the raw `Request` to create `RequestHandlerBuilder` and `ReqContext`.
#[inline]
fn parse_request(
&self,
req: coppb::Request,
peer: Option<String>,
is_streaming: bool,
) -> (RequestHandlerBuilder<E::Snap>, ReqContext) {
match self.try_parse_request(req, peer, is_streaming) {
Ok(v) => v,
Err(err) => {
// If there are errors when parsing requests, create a dummy request handler.
let builder =
box |_, _: &_| Ok(cop_util::ErrorRequestHandler::new(err).into_boxed());
let req_ctx = ReqContext::new(
"invalid",
kvrpcpb::Context::new(),
&[],
Duration::from_secs(60), // Large enough to avoid becoming outdated error
None,
None,
None,
);
(builder, req_ctx)
}
}
}

/// Get the batch row limit configuration.
#[inline]
fn get_batch_row_limit(&self, is_streaming: bool) -> usize {
Expand Down Expand Up @@ -269,39 +245,44 @@ impl<E: Engine> Endpoint<E> {
})
}

/// Handle a unary request and run on the read pool. Returns a future producing the
/// result, which must be a `Response` and will never fail. If there are errors during
/// handling, they will be embedded in the `Response`.
/// Handle a unary request and run on the read pool.
///
/// Returns `Err(err)` if the read pool is full. Returns `Ok(future)` in other cases.
/// The future inside may be an error however.
fn handle_unary_request(
&self,
req_ctx: ReqContext,
handler_builder: RequestHandlerBuilder<E::Snap>,
) -> impl Future<Item = coppb::Response, Error = ()> {
) -> Result<impl Future<Item = coppb::Response, Error = Error>> {
let engine = self.engine.clone();
let priority = readpool::Priority::from(req_ctx.context.get_priority());
// box the tracker so that moving it is cheap.
let mut tracker = box Tracker::new(req_ctx);

let result = self.read_pool.future_execute(priority, move |ctxd| {
tracker.attach_ctxd(ctxd);

Self::handle_unary_request_impl(engine, tracker, handler_builder)
});

future::result(result)
// If the read pool is full, an error response will be returned directly.
self.read_pool
.future_execute(priority, move |ctxd| {
tracker.attach_ctxd(ctxd);
Self::handle_unary_request_impl(engine, tracker, handler_builder)
})
.map_err(|_| Error::Full)
.flatten()
.or_else(|e| Ok(make_error_response(e)))
}

/// Parses and handles a unary request. Returns a future that will never fail. If there are
/// errors during parsing or handling, they will be converted into a `Response` as the success
/// result of the future.
#[inline]
pub fn parse_and_handle_unary_request(
&self,
req: coppb::Request,
peer: Option<String>,
) -> impl Future<Item = coppb::Response, Error = ()> {
let (handler_builder, req_ctx) = self.parse_request(req, peer, false);
self.handle_unary_request(req_ctx, handler_builder)
let result_of_future = self.parse_request(req, peer, false).and_then(
|(handler_builder, req_ctx)| self.handle_unary_request(req_ctx, handler_builder),
);

future::result(result_of_future)
.flatten()
.or_else(|e| Ok(make_error_response(e)))
}

/// The real implementation of handling a stream request.
Expand Down Expand Up @@ -397,62 +378,55 @@ impl<E: Engine> Endpoint<E> {
.flatten_stream()
}

/// Handle a stream request and run on the read pool. Returns a stream producing each
/// result, which must be a `Response` and will never fail. If there are errors during
/// handling, they will be embedded in the `Response`.
/// Handle a stream request and run on the read pool.
///
/// Returns `Err(err)` if the read pool is full. Returns `Ok(stream)` in other cases.
/// The stream inside may produce errors however.
fn handle_stream_request(
&self,
req_ctx: ReqContext,
handler_builder: RequestHandlerBuilder<E::Snap>,
) -> impl Stream<Item = coppb::Response, Error = ()> {
let (tx, rx) = mpsc::channel::<coppb::Response>(self.stream_channel_size);
) -> Result<impl Stream<Item = coppb::Response, Error = Error>> {
let (tx, rx) = mpsc::channel::<Result<coppb::Response>>(self.stream_channel_size);
let engine = self.engine.clone();
let priority = readpool::Priority::from(req_ctx.context.get_priority());
// Must be created befure `future_execute`, otherwise wait time is not tracked.
let mut tracker = box Tracker::new(req_ctx);

let tx1 = tx.clone();
let result = self.read_pool.future_execute(priority, move |ctxd| {
tracker.attach_ctxd(ctxd);

Self::handle_stream_request_impl(engine, tracker, handler_builder)
.or_else(|e| Ok::<_, mpsc::SendError<_>>(make_error_response(e)))
// Although returning `Ok()` from `or_else` will continue the stream,
// our stream has already ended when error is returned.
// Thus the stream will not continue any more even after we converting errors
// into a response.
.forward(tx1)
});
self.read_pool
.future_execute(priority, move |ctxd| {
tracker.attach_ctxd(ctxd);

match result {
Err(_) => {
stream::once::<_, mpsc::SendError<_>>(Ok(make_error_response(Error::Full)))
Self::handle_stream_request_impl(engine, tracker, handler_builder) // Stream<Resp, Error>
.then(Ok::<_, mpsc::SendError<_>>) // Stream<Result<Resp, Error>, MpscError>
.forward(tx)
.then(|_| {
// ignore sink send failures
Ok::<_, ()>(())
})
// Should not be blocked, since the channel is large enough to hold 1 value.
.wait()
.unwrap();
}
Ok(cpu_future) => {
})
.map_err(|_| Error::Full)
.and_then(move |cpu_future| {
// Keep running stream producer
cpu_future.forget();
}
}

rx
// Returns the stream instead of a future
Ok(rx.then(|r| r.unwrap()))
})
}

/// Parses and handles a stream request. Returns a stream that produce each result in a
/// `Response` and will never fail. If there are errors during parsing or handling, they will
/// be converted into a `Response` as the only stream item.
#[inline]
pub fn parse_and_handle_stream_request(
&self,
req: coppb::Request,
peer: Option<String>,
) -> impl Stream<Item = coppb::Response, Error = ()> {
let (handler_builder, req_ctx) = self.parse_request(req, peer, true);
self.handle_stream_request(req_ctx, handler_builder)
let result_of_stream = self.parse_request(req, peer, true).and_then(
|(handler_builder, req_ctx)| self.handle_stream_request(req_ctx, handler_builder),
); // Result<Stream<Resp, Error>, Error>

stream::once(result_of_stream) // Stream<Stream<Resp, Error>, Error>
.flatten() // Stream<Resp, Error>
.or_else(|e| Ok(make_error_response(e))) // Stream<Resp, ()>
}
}

Expand Down Expand Up @@ -648,6 +622,7 @@ mod tests {
box |_, _: &_| Ok(UnaryFixture::new(Ok(coppb::Response::new())).into_boxed());
let resp = cop
.handle_unary_request(ReqContext::default_for_test(), handler_builder)
.unwrap()
.wait()
.unwrap();
assert!(resp.get_other_error().is_empty());
Expand All @@ -664,11 +639,12 @@ mod tests {
None,
None,
);
let resp = cop
.handle_unary_request(outdated_req_ctx, handler_builder)
.wait()
.unwrap();
assert_eq!(resp.get_other_error(), OUTDATED_ERROR_MSG);
assert!(
cop.handle_unary_request(outdated_req_ctx, handler_builder)
.unwrap()
.wait()
.is_err()
);
}

#[test]
Expand Down Expand Up @@ -777,25 +753,28 @@ mod tests {

let handler_builder =
box |_, _: &_| Ok(UnaryFixture::new_with_duration(Ok(response), 1000).into_boxed());
let future = cop.handle_unary_request(ReqContext::default_for_test(), handler_builder);
let tx = tx.clone();
thread::spawn(move || tx.send(future.wait().unwrap()));
let result_of_future =
cop.handle_unary_request(ReqContext::default_for_test(), handler_builder);
match result_of_future {
Err(full_error) => {
tx.send(Err(full_error)).unwrap();
}
Ok(future) => {
let tx = tx.clone();
thread::spawn(move || {
tx.send(future.wait()).unwrap();
});
}
}
thread::sleep(Duration::from_millis(100));
}

// verify
for _ in 2..5 {
let resp: coppb::Response = rx.recv().unwrap();
assert_eq!(resp.get_data().len(), 0);
assert!(resp.has_region_error());
assert!(resp.get_region_error().has_server_is_busy());
assert_eq!(
resp.get_region_error().get_server_is_busy().get_reason(),
BUSY_ERROR_MSG
);
assert!(rx.recv().unwrap().is_err());
}
for i in 0..2 {
let resp = rx.recv().unwrap();
let resp = rx.recv().unwrap().unwrap();
assert_eq!(resp.get_data(), [1, 2, i]);
assert!(!resp.has_region_error());
}
Expand All @@ -814,6 +793,7 @@ mod tests {
box |_, _: &_| Ok(UnaryFixture::new(Err(Error::Other(box_err!("foo")))).into_boxed());
let resp = cop
.handle_unary_request(ReqContext::default_for_test(), handler_builder)
.unwrap()
.wait()
.unwrap();
assert_eq!(resp.get_data().len(), 0);
Expand All @@ -836,6 +816,7 @@ mod tests {
};
let resp_vec = cop
.handle_stream_request(ReqContext::default_for_test(), handler_builder)
.unwrap()
.collect()
.wait()
.unwrap();
Expand All @@ -855,6 +836,7 @@ mod tests {
let handler_builder = box |_, _: &_| Ok(StreamFixture::new(responses).into_boxed());
let resp_vec = cop
.handle_stream_request(ReqContext::default_for_test(), handler_builder)
.unwrap()
.collect()
.wait()
.unwrap();
Expand All @@ -878,6 +860,7 @@ mod tests {
let handler_builder = box |_, _: &_| Ok(StreamFixture::new(vec![]).into_boxed());
let resp_vec = cop
.handle_stream_request(ReqContext::default_for_test(), handler_builder)
.unwrap()
.collect()
.wait()
.unwrap();
Expand Down Expand Up @@ -913,6 +896,7 @@ mod tests {
let handler_builder = box move |_, _: &_| Ok(handler.into_boxed());
let resp_vec = cop
.handle_stream_request(ReqContext::default_for_test(), handler_builder)
.unwrap()
.collect()
.wait()
.unwrap();
Expand All @@ -938,6 +922,7 @@ mod tests {
let handler_builder = box move |_, _: &_| Ok(handler.into_boxed());
let resp_vec = cop
.handle_stream_request(ReqContext::default_for_test(), handler_builder)
.unwrap()
.collect()
.wait()
.unwrap();
Expand All @@ -963,6 +948,7 @@ mod tests {
let handler_builder = box move |_, _: &_| Ok(handler.into_boxed());
let resp_vec = cop
.handle_stream_request(ReqContext::default_for_test(), handler_builder)
.unwrap()
.collect()
.wait()
.unwrap();
Expand Down Expand Up @@ -1000,6 +986,7 @@ mod tests {
let handler_builder = box move |_, _: &_| Ok(handler.into_boxed());
let resp_vec = cop
.handle_stream_request(ReqContext::default_for_test(), handler_builder)
.unwrap()
.take(7)
.collect()
.wait()
Expand Down Expand Up @@ -1056,8 +1043,9 @@ mod tests {
PAYLOAD_SMALL as u64,
).into_boxed())
};
let resp_future_1 =
cop.handle_unary_request(req_with_exec_detail.clone(), handler_builder);
let resp_future_1 = cop
.handle_unary_request(req_with_exec_detail.clone(), handler_builder)
.unwrap();
let sender = tx.clone();
thread::spawn(move || sender.send(vec![resp_future_1.wait().unwrap()]).unwrap());
// Sleep a while to make sure that thread is spawn and snapshot is taken.
Expand All @@ -1070,8 +1058,9 @@ mod tests {
.into_boxed(),
)
};
let resp_future_2 =
cop.handle_unary_request(req_with_exec_detail.clone(), handler_builder);
let resp_future_2 = cop
.handle_unary_request(req_with_exec_detail.clone(), handler_builder)
.unwrap();
let sender = tx.clone();
thread::spawn(move || sender.send(vec![resp_future_2.wait().unwrap()]).unwrap());
thread::sleep(Duration::from_millis(SNAPSHOT_DURATION_MS as u64));
Expand Down Expand Up @@ -1128,8 +1117,9 @@ mod tests {
PAYLOAD_LARGE as u64,
).into_boxed())
};
let resp_future_1 =
cop.handle_unary_request(req_with_exec_detail.clone(), handler_builder);
let resp_future_1 = cop
.handle_unary_request(req_with_exec_detail.clone(), handler_builder)
.unwrap();
let sender = tx.clone();
thread::spawn(move || sender.send(vec![resp_future_1.wait().unwrap()]).unwrap());
// Sleep a while to make sure that thread is spawn and snapshot is taken.
Expand All @@ -1150,8 +1140,9 @@ mod tests {
],
).into_boxed())
};
let resp_future_3 =
cop.handle_stream_request(req_with_exec_detail.clone(), handler_builder);
let resp_future_3 = cop
.handle_stream_request(req_with_exec_detail.clone(), handler_builder)
.unwrap();
let sender = tx.clone();
thread::spawn(move || {
sender
Expand Down
4 changes: 4 additions & 0 deletions src/coprocessor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ impl Deadline {

/// Returns error if the deadline is exceeded.
pub fn check_if_exceeded(&self) -> Result<()> {
fail_point!("coprocessor_deadline_check_exceeded", |_| Err(
Error::Outdated(Duration::from_secs(60), self.tag)
));

let now = Instant::now_coarse();
if self.deadline <= now {
let elapsed = now.duration_since(self.start_time);
Expand Down
Loading

0 comments on commit 87f4c2f

Please sign in to comment.