Reuse transformer in stream indexing (#9625)
* Reuse transformer in stream indexing

* remove unused method

* memoize compiled pattern
jihoonson authored Apr 6, 2020
1 parent 7bf1ebb commit 82ce60b
Showing 7 changed files with 283 additions and 50 deletions.
File: RegexInputFormat.java
@@ -20,7 +20,10 @@
package org.apache.druid.data.input.impl;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputFormat;
@@ -29,12 +32,15 @@
import javax.annotation.Nullable;
import java.io.File;
import java.util.List;
import java.util.regex.Pattern;

public class RegexInputFormat implements InputFormat
{
private final String pattern;
private final String listDelimiter;
private final List<String> columns;
@JsonIgnore
private final Supplier<Pattern> compiledPatternSupplier;

@JsonCreator
public RegexInputFormat(
@@ -46,6 +52,27 @@ public RegexInputFormat(
this.pattern = pattern;
this.listDelimiter = listDelimiter;
this.columns = columns;
this.compiledPatternSupplier = Suppliers.memoize(() -> Pattern.compile(pattern));
}

@JsonProperty
public String getPattern()
{
return pattern;
}

@Nullable
@JsonProperty
public String getListDelimiter()
{
return listDelimiter;
}

@Nullable
@JsonProperty
public List<String> getColumns()
{
return columns;
}

@Override
@@ -57,6 +84,6 @@ public boolean isSplittable()
@Override
public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
{
return new RegexReader(inputRowSchema, source, pattern, listDelimiter, columns);
return new RegexReader(inputRowSchema, source, pattern, compiledPatternSupplier.get(), listDelimiter, columns);
}
}
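The supplier above is built with Guava's Suppliers.memoize, so the regex is compiled at most once, on first use, and the @JsonIgnore annotation keeps the supplier out of the serialized JSON. A minimal self-contained sketch of the idiom, with an illustrative class name that is not part of the commit:

import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import java.util.regex.Pattern;

public class MemoizedPatternExample
{
  private final Supplier<Pattern> compiledPatternSupplier;

  public MemoizedPatternExample(String pattern)
  {
    // Compile lazily and cache; every later get() returns the same Pattern instance.
    this.compiledPatternSupplier = Suppliers.memoize(() -> Pattern.compile(pattern));
  }

  public boolean matches(String line)
  {
    return compiledPatternSupplier.get().matcher(line).matches();
  }
}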
File: RegexReader.java
@@ -42,7 +42,7 @@
public class RegexReader extends TextReader
{
private final String pattern;
private final Pattern compiled;
private final Pattern compiledPattern;
private final Function<String, Object> multiValueFunction;

private List<String> columns;
@@ -51,13 +51,14 @@ public class RegexReader extends TextReader
InputRowSchema inputRowSchema,
InputEntity source,
String pattern,
Pattern compiledPattern,
@Nullable String listDelimiter,
@Nullable List<String> columns
)
{
super(inputRowSchema, source);
this.pattern = pattern;
this.compiled = Pattern.compile(pattern);
this.compiledPattern = compiledPattern;
final String finalListDelimiter = listDelimiter == null ? Parsers.DEFAULT_LIST_DELIMITER : listDelimiter;
this.multiValueFunction = ParserUtils.getMultiValueFunction(finalListDelimiter, Splitter.on(finalListDelimiter));
this.columns = columns;
@@ -78,7 +79,7 @@ protected Map<String, Object> toMap(String intermediateRow)
private Map<String, Object> parseLine(String line)
{
try {
final Matcher matcher = compiled.matcher(line);
final Matcher matcher = compiledPattern.matcher(line);

if (!matcher.matches()) {
throw new ParseException("Incorrect Regex: %s. No match found.", pattern);
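Passing the already-compiled Pattern into RegexReader matters because Pattern.compile is the expensive step, while creating a Matcher per input line is cheap. A rough standalone illustration of the difference (the regex, input, and iteration count are illustrative only):

import java.util.regex.Pattern;

public class PatternReuseExample
{
  public static void main(String[] args)
  {
    final String regex = "(\\w+),(\\w+)";
    final String line = "foo,bar";

    // Wasteful: recompiles the regex on every iteration (the old code recompiled
    // once per reader, i.e. once per stream chunk).
    long start = System.nanoTime();
    for (int i = 0; i < 100_000; i++) {
      Pattern.compile(regex).matcher(line).matches();
    }
    System.out.println("compile per line: " + (System.nanoTime() - start) / 1_000_000 + " ms");

    // Reuse: compile once, then only create a Matcher per line.
    final Pattern compiled = Pattern.compile(regex);
    start = System.nanoTime();
    for (int i = 0; i < 100_000; i++) {
      compiled.matcher(line).matches();
    }
    System.out.println("compile once: " + (System.nanoTime() - start) / 1_000_000 + " ms");
  }
}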
File: RegexInputFormatTest.java
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.data.input.impl;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.InputFormat;
import org.junit.Assert;
import org.junit.Test;

import java.io.IOException;
import java.util.Map;

public class RegexInputFormatTest
{
private final ObjectMapper mapper;

public RegexInputFormatTest()
{
mapper = new ObjectMapper();
mapper.registerSubtypes(new NamedType(RegexInputFormat.class, "regex"));
}

@Test
public void testSerde() throws IOException
{
final RegexInputFormat expected = new RegexInputFormat(
"//[^\\r\\n]*[\\r\\n]",
"|",
ImmutableList.of("col1", "col2", "col3")
);

final byte[] json = mapper.writeValueAsBytes(expected);
final RegexInputFormat fromJson = (RegexInputFormat) mapper.readValue(json, InputFormat.class);

Assert.assertEquals(expected.getPattern(), fromJson.getPattern());
Assert.assertEquals(expected.getListDelimiter(), fromJson.getListDelimiter());
Assert.assertEquals(expected.getColumns(), fromJson.getColumns());
}

@Test
public void testIgnoreCompiledPatternInJson() throws IOException
{
final RegexInputFormat expected = new RegexInputFormat(
"//[^\\r\\n]*[\\r\\n]",
"|",
ImmutableList.of("col1", "col2", "col3")
);

final byte[] json = mapper.writeValueAsBytes(expected);
final Map<String, Object> map = mapper.readValue(json, Map.class);
Assert.assertFalse(map.containsKey("compiledPattern"));
}
}
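The second test passes because the compiledPatternSupplier field is annotated with Jackson's @JsonIgnore, which excludes it from serialization entirely. The behavior in miniature (class and field names here are illustrative, not from the commit):

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonIgnoreExample
{
  public String pattern = "\\d+";

  @JsonIgnore
  public String compiledPattern = "never serialized";

  public static void main(String[] args) throws Exception
  {
    // Prints {"pattern":"\\d+"}; the @JsonIgnore field is absent from the output.
    System.out.println(new ObjectMapper().writeValueAsString(new JsonIgnoreExample()));
  }
}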
File: SeekableStreamIndexTaskRunner.java
@@ -38,11 +38,9 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.druid.data.input.Committer;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.LookupNodeService;
@@ -74,7 +72,6 @@
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.collect.Utils;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.query.aggregation.AggregatorFactory;
@@ -206,6 +203,7 @@ public enum Status
private final SeekableStreamIndexTaskTuningConfig tuningConfig;
private final InputRowSchema inputRowSchema;
private final InputFormat inputFormat;
@Nullable
private final InputRowParser<ByteBuffer> parser;
private final AuthorizerMapper authorizerMapper;
private final Optional<ChatHandlerProvider> chatHandlerProvider;
@@ -364,48 +362,24 @@ public void initializeSequences() throws IOException
log.info("Starting with sequences: %s", sequences);
}

private List<InputRow> parseBytes(List<byte[]> valueBytess) throws IOException
{
if (parser != null) {
return parseWithParser(valueBytess);
} else {
return parseWithInputFormat(valueBytess);
}
}

private List<InputRow> parseWithParser(List<byte[]> valueBytess)
{
final List<InputRow> rows = new ArrayList<>();
for (byte[] valueBytes : valueBytess) {
rows.addAll(parser.parseBatch(ByteBuffer.wrap(valueBytes)));
}
return rows;
}

private List<InputRow> parseWithInputFormat(List<byte[]> valueBytess) throws IOException
{
final List<InputRow> rows = new ArrayList<>();
for (byte[] valueBytes : valueBytess) {
final InputEntityReader reader = task.getDataSchema().getTransformSpec().decorate(
Preconditions.checkNotNull(inputFormat, "inputFormat").createReader(
inputRowSchema,
new ByteEntity(valueBytes),
toolbox.getIndexingTmpDir()
)
);
try (CloseableIterator<InputRow> rowIterator = reader.read()) {
rowIterator.forEachRemaining(rows::add);
}
}
return rows;
}

private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
{
startTime = DateTimes.nowUtc();
status = Status.STARTING;

setToolbox(toolbox);

// Now we can initialize the StreamChunkParser with the given toolbox.
final StreamChunkParser parser = new StreamChunkParser(
this.parser,
new SettableByteEntityReader(
inputFormat,
inputRowSchema,
task.getDataSchema().getTransformSpec(),
toolbox.getIndexingTmpDir()
)
);

initializeSequences();

if (chatHandlerProvider.isPresent()) {
@@ -657,7 +631,7 @@ public void run()
if (valueBytess == null || valueBytess.isEmpty()) {
rows = Utils.nullableListOf((InputRow) null);
} else {
rows = parseBytes(valueBytess);
rows = parser.parse(valueBytess);
}
boolean isPersistRequired = false;

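The parsing helpers removed above (parseBytes, parseWithParser, parseWithInputFormat) move into the new StreamChunkParser class, one of the changed files not expanded on this page. Reconstructing from the removed code, the class plausibly looks like the sketch below; the exact shape in the commit may differ:

// Hypothetical reconstruction; the real class lives alongside SettableByteEntityReader.
package org.apache.druid.indexing.seekablestream;

import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.java.util.common.parsers.CloseableIterator;

import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

class StreamChunkParser
{
  @Nullable
  private final InputRowParser<ByteBuffer> parser;
  private final SettableByteEntityReader byteEntityReader;

  StreamChunkParser(@Nullable InputRowParser<ByteBuffer> parser, SettableByteEntityReader byteEntityReader)
  {
    this.parser = parser;
    this.byteEntityReader = byteEntityReader;
  }

  List<InputRow> parse(List<byte[]> streamChunk) throws IOException
  {
    final List<InputRow> rows = new ArrayList<>();
    for (byte[] valueBytes : streamChunk) {
      if (parser != null) {
        // Legacy path: the deprecated InputRowParser.
        rows.addAll(parser.parseBatch(ByteBuffer.wrap(valueBytes)));
      } else {
        // New path: point the settable reader at this chunk, then drain it.
        byteEntityReader.setEntity(new ByteEntity(valueBytes));
        try (CloseableIterator<InputRow> rowIterator = byteEntityReader.read()) {
          rowIterator.forEachRemaining(rows::add);
        }
      }
    }
    return rows;
  }
}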
File: SettableByteEntityReader.java
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.seekablestream;

import com.google.common.base.Preconditions;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowListPlusRawValues;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.segment.transform.Transformer;
import org.apache.druid.segment.transform.TransformingInputEntityReader;

import java.io.File;
import java.io.IOException;

/**
 * A settable {@link InputEntityReader}. This class is intended to be used only for stream parsing in Kafka or
 * Kinesis indexing.
*/
class SettableByteEntityReader implements InputEntityReader
{
private final InputFormat inputFormat;
private final InputRowSchema inputRowSchema;
private final Transformer transformer;
private final File indexingTmpDir;

private InputEntityReader delegate;

SettableByteEntityReader(
InputFormat inputFormat,
InputRowSchema inputRowSchema,
TransformSpec transformSpec,
File indexingTmpDir
)
{
this.inputFormat = Preconditions.checkNotNull(inputFormat, "inputFormat");
this.inputRowSchema = inputRowSchema;
this.transformer = transformSpec.toTransformer();
this.indexingTmpDir = indexingTmpDir;
}

void setEntity(ByteEntity entity) throws IOException
{
this.delegate = new TransformingInputEntityReader(
// Yes, we are creating a new reader for every stream chunk.
// This should be fine as long as initializing a reader is cheap which it is for now.
inputFormat.createReader(inputRowSchema, entity, indexingTmpDir),
transformer
);
}

@Override
public CloseableIterator<InputRow> read() throws IOException
{
return delegate.read();
}

@Override
public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException
{
return delegate.sample();
}
}
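The "settable" design exists because the Transformer, schema, format, and temporary directory stay fixed for the task's lifetime while the bytes change with every stream chunk; only the innermost format reader is rebuilt per chunk, and the Transformer is reused. A hedged usage sketch, assuming inputFormat, inputRowSchema, transformSpec, tmpDir, streamChunk, and rows are already in scope:

final SettableByteEntityReader reader = new SettableByteEntityReader(
    inputFormat,
    inputRowSchema,
    transformSpec,
    tmpDir
);

for (byte[] valueBytes : streamChunk) {
  // Re-point the reader at the next chunk; the Transformer inside is reused.
  reader.setEntity(new ByteEntity(valueBytes));
  try (CloseableIterator<InputRow> rowIterator = reader.read()) {
    rowIterator.forEachRemaining(rows::add);
  }
}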