
[Bug]: Encoding issue when reading BQ query result set in DataflowRunner #24870

Closed
2 of 15 tasks
mouyang opened this issue Jan 4, 2023 · 6 comments · Fixed by #24910
Labels
bug · done & done (issue has been reviewed after it was closed for verification, followups, etc.) · java · P1

Comments

@mouyang (Contributor) commented Jan 4, 2023

What happened?

I ran a pipeline on DataflowRunner that executed a SQL query whose result set contained a BQ DATE field, and it failed with an encoding error. The exception message suggests that the correct coder was chosen (the DATE logical type resolves to a Long), but the value conversion does not appear to happen on DataflowRunner. It did on DirectRunner.

FWIW, I see another issue with a similar exception message: #20906. Any relation?

Affected Runners: DataflowRunner (DirectRunner with 2.43.0 and previous versions was fine)
Affected Versions: 2.43.0 (2.41.0, 2.42.0 do not appear to exhibit this behaviour.)

Sample Code

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;

import java.io.File;
import java.util.Arrays;
import java.util.stream.Collectors;

public class TestPipeline {
    public static void main(String[] args) {
        PipelineOptionsFactory.register(DataflowPipelineOptions.class);
        DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
        options.setFilesToStage(
                Arrays.stream(System.getProperty("java.class.path").
                        split(File.pathSeparator)).
                        map(entry -> (new File(entry)).toString()).collect(Collectors.toList()));


        String testQuery = "select cast(\"2022-12-21\" as date) dt";
        Pipeline pipeline = Pipeline.create(options);
        PCollection<TableRow> tableRows = pipeline.apply(BigQueryIO.readTableRowsWithSchema()
                .fromQuery(testQuery)
                .usingStandardSql()
        );
        PCollection<Row> rows = tableRows.apply(MapElements.into(TypeDescriptors.rows())
                .via(tableRows.getToRowFunction()))
                .setRowSchema(tableRows.getSchema());
        rows.apply("println", ParDo.of(new DoFn<Row, Row>() {
            @ProcessElement
            public void processElement(@Element Row row) {
                System.out.println("println row" + row);
            }
        })).setRowSchema(tableRows.getSchema());
        pipeline.run();
    }
}
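As an aside, the setFilesToStage(...) block in the repro above just turns the JVM classpath into a list of files for Dataflow to upload. In isolation it behaves like this small sketch (the class and method names here are hypothetical, chosen for illustration):

```java
import java.io.File;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ClasspathToFiles {
    // Mirrors the options.setFilesToStage(...) expression in the repro:
    // split the JVM classpath on the platform separator and normalize each
    // entry through java.io.File.
    static List<String> entries(String classpath) {
        return Arrays.stream(classpath.split(File.pathSeparator))
                .map(entry -> new File(entry).toString())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // e.g. on Linux, "a.jar:b.jar" splits into two entries
        System.out.println(entries("a.jar" + File.pathSeparator + "b.jar"));
    }
}
```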

Exception

Error message from worker: java.lang.IllegalArgumentException: Unable to encode element 'GenericData{classInfo=[f], {dt=2022-12-21}}' with coder 'SchemaCoder<Schema: Fields:
Field{name=dt, description=, type=LOGICAL_TYPE, options={{}}}
Encoding positions:
{dt=0}
Options:{{}}UUID: 9622aec6-90b4-4021-b112-a7e4d2abecef  UUID: 9622aec6-90b4-4021-b112-a7e4d2abecef delegateCoder: org.apache.beam.sdk.coders.Coder$ByteBuddy$0cxD4zOH@3ef9d47a'.
	org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:300)
	org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)
	org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:642)
	org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:558)
	org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(IntrinsicMapTaskExecutorFactory.java:403)
	org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:128)
	org.apache.beam.runners.dataflow.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:67)
	org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:43)
	org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
	org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:275)
	org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:85)
	org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:423)
	org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:411)
	org.apache.beam.sdk.io.gcp.bigquery.PassThroughThenCleanup$IdentityFn.processElement(PassThroughThenCleanup.java:84)
Caused by: java.lang.ClassCastException: java.time.LocalDate cannot be cast to java.lang.Long
	org.apache.beam.sdk.coders.VarLongCoder.encode(VarLongCoder.java:35)
	org.apache.beam.sdk.schemas.SchemaCoderHelpers$LogicalTypeCoder.encode(SchemaCoderHelpers.java:89)
	org.apache.beam.sdk.coders.RowCoderGenerator$EncodeInstruction.encodeDelegate(RowCoderGenerator.java:333)
	org.apache.beam.sdk.coders.Coder$ByteBuddy$0cxD4zOH.encode(Unknown Source)
	org.apache.beam.sdk.coders.Coder$ByteBuddy$0cxD4zOH.encode(Unknown Source)
	org.apache.beam.sdk.schemas.SchemaCoder.encode(SchemaCoder.java:124)
	org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:297)
	org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)
	org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:642)
	org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:558)
	org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(IntrinsicMapTaskExecutorFactory.java:403)
	org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:128)
	org.apache.beam.runners.dataflow.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:67)
	org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:43)
	org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
	org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:275)
	org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:85)
	org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:423)
	org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:411)
	org.apache.beam.sdk.io.gcp.bigquery.PassThroughThenCleanup$IdentityFn.processElement(PassThroughThenCleanup.java:84)
	org.apache.beam.sdk.io.gcp.bigquery.PassThroughThenCleanup$IdentityFn$DoFnInvoker.invokeProcessElement(Unknown Source)
	org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:211)
	org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:188)
	org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
	org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
	org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
	org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:218)
	org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:169)
	org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
	org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:420)
	org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:389)
	org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:314)
	org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:140)
	org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:120)
	org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:107)
	java.util.concurrent.FutureTask.run(FutureTask.java:266)
	org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:162)
	java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	java.lang.Thread.run(Thread.java:748)

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@Abacn (Contributor) commented Jan 4, 2023

CC: @ahmedabu98 who is investigating this issue

@Abacn (Contributor) commented Jan 5, 2023

The logical type causing the problem is indeed beam:logical_type:date:v1. With #24888, the error message is clearer:

java.lang.RuntimeException: java.lang.IllegalArgumentException: Unable to encode element 'GenericData{classInfo=[f], {dt=2022-12-21}}' with coder 'SchemaCoder<Schema: Fields:
Field{name=dt, description=, type=LOGICAL_TYPE<beam:logical_type:date:v1>, options={{}}}
Encoding positions:
{dt=0}

What confuses me is here:

BaseT baseType = logicalType.toBaseType(value);
if (isDateTime) {
    baseType = (BaseT) ((ReadableInstant) baseType).toInstant();
}
baseTypeCoder.encode(baseType, outStream);

value is a java.time.LocalDate, so Date.toBaseType(value) should return a Long as baseType. However, the baseType fed to baseTypeCoder (a VarLongCoder) .encode is still a java.time.LocalDate. Not sure how this happened; the SchemaCoderHelpers class has not changed in recent months.
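For reference, beam:logical_type:date:v1 represents a date as its epoch-day count, so the base-type conversion that should have happened is essentially the following (a minimal sketch; the class and method names are hypothetical stand-ins, not Beam APIs):

```java
import java.time.LocalDate;

public class DateBaseTypeSketch {
    // What the Date logical type's toBaseType(value) should yield for the
    // schema's Long base type: the number of days since the epoch.
    static Long toBaseType(LocalDate input) {
        return input.toEpochDay();
    }

    public static void main(String[] args) {
        System.out.println(toBaseType(LocalDate.parse("2022-12-21"))); // 19347
    }
}
```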

@Abacn (Contributor) commented Jan 5, 2023

The logical type sent to SchemaCoderHelpers is an "UnknownLogicalType"; this is the problem. It does carry the beam:logical_type:date:v1 identifier, though.
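That would explain the ClassCastException: an unknown logical type cannot convert the value, so the LocalDate passes through unchanged into the Long coder's cast. A minimal standalone sketch of that failure mode (hypothetical names, not Beam code):

```java
import java.time.LocalDate;

public class UnknownLogicalTypeSketch {
    // An unknown logical type has no conversion to apply, so it passes the
    // input value through as-is.
    static Object identityToBaseType(Object value) {
        return value;
    }

    // Mirrors the cast inside a Long-based coder such as VarLongCoder.encode.
    static boolean castFails(Object base) {
        try {
            Long ignored = (Long) base;
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Object base = identityToBaseType(LocalDate.parse("2022-12-21"));
        System.out.println(castFails(base)); // the LocalDate survives to the cast and fails
    }
}
```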

@mouyang (Contributor, Author) commented Jan 5, 2023 via email

@Abacn (Contributor) commented Jan 5, 2023

I think the issue was introduced in #23014. Will put a fix in.

@Abacn added this to the 2.44.0 Release milestone on Jan 5, 2023
@Abacn (Contributor) commented Jan 5, 2023

This issue likely affects all pipelines that involve portable but non-standard logical types in schema translation, so I set a milestone. Will provide a quick fix.
