Skip to content

Commit

Permalink
Kafka 9626: Improve ACLAuthorizer.acls() performance
Browse files Browse the repository at this point in the history
This PR avoids creation of unnecessary sets in AclAuthorizer.acls() method implementation.

Perf results:
**Old**
```
Benchmark                                (aclCount)  (resourceCount)  Mode  Cnt    Score   Error  Units
AclAuthorizerBenchmark.testAclsIterator           5             5000  avgt   15    5.821 ? 0.309  ms/op
AclAuthorizerBenchmark.testAclsIterator           5            10000  avgt   15   15.303 ? 0.107  ms/op
AclAuthorizerBenchmark.testAclsIterator           5            50000  avgt   15   74.976 ? 0.543  ms/op
AclAuthorizerBenchmark.testAclsIterator          10             5000  avgt   15   15.366 ? 0.184  ms/op
AclAuthorizerBenchmark.testAclsIterator          10            10000  avgt   15   29.899 ? 0.129  ms/op
AclAuthorizerBenchmark.testAclsIterator          10            50000  avgt   15  167.301 ? 1.723  ms/op
AclAuthorizerBenchmark.testAclsIterator          15             5000  avgt   15   21.980 ? 0.114  ms/op
AclAuthorizerBenchmark.testAclsIterator          15            10000  avgt   15   44.385 ? 0.255  ms/op
AclAuthorizerBenchmark.testAclsIterator          15            50000  avgt   15  241.919 ? 3.955  ms/op
```
**New**

```
Benchmark                                (aclCount)  (resourceCount)  Mode  Cnt   Score   Error  Units
AclAuthorizerBenchmark.testAclsIterator           5             5000  avgt   15   0.666 ? 0.004  ms/op
AclAuthorizerBenchmark.testAclsIterator           5            10000  avgt   15   1.427 ? 0.015  ms/op
AclAuthorizerBenchmark.testAclsIterator           5            50000  avgt   15  21.410 ? 0.225  ms/op
AclAuthorizerBenchmark.testAclsIterator          10             5000  avgt   15   1.230 ? 0.018  ms/op
AclAuthorizerBenchmark.testAclsIterator          10            10000  avgt   15   4.303 ? 0.744  ms/op
AclAuthorizerBenchmark.testAclsIterator          10            50000  avgt   15  36.724 ? 0.409  ms/op
AclAuthorizerBenchmark.testAclsIterator          15             5000  avgt   15   2.433 ? 0.379  ms/op
AclAuthorizerBenchmark.testAclsIterator          15            10000  avgt   15   9.818 ? 0.214  ms/op
AclAuthorizerBenchmark.testAclsIterator          15            50000  avgt   15  52.886 ? 0.525  ms/op
```

Author: Manikumar Reddy <[email protected]>
Author: Lucas Bradstreet <[email protected]>

Reviewers: Ismael Juma <[email protected]>, Rajini Sivaram <[email protected]>, Lucas Bradstreet <[email protected]>

Closes apache#8199 from omkreddy/KAFKA-9626
  • Loading branch information
omkreddy committed Mar 2, 2020
1 parent ea0c027 commit 8dff0b1
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 10 deletions.
2 changes: 1 addition & 1 deletion checkstyle/import-control-jmh-benchmarks.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
<allow class="kafka.utils.KafkaScheduler"/>
<allow class="org.apache.kafka.clients.FetchSessionHandler"/>
<allow pkg="org.mockito"/>

<allow pkg="kafka.security.authorizer"/>

<subpackage name="cache">
</subpackage>
Expand Down
23 changes: 14 additions & 9 deletions core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock

import com.typesafe.scalalogging.Logger
import kafka.api.KAFKA_2_0_IV1
import kafka.security.authorizer.AclAuthorizer.VersionedAcls
import kafka.security.authorizer.AclAuthorizer.{ResourceOrdering, VersionedAcls}
import kafka.security.authorizer.AclEntry.ResourceSeparator
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
Expand All @@ -36,7 +36,7 @@ import org.apache.kafka.common.errors.{ApiException, InvalidRequestException, Un
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.resource._
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{Time, SecurityUtils}
import org.apache.kafka.common.utils.{SecurityUtils, Time}
import org.apache.kafka.server.authorizer.AclDeleteResult.AclBindingDeleteResult
import org.apache.kafka.server.authorizer._
import org.apache.zookeeper.client.ZKClientConfig
Expand Down Expand Up @@ -67,7 +67,7 @@ object AclAuthorizer {
val WildcardHost = "*"

// Orders by resource type, then resource pattern type and finally reverse ordering by name.
private object ResourceOrdering extends Ordering[ResourcePattern] {
class ResourceOrdering extends Ordering[ResourcePattern] {

def compare(a: ResourcePattern, b: ResourcePattern): Int = {
val rt = a.resourceType.compareTo(b.resourceType)
Expand Down Expand Up @@ -116,7 +116,7 @@ class AclAuthorizer extends Authorizer with Logging {
private var extendedAclSupport: Boolean = _

@volatile
private var aclCache = new scala.collection.immutable.TreeMap[ResourcePattern, VersionedAcls]()(AclAuthorizer.ResourceOrdering)
private var aclCache = new scala.collection.immutable.TreeMap[ResourcePattern, VersionedAcls]()(new ResourceOrdering)
private val lock = new ReentrantReadWriteLock()

// The maximum number of times we should try to update the resource acls in zookeeper before failing;
Expand Down Expand Up @@ -252,18 +252,23 @@ class AclAuthorizer extends Authorizer with Logging {
}
}
val deletedResult = deletedBindings.groupBy(_._2)
.mapValues(_.map{ case (binding, _) => new AclBindingDeleteResult(binding, deleteExceptions.getOrElse(binding, null)) })
.mapValues(_.map { case (binding, _) => new AclBindingDeleteResult(binding, deleteExceptions.getOrElse(binding, null)) })
(0 until aclBindingFilters.size).map { i =>
new AclDeleteResult(deletedResult.getOrElse(i, Set.empty[AclBindingDeleteResult]).toSet.asJava)
}.map(CompletableFuture.completedFuture[AclDeleteResult]).asJava
}

override def acls(filter: AclBindingFilter): lang.Iterable[AclBinding] = {
inReadLock(lock) {
unorderedAcls.flatMap { case (resource, versionedAcls) =>
versionedAcls.acls.map(acl => new AclBinding(resource, acl.ace))
.filter(filter.matches)
}.asJava
val aclBindings = new util.ArrayList[AclBinding]()
unorderedAcls.foreach { case (resource, versionedAcls) =>
versionedAcls.acls.foreach { acl =>
val binding = new AclBinding(resource, acl.ace)
if (filter.matches(binding))
aclBindings.add(binding)
}
}
aclBindings
}
}

Expand Down
4 changes: 4 additions & 0 deletions gradle/spotbugs-exclude.xml
Original file line number Diff line number Diff line change
Expand Up @@ -427,4 +427,8 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
<Bug pattern="RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT"/>
</Match>

<Match>
<!-- Suppress warnings related to jmh generated code -->
<Package name="org.apache.kafka.jmh.acl.generated"/>
</Match>
</FindBugsFilter>
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.jmh.acl;

import kafka.security.authorizer.AclAuthorizer;
import kafka.security.authorizer.AclAuthorizer.VersionedAcls;
import kafka.security.authorizer.AclEntry;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import scala.collection.JavaConverters;
import scala.collection.immutable.TreeMap;

import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 5)
@Measurement(iterations = 15)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)

public class AclAuthorizerBenchmark {
@Param({"5000", "10000", "50000"})
private int resourceCount;
//no. of. rules per resource
@Param({"5", "10", "15"})
private int aclCount;

private AclAuthorizer aclAuthorizer = new AclAuthorizer();
private KafkaPrincipal principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "test-user");

@Setup(Level.Trial)
public void setup() throws Exception {
setFieldValue(aclAuthorizer, AclAuthorizer.class.getDeclaredField("aclCache").getName(),
prepareAclCache());
}

private void setFieldValue(Object obj, String fieldName, Object value) throws Exception {
Field field = obj.getClass().getDeclaredField(fieldName);
field.setAccessible(true);
field.set(obj, value);
}

private TreeMap<ResourcePattern, VersionedAcls> prepareAclCache() {
Map<ResourcePattern, Set<AclEntry>> aclEntries = new HashMap<>();
for (int resourceId = 0; resourceId < resourceCount; resourceId++) {

ResourcePattern resource = new ResourcePattern(
(resourceId % 10 == 0) ? ResourceType.GROUP : ResourceType.TOPIC,
"resource-" + resourceId,
(resourceId % 5 == 0) ? PatternType.PREFIXED : PatternType.LITERAL);

Set<AclEntry> entries = aclEntries.computeIfAbsent(resource, k -> new HashSet<>());

for (int aclId = 0; aclId < aclCount; aclId++) {
AccessControlEntry ace = new AccessControlEntry(principal.toString() + aclId,
"*", AclOperation.READ, AclPermissionType.ALLOW);
entries.add(new AclEntry(ace));
}
}

TreeMap<ResourcePattern, VersionedAcls> aclCache = new TreeMap<>(new AclAuthorizer.ResourceOrdering());
for (Map.Entry<ResourcePattern, Set<AclEntry>> entry : aclEntries.entrySet()) {
aclCache = aclCache.updated(entry.getKey(),
new VersionedAcls(JavaConverters.asScalaSetConverter(entry.getValue()).asScala().toSet(), 1));
}

return aclCache;
}

@TearDown(Level.Trial)
public void tearDown() {
aclAuthorizer.close();
}

@Benchmark
public void testAclsIterator() {
aclAuthorizer.acls(AclBindingFilter.ANY);
}
}

0 comments on commit 8dff0b1

Please sign in to comment.