Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SQL Output: UI and config gen service updates #86

Merged
merged 6 commits into from
Jun 20, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

namespace DataX.Config.ConfigDataModel
{
// Class representing union schema of all supported ouputs (Blob, CosmosDB, EventHub, Metrics)
// Class representing union schema of all supported ouputs (Blob, CosmosDB, EventHub, SQL Server, Metrics)
public class FlowGuiOutputProperties
{
[JsonProperty("connectionString", NullValueHandling = NullValueHandling.Ignore)]
Expand All @@ -32,5 +32,14 @@ public class FlowGuiOutputProperties

[JsonProperty("collection", NullValueHandling = NullValueHandling.Ignore)]
public string Collection;

[JsonProperty("tableName", NullValueHandling = NullValueHandling.Ignore)]
public string TableName;

[JsonProperty("writeMode", NullValueHandling = NullValueHandling.Ignore)]
public string WriteMode;

[JsonProperty("useBulkInsert", NullValueHandling = NullValueHandling.Ignore)]
public string UseBulkInsert;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,8 @@ public class FlowOutputSpec
[JsonProperty("httppost", NullValueHandling = NullValueHandling.Ignore)]
public FlowHttpOutputSpec HttpOutput { get; set; }

[JsonProperty("sqlServer", NullValueHandling = NullValueHandling.Ignore)]
public FlowSqlOutputSpec SqlOutput { get; set; }

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// *********************************************************************
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License
// *********************************************************************
using Newtonsoft.Json;

namespace DataX.Config.ConfigDataModel.RuntimeConfig
{
public class FlowSqlOutputSpec
{
[JsonProperty("connectionStringRef")]
public string ConnectionStringRef { get; set; }

[JsonProperty("tableName")]
public string TableName { get; set; }

[JsonProperty("databaseName")]
public string DatabaseName { get; set; }

[JsonProperty("user")]
public string User { get; set; }

[JsonProperty("password")]
public string Password { get; set; }

[JsonProperty("url")]
public string Url { get; set; }

[JsonProperty("writeMode")]
public string WriteMode { get; set; }

[JsonProperty("useBulkInsert")]
public string UseBulkInsert { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ private async Task<FlowOutputSpec[]> ProcessOutputs(FlowGuiOutput[] uiOutputs, R
{
var httpOutput = ProcessLocalOutputMetric(configName, localMetricsEndpoint);
Ensure.EnsureNullElseThrowNotSupported(flowOutput.HttpOutput, "Multiple target httpost/metric ouptut for same dataset not supported.");
flowOutput.HttpOutput = httpOutput;
flowOutput.HttpOutput = httpOutput;
}
break;
}
Expand All @@ -175,6 +175,13 @@ private async Task<FlowOutputSpec[]> ProcessOutputs(FlowGuiOutput[] uiOutputs, R
flowOutput.BlobOutput = blobOutput;
break;
}
case "sqlserver":
{
var sqlOutput = await ProcessOutputSql(configName, output);
Ensure.EnsureNullElseThrowNotSupported(flowOutput.SqlOutput, "Multiple target Sql ouptut for same dataset not supported.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

output typo. 1 SQL output for a Spark SQL table?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One spark dataset can output to only one output (any). Different table is also not supported.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok. typo in ouptut.

flowOutput.SqlOutput = sqlOutput;
break;
}
default:
throw new NotSupportedException($"{output.Type} output type not supported");
}
Expand Down Expand Up @@ -320,6 +327,45 @@ private FlowHttpOutputSpec ProcessLocalOutputMetric(string configName, string en

}

private async Task<FlowSqlOutputSpec> ProcessOutputSql(string configName, FlowGuiOutput uiOutput)
{
if (uiOutput != null && uiOutput.Properties != null)
{
var sparkKeyVaultName = Configuration[Constants.ConfigSettingName_RuntimeKeyVaultName];

string connectionString = await KeyVaultClient.ResolveSecretUriAsync(uiOutput.Properties.ConnectionString).ConfigureAwait(false);

var database = GetValueFromJdbcConnection(connectionString,"database");
var user = GetValueFromJdbcConnection(connectionString, "user");
var pwd = GetValueFromJdbcConnection(connectionString, "password");
var url = GetUrlFromJdbcConnection(connectionString);

// Save password and url in keyvault
var pwdSecretId = $"{configName}-outSqlPassword";
var pwdRef = await KeyVaultClient.SaveSecretAsync(sparkKeyVaultName, pwdSecretId, pwd, true).ConfigureAwait(false);

var urlSecretId = $"{configName}-outSqlUrl";
var urlRef = await KeyVaultClient.SaveSecretAsync(sparkKeyVaultName, urlSecretId, url, true).ConfigureAwait(false);

FlowSqlOutputSpec sqlOutput = new FlowSqlOutputSpec()
{
ConnectionStringRef = uiOutput.Properties.ConnectionString,
TableName = uiOutput.Properties.TableName,
WriteMode = uiOutput.Properties.WriteMode,
UseBulkInsert = uiOutput.Properties.UseBulkInsert,
DatabaseName = database,
User = user,
Password = pwdRef,
Url = urlRef
};
return sqlOutput;
}
else
{
return null;
}
}

/// <summary>
/// Parses the account name from connection string
/// </summary>
Expand Down Expand Up @@ -359,5 +405,32 @@ private string ParseBlobAccountKey(string connectionString)

return matched;
}

private string GetValueFromJdbcConnection(string connectionString, string key)
{
try
{
Match match = Regex.Match(connectionString, $"{key}=([^;]*);", RegexOptions.IgnoreCase);
string value = match.Groups[1].Value;
return value;
}
catch (Exception)
{
throw new Exception ($"{key} not found in jdbc connection string");
}
}
private string GetUrlFromJdbcConnection(string connectionString)
{
try
{
Match match = Regex.Match(connectionString, @"jdbc:sqlserver://(.*):", RegexOptions.IgnoreCase);
string value = match.Groups[1].Value;
return value;
}
catch (Exception)
{
throw new Exception("url pattern not found in jdbc connecton string");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ Licensed under the MIT License
should match the Name and Version attributes of the ServiceManifest element defined in the
ServiceManifest.xml file. -->
<ServiceManifestImport>
<ServiceManifestRef ServiceManifestName="Flow.LiveDataServicePkg" ServiceManifestVersion="1.0.0" />
<ServiceManifestRef ServiceManifestName="Flow.LiveDataServicePkg" ServiceManifestVersion="1.0.1" />
<ConfigOverrides>
<ConfigOverride Name="Config">
<Settings>
Expand All @@ -51,7 +51,7 @@ Licensed under the MIT License
</EnvironmentOverrides>
</ServiceManifestImport>
<ServiceManifestImport>
<ServiceManifestRef ServiceManifestName="Flow.SchemaInferenceServicePkg" ServiceManifestVersion="1.0.0" />
<ServiceManifestRef ServiceManifestName="Flow.SchemaInferenceServicePkg" ServiceManifestVersion="1.0.1" />
<ConfigOverrides>
<ConfigOverride Name="Config">
<Settings>
Expand All @@ -73,7 +73,7 @@ Licensed under the MIT License
</EnvironmentOverrides>
</ServiceManifestImport>
<ServiceManifestImport>
<ServiceManifestRef ServiceManifestName="Flow.InteractiveQueryServicePkg" ServiceManifestVersion="1.0.0" />
<ServiceManifestRef ServiceManifestName="Flow.InteractiveQueryServicePkg" ServiceManifestVersion="1.0.1" />
<ConfigOverrides>
<ConfigOverride Name="Config">
<Settings>
Expand All @@ -94,7 +94,7 @@ Licensed under the MIT License
</EnvironmentOverrides>
</ServiceManifestImport>
<ServiceManifestImport>
<ServiceManifestRef ServiceManifestName="Flow.ManagementServicePkg" ServiceManifestVersion="1.0.0" />
<ServiceManifestRef ServiceManifestName="Flow.ManagementServicePkg" ServiceManifestVersion="1.0.1" />
<ConfigOverrides>
<ConfigOverride Name="Config">
<Settings>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,10 @@ class FlowDefinitionPanel extends React.Component {
onUpdateBlobPartitionFormat={this.props.onUpdateBlobPartitionFormat}
onUpdateFormatType={this.props.onUpdateFormatType}
onUpdateCompressionType={this.props.onUpdateCompressionType}
onUpdateSqlConnection={this.props.onUpdateSqlConnection}
onUpdateSqlTableName={this.props.onUpdateSqlTableName}
onUpdateSqlWriteMode={this.props.onUpdateSqlWriteMode}
onUpdateSqlUseBulkInsert={this.props.onUpdateSqlUseBulkInsert}
addOutputSinkButtonEnabled={this.state.addOutputSinkButtonEnabled}
deleteOutputSinkButtonEnabled={this.state.deleteOutputSinkButtonEnabled}
/>
Expand Down Expand Up @@ -862,6 +866,11 @@ const mapDispatchToProps = dispatch => ({
onUpdateFormatType: type => dispatch(Actions.updateFormatType(type)),
onUpdateCompressionType: type => dispatch(Actions.updateCompressionType(type)),

onUpdateSqlConnection: connection => dispatch(Actions.updateSqlConnection(connection)),
onUpdateSqlTableName: name => dispatch(Actions.updateSqlTableName(name)),
onUpdateSqlWriteMode: mode => dispatch(Actions.updateSqlWriteMode(mode)),
onUpdateSqlUseBulkInsert: useBulkInsert => dispatch(Actions.updateSqlUseBulkInsert(useBulkInsert)),

// Rule Actions
onNewRule: type => dispatch(Actions.newRule(type)),
onDeleteRule: index => dispatch(Actions.deleteRule(index)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { Colors, IconButtonStyles, PanelHeader, PanelHeaderButtons, ScrollableCo
import CosmosDbSinkerSettings from './cosmosdbSinkerSettings';
import EventHubSinkerSettings from './eventHubSinkerSettings';
import BlobSinkerSettings from './blobSinkerSettings';
import SqlSinkerSettings from './SqlSinkerSettings';
vijayupadya marked this conversation as resolved.
Show resolved Hide resolved
import MetricSinkerSettings from './metricSinkerSettings';
import LocalSinkerSettings from './localSinkerSettings';
import * as Styles from '../../../../common/styles';
Expand Down Expand Up @@ -42,6 +43,7 @@ export default class OutputSettingsContent extends React.Component {
[Models.sinkerTypeEnum.cosmosdb]: sinker => this.renderCosmosDbSettings(sinker),
[Models.sinkerTypeEnum.eventHub]: sinker => this.renderEventHubSettings(sinker),
[Models.sinkerTypeEnum.blob]: sinker => this.renderBlobSettings(sinker),
[Models.sinkerTypeEnum.sql]: sinker => this.renderSqlSettings(sinker),
[Models.sinkerTypeEnum.metric]: sinker => this.renderMetricSettings(sinker),
[Models.sinkerTypeEnum.local]: sinker => this.renderLocalSettings(sinker)
};
Expand Down Expand Up @@ -233,6 +235,20 @@ export default class OutputSettingsContent extends React.Component {
);
}

renderSqlSettings(sinker) {
return (
<SqlSinkerSettings
sinker={sinker}
sinkerDisplayName={this.sinkerTypeToDisplayMap[sinker.type]}
onUpdateSinkerName={this.props.onUpdateSinkerName}
onUpdateSqlConnection={this.props.onUpdateSqlConnection}
onUpdateSqlTableName={this.props.onUpdateSqlTableName}
onUpdateSqlWriteMode={this.props.onUpdateSqlWriteMode}
onUpdateSqlUseBulkInsert={this.props.onUpdateSqlUseBulkInsert}
/>
);
}

renderMetricSettings(sinker) {
return (
<MetricSinkerSettings flowId={this.props.flowId} sinker={sinker} sinkerDisplayName={this.sinkerTypeToDisplayMap[sinker.type]} />
Expand Down Expand Up @@ -308,6 +324,12 @@ OutputSettingsContent.propTypes = {
onUpdateBlobPrefix: PropTypes.func.isRequired,
onUpdateBlobPartitionFormat: PropTypes.func.isRequired,

// Sql
onUpdateSqlConnection: PropTypes.func.isRequired,
onUpdateSqlTableName: PropTypes.func.isRequired,
onUpdateSqlWriteMode: PropTypes.func.isRequired,
onUpdateSqlUseBulkInsert: PropTypes.func.isRequired,

addOutputSinkButtonEnabled: PropTypes.bool.isRequired,
deleteOutputSinkButtonEnabled: PropTypes.bool.isRequired
};
Expand Down
Loading