Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][fn] Make producer cache bounded and expiring in Functions/Connectors #22945

Merged
merged 4 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pulsar-functions/instance/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,11 @@
<artifactId>guava</artifactId>
</dependency>

<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>

<dependency>
<groupId>info.picocli</groupId>
<artifactId>picocli</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,15 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.ClientBuilder;
Expand Down Expand Up @@ -85,6 +84,7 @@
/**
* This class implements the Context interface exposed to the user.
*/
@Slf4j
@ToString(exclude = {"pulsarAdmin"})
class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable {
private final ProducerBuilderFactory producerBuilderFactory;
Expand All @@ -98,8 +98,6 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable
private final ClientBuilder clientBuilder;
private final PulsarClient client;
private final PulsarAdmin pulsarAdmin;
private Map<String, Producer<?>> publishProducers;
private ThreadLocal<Map<String, Producer<?>>> tlPublishProducers;

private final TopicSchema topicSchema;

Expand Down Expand Up @@ -139,12 +137,15 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable

private final java.util.function.Consumer<Throwable> fatalHandler;

private final ProducerCache producerCache;
private final boolean useThreadLocalProducers;

public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client,
SecretsProvider secretsProvider, FunctionCollectorRegistry collectorRegistry,
String[] metricsLabels,
Function.FunctionDetails.ComponentType componentType, ComponentStatsManager statsManager,
StateManager stateManager, PulsarAdmin pulsarAdmin, ClientBuilder clientBuilder,
java.util.function.Consumer<Throwable> fatalHandler) {
java.util.function.Consumer<Throwable> fatalHandler, ProducerCache producerCache) {
this.config = config;
this.logger = logger;
this.clientBuilder = clientBuilder;
Expand All @@ -154,14 +155,17 @@ public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client,
this.statsManager = statsManager;
this.fatalHandler = fatalHandler;

boolean useThreadLocalProducers = false;
this.producerCache = producerCache;

Function.ProducerSpec producerSpec = config.getFunctionDetails().getSink().getProducerSpec();
ProducerConfig producerConfig = null;
if (producerSpec != null) {
producerConfig = FunctionConfigUtils.convertProducerSpecToProducerConfig(producerSpec);
useThreadLocalProducers = producerSpec.getUseThreadLocalProducers();
} else {
useThreadLocalProducers = false;
}

producerBuilderFactory = new ProducerBuilderFactory(client, producerConfig,
Thread.currentThread().getContextClassLoader(),
// This is for backwards compatibility. The PR https://github.com/apache/pulsar/pull/19470 removed
Expand All @@ -175,12 +179,6 @@ public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client,
this.config.getFunctionDetails().getName()),
this.config.getInstanceId()));

if (useThreadLocalProducers) {
tlPublishProducers = new ThreadLocal<>();
} else {
publishProducers = new ConcurrentHashMap<>();
}

if (config.getFunctionDetails().getUserConfig().isEmpty()) {
userConfigs = new HashMap<>();
} else {
Expand Down Expand Up @@ -543,39 +541,15 @@ public void fatal(Throwable t) {
}

private <T> Producer<T> getProducer(String topicName, Schema<T> schema) throws PulsarClientException {
Producer<T> producer;
if (tlPublishProducers != null) {
Map<String, Producer<?>> producerMap = tlPublishProducers.get();
if (producerMap == null) {
producerMap = new HashMap<>();
tlPublishProducers.set(producerMap);
}
producer = (Producer<T>) producerMap.get(topicName);
} else {
producer = (Producer<T>) publishProducers.get(topicName);
}

if (producer == null) {
Producer<T> newProducer = producerBuilderFactory
.createProducerBuilder(topicName, schema, null)
.properties(producerProperties)
.create();

if (tlPublishProducers != null) {
tlPublishProducers.get().put(topicName, newProducer);
} else {
Producer<T> existingProducer = (Producer<T>) publishProducers.putIfAbsent(topicName, newProducer);

if (existingProducer != null) {
// The value in the map was not updated after the concurrent put
newProducer.close();
producer = existingProducer;
} else {
producer = newProducer;
}
}
}
return producer;
Long additionalCacheKey = useThreadLocalProducers ? Thread.currentThread().getId() : null;
return producerCache.getOrCreateProducer(ProducerCache.CacheArea.CONTEXT_CACHE,
topicName, additionalCacheKey, () -> {
log.info("Initializing producer on topic {} with schema {}", topicName, schema);
return producerBuilderFactory
.createProducerBuilder(topicName, schema, null)
.properties(producerProperties)
.create();
shibd marked this conversation as resolved.
Show resolved Hide resolved
});
}

public Map<String, Double> getAndResetMetrics() {
Expand Down Expand Up @@ -714,29 +688,9 @@ public void setUnderlyingBuilder(TypedMessageBuilder<T> underlyingBuilder) {

@Override
public void close() {
List<CompletableFuture> futures = new LinkedList<>();

if (publishProducers != null) {
for (Producer<?> producer : publishProducers.values()) {
futures.add(producer.closeAsync());
}
}

if (tlPublishProducers != null) {
for (Producer<?> producer : tlPublishProducers.get().values()) {
futures.add(producer.closeAsync());
}
}

if (pulsarAdmin != null) {
pulsarAdmin.close();
}

try {
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
} catch (InterruptedException | ExecutionException e) {
logger.warn("Failed to close producers", e);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
private final AtomicReference<Schema<?>> sinkSchema = new AtomicReference<>();
private SinkSchemaInfoProvider sinkSchemaInfoProvider = null;

private final ProducerCache producerCache = new ProducerCache();

public JavaInstanceRunnable(InstanceConfig instanceConfig,
ClientBuilder clientBuilder,
PulsarClient pulsarClient,
Expand Down Expand Up @@ -292,7 +294,7 @@ ContextImpl setupContext() throws PulsarClientException {
Thread.currentThread().setContextClassLoader(functionClassLoader);
return new ContextImpl(instanceConfig, instanceLog, client, secretsProvider,
collectorRegistry, metricsLabels, this.componentType, this.stats, stateManager,
pulsarAdmin, clientBuilder, fatalHandler);
pulsarAdmin, clientBuilder, fatalHandler, producerCache);
lhotari marked this conversation as resolved.
Show resolved Hide resolved
} finally {
Thread.currentThread().setContextClassLoader(clsLoader);
}
Expand Down Expand Up @@ -607,6 +609,8 @@ public synchronized void close() {

instanceCache = null;

producerCache.close();

if (logAppender != null) {
removeLogTopicAppender(LoggerContext.getContext());
removeLogTopicAppender(LoggerContext.getContext(false));
Expand Down Expand Up @@ -1050,7 +1054,7 @@ private void setupOutput(ContextImpl contextImpl) throws Exception {
}

object = new PulsarSink(this.client, pulsarSinkConfig, this.properties, this.stats,
this.functionClassLoader);
this.functionClassLoader, this.producerCache);
}
} else {
object = Reflections.createInstance(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.instance;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Scheduler;
import com.google.common.annotations.VisibleForTesting;
import java.io.Closeable;
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.common.util.FutureUtil;

@Slf4j
public class ProducerCache implements Closeable {
// Idle timeout (seconds) for cached producers; tunable with the PRODUCER_CACHE_TIMEOUT_SECONDS
// environment variable, defaulting to 300 when the variable is not set.
// The constructor only configures expire-after-access when this value is greater than zero,
// so a value <= 0 leaves the cache without time-based expiry.
// Parsed once during class initialization with Integer.parseInt.
private static final int PRODUCER_CACHE_TIMEOUT_SECONDS =
Integer.parseInt(System.getenv().getOrDefault("PRODUCER_CACHE_TIMEOUT_SECONDS", "300"));
lhotari marked this conversation as resolved.
Show resolved Hide resolved
// Upper bound for the cache; tunable with the PRODUCER_CACHE_MAX_SIZE environment variable,
// defaulting to 10000 when the variable is not set.
// The constructor passes this value to maximumWeight(...), and each producer weighs
// max(getNumOfPartitions(), 1), so the bound is a total weight rather than a plain entry count.
private static final int PRODUCER_CACHE_MAX_SIZE =
Integer.parseInt(System.getenv().getOrDefault("PRODUCER_CACHE_MAX_SIZE", "10000"));
// Timeout (seconds) applied separately to the flush and to the close of a producer
// that has been removed from the cache.
private static final int FLUSH_OR_CLOSE_TIMEOUT_SECONDS = 60;

/**
 * Namespace component of the cache key. It prevents producers created in different code
 * locations from being mixed up, even when they target the same topic.
 */
public enum CacheArea {
// producers created by calling Context, SinkContext, SourceContext methods
CONTEXT_CACHE,
// producers created for records that select their own destination topic via
// SinkRecord.getDestinationTopic, so multiple topics are possible.
// NOTE(review): the original comment said "Pulsar Sources", but the constant name and the
// SinkRecord reference suggest the Pulsar sink - confirm against the caller.
SINK_RECORD_CACHE,
}

/**
 * Composite cache key: the cache area, the topic name and an optional additional key.
 * The additional key may be null (the two-argument containsKey overload passes null).
 * Being a record, equality and hashing are derived from all three components.
 */
record ProducerCacheKey(CacheArea cacheArea, String topic, Object additionalKey) {
}

// Caffeine cache holding the producers, keyed by area/topic/additional key.
private final Cache<ProducerCacheKey, Producer<?>> cache;
// Set to true by the first close() call; checked before creating producers and by the removal listener.
private final AtomicBoolean closed = new AtomicBoolean(false);
// Flush+close futures recorded by the removal listener once the cache is closed, so close() can wait on them.
private final CopyOnWriteArrayList<CompletableFuture<Void>> closeFutures = new CopyOnWriteArrayList<>();

/**
 * Creates a bounded, optionally expiring producer cache.
 *
 * <p>The cache is limited to a total weight of {@code PRODUCER_CACHE_MAX_SIZE}, where each producer
 * weighs {@code max(getNumOfPartitions(), 1)}. When {@code PRODUCER_CACHE_TIMEOUT_SECONDS} is greater
 * than zero, entries also expire after that many seconds without access.
 *
 * <p>Whenever an entry is removed, the removal listener flushes and then closes the producer
 * asynchronously. Each of the two steps is limited to {@code FLUSH_OR_CLOSE_TIMEOUT_SECONDS} and
 * failures are logged instead of propagated, so a failed flush still leads to a close attempt.
 */
public ProducerCache() {
    Caffeine<ProducerCacheKey, Producer> builder = Caffeine.newBuilder()
            .scheduler(Scheduler.systemScheduler())
            // Caffeine dispatches removal notifications through its executor, which defaults to
            // ForkJoinPool.commonPool(). With that default, close() could call invalidateAll() and
            // start waiting on closeFutures before any listener had registered its future, and
            // return while producers were still open. A same-thread executor makes the listener
            // run inline with the removal; the listener itself only starts asynchronous
            // flush/close operations, so it does not block the calling thread.
            .executor(Runnable::run)
            .<ProducerCacheKey, Producer>removalListener((key, producer, cause) -> {
                log.info("Closing producer for topic {}, cause {}", key.topic(), cause);
                CompletableFuture closeFuture =
                        producer.flushAsync()
                                .orTimeout(FLUSH_OR_CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS)
                                .exceptionally(ex -> {
                                    log.error("Error flushing producer for topic {}", key.topic(), ex);
                                    return null;
                                }).thenCompose(__ ->
                                        producer.closeAsync().orTimeout(FLUSH_OR_CLOSE_TIMEOUT_SECONDS,
                                                        TimeUnit.SECONDS)
                                                .exceptionally(ex -> {
                                                    log.error("Error closing producer for topic {}", key.topic(),
                                                            ex);
                                                    return null;
                                                }));
                // Only removals that happen after close() has started are tracked; close() waits for them.
                if (closed.get()) {
                    closeFutures.add(closeFuture);
                }
            })
            // Partitioned producers count once per partition; everything else counts as one.
            .weigher((key, producer) -> Math.max(producer.getNumOfPartitions(), 1))
            .maximumWeight(PRODUCER_CACHE_MAX_SIZE);
    // A non-positive timeout disables time-based expiry.
    if (PRODUCER_CACHE_TIMEOUT_SECONDS > 0) {
        builder.expireAfterAccess(Duration.ofSeconds(PRODUCER_CACHE_TIMEOUT_SECONDS));
    }
    cache = builder.build();
}

/**
 * Returns the cached producer for the given area, topic and additional key, creating it with
 * {@code supplier} when no entry is cached yet.
 *
 * @param cacheArea          the area the producer belongs to
 * @param topicName          the topic the producer publishes to
 * @param additionalCacheKey an extra key component to separate producers for the same topic; may be null
 * @param supplier           creates the producer on a cache miss
 * @param <T>                the message type of the producer
 * @return the cached or newly created producer
 * @throws IllegalStateException if this cache has already been closed
 * @throws RuntimeException      if the supplier fails; checked exceptions are wrapped, unchecked ones
 *                               are rethrown as they are
 */
public <T> Producer<T> getOrCreateProducer(CacheArea cacheArea, String topicName, Object additionalCacheKey,
                                           Callable<Producer<T>> supplier) {
    boolean alreadyClosed = closed.get();
    if (alreadyClosed) {
        throw new IllegalStateException("ProducerCache is already closed");
    }
    ProducerCacheKey cacheKey = new ProducerCacheKey(cacheArea, topicName, additionalCacheKey);
    Producer<?> cachedProducer = cache.get(cacheKey, unusedKey -> invokeProducerSupplier(topicName, supplier));
    return (Producer<T>) cachedProducer;
}

// Runs the supplier, passing unchecked exceptions through and wrapping checked ones.
private static <T> Producer<T> invokeProducerSupplier(String topicName, Callable<Producer<T>> supplier) {
    try {
        return supplier.call();
    } catch (RuntimeException unchecked) {
        throw unchecked;
    } catch (Exception checked) {
        throw new RuntimeException("Unable to create producer for topic '" + topicName + "'", checked);
    }
}

/**
 * Closes the cache: invalidates every entry and waits for the flush/close futures that the
 * removal listener has recorded in {@code closeFutures}.
 *
 * <p>Only the first call has any effect; later calls return immediately. Failures while waiting
 * are logged and not propagated. If the calling thread is interrupted while waiting, its
 * interrupt status is restored so that callers further up the stack can still observe it.
 */
@Override
public void close() {
    if (!closed.compareAndSet(false, true)) {
        return;
    }
    cache.invalidateAll();
    try {
        FutureUtil.waitForAll(closeFutures).get();
    } catch (InterruptedException e) {
        // Catching InterruptedException clears the flag; set it again instead of swallowing it.
        Thread.currentThread().interrupt();
        log.warn("Failed to close producers", e);
    } catch (ExecutionException e) {
        log.warn("Failed to close producers", e);
    }
}

/**
 * Test helper: reports whether a producer is cached for the given area and topic
 * with a null additional key.
 *
 * @param cacheArea the area to look in
 * @param topic     the topic name
 * @return true if a matching entry is currently present
 */
@VisibleForTesting
public boolean containsKey(CacheArea cacheArea, String topic) {
return containsKey(cacheArea, topic, null);
}

/**
 * Test helper: reports whether a producer is cached for the given area, topic and additional key.
 *
 * @param cacheArea          the area to look in
 * @param topic              the topic name
 * @param additionalCacheKey the extra key component used when the producer was cached; may be null
 * @return true if a matching entry is currently present
 */
@VisibleForTesting
public boolean containsKey(CacheArea cacheArea, String topic, Object additionalCacheKey) {
    ProducerCacheKey lookupKey = new ProducerCacheKey(cacheArea, topic, additionalCacheKey);
    Producer<?> existing = cache.getIfPresent(lookupKey);
    return existing != null;
}
}
Loading
Loading