From 0dcb5f4bf19962a6d83dcb383412456b1da3b169 Mon Sep 17 00:00:00 2001 From: Nariman Abdullin Date: Fri, 9 Jun 2023 17:58:24 +0300 Subject: [PATCH 1/3] Rewrite StreamGobbler ### What's done: - rewrite StreamGobbler to kotlinx coroutines --- diktat-test-framework/build.gradle.kts | 1 + .../test/framework/common/StreamGobbler.java | 67 ------------------- .../framework/common/LocalCommandExecutor.kt | 36 +++++++--- 3 files changed, 26 insertions(+), 78 deletions(-) delete mode 100644 diktat-test-framework/src/main/java/org/cqfn/diktat/test/framework/common/StreamGobbler.java diff --git a/diktat-test-framework/build.gradle.kts b/diktat-test-framework/build.gradle.kts index 5878a32fcf..0762ec92d2 100644 --- a/diktat-test-framework/build.gradle.kts +++ b/diktat-test-framework/build.gradle.kts @@ -11,6 +11,7 @@ dependencies { implementation(libs.apache.commons.cli) implementation(libs.apache.commons.io) implementation(libs.kotlin.logging) + implementation(libs.kotlinx.coroutines.core) implementation(libs.kotlin.multiplatform.diff) testImplementation(libs.junit.jupiter) testImplementation(libs.assertj.core) diff --git a/diktat-test-framework/src/main/java/org/cqfn/diktat/test/framework/common/StreamGobbler.java b/diktat-test-framework/src/main/java/org/cqfn/diktat/test/framework/common/StreamGobbler.java deleted file mode 100644 index 309c0c66f5..0000000000 --- a/diktat-test-framework/src/main/java/org/cqfn/diktat/test/framework/common/StreamGobbler.java +++ /dev/null @@ -1,67 +0,0 @@ -package org.cqfn.diktat.test.framework.common; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.List; -import java.util.function.BiConsumer; - -public class StreamGobbler extends Thread { - private final InputStream inputStream; - private final String streamType; - private final BiConsumer exceptionHandler; - private final ArrayList result; - private volatile boolean isStopped = false; - - /** - * @param inputStream the InputStream to be consumed - * @param streamType the stream type (should be OUTPUT or ERROR) - * @param exceptionHandler the exception handler - */ - public StreamGobbler( - final InputStream inputStream, - final String streamType, - final BiConsumer exceptionHandler - ) { - this.inputStream = inputStream; - this.streamType = streamType; - this.exceptionHandler = exceptionHandler; - this.result = new ArrayList<>(); - } - - /** - * Consumes the output from the input stream and displays the lines consumed - * if configured to do so. - */ - @Override - synchronized public void run() { - try { - BufferedReader bufferedReader = new BufferedReader( - new InputStreamReader(inputStream, Charset.defaultCharset()) - ); - String line; - while ((line = bufferedReader.readLine()) != null) { - this.result.add(line); - } - } catch (IOException ex) { - exceptionHandler.accept(ex, "Failed to consume and display the input stream of type " + streamType + "."); - } finally { - this.isStopped = true; - notify(); - } - } - - synchronized public List getContent() { - if (!this.isStopped) { - try { - wait(); - } catch (InterruptedException e) { - exceptionHandler.accept(e, "Cannot get content of output stream"); - } - } - return this.result; - } -} diff --git a/diktat-test-framework/src/main/kotlin/org/cqfn/diktat/test/framework/common/LocalCommandExecutor.kt b/diktat-test-framework/src/main/kotlin/org/cqfn/diktat/test/framework/common/LocalCommandExecutor.kt index 7a8e3fd402..da27f40bff 100644 --- a/diktat-test-framework/src/main/kotlin/org/cqfn/diktat/test/framework/common/LocalCommandExecutor.kt +++ b/diktat-test-framework/src/main/kotlin/org/cqfn/diktat/test/framework/common/LocalCommandExecutor.kt @@ -1,8 +1,15 @@ package org.cqfn.diktat.test.framework.common import mu.KotlinLogging - import java.io.IOException +import java.io.InputStream +import java.nio.charset.Charset +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.flowOn +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.runBlocking /** * Class that wraps shell [command] and can execute it @@ -18,17 +25,12 @@ class LocalCommandExecutor internal constructor(private val command: String) { log.info { "Executing command: $command" } val process = Runtime.getRuntime().exec(command) process.outputStream.close() - val inputStream = process.inputStream - val outputGobbler = StreamGobbler(inputStream, "OUTPUT") { msg, ex -> - log.error(ex, msg) - } - outputGobbler.start() - val errorStream = process.errorStream - val errorGobbler = StreamGobbler(errorStream, "ERROR") { msg, ex -> - log.error(ex, msg) + return runBlocking(Dispatchers.IO) { + ExecutionResult( + process.inputStream.readLinesAsync("OUTPUT").toList(), + process.errorStream.readLinesAsync("ERROR").toList(), + ) } - errorGobbler.start() - return ExecutionResult(outputGobbler.content, errorGobbler.content) } catch (ex: IOException) { log.error("Execution of $command failed", ex) } @@ -37,5 +39,17 @@ class LocalCommandExecutor internal constructor(private val command: String) { companion object { private val log = KotlinLogging.logger {} + + fun InputStream.readLinesAsync(streamType: String): Flow = flow { + try { + val bufferedReader = this@readLinesAsync.bufferedReader(Charset.defaultCharset()) + while (true) { + val line = bufferedReader.readLine() ?: break + emit(line) + } + } catch (ex: IOException) { + log.error(ex) { "Failed to consume and display the input stream of type $streamType." } + } + }.flowOn(Dispatchers.IO) } } From e9933182c4af92e72c5140b80a7c0cb696a99b8b Mon Sep 17 00:00:00 2001 From: Nariman Abdullin Date: Tue, 13 Jun 2023 11:08:57 +0300 Subject: [PATCH 2/3] missed changes in libs.versions.toml --- gradle/libs.versions.toml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index b12ba38f9d..804fb7df92 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -39,7 +39,7 @@ save-cli = "0.3.9" ktor = "2.2.4" okio = "3.3.0" kotlinx-datetime = "0.4.0" -kotlinx-coroutines = "1.6.4" +kotlinx-coroutines = "1.7.1" assertj = "3.24.2" diktat = "1.2.5" jgit = "6.5.0.202303070854-r" @@ -102,6 +102,9 @@ kotlinx-serialization-json-jvm = { module = "org.jetbrains.kotlinx:kotlinx-seria # another serialization kaml = { module = "com.charleskorn.kaml:kaml", version.ref = "kaml" } +# kotlinx coroutines +kotlinx-coroutines-core = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-core", version.ref = "kotlinx-coroutines" } + #kotlin libs kotlin-multiplatform-diff = { module = "io.github.petertrr:kotlin-multiplatform-diff", version.ref = "kotlin-multiplatform-diff" } From f92f29d971c9e3bf5c2c2b6adc19eb97a31cf8cc Mon Sep 17 00:00:00 2001 From: Nariman Abdullin Date: Tue, 13 Jun 2023 12:28:34 +0300 Subject: [PATCH 3/3] diktatFix & detektAll --- .../framework/common/LocalCommandExecutor.kt | 37 ++++++++++++------- 1 file changed, 23 insertions(+), 14 deletions(-) diff --git a/diktat-test-framework/src/main/kotlin/com/saveourtool/diktat/test/framework/common/LocalCommandExecutor.kt b/diktat-test-framework/src/main/kotlin/com/saveourtool/diktat/test/framework/common/LocalCommandExecutor.kt index 9069d6d4fb..10b59e3b3e 100644 --- a/diktat-test-framework/src/main/kotlin/com/saveourtool/diktat/test/framework/common/LocalCommandExecutor.kt +++ b/diktat-test-framework/src/main/kotlin/com/saveourtool/diktat/test/framework/common/LocalCommandExecutor.kt @@ -4,6 +4,7 @@ import mu.KotlinLogging import java.io.IOException import java.io.InputStream import java.nio.charset.Charset +import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.flow @@ -14,7 +15,10 @@ import kotlinx.coroutines.runBlocking /** * Class that wraps shell [command] and can execute it */ -class LocalCommandExecutor internal constructor(private val command: String) { +class LocalCommandExecutor internal constructor( + private val command: String, + private val ioDispatcher: CoroutineDispatcher = Dispatchers.IO, +) { /** * Execute [command] * @@ -25,10 +29,10 @@ class LocalCommandExecutor internal constructor(private val command: String) { log.info { "Executing command: $command" } val process = Runtime.getRuntime().exec(command) process.outputStream.close() - return runBlocking(Dispatchers.IO) { + return runBlocking { ExecutionResult( - process.inputStream.readLinesAsync("OUTPUT").toList(), - process.errorStream.readLinesAsync("ERROR").toList(), + process.inputStream.readLinesAsync("OUTPUT", ioDispatcher).toList(), + process.errorStream.readLinesAsync("ERROR", ioDispatcher).toList(), ) } } catch (ex: IOException) { @@ -40,16 +44,21 @@ class LocalCommandExecutor internal constructor(private val command: String) { companion object { private val log = KotlinLogging.logger {} - fun InputStream.readLinesAsync(streamType: String): Flow = flow { - try { - val bufferedReader = this@readLinesAsync.bufferedReader(Charset.defaultCharset()) - while (true) { - val line = bufferedReader.readLine() ?: break - emit(line) - } - } catch (ex: IOException) { - log.error(ex) { "Failed to consume and display the input stream of type $streamType." } + /** + * @param streamType + * @param ioDispatcher + * @return [Flow] of strings from input stream + */ + fun InputStream.readLinesAsync(streamType: String, ioDispatcher: CoroutineDispatcher): Flow = flow { + try { + val bufferedReader = this@readLinesAsync.bufferedReader(Charset.defaultCharset()) + while (true) { + val line = bufferedReader.readLine() ?: break + emit(line) } - }.flowOn(Dispatchers.IO) + } catch (ex: IOException) { + log.error(ex) { "Failed to consume and display the input stream of type $streamType." } + } + }.flowOn(ioDispatcher) } }