diff --git a/dataframe-arrow/build.gradle.kts b/dataframe-arrow/build.gradle.kts index ee6a3878a..1ac28c43e 100644 --- a/dataframe-arrow/build.gradle.kts +++ b/dataframe-arrow/build.gradle.kts @@ -19,6 +19,7 @@ dependencies { implementation(libs.arrow.vector) implementation(libs.arrow.format) implementation(libs.arrow.memory) + implementation(libs.arrow.dataset) implementation(libs.commonsCompress) implementation(libs.kotlin.reflect) implementation(libs.kotlin.datetimeJvm) diff --git a/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReading.kt b/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReading.kt index 83a660c37..1e2a0e4b4 100644 --- a/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReading.kt +++ b/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReading.kt @@ -1,5 +1,6 @@ package org.jetbrains.kotlinx.dataframe.io +import org.apache.arrow.dataset.file.FileFormat import org.apache.arrow.memory.RootAllocator import org.apache.arrow.vector.ipc.ArrowReader import org.apache.commons.compress.utils.SeekableInMemoryByteChannel @@ -184,3 +185,11 @@ public fun DataFrame.Companion.readArrow( */ public fun ArrowReader.toDataFrame(nullability: NullabilityOptions = NullabilityOptions.Infer): AnyFrame = DataFrame.Companion.readArrowImpl(this, nullability) + +/** + * Read [Parquet](https://parquet.apache.org/) data from existing [url] by using [Arrow Dataset](https://arrow.apache.org/docs/java/dataset.html) + */ +public fun DataFrame.Companion.readParquet( + url: URL, + nullability: NullabilityOptions = NullabilityOptions.Infer, +): AnyFrame = readArrowDataset(url.toString(), FileFormat.PARQUET, nullability) diff --git a/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt b/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt index f7c7eb940..32736bc09 100644 --- a/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt +++ b/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt @@ -6,6 +6,11 @@ import kotlinx.datetime.LocalTime import kotlinx.datetime.toKotlinLocalDate import kotlinx.datetime.toKotlinLocalDateTime import kotlinx.datetime.toKotlinLocalTime +import org.apache.arrow.dataset.file.FileFormat +import org.apache.arrow.dataset.file.FileSystemDatasetFactory +import org.apache.arrow.dataset.jni.DirectReservationListener +import org.apache.arrow.dataset.jni.NativeMemoryPool +import org.apache.arrow.dataset.scanner.ScanOptions import org.apache.arrow.memory.RootAllocator import org.apache.arrow.vector.BigIntVector import org.apache.arrow.vector.BitVector @@ -414,3 +419,27 @@ internal fun DataFrame.Companion.readArrowImpl( return flattened.concatKeepingSchema() } } + +internal fun DataFrame.Companion.readArrowDataset( + fileUri: String, + fileFormat: FileFormat, + nullability: NullabilityOptions = NullabilityOptions.Infer, +): AnyFrame { + val scanOptions = ScanOptions(32768) + RootAllocator().use { allocator -> + FileSystemDatasetFactory( + allocator, + NativeMemoryPool.createListenable(DirectReservationListener.instance()), + fileFormat, + fileUri, + ).use { datasetFactory -> + datasetFactory.finish().use { dataset -> + dataset.newScan(scanOptions).use { scanner -> + scanner.scanBatches().use { reader -> + return readArrow(reader, nullability) + } + } + } + } + } +} diff --git a/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/ArrowKtTest.kt b/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/ArrowKtTest.kt index 78356f4fe..17b32eab2 100644 --- a/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/ArrowKtTest.kt +++ b/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/ArrowKtTest.kt @@ -653,4 +653,17 @@ internal class ArrowKtTest { DataFrame.readArrow(dbArrowReader) shouldBe expected } } + + @Test + fun testReadParquet() { + val path = testResource("test.arrow.parquet").path + val dataFrame = DataFrame.readParquet(URL("file:$path")) + dataFrame.rowsCount() shouldBe 300 + assertEstimations( + exampleFrame = dataFrame, + expectedNullable = false, + hasNulls = false, + fromParquet = true, + ) + } } diff --git a/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/exampleEstimatesAssertions.kt b/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/exampleEstimatesAssertions.kt index 2ae83fb36..12ef641b6 100644 --- a/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/exampleEstimatesAssertions.kt +++ b/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/exampleEstimatesAssertions.kt @@ -24,7 +24,12 @@ import java.time.LocalTime as JavaLocalTime * Assert that we have got the same data that was originally saved on example creation. * Example generation project is currently located at https://github.com/Kopilov/arrow_example */ -internal fun assertEstimations(exampleFrame: AnyFrame, expectedNullable: Boolean, hasNulls: Boolean) { +internal fun assertEstimations( + exampleFrame: AnyFrame, + expectedNullable: Boolean, + hasNulls: Boolean, + fromParquet: Boolean = false, +) { /** * In [exampleFrame] we get two concatenated batches. To assert the estimations, we should transform frame row number to batch row number */ @@ -142,16 +147,27 @@ internal fun assertEstimations(exampleFrame: AnyFrame, expectedNullable: Boolean assertValueOrNull(iBatch(i), element, JavaLocalDate.ofEpochDay(iBatch(i).toLong() * 30).toKotlinLocalDate()) } - val datetimeCol = exampleFrame["date64"] as DataColumn - datetimeCol.type() shouldBe typeOf().withNullability(expectedNullable) - datetimeCol.forEachIndexed { i, element -> - assertValueOrNull( - rowNumber = iBatch(i), - actual = element, - expected = JavaLocalDateTime - .ofEpochSecond(iBatch(i).toLong() * 60 * 60 * 24 * 30, 0, ZoneOffset.UTC) - .toKotlinLocalDateTime(), - ) + if (fromParquet) { + // parquet format have only one type of date: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#date without time + val datetimeCol = exampleFrame["date64"] as DataColumn + datetimeCol.type() shouldBe typeOf().withNullability(expectedNullable) + datetimeCol.forEachIndexed { i, element -> + assertValueOrNull(iBatch(i), element, JavaLocalDate.ofEpochDay(iBatch(i).toLong() * 30).toKotlinLocalDate()) + } + } else { + val datetimeCol = exampleFrame["date64"] as DataColumn + datetimeCol.type() shouldBe typeOf().withNullability(expectedNullable) + datetimeCol.forEachIndexed { i, element -> + assertValueOrNull( + rowNumber = iBatch(i), + actual = element, + expected = JavaLocalDateTime.ofEpochSecond( + iBatch(i).toLong() * 60 * 60 * 24 * 30, + 0, + ZoneOffset.UTC, + ).toKotlinLocalDateTime(), + ) + } } val timeSecCol = exampleFrame["time32_seconds"] as DataColumn diff --git a/dataframe-arrow/src/test/resources/test.arrow.parquet b/dataframe-arrow/src/test/resources/test.arrow.parquet new file mode 100644 index 000000000..cf78b1c25 Binary files /dev/null and b/dataframe-arrow/src/test/resources/test.arrow.parquet differ diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 4be1812d3..00fa38276 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -101,6 +101,7 @@ jsoup = { group = "org.jsoup", name = "jsoup", version.ref = "jsoup" } arrow-format = { group = "org.apache.arrow", name = "arrow-format", version.ref = "arrow" } arrow-vector = { group = "org.apache.arrow", name = "arrow-vector", version.ref = "arrow" } arrow-memory = { group = "org.apache.arrow", name = "arrow-memory-unsafe", version.ref = "arrow" } +arrow-dataset = { group = "org.apache.arrow", name = "arrow-dataset", version.ref = "arrow" } arrow-c-data = { group = "org.apache.arrow", name = "arrow-c-data", version.ref = "arrow" }