Skip to content

Commit

Permalink
Fix bug in dynamic HTTPRouterHandler switching to prevent StackOverflow.
Browse files Browse the repository at this point in the history
  • Loading branch information
mafei6827 committed Apr 15, 2024
1 parent b2c296f commit 7fd75e3
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 335 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.baidu.bifromq.plugin.settingprovider.ISettingProvider;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;

public class NonTLSServerInitializer extends AbstractServerInitializer {
Expand All @@ -30,6 +31,7 @@ protected void initChannel(SocketChannel ch) {
final ChannelPipeline p = ch.pipeline();
final HttpServerCodec sourceCodec = new HttpServerCodec();
p.addLast(sourceCodec);
p.addLast(new HttpObjectAggregator(1024 * 1024));
p.addLast(new HTTPRequestRouter(routeMap, settingProvider));
p.addLast(ExceptionHandler.INSTANCE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.baidu.bifromq.apiserver.http.IHTTPRouteMap;
import com.baidu.bifromq.plugin.settingprovider.ISettingProvider;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.ssl.SslContext;

Expand All @@ -32,6 +33,7 @@ public TLSServerInitializer(SslContext sslContext, IHTTPRouteMap routeMap, ISett
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(sslContext.newHandler(ch.alloc()));
ch.pipeline().addLast(new HttpServerCodec());
ch.pipeline().addLast(new HttpObjectAggregator(1024 * 1024));
ch.pipeline().addLast(new HTTPRequestRouter(routeMap, settingProvider));
ch.pipeline().addLast(ExceptionHandler.INSTANCE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package com.baidu.bifromq.apiserver.http;

import static com.baidu.bifromq.apiserver.Headers.HEADER_REQ_ID;
import static com.baidu.bifromq.apiserver.Headers.HEADER_TENANT_ID;
import static com.baidu.bifromq.apiserver.http.handler.HTTPHeaderUtils.getHeader;
import static io.netty.buffer.Unpooled.EMPTY_BUFFER;
Expand All @@ -21,31 +22,42 @@
import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE;
import static io.netty.handler.codec.http.HttpHeaderValues.CLOSE;
import static io.netty.handler.codec.http.HttpHeaderValues.KEEP_ALIVE;
import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_0;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;

import com.baidu.bifromq.apiserver.http.handler.HTTPHeaderUtils;
import com.baidu.bifromq.plugin.settingprovider.ISettingProvider;
import com.baidu.bifromq.plugin.settingprovider.Setting;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpServerUpgradeHandler;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.util.ReferenceCountUtil;
import io.netty.handler.codec.http.HttpVersion;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

@Slf4j
public class HTTPRequestRouter extends SimpleChannelInboundHandler<HttpRequest> {
public class HTTPRequestRouter extends SimpleChannelInboundHandler<FullHttpRequest> {
private static final FullHttpResponse TOO_LARGE_CLOSE = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE, Unpooled.EMPTY_BUFFER);

static {
TOO_LARGE_CLOSE.headers().set(CONTENT_LENGTH, 0);
TOO_LARGE_CLOSE.headers().set(CONNECTION, HttpHeaderValues.CLOSE);
}

private final IHTTPRouteMap routeMap;
private final ISettingProvider settingProvider;

private String lastTenantId;

public HTTPRequestRouter(IHTTPRouteMap routeMap, ISettingProvider settingProvider) {
this.routeMap = routeMap;
this.settingProvider = settingProvider;
Expand All @@ -60,35 +72,53 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpRequest req) {
ChannelPipeline pipeline = ctx.pipeline();
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest req) {
String tenantId = getHeader(HEADER_TENANT_ID, req, false);
if (tenantId != null && (lastTenantId == null || !lastTenantId.equals(tenantId))) {
lastTenantId = tenantId;
// setup aggregator at the first time or tenant change
pipeline.addAfter(ctx.name(), null, new HTTPRouteHandler(tenantId, routeMap));
pipeline.addAfter(ctx.name(), null,
new HttpObjectAggregator(settingProvider.provide(Setting.MaxUserPayloadBytes, tenantId)));
}
if (tenantId != null || lastTenantId != null) {
ReferenceCountUtil.retain(req);
ctx.fireChannelRead(req);
} else {
if (StringUtils.isEmpty(tenantId)) {
FullHttpResponse response =
new DefaultFullHttpResponse(req.protocolVersion(), HttpResponseStatus.BAD_REQUEST, EMPTY_BUFFER);
response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
response.headers().setInt(CONTENT_LENGTH, response.content().readableBytes());
boolean keepAlive = HttpUtil.isKeepAlive(req);
if (keepAlive) {
if (req.protocolVersion().equals(HTTP_1_0)) {
response.headers().set(CONNECTION, KEEP_ALIVE);
}
ctx.writeAndFlush(response);
} else {
// Tell the client we're going to close the connection.
response.headers().set(CONNECTION, CLOSE);
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
doResponse(ctx, req, response);
} else {
Integer maxUserPayloadBytes = settingProvider.provide(Setting.MaxUserPayloadBytes, tenantId);
if (HttpUtil.getContentLength(req, -1) > maxUserPayloadBytes) {
ctx.writeAndFlush(TOO_LARGE_CLOSE.retainedDuplicate()).addListener(ChannelFutureListener.CLOSE);
return;
}
req.retain();
long reqId = HTTPHeaderUtils.getOptionalReqId(req);
routeMap.getHandler(req)
.handle(reqId, tenantId, req)
.whenComplete((v, e) -> {
FullHttpResponse response;
if (e != null) {
ByteBuf content = ctx.alloc().buffer();
content.writeBytes(e.getMessage().getBytes());
response = new DefaultFullHttpResponse(HTTP_1_1, INTERNAL_SERVER_ERROR, content);
response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
} else {
response = v;
}
response.headers().set(HEADER_REQ_ID.header, reqId);
response.headers().setInt(CONTENT_LENGTH, response.content().readableBytes());
doResponse(ctx, req, response);
req.release();
});
}
}

private void doResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse response) {
boolean keepAlive = HttpUtil.isKeepAlive(req);
if (keepAlive) {
if (req.protocolVersion().equals(HTTP_1_0)) {
response.headers().set(CONNECTION, KEEP_ALIVE);
}
ctx.writeAndFlush(response);
} else {
// Tell the client we're going to close the connection.
response.headers().set(CONNECTION, CLOSE);
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
}
}

This file was deleted.

Loading

0 comments on commit 7fd75e3

Please sign in to comment.