-
Notifications
You must be signed in to change notification settings - Fork 279
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Work in Progress Stage URL #17973
Work in Progress Stage URL #17973
Changes from 15 commits
6a74a52
a48803b
403a5f6
da3d834
57f5ff5
01bebb9
2682a8e
cb8dc6f
5090b11
6793aab
f071887
81031dc
fcd99a8
7ad1661
45abb75
27816f2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1442,9 +1442,9 @@ const ( | |
|
||
checkStageFormat = `select stage_id, stage_name from mo_catalog.mo_stages where stage_name = "%s" order by stage_id;` | ||
|
||
checkStageStatusFormat = `select stage_id, stage_name from mo_catalog.mo_stages where stage_status = "%s" order by stage_id;` | ||
//checkStageStatusFormat = `select stage_id, stage_name from mo_catalog.mo_stages where stage_status = "%s" order by stage_id;` | ||
|
||
checkStageStatusWithStageNameFormat = `select url, stage_status from mo_catalog.mo_stages where stage_name = "%s" order by stage_id;` | ||
//checkStageStatusWithStageNameFormat = `select url, stage_status from mo_catalog.mo_stages where stage_name = "%s" order by stage_id;` | ||
|
||
dropStageFormat = `delete from mo_catalog.mo_stages where stage_name = '%s' order by stage_id;` | ||
|
||
|
@@ -1522,20 +1522,25 @@ func getSqlForCheckStage(ctx context.Context, stage string) (string, error) { | |
return fmt.Sprintf(checkStageFormat, stage), nil | ||
} | ||
|
||
/* | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same as above |
||
func getSqlForCheckStageStatus(ctx context.Context, status string) string { | ||
return fmt.Sprintf(checkStageStatusFormat, status) | ||
} | ||
*/ | ||
|
||
func getSqlForCheckUdfWithDb(dbName string) string { | ||
return fmt.Sprintf(checkUdfWithDb, dbName) | ||
} | ||
|
||
/* | ||
func getSqlForCheckStageStatusWithStageName(ctx context.Context, stage string) (string, error) { | ||
err := inputNameIsInvalid(ctx, stage) | ||
if err != nil { | ||
return "", err | ||
} | ||
return fmt.Sprintf(checkStageStatusWithStageNameFormat, stage), nil | ||
} | ||
*/ | ||
|
||
func getSqlForInsertIntoMoStages(ctx context.Context, stageName, url, credentials, status, createdTime, comment string) (string, error) { | ||
err := inputNameIsInvalid(ctx, stageName) | ||
|
@@ -3250,7 +3255,7 @@ func doCreateStage(ctx context.Context, ses *Session, cs *tree.CreateStage) (err | |
} | ||
} else { | ||
// format credentials and hash it | ||
credentials = HashPassWord(formatCredentials(cs.Credentials)) | ||
credentials = formatCredentials(cs.Credentials) | ||
|
||
if !cs.Status.Exist { | ||
StageStatus = "disabled" | ||
|
@@ -3262,6 +3267,11 @@ func doCreateStage(ctx context.Context, ses *Session, cs *tree.CreateStage) (err | |
comment = cs.Comment.Comment | ||
} | ||
|
||
if !(strings.HasPrefix(cs.Url, function.STAGE_PROTOCOL+"://") || strings.HasPrefix(cs.Url, function.S3_PROTOCOL+"://") || | ||
strings.HasPrefix(cs.Url, function.FILE_PROTOCOL+":///")) { | ||
return moerr.NewBadConfig(ctx, "URL protocol only supports stage://, s3:// and file:///") | ||
} | ||
|
||
sql, err = getSqlForInsertIntoMoStages(ctx, string(cs.Name), cs.Url, credentials, StageStatus, types.CurrentTimestamp().String2(time.UTC, 0), comment) | ||
if err != nil { | ||
return err | ||
|
@@ -3279,88 +3289,24 @@ func doCreateStage(ctx context.Context, ses *Session, cs *tree.CreateStage) (err | |
func doCheckFilePath(ctx context.Context, ses *Session, ep *tree.ExportParam) (err error) { | ||
//var err error | ||
var filePath string | ||
var sql string | ||
var erArray []ExecResult | ||
var stageName string | ||
var stageStatus string | ||
var url string | ||
if ep == nil { | ||
return err | ||
} | ||
|
||
bh := ses.GetBackgroundExec(ctx) | ||
defer bh.Close() | ||
|
||
err = bh.Exec(ctx, "begin;") | ||
defer func() { | ||
err = finishTxn(ctx, bh, err) | ||
}() | ||
if err != nil { | ||
return err | ||
} | ||
|
||
// detect filepath contain stage or not | ||
filePath = ep.FilePath | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The logic is changed in this PR. There is no need to query the stage record and check it. Am I right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. enable/disable in stage_status is not valid. The path expansion is done in pkg/sql/plan/function/stage_util.go now. |
||
if !strings.Contains(filePath, ":") { | ||
// the filepath is the target path | ||
sql = getSqlForCheckStageStatus(ctx, "enabled") | ||
bh.ClearExecResultSet() | ||
err = bh.Exec(ctx, sql) | ||
if strings.HasPrefix(filePath, function.STAGE_PROTOCOL+"://") { | ||
// stage:// URL | ||
s, err := function.UrlToStageDef(filePath, ses.proc) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
erArray, err = getResultSet(ctx, bh) | ||
// s.ToPath() returns the fileservice filepath, i.e. s3,...:/path for S3 or /path for local file | ||
ses.ep.userConfig.StageFilePath, _, err = s.ToPath() | ||
if err != nil { | ||
return err | ||
} | ||
|
||
// if have stage enabled | ||
if execResultArrayHasData(erArray) { | ||
return moerr.NewInternalError(ctx, "stage exists, please try to check and use a stage instead") | ||
} else { | ||
// use the filepath | ||
return err | ||
} | ||
} else { | ||
stageName = strings.Split(filePath, ":")[0] | ||
// check the stage status | ||
sql, err = getSqlForCheckStageStatusWithStageName(ctx, stageName) | ||
if err != nil { | ||
return err | ||
} | ||
bh.ClearExecResultSet() | ||
err = bh.Exec(ctx, sql) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
erArray, err = getResultSet(ctx, bh) | ||
if err != nil { | ||
return err | ||
} | ||
if execResultArrayHasData(erArray) { | ||
stageStatus, err = erArray[0].GetString(ctx, 0, 1) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
// is the stage staus is disabled | ||
if stageStatus == tree.StageStatusDisabled.String() { | ||
return moerr.NewInternalError(ctx, "stage '%s' is invalid, please check", stageName) | ||
} else if stageStatus == tree.StageStatusEnabled.String() { | ||
// replace the filepath using stage url | ||
url, err = erArray[0].GetString(ctx, 0, 0) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
filePath = strings.Replace(filePath, stageName+":", url, 1) | ||
ses.ep.userConfig.StageFilePath = filePath | ||
} | ||
} else { | ||
return moerr.NewInternalError(ctx, "stage '%s' is not exists, please check", stageName) | ||
} | ||
} | ||
return err | ||
|
||
|
@@ -3424,6 +3370,11 @@ func doAlterStage(ctx context.Context, ses *Session, as *tree.AlterStage) (err e | |
} | ||
} else { | ||
if as.UrlOption.Exist { | ||
if !(strings.HasPrefix(as.UrlOption.Url, function.STAGE_PROTOCOL+"://") || | ||
strings.HasPrefix(as.UrlOption.Url, function.S3_PROTOCOL+"://") || | ||
strings.HasPrefix(as.UrlOption.Url, function.FILE_PROTOCOL+":///")) { | ||
return moerr.NewBadConfig(ctx, "URL protocol only supports stage://, s3:// and file:///") | ||
} | ||
sql = getsqlForUpdateStageUrl(string(as.Name), as.UrlOption.Url) | ||
err = bh.Exec(ctx, sql) | ||
if err != nil { | ||
|
@@ -3432,7 +3383,7 @@ func doAlterStage(ctx context.Context, ses *Session, as *tree.AlterStage) (err e | |
} | ||
|
||
if as.CredentialsOption.Exist { | ||
credentials = HashPassWord(formatCredentials(as.CredentialsOption)) | ||
credentials = formatCredentials(as.CredentialsOption) | ||
sql = getsqlForUpdateStageCredentials(string(as.Name), credentials) | ||
err = bh.Exec(ctx, sql) | ||
if err != nil { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just remove it, if it wil not be used anymore.