Skip to content

Commit

Permalink
fix: make sure use of non threadsafe UdfIndex is synchronized (#3486)
Browse files Browse the repository at this point in the history
  • Loading branch information
purplefox authored Oct 7, 2019
1 parent a1096bf commit 618aae8
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class UdfFactory {
this.udfIndex = new UdfIndex<>(metadata.getName());
}

void addFunction(final KsqlFunction ksqlFunction) {
synchronized void addFunction(final KsqlFunction ksqlFunction) {
checkCompatible(ksqlFunction);
udfIndex.addFunction(ksqlFunction);
}
Expand Down Expand Up @@ -69,7 +69,7 @@ public String getDescription() {
return metadata.getDescription();
}

public void eachFunction(final Consumer<KsqlFunction> consumer) {
public synchronized void eachFunction(final Consumer<KsqlFunction> consumer) {
udfIndex.values().forEach(consumer);
}

Expand All @@ -95,7 +95,7 @@ public String toString() {
+ '}';
}

public KsqlFunction getFunction(final List<Schema> paramTypes) {
public synchronized KsqlFunction getFunction(final List<Schema> paramTypes) {
return udfIndex.getFunction(paramTypes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ public class UdafAggregateFunctionFactory extends AggregateFunctionFactory {
}

@Override
public KsqlAggregateFunction<?, ?, ?> getProperAggregateFunction(final List<Schema> argTypeList) {
public synchronized KsqlAggregateFunction<?, ?, ?> getProperAggregateFunction(
final List<Schema> argTypeList
) {
final KsqlAggregateFunction ksqlAggregateFunction = udfIndex.getFunction(argTypeList);
if (ksqlAggregateFunction == null) {
throw new KsqlException("There is no aggregate function with name='" + getName()
Expand All @@ -46,7 +48,7 @@ public class UdafAggregateFunctionFactory extends AggregateFunctionFactory {
}

@Override
public List<List<Schema>> supportedArgs() {
public synchronized List<List<Schema>> supportedArgs() {
return udfIndex.values()
.stream()
.map(KsqlAggregateFunction::getArguments)
Expand Down

0 comments on commit 618aae8

Please sign in to comment.