Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue #758] validate subscriber Url #759

Merged
merged 5 commits into from
Feb 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,8 @@ subprojects {

dependency "io.cloudevents:cloudevents-core:2.2.0"
dependency "io.cloudevents:cloudevents-json-jackson:2.2.0"

dependency "com.github.seancfoley:ipaddress:5.3.3"
}
}
}
2 changes: 2 additions & 0 deletions eventmesh-common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ dependencies {
implementation "org.apache.logging.log4j:log4j-core"
implementation "org.apache.logging.log4j:log4j-slf4j-impl"

implementation 'com.github.seancfoley:ipaddress:5.3.3'

implementation "com.lmax:disruptor"

api "com.fasterxml.jackson.core:jackson-databind"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,35 @@

package org.apache.eventmesh.common.utils;

import org.apache.commons.lang3.StringUtils;

import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.NetworkInterface;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.channel.Channel;

import inet.ipaddr.HostName;
import inet.ipaddr.IPAddress;
import inet.ipaddr.IPAddressString;

public class IPUtils {

private static final Logger logger = LoggerFactory.getLogger(IPUtils.class);

public static String getLocalAddress() {
// if the progress works under docker environment
// return the host ip about this docker located from environment value
Expand Down Expand Up @@ -173,4 +187,59 @@ public static String parseChannelRemoteAddr(final Channel channel) {

return "";
}

public static boolean isValidDomainOrIp(String url, List<IPAddress> ipV4ReservedAddrs, List<IPAddress> ipV6ReservedAddrs) {
if (StringUtils.isBlank(url)) {
return false;
}
// Engine only need to verify DNS transformed result
if (isValidIp(url)) {
return true;
}
IPAddress ipAddress = domain2Ip(url);
if (ipAddress == null) {
return false;
}
if (ipAddress.isIPv4()) {
return isReservedIp(ipAddress, ipV4ReservedAddrs);
} else {
return isReservedIp(ipAddress, ipV6ReservedAddrs);
}
}

public static boolean isValidIp(String url) {
try {
IPAddressString ipString = new IPAddressString(url);
if (!ipString.isValid()) {
return new IPAddressString(new URL(url).getHost()).isValid();
}
} catch (Exception e) {
logger.warn("Invalid URL format url={}", url, e);
return false;
}
return true;
}

public static IPAddress domain2Ip(String url) {
HostName hostName = new HostName(url);
if (hostName.isValid()) {
return hostName.getAddress();
}
try {
String host = new URL(url).getHost();
return new HostName(host).getAddress();
} catch (MalformedURLException e) {
logger.error("Invalid URL format url={}", url, e);
return null;
}
}

private static boolean isReservedIp(IPAddress ipAddress, List<IPAddress> reservedIps) {
for (IPAddress address : reservedIps) {
if (address.contains(ipAddress)) {
return true;
}
}
return false;
}
}
2 changes: 2 additions & 0 deletions eventmesh-runtime/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ dependencies {
implementation "org.apache.httpcomponents:httpclient"
implementation 'io.netty:netty-all'

implementation 'com.github.seancfoley:ipaddress:5.3.3'

implementation project(":eventmesh-common")
implementation project(":eventmesh-spi")
implementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
Expand Down
4 changes: 4 additions & 0 deletions eventmesh-runtime/conf/eventmesh.properties
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ eventMesh.server.registry.fetchRegistryAddrIntervalInMills=20000
eventMesh.server.gracefulShutdown.sleepIntervalInMills=1000
eventMesh.server.rebalanceRedirect.sleepIntervalInMills=200

#ip address blacklist
eventmesh.server.blacklist.ipv4=0.0.0.0/8,127.0.0.0/8,169.254.0.0/16,255.255.255.255/32
eventmesh.server.blacklist.ipv6=::/128,::1/128,ff00::/8

#connector plugin
eventMesh.connector.plugin.type=standalone

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,23 @@

import org.apache.commons.lang3.StringUtils;

import java.util.Collections;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;

import inet.ipaddr.IPAddress;
import inet.ipaddr.IPAddressString;

public class EventMeshHTTPConfiguration extends CommonConfiguration {

public static Logger logger = LoggerFactory.getLogger(EventMeshHTTPConfiguration.class);

public int httpServerPort = 10105;

public boolean eventMeshServerBatchMsgBatchEnabled = Boolean.TRUE;
Expand Down Expand Up @@ -70,6 +83,10 @@ public class EventMeshHTTPConfiguration extends CommonConfiguration {

public int eventMeshBatchMsgRequestNumPerSecond = 20000;

public List<IPAddress> eventMeshIpv4BlackList = Collections.emptyList();

public List<IPAddress> eventMeshIpv6BlackList = Collections.emptyList();

public EventMeshHTTPConfiguration(ConfigurationWrapper configurationWrapper) {
super(configurationWrapper);
}
Expand Down Expand Up @@ -248,7 +265,32 @@ public void init() {
eventMeshHttpMsgReqNumPerSecond = Integer.parseInt(eventMeshHttpMsgReqNumPerSecondStr);

}

String ipv4BlackList = configurationWrapper.getProp(ConfKeys.KEY_EVENTMESH_SERVER_IPV4_BLACK_LIST);
if (StringUtils.isNotEmpty(ipv4BlackList)) {
eventMeshIpv4BlackList = getBlacklist(ipv4BlackList);
}

String ipv6BlackList = configurationWrapper.getProp(ConfKeys.KEY_EVENTMESH_SERVER_IPV6_BLACK_LIST);
if (StringUtils.isNotEmpty(ipv6BlackList)) {
eventMeshIpv6BlackList = getBlacklist(ipv6BlackList);
}
}
}

private static List<IPAddress> getBlacklist(String cidrs) {
List<String> cidrList = Splitter.on(",").omitEmptyStrings()
.trimResults().splitToList(cidrs);

List<IPAddress> ipAddresses = Lists.newArrayList();
for (String cidr : cidrList) {
try {
ipAddresses.add(new IPAddressString(cidr).toAddress());
} catch (Exception e) {
logger.warn("Invalid cidr={}", cidr, e);
}
}
return ipAddresses;
}

static class ConfKeys {
Expand Down Expand Up @@ -296,5 +338,9 @@ static class ConfKeys {
public static String KEY_EVENTMESH_HTTPS_ENABLED = "eventMesh.server.useTls.enabled";

public static String KEY_EVENTMESH_SERVER_MSG_REQ_NUM_PER_SECOND = "eventMesh.server.http.msgReqnumPerSecond";

public static String KEY_EVENTMESH_SERVER_IPV4_BLACK_LIST = "eventmesh.server.blacklist.ipv4";

public static String KEY_EVENTMESH_SERVER_IPV6_BLACK_LIST = "eventmesh.server.blacklist.ipv6";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,30 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand>
String url = subscribeRequestBody.getUrl();
String consumerGroup = subscribeRequestBody.getConsumerGroup();

// validate URL
try {
if (!IPUtils.isValidDomainOrIp(url, eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIpv4BlackList,
eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIpv6BlackList)) {
httpLogger.error("subscriber url {} is not valid", url);
responseEventMeshCommand = request.createHttpCommandResponse(
subscribeResponseHeader,
SubscribeResponseBody
.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(),
EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getErrMsg() + " invalid URL: " + url));
asyncContext.onComplete(responseEventMeshCommand);
return;
}
} catch (Exception e) {
httpLogger.error("subscriber url {} is not valid, error {}", url, e.getMessage());
responseEventMeshCommand = request.createHttpCommandResponse(
subscribeResponseHeader,
SubscribeResponseBody
.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(),
EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getErrMsg() + " invalid URL: " + url));
asyncContext.onComplete(responseEventMeshCommand);
jinrongluo marked this conversation as resolved.
Show resolved Hide resolved
return;
}

synchronized (eventMeshHTTPServer.localClientInfoMapping) {

registerClient(subscribeRequestHeader, consumerGroup, subTopicList, url);
Expand Down
1 change: 1 addition & 0 deletions tools/third-party-dependencies/known-dependencies.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ guava-29.0-jre.jar
hamcrest-core-1.3.jar
httpclient-4.5.13.jar
httpcore-4.4.13.jar
ipaddress-5.3.3.jar
j2objc-annotations-1.3.jar
jackson-annotations-2.11.0.jar
jackson-core-2.11.0.jar
Expand Down
Loading