Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 24 additions & 4 deletions lib/instances/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,19 @@ func (m *manager) restoreInstance(
log.InfoContext(ctx, "allocating fresh network identity for standby restore",
"instance_id", id, "network", "default",
"download_bps", stored.NetworkBandwidthDownload, "upload_bps", stored.NetworkBandwidthUpload)
netConfig, err := m.networkManager.CreateAllocation(networkCtx, network.AllocateRequest{
allocateCtx, allocateSpanEnd := m.startLifecycleStep(networkCtx, "restore_network.create_allocation",
attribute.String("instance_id", id),
attribute.String("hypervisor", string(stored.HypervisorType)),
attribute.String("operation", "restore_network_create_allocation"),
)
netConfig, err := m.networkManager.CreateAllocation(allocateCtx, network.AllocateRequest{
InstanceID: id,
InstanceName: stored.Name,
DownloadBps: stored.NetworkBandwidthDownload,
UploadBps: stored.NetworkBandwidthUpload,
UploadCeilBps: stored.NetworkBandwidthUpload * int64(m.networkManager.GetUploadBurstMultiplier()),
})
allocateSpanEnd(err)
if err != nil {
networkSpanEnd(err)
log.ErrorContext(ctx, "failed to allocate network", "instance_id", id, "error", err)
Expand All @@ -157,7 +163,12 @@ func (m *manager) restoreInstance(
stored.IP = netConfig.IP
stored.MAC = netConfig.MAC

if _, err := starter.PrepareFork(networkCtx, hypervisor.ForkPrepareRequest{
prepareForkCtx, prepareForkSpanEnd := m.startLifecycleStep(networkCtx, "restore_network.prepare_fork_network",
attribute.String("instance_id", id),
attribute.String("hypervisor", string(stored.HypervisorType)),
attribute.String("operation", "restore_network_prepare_fork_network"),
)
_, err = starter.PrepareFork(prepareForkCtx, hypervisor.ForkPrepareRequest{
SnapshotConfigPath: m.paths.InstanceSnapshotConfig(id),
VsockCID: stored.VsockCID,
VsockSocket: stored.VsockSocket,
Expand All @@ -167,7 +178,9 @@ func (m *manager) restoreInstance(
MAC: netConfig.MAC,
Netmask: netConfig.Netmask,
},
}); err != nil {
})
prepareForkSpanEnd(err)
if err != nil {
networkSpanEnd(err)
if errors.Is(err, hypervisor.ErrNotSupported) {
log.ErrorContext(ctx, "forked standby network rewrite not supported for hypervisor", "instance_id", id, "hypervisor", stored.HypervisorType)
Expand All @@ -181,7 +194,14 @@ func (m *manager) restoreInstance(
} else {
log.InfoContext(ctx, "recreating network for restore", "instance_id", id, "network", "default",
"download_bps", stored.NetworkBandwidthDownload, "upload_bps", stored.NetworkBandwidthUpload)
if err := m.networkManager.RecreateAllocation(networkCtx, id, stored.NetworkBandwidthDownload, stored.NetworkBandwidthUpload); err != nil {
recreateCtx, recreateSpanEnd := m.startLifecycleStep(networkCtx, "restore_network.recreate_allocation",
attribute.String("instance_id", id),
attribute.String("hypervisor", string(stored.HypervisorType)),
attribute.String("operation", "restore_network_recreate_allocation"),
)
err := m.networkManager.RecreateAllocation(recreateCtx, id, stored.NetworkBandwidthDownload, stored.NetworkBandwidthUpload)
recreateSpanEnd(err)
if err != nil {
networkSpanEnd(err)
log.ErrorContext(ctx, "failed to recreate network", "instance_id", id, "error", err)
return nil, fmt.Errorf("recreate network: %w", err)
Expand Down
80 changes: 74 additions & 6 deletions lib/network/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/kernel/hypeman/lib/logger"
"go.opentelemetry.io/otel/attribute"
)

// CreateAllocation allocates IP/MAC/TAP for instance on the default network
Expand All @@ -19,7 +20,12 @@ func (m *manager) CreateAllocation(ctx context.Context, req AllocateRequest) (*N

// Resolve bridge/default network before taking allocation lock so
// self-heal retries don't block other allocation/release operations.
network, err := m.getOrInitDefaultNetwork(ctx)
networkCtx, networkSpanEnd := startNetworkStep(ctx, "network.get_default_network",
attribute.String("operation", "get_default_network"),
attribute.String("instance_id", req.InstanceID),
)
network, err := m.getOrInitDefaultNetwork(networkCtx)
networkSpanEnd(err)
if err != nil {
return nil, err
}
Expand All @@ -30,13 +36,23 @@ func (m *manager) CreateAllocation(ctx context.Context, req AllocateRequest) (*N
m.mu.Lock()
defer m.mu.Unlock()

allocations, err := m.ListAllocations(ctx)
listCtx, listSpanEnd := startNetworkStep(ctx, "network.list_allocations",
attribute.String("operation", "list_allocations"),
attribute.String("instance_id", req.InstanceID),
)
allocations, err := m.ListAllocations(listCtx)
listSpanEnd(err)
if err != nil {
return nil, fmt.Errorf("list allocations: %w", err)
}

// 1. Check name uniqueness (exclude current instance to allow restarts)
_, nameSpanEnd := startNetworkStep(ctx, "network.name_exists",
attribute.String("operation", "name_exists"),
attribute.String("instance_id", req.InstanceID),
)
exists := nameExistsInAllocations(allocations, req.InstanceName, req.InstanceID)
nameSpanEnd(nil)
if exists {
return nil, fmt.Errorf("%w: instance name '%s' already exists, can't assign into same network: %s",
ErrNameExists, req.InstanceName, network.Name)
Expand All @@ -46,13 +62,23 @@ func (m *manager) CreateAllocation(ctx context.Context, req AllocateRequest) (*N
// Random selection reduces predictability and helps distribute IPs across the subnet.
// This is especially useful for large /16 networks and reduces conflicts when
// moving standby VMs across hosts.
_, ipSpanEnd := startNetworkStep(ctx, "network.allocate_ip",
attribute.String("operation", "allocate_ip"),
attribute.String("instance_id", req.InstanceID),
)
ip, err := allocateNextIPFromAllocations(network.Subnet, allocations)
ipSpanEnd(err)
if err != nil {
return nil, fmt.Errorf("allocate IP: %w", err)
}

// 3. Generate unused MAC (02:00:00:... format - locally administered)
_, macSpanEnd := startNetworkStep(ctx, "network.allocate_mac",
attribute.String("operation", "allocate_mac"),
attribute.String("instance_id", req.InstanceID),
)
mac, err := allocateUniqueMACFromAllocations(allocations, generateMAC)
macSpanEnd(err)
if err != nil {
return nil, fmt.Errorf("allocate MAC: %w", err)
}
Expand All @@ -61,21 +87,37 @@ func (m *manager) CreateAllocation(ctx context.Context, req AllocateRequest) (*N
tap := GenerateTAPName(req.InstanceID)

// 5. Create TAP device with bidirectional rate limiting
classID, err := m.createTAPDevice(ctx, tap, network.Bridge, network.Isolated, req.DownloadBps, req.UploadBps, req.UploadCeilBps)
tapCtx, tapSpanEnd := startNetworkStep(ctx, "network.create_tap",
attribute.String("operation", "create_tap"),
attribute.String("instance_id", req.InstanceID),
attribute.String("tap", tap),
attribute.Bool("isolated", network.Isolated),
attribute.Bool("download_rate_limit", req.DownloadBps > 0),
attribute.Bool("upload_rate_limit", req.UploadBps > 0),
)
classID, err := m.createTAPDevice(tapCtx, tap, network.Bridge, network.Isolated, req.DownloadBps, req.UploadBps, req.UploadCeilBps)
tapSpanEnd(err)
if err != nil {
return nil, fmt.Errorf("create TAP device: %w", err)
}
m.recordTAPOperation(ctx, "create")

// Persist assigned tc class ID so removal uses the correct ID after collisions.
// Clear any stale file when no rate limiting was applied.
_, saveClassSpanEnd := startNetworkStep(ctx, "network.save_class_id",
attribute.String("operation", "save_class_id"),
attribute.String("instance_id", req.InstanceID),
attribute.Bool("has_class_id", classID != ""),
)
if classID != "" {
if err := m.saveClassID(req.InstanceID, classID); err != nil {
saveClassSpanEnd(err)
return nil, fmt.Errorf("save class ID: %w", err)
}
} else {
m.clearClassID(req.InstanceID)
}
saveClassSpanEnd(nil)

log.InfoContext(ctx, "allocated network",
"instance_id", req.InstanceID,
Expand Down Expand Up @@ -111,7 +153,12 @@ func (m *manager) RecreateAllocation(ctx context.Context, instanceID string, dow
log := logger.FromContext(ctx)

// 1. Derive allocation from snapshot
alloc, err := m.deriveAllocation(ctx, instanceID)
deriveCtx, deriveSpanEnd := startNetworkStep(ctx, "network.derive_allocation",
attribute.String("operation", "derive_allocation"),
attribute.String("instance_id", instanceID),
)
alloc, err := m.deriveAllocation(deriveCtx, instanceID)
deriveSpanEnd(err)
if err != nil {
return fmt.Errorf("derive allocation: %w", err)
}
Expand All @@ -121,28 +168,49 @@ func (m *manager) RecreateAllocation(ctx context.Context, instanceID string, dow
}

// 2. Get default network details (same self-healing behavior as CreateAllocation).
network, err := m.getOrInitDefaultNetwork(ctx)
networkCtx, networkSpanEnd := startNetworkStep(ctx, "network.get_default_network",
attribute.String("operation", "get_default_network"),
attribute.String("instance_id", instanceID),
)
network, err := m.getOrInitDefaultNetwork(networkCtx)
networkSpanEnd(err)
if err != nil {
return err
}

// 3. Recreate TAP device with same name and rate limits from instance metadata
uploadCeilBps := uploadBps * int64(m.GetUploadBurstMultiplier())
classID, err := m.createTAPDevice(ctx, alloc.TAPDevice, network.Bridge, network.Isolated, downloadBps, uploadBps, uploadCeilBps)
tapCtx, tapSpanEnd := startNetworkStep(ctx, "network.create_tap",
attribute.String("operation", "create_tap"),
attribute.String("instance_id", instanceID),
attribute.String("tap", alloc.TAPDevice),
attribute.Bool("isolated", network.Isolated),
attribute.Bool("download_rate_limit", downloadBps > 0),
attribute.Bool("upload_rate_limit", uploadBps > 0),
)
classID, err := m.createTAPDevice(tapCtx, alloc.TAPDevice, network.Bridge, network.Isolated, downloadBps, uploadBps, uploadCeilBps)
tapSpanEnd(err)
if err != nil {
return fmt.Errorf("create TAP device: %w", err)
}
m.recordTAPOperation(ctx, "create")

// Persist assigned tc class ID so removal uses the correct ID after collisions.
// Clear any stale file when no rate limiting was applied.
_, saveClassSpanEnd := startNetworkStep(ctx, "network.save_class_id",
attribute.String("operation", "save_class_id"),
attribute.String("instance_id", instanceID),
attribute.Bool("has_class_id", classID != ""),
)
if classID != "" {
if err := m.saveClassID(instanceID, classID); err != nil {
saveClassSpanEnd(err)
return fmt.Errorf("save class ID: %w", err)
}
} else {
m.clearClassID(instanceID)
}
saveClassSpanEnd(nil)

log.InfoContext(ctx, "recreated network for restore",
"instance_id", instanceID,
Expand Down
72 changes: 63 additions & 9 deletions lib/network/bridge_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/kernel/hypeman/lib/logger"
"github.com/vishvananda/netlink"
"go.opentelemetry.io/otel/attribute"
"golang.org/x/sys/unix"
)

Expand Down Expand Up @@ -500,9 +501,21 @@ func (m *manager) lastHypemanForwardRulePosition() int {
// Returns the tc class ID actually assigned (empty if no upload rate limiting).
func (m *manager) createTAPDevice(ctx context.Context, tapName, bridgeName string, isolated bool, downloadBps, uploadBps, uploadCeilBps int64) (string, error) {
// 1. Check if TAP already exists
if _, err := netlink.LinkByName(tapName); err == nil {
_, linkCheckSpanEnd := startNetworkStep(ctx, "network.create_tap.link_lookup_existing",
attribute.String("operation", "link_lookup_existing"),
attribute.String("tap", tapName),
)
_, err := netlink.LinkByName(tapName)
linkCheckSpanEnd(nil)
if err == nil {
// TAP already exists, delete it first
if err := m.deleteTAPDevice(tapName, ""); err != nil {
_, deleteSpanEnd := startNetworkStep(ctx, "network.create_tap.delete_existing",
attribute.String("operation", "delete_existing_tap"),
attribute.String("tap", tapName),
)
err := m.deleteTAPDevice(tapName, "")
deleteSpanEnd(err)
if err != nil {
return "", fmt.Errorf("delete existing TAP: %w", err)
}
}
Expand All @@ -521,46 +534,87 @@ func (m *manager) createTAPDevice(ctx context.Context, tapName, bridgeName strin
Group: uint32(gid),
}

if err := netlink.LinkAdd(tap); err != nil {
_, linkAddSpanEnd := startNetworkStep(ctx, "network.create_tap.link_add",
attribute.String("operation", "link_add"),
attribute.String("tap", tapName),
)
err = netlink.LinkAdd(tap)
linkAddSpanEnd(err)
if err != nil {
return "", fmt.Errorf("create TAP device: %w", err)
}

// 3. Set TAP up
tapLink := tap

if err := netlink.LinkSetUp(tapLink); err != nil {
_, setUpSpanEnd := startNetworkStep(ctx, "network.create_tap.link_set_up",
attribute.String("operation", "link_set_up"),
attribute.String("tap", tapName),
)
err = netlink.LinkSetUp(tapLink)
setUpSpanEnd(err)
if err != nil {
return "", fmt.Errorf("set TAP up: %w", err)
}

// 4. Attach TAP to bridge
_, bridgeLookupSpanEnd := startNetworkStep(ctx, "network.create_tap.link_lookup_bridge",
attribute.String("operation", "link_lookup_bridge"),
attribute.String("bridge", bridgeName),
)
bridge, err := netlink.LinkByName(bridgeName)
bridgeLookupSpanEnd(err)
if err != nil {
return "", fmt.Errorf("get bridge: %w", err)
}

if err := netlink.LinkSetMaster(tapLink, bridge); err != nil {
_, setMasterSpanEnd := startNetworkStep(ctx, "network.create_tap.link_set_master",
attribute.String("operation", "link_set_master"),
attribute.String("tap", tapName),
attribute.String("bridge", bridgeName),
)
err = netlink.LinkSetMaster(tapLink, bridge)
setMasterSpanEnd(err)
if err != nil {
return "", fmt.Errorf("attach TAP to bridge: %w", err)
}

// 5. Enable port isolation so isolated TAPs can't directly talk to each other (requires kernel support and capabilities)
if isolated {
if err := netlink.LinkSetIsolated(tapLink, true); err != nil {
_, isolationSpanEnd := startNetworkStep(ctx, "network.create_tap.set_isolation",
attribute.String("operation", "set_isolation"),
attribute.String("tap", tapName),
)
err = netlink.LinkSetIsolated(tapLink, true)
isolationSpanEnd(err)
if err != nil {
return "", fmt.Errorf("set isolation mode: %w", err)
}
}

// 6. Apply download rate limiting (TBF on TAP egress)
if downloadBps > 0 {
if err := m.applyDownloadRateLimit(tapName, downloadBps); err != nil {
_, downloadSpanEnd := startNetworkStep(ctx, "network.create_tap.download_rate_limit",
attribute.String("operation", "download_rate_limit"),
attribute.String("tap", tapName),
)
err := m.applyDownloadRateLimit(tapName, downloadBps)
downloadSpanEnd(err)
if err != nil {
return "", fmt.Errorf("apply download rate limit: %w", err)
}
}

// 7. Apply upload rate limiting (HTB class on bridge)
var classID string
if uploadBps > 0 {
var err error
classID, err = m.addVMClass(ctx, bridgeName, tapName, uploadBps, uploadCeilBps)
uploadCtx, uploadSpanEnd := startNetworkStep(ctx, "network.create_tap.upload_rate_limit",
attribute.String("operation", "upload_rate_limit"),
attribute.String("tap", tapName),
attribute.String("bridge", bridgeName),
)
classID, err = m.addVMClass(uploadCtx, bridgeName, tapName, uploadBps, uploadCeilBps)
uploadSpanEnd(err)
if err != nil {
return "", fmt.Errorf("apply upload rate limit: %w", err)
}
Expand Down
Loading
Loading