Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement sort-merge join #147

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
2 changes: 1 addition & 1 deletion src/TSFrame.jl
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ struct TSFrame
throw(ArgumentError("only Int and TimeType index is supported"))
end

if (DataFrames.ncol(coredata) == 1)
if (DataFrames.ncol(coredata) == 1) && propertynames(coredata)[1] !== :Index
TSFrame(coredata, collect(Base.OneTo(DataFrames.nrow(coredata))); issorted = issorted, copycols = copycols)
end

Expand Down
197 changes: 189 additions & 8 deletions src/join.jl
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
# Lookup table keyed by the `jointype` symbols accepted by `join`.
# NOTE(review): this is a diff view, so two generations of entries are listed
# together: the first six values are DataFrames join functions (called directly
# by the old `join` body), the remaining values are the symbols handed to
# `fast_join` through its `method` keyword. Only one set belongs in the file.
joinmap = Dict(
:JoinInner=>DataFrames.innerjoin,
:JoinBoth=>DataFrames.innerjoin,
:JoinOuter=>DataFrames.outerjoin,
:JoinAll=>DataFrames.outerjoin,
:JoinLeft=>DataFrames.leftjoin,
:JoinRight=>DataFrames.rightjoin
:JoinInner => :inner,
:JoinBoth => :inner,
:JoinOuter => :outer,
:JoinAll => :outer,
:JoinLeft => :left,
:JoinRight => :right,
:outerjoin => :outer,
:innerjoin => :inner,
:leftjoin => :left,
:rightjoin => :right,
:inner => :inner,
:outer => :outer,
:left => :left,
:right => :right,
)

"""
Expand Down Expand Up @@ -71,6 +79,16 @@ where `jointype` must be one of `:JoinInner`, `:JoinBoth`, `:JoinOuter`, `:JoinA

`cbind` is an alias for `join` method.

## Using the `DataFrames` join methods

DataFrames.jl's join methods are battle-tested, and handle quite a few error cases which `TSFrames.join` may not.
In order to use DataFrames' join methods, which are somewhat slower than `TSFrames.join`, you would have to
join the TSFrames' internal DataFrames, then construct a new TSFrame. For `ts1::TSFrame`, `ts2::TSFrame`,
this is how you would construct an outer join:
```julia
TSFrame(DataFrames.outerjoin(ts1.coredata, ts2.coredata; on = :Index, makeunique = true))
```

# Examples
```jldoctest; setup = :(using TSFrames, DataFrames, Dates, Random, Statistics)
julia> using Random;
Expand Down Expand Up @@ -294,12 +312,175 @@ function Base.join(
ts...;
jointype::Symbol=:JoinAll
)
result = joinmap[jointype](ts1.coredata, ts2.coredata, on=:Index, makeunique=true)
result = TSFrames.fast_join(ts1, ts2; method = joinmap[jointype])
for tsf in ts
result = joinmap[jointype](result, tsf.coredata, on=:Index, makeunique=true)
result = TSFrames.fast_join(result, tsf; method = joinmap[jointype])
end
return TSFrame(result)
end

# alias
cbind = join

# EXPERIMENTAL: basic merge-join algorithm

# requirements:
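# `allunique(left) & allunique(right)` # TODO: duplicate keys are paired one-to-one by the equal branch below, not many-to-many; decide whether to support or reject them.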
# `issorted(left) & issorted(right)`
"""
    sort_merge_idx(left, right, ::Val{DoLeft}, ::Val{DoRight})

Merge two sorted key vectors in a single pass.

Keys present in both inputs are always emitted. Keys present only in `left`
are emitted when `DoLeft` is `true`, keys present only in `right` when
`DoRight` is `true`. Hence `(true, true)` is an outer merge, `(true, false)`
a left merge, `(false, true)` a right merge and `(false, false)` an inner
merge.

# Requirements
- `issorted(left) && issorted(right)`
- `allunique(left) && allunique(right)`; equal keys are paired one-to-one.
- both vectors use 1-based indexing (checked).

# Returns
A tuple `(merged, idx_left, idx_right)`:
- `merged`: the merged keys, with element type
  `promote_type(eltype(left), eltype(right))`.
- `idx_left::Vector{Int32}`: `idx_left[i]` is the position of `left[i]` in
  `merged`, or `0` when that key was dropped.
- `idx_right::Vector{Int32}`: the same for `right`.
"""
function sort_merge_idx(left::AbstractVector, right::AbstractVector, ::Val{DoLeft}, ::Val{DoRight}) where {DoLeft, DoRight}
    # The loops below index from 1 under `@inbounds`, so reject offset arrays.
    Base.require_one_based_indexing(left, right)

    length_left, length_right = length(left), length(right)

    # Upper bound on the number of emitted keys. Every write consumes at least
    # one element of each side that is being kept, so this bound is what makes
    # the `@inbounds` writes into `result` safe.
    merged_length = if DoLeft && DoRight
        length_left + length_right
    elseif DoLeft
        length_left
    elseif DoRight
        length_right
    else
        min(length_left, length_right)
    end

    # Allocated through Base `similar` so that this kernel does not rely on
    # DataFrames' non-public `similar_outer` helper.
    result = similar(left, promote_type(eltype(left), eltype(right)), merged_length)

    # When a side is fully kept every slot is overwritten, so `undef` is fine.
    # Otherwise the slots of dropped rows must read as 0.
    idx_left = DoLeft ? Vector{Int32}(undef, length_left) : zeros(Int32, length_left)
    idx_right = DoRight ? Vector{Int32}(undef, length_right) : zeros(Int32, length_right)

    # read cursors into `left` / `right`, and the next write position
    i = 1
    j = 1
    k = 1

    @inbounds begin
        while (i <= length_left) && (j <= length_right)
            if left[i] < right[j]
                # key only on the left
                if DoLeft
                    result[k] = left[i]
                    idx_left[i] = k
                    k += 1
                end
                i += 1
            elseif left[i] > right[j]
                # key only on the right
                if DoRight
                    result[k] = right[j]
                    idx_right[j] = k
                    k += 1
                end
                j += 1
            else
                # key on both sides: always part of the result
                result[k] = left[i]
                idx_left[i] = k
                idx_right[j] = k
                i += 1
                j += 1
                k += 1
            end
        end
        # At most one side still has elements; they have no partner.
        if DoLeft
            while i <= length_left
                result[k] = left[i]
                idx_left[i] = k
                i += 1
                k += 1
            end
        end
        if DoRight
            while j <= length_right
                result[k] = right[j]
                idx_right[j] = k
                j += 1
                k += 1
            end
        end
    end

    # Trim the unused tail of the over-allocated buffer.
    resize!(result, k - 1)
    return result, idx_left, idx_right
end

"""
    fast_join(left::TSFrame, right::TSFrame; method = :outer)

Join two `TSFrame`s on their index with the sort-merge kernel
`sort_merge_idx` and return a new `TSFrame`.

# Keywords
- `method`: one of `:outer` (every index value from either side), `:inner`
  (index values present on both sides), `:left` (every index value of `left`)
  or `:right` (every index value of `right`).

# Details
Non-index columns of `left` keep their names. A column of `right` whose name
also occurs in `left` is renamed to `name_1`, `name_2`, ... using the first
suffix that clashes with no other column of the result.

Rows without a counterpart on the other side keep whatever
`DataFrames.similar_missing` initialised them to (presumably `missing`).

# Throws
- `ArgumentError` if `method` is not one of the four symbols above.
"""
function fast_join(left::TSFrame, right::TSFrame; method = :outer)
    # `@assert` may be compiled out, so validate user input explicitly.
    if !(method in (:inner, :outer, :left, :right))
        throw(ArgumentError(
            "`method` must be one of :inner, :outer, :left or :right; got $(repr(method))",
        ))
    end

    # Always run the full outer merge: it yields, for every input row, its
    # position in the merged index. The rows required by `method` are selected
    # from that result at the end.
    merged_idx, merged_idx_left, merged_idx_right =
        sort_merge_idx(index(left), index(right), Val(true), Val(true))
    merged_length = length(merged_idx)

    # TODO: if all three index vectors have the same length the indices are
    # identical and plain column concatenation would be enough.

    left_coredata = left.coredata
    right_coredata = right.coredata

    left_colnames = setdiff(Tables.columnnames(left_coredata), (:Index,))
    right_colnames = setdiff(Tables.columnnames(right_coredata), (:Index,))
    left_colidxs = Tables.columnindex.((left_coredata,), left_colnames)
    right_colidxs = Tables.columnindex.((right_coredata,), right_colnames)

    # Rename right-hand columns that clash with a left-hand column. `taken`
    # holds every name that is already claimed (all left names, all original
    # right names and every name handed out so far) so that a generated name
    # can never shadow another column of the result.
    left_names = Set{Symbol}(left_colnames)
    taken = union(left_names, right_colnames)
    disambiguated_right_colnames = copy(right_colnames)
    for (ind, colname) in enumerate(right_colnames)
        colname in left_names || continue
        try_idx = 1
        candidate = Symbol(colname, "_", try_idx)
        while candidate in taken
            try_idx += 1
            candidate = Symbol(colname, "_", try_idx)
        end
        push!(taken, candidate)
        disambiguated_right_colnames[ind] = candidate
    end

    # Build the outer-join table: the merged index first, then the columns.
    result = DataFrame(:Index => merged_idx; makeunique = false, copycols = false)

    # Scatter each left column to the merged positions of the left rows.
    for (name, col_idx) in zip(left_colnames, left_colidxs)
        source = left_coredata[!, col_idx]
        contents = DataFrames.similar_missing(source, merged_length)
        @inbounds contents[merged_idx_left] = source
        result[!, name] = contents
    end

    # Same for the right columns, stored under their disambiguated names.
    for (name, col_idx) in zip(disambiguated_right_colnames, right_colidxs)
        source = right_coredata[!, col_idx]
        contents = DataFrames.similar_missing(source, merged_length)
        @inbounds contents[merged_idx_right] = source
        result[!, name] = contents
    end

    # Reduce the outer join to the requested kind. The position vectors are
    # ascending, so the selected rows stay sorted by index.
    if method != :outer
        keep = if method == :left
            merged_idx_left
        elseif method == :right
            merged_idx_right
        else # :inner, i.e. merged positions that both sides map to
            intersect(merged_idx_left, merged_idx_right)
        end
        result = result[keep, :]
    end

    return TSFrame(result, :Index; issorted = true, copycols = false)
end

# # as of 22-Jan-22, the timer outputs are as follows:
# BenchmarkTools.Trial: 100 samples with 1 evaluation.
# Range (min … max): 61.627 ms … 287.822 ms ┊ GC (min … max): 0.00% … 71.47%
# Time (median): 76.965 ms ┊ GC (median): 0.00%
# Time (mean ± σ): 94.324 ms ± 47.940 ms ┊ GC (mean ± σ): 22.42% ± 21.94%

# ▄█▆▇▄
# ▄█████▆▁▁▆▅▃▄▃▁▁▃▁▁▁▁▁▃▃▁▁▃▃▁▁▁▃▁▁▁▁▃▁▁▃▃▁▁▁▁▁▁▁▁▁▁▁▁▃▃▁▁▁▃▃ ▃
# 61.6 ms Histogram: frequency by time 270 ms <

# Memory estimate: 584.75 MiB, allocs estimate: 127.

# ──────────────────────────────────────────────────────────────────────────────────
# Time Allocations
# ─────────────────────── ────────────────────────
# Tot / % measured: 43.8s / 93.7% 244GiB / 99.8%

# Section ncalls time %tot avg alloc %tot avg
# ──────────────────────────────────────────────────────────────────────────────────
# column building 854 23.0s 56.1% 26.9ms 74.2GiB 30.4% 89.0MiB
# column population 854 13.5s 32.9% 15.8ms 4.82MiB 0.0% 5.78KiB
# column allocation 854 9.49s 23.1% 11.1ms 74.2GiB 30.4% 89.0MiB
# column transfer 854 16.9ms 0.0% 19.8μs 1.02MiB 0.0% 1.23KiB
# sort_merge_idx 427 12.4s 30.1% 29.0ms 95.4GiB 39.1% 229MiB
# TSFrame construction 427 5.64s 13.8% 13.2ms 74.2GiB 30.4% 178MiB
# column disambiguation 427 6.44ms 0.0% 15.1μs 709KiB 0.0% 1.66KiB
# ──────────────────────────────────────────────────────────────────────────────────