Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stage fix #18280

Closed
wants to merge 39 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
5247247
Stage READ/WRITE support and restrict URL format (#17820)
cpegeric Jul 31, 2024
e5d7693
Stage READ/WRITE support and restrict URL format (#17820)
cpegeric Jul 31, 2024
e19a073
stage list file
cpegeric Aug 1, 2024
af06604
list with or without wildcard
cpegeric Aug 1, 2024
a31b03e
find the prefix without wildcard character to speed up file list
cpegeric Aug 1, 2024
2e97998
fix SCA test compile warning
cpegeric Aug 2, 2024
4012b31
fix test and remove test TestDoCheckFilePath to avoid failed to run S…
cpegeric Aug 2, 2024
edc08ce
disable updateTimeZone test. Not working in my machine. Expected 'lo…
cpegeric Aug 2, 2024
cf0dc02
fix SCA test
cpegeric Aug 2, 2024
6f522ae
fix SCA test
cpegeric Aug 2, 2024
a66c41e
add stage_list tests
cpegeric Aug 5, 2024
ff31d05
fix SCA test
cpegeric Aug 5, 2024
5c635eb
export work on top of fileservice (matrixorigin #17748)
cpegeric Aug 6, 2024
8c57483
fix bvt test cases (#17820)
cpegeric Aug 7, 2024
3932318
bug fix get proc context (#17820)
cpegeric Aug 8, 2024
058ff9b
remove unused code (##17820)
cpegeric Aug 8, 2024
7fb4fa5
remove unused code (#17820)
cpegeric Aug 9, 2024
88aafab
remove appendBytes and standardize to use bytes.Buffer
cpegeric Aug 12, 2024
cfb9edf
move stage_list() function to table_function (#17820)
cpegeric Aug 21, 2024
45e70b3
cleanup (#17820)
cpegeric Aug 21, 2024
add027d
Stage READ/WRITE support and restrict URL format (#17820)
cpegeric Jul 31, 2024
009bb8b
Stage READ/WRITE support and restrict URL format (#17820)
cpegeric Jul 31, 2024
603261e
stage list file
cpegeric Aug 1, 2024
3ca6fdc
list with or without wildcard
cpegeric Aug 1, 2024
2cdcc3c
find the prefix without wildcard character to speed up file list
cpegeric Aug 1, 2024
76ac94c
fix SCA test compile warning
cpegeric Aug 2, 2024
80f3995
fix test and remove test TestDoCheckFilePath to avoid failed to run S…
cpegeric Aug 2, 2024
b893ec8
disable updateTimeZone test. Not working in my machine. Expected 'lo…
cpegeric Aug 2, 2024
2fccf15
fix SCA test
cpegeric Aug 2, 2024
dbc49b5
fix SCA test
cpegeric Aug 2, 2024
44fdbe7
add stage_list tests
cpegeric Aug 5, 2024
7989889
fix SCA test
cpegeric Aug 5, 2024
4866038
export work on top of fileservice (matrixorigin #17748)
cpegeric Aug 6, 2024
f33e1a1
bug fix get proc context (#17820)
cpegeric Aug 8, 2024
0421b88
remove unused code (##17820)
cpegeric Aug 8, 2024
f1d165d
remove unused code (#17820)
cpegeric Aug 9, 2024
3476abe
remove appendBytes and standardize to use bytes.Buffer
cpegeric Aug 12, 2024
b1dfc76
merge and bug fix (#17820)
cpegeric Aug 21, 2024
d505742
go fmt code (#17820)
cpegeric Aug 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 17 additions & 86 deletions pkg/frontend/authenticate.go
Original file line number Diff line number Diff line change
Expand Up @@ -1458,10 +1458,6 @@ const (

checkStageFormat = `select stage_id, stage_name from mo_catalog.mo_stages where stage_name = "%s" order by stage_id;`

checkStageStatusFormat = `select stage_id, stage_name from mo_catalog.mo_stages where stage_status = "%s" order by stage_id;`

checkStageStatusWithStageNameFormat = `select url, stage_status from mo_catalog.mo_stages where stage_name = "%s" order by stage_id;`

dropStageFormat = `delete from mo_catalog.mo_stages where stage_name = '%s' order by stage_id;`

updateStageUrlFormat = `update mo_catalog.mo_stages set url = '%s' where stage_name = '%s' order by stage_id;`
Expand Down Expand Up @@ -1538,20 +1534,9 @@ func getSqlForCheckStage(ctx context.Context, stage string) (string, error) {
return fmt.Sprintf(checkStageFormat, stage), nil
}

func getSqlForCheckStageStatus(ctx context.Context, status string) string {
return fmt.Sprintf(checkStageStatusFormat, status)
}

func getSqlForCheckUdfWithDb(dbName string) string {
return fmt.Sprintf(checkUdfWithDb, dbName)
}
func getSqlForCheckStageStatusWithStageName(ctx context.Context, stage string) (string, error) {
err := inputNameIsInvalid(ctx, stage)
if err != nil {
return "", err
}
return fmt.Sprintf(checkStageStatusWithStageNameFormat, stage), nil
}

func getSqlForInsertIntoMoStages(ctx context.Context, stageName, url, credentials, status, createdTime, comment string) (string, error) {
err := inputNameIsInvalid(ctx, stageName)
Expand Down Expand Up @@ -3266,7 +3251,7 @@ func doCreateStage(ctx context.Context, ses *Session, cs *tree.CreateStage) (err
}
} else {
// format credentials and hash it
credentials = HashPassWord(formatCredentials(cs.Credentials))
credentials = formatCredentials(cs.Credentials)

if !cs.Status.Exist {
StageStatus = "disabled"
Expand All @@ -3278,6 +3263,11 @@ func doCreateStage(ctx context.Context, ses *Session, cs *tree.CreateStage) (err
comment = cs.Comment.Comment
}

if !(strings.HasPrefix(cs.Url, function.STAGE_PROTOCOL+"://") || strings.HasPrefix(cs.Url, function.S3_PROTOCOL+"://") ||
strings.HasPrefix(cs.Url, function.FILE_PROTOCOL+":///")) {
return moerr.NewBadConfig(ctx, "URL protocol only supports stage://, s3:// and file:///")
}

sql, err = getSqlForInsertIntoMoStages(ctx, string(cs.Name), cs.Url, credentials, StageStatus, types.CurrentTimestamp().String2(time.UTC, 0), comment)
if err != nil {
return err
Expand All @@ -3295,88 +3285,24 @@ func doCreateStage(ctx context.Context, ses *Session, cs *tree.CreateStage) (err
func doCheckFilePath(ctx context.Context, ses *Session, ep *tree.ExportParam) (err error) {
//var err error
var filePath string
var sql string
var erArray []ExecResult
var stageName string
var stageStatus string
var url string
if ep == nil {
return err
}

bh := ses.GetBackgroundExec(ctx)
defer bh.Close()

err = bh.Exec(ctx, "begin;")
defer func() {
err = finishTxn(ctx, bh, err)
}()
if err != nil {
return err
}

// detect filepath contain stage or not
filePath = ep.FilePath
if !strings.Contains(filePath, ":") {
// the filepath is the target path
sql = getSqlForCheckStageStatus(ctx, "enabled")
bh.ClearExecResultSet()
err = bh.Exec(ctx, sql)
if strings.HasPrefix(filePath, function.STAGE_PROTOCOL+"://") {
// stage:// URL
s, err := function.UrlToStageDef(filePath, ses.proc)
if err != nil {
return err
}

erArray, err = getResultSet(ctx, bh)
// s.ToPath() returns the fileservice filepath, i.e. s3,...:/path for S3 or /path for local file
ses.ep.userConfig.StageFilePath, _, err = s.ToPath()
if err != nil {
return err
}

// if have stage enabled
if execResultArrayHasData(erArray) {
return moerr.NewInternalError(ctx, "stage exists, please try to check and use a stage instead")
} else {
// use the filepath
return err
}
} else {
stageName = strings.Split(filePath, ":")[0]
// check the stage status
sql, err = getSqlForCheckStageStatusWithStageName(ctx, stageName)
if err != nil {
return err
}
bh.ClearExecResultSet()
err = bh.Exec(ctx, sql)
if err != nil {
return err
}

erArray, err = getResultSet(ctx, bh)
if err != nil {
return err
}
if execResultArrayHasData(erArray) {
stageStatus, err = erArray[0].GetString(ctx, 0, 1)
if err != nil {
return err
}

// is the stage staus is disabled
if stageStatus == tree.StageStatusDisabled.String() {
return moerr.NewInternalError(ctx, "stage '%s' is invalid, please check", stageName)
} else if stageStatus == tree.StageStatusEnabled.String() {
// replace the filepath using stage url
url, err = erArray[0].GetString(ctx, 0, 0)
if err != nil {
return err
}

filePath = strings.Replace(filePath, stageName+":", url, 1)
ses.ep.userConfig.StageFilePath = filePath
}
} else {
return moerr.NewInternalError(ctx, "stage '%s' is not exists, please check", stageName)
}
}
return err

Expand Down Expand Up @@ -3440,6 +3366,11 @@ func doAlterStage(ctx context.Context, ses *Session, as *tree.AlterStage) (err e
}
} else {
if as.UrlOption.Exist {
if !(strings.HasPrefix(as.UrlOption.Url, function.STAGE_PROTOCOL+"://") ||
strings.HasPrefix(as.UrlOption.Url, function.S3_PROTOCOL+"://") ||
strings.HasPrefix(as.UrlOption.Url, function.FILE_PROTOCOL+":///")) {
return moerr.NewBadConfig(ctx, "URL protocol only supports stage://, s3:// and file:///")
}
sql = getsqlForUpdateStageUrl(string(as.Name), as.UrlOption.Url)
err = bh.Exec(ctx, sql)
if err != nil {
Expand All @@ -3448,7 +3379,7 @@ func doAlterStage(ctx context.Context, ses *Session, as *tree.AlterStage) (err e
}

if as.CredentialsOption.Exist {
credentials = HashPassWord(formatCredentials(as.CredentialsOption))
credentials = formatCredentials(as.CredentialsOption)
sql = getsqlForUpdateStageCredentials(string(as.Name), credentials)
err = bh.Exec(ctx, sql)
if err != nil {
Expand Down
Loading
Loading