Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract TapedTask try-catch method #120

Merged
merged 1 commit into from
Feb 18, 2022

Conversation

rikhuijzer
Copy link
Contributor

@rikhuijzer rikhuijzer commented Feb 18, 2022

I'm still working a bit through the code in order to find performance gains. This PR applies an extract method refactoring to make the TapedTask easier to read. It has no negative nor positive impact on performance. I've verified this by putting a @show on the TapedTask(tf, args...) call and that shows (32 allocations: 2.312 KiB) in both versions.

Because the git diff is very unhelpful, here are the before and after:

master branch

function TapedTask(tf::TapedFunction, args...)
    produce_ch = Channel()
    consume_ch = Channel{Int}()
    task = @task try
        producer = () -> begin
            ttask = current_task().storage[:tapedtask]
            if length(ttask.produced_val) > 0
                val = pop!(ttask.produced_val)
                put!(ttask.produce_ch, val)
                take!(ttask.consume_ch) # wait for next consumer
            end
        end
        tf(args...; callback=producer)
    catch e
        bt = catch_backtrace()
        put!(produce_ch, TapedTaskException(e, bt))
        # @error "TapedTask Error: " exception=(e, bt)
        rethrow()
    finally
        @static if VERSION >= v"1.4"
            # we don't do this under Julia 1.3, because `isempty` always hangs on
            # an empty channel.
            while !isempty(produce_ch)
                yield()
            end
        end
        close(produce_ch)
        close(consume_ch)
    end
    t = TapedTask(task, tf, produce_ch, consume_ch)
    task.storage === nothing && (task.storage = IdDict())
    task.storage[:tapedtask] = t
    tf.owner = t
    return t
end

this PR

function wrap_task(tf, produce_ch, consume_ch, args...)
    try
        producer = () -> begin
            ttask = current_task().storage[:tapedtask]
            if length(ttask.produced_val) > 0
                val = pop!(ttask.produced_val)
                put!(ttask.produce_ch, val)
                take!(ttask.consume_ch) # wait for next consumer
            end
        end
        tf(args...; callback=producer)
    catch e
        bt = catch_backtrace()
        put!(produce_ch, TapedTaskException(e, bt))
        # @error "TapedTask Error: " exception=(e, bt)
        rethrow()
    finally
        @static if VERSION >= v"1.4"
            # we don't do this under Julia 1.3, because `isempty` always hangs on
            # an empty channel.
            while !isempty(produce_ch)
                yield()
            end
        end
        close(produce_ch)
        close(consume_ch)
    end
end

function TapedTask(tf::TapedFunction, args...)
    produce_ch = Channel()
    consume_ch = Channel{Int}()
    task = @task wrap_task(tf, produce_ch, consume_ch, args...)
    t = TapedTask(task, tf, produce_ch, consume_ch)
    task.storage === nothing && (task.storage = IdDict())
    task.storage[:tapedtask] = t
    tf.owner = t
    return t
end

@yebai
Copy link
Member

yebai commented Feb 18, 2022

It makes sense to me to separate the task function for better clarity.

@rikhuijzer rikhuijzer merged commit c81dab8 into master Feb 18, 2022
@delete-merged-branch delete-merged-branch bot deleted the rh/refactor-tapedtask-try-catch branch February 18, 2022 22:42
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants