Support multithreading in groupreduce #2491

Draft
wants to merge 12 commits into
base: main
from
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
@@ -44,6 +44,8 @@ jobs:
${{ runner.os }}-
- uses: julia-actions/julia-buildpkg@v1
- uses: julia-actions/julia-runtest@v1
  env:
    JULIA_NUM_THREADS: 2
- uses: julia-actions/julia-processcoverage@v1
- uses: codecov/codecov-action@v1
  with:
5 changes: 5 additions & 0 deletions NEWS.md
@@ -8,6 +8,11 @@

## New functionalities

* `combine`, `select` and `transform` with `GroupedDataFrame` now
support multithreading for some optimized grouped reductions.
Member

Suggested change
support multithreading for some optimized grouped reductions.
support multithreading for some optimized grouped reductions.

This can be enabled via the experimental global option
`DataFrames.NTHREADS[] = n` (with `n > 1`)
([#2491](https://github.com/JuliaData/DataFrames.jl/pull/2491)).

## Deprecated

4 changes: 3 additions & 1 deletion src/DataFrames.jl
@@ -3,7 +3,7 @@ module DataFrames
using Statistics, Printf, REPL
using Reexport, SortingAlgorithms, Compat, Unicode, PooledArrays
@reexport using Missings, InvertedIndices
- using Base.Sort, Base.Order, Base.Iterators
using Base.Sort, Base.Order, Base.Iterators, Base.Threads
using TableTraits, IteratorInterfaceExtensions
import LinearAlgebra: norm
using Markdown
@@ -91,6 +91,8 @@ else
export only
end

const NTHREADS = Ref(1)
Member

maybe add a docstring?
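
One way such a docstring could read is sketched below. The wording is an assumption, not text from this PR, and `OptionDemo` and `effective_threads` are hypothetical names used only to keep the example self-contained. The `min` expression mirrors the one added to `groupreduce!`.

```julia
module OptionDemo

"""
    NTHREADS

Experimental global option: maximum number of tasks that optimized grouped
reductions may use. The default of `1` keeps the single-threaded code path.
Values above `Threads.nthreads()` have no additional effect.
"""
const NTHREADS = Ref(1)

# Number of tasks actually used: capped by the threads Julia was started with.
effective_threads() = min(NTHREADS[], Threads.nthreads())

end # module
```

Because `NTHREADS` is a `Ref`, it is read and written with `NTHREADS[]`, so the value can change at run time while the binding itself stays `const`.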


include("other/utils.jl")
include("other/index.jl")

82 changes: 72 additions & 10 deletions src/groupeddataframe/fastaggregates.jl
@@ -163,18 +163,80 @@ function groupreduce!(res::AbstractVector, f, op, condf, adjust, checkempty::Boo
counts = zeros(Int, n)
end
groups = gd.groups
- @inbounds for i in eachindex(incol, groups)
- gix = groups[i]
- x = incol[i]
- if gix > 0 && (condf === nothing || condf(x))
- # this check should be optimized out if U is not Any
- if eltype(res) === Any && !isassigned(res, gix)
- res[gix] = f(x, gix)
- else
- res[gix] = op(res[gix], f(x, gix))
nt = min(NTHREADS[], Threads.nthreads())
if nt <= 1 || axes(incol) != axes(groups)
Member

When might the axes be different?

Member Author

Since `groups` is a `Vector`, that could happen only when `incol` doesn't use 1-based indexing. It's not completely clear whether we support that, but it's probably safer to check.

@inbounds for i in eachindex(incol, groups)
gix = groups[i]
x = incol[i]
if gix > 0 && (condf === nothing || condf(x))
# this check should be optimized out if eltype is not Any
if eltype(res) === Any && !isassigned(res, gix)
res[gix] = f(x, gix)
else
res[gix] = op(res[gix], f(x, gix))
end
if adjust !== nothing || checkempty
counts[gix] += 1
end
end
end
else
res_vec = Vector{typeof(res)}(undef, nt)
# needs to be always allocated to fix type instability with @threads
counts_vec = Vector{Vector{Int}}(undef, nt)
res_vec[1] = res
if adjust !== nothing || checkempty
counts_vec[1] = counts
end
for i in 2:nt
res_vec[i] = copy(res)
if adjust !== nothing || checkempty
- counts[gix] += 1
counts_vec[i] = zeros(Int, n)
end
end
@sync for tid in 1:nt
Threads.@spawn begin
res′ = res_vec[tid]
if adjust !== nothing || checkempty
counts′ = counts_vec[tid]
end
start = 1 + ((tid - 1) * length(groups)) ÷ nt
stop = (tid * length(groups)) ÷ nt
Comment on lines +203 to +204
Member

This avoids overflow on 32-bit machines; on 64-bit machines it is a no-op.

Suggested change
- start = 1 + ((tid - 1) * length(groups)) ÷ nt
- stop = (tid * length(groups)) ÷ nt
+ start = 1 + ((tid - 1) * Int64(length(groups))) ÷ nt
+ stop = (tid * Int64(length(groups))) ÷ nt
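
To make the chunk arithmetic concrete, here is a small sketch. `chunk_bounds` is a hypothetical helper, not part of the PR; it only reuses the `start`/`stop` formulas from the suggestion and then shows the silent wraparound that the `Int64` conversion guards against when `Int` is 32 bits wide.

```julia
# Hypothetical helper: (start, stop) index pairs that split 1:len into nt
# contiguous chunks, using the widened arithmetic from the suggestion.
function chunk_bounds(len::Integer, nt::Integer)
    return [(1 + ((tid - 1) * Int64(len)) ÷ nt, (tid * Int64(len)) ÷ nt)
            for tid in 1:nt]
end

bounds = chunk_bounds(10, 3)   # [(1, 3), (4, 6), (7, 10)]

# With 32-bit integers the product wraps around silently instead of erroring:
wrapped = Int32(4) * Int32(1_000_000_000)   # negative, not 4_000_000_000
```

The chunks are contiguous and non-overlapping, and their lengths differ by at most one element.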

@inbounds for i in start:stop
gix = groups[i]
x = incol[i]
if gix > 0 && (condf === nothing || condf(x))
# this check should be optimized out if eltype is not Any
if eltype(res′) === Any && !isassigned(res′, gix)
res′[gix] = f(x, gix)
else
res′[gix] = op(res′[gix], f(x, gix))
end
if adjust !== nothing || checkempty
counts′[gix] += 1
end
end
end
end
end
for i in 2:length(res_vec)
resi = res_vec[i]
@inbounds @simd for j in eachindex(res)
# this check should be optimized out if eltype is not Any
if eltype(res) === Any
if isassigned(resi, j) && isassigned(res, j)
res[j] = op(res[j], resi[j])
elseif isassigned(resi, j)
res[j] = resi[j]
end
else
res[j] = op(res[j], resi[j])
end
end
end
if adjust !== nothing || checkempty
for i in 2:length(counts_vec)
counts .+= counts_vec[i]
end
end
end
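
The overall pattern in this hunk (one private accumulator per task, then a sequential merge) can be sketched in isolation as follows. This is a simplified illustration under stated assumptions, not the PR's implementation: `grouped_sum` and its `ntasks` keyword are hypothetical, the reduction is hard-coded to `+` on `Float64`, and group index `0` is taken to mean "row not in any group", as the `gix > 0` check above suggests.

```julia
# Hypothetical helper: grouped sum using one private accumulator per task.
function grouped_sum(values::Vector{Float64}, groups::Vector{Int}, ngroups::Int;
                     ntasks::Int = Threads.nthreads())
    # Never use more tasks than rows, and always use at least one.
    nt = max(1, min(ntasks, length(groups)))
    partials = [zeros(Float64, ngroups) for _ in 1:nt]
    @sync for tid in 1:nt
        Threads.@spawn begin
            acc = partials[tid]   # written by this task only, so no locking
            start = 1 + ((tid - 1) * Int64(length(groups))) ÷ nt
            stop = (tid * Int64(length(groups))) ÷ nt
            for i in start:stop
                gix = groups[i]
                if gix > 0        # rows with index 0 are skipped
                    acc[gix] += values[i]
                end
            end
        end
    end
    # Sequential merge of the per-task partial results.
    res = partials[1]
    for k in 2:nt
        res .+= partials[k]
    end
    return res
end

sums = grouped_sum([1.0, 2.0, 3.0, 4.0, 5.0], [1, 2, 1, 0, 2], 2; ntasks = 3)
```

Note that merging partial results changes the order of operations, so for floating-point reductions the result may differ in the last bits from the single-threaded loop.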