diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memory/ReleaseFlushMonitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memory/ReleaseFlushMonitor.java index 4c3ec8d2c9de5..0aaa72899a46b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memory/ReleaseFlushMonitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/memory/ReleaseFlushMonitor.java @@ -230,6 +230,7 @@ public List getRegionsToFlush(long windowsEndTime) { public void forceFlushAndRelease() { boolean needFlush; while (true) { + waitUntilWorkerTasksDone(); needFlush = false; for (CachedMTreeStore store : regionToStoreMap.values()) { if (store.getMemoryManager().getBufferNodeNum() > 0) { @@ -239,13 +240,29 @@ public void forceFlushAndRelease() { } if (needFlush) { scheduler.scheduleFlushAll().join(); + waitUntilWorkerTasksDone(); scheduler.scheduleRelease(true); } else { + // No volatile nodes left, but clean unpinned cache may still remain after previous flushes. + scheduler.scheduleRelease(true); + waitUntilWorkerTasksDone(); break; } } } + @TestOnly + private void waitUntilWorkerTasksDone() { + while (scheduler.getActiveWorkerNum() > 0 || !flushingRegionSet.isEmpty()) { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + } + } + public void clear() { if (releaseMonitor != null) { releaseMonitor.shutdownNow(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/traverser/Traverser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/traverser/Traverser.java index 6c9e23e9597ed..00fdb1b81b4bf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/traverser/Traverser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/traverser/Traverser.java @@ -185,6 +185,7 @@ private N getChild(N parent, String childName, boolean skipTemplateChildren) && skipPreDeletedSchema && child.isMeasurement() && child.getAsMeasurementMNode().isPreDeleted()) { + releaseNode(child); child = null; } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaStatisticsTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaStatisticsTest.java index 04643a88c41e5..40aade6a63b95 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaStatisticsTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaStatisticsTest.java @@ -75,24 +75,12 @@ public void testPBTreeMemoryStatistics() throws Exception { || testParams.getTestModeName().equals("PBTree-NonMemory")) { final IMNodeFactory nodeFactory = MNodeFactoryLoader.getInstance().getCachedMNodeIMNodeFactory(); - // wait release and flush task - Thread.sleep(6000); + ReleaseFlushMonitor.getInstance().forceFlushAndRelease(); // schemaRegion1 final IMNode sg1 = nodeFactory.createDatabaseMNode(null, "sg1"); sg1.setFullPath("root.sg1"); final long size1 = sg1.estimateSize(); - if (size1 != schemaRegion1.getSchemaRegionStatistics().getRegionMemoryUsage()) { - // There are two possibilities here in PartialMemory mode: - // 1. only the "sg1" node remains - // 2. the "sg1" node and the "n" node remain - Assert.assertEquals("PBTree-PartialMemory", testParams.getTestModeName()); - Assert.assertEquals( - size1 + nodeFactory.createDeviceMNode(sg1.getAsMNode(), "n").estimateSize(), - schemaRegion1.getSchemaRegionStatistics().getRegionMemoryUsage()); - ReleaseFlushMonitor.getInstance().forceFlushAndRelease(); - Assert.assertEquals( - size1, schemaRegion1.getSchemaRegionStatistics().getRegionMemoryUsage()); - } + Assert.assertEquals(size1, schemaRegion1.getSchemaRegionStatistics().getRegionMemoryUsage()); } Assert.assertEquals(0, schemaRegion1.getSchemaRegionStatistics().getSchemaRegionId()); checkPBTreeStatistics(engineStatistics); @@ -122,8 +110,7 @@ public void testMemoryStatistics2() throws Exception { final IMNodeFactory nodeFactory = MNodeFactoryLoader.getInstance().getCachedMNodeIMNodeFactory(); - // wait release and flush task - Thread.sleep(1000); + ReleaseFlushMonitor.getInstance().forceFlushAndRelease(); // schemaRegion1 final IMNode sg1 = nodeFactory.createDatabaseMNode(null, "sg1"); sg1.setFullPath("root.sg1"); @@ -231,36 +218,12 @@ public void testMemoryStatistics() throws Exception { || testParams.getTestModeName().equals("PBTree-NonMemory")) { final IMNodeFactory nodeFactory = MNodeFactoryLoader.getInstance().getCachedMNodeIMNodeFactory(); - // wait release and flush task - Thread.sleep(1000); + ReleaseFlushMonitor.getInstance().forceFlushAndRelease(); // schemaRegion1 final IMNode sg1 = nodeFactory.createDatabaseDeviceMNode(null, "sg1"); sg1.setFullPath("root.sg1"); final long size1 = sg1.estimateSize(); - if (sg1.estimateSize() != schemaRegion1.getSchemaRegionStatistics().getRegionMemoryUsage()) { - // "d0" or "d1" node may remain in PartialMemory mode - Assert.assertEquals("PBTree-PartialMemory", testParams.getTestModeName()); - final long d0ExistSize = - size1 - + nodeFactory - .createMeasurementMNode( - sg1.getAsDeviceMNode(), - "d0", - new MeasurementSchema( - "d0", TSDataType.INT64, TSEncoding.PLAIN, CompressionType.SNAPPY), - null) - .estimateSize(); - final long d1ExistSize = - size1 + nodeFactory.createInternalMNode(sg1.getAsMNode(), "d1").estimateSize(); - Assert.assertTrue( - d0ExistSize == schemaRegion1.getSchemaRegionStatistics().getRegionMemoryUsage() - || d1ExistSize == schemaRegion1.getSchemaRegionStatistics().getRegionMemoryUsage()); - ReleaseFlushMonitor.getInstance().forceFlushAndRelease(); - // wait release and flush task - Thread.sleep(1000); - Assert.assertEquals( - size1, schemaRegion1.getSchemaRegionStatistics().getRegionMemoryUsage()); - } + Assert.assertEquals(size1, schemaRegion1.getSchemaRegionStatistics().getRegionMemoryUsage()); // schemaRegion2 final IMNode sg2 = nodeFactory.createDatabaseMNode(null, "sg2"); sg2.setFullPath("root.sg2"); @@ -420,7 +383,11 @@ public void testPBTreeNodeStatistics() throws Exception { schemaRegion1.deleteTimeseriesInBlackList(patternTree); schemaRegion2.deleteTimeseriesInBlackList(patternTree); - Thread.sleep(1000); + if (testParams.getCachedMNodeSize() <= 3) { + ReleaseFlushMonitor.getInstance().forceFlushAndRelease(); + } else { + Thread.sleep(1000); + } final CachedSchemaRegionStatistics cachedRegionStatistics1 = schemaRegion1.getSchemaRegionStatistics().getAsCachedSchemaRegionStatistics(); final CachedSchemaRegionStatistics cachedRegionStatistics2 = @@ -433,13 +400,7 @@ public void testPBTreeNodeStatistics() throws Exception { Assert.assertEquals(4, cachedRegionStatistics2.getUnpinnedMNodeNum()); } else { Assert.assertEquals(1, cachedRegionStatistics1.getPinnedMNodeNum()); - if (0 != cachedRegionStatistics1.getUnpinnedMNodeNum()) { - // "d0" may remain in PartialMemory mode - Assert.assertEquals("PBTree-PartialMemory", testParams.getTestModeName()); - ReleaseFlushMonitor.getInstance().forceFlushAndRelease(); - Thread.sleep(1000); - Assert.assertEquals(0, cachedRegionStatistics1.getUnpinnedMNodeNum()); - } + Assert.assertEquals(0, cachedRegionStatistics1.getUnpinnedMNodeNum()); Assert.assertEquals(1, cachedRegionStatistics2.getPinnedMNodeNum()); Assert.assertEquals(0, cachedRegionStatistics2.getUnpinnedMNodeNum()); }