Skip to content

Commit

Permalink
filters: add resetIdleTimer filter callback (#1577)
Browse files Browse the repository at this point in the history
Description: This may be useful if a filter stops iteration for an extended period of time, since stream timeouts will still apply. The callback may be called periodically to continue to indicate "activity" on the stream. Note the final call to Envoy is commented out in this PR as we need the upstream change to land as well. Uncommenting and integration tests will come in a subsequent update.
Risk Level: Moderate
Testing: Local

Signed-off-by: Mike Schore <[email protected]>
  • Loading branch information
goaway authored Jul 12, 2021
1 parent fd64af9 commit dab0f9a
Show file tree
Hide file tree
Showing 16 changed files with 111 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,17 @@ typedef void (*envoy_filter_release_f)(const void* context);
*/
typedef void (*envoy_filter_resume_f)(const void* context);

/**
* Function signature for async filter callback to reset stream idle timeout.
*/
typedef void (*envoy_filter_reset_idle_f)(const void* context);

/**
* Raw datatype containing asynchronous callbacks for platform HTTP filters.
*/
typedef struct {
envoy_filter_resume_f resume_iteration;
envoy_filter_reset_idle_f reset_idle;
envoy_filter_release_f release_callbacks;
const void* callback_context;
} envoy_http_filter_callbacks;
Expand Down
22 changes: 22 additions & 0 deletions library/common/extensions/filters/http/platform_bridge/filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ static void envoy_filter_callback_resume_encoding(const void* context) {
}
}

static void envoy_filter_reset_idle(const void* context) {
PlatformBridgeFilterWeakPtr* weak_filter =
static_cast<PlatformBridgeFilterWeakPtr*>(const_cast<void*>(context));
if (auto filter = weak_filter->lock()) {
filter->resetIdleTimer();
}
}

PlatformBridgeFilterConfig::PlatformBridgeFilterConfig(
const envoymobile::extensions::filters::http::platform_bridge::PlatformBridge& proto_config)
: filter_name_(proto_config.platform_filter_name()),
Expand Down Expand Up @@ -100,6 +108,7 @@ void PlatformBridgeFilter::setDecoderFilterCallbacks(
// heap allocation below occurs when it could be avoided.
if (platform_filter_.set_request_callbacks) {
platform_request_callbacks_.resume_iteration = envoy_filter_callback_resume_decoding;
platform_request_callbacks_.reset_idle = envoy_filter_reset_idle;
platform_request_callbacks_.release_callbacks = envoy_filter_release_callbacks;
// We use a weak_ptr wrapper for the filter to ensure presence before dispatching callbacks.
// The weak_ptr is heap-allocated, because it must be managed (and eventually released) by
Expand All @@ -121,6 +130,7 @@ void PlatformBridgeFilter::setEncoderFilterCallbacks(
// heap allocation below occurs when it could be avoided.
if (platform_filter_.set_response_callbacks) {
platform_response_callbacks_.resume_iteration = envoy_filter_callback_resume_encoding;
platform_response_callbacks_.reset_idle = envoy_filter_reset_idle;
platform_response_callbacks_.release_callbacks = envoy_filter_release_callbacks;
// We use a weak_ptr wrapper for the filter to ensure presence before dispatching callbacks.
// The weak_ptr is heap-allocated, because it must be managed (and eventually released) by
Expand Down Expand Up @@ -504,6 +514,18 @@ void PlatformBridgeFilter::resumeEncoding() {
});
}

void PlatformBridgeFilter::resetIdleTimer() {
ENVOY_LOG(trace, "PlatformBridgeFilter({})::resetIdleTimer", filter_name_);

auto weak_self = weak_from_this();
dispatcher_.post([weak_self]() -> void {
if (auto self = weak_self.lock()) {
// Stream idle timeout is nondirectional.
// self->decoder_callbacks_->resetIdleTimer();
}
});
}

void PlatformBridgeFilter::FilterBase::onResume() {
ScopeTrackerScopeState scope(&parent_, parent_.scopeTracker());
ENVOY_LOG(debug, "PlatformBridgeFilter({})::onResume", parent_.filter_name_);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ class PlatformBridgeFilter final : public Http::PassThroughFilter,
// This is a no-op if filter iteration is already ongoing.
void resumeEncoding();

// Asynchronously reset the stream idle timeout. Does not affect other timeouts.
void resetIdleTimer();

// StreamFilterBase
void onDestroy() override;

Expand Down
13 changes: 13 additions & 0 deletions library/common/jni/jni_interface.cc
Original file line number Diff line number Diff line change
Expand Up @@ -775,6 +775,19 @@ Java_io_envoyproxy_envoymobile_engine_EnvoyHTTPFilterCallbacksImpl_callResumeIte
env->DeleteGlobalRef(retained_context);
}

extern "C" JNIEXPORT void JNICALL
Java_io_envoyproxy_envoymobile_engine_EnvoyHTTPFilterCallbacksImpl_callResetIdleTimer(
JNIEnv* env, jclass, jlong callback_handle, jobject j_context) {
jni_log("[Envoy]", "callResetIdleTimer");
// Context is only passed here to ensure it's not inadvertently gc'd during execution of this
// function. To be extra safe, do an explicit retain with a GlobalRef.
jobject retained_context = env->NewGlobalRef(j_context);
envoy_http_filter_callbacks* callbacks =
reinterpret_cast<envoy_http_filter_callbacks*>(callback_handle);
callbacks->reset_idle(callbacks->callback_context);
env->DeleteGlobalRef(retained_context);
}

extern "C" JNIEXPORT void JNICALL
Java_io_envoyproxy_envoymobile_engine_EnvoyHTTPFilterCallbacksImpl_callReleaseCallbacks(
JNIEnv* env, jclass, jlong callback_handle) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,21 @@ static EnvoyHTTPFilterCallbacksImpl create(long callbackHandle) {

public void resumeIteration() { callResumeIteration(callbackHandle, this); }

public void resetIdleTimer() { callResetIdleTimer(callbackHandle, this); }

/**
* @param callbackHandle, native handle for callback execution.
* @param object, pass this object so that the JNI retains it, preventing it from potentially
* being concurrently garbage-collected while the native call is executing.
*/
private native void callResumeIteration(long callbackHandle, EnvoyHTTPFilterCallbacksImpl object);

/**
* @param callbackHandle, native handle for callback execution.
* @param object, pass this object so that the JNI retains it, preventing it from potentially
* being concurrently garbage-collected while the native call is executing.
*/
private native void callResetIdleTimer(long callbackHandle, EnvoyHTTPFilterCallbacksImpl object);

private static native void callReleaseCallbacks(long callbackHandle);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@

public interface EnvoyHTTPFilterCallbacks {
void resumeIteration();
void resetIdleTimer();
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,13 @@ interface RequestFilterCallbacks {
* calls.
*/
fun resumeRequest()

/**
* Reset the underlying stream idle timeout to its configured threshold.
*
* This may be useful if a filter stops iteration for an extended period of time, since ordinarily
* timeouts will still apply. This may be called periodically to continue to indicate "activity"
* on the stream.
*/
fun resetIdleTimer()
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,8 @@ internal class RequestFilterCallbacksImpl constructor(
override fun resumeRequest() {
callbacks.resumeIteration()
}

override fun resetIdleTimer() {
callbacks.resetIdleTimer()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,13 @@ interface ResponseFilterCallbacks {
* calls.
*/
fun resumeResponse()

/**
* Reset the underlying stream idle timeout to its configured threshold.
*
* This may be useful if a filter stops iteration for an extended period of time, since ordinarily
* timeouts will still apply. This may be called periodically to continue to indicate "activity"
* on the stream.
*/
fun resetIdleTimer()
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,8 @@ internal class ResponseFilterCallbacksImpl constructor(
override fun resumeResponse() {
callbacks.resumeIteration()
}

override fun resetIdleTimer() {
callbacks.resetIdleTimer()
}
}
5 changes: 5 additions & 0 deletions library/objective-c/EnvoyEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ extern const int kEnvoyFilterResumeStatusResumeIteration;
/// filter.
- (void)resumeIteration;

/// Reset the underlying stream idle timeout to its configured threshold. This may be useful if
/// a filter stops iteration for an extended period of time, since ordinarily timeouts will still
/// apply. This may be called periodically to continue to indicate "activity" on the stream.
- (void)resetIdleTimer;

@end

@interface EnvoyHTTPFilter : NSObject
Expand Down
4 changes: 4 additions & 0 deletions library/objective-c/EnvoyHTTPFilterCallbacksImpl.m
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ - (void)resumeIteration {
_callbacks.resume_iteration(_callbacks.callback_context);
}

- (void)resetIdleTimer {
_callbacks.reset_idle(_callbacks.callback_context);
}

- (void)dealloc {
_callbacks.release_callbacks(_callbacks.callback_context);
}
Expand Down
7 changes: 7 additions & 0 deletions library/swift/filters/RequestFilterCallbacks.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,11 @@ public protocol RequestFilterCallbacks {
/// If the request is not complete, the filter may receive further `onData()`/`onTrailers()`
/// calls.
func resumeRequest()

/// Reset the underlying stream idle timeout to its configured threshold.
///
/// This may be useful if a filter stops iteration for an extended period of time, since stream
/// timeouts will still apply. This may be called periodically to continue to indicate "activity"
/// on the stream.
func resetIdleTimer()
}
4 changes: 4 additions & 0 deletions library/swift/filters/RequestFilterCallbacksImpl.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,8 @@ extension RequestFilterCallbacksImpl: RequestFilterCallbacks {
func resumeRequest() {
self.callbacks.resumeIteration()
}

func resetIdleTimer() {
self.callbacks.resetIdleTimer()
}
}
7 changes: 7 additions & 0 deletions library/swift/filters/ResponseFilterCallbacks.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,11 @@ public protocol ResponseFilterCallbacks {
/// If the response is not complete, the filter may receive further `onData()`/`onTrailers()`
/// calls.
func resumeResponse()

/// Reset the underlying stream idle timeout to its configured threshold.
///
/// This may be useful if a filter stops iteration for an extended period of time, since stream
/// timeouts will still apply. This may be called periodically to continue to indicate "activity"
/// on the stream.
func resetIdleTimer()
}
4 changes: 4 additions & 0 deletions library/swift/filters/ResponseFilterCallbacksImpl.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,8 @@ extension ResponseFilterCallbacksImpl: ResponseFilterCallbacks {
func resumeResponse() {
self.callbacks.resumeIteration()
}

func resetIdleTimer() {
self.callbacks.resetIdleTimer()
}
}

0 comments on commit dab0f9a

Please sign in to comment.