Skip to content

Commit

Permalink
[improve][function] Support Record<?> as Function output type
Browse files Browse the repository at this point in the history
  • Loading branch information
cbornet committed Jun 13, 2022
1 parent 36690f5 commit eeaa729
Show file tree
Hide file tree
Showing 12 changed files with 481 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;
import org.apache.pulsar.functions.api.utils.FunctionRecord;

/**
* Context provides contextual information to the executing function.
Expand Down Expand Up @@ -161,4 +162,11 @@ public interface Context extends BaseContext {
* @throws PulsarClientException
*/
<X> ConsumerBuilder<X> newConsumerBuilder(Schema<X> schema) throws PulsarClientException;

/**
* Create a {@link FunctionRecord.FunctionRecordBuilder} for building an output record.
*
* <p>NOTE(review): the implementation visible alongside this change delegates to
* {@code FunctionRecord.from(context)}, which pre-populates the builder with the
* context's output topic and with metadata copied from the current input record.
* Other implementations of this interface may behave differently; confirm before
* relying on pre-populated fields.
*
* @param <X> the value type of the record to build
* @return the record builder instance
*/
<X> FunctionRecord.FunctionRecordBuilder<X> newOutputRecordBuilder();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.api.utils;

import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import lombok.Builder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Record;

/**
 * A {@link Record} implementation whose fields are all set through a Lombok-generated builder.
 *
 * <p>The {@code @Builder} annotation is configured with an empty {@code builderMethodName},
 * so the only builder entry point declared in this class is {@link #from(Context)}.
 *
 * <p>Every field may be left unset (i.e. {@code null}). Fields exposed through an
 * {@link Optional}-returning accessor are reported as empty in that case, and
 * {@link #getProperties()} reports an empty map. {@link #getValue()} and {@link #getSchema()}
 * return {@code null} when unset.
 *
 * @param <T> the type of the record value
 */
@Builder(builderMethodName = "")
public class FunctionRecord<T> implements Record<T> {

    private final T value;
    private final String topicName;
    private final String destinationTopic;
    private final Map<String, String> properties;
    private final String key;
    private final Schema<T> schema;
    private final Long eventTime;
    private final String partitionId;
    private final Integer partitionIndex;
    private final Long recordSequence;

    /**
     * Creates a builder pre-populated from the given function context.
     *
     * <p>The destination topic is taken from {@link Context#getOutputTopic()}. The properties,
     * topic name, key, event time, partition id, partition index and record sequence are copied
     * from the context's current record (each optional value only when present). The value and
     * the schema are NOT set and are left for the caller to provide.
     *
     * @param context the function context to read the output topic and current record from
     * @param <T> the value type of the record to build
     * @return a builder initialised with values from the context
     */
    public static <T> FunctionRecord.FunctionRecordBuilder<T> from(Context context) {
        Record<?> currentRecord = context.getCurrentRecord();
        // The explicit <T> is required: with a diamond the chained calls would be resolved
        // against FunctionRecordBuilder<Object>, which is not assignable to the declared type.
        FunctionRecordBuilder<T> builder = new FunctionRecordBuilder<T>()
                .destinationTopic(context.getOutputTopic())
                .properties(currentRecord.getProperties());
        currentRecord.getTopicName().ifPresent(builder::topicName);
        currentRecord.getKey().ifPresent(builder::key);
        currentRecord.getEventTime().ifPresent(builder::eventTime);
        currentRecord.getPartitionId().ifPresent(builder::partitionId);
        currentRecord.getPartitionIndex().ifPresent(builder::partitionIndex);
        currentRecord.getRecordSequence().ifPresent(builder::recordSequence);

        // TODO: add message

        return builder;
    }

    /**
     * @return the record value, or {@code null} if none was set on the builder
     */
    @Override
    public T getValue() {
        return value;
    }

    /**
     * @return the topic name, or empty if none was set
     */
    @Override
    public Optional<String> getTopicName() {
        return Optional.ofNullable(topicName);
    }

    /**
     * @return the destination topic, or empty if none was set
     */
    @Override
    public Optional<String> getDestinationTopic() {
        return Optional.ofNullable(destinationTopic);
    }

    /**
     * Returns the record properties.
     *
     * <p>Never returns {@code null}: when no properties were set on the builder (or
     * {@code null} was passed explicitly) an empty, immutable map is returned so that callers
     * can iterate or call {@code isEmpty()} without a null check.
     *
     * @return the properties map, or an empty map if none was set
     */
    @Override
    public Map<String, String> getProperties() {
        if (properties == null) {
            return Collections.emptyMap();
        }
        return properties;
    }

    /**
     * @return the record key, or empty if none was set
     */
    @Override
    public Optional<String> getKey() {
        return Optional.ofNullable(key);
    }

    /**
     * @return the schema of the value, or {@code null} if none was set on the builder
     */
    @Override
    public Schema<T> getSchema() {
        return schema;
    }

    /**
     * @return the event time, or empty if none was set
     */
    @Override
    public Optional<Long> getEventTime() {
        return Optional.ofNullable(eventTime);
    }

    /**
     * @return the partition id, or empty if none was set
     */
    @Override
    public Optional<String> getPartitionId() {
        return Optional.ofNullable(partitionId);
    }

    /**
     * @return the partition index, or empty if none was set
     */
    @Override
    public Optional<Integer> getPartitionIndex() {
        return Optional.ofNullable(partitionIndex);
    }

    /**
     * @return the record sequence, or empty if none was set
     */
    @Override
    public Optional<Long> getRecordSequence() {
        return Optional.ofNullable(recordSequence);
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.instance;

import java.util.Optional;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
import org.apache.pulsar.functions.api.KVRecord;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.source.PulsarRecord;

/**
 * Base class for records handed to a sink. It wraps the record that was read from the
 * source and forwards topic name, acknowledgement and failure handling to it.
 *
 * @param <T> the type of the record value
 */
@EqualsAndHashCode
@ToString
public abstract class AbstractSinkRecord<T> implements Record<T> {

    private final Record<?> sourceRecord;

    /**
     * @param sourceRecord the record this sink record wraps and delegates to
     */
    protected AbstractSinkRecord(Record<?> sourceRecord) {
        this.sourceRecord = sourceRecord;
    }

    /**
     * @return the wrapped source record
     */
    public Record<?> getSourceRecord() {
        return sourceRecord;
    }

    /**
     * @return the topic name reported by the wrapped source record
     */
    @Override
    public Optional<String> getTopicName() {
        return sourceRecord.getTopicName();
    }

    /**
     * Acknowledges the wrapped source record.
     */
    @Override
    public void ack() {
        sourceRecord.ack();
    }

    /**
     * Lets a sink explicitly request a cumulative acknowledgement.
     *
     * @throws RuntimeException if the wrapped source record is not a {@link PulsarRecord}
     */
    public void cumulativeAck() {
        requirePulsarRecord().cumulativeAck();
    }

    /**
     * Lets a sink explicitly request an individual acknowledgement.
     *
     * @throws RuntimeException if the wrapped source record is not a {@link PulsarRecord}
     */
    public void individualAck() {
        requirePulsarRecord().individualAck();
    }

    /**
     * Marks the wrapped source record as failed.
     */
    @Override
    public void fail() {
        sourceRecord.fail();
    }

    /**
     * Resolves the schema to use for the given record.
     *
     * <p>Resolution order:
     * <ol>
     *   <li>a {@code null} record yields {@code null};</li>
     *   <li>a record with a schema yields that schema, except that an
     *       {@link AutoConsumeSchema} is unwrapped to the actual schema;</li>
     *   <li>a record without a schema that is a {@link KVRecord} yields a key/value schema
     *       built from its key schema, value schema and encoding type;</li>
     *   <li>anything else yields {@code null}.</li>
     * </ol>
     *
     * @param record the record to inspect, may be {@code null}
     * @param <T> the type of the record value
     * @return the resolved schema, or {@code null} if none could be determined
     */
    protected static <T> Schema<T> getRecordSchema(Record<T> record) {
        if (record == null) {
            return null;
        }

        if (record.getSchema() == null) {
            // No explicit schema: only key/value records can still provide one.
            if (!(record instanceof KVRecord)) {
                return null;
            }
            KVRecord keyValueRecord = (KVRecord) record;
            return KeyValueSchemaImpl.of(keyValueRecord.getKeySchema(), keyValueRecord.getValueSchema(),
                    keyValueRecord.getKeyValueEncodingType());
        }

        Schema<T> declaredSchema = record.getSchema();
        // AutoConsumeSchema is a special schema, that comes into play
        // when the Sink is going to handle any Schema
        // usually you see Sink<GenericObject> or Sink<GenericRecord> in this case
        if (declaredSchema instanceof AutoConsumeSchema) {
            return unwrapAutoConsumeSchema(record, (AutoConsumeSchema) declaredSchema);
        }
        return declaredSchema;
    }

    /**
     * Returns the wrapped source record as a {@link PulsarRecord}, or fails if it is not one.
     */
    private PulsarRecord<?> requirePulsarRecord() {
        if (!(sourceRecord instanceof PulsarRecord)) {
            throw new RuntimeException("SourceRecord class type must be PulsarRecord");
        }
        return (PulsarRecord<?>) sourceRecord;
    }

    /**
     * Unwraps an {@link AutoConsumeSchema}: prefers the reader schema carried by the record's
     * message and falls back to the internal schema of the auto-consume schema.
     */
    private static <T> Schema<T> unwrapAutoConsumeSchema(Record<T> record, AutoConsumeSchema autoConsumeSchema) {
        // extract the Schema from the message, this is the most accurate schema we have
        // see PIP-85
        boolean messageCarriesReaderSchema = record.getMessage().isPresent()
                && record.getMessage().get().getReaderSchema().isPresent();
        if (messageCarriesReaderSchema) {
            return (Schema<T>) record.getMessage().get().getReaderSchema().get();
        }
        return (Schema<T>) autoConsumeSchema.getInternalSchema();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.StateStore;
import org.apache.pulsar.functions.api.utils.FunctionRecord;
import org.apache.pulsar.functions.instance.state.DefaultStateStore;
import org.apache.pulsar.functions.instance.state.StateManager;
import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
Expand Down Expand Up @@ -481,6 +482,11 @@ public <T> ConsumerBuilder<T> newConsumerBuilder(Schema<T> schema) throws Pulsar
return this.client.newConsumer(schema);
}

/**
 * Creates an output record builder by delegating to {@code FunctionRecord.from(this)},
 * i.e. the builder is initialised from this context.
 */
@Override
public <X> FunctionRecord.FunctionRecordBuilder<X> newOutputRecordBuilder() {
    final FunctionRecord.FunctionRecordBuilder<X> outputRecordBuilder = FunctionRecord.from(this);
    return outputRecordBuilder;
}

@Override
public SubscriptionType getSubscriptionType() {
return subscriptionType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,8 +375,14 @@ private void sendOutputMessage(Record srcRecord, Object output) throws Exception
if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SINK) {
Thread.currentThread().setContextClassLoader(functionClassLoader);
}
AbstractSinkRecord<?> sinkRecord;
if (output instanceof Record) {
sinkRecord = new TargetSinkRecord<>(srcRecord, (Record) output);
} else {
sinkRecord = new SinkRecord<>(srcRecord, output);
}
try {
this.sink.write(new SinkRecord<>(srcRecord, output));
this.sink.write(sinkRecord);
} catch (Exception e) {
log.info("Encountered exception in sink write: ", e);
stats.incrSinkExceptions(e);
Expand Down
Loading

0 comments on commit eeaa729

Please sign in to comment.