Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,8 @@ private void getAnySatisfiedTimestamp(
alignedTVList.getMinTime(), alignedTVList.getMaxTime())) {
return;
}
BitMap allValueColDeletedMap = alignedTVList.getAllValueColDeletedMap();
BitMap allValueColDeletedMap =
ignoreAllNullRows ? alignedTVList.getAllValueColDeletedMap() : null;
int rowCount = alignedTVList.rowCount();
List<int[]> valueColumnDeleteCursor = new ArrayList<>();
if (valueColumnsDeletionList != null) {
Expand All @@ -371,16 +372,13 @@ private void getAnySatisfiedTimestamp(
// timestamp: 2 bitmap: 101
// timestamp: 4 bitmap: 110
List<long[]> timestampsList = alignedTVList.getTimestamps();
List<int[]> indicesList = alignedTVList.getIndices();
int row = -1;
for (int i = 0; i < timestampsList.size(); i++) {
long[] timestamps = timestampsList.get(i);
int[] indices = indicesList == null ? null : indicesList.get(i);
int limit = (i == timestampsList.size() - 1) ? rowCount - i * ARRAY_SIZE : ARRAY_SIZE;
for (int j = 0; j < limit; j++) {
row++;
// the row is deleted
if (allValueColDeletedMap != null && allValueColDeletedMap.isMarked(row)) {
// the row is deleted or has no value columns (unwritten columns count as null)
int valueIndex = alignedTVList.getValueIndex(i, j);
if (allValueColDeletedMap != null && allValueColDeletedMap.isMarked(valueIndex)) {
continue;
}
long timestamp = timestamps[j];
Expand All @@ -392,7 +390,7 @@ private void getAnySatisfiedTimestamp(
// non-null value in multiple timestamps for the same column.
BitMap currentRowNullValueBitmap = null;
for (int column = 0; column < schemaList.size(); column++) {
if (alignedTVList.isNullValue(indices == null ? row : indices[j], column)) {
if (alignedTVList.isNullValue(valueIndex, column)) {
continue;
}

Expand Down Expand Up @@ -551,10 +549,8 @@ public void encodeWorkingAlignedTVList(
BlockingQueue<Object> ioTaskQueue,
long maxNumberOfPointsInChunk,
int maxNumberOfPointsInPage) {
BitMap allValueColDeletedMap;
AlignedTVList alignedWorkingListForFlush = (AlignedTVList) workingListForFlush;

allValueColDeletedMap =
AlignedTVList alignedWorkingListForFlush = ((AlignedTVList) workingListForFlush);
BitMap allValueColDeletedMap =
ignoreAllNullRows ? alignedWorkingListForFlush.getAllValueColDeletedMap() : null;

boolean[] timeDuplicateInfo = null;
Expand All @@ -570,6 +566,10 @@ public void encodeWorkingAlignedTVList(
for (int sortedRowIndex = 0;
sortedRowIndex < alignedWorkingListForFlush.rowCount();
sortedRowIndex++) {
if (isEmptyRowForFlush(alignedWorkingListForFlush, allValueColDeletedMap, sortedRowIndex)
|| alignedWorkingListForFlush.isTimeDeleted(sortedRowIndex)) {
continue;
}
long time = alignedWorkingListForFlush.getTime(sortedRowIndex);
if (pointNumInPage == 0) {
pageRange.add(sortedRowIndex);
Expand All @@ -592,9 +592,7 @@ public void encodeWorkingAlignedTVList(

int nextRowIndex = sortedRowIndex + 1;
while (nextRowIndex < alignedWorkingListForFlush.rowCount()
&& ((allValueColDeletedMap != null
&& allValueColDeletedMap.isMarked(
alignedWorkingListForFlush.getValueIndex(nextRowIndex)))
&& (isEmptyRowForFlush(alignedWorkingListForFlush, allValueColDeletedMap, nextRowIndex)
|| alignedWorkingListForFlush.isTimeDeleted(nextRowIndex))) {
nextRowIndex++;
}
Expand Down Expand Up @@ -643,9 +641,8 @@ private void handleEncoding(
sortedRowIndex <= pageRange.get(pageNum * 2 + 1);
sortedRowIndex++) {
// skip empty row
if (allValueColDeletedMap != null
&& allValueColDeletedMap.isMarked(
alignedWorkingListForFlush.getValueIndex(sortedRowIndex))) {
if (isEmptyRowForFlush(
alignedWorkingListForFlush, allValueColDeletedMap, sortedRowIndex)) {
continue;
}
// skip time duplicated rows
Expand Down Expand Up @@ -753,10 +750,8 @@ private void handleEncoding(
sortedRowIndex <= pageRange.get(pageNum * 2 + 1);
sortedRowIndex++) {
// skip empty row
if (((allValueColDeletedMap != null
&& allValueColDeletedMap.isMarked(
alignedWorkingListForFlush.getValueIndex(sortedRowIndex)))
|| (alignedWorkingListForFlush.isTimeDeleted(sortedRowIndex)))) {
if (isEmptyRowForFlush(alignedWorkingListForFlush, allValueColDeletedMap, sortedRowIndex)
|| alignedWorkingListForFlush.isTimeDeleted(sortedRowIndex)) {
continue;
}
if (Objects.isNull(timeDuplicateInfo) || !timeDuplicateInfo[sortedRowIndex]) {
Expand All @@ -775,6 +770,12 @@ private void handleEncoding(
}
}

private boolean isEmptyRowForFlush(
AlignedTVList alignedTVList, BitMap allValueColDeletedMap, int sortedRowIndex) {
return allValueColDeletedMap != null
&& allValueColDeletedMap.isMarked(alignedTVList.getValueIndex(sortedRowIndex));
}

@Override
public void encode(BlockingQueue<Object> ioTaskQueue, BatchEncodeInfo encodeInfo, long[] times) {
encodeInfo.maxNumberOfPointsInChunk =
Expand Down Expand Up @@ -893,6 +894,70 @@ public boolean isEmpty() {
return false;
}

/**
* Extra memory for allocating value arrays in the current (last) chunk when columns are written
* for the first time in that chunk.
*/
public long getTvListArrayMemCostIncrement(
String[] insertingMeasurements, TSDataType[] insertingTypes, Object[] insertingValues) {
long memCostIncrement = 0;
for (int i = 0; i < insertingMeasurements.length; i++) {
if (insertingTypes[i] == null || insertingMeasurements[i] == null) {
continue;
}
Integer columnIndex = measurementIndexMap.get(insertingMeasurements[i]);
if (columnIndex == null) {
continue;
}
if (!list.isLastValueArrayUnallocated(columnIndex)) {
continue;
}
if (insertingValues != null && insertingValues[i] != null) {
memCostIncrement += AlignedTVList.valueListArrayMemCost(insertingTypes[i]);
}
}
return memCostIncrement;
}

/**
* Extra memory for tablet insertion: allocate value arrays only when the column has non-null
* values in the inserting range of the last chunk.
*/
public long getTvListArrayMemCostIncrementForTablet(
String[] insertingMeasurements,
TSDataType[] insertingTypes,
Object[] columns,
BitMap[] bitMaps,
int start,
int end) {
long memCostIncrement = 0;
for (int i = 0; i < insertingMeasurements.length; i++) {
if (insertingTypes[i] == null || insertingMeasurements[i] == null || columns[i] == null) {
continue;
}
Integer columnIndex = measurementIndexMap.get(insertingMeasurements[i]);
if (columnIndex == null || !list.isLastValueArrayUnallocated(columnIndex)) {
continue;
}
if (columnHasNonNullInRange(columns[i], bitMaps == null ? null : bitMaps[i], start, end)) {
memCostIncrement += AlignedTVList.valueListArrayMemCost(insertingTypes[i]);
}
}
return memCostIncrement;
}

private static boolean columnHasNonNullInRange(Object column, BitMap bitMap, int start, int end) {
if (bitMap == null) {
return true;
}
for (int i = start; i < end; i++) {
if (!bitMap.isMarked(i)) {
return true;
}
}
return false;
}

@Override
public int serializedSize() {
int size = 0;
Expand Down
Loading
Loading