Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 12 additions & 10 deletions java/testfiles/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ public static IsNull parse(byte[] metadata, List<Expression> children) {
"IsNull expression must have exactly one child, found: " + children.size());
}
if (metadata.length > 0) {
throw new IllegalArgumentException(
"IsNull expression must not have metadata, found: " + metadata.length);
throw new IllegalArgumentException("IsNull expression must not have metadata, found: " + metadata.length);
}
return new IsNull(children.get(0));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ private static ArrowType convertType(DataType sparkType) {
return new ArrowType.Date(DateUnit.DAY);
} else if (sparkType instanceof TimestampType) {
return new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC");
} else if (sparkType instanceof TimestampNTZType) {
return new ArrowType.Timestamp(TimeUnit.MICROSECOND, null);
} else if (sparkType instanceof DecimalType) {
DecimalType decimal = (DecimalType) sparkType;
return new ArrowType.Decimal(decimal.precision(), decimal.scale(), 128);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
import dev.vortex.relocated.org.apache.arrow.memory.BufferAllocator;
import dev.vortex.relocated.org.apache.arrow.memory.RootAllocator;
import dev.vortex.relocated.org.apache.arrow.vector.*;
import dev.vortex.relocated.org.apache.arrow.vector.FieldVector;
import dev.vortex.relocated.org.apache.arrow.vector.VectorSchemaRoot;
import dev.vortex.relocated.org.apache.arrow.vector.complex.ListVector;
import dev.vortex.relocated.org.apache.arrow.vector.complex.StructVector;
import dev.vortex.spark.SparkTypes;
import java.io.IOException;
import java.nio.file.Files;
Expand Down Expand Up @@ -205,16 +207,23 @@ private void populateVector(
if (bytes != null) {
((VarBinaryVector) vector).setSafe(rowIndex, bytes);
}
} else if (dataType instanceof DecimalType) {
DecimalType decType = (DecimalType) dataType;
} else if (dataType instanceof DateType) {
((DateDayVector) vector).setSafe(rowIndex, row.getInt(fieldIndex));
} else if (dataType instanceof TimestampType) {
((TimeStampMicroTZVector) vector).setSafe(rowIndex, row.getLong(fieldIndex));
} else if (dataType instanceof TimestampNTZType) {
((TimeStampMicroVector) vector).setSafe(rowIndex, row.getLong(fieldIndex));
} else if (dataType instanceof DecimalType decType) {
if (decType.precision() <= 38) {
// Use Decimal type from InternalRow
java.math.BigDecimal decimal = row.getDecimal(fieldIndex, decType.precision(), decType.scale())
.toJavaBigDecimal();
((DecimalVector) vector).setSafe(rowIndex, decimal);
}
} else if (dataType instanceof ArrayType) {
ArrayType arrayType = (ArrayType) dataType;
} else if (dataType instanceof StructType structType) {
populateStructVector(
(StructVector) vector, structType, row.getStruct(fieldIndex, structType.fields().length), rowIndex);
} else if (dataType instanceof ArrayType arrayType) {
ArrayData data = row.getArray(fieldIndex);
ListVector listVector = ((ListVector) vector);
int writtenElements = listVector.getElementEndIndex(listVector.getLastSet());
Expand All @@ -229,6 +238,20 @@ private void populateVector(
}
}

/**
 * Writes one row of struct data into an Arrow {@link StructVector}.
 * <p>
 * Marks the struct slot itself as defined, then delegates each child field to the
 * generic per-field writer, explicitly nulling children the row reports as null.
 *
 * @param vector   destination struct vector
 * @param dataType Spark schema describing the struct's fields
 * @param row      source row holding one value per struct field
 * @param rowIndex target row position in {@code vector}
 */
private void populateStructVector(StructVector vector, StructType dataType, InternalRow row, int rowIndex) {
    // The struct slot must be marked non-null before its children are written.
    vector.setIndexDefined(rowIndex);

    StructField[] structFields = dataType.fields();
    for (int i = 0; i < structFields.length; i++) {
        FieldVector child = (FieldVector) vector.getVectorById(i);
        if (!row.isNullAt(i)) {
            // Recurse through the shared writer so nested types are handled uniformly.
            populateVector(child, structFields[i].dataType(), row, i, rowIndex);
        } else {
            // Null field: mark only the child slot null; the struct slot stays defined.
            child.setNull(rowIndex);
        }
    }
}

/**
* Commits the write operation and returns a commit message.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

package dev.vortex.spark;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

Expand Down Expand Up @@ -324,6 +325,73 @@ public void testSpecialCharactersAndNulls() throws IOException {
assertEquals("special!@#$%^&*()", specialRows.first().getString(1));
}

@Test
@DisplayName("Write and read date, timestamp, and nested struct columns")
public void testWriteAndReadTemporalAndStructColumns() throws IOException {
// Two-row frame: an INT id, a DATE, a TIMESTAMP, and a struct repeating those
// temporal values plus a string label — exercises top-level and nested writes.
Dataset<Row> originalDf = spark.range(0, 2)
.selectExpr(
"cast(id as int) as id",
"CASE WHEN id = 0 THEN CAST('2024-01-02' AS DATE) ELSE CAST('2024-02-03' AS DATE) END AS event_date",
"CASE WHEN id = 0 THEN CAST('2024-01-02 03:04:05.123456' AS TIMESTAMP) "
+ "ELSE CAST('2024-02-03 04:05:06.654321' AS TIMESTAMP) END AS event_ts",
"named_struct("
+ "'event_date', CASE WHEN id = 0 THEN CAST('2024-01-02' AS DATE) ELSE CAST('2024-02-03' AS DATE) END, "
+ "'event_ts', CASE WHEN id = 0 THEN CAST('2024-01-02 03:04:05.123456' AS TIMESTAMP) "
+ "ELSE CAST('2024-02-03 04:05:06.654321' AS TIMESTAMP) END, "
+ "'label', CASE WHEN id = 0 THEN 'alpha' ELSE 'beta' END"
+ ") AS payload");

// Round-trip through the Vortex data source.
Path outputPath = tempDir.resolve("temporal_struct_output");
originalDf
.write()
.format("vortex")
.option("path", outputPath.toUri().toString())
.mode(SaveMode.Overwrite)
.save();

Dataset<Row> readDf = spark.read()
.format("vortex")
.option("path", outputPath.toUri().toString())
.load();

// Expected flattened JSON per row; timestamps rendered at microsecond precision
// (must match the projection format in projectTemporalAndStructRows).
List<String> expectedRows = List.of(
"{\"id\":0,\"event_date\":\"2024-01-02\",\"event_ts\":\"2024-01-02 03:04:05.123456\","
+ "\"payload_event_date\":\"2024-01-02\",\"payload_event_ts\":\"2024-01-02 03:04:05.123456\","
+ "\"payload_label\":\"alpha\"}",
"{\"id\":1,\"event_date\":\"2024-02-03\",\"event_ts\":\"2024-02-03 04:05:06.654321\","
+ "\"payload_event_date\":\"2024-02-03\",\"payload_event_ts\":\"2024-02-03 04:05:06.654321\","
+ "\"payload_label\":\"beta\"}");

// Schema must survive the round-trip before comparing values.
assertEquals(DataTypes.DateType, readDf.schema().fields()[1].dataType());
assertEquals(DataTypes.TimestampType, readDf.schema().fields()[2].dataType());
assertTrue(readDf.schema().fields()[3].dataType() instanceof StructType);
assertEquals(expectedRows, projectTemporalAndStructRows(readDf));
}

@Test
@DisplayName("Write TimestampNTZ columns and nested structs")
public void testWriteTimestampNtzColumns() throws IOException {
// Two-row frame with a nullable top-level TIMESTAMP_NTZ column and a struct
// containing a non-null TIMESTAMP_NTZ — covers both the null and nested paths.
Dataset<Row> timestampNtzDf = spark.range(0, 2)
.selectExpr(
"cast(id as int) as id",
"CASE WHEN id = 0 THEN CAST('2024-01-02 03:04:05.123456' AS TIMESTAMP_NTZ) "
+ "ELSE CAST(NULL AS TIMESTAMP_NTZ) END AS event_ntz",
"named_struct("
+ "'event_ntz', CASE WHEN id = 0 THEN CAST('2024-01-02 03:04:05.123456' AS TIMESTAMP_NTZ) "
+ "ELSE CAST('2024-02-03 04:05:06.654321' AS TIMESTAMP_NTZ) END"
+ ") AS payload");

// The write itself is the behavior under test: TimestampNTZ must not throw.
Path outputPath = tempDir.resolve("timestamp_ntz_output");
assertDoesNotThrow(() -> timestampNtzDf
.write()
.format("vortex")
.option("path", outputPath.toUri().toString())
.mode(SaveMode.Overwrite)
.save());

// Use assertFalse directly instead of assertTrue(!...) for a clearer failure message.
assertFalse(findVortexFiles(outputPath).isEmpty(), "TimestampNTZ write should create Vortex files");
}

/**
* Creates a test DataFrame with monotonically increasing integers
* and their string representations.
Expand All @@ -337,6 +405,23 @@ private Dataset<Row> createTestDataFrame(int numRows) {
"array('Alpha', 'Bravo', 'Charlie') AS elements");
}

/**
 * Projects a round-tripped DataFrame into one flat JSON string per row, ordered by id.
 * <p>
 * Top-level and struct fields are flattened into a single JSON object; temporal
 * values are rendered as strings with microsecond precision so rows can be
 * compared with plain string equality.
 *
 * @param df DataFrame with id, event_date, event_ts, and payload struct columns
 * @return one JSON string per row, sorted by id
 */
private List<String> projectTemporalAndStructRows(Dataset<Row> df) {
// Single SQL expression that flattens everything into one JSON column.
String jsonExpr = "to_json(named_struct("
+ "'id', id, "
+ "'event_date', cast(event_date as string), "
+ "'event_ts', date_format(event_ts, 'yyyy-MM-dd HH:mm:ss.SSSSSS'), "
+ "'payload_event_date', cast(payload.event_date as string), "
+ "'payload_event_ts', date_format(payload.event_ts, 'yyyy-MM-dd HH:mm:ss.SSSSSS'), "
+ "'payload_label', payload.label"
+ ")) as json";

List<Row> collected = df.orderBy("id").selectExpr(jsonExpr).collectAsList();
return collected.stream()
.map(r -> r.getString(0))
.collect(Collectors.toList());
}

/**
* Finds all Vortex files in the given directory.
*/
Expand Down
Loading