
Draft: Make max client connections configurable and change shard_for hash #11345

Closed
wants to merge 4 commits into from

Conversation

ballard26
Contributor

@ballard26 ballard26 commented Jun 12, 2023

During testing with OMB it was found that traffic from internal RPC clients can cause reactor utilization on some shards to be 40% higher than others. This is largely caused by the poor distribution of client connections to other brokers among shards.

Looking at RPC client in + out bytes in the graph below, we can see that some shards are processing 4x the throughput of others, which explains a lot of the extra reactor utilization on those shards.
[image: RPC client in + out bytes per shard]

The first change this PR makes is to make the number of client connections to each broker user configurable. In the chart below the allowed connection count was set equal to the number of shards on a given broker, which lets each shard have its own connections to all of the brokers in the cluster. The chart shows the standard deviation of reactor utilization reduced by 50%.
[image: reactor utilization per shard with client connections set equal to the shard count]

In cases where increasing the number of client connections to other brokers to equal the shard count is impractical, this PR also attempts to improve the existing shard_for hashing function to better meet the following requirements (a sketch of how the first two can be checked follows the list):

  • Each shard should have an equal number of client connections
  • Each client connection should have an equal number of shards using it.
  • On each node there should be max_connections to every other node in the cluster.
  • If a shard wants a connection for a (node, shard) and the current shard has a connection for that (node, shard) it is selected. (from @travisdowns)
  • For any broker A, connections to broker A, when aggregated by shard ID across every broker in the cluster, should have an equal count per shard ID. (from Shard aware connections v2 #8)
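
As a rough illustration (and not part of the PR), the first two requirements can be checked for any candidate mapping by tallying what it produces on a single broker, which is essentially what the pandas analysis in the first comment below does. The shard_for stand-in here is a hypothetical placeholder, not the PR's implementation:

```cpp
#include <cstddef>
#include <iostream>
#include <map>
#include <utility>
#include <vector>

// Placeholder mapping used only to make the sketch runnable; the real
// shard_for lives in connection_cache and is what this PR changes.
std::size_t shard_for(std::size_t src_shard, int dest_node, std::size_t total_shards) {
    return (src_shard + static_cast<std::size_t>(dest_node)) % total_shards;
}

// Tally connections per shard and source shards per connection for one broker.
void check_balance(std::size_t total_shards, const std::vector<int>& peer_nodes) {
    std::vector<std::size_t> connections_per_shard(total_shards, 0);
    std::map<std::pair<int, std::size_t>, std::size_t> shards_per_connection;

    for (int node : peer_nodes) {
        for (std::size_t src = 0; src < total_shards; ++src) {
            std::size_t owner = shard_for(src, node, total_shards);
            if (shards_per_connection[{node, owner}]++ == 0) {
                connections_per_shard[owner]++; // first shard to use this connection
            }
        }
    }

    // Requirement 1: every shard should own roughly the same number of connections.
    for (std::size_t s = 0; s < total_shards; ++s) {
        std::cout << "shard " << s << " owns " << connections_per_shard[s]
                  << " connections\n";
    }
    // Requirement 2: every connection should be used by roughly the same number of shards.
    for (const auto& [conn, users] : shards_per_connection) {
        std::cout << "connection to node " << conn.first << " on shard " << conn.second
                  << " is used by " << users << " shards\n";
    }
}

int main() { check_balance(16, {1, 2, 3, 4, 5, 6, 7, 8}); }
```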

This PR is also the first of probably two that will focus on fixing the mentioned issue. A follow-up PR will try to move as much work as possible (i.e., encoding the request) to the shard that requests a message to be sent, rather than doing it all on the shard that happens to hold the connection to the destination broker.

Backports Required

  • none - not a bug fix
  • none - this is a backport
  • none - issue does not exist in previous branches
  • none - papercut/not impactful enough to backport
  • v23.1.x
  • v22.3.x
  • v22.2.x

Release Notes

@ballard26
Contributor Author

ballard26 commented Jun 12, 2023

To evaluate the new hash function relative to the stated requirements, this testing code was used. The <lambda_0> label in the results below is the 90th percentile. The results are:

Current

The number of connections node 0 has to the rest of the brokers in a cluster

(Pdb) df_c_p_n[(df_c_p_n['cluster_size'] == 9) & (df_c_p_n['shards'] == 16) & (df_c_p_n['source_node'] == 0)]
      cluster_size  shards  source_node  destination_node  number_of_connections
1864             9      16            0                 1                      5
1865             9      16            0                 2                      6
1866             9      16            0                 3                      7
1867             9      16            0                 4                      6
1868             9      16            0                 5                      7
1869             9      16            0                 6                      4
1870             9      16            0                 7                      5
1871             9      16            0                 8                      6
(Pdb) df_c_p_n[(df_c_p_n['cluster_size'] == 3) & (df_c_p_n['shards'] == 12) & (df_c_p_n['source_node'] == 0)]
    cluster_size  shards  source_node  destination_node  number_of_connections
30             3      12            0                 1                      4
31             3      12            0                 2                      5
(Pdb) df_c_p_n[(df_c_p_n['cluster_size'] == 5) & (df_c_p_n['shards'] == 11) & (df_c_p_n['source_node'] == 0)]
     cluster_size  shards  source_node  destination_node  number_of_connections
200             5      11            0                 1                      4
201             5      11            0                 2                      5
202             5      11            0                 3                      7
203             5      11            0                 4                      6
(Pdb) df_c_p_n[(df_c_p_n['cluster_size'] == 3) & (df_c_p_n['shards'] == 31) & (df_c_p_n['source_node'] == 0)]
    cluster_size  shards  source_node  destination_node  number_of_connections
60             3      31            0                 1                      8
61             3      31            0                 2                      7

A summary for the number of shards that use a given connection to a destination node.

(Pdb) df_s_p_c[(df_s_p_c['cluster_size'] == 9) & (df_s_p_c['shards'] == 16) & (df_s_p_c['source_node'] == 0)]
     cluster_size shards source_node destination_node number_of_src_shards                                         
                                                                       sum      mean       std amin amax <lambda_0>
1864            9     16           0                1                   16  3.200000  1.643168    2    6        4.8
1865            9     16           0                2                   16  2.666667  0.516398    2    3        3.0
1866            9     16           0                3                   16  2.285714  0.755929    1    3        3.0
1867            9     16           0                4                   16  2.666667  1.751190    1    6        4.5
1868            9     16           0                5                   16  2.285714  0.755929    1    3        3.0
1869            9     16           0                6                   16  4.000000  2.943920    1    7        6.7
1870            9     16           0                7                   16  3.200000  0.836660    2    4        4.0
1871            9     16           0                8                   16  2.666667  1.032796    1    4        3.5
(Pdb) df_s_p_c[(df_s_p_c['cluster_size'] == 3) & (df_s_p_c['shards'] == 12) & (df_s_p_c['source_node'] == 0)]
   cluster_size shards source_node destination_node number_of_src_shards                                    
                                                                     sum mean       std amin amax <lambda_0>
30            3     12           0                1                   12  3.0  1.825742    1    5        4.7
31            3     12           0                2                   12  2.4  0.894427    2    4        3.2
(Pdb) df_s_p_c[(df_s_p_c['cluster_size'] == 5) & (df_s_p_c['shards'] == 11) & (df_s_p_c['source_node'] == 0)]
    cluster_size shards source_node destination_node number_of_src_shards                                         
                                                                      sum      mean       std amin amax <lambda_0>
200            5     11           0                1                   11  2.750000  1.707825    1    5        4.4
201            5     11           0                2                   11  2.200000  0.447214    2    3        2.6
202            5     11           0                3                   11  1.571429  0.534522    1    2        2.0
203            5     11           0                4                   11  1.833333  0.752773    1    3        2.5
(Pdb) df_s_p_c[(df_s_p_c['cluster_size'] == 3) & (df_s_p_c['shards'] == 31) & (df_s_p_c['source_node'] == 0)]
   cluster_size shards source_node destination_node number_of_src_shards                                         
                                                                     sum      mean       std amin amax <lambda_0>
60            3     31           0                1                   31  3.875000  1.125992    2    5        5.0
61            3     31           0                2                   31  4.428571  1.133893    3    6        5.4

A summary for the number of connections to other brokers a given shard has.

(Pdb) df_c_p_s[(df_c_p_s['cluster_size'] == 9) & (df_c_p_s['shards'] == 16)]
   cluster_size shards number_of_connections                                         
                                         sum      mean       std amin amax <lambda_0>
67            9     16                   424  2.965035  1.280474    1    6        5.0
(Pdb) df_c_p_s[(df_c_p_s['cluster_size'] == 3) & (df_c_p_s['shards'] == 12)]
  cluster_size shards number_of_connections                                     
                                        sum  mean       std amin amax <lambda_0>
5            3     12                    32  1.28  0.458258    1    2        2.0
(Pdb) df_c_p_s[(df_c_p_s['cluster_size'] == 5) & (df_c_p_s['shards'] == 11)]
   cluster_size shards number_of_connections                                         
                                         sum      mean       std amin amax <lambda_0>
24            5     11                   114  2.111111  0.768892    1    4        3.0
(Pdb) df_c_p_s[(df_c_p_s['cluster_size'] == 3) & (df_c_p_s['shards'] == 31)]
   cluster_size shards number_of_connections                                         
                                         sum      mean       std amin amax <lambda_0>
10            3     31                    44  1.128205  0.338688    1    2        2.0

(Pdb) df_c_p_s[(df_c_p_s['number_of_connections']['std'] > 1)].shape[0]
90
(Pdb) df_c_p_s[(df_c_p_s['number_of_connections']['std'] > 2)].shape[0]
2
(Pdb) df_c_p_s[(df_c_p_s['number_of_connections']['std'] > 3)].shape[0]
0

Current modified to allow for variable connection counts

The number of connections node 0 has to the rest of the brokers in a cluster

(Pdb) df_c_p_n[(df_c_p_n['cluster_size'] == 9) & (df_c_p_n['shards'] == 16) & (df_c_p_n['source_node'] == 0)]
      cluster_size  shards  source_node  destination_node  number_of_connections
1864             9      16            0                 1                      6
1865             9      16            0                 2                      6
1866             9      16            0                 3                      5
1867             9      16            0                 4                      7
1868             9      16            0                 5                      7
1869             9      16            0                 6                      6
1870             9      16            0                 7                      6
1871             9      16            0                 8                      6
(Pdb) df_c_p_n[(df_c_p_n['cluster_size'] == 3) & (df_c_p_n['shards'] == 12) & (df_c_p_n['source_node'] == 0)]
    cluster_size  shards  source_node  destination_node  number_of_connections
30             3      12            0                 1                      6
31             3      12            0                 2                      5
(Pdb) df_c_p_n[(df_c_p_n['cluster_size'] == 5) & (df_c_p_n['shards'] == 11) & (df_c_p_n['source_node'] == 0)]
     cluster_size  shards  source_node  destination_node  number_of_connections
200             5      11            0                 1                      6
201             5      11            0                 2                      5
202             5      11            0                 3                      5
203             5      11            0                 4                      5
(Pdb) df_c_p_n[(df_c_p_n['cluster_size'] == 3) & (df_c_p_n['shards'] == 31) & (df_c_p_n['source_node'] == 0)]
    cluster_size  shards  source_node  destination_node  number_of_connections
60             3      31            0                 1                      8
61             3      31            0                 2                      6

A summary for the number of shards that use a given connection to a destination node.

(Pdb) df_s_p_c[(df_s_p_c['cluster_size'] == 9) & (df_s_p_c['shards'] == 16) & (df_s_p_c['source_node'] == 0)]
     cluster_size shards source_node destination_node number_of_src_shards                                         
                                                                       sum      mean       std amin amax <lambda_0>
1864            9     16           0                1                   16  2.666667  1.032796    1    4        3.5
1865            9     16           0                2                   16  2.666667  1.366260    1    5        4.0
1866            9     16           0                3                   16  3.200000  3.271085    1    9        6.2
1867            9     16           0                4                   16  2.285714  0.755929    1    3        3.0
1868            9     16           0                5                   16  2.285714  0.755929    1    3        3.0
1869            9     16           0                6                   16  2.666667  1.032796    1    4        3.5
1870            9     16           0                7                   16  2.666667  0.516398    2    3        3.0
1871            9     16           0                8                   16  2.666667  1.366260    1    5        4.0
(Pdb) df_s_p_c[(df_s_p_c['cluster_size'] == 3) & (df_s_p_c['shards'] == 12) & (df_s_p_c['source_node'] == 0)]
   cluster_size shards source_node destination_node number_of_src_shards                                    
                                                                     sum mean       std amin amax <lambda_0>
30            3     12           0                1                   12  2.0  0.632456    1    3        2.5
31            3     12           0                2                   12  2.4  1.516575    1    5        3.8
(Pdb) df_s_p_c[(df_s_p_c['cluster_size'] == 5) & (df_s_p_c['shards'] == 11) & (df_s_p_c['source_node'] == 0)]
    cluster_size shards source_node destination_node number_of_src_shards                                         
                                                                      sum      mean       std amin amax <lambda_0>
200            5     11           0                1                   11  1.833333  0.752773    1    3        2.5
201            5     11           0                2                   11  2.200000  1.643168    1    5        3.8
202            5     11           0                3                   11  2.200000  1.643168    1    5        3.8
203            5     11           0                4                   11  2.200000  1.303840    1    4        3.6
(Pdb) df_s_p_c[(df_s_p_c['cluster_size'] == 3) & (df_s_p_c['shards'] == 31) & (df_s_p_c['source_node'] == 0)]
   cluster_size shards source_node destination_node number_of_src_shards                                         
                                                                     sum      mean       std amin amax <lambda_0>
60            3     31           0                1                   31  3.875000  1.125992    2    5        5.0
61            3     31           0                2                   31  5.166667  1.471960    4    8        6.5

A summary for the number of connections to other brokers a given shard has.

(Pdb) df_c_p_s[(df_c_p_s['cluster_size'] == 9) & (df_c_p_s['shards'] == 16)]
   cluster_size shards number_of_connections                                         
                                         sum      mean       std amin amax <lambda_0>
67            9     16                   421  3.050725  1.251699    1    8        4.0
(Pdb) df_c_p_s[(df_c_p_s['cluster_size'] == 3) & (df_c_p_s['shards'] == 12)]
  cluster_size shards number_of_connections                                         
                                        sum      mean       std amin amax <lambda_0>
5            3     12                    32  1.391304  0.499011    1    2        2.0
(Pdb) df_c_p_s[(df_c_p_s['cluster_size'] == 5) & (df_c_p_s['shards'] == 11)]
   cluster_size shards number_of_connections                                      
                                         sum   mean       std amin amax <lambda_0>
24            5     11                   102  2.125  0.841099    1    4        3.0
(Pdb) df_c_p_s[(df_c_p_s['cluster_size'] == 3) & (df_c_p_s['shards'] == 31)]
   cluster_size shards number_of_connections                                         
                                         sum      mean       std amin amax <lambda_0>
10            3     31                    42  1.135135  0.346583    1    2        2.0

(Pdb) df_c_p_s[(df_c_p_s['number_of_connections']['std'] > 1)].shape[0]
66
(Pdb) df_c_p_s[(df_c_p_s['number_of_connections']['std'] > 2)].shape[0]
4
(Pdb) df_c_p_s[(df_c_p_s['number_of_connections']['std'] > 3)].shape[0]
0

Newest hash in this PR

The number of connections node 0 has to the rest of the brokers in a cluster

(Pdb) df_c_p_n[(df_c_p_n['cluster_size'] == 9) & (df_c_p_n['shards'] == 16) & (df_c_p_n['source_node'] == 0)]
      cluster_size  shards  source_node  destination_node  number_of_connections
1864             9      16            0                 1                      8
1865             9      16            0                 2                      8
1866             9      16            0                 3                      8
1867             9      16            0                 4                      8
1868             9      16            0                 5                      8
1869             9      16            0                 6                      8
1870             9      16            0                 7                      8
1871             9      16            0                 8                      8
(Pdb) df_c_p_n[(df_c_p_n['cluster_size'] == 3) & (df_c_p_n['shards'] == 12) & (df_c_p_n['source_node'] == 0)]
    cluster_size  shards  source_node  destination_node  number_of_connections
30             3      12            0                 1                      8
31             3      12            0                 2                      8
(Pdb) df_c_p_n[(df_c_p_n['cluster_size'] == 5) & (df_c_p_n['shards'] == 11) & (df_c_p_n['source_node'] == 0)]
     cluster_size  shards  source_node  destination_node  number_of_connections
200             5      11            0                 1                      8
201             5      11            0                 2                      8
202             5      11            0                 3                      8
203             5      11            0                 4                      8
(Pdb) df_c_p_n[(df_c_p_n['cluster_size'] == 3) & (df_c_p_n['shards'] == 31) & (df_c_p_n['source_node'] == 0)]
    cluster_size  shards  source_node  destination_node  number_of_connections
60             3      31            0                 1                      8
61             3      31            0                 2                      8

A summary for the number of shards that use a given connection to a destination node.

(Pdb) df_s_p_c[(df_s_p_c['cluster_size'] == 9) & (df_s_p_c['shards'] == 16) & (df_s_p_c['source_node'] == 0)]
     cluster_size shards source_node destination_node number_of_src_shards                               
                                                                       sum mean  std amin amax <lambda_0>
1864            9     16           0                1                   16  2.0  0.0    2    2        2.0
1865            9     16           0                2                   16  2.0  0.0    2    2        2.0
1866            9     16           0                3                   16  2.0  0.0    2    2        2.0
1867            9     16           0                4                   16  2.0  0.0    2    2        2.0
1868            9     16           0                5                   16  2.0  0.0    2    2        2.0
1869            9     16           0                6                   16  2.0  0.0    2    2        2.0
1870            9     16           0                7                   16  2.0  0.0    2    2        2.0
1871            9     16           0                8                   16  2.0  0.0    2    2        2.0
(Pdb) df_s_p_c[(df_s_p_c['cluster_size'] == 3) & (df_s_p_c['shards'] == 12) & (df_s_p_c['source_node'] == 0)]
   cluster_size shards source_node destination_node number_of_src_shards                                    
                                                                     sum mean       std amin amax <lambda_0>
30            3     12           0                1                   12  1.5  0.755929    1    3        2.3
31            3     12           0                2                   12  1.5  0.755929    1    3        2.3
(Pdb) df_s_p_c[(df_s_p_c['cluster_size'] == 5) & (df_s_p_c['shards'] == 11) & (df_s_p_c['source_node'] == 0)]
    cluster_size shards source_node destination_node number_of_src_shards                                      
                                                                      sum   mean       std amin amax <lambda_0>
200            5     11           0                1                   11  1.375  0.744024    1    3        2.3
201            5     11           0                2                   11  1.375  0.517549    1    2        2.0
202            5     11           0                3                   11  1.375  0.517549    1    2        2.0
203            5     11           0                4                   11  1.375  1.060660    1    4        1.9
(Pdb) df_s_p_c[(df_s_p_c['cluster_size'] == 3) & (df_s_p_c['shards'] == 31) & (df_s_p_c['source_node'] == 0)]
   cluster_size shards source_node destination_node number_of_src_shards                                      
                                                                     sum   mean       std amin amax <lambda_0>
60            3     31           0                1                   31  3.875  0.640870    3    5        4.3
61            3     31           0                2                   31  3.875  0.834523    3    5        5.0

A summary for the number of connections to other brokers a given shard has.

(Pdb) df_c_p_s[(df_c_p_s['cluster_size'] == 9) & (df_c_p_s['shards'] == 16)]
   cluster_size shards number_of_connections                                  
                                         sum mean     std amin amax <lambda_0>
67            9     16                   576  4.0  1.1094    2    6        6.0
(Pdb) df_c_p_s[(df_c_p_s['cluster_size'] == 3) & (df_c_p_s['shards'] == 12)]
  cluster_size shards number_of_connections                                        
                                        sum      mean      std amin amax <lambda_0>
5            3     12                    48  1.454545  0.50565    1    2        2.0
(Pdb) df_c_p_s[(df_c_p_s['cluster_size'] == 5) & (df_c_p_s['shards'] == 11)]
   cluster_size shards number_of_connections                                         
                                         sum      mean       std amin amax <lambda_0>
24            5     11                   160  3.018868  1.168341    1    4        4.0
(Pdb) df_c_p_s[(df_c_p_s['cluster_size'] == 3) & (df_c_p_s['shards'] == 31)]
   cluster_size shards number_of_connections                                         
                                         sum      mean       std amin amax <lambda_0>
10            3     31                    48  1.142857  0.354169    1    2        2.0

(Pdb) df_c_p_s[(df_c_p_s['number_of_connections']['std'] > 1)].shape[0]
78
(Pdb) df_c_p_s[(df_c_p_s['number_of_connections']['std'] > 2)].shape[0]
23
(Pdb) df_c_p_s[(df_c_p_s['number_of_connections']['std'] > 3)].shape[0]
14
(Pdb) df_c_p_s[(df_c_p_s['number_of_connections']['std'] > 4)].shape[0]
9
(Pdb) df_c_p_s[(df_c_p_s['number_of_connections']['std'] > 5)].shape[0]
4
(Pdb) df_c_p_s[(df_c_p_s['number_of_connections']['std'] > 6)].shape[0]
0

Member

@StephanDollberg StephanDollberg left a comment


Results look great.

Can I just confirm this upgrades fine? I.e.: this doesn't affect any persisted state anywhere and different nodes also don't need to have the same implementation?

Adding some unit tests probably also a good idea.

, rpc_client_max_connections(
*this,
"rpc_client_max_connections",
"The max client connections that will be open to a given broker.",
Member


I think we need to be clear here that this is internal RPC traffic.

Member


Maybe "Maximum internal RPC connections a broker will open to each other broker."

*this,
"rpc_client_max_connections",
"The max client connections that will be open to a given broker.",
{.example = "8"},
Member


I am not sure what the defaults are or why the other examples above don't list them, but it's probably good to make reloadability and visibility explicit.
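
For reference, a sketch of what making those explicit might look like, assuming the property metadata follows the pattern of neighbouring entries in configuration.cc; the needs_restart/visibility values and the default of 8 are illustrative guesses rather than what the PR does:

```cpp
, rpc_client_max_connections(
      *this,
      "rpc_client_max_connections",
      "Maximum internal RPC client connections a broker will open to each other broker.",
      {.needs_restart = needs_restart::yes,
       .example = "8",
       .visibility = visibility::tunable},
      8)
```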

@@ -20,7 +20,18 @@ namespace rpc {
connection_cache::connection_cache(
ss::sharded<ss::abort_source>& as,
std::optional<connection_cache_label> label)
: _label(std::move(label)) {
: _max_connections(8)
Member


Can this constructor just be delegated? I know we have something like a dummy binding for testing but not sure there is something like that for non-testing code.

Contributor Author

@ballard26 ballard26 Jun 12, 2023


Good point, I think I should probably just remove this constructor entirely. I added it mainly to avoid having to change a bunch of unit tests, but it's kind of a foot-gun now that I look at it again.

uint64_t remainder = total_shards % _max_connections;
uint64_t ring_size = total_shards - remainder;

// Vary the starting shard of the ring depending on the connection hash.
Member


So this is the actual "consistent hash" right? In that case I don't understand why we use remainder below for the bucket size.

Contributor


Can you please add comments/links on where/how those magic primes (there is more than one!) came to be? Is there a paper, or is it an OSS implementation from elsewhere?

Contributor Author


@piyushredpanda The primes came from https://planetmath.org/goodhashtableprimes; I'll re-add the attribution to the code. As for the hash method itself, it is vaguely inspired by an older consistent hash algorithm mentioned in the paper we reference for our jump_consistent_hash implementation: https://arxiv.org/pdf/1406.2294.pdf

Contributor Author


I have some results I didn't include that show this, but I was surprised how much just mixing in a prime number helps with ensuring connections are uniformly distributed. So I was mainly just mixing in primes with the hashes to avoid clustering.

Member


I guess boost::hash_combine does not do very good mixing? I wonder if absl::HashOf is better. It seems a bit weird to mix in a "magic prime" that seems to be optimized for use as the second argument to % in a hash table, rather than as the seed for a mixing function like this.

Member


It's worth noting that these primes aren't new: the original shard_for used a list of them from the same source too: it's just not that clear from the diff because it moved files.

Contributor Author

@ballard26 ballard26 Aug 7, 2023


absl::HashOf will work much better. The magic prime is mainly there since std::hash is just the identity function for integers which doesn't result in a very good distribution of connections.
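
A minimal, self-contained sketch of the contrast being described here (the node IDs are arbitrary and this is not code from the PR):

```cpp
#include <absl/hash/hash.h>
#include <cstddef>
#include <functional>
#include <iostream>

int main() {
    int src_node = 3;
    int dest_node = 4;

    // std::hash on an integer is typically the identity function, so nearby
    // node IDs produce nearby hash values and cluster together once reduced
    // modulo a small shard count.
    std::size_t weak = std::hash<int>{}(dest_node);

    // absl::HashOf runs its arguments through a real mixing function, giving
    // a much more uniform spread without hand-picked primes.
    std::size_t mixed = absl::HashOf(src_node, dest_node);

    std::cout << weak << " vs " << mixed << "\n";
}
```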

connection_hash, remainder + 1);
uint64_t ring_end_shard = (ring_size - 1) + ring_start_shard;

// Hash any shards not in [ring_start_shard, ring_end_shard] to a point
Member


So this happens because of the truncation above, right? For example, with 11 shards and 8 connections we handle shards 9, 10, 11 here?

Contributor Author


Yep exactly, in any case where shard count isn't a multiple of the number of connections this will be used.
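
As a rough sketch of the ring idea under discussion (illustrative only, not the PR's exact code; the ring-offset calculation and the fold-back hash are stand-ins):

```cpp
#include <cstddef>
#include <functional>

// Map src_shard to the shard that owns the connection it should use.
// Precondition: total_shards > max_connections (the real code returns
// src_shard directly otherwise).
//
// With total_shards = 11 and max_connections = 8: remainder = 3 and
// ring_size = 8. The ring covers 8 consecutive shards starting at an offset
// derived from the connection hash; the 3 leftover shards are hashed back
// onto a position inside the ring.
std::size_t owner_for(
  std::size_t src_shard,
  std::size_t total_shards,
  std::size_t max_connections,
  std::size_t connection_hash) {
    std::size_t remainder = total_shards % max_connections;
    std::size_t ring_size = total_shards - remainder;
    std::size_t stride = ring_size / max_connections; // shards per connection segment

    // Vary where the ring starts so different (src, dest) node pairs don't
    // all pile onto the same shards (stand-in for the PR's hashed offset).
    std::size_t ring_start = connection_hash % (remainder + 1);

    std::size_t pos;
    if (src_shard >= ring_start && src_shard < ring_start + ring_size) {
        pos = src_shard - ring_start; // shard lies on the ring
    } else {
        // Leftover shards (shard count not a multiple of max_connections)
        // are hashed onto a position inside the ring.
        pos = std::hash<std::size_t>{}(src_shard + connection_hash) % ring_size;
    }
    // Snap the ring position to the start of its connection's segment; that
    // segment's first shard owns the connection.
    return ring_start + (pos / stride) * stride;
}
```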

ss::shard_id src_shard,
model::node_id n,
model::node_id dest_node,
ss::shard_id total_shards) const {
if (total_shards <= _max_connections) {
return src_shard;
}

Member


Having an overall explanation with a small ascii chart (maybe use a line instead of a ring) would be great here I think

@piyushredpanda
Contributor

piyushredpanda commented Jun 12, 2023

@ballard26 has probably set a new high bar for the best PR cover letter/detail here. 🥇

@ballard26
Contributor Author

Adding some unit tests probably also a good idea.

@StephanDollberg Definitely agree, will be adding in some unit tests for the hash function specifically before moving the PR out of the draft stage.

@travisdowns
Member

Basic question: does this need to be a hash at all? Can we not just assign the shard/client mapping explicitly, in which case it can be exactly optimal (e.g., simple linear mapping all the connections to node 0, then 1, etc, gives optimal distribution, right?).

That is, since modifications to the set of nodes are rare and connections are basically ephemeral (we can tear one down and create one on another shard w/o much cost), it's not clear why we use a hash or consistent hash at all.
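
A minimal sketch of the kind of explicit assignment being suggested here, assuming the peer list is known and can be indexed; the function name and parameters are hypothetical:

```cpp
#include <cstddef>

// Hand out connection slots round-robin across shards: peer 0's connections go
// to shards 0..max_connections-1, peer 1's to the next block, and so on. Every
// shard ends up owning an (almost) equal number of connections.
std::size_t owner_shard_for(
  std::size_t peer_index,       // index of the destination broker in a sorted peer list
  std::size_t connection_index, // which of the max_connections to that broker
  std::size_t max_connections,
  std::size_t total_shards) {
    return (peer_index * max_connections + connection_index) % total_shards;
}
```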

@travisdowns
Member

Why did the existing hash underperform? It was not sufficiently uniformly random?

@ballard26
Contributor Author

ballard26 commented Jun 19, 2023

Why did the existing hash underperform? It was not sufficiently uniformly random?

@travisdowns It was basically that it didn't meet the three requirements I outlined in the PR:

  • The number of connections each shard had wasn't sufficiently uniform as you stated.
  • The number of shards that used a given connection varied by quite a bit with some connections being only used by one shard and others being used by 5 shards.
  • The number of connections being created didn't match the requested number of connections. I.e., when 8 connections to each broker were requested, somewhere between 5 and 8 connections were created to each broker instead.
  • Even if a given shard had a connection to broker A the current hash function may not use that connection to send requests from that shard to broker A.

The data I provided in the first comment for this PR goes into details about how each hash function performs relative to the requirements I stated.

@ballard26
Contributor Author

ballard26 commented Jun 19, 2023

Basic question: does this need to be a hash at all? Can we not just assign the shard/client mapping explicitly, in which case it can be exactly optimal (e.g., simple linear mapping all the connections to node 0, then 1, etc, gives optimal distribution, right?).

@travisdowns I did something similar in the current "hash" function, though I spaced out connections to the same broker with an equal number of shards between each connection. I had hoped this could give us better cache locality compared to a linear mapping. I also had to handle some edge cases, like when the shard count isn't a multiple of the requested connection count. It can definitely be made perfectly optimal if we're willing to store some information about the allocations in the connection_cache class.

@travisdowns
Member

It was basically that it didn't meet the three requirements I outlined in the PR:

@ballard26 - I guess maybe I'm misunderstanding what we mean by a "hash function". As I see it, the best hash function can only map inputs to shards uniformly at random, which will never produce a particularly good result in terms of your requirements due to randomness. I.e., even "uniform random" selections aren't going to meet those requirements in general.

So I guess by "hash function" we mean arbitrary logic which maps shards to connections, and it can take the whole list of input shards + nodes in the cluster as input?

@travisdowns I did something similar in the current "hash" function, though I spaced out connections to the same broker with an equal number of shards between each connection.

Right, that's fine: I wasn't suggesting linear was the best, only that it was the simplest example of something that would satisfy all the requirements.

I had hoped this could give us better cache locality compared to a linear mapping.

How does cache locality play into this? I'm not following.

@ballard26
Contributor Author

ballard26 commented Jun 20, 2023

@ballard26 - I guess maybe I'm misunderstanding what we mean by a "hash function". As I see it, the best hash function can only map inputs to shards uniformly at random, which will never produce a particularly good result in terms of your requirements due to randomness. I.e., even "uniform random" selections aren't going to meet those requirements in general.

Right you are. At this point it'd be a misnomer to call what I wrote a hash function. It's just a function that tries to map connections to shards in a way that satisfies my stated requirements.

So I guess by "hash function" we mean arbitrary logic which maps shards to connections, and it can take the whole list of input shards + nodes in the cluster as input?

Yeah

How does cache locality play into this? I'm not following.

It's more to do with locality for the data being sent rather than the connection itself. If a shard wants to use a connection on a different shard, it'd be nice if they shared, say, the same L2 cache, as it's likely the data that needs to be sent over that connection will already be there. Though I've noticed at least one new architecture (AMD's Zen 3) shares an L3 cache among all cores and has private per-core L1 & L2 caches. So maybe it's not something that's important to optimize for anymore.

Comment on lines +112 to +115
uint64_t connection_hash = 201326611; // magic prime
boost::hash_combine(connection_hash, std::hash<model::node_id>{}(src_node));
boost::hash_combine(
connection_hash, std::hash<model::node_id>{}(dest_node));
Member

@travisdowns travisdowns Jun 20, 2023


absl::HashOf is a more concise way to hash several values of presumably similar quality.

Member


(though I see now this comes from the existing mapping function which had moved files)

@@ -34,11 +34,11 @@ class connection_cache final
using underlying = std::unordered_map<model::node_id, transport_ptr>;
using iterator = typename underlying::iterator;

static inline ss::shard_id shard_for(
ss::shard_id shard_for(
Member


Seems like a good time to add a comment to this function, especially given the ambiguity of some of the parameter names.

As I understand it, given that we are currently on (self, shard) with max_shards total shards, we would like to know the shard id (return value) we should use to connect to node node.
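
A sketch of the kind of comment being asked for, with the parameter meanings inferred from this discussion rather than taken from the PR:

```cpp
// Returns the shard on this broker that owns the connection `src_shard`
// should use when talking to `dest_node`.
//
//   src_shard    - the shard asking for a connection (typically the caller's shard)
//   n            - this broker's own node id (as understood from the discussion)
//   dest_node    - the broker we want to connect to
//   total_shards - the number of shards on this broker
ss::shard_id shard_for(
  ss::shard_id src_shard,
  model::node_id n,
  model::node_id dest_node,
  ss::shard_id total_shards) const;
```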

}

connection_cache::connection_cache(
config::binding<size_t> max_connections,
Member


What is the point of passing it through as a binding? The value is used once and saved in a member so it could just be a size_t which would simplify things.

Contributor Author


Good catch, I wasn't sure at first if this is something we'd want to make dynamically configurable without a restart, but I don't believe we do. So I'll definitely just switch this over to size_t.

@travisdowns
Member

I would add a 4th desirable requirement to @ballard26 's list:

  • If a shard wants a connection for a (node, shard) and the current shard has a connection for that (node, shard) it is selected.

This is trivially true when max_connections >= smp::count since we always return self in that case, but can we do it for the hashing case too?

@travisdowns
Member

It's more to do with locality for the data being sent rather than the connection itself. If a shard wants to use a connection on a different shard, it'd be nice if they shared, say, the same L2 cache, as it's likely the data that needs to be sent over that connection will already be there. Though I've noticed at least one new architecture (AMD's Zen 3) shares an L3 cache among all cores and has private per-core L1 & L2 caches. So maybe it's not something that's important to optimize for anymore.

OK sure, but how does the current approach or any contemplated approach maximize this kind of locality? Wouldn't you have to examine the topology of the system and make explicit decisions based on the topology<->shard mapping to do that? Or are you saying that shard IDs have some correlation to topology? [1]

FWIW basically no modern big CPUs have cores that share an L2 cache unless they are hyper-siblings in which case they also share an L1 cache. The basic rule is that a physical core has private L1 and L2 and shares the L3 with "a set of other cores". The cores that share an L3 are generally all the cores on a socket (most AWS boxes are dual socket), though on AMD it's a bit more complicated because even within the shared domain L3 access is pretty NUMA due to the CCX boundaries.


[1] Well there is some if you assume shard ID maps to equivalent or at least "shifted" cpuid: but it works both ways: the closest cores topology-wide (hyper siblings) usually are far apart numerically (e.g., 0 and N for an N*2 lcore system), but then CPUs on the same socket are usually contiguous. So basically close/far numerically isn't very useful.

@travisdowns
Member

So I guess the main problem with just using an even mapping without "hashing" is that the broker IDs:

  • They aren't really known a-priori
  • They may change over time
  • They aren't necessarily contiguous nor start at 0

Shard ID doesn't really have any of those problems but the combination of shard + broker does.

So like you say maybe a stateful connection cache is the way to go? Basically you'd discover the broker IDs over time or something and maintain an explicit mapping table? Because it really seems like while you are in here we should try to remove this source of imbalance completely.
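
A very rough sketch of the stateful alternative being floated here: remember each assignment and hand a newly discovered (peer, connection slot) pair to whichever shard currently owns the fewest connections. All names are hypothetical and nothing below comes from the PR:

```cpp
#include <cstddef>
#include <map>
#include <utility>
#include <vector>

class connection_assigner {
public:
    explicit connection_assigner(std::size_t total_shards)
      : _connections_per_shard(total_shards, 0) {}

    // Return (and remember) the owner shard for a given broker/slot pair.
    std::size_t assign(int peer_node, std::size_t slot) {
        auto key = std::make_pair(peer_node, slot);
        if (auto it = _owner.find(key); it != _owner.end()) {
            return it->second; // already assigned: stable answer
        }
        // New pair: pick the shard that currently owns the fewest connections.
        std::size_t best = 0;
        for (std::size_t s = 1; s < _connections_per_shard.size(); ++s) {
            if (_connections_per_shard[s] < _connections_per_shard[best]) {
                best = s;
            }
        }
        ++_connections_per_shard[best];
        _owner.emplace(key, best);
        return best;
    }

private:
    std::map<std::pair<int, std::size_t>, std::size_t> _owner;
    std::vector<std::size_t> _connections_per_shard;
};
```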

@travisdowns
Member

The current approach doesn't try to use the "self" shard either, so I guess there is kind of a double performance cliff when you go over 8 shards currently: not only do you get the imbalance that originally drove this change, you also go from 100% connection locality to say 6.25% connection locality on a 16-shard system (1/16), even though that could be 50% if the self-preference rule was used.

@travisdowns
Member

Interesting related change (PR 8 !!):

#8

So actually the source shard for a connection can affect the port selected (on the local side) which in turn can affect the shard assigned to the connection on the remote side (if port is used for load_balancing_algorithm which I guess was enabled in that change). So that's another thing to consider here probably: the implied distribution on the remote side.

@ballard26
Contributor Author

So I guess the main problem with just using an even mapping without "hashing" is that the broker IDs:

* They aren't really known a-priori

* They may change over time

* They aren't necessarily contiguous nor start at 0

Yeah, these are the problems I ended up running into when trying to have a perfectly distributed stateless solution. The distribution function I ended up with is pretty close to meeting the requirements stated in the PR. However, the hashing that I ended up using still results in some collisions that skew the distribution of connections per shard. Storing some state in the connection_cache class should resolve this.

@ballard26
Contributor Author

ballard26 commented Jun 20, 2023

The current approach doesn't try to use the "self" shard either, so I guess there is kind of a double performance cliff when you go over 8 shards currently: not only do you get the imbalance that originally drove this change, you also go from 100% connection locality to say 6.25% connection locality on a 16-shard system (1/16), even though that could be 50% if the self-preference rule was used.

Shards not using their own connections was definitely a nasty surprise when I went to analyze the results. And it was a behavior I made sure the new distribution function didn't have.

One potential "benefit" could be that it means no shard has priority access to a connection. I.e, if none of the shards their own connections then each of them will have around the same latency to send data over a connection. Vs if some shards use their own connection then they may be able to send data over it without having to schedule a task in the queue. This could lead to partitions having better latency than others in the worst case.

That being said the locality and reduced inter-shard communication we get from using a connection on the same shard is much more important.

@emaxerrno
Contributor

emaxerrno commented Aug 8, 2023

@ballard26 - we should make this property dynamic, so the k8s operator can take this into account and get upstream into the cloud.

cc: @joejulian @alejandroEsc and @RafalKorepta
