Work in Progress Stage URL #17973

Closed
cpegeric wants to merge 16 commits

cpegeric (Contributor) commented Aug 8, 2024

User description

What type of PR is this?

  • API-change
  • BUG
  • Improvement
  • Documentation
  • Feature
  • Test and CI
  • Code Refactoring

Which issue(s) this PR fixes:

issue #

What this PR does / why we need it:

The SQL statements below fail to run and return the error "context canceled".

operating system:

ubuntu24

go version:
go 1.22.4

To reproduce the case,

data file /tmp/data/a.csv

%  cat /tmp/data/a.csv 
1,a,b
2,aa,bb
3,aaa,bbb

SQL command to reproduce,

% mo_ctl connect
create database testdb;
use testdb;

create stage tmpstage url='file:///tmp/';
create table srctab (r_regionkey int, r_name char(25), r_comment varchar(152));
load data infile 'stage://tmpstage/data/a.csv' into table srctab fields terminated by ',';
select * from srctab into outfile 'stage://tmpstage/data/out.csv';

Result

mysql> create database testdb;
Query OK, 1 row affected (0.08 sec)

mysql> use testdb;
Database changed
mysql> 
mysql> create stage tmpstage url='file:///tmp/';
Query OK, 0 rows affected (0.02 sec)

mysql> create table srctab (r_regionkey int, r_name char(25), r_comment varchar(152));
Query OK, 0 rows affected (0.05 sec)

mysql> load data infile 'stage://tmpstage/data/a.csv' into table srctab fields terminated by ',';

ERROR 1105 (HY000): context canceled
mysql> select * from srctab into outfile 'stage://tmpstage/data/out.csv';
ERROR 1105 (HY000): context canceled
mysql> 
mysql> 

GDB

  1. Set a breakpoint at stage_util.go:175 first.

b stage_util.go:175

  2. Run the LOAD DATA SQL and break in stage_util.go.

  3. Set breakpoints in sql_executor.go and backend.go.

b sql_executor.go:246
b backend.go:581

  4. Continue and step until the error appears.

GDB log:

[Two screenshots of the GDB session: WeChatWorkScreenshot_2389fe3e-fd7c-4643-824a-ab0f7a36d88f, WeChatWorkScreenshot_a667369b-8132-49a2-9a69-793026e38e26]

There is a race.


PR Type

Bug fix, Enhancement, Tests


Description

  • Fixed SQL execution error "context canceled" when running stage-related commands.
  • Refactored export logic to use file service and buffer.
  • Added and updated utility functions for handling stage URLs and credentials.
  • Updated external scan compilation and file existence checks to support stage URLs.
  • Refactored query result dumping and status statement execution to use new export logic.
  • Added and updated test cases for stage URL handling, data loading, and snapshot results.

Changes walkthrough 📝

Relevant files

Enhancement (12 files)

export.go: Refactor export logic to use file service and buffer.
pkg/frontend/export.go (+122/-201)
  • Removed unused imports and variables.
  • Refactored ExportConfig to remove file handling and buffer writer.
  • Simplified file opening logic and integrated file service handling.
  • Refactored CSV writing functions to use a buffer.

stage_util.go: Add utility functions for stage URL handling and file listing.
pkg/sql/plan/function/stage_util.go (+430/-0)
  • Added new utility functions for handling stage URLs and credentials.
  • Implemented functions to parse and expand stage definitions.
  • Added functions for listing files in stages with and without wildcards.

authenticate.go: Update stage creation and credential handling logic.
pkg/frontend/authenticate.go (+24/-73)
  • Removed commented-out SQL query strings.
  • Updated stage creation to validate URL protocols.
  • Modified credential handling to avoid hashing.

utils.go: Add and refactor parameter initialization for stage URLs.
pkg/sql/plan/utils.go (+125/-0)
  • Added functions to initialize stage parameters for S3 and local files.
  • Refactored parameter initialization to support stage URLs.

func_unary.go: Add StageList function for listing files in a stage.
pkg/sql/plan/function/func_unary.go (+55/-0)
  • Added StageList function to list files in a stage.

list_builtIn.go: Add stage_list function to built-in functions.
pkg/sql/plan/function/list_builtIn.go (+34/-0)
  • Added stage_list function to the list of supported built-in functions.

compile.go: Support stage URLs in external scan compilation.
pkg/sql/compile/compile.go (+16/-1)
  • Updated external scan compilation to support stage URLs.

query_result.go: Refactor query result dumping to use new export logic.
pkg/frontend/query_result.go (+1/-5)
  • Refactored query result dumping to use new export logic.

function_id.go: Add STAGE_LIST function ID.
pkg/sql/plan/function/function_id.go (+6/-0)
  • Added STAGE_LIST function ID.

status_stmt.go: Refactor status statement execution to use new export logic.
pkg/frontend/status_stmt.go (+3/-5)
  • Refactored status statement execution to use new export logic.

build_load.go: Support stage URLs in file existence check.
pkg/sql/plan/build_load.go (+1/-1)
  • Updated file existence check to support stage URLs.

external.go: Support stage URLs in external stats initialization.
pkg/sql/plan/external.go (+1/-1)
  • Updated external stats initialization to support stage URLs.

Tests (12 files)

authenticate_test.go: Update test cases for stage URL and credentials format.
pkg/frontend/authenticate_test.go (+13/-314)
  • Updated test cases to remove single quotes from URLs and credentials.

export_test.go: Refactor export tests to use buffer and remove file handling.
pkg/frontend/export_test.go (+31/-93)
  • Removed tests related to file handling.
  • Updated tests to use buffer for CSV writing.
  • Refactored test cases to align with new export logic.

session_test.go: Comment out test case for updating time zone to system.
pkg/frontend/session_test.go (+6/-4)
  • Commented out a test case for updating time zone to "system".

nonsys_restore_system_table_to_nonsys_account.result: Update timestamps and snapshot results.
test/distributed/cases/snapshot/nonsys_restore_system_table_to_nonsys_account.result (+36/-36)
  • Updated timestamps and snapshot results.

load_data_parquet.result: Add test results for loading data from a stage.
test/distributed/cases/load_data/load_data_parquet.result (+15/-1)
  • Added test results for loading data from a stage.

load_data_parquet.sql: Add test case for loading data from a stage.
test/distributed/cases/load_data/load_data_parquet.sql (+8/-1)
  • Added test case for loading data from a stage.

cluster_level_snapshot_restore_system_table_to_nonsys.result: Update timestamps and stage credentials in snapshot tests.
test/distributed/cases/snapshot/cluster_level_snapshot_restore_system_table_to_nonsys.result (+31/-31)
  • Updated timestamps for snapshot creation and function definitions.
  • Adjusted stage credentials format.
  • Modified procedure definitions with new timestamps.

sys_restore_system_table_to_nonsys_account.result: Update timestamps and stage credentials in sys restore tests.
test/distributed/cases/snapshot/sys_restore_system_table_to_nonsys_account.result (+30/-30)
  • Updated timestamps for snapshot creation and function definitions.
  • Adjusted stage credentials format.
  • Modified procedure definitions with new timestamps.

sys_restore_system_table_to_newnonsys_account.result: Update timestamps and stage credentials in sys restore to new account tests.
test/distributed/cases/snapshot/sys_restore_system_table_to_newnonsys_account.result (+29/-29)
  • Updated timestamps for snapshot creation and function definitions.
  • Adjusted stage credentials format.
  • Modified procedure definitions with new timestamps.

stage.result: Add and update stage creation and data loading tests.
test/distributed/cases/stage/stage.result (+137/-110)
  • Added new stage creation tests with various URL protocols.
  • Updated stage credentials format.
  • Added tests for loading data from stages and creating external tables.

stage.sql: Refactor stage URLs and enhance test cases for stages.
test/distributed/cases/stage/stage.sql (+99/-65)
  • Updated stage URLs to use consistent 's3://bucket' format.
  • Added new test cases for invalid paths and protocols.
  • Included new stages with additional credentials and regions.
  • Enhanced test cases with sub-stages and file operations.

restore_cluster_table.result: Update test results with new timestamps and credentials format.
test/distributed/cases/snapshot/cluster/restore_cluster_table.result (+18/-18)
  • Updated timestamps for stored procedures and functions.
  • Modified stage credentials format in test results.
  • Adjusted snapshot timestamps for consistency.



    CLAassistant commented Aug 8, 2024

    CLA assistant check
    All committers have signed the CLA.

    mergify bot added the kind/bug label Aug 8, 2024

    qodo-merge-pro bot commented Aug 8, 2024


    PR Reviewer Guide 🔍

    ⏱️ Estimated effort to review: 4 🔵🔵🔵🔵⚪
    🧪 No relevant tests
    🔒 Security concerns

    Sensitive information exposure:
    The new code in pkg/sql/plan/function/stage_util.go handles credentials for stages, which can include sensitive information such as AWS keys. It is crucial to ensure that these credentials are handled securely, not logged inappropriately, and are not vulnerable to injection attacks or leaks through error messages or logs.

    ⚡ Key issues to review

    Refactoring
    The removal of bufio and os imports suggests significant changes in file handling logic. Ensure that the new implementation using io.Pipe and fileservice.FileService correctly handles file operations, especially in concurrent environments or where error handling is critical.

    Functionality Change
    The function openNewFile has been refactored to remove direct file system interactions and instead use a service-based approach (fileservice.FileService). This change should be thoroughly tested to ensure that file operations are still performed correctly, particularly in terms of file creation, writing, and error handling.

    Error Handling
    The new asynchronous file writing logic in openNewFile introduces more complexity. It's crucial to ensure that errors from asynchronous operations are properly propagated and handled, especially since file I/O operations can fail for various reasons like disk full, permissions issues, etc.

    New Functionality
    The addition of a new file stage_util.go introduces significant functionality related to stage management in SQL operations. This includes parsing stage URLs, handling credentials, and converting stage paths. Each of these functionalities is critical and should be reviewed for security implications, especially credential handling and URL parsing.

    qodo-merge-pro bot commented Aug 8, 2024

    PR Code Suggestions ✨

    Category | Suggestion | Score
    Possible bug
    Ensure error handling for errgroup.WithContext to prevent ignoring critical errors

    Handle potential errors from the errgroup.WithContext call in the openNewFile
    function to ensure that no error is silently ignored, which could lead to runtime
    panics or unexpected behavior.

    pkg/frontend/export.go [180-181]

    -ep.AsyncGroup, _ = errgroup.WithContext(ctx)
    +var err error
    +ep.AsyncGroup, err = errgroup.WithContext(ctx)
    +if err != nil {
    +    return err
    +}
     ep.AsyncGroup.Go(asyncWriteFunc)
     
    Suggestion importance[1-10]: 9

    Why: This suggestion addresses a potential bug by ensuring that errors from errgroup.WithContext are not ignored, which is crucial for preventing runtime panics or unexpected behavior.

    Add error handling for URL parsing to prevent runtime errors

    Implement error handling for the URL parsing to handle potential runtime errors
    gracefully.

    pkg/frontend/authenticate_test.go [9607]

    -Url: "s3://load/files/",
    +Url, err := parseURL("s3://load/files/")
    +if err != nil {
    +    log.Fatal(err)
    +}
     
    Suggestion importance[1-10]: 8

    Why: Adding error handling for URL parsing prevents potential runtime errors, improving the robustness of the code. This is a significant improvement for reliability.

    Security
    Manage credentials securely using a secure store

    Use a struct or a configuration file to manage credentials securely rather than
    hardcoding them in the source code.

    pkg/frontend/authenticate_test.go [9786]

    -Credentials: []string{"AWS_KEY_ID", "1a2b3c", "AWS_SECRET_KEY", "4x5y6z"},
    +Credentials: loadCredentialsFromSecureStore(),
     
    Suggestion importance[1-10]: 9

    Why: Managing credentials securely using a secure store is crucial for security. This suggestion addresses a major security concern by preventing hardcoded sensitive information.

    Add URL scheme validation to enhance security

    Ensure that the URL scheme is validated to prevent potential security risks or
    misconfigurations.

    pkg/frontend/authenticate_test.go [9607]

    -Url: "s3://load/files/",
    +Url: validateURLScheme("s3://load/files/"),
     
    Suggestion importance[1-10]: 8

    Why: Adding URL scheme validation enhances security by preventing potential misconfigurations or malicious inputs. This is a significant improvement for security.

    Possible issue
    Add error logging for the Write method to ensure all errors are captured and handled

    Add error handling for the Write method inside the asyncWriteFunc to manage
    potential errors during the asynchronous write operation. This ensures that all
    errors are properly handled and logged or returned.

    pkg/frontend/export.go [170-176]

     err := ep.FileService.Write(ctx, vec)
     if err != nil {
    +    log.Errorf("Failed to write data: %v", err)
         err2 := ep.AsyncReader.CloseWithError(err)
         if err2 != nil {
    +        log.Errorf("Failed to close reader with error: %v", err2)
             return err2
         }
    +    return err
     }
     
    Suggestion importance[1-10]: 8

    Why: Adding error logging improves the robustness of the code by ensuring that all errors are captured and handled, which is important for debugging and maintaining the system.

    Maintainability
    Encapsulate URL validation logic into a separate function to enhance code reusability

    Ensure that the URL validation logic is encapsulated in a separate function to
    improve code reusability and maintainability.

    pkg/frontend/authenticate.go [3270-3273]

    -if !(strings.HasPrefix(cs.Url, function.STAGE_PROTOCOL+"://") || strings.HasPrefix(cs.Url, function.S3_PROTOCOL+"://") ||
    -    strings.HasPrefix(cs.Url, function.FILE_PROTOCOL+":///")) {
    +if !isValidURL(cs.Url) {
         return moerr.NewBadConfig(ctx, "URL protocol only supports stage://, s3:// and file:///")
     }
     
    +// Define the function elsewhere in your codebase
    +func isValidURL(url string) bool {
    +    return strings.HasPrefix(url, function.STAGE_PROTOCOL+"://") || 
    +           strings.HasPrefix(url, function.S3_PROTOCOL+"://") ||
    +           strings.HasPrefix(url, function.FILE_PROTOCOL+":///")
    +}
    +
    Suggestion importance[1-10]: 8

    Why: Encapsulating the URL validation logic into a separate function improves code reusability and maintainability, making the codebase cleaner and easier to manage.
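
A self-contained version of the helper suggested above, shown with a few inputs. The constant values are assumptions inferred from the error message ("stage://, s3:// and file:///"); the PR refers to `function.STAGE_PROTOCOL`, `function.S3_PROTOCOL` and `function.FILE_PROTOCOL` without showing their definitions, and `isValidStageURL` is a hypothetical name.

```go
package main

import (
	"fmt"
	"strings"
)

// Assumed values; the real constants live in the PR's function package.
const (
	stageProtocol = "stage"
	s3Protocol    = "s3"
	fileProtocol  = "file"
)

// isValidStageURL reports whether u starts with one of the supported
// prefixes. Note that file URLs need three slashes (empty host).
func isValidStageURL(u string) bool {
	return strings.HasPrefix(u, stageProtocol+"://") ||
		strings.HasPrefix(u, s3Protocol+"://") ||
		strings.HasPrefix(u, fileProtocol+":///")
}

func main() {
	urls := []string{
		"file:///tmp/",
		"s3://bucket/dir",
		"stage://tmpstage/data",
		"file://tmp/",
		"http://example.com/",
	}
	for _, u := range urls {
		fmt.Printf("%-24s %v\n", u, isValidStageURL(u))
	}
}
```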

    Replace hardcoded URL with a variable or constant

    Replace the hardcoded S3 URL with a variable or a constant to make the code more
    maintainable and flexible for changes in the URL.

    pkg/frontend/authenticate_test.go [9607]

    -Url: "s3://load/files/",
    +Url: s3BaseURL + "/load/files/",
     
    Suggestion importance[1-10]: 7

    Why: This suggestion improves maintainability by replacing hardcoded URLs with a variable or constant, making future changes easier. However, it is not critical for functionality or security.

    Reduce code duplication by encapsulating repeated checks and writes into a helper function

    Refactor the repeated checks for flag && enclosed != 0 in formatOutputString to a
    single check by encapsulating the repeated code into a helper function. This will
    reduce code duplication and improve maintainability.

    pkg/frontend/export.go [215-227]

    +writeEnclosed := func() error {
    +    if _, err = buffer.Write([]byte{enclosed}); err != nil {
    +        return err
    +    }
    +    return nil
    +}
     if flag && enclosed != 0 {
    -    if _, err = buffer.Write([]byte{enclosed}); err != nil {
    +    if err := writeEnclosed(); err != nil {
             return err
         }
     }
     if _, err = buffer.Write(tmp); err != nil {
         return err
     }
     if flag && enclosed != 0 {
    -    if _, err = buffer.Write([]byte{enclosed}); err != nil {
    +    if err := writeEnclosed(); err != nil {
             return err
         }
     }
     
    Suggestion importance[1-10]: 6

    Why: This suggestion improves maintainability by reducing code duplication, but it does not address a critical issue. It makes the code cleaner and easier to maintain.

    Simplify error handling by removing repetitive checks in the function

    Refactor the error handling in InitInfileOrStageParam to avoid repetitive error
    checks and streamline the function logic.

    pkg/sql/plan/utils.go [1571-1575]

    -if err != nil {
    -    return err
    -}
     if err != nil {
         return err
     }
     
    Suggestion importance[1-10]: 6

    Why: Simplifying error handling by removing repetitive checks improves code readability and maintainability, although the impact is relatively minor.

    Enhancement
    Improve code consistency and readability by using fmt.Sprintf instead of strconv.AppendInt

    Replace the use of strconv.AppendInt with fmt.Sprintf for consistency and
    readability. This change applies to the handling of MYSQL_TYPE_YEAR and other
    integer types in the exportDataFromResultSetToCSVFile function.

    pkg/frontend/export.go [544-546]

    -var lineStr []byte
    -lineStr = strconv.AppendInt(lineStr, value, 10)
    -if err = formatOutputString(oq, lineStr, symbol[i], closeby, flag[i], buffer); err != nil {
    +lineStr := fmt.Sprintf("%d", value)
    +if err = formatOutputString(oq, []byte(lineStr), symbol[i], closeby, flag[i], buffer); err != nil {
         return err
     }
     
    Suggestion importance[1-10]: 7

    Why: The suggestion improves readability and consistency, but it does not address a critical issue. Using fmt.Sprintf is more readable than strconv.AppendInt, but the existing code is still functional.

    Performance
    Consolidate multiple defer statements into a single one to optimize resource cleanup

    Replace the multiple defer statements for resetting stubs with a single defer to
    clean up resources more efficiently.

    pkg/frontend/export_test.go [138-156]

    -defer stubs.Reset()
    -defer stubs.Reset()
    -defer stubs.Reset()
    +defer stubs.Reset()  // Single defer statement to reset stubs after all operations
     
    Suggestion importance[1-10]: 7

    Why: Consolidating multiple defer statements into a single one is a good practice for optimizing resource cleanup, although it provides only a minor performance improvement.


    @@ -1442,9 +1442,9 @@ const (

    checkStageFormat = `select stage_id, stage_name from mo_catalog.mo_stages where stage_name = "%s" order by stage_id;`

    checkStageStatusFormat = `select stage_id, stage_name from mo_catalog.mo_stages where stage_status = "%s" order by stage_id;`
    //checkStageStatusFormat = `select stage_id, stage_name from mo_catalog.mo_stages where stage_status = "%s" order by stage_id;`
    Contributor

    Just remove it if it will not be used anymore.

    @@ -1522,20 +1522,25 @@ func getSqlForCheckStage(ctx context.Context, stage string) (string, error) {
    return fmt.Sprintf(checkStageFormat, stage), nil
    }

    /*
    Contributor

    Same as above.

    if err != nil {
    return err
    }

    // detect filepath contain stage or not
    filePath = ep.FilePath
    Contributor

    The logic is changed in this PR. There is no need to query the stage record and check it. Am I right?

    Contributor Author

    enable/disable in stage_status is not valid. The path expansion is done in pkg/sql/plan/function/stage_util.go now.
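
To illustrate what path expansion means here, the following is a rough sketch and not the code in stage_util.go. It resolves `stage://<name>/<sub path>` against a map of stage definitions, using the `tmpstage` definition from the reproduction steps. The function name `expandStageURL` and the map-based lookup are assumptions; the sketch handles a single level only (no sub-stages) and does not guard against `..` segments.

```go
package main

import (
	"fmt"
	"net/url"
	"path"
)

// expandStageURL rewrites stage://<name>/<sub> to the stage's base URL plus
// <sub>. Any other URL is returned unchanged.
// Caveat: path.Join cleans ".." segments, so a real implementation would
// need to reject paths that escape the stage root.
func expandStageURL(raw string, stages map[string]string) (string, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return "", err
	}
	if u.Scheme != "stage" {
		return raw, nil
	}
	base, ok := stages[u.Host]
	if !ok {
		return "", fmt.Errorf("stage %q not found", u.Host)
	}
	b, err := url.Parse(base)
	if err != nil {
		return "", err
	}
	b.Path = path.Join(b.Path, u.Path)
	return b.String(), nil
}

func main() {
	stages := map[string]string{"tmpstage": "file:///tmp/"}
	for _, in := range []string{"stage://tmpstage/data/a.csv", "stage://missing/x.csv"} {
		out, err := expandStageURL(in, stages)
		fmt.Println(out, err)
	}
}
```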

    bh.sql2result["commit;"] = nil
    bh.sql2result["rollback;"] = nil

    err := doCheckFilePath(ctx, ses, cs.Ep)
    Contributor

    Because the implementation of doCheckFilePath changed, its UT is removed in this PR. But doCheckFilePath needs a simpler UT rather than none.

    filePath = getExportFilePath(ep.userConfig.FilePath, ep.FileCnt)
    }
    ep.File, err = OpenFile(filePath, os.O_RDWR|os.O_EXCL|os.O_CREATE, 0o666)

    Contributor

    Fileservice unifies the operations on the file.

    @@ -557,16 +476,15 @@ func addEscapeToString(s []byte) []byte {
    return ret
    }

    func exportDataToCSVFile(oq *ExportConfig) error {
    if !oq.OutTofile {
    return exportDataToCSVFile2(oq)
    Contributor

    Why should this branch be removed?

    symbol := oq.Symbol
    closeby := oq.userConfig.Fields.EnclosedBy.Value
    flag := oq.ColumnFlag

    buffer := &bytes.Buffer{}
    Contributor

    bytes.Buffer allocates memory every time, so memory usage may explode.

    convey.So(writeToCSVFile(ep, output), convey.ShouldNotBeNil)

    ep.Rows = 1
    stubs := gostub.StubFunc(&Flush, moerr.NewInternalError(context.TODO(), "Flush error"))
    Contributor

    Does this mean these variables have been removed in the PR?

    Contributor Author

    Yes, Writer is removed. We only support AsyncWriter and AsyncReader. LineSize is the same as FileSize, so these variables should be removed.

    exportParam.FileService = getGlobalPu().FileService
    exportParam.Ctx = ctx
    defer func() {
    exportParam.LineBuffer = nil
    Contributor

    These two buffers are for reusing memory. Is it really necessary to remove them?

    Contributor Author

    LineBuffer should be a local variable inside a function; it does not need to be a member variable.
    LineStr is also a local variable.

    err := updateTimeZone(ctx, ses, ses.GetSessionSysVars(), "time_zone", "system")
    assert.NoError(t, err)
    assert.Equal(t, ses.GetTimeZone().String(), "Local")
    /*
    Contributor

    Why should this UT be removed?

    Contributor Author

    On my machine, the return value is UTC, not Local, so the result is not always true.

    @cpegeric cpegeric closed this Aug 8, 2024