
Flink write to Iceberg raises WARN: Unclosed S3FileIO instance in GlueTableOperations #11364

Open
a8356555 opened this issue Oct 21, 2024 · 0 comments
Labels
bug Something isn't working

Apache Iceberg version

1.5.2

Query engine

Flink

Please describe the bug 🐞

Using the following Flink SQL:

CREATE CATALOG glue_catalog WITH (
    'type'='iceberg', 
    'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',
    'warehouse'='s3://my-bucket'
);

CREATE TABLE mysql_cdc_source
(
    id  INT,
    name  STRING,
    description STRING,
    weight  FLOAT,
    refund_samount DECIMAL(38, 0),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'mysql',
    'port' = '3306',
    'username' = 'username',
    'password' = 'password',
    'database-name' = 'inventory',
    'table-name' = 'products',
    'debezium.snapshot.mode' = 'when_needed',
    'scan.startup.mode' = 'earliest-offset',
    'server-id' = '1'
);

CREATE DATABASE IF NOT EXISTS glue_catalog.my_db;
CREATE TABLE IF NOT EXISTS glue_catalog.my_db.my_table(
    `id` INT NOT NULL,
    `name` STRING,
    `description` STRING,
    `weight` FLOAT,
    `refund_samount` DECIMAL(38, 0),
    `datetime_utc8` TIMESTAMP,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'format-version'='2',
    'write.metadata.delete-after-commit.enabled'='true',
    'write.upsert.enabled'='true'
);

INSERT INTO glue_catalog.my_db.my_table
SELECT
    *,
    CURRENT_TIMESTAMP as datetime_utc8
FROM mysql_cdc_source;

the following warning is raised:

2024-10-21 03:42:14,197 WARN  org.apache.iceberg.aws.s3.S3FileIO                           [] - Unclosed S3FileIO instance created by:
	org.apache.iceberg.aws.s3.S3FileIO.initialize(S3FileIO.java:359)
	org.apache.iceberg.aws.glue.GlueTableOperations.initializeFileIO(GlueTableOperations.java:220)
	org.apache.iceberg.aws.glue.GlueTableOperations.io(GlueTableOperations.java:115)
	org.apache.iceberg.aws.glue.GlueCatalog.newTableOps(GlueCatalog.java:246)
	org.apache.iceberg.BaseMetastoreCatalog$BaseMetastoreCatalogTableBuilder.create(BaseMetastoreCatalog.java:190)
	org.apache.iceberg.CachingCatalog$CachingTableBuilder.lambda$create$0(CachingCatalog.java:261)
	org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$doComputeIfAbsent$14(BoundedLocalCache.java:2406)
	java.base/java.util.concurrent.ConcurrentHashMap.compute(Unknown Source)
	org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.doComputeIfAbsent(BoundedLocalCache.java:2404)
	org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:2387)
	org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalCache.computeIfAbsent(LocalCache.java:108)
	org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalManualCache.get(LocalManualCache.java:62)
	org.apache.iceberg.CachingCatalog$CachingTableBuilder.create(CachingCatalog.java:257)
	org.apache.iceberg.catalog.Catalog.createTable(Catalog.java:75)
	org.apache.iceberg.flink.FlinkCatalog.createIcebergTable(FlinkCatalog.java:416)
	org.apache.iceberg.flink.FlinkCatalog.createTable(FlinkCatalog.java:395)
	org.apache.flink.table.catalog.CatalogManager.lambda$createTable$11(CatalogManager.java:663)
	org.apache.flink.table.catalog.CatalogManager.execute(CatalogManager.java:885)
	org.apache.flink.table.catalog.CatalogManager.createTable(CatalogManager.java:652)
	org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:929)
	org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)
	org.apache.flink.examples.SqlRunner.main(SqlRunner.java:52)
	java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	java.base/java.lang.reflect.Method.invoke(Unknown Source)
	org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
	org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
	org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:98)
	org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301)
	org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:254)
	java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
	java.base/java.util.concurrent.FutureTask.run(Unknown Source)
	org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:171)
	org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
	org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41)
	akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
	akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
	java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
	java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)
	java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
	java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
	java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
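Judging from the trace, the warning appears to come from a leak-detection pattern: the creation stack is captured when `S3FileIO.initialize` runs, and it is logged later if the instance is discarded without `close()` having been called. A minimal, self-contained sketch of that unclosed-resource tracking pattern (class and method names below are invented for illustration; this is not Iceberg's actual implementation):

```java
import java.io.Closeable;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative only: a resource that captures its creation stack so a
// later leak check can report where the unclosed instance came from,
// analogous to the "created by:" stack in the S3FileIO warning.
class TrackedResource implements Closeable {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    // Capture the stack at construction time.
    private final Throwable createdBy = new Throwable("creation site");

    boolean isClosed() {
        return closed.get();
    }

    @Override
    public void close() {
        closed.set(true);
    }

    // Leak check: returns a warning string if close() was never called,
    // or null if the resource was closed properly.
    String leakWarning() {
        if (closed.get()) {
            return null;
        }
        StringBuilder sb = new StringBuilder("Unclosed instance created by:");
        for (StackTraceElement frame : createdBy.getStackTrace()) {
            sb.append("\n\t").append(frame);
        }
        return sb.toString();
    }
}

public class Main {
    public static void main(String[] args) {
        TrackedResource leaked = new TrackedResource(); // never closed
        TrackedResource managed = new TrackedResource();
        managed.close(); // properly closed: no warning

        System.out.println(leaked.leakWarning() != null);  // true
        System.out.println(managed.leakWarning() == null); // true
    }
}
```

In the Flink SQL path the FileIO lifecycle is managed internally by the catalog and table operations, so the warning suggests an instance was dropped without `close()` being called somewhere in that chain rather than something the SQL user can fix directly.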

Willingness to contribute

  • I can contribute a fix for this bug independently
  • I would be willing to contribute a fix for this bug with guidance from the Iceberg community
  • I cannot contribute a fix for this bug at this time
a8356555 added the bug label on Oct 21, 2024