Skip to content

Commit

Permalink
fix style
Browse files Browse the repository at this point in the history
  • Loading branch information
gang_ye committed May 23, 2023
1 parent 7a068b9 commit 5690c28
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.iceberg.flink.sink.shuffle;

import java.io.Serializable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.data.RowData;

Expand All @@ -30,7 +29,7 @@
* (sketching) can be used.
*/
@Internal
interface DataStatistics<D extends DataStatistics, S> extends Serializable {
interface DataStatistics<D extends DataStatistics, S> {

/**
* Check if data statistics contains any statistics information.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,16 +148,14 @@ public void runInCoordinatorThread(Runnable runnable) {
runnable));
}

private void runInCoordinatorThread(
ThrowingRunnable<Throwable> action, String actionName, Object... actionNameFormatParameters) {
private void runInCoordinatorThread(ThrowingRunnable<Throwable> action, String actionString) {
ensureStarted();
runInCoordinatorThread(
() -> {
try {
action.run();
} catch (Throwable t) {
ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
String actionString = String.format(actionName, actionNameFormatParameters);
LOG.error(
"Uncaught exception in the data statistics {} while {}. Triggering job failover.",
operatorName,
Expand Down Expand Up @@ -194,7 +192,6 @@ private void sendDataStatisticsToSubtasks(
new DataStatisticsEvent<>(checkpointId, globalDataStatistics, statisticsSerializer);
int parallelism = parallelism();
for (int i = 0; i < parallelism; ++i) {
System.out.println("dataStatisticsEvent " + dataStatisticsEvent);
subtaskGateways.getOnlyGatewayAndCheckReady(i).sendEvent(dataStatisticsEvent);
}
return null;
Expand All @@ -216,10 +213,9 @@ public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEven
Preconditions.checkArgument(event instanceof DataStatisticsEvent);
handleDataStatisticRequest(subtask, ((DataStatisticsEvent<D, S>) event));
},
"handling operator event %s from subtask %d (#%d)",
event.getClass(),
subtask,
attemptNumber);
String.format(
"handling operator event %s from subtask %d (#%d)",
event.getClass(), subtask, attemptNumber));
}

@Override
Expand All @@ -233,8 +229,7 @@ public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> r
resultFuture.complete(
globalStatisticsAggregatorTracker.serializeLastCompletedAggregator());
},
"taking checkpoint %d",
checkpointId);
String.format("taking checkpoint %d", checkpointId));
}

@Override
Expand Down Expand Up @@ -271,9 +266,7 @@ public void subtaskReset(int subtask, long checkpointId) {
this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
subtaskGateways.reset(subtask);
},
"handling subtask %d recovery to checkpoint %d",
subtask,
checkpointId);
String.format("handling subtask %d recovery to checkpoint %d", subtask, checkpointId));
}

@Override
Expand All @@ -289,9 +282,7 @@ public void executionAttemptFailed(int subtask, int attemptNumber, @Nullable Thr
this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
subtaskGateways.unregisterSubtaskGateway(subtask, attemptNumber);
},
"handling subtask %d (#%d) failure",
subtask,
attemptNumber);
String.format("handling subtask %d (#%d) failure", subtask, attemptNumber));
}

@Override
Expand All @@ -304,9 +295,8 @@ public void executionAttemptReady(int subtask, int attemptNumber, SubtaskGateway
this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
subtaskGateways.registerSubtaskGateway(gateway);
},
"making event gateway to subtask %d (#%d) available",
subtask,
attemptNumber);
String.format(
"making event gateway to subtask %d (#%d) available", subtask, attemptNumber));
}

@VisibleForTesting
Expand Down

0 comments on commit 5690c28

Please sign in to comment.