Skip to content

Commit

Permalink
Support multithreading in groupreduce
Browse files Browse the repository at this point in the history
Keep the default to a single thread until we find a reliable way of
predicting a reasonably optimal number of threads.
  • Loading branch information
nalimilan committed Nov 25, 2020
1 parent 8645651 commit 2d57734
Show file tree
Hide file tree
Showing 7 changed files with 316 additions and 130 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ jobs:
- uses: actions/cache@v1
env:
cache-name: cache-artifacts
JULIA_NUM_THREADS: 2
with:
path: ~/.julia/artifacts
key: ${{ runner.os }}-test-${{ env.cache-name }}-${{ hashFiles('**/Project.toml') }}
Expand Down
9 changes: 9 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
# DataFrames v1.0 Release Notes

## New functionalities

* `combine`, `select` and `transform` with `GroupedDataFrame` now accept
a `nthreads` argument which enables multithreading for some optimized
grouped reductions ([#2491](https://github.com/JuliaData/DataFrames.jl/pull/2491)).


# DataFrames v0.22 Release Notes

## Breaking changes
Expand Down
2 changes: 1 addition & 1 deletion src/DataFrames.jl
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module DataFrames
using Statistics, Printf, REPL
using Reexport, SortingAlgorithms, Compat, Unicode, PooledArrays, CategoricalArrays
@reexport using Missings, InvertedIndices
using Base.Sort, Base.Order, Base.Iterators
using Base.Sort, Base.Order, Base.Iterators, Base.Threads
using TableTraits, IteratorInterfaceExtensions
import LinearAlgebra: norm
using Markdown
Expand Down
26 changes: 20 additions & 6 deletions src/abstractdataframe/selection.jl
Original file line number Diff line number Diff line change
Expand Up @@ -644,9 +644,10 @@ end
select(df::AbstractDataFrame, args...; copycols::Bool=true, renamecols::Bool=true)
select(args::Callable, df::DataFrame; renamecols::Bool=true)
select(gd::GroupedDataFrame, args...; copycols::Bool=true, keepkeys::Bool=true,
ungroup::Bool=true, renamecols::Bool=true)
ungroup::Bool=true, renamecols::Bool=true, nthreads::Integer=1)
select(f::Base.Callable, gd::GroupedDataFrame; copycols::Bool=true,
keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true)
keepkeys::Bool=true, ungroup::Bool=true,
renamecols::Bool=true, nthreads::Integer=1)
Create a new data frame that contains columns from `df` or `gd` specified by
`args` and return it. The result is guaranteed to have the same number of rows
Expand All @@ -664,6 +665,9 @@ $TRANSFORMATION_COMMON_RULES
data frame.
- `ungroup::Bool=true` : whether the return value of the operation on `gd` should be a data
frame or a `GroupedDataFrame`.
- `nthreads::Integer=1` : the number of CPU threads to use. Passing a value higher than 1
currently has an effect only for some optimized grouped reductions. Values higher than
`Threads.nthreads()` will be replaced with that value.
# Examples
```jldoctest
Expand Down Expand Up @@ -858,9 +862,11 @@ end
transform(df::AbstractDataFrame, args...; copycols::Bool=true, renamecols::Bool=true)
transform(f::Callable, df::DataFrame; renamecols::Bool=true)
transform(gd::GroupedDataFrame, args...; copycols::Bool=true,
keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true)
keepkeys::Bool=true, ungroup::Bool=true,
renamecols::Bool=true, nthreads::Integer=1)
transform(f::Base.Callable, gd::GroupedDataFrame; copycols::Bool=true,
keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true)
keepkeys::Bool=true, ungroup::Bool=true,
renamecols::Bool=true, nthreads::Integer=1)
Create a new data frame that contains columns from `df` or `gd` plus columns
specified by `args` and return it. The result is guaranteed to have the same
Expand All @@ -877,6 +883,9 @@ $TRANSFORMATION_COMMON_RULES
data frame.
- `ungroup::Bool=true` : whether the return value of the operation on `gd` should be a data
frame or a `GroupedDataFrame`.
- `nthreads::Integer=1` : the number of CPU threads to use. Passing a value higher than 1
currently has an effect only for some optimized grouped reductions. Values higher than
`Threads.nthreads()` will be replaced with that value.
Note that when the first argument is a `GroupedDataFrame`, `keepkeys=false`
is needed to be able to return a different value for the grouping column:
Expand Down Expand Up @@ -924,9 +933,11 @@ end
combine(df::AbstractDataFrame, args...; renamecols::Bool=true)
combine(f::Callable, df::AbstractDataFrame; renamecols::Bool=true)
combine(gd::GroupedDataFrame, args...;
keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true)
keepkeys::Bool=true, ungroup::Bool=true,
renamecols::Bool=true, nthreads::Integer=1)
combine(f::Base.Callable, gd::GroupedDataFrame;
keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true)
keepkeys::Bool=true, ungroup::Bool=true,
renamecols::Bool=true, nthreads::Integer=1)
Create a new data frame that contains columns from `df` or `gd` specified by
`args` and return it. The result can have any number of rows that is determined
Expand All @@ -941,6 +952,9 @@ $TRANSFORMATION_COMMON_RULES
data frame.
- `ungroup::Bool=true` : whether the return value of the operation on `gd` should be a data
frame or a `GroupedDataFrame`.
- `nthreads::Integer=1` : the number of CPU threads to use. Passing a value higher than 1
currently has an effect only for some optimized grouped reductions. Values higher than
`Threads.nthreads()` will be replaced with that value.
# Examples
```jldoctest
Expand Down
137 changes: 104 additions & 33 deletions src/groupeddataframe/fastaggregates.jl
Original file line number Diff line number Diff line change
Expand Up @@ -157,24 +157,84 @@ function copyto_widen!(res::AbstractVector{T}, x::AbstractVector) where T
end

function groupreduce!(res::AbstractVector, f, op, condf, adjust, checkempty::Bool,
incol::AbstractVector, gd::GroupedDataFrame)
incol::AbstractVector, gd::GroupedDataFrame, nthreads::Integer)
n = length(gd)
groups = gd.groups
if adjust !== nothing || checkempty
counts = zeros(Int, n)
end
groups = gd.groups
@inbounds for i in eachindex(incol, groups)
gix = groups[i]
x = incol[i]
if gix > 0 && (condf === nothing || condf(x))
# this check should be optimized out if U is not Any
if eltype(res) === Any && !isassigned(res, gix)
res[gix] = f(x, gix)
else
res[gix] = op(res[gix], f(x, gix))
nt = min(nthreads, Threads.nthreads())
if nt <= 1 || axes(incol) != axes(groups)
@inbounds for i in eachindex(incol, groups)
gix = groups[i]
x = incol[i]
if gix > 0 && (condf === nothing || condf(x))
# this check should be optimized out if eltype is not Any
if eltype(res) === Any && !isassigned(res, gix)
res[gix] = f(x, gix)
else
res[gix] = op(res[gix], f(x, gix))
end
if adjust !== nothing || checkempty
counts[gix] += 1
end
end
end
else
res_vec = Vector{typeof(res)}(undef, nt)
# needs to be always allocated to fix type instability with @threads
counts_vec = Vector{Vector{Int}}(undef, nt)
res_vec[1] = res
if adjust !== nothing || checkempty
counts_vec[1] = counts
end
for i in 2:nt
res_vec[i] = copy(res)
if adjust !== nothing || checkempty
counts_vec[i] = zeros(Int, n)
end
end
Threads.@threads for tid in 1:nt
res′ = res_vec[tid]
if adjust !== nothing || checkempty
counts[gix] += 1
counts′ = counts_vec[tid]
end
start = 1 + ((tid - 1) * length(groups)) ÷ nt
stop = (tid * length(groups)) ÷ nt
@inbounds for i in start:stop
gix = groups[i]
x = incol[i]
if gix > 0 && (condf === nothing || condf(x))
# this check should be optimized out if eltype is not Any
if eltype(res′) === Any && !isassigned(res′, gix)
res′[gix] = f(x, gix)
else
res′[gix] = op(res′[gix], f(x, gix))
end
if adjust !== nothing || checkempty
counts′[gix] += 1
end
end
end
end
for i in 2:length(res_vec)
resi = res_vec[i]
@inbounds @simd for j in eachindex(res)
# this check should be optimized out if eltype is not Any
if eltype(res) === Any
if isassigned(resi, j) && isassigned(res, j)
res[j] = op(res[j], resi[j])
elseif isassigned(resi, j)
res[j] = resi[j]
end
else
res[j] = op(res[j], resi[j])
end
end
end
if adjust !== nothing || checkempty
for i in 2:length(counts_vec)
counts .+= counts_vec[i]
end
end
end
Expand Down Expand Up @@ -218,26 +278,31 @@ end

# function barrier works around type instability of groupreduce_init due to applicable
groupreduce(f, op, condf, adjust, checkempty::Bool,
incol::AbstractVector, gd::GroupedDataFrame) =
incol::AbstractVector, gd::GroupedDataFrame,
nthreads::Integer) =
groupreduce!(groupreduce_init(op, condf, adjust, incol, gd),
f, op, condf, adjust, checkempty, incol, gd)
f, op, condf, adjust, checkempty, incol, gd, nthreads)
# Avoids the overhead due to Missing when computing reduction
groupreduce(f, op, condf::typeof(!ismissing), adjust, checkempty::Bool,
incol::AbstractVector, gd::GroupedDataFrame) =
incol::AbstractVector, gd::GroupedDataFrame,
nthreads::Integer) =
groupreduce!(disallowmissing(groupreduce_init(op, condf, adjust, incol, gd)),
f, op, condf, adjust, checkempty, incol, gd)
f, op, condf, adjust, checkempty, incol, gd, nthreads)

(r::Reduce)(incol::AbstractVector, gd::GroupedDataFrame) =
groupreduce((x, i) -> x, r.op, r.condf, r.adjust, r.checkempty, incol, gd)
(r::Reduce)(incol::AbstractVector, gd::GroupedDataFrame;
nthreads::Integer=1) =
groupreduce((x, i) -> x, r.op, r.condf, r.adjust, r.checkempty, incol, gd, nthreads)

# this definition is missing in Julia 1.0 LTS and is required by aggregation for var
# TODO: remove this when we drop 1.0 support
if VERSION < v"1.1"
Base.zero(::Type{Missing}) = missing
end

function (agg::Aggregate{typeof(var)})(incol::AbstractVector, gd::GroupedDataFrame)
means = groupreduce((x, i) -> x, Base.add_sum, agg.condf, /, false, incol, gd)
function (agg::Aggregate{typeof(var)})(incol::AbstractVector, gd::GroupedDataFrame;
nthreads::Integer=1)
means = groupreduce((x, i) -> x, Base.add_sum, agg.condf, /, false,
incol, gd, nthreads)
# !ismissing check is purely an optimization to avoid a copy later
if eltype(means) >: Missing && agg.condf !== !ismissing
T = Union{Missing, real(eltype(means))}
Expand All @@ -247,32 +312,38 @@ function (agg::Aggregate{typeof(var)})(incol::AbstractVector, gd::GroupedDataFra
res = zeros(T, length(gd))
return groupreduce!(res, (x, i) -> @inbounds(abs2(x - means[i])), +, agg.condf,
(x, l) -> l <= 1 ? oftype(x / (l-1), NaN) : x / (l-1),
false, incol, gd)
false, incol, gd, nthreads)
end

function (agg::Aggregate{typeof(std)})(incol::AbstractVector, gd::GroupedDataFrame)
outcol = Aggregate(var, agg.condf)(incol, gd)
function (agg::Aggregate{typeof(std)})(incol::AbstractVector, gd::GroupedDataFrame;
nthreads::Integer=1)
outcol = Aggregate(var, agg.condf)(incol, gd; nthreads=nthreads)
if eltype(outcol) <: Union{Missing, Rational}
return sqrt.(outcol)
else
return map!(sqrt, outcol, outcol)
end
end

for f in (first, last)
function (agg::Aggregate{typeof(f)})(incol::AbstractVector, gd::GroupedDataFrame)
n = length(gd)
outcol = similar(incol, n)
fillfirst!(agg.condf, outcol, incol, gd, rev=agg.f === last)
if isconcretetype(eltype(outcol))
return outcol
else
return copyto_widen!(Tables.allocatecolumn(typeof(first(outcol)), n), outcol)
for f in (:first, :last)
# Without using @eval the presence of a keyword argument triggers a Julia bug
@eval begin
function (agg::Aggregate{typeof($f)})(incol::AbstractVector, gd::GroupedDataFrame;
nthreads::Integer=1)
n = length(gd)
outcol = similar(incol, n)
fillfirst!(agg.condf, outcol, incol, gd, rev=agg.f === last)
if isconcretetype(eltype(outcol))
return outcol
else
return copyto_widen!(Tables.allocatecolumn(typeof(first(outcol)), n), outcol)
end
end
end
end

function (agg::Aggregate{typeof(length)})(incol::AbstractVector, gd::GroupedDataFrame)
function (agg::Aggregate{typeof(length)})(incol::AbstractVector, gd::GroupedDataFrame;
nthreads::Integer=1)
if getfield(gd, :idx) === nothing
lens = zeros(Int, length(gd))
@inbounds for gix in gd.groups
Expand Down
Loading

0 comments on commit 2d57734

Please sign in to comment.