Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expand Arrow reading support #129

Merged
merged 8 commits into from
Jul 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import org.jetbrains.kotlinx.dataframe.columns.ColumnSet
import org.jetbrains.kotlinx.dataframe.columns.FrameColumn
import org.jetbrains.kotlinx.dataframe.columns.ValueColumn
import org.jetbrains.kotlinx.dataframe.impl.GroupByImpl
import org.jetbrains.kotlinx.dataframe.impl.anyNull
import org.jetbrains.kotlinx.dataframe.impl.asList
import org.jetbrains.kotlinx.dataframe.impl.columnName
import org.jetbrains.kotlinx.dataframe.impl.columns.ColumnAccessorImpl
Expand Down Expand Up @@ -189,6 +190,50 @@ public enum class Infer {
Type
}

/**
 * Strategy for deciding whether a column is marked nullable, i.e. how [DataColumn.hasNulls]
 * (or, more accurately, `DataColumn.type.isMarkedNullable`) is initialized from the expected schema
 * and the actual data when reading schema-defined data formats.
 *
 * The decision itself is made by [applyNullability].
 */
public enum class NullabilityOptions {
    /**
     * Ignore the expected schema and use only the actual data: [DataColumn.hasNulls] is `true`
     * if and only if the column contains at least one `null` value.
     * An empty dataset is treated as containing no nulls, so the result is `false`.
     */
    Infer,

    /**
     * Set [DataColumn.hasNulls] to the expected value.
     * Throws an exception if the column is expected to be not nullable but contains `null` values.
     */
    Checking,

    /**
     * Set [DataColumn.hasNulls] to the expected value by default, but widen `false` to `true`
     * if the column is expected to be not nullable and nevertheless contains `null` values.
     */
    Widening
}

/**
 * Signals that data contains `null` values although it was expected to be not nullable.
 *
 * Thrown by [applyNullability] for [NullabilityOptions.Checking]. Carries no message and no cause.
 */
public class NullabilityException : Exception()

/**
 * Decides whether a column should be marked nullable for the receiver [NullabilityOptions] value,
 * given the actual [data] and the nullability expected by some schema/signature.
 *
 * [data] is scanned for nulls only when the outcome depends on it; in particular it is not scanned
 * for [NullabilityOptions.Checking] or [NullabilityOptions.Widening] when [expectedNulls] is `true`.
 *
 * @param data actual values of the column.
 * @param expectedNulls whether the column is expected to be nullable.
 * @return `true` if the column should be marked nullable, `false` otherwise.
 * @throws [NullabilityException] for [NullabilityOptions.Checking] if [expectedNulls] is false and [data] contains nulls.
 */
public fun NullabilityOptions.applyNullability(data: List<Any?>, expectedNulls: Boolean): Boolean =
    when (this) {
        // Only the actual data matters.
        NullabilityOptions.Infer -> data.anyNull()
        NullabilityOptions.Checking -> {
            // Short-circuit: nulls are always legal when the schema allows them.
            if (!expectedNulls && data.anyNull()) {
                throw NullabilityException()
            }
            expectedNulls
        }
        // Short-circuit: the scan is needed only to widen `false` to `true`.
        NullabilityOptions.Widening -> expectedNulls || data.anyNull()
    }

public inline fun <reified T> Iterable<T>.toColumn(
name: String = "",
infer: Infer = Infer.None
Expand Down
1 change: 1 addition & 0 deletions dataframe-arrow/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ dependencies {
implementation(libs.arrow.format)
implementation(libs.arrow.memory)
implementation(libs.commonsCompress)
implementation(libs.kotlin.reflect)

testApi(project(":core"))
testImplementation(libs.junit)
Expand Down

Large diffs are not rendered by default.

56 changes: 56 additions & 0 deletions dataframe-arrow/src/test/kotlin/ArrowKtTest.kt
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import io.kotest.assertions.throwables.shouldThrow
import io.kotest.matchers.shouldBe
import org.apache.arrow.vector.util.Text
import org.jetbrains.kotlinx.dataframe.DataFrame
import org.jetbrains.kotlinx.dataframe.api.NullabilityOptions
import org.jetbrains.kotlinx.dataframe.api.columnOf
import org.jetbrains.kotlinx.dataframe.api.dataFrameOf
import org.jetbrains.kotlinx.dataframe.api.toColumn
import org.jetbrains.kotlinx.dataframe.io.readArrowFeather
import org.jetbrains.kotlinx.dataframe.io.readArrowIPC
import org.junit.Test
import java.net.URL

Expand All @@ -13,6 +16,7 @@ internal class ArrowKtTest {
/**
 * Resolves [resourcePath] against the class loader of this test class.
 *
 * @throws IllegalStateException if the resource is not found. This is deliberately not an
 * [IllegalArgumentException]: tests in this class assert on that type, and a missing fixture must
 * not be able to satisfy them.
 */
fun testResource(resourcePath: String): URL =
    ArrowKtTest::class.java.classLoader.getResource(resourcePath)
        ?: error("Test resource not found on classpath: $resourcePath")

/** Returns the URL of the test resource named [name] with the `.feather` extension appended. */
fun testArrowFeather(name: String): URL {
    val resourcePath = "$name.feather"
    return testResource(resourcePath)
}
/** Returns the URL of the test resource named [name] with the `.ipc` extension appended. */
fun testArrowIPC(name: String): URL {
    val resourcePath = "$name.ipc"
    return testResource(resourcePath)
}

@Test
fun testReadingFromFile() {
Expand All @@ -31,4 +35,56 @@ internal class ArrowKtTest {
val expected = dataFrameOf(a, b, c, d)
df shouldBe expected
}

/**
 * Reads the `test.arrow` example in both Feather and IPC form under every [NullabilityOptions] value.
 * Columns are expected to be not nullable for `Infer` and nullable for `Checking` and `Widening`;
 * no null values are expected in the data.
 */
@Test
fun testReadingAllTypesAsEstimated() {
    val example = "test.arrow"
    val expectedNullableByOption = listOf(
        NullabilityOptions.Infer to false,
        NullabilityOptions.Checking to true,
        NullabilityOptions.Widening to true
    )
    for ((option, expectedNullable) in expectedNullableByOption) {
        assertEstimations(DataFrame.readArrowFeather(testArrowFeather(example), option), expectedNullable, false)
        assertEstimations(DataFrame.readArrowIPC(testArrowIPC(example), option), expectedNullable, false)
    }
}

/**
 * Reads the `test-with-nulls.arrow` example in both Feather and IPC form under every
 * [NullabilityOptions] value. Columns are expected to be nullable and to contain null values
 * regardless of the option.
 */
@Test
fun testReadingAllTypesAsEstimatedWithNulls() {
    val example = "test-with-nulls.arrow"
    val options = listOf(NullabilityOptions.Infer, NullabilityOptions.Checking, NullabilityOptions.Widening)
    options.forEach { option ->
        assertEstimations(DataFrame.readArrowFeather(testArrowFeather(example), option), true, true)
        assertEstimations(DataFrame.readArrowIPC(testArrowIPC(example), option), true, true)
    }
}

/**
 * Reads the `test-not-nullable.arrow` example in both Feather and IPC form under every
 * [NullabilityOptions] value. Columns are expected to be not nullable and to contain no null values
 * regardless of the option.
 */
@Test
fun testReadingAllTypesAsEstimatedNotNullable() {
    val example = "test-not-nullable.arrow"
    val options = listOf(NullabilityOptions.Infer, NullabilityOptions.Checking, NullabilityOptions.Widening)
    options.forEach { option ->
        assertEstimations(DataFrame.readArrowFeather(testArrowFeather(example), option), false, false)
        assertEstimations(DataFrame.readArrowIPC(testArrowIPC(example), option), false, false)
    }
}

/**
 * Reads the `test-illegal.arrow` example in both Feather and IPC form.
 *
 * - `Infer` and `Widening`: columns are expected to be nullable and to contain null values.
 * - `Checking`: reading itself is expected to fail with [IllegalArgumentException].
 */
@Test
fun testReadingAllTypesAsEstimatedNotNullableWithNulls() {
    val example = "test-illegal.arrow"

    assertEstimations(DataFrame.readArrowFeather(testArrowFeather(example), NullabilityOptions.Infer), true, true)
    assertEstimations(DataFrame.readArrowIPC(testArrowIPC(example), NullabilityOptions.Infer), true, true)

    // Only the read is wrapped: it is the call that must throw, and no data assertion can run afterwards.
    shouldThrow<IllegalArgumentException> {
        DataFrame.readArrowFeather(testArrowFeather(example), NullabilityOptions.Checking)
    }
    shouldThrow<IllegalArgumentException> {
        DataFrame.readArrowIPC(testArrowIPC(example), NullabilityOptions.Checking)
    }

    assertEstimations(DataFrame.readArrowFeather(testArrowFeather(example), NullabilityOptions.Widening), true, true)
    assertEstimations(DataFrame.readArrowIPC(testArrowIPC(example), NullabilityOptions.Widening), true, true)
}
}
159 changes: 159 additions & 0 deletions dataframe-arrow/src/test/kotlin/exampleEstimatesAssertions.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
import io.kotest.matchers.shouldBe
import org.jetbrains.kotlinx.dataframe.AnyFrame
import org.jetbrains.kotlinx.dataframe.DataColumn
import org.jetbrains.kotlinx.dataframe.api.forEachIndexed
import java.math.BigInteger
import java.time.LocalDate
import java.time.LocalDateTime
import java.time.LocalTime
import java.time.ZoneOffset
import kotlin.math.absoluteValue
import kotlin.math.pow
import kotlin.reflect.full.withNullability
import kotlin.reflect.typeOf

/**
 * Asserts that [exampleFrame] holds the same data that was originally saved on example creation.
 *
 * For every example column the column type is checked first, then every value.
 *
 * @param exampleFrame frame read from one of the example files.
 * @param expectedNullable whether every column type is expected to be marked nullable.
 * @param hasNulls whether the example contains nulls; if so, every fifth row of a batch
 * (batch rows 4, 9, 14, ...) is expected to be `null` instead of the generated value.
 */
internal fun assertEstimations(exampleFrame: AnyFrame, expectedNullable: Boolean, hasNulls: Boolean) {
    // Per the original note, the frame holds two concatenated batches; the first one has 100 rows.
    val firstBatchSize = 100

    /** Transforms a frame row number into the row number inside its batch. */
    fun iBatch(iFrame: Int): Int = if (iFrame < firstBatchSize) iFrame else iFrame - firstBatchSize

    /** Tells whether the value in batch row [rowNumber] is a null when the example contains nulls. */
    fun expectedNull(rowNumber: Int): Boolean = (rowNumber + 1) % 5 == 0

    fun assertValueOrNull(rowNumber: Int, actual: Any?, expected: Any) {
        if (hasNulls && expectedNull(rowNumber)) {
            actual shouldBe null
        } else {
            actual shouldBe expected
        }
    }

    /**
     * Checks that column [name] has type [baseType] with the expected nullability and that each of its
     * values equals [expectedValue] applied to the batch row number (or is null, see [assertValueOrNull]).
     */
    fun assertColumn(name: String, baseType: kotlin.reflect.KType, expectedValue: (Int) -> Any) {
        val column: DataColumn<*> = exampleFrame[name]
        column.type() shouldBe baseType.withNullability(expectedNullable)
        column.forEachIndexed { iFrame, element ->
            val row = iBatch(iFrame)
            assertValueOrNull(row, element, expectedValue(row))
        }
    }

    // Strings
    assertColumn("asciiString", typeOf<String>()) { row -> "Test Example $row" }
    assertColumn("utf8String", typeOf<String>()) { row -> "Тестовый пример $row" }
    assertColumn("largeString", typeOf<String>()) { row -> "Test Example Should Be Large $row" }

    // Boolean and signed integers
    assertColumn("boolean", typeOf<Boolean>()) { row -> row % 2 == 0 }
    assertColumn("byte", typeOf<Byte>()) { row -> (row * 10).toByte() }
    assertColumn("short", typeOf<Short>()) { row -> (row * 1000).toShort() }
    assertColumn("int", typeOf<Int>()) { row -> row * 100000000 }
    assertColumn("longInt", typeOf<Long>()) { row -> row * 100000000000000000L }

    // Unsigned integers are expected as the next wider signed type, wrapped modulo the unsigned range
    assertColumn("unsigned_byte", typeOf<Short>()) { row ->
        (row * 10 % (Byte.MIN_VALUE.toShort() * 2).absoluteValue).toShort()
    }
    assertColumn("unsigned_short", typeOf<Int>()) { row ->
        row * 1000 % (Short.MIN_VALUE.toInt() * 2).absoluteValue
    }
    assertColumn("unsigned_int", typeOf<Long>()) { row ->
        row.toLong() * 100000000 % (Int.MIN_VALUE.toLong() * 2).absoluteValue
    }
    assertColumn("unsigned_longInt", typeOf<BigInteger>()) { row ->
        row.toBigInteger() * 100000000000000000L.toBigInteger() % (Long.MIN_VALUE.toBigInteger() * 2.toBigInteger()).abs()
    }

    // Floating point
    assertColumn("float", typeOf<Float>()) { row -> 2.0f.pow(row.toFloat()) }
    assertColumn("double", typeOf<Double>()) { row -> 2.0.pow(row) }

    // Dates
    assertColumn("date32", typeOf<LocalDate>()) { row -> LocalDate.ofEpochDay(row.toLong() * 30) }
    assertColumn("date64", typeOf<LocalDateTime>()) { row ->
        LocalDateTime.ofEpochSecond(row.toLong() * 60 * 60 * 24 * 30, 0, ZoneOffset.UTC)
    }

    // Times of day at different resolutions
    assertColumn("time32_seconds", typeOf<LocalTime>()) { row -> LocalTime.ofSecondOfDay(row.toLong()) }
    assertColumn("time32_milli", typeOf<LocalTime>()) { row -> LocalTime.ofNanoOfDay(row.toLong() * 1000_000) }
    assertColumn("time64_micro", typeOf<LocalTime>()) { row -> LocalTime.ofNanoOfDay(row.toLong() * 1000) }
    assertColumn("time64_nano", typeOf<LocalTime>()) { row -> LocalTime.ofNanoOfDay(row.toLong()) }
}
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.