Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,10 @@
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.SmallIntVector;
import org.apache.arrow.vector.TimeMicroVector;
import org.apache.arrow.vector.TimeMilliVector;
import org.apache.arrow.vector.TimeNanoVector;
import org.apache.arrow.vector.TimeSecVector;
import org.apache.arrow.vector.TimeStampVector;
import org.apache.arrow.vector.TinyIntVector;
import org.apache.arrow.vector.VarBinaryVector;
Expand Down Expand Up @@ -195,7 +198,12 @@ public boolean isNullAt(int index) {

@Override
public Bytes getBytes(int index) {
byte[] bytes = ((VarBinaryVector) vector).getObject(index);
byte[] bytes;
if (vector instanceof FixedSizeBinaryVector) {
bytes = ((FixedSizeBinaryVector) vector).get(index);
} else {
bytes = ((VarBinaryVector) vector).getObject(index);
}
return new Bytes(bytes, 0, bytes.length) {
@Override
public byte[] getBytes() {
Expand Down Expand Up @@ -378,7 +386,7 @@ public boolean isNullAt(int index) {

@Override
public int getInt(int index) {
return ((TimeMilliVector) vector).get(index);
return getTimeInMillis(vector, index);
}
};
}
Expand All @@ -396,16 +404,7 @@ public boolean isNullAt(int index) {
@Override
public Timestamp getTimestamp(int i, int precision) {
long value = ((TimeStampVector) vector).get(i);
if (precision == 0) {
return Timestamp.fromEpochMillis(value * 1000);
} else if (precision >= 1 && precision <= 3) {
return Timestamp.fromEpochMillis(value);
} else if (precision >= 4 && precision <= 6) {
return Timestamp.fromMicros(value);
} else {
return Timestamp.fromEpochMillis(
value / 1_000_000, (int) (value % 1_000_000));
}
return convertEpochToTimestamp(value, precision);
}
};
}
Expand All @@ -422,21 +421,39 @@ public boolean isNullAt(int index) {

@Override
public Timestamp getTimestamp(int i, int precision) {
long value = (long) vector.getObject(i);
if (precision == 0) {
return Timestamp.fromEpochMillis(value * 1000);
} else if (precision >= 1 && precision <= 3) {
return Timestamp.fromEpochMillis(value);
} else if (precision >= 4 && precision <= 6) {
return Timestamp.fromMicros(value);
} else {
return Timestamp.fromEpochMillis(
value / 1_000_000, (int) (value % 1_000_000));
}
long value = ((TimeStampVector) vector).get(i);
return convertEpochToTimestamp(value, precision);
}
};
}

/**
 * Reads the time value stored at {@code index} and returns it expressed in milliseconds.
 *
 * <p>Second-, millisecond-, microsecond- and nanosecond-based Arrow time vectors are accepted.
 * Finer-than-millisecond values are reduced with integer division, so any sub-millisecond
 * remainder is discarded.
 *
 * @param vector the Arrow time vector to read from
 * @param index the position of the value inside {@code vector}
 * @return the value at {@code index}, converted to milliseconds
 * @throws UnsupportedOperationException if {@code vector} is not one of the four supported
 *     time vector classes
 */
private int getTimeInMillis(FieldVector vector, int index) {
    if (vector instanceof TimeMilliVector) {
        // Already stored in the target unit.
        TimeMilliVector milliVector = (TimeMilliVector) vector;
        return milliVector.get(index);
    }
    if (vector instanceof TimeMicroVector) {
        long micros = ((TimeMicroVector) vector).get(index);
        return (int) (micros / 1_000);
    }
    if (vector instanceof TimeNanoVector) {
        long nanos = ((TimeNanoVector) vector).get(index);
        return (int) (nanos / 1_000_000);
    }
    if (vector instanceof TimeSecVector) {
        int seconds = ((TimeSecVector) vector).get(index);
        return seconds * 1_000;
    }
    throw new UnsupportedOperationException(
            "Unsupported Arrow time vector: " + vector.getClass().getName());
}

/**
 * Converts a raw epoch-based value into a {@link Timestamp}, choosing the unit of {@code value}
 * from the requested precision.
 *
 * <ul>
 *   <li>{@code precision == 0}: {@code value} is treated as seconds.
 *   <li>{@code 1 <= precision <= 3}: {@code value} is treated as milliseconds.
 *   <li>{@code 4 <= precision <= 6}: {@code value} is treated as microseconds.
 *   <li>any other precision: {@code value} is treated as nanoseconds.
 * </ul>
 *
 * @param value the epoch-relative value read from the Arrow vector
 * @param precision the timestamp precision that determines the unit of {@code value}
 * @return the corresponding timestamp
 */
private Timestamp convertEpochToTimestamp(long value, int precision) {
    if (precision == 0) {
        return Timestamp.fromEpochMillis(value * 1000);
    } else if (precision >= 1 && precision <= 3) {
        return Timestamp.fromEpochMillis(value);
    } else if (precision >= 4 && precision <= 6) {
        return Timestamp.fromMicros(value);
    } else {
        // Use floor semantics so that values before the epoch (negative nanoseconds) still
        // produce a nano-of-millisecond in [0, 999_999]. Plain '/' and '%' truncate toward
        // zero and would yield a negative remainder here, which is not a valid
        // nano-of-millisecond. Results for non-negative values are unchanged.
        long epochMillis = Math.floorDiv(value, 1_000_000L);
        int nanoOfMillisecond = (int) Math.floorMod(value, 1_000_000L);
        return Timestamp.fromEpochMillis(epochMillis, nanoOfMillisecond);
    }
}

@Override
public Arrow2PaimonVectorConverter visit(VariantType variantType) {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,11 @@
import org.apache.arrow.memory.OutOfMemoryException;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.FixedSizeBinaryVector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.TimeMicroVector;
import org.apache.arrow.vector.TimeNanoVector;
import org.apache.arrow.vector.TimeSecVector;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
Expand Down Expand Up @@ -401,6 +405,57 @@ public void testArrowBundleRecords() {
}
}

@Test
public void testArrowBundleRecordsWithTimeAndFixedBinaryVectors() {
    // Paimon's own Arrow writer currently defaults to TimeMilliVector and VarBinaryVector, but
    // batches coming from external producers may carry second/micro/nano time vectors and
    // fixed-size binary vectors. This test feeds such vectors through ArrowBundleRecords.
    byte[] expectedBinary = new byte[] {1, 2, 3};

    RowType schema =
            RowType.of(
                    new DataField(0, "time_sec", DataTypes.TIME(0)),
                    new DataField(1, "time_micro", DataTypes.TIME(6)),
                    new DataField(2, "time_nano", DataTypes.TIME(9)),
                    new DataField(3, "fixed_binary", DataTypes.BINARY(3)));

    try (RootAllocator rootAllocator = new RootAllocator()) {
        // One single-row vector per column; each holds exactly one value at position 0.
        TimeSecVector secondsColumn = new TimeSecVector("time_sec", rootAllocator);
        secondsColumn.allocateNew(1);
        secondsColumn.setSafe(0, 12);
        secondsColumn.setValueCount(1);

        TimeMicroVector microsColumn = new TimeMicroVector("time_micro", rootAllocator);
        microsColumn.allocateNew(1);
        microsColumn.setSafe(0, 12345678L);
        microsColumn.setValueCount(1);

        TimeNanoVector nanosColumn = new TimeNanoVector("time_nano", rootAllocator);
        nanosColumn.allocateNew(1);
        nanosColumn.setSafe(0, 12345678901L);
        nanosColumn.setValueCount(1);

        FixedSizeBinaryVector binaryColumn =
                new FixedSizeBinaryVector("fixed_binary", rootAllocator, expectedBinary.length);
        binaryColumn.allocateNew(1);
        binaryColumn.setSafe(0, expectedBinary);
        binaryColumn.setValueCount(1);

        // Column order must line up with the field order declared in the row type above.
        List<FieldVector> columns =
                Arrays.asList(secondsColumn, microsColumn, nanosColumn, binaryColumn);

        try (VectorSchemaRoot root = new VectorSchemaRoot(columns)) {
            root.setRowCount(1);

            InternalRow firstRow =
                    new ArrowBundleRecords(root, schema, true).iterator().next();

            // 12 s -> 12000 ms; 12345678 us -> 12345 ms; 12345678901 ns -> 12345 ms.
            assertThat(firstRow.getInt(0)).isEqualTo(12000);
            assertThat(firstRow.getInt(1)).isEqualTo(12345);
            assertThat(firstRow.getInt(2)).isEqualTo(12345);
            assertThat(firstRow.getBinary(3)).containsExactly(expectedBinary);
        }
    }
}

@Test
public void testCWriter() {
try (ArrowFormatCWriter writer = new ArrowFormatCWriter(PRIMITIVE_TYPE, 4096, true)) {
Expand Down