Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ public List<Integer> getRegionsToFlush(long windowsEndTime) {
public void forceFlushAndRelease() {
boolean needFlush;
while (true) {
waitUntilWorkerTasksDone();
needFlush = false;
for (CachedMTreeStore store : regionToStoreMap.values()) {
if (store.getMemoryManager().getBufferNodeNum() > 0) {
Expand All @@ -239,13 +240,29 @@ public void forceFlushAndRelease() {
}
if (needFlush) {
scheduler.scheduleFlushAll().join();
waitUntilWorkerTasksDone();
scheduler.scheduleRelease(true);
} else {
// No volatile nodes left, but clean unpinned cache may still remain after previous flushes.
scheduler.scheduleRelease(true);
waitUntilWorkerTasksDone();
break;
}
}
}

@TestOnly
private void waitUntilWorkerTasksDone() {
while (scheduler.getActiveWorkerNum() > 0 || !flushingRegionSet.isEmpty()) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
}

public void clear() {
if (releaseMonitor != null) {
releaseMonitor.shutdownNow();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ private N getChild(N parent, String childName, boolean skipTemplateChildren)
&& skipPreDeletedSchema
&& child.isMeasurement()
&& child.getAsMeasurementMNode().isPreDeleted()) {
releaseNode(child);
child = null;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,24 +75,12 @@ public void testPBTreeMemoryStatistics() throws Exception {
|| testParams.getTestModeName().equals("PBTree-NonMemory")) {
final IMNodeFactory<ICachedMNode> nodeFactory =
MNodeFactoryLoader.getInstance().getCachedMNodeIMNodeFactory();
// wait release and flush task
Thread.sleep(6000);
ReleaseFlushMonitor.getInstance().forceFlushAndRelease();
// schemaRegion1
final IMNode<ICachedMNode> sg1 = nodeFactory.createDatabaseMNode(null, "sg1");
sg1.setFullPath("root.sg1");
final long size1 = sg1.estimateSize();
if (size1 != schemaRegion1.getSchemaRegionStatistics().getRegionMemoryUsage()) {
// There are two possibilities here in PartialMemory mode:
// 1. only the "sg1" node remains
// 2. the "sg1" node and the "n" node remain
Assert.assertEquals("PBTree-PartialMemory", testParams.getTestModeName());
Assert.assertEquals(
size1 + nodeFactory.createDeviceMNode(sg1.getAsMNode(), "n").estimateSize(),
schemaRegion1.getSchemaRegionStatistics().getRegionMemoryUsage());
ReleaseFlushMonitor.getInstance().forceFlushAndRelease();
Assert.assertEquals(
size1, schemaRegion1.getSchemaRegionStatistics().getRegionMemoryUsage());
}
Assert.assertEquals(size1, schemaRegion1.getSchemaRegionStatistics().getRegionMemoryUsage());
}
Assert.assertEquals(0, schemaRegion1.getSchemaRegionStatistics().getSchemaRegionId());
checkPBTreeStatistics(engineStatistics);
Expand Down Expand Up @@ -122,8 +110,7 @@ public void testMemoryStatistics2() throws Exception {

final IMNodeFactory<?> nodeFactory =
MNodeFactoryLoader.getInstance().getCachedMNodeIMNodeFactory();
// wait release and flush task
Thread.sleep(1000);
ReleaseFlushMonitor.getInstance().forceFlushAndRelease();
// schemaRegion1
final IMNode<?> sg1 = nodeFactory.createDatabaseMNode(null, "sg1");
sg1.setFullPath("root.sg1");
Expand Down Expand Up @@ -231,36 +218,12 @@ public void testMemoryStatistics() throws Exception {
|| testParams.getTestModeName().equals("PBTree-NonMemory")) {
final IMNodeFactory<ICachedMNode> nodeFactory =
MNodeFactoryLoader.getInstance().getCachedMNodeIMNodeFactory();
// wait release and flush task
Thread.sleep(1000);
ReleaseFlushMonitor.getInstance().forceFlushAndRelease();
// schemaRegion1
final IMNode<ICachedMNode> sg1 = nodeFactory.createDatabaseDeviceMNode(null, "sg1");
sg1.setFullPath("root.sg1");
final long size1 = sg1.estimateSize();
if (sg1.estimateSize() != schemaRegion1.getSchemaRegionStatistics().getRegionMemoryUsage()) {
// "d0" or "d1" node may remain in PartialMemory mode
Assert.assertEquals("PBTree-PartialMemory", testParams.getTestModeName());
final long d0ExistSize =
size1
+ nodeFactory
.createMeasurementMNode(
sg1.getAsDeviceMNode(),
"d0",
new MeasurementSchema(
"d0", TSDataType.INT64, TSEncoding.PLAIN, CompressionType.SNAPPY),
null)
.estimateSize();
final long d1ExistSize =
size1 + nodeFactory.createInternalMNode(sg1.getAsMNode(), "d1").estimateSize();
Assert.assertTrue(
d0ExistSize == schemaRegion1.getSchemaRegionStatistics().getRegionMemoryUsage()
|| d1ExistSize == schemaRegion1.getSchemaRegionStatistics().getRegionMemoryUsage());
ReleaseFlushMonitor.getInstance().forceFlushAndRelease();
// wait release and flush task
Thread.sleep(1000);
Assert.assertEquals(
size1, schemaRegion1.getSchemaRegionStatistics().getRegionMemoryUsage());
}
Assert.assertEquals(size1, schemaRegion1.getSchemaRegionStatistics().getRegionMemoryUsage());
// schemaRegion2
final IMNode<?> sg2 = nodeFactory.createDatabaseMNode(null, "sg2");
sg2.setFullPath("root.sg2");
Expand Down Expand Up @@ -420,7 +383,11 @@ public void testPBTreeNodeStatistics() throws Exception {
schemaRegion1.deleteTimeseriesInBlackList(patternTree);
schemaRegion2.deleteTimeseriesInBlackList(patternTree);

Thread.sleep(1000);
if (testParams.getCachedMNodeSize() <= 3) {
ReleaseFlushMonitor.getInstance().forceFlushAndRelease();
} else {
Thread.sleep(1000);
}
final CachedSchemaRegionStatistics cachedRegionStatistics1 =
schemaRegion1.getSchemaRegionStatistics().getAsCachedSchemaRegionStatistics();
final CachedSchemaRegionStatistics cachedRegionStatistics2 =
Expand All @@ -433,13 +400,7 @@ public void testPBTreeNodeStatistics() throws Exception {
Assert.assertEquals(4, cachedRegionStatistics2.getUnpinnedMNodeNum());
} else {
Assert.assertEquals(1, cachedRegionStatistics1.getPinnedMNodeNum());
if (0 != cachedRegionStatistics1.getUnpinnedMNodeNum()) {
// "d0" may remain in PartialMemory mode
Assert.assertEquals("PBTree-PartialMemory", testParams.getTestModeName());
ReleaseFlushMonitor.getInstance().forceFlushAndRelease();
Thread.sleep(1000);
Assert.assertEquals(0, cachedRegionStatistics1.getUnpinnedMNodeNum());
}
Assert.assertEquals(0, cachedRegionStatistics1.getUnpinnedMNodeNum());
Assert.assertEquals(1, cachedRegionStatistics2.getPinnedMNodeNum());
Assert.assertEquals(0, cachedRegionStatistics2.getUnpinnedMNodeNum());
}
Expand Down
Loading