diff --git a/.gitignore b/.gitignore
index 5a19a513..c02bb677 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,3 @@
 .idea/*
-Manifest.toml
\ No newline at end of file
+Manifest.toml
+.minio*
diff --git a/src/AWSS3.jl b/src/AWSS3.jl
index 13dcb4db..624a39c7 100644
--- a/src/AWSS3.jl
+++ b/src/AWSS3.jl
@@ -18,6 +18,7 @@ export S3Path, s3_arn, s3_put, s3_get, s3_get_file, s3_exists, s3_delete, s3_cop
        s3_sign_url, s3_begin_multipart_upload, s3_upload_part,
        s3_complete_multipart_upload, s3_multipart_upload,
        s3_get_tags, s3_put_tags, s3_delete_tags
+export S3InputSerialization, S3OutputSerialization, s3_select_object_content, s3select
 
 using AWS
 using AWS.AWSServices: s3
@@ -839,6 +840,7 @@ function s3_nuke_bucket(aws::AbstractAWSConfig, bucket_name)
 end
 
 
+include("s3select.jl")
 include("s3path.jl")
 
 end #module AWSS3
diff --git a/src/s3path.jl b/src/s3path.jl
index 5fe860bf..405f14d1 100644
--- a/src/s3path.jl
+++ b/src/s3path.jl
@@ -382,3 +382,13 @@ function s3_get_name(prefix::String, s::String)
     tokenized = split(subkey, "/"; limit=2)
     return length(tokenized) == 2 ? first(tokenized) * "/" : first(tokenized)
 end
+
+"""
+    s3select(fp::S3Path, expression, args=Dict{String,Any}(); kwargs...)
+
+Run `expression` against the object at `fp` by forwarding `fp.config`, `fp.bucket` and
+`fp.key`, together with `args` and `kwargs`, to `s3_select_object_content`.
+"""
+function s3select(fp::S3Path, expression, args::AbstractDict{String,<:Any}=Dict{String,Any}(); kwargs...)
+    s3_select_object_content(fp.config, fp.bucket, fp.key, expression, args; kwargs...)
+end
diff --git a/src/s3select.jl b/src/s3select.jl
new file mode 100644
index 00000000..0755981b
--- /dev/null
+++ b/src/s3select.jl
@@ -0,0 +1,123 @@
+
+abstract type S3SerializationStruct end
+
+# Convert a snake_case keyword name to the UpperCamelCase form, e.g.
+# `:file_header_info` becomes "FileHeaderInfo".
+function _key_string(field::Symbol)
+    strs = split(string(field), '_')
+    join(uppercasefirst.(strs), "")
+end
+
+# Build a `Dict` from keyword arguments, renaming each key with `_key_string`.
+s3_serialization_dict(kwargs) = Dict(_key_string(k)=>v for (k,v) ∈ kwargs)
+
+# Expose the dictionary wrapped by a serialization struct.
+Base.Dict(s::S3SerializationStruct) = s.dict
+
+
+"""
+    S3InputSerialization
+
+see AWS API docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_InputSerialization.html
+"""
+module S3InputSerialization
+    using AWSS3: S3SerializationStruct, s3_serialization_dict
+
+    """
+        S3InputSerialization.CSV
+
+    see AWS API docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_CSVInput.html
+    """
+    struct CSV <: S3SerializationStruct
+        dict::Dict{String,Any}
+    end
+    # Wrap the dictionary in `CSV`, as the other keyword constructors do with their own
+    # type; returning the bare `Dict` would not satisfy `::S3SerializationStruct`.
+    CSV(;kwargs...) = CSV(s3_serialization_dict(kwargs))
+
+
+    """
+        S3InputSerialization.JSON
+
+    see AWS API docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_JSONInput.html
+    """
+    struct JSON <: S3SerializationStruct
+        dict::Dict{String,Any}
+    end
+    JSON(;kwargs...) = JSON(s3_serialization_dict(kwargs))
+
+    """
+        S3InputSerialization.Parquet
+
+    see AWS API docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_ParquetInput.html
+    """
+    struct Parquet <: S3SerializationStruct
+        dict::Dict{String,Any}
+    end
+    Parquet(;kwargs...) = Parquet(s3_serialization_dict(kwargs))
+end
+
+"""
+    S3OutputSerialization
+
+see AWS API docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_OutputSerialization.html
+"""
+module S3OutputSerialization
+    using AWSS3: S3SerializationStruct, s3_serialization_dict
+
+    """
+        S3OutputSerialization.CSV
+
+    see AWS API docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_CSVOutput.html
+    """
+    struct CSV <: S3SerializationStruct
+        dict::Dict{String,Any}
+    end
+    CSV(;kwargs...) = CSV(s3_serialization_dict(kwargs))
+
+    """
+        S3OutputSerialization.JSON
+
+    see AWS API docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_JSONOutput.html
+    """
+    struct JSON <: S3SerializationStruct
+        dict::Dict{String,Any}
+    end
+    JSON(;kwargs...) = JSON(s3_serialization_dict(kwargs))
+end
+
+# for now this just looks at the key
+function _infer_input_serialization(aws::AbstractAWSConfig, bucket, key)
+    if endswith(key, "csv")
+        S3InputSerialization.CSV()
+    elseif endswith(key, "json")
+        S3InputSerialization.JSON()
+    else
+        S3InputSerialization.Parquet()
+    end
+end
+
+"""
+    s3_select_object_content([aws,] bucket, key, expression, args=Dict{String,Any}(); kwargs...)
+
+Call `S3.select_object_content` for `key` in `bucket` with the query `expression`.
+
+# Keywords
+- `expression_type`: passed through unchanged; defaults to `"SQL"`.
+- `input_serialization`: when `nothing` (the default) it is chosen from the suffix of `key`:
+  `"csv"` gives `S3InputSerialization.CSV()`, `"json"` gives `S3InputSerialization.JSON()`,
+  anything else gives `S3InputSerialization.Parquet()`.
+- `output_serialization`: defaults to `S3OutputSerialization.CSV()`.
+
+When `aws` is omitted, `global_aws_config()` is used.
+"""
+function s3_select_object_content(aws::AbstractAWSConfig, bucket, key, expression,
+                                  args::AbstractDict{String,<:Any}=Dict{String,Any}();
+                                  expression_type::AbstractString="SQL",
+                                  input_serialization::Union{Nothing,S3SerializationStruct}=nothing,
+                                  output_serialization::S3SerializationStruct=S3OutputSerialization.CSV())
+    input_serialization ≡ nothing && (input_serialization = _infer_input_serialization(aws, bucket, key))
+    S3.select_object_content(bucket, expression, expression_type, Dict(input_serialization), key,
+                             Dict(output_serialization), args, aws_config=aws)
+end
+s3_select_object_content(a...; b...) = s3_select_object_content(global_aws_config(), a...; b...)
diff --git a/test/select_experiments.jl b/test/select_experiments.jl
new file mode 100644
index 00000000..27832f0f
--- /dev/null
+++ b/test/select_experiments.jl
@@ -0,0 +1,8 @@
+using AWSS3, Minio
+
+s = Minio.Server(@__DIR__)
+
+csv = S3Path("s3://testbucket/test.csv", config=MinioConfig(s))
+
+test(csv) = s3select(csv, "select * from s3object s where s.C >= 4")
+
diff --git a/test/testbucket/test.csv b/test/testbucket/test.csv
new file mode 100644
index 00000000..92830226
--- /dev/null
+++ b/test/testbucket/test.csv
@@ -0,0 +1,6 @@
+A,B,C
+0.5231510582131029,kirk,1
+0.7377557522717386,spock,2
+0.1847573737369297,bones,3
+0.691671880638087,scotty,4
+0.30899820492077623,uhura,5