diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java index fcd884d696d2c..bd5356ccff49b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java @@ -32,7 +32,6 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndex; import org.apache.iotdb.commons.consensus.index.ProgressIndexType; import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNode; -import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.load.LoadFileException; import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException; @@ -161,12 +160,7 @@ public void dispatchLocally(FragmentInstance instance) throws FragmentInstanceDi PlanNode planNode = instance.getFragment().getPlanNodeTree(); if (planNode instanceof LoadTsFilePieceNode) { // split - LoadTsFilePieceNode pieceNode = - (LoadTsFilePieceNode) PlanNodeType.deserialize(planNode.serializeToByteBuffer()); - if (pieceNode == null) { - throw new FragmentInstanceDispatchException( - new TSStatus(TSStatusCode.DESERIALIZE_PIECE_OF_TSFILE_ERROR.getStatusCode())); - } + LoadTsFilePieceNode pieceNode = (LoadTsFilePieceNode) planNode; TSStatus resultStatus = StorageEngine.getInstance().writeLoadTsFileNode((DataRegionId) groupId, pieceNode, uuid); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java index 2c9cea068529b..d852ec3c9b85d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java @@ -290,6 +290,7 @@ public void writeDecodeValuePage( } protected void writeTsFileData(TsFileIOWriter writer) throws IOException, PageException { + ensureDataReadyForWriting(); final InputStream stream = new LoadTsFilePieceNode.ByteBufferInputStream(chunkData); if (needDecodeChunk) { writeChunkToWriter(stream, writer); @@ -298,6 +299,14 @@ protected void writeTsFileData(TsFileIOWriter writer) throws IOException, PageEx } } + private void ensureDataReadyForWriting() throws IOException { + if (chunkData != null) { + chunkData.rewind(); + return; + } + chunkData = ByteBuffer.wrap(byteStream.getBuf(), 0, byteStream.size()); + } + protected void deserializeTsFileDataByte(final InputStream stream) throws IOException { final int size = ReadWriteIOUtils.readInt(stream); if (stream instanceof LoadTsFilePieceNode.ByteBufferInputStream) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/NonAlignedChunkData.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/NonAlignedChunkData.java index 2310b9cb95c3e..7b3c2fd3f9523 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/NonAlignedChunkData.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/NonAlignedChunkData.java @@ -21,6 +21,7 @@ import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.commons.utils.TimePartitionUtils; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode; import org.apache.tsfile.exception.write.PageException; import org.apache.tsfile.file.header.ChunkHeader; @@ -114,6 +115,7 @@ public boolean isAligned() { @Override public void writeToFileWriter(final TsFileIOWriter writer) throws IOException { + ensureDataReadyForWriting(); if (chunk != null) { writer.writeChunk(chunk); } else { @@ -121,6 +123,20 @@ public void writeToFileWriter(final TsFileIOWriter writer) throws IOException { } } + private void ensureDataReadyForWriting() throws IOException { + if (chunk != null || chunkWriter != null) { + return; + } + + try { + deserializeTsFileData( + new LoadTsFilePieceNode.ByteBufferInputStream( + ByteBuffer.wrap(byteStream.getBuf(), 0, byteStream.size()))); + } catch (final PageException e) { + throw new IOException(e); + } + } + @Override public void serialize(final DataOutputStream stream) throws IOException { ReadWriteIOUtils.write(getType().ordinal(), stream); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImplTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImplTest.java new file mode 100644 index 0000000000000..2da982fd23da9 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImplTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.scheduler.load; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.commons.consensus.DataRegionId; +import org.apache.iotdb.commons.partition.StorageExecutor; +import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.common.PlanFragmentId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance; +import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode; +import org.apache.iotdb.db.storageengine.StorageEngine; +import org.apache.iotdb.rpc.RpcUtils; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.io.File; +import java.util.Collections; + +@PowerMockIgnore({"com.sun.org.apache.xerces.*", "javax.xml.*", "org.xml.*", "javax.management.*"}) +@RunWith(PowerMockRunner.class) +@PrepareForTest(StorageEngine.class) +public class LoadTsFileDispatcherImplTest { + + @Test + public void testDispatchLocallyPieceNodeSkipsSerdeRoundTrip() throws Exception { + final StorageEngine storageEngine = Mockito.mock(StorageEngine.class); + PowerMockito.mockStatic(StorageEngine.class); + PowerMockito.when(StorageEngine.getInstance()).thenReturn(storageEngine); + + final LoadTsFileDispatcherImpl dispatcher = new LoadTsFileDispatcherImpl(null, false); + dispatcher.setUuid("test-uuid"); + + final LoadTsFilePieceNode pieceNode = + new LoadTsFilePieceNode(new PlanNodeId("piece"), new File("test.tsfile")); + final FragmentInstance instance = createFragmentInstance(pieceNode); + + Mockito.when( + storageEngine.writeLoadTsFileNode( + Mockito.eq(new DataRegionId(1)), Mockito.same(pieceNode), Mockito.eq("test-uuid"))) + .thenReturn(RpcUtils.SUCCESS_STATUS); + + dispatcher.dispatchLocally(instance); + + Mockito.verify(storageEngine) + .writeLoadTsFileNode( + Mockito.eq(new DataRegionId(1)), Mockito.same(pieceNode), Mockito.eq("test-uuid")); + } + + private static FragmentInstance createFragmentInstance(final LoadTsFilePieceNode pieceNode) { + final PlanFragmentId fragmentId = new PlanFragmentId("test", 0); + final FragmentInstance instance = + new FragmentInstance( + new PlanFragment(fragmentId, pieceNode), + fragmentId.genFragmentInstanceId(), + null, + null, + 0, + null, + false, + false); + final TConsensusGroupId consensusGroupId = new DataRegionId(1).convertToTConsensusGroupId(); + instance.setExecutorAndHost( + new StorageExecutor( + new TRegionReplicaSet( + consensusGroupId, + Collections.singletonList( + new TDataNodeLocation().setInternalEndPoint(new TEndPoint("127.0.0.1", 1)))))); + return instance; + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/splitter/ChunkDataDirectWriteTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/splitter/ChunkDataDirectWriteTest.java new file mode 100644 index 0000000000000..c824a6c0ce94b --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/splitter/ChunkDataDirectWriteTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.load.splitter; + +import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.header.ChunkHeader; +import org.apache.tsfile.file.metadata.IChunkMetadata; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.StringArrayDeviceID; +import org.apache.tsfile.file.metadata.enums.CompressionType; +import org.apache.tsfile.file.metadata.enums.TSEncoding; +import org.apache.tsfile.file.metadata.statistics.Statistics; +import org.apache.tsfile.read.common.Chunk; +import org.apache.tsfile.write.writer.TsFileIOWriter; +import org.junit.Test; +import org.mockito.Mockito; + +import java.nio.ByteBuffer; + +public class ChunkDataDirectWriteTest { + + @Test + public void testNonAlignedChunkDataCanWriteWithoutSerdeRoundTrip() throws Exception { + final NonAlignedChunkData chunkData = createNonAlignedChunkData(); + chunkData.setNotDecode(); + final IChunkMetadata chunkMetadata = Mockito.mock(IChunkMetadata.class); + Mockito.doReturn(createInt32Statistics()).when(chunkMetadata).getStatistics(); + chunkData.writeEntireChunk(ByteBuffer.allocate(0), chunkMetadata); + + final TsFileIOWriter writer = Mockito.mock(TsFileIOWriter.class); + chunkData.writeToFileWriter(writer); + + Mockito.verify(writer).writeChunk(Mockito.any(Chunk.class)); + } + + @Test + public void testAlignedChunkDataCanWriteWithoutSerdeRoundTrip() throws Exception { + final AlignedChunkData chunkData = createAlignedChunkData(); + chunkData.setNotDecode(); + final IChunkMetadata chunkMetadata = Mockito.mock(IChunkMetadata.class); + Mockito.doReturn(createInt32Statistics()).when(chunkMetadata).getStatistics(); + chunkData.writeEntireChunk(ByteBuffer.allocate(0), chunkMetadata); + + final TsFileIOWriter writer = Mockito.mock(TsFileIOWriter.class); + chunkData.writeToFileWriter(writer); + + Mockito.verify(writer).writeChunk(Mockito.any(Chunk.class)); + } + + private static Statistics createInt32Statistics() { + final Statistics statistics = Statistics.getStatsByType(TSDataType.INT32); + statistics.update(1L, 1); + return statistics; + } + + private static NonAlignedChunkData createNonAlignedChunkData() { + final IDeviceID device = new StringArrayDeviceID("root", "sg", "d1"); + return (NonAlignedChunkData) + ChunkData.createChunkData(false, device, createChunkHeader(), new TTimePartitionSlot(0L)); + } + + private static AlignedChunkData createAlignedChunkData() { + final IDeviceID device = new StringArrayDeviceID("root", "sg", "d1"); + return (AlignedChunkData) + ChunkData.createChunkData(true, device, createChunkHeader(), new TTimePartitionSlot(0L)); + } + + private static ChunkHeader createChunkHeader() { + return new ChunkHeader( + "temperature", 0, TSDataType.INT32, CompressionType.UNCOMPRESSED, TSEncoding.PLAIN, 0); + } +}