From 2e213a008dfebe0d7c7a270f8d6f8550006d213f Mon Sep 17 00:00:00 2001 From: "shan.wu" Date: Tue, 20 May 2025 19:07:56 +0800 Subject: [PATCH] [cloudbus]: call AsyncBackup for method 'CancelablePeriodicTask.run' 1.call AsyncBackup for method 'CancelablePeriodicTask.run' 2.reply error when bus.call timeout. Resolves/Related: ZSTAC-70710 Change-Id: I6c7765646472696f7464786c6870777670647573 (cherry picked from commit 977838b5843b2ed1c2fdebf51fc8e9760b61e1fc) --- .../zstack/core/aspect/AsyncBackupAspect.aj | 17 ++++- .../zstack/core/cloudbus/CloudBusImpl3.java | 14 ++++ .../core/cloudbus/CloudBusCase.groovy | 26 +++++++ .../core/cloudbus/FakeNeedReplyMessage4.java | 6 ++ .../zstack/test/aop/TestAsyncBackup11.java | 73 +++++++++++++++++++ .../test/resources/unitTestSuiteXml/Core.xml | 1 + 6 files changed, 133 insertions(+), 4 deletions(-) create mode 100644 test/src/test/groovy/org/zstack/test/integration/core/cloudbus/FakeNeedReplyMessage4.java create mode 100644 test/src/test/java/org/zstack/test/aop/TestAsyncBackup11.java diff --git a/core/src/main/java/org/zstack/core/aspect/AsyncBackupAspect.aj b/core/src/main/java/org/zstack/core/aspect/AsyncBackupAspect.aj index 144fe8161bd..700bf48a38b 100755 --- a/core/src/main/java/org/zstack/core/aspect/AsyncBackupAspect.aj +++ b/core/src/main/java/org/zstack/core/aspect/AsyncBackupAspect.aj @@ -3,14 +3,14 @@ package org.zstack.core.aspect; import org.springframework.beans.factory.annotation.Autowired; import org.zstack.core.cloudbus.CloudBus; import org.zstack.core.errorcode.ErrorFacade; -import org.zstack.header.core.workflow.FlowRollback; -import org.zstack.header.errorcode.ErrorCodeList; -import org.zstack.header.errorcode.OperationFailureException; import org.zstack.core.thread.ChainTask; import org.zstack.core.thread.SyncTaskChain; -import org.zstack.header.core.workflow.FlowTrigger; import org.zstack.header.core.*; +import org.zstack.header.core.workflow.FlowRollback; +import org.zstack.header.core.workflow.FlowTrigger; import org.zstack.header.errorcode.ErrorCode; +import org.zstack.header.errorcode.ErrorCodeList; +import org.zstack.header.errorcode.OperationFailureException; import org.zstack.header.exception.CloudRuntimeException; import org.zstack.header.message.Message; import org.zstack.utils.DebugUtils; @@ -247,4 +247,13 @@ public aspect AsyncBackupAspect { backup(completion.getBackups(), t); } } + + boolean around(org.zstack.header.core.AbstractCompletion completion) : this(completion) && execution(boolean org.zstack.core.thread.CancelablePeriodicTask+.run()) { + try { + return proceed(completion); + } catch (Throwable t) { + backup(completion.getBackups(), t); + return true; + } + } } diff --git a/core/src/main/java/org/zstack/core/cloudbus/CloudBusImpl3.java b/core/src/main/java/org/zstack/core/cloudbus/CloudBusImpl3.java index 0d83683708a..12de539758a 100755 --- a/core/src/main/java/org/zstack/core/cloudbus/CloudBusImpl3.java +++ b/core/src/main/java/org/zstack/core/cloudbus/CloudBusImpl3.java @@ -680,6 +680,11 @@ public void run(MessageReply reply) { }); future.await(SYNC_CALL_TIMEOUT); + if (!future.isSuccess()) { + MessageReply reply = new MessageReply(); + reply.setError(future.getErrorCode()); + return reply; + } return future.getResult(); } @@ -696,6 +701,15 @@ public void run(List replies) { }); future.await(SYNC_CALL_TIMEOUT); + if (!future.isSuccess()) { + List replies = new ArrayList<>(msgs.size()); + msgs.forEach(msg -> { + MessageReply reply = new MessageReply(); + reply.setError(future.getErrorCode()); + replies.add(reply); + }); + return replies; + } return future.getResult(); } diff --git a/test/src/test/groovy/org/zstack/test/integration/core/cloudbus/CloudBusCase.groovy b/test/src/test/groovy/org/zstack/test/integration/core/cloudbus/CloudBusCase.groovy index 44fa4def7e0..e3ab4170478 100644 --- a/test/src/test/groovy/org/zstack/test/integration/core/cloudbus/CloudBusCase.groovy +++ b/test/src/test/groovy/org/zstack/test/integration/core/cloudbus/CloudBusCase.groovy @@ -2,11 +2,13 @@ package org.zstack.test.integration.core.cloudbus import org.zstack.core.Platform import org.zstack.core.cloudbus.CloudBusCallBack +import org.zstack.core.cloudbus.CloudBusGlobalProperty import org.zstack.core.cloudbus.CloudBusIN import org.zstack.core.errorcode.ErrorFacade import org.zstack.core.thread.AsyncThread import org.zstack.header.AbstractService import org.zstack.header.errorcode.ErrorCodeList +import org.zstack.header.errorcode.SysErrors import org.zstack.header.message.Message import org.zstack.header.message.MessageReply import org.zstack.testlib.SkipTestSuite @@ -76,6 +78,8 @@ class CloudBusCase extends SubCase{ def r = new MessageReply() r.setError(errf.stringToOperationError("fake first error", [operr("origin error")])) bus.reply(msg, r) + } else if (msg instanceof FakeNeedReplyMessage4) { + // no reply } } @@ -126,6 +130,28 @@ class CloudBusCase extends SubCase{ assert r.error instanceof ErrorCodeList assert r.error.causes[0] instanceof ErrorCodeList assert r.error.causes[0].causes[0] instanceof ErrorCodeList + + CloudBusGlobalProperty.SYNC_CALL_TIMEOUT = TimeUnit.MINUTES.toMillis(1) + def msg4 = new FakeNeedReplyMessage4() + bus.makeTargetServiceIdByResourceUuid(msg4, servId, Platform.getUuid()) + def reply = bus.call(msg4) + assert !reply.isSuccess() + assert reply.getError().isError(SysErrors.TIMEOUT) + + List msgs = new ArrayList<>() + for (final def i in (1..5)) { + def m = new FakeNeedReplyMessage4() + bus.makeTargetServiceIdByResourceUuid(m, servId, Platform.getUuid()) + msgs.add(m) + } + def messageReplies = bus.call(msgs) + assert messageReplies.size() == 5 + for (final def rr in messageReplies) { + assert !rr.isSuccess() + assert rr.getError().isError(SysErrors.TIMEOUT) + } + + CloudBusGlobalProperty.SYNC_CALL_TIMEOUT = TimeUnit.MINUTES.toMillis(15) } @Override diff --git a/test/src/test/groovy/org/zstack/test/integration/core/cloudbus/FakeNeedReplyMessage4.java b/test/src/test/groovy/org/zstack/test/integration/core/cloudbus/FakeNeedReplyMessage4.java new file mode 100644 index 00000000000..9a89029cf05 --- /dev/null +++ b/test/src/test/groovy/org/zstack/test/integration/core/cloudbus/FakeNeedReplyMessage4.java @@ -0,0 +1,6 @@ +package org.zstack.test.integration.core.cloudbus; + +import org.zstack.header.message.NeedReplyMessage; + +public class FakeNeedReplyMessage4 extends NeedReplyMessage { +} diff --git a/test/src/test/java/org/zstack/test/aop/TestAsyncBackup11.java b/test/src/test/java/org/zstack/test/aop/TestAsyncBackup11.java new file mode 100644 index 00000000000..928898ef4f9 --- /dev/null +++ b/test/src/test/java/org/zstack/test/aop/TestAsyncBackup11.java @@ -0,0 +1,73 @@ +package org.zstack.test.aop; + +import junit.framework.Assert; +import org.junit.Before; +import org.junit.Test; +import org.zstack.core.cloudbus.CloudBus; +import org.zstack.core.componentloader.ComponentLoader; +import org.zstack.core.thread.CancelablePeriodicTask; +import org.zstack.core.thread.ThreadFacade; +import org.zstack.header.core.Completion; +import org.zstack.header.errorcode.ErrorCode; +import org.zstack.test.BeanConstructor; +import org.zstack.utils.Utils; +import org.zstack.utils.logging.CLogger; + +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +/** + */ +public class TestAsyncBackup11 { + CLogger logger = Utils.getLogger(TestAsyncBackup11.class); + boolean success; + ComponentLoader loader; + ThreadFacade thdf; + CloudBus bus; + + @Before + public void setUp() throws Exception { + BeanConstructor con = new BeanConstructor(); + loader = con.build(); + thdf = loader.getComponent(ThreadFacade.class); + bus = loader.getComponent(CloudBus.class); + } + + @Test + public void test() throws InterruptedException { + Completion comp = new Completion(null) { + @Override + public void success() { + } + + @Override + public void fail(ErrorCode errorCode) { + success = true; + } + }; + Future task = thdf.submitCancelablePeriodicTask(new CancelablePeriodicTask(comp) { + @Override + public boolean run() { + throw new RuntimeException("on purpose"); + } + + @Override + public TimeUnit getTimeUnit() { + return TimeUnit.SECONDS; + } + + @Override + public long getInterval() { + return 3; + } + + @Override + public String getName() { + return "test"; + } + }); + + TimeUnit.SECONDS.sleep(1); + Assert.assertTrue(success); + Assert.assertTrue(task.isCancelled()); + } +} diff --git a/test/src/test/resources/unitTestSuiteXml/Core.xml b/test/src/test/resources/unitTestSuiteXml/Core.xml index 5f111b95a32..5e17ac243f0 100755 --- a/test/src/test/resources/unitTestSuiteXml/Core.xml +++ b/test/src/test/resources/unitTestSuiteXml/Core.xml @@ -122,6 +122,7 @@ +