From 5690c28d1d5ed20b2735b561059787e3a38121e6 Mon Sep 17 00:00:00 2001 From: gang_ye Date: Mon, 22 May 2023 22:01:32 -0700 Subject: [PATCH] fix style --- .../flink/sink/shuffle/DataStatistics.java | 3 +- .../shuffle/DataStatisticsCoordinator.java | 28 ++++++------------- 2 files changed, 10 insertions(+), 21 deletions(-) diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatistics.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatistics.java index 53b2346db25e..28a05201c02f 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatistics.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatistics.java @@ -18,7 +18,6 @@ */ package org.apache.iceberg.flink.sink.shuffle; -import java.io.Serializable; import org.apache.flink.annotation.Internal; import org.apache.flink.table.data.RowData; @@ -30,7 +29,7 @@ * (sketching) can be used. */ @Internal -interface DataStatistics extends Serializable { +interface DataStatistics { /** * Check if data statistics contains any statistics information. diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java index aa723986abac..e02b9a095715 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java @@ -148,8 +148,7 @@ public void runInCoordinatorThread(Runnable runnable) { runnable)); } - private void runInCoordinatorThread( - ThrowingRunnable action, String actionName, Object... actionNameFormatParameters) { + private void runInCoordinatorThread(ThrowingRunnable action, String actionString) { ensureStarted(); runInCoordinatorThread( () -> { @@ -157,7 +156,6 @@ private void runInCoordinatorThread( action.run(); } catch (Throwable t) { ExceptionUtils.rethrowIfFatalErrorOrOOM(t); - String actionString = String.format(actionName, actionNameFormatParameters); LOG.error( "Uncaught exception in the data statistics {} while {}. Triggering job failover.", operatorName, @@ -194,7 +192,6 @@ private void sendDataStatisticsToSubtasks( new DataStatisticsEvent<>(checkpointId, globalDataStatistics, statisticsSerializer); int parallelism = parallelism(); for (int i = 0; i < parallelism; ++i) { - System.out.println("dataStatisticsEvent " + dataStatisticsEvent); subtaskGateways.getOnlyGatewayAndCheckReady(i).sendEvent(dataStatisticsEvent); } return null; @@ -216,10 +213,9 @@ public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEven Preconditions.checkArgument(event instanceof DataStatisticsEvent); handleDataStatisticRequest(subtask, ((DataStatisticsEvent) event)); }, - "handling operator event %s from subtask %d (#%d)", - event.getClass(), - subtask, - attemptNumber); + String.format( + "handling operator event %s from subtask %d (#%d)", + event.getClass(), subtask, attemptNumber)); } @Override @@ -233,8 +229,7 @@ public void checkpointCoordinator(long checkpointId, CompletableFuture r resultFuture.complete( globalStatisticsAggregatorTracker.serializeLastCompletedAggregator()); }, - "taking checkpoint %d", - checkpointId); + String.format("taking checkpoint %d", checkpointId)); } @Override @@ -271,9 +266,7 @@ public void subtaskReset(int subtask, long checkpointId) { this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread()); subtaskGateways.reset(subtask); }, - "handling subtask %d recovery to checkpoint %d", - subtask, - checkpointId); + String.format("handling subtask %d recovery to checkpoint %d", subtask, checkpointId)); } @Override @@ -289,9 +282,7 @@ public void executionAttemptFailed(int subtask, int attemptNumber, @Nullable Thr this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread()); subtaskGateways.unregisterSubtaskGateway(subtask, attemptNumber); }, - "handling subtask %d (#%d) failure", - subtask, - attemptNumber); + String.format("handling subtask %d (#%d) failure", subtask, attemptNumber)); } @Override @@ -304,9 +295,8 @@ public void executionAttemptReady(int subtask, int attemptNumber, SubtaskGateway this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread()); subtaskGateways.registerSubtaskGateway(gateway); }, - "making event gateway to subtask %d (#%d) available", - subtask, - attemptNumber); + String.format( + "making event gateway to subtask %d (#%d) available", subtask, attemptNumber)); } @VisibleForTesting