From a3ce9f18a1ba50fb46dfd58cebf9e32b71d9688b Mon Sep 17 00:00:00 2001 From: luoyuxia Date: Fri, 5 Jun 2026 21:02:34 +0800 Subject: [PATCH] [arrow] Fix Arrow to Paimon vector conversion casts --- .../Arrow2PaimonVectorConverter.java | 63 ++++++++++++------- .../arrow/vector/ArrowFormatWriterTest.java | 55 ++++++++++++++++ 2 files changed, 95 insertions(+), 23 deletions(-) diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/converter/Arrow2PaimonVectorConverter.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/converter/Arrow2PaimonVectorConverter.java index e1fe66883a84..ce0d067e6392 100644 --- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/converter/Arrow2PaimonVectorConverter.java +++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/converter/Arrow2PaimonVectorConverter.java @@ -82,7 +82,10 @@ import org.apache.arrow.vector.Float8Vector; import org.apache.arrow.vector.IntVector; import org.apache.arrow.vector.SmallIntVector; +import org.apache.arrow.vector.TimeMicroVector; import org.apache.arrow.vector.TimeMilliVector; +import org.apache.arrow.vector.TimeNanoVector; +import org.apache.arrow.vector.TimeSecVector; import org.apache.arrow.vector.TimeStampVector; import org.apache.arrow.vector.TinyIntVector; import org.apache.arrow.vector.VarBinaryVector; @@ -195,7 +198,12 @@ public boolean isNullAt(int index) { @Override public Bytes getBytes(int index) { - byte[] bytes = ((VarBinaryVector) vector).getObject(index); + byte[] bytes; + if (vector instanceof FixedSizeBinaryVector) { + bytes = ((FixedSizeBinaryVector) vector).get(index); + } else { + bytes = ((VarBinaryVector) vector).getObject(index); + } return new Bytes(bytes, 0, bytes.length) { @Override public byte[] getBytes() { @@ -378,7 +386,7 @@ public boolean isNullAt(int index) { @Override public int getInt(int index) { - return ((TimeMilliVector) vector).get(index); + return getTimeInMillis(vector, index); } }; } @@ -396,16 +404,7 @@ public boolean isNullAt(int index) { @Override public Timestamp getTimestamp(int i, int precision) { long value = ((TimeStampVector) vector).get(i); - if (precision == 0) { - return Timestamp.fromEpochMillis(value * 1000); - } else if (precision >= 1 && precision <= 3) { - return Timestamp.fromEpochMillis(value); - } else if (precision >= 4 && precision <= 6) { - return Timestamp.fromMicros(value); - } else { - return Timestamp.fromEpochMillis( - value / 1_000_000, (int) (value % 1_000_000)); - } + return convertEpochToTimestamp(value, precision); } }; } @@ -422,21 +421,39 @@ public boolean isNullAt(int index) { @Override public Timestamp getTimestamp(int i, int precision) { - long value = (long) vector.getObject(i); - if (precision == 0) { - return Timestamp.fromEpochMillis(value * 1000); - } else if (precision >= 1 && precision <= 3) { - return Timestamp.fromEpochMillis(value); - } else if (precision >= 4 && precision <= 6) { - return Timestamp.fromMicros(value); - } else { - return Timestamp.fromEpochMillis( - value / 1_000_000, (int) (value % 1_000_000)); - } + long value = ((TimeStampVector) vector).get(i); + return convertEpochToTimestamp(value, precision); } }; } + private int getTimeInMillis(FieldVector vector, int index) { + if (vector instanceof TimeMilliVector) { + return ((TimeMilliVector) vector).get(index); + } else if (vector instanceof TimeMicroVector) { + return (int) (((TimeMicroVector) vector).get(index) / 1_000); + } else if (vector instanceof TimeNanoVector) { + return (int) (((TimeNanoVector) vector).get(index) / 1_000_000); + } else if (vector instanceof TimeSecVector) { + return ((TimeSecVector) vector).get(index) * 1_000; + } else { + throw new UnsupportedOperationException( + "Unsupported Arrow time vector: " + vector.getClass().getName()); + } + } + + private Timestamp convertEpochToTimestamp(long value, int precision) { + if (precision == 0) { + return Timestamp.fromEpochMillis(value * 1000); + } else if (precision >= 1 && precision <= 3) { + return Timestamp.fromEpochMillis(value); + } else if (precision >= 4 && precision <= 6) { + return Timestamp.fromMicros(value); + } else { + return Timestamp.fromEpochMillis(value / 1_000_000, (int) (value % 1_000_000)); + } + } + @Override public Arrow2PaimonVectorConverter visit(VariantType variantType) { throw new UnsupportedOperationException(); diff --git a/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java b/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java index 5e7dc9539548..9d102c9ce2df 100644 --- a/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java +++ b/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java @@ -44,7 +44,11 @@ import org.apache.arrow.memory.OutOfMemoryException; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.FixedSizeBinaryVector; import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.TimeMicroVector; +import org.apache.arrow.vector.TimeNanoVector; +import org.apache.arrow.vector.TimeSecVector; import org.apache.arrow.vector.VarBinaryVector; import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.VectorSchemaRoot; @@ -401,6 +405,57 @@ public void testArrowBundleRecords() { } } + @Test + public void testArrowBundleRecordsWithTimeAndFixedBinaryVectors() { + // Arrow batches from external producers may use these vector types even though Paimon's + // Arrow writer currently defaults to TimeMilliVector and VarBinaryVector. + RowType rowType = + RowType.of( + new DataField(0, "time_sec", DataTypes.TIME(0)), + new DataField(1, "time_micro", DataTypes.TIME(6)), + new DataField(2, "time_nano", DataTypes.TIME(9)), + new DataField(3, "fixed_binary", DataTypes.BINARY(3))); + + try (RootAllocator allocator = new RootAllocator()) { + TimeSecVector timeSecVector = new TimeSecVector("time_sec", allocator); + timeSecVector.allocateNew(1); + timeSecVector.setSafe(0, 12); + timeSecVector.setValueCount(1); + + TimeMicroVector timeMicroVector = new TimeMicroVector("time_micro", allocator); + timeMicroVector.allocateNew(1); + timeMicroVector.setSafe(0, 12345678L); + timeMicroVector.setValueCount(1); + + TimeNanoVector timeNanoVector = new TimeNanoVector("time_nano", allocator); + timeNanoVector.allocateNew(1); + timeNanoVector.setSafe(0, 12345678901L); + timeNanoVector.setValueCount(1); + + byte[] binary = new byte[] {1, 2, 3}; + FixedSizeBinaryVector fixedBinaryVector = + new FixedSizeBinaryVector("fixed_binary", allocator, binary.length); + fixedBinaryVector.allocateNew(1); + fixedBinaryVector.setSafe(0, binary); + fixedBinaryVector.setValueCount(1); + + List vectors = + Arrays.asList( + timeSecVector, timeMicroVector, timeNanoVector, fixedBinaryVector); + try (VectorSchemaRoot vectorSchemaRoot = new VectorSchemaRoot(vectors)) { + vectorSchemaRoot.setRowCount(1); + + Iterator iterator = + new ArrowBundleRecords(vectorSchemaRoot, rowType, true).iterator(); + InternalRow row = iterator.next(); + assertThat(row.getInt(0)).isEqualTo(12000); + assertThat(row.getInt(1)).isEqualTo(12345); + assertThat(row.getInt(2)).isEqualTo(12345); + assertThat(row.getBinary(3)).containsExactly(binary); + } + } + } + @Test public void testCWriter() { try (ArrowFormatCWriter writer = new ArrowFormatCWriter(PRIMITIVE_TYPE, 4096, true)) {