Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -161,20 +161,38 @@ generate-grpc: install-proto-tools
# Generate all code
generate-all: oapi-generate generate-vmm-client generate-wire generate-grpc

# Check if CH binaries exist, download if missing
# Check if CH binaries exist for this host and match the expected architecture.
.PHONY: ensure-ch-binaries
ensure-ch-binaries:
@ARCH=$$(uname -m); \
if [ "$$ARCH" = "x86_64" ]; then \
CH_ARCH=x86_64; \
CH_FILE_PATTERN='x86-64'; \
elif [ "$$ARCH" = "aarch64" ] || [ "$$ARCH" = "arm64" ]; then \
CH_ARCH=aarch64; \
CH_FILE_PATTERN='aarch64|ARM aarch64'; \
else \
echo "Unsupported architecture: $$ARCH"; exit 1; \
fi; \
if [ ! -f lib/vmm/binaries/cloud-hypervisor/v49.0/$$CH_ARCH/cloud-hypervisor ] || \
[ ! -f lib/vmm/binaries/cloud-hypervisor/v51.1/$$CH_ARCH/cloud-hypervisor ]; then \
echo "Cloud Hypervisor binaries not found, downloading..."; \
NEEDS_DOWNLOAD=0; \
for CH_VERSION in v49.0 v51.1; do \
CH_BIN=lib/vmm/binaries/cloud-hypervisor/$$CH_VERSION/$$CH_ARCH/cloud-hypervisor; \
if [ ! -f "$$CH_BIN" ]; then \
echo "Cloud Hypervisor binary not found: $$CH_BIN"; \
NEEDS_DOWNLOAD=1; \
break; \
fi; \
CH_FILE_DESC=$$(file -b "$$CH_BIN" 2>/dev/null || true); \
if ! printf '%s\n' "$$CH_FILE_DESC" | grep -Eiq "$$CH_FILE_PATTERN"; then \
echo "Cloud Hypervisor binary has unexpected architecture: $$CH_BIN"; \
echo "file output: $$CH_FILE_DESC"; \
NEEDS_DOWNLOAD=1; \
break; \
fi; \
done; \
if [ "$$NEEDS_DOWNLOAD" = "1" ]; then \
echo "Refreshing Cloud Hypervisor binaries..."; \
rm -rf lib/vmm/binaries/cloud-hypervisor; \
$(MAKE) download-ch-binaries; \
fi

Expand Down
10 changes: 7 additions & 3 deletions lib/builds/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -785,9 +785,13 @@ func (m *manager) waitForResult(ctx context.Context, buildID string, inst *insta

dialer, dialerErr := m.instanceManager.GetVsockDialer(ctx, inst.Id)
if dialerErr == nil {
conn, err = dialer.DialVsock(ctx, BuildAgentVsockPort)
if err == nil {
break
if dialer == nil {
err = fmt.Errorf("vsock dialer unavailable for instance %s", inst.Id)
} else {
conn, err = dialer.DialVsock(ctx, BuildAgentVsockPort)
if err == nil {
break
}
}
} else {
err = dialerErr
Expand Down
6 changes: 5 additions & 1 deletion lib/builds/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type mockInstanceManager struct {
getFunc func(ctx context.Context, id string) (*instances.Instance, error)
deleteFunc func(ctx context.Context, id string) error
stopFunc func(ctx context.Context, id string) (*instances.Instance, error)
vsockDialerFunc func(ctx context.Context, instanceID string) (hypervisor.VsockDialer, error)
createCallCount int
deleteCallCount int
}
Expand Down Expand Up @@ -168,7 +169,10 @@ func (m *mockInstanceManager) SetResourceValidator(v instances.ResourceValidator
}

func (m *mockInstanceManager) GetVsockDialer(ctx context.Context, instanceID string) (hypervisor.VsockDialer, error) {
return nil, nil
if m.vsockDialerFunc != nil {
return m.vsockDialerFunc(ctx, instanceID)
}
return nil, fmt.Errorf("vsock dialer unavailable in mock instance manager")
}

func (m *mockInstanceManager) SubscribeLifecycleEvents(consumer instances.LifecycleEventConsumer) (<-chan instances.LifecycleEvent, func()) {
Expand Down
6 changes: 5 additions & 1 deletion lib/guest/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,11 @@ func ExecIntoInstance(ctx context.Context, dialer hypervisor.VsockDialer, opts E
// their own exec.session span; the detailed retry spans are only useful when
// we are waiting for the guest-agent to become reachable.
if opts.WaitForAgent == 0 {
return execIntoInstanceOnce(ctx, dialer, opts)
exit, err := execIntoInstanceOnce(ctx, dialer, opts)
if err != nil && isRetryableConnectionError(err) {
CloseConn(dialer.Key())
}
return exit, err
}

ctx, span := startGuestExecSpan(ctx, opts)
Expand Down
32 changes: 32 additions & 0 deletions lib/guest/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,26 @@ func TestExecIntoInstanceRetriesWithFreshConnections(t *testing.T) {
}
}

func TestExecIntoInstanceNoWaitClosesRetryableConnection(t *testing.T) {
dialer := &alwaysFailDialer{key: "no-wait-close-retryable-test"}
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

_, err := ExecIntoInstance(ctx, dialer, ExecOptions{
Command: []string{"true"},
})
if err == nil {
t.Fatal("ExecIntoInstance succeeded unexpectedly")
}

connPool.RLock()
_, ok := connPool.conns[dialer.Key()]
connPool.RUnlock()
if ok {
t.Fatal("retryable no-wait exec error left connection in pool")
}
}

func TestCloseConnClosesPooledConnection(t *testing.T) {
dialer := &trackingDialer{
key: "close-conn-test",
Expand Down Expand Up @@ -164,6 +184,18 @@ func (d *delayedDialer) DialVsock(ctx context.Context, port int) (net.Conn, erro

var _ hypervisor.VsockDialer = (*delayedDialer)(nil)

type alwaysFailDialer struct {
key string
}

func (d *alwaysFailDialer) Key() string { return d.key }

func (d *alwaysFailDialer) DialVsock(ctx context.Context, port int) (net.Conn, error) {
return nil, errors.New("not ready")
}

var _ hypervisor.VsockDialer = (*alwaysFailDialer)(nil)

type trackingDialer struct {
key string
conns chan *closeTrackingConn
Expand Down
26 changes: 19 additions & 7 deletions lib/instances/firecracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/kernel/hypeman/cmd/api/config"
"github.com/kernel/hypeman/lib/devices"
"github.com/kernel/hypeman/lib/diskutilization"
"github.com/kernel/hypeman/lib/hypervisor"
"github.com/kernel/hypeman/lib/images"
"github.com/kernel/hypeman/lib/instances/phasetracking"
Expand Down Expand Up @@ -717,9 +718,9 @@ func TestFirecrackerForkIsolation(t *testing.T) {
require.NoError(t, err, "fingerprint source mem-file after standby")

reflinkOK := probeReflinkSupport(t, tmpDir)
var statBefore syscall.Statfs_t
require.NoError(t, syscall.Statfs(tmpDir, &statBefore))
freeBefore := int64(statBefore.Bavail) * statBefore.Bsize
usageBefore, err := diskutilization.Collect(p)
require.NoError(t, err)
usedBefore := diskUtilizationTotal(usageBefore)

fork, err := mgr.ForkInstance(ctx, sourceID, ForkInstanceRequest{
Name: "fc-fork-isolation-fork",
Expand Down Expand Up @@ -791,10 +792,9 @@ func TestFirecrackerForkIsolation(t *testing.T) {
// because pages are shared CoW. Gated on FICLONE probe — ext4 etc. fall
// back to sparse copy which produces full physical copies, so the bound
// would not hold there.
var statAfter syscall.Statfs_t
require.NoError(t, syscall.Statfs(tmpDir, &statAfter))
freeAfter := int64(statAfter.Bavail) * statAfter.Bsize
consumed := freeBefore - freeAfter
usageAfter, err := diskutilization.Collect(p)
require.NoError(t, err)
consumed := diskUtilizationTotal(usageAfter) - usedBefore
t.Logf("fork lifecycle disk-usage delta: consumed=%d guestMem=%d reflink=%v",
consumed, guestMemBytes, reflinkOK)
if reflinkOK {
Expand Down Expand Up @@ -837,6 +837,18 @@ func TestFirecrackerForkIsolation(t *testing.T) {
sourceDeleted = true
}

func diskUtilizationTotal(b diskutilization.Breakdown) int64 {
return b.Images +
b.OCICache +
b.Volumes +
b.RootfsOverlays +
b.VolumeOverlays +
b.SnapshotUncompressed +
b.SnapshotCompressed +
b.SnapshotShared +
b.SnapshotOther
}

type fileFingerprint struct {
inode uint64
sha string
Expand Down
12 changes: 10 additions & 2 deletions lib/instances/fork.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,22 @@ func (m *manager) forkInstance(ctx context.Context, id string, req ForkInstanceR
}

log.InfoContext(ctx, "restoring source instance after running fork", "source_instance_id", id)
_, restoreErr := m.restoreInstance(ctx, id)
restoredSource, restoreErr := m.restoreInstance(ctx, id)

if restoreErr != nil {
if forkErr != nil {
return nil, "", fmt.Errorf("fork failed: %v; additionally failed to restore source instance: %w", forkErr, restoreErr)
}
return nil, "", fmt.Errorf("restore source instance after fork: %w", restoreErr)
}
if restoredSource != nil && !restoredSource.NetworkEnabled && (restoredSource.State == StateRunning || restoredSource.State == StateInitializing) {
if err := ensureGuestAgentReadyForForkPhase(ctx, &restoredSource.StoredMetadata, "after restoring running fork source"); err != nil {
if forkErr != nil {
return nil, "", fmt.Errorf("fork failed: %v; additionally restored source guest agent was not ready: %w", forkErr, err)
}
return nil, "", fmt.Errorf("wait for restored source guest agent readiness: %w", err)
}
}
if forkErr != nil {
return nil, "", forkErr
}
Expand All @@ -121,7 +129,7 @@ func ensureGuestAgentReadyForRunningFork(ctx context.Context, source *StoredMeta
}

func ensureGuestAgentReadyForForkPhase(ctx context.Context, inst *StoredMetadata, phase string) error {
if inst == nil || !inst.NetworkEnabled || inst.SkipGuestAgent {
if inst == nil || inst.SkipGuestAgent {
return nil
}

Expand Down
13 changes: 7 additions & 6 deletions lib/instances/fork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,7 @@ func runWarmForkChain(t *testing.T, mgr *manager, tmpDir string, cfg warmForkCha
t.Helper()

ctx := context.Background()
readyTimeout := 90 * time.Second
p := paths.New(tmpDir)

imageManager, err := images.NewManager(p, 1, nil)
Expand Down Expand Up @@ -658,9 +659,9 @@ func runWarmForkChain(t *testing.T, mgr *manager, tmpDir string, cfg warmForkCha
}
})

source, err = waitForInstanceState(ctx, mgr, sourceID, StateRunning, integrationTestTimeout(45*time.Second))
source, err = waitForInstanceState(ctx, mgr, sourceID, StateRunning, readyTimeout)
require.NoError(t, err)
require.NoError(t, waitForExecAgent(ctx, mgr, sourceID, 45*time.Second))
require.NoError(t, waitForExecAgent(ctx, mgr, sourceID, readyTimeout))

snapshot, err := mgr.CreateSnapshot(ctx, sourceID, CreateSnapshotRequest{
Kind: SnapshotKindStandby,
Expand Down Expand Up @@ -690,9 +691,9 @@ func runWarmForkChain(t *testing.T, mgr *manager, tmpDir string, cfg warmForkCha
_ = mgr.DeleteInstance(context.Background(), warmID)
}
})
warm, err = waitForInstanceState(ctx, mgr, warmID, StateRunning, integrationTestTimeout(45*time.Second))
warm, err = waitForInstanceState(ctx, mgr, warmID, StateRunning, readyTimeout)
require.NoError(t, err)
require.NoError(t, waitForExecAgent(ctx, mgr, warmID, 45*time.Second))
require.NoError(t, waitForExecAgent(ctx, mgr, warmID, readyTimeout))

child, err := mgr.ForkInstance(ctx, warmID, ForkInstanceRequest{
Name: cfg.namePrefix + "-warm-chain-child",
Expand All @@ -707,11 +708,11 @@ func runWarmForkChain(t *testing.T, mgr *manager, tmpDir string, cfg warmForkCha
warm, err = mgr.GetInstance(ctx, warmID)
require.NoError(t, err)
if warm.State != StateRunning {
warm, err = waitForInstanceState(ctx, mgr, warmID, StateRunning, integrationTestTimeout(45*time.Second))
warm, err = waitForInstanceState(ctx, mgr, warmID, StateRunning, readyTimeout)
require.NoError(t, err)
}
require.Equal(t, StateRunning, warm.State)
require.NoError(t, waitForExecAgent(ctx, mgr, warmID, 45*time.Second))
require.NoError(t, waitForExecAgent(ctx, mgr, warmID, readyTimeout))

require.NoError(t, mgr.DeleteInstance(ctx, warmID))
warmDeleted = true
Expand Down
22 changes: 5 additions & 17 deletions lib/instances/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,25 +353,13 @@ func TestBasicEndToEnd(t *testing.T) {
assert.Len(t, instances, 1)
assert.Equal(t, inst.Id, instances[0].Id)

// Poll for logs to contain nginx startup message
var logs string
foundNginxStartup := false
for i := 0; i < 200; i++ { // Poll for up to 20 seconds (200 * 100ms)
logs, err = collectLogs(ctx, manager, inst.Id, 100)
require.NoError(t, err)

if strings.Contains(logs, "start worker processes") {
foundNginxStartup = true
break
}
time.Sleep(100 * time.Millisecond)
if err := waitForLogMessage(ctx, manager, inst.Id, "start worker processes", integrationTestTimeout(20*time.Second)); err != nil {
logs, logErr := collectLogs(ctx, manager, inst.Id, 100)
require.NoError(t, logErr)
t.Logf("Nginx startup log not observed before ingress probe: %v", err)
t.Logf("Instance logs (last 100 lines):\n%s", logs)
}

t.Logf("Instance logs (last 100 lines):\n%s", logs)

// Verify nginx started successfully
assert.True(t, foundNginxStartup, "Nginx should have started worker processes within 20 seconds")

// Test ingress - route external traffic to nginx through Caddy
t.Log("Testing ingress routing to nginx...")

Expand Down
20 changes: 5 additions & 15 deletions lib/instances/qemu_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,23 +341,13 @@ func TestQEMUBasicEndToEnd(t *testing.T) {
assert.Len(t, instances, 1)
assert.Equal(t, inst.Id, instances[0].Id)

// Poll for logs to contain nginx startup message
var logs string
foundNginxStartup := false
for i := 0; i < 200; i++ {
logs, err = collectQEMULogs(ctx, manager, inst.Id, 100)
require.NoError(t, err)

if strings.Contains(logs, "start worker processes") {
foundNginxStartup = true
break
}
time.Sleep(100 * time.Millisecond)
if err := waitForLogMessage(ctx, manager, inst.Id, "start worker processes", integrationTestTimeout(20*time.Second)); err != nil {
logs, logErr := collectQEMULogs(ctx, manager, inst.Id, 100)
require.NoError(t, logErr)
t.Logf("Nginx startup log not observed before ingress probe: %v", err)
t.Logf("Instance logs (last 100 lines):\n%s", logs)
}

t.Logf("Instance logs (last 100 lines):\n%s", logs)
assert.True(t, foundNginxStartup, "Nginx should have started worker processes within 20 seconds")

// Test ingress - route external traffic to nginx
t.Log("Testing ingress routing to nginx...")

Expand Down
6 changes: 5 additions & 1 deletion lib/instances/restart_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,5 +348,9 @@ func shouldResetRestartAttempts(policy *restartpolicy.Policy, status restartpoli
if inst.State != StateRunning && inst.State != StateInitializing {
return false
}
return !now.Before(inst.StartedAt.UTC().Add(restartpolicy.StableAfter(policy)))
stableSince := inst.StartedAt.UTC()
if status.LastAttemptAt != nil && status.LastAttemptAt.UTC().After(stableSince) {
stableSince = status.LastAttemptAt.UTC()
}
return !now.Before(stableSince.Add(restartpolicy.StableAfter(policy)))
}
42 changes: 42 additions & 0 deletions lib/instances/restart_policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,48 @@ func TestShouldResetRestartAttempts(t *testing.T) {
assert.True(t, reset)
}

func TestShouldResetRestartAttemptsUsesLatestAttemptWhenStartedAtIsOlder(t *testing.T) {
now := time.Date(2026, 1, 1, 12, 0, 0, 0, time.UTC)
startedAt := now.Add(-1 * time.Hour)
lastAttemptAt := now.Add(-5 * time.Second)

reset := shouldResetRestartAttempts(
&restartpolicy.Policy{Policy: restartpolicy.PolicyOnFailure, StableAfter: "30s"},
restartpolicy.Status{
Attempts: 1,
LastAttemptAt: &lastAttemptAt,
},
&Instance{
State: StateInitializing,
StoredMetadata: StoredMetadata{StartedAt: &startedAt},
},
now,
)

assert.False(t, reset)
}

func TestShouldResetRestartAttemptsAfterLatestAttemptIsStable(t *testing.T) {
now := time.Date(2026, 1, 1, 12, 0, 0, 0, time.UTC)
startedAt := now.Add(-1 * time.Hour)
lastAttemptAt := now.Add(-31 * time.Second)

reset := shouldResetRestartAttempts(
&restartpolicy.Policy{Policy: restartpolicy.PolicyOnFailure, StableAfter: "30s"},
restartpolicy.Status{
Attempts: 1,
LastAttemptAt: &lastAttemptAt,
},
&Instance{
State: StateRunning,
StoredMetadata: StoredMetadata{StartedAt: &startedAt},
},
now,
)

assert.True(t, reset)
}

func TestPrepareRestartAttemptPreservesLastReason(t *testing.T) {
now := time.Date(2026, 1, 1, 12, 0, 0, 0, time.UTC)
nextStatus, reason, shouldAttempt := prepareRestartAttempt(
Expand Down
Loading
Loading