cannot INSERT INTO Stream. #2376

Closed
sangli00 opened this issue Jan 22, 2019 · 5 comments
Labels: needs-triage, streaming-engine (Tickets owned by the ksqlDB Streaming Team)

Comments

@sangli00

Detailed error log:

ksql> insert into stream_merge_data_pipelinedb select * from stream_tran_to_pipeline;

io.confluent.ksql.util.KsqlException: Incompatible schema between results and sink. Result schema is [IPROBE_ID : BIGINT, ALPROTO_ID : BIGINT, USER_PORT : BIGINT, METHOD : BIGINT, USER_AGENT : VARCHAR, REQ_CONTENT_TYPE : VARCHAR, HOST : VARCHAR, REFERER : VARCHAR, VER : BIGINT, USER_IP : BIGINT, PATH : VARCHAR, URL_QUERY : VARCHAR, STATUS : BIGINT, RESP_CONTENT_LENGTH : BIGINT, REQ_CONTENT_LENGTH : BIGINT, COOKIE : VARCHAR, SERVER : VARCHAR, RESP_CTYPE : VARCHAR, REQ_TIME : BIGINT, RESP_TIME : BIGINT, DATA_STATUS : BIGINT, POLICYID : VARCHAR, REQ_HOUR : BIGINT, REGION : VARCHAR, APP_ID : BIGINT, APP_IP : BIGINT, DATA_RESOURCE_URL : VARCHAR, DATA_RESOURCE_ID : BIGINT, COUNTRY : VARCHAR, LEVEL : INT, MSH : VARCHAR], but the sink schema is [IPROBE_ID : BIGINT, ALPROTO_ID : BIGINT, USER_PORT : BIGINT, METHOD : BIGINT, USER_AGENT : VARCHAR, REQ_CONTENT_TYPE : VARCHAR, HOST : VARCHAR, REFERER : VARCHAR, VER : BIGINT, USER_IP : BIGINT, PATH : VARCHAR, URL_QUERY : VARCHAR, STATUS : BIGINT, RESP_CONTENT_LENGTH : BIGINT, REQ_CONTENT_LENGTH : BIGINT, COOKIE : VARCHAR, SERVER : VARCHAR, RESP_CTYPE : VARCHAR, REQ_TIME : BIGINT, RESP_TIME : BIGINT, DATA_STATUS : BIGINT, POLICYID : VARCHAR, REQ_HOUR : BIGINT, REGION : VARCHAR, APP_ID : BIGINT, APP_IP : BIGINT, DATA_RESOURCE_URL : VARCHAR, DATA_RESOURCE_ID : BIGINT, COUNTRY : VARCHAR, LEVEL : INT, MSH : VARCHAR].
ksql>

stream_merge_data_pipelinedb:

ksql> describe stream_merge_data_pipelinedb;

Name                 : STREAM_MERGE_DATA_PIPELINEDB
 Field               | Type
-------------------------------------------------
 ROWTIME             | BIGINT           (system)
 ROWKEY              | VARCHAR(STRING)  (system)
 IPROBE_ID           | BIGINT
 ALPROTO_ID          | BIGINT
 USER_PORT           | BIGINT
 METHOD              | BIGINT
 USER_AGENT          | VARCHAR(STRING)
 REQ_CONTENT_TYPE    | VARCHAR(STRING)
 HOST                | VARCHAR(STRING)
 REFERER             | VARCHAR(STRING)
 VER                 | BIGINT
 USER_IP             | BIGINT
 PATH                | VARCHAR(STRING)
 URL_QUERY           | VARCHAR(STRING)
 STATUS              | BIGINT
 RESP_CONTENT_LENGTH | BIGINT
 REQ_CONTENT_LENGTH  | BIGINT
 COOKIE              | VARCHAR(STRING)
 SERVER              | VARCHAR(STRING)
 RESP_CTYPE          | VARCHAR(STRING)
 REQ_TIME            | BIGINT
 RESP_TIME           | BIGINT
 DATA_STATUS         | BIGINT
 POLICYID            | VARCHAR(STRING)
 REQ_HOUR            | BIGINT
 REGION              | VARCHAR(STRING)
 APP_ID              | BIGINT
 APP_IP              | BIGINT
 DATA_RESOURCE_URL   | VARCHAR(STRING)
 DATA_RESOURCE_ID    | BIGINT
 COUNTRY             | VARCHAR(STRING)
 LEVEL               | INTEGER
 MSH                 | VARCHAR(STRING)
-------------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;

stream_tran_to_pipeline:

ksql> describe stream_tran_to_pipeline;

Name                 : STREAM_TRAN_TO_PIPELINE
 Field               | Type
-------------------------------------------------
 ROWTIME             | BIGINT           (system)
 ROWKEY              | VARCHAR(STRING)  (system)
 IPROBE_ID           | BIGINT
 ALPROTO_ID          | BIGINT
 USER_PORT           | BIGINT
 METHOD              | BIGINT
 USER_AGENT          | VARCHAR(STRING)
 REQ_CONTENT_TYPE    | VARCHAR(STRING)
 HOST                | VARCHAR(STRING)
 REFERER             | VARCHAR(STRING)
 VER                 | BIGINT
 USER_IP             | BIGINT
 PATH                | VARCHAR(STRING)
 URL_QUERY           | VARCHAR(STRING)
 STATUS              | BIGINT
 RESP_CONTENT_LENGTH | BIGINT
 REQ_CONTENT_LENGTH  | BIGINT
 COOKIE              | VARCHAR(STRING)
 SERVER              | VARCHAR(STRING)
 RESP_CTYPE          | VARCHAR(STRING)
 REQ_TIME            | BIGINT
 RESP_TIME           | BIGINT
 DATA_STATUS         | BIGINT
 POLICYID            | VARCHAR(STRING)
 REQ_HOUR            | BIGINT
 REGION              | VARCHAR(STRING)
 APP_ID              | BIGINT
 APP_IP              | BIGINT
 DATA_RESOURCE_URL   | VARCHAR(STRING)
 DATA_RESOURCE_ID    | BIGINT
 COUNTRY             | VARCHAR(STRING)
 LEVEL               | INTEGER
 MSH                 | VARCHAR(STRING)
-------------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;

The columns and types of stream_merge_data_pipelinedb and stream_tran_to_pipeline are identical.

Why can't I insert?
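
One check that may help (a guess based on the output above, not a confirmed cause): DESCRIBE prints only column names and types, while properties such as the underlying topic and value format may also need to line up for INSERT INTO. DESCRIBE EXTENDED, suggested at the end of the output above, shows that extra metadata for both streams:

ksql> describe extended stream_merge_data_pipelinedb;
ksql> describe extended stream_tran_to_pipeline;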

@singyiu

singyiu commented May 8, 2019

I ran into the exact same problem, where the schemas are exactly the same but the insert fails.

Incompatible schema between results and sink. Result schema is [CREDENTIAL_ID : VARCHAR, first_name : VARCHAR, last_name : VARCHAR, access_card_id_binary : VARCHAR, is_active : BOOLEAN, effective_date : VARCHAR, expiry_date : VARCHAR, access_levels : ARRAY], but the sink schema is [CREDENTIAL_ID : VARCHAR, first_name : VARCHAR, last_name : VARCHAR, access_card_id_binary : VARCHAR, is_active : BOOLEAN, effective_date : VARCHAR, expiry_date : VARCHAR, access_levels : ARRAY].
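
Worth noting (an observation about the error text, not a confirmed diagnosis): both access_levels columns print as bare ARRAY, with no element type. Two ARRAY columns whose element types differ, say ARRAY<VARCHAR> versus ARRAY<INTEGER>, would print identically in this message while still being incompatible. DESCRIBE on both streams (hypothetical names below) shows the full ARRAY<...> type:

ksql> describe source_stream;
ksql> describe sink_stream;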

@big-andy-coates
Contributor

big-andy-coates commented Sep 17, 2019

Hmm... this is worrying. Are you able to provide steps to reproduce, please, i.e. the CREATE statements and the INSERT statement in question? Then we can look into it.

@big-andy-coates
Contributor

I tried adding a test case to insert-into.json:

    {
      "name": "complex schema",
      "statements": [
        "CREATE STREAM SOURCE (IPROBE_ID BIGINT, ALPROTO_ID BIGINT, USER_PORT BIGINT, METHOD BIGINT, USER_AGENT STRING, REQ_CONTENT_TYPE STRING, HOST STRING, REFERER STRING, VER BIGINT, USER_IP BIGINT, PATH STRING, URL_QUERY STRING, STATUS BIGINT, RESP_CONTENT_LENGTH BIGINT, REQ_CONTENT_LENGTH BIGINT, COOKIE STRING, SERVER STRING, RESP_CTYPE STRING, REQ_TIME BIGINT, RESP_TIME BIGINT, DATA_STATUS BIGINT, POLICYID STRING, REQ_HOUR BIGINT, REGION STRING, APP_ID BIGINT, APP_IP BIGINT, DATA_RESOURCE_URL STRING, DATA_RESOURCE_ID BIGINT, COUNTRY STRING, LEVEL INTEGER, MSH STRING) WITH (kafka_topic='source', value_format='JSON');",
        "CREATE STREAM SINK   (IPROBE_ID BIGINT, ALPROTO_ID BIGINT, USER_PORT BIGINT, METHOD BIGINT, USER_AGENT STRING, REQ_CONTENT_TYPE STRING, HOST STRING, REFERER STRING, VER BIGINT, USER_IP BIGINT, PATH STRING, URL_QUERY STRING, STATUS BIGINT, RESP_CONTENT_LENGTH BIGINT, REQ_CONTENT_LENGTH BIGINT, COOKIE STRING, SERVER STRING, RESP_CTYPE STRING, REQ_TIME BIGINT, RESP_TIME BIGINT, DATA_STATUS BIGINT, POLICYID STRING, REQ_HOUR BIGINT, REGION STRING, APP_ID BIGINT, APP_IP BIGINT, DATA_RESOURCE_URL STRING, DATA_RESOURCE_ID BIGINT, COUNTRY STRING, LEVEL INTEGER, MSH STRING) WITH (kafka_topic='sink', value_format='JSON');",
        "INSERT INTO SINK SELECT * FROM SOURCE;"
      ],
      "inputs": [
        {"topic": "source", "key": 0, "value": null, "timestamp": 0}
      ],
      "outputs": [
        {"topic": "sink", "key": 0, "value": null, "timestamp": 0}
      ]
    }

This test did not fail with the error you encountered, so I'm unable to reproduce it. This may be some quirk of how your source and sink streams are being created, hence the request for steps to reproduce so we can look into this.

Thanks,

Andy

@aurabhi
Contributor

aurabhi commented Mar 17, 2021

I ran into a similar problem: selecting a subset of columns and inserting them into another stream whose schema contains those same columns.

insert into sub_stream select NAME, LASTNAME, CREATED, LOCATION from main_stream emit changes;

Caused by: io.confluent.ksql.api.client.exception.KsqlClientException: Received 400 response from server: Incompatible schema between results and sink.
Result schema is NAME STRING, LASTNAME STRING, CREATED BIGINT, LOCATION STRING
Sink schema is NAME STRING, LASTNAME STRING, CREATED BIGINT, LOCATION STRING, EMAIL STRING, ADDRESS MAP<STRING, STRING>, PHONE STRING.

Error code: 40001

@vcrfxia vcrfxia added the streaming-engine Tickets owned by the ksqlDB Streaming Team label Mar 19, 2021
@vcrfxia
Contributor

vcrfxia commented Mar 23, 2021

Hi @aurabhi, the behavior you reported is expected: ksqlDB requires the schemas in INSERT INTO statements to match exactly. Because the schema you are trying to insert is a subset of the required schema, you can modify your statement to meet the requirement by adding null-valued columns for the missing ones:

insert into sub_stream
  select NAME, LASTNAME, CREATED, LOCATION,
         cast(null as varchar) as email,
         cast(null as map<string, string>) as address,
         cast(null as varchar) as phone
  from main_stream emit changes;
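
Note that, given the exact-match requirement, the types inside each cast(null as ...) must be the sink's declared column types (varchar for EMAIL and PHONE, map<string, string> for ADDRESS here); otherwise the INSERT fails with the same incompatible-schema error.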

I'm going to close this ticket for now as there is a workaround in your case and the original question has not seen activity in more than a year. Feel free to reopen if this has not addressed your question. Thanks.

@vcrfxia vcrfxia closed this as completed Mar 23, 2021