Skip to content

Commit

Permalink
[improve][function] Support Record<?> as Function output type (#16041)
Browse files Browse the repository at this point in the history
  • Loading branch information
cbornet authored Jun 17, 2022
1 parent ef56656 commit b3c5191
Show file tree
Hide file tree
Showing 14 changed files with 643 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;
import org.apache.pulsar.functions.api.utils.FunctionRecord;

/**
* Context provides contextual information to the executing function.
Expand Down Expand Up @@ -161,4 +162,13 @@ public interface Context extends BaseContext {
* @throws PulsarClientException
*/
<X> ConsumerBuilder<X> newConsumerBuilder(Schema<X> schema) throws PulsarClientException;

/**
 * Creates a FunctionRecordBuilder initialized with values from this Context.
 * It can be used in Functions to prepare a Record to return with default values taken from the Context and the
 * input Record. The caller is expected to complete the builder (for instance with the value to emit) before
 * building the Record; see {@link FunctionRecord#from(Context)} for the attributes that are pre-populated.
 *
 * @param <X> type of the value carried by the Record to build
 * @return the record builder instance
 */
<X> FunctionRecord.FunctionRecordBuilder<X> newOutputRecordBuilder();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.api.utils;

import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import lombok.Builder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Record;

/**
 * Immutable {@link Record} implementation that a Function can build and return as its output.
 *
 * <p>Instances are created through the Lombok-generated {@code FunctionRecordBuilder}. The builder is
 * declared with an empty {@code builderMethodName}, so the intended entry point is {@link #from(Context)},
 * which pre-populates the builder from the Function {@link Context}.
 *
 * <p>Every attribute is optional: unset attributes are exposed as {@link Optional#empty()}, a {@code null}
 * value/schema, or an empty properties map.
 *
 * @param <T> type of the value carried by the record
 */
@Builder(builderMethodName = "")
public class FunctionRecord<T> implements Record<T> {

    private final T value;
    private final String topicName;
    private final String destinationTopic;
    private final Map<String, String> properties;
    private final String key;
    private final Schema<T> schema;
    private final Long eventTime;
    private final String partitionId;
    private final Integer partitionIndex;
    private final Long recordSequence;

    /**
     * Creates a builder for a Record from a Function Context.
     * The builder is initialized with the output topic from the Context and with the topicName, key, eventTime,
     * properties, partitionId, partitionIndex and recordSequence from the Context input Record.
     * It doesn't initialize a Message at the moment.
     *
     * @param context a Function Context
     * @param <T> type of Record to build
     * @return a Record builder initialised with values from the Function Context
     */
    public static <T> FunctionRecord.FunctionRecordBuilder<T> from(Context context) {
        Record<?> currentRecord = context.getCurrentRecord();
        FunctionRecordBuilder<T> builder = new FunctionRecordBuilder<T>()
                .destinationTopic(context.getOutputTopic())
                .properties(currentRecord.getProperties());
        // Optional attributes of the input record are only copied when present,
        // so the corresponding builder fields stay null (and are exposed as Optional.empty()).
        currentRecord.getTopicName().ifPresent(builder::topicName);
        currentRecord.getKey().ifPresent(builder::key);
        currentRecord.getEventTime().ifPresent(builder::eventTime);
        currentRecord.getPartitionId().ifPresent(builder::partitionId);
        currentRecord.getPartitionIndex().ifPresent(builder::partitionIndex);
        currentRecord.getRecordSequence().ifPresent(builder::recordSequence);

        return builder;
    }

    /**
     * @return the value set on the builder, or {@code null} if none was set
     */
    @Override
    public T getValue() {
        return value;
    }

    /**
     * @return the topic name set on the builder, if any
     */
    @Override
    public Optional<String> getTopicName() {
        return Optional.ofNullable(topicName);
    }

    /**
     * @return the destination topic set on the builder, if any
     */
    @Override
    public Optional<String> getDestinationTopic() {
        return Optional.ofNullable(destinationTopic);
    }

    /**
     * Returns the properties attached to this record.
     *
     * @return the properties set on the builder, or an empty immutable map when none were set
     *         (never {@code null}, so callers can iterate or query the result without a null check)
     */
    @Override
    public Map<String, String> getProperties() {
        return properties != null ? properties : Collections.emptyMap();
    }

    /**
     * @return the key set on the builder, if any
     */
    @Override
    public Optional<String> getKey() {
        return Optional.ofNullable(key);
    }

    /**
     * @return the schema set on the builder, or {@code null} if none was set
     */
    @Override
    public Schema<T> getSchema() {
        return schema;
    }

    /**
     * @return the event time set on the builder, if any
     */
    @Override
    public Optional<Long> getEventTime() {
        return Optional.ofNullable(eventTime);
    }

    /**
     * @return the partition id set on the builder, if any
     */
    @Override
    public Optional<String> getPartitionId() {
        return Optional.ofNullable(partitionId);
    }

    /**
     * @return the partition index set on the builder, if any
     */
    @Override
    public Optional<Integer> getPartitionIndex() {
        return Optional.ofNullable(partitionIndex);
    }

    /**
     * @return the record sequence set on the builder, if any
     */
    @Override
    public Optional<Long> getRecordSequence() {
        return Optional.ofNullable(recordSequence);
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.instance;

import java.util.Optional;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
import org.apache.pulsar.functions.api.KVRecord;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.source.PulsarRecord;

/**
 * Base class for the records handed to a sink. It wraps the source record the output originates from and
 * forwards topic name, ack and fail calls to it.
 *
 * @param <T> type of the value carried by the record
 */
@EqualsAndHashCode
@ToString
public abstract class AbstractSinkRecord<T> implements Record<T> {

    private final Record<?> sourceRecord;

    /**
     * @param sourceRecord the record this sink record was produced from
     */
    protected AbstractSinkRecord(Record<?> sourceRecord) {
        this.sourceRecord = sourceRecord;
    }

    /**
     * @return the value subclasses choose for the "always set message properties" flag
     */
    public abstract boolean shouldAlwaysSetMessageProperties();

    /**
     * @return the wrapped source record
     */
    public Record<?> getSourceRecord() {
        return this.sourceRecord;
    }

    /**
     * @return the topic name reported by the source record
     */
    @Override
    public Optional<String> getTopicName() {
        return this.sourceRecord.getTopicName();
    }

    /**
     * Acknowledges the source record.
     */
    @Override
    public void ack() {
        this.sourceRecord.ack();
    }

    /**
     * Marks the source record as failed.
     */
    @Override
    public void fail() {
        this.sourceRecord.fail();
    }

    /**
     * Lets a sink request a cumulative acknowledgment explicitly.
     *
     * @throws RuntimeException if the source record is not a {@link PulsarRecord}
     */
    public void cumulativeAck() {
        if (!(sourceRecord instanceof PulsarRecord)) {
            throw new RuntimeException("SourceRecord class type must be PulsarRecord");
        }
        ((PulsarRecord) sourceRecord).cumulativeAck();
    }

    /**
     * Lets a sink request an individual acknowledgment explicitly.
     *
     * @throws RuntimeException if the source record is not a {@link PulsarRecord}
     */
    public void individualAck() {
        if (!(sourceRecord instanceof PulsarRecord)) {
            throw new RuntimeException("SourceRecord class type must be PulsarRecord");
        }
        ((PulsarRecord) sourceRecord).individualAck();
    }

    /**
     * Resolves the schema to use for the given record.
     *
     * <p>Resolution order: the record's own schema (unwrapped when it is an {@link AutoConsumeSchema}),
     * then a key/value schema derived from a {@link KVRecord}, otherwise {@code null}.
     *
     * @param record the record to inspect, may be {@code null}
     * @param <T> type of the record value
     * @return the resolved schema, or {@code null} when none can be determined
     */
    protected static <T> Schema<T> getRecordSchema(Record<T> record) {
        if (record == null) {
            return null;
        }

        if (record.getSchema() == null) {
            // No schema on the record itself: only a KVRecord lets us derive one.
            if (record instanceof KVRecord) {
                KVRecord keyValueRecord = (KVRecord) record;
                return KeyValueSchemaImpl.of(keyValueRecord.getKeySchema(), keyValueRecord.getValueSchema(),
                        keyValueRecord.getKeyValueEncodingType());
            }
            return null;
        }

        Schema<T> resolved = record.getSchema();
        if (!(resolved instanceof AutoConsumeSchema)) {
            return resolved;
        }

        // AutoConsumeSchema is a special schema used when the Sink handles any Schema
        // (typically Sink<GenericObject> or Sink<GenericRecord>), so the actual schema must be unwrapped.
        boolean readerSchemaAvailable = record.getMessage().isPresent()
                && record.getMessage().get().getReaderSchema().isPresent();
        if (readerSchemaAvailable) {
            // The schema carried by the message is the most accurate one we have (see PIP-85).
            return (Schema<T>) record.getMessage().get().getReaderSchema().get();
        }
        return (Schema<T>) ((AutoConsumeSchema) resolved).getInternalSchema();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.StateStore;
import org.apache.pulsar.functions.api.utils.FunctionRecord;
import org.apache.pulsar.functions.instance.state.DefaultStateStore;
import org.apache.pulsar.functions.instance.state.StateManager;
import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
Expand Down Expand Up @@ -481,6 +482,11 @@ public <T> ConsumerBuilder<T> newConsumerBuilder(Schema<T> schema) throws Pulsar
return this.client.newConsumer(schema);
}

/**
 * Returns a record builder seeded from this context via {@link FunctionRecord#from}.
 */
@Override
public <X> FunctionRecord.FunctionRecordBuilder<X> newOutputRecordBuilder() {
    final FunctionRecord.FunctionRecordBuilder<X> outputBuilder = FunctionRecord.from(this);
    return outputBuilder;
}

@Override
public SubscriptionType getSubscriptionType() {
return subscriptionType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,8 +375,14 @@ private void sendOutputMessage(Record srcRecord, Object output) throws Exception
if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SINK) {
Thread.currentThread().setContextClassLoader(functionClassLoader);
}
AbstractSinkRecord<?> sinkRecord;
if (output instanceof Record) {
sinkRecord = new OutputRecordSinkRecord<>(srcRecord, (Record) output);
} else {
sinkRecord = new SinkRecord<>(srcRecord, output);
}
try {
this.sink.write(new SinkRecord<>(srcRecord, output));
this.sink.write(sinkRecord);
} catch (Exception e) {
log.info("Encountered exception in sink write: ", e);
stats.incrSinkExceptions(e);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.instance;

import java.util.Map;
import java.util.Optional;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.functions.api.Record;

/**
 * Sink record used when a function returns a {@link Record} as its output.
 *
 * <p>Acknowledgment, failure and topic name handling are inherited from {@link AbstractSinkRecord} and apply
 * to the source record; every attribute overridden here is read from the record returned by the function.
 *
 * @param <T> type of the value carried by the record
 */
@EqualsAndHashCode(callSuper = true)
@ToString
class OutputRecordSinkRecord<T> extends AbstractSinkRecord<T> {

    private final Record<T> sinkRecord;

    /**
     * @param sourceRecord the input record, passed to the parent class
     * @param sinkRecord the record returned by the function, used for all overridden attributes
     */
    OutputRecordSinkRecord(Record<T> sourceRecord, Record<T> sinkRecord) {
        super(sourceRecord);
        this.sinkRecord = sinkRecord;
    }

    // ---- payload ----

    @Override
    public T getValue() {
        return this.sinkRecord.getValue();
    }

    @Override
    public Schema<T> getSchema() {
        // Delegates to the parent helper, which resolves the schema of the output record.
        return getRecordSchema(this.sinkRecord);
    }

    @Override
    public Optional<Message<T>> getMessage() {
        return this.sinkRecord.getMessage();
    }

    // ---- routing ----

    @Override
    public Optional<String> getDestinationTopic() {
        return this.sinkRecord.getDestinationTopic();
    }

    @Override
    public Optional<String> getKey() {
        return this.sinkRecord.getKey();
    }

    @Override
    public Optional<String> getPartitionId() {
        return this.sinkRecord.getPartitionId();
    }

    @Override
    public Optional<Integer> getPartitionIndex() {
        return this.sinkRecord.getPartitionIndex();
    }

    // ---- metadata ----

    @Override
    public Map<String, String> getProperties() {
        return this.sinkRecord.getProperties();
    }

    @Override
    public Optional<Long> getEventTime() {
        return this.sinkRecord.getEventTime();
    }

    @Override
    public Optional<Long> getRecordSequence() {
        return this.sinkRecord.getRecordSequence();
    }

    /**
     * @return always {@code true} for this record type
     */
    @Override
    public boolean shouldAlwaysSetMessageProperties() {
        return true;
    }
}
Loading

0 comments on commit b3c5191

Please sign in to comment.