Skip to content

Commit

Permalink
Add DeltaConnectorConfiguration options to flink sink impl (delta-io#425
Browse files Browse the repository at this point in the history
)

* refactor - move options related classes from source package up to options package

* Rename DeltaConfiguration to Delta, extract option validator class

* move options to an internal package

* fix import order

* fix import order

* fix test by passing default connector options

* Add comments

* fix comment

* unused import

* checkstyle fixes

* checkstyle fixes

* fix import order

* fix import order

* Add test for OptionValidator

* Create an exception type for option validation errors

* code review comments

* Move source option validation to use common option validator

* fix comment

* fix indent

* use junit5 api

* checkstyle error

Co-authored-by: Gopi Madabhushi <[email protected]>
  • Loading branch information
gopik and Gopi Madabhushi authored Oct 28, 2022
1 parent 601b913 commit 4eb7d3c
Show file tree
Hide file tree
Showing 45 changed files with 608 additions and 263 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.delta.flink.source.internal.builder;
package io.delta.flink.internal.options;

public abstract class BaseOptionTypeConverter<TYPE>
implements OptionTypeConverter<TYPE> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.delta.flink.source.internal.builder;
package io.delta.flink.internal.options;

/**
* Implementation of {@link OptionTypeConverter} that validates values for
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.delta.flink.source.internal.builder;
package io.delta.flink.internal.options;

/**
* An implementation of {@link OptionTypeConverter} interface to convert {@link DeltaConfigOption}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package io.delta.flink.source.internal.builder;
package io.delta.flink.internal.options;

import io.delta.flink.source.internal.DeltaSourceConfiguration;
import org.apache.flink.configuration.ConfigOption;

/**
Expand Down Expand Up @@ -75,22 +74,22 @@ public T defaultValue() {
}

//-------Keeping type safety with implementation of a Visitor pattern -------//
public void setOnConfig(DeltaSourceConfiguration sourceConfiguration, boolean value) {
public void setOnConfig(DeltaConnectorConfiguration sourceConfiguration, boolean value) {
T convertedValue = typeConverter.convertType(this, value);
sourceConfiguration.addOption(this, convertedValue);
}

public void setOnConfig(DeltaSourceConfiguration sourceConfiguration, int value) {
public void setOnConfig(DeltaConnectorConfiguration sourceConfiguration, int value) {
T convertedValue = typeConverter.convertType(this, value);
sourceConfiguration.addOption(this, convertedValue);
}

public void setOnConfig(DeltaSourceConfiguration sourceConfiguration, long value) {
public void setOnConfig(DeltaConnectorConfiguration sourceConfiguration, long value) {
T convertedValue = typeConverter.convertType(this, value);
sourceConfiguration.addOption(this, convertedValue);
}

public void setOnConfig(DeltaSourceConfiguration sourceConfiguration, String value) {
public void setOnConfig(DeltaConnectorConfiguration sourceConfiguration, String value) {
T convertedValue = typeConverter.convertType(this, value);
sourceConfiguration.addOption(this, convertedValue);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,58 +1,57 @@
package io.delta.flink.source.internal;
package io.delta.flink.internal.options;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import io.delta.flink.source.internal.builder.DeltaConfigOption;
import io.delta.flink.source.internal.DeltaSourceOptions;
import org.apache.flink.configuration.ConfigOption;

/**
* This class keeps {@link DeltaSourceOptions} used for {@link io.delta.flink.source.DeltaSource}
* instance.
* This class keeps options used for delta source and sink connectors.
*
* @implNote This class should not be used directly by user but rather indirectly through {@code
* BaseDeltaSourceStepBuilder} which will have dedicated setter methods for public options.
* @implNote This class should not be used directly by user but rather indirectly through source or
* sink builders which will have dedicated setter methods for public options.
*/
public class DeltaSourceConfiguration implements Serializable {
public class DeltaConnectorConfiguration implements Serializable {

/**
* Map of used Options. The map entry key is a string representation of used {@link
* DeltaSourceOptions} and the entry map value is equal option's value used for this entry.
* Map of used options. The map entry key is the string representation of the option name,
* and the map entry value is the value set for that option.
*
* @implNote The {@code DeltaSourceConfiguration} object will be de/serialized by flink and
* @implNote The {@code DeltaConnectorConfiguration} object will be de/serialized by flink and
* passed to Cluster node during job initialization. For that the map content has to be
* serializable as well. The {@link ConfigOption} is not a serializable object, and therefore it
* cannot be used as a map entry key.
*/
private final Map<String, Object> usedSourceOptions = new HashMap<>();

/**
* Creates {@link DeltaSourceConfiguration} instance without any options.
* Creates {@link DeltaConnectorConfiguration} instance without any options.
*/
public DeltaSourceConfiguration() {
public DeltaConnectorConfiguration() {

}

/**
* Creates a copy of this DeltaConnectorConfiguration. Changes to the copy object do not
* influence the state of the original object.
*/
public DeltaSourceConfiguration copy() {
return new DeltaSourceConfiguration(this.usedSourceOptions);
public DeltaConnectorConfiguration copy() {
return new DeltaConnectorConfiguration(this.usedSourceOptions);
}

/**
* Creates an instance of {@link DeltaSourceConfiguration} using provided options.
* @param options options that should be added to {@link DeltaSourceConfiguration}.
* Creates an instance of {@link DeltaConnectorConfiguration} using provided options.
* @param options options that should be added to {@link DeltaConnectorConfiguration}.
*/
public DeltaSourceConfiguration(Map<String, Object> options) {
public DeltaConnectorConfiguration(Map<String, Object> options) {
this.usedSourceOptions.putAll(options);
}

public <T> DeltaSourceConfiguration addOption(DeltaConfigOption<T> name, T value) {
public <T> DeltaConnectorConfiguration addOption(DeltaConfigOption<T> name, T value) {
this.usedSourceOptions.put(name.key(), value);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package io.delta.flink.source.internal.exceptions;
package io.delta.flink.internal.options;

import java.util.Collection;
import java.util.Collections;

import org.apache.flink.core.fs.Path;

/**
* Exception throw during validation of Delta source builder. It contains all validation error
* messages that occurred during this validation.
* Exception thrown during validation of Delta connector options.
*/
public class DeltaSourceValidationException extends RuntimeException {
public class DeltaOptionValidationException extends RuntimeException {

/**
* Path to Delta table for which exception was thrown. Can be null if exception was thrown on
Expand All @@ -20,29 +21,39 @@ public class DeltaSourceValidationException extends RuntimeException {
*/
private final Collection<String> validationMessages;

public DeltaSourceValidationException(String tablePath, Collection<String> validationMessages) {
this.tablePath = String.valueOf(tablePath);
public DeltaOptionValidationException(Path tablePath, Collection<String> validationMessages) {
this(String.valueOf(tablePath), validationMessages);
}

public DeltaOptionValidationException(
String tablePathString,
Collection<String> validationMessages) {
this.tablePath = tablePathString;
this.validationMessages =
(validationMessages == null) ? Collections.emptyList() : validationMessages;

}

@Override
public String getMessage() {

String validationMessages = String.join(System.lineSeparator(), this.validationMessages);

return "Invalid Delta Source definition detected."
return "Invalid Delta connector definition detected."
+ System.lineSeparator()
+ "The reported issues are:"
+ System.lineSeparator()
+ validationMessages;
}

/** Table path for this exception. */
public String getTablePath() {
return tablePath;
}

/** Detailed validation messages for the cause of this exception. */
public Collection<String> getValidationMessages() {
return Collections.unmodifiableCollection(this.validationMessages);
}

}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.delta.flink.source.internal.builder;
package io.delta.flink.internal.options;

import org.apache.flink.util.StringUtils;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.delta.flink.source.internal.builder;
package io.delta.flink.internal.options;

import java.util.Collections;
import java.util.HashMap;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.delta.flink.source.internal.builder;
package io.delta.flink.internal.options;

/**
* Converter and validator for {@link DeltaConfigOption} values.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package io.delta.flink.internal.options;

import java.util.Collections;
import java.util.Map;

import org.apache.flink.core.fs.Path;

/**
 * Validator for Delta source and sink connector configuration options.
 *
 * <p>Setting an option is allowed only for known option names. For an unknown option name, or
 * when setting the option value fails, a {@link DeltaOptionValidationException} is thrown. Known
 * option names are passed via the constructor parameter {@code validOptions}.
 *
 * <p>This is an internal class meant for connector implementations only.
 * Usage example (for sink):
 * <pre>{@code
 *     OptionValidator validator =
 *         new OptionValidator(tablePath, sinkConfig, validSinkOptions);
 *     validator.option("mergeSchema", true);
 *     // For any option set on the sink, pass it to validator. If it's successful, sinkConfig
 *     // will be updated with the corresponding option.
 * }</pre>
 */
public class OptionValidator {
    private final Path tablePath;
    private final Map<String, DeltaConfigOption<?>> validOptions;
    private final DeltaConnectorConfiguration config;

    /**
     * Construct an option validator.
     *
     * @param tablePath Base path of the delta table. Only used when reporting validation errors.
     * @param config Configuration object that is populated with the validated options.
     * @param validOptions A map of valid options used by this instance, keyed by option name.
     */
    public OptionValidator(
            Path tablePath,
            DeltaConnectorConfiguration config,
            Map<String, DeltaConfigOption<?>> validOptions) {
        this.tablePath = tablePath;
        this.config = config;
        this.validOptions = validOptions;
    }

    /**
     * Sets a configuration option with a {@code String} value.
     *
     * @param optionName Name of the option; must be a key of the valid options map.
     * @param optionValue Value to set for the option.
     * @throws DeltaOptionValidationException if the option name is unknown or setting the value
     *         fails.
     */
    public void option(String optionName, String optionValue) {
        tryToSetOption(() -> {
            DeltaConfigOption<?> configOption = validateOptionName(optionName);
            configOption.setOnConfig(config, optionValue);
        });
    }

    /**
     * Sets a configuration option with a {@code boolean} value.
     *
     * @param optionName Name of the option; must be a key of the valid options map.
     * @param optionValue Value to set for the option.
     * @throws DeltaOptionValidationException if the option name is unknown or setting the value
     *         fails.
     */
    public void option(String optionName, boolean optionValue) {
        tryToSetOption(() -> {
            DeltaConfigOption<?> configOption = validateOptionName(optionName);
            configOption.setOnConfig(config, optionValue);
        });
    }

    /**
     * Sets a configuration option with an {@code int} value.
     *
     * @param optionName Name of the option; must be a key of the valid options map.
     * @param optionValue Value to set for the option.
     * @throws DeltaOptionValidationException if the option name is unknown or setting the value
     *         fails.
     */
    public void option(String optionName, int optionValue) {
        tryToSetOption(() -> {
            DeltaConfigOption<?> configOption = validateOptionName(optionName);
            configOption.setOnConfig(config, optionValue);
        });
    }

    /**
     * Sets a configuration option with a {@code long} value.
     *
     * @param optionName Name of the option; must be a key of the valid options map.
     * @param optionValue Value to set for the option.
     * @throws DeltaOptionValidationException if the option name is unknown or setting the value
     *         fails.
     */
    public void option(String optionName, long optionValue) {
        tryToSetOption(() -> {
            DeltaConfigOption<?> configOption = validateOptionName(optionName);
            configOption.setOnConfig(config, optionValue);
        });
    }

    /**
     * Runs the given action and reports any failure as a
     * {@link DeltaOptionValidationException}.
     */
    private void tryToSetOption(Executable argument) {
        try {
            argument.execute();
        } catch (DeltaOptionValidationException e) {
            // Already a validation error (e.g. unknown option name). Rethrow it unchanged so its
            // message is not nested inside a second validation exception.
            throw e;
        } catch (Exception e) {
            throw optionValidationException(tablePath, e);
        }
    }

    /**
     * Looks up the option definition registered under the given name.
     *
     * @param optionName Name of the option to look up.
     * @return The option definition registered under {@code optionName}.
     * @throws DeltaOptionValidationException if no option is registered under that name.
     */
    @SuppressWarnings("unchecked")
    protected <TYPE> DeltaConfigOption<TYPE> validateOptionName(String optionName) {
        // Unchecked: the map stores options with a wildcard type, the caller picks TYPE.
        DeltaConfigOption<TYPE> option = (DeltaConfigOption<TYPE>) validOptions.get(optionName);
        if (option == null) {
            throw invalidOptionName(tablePath, optionName);
        }
        return option;
    }

    /** Exception to throw when the option name is invalid. */
    private static DeltaOptionValidationException invalidOptionName(
            Path tablePath,
            String invalidOption) {
        return new DeltaOptionValidationException(
            tablePath,
            Collections.singletonList(
                String.format("Invalid option [%s] used for Delta Connector.",
                    invalidOption)));
    }

    /** Exception to throw when there's an error while setting an option. */
    private static DeltaOptionValidationException optionValidationException(
            Path tablePath,
            Exception e) {
        return new DeltaOptionValidationException(
            tablePath,
            Collections.singletonList(e.getClass() + " - " + e.getMessage())
        );
    }

    /** Action that sets a single option; may throw any unchecked exception. */
    @FunctionalInterface
    private interface Executable {
        void execute();
    }
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.delta.flink.source.internal.builder;
package io.delta.flink.internal.options;

import java.util.regex.Pattern;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.delta.flink.source.internal.builder;
package io.delta.flink.internal.options;

import io.delta.flink.source.internal.enumerator.supplier.TimestampFormatConverter;
import org.apache.flink.util.StringUtils;
Expand Down
Loading

0 comments on commit 4eb7d3c

Please sign in to comment.