Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingestion) Set pipeline_name on UI recipes with forms #5535

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
import com.linkedin.mxe.MetadataChangeProposal;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import lombok.extern.slf4j.Slf4j;
import org.json.JSONException;
import org.json.JSONObject;

import java.net.URISyntaxException;
import java.util.Optional;
import java.util.UUID;
Expand All @@ -31,6 +35,7 @@
/**
* Creates or updates an ingestion source. Requires the MANAGE_INGESTION privilege.
*/
@Slf4j
public class UpsertIngestionSourceResolver implements DataFetcher<CompletableFuture<String>> {

private final EntityClient _entityClient;
Expand All @@ -51,6 +56,7 @@ public CompletableFuture<String> get(final DataFetchingEnvironment environment)
final UpdateIngestionSourceInput input = bindArgument(environment.getArgument("input"), UpdateIngestionSourceInput.class);

final MetadataChangeProposal proposal = new MetadataChangeProposal();
String ingestionSourceUrnString;

if (ingestionSourceUrn.isPresent()) {
// Update existing ingestion source
Expand All @@ -61,6 +67,7 @@ public CompletableFuture<String> get(final DataFetchingEnvironment environment)
String.format("Malformed urn %s provided.", ingestionSourceUrn.get()),
DataHubGraphQLErrorCode.BAD_REQUEST);
}
ingestionSourceUrnString = ingestionSourceUrn.get();
} else {
// Create new ingestion source
// Since we are creating a new Ingestion Source, we need to generate a unique UUID.
Expand All @@ -71,10 +78,11 @@ public CompletableFuture<String> get(final DataFetchingEnvironment environment)
final DataHubIngestionSourceKey key = new DataHubIngestionSourceKey();
key.setId(uuidStr);
proposal.setEntityKeyAspect(GenericRecordUtils.serializeAspect(key));
ingestionSourceUrnString = String.format("urn:li:dataHubIngestionSource:%s", uuidStr);
}

// Create the policy info.
final DataHubIngestionSourceInfo info = mapIngestionSourceInfo(input);
final DataHubIngestionSourceInfo info = mapIngestionSourceInfo(input, ingestionSourceUrnString);
proposal.setEntityType(Constants.INGESTION_SOURCE_ENTITY_NAME);
proposal.setAspectName(Constants.INGESTION_INFO_ASPECT_NAME);
proposal.setAspect(GenericRecordUtils.serializeAspect(info));
Expand All @@ -90,20 +98,24 @@ public CompletableFuture<String> get(final DataFetchingEnvironment environment)
});
}

private DataHubIngestionSourceInfo mapIngestionSourceInfo(final UpdateIngestionSourceInput input) {
private DataHubIngestionSourceInfo mapIngestionSourceInfo(final UpdateIngestionSourceInput input, final String ingestionSourceUrn) {
final DataHubIngestionSourceInfo result = new DataHubIngestionSourceInfo();
result.setType(input.getType());
result.setName(input.getName());
result.setConfig(mapConfig(input.getConfig()));
result.setConfig(mapConfig(input.getConfig(), ingestionSourceUrn));
if (input.getSchedule() != null) {
result.setSchedule(mapSchedule(input.getSchedule()));
}
return result;
}

private DataHubIngestionSourceConfig mapConfig(final UpdateIngestionSourceConfigInput input) {
private DataHubIngestionSourceConfig mapConfig(final UpdateIngestionSourceConfigInput input, final String ingestionSourceUrn) {
final DataHubIngestionSourceConfig result = new DataHubIngestionSourceConfig();
result.setRecipe(input.getRecipe());
String recipe = input.getRecipe();
jjoyce0510 marked this conversation as resolved.
Show resolved Hide resolved
if (recipe != null) {
recipe = optionallySetPipelineName(recipe, ingestionSourceUrn);
}
result.setRecipe(recipe);
if (input.getVersion() != null) {
result.setVersion(input.getVersion());
}
Expand All @@ -119,4 +131,19 @@ private DataHubIngestionSourceSchedule mapSchedule(final UpdateIngestionSourceSc
result.setTimezone(input.getTimezone());
return result;
}

private String optionallySetPipelineName(String recipe, String ingestionSourceUrn) {
try {
JSONObject jsonRecipe = new JSONObject(recipe);
boolean hasPipelineName = jsonRecipe.has("pipeline_name") && jsonRecipe.get("pipeline_name") != null && !jsonRecipe.get("pipeline_name").equals("");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feel free to punt: I'd store "pipeline_name" in a constant since we reuse it 4 times as a raw string here!


if (!hasPipelineName) {
jsonRecipe.put("pipeline_name", ingestionSourceUrn);
recipe = jsonRecipe.toString();
}
} catch (JSONException e) {
log.warn("Error parsing ingestion recipe in JSON form", e);
}
return recipe;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -301,25 +301,6 @@ export const INCLUDE_LINEAGE: RecipeField = {
},
};

export const IGNORE_START_TIME_LINEAGE: RecipeField = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! Agree with removing these ones.

name: 'ignore_start_time_lineage',
label: 'Ignore Start Time Lineage',
tooltip: 'Get all lineage by ignoring the start_time field. It is suggested to set to true initially.',
type: FieldType.BOOLEAN,
fieldPath: 'source.config.ignore_start_time_lineage',
rules: null,
};

export const CHECK_ROLE_GRANTS: RecipeField = {
name: 'check_role_grants',
label: 'Check Role Grants',
tooltip:
'If set to True then checks role grants at the beginning of the ingestion run. To be used for debugging purposes. If you think everything is working fine then set it to False. In some cases this can take long depending on how many roles you might have.',
type: FieldType.BOOLEAN,
fieldPath: 'source.config.check_role_grants',
rules: null,
};

export const PROFILING_ENABLED: RecipeField = {
name: 'profiling.enabled',
label: 'Enable Profiling',
Expand All @@ -341,7 +322,7 @@ export const STATEFUL_INGESTION_ENABLED: RecipeField = {
export const UPSTREAM_LINEAGE_IN_REPORT: RecipeField = {
name: 'upstream_lineage_in_report',
label: 'Include Upstream Lineage In Report.',
tooltip: 'Useful for debugging lineage information. Set to True to see the raw lineage created internally.',
tooltip: 'Remove stale datasets from datahub once they have been deleted in the source.',
type: FieldType.BOOLEAN,
fieldPath: 'source.config.upstream_lineage_in_report',
rules: null,
Expand Down Expand Up @@ -613,13 +594,7 @@ export const DASHBOARD_DENY: RecipeField = {
export const RECIPE_FIELDS = {
[SNOWFLAKE]: {
fields: [SNOWFLAKE_ACCOUNT_ID, SNOWFLAKE_WAREHOUSE, SNOWFLAKE_USERNAME, SNOWFLAKE_PASSWORD, SNOWFLAKE_ROLE],
advancedFields: [
INCLUDE_LINEAGE,
IGNORE_START_TIME_LINEAGE,
CHECK_ROLE_GRANTS,
PROFILING_ENABLED,
STATEFUL_INGESTION_ENABLED,
],
advancedFields: [INCLUDE_LINEAGE, PROFILING_ENABLED, STATEFUL_INGESTION_ENABLED],
filterFields: [
TABLE_ALLOW,
TABLE_DENY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,17 @@ source:
# Credentials
credential:
project_id: # Your BQ project id, e.g. sample_project_id

# Add secret in Secrets Tab with the relevant names for each variable below
# Your BQ private key id, e.g. "d0121d0000882411234e11166c6aaa23ed5d74e0"
private_key_id: "\${BQ_PRIVATE_KEY_ID}"
# Your BQ private key, e.g. "-----BEGIN PRIVATE KEY-----\\nMIIyourkey\\n-----END PRIVATE KEY-----\\n"
private_key: "\${BQ_PRIVATE_KEY}"

client_email: # Your BQ client email, e.g. "[email protected]"
client_id: # Your BQ client id, e.g. "123456678890"

include_table_lineage: true
include_view_lineage: true
profiling:
enabled: true
stateful_ingestion:
enabled: true
`;

export const BIGQUERY = 'bigquery';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ source:
username: "\${REDSHIFT_USERNAME}" # Your Redshift username, e.g. admin
password: "\${REDSHIFT_PASSWORD}" # Your Redshift password, e.g. password_01

# Options
include_tables: True
include_views: True

# Profiling
table_lineage_mode: stl_scan_based
include_table_lineage: true
include_view_lineage: true
profiling:
enabled: false
enabled: true
stateful_ingestion:
enabled: true
`;

export const REDSHIFT = 'redshift';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@ source:
account_id: "example_id"
warehouse: "example_warehouse"
role: "datahub_role"
ignore_start_time_lineage: true
include_table_lineage: true
include_view_lineage: true
check_role_grants: true
profiling:
enabled: true
stateful_ingestion:
Expand Down