
NullPointerException when inserting Cap'N'Proto encoded data with ClickHouseClient #1475

Closed
ladislavmacoun opened this issue Oct 20, 2023 · 3 comments · Fixed by #1666

@ladislavmacoun

Describe the bug

When I insert Cap'n Proto binary-encoded data using an edited version of the client's streaming insert example (code below), a NullPointerException is thrown.

The specific error message is:

java.lang.NullPointerException: Cannot invoke "com.clickhouse.data.ClickHouseDataProcessor.getInputStream()" because "this.processor" is null

Steps to reproduce

  1. Execute a write request with Cap'n Proto data, or with data in any binary format other than RowBinary.
  2. Attempt to get the response.

Expected behaviour

The server response should be returned normally, as it is when the same code snippet is used with RowBinary-encoded data.

Code example

    public long insert(List<Record> records, ClickHouseNode server) throws ClickHouseException {
        if (records.isEmpty()) {
            return 0; // the method returns long, so a bare return does not compile
        }

        try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP)) {
            ClickHouseRequest.Mutation request = client
                    .read(server)
                    .write()
                    .table(TABLE_NAME)
                    .format(ClickHouseFormat.CapnProto)
                    .set("format_schema", "schema.capnp:Record")
                    .decompressClientRequest(ClickHouseCompression.LZ4);

            ClickHouseConfig config = request.getConfig();
            CompletableFuture<ClickHouseResponse> future;
            // back-pressuring is not supported, you can adjust the first two arguments
            try (ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance()
                    .createPipedOutputStream(config, (Runnable) null)) {
                // in async mode, which is default, execution happens in a worker thread
                future = request.data(stream.getInputStream()).execute();

                records.stream()
                        .filter(Objects::nonNull)
                        .forEach(record -> {
                            try {
                                stream.write(record.getPayload());
                            } catch (IOException e) {
                                throw new RuntimeException(e);
                            }
                        });
            }

            // response should be always closed
            try (ClickHouseResponse response = future.get()) {
                ClickHouseResponseSummary summary = response.getSummary();
                return summary.getWrittenRows();
            }
        } catch (IOException | InterruptedException | ExecutionException e) {
            throw ClickHouseException.of(e, server);
        }
    }

Configuration

Environment

  • Client version: 0.4.6
  • Language version: java 21 2023-09-19 LTS
  • OS: Linux

The problem appears to be in ClickHouseDataStreamFactory.getProcessor(). It returns a null processor for every binary format except RowBinary and RowBinaryWithNamesAndTypes.

    public ClickHouseDataProcessor getProcessor(ClickHouseDataConfig config, ClickHouseInputStream input,
            ClickHouseOutputStream output, Map<String, Serializable> settings, List<ClickHouseColumn> columns)
            throws IOException {
        ClickHouseFormat format = ClickHouseChecker.nonNull(config, ClickHouseDataConfig.TYPE_NAME).getFormat();
        ClickHouseDataProcessor processor = null;
        if (ClickHouseFormat.RowBinary == format || ClickHouseFormat.RowBinaryWithNamesAndTypes == format) {
            processor = new ClickHouseRowBinaryProcessor(config, input, output, columns, settings);
        } else if (format.isText()) {
            processor = new ClickHouseTabSeparatedProcessor(config, input, output, columns, settings);
        }
        return processor;
    }
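The model below is a simplified, hypothetical version of the dispatch above. It shows why a null processor becomes an NPE when the response is closed (ClickHouseStreamResponse.close in the stack trace), and one possible way to guard against it. It is not clickhouse-java code. `Format`, `Processor`, `processorFor` and `SimplifiedResponse` are illustrative names. The model keeps the same branches as getProcessor() but returns an empty Optional instead of null. Its close() falls back to a raw-stream path when no processor exists. The actual fix in the client may differ.

```java
import java.util.Optional;

// Hypothetical model of ClickHouseDataStreamFactory.getProcessor() and of a
// response close() that guards against a missing processor. Not the real API.
public class ProcessorDispatchSketch {
    // Hypothetical subset of formats; `text` mirrors ClickHouseFormat.isText()
    enum Format {
        ROW_BINARY(false), ROW_BINARY_WITH_NAMES_AND_TYPES(false),
        TAB_SEPARATED(true), CAPN_PROTO(false);

        final boolean text;

        Format(boolean text) {
            this.text = text;
        }
    }

    // Hypothetical stand-in for ClickHouseDataProcessor
    record Processor(String kind) {}

    // Same branches as getProcessor(), but "no processor" is an explicit empty
    // Optional instead of null, so callers must handle that case
    static Optional<Processor> processorFor(Format format) {
        if (format == Format.ROW_BINARY || format == Format.ROW_BINARY_WITH_NAMES_AND_TYPES) {
            return Optional.of(new Processor("RowBinary"));
        } else if (format.text) {
            return Optional.of(new Processor("TabSeparated"));
        }
        return Optional.empty(); // e.g. CapnProto, the case that produced the NPE
    }

    // Hypothetical response: close() only touches the processor when one exists
    static final class SimplifiedResponse implements AutoCloseable {
        private final Optional<Processor> processor;
        String closedVia;

        SimplifiedResponse(Format format) {
            this.processor = processorFor(format);
        }

        @Override
        public void close() {
            // Drain through the processor if there is one; otherwise fall back to
            // closing the raw input stream instead of dereferencing null
            closedVia = processor.map(p -> "processor:" + p.kind()).orElse("raw-stream");
        }
    }

    public static void main(String[] args) {
        for (Format f : Format.values()) {
            SimplifiedResponse r = new SimplifiedResponse(f);
            r.close();
            System.out.println(f + " closed via " + r.closedVia);
        }
    }
}
```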

I would appreciate any help with this problem, or any suggested solutions or workarounds. Thank you.

@mzitnik
Contributor

mzitnik commented Oct 22, 2023

@ladislavmacoun Thanks for opening the issue. Can you provide a sample of the data so we can reproduce the problem?

@ladislavmacoun
Author

@mzitnik Sure, here is a simple example of inserting Cap'n Proto data into ClickHouse.

Let's create a simple Cap'n Proto schema, schema.capnp, and matching example data, example.json.

@0xe7a7d8bbc2f70135;

struct Message {
  id @0 :UInt64;
  data @1 :Text;
}

{
    "id": 1475,
    "data": "Hello, clickhouse-java!"
}

We can now encode this data into the Cap'n Proto binary format using the capnp tool:

capnp convert json:binary schema.capnp Message < example.json > encoded.bin

Verify that it was encoded correctly:

capnp convert binary:json schema.capnp Message < encoded.bin
{"id": "1475", "data": "Hello, clickhouse-java!"}
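The Java example below writes each record payload to the stream, so each payload must be a complete, framed Cap'n Proto message. ClickHouse's CapnProto input format reads its input as a sequence of such messages, one per row. As an extra sanity check next to `capnp convert binary:json`, the sketch below walks the standard stream framing to count the messages in a file such as encoded.bin. The framing is a little-endian uint32 segment count minus one, then each segment's size in 8-byte words, padded to a word boundary, then the segment data. It assumes the unpacked `binary` encoding produced above, not `packed`. The class and method names are illustrative.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Walks the standard (unpacked) Cap'n Proto stream framing to find message boundaries.
public class CapnpFraming {
    // Byte length of the framed message that starts at `offset`: segment table
    // (count - 1, then per-segment sizes in 8-byte words, all uint32 little-endian),
    // padded to a word boundary, followed by the segment contents.
    static int framedMessageLength(byte[] data, int offset) {
        ByteBuffer buf = ByteBuffer.wrap(data).order(ByteOrder.LITTLE_ENDIAN);
        if (offset + 4 > data.length) {
            throw new IllegalArgumentException("truncated segment count at offset " + offset);
        }
        long segmentCount = Integer.toUnsignedLong(buf.getInt(offset)) + 1;
        long headerBytes = 4 * (segmentCount + 1);
        if (headerBytes % 8 != 0) {
            headerBytes += 4; // pad the segment table to a whole 8-byte word
        }
        if (offset + headerBytes > data.length) {
            throw new IllegalArgumentException("truncated segment table at offset " + offset);
        }
        long bodyWords = 0;
        for (int i = 0; i < segmentCount; i++) {
            bodyWords += Integer.toUnsignedLong(buf.getInt(offset + 4 + 4 * i));
        }
        long total = headerBytes + bodyWords * 8;
        if (offset + total > data.length) {
            throw new IllegalArgumentException("truncated message at offset " + offset);
        }
        return (int) total;
    }

    // Splits a buffer of back-to-back framed messages into per-message byte lengths
    static List<Integer> messageLengths(byte[] data) {
        List<Integer> lengths = new ArrayList<>();
        int offset = 0;
        while (offset < data.length) {
            int len = framedMessageLength(data, offset); // always >= 8, so the loop advances
            lengths.add(len);
            offset += len;
        }
        return lengths;
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            System.out.println("usage: java CapnpFraming.java <encoded.bin>");
            return;
        }
        List<Integer> lengths = messageLengths(Files.readAllBytes(Path.of(args[0])));
        System.out.println(lengths.size() + " message(s), byte lengths " + lengths);
    }
}
```

Run it as `java CapnpFraming.java encoded.bin`. If the file ends mid-message, the method throws instead of guessing. A single-row file should report exactly one message.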

Now, start a ClickHouse server with the schema file mounted (/var/lib/clickhouse/format_schemas/ is the default format_schema_path):

docker run --rm -it \
  --name clickhouse-container \
  -p 8123:8123 \
  -p 9000:9000 \
  -v $(pwd)/schema.capnp:/var/lib/clickhouse/format_schemas/schema.capnp \
  yandex/clickhouse-server

Create a table for the data:

curl -X POST 'http://localhost:8123/' \
   --data-binary "CREATE TABLE Message (id UInt64, data String) ENGINE = MergeTree() ORDER BY id;"

Then insert the encoded Cap'n Proto data. The query must be URL-encoded, and the format_schema setting can be passed as a URL parameter:

#!/bin/bash
url="http://localhost:8123/"
q="INSERT%20INTO%20Message%20FORMAT%20CapnProto"

curl -X POST "${url}?query=${q}&format_schema=schema.capnp:Message" --data-binary @encoded.bin
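Until the client handles this case, one possible workaround is to skip ClickHouseClient for the insert. Instead, send the same request to the ClickHouse HTTP interface with the JDK's java.net.http.HttpClient (Java 11+). This is a sketch under several assumptions: a server at http://localhost:8123/ using the default user with no password, the Message table, and the schema.capnp:Message schema from this example. format_schema is sent as a URL parameter, because the HTTP interface accepts settings that way. The class and method names are illustrative.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;

public class CapnpHttpInsert {
    // Percent-encodes a query-string value; URLEncoder emits '+' for spaces,
    // so it is rewritten to %20 (a literal '+' has already become %2B)
    static String encode(String value) {
        return URLEncoder.encode(value, StandardCharsets.UTF_8).replace("+", "%20");
    }

    // Builds the insert URL: both the query and the format_schema setting are URL parameters
    static URI insertUri(String baseUrl, String table, String formatSchema) {
        String query = "INSERT INTO " + table + " FORMAT CapnProto";
        return URI.create(baseUrl + "?query=" + encode(query) + "&format_schema=" + encode(formatSchema));
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        if (args.length < 1) {
            System.out.println("Missing encoded binary data path");
            return;
        }
        URI uri = insertUri("http://localhost:8123/", "Message", "schema.capnp:Message");
        // Stream the encoded file as the raw POST body, like curl --data-binary
        HttpRequest request = HttpRequest.newBuilder(uri)
                .POST(HttpRequest.BodyPublishers.ofFile(Path.of(args[0])))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() != 200) {
            throw new IOException("Insert failed with HTTP " + response.statusCode() + ": " + response.body());
        }
        // ClickHouse reports row counts in the X-ClickHouse-Summary response header
        System.out.println(response.headers().firstValue("X-ClickHouse-Summary").orElse("(no summary header)"));
    }
}
```

Run it as `java CapnpHttpInsert.java encoded.bin`. This gives up the client's retry, compression and typed-response features, so treat it as a stopgap, not a replacement.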

We should now be able to query the data:

curl 'http://localhost:8123/' --data-binary "SELECT * FROM Message format Vertical;"

Row 1:
──────
id:   1475
data: Hello, clickhouse-java!

Here is an example Java class that loads this data:

import com.clickhouse.client.*;
import com.clickhouse.data.ClickHouseDataStreamFactory;
import com.clickhouse.data.ClickHouseFormat;
import com.clickhouse.data.ClickHousePipedOutputStream;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class ClickhouseCapnpInsertExample {
    private static final String TABLE_NAME = "Message";
    private static final ClickHouseNode server = ClickHouseNode.of("http://localhost:8123");

    public static long insert(List<Record> records) throws ClickHouseException {
        if (records.isEmpty()) {
            return 0;
        }

        try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP)) {
            ClickHouseRequest.Mutation request = client
                    .read(server)
                    .write()
                    .table(TABLE_NAME)
                    .format(ClickHouseFormat.CapnProto)
                    .set("format_schema", "schema.capnp:Message");

            ClickHouseConfig config = request.getConfig();
            CompletableFuture<ClickHouseResponse> future;
            // back-pressuring is not supported, you can adjust the first two arguments
            try (ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance()
                    .createPipedOutputStream(config, (Runnable) null)) {
                // in async mode, which is default, execution happens in a worker thread
                future = request.data(stream.getInputStream()).execute();

                records.stream()
                        .filter(Objects::nonNull)
                        .forEach(record -> {
                            try {
                                stream.write(record.payload());
                            } catch (IOException e) {
                                throw new RuntimeException(e);
                            }
                        });
            }

            // response should be always closed
            try (ClickHouseResponse response = future.get()) {
                ClickHouseResponseSummary summary = response.getSummary();
                return summary.getWrittenRows();
            }
        } catch (IOException | InterruptedException | ExecutionException e) {
            throw ClickHouseException.of(e, server);
        }
    }

    public static void main(String[] args) {
        if (args.length < 1) {
            System.out.println("Missing encoded binary data path");
            return;
        }

        String filePath = args[0];
        try {
            byte[] data = Files.readAllBytes(Paths.get(filePath));
            long insertedRows = 0;
            try {
                insertedRows = insert(new ArrayList<>(Collections.singleton(new Record(data))));
            } catch (ClickHouseException e) {
                throw new RuntimeException("Error while inserting CapNProto data to clickhouse:", e);
            }
            System.out.println("Successfully inserted " + insertedRows + " rows.");


        } catch (IOException e) {
            System.out.println("An error occurred while reading the file: " + e.getMessage());
        }
    }

    public record Record(byte[] payload) {
    }
}

with these Maven dependencies:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.example</groupId>
  <artifactId>test-ch-java-capnp</artifactId>
  <version>1.0-SNAPSHOT</version>
  <name>Archetype - test-ch-java-capnp</name>
  <url>http://maven.apache.org</url>
  <dependencies>
    <dependency>
      <groupId>org.lz4</groupId>
      <artifactId>lz4-java</artifactId>
      <version>1.8.0</version>
    </dependency>
    <dependency>
      <groupId>com.clickhouse</groupId>
      <artifactId>clickhouse-data</artifactId>
      <version>0.4.6</version>
    </dependency>
    <dependency>
      <groupId>com.clickhouse</groupId>
      <artifactId>clickhouse-http-client</artifactId>
      <version>0.4.6</version>
    </dependency>
    <dependency>
      <groupId>com.clickhouse</groupId>
      <artifactId>clickhouse-client</artifactId>
      <version>0.4.6</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>16</source>
          <target>16</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

When run with encoded.bin as the argument, it fails with this exception:

Exception in thread "main" java.lang.NullPointerException: Cannot invoke "com.clickhouse.data.ClickHouseDataProcessor.getInputStream()" because "this.processor" is null
	at com.clickhouse.client.ClickHouseStreamResponse.close(ClickHouseStreamResponse.java:94)
	at ClickhouseCapnpInsertExample.insert(ClickhouseCapnpInsertExample.java:58)
	at ClickhouseCapnpInsertExample.main(ClickhouseCapnpInsertExample.java:75)

@ladislavmacoun
Author

Hi @mzitnik, it's been a few months since the last update, and I was wondering if there are any new developments. Alternatively, could you suggest any other approaches to address this issue?
