Skip to content

Commit

Permalink
Add ArrayBasedEventLoopState(#4439) (#4682)
Browse files Browse the repository at this point in the history
Motivation:

To resolve #4439:
> Using a heap is pretty efficient if the number is large. However, it's
inefficient if the number is small. (e.g. 2)
> So we need to:
>
> - find the threshold to use array-based or heap-based implementation
using benchmark
> -  add ArrayBasedEventLoopState

Modifications:

- Extract class: `AcquisitionIndex`
- Add array-based entry structure
  - Set threshold as 128

Result:

We can manage EventLoops using array-based structure now.
But now we need to experiment with benchmarks to find suitable
thresholds.

<!--
Visit this URL to learn more about how to write a pull request
description:

https://armeria.dev/community/developer-guide#how-to-write-pull-request-description
-->

---------

Co-authored-by: minwoox <[email protected]>
Co-authored-by: Trustin Lee <[email protected]>
  • Loading branch information
3 people authored Oct 20, 2023
1 parent 16afe9a commit bb79158
Show file tree
Hide file tree
Showing 8 changed files with 359 additions and 90 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright 2023 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.linecorp.armeria.client;

import static com.google.common.collect.ImmutableList.toImmutableList;

import java.util.List;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Streams;

import com.linecorp.armeria.common.util.EventLoopGroups;

import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;

@State(Scope.Benchmark)
public class EventLoopStateBenchmark {
private AbstractEventLoopState state;
private AbstractEventLoopEntry[] acquired;
private EventLoopGroup eventLoopGroup;

@Param({ "32", "64", "128", "256" })
private int maxNumEventLoops;
@Param({ "true", "false" })
private boolean arrayBased;

@Setup
public void setUp() {
acquired = new AbstractEventLoopEntry[maxNumEventLoops];

eventLoopGroup = EventLoopGroups.newEventLoopGroup(maxNumEventLoops);
final DefaultEventLoopScheduler scheduler = new DefaultEventLoopScheduler(
eventLoopGroup, maxNumEventLoops, maxNumEventLoops, ImmutableList.of());
final List<EventLoop> eventLoops = Streams.stream(eventLoopGroup)
.map(EventLoop.class::cast)
.collect(toImmutableList());
if (arrayBased) {
state = new ArrayBasedEventLoopState(eventLoops, maxNumEventLoops, scheduler);
} else {
state = new HeapBasedEventLoopState(eventLoops, maxNumEventLoops, scheduler);
}

// Acquire as many as the number of eventLoops so that the active request of all states are
// greater than 1.
for (int i = 0; i < maxNumEventLoops; i++) {
state.acquire();
}
}

@TearDown
public void tearDown() {
eventLoopGroup.shutdownGracefully();
}

@Benchmark
public void lastInFirstOut() {
for (int i = 0; i < maxNumEventLoops; ++i) {
acquired[i] = state.acquire();
}
for (int i = maxNumEventLoops - 1; i >= 0; --i) {
acquired[i].release();
}
}

@Benchmark
public void firstInFirstOut() {
for (int i = 0; i < maxNumEventLoops; ++i) {
acquired[i] = state.acquire();
}
for (int i = 0; i < maxNumEventLoops; ++i) {
acquired[i].release();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ static AbstractEventLoopState of(List<EventLoop> eventLoops, int maxNumEventLoop
DefaultEventLoopScheduler scheduler) {
if (maxNumEventLoops == 1) {
return new OneEventLoopState(eventLoops, scheduler);
} else if (maxNumEventLoops <= 128) {
return new ArrayBasedEventLoopState(eventLoops, maxNumEventLoops, scheduler);
}
// TODO(minwoox) Introduce array based state which is used when the maxNumEventLoops is greater than 1
// and less than N for the performance.
return new HeapBasedEventLoopState(eventLoops, maxNumEventLoops, scheduler);
}

Expand Down Expand Up @@ -80,7 +80,7 @@ protected final void unlock() {
abstract void release(AbstractEventLoopEntry e);

@VisibleForTesting
abstract List<AbstractEventLoopEntry> entries();
abstract AbstractEventLoopEntry[] entries();

abstract int allActiveRequests();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* Copyright 2023 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.client;

import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

import io.netty.channel.EventLoop;

final class ArrayBasedEventLoopState extends AbstractEventLoopState {

private final AbstractEventLoopEntry[] entries;
private final int maxNumEventLoops;
private int allActiveRequests;

ArrayBasedEventLoopState(List<EventLoop> eventLoops, int maxNumEventLoops,
DefaultEventLoopScheduler scheduler) {
super(eventLoops, scheduler);
this.maxNumEventLoops = maxNumEventLoops;
entries = new AbstractEventLoopEntry[maxNumEventLoops];
if (eventLoops.size() == maxNumEventLoops) {
init(0);
} else {
init(scheduler().acquisitionStartIndex(maxNumEventLoops));
}
}

private void init(final int acquisitionStartIndex) {
final int initialEventLoopOffset = ThreadLocalRandom.current().nextInt(maxNumEventLoops);
final int eventLoopSize = eventLoops().size();
for (int i = 0; i < maxNumEventLoops; ++i) {
final int nextIndex = (acquisitionStartIndex + (initialEventLoopOffset + i) % maxNumEventLoops) %
eventLoopSize;
entries[i] = new Entry(this, eventLoops().get(nextIndex), i);
}
}

private AbstractEventLoopEntry targetEntry() {
int minActiveRequest = Integer.MAX_VALUE;
int targetIndex = 0;
for (int i = 0; i < maxNumEventLoops; ++i) {
final AbstractEventLoopEntry e = entries[i];
final int activeRequests = e.activeRequests();
if (activeRequests == 0) {
return e;
}
if (minActiveRequest > activeRequests) {
minActiveRequest = activeRequests;
targetIndex = i;
}
}
return entries[targetIndex];
}

@Override
AbstractEventLoopEntry acquire() {
lock();
try {
final AbstractEventLoopEntry e = targetEntry();
e.incrementActiveRequests();
allActiveRequests++;
return e;
} finally {
unlock();
}
}

@Override
void release(AbstractEventLoopEntry e) {
lock();
try {
e.decrementActiveRequests();
if (--allActiveRequests == 0) {
setLastActivityTimeNanos();
}
} finally {
unlock();
}
}

@Override
AbstractEventLoopEntry[] entries() {
return entries;
}

@Override
int allActiveRequests() {
return allActiveRequests;
}

private static final class Entry extends AbstractEventLoopEntry {

private final int id;

private int activeRequests;

Entry(AbstractEventLoopState parent, EventLoop eventLoop, int id) {
super(parent, eventLoop);
this.id = id;
}

@Override
int activeRequests() {
return activeRequests;
}

@Override
void incrementActiveRequests() {
activeRequests++;
}

@Override
void decrementActiveRequests() {
activeRequests--;
}

@Override
int id() {
return id;
}

@Override
int index() {
throw new UnsupportedOperationException();
}

@Override
void setIndex(int index) {
throw new UnsupportedOperationException();
}

@Override
public String toString() {
return "(" + id + ", " + activeRequests() + ')';
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,9 @@ public ReleasableHolder<EventLoop> acquire(SessionProtocol sessionProtocol,
}

@VisibleForTesting
List<AbstractEventLoopEntry> entries(SessionProtocol sessionProtocol,
EndpointGroup endpointGroup,
@Nullable Endpoint endpoint) {
AbstractEventLoopEntry[] entries(SessionProtocol sessionProtocol,
EndpointGroup endpointGroup,
@Nullable Endpoint endpoint) {
return state(sessionProtocol, endpointGroup, endpoint).entries();
}

Expand Down
Loading

0 comments on commit bb79158

Please sign in to comment.