
Add Channel#try_send, a non-blocking Channel#send #12694

Closed

Conversation

carlhoerberg
Contributor

@carlhoerberg carlhoerberg commented Oct 30, 2022

Instead of having to write

select
when @channel.send 1 then true
else false
end

and paying the to_a price in select_impl when wanting to send to a Channel without blocking.

In one of our apps we use this pattern a lot:

@flow = true
@flow_change = Channel(Bool).new

def wait_for_flow
  return if @flow
  until @flow_change.receive
  end
end

def flow=(val)
  @flow = val
  while @flow_change.try_send(val) # notify all flow change subscribers
  end
end

So the flow_change channel may or may not have receivers.
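For comparison, the same pattern can be written in Go, where a select with a default branch is the non-blocking send. This is only a sketch of the pattern above; the Flow type and set method are hypothetical names, not part of any real API:

```go
package main

import "fmt"

// Flow mirrors the Crystal example above: subscribers block receiving
// from flowChange, and the setter notifies every currently waiting
// receiver without ever blocking itself.
type Flow struct {
	flow       bool
	flowChange chan bool
}

func (f *Flow) set(val bool) {
	f.flow = val
	for {
		select {
		case f.flowChange <- val: // a receiver was waiting; keep notifying
		default: // nobody (else) is waiting; stop
			return
		}
	}
}

func main() {
	f := &Flow{flowChange: make(chan bool)}
	// No receivers are waiting, so set returns immediately instead of
	// blocking on the unbuffered channel.
	f.set(true)
	fmt.Println(f.flow)
}
```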

@straight-shoota
Member

I think it might be good to add this behavioral variant.

But the name #send? is confusing. It would suggest similar semantics as #receive?, which returns nil instead of raising ClosedError. But that's something different. This method still raises ClosedError. The only difference to #send is that it returns instead of waiting when immediate delivery is impossible.
Not sure what would be a better name, though. #try_send?? 🤔

@asterite
Member

I think instead we should change the code to avoid the to_a call. For example there's no need to call to_a if the Indexable has a single element.

@asterite
Member

I tried to optimize this a bit, but the main issue is that even with the optimization the code is translated to create a SendAction, which is a class so it allocates memory.

I think the right thing to do is, yes, to introduce some form of send?, but also change the expansion of select ... with just one channel.send to use send? instead.

In my mind select should be the only primitive to use (like in Go) and it's the compiler's responsibility to optimize it for the single case, not the user's responsibility.

@asterite
Member

I'm also surprised to learn that a select expression creates a class for each of the actions (like SendAction) so memory is allocated on every select. Really bad! I think it should be done in a way that no memory is ever allocated. Like, probably SendAction should be a struct, and we create a static array to hold those values in the stack. Then use pointers to refer to those. I think Go might be doing the same thing.

Then we can optimize for the case of 1 select action. In the end it will be as efficient as send?, I think.

It's more work for the compiler/us, but it's work that only needs to be done once by us, and not put the burden on developers.

@asterite
Member

Also it seems an Indexable is passed to select_impl. In the actual implementation that's a tuple. But that's not good if we need to sort the values. So I would change the implementation to use a Static Array and pass a slice to the method. Then the slice is mutable so the data can be sorted in-place. And maybe then the data can also be structs to avoid memory allocations. But I see some pointers being passed around, so it's a tricky refactor. But even though it'll take some time to get it right, it's probably the best thing to do.

@yxhuvud
Contributor

yxhuvud commented Oct 30, 2022

In my mind select should be the only primitive to use

Considering that a very common use case is to wrap the select in a loop, I could see a second variant of select that comes with the loop built in and some predefined exit behavior semantics. That would allow for optimizations that are not really reasonable with select as it looks today. For example, if you are selecting on several different things, it would allow you to not tear down and then set up the waiting again on each iteration of the loop - it would suffice to redo the one that triggered.

@caspiano changed the title from "Non blocking Channel#send?" to "Add Channel#try_send, a non-blocking Channel#send" on Oct 31, 2022
@beta-ziliani
Member

Hi @carlhoerberg , we'd like to move forward with this. What do you think of straight-shoota's observations? Do you want us to take it to completion? Thanks!

@asterite
Member

we'd like to move forward with this

Really? Could you show an equivalent API for Go? And if Go doesn't have one, why do we need one? I think it might be better to fix select. I know it's harder, but it's the right thing to do.
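For reference, Go does not have a try-send method either; the idiom is a single-case select with a default branch. A minimal sketch (trySend is a hypothetical helper name, not a Go API):

```go
package main

import "fmt"

// trySend attempts an immediate delivery and reports whether it
// succeeded, without ever blocking the caller.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v: // a buffer slot was free or a receiver was waiting
		return true
	default: // delivery would block
		return false
	}
}

func main() {
	ch := make(chan int, 1)
	fmt.Println(trySend(ch, 1)) // true: buffer slot was free
	fmt.Println(trySend(ch, 2)) // false: buffer full, no receiver
}
```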

carlhoerberg and others added 3 commits November 28, 2022 15:50
Co-authored-by: Johannes Müller <[email protected]>
@carlhoerberg
Contributor Author

Isn't this less verbose than any select, fixed or not?

@asterite
Member

I guess that's right. My only concern is that eventually if we fix select to be as fast as try_send then there will be two ways to do the same thing. Maybe it's not a big deal. We could also eventually rewrite select to try_send if we detect they are the same.

@bcardiff
Member

The implementation seems correct, but I worry about increasing Channel's API. It's easy for things to go wrong, and not having a single code path for sending things through the channel seems like a bad idea. In this case the whole method is blocking so it should be safe, but in general things like

crystal/src/channel.cr

Lines 453 to 457 in 1b93218

# Because `channel#close` may clean up a long list, `select_context.try_trigger` may
# be called after the select return. In order to prevent invalid address access,
# the state is allocated in the heap.
shared_state = SelectContextSharedState.new(SelectState::Active)
contexts = ops.map &.create_context_and_wait(shared_state)

makes me shiver.

We have receive/receive? which are both blocking, having try_send to be non-blocking seems confusing and opens the question whether we are going to add try_receive/send? for consistency. IMO it will not be trivial to remember that ? is blocking but try_ is not.

I also wonder if people would expect to write select; when ch1.try_send ...; when ch2.try_send ...; which will not compile because there is no matching _action method.

If select;when ch.send x then true; else false; end is a common pattern, I would prefer to put that in a macro or method and optimize the select_impl implementation a bit.

From what I see, it seems that the culprit of the to_a is only:

crystal/src/channel.cr

Lines 426 to 431 in 1b93218

ops_locks = ops
.to_a
.uniq!(&.lock_object_id)
.sort_by!(&.lock_object_id)
ops_locks.each &.lock

Did someone check if doing a special case to use a Slice to point to a single element when ops.size == 1 is enough to avoid the extra allocations?

If that is not enough, in order to keep the API as today we could have a private def self.select_impl_single(op : SelectAction, non_blocking), with more optimizations that can be applied for the single channel case, even at the cost of some internal code duplication that can be solved/refactored later.

A bonus of either of the last approaches is that it will also apply to select; when ch.receive; else ... directly.

@straight-shoota
Member

@bcardiff Sounds like a good assessment. 👍

Optimizing #select_impl should happen anyways. For a single action (or even a small number) we can avoid an array allocation. Additionally, we can try to refactor SendAction without heap allocations as suggested in #12694 (comment)

Thus the lengthy form with a single select branch should be as performant as the custom implementation of #try_send.
I suppose at that point we could technically even implement #try_send based on that so that all Channel interactions go through a single point.
But then we could also consider other alternatives for this pattern in order to keep the Channel API succinct and avoid potential confusion about #try_send.

@bcardiff
Member

Moving the proposed implementation to a ChannelAction helper in case there is a single channel in the select sounds like a good design actually. It still has the downside of very different implementations, but at least the API is kept stable.

Would this be a preferred approach?

@bcardiff
Member

bcardiff commented Dec 3, 2022

I was thinking that something like master...bcardiff:crystal:select-optimizations would avoid the array heap allocation, and will cover single and dual action selects. WDYT?

Essentially it applies the following logic to keep a tuple of ops when possible for the locks.

def f(*ops)
  f_impl ops
end

def f(ops : Indexable)
  f_impl ops
end

def f_impl(ops : Indexable)
    if ops.is_a?(Tuple) && ops.size == 1
      f_impl_with_locks(ops, ops)
    elsif ops.is_a?(Tuple) && ops.size == 2
      ops0 = ops.fetch(0, nil).not_nil!
      ops1 = ops.fetch(1, nil).not_nil!

      case (ops0 <=> ops1)
      when 0
        f_impl_with_locks(ops, {ops0})
      when 1
        f_impl_with_locks(ops, {ops1, ops0})
      when -1
        f_impl_with_locks(ops, {ops0, ops1})
      else
        raise "unreachable"
      end
    else
      ops_locks = ops
        .to_a
        .uniq!
        .sort!

      f_impl_with_locks(ops, ops_locks)
    end
end

def f_impl_with_locks(ops : Indexable, ops_locks)
  puts({typeof(ops), typeof(ops_locks)})
end

f 1         # => {Tuple(Int32)              , Tuple(Int32)}
f 1, 2      # => {Tuple(Int32, Int32)       , Tuple(Int32, Int32)}
f 1, 2, 3   # => {Tuple(Int32, Int32, Int32), Array(Int32)}
f [1, 2]    # => {Array(Int32)              , Array(Int32)}

The runtime operations for the non-blocking send are basically the proposal of this PR, so I expect it to be as efficient.

@straight-shoota
Member

I think it's easier than that. We can just turn the tuple into a StaticArray for sorting. There's no need for special treatment for individual cases. This improves performance for all sizes of ops, as long as the size is known at compile time.

I have a patch for that ready that goes on top of #12814. It would work without it, but I'm batching it to avoid conflicts.

On top of master the simplified diff would be this:

@@ -423,8 +423,12 @@ class Channel(T)
   private def self.select_impl(ops : Indexable(SelectAction), non_blocking)
     # Sort the operations by the channel they contain
     # This is to avoid deadlocks between concurrent `select` calls
-    ops_locks = ops
-      .to_a
+    if ops.is_a?(Tuple)
+      ops_locks = ops.to_static_array
+    else
+      ops_locks = ops.to_a
+    end
+    ops_locks
       .uniq!(&.lock_object_id)
       .sort_by!(&.lock_object_id)

The implementation of Tuple#to_static_array would be analogous to StaticArray#[], returning StaticArray(Union(*T), {{ T.size }}). This can also be an internal helper method, but IMO it would be useful to have this publicly available. It doesn't matter for select_impl, though.

@bcardiff
Member

bcardiff commented Dec 3, 2022

Something I wanted to avoid is having a union in ops_locks, so there is no multi-dispatch on each iteration.
Another thing that worries me is that Tuple#to_static_array will expand to multiple defs, one for each combination of select actions used in the entire program. They should not be many, but we are not forcing an upper bound unless we do something like I proposed.


The runtime operations for the non blocking send is basically the proposal of this PR, so I expect it to work as efficient.

I was wrong, we still have the allocation of the SelectContext array pointed out. But I would start with optimizing how locks are computed. Seems we agree there.

@straight-shoota
Member

straight-shoota commented Dec 3, 2022

Okay, yeah multi dispatch is indeed an issue. But we can't really prevent that on scale, unless we multiply all possible combinations.
I suppose we can merge both approaches, using the tuple sorting you suggested for small sizes and the conversion to StaticArray for larger tuples (it should be superior to to_a, right?).

For T.size == 1 it probably doesn't make any difference whatsoever. So we're effectively optimizing for the case T.size == 2. Perhaps we could do the same for T.size == 3, but the tuple combinations just explode from there.

@straight-shoota
Member

Actually, your implementation doesn't avoid multi dispatch either, does it? The type of ops.fetch(0, nil).not_nil! is Union(*T).

Consequently:

f 1      # => {Tuple(Int32)              , Tuple(Int32)}
f 1, 2u32      # => {Tuple(Int32, UInt32), Tuple(Int32 | UInt32, Int32 | UInt32)}
f 1, 2u32, 3   # => {Tuple(Int32, UInt32, Int32), Array(Int32 | UInt32)}
f [1, 2u32]    # => {Array(Int32 | UInt32), Array(Int32 | UInt32)}

Tuple(Int32 | UInt32, Int32 | UInt32) is not an improvement over StaticArray(Int32 | UInt32, 2) (one could argue that it's pretty much the same thing).

To avoid those union types, we need to switch over the tuple size at compile time. This should work with macros:

def f_impl(ops : Tuple(*T)) forall T
  {% if T.size == 1 %}
    f_impl_with_locks(ops, ops)
  {% elsif T.size == 2 %}
      case (ops[0] <=> ops[1])
      when 0
        f_impl_with_locks(ops, {ops[0]})
      when 1
        f_impl_with_locks(ops, {ops[1], ops[0]})
      when -1
        f_impl_with_locks(ops, ops)
      else
        raise "unreachable"
      end
  {% else %}
    f_impl(ops.to_a)
  {% end %}
end

@straight-shoota
Member

I suppose we could also look into alternative solutions to maybe avoid sorting altogether.

Considering that the size of ops is typically not that big, maybe iterating multiple times wouldn't be such a bad option.

@bcardiff
Member

bcardiff commented Dec 5, 2022

I was thinking of preventing the union in ops_locks mostly, but we can definitely improve the union in the ops. I think we can add the following overloads and a select_impl_with_locks:

def self.non_blocking_select(op1 : SelectAction)
  select_impl_with_locks({op1}, {op1}, true)
end

def self.non_blocking_select(op1 : SelectAction, op2 : SelectAction)
  case (op1 <=> op2)
    when 0
      select_impl_with_locks({op1, op2}, {op1}, true)
    when 1
      select_impl_with_locks({op1, op2}, {op2, op1}, true)
    when -1
      select_impl_with_locks({op1, op2}, {op1, op2}, true)
    else
      raise "unreachable"
  end
end

With that I think we prevent unions in many parts and cover single and dual select actions.

@straight-shoota
Member

I don't follow that latest example. Where would those methods be called?

Looks like they are overloads of the splat variant of non_blocking_select which are actually unused and proposed for removal in #12813.

@bcardiff
Member

bcardiff commented Dec 8, 2022

I thought the compiler would generate those calls. Either way, doing a dispatch at runtime depending on the indexable size to a method that will avoid unions in the ops_locks can still be done. That's the core part of the idea.

@straight-shoota
Member

straight-shoota commented Dec 8, 2022

Yeah, sure that can be done. It's only feasible for very small sizes, though. Not even sure we would want to go higher than 2 or maybe 3.
And I wouldn't expect the actual performance improvement to be very noticeable. At least there are more effective measures, such as reducing heap allocations.

@asterite
Member

asterite commented Dec 8, 2022

I honestly wouldn't worry about multidispatch. We have LLVM here.

Here's an example:

struct Int32
  def foo
    self
  end
end

class String
  def foo
    bytesize
  end
end

def sum(values : Union(Int32, String)[2])
  values.sum(&.foo)
end

fun my_awesome_function : Int32
  values = uninitialized Union(Int32, String)[2]
  values[0] = 1
  values[1] = "hello"

  sum(values)
end

puts my_awesome_function

We are initializing a static array whose type is a union. We put an int in the first position and a string in the second one. Then we "sum" the values, which would in theory result in multidispatch.

Compile the above program with --release --emit llvm-ir and search for my_awesome_function in the output .ll file. Here it is:

define i32 @my_awesome_function() local_unnamed_addr #1 !dbg !25766 {
exit.1.i:
  ret i32 6, !dbg !25767
}

LLVM figured out what was in each position and completely avoided the multidispatch. Not only that, it computed everything at compile-time.

@straight-shoota
Member

@asterite I agree on not worrying too much about multi dispatch. At least not at this stage.

But this kind of optimization you're showing here won't be possible in Channel.select_impl where we call values.unstable_sort_by!(&.lock_object_id), because that completely reorders the array based on data that is only available at runtime. There is no way LLVM can tell which type to find at which position. It must go through multi dispatch.

@asterite
Member

asterite commented Dec 8, 2022

Unrelated to the comment above (I'll reply to it soon), look at this:

https://github.com/golang/go/blob/eaf0e3d4650fd223dec84ee52025c7a82bcb24bd/src/runtime/chan.go#L677-L715

So Go decides to implement different select patterns in different ways, probably optimal ones.

I think we could do the same. That is, instead of introducing try_send, detect this pattern and call a specific method (which would be nodoc.)

That way, at least for this issue, there's no need to optimize the general case.
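The pattern-detection idea can be seen at the surface in Go itself: per the comments in the linked chan.go, the Go compiler rewrites a single-case select with a default branch into a call to a dedicated non-blocking runtime entry point (selectnbsend) instead of the general select machinery. A sketch of the observable behavior (sendOrDrop is a hypothetical helper name):

```go
package main

import "fmt"

// sendOrDrop is the surface form; per the chan.go comment, the compiler
// lowers this select into roughly:
//
//	if selectnbsend(c, v) { /* sent */ } else { /* default */ }
//
// so no general select setup happens at runtime.
func sendOrDrop(c chan int, v int) bool {
	select {
	case c <- v:
		return true
	default:
		return false
	}
}

func main() {
	c := make(chan int) // unbuffered, no receiver waiting
	fmt.Println(sendOrDrop(c, 42)) // false: delivery would block
}
```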

@beta-ziliani self-requested a review on April 24, 2023
@beta-ziliani
Member

beta-ziliani commented May 30, 2023

This is a recap and summary of the current situation with this PR.

From the performance PoV, with #12814 we optimized part of it: the creation of an array from actions. What’s missing is to avoid the creation of actions in the heap. The goal should be to have select be as performant as the internal methods of Channel.

From a stdlib and language design PoV, this PR is missing a select action corresponding to the method, or a better place for it so it's clearly not meant to be used as part of a select. As an action, as Brian pointed out, it might get confusing quite quickly. So I would rather think of a top-level method (non_blocking_send?), but that's better discussed in its own issue.

Therefore, I'm closing it for now.

7 participants