
[FLINK-33859] Support OpenSearch v2 #38

Merged · 3 commits · Apr 24, 2024

Conversation

@snuyanzin
Contributor

@snuyanzin snuyanzin commented Dec 15, 2023

This PR adds support for OpenSearch v2.

Since there are breaking changes introduced in OpenSearch [1], [2], there is no way for the current code in main to work for both v1 and v2. For that reason it is now split the same way as for Elasticsearch: one jar for v1, another for v2.

The connector name for v1 stays the same, opensearch; for v2 it is opensearch-2.

Since v2 is Java 11 based, there is a java11 Maven profile that builds the OpenSearch v2 connector only on Java 11+. There are some attempts on the OpenSearch side to improve this situation; if they succeed, building OpenSearch v2 with Java 8 could easily be enabled by removing that profile.

The PR also bumps the Flink dependency to 1.18.0. The reason is incompatible changes in ArchUnit, which make the code pass the ArchUnit tests either only for 1.17 or only for 1.18/1.19.

It also adds support for Java 17.

[1] opensearch-project/OpenSearch#9082
[2] opensearch-project/OpenSearch#5902
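
The profile mechanism described above could look roughly like this in the pom.xml (an illustrative sketch only; the profile id and module name are assumptions, not copied from the actual build files):

```xml
<!-- Sketch: include the v2 connector module only when building on JDK 11+ -->
<profiles>
  <profile>
    <id>java11</id>
    <activation>
      <!-- Maven range syntax: activates on JDK 11 and above -->
      <jdk>[11,)</jdk>
    </activation>
    <modules>
      <module>flink-connector-opensearch2</module>
    </modules>
  </profile>
</profiles>
```

With this layout, a JDK 8 build simply skips the v2 module, while a JDK 11+ build produces both jars.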

@reta
Member

reta commented Dec 15, 2023

@snuyanzin I think separating v1 and v2 would bring a lot of additional maintenance work; how about trying to use a multi-release jar instead? We know 100% that 2.x needs JDK 11, so we could isolate a few key classes that use breaking APIs under the java11 codebase?

@snuyanzin
Contributor Author

snuyanzin commented Dec 15, 2023

I'm curious whether a multi-release jar supports the case where the same Flink cluster needs to use both the OpenSearch v1 and OpenSearch v2 connectors, with both built on JDK 11, for instance — e.g. different OpenSearch instances fed by the same Flink.

@reta
Member

reta commented Dec 16, 2023

> I'm curious whether a multi-release jar supports the case where the same Flink cluster needs to use both the OpenSearch v1 and OpenSearch v2 connectors, with both built on JDK 11, for instance?

It is possible in theory, but I believe it would be very difficult to pull off in practice: both clients have an overlapping set of classes, and using them in the same job would need quite extensive classloader manipulation.
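
For context on the alternative being discussed: a multi-release jar overlays version-specific class files under `META-INF/versions`, so the runtime picks the variant matching its JDK. An illustrative layout (hypothetical paths, not taken from this repo):

```
flink-connector-opensearch.jar
├── META-INF/MANIFEST.MF                      # contains "Multi-Release: true"
├── org/apache/flink/.../SomeWriter.class     # default (JDK 8) implementation
└── META-INF/versions/11/
    └── org/apache/flink/.../SomeWriter.class # JDK 11+ variant using v2 APIs
```

Note this selects by JDK version at runtime, not by OpenSearch server version — which is why it does not help when both connector variants must run on the same JDK.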

@snuyanzin
Contributor Author

snuyanzin commented Dec 16, 2023

Yes, I see...

Then it is not clear to me what a multi-release jar could give us here.

> so we could isolate a few key classes that use breaking APIs under the java11 codebase?

Since currently (main branch) the connector builds with JDK 8 and 11, IIUC this doesn't solve the issue when the connector for both v1 and v2 builds with JDK 11. Or did I miss anything?

In this PR I also extracted the common classes into a base module; the v1 and v2 modules contain only the OpenSearch-API-dependent classes.

@reta
Member

reta commented Dec 16, 2023

> Since currently (main branch) the connector builds with JDK 8 and 11, IIUC this doesn't solve the issue when the connector for both v1 and v2 builds with JDK 11. Or did I miss anything?

Correct, hence the profiles we touched upon the other day: JDK 8 for 1.x, JDK 11 for 2.x.

> In this PR I also extracted the common classes into a base module; the v1 and v2 modules contain only the OpenSearch-API-dependent classes.

I have no objections to that (it is the right way to me); I am just suggesting a simpler alternative that we might consider.

@snuyanzin
Contributor Author

snuyanzin commented Dec 16, 2023

> the profiles we touched upon the other day: JDK 8 for 1.x, JDK 11 for 2.x

I guess the issue is that so far the build/test by default happens for JDK 8 and 11, which means that ideally we need to keep v1 building/testing with JDK 11 as well.

@reta
Member

reta commented Dec 16, 2023

> I guess the issue is that so far the build/test by default happens for JDK 8 and 11, which means that ideally we need to keep v1 building/testing with JDK 11 as well.

Fair point, but we cannot build 2.x with JDK 8, so sadly there is no win/win in any case.

@snuyanzin
Contributor Author

At least we could build/test with what is allowed. This is what I tried to do, and this is how it looks with this PR:

| OpenSearch | JDK 8 | JDK 11 | JDK 17 (Flink 1.18+) | JDK 21 (Flink 1.19+) |
|------------|-------|--------|----------------------|----------------------|
| v1         | yes   | yes    | yes                  | yes                  |
| v2         | no, since the OS v2 baseline is JDK 11 | yes | yes | yes |

@reta
Member

reta commented Dec 17, 2023

> This is what I tried to do, and this is how it looks with this PR

Yep, it looks reasonable.

@MartijnVisser MartijnVisser requested a review from reswqa December 28, 2023 11:58
@snuyanzin
Contributor Author

@reswqa do you have time to take a look here, please?

@reswqa reswqa left a comment
Member

Thanks @snuyanzin, I believe supporting OpenSearch 2 is a worthwhile endeavor.

This change overall looks good to me (just based on my knowledge of the es-connector repo). 👍

I just have one question: since the legacy sink implementation (based on the sink function) is deprecated, why do we still need to include it in OpenSearch v2?

@snuyanzin
Contributor Author

Thanks for the feedback @reswqa.

> I just have one question: since the legacy sink implementation (based on the sink function) is deprecated, why do we still need to include it in OpenSearch v2?

The intention here was just to support both versions; the approach was the same as for version 1 and for the Elasticsearch connector.

I guess in fact we need to address this issue with the deprecated sink for both the Elasticsearch (6, 7) and OpenSearch (1, 2) connectors.

I would suggest creating a dedicated follow-up task for that, WDYT?

@reta
Member

reta commented Feb 22, 2024

> I would suggest creating a dedicated follow-up task for that, WDYT?

@snuyanzin FYI, we have #5 open

@reswqa
Member

reswqa commented Feb 23, 2024

> I would suggest creating a dedicated follow-up task for that, WDYT?

Sounds good.

@reta reta left a comment
Member

Thanks @snuyanzin !

@joshbsemperis

joshbsemperis commented Apr 14, 2024

Can this be resolved and pushed? Is there a reason why it hasn't been? @reta @reswqa @snuyanzin

@snuyanzin
Contributor Author

First, #42 should be reviewed/approved/merged; then we can continue with this PR.

@joshbsemperis

Can we now continue with this? @reta @reswqa @snuyanzin

@snuyanzin
Contributor Author

This is on my todo list for today/tomorrow.

@snuyanzin snuyanzin force-pushed the flink33859 branch 4 times, most recently from 02b0345 to 58dfe14, on April 17, 2024 08:08
@snuyanzin
Contributor Author

Thanks for the review.
I resolved the conflicts and added profiles to simplify releasing v1 and v2 separately, as was mentioned on the ML.

@snuyanzin snuyanzin merged commit 22a2934 into apache:main Apr 24, 2024
12 checks passed
@stanb

stanb commented May 22, 2024

Where can I download an OpenSearch 2 compatible Flink SQL connector jar?

@snuyanzin
Contributor Author

It is in the voting stage; you can find links to the artifacts (including jars for OS v2) in the corresponding email thread:
https://lists.apache.org/thread/by44cdpfv6p9394vwxhh1vzh3rfskzms

@stanb

stanb commented May 22, 2024

> It is in the voting stage; you can find links to the artifacts (including jars for OS v2) in the corresponding email thread: https://lists.apache.org/thread/by44cdpfv6p9394vwxhh1vzh3rfskzms

I mean the built jar file.
I tried to build it myself and use it with my jobs:

```dockerfile
FROM maven AS build-opensearch-connector

RUN git clone https://github.com/apache/flink-connector-opensearch.git
WORKDIR /flink-connector-opensearch
RUN git checkout v2.0.0-rc1
RUN mvn clean package -U -B --no-transfer-progress -Dflink.version=1.18.1 -DskipTests -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=120

FROM flink:1.18.1

#
# Add additional connectors not included in the default Flink image
#

## Opensearch connector
##ADD https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-opensearch/1.1.0-1.18/flink-sql-connector-opensearch-1.1.0-1.18.jar /opt/flink/lib/
COPY --from=build-opensearch-connector /flink-connector-opensearch/flink-sql-connector-opensearch2/target/flink-sql-connector-opensearch2-2.0.0.jar /opt/flink/lib/

## JDBC connector
ADD https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.2-1.18/flink-connector-jdbc-3.1.2-1.18.jar /opt/flink/lib/

## Postgresql JDBC Driver
ADD https://jdbc.postgresql.org/download/postgresql-42.7.1.jar /opt/flink/lib/

RUN chown flink:flink /opt/flink/lib/*
```

Then I submit my SQL job, which copies data from a Postgres table to an OpenSearch index:

```shell
bin/sql-client.sh -f sql/flink.sql
```

and I get the following error:

2024-05-22 15:25:47
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:176)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:107)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:285)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:276)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:269)
	at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:764)
	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:741)
	at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:488)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.base/java.lang.reflect.Method.invoke(Unknown Source)
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309)
	at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307)
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222)
	at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
	at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
	at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
	at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
	at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
	at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
	at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
	at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
	at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
	at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
	at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
	at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)
	at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.flink.opensearch2.shaded.org.opensearch.node.Node
	at org.apache.flink.opensearch2.shaded.org.opensearch.common.util.concurrent.OpenSearchExecutors.threadName(OpenSearchExecutors.java:361)
	at org.apache.flink.opensearch2.shaded.org.opensearch.common.util.concurrent.OpenSearchExecutors.daemonThreadFactory(OpenSearchExecutors.java:375)
	at org.apache.flink.opensearch2.shaded.org.opensearch.threadpool.Scheduler.initScheduler(Scheduler.java:73)
	at org.apache.flink.opensearch2.shaded.org.opensearch.action.bulk.BulkProcessor.builder(BulkProcessor.java:263)
	at org.apache.flink.connector.opensearch.sink.Opensearch2Writer.createBulkProcessor(Opensearch2Writer.java:166)
	at org.apache.flink.connector.opensearch.sink.Opensearch2Writer.<init>(Opensearch2Writer.java:113)
	at org.apache.flink.connector.opensearch.sink.Opensearch2Sink.createWriter(Opensearch2Sink.java:95)
	at org.apache.flink.streaming.runtime.operators.sink.StatelessSinkWriterStateHandler.createWriter(StatelessSinkWriterStateHandler.java:39)
	at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.initializeState(SinkWriterOperator.java:149)
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:274)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:753)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:728)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:693)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
	at java.base/java.lang.Thread.run(Unknown Source)

As I understand it, something was not built correctly and org.apache.flink.opensearch2.shaded.org.opensearch.node.Node can't be initialized. I am not a Java developer and not familiar with Maven; I tried to follow the GitHub workflow pipeline to understand whether I am missing any Maven parameter.

@snuyanzin
Contributor Author

> I mean the built jar file. I tried to build it myself and use it with my jobs.

If you open the email thread https://lists.apache.org/thread/by44cdpfv6p9394vwxhh1vzh3rfskzms

you will see

and this is a link to the folder containing the jars; however, again, these jars are only an RC, not the release yet.

@stanb

stanb commented May 22, 2024

there is only tgz that contains sources. there is no compiled jar.

@snuyanzin
Contributor Author

@stanb

stanb commented May 22, 2024

With this jar I get another error:

2024-05-22 15:57:26
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:176)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:107)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:285)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:276)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:269)
	at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:764)
	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:741)
	at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:488)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.base/java.lang.reflect.Method.invoke(Unknown Source)
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309)
	at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307)
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222)
	at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
	at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
	at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
	at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
	at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
	at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
	at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
	at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
	at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
	at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
	at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
	at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)
	at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
Caused by: java.lang.ExceptionInInitializerError
	at org.apache.flink.opensearch2.shaded.org.opensearch.transport.RemoteClusterService.lambda$static$1(RemoteClusterService.java:123)
	at org.apache.flink.opensearch2.shaded.org.opensearch.common.settings.Setting.affixKeySetting(Setting.java:2837)
	at org.apache.flink.opensearch2.shaded.org.opensearch.transport.RemoteClusterService.<clinit>(RemoteClusterService.java:120)
	at org.apache.flink.opensearch2.shaded.org.opensearch.node.Node.<clinit>(Node.java:318)
	at org.apache.flink.opensearch2.shaded.org.opensearch.common.util.concurrent.OpenSearchExecutors.threadName(OpenSearchExecutors.java:361)
	at org.apache.flink.opensearch2.shaded.org.opensearch.common.util.concurrent.OpenSearchExecutors.daemonThreadFactory(OpenSearchExecutors.java:375)
	at org.apache.flink.opensearch2.shaded.org.opensearch.threadpool.Scheduler.initScheduler(Scheduler.java:73)
	at org.apache.flink.opensearch2.shaded.org.opensearch.action.bulk.BulkProcessor.builder(BulkProcessor.java:263)
	at org.apache.flink.connector.opensearch.sink.Opensearch2Writer.createBulkProcessor(Opensearch2Writer.java:166)
	at org.apache.flink.connector.opensearch.sink.Opensearch2Writer.<init>(Opensearch2Writer.java:113)
	at org.apache.flink.connector.opensearch.sink.Opensearch2Sink.createWriter(Opensearch2Sink.java:95)
	at org.apache.flink.streaming.runtime.operators.sink.StatelessSinkWriterStateHandler.createWriter(StatelessSinkWriterStateHandler.java:39)
	at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.initializeState(SinkWriterOperator.java:149)
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:274)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:753)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:728)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:693)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.NullPointerException
	at org.apache.flink.opensearch2.shaded.org.opensearch.common.settings.Setting.arrayToParsableString(Setting.java:2373)
	at org.apache.flink.opensearch2.shaded.org.opensearch.common.settings.Setting$ListSetting.lambda$new$0(Setting.java:2405)
	at org.apache.flink.opensearch2.shaded.org.opensearch.common.settings.Setting$ListSetting.innerGetRaw(Setting.java:2416)
	at org.apache.flink.opensearch2.shaded.org.opensearch.common.settings.Setting.getRaw(Setting.java:549)
	at org.apache.flink.opensearch2.shaded.org.opensearch.common.settings.Setting.lambda$listSetting$37(Setting.java:2300)
	at org.apache.flink.opensearch2.shaded.org.opensearch.common.settings.Setting.listSetting(Setting.java:2340)
	at org.apache.flink.opensearch2.shaded.org.opensearch.common.settings.Setting.listSetting(Setting.java:2329)
	at org.apache.flink.opensearch2.shaded.org.opensearch.common.settings.Setting.listSetting(Setting.java:2300)
	at org.apache.flink.opensearch2.shaded.org.opensearch.transport.TransportSettings.<clinit>(TransportSettings.java:68)
	... 25 more

Did something change in the table configuration besides the connector name?

```sql
CREATE TABLE sink_table (
 ...
) WITH (
    'connector' = 'opensearch-2',
    'hosts' = 'https://node-0.example.com:9200',
    'username' = 'admin',
    'password' = 'admin',
    'index'='index_name_{timestamp|yyyy.MM.dd}',
    'allow-insecure' = 'true',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true',
    'json.timestamp-format.standard' = 'ISO-8601',
    'sink.bulk-flush.backoff.max-retries' = '20',
    'sink.bulk-flush.backoff.strategy' = 'EXPONENTIAL',
    'sink.bulk-flush.backoff.delay' = '1s'
);
```

@akotek

akotek commented May 23, 2024

@snuyanzin I'm also having this issue; what should we do?

@snuyanzin
Contributor Author

Could you provide a minimal reproducing test for that? I guess something like an IT test.
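
A minimal reproduction along the lines requested above could look like the following SQL script (a sketch only: the datagen source, host, and index name are placeholders, not a verified IT test; the sink options are the ones already used in this thread):

```sql
-- Hypothetical minimal job: generate a few rows and write them to OpenSearch v2
CREATE TABLE src (
    id BIGINT,
    msg STRING
) WITH (
    'connector' = 'datagen',
    'number-of-rows' = '10'
);

CREATE TABLE os_sink (
    id BIGINT,
    msg STRING
) WITH (
    'connector' = 'opensearch-2',
    'hosts' = 'https://node-0.example.com:9200',  -- placeholder host
    'index' = 'repro_index',                      -- placeholder index
    'allow-insecure' = 'true'
);

INSERT INTO os_sink SELECT id, msg FROM src;
```

Running this via `bin/sql-client.sh -f repro.sql` against a single OpenSearch v2 node would exercise the same sink writer path that fails in the stack traces above.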
