diff --git a/ksql-common/src/main/java/io/confluent/ksql/function/UdfFactory.java b/ksql-common/src/main/java/io/confluent/ksql/function/UdfFactory.java index 96c820ff05a4..a19a9a716765 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/function/UdfFactory.java +++ b/ksql-common/src/main/java/io/confluent/ksql/function/UdfFactory.java @@ -36,7 +36,7 @@ public class UdfFactory { this.udfIndex = new UdfIndex<>(metadata.getName()); } - void addFunction(final KsqlFunction ksqlFunction) { + synchronized void addFunction(final KsqlFunction ksqlFunction) { checkCompatible(ksqlFunction); udfIndex.addFunction(ksqlFunction); } @@ -69,7 +69,7 @@ public String getDescription() { return metadata.getDescription(); } - public void eachFunction(final Consumer consumer) { + public synchronized void eachFunction(final Consumer consumer) { udfIndex.values().forEach(consumer); } @@ -95,7 +95,7 @@ public String toString() { + '}'; } - public KsqlFunction getFunction(final List paramTypes) { + public synchronized KsqlFunction getFunction(final List paramTypes) { return udfIndex.getFunction(paramTypes); } } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/UdafAggregateFunctionFactory.java b/ksql-engine/src/main/java/io/confluent/ksql/function/UdafAggregateFunctionFactory.java index 9af44e083cb6..b0c93f7ebf8f 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/function/UdafAggregateFunctionFactory.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/function/UdafAggregateFunctionFactory.java @@ -34,7 +34,9 @@ public class UdafAggregateFunctionFactory extends AggregateFunctionFactory { } @Override - public KsqlAggregateFunction getProperAggregateFunction(final List argTypeList) { + public synchronized KsqlAggregateFunction getProperAggregateFunction( + final List argTypeList + ) { final KsqlAggregateFunction ksqlAggregateFunction = udfIndex.getFunction(argTypeList); if (ksqlAggregateFunction == null) { throw new KsqlException("There is no aggregate function with name='" + getName() @@ -46,7 +48,7 @@ public class UdafAggregateFunctionFactory extends AggregateFunctionFactory { } @Override - public List> supportedArgs() { + public synchronized List> supportedArgs() { return udfIndex.values() .stream() .map(KsqlAggregateFunction::getArguments)