diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index eec2948b..333f3655 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -36,7 +36,7 @@ jobs: local sleep_seconds=30 local n=1 while [ "$n" -le "$attempts" ]; do - if sudo apt-get update; then + # Run timeout under sudo: an unprivileged timeout cannot signal the root-owned apt-get. + if sudo timeout 120s apt-get update; then return 0 fi if [ "$n" -eq "$attempts" ]; then @@ -53,7 +53,7 @@ jobs: ! command -v qemu-system-x86_64 &> /dev/null || \ ! qemu-system-x86_64 --version >/dev/null 2>&1; then apt_update_with_retry - sudo apt-get install -y erofs-utils e2fsprogs iptables qemu-system-x86 qemu-utils + sudo timeout 300s apt-get install -y erofs-utils e2fsprogs iptables qemu-system-x86 qemu-utils fi go mod download @@ -69,6 +69,13 @@ jobs: sudo env "PATH=$TEST_PATH" bash -lc "command -v '$bin'" done + - name: Create run-scoped temp directory + run: | + TEST_NETWORK_TMPDIR="/tmp/hm-net-${{ github.run_id }}-${{ github.run_attempt }}" + sudo rm -rf "$TEST_NETWORK_TMPDIR" + mkdir -p "$TEST_NETWORK_TMPDIR" + sudo chown -R "$(id -u):$(id -g)" "$TEST_NETWORK_TMPDIR" + # Avoids rate limits when running the tests # Tests includes pulling, then converting to disk images - name: Login to Docker Hub @@ -106,6 +113,7 @@ jobs: - name: Run tests env: + HYPEMAN_TEST_NETWORK_TMPDIR: /tmp/hm-net-${{ github.run_id }}-${{ github.run_attempt }} GO_TEST_TIMEOUT: 600s # Docker auth for tests running as root (sudo) DOCKER_CONFIG: /home/debianuser/.docker @@ -122,6 +130,10 @@ jobs: export HYPEMAN_TEST_PREWARM_DIR="$HOME/.cache/hypeman-ci/linux-amd64" make test TEST_TIMEOUT=20m + - name: Cleanup + if: always() + run: sudo rm -rf "/tmp/hm-net-${{ github.run_id }}-${{ github.run_attempt }}" + test-darwin: runs-on: [self-hosted, macos, arm64] concurrency: diff --git a/Makefile b/Makefile index 08c14fa8..770d44e8 100644 --- a/Makefile +++ b/Makefile @@ -295,12 +295,14 @@ test-linux: ensure-ch-binaries ensure-firecracker-binaries ensure-caddy-binaries if [ -n "$(TEST)" ]; then \ echo 
"Running specific test: $(TEST)"; \ sudo env "PATH=$$TEST_PATH" "DOCKER_CONFIG=$${DOCKER_CONFIG:-$$HOME/.docker}" "CI=$${CI:-}" \ + "HYPEMAN_TEST_NETWORK_TMPDIR=$${HYPEMAN_TEST_NETWORK_TMPDIR:-}" \ "HYPEMAN_TEST_PREWARM_DIR=$${HYPEMAN_TEST_PREWARM_DIR:-}" \ "HYPEMAN_TEST_PREWARM_STRICT=$${HYPEMAN_TEST_PREWARM_STRICT:-}" \ "HYPEMAN_TEST_REGISTRY=$${HYPEMAN_TEST_REGISTRY:-}" \ go test -tags containers_image_openpgp -run=$(TEST) $$VERBOSE_FLAG -timeout=$(TEST_TIMEOUT) ./...; \ else \ sudo env "PATH=$$TEST_PATH" "DOCKER_CONFIG=$${DOCKER_CONFIG:-$$HOME/.docker}" "CI=$${CI:-}" \ + "HYPEMAN_TEST_NETWORK_TMPDIR=$${HYPEMAN_TEST_NETWORK_TMPDIR:-}" \ "HYPEMAN_TEST_PREWARM_DIR=$${HYPEMAN_TEST_PREWARM_DIR:-}" \ "HYPEMAN_TEST_PREWARM_STRICT=$${HYPEMAN_TEST_PREWARM_STRICT:-}" \ "HYPEMAN_TEST_REGISTRY=$${HYPEMAN_TEST_REGISTRY:-}" \ diff --git a/cmd/api/api/snapshots.go b/cmd/api/api/snapshots.go index 620d9574..00b6d06b 100644 --- a/cmd/api/api/snapshots.go +++ b/cmd/api/api/snapshots.go @@ -175,7 +175,9 @@ func (s *ApiService) ForkSnapshot(ctx context.Context, request oapi.ForkSnapshot return oapi.ForkSnapshot400JSONResponse{Code: "invalid_request", Message: "request body is required"}, nil } - domainReq := instances.ForkSnapshotRequest{Name: request.Body.Name} + domainReq := instances.ForkSnapshotRequest{ + Name: request.Body.Name, + } if request.Body.TargetState != nil { domainReq.TargetState = instances.State(*request.Body.TargetState) } diff --git a/cmd/api/api/snapshots_test.go b/cmd/api/api/snapshots_test.go index d0730133..1cfaf1d8 100644 --- a/cmd/api/api/snapshots_test.go +++ b/cmd/api/api/snapshots_test.go @@ -1,14 +1,35 @@ package api import ( + "context" "testing" "time" + "github.com/kernel/hypeman/lib/hypervisor" "github.com/kernel/hypeman/lib/instances" + "github.com/kernel/hypeman/lib/oapi" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) +type captureForkSnapshotManager struct { + instances.Manager + lastID string + lastReq 
*instances.ForkSnapshotRequest + result *instances.Instance + err error +} + +func (m *captureForkSnapshotManager) ForkSnapshot(ctx context.Context, snapshotID string, req instances.ForkSnapshotRequest) (*instances.Instance, error) { + reqCopy := req + m.lastID = snapshotID + m.lastReq = &reqCopy + if m.err != nil { + return nil, m.err + } + return m.result, nil +} + func TestSnapshotScheduleToOAPIPreservesZeroMaxCount(t *testing.T) { t.Parallel() @@ -30,3 +51,39 @@ func TestSnapshotScheduleToOAPIPreservesZeroMaxCount(t *testing.T) { require.NotNil(t, out.Retention.MaxAge) assert.Equal(t, "24h0m0s", *out.Retention.MaxAge) } + +func TestForkSnapshotSuccess(t *testing.T) { + t.Parallel() + svc := newTestService(t) + + forked := &instances.Instance{ + StoredMetadata: instances.StoredMetadata{ + Id: "forked-instance", + Name: "forked-instance", + Image: "docker.io/library/alpine:latest", + CreatedAt: time.Now(), + HypervisorType: hypervisor.TypeFirecracker, + }, + State: instances.StateRunning, + } + mockMgr := &captureForkSnapshotManager{ + Manager: svc.InstanceManager, + result: forked, + } + svc.InstanceManager = mockMgr + + resp, err := svc.ForkSnapshot(ctx(), oapi.ForkSnapshotRequestObject{ + SnapshotId: "snap-123", + Body: &oapi.ForkSnapshotRequest{ + Name: "forked-instance", + }, + }) + require.NoError(t, err) + + created, ok := resp.(oapi.ForkSnapshot201JSONResponse) + require.True(t, ok, "expected 201 response") + assert.Equal(t, "forked-instance", created.Name) + assert.Equal(t, "snap-123", mockMgr.lastID) + require.NotNil(t, mockMgr.lastReq) + assert.Equal(t, "forked-instance", mockMgr.lastReq.Name) +} diff --git a/lib/forkvm/README.md b/lib/forkvm/README.md index bb6567c7..d224b220 100644 --- a/lib/forkvm/README.md +++ b/lib/forkvm/README.md @@ -14,6 +14,23 @@ to work across implementations. For networked forks, the fork gets a fresh host/guest identity (IP, MAC, TAP) instead of reusing the source identity. 
+## Resume network handoff + +Networked standby/running forks need a new host-side allocation, but the guest +memory snapshot still contains the source VM's old interface state. On restore, +Hypeman prepares the fork's TAP/IP/MAC before the VM resumes, then hands the new +guest network config to the guest through a small mailbox embedded in snapshot +memory. After resume, VMGenID tells the guest-agent that this is a restored VM; +the guest-agent reads the mailbox and applies the new MAC, address, route, and +neighbor state with netlink. + +For API calls that return a running fork, Hypeman waits for a guest UDP +"applied" ack before returning, so the fast path does not depend on a +host-initiated guest RPC/vsock connection as its first post-resume step. If +the mailbox path is unavailable, the ack listener cannot be started, or the +guest does not ack in time, restore falls back to the older host-initiated +guest network reconfigure path. + ## Fork data copy behavior - Guest directory copy is **sparse-only** for regular files. diff --git a/lib/guest/client.go b/lib/guest/client.go index bfdf6956..b6dcd2e8 100644 --- a/lib/guest/client.go +++ b/lib/guest/client.go @@ -82,8 +82,12 @@ func GetOrCreateConn(ctx context.Context, dialer hypervisor.VsockDialer) (*grpc.
} // Create new connection using the VsockDialer + traceCtx := ctx conn, err := grpc.Dial("passthrough:///vsock", grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) { + if span := trace.SpanFromContext(traceCtx); span.SpanContext().IsValid() { + ctx = trace.ContextWithSpan(ctx, span) + } netConn, err := dialer.DialVsock(ctx, vsockGuestPort) if err != nil { return nil, &AgentVSockDialError{Err: err} @@ -146,6 +150,113 @@ type ExecOptions struct { ResizeChan <-chan *WindowSize // Optional: channel to receive resize events (pointer to avoid copying mutex) } +// ReconfigureNetworkOptions holds the interface settings sent to the guest in a ReconfigureNetwork RPC. +// WaitForAgent, when non-zero, makes ReconfigureNetworkInInstance retry retryable connection errors for up to that long. +type ReconfigureNetworkOptions struct { + InterfaceName string + MAC string + IPv4 string + Prefix uint32 + Gateway string + WaitForAgent time.Duration +} + +// ReconfigureNetworkInInstance asks the guest agent, via the ReconfigureNetwork RPC, to apply opts. +// With WaitForAgent == 0 it makes one attempt and calls CloseConn for the dialer's key when the error is retryable. +// Otherwise it retries retryable connection errors (calling CloseConn after each) until an attempt succeeds, +// a non-retryable error occurs, the WaitForAgent deadline has passed, or ctx is done, recording the wait on a +// "guest.reconfigure_network" span. The deadline is only checked after a failed attempt, so at least one attempt always runs. +func ReconfigureNetworkInInstance(ctx context.Context, dialer hypervisor.VsockDialer, opts ReconfigureNetworkOptions) error { + if opts.WaitForAgent == 0 { + err := reconfigureNetworkOnce(ctx, dialer, opts) + if err != nil && isRetryableConnectionError(err) { + CloseConn(dialer.Key()) + } + return err + } + + ctx, span := otel.Tracer("hypeman/guest").Start(ctx, "guest.reconfigure_network", trace.WithAttributes( + attribute.Bool("wait_for_agent", true), + attribute.Int64("wait_for_agent_ms", opts.WaitForAgent.Milliseconds()), + )) + defer span.End() + + deadline := time.Now().Add(opts.WaitForAgent) + start := time.Now() + attempts := 0 + retryableAttempts := 0 + firstRetryableErrorType := "" + lastRetryableErrorType := "" + lastRetryInterval := time.Duration(0) + + for { + attempts++ + err := reconfigureNetworkOnce(ctx, dialer, opts) + if err == nil { + recordGuestExecWait(span, start, attempts, retryableAttempts, firstRetryableErrorType, lastRetryableErrorType, lastRetryInterval) + span.SetStatus(otelcodes.Ok, "") + return nil + } + if !isRetryableConnectionError(err) { + recordGuestExecWait(span, start, attempts, retryableAttempts, firstRetryableErrorType, lastRetryableErrorType, lastRetryInterval) + span.RecordError(err) + 
span.SetStatus(otelcodes.Error, err.Error()) + return err + } + + retryableAttempts++ + errType := retryableConnectionErrorType(err) + if firstRetryableErrorType == "" { + firstRetryableErrorType = errType + } + lastRetryableErrorType = errType + // NOTE(review): presumably evicts the cached connection so the next attempt redials; CloseConn is not visible here. + CloseConn(dialer.Key()) + + if time.Now().After(deadline) { + recordGuestExecWait(span, start, attempts, retryableAttempts, firstRetryableErrorType, lastRetryableErrorType, lastRetryInterval) + span.RecordError(err) + span.SetStatus(otelcodes.Error, err.Error()) + return err + } + + retryInterval := guestExecRetryInterval(time.Since(start)) + lastRetryInterval = retryInterval + select { + case <-ctx.Done(): + recordGuestExecWait(span, start, attempts, retryableAttempts, firstRetryableErrorType, lastRetryableErrorType, lastRetryInterval) + span.RecordError(ctx.Err()) + span.SetStatus(otelcodes.Error, ctx.Err().Error()) + return ctx.Err() + case <-time.After(retryInterval): + } + } +} + +// reconfigureNetworkOnce gets the gRPC connection for dialer and issues a single ReconfigureNetwork RPC, +// timing it with a "guest.reconfigure_network.rpc" span. Errors are wrapped with the failing step. +// NOTE(review): the context returned by Start is discarded, so the RPC runs under the caller's ctx rather than the new span's - confirm this is intended. +func reconfigureNetworkOnce(ctx context.Context, dialer hypervisor.VsockDialer, opts ReconfigureNetworkOptions) error { + grpcConn, err := GetOrCreateConn(ctx, dialer) + if err != nil { + return fmt.Errorf("get grpc connection: %w", err) + } + client := NewGuestServiceClient(grpcConn) + + _, span := otel.Tracer("hypeman/guest").Start(ctx, "guest.reconfigure_network.rpc") + _, err = client.ReconfigureNetwork(ctx, &ReconfigureNetworkRequest{ + InterfaceName: opts.InterfaceName, + Mac: opts.MAC, + Ipv4: opts.IPv4, + Prefix: opts.Prefix, + Gateway: opts.Gateway, + }) + finishGuestNetworkStepSpan(span, err) + if err != nil { + return fmt.Errorf("reconfigure network rpc: %w", err) + } + return nil +} + +// finishGuestNetworkStepSpan records err on span with Error status, or sets Ok status when err is nil, and then ends the span. +func finishGuestNetworkStepSpan(span trace.Span, err error) { + if err != nil { + span.RecordError(err) + span.SetStatus(otelcodes.Error, err.Error()) + } else { + span.SetStatus(otelcodes.Ok, "") + } + span.End() +} + // ExecIntoInstance executes command in instance via vsock using gRPC.
// The dialer is a hypervisor-specific VsockDialer that knows how to connect to the guest. // If WaitForAgent is set, it will retry on connection errors until the timeout. diff --git a/lib/guest/guest.pb.go b/lib/guest/guest.pb.go index 0de1628e..a239fc97 100644 --- a/lib/guest/guest.pb.go +++ b/lib/guest/guest.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.36.10 -// protoc v3.21.12 +// protoc v6.33.4 // source: lib/guest/guest.proto package guest @@ -1266,6 +1266,120 @@ func (*ShutdownResponse) Descriptor() ([]byte, []int) { return file_lib_guest_guest_proto_rawDescGZIP(), []int{16} } +// ReconfigureNetworkRequest updates a guest network interface after snapshot restore +type ReconfigureNetworkRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + InterfaceName string `protobuf:"bytes,1,opt,name=interface_name,json=interfaceName,proto3" json:"interface_name,omitempty"` // Interface to reconfigure, defaults to eth0 + Mac string `protobuf:"bytes,2,opt,name=mac,proto3" json:"mac,omitempty"` // New MAC address + Ipv4 string `protobuf:"bytes,3,opt,name=ipv4,proto3" json:"ipv4,omitempty"` // New IPv4 address without prefix + Prefix uint32 `protobuf:"varint,4,opt,name=prefix,proto3" json:"prefix,omitempty"` // IPv4 prefix length + Gateway string `protobuf:"bytes,5,opt,name=gateway,proto3" json:"gateway,omitempty"` // Default gateway IPv4 address + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ReconfigureNetworkRequest) Reset() { + *x = ReconfigureNetworkRequest{} + mi := &file_lib_guest_guest_proto_msgTypes[17] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ReconfigureNetworkRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ReconfigureNetworkRequest) ProtoMessage() {} + +func (x *ReconfigureNetworkRequest) ProtoReflect() protoreflect.Message { + mi := 
&file_lib_guest_guest_proto_msgTypes[17] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ReconfigureNetworkRequest.ProtoReflect.Descriptor instead. +func (*ReconfigureNetworkRequest) Descriptor() ([]byte, []int) { + return file_lib_guest_guest_proto_rawDescGZIP(), []int{17} +} + +func (x *ReconfigureNetworkRequest) GetInterfaceName() string { + if x != nil { + return x.InterfaceName + } + return "" +} + +func (x *ReconfigureNetworkRequest) GetMac() string { + if x != nil { + return x.Mac + } + return "" +} + +func (x *ReconfigureNetworkRequest) GetIpv4() string { + if x != nil { + return x.Ipv4 + } + return "" +} + +func (x *ReconfigureNetworkRequest) GetPrefix() uint32 { + if x != nil { + return x.Prefix + } + return 0 +} + +func (x *ReconfigureNetworkRequest) GetGateway() string { + if x != nil { + return x.Gateway + } + return "" +} + +// ReconfigureNetworkResponse acknowledges the network reconfiguration request +type ReconfigureNetworkResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ReconfigureNetworkResponse) Reset() { + *x = ReconfigureNetworkResponse{} + mi := &file_lib_guest_guest_proto_msgTypes[18] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ReconfigureNetworkResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ReconfigureNetworkResponse) ProtoMessage() {} + +func (x *ReconfigureNetworkResponse) ProtoReflect() protoreflect.Message { + mi := &file_lib_guest_guest_proto_msgTypes[18] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use 
ReconfigureNetworkResponse.ProtoReflect.Descriptor instead. +func (*ReconfigureNetworkResponse) Descriptor() ([]byte, []int) { + return file_lib_guest_guest_proto_rawDescGZIP(), []int{18} +} + var File_lib_guest_guest_proto protoreflect.FileDescriptor const file_lib_guest_guest_proto_rawDesc = "" + @@ -1358,13 +1472,21 @@ const file_lib_guest_guest_proto_rawDesc = "" + "\x05error\x18\b \x01(\tR\x05error\")\n" + "\x0fShutdownRequest\x12\x16\n" + "\x06signal\x18\x01 \x01(\x05R\x06signal\"\x12\n" + - "\x10ShutdownResponse2\xd3\x02\n" + + "\x10ShutdownResponse\"\x9a\x01\n" + + "\x19ReconfigureNetworkRequest\x12%\n" + + "\x0einterface_name\x18\x01 \x01(\tR\rinterfaceName\x12\x10\n" + + "\x03mac\x18\x02 \x01(\tR\x03mac\x12\x12\n" + + "\x04ipv4\x18\x03 \x01(\tR\x04ipv4\x12\x16\n" + + "\x06prefix\x18\x04 \x01(\rR\x06prefix\x12\x18\n" + + "\agateway\x18\x05 \x01(\tR\agateway\"\x1c\n" + + "\x1aReconfigureNetworkResponse2\xae\x03\n" + "\fGuestService\x123\n" + "\x04Exec\x12\x12.guest.ExecRequest\x1a\x13.guest.ExecResponse(\x010\x01\x12F\n" + "\vCopyToGuest\x12\x19.guest.CopyToGuestRequest\x1a\x1a.guest.CopyToGuestResponse(\x01\x12L\n" + "\rCopyFromGuest\x12\x1b.guest.CopyFromGuestRequest\x1a\x1c.guest.CopyFromGuestResponse0\x01\x12;\n" + "\bStatPath\x12\x16.guest.StatPathRequest\x1a\x17.guest.StatPathResponse\x12;\n" + - "\bShutdown\x12\x16.guest.ShutdownRequest\x1a\x17.guest.ShutdownResponseB'Z%github.com/onkernel/hypeman/lib/guestb\x06proto3" + "\bShutdown\x12\x16.guest.ShutdownRequest\x1a\x17.guest.ShutdownResponse\x12Y\n" + + "\x12ReconfigureNetwork\x12 .guest.ReconfigureNetworkRequest\x1a!.guest.ReconfigureNetworkResponseB'Z%github.com/onkernel/hypeman/lib/guestb\x06proto3" var ( file_lib_guest_guest_proto_rawDescOnce sync.Once @@ -1378,31 +1500,33 @@ func file_lib_guest_guest_proto_rawDescGZIP() []byte { return file_lib_guest_guest_proto_rawDescData } -var file_lib_guest_guest_proto_msgTypes = make([]protoimpl.MessageInfo, 18) +var file_lib_guest_guest_proto_msgTypes = 
make([]protoimpl.MessageInfo, 20) var file_lib_guest_guest_proto_goTypes = []any{ - (*ExecRequest)(nil), // 0: guest.ExecRequest - (*ExecStart)(nil), // 1: guest.ExecStart - (*WindowSize)(nil), // 2: guest.WindowSize - (*ExecResponse)(nil), // 3: guest.ExecResponse - (*CopyToGuestRequest)(nil), // 4: guest.CopyToGuestRequest - (*CopyToGuestStart)(nil), // 5: guest.CopyToGuestStart - (*CopyToGuestEnd)(nil), // 6: guest.CopyToGuestEnd - (*CopyToGuestResponse)(nil), // 7: guest.CopyToGuestResponse - (*CopyFromGuestRequest)(nil), // 8: guest.CopyFromGuestRequest - (*CopyFromGuestResponse)(nil), // 9: guest.CopyFromGuestResponse - (*CopyFromGuestHeader)(nil), // 10: guest.CopyFromGuestHeader - (*CopyFromGuestEnd)(nil), // 11: guest.CopyFromGuestEnd - (*CopyFromGuestError)(nil), // 12: guest.CopyFromGuestError - (*StatPathRequest)(nil), // 13: guest.StatPathRequest - (*StatPathResponse)(nil), // 14: guest.StatPathResponse - (*ShutdownRequest)(nil), // 15: guest.ShutdownRequest - (*ShutdownResponse)(nil), // 16: guest.ShutdownResponse - nil, // 17: guest.ExecStart.EnvEntry + (*ExecRequest)(nil), // 0: guest.ExecRequest + (*ExecStart)(nil), // 1: guest.ExecStart + (*WindowSize)(nil), // 2: guest.WindowSize + (*ExecResponse)(nil), // 3: guest.ExecResponse + (*CopyToGuestRequest)(nil), // 4: guest.CopyToGuestRequest + (*CopyToGuestStart)(nil), // 5: guest.CopyToGuestStart + (*CopyToGuestEnd)(nil), // 6: guest.CopyToGuestEnd + (*CopyToGuestResponse)(nil), // 7: guest.CopyToGuestResponse + (*CopyFromGuestRequest)(nil), // 8: guest.CopyFromGuestRequest + (*CopyFromGuestResponse)(nil), // 9: guest.CopyFromGuestResponse + (*CopyFromGuestHeader)(nil), // 10: guest.CopyFromGuestHeader + (*CopyFromGuestEnd)(nil), // 11: guest.CopyFromGuestEnd + (*CopyFromGuestError)(nil), // 12: guest.CopyFromGuestError + (*StatPathRequest)(nil), // 13: guest.StatPathRequest + (*StatPathResponse)(nil), // 14: guest.StatPathResponse + (*ShutdownRequest)(nil), // 15: guest.ShutdownRequest + 
(*ShutdownResponse)(nil), // 16: guest.ShutdownResponse + (*ReconfigureNetworkRequest)(nil), // 17: guest.ReconfigureNetworkRequest + (*ReconfigureNetworkResponse)(nil), // 18: guest.ReconfigureNetworkResponse + nil, // 19: guest.ExecStart.EnvEntry } var file_lib_guest_guest_proto_depIdxs = []int32{ 1, // 0: guest.ExecRequest.start:type_name -> guest.ExecStart 2, // 1: guest.ExecRequest.resize:type_name -> guest.WindowSize - 17, // 2: guest.ExecStart.env:type_name -> guest.ExecStart.EnvEntry + 19, // 2: guest.ExecStart.env:type_name -> guest.ExecStart.EnvEntry 5, // 3: guest.CopyToGuestRequest.start:type_name -> guest.CopyToGuestStart 6, // 4: guest.CopyToGuestRequest.end:type_name -> guest.CopyToGuestEnd 10, // 5: guest.CopyFromGuestResponse.header:type_name -> guest.CopyFromGuestHeader @@ -1413,13 +1537,15 @@ var file_lib_guest_guest_proto_depIdxs = []int32{ 8, // 10: guest.GuestService.CopyFromGuest:input_type -> guest.CopyFromGuestRequest 13, // 11: guest.GuestService.StatPath:input_type -> guest.StatPathRequest 15, // 12: guest.GuestService.Shutdown:input_type -> guest.ShutdownRequest - 3, // 13: guest.GuestService.Exec:output_type -> guest.ExecResponse - 7, // 14: guest.GuestService.CopyToGuest:output_type -> guest.CopyToGuestResponse - 9, // 15: guest.GuestService.CopyFromGuest:output_type -> guest.CopyFromGuestResponse - 14, // 16: guest.GuestService.StatPath:output_type -> guest.StatPathResponse - 16, // 17: guest.GuestService.Shutdown:output_type -> guest.ShutdownResponse - 13, // [13:18] is the sub-list for method output_type - 8, // [8:13] is the sub-list for method input_type + 17, // 13: guest.GuestService.ReconfigureNetwork:input_type -> guest.ReconfigureNetworkRequest + 3, // 14: guest.GuestService.Exec:output_type -> guest.ExecResponse + 7, // 15: guest.GuestService.CopyToGuest:output_type -> guest.CopyToGuestResponse + 9, // 16: guest.GuestService.CopyFromGuest:output_type -> guest.CopyFromGuestResponse + 14, // 17: 
guest.GuestService.StatPath:output_type -> guest.StatPathResponse + 16, // 18: guest.GuestService.Shutdown:output_type -> guest.ShutdownResponse + 18, // 19: guest.GuestService.ReconfigureNetwork:output_type -> guest.ReconfigureNetworkResponse + 14, // [14:20] is the sub-list for method output_type + 8, // [8:14] is the sub-list for method input_type 8, // [8:8] is the sub-list for extension type_name 8, // [8:8] is the sub-list for extension extendee 0, // [0:8] is the sub-list for field type_name @@ -1457,7 +1583,7 @@ func file_lib_guest_guest_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_lib_guest_guest_proto_rawDesc), len(file_lib_guest_guest_proto_rawDesc)), NumEnums: 0, - NumMessages: 18, + NumMessages: 20, NumExtensions: 0, NumServices: 1, }, diff --git a/lib/guest/guest.proto b/lib/guest/guest.proto index c42198a9..317c21b3 100644 --- a/lib/guest/guest.proto +++ b/lib/guest/guest.proto @@ -20,6 +20,9 @@ service GuestService { // Shutdown requests graceful VM shutdown by signaling init (PID 1) rpc Shutdown(ShutdownRequest) returns (ShutdownResponse); + + // ReconfigureNetwork updates the guest network identity without spawning shell commands + rpc ReconfigureNetwork(ReconfigureNetworkRequest) returns (ReconfigureNetworkResponse); } // ExecRequest represents messages from client to server @@ -154,3 +157,15 @@ message ShutdownRequest { // ShutdownResponse acknowledges the shutdown request message ShutdownResponse {} + +// ReconfigureNetworkRequest updates a guest network interface after snapshot restore +message ReconfigureNetworkRequest { + string interface_name = 1; // Interface to reconfigure, defaults to eth0 + string mac = 2; // New MAC address + string ipv4 = 3; // New IPv4 address without prefix + uint32 prefix = 4; // IPv4 prefix length + string gateway = 5; // Default gateway IPv4 address +} + +// ReconfigureNetworkResponse acknowledges the network reconfiguration request +message 
ReconfigureNetworkResponse {} diff --git a/lib/guest/guest_grpc.pb.go b/lib/guest/guest_grpc.pb.go index 71224327..f93631d9 100644 --- a/lib/guest/guest_grpc.pb.go +++ b/lib/guest/guest_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.6.0 -// - protoc v3.21.12 +// - protoc v6.33.4 // source: lib/guest/guest.proto package guest @@ -19,11 +19,12 @@ import ( const _ = grpc.SupportPackageIsVersion9 const ( - GuestService_Exec_FullMethodName = "/guest.GuestService/Exec" - GuestService_CopyToGuest_FullMethodName = "/guest.GuestService/CopyToGuest" - GuestService_CopyFromGuest_FullMethodName = "/guest.GuestService/CopyFromGuest" - GuestService_StatPath_FullMethodName = "/guest.GuestService/StatPath" - GuestService_Shutdown_FullMethodName = "/guest.GuestService/Shutdown" + GuestService_Exec_FullMethodName = "/guest.GuestService/Exec" + GuestService_CopyToGuest_FullMethodName = "/guest.GuestService/CopyToGuest" + GuestService_CopyFromGuest_FullMethodName = "/guest.GuestService/CopyFromGuest" + GuestService_StatPath_FullMethodName = "/guest.GuestService/StatPath" + GuestService_Shutdown_FullMethodName = "/guest.GuestService/Shutdown" + GuestService_ReconfigureNetwork_FullMethodName = "/guest.GuestService/ReconfigureNetwork" ) // GuestServiceClient is the client API for GuestService service. 
@@ -42,6 +43,8 @@ type GuestServiceClient interface { StatPath(ctx context.Context, in *StatPathRequest, opts ...grpc.CallOption) (*StatPathResponse, error) // Shutdown requests graceful VM shutdown by signaling init (PID 1) Shutdown(ctx context.Context, in *ShutdownRequest, opts ...grpc.CallOption) (*ShutdownResponse, error) + // ReconfigureNetwork updates the guest network identity without spawning shell commands + ReconfigureNetwork(ctx context.Context, in *ReconfigureNetworkRequest, opts ...grpc.CallOption) (*ReconfigureNetworkResponse, error) } type guestServiceClient struct { @@ -117,6 +120,16 @@ func (c *guestServiceClient) Shutdown(ctx context.Context, in *ShutdownRequest, return out, nil } +func (c *guestServiceClient) ReconfigureNetwork(ctx context.Context, in *ReconfigureNetworkRequest, opts ...grpc.CallOption) (*ReconfigureNetworkResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(ReconfigureNetworkResponse) + err := c.cc.Invoke(ctx, GuestService_ReconfigureNetwork_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + // GuestServiceServer is the server API for GuestService service. // All implementations must embed UnimplementedGuestServiceServer // for forward compatibility. 
@@ -133,6 +146,8 @@ type GuestServiceServer interface { StatPath(context.Context, *StatPathRequest) (*StatPathResponse, error) // Shutdown requests graceful VM shutdown by signaling init (PID 1) Shutdown(context.Context, *ShutdownRequest) (*ShutdownResponse, error) + // ReconfigureNetwork updates the guest network identity without spawning shell commands + ReconfigureNetwork(context.Context, *ReconfigureNetworkRequest) (*ReconfigureNetworkResponse, error) mustEmbedUnimplementedGuestServiceServer() } @@ -158,6 +173,9 @@ func (UnimplementedGuestServiceServer) StatPath(context.Context, *StatPathReques func (UnimplementedGuestServiceServer) Shutdown(context.Context, *ShutdownRequest) (*ShutdownResponse, error) { return nil, status.Error(codes.Unimplemented, "method Shutdown not implemented") } +func (UnimplementedGuestServiceServer) ReconfigureNetwork(context.Context, *ReconfigureNetworkRequest) (*ReconfigureNetworkResponse, error) { + return nil, status.Error(codes.Unimplemented, "method ReconfigureNetwork not implemented") +} func (UnimplementedGuestServiceServer) mustEmbedUnimplementedGuestServiceServer() {} func (UnimplementedGuestServiceServer) testEmbeddedByValue() {} @@ -240,6 +258,24 @@ func _GuestService_Shutdown_Handler(srv interface{}, ctx context.Context, dec fu return interceptor(ctx, in, info, handler) } +func _GuestService_ReconfigureNetwork_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ReconfigureNetworkRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(GuestServiceServer).ReconfigureNetwork(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: GuestService_ReconfigureNetwork_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(GuestServiceServer).ReconfigureNetwork(ctx, req.(*ReconfigureNetworkRequest)) + } + return 
interceptor(ctx, in, info, handler) +} + // GuestService_ServiceDesc is the grpc.ServiceDesc for GuestService service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -255,6 +291,10 @@ var GuestService_ServiceDesc = grpc.ServiceDesc{ MethodName: "Shutdown", Handler: _GuestService_Shutdown_Handler, }, + { + MethodName: "ReconfigureNetwork", + Handler: _GuestService_ReconfigureNetwork_Handler, + }, }, Streams: []grpc.StreamDesc{ { diff --git a/lib/instances/create.go b/lib/instances/create.go index df991674..e2823c5e 100644 --- a/lib/instances/create.go +++ b/lib/instances/create.go @@ -474,6 +474,8 @@ func (m *manager) createInstance( stored.Volumes = req.Volumes } + ensureGuestInitiatedResumeNetworkMailbox(stored) + // 16. Create config disk (needs Instance for buildVMConfig) inst := &Instance{StoredMetadata: *stored} var proxyGuestConfig *egressproxy.GuestConfig diff --git a/lib/instances/fork.go b/lib/instances/fork.go index b4af9434..1d4f72a8 100644 --- a/lib/instances/fork.go +++ b/lib/instances/fork.go @@ -406,11 +406,11 @@ func (m *manager) applyForkTargetState(ctx context.Context, forkID string, targe lock.Lock() defer lock.Unlock() - returnWithReadiness := func(inst *Instance, err error) (*Instance, error) { + returnWithReadiness := func(inst *Instance, err error, guestReady bool) (*Instance, error) { if err != nil { return nil, err } - if inst != nil && (inst.State == StateRunning || inst.State == StateInitializing) { + if forkReturnNeedsGuestAgentReady(inst, guestReady) { if err := ensureGuestAgentReadyForForkPhase(ctx, &inst.StoredMetadata, "before returning running fork instance"); err != nil { return nil, fmt.Errorf("wait for forked guest agent readiness: %w", err) } @@ -423,24 +423,27 @@ func (m *manager) applyForkTargetState(ctx context.Context, forkID string, targe return nil, err } if current.State == target || (target == StateRunning && current.State == 
StateInitializing) { - return returnWithReadiness(current, nil) + return returnWithReadiness(current, nil, false) } switch current.State { case StateStopped: switch target { case StateRunning: - return returnWithReadiness(m.startInstance(ctx, forkID, StartInstanceRequest{})) + inst, err := m.startInstance(ctx, forkID, StartInstanceRequest{}) + return returnWithReadiness(inst, err, false) case StateStandby: if _, err := m.startInstance(ctx, forkID, StartInstanceRequest{}); err != nil { return nil, fmt.Errorf("start forked instance for standby transition: %w", err) } - return returnWithReadiness(m.standbyInstance(ctx, forkID, StandbyInstanceRequest{}, false)) + inst, err := m.standbyInstance(ctx, forkID, StandbyInstanceRequest{}, false) + return returnWithReadiness(inst, err, false) } case StateStandby: switch target { case StateRunning: - return returnWithReadiness(m.restoreInstance(ctx, forkID)) + inst, err := m.restoreInstance(ctx, forkID) + return returnWithReadiness(inst, err, current.NetworkEnabled && !current.SkipGuestAgent) case StateStopped: if err := os.RemoveAll(m.paths.InstanceSnapshotLatest(forkID)); err != nil { return nil, fmt.Errorf("remove fork snapshot: %w", err) @@ -453,20 +456,34 @@ func (m *manager) applyForkTargetState(ctx context.Context, forkID string, targe if err := m.saveMetadata(meta); err != nil { return nil, fmt.Errorf("save stopped fork metadata: %w", err) } - return returnWithReadiness(m.getInstance(ctx, forkID)) + inst, err := m.getInstance(ctx, forkID) + return returnWithReadiness(inst, err, false) } case StateRunning: switch target { case StateStandby: - return returnWithReadiness(m.standbyInstance(ctx, forkID, StandbyInstanceRequest{}, false)) + inst, err := m.standbyInstance(ctx, forkID, StandbyInstanceRequest{}, false) + return returnWithReadiness(inst, err, false) case StateStopped: - return returnWithReadiness(m.stopInstance(ctx, forkID)) + inst, err := m.stopInstance(ctx, forkID) + return returnWithReadiness(inst, err, false) 
} } return nil, fmt.Errorf("%w: cannot transition forked instance from %s to %s", ErrInvalidState, current.State, target) } +func forkTargetStateAlreadyApplied(inst *Instance, target State) bool { + if inst == nil { + return false + } + return inst.State == target || (target == StateRunning && inst.State == StateInitializing) +} + +func forkReturnNeedsGuestAgentReady(inst *Instance, guestReady bool) bool { + return inst != nil && (inst.State == StateRunning || inst.State == StateInitializing) && !guestReady +} + func (m *manager) cleanupForkInstanceOnError(ctx context.Context, forkID string) error { lock := m.getInstanceLock(forkID) lock.Lock() diff --git a/lib/instances/fork_test.go b/lib/instances/fork_test.go index 606b7671..1268d90c 100644 --- a/lib/instances/fork_test.go +++ b/lib/instances/fork_test.go @@ -319,6 +319,24 @@ func TestApplyForkTargetStateStoppedRefreshesSnapshotForkCID(t *testing.T) { assert.Equal(t, generateVsockCID(forkID), updated.StoredMetadata.VsockCID) } +func TestForkReturnReadinessDoesNotUseMailboxEligibility(t *testing.T) { + t.Parallel() + + stored := StoredMetadata{ + HypervisorType: hypervisor.TypeFirecracker, + NetworkEnabled: true, + } + ensureGuestInitiatedResumeNetworkMailbox(&stored) + require.True(t, guestInitiatedResumeNetworkMailbox(&stored)) + + inst := &Instance{ + StoredMetadata: stored, + State: StateInitializing, + } + assert.True(t, forkReturnNeedsGuestAgentReady(inst, false)) + assert.False(t, forkReturnNeedsGuestAgentReady(inst, true)) +} + func TestCloneStoredMetadataForFork_DeepCopiesReferenceFields(t *testing.T) { t.Parallel() startedAt := time.Now().Add(-2 * time.Minute) diff --git a/lib/instances/guest_resume_network.go b/lib/instances/guest_resume_network.go new file mode 100644 index 00000000..0d880eb6 --- /dev/null +++ b/lib/instances/guest_resume_network.go @@ -0,0 +1,235 @@ +package instances + +import ( + "bytes" + "context" + "encoding/binary" + "fmt" + stdnet "net" + "os" + "path/filepath" + "strings" + 
"sync" + "time" + + "github.com/kernel/hypeman/lib/hypervisor" + "github.com/kernel/hypeman/lib/logger" + "github.com/kernel/hypeman/lib/mailbox" + "github.com/nrednav/cuid2" + "go.opentelemetry.io/otel/attribute" + "golang.org/x/sys/unix" +) + +const firecrackerSnapshotMemoryFile = "memory" + +const guestResumeNetworkUDPAckTimeout = 5 * time.Second + +var guestResumeNetworkMailboxOffsets sync.Map + +type guestResumeNetworkUDPAck struct { + received time.Time + text string +} + +type guestResumeNetworkUDPWaiter struct { + conn *stdnet.UDPConn + ch chan guestResumeNetworkUDPAck +} + +func guestInitiatedResumeNetworkMailbox(stored *StoredMetadata) bool { + token := guestInitiatedResumeNetworkMailboxToken(stored) + return stored != nil && + stored.HypervisorType == hypervisor.TypeFirecracker && + strings.TrimSpace(stored.Env[mailbox.MailboxEnv]) == "1" && + mailbox.ValidToken(token) +} + +func guestInitiatedResumeNetworkMailboxToken(stored *StoredMetadata) string { + if stored == nil { + return "" + } + return strings.TrimSpace(stored.Env[mailbox.MailboxTokenEnv]) +} + +func ensureGuestInitiatedResumeNetworkMailbox(stored *StoredMetadata) { + if stored == nil || + stored.HypervisorType != hypervisor.TypeFirecracker || + !stored.NetworkEnabled || + stored.SkipGuestAgent { + return + } + if stored.Env == nil { + stored.Env = make(map[string]string) + } + stored.Env[mailbox.MailboxEnv] = "1" + if token := guestInitiatedResumeNetworkMailboxToken(stored); !mailbox.ValidToken(token) { + stored.Env[mailbox.MailboxTokenEnv] = cuid2.Generate() + } +} + +func newGuestResumeNetworkPayload(cfg *guestNetworkConfig) mailbox.Payload { + return mailbox.Payload{ + InterfaceName: "eth0", + MAC: cfg.mac, + IPv4: cfg.ip, + Prefix: uint32(cfg.prefix), + Gateway: cfg.gateway, + } +} + +func startGuestResumeNetworkUDPWaiter() (*guestResumeNetworkUDPWaiter, error) { + conn, err := stdnet.ListenUDP("udp4", &stdnet.UDPAddr{IP: stdnet.IPv4zero, Port: 0}) + if err != nil { + return nil, 
fmt.Errorf("listen for guest resume network UDP ack: %w", err) + } + + w := &guestResumeNetworkUDPWaiter{ + conn: conn, + ch: make(chan guestResumeNetworkUDPAck, 128), + } + go w.readLoop() + return w, nil +} + +func (w *guestResumeNetworkUDPWaiter) Port() uint32 { + if w == nil || w.conn == nil { + return 0 + } + return uint32(w.conn.LocalAddr().(*stdnet.UDPAddr).Port) +} + +func (w *guestResumeNetworkUDPWaiter) Close() { + if w == nil || w.conn == nil { + return + } + _ = w.conn.Close() +} + +func (w *guestResumeNetworkUDPWaiter) readLoop() { + buf := make([]byte, 1024) + for { + n, _, err := w.conn.ReadFromUDP(buf) + if err != nil { + return + } + w.ch <- guestResumeNetworkUDPAck{ + received: time.Now(), + text: strings.TrimSpace(string(buf[:n])), + } + } +} + +func (w *guestResumeNetworkUDPWaiter) WaitApplied(ctx context.Context, mac, ip string) (time.Duration, string, error) { + if w == nil { + return 0, "", fmt.Errorf("guest resume network UDP waiter is nil") + } + + start := time.Now() + wantMAC := "mac=" + strings.ToLower(mac) + wantIP := "ip=" + ip + for { + select { + case ack := <-w.ch: + text := strings.ToLower(ack.text) + if strings.Contains(text, "stage=applied") && strings.Contains(text, wantMAC) && strings.Contains(text, wantIP) { + return ack.received.Sub(start), ack.text, nil + } + case <-ctx.Done(): + return 0, "", ctx.Err() + } + } +} + +func (m *manager) waitForGuestResumeNetworkUDPAck(ctx context.Context, waiter *guestResumeNetworkUDPWaiter, stored *StoredMetadata, cfg *guestNetworkConfig) error { + if waiter == nil || cfg == nil { + return nil + } + + log := logger.FromContext(ctx) + waitCtx, waitSpanEnd := m.startLifecycleStep(ctx, "guest.resume_network.udp_ack_wait", + attribute.String("instance_id", stored.Id), + attribute.String("hypervisor", string(stored.HypervisorType)), + attribute.String("operation", "guest_resume_network_udp_ack_wait"), + ) + waitCtx, cancel := context.WithTimeout(waitCtx, guestResumeNetworkUDPAckTimeout) + defer 
cancel() + + elapsed, ack, err := waiter.WaitApplied(waitCtx, cfg.mac, cfg.ip) + waitSpanEnd(err) + if err != nil { + return err + } + log.InfoContext(ctx, "guest resume network UDP ack received", "instance_id", stored.Id, "elapsed", elapsed, "ack", ack) + return nil +} + +func patchGuestResumeNetworkMailbox(snapshotDir, token string, payload *mailbox.Payload) error { + payloadBytes, err := mailbox.MarshalPayload(payload) + if err != nil { + return err + } + + file, err := os.OpenFile(filepath.Join(snapshotDir, firecrackerSnapshotMemoryFile), os.O_RDWR, 0) + if err != nil { + return fmt.Errorf("open snapshot memory for resume network mailbox: %w", err) + } + defer file.Close() + + info, err := file.Stat() + if err != nil { + return fmt.Errorf("stat snapshot memory for resume network mailbox: %w", err) + } + if info.Size() <= 0 { + return fmt.Errorf("resume network mailbox memory file is empty") + } + + marker, err := mailbox.Marker(token) + if err != nil { + return err + } + idx, err := findGuestResumeNetworkMailbox(file, info.Size(), marker, token) + if err != nil { + return err + } + if idx+int64(mailbox.MailboxPayloadOffset)+int64(len(payloadBytes)) > info.Size() { + return fmt.Errorf("resume network mailbox marker is too close to end of memory file") + } + + if _, err := file.WriteAt(payloadBytes, idx+int64(mailbox.MailboxPayloadOffset)); err != nil { + return fmt.Errorf("write resume network mailbox payload: %w", err) + } + var u32 [4]byte + binary.LittleEndian.PutUint32(u32[:], uint32(len(payloadBytes))) + if _, err := file.WriteAt(u32[:], idx+int64(mailbox.MailboxLengthOffset)); err != nil { + return fmt.Errorf("write resume network mailbox payload length: %w", err) + } + binary.LittleEndian.PutUint32(u32[:], 1) + if _, err := file.WriteAt(u32[:], idx+int64(mailbox.MailboxSeqOffset)); err != nil { + return fmt.Errorf("write resume network mailbox sequence: %w", err) + } + return nil +} + +func findGuestResumeNetworkMailbox(file *os.File, size int64, marker 
[]byte, token string) (int64, error) { + if cached, ok := guestResumeNetworkMailboxOffsets.Load(token); ok { + if offset, ok := cached.(int64); ok && offset >= 0 && offset+int64(len(marker)) <= size { + buf := make([]byte, len(marker)) + if _, err := file.ReadAt(buf, offset); err == nil && bytes.Equal(buf, marker) { + return offset, nil + } + } + } + + data, err := unix.Mmap(int(file.Fd()), 0, int(size), unix.PROT_READ, unix.MAP_SHARED) + if err != nil { + return 0, fmt.Errorf("mmap snapshot memory for resume network mailbox: %w", err) + } + defer unix.Munmap(data) + + idx := bytes.Index(data, marker) + if idx < 0 { + return 0, fmt.Errorf("resume network mailbox marker not found") + } + guestResumeNetworkMailboxOffsets.Store(token, int64(idx)) + return int64(idx), nil +} diff --git a/lib/instances/manager.go b/lib/instances/manager.go index abdd2165..0ded26ea 100644 --- a/lib/instances/manager.go +++ b/lib/instances/manager.go @@ -389,19 +389,14 @@ func (m *manager) ForkInstance(ctx context.Context, id string, req ForkInstanceR return nil, err } - inst, err := m.applyForkTargetState(ctx, forked.Id, targetState) - if err != nil { - if cleanupErr := m.cleanupForkInstanceOnError(ctx, forked.Id); cleanupErr != nil { - return nil, fmt.Errorf("apply fork target state: %w; additionally failed to cleanup forked instance %s: %v", err, forked.Id, cleanupErr) - } - return nil, fmt.Errorf("apply fork target state: %w", err) - } - if inst.State == StateRunning { - if err := ensureGuestAgentReadyForForkPhase(ctx, &inst.StoredMetadata, "before returning running fork instance"); err != nil { + inst := forked + if !forkTargetStateAlreadyApplied(inst, targetState) { + inst, err = m.applyForkTargetState(ctx, forked.Id, targetState) + if err != nil { if cleanupErr := m.cleanupForkInstanceOnError(ctx, forked.Id); cleanupErr != nil { - return nil, fmt.Errorf("wait for fork guest agent readiness: %w; additionally failed to cleanup forked instance %s: %v", err, forked.Id, cleanupErr) + 
return nil, fmt.Errorf("apply fork target state: %w; additionally failed to cleanup forked instance %s: %v", err, forked.Id, cleanupErr) } - return nil, fmt.Errorf("wait for fork guest agent readiness: %w", err) + return nil, fmt.Errorf("apply fork target state: %w", err) } } m.notifyLifecycleEvent(ctx, LifecycleEventFork, inst) diff --git a/lib/instances/restore.go b/lib/instances/restore.go index 4e9e116e..c8645e0c 100644 --- a/lib/instances/restore.go +++ b/lib/instances/restore.go @@ -16,13 +16,14 @@ import ( "github.com/kernel/hypeman/lib/network" snapshotstore "github.com/kernel/hypeman/lib/snapshot" "go.opentelemetry.io/otel/attribute" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) -// RestoreInstance restores an instance from standby -// Multi-hop orchestration: Standby → Paused → Running +// RestoreInstance restores an instance from standby. +// Multi-hop orchestration: Standby → Paused → Running. func (m *manager) restoreInstance( ctx context.Context, - id string, ) (_ *Instance, retErr error) { start := time.Now() @@ -247,6 +248,14 @@ func (m *manager) restoreInstance( proxyRegistered = true } + resumeNetworkHandoff, err := m.prepareResumeNetworkHandoff(ctx, stored, allocatedNet, snapshotDir) + if err != nil { + log.ErrorContext(ctx, "failed to prepare guest resume network handoff", "instance_id", id, "error", err) + releaseNetwork() + return nil, fmt.Errorf("prepare guest resume network handoff: %w", err) + } + defer resumeNetworkHandoff.Close() + // 5. 
Transition: Standby → Paused (start hypervisor + restore) restoreCtx, restoreSpanEnd := m.startLifecycleStep(ctx, "restore_from_snapshot", attribute.String("instance_id", id), @@ -301,15 +310,15 @@ func (m *manager) restoreInstance( attribute.String("hypervisor", string(stored.HypervisorType)), attribute.String("operation", "reconfigure_guest_network"), ) - if err := reconfigureGuestNetwork(reconfigureCtx, stored, allocatedNet); err != nil { - reconfigureSpanEnd(err) - log.ErrorContext(ctx, "failed to configure guest network after restore", "instance_id", id, "error", err) + reconfigureErr := resumeNetworkHandoff.AfterResume(reconfigureCtx) + reconfigureSpanEnd(reconfigureErr) + if reconfigureErr != nil { + log.ErrorContext(ctx, "failed to configure guest network after restore", "instance_id", id, "error", reconfigureErr) _ = hv.Shutdown(ctx) m.rollbackAdmissionAllocationActive(stored) releaseNetwork() - return nil, fmt.Errorf("configure guest network after restore: %w", err) + return nil, fmt.Errorf("configure guest network after restore: %w", reconfigureErr) } - reconfigureSpanEnd(nil) } // 8. 
Delete snapshot after successful restore unless the hypervisor is keeping it @@ -399,7 +408,7 @@ func (m *manager) restoreFromSnapshot( } func reconfigureGuestNetwork(ctx context.Context, stored *StoredMetadata, alloc *network.Allocation) error { - cmd, err := guestNetworkReconfigureCommand(alloc) + cfg, err := guestNetworkReconfigureConfig(alloc) if err != nil { return err } @@ -409,6 +418,30 @@ func reconfigureGuestNetwork(ctx context.Context, stored *StoredMetadata, alloc return fmt.Errorf("create vsock dialer: %w", err) } + err = guest.ReconfigureNetworkInInstance(ctx, dialer, guest.ReconfigureNetworkOptions{ + InterfaceName: "eth0", + MAC: cfg.mac, + IPv4: cfg.ip, + Prefix: uint32(cfg.prefix), + Gateway: cfg.gateway, + WaitForAgent: 120 * time.Second, + }) + if err != nil { + if status.Code(err) == codes.Unimplemented { + return reconfigureGuestNetworkWithExec(ctx, dialer, alloc) + } + return fmt.Errorf("reconfigure guest network: %w", err) + } + + return nil +} + +func reconfigureGuestNetworkWithExec(ctx context.Context, dialer hypervisor.VsockDialer, alloc *network.Allocation) error { + cmd, err := guestNetworkReconfigureCommand(alloc) + if err != nil { + return err + } + var stdout, stderr bytes.Buffer exit, err := guest.ExecIntoInstance(ctx, dialer, guest.ExecOptions{ Command: []string{"sh", "-c", cmd}, @@ -422,30 +455,44 @@ func reconfigureGuestNetwork(ctx context.Context, stored *StoredMetadata, alloc if exit.Code != 0 { return fmt.Errorf("network reconfiguration command failed (exit=%d, stdout=%q, stderr=%q)", exit.Code, strings.TrimSpace(stdout.String()), strings.TrimSpace(stderr.String())) } - return nil } -func guestNetworkReconfigureCommand(alloc *network.Allocation) (string, error) { +type guestNetworkConfig struct { + ip string + mac string + gateway string + prefix int +} + +func guestNetworkReconfigureConfig(alloc *network.Allocation) (*guestNetworkConfig, error) { if alloc == nil { - return "", fmt.Errorf("missing network allocation") + return 
nil, fmt.Errorf("missing network allocation") } ip := strings.TrimSpace(alloc.IP) if ip == "" { - return "", fmt.Errorf("missing network allocation IP") + return nil, fmt.Errorf("missing network allocation IP") } mac := strings.ToLower(strings.TrimSpace(alloc.MAC)) if mac == "" { - return "", fmt.Errorf("missing network allocation MAC") + return nil, fmt.Errorf("missing network allocation MAC") } if _, err := net.ParseMAC(mac); err != nil { - return "", fmt.Errorf("invalid network allocation MAC %q: %w", alloc.MAC, err) + return nil, fmt.Errorf("invalid network allocation MAC %q: %w", alloc.MAC, err) } gateway := strings.TrimSpace(alloc.Gateway) if gateway == "" { - return "", fmt.Errorf("missing network allocation gateway") + return nil, fmt.Errorf("missing network allocation gateway") } prefix, err := netmaskToPrefix(alloc.Netmask) + if err != nil { + return nil, err + } + return &guestNetworkConfig{ip: ip, mac: mac, gateway: gateway, prefix: prefix}, nil +} + +func guestNetworkReconfigureCommand(alloc *network.Allocation) (string, error) { + cfg, err := guestNetworkReconfigureConfig(alloc) if err != nil { return "", err } @@ -465,7 +512,7 @@ func guestNetworkReconfigureCommand(alloc *network.Allocation) (string, error) { "ip route replace default via %s dev eth0 && "+ // Drop snapshotted ARP/neighbor entries so peers are rediscovered. 
"(ip neigh flush dev eth0 || true)", - mac, ip, prefix, gateway, + cfg.mac, cfg.ip, cfg.prefix, cfg.gateway, ), nil } diff --git a/lib/instances/restore_egress_test.go b/lib/instances/restore_egress_test.go index f65b0eb1..c3983780 100644 --- a/lib/instances/restore_egress_test.go +++ b/lib/instances/restore_egress_test.go @@ -1,8 +1,13 @@ package instances import ( + "encoding/binary" + "encoding/json" + "os" "testing" + "github.com/kernel/hypeman/lib/hypervisor" + "github.com/kernel/hypeman/lib/mailbox" "github.com/kernel/hypeman/lib/network" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -30,7 +35,25 @@ func TestNetworkConfigFromAllocation_PreservesDNS(t *testing.T) { assert.Equal(t, alloc.TAPDevice, cfg.TAPDevice) } -func TestGuestNetworkReconfigureCommand_AppliesAllocatedMAC(t *testing.T) { +func TestGuestNetworkReconfigureConfig_AppliesAllocatedMAC(t *testing.T) { + t.Parallel() + + alloc := &network.Allocation{ + IP: "10.102.146.62", + MAC: "02:00:00:85:17:c8", + Gateway: "10.102.0.1", + Netmask: "255.255.0.0", + } + + cfg, err := guestNetworkReconfigureConfig(alloc) + require.NoError(t, err) + assert.Equal(t, "10.102.146.62", cfg.ip) + assert.Equal(t, "02:00:00:85:17:c8", cfg.mac) + assert.Equal(t, "10.102.0.1", cfg.gateway) + assert.Equal(t, 16, cfg.prefix) +} + +func TestGuestNetworkReconfigureCommand_FallbackPreservesShellBehavior(t *testing.T) { t.Parallel() alloc := &network.Allocation{ @@ -42,18 +65,15 @@ func TestGuestNetworkReconfigureCommand_AppliesAllocatedMAC(t *testing.T) { cmd, err := guestNetworkReconfigureCommand(alloc) require.NoError(t, err) - assert.Contains(t, cmd, "ip link set dev eth0 down") assert.Contains(t, cmd, "ip link set dev eth0 address 02:00:00:85:17:c8") assert.Contains(t, cmd, "ip addr add 10.102.146.62/16 dev eth0") - assert.Contains(t, cmd, "ip route replace default via 10.102.0.1 dev eth0") assert.Contains(t, cmd, "(ip neigh flush dev eth0 || true)") - assert.NotContains(t, cmd, "cat 
/sys/class/net/eth0/address") } -func TestGuestNetworkReconfigureCommand_RequiresAllocatedMAC(t *testing.T) { +func TestGuestNetworkReconfigureConfig_RequiresAllocatedMAC(t *testing.T) { t.Parallel() - _, err := guestNetworkReconfigureCommand(&network.Allocation{ + _, err := guestNetworkReconfigureConfig(&network.Allocation{ IP: "10.102.146.62", Gateway: "10.102.0.1", Netmask: "255.255.0.0", @@ -62,6 +82,87 @@ func TestGuestNetworkReconfigureCommand_RequiresAllocatedMAC(t *testing.T) { assert.Contains(t, err.Error(), "missing network allocation MAC") } +func TestPatchGuestResumeNetworkMailbox(t *testing.T) { + t.Parallel() + + dir := t.TempDir() + token := "test-token" + mem := make([]byte, 4096) + copy(mem[512:], mailbox.MailboxMagic) + copy(mem[512+len(mailbox.MailboxMagic):], token) + require.NoError(t, os.WriteFile(dir+"/"+firecrackerSnapshotMemoryFile, mem, 0644)) + + payload := &mailbox.Payload{ + InterfaceName: "eth0", + MAC: "02:00:00:85:17:c8", + IPv4: "10.102.146.62", + Prefix: 16, + Gateway: "10.102.0.1", + AckPort: 43210, + } + require.NoError(t, patchGuestResumeNetworkMailbox(dir, token, payload)) + + patched, err := os.ReadFile(dir + "/" + firecrackerSnapshotMemoryFile) + require.NoError(t, err) + + offset := 512 + require.Equal(t, uint32(1), binary.LittleEndian.Uint32(patched[offset+mailbox.MailboxSeqOffset:])) + payloadLen := binary.LittleEndian.Uint32(patched[offset+mailbox.MailboxLengthOffset:]) + require.NotZero(t, payloadLen) + + var decoded mailbox.Payload + err = json.Unmarshal(patched[offset+mailbox.MailboxPayloadOffset:offset+mailbox.MailboxPayloadOffset+int(payloadLen)], &decoded) + require.NoError(t, err) + assert.Equal(t, *payload, decoded) +} + +func TestEnsureGuestInitiatedResumeNetworkMailbox(t *testing.T) { + t.Parallel() + + stored := &StoredMetadata{ + HypervisorType: hypervisor.TypeFirecracker, + NetworkEnabled: true, + } + ensureGuestInitiatedResumeNetworkMailbox(stored) + + require.Equal(t, "1", stored.Env[mailbox.MailboxEnv]) + 
token := stored.Env[mailbox.MailboxTokenEnv] + require.NotEmpty(t, token) + require.LessOrEqual(t, len(token), mailbox.MailboxTokenMaxLen) + assert.True(t, guestInitiatedResumeNetworkMailbox(stored)) +} + +func TestEnsureGuestInitiatedResumeNetworkMailboxPreservesToken(t *testing.T) { + t.Parallel() + + stored := &StoredMetadata{ + HypervisorType: hypervisor.TypeFirecracker, + NetworkEnabled: true, + Env: map[string]string{ + mailbox.MailboxTokenEnv: "existing-token", + }, + } + ensureGuestInitiatedResumeNetworkMailbox(stored) + + assert.Equal(t, "1", stored.Env[mailbox.MailboxEnv]) + assert.Equal(t, "existing-token", stored.Env[mailbox.MailboxTokenEnv]) +} + +func TestEnsureGuestInitiatedResumeNetworkMailboxRequiresEligibleGuest(t *testing.T) { + t.Parallel() + + cases := []StoredMetadata{ + {HypervisorType: hypervisor.TypeCloudHypervisor, NetworkEnabled: true}, + {HypervisorType: hypervisor.TypeFirecracker, NetworkEnabled: false}, + {HypervisorType: hypervisor.TypeFirecracker, NetworkEnabled: true, SkipGuestAgent: true}, + } + for _, tc := range cases { + stored := tc + ensureGuestInitiatedResumeNetworkMailbox(&stored) + assert.False(t, guestInitiatedResumeNetworkMailbox(&stored)) + } +} + func TestRequiresRestoreConfigDiskRefresh(t *testing.T) { t.Parallel() diff --git a/lib/instances/resume_network_handoff.go b/lib/instances/resume_network_handoff.go new file mode 100644 index 00000000..073fc92e --- /dev/null +++ b/lib/instances/resume_network_handoff.go @@ -0,0 +1,82 @@ +package instances + +import ( + "context" + + "github.com/kernel/hypeman/lib/logger" + "github.com/kernel/hypeman/lib/network" + "go.opentelemetry.io/otel/attribute" +) + +type resumeNetworkHandoff struct { + manager *manager + stored *StoredMetadata + allocatedNet *network.Allocation + ackWaiter *guestResumeNetworkUDPWaiter + ackCfg *guestNetworkConfig + patched bool +} + +func (m *manager) prepareResumeNetworkHandoff(ctx context.Context, stored *StoredMetadata, allocatedNet 
*network.Allocation, snapshotDir string) (*resumeNetworkHandoff, error) { + h := &resumeNetworkHandoff{ + manager: m, + stored: stored, + allocatedNet: allocatedNet, + } + if allocatedNet == nil || stored.SkipGuestAgent || !guestInitiatedResumeNetworkMailbox(stored) { + return h, nil + } + + log := logger.FromContext(ctx) + resumeNetworkCfg, err := guestNetworkReconfigureConfig(allocatedNet) + if err != nil { + log.WarnContext(ctx, "failed to build guest resume network mailbox payload; falling back to host-initiated reconfigure", "instance_id", stored.Id, "error", err) + return h, nil + } + + waiter, err := startGuestResumeNetworkUDPWaiter() + if err != nil { + log.WarnContext(ctx, "failed to start guest resume network UDP ack waiter; falling back to host-initiated reconfigure", "instance_id", stored.Id, "error", err) + return h, nil + } + + payload := newGuestResumeNetworkPayload(resumeNetworkCfg) + payload.AckPort = waiter.Port() + _, patchSpanEnd := m.startLifecycleStep(ctx, "guest.resume_network.mailbox_patch", + attribute.String("instance_id", stored.Id), + attribute.String("hypervisor", string(stored.HypervisorType)), + attribute.String("operation", "guest_resume_network_mailbox_patch"), + ) + err = patchGuestResumeNetworkMailbox(snapshotDir, guestInitiatedResumeNetworkMailboxToken(stored), &payload) + patchSpanEnd(err) + if err != nil { + waiter.Close() + log.WarnContext(ctx, "failed to patch guest resume network mailbox; falling back to host-initiated reconfigure", "instance_id", stored.Id, "error", err) + return h, nil + } + + h.ackWaiter = waiter + h.ackCfg = resumeNetworkCfg + h.patched = true + return h, nil +} + +func (h *resumeNetworkHandoff) Close() { + if h != nil && h.ackWaiter != nil { + h.ackWaiter.Close() + } +} + +func (h *resumeNetworkHandoff) AfterResume(ctx context.Context) error { + if h == nil || h.allocatedNet == nil || h.stored.SkipGuestAgent { + return nil + } + if h.patched { + err := h.manager.waitForGuestResumeNetworkUDPAck(ctx, 
h.ackWaiter, h.stored, h.ackCfg) + if err == nil { + return nil + } + logger.FromContext(ctx).ErrorContext(ctx, "guest resume network UDP ack wait failed; falling back to host-initiated reconfigure", "instance_id", h.stored.Id, "error", err) + } + return reconfigureGuestNetwork(ctx, h.stored, h.allocatedNet) +} diff --git a/lib/instances/start.go b/lib/instances/start.go index 181aaa8d..dd550a96 100644 --- a/lib/instances/start.go +++ b/lib/instances/start.go @@ -141,6 +141,7 @@ func (m *manager) startInstance( }) } } + ensureGuestInitiatedResumeNetworkMailbox(stored) // 4b. Recreate vGPU mdev if this instance had a GPU profile // Note: GPU availability was already validated in step 2b diff --git a/lib/instances/test_network_config_test.go b/lib/instances/test_network_config_test.go index c26efcdb..419f9fe8 100644 --- a/lib/instances/test_network_config_test.go +++ b/lib/instances/test_network_config_test.go @@ -183,7 +183,7 @@ func allocateTestNetworkLease(testName string, seq uint32) (*testNetworkLease, e } func withTestSubnetLock(fn func() error) error { - lockPath := filepath.Join(os.TempDir(), "hypeman-test-network.lock") + lockPath := filepath.Join(testNetworkTempDir(), "hypeman-test-network.lock") lockFile, err := openTestSubnetLockFile(lockPath) if err != nil { return fmt.Errorf("open subnet lock file: %w", err) @@ -220,7 +220,14 @@ func openTestSubnetLockFile(lockPath string) (*os.File, error) { } func testSubnetLeaseFilePath() string { - return filepath.Join(os.TempDir(), "hypeman-test-network-leases.json") + return filepath.Join(testNetworkTempDir(), "hypeman-test-network-leases.json") +} + +func testNetworkTempDir() string { + if dir := os.Getenv("HYPEMAN_TEST_NETWORK_TMPDIR"); dir != "" { + return dir + } + return os.TempDir() } func loadSubnetLeases() (map[string]subnetLease, error) { diff --git a/lib/mailbox/README.md b/lib/mailbox/README.md new file mode 100644 index 00000000..3e834b7c --- /dev/null +++ b/lib/mailbox/README.md @@ -0,0 +1,5 @@ +# 
mailbox + +The resume network handoff uses a small pre-armed mailbox in guest memory to avoid making the first post-resume operation a host-initiated guest RPC. When a Firecracker guest boots with mailbox env enabled, the guest agent allocates and locks a fixed-size buffer containing a magic marker and token. That buffer is captured in standby snapshots. + +Before restore, the host finds the marker in the snapshot memory file, writes a JSON network payload into the buffer, and flips the sequence field. After resume, VMGenID wakes the guest watcher, which reads the payload, applies the new network identity locally, and sends a UDP applied ack to the host. If the mailbox is missing, cannot be patched, or does not ack in time, restore falls back to the host-initiated network reconfigure path. diff --git a/lib/mailbox/mailbox.go b/lib/mailbox/mailbox.go new file mode 100644 index 00000000..832f852f --- /dev/null +++ b/lib/mailbox/mailbox.go @@ -0,0 +1,64 @@ +package mailbox + +import ( + "encoding/json" + "fmt" +) + +const MailboxEnv = "HYPEMAN_RESUME_NETWORK_MAILBOX" +const MailboxTokenEnv = "HYPEMAN_RESUME_NETWORK_MAILBOX_TOKEN" + +const MailboxSize = 4096 +const MailboxMagic = "HYPEMAN_RESUME_NETWORK_MAILBOX_V1\x00" +const MailboxSeqOffset = 64 +const MailboxLengthOffset = 68 +const MailboxPayloadOffset = 72 +const MailboxTokenMaxLen = MailboxSeqOffset - len(MailboxMagic) + +type Payload struct { + InterfaceName string `json:"interface_name"` + MAC string `json:"mac"` + IPv4 string `json:"ipv4"` + Prefix uint32 `json:"prefix"` + Gateway string `json:"gateway"` + AckPort uint32 `json:"ack_port,omitempty"` +} + +func ValidToken(token string) bool { + return token != "" && len(token) <= MailboxTokenMaxLen +} + +func Marker(token string) ([]byte, error) { + if !ValidToken(token) { + return nil, fmt.Errorf("resume network mailbox token is invalid") + } + marker := make([]byte, 0, len(MailboxMagic)+len(token)) + marker = append(marker, MailboxMagic...) 
+ marker = append(marker, token...) + return marker, nil +} + +func MarshalPayload(payload *Payload) ([]byte, error) { + if payload == nil { + return nil, fmt.Errorf("resume network mailbox payload is nil") + } + payloadBytes, err := json.Marshal(payload) + if err != nil { + return nil, fmt.Errorf("marshal resume network mailbox payload: %w", err) + } + if len(payloadBytes) > MailboxSize-MailboxPayloadOffset { + return nil, fmt.Errorf("resume network mailbox payload too large: %d bytes", len(payloadBytes)) + } + return payloadBytes, nil +} + +func DecodePayloadFrame(buf []byte, payloadLen uint32) (Payload, error) { + if payloadLen == 0 || int(payloadLen) > len(buf)-MailboxPayloadOffset { + return Payload{}, fmt.Errorf("invalid mailbox payload length %d", payloadLen) + } + var payload Payload + if err := json.Unmarshal(buf[MailboxPayloadOffset:MailboxPayloadOffset+int(payloadLen)], &payload); err != nil { + return Payload{}, fmt.Errorf("decode mailbox payload: %w", err) + } + return payload, nil +} diff --git a/lib/mailbox/mailbox_test.go b/lib/mailbox/mailbox_test.go new file mode 100644 index 00000000..db0b5c2b --- /dev/null +++ b/lib/mailbox/mailbox_test.go @@ -0,0 +1,47 @@ +package mailbox + +import ( + "encoding/binary" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestMarker(t *testing.T) { + t.Parallel() + + marker, err := Marker("token") + require.NoError(t, err) + assert.Equal(t, MailboxMagic+"token", string(marker)) + + _, err = Marker("") + require.Error(t, err) + + _, err = Marker(strings.Repeat("x", MailboxTokenMaxLen+1)) + require.Error(t, err) +} + +func TestPayloadRoundTrip(t *testing.T) { + t.Parallel() + + want := &Payload{ + InterfaceName: "eth0", + MAC: "02:00:00:85:17:c8", + IPv4: "10.102.146.62", + Prefix: 16, + Gateway: "10.102.0.1", + AckPort: 43210, + } + payloadBytes, err := MarshalPayload(want) + require.NoError(t, err) + + buf := make([]byte, MailboxSize) + 
copy(buf[MailboxPayloadOffset:], payloadBytes) + binary.LittleEndian.PutUint32(buf[MailboxLengthOffset:], uint32(len(payloadBytes))) + + got, err := DecodePayloadFrame(buf, binary.LittleEndian.Uint32(buf[MailboxLengthOffset:])) + require.NoError(t, err) + assert.Equal(t, *want, got) +} diff --git a/lib/system/guest_agent/main.go b/lib/system/guest_agent/main.go index 46ee3d7a..a968e89a 100644 --- a/lib/system/guest_agent/main.go +++ b/lib/system/guest_agent/main.go @@ -53,8 +53,10 @@ func main() { } // Create gRPC server + guestSvc := &guestServer{} + startResumeNetworkWatcher(guestSvc) grpcServer := grpc.NewServer() - pb.RegisterGuestServiceServer(grpcServer, &guestServer{}) + pb.RegisterGuestServiceServer(grpcServer, guestSvc) // Serve gRPC over vsock if err := grpcServer.Serve(l); err != nil { diff --git a/lib/system/guest_agent/network.go b/lib/system/guest_agent/network.go new file mode 100644 index 00000000..66b71bad --- /dev/null +++ b/lib/system/guest_agent/network.go @@ -0,0 +1,91 @@ +package main + +import ( + "context" + "fmt" + "net" + + pb "github.com/kernel/hypeman/lib/guest" + "github.com/vishvananda/netlink" + "golang.org/x/sys/unix" +) + +func (s *guestServer) ReconfigureNetwork(_ context.Context, req *pb.ReconfigureNetworkRequest) (*pb.ReconfigureNetworkResponse, error) { + iface := req.InterfaceName + if iface == "" { + iface = "eth0" + } + + link, err := netlink.LinkByName(iface) + if err != nil { + return nil, fmt.Errorf("find interface %s: %w", iface, err) + } + + mac, err := net.ParseMAC(req.Mac) + if err != nil { + return nil, fmt.Errorf("parse mac %q: %w", req.Mac, err) + } + ip := net.ParseIP(req.Ipv4).To4() + if ip == nil { + return nil, fmt.Errorf("parse ipv4 %q", req.Ipv4) + } + if req.Prefix > 32 { + return nil, fmt.Errorf("invalid ipv4 prefix %d", req.Prefix) + } + gateway := net.ParseIP(req.Gateway).To4() + if gateway == nil { + return nil, fmt.Errorf("parse gateway %q", req.Gateway) + } + + if err := netlink.LinkSetDown(link); err 
!= nil { + return nil, fmt.Errorf("set %s down: %w", iface, err) + } + if err := netlink.LinkSetHardwareAddr(link, mac); err != nil { + return nil, fmt.Errorf("set %s mac: %w", iface, err) + } + if err := flushIPv4Addrs(link); err != nil { + return nil, err + } + addr := &netlink.Addr{IPNet: &net.IPNet{IP: ip, Mask: net.CIDRMask(int(req.Prefix), 32)}} + if err := netlink.AddrAdd(link, addr); err != nil { + return nil, fmt.Errorf("add ipv4 address: %w", err) + } + if err := netlink.LinkSetUp(link); err != nil { + return nil, fmt.Errorf("set %s up: %w", iface, err) + } + if err := netlink.RouteReplace(&netlink.Route{ + LinkIndex: link.Attrs().Index, + Gw: gateway, + }); err != nil { + return nil, fmt.Errorf("replace default route: %w", err) + } + _ = flushNeighbors(link) + + return &pb.ReconfigureNetworkResponse{}, nil +} + +func flushIPv4Addrs(link netlink.Link) error { + addrs, err := netlink.AddrList(link, unix.AF_INET) + if err != nil { + return fmt.Errorf("list ipv4 addresses: %w", err) + } + for _, addr := range addrs { + if err := netlink.AddrDel(link, &addr); err != nil { + return fmt.Errorf("delete ipv4 address %s: %w", addr.String(), err) + } + } + return nil +} + +func flushNeighbors(link netlink.Link) error { + neighbors, err := netlink.NeighList(link.Attrs().Index, unix.AF_INET) + if err != nil { + return fmt.Errorf("list neighbors: %w", err) + } + for _, neighbor := range neighbors { + if err := netlink.NeighDel(&neighbor); err != nil { + return fmt.Errorf("delete neighbor %s: %w", neighbor.String(), err) + } + } + return nil +} diff --git a/lib/system/guest_agent/resume_network.go b/lib/system/guest_agent/resume_network.go new file mode 100644 index 00000000..f8708976 --- /dev/null +++ b/lib/system/guest_agent/resume_network.go @@ -0,0 +1,185 @@ +//go:build linux + +package main + +import ( + "bufio" + "context" + "encoding/binary" + "fmt" + "io" + "log" + "net" + "os" + "strconv" + "strings" + "sync/atomic" + "time" + "unsafe" + + pb 
"github.com/kernel/hypeman/lib/guest" + "github.com/kernel/hypeman/lib/mailbox" + "golang.org/x/sys/unix" +) + +const vmgenIDKmsgSignal = "crng reseeded due to virtual machine fork" +const resumeNetworkMailboxPayloadTimeout = 5 * time.Second + +type vmGenIDResumeWaiter struct { + file *os.File + reader *bufio.Reader +} + +func startResumeNetworkWatcher(s *guestServer) { + if strings.TrimSpace(os.Getenv(mailbox.MailboxEnv)) != "1" { + return + } + + mailbox := newResumeNetworkMailbox() + if mailbox == nil { + return + } + + go resumeNetworkMailboxLoop(s, mailbox) +} + +func newResumeNetworkMailbox() []byte { + token := strings.TrimSpace(os.Getenv(mailbox.MailboxTokenEnv)) + if !mailbox.ValidToken(token) { + log.Printf("[guest-agent] resume network mailbox disabled: invalid %s", mailbox.MailboxTokenEnv) + return nil + } + + buf := make([]byte, mailbox.MailboxSize) + copy(buf, mailbox.MailboxMagic) + copy(buf[len(mailbox.MailboxMagic):mailbox.MailboxSeqOffset], token) + if err := unix.Mlock(buf); err != nil { + log.Printf("[guest-agent] resume network mailbox mlock failed: %v", err) + } + log.Printf("[guest-agent] resume network mailbox armed token=%s", token) + return buf +} + +func resumeNetworkMailboxLoop(s *guestServer, mailbox []byte) { + for { + waiter, err := newVMGenIDResumeWaiter() + if err != nil { + log.Printf("[guest-agent] resume network VMGenID prepare failed: %v", err) + time.Sleep(100 * time.Millisecond) + continue + } + + start := time.Now() + if err := waiter.Wait(); err != nil { + waiter.Close() + log.Printf("[guest-agent] resume network VMGenID wait failed: %v", err) + time.Sleep(100 * time.Millisecond) + continue + } + waiter.Close() + + if err := waitAndApplyResumeNetworkMailbox(s, mailbox); err != nil { + log.Printf("[guest-agent] resume network mailbox apply failed: %v", err) + time.Sleep(25 * time.Millisecond) + continue + } + log.Printf("[guest-agent] resume network mailbox applied in %s", time.Since(start)) + } +} + +func 
waitAndApplyResumeNetworkMailbox(s *guestServer, buf []byte) error { + return waitAndApplyResumeNetworkMailboxWithTimeout(s, buf, resumeNetworkMailboxPayloadTimeout) +} + +func waitAndApplyResumeNetworkMailboxWithTimeout(s *guestServer, buf []byte, timeout time.Duration) error { + deadline := time.Now().Add(timeout) + for { + seq := atomicLoadUint32(buf[mailbox.MailboxSeqOffset:]) + if seq == 0 { + if time.Now().After(deadline) { + return fmt.Errorf("resume network mailbox payload was not patched within %s", timeout) + } + time.Sleep(100 * time.Microsecond) + continue + } + + payloadLen := binary.LittleEndian.Uint32(buf[mailbox.MailboxLengthOffset:]) + payload, err := mailbox.DecodePayloadFrame(buf, payloadLen) + if err != nil { + return err + } + + _, err = s.ReconfigureNetwork(context.Background(), &pb.ReconfigureNetworkRequest{ + InterfaceName: payload.InterfaceName, + Mac: payload.MAC, + Ipv4: payload.IPv4, + Prefix: payload.Prefix, + Gateway: payload.Gateway, + }) + if err != nil { + return err + } + sendResumeNetworkAck(payload, "applied") + atomicStoreUint32(buf[mailbox.MailboxSeqOffset:], 0) + return nil + } +} + +func sendResumeNetworkAck(payload mailbox.Payload, stage string) { + if payload.AckPort == 0 || payload.Gateway == "" { + return + } + + addr := net.JoinHostPort(payload.Gateway, strconv.FormatUint(uint64(payload.AckPort), 10)) + conn, err := net.DialTimeout("udp4", addr, 100*time.Millisecond) + if err != nil { + log.Printf("[guest-agent] resume network ack dial failed: %v", err) + return + } + defer conn.Close() + + _, _ = fmt.Fprintf(conn, "stage=%s mac=%s ip=%s\n", stage, payload.MAC, payload.IPv4) +} + +func atomicLoadUint32(buf []byte) uint32 { + return atomic.LoadUint32((*uint32)(unsafe.Pointer(&buf[0]))) +} + +func atomicStoreUint32(buf []byte, value uint32) { + atomic.StoreUint32((*uint32)(unsafe.Pointer(&buf[0])), value) +} + +func newVMGenIDResumeWaiter() (*vmGenIDResumeWaiter, error) { + f, err := os.Open("/dev/kmsg") + if err != nil { 
+ return nil, fmt.Errorf("open /dev/kmsg: %w", err) + } + + if _, err := f.Seek(0, io.SeekEnd); err != nil { + log.Printf("[guest-agent] warning: failed to seek /dev/kmsg to end: %v", err) + } + + return &vmGenIDResumeWaiter{ + file: f, + reader: bufio.NewReader(f), + }, nil +} + +func (w *vmGenIDResumeWaiter) Close() { + if w == nil || w.file == nil { + return + } + _ = w.file.Close() +} + +func (w *vmGenIDResumeWaiter) Wait() error { + for { + line, err := w.reader.ReadString('\n') + if err != nil { + return fmt.Errorf("read /dev/kmsg: %w", err) + } + if strings.Contains(line, vmgenIDKmsgSignal) { + return nil + } + } +} diff --git a/lib/system/guest_agent/resume_network_other.go b/lib/system/guest_agent/resume_network_other.go new file mode 100644 index 00000000..dfa74e07 --- /dev/null +++ b/lib/system/guest_agent/resume_network_other.go @@ -0,0 +1,7 @@ +//go:build !linux + +package main + +type resumeNetworkController struct{} + +func startResumeNetworkWatcher(_ *guestServer) {} diff --git a/lib/system/guest_agent/resume_network_test.go b/lib/system/guest_agent/resume_network_test.go new file mode 100644 index 00000000..9e30a533 --- /dev/null +++ b/lib/system/guest_agent/resume_network_test.go @@ -0,0 +1,20 @@ +//go:build linux + +package main + +import ( + "testing" + "time" + + "github.com/kernel/hypeman/lib/mailbox" + "github.com/stretchr/testify/require" +) + +func TestWaitAndApplyResumeNetworkMailboxTimesOutWhenPayloadMissing(t *testing.T) { + buf := make([]byte, mailbox.MailboxSize) + + err := waitAndApplyResumeNetworkMailboxWithTimeout(&guestServer{}, buf, 5*time.Millisecond) + + require.Error(t, err) + require.Contains(t, err.Error(), "payload was not patched") +} diff --git a/lib/system/versions.go b/lib/system/versions.go index aa07108d..7743add0 100644 --- a/lib/system/versions.go +++ b/lib/system/versions.go @@ -20,14 +20,18 @@ const ( // Kernel_202603301 is the current kernel version with expanded nftables/raw support for Docker bridge networking 
Kernel_202603301 KernelVersion = "ch-6.12.8-kernel-1.6-202603301" + + // Kernel_202605291 is the current kernel version with VMGenID support for snapshot resume detection + Kernel_202605291 KernelVersion = "ch-6.12.8-kernel-3.0-202605291" ) var ( // DefaultKernelVersion is the kernel version used for new instances - DefaultKernelVersion = Kernel_202603301 + DefaultKernelVersion = Kernel_202605291 // SupportedKernelVersions lists all supported kernel versions SupportedKernelVersions = []KernelVersion{ + Kernel_202605291, Kernel_202603301, Kernel_202603091, Kernel_202602101, @@ -37,6 +41,10 @@ var ( // KernelDownloadURLs maps kernel versions and architectures to download URLs var KernelDownloadURLs = map[KernelVersion]map[string]string{ + Kernel_202605291: { + "x86_64": "https://github.com/kernel/linux/releases/download/ch-6.12.8-kernel-3.0-202605291/vmlinux-x86_64", + "aarch64": "https://github.com/kernel/linux/releases/download/ch-6.12.8-kernel-3.0-202605291/Image-arm64", + }, Kernel_202603301: { "x86_64": "https://github.com/kernel/linux/releases/download/ch-6.12.8-kernel-1.6-202603301/vmlinux-x86_64", "aarch64": "https://github.com/kernel/linux/releases/download/ch-6.12.8-kernel-1.6-202603301/Image-arm64", @@ -58,6 +66,10 @@ var KernelDownloadURLs = map[KernelVersion]map[string]string{ // KernelHeaderURLs maps kernel versions and architectures to kernel header tarball URLs // These tarballs contain kernel headers needed for DKMS to build out-of-tree modules (e.g., NVIDIA vGPU drivers) var KernelHeaderURLs = map[KernelVersion]map[string]string{ + Kernel_202605291: { + "x86_64": "https://github.com/kernel/linux/releases/download/ch-6.12.8-kernel-3.0-202605291/kernel-headers-x86_64.tar.gz", + "aarch64": "https://github.com/kernel/linux/releases/download/ch-6.12.8-kernel-3.0-202605291/kernel-headers-aarch64.tar.gz", + }, Kernel_202603301: { "x86_64": "https://github.com/kernel/linux/releases/download/ch-6.12.8-kernel-1.6-202603301/kernel-headers-x86_64.tar.gz", 
"aarch64": "https://github.com/kernel/linux/releases/download/ch-6.12.8-kernel-1.6-202603301/kernel-headers-aarch64.tar.gz",