Skip to content

Commit

Permalink
Add support for reading parquet file thanks to arrow-dataset Kotlin#576
Browse files Browse the repository at this point in the history
  • Loading branch information
fb64 committed Oct 15, 2024
1 parent 117e958 commit e66906a
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 11 deletions.
1 change: 1 addition & 0 deletions dataframe-arrow/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ dependencies {
implementation(libs.arrow.vector)
implementation(libs.arrow.format)
implementation(libs.arrow.memory)
implementation(libs.arrow.dataset)
implementation(libs.commonsCompress)
implementation(libs.kotlin.reflect)
implementation(libs.kotlin.datetimeJvm)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.jetbrains.kotlinx.dataframe.io

import org.apache.arrow.dataset.file.FileFormat
import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.ipc.ArrowReader
import org.apache.commons.compress.utils.SeekableInMemoryByteChannel
Expand Down Expand Up @@ -184,3 +185,11 @@ public fun DataFrame.Companion.readArrow(
*/
public fun ArrowReader.toDataFrame(nullability: NullabilityOptions = NullabilityOptions.Infer): AnyFrame =
DataFrame.Companion.readArrowImpl(this, nullability)

/**
* Read [Parquet](https://parquet.apache.org/) data from existing [url] by using [Arrow Dataset](https://arrow.apache.org/docs/java/dataset.html)
*/
public fun DataFrame.Companion.readParquet(
url: URL,
nullability: NullabilityOptions = NullabilityOptions.Infer,
): AnyFrame = readArrowDataset(url.toString(), FileFormat.PARQUET, nullability)
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ import kotlinx.datetime.LocalTime
import kotlinx.datetime.toKotlinLocalDate
import kotlinx.datetime.toKotlinLocalDateTime
import kotlinx.datetime.toKotlinLocalTime
import org.apache.arrow.dataset.file.FileFormat
import org.apache.arrow.dataset.file.FileSystemDatasetFactory
import org.apache.arrow.dataset.jni.DirectReservationListener
import org.apache.arrow.dataset.jni.NativeMemoryPool
import org.apache.arrow.dataset.scanner.ScanOptions
import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.BigIntVector
import org.apache.arrow.vector.BitVector
Expand Down Expand Up @@ -414,3 +419,27 @@ internal fun DataFrame.Companion.readArrowImpl(
return flattened.concatKeepingSchema()
}
}

internal fun DataFrame.Companion.readArrowDataset(
fileUri: String,
fileFormat: FileFormat,
nullability: NullabilityOptions = NullabilityOptions.Infer,
): AnyFrame {
val scanOptions = ScanOptions(32768)
RootAllocator().use { allocator ->
FileSystemDatasetFactory(
allocator,
NativeMemoryPool.createListenable(DirectReservationListener.instance()),
fileFormat,
fileUri,
).use { datasetFactory ->
datasetFactory.finish().use { dataset ->
dataset.newScan(scanOptions).use { scanner ->
scanner.scanBatches().use { reader ->
return readArrow(reader, nullability)
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -653,4 +653,17 @@ internal class ArrowKtTest {
DataFrame.readArrow(dbArrowReader) shouldBe expected
}
}

@Test
fun testReadParquet() {
val path = testResource("test.arrow.parquet").path
val dataFrame = DataFrame.readParquet(URL("file:$path"))
dataFrame.rowsCount() shouldBe 300
assertEstimations(
exampleFrame = dataFrame,
expectedNullable = false,
hasNulls = false,
fromParquet = true,
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,12 @@ import java.time.LocalTime as JavaLocalTime
* Assert that we have got the same data that was originally saved on example creation.
* Example generation project is currently located at https://github.com/Kopilov/arrow_example
*/
internal fun assertEstimations(exampleFrame: AnyFrame, expectedNullable: Boolean, hasNulls: Boolean) {
internal fun assertEstimations(
exampleFrame: AnyFrame,
expectedNullable: Boolean,
hasNulls: Boolean,
fromParquet: Boolean = false,
) {
/**
* In [exampleFrame] we get two concatenated batches. To assert the estimations, we should transform frame row number to batch row number
*/
Expand Down Expand Up @@ -142,16 +147,27 @@ internal fun assertEstimations(exampleFrame: AnyFrame, expectedNullable: Boolean
assertValueOrNull(iBatch(i), element, JavaLocalDate.ofEpochDay(iBatch(i).toLong() * 30).toKotlinLocalDate())
}

val datetimeCol = exampleFrame["date64"] as DataColumn<LocalDateTime?>
datetimeCol.type() shouldBe typeOf<LocalDateTime>().withNullability(expectedNullable)
datetimeCol.forEachIndexed { i, element ->
assertValueOrNull(
rowNumber = iBatch(i),
actual = element,
expected = JavaLocalDateTime
.ofEpochSecond(iBatch(i).toLong() * 60 * 60 * 24 * 30, 0, ZoneOffset.UTC)
.toKotlinLocalDateTime(),
)
if (fromParquet) {
// parquet format have only one type of date: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#date without time
val datetimeCol = exampleFrame["date64"] as DataColumn<LocalDate?>
datetimeCol.type() shouldBe typeOf<LocalDate>().withNullability(expectedNullable)
datetimeCol.forEachIndexed { i, element ->
assertValueOrNull(iBatch(i), element, JavaLocalDate.ofEpochDay(iBatch(i).toLong() * 30).toKotlinLocalDate())
}
} else {
val datetimeCol = exampleFrame["date64"] as DataColumn<LocalDateTime?>
datetimeCol.type() shouldBe typeOf<LocalDateTime>().withNullability(expectedNullable)
datetimeCol.forEachIndexed { i, element ->
assertValueOrNull(
rowNumber = iBatch(i),
actual = element,
expected = JavaLocalDateTime.ofEpochSecond(
iBatch(i).toLong() * 60 * 60 * 24 * 30,
0,
ZoneOffset.UTC,
).toKotlinLocalDateTime(),
)
}
}

val timeSecCol = exampleFrame["time32_seconds"] as DataColumn<LocalTime?>
Expand Down
Binary file not shown.
1 change: 1 addition & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ jsoup = { group = "org.jsoup", name = "jsoup", version.ref = "jsoup" }
arrow-format = { group = "org.apache.arrow", name = "arrow-format", version.ref = "arrow" }
arrow-vector = { group = "org.apache.arrow", name = "arrow-vector", version.ref = "arrow" }
arrow-memory = { group = "org.apache.arrow", name = "arrow-memory-unsafe", version.ref = "arrow" }
arrow-dataset = { group = "org.apache.arrow", name = "arrow-dataset", version.ref = "arrow" }
arrow-c-data = { group = "org.apache.arrow", name = "arrow-c-data", version.ref = "arrow" }


Expand Down

0 comments on commit e66906a

Please sign in to comment.