Skip to content

Commit

Permalink
[INLONG-10844][Manager] Support configuring HTTP type sink
Browse files Browse the repository at this point in the history
  • Loading branch information
fuweng11 committed Aug 26, 2024
1 parent 73881b4 commit cfa8c40
Show file tree
Hide file tree
Showing 14 changed files with 713 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class DataNodeType {
public static final String SQLSERVER = "SQLSERVER";
public static final String MONGODB = "MONGODB";
public static final String DORIS = "DORIS";
public static final String HTTP = "HTTP";
public static final String OCEANBASE = "OCEANBASE";

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ public class SinkType extends StreamType {
@SupportSortType(sortType = SortType.SORT_FLINK)
public static final String TUBEMQ = "TUBEMQ";

@SupportSortType(sortType = SortType.SORT_STANDALONE)
public static final String HTTP = "HTTP";

@SupportSortType(sortType = SortType.SORT_FLINK)
public static final String OCEANBASE = "OCEANBASE";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class ClusterType {
public static final String DATAPROXY = "DATAPROXY";
public static final String KAFKA = "KAFKA";

public static final String SORT_HTTP = "SORT_HTTP";
public static final String SORT_ES = "SORT_ES";
public static final String SORT_CLS = "SORT_CLS";
public static final String SORT_PULSAR = "SORT_PULSAR";
Expand All @@ -48,6 +49,7 @@ public class ClusterType {
add(ClusterType.PULSAR);
add(ClusterType.DATAPROXY);
add(ClusterType.KAFKA);
add(ClusterType.SORT_HTTP);
add(ClusterType.SORT_ES);
add(ClusterType.SORT_CLS);
add(ClusterType.SORT_PULSAR);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.cluster.sort.http;

import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.util.JsonTypeDefine;
import org.apache.inlong.manager.pojo.sort.BaseSortClusterInfo;

import io.swagger.annotations.ApiModel;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;

@Data
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
@JsonTypeDefine(value = ClusterType.SORT_HTTP)
@ApiModel("Inlong cluster info for Sort http")
public class SortHttpClusterInfo extends BaseSortClusterInfo {

public SortHttpClusterInfo() {
this.setType(ClusterType.SORT_HTTP);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.cluster.sort.http;

import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.util.JsonTypeDefine;
import org.apache.inlong.manager.pojo.sort.BaseSortClusterRequest;

import io.swagger.annotations.ApiModel;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;

@Data
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
@JsonTypeDefine(value = ClusterType.SORT_HTTP)
@ApiModel("Inlong cluster request for Sort http")
public class SortHttpClusterRequest extends BaseSortClusterRequest {

public SortHttpClusterRequest() {
this.setType(ClusterType.SORT_HTTP);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.node.http;

import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.JsonUtils;

import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.StringUtils;

import javax.validation.constraints.NotNull;

/**
* Http service data node info
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@ApiModel("Http service data node info")
public class HttpDataNodeDTO {

@ApiModelProperty("HTTP base url")
private String baseUrl;

@ApiModelProperty("Whether to enable credential")
private Boolean enableCredential;

@ApiModelProperty("Max connect count")
private Integer maxConnect;
/**
* Get the dto instance from the request
*/
public static HttpDataNodeDTO getFromRequest(HttpDataNodeRequest request, String extParams) {
HttpDataNodeDTO dto = StringUtils.isNotBlank(extParams)
? HttpDataNodeDTO.getFromJson(extParams)
: new HttpDataNodeDTO();
return CommonBeanUtils.copyProperties(request, dto, true);
}

/**
* Get the dto instance from the JSON string.
*/
public static HttpDataNodeDTO getFromJson(@NotNull String extParams) {
try {
return JsonUtils.parseObject(extParams, HttpDataNodeDTO.class);
} catch (Exception e) {
throw new BusinessException(ErrorCodeEnum.GROUP_INFO_INCORRECT,
String.format("Failed to parse extParams for Cloud log service node: %s", e.getMessage()));
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.node.http;

import org.apache.inlong.manager.common.consts.DataNodeType;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.JsonTypeDefine;
import org.apache.inlong.manager.pojo.node.DataNodeInfo;
import org.apache.inlong.manager.pojo.node.DataNodeRequest;

import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import lombok.experimental.SuperBuilder;

/**
* Cloud log service data node info
*/
@Data
@SuperBuilder
@AllArgsConstructor
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
@JsonTypeDefine(value = DataNodeType.HTTP)
@ApiModel("HTTP data node info")
public class HttpDataNodeInfo extends DataNodeInfo {

@ApiModelProperty("HTTP base url")
private String baseUrl;

@ApiModelProperty("Whether to enable credential")
private Boolean enableCredential;

@ApiModelProperty("Max connect count")
private Integer maxConnect;

public HttpDataNodeInfo() {
setType(DataNodeType.HTTP);
}

@Override
public DataNodeRequest genRequest() {
return CommonBeanUtils.copyProperties(this, HttpDataNodeRequest::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.node.http;

import org.apache.inlong.manager.common.consts.DataNodeType;
import org.apache.inlong.manager.common.util.JsonTypeDefine;
import org.apache.inlong.manager.pojo.node.DataNodeRequest;

import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;

/**
* Cloud log service data node request
*/
@Data
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
@JsonTypeDefine(value = DataNodeType.HTTP)
@ApiModel("Http service data node request")
public class HttpDataNodeRequest extends DataNodeRequest {

@ApiModelProperty("HTTP base url")
private String baseUrl;

@ApiModelProperty("Whether to enable credential")
private Boolean enableCredential;

@ApiModelProperty("Max connect count")
private Integer maxConnect;

public HttpDataNodeRequest() {
this.setType(DataNodeType.HTTP);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.sink.http;

import org.apache.inlong.manager.common.consts.SinkType;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.JsonTypeDefine;
import org.apache.inlong.manager.pojo.sink.SinkRequest;
import org.apache.inlong.manager.pojo.sink.StreamSink;

import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;

import java.util.Map;

/**
* HTTP sink info
*/
@Data
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
@ApiModel(value = "HTTP sink info")
@JsonTypeDefine(value = SinkType.HTTP)
public class HttpSink extends StreamSink {

@ApiModelProperty("HTTP path")
private String path;

@ApiModelProperty("HTTP method, like POST, GET")
private String method;

@ApiModelProperty("HTTP headers")
private Map<String, String> headers;

public HttpSink() {
this.setSinkType(SinkType.HTTP);
}

@Override
public SinkRequest genSinkRequest() {
return CommonBeanUtils.copyProperties(this, HttpSinkRequest::new);
}
}
Loading

0 comments on commit cfa8c40

Please sign in to comment.