Skip to content

Commit

Permalink
Add support to extract routing field from a JSON key. Based on conflu…
Browse files Browse the repository at this point in the history
  • Loading branch information
Ziga Mahkovec committed May 18, 2022
1 parent 826a760 commit bc1e533
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,18 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BehaviorOnNullValues;
import org.apache.http.util.TextUtils;
import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Time;
import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.sink.SinkRecord;
Expand Down Expand Up @@ -157,6 +160,9 @@ public DocWriteRequest<?> convertRecord(SinkRecord record, String index) {
? String.format("%s+%d+%d", record.topic(), record.kafkaPartition(), record.kafkaOffset())
: convertKey(record.keySchema(), record.key());

// routing
String routing = getRouting(record);

// delete
if (record.value() == null) {
return maybeAddExternalVersioning(new DeleteRequest(index).id(id), record);
Expand All @@ -171,18 +177,91 @@ public DocWriteRequest<?> convertRecord(SinkRecord record, String index) {
return new UpdateRequest(index, id)
.doc(payload, XContentType.JSON)
.upsert(payload, XContentType.JSON)
.routing(routing)
.retryOnConflict(Math.min(config.maxInFlightRequests(), 5));
case INSERT:
OpType opType = config.isDataStream() ? OpType.CREATE : OpType.INDEX;
return maybeAddExternalVersioning(
new IndexRequest(index).id(id).source(payload, XContentType.JSON).opType(opType),
new IndexRequest(index)
.id(id)
.source(payload, XContentType.JSON)
.opType(opType)
.routing(routing),
record
);
default:
return null; // shouldn't happen
}
}

/**
* Resolves and returns routing value, if configured.
*
* @param record the sink record
* @return String value to use as routing or null if not configured
* @throws DataException in case routing is configured and cannot be resolved from record
*/
private String getRouting(SinkRecord record) {
String routing = null;
if (config.routingFieldKeyExtract()) {
routing = getRoutingFieldFromKey(record.key());
if (routing == null || TextUtils.isBlank(routing)) {
throw new DataException("invalid: value for routing field is null or blank");
}
}
return routing;
}

private static String getRoutingFieldFromKey(Object key) {
SchemaAndValue val = JSON_CONVERTER.toConnectData(null, ((String)key).getBytes());
HashMap map = (HashMap)val.value();
return (String)map.get("_routing");
}

/**
* Get the value for a field from a Struct or Map
* Source: https://stackoverflow.com/a/53717409/3983812
*
* @param structOrMap the source object
* @param fieldName the field name
* @return the field value
* @throws DataException in case field cannot be found
*/
private static Object getField(Object structOrMap, String fieldName) {
validate(structOrMap, fieldName);

Object field;
if (structOrMap instanceof Struct) {
field = ((Struct) structOrMap).get(fieldName);
} else if (structOrMap instanceof Map) {
field = ((Map<?, ?>) structOrMap).get(fieldName);
if (field == null) {
throw new DataException(String.format("Unable to find nested field '%s'", fieldName));
}
return field;
} else {
throw new DataException(String.format(
"Argument not a Struct or Map. Cannot get field '%s' from %s.",
fieldName,
structOrMap
));
}
if (field == null) {
throw new DataException(
String.format("The field '%s' does not exist in %s.", fieldName, structOrMap));
}
return field;
}

private static void validate(Object o, String fieldName) {
if (o == null) {
throw new ConnectException("Attempted to extract a field from a null object.");
}
if (TextUtils.isBlank(fieldName)) {
throw new ConnectException("The field to extract cannot be null or empty.");
}
}

private String getPayload(SinkRecord record) {
if (record.value() == null) {
return null;
Expand Down Expand Up @@ -437,4 +516,5 @@ private static String recordString(SinkRecord record) {
record.kafkaOffset()
);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,13 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
private static final String WRITE_METHOD_DISPLAY = "Write Method";
private static final String WRITE_METHOD_DEFAULT = WriteMethod.INSERT.name();

public static final String ROUTING_FIELD_KEY_EXTRACT_CONFIG = "routing.field.key.extract";
private static final String ROUTING_FIELD_KEY_EXTRACT_DOC =
"Whether to extract the routing field from a JSON key's _routing value. "
+ "If configured, the field is mandatory and must always resolve to a non-blank String.";
private static final String ROUTING_FIELD_KEY_EXTRACT_DISPLAY = "Routing field extract from key";
private static final boolean ROUTING_FIELD_KEY_EXTRACT_DEFAULT = false;

// Proxy group
public static final String PROXY_HOST_CONFIG = "proxy.host";
private static final String PROXY_HOST_DISPLAY = "Proxy Host";
Expand Down Expand Up @@ -678,6 +685,16 @@ private static void addConversionConfigs(ConfigDef configDef) {
Width.SHORT,
WRITE_METHOD_DISPLAY,
new EnumRecommender<>(WriteMethod.class)
).define(
ROUTING_FIELD_KEY_EXTRACT_CONFIG,
Type.BOOLEAN,
ROUTING_FIELD_KEY_EXTRACT_DEFAULT,
Importance.LOW,
ROUTING_FIELD_KEY_EXTRACT_DOC,
DATA_CONVERSION_GROUP,
++order,
Width.SHORT,
ROUTING_FIELD_KEY_EXTRACT_DISPLAY
);
}

Expand Down Expand Up @@ -1011,6 +1028,10 @@ public WriteMethod writeMethod() {
return WriteMethod.valueOf(getString(WRITE_METHOD_CONFIG).toUpperCase());
}

public boolean routingFieldKeyExtract() {
return getBoolean(ROUTING_FIELD_KEY_EXTRACT_CONFIG);
}

private static class DataStreamDatasetValidator implements Validator {

@Override
Expand Down

0 comments on commit bc1e533

Please sign in to comment.