Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: implementing the select object interface #143

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
.idea/*
Manifest.toml
Manifest.toml
.minio*
2 changes: 2 additions & 0 deletions src/AWSS3.jl
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export S3Path, s3_arn, s3_put, s3_get, s3_get_file, s3_exists, s3_delete, s3_cop
s3_sign_url, s3_begin_multipart_upload, s3_upload_part,
s3_complete_multipart_upload, s3_multipart_upload,
s3_get_tags, s3_put_tags, s3_delete_tags
export S3InputSerialization, S3OutputSerialization, s3_select_object_content, s3select

using AWS
using AWS.AWSServices: s3
Expand Down Expand Up @@ -839,6 +840,7 @@ function s3_nuke_bucket(aws::AbstractAWSConfig, bucket_name)
end


include("s3select.jl")
include("s3path.jl")

end #module AWSS3
Expand Down
4 changes: 4 additions & 0 deletions src/s3path.jl
Original file line number Diff line number Diff line change
Expand Up @@ -382,3 +382,7 @@ function s3_get_name(prefix::String, s::String)
tokenized = split(subkey, "/"; limit=2)
return length(tokenized) == 2 ? first(tokenized) * "/" : first(tokenized)
end

function s3select(fp::S3Path, expression, args::AbstractDict{String,<:Any}=Dict{String,Any}(); kwargs...)
s3_select_object_content(fp.config, fp.bucket, fp.key, expression, args; kwargs...)
end
103 changes: 103 additions & 0 deletions src/s3select.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@

abstract type S3SerializationStruct end

function _key_string(field::Symbol)
strs = split(string(field), '_')
join(uppercasefirst.(strs), "")
end

s3_serialization_dict(kwargs) = Dict(_key_string(k)=>v for (k,v) ∈ kwargs)

Base.Dict(s::S3SerializationStruct) = s.dict


"""
S3InputSerialization

see AWS API docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_InputSerialization.html
"""
module S3InputSerialization
using AWSS3: S3SerializationStruct, s3_serialization_dict

"""
S3InputSerialization.CSV

see AWS API docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_CSVInput.html
"""
struct CSV <: S3SerializationStruct
dict::Dict{String,Any}
end
CSV(;kwargs...) = s3_serialization_dict(kwargs)


"""
S3InputSerialization.JSON

see AWS API docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_JSONInput.html
"""
struct JSON <: S3SerializationStruct
dict::Dict{String,Any}
end
JSON(;kwargs...) = JSON(s3_serialization_dict(kwargs))

"""
S3InputSerialization.Parquet

see AWS API docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_ParquetInput.html
"""
struct Parquet <: S3SerializationStruct
dict::Dict{String,Any}
end
Parquet(;kwargs...) = Parquet(s3_serialization_dict(kwargs))
end

"""
S3OutputSerialization

see AWS API docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_OutputSerialization.html
"""
module S3OutputSerialization
using AWSS3: S3SerializationStruct, s3_serialization_dict

"""
S3OutputSerialization.CSV

see AWS API docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_CSVOutput.html
"""
struct CSV <: S3SerializationStruct
dict::Dict{String,Any}
end
CSV(;kwargs...) = CSV(s3_serialization_dict(kwargs))

"""
S3OutputSerialization.JSON

see AWS API docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_JSONOutput.html
"""
struct JSON <: S3SerializationStruct
dict::Dict{String,Any}
end
JSON(;kwargs...) = JSON(s3_serialization_dict(kwargs))
end

# for now this just looks at the key
function _infer_input_serialization(aws::AbstractAWSConfig, bucket, key)
if endswith(key, "csv")
S3InputSerialization.CSV()
elseif endswith(key, "json")
S3InputSerialization.JSON()
else
S3InputSerialization.Parquet()
end
end

function s3_select_object_content(aws::AbstractAWSConfig, bucket, key, expression,
args::AbstractDict{String,<:Any}=Dict{String,Any}();
expression_type::AbstractString="SQL",
input_serialization::Union{Nothing,S3SerializationStruct}=nothing,
output_serialization::S3SerializationStruct=S3OutputSerialization.CSV())
input_serialization ≡ nothing && (input_serialization = _infer_input_serialization(aws, bucket, key))
S3.select_object_content(bucket, expression, expression_type, Dict(input_serialization), key,
Dict(output_serialization), args, aws_config=aws)
end
s3_select_object_content(a...; b...) = s3_select_object_content(global_aws_config(), a...; b...)
8 changes: 8 additions & 0 deletions test/select_experiments.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
using AWSS3, Minio

s = Minio.Server(@__DIR__)

csv = S3Path("s3://testbucket/test.csv", config=MinioConfig(s))

test(csv) = s3select(csv, "select * from s3object s where s.C >= 4")

6 changes: 6 additions & 0 deletions test/testbucket/test.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
A,B,C
0.5231510582131029,kirk,1
0.7377557522717386,spock,2
0.1847573737369297,bones,3
0.691671880638087,scotty,4
0.30899820492077623,uhura,5