From 7e7a95b67b8c07751f3aa24646fdedb803e2bed5 Mon Sep 17 00:00:00 2001 From: Christophe Bornet Date: Sat, 18 Jun 2022 16:50:16 +0200 Subject: [PATCH] [improve][function] Support Record as Function output type (#16041) --- .../apache/pulsar/functions/api/Context.java | 10 ++ .../functions/api/utils/FunctionRecord.java | 117 ++++++++++++++++++ .../instance/AbstractSinkRecord.java | 94 ++++++++++++++ .../functions/instance/ContextImpl.java | 6 + .../instance/JavaInstanceRunnable.java | 8 +- .../instance/OutputRecordSinkRecord.java | 94 ++++++++++++++ .../pulsar/functions/instance/SinkRecord.java | 75 +++-------- .../pulsar/functions/sink/PulsarSink.java | 27 ++-- .../functions/instance/ContextImplTest.java | 70 ++++++++++- .../api/examples/RecordFunction.java | 43 +++++++ .../functions/utils/FunctionCommon.java | 13 ++ .../functions/utils/FunctionCommonTest.java | 69 +++++++++++ .../functions/PulsarFunctionsTest.java | 61 +++++++++ .../java/PulsarFunctionsJavaTest.java | 9 +- 14 files changed, 620 insertions(+), 76 deletions(-) create mode 100644 pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/utils/FunctionRecord.java create mode 100644 pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/AbstractSinkRecord.java create mode 100644 pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/OutputRecordSinkRecord.java create mode 100644 pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/RecordFunction.java diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java index 1a2175eefd7b1..9532ffbd5f59a 100644 --- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java +++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java @@ -30,6 +30,7 @@ import org.apache.pulsar.client.api.TypedMessageBuilder; import org.apache.pulsar.common.classification.InterfaceAudience; import org.apache.pulsar.common.classification.InterfaceStability; +import org.apache.pulsar.functions.api.utils.FunctionRecord; /** * Context provides contextual information to the executing function. @@ -162,4 +163,13 @@ public interface Context extends BaseContext { * @throws PulsarClientException */ ConsumerBuilder newConsumerBuilder(Schema schema) throws PulsarClientException; + + /** + * Creates a FunctionRecordBuilder initialized with values from this Context. + * It can be used in Functions to prepare a Record to return with default values taken from the Context and the + * input Record. + * + * @return the record builder instance + */ + FunctionRecord.FunctionRecordBuilder newOutputRecordBuilder(); } diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/utils/FunctionRecord.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/utils/FunctionRecord.java new file mode 100644 index 0000000000000..be204a7bc213c --- /dev/null +++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/utils/FunctionRecord.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.api.utils; + +import java.util.Map; +import java.util.Optional; +import lombok.Builder; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.functions.api.Context; +import org.apache.pulsar.functions.api.Record; + +@Builder(builderMethodName = "") +public class FunctionRecord implements Record { + + private final T value; + private final String topicName; + private final String destinationTopic; + private final Map properties; + private final String key; + private final Schema schema; + private final Long eventTime; + private final String partitionId; + private final Integer partitionIndex; + private final Long recordSequence; + + /** + * Creates a builder for a Record from a Function Context. + * The builder is initialized with the output topic from the Context and with the topicName, key, eventTime, + * properties, partitionId, partitionIndex and recordSequence from the Context input Record. + * It doesn't initialize a Message at the moment. + * + * @param context a Function Context + * @param type of Record to build + * @return a Record builder initialised with values from the Function Context + */ + public static FunctionRecord.FunctionRecordBuilder from(Context context) { + Record currentRecord = context.getCurrentRecord(); + FunctionRecordBuilder builder = new FunctionRecordBuilder() + .destinationTopic(context.getOutputTopic()) + .properties(currentRecord.getProperties()); + currentRecord.getTopicName().ifPresent(builder::topicName); + currentRecord.getKey().ifPresent(builder::key); + currentRecord.getEventTime().ifPresent(builder::eventTime); + currentRecord.getPartitionId().ifPresent(builder::partitionId); + currentRecord.getPartitionIndex().ifPresent(builder::partitionIndex); + currentRecord.getRecordSequence().ifPresent(builder::recordSequence); + + return builder; + } + + @Override + public T getValue() { + return value; + } + + @Override + public Optional getTopicName() { + return Optional.ofNullable(topicName); + } + + @Override + public Optional getDestinationTopic() { + return Optional.ofNullable(destinationTopic); + } + + @Override + public Map getProperties() { + return properties; + } + + @Override + public Optional getKey() { + return Optional.ofNullable(key); + } + + @Override + public Schema getSchema() { + return schema; + } + + @Override + public Optional getEventTime() { + return Optional.ofNullable(eventTime); + } + + @Override + public Optional getPartitionId() { + return Optional.ofNullable(partitionId); + } + + @Override + public Optional getPartitionIndex() { + return Optional.ofNullable(partitionIndex); + } + + @Override + public Optional getRecordSequence() { + return Optional.ofNullable(recordSequence); + } + +} diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/AbstractSinkRecord.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/AbstractSinkRecord.java new file mode 100644 index 0000000000000..df0db9bce684b --- /dev/null +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/AbstractSinkRecord.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.instance; + +import java.util.Optional; +import lombok.EqualsAndHashCode; +import lombok.ToString; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.schema.AutoConsumeSchema; +import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl; +import org.apache.pulsar.functions.api.KVRecord; +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.functions.source.PulsarRecord; + +@EqualsAndHashCode +@ToString +public abstract class AbstractSinkRecord implements Record { + + private final Record sourceRecord; + + protected AbstractSinkRecord(Record sourceRecord) { + this.sourceRecord = sourceRecord; + } + + public abstract boolean shouldAlwaysSetMessageProperties(); + + public Record getSourceRecord() { + return sourceRecord; + } + + @Override + public Optional getTopicName() { + return sourceRecord.getTopicName(); + } + + @Override + public void ack() { + sourceRecord.ack(); + } + + @Override + public void fail() { + sourceRecord.fail(); + } + + protected static Schema getRecordSchema(Record record) { + if (record == null) { + return null; + } + + if (record.getSchema() != null) { + // unwrap actual schema + Schema schema = record.getSchema(); + // AutoConsumeSchema is a special schema, that comes into play + // when the Sink is going to handle any Schema + // usually you see Sink or Sink in this case + if (schema instanceof AutoConsumeSchema) { + // extract the Schema from the message, this is the most accurate schema we have + // see PIP-85 + if (record.getMessage().isPresent() + && record.getMessage().get().getReaderSchema().isPresent()) { + schema = (Schema) record.getMessage().get().getReaderSchema().get(); + } else { + schema = (Schema) ((AutoConsumeSchema) schema).getInternalSchema(); + } + } + return schema; + } + + if (record instanceof KVRecord) { + KVRecord kvRecord = (KVRecord) record; + return KeyValueSchemaImpl.of(kvRecord.getKeySchema(), kvRecord.getValueSchema(), + kvRecord.getKeyValueEncodingType()); + } + + return null; + } +} diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java index 35fffc64028e7..d0ac52bd27c19 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java @@ -60,6 +60,7 @@ import org.apache.pulsar.functions.api.Context; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.functions.api.StateStore; +import org.apache.pulsar.functions.api.utils.FunctionRecord; import org.apache.pulsar.functions.instance.state.DefaultStateStore; import org.apache.pulsar.functions.instance.state.StateManager; import org.apache.pulsar.functions.instance.stats.ComponentStatsManager; @@ -470,6 +471,11 @@ public ConsumerBuilder newConsumerBuilder(Schema schema) throws Pulsar return this.client.newConsumer(schema); } + @Override + public FunctionRecord.FunctionRecordBuilder newOutputRecordBuilder() { + return FunctionRecord.from(this); + } + @Override public SubscriptionType getSubscriptionType() { return subscriptionType; diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index 462fdd144db29..c7eba086f10fd 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -376,8 +376,14 @@ private void sendOutputMessage(Record srcRecord, Object output) throws Exception if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SINK) { Thread.currentThread().setContextClassLoader(functionClassLoader); } + AbstractSinkRecord sinkRecord; + if (output instanceof Record) { + sinkRecord = new OutputRecordSinkRecord<>(srcRecord, (Record) output); + } else { + sinkRecord = new SinkRecord<>(srcRecord, output); + } try { - this.sink.write(new SinkRecord<>(srcRecord, output)); + this.sink.write(sinkRecord); } catch (Exception e) { log.info("Encountered exception in sink write: ", e); stats.incrSinkExceptions(e); diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/OutputRecordSinkRecord.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/OutputRecordSinkRecord.java new file mode 100644 index 0000000000000..6220517414dd6 --- /dev/null +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/OutputRecordSinkRecord.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.instance; + +import java.util.Map; +import java.util.Optional; +import lombok.EqualsAndHashCode; +import lombok.ToString; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.functions.api.Record; + +@EqualsAndHashCode(callSuper = true) +@ToString +class OutputRecordSinkRecord extends AbstractSinkRecord { + + private final Record sinkRecord; + + OutputRecordSinkRecord(Record sourceRecord, Record sinkRecord) { + super(sourceRecord); + this.sinkRecord = sinkRecord; + } + + @Override + public Optional getKey() { + return sinkRecord.getKey(); + } + + @Override + public T getValue() { + return sinkRecord.getValue(); + } + + @Override + public Optional getPartitionId() { + return sinkRecord.getPartitionId(); + } + + @Override + public Optional getPartitionIndex() { + return sinkRecord.getPartitionIndex(); + } + + @Override + public Optional getRecordSequence() { + return sinkRecord.getRecordSequence(); + } + + @Override + public Map getProperties() { + return sinkRecord.getProperties(); + } + + @Override + public Optional getDestinationTopic() { + return sinkRecord.getDestinationTopic(); + } + + @Override + public Schema getSchema() { + return getRecordSchema(sinkRecord); + } + + @Override + public Optional getEventTime() { + return sinkRecord.getEventTime(); + } + + @Override + public Optional> getMessage() { + return sinkRecord.getMessage(); + } + + @Override + public boolean shouldAlwaysSetMessageProperties() { + return true; + } +} diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java index d0ec4d686afbc..8f64ed2ce0964 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java @@ -20,31 +20,22 @@ import java.util.Map; import java.util.Optional; -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.extern.slf4j.Slf4j; +import lombok.EqualsAndHashCode; +import lombok.ToString; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.impl.schema.AutoConsumeSchema; -import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl; -import org.apache.pulsar.functions.api.KVRecord; import org.apache.pulsar.functions.api.Record; -@Slf4j -@Data -@AllArgsConstructor -public class SinkRecord implements Record { - +@EqualsAndHashCode(callSuper = true) +@ToString +public class SinkRecord extends AbstractSinkRecord { private final Record sourceRecord; private final T value; - public Record getSourceRecord() { - return sourceRecord; - } - - @Override - public Optional getTopicName() { - return sourceRecord.getTopicName(); + public SinkRecord(Record sourceRecord, T value) { + super(sourceRecord); + this.sourceRecord = sourceRecord; + this.value = value; } @Override @@ -72,21 +63,11 @@ public Optional getRecordSequence() { return sourceRecord.getRecordSequence(); } - @Override + @Override public Map getProperties() { return sourceRecord.getProperties(); } - @Override - public void ack() { - sourceRecord.ack(); - } - - @Override - public void fail() { - sourceRecord.fail(); - } - @Override public Optional getDestinationTopic() { return sourceRecord.getDestinationTopic(); @@ -94,36 +75,7 @@ public Optional getDestinationTopic() { @Override public Schema getSchema() { - if (sourceRecord == null) { - return null; - } - - if (sourceRecord.getSchema() != null) { - // unwrap actual schema - Schema schema = sourceRecord.getSchema(); - // AutoConsumeSchema is a special schema, that comes into play - // when the Sink is going to handle any Schema - // usually you see Sink or Sink in this case - if (schema instanceof AutoConsumeSchema) { - // extract the Schema from the message, this is the most accurate schema we have - // see PIP-85 - if (sourceRecord.getMessage().isPresent() - && sourceRecord.getMessage().get().getReaderSchema().isPresent()) { - schema = (Schema) sourceRecord.getMessage().get().getReaderSchema().get(); - } else { - schema = (Schema) ((AutoConsumeSchema) schema).getInternalSchema(); - } - } - return schema; - } - - if (sourceRecord instanceof KVRecord) { - KVRecord kvRecord = (KVRecord) sourceRecord; - return KeyValueSchemaImpl.of(kvRecord.getKeySchema(), kvRecord.getValueSchema(), - kvRecord.getKeyValueEncodingType()); - } - - return null; + return getRecordSchema(sourceRecord); } @Override @@ -135,4 +87,9 @@ public Optional getEventTime() { public Optional> getMessage() { return sourceRecord.getMessage(); } + + @Override + public boolean shouldAlwaysSetMessageProperties() { + return false; + } } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java index 5bc43a23dedcc..0337fc69cefda 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java @@ -59,8 +59,8 @@ import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.util.Reflections; import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.functions.instance.AbstractSinkRecord; import org.apache.pulsar.functions.instance.FunctionResultRouter; -import org.apache.pulsar.functions.instance.SinkRecord; import org.apache.pulsar.functions.instance.stats.ComponentStatsManager; import org.apache.pulsar.functions.source.PulsarRecord; import org.apache.pulsar.functions.source.TopicSchema; @@ -85,9 +85,9 @@ public class PulsarSink implements Sink { private interface PulsarSinkProcessor { - TypedMessageBuilder newMessage(SinkRecord record); + TypedMessageBuilder newMessage(AbstractSinkRecord record); - void sendOutputMessage(TypedMessageBuilder msg, SinkRecord record); + void sendOutputMessage(TypedMessageBuilder msg, AbstractSinkRecord record); void close() throws Exception; } @@ -183,17 +183,17 @@ public void close() throws Exception { } } - public Function getPublishErrorHandler(SinkRecord record, boolean failSource) { + public Function getPublishErrorHandler(AbstractSinkRecord record, boolean failSource) { return throwable -> { - Record srcRecord = record.getSourceRecord(); + Record srcRecord = record.getSourceRecord(); if (failSource) { srcRecord.fail(); } String topic = record.getDestinationTopic().orElse(pulsarSinkConfig.getTopic()); - String errorMsg = null; + String errorMsg; if (srcRecord instanceof PulsarRecord) { errorMsg = String.format("Failed to publish to topic [%s] with error [%s] with src message id [%s]", topic, throwable.getMessage(), ((PulsarRecord) srcRecord).getMessageId()); } else { @@ -231,7 +231,7 @@ public PulsarSinkAtMostOnceProcessor(Schema schema, Crypto crypto) { } @Override - public TypedMessageBuilder newMessage(SinkRecord record) { + public TypedMessageBuilder newMessage(AbstractSinkRecord record) { Schema schemaToWrite = record.getSchema(); if (record.getSourceRecord() instanceof PulsarRecord) { // we are receiving data directly from another Pulsar topic @@ -253,7 +253,7 @@ public TypedMessageBuilder newMessage(SinkRecord record) { } @Override - public void sendOutputMessage(TypedMessageBuilder msg, SinkRecord record) { + public void sendOutputMessage(TypedMessageBuilder msg, AbstractSinkRecord record) { msg.sendAsync().thenAccept(messageId -> { //no op }).exceptionally(getPublishErrorHandler(record, false)); @@ -267,7 +267,7 @@ public PulsarSinkAtLeastOnceProcessor(Schema schema, Crypto crypto) { } @Override - public void sendOutputMessage(TypedMessageBuilder msg, SinkRecord record) { + public void sendOutputMessage(TypedMessageBuilder msg, AbstractSinkRecord record) { msg.sendAsync() .thenAccept(messageId -> record.ack()) .exceptionally(getPublishErrorHandler(record, true)); @@ -282,7 +282,7 @@ public PulsarSinkEffectivelyOnceProcessor(Schema schema, Crypto crypto) { } @Override - public TypedMessageBuilder newMessage(SinkRecord record) { + public TypedMessageBuilder newMessage(AbstractSinkRecord record) { if (!record.getPartitionId().isPresent()) { throw new RuntimeException("PartitionId needs to be specified for every record while in Effectively-once mode"); } @@ -306,7 +306,7 @@ public TypedMessageBuilder newMessage(SinkRecord record) { } @Override - public void sendOutputMessage(TypedMessageBuilder msg, SinkRecord record) { + public void sendOutputMessage(TypedMessageBuilder msg, AbstractSinkRecord record) { if (!record.getRecordSequence().isPresent()) { throw new RuntimeException("RecordSequence needs to be specified for every record while in Effectively-once mode"); @@ -361,7 +361,7 @@ public void open(Map config, SinkContext sinkContext) throws Exc @Override public void write(Record record) { - SinkRecord sinkRecord = (SinkRecord) record; + AbstractSinkRecord sinkRecord = (AbstractSinkRecord) record; TypedMessageBuilder msg = pulsarSinkProcessor.newMessage(sinkRecord); if (record.getKey().isPresent() && !(record.getSchema() instanceof KeyValueSchema && @@ -371,7 +371,8 @@ public void write(Record record) { msg.value(record.getValue()); - if (!record.getProperties().isEmpty() && pulsarSinkConfig.isForwardSourceMessageProperty()) { + if (!record.getProperties().isEmpty() + && (sinkRecord.shouldAlwaysSetMessageProperties() || pulsarSinkConfig.isForwardSourceMessageProperty())) { msg.properties(record.getProperties()); } diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java index 49bc4161c308b..b2520aa4df4ed 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java @@ -50,7 +50,9 @@ import org.testng.annotations.Test; import java.nio.ByteBuffer; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -66,6 +68,8 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; /** * Unit test {@link ContextImpl}. @@ -340,4 +344,68 @@ FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(), } } - } + + @Test + public void testNewOutputRecordBuilder() { + Map properties = new HashMap<>(); + properties.put("prop-key", "prop-value"); + long now = System.currentTimeMillis(); + context.setCurrentMessageContext(new Record() { + @Override + public Optional getTopicName() { + return Optional.of("input-topic"); + } + + @Override + public Optional getKey() { + return Optional.of("input-key"); + } + + @Override + public Schema getSchema() { + return Schema.STRING; + } + + @Override + public String getValue() { + return "input-value"; + } + + @Override + public Optional getEventTime() { + return Optional.of(now); + } + + @Override + public Optional getPartitionId() { + return Optional.of("input-partition-id"); + } + + @Override + public Optional getPartitionIndex() { + return Optional.of(42); + } + + @Override + public Optional getRecordSequence() { + return Optional.of(43L); + } + + @Override + public Map getProperties() { + return properties; + } + }); + Record record = context.newOutputRecordBuilder().build(); + assertEquals(record.getTopicName().get(), "input-topic"); + assertEquals(record.getKey().get(), "input-key"); + assertEquals(record.getEventTime(), Optional.of(now)); + assertEquals(record.getPartitionId().get(), "input-partition-id"); + assertEquals(record.getPartitionIndex(), Optional.of(42)); + assertEquals(record.getRecordSequence(), Optional.of(43L)); + assertTrue(record.getProperties().containsKey("prop-key")); + assertEquals(record.getProperties().get("prop-key"), "prop-value"); + assertNull(record.getValue()); + assertNull(record.getSchema()); + } +} diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/RecordFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/RecordFunction.java new file mode 100644 index 0000000000000..028bccae5fcdd --- /dev/null +++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/RecordFunction.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.api.examples; + +import java.util.HashMap; +import java.util.Map; +import org.apache.pulsar.functions.api.Context; +import org.apache.pulsar.functions.api.Function; +import org.apache.pulsar.functions.api.Record; + +public class RecordFunction implements Function> { + + @Override + public Record process(String input, Context context) throws Exception { + String publishTopic = (String) context.getUserConfigValueOrDefault("publish-topic", "publishtopic"); + String output = String.format("%s!", input); + + Map properties = new HashMap<>(context.getCurrentRecord().getProperties()); + context.getCurrentRecord().getTopicName().ifPresent(topic -> properties.put("input_topic", topic)); + + return context.newOutputRecordBuilder() + .destinationTopic(publishTopic) + .value(output) + .properties(properties) + .build(); + } +} diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java index 333877490102a..cdb77fcfef38c 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java @@ -55,6 +55,7 @@ import org.apache.pulsar.common.nar.NarClassLoaderBuilder; import org.apache.pulsar.common.util.ClassLoaderUtils; import org.apache.pulsar.functions.api.Function; +import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.functions.api.WindowFunction; import org.apache.pulsar.functions.proto.Function.FunctionDetails.Runtime; import org.apache.pulsar.functions.utils.io.ConnectorUtils; @@ -121,8 +122,20 @@ public static Class[] getFunctionTypes(Class userClass, boolean isWindowConfi } else { if (Function.class.isAssignableFrom(userClass)) { typeArgs = TypeResolver.resolveRawArguments(Function.class, userClass); + if (typeArgs[1].equals(Record.class)) { + Type type = TypeResolver.resolveGenericType(Function.class, userClass); + Type recordType = ((ParameterizedType) type).getActualTypeArguments()[1]; + Type actualInputType = ((ParameterizedType) recordType).getActualTypeArguments()[0]; + typeArgs[1] = (Class) actualInputType; + } } else { typeArgs = TypeResolver.resolveRawArguments(java.util.function.Function.class, userClass); + if (typeArgs[1].equals(Record.class)) { + Type type = TypeResolver.resolveGenericType(java.util.function.Function.class, userClass); + Type recordType = ((ParameterizedType) type).getActualTypeArguments()[1]; + Type actualInputType = ((ParameterizedType) recordType).getActualTypeArguments()[0]; + typeArgs[1] = (Class) actualInputType; + } } } diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionCommonTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionCommonTest.java index 0e34e2997c571..3a1f69d4bfd6e 100644 --- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionCommonTest.java +++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionCommonTest.java @@ -19,9 +19,16 @@ package org.apache.pulsar.functions.utils; +import java.util.Collection; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.functions.api.Context; +import org.apache.pulsar.functions.api.Function; +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.functions.api.WindowContext; +import org.apache.pulsar.functions.api.WindowFunction; import org.testng.Assert; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import java.io.File; @@ -101,4 +108,66 @@ public void testGetMessageId() { assertEquals(lid, id.getLedgerId()); assertEquals(eid, id.getEntryId()); } + + @DataProvider(name = "function") + public Object[][] functionProvider() { + return new Object[][] { + { + new Function() { + @Override + public Integer process(String input, Context context) throws Exception { + return null; + } + }, false + }, + { + new Function>() { + @Override + public Record process(String input, Context context) throws Exception { + return null; + } + }, false + }, + { + new java.util.function.Function() { + @Override + public Integer apply(String s) { + return null; + } + }, false + }, + { + new java.util.function.Function>() { + @Override + public Record apply(String s) { + return null; + } + }, false + }, + { + new WindowFunction() { + @Override + public Integer process(Collection> input, WindowContext context) throws Exception { + return null; + } + }, true + }, + { + new java.util.function.Function, Integer>() { + @Override + public Integer apply(Collection strings) { + return null; + } + }, true + } + }; + } + + @Test(dataProvider = "function") + public void testGetFunctionTypes(Object function, boolean isWindowConfigPresent) { + Class[] types = FunctionCommon.getFunctionTypes(function.getClass(), isWindowConfigPresent); + assertEquals(types.length, 2); + assertEquals(types[0], String.class); + assertEquals(types[1], Integer.class); + } } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java index f9b90b4d75b9d..1286ed362cf98 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java @@ -72,6 +72,7 @@ import org.apache.pulsar.functions.api.examples.AvroSchemaTestFunction; import org.apache.pulsar.functions.api.examples.MergeTopicFunction; import org.apache.pulsar.functions.api.examples.InitializableFunction; +import org.apache.pulsar.functions.api.examples.RecordFunction; import org.apache.pulsar.functions.api.examples.pojo.AvroTestObject; import org.apache.pulsar.functions.api.examples.pojo.Users; import org.apache.pulsar.functions.api.examples.serde.CustomObject; @@ -1710,6 +1711,66 @@ protected void testGenericObjectFunction(String function, boolean removeAgeField getFunctionInfoNotFound(functionName); } + protected void testRecordFunction() throws Exception { + log.info("start RecordFunction function test ..."); + + String ns = "public/ns-recordfunction-" + randomName(8); + @Cleanup + PulsarAdmin pulsarAdmin = getPulsarAdmin(); + pulsarAdmin.namespaces().createNamespace(ns); + + @Cleanup + PulsarClient pulsarClient = getPulsarClient(); + + final int numMessages = 10; + final String inputTopic = ns + "/test-string-input-" + randomName(8); + final String outputTopic = ns + "/test-string-output-" + randomName(8); + @Cleanup + Consumer consumer = pulsarClient + .newConsumer(Schema.STRING) + .subscriptionName("test") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .topic("publishtopic") + .subscribe(); + + final String functionName = "test-record-fn-" + randomName(8); + submitFunction( + Runtime.JAVA, + inputTopic, + outputTopic, + functionName, + null, + RecordFunction.class.getName(), + Schema.AUTO_CONSUME()); + try { + @Cleanup + Producer producer = pulsarClient + .newProducer(Schema.STRING) + .topic(inputTopic) + .create(); + for (int i = 0; i < numMessages; i++) { + producer.send("message" + i); + } + + getFunctionInfoSuccess(functionName); + + getFunctionStatus(functionName, numMessages, true); + + for (int i = 0; i < numMessages; i++) { + Message msg = consumer.receive(30, TimeUnit.SECONDS); + log.info("Received: {}", msg.getValue()); + assertEquals(msg.getValue(), "message" + i + "!"); + assertEquals(msg.getProperty("input_topic"), "persistent://" + inputTopic); + } + } finally { + pulsarCluster.dumpFunctionLogs(functionName); + } + + deleteFunction(functionName); + + getFunctionInfoNotFound(functionName); + } + protected void testMergeFunction() throws Exception { log.info("start merge function test ..."); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java index 46cb15892a1eb..6e49621421988 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java @@ -165,8 +165,8 @@ public void testSlidingCountWindowTest() throws Exception { @Test(groups = {"java_function", "function"}) public void testMergeFunctionTest() throws Exception { - testMergeFunction(); - } + testMergeFunction(); + } @Test(groups = {"java_function", "function"}) public void testGenericObjectFunction() throws Exception { @@ -188,4 +188,9 @@ public void testGenericObjectRemoveFiledFunctionKeyValue() throws Exception { testGenericObjectFunction(REMOVE_AVRO_FIELD_FUNCTION_JAVA_CLASS, true, true); } + @Test(groups = {"java_function", "function"}) + public void testRecordFunctionTest() throws Exception { + testRecordFunction(); + } + }