Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions core/src/main/java/org/zstack/core/aspect/AsyncBackupAspect.aj
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ package org.zstack.core.aspect;
import org.springframework.beans.factory.annotation.Autowired;
import org.zstack.core.cloudbus.CloudBus;
import org.zstack.core.errorcode.ErrorFacade;
import org.zstack.header.core.workflow.FlowRollback;
import org.zstack.header.errorcode.ErrorCodeList;
import org.zstack.header.errorcode.OperationFailureException;
import org.zstack.core.thread.ChainTask;
import org.zstack.core.thread.SyncTaskChain;
import org.zstack.header.core.workflow.FlowTrigger;
import org.zstack.header.core.*;
import org.zstack.header.core.workflow.FlowRollback;
import org.zstack.header.core.workflow.FlowTrigger;
import org.zstack.header.errorcode.ErrorCode;
import org.zstack.header.errorcode.ErrorCodeList;
import org.zstack.header.errorcode.OperationFailureException;
import org.zstack.header.exception.CloudRuntimeException;
import org.zstack.header.message.Message;
import org.zstack.utils.DebugUtils;
Expand Down Expand Up @@ -247,4 +247,13 @@ public aspect AsyncBackupAspect {
backup(completion.getBackups(), t);
}
}

boolean around(org.zstack.header.core.AbstractCompletion completion) : this(completion) && execution(boolean org.zstack.core.thread.CancelablePeriodicTask+.run()) {
try {
return proceed(completion);
} catch (Throwable t) {
backup(completion.getBackups(), t);
return true;
}
}
}
14 changes: 14 additions & 0 deletions core/src/main/java/org/zstack/core/cloudbus/CloudBusImpl3.java
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,11 @@ public void run(MessageReply reply) {
});

future.await(SYNC_CALL_TIMEOUT);
if (!future.isSuccess()) {
MessageReply reply = new MessageReply();
reply.setError(future.getErrorCode());
return reply;
}
return future.getResult();
}

Expand All @@ -696,6 +701,15 @@ public void run(List<MessageReply> replies) {
});

future.await(SYNC_CALL_TIMEOUT);
if (!future.isSuccess()) {
List<MessageReply> replies = new ArrayList<>(msgs.size());
msgs.forEach(msg -> {
MessageReply reply = new MessageReply();
reply.setError(future.getErrorCode());
replies.add(reply);
});
return replies;
}
return future.getResult();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package org.zstack.test.integration.core.cloudbus

import org.zstack.core.Platform
import org.zstack.core.cloudbus.CloudBusCallBack
import org.zstack.core.cloudbus.CloudBusGlobalProperty
import org.zstack.core.cloudbus.CloudBusIN
import org.zstack.core.errorcode.ErrorFacade
import org.zstack.core.thread.AsyncThread
import org.zstack.header.AbstractService
import org.zstack.header.errorcode.ErrorCodeList
import org.zstack.header.errorcode.SysErrors
import org.zstack.header.message.Message
import org.zstack.header.message.MessageReply
import org.zstack.testlib.SkipTestSuite
Expand Down Expand Up @@ -76,6 +78,8 @@ class CloudBusCase extends SubCase{
def r = new MessageReply()
r.setError(errf.stringToOperationError("fake first error", [operr("origin error")]))
bus.reply(msg, r)
} else if (msg instanceof FakeNeedReplyMessage4) {
// no reply
}
}

Expand Down Expand Up @@ -126,6 +130,28 @@ class CloudBusCase extends SubCase{
assert r.error instanceof ErrorCodeList
assert r.error.causes[0] instanceof ErrorCodeList
assert r.error.causes[0].causes[0] instanceof ErrorCodeList

CloudBusGlobalProperty.SYNC_CALL_TIMEOUT = TimeUnit.MINUTES.toMillis(1)
def msg4 = new FakeNeedReplyMessage4()
bus.makeTargetServiceIdByResourceUuid(msg4, servId, Platform.getUuid())
def reply = bus.call(msg4)
assert !reply.isSuccess()
assert reply.getError().isError(SysErrors.TIMEOUT)

List<Message> msgs = new ArrayList<>()
for (final def i in (1..5)) {
def m = new FakeNeedReplyMessage4()
bus.makeTargetServiceIdByResourceUuid(m, servId, Platform.getUuid())
msgs.add(m)
}
def messageReplies = bus.call(msgs)
assert messageReplies.size() == 5
for (final def rr in messageReplies) {
assert !rr.isSuccess()
assert rr.getError().isError(SysErrors.TIMEOUT)
}

CloudBusGlobalProperty.SYNC_CALL_TIMEOUT = TimeUnit.MINUTES.toMillis(15)
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package org.zstack.test.integration.core.cloudbus;

import org.zstack.header.message.NeedReplyMessage;

public class FakeNeedReplyMessage4 extends NeedReplyMessage {
}
73 changes: 73 additions & 0 deletions test/src/test/java/org/zstack/test/aop/TestAsyncBackup11.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package org.zstack.test.aop;

import junit.framework.Assert;
import org.junit.Before;
import org.junit.Test;
import org.zstack.core.cloudbus.CloudBus;
import org.zstack.core.componentloader.ComponentLoader;
import org.zstack.core.thread.CancelablePeriodicTask;
import org.zstack.core.thread.ThreadFacade;
import org.zstack.header.core.Completion;
import org.zstack.header.errorcode.ErrorCode;
import org.zstack.test.BeanConstructor;
import org.zstack.utils.Utils;
import org.zstack.utils.logging.CLogger;

import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
*/
public class TestAsyncBackup11 {
CLogger logger = Utils.getLogger(TestAsyncBackup11.class);
boolean success;
ComponentLoader loader;
ThreadFacade thdf;
CloudBus bus;

@Before
public void setUp() throws Exception {
BeanConstructor con = new BeanConstructor();
loader = con.build();
thdf = loader.getComponent(ThreadFacade.class);
bus = loader.getComponent(CloudBus.class);
}

@Test
public void test() throws InterruptedException {
Completion comp = new Completion(null) {
@Override
public void success() {
}

@Override
public void fail(ErrorCode errorCode) {
success = true;
}
};
Future<Void> task = thdf.submitCancelablePeriodicTask(new CancelablePeriodicTask(comp) {
@Override
public boolean run() {
throw new RuntimeException("on purpose");
}

@Override
public TimeUnit getTimeUnit() {
return TimeUnit.SECONDS;
}

@Override
public long getInterval() {
return 3;
}

@Override
public String getName() {
return "test";
}
});

TimeUnit.SECONDS.sleep(1);
Assert.assertTrue(success);
Assert.assertTrue(task.isCancelled());
}
}
1 change: 1 addition & 0 deletions test/src/test/resources/unitTestSuiteXml/Core.xml
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@
<TestCase class="org.zstack.test.aop.TestAsyncBackup8"/>
<TestCase class="org.zstack.test.aop.TestAsyncBackup9"/>
<TestCase class="org.zstack.test.aop.TestAsyncBackup10"/>
<TestCase class="org.zstack.test.aop.TestAsyncBackup11"/>
<TestCase class="org.zstack.test.aop.TestWith"/>
<TestCase class="org.zstack.test.aop.TestWith1"/>
<TestCase class="org.zstack.test.aop.TestCompletionCallOnce"/>
Expand Down