Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions cmd/ateapi/internal/controlapi/workflow_resume.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/agent-substrate/substrate/cmd/ateapi/internal/store"
"github.com/agent-substrate/substrate/internal/proto/ateletpb"
"github.com/agent-substrate/substrate/internal/resources"
atev1alpha1 "github.com/agent-substrate/substrate/pkg/api/v1alpha1"
listersv1alpha1 "github.com/agent-substrate/substrate/pkg/client/listers/api/v1alpha1"
"github.com/agent-substrate/substrate/pkg/proto/ateapipb"
Expand Down Expand Up @@ -241,6 +242,12 @@ func (s *CallAteletRestoreStep) Execute(ctx context.Context, input *ResumeInput,
return err
}

// Validate that the ActorTemplate's volume references are satisfied by the
// assigned WorkerPool's storageVolumes.
if err := s.validateVolumeRefs(ctx, state); err != nil {
return err
}

if state.Actor.GetLatestSnapshotInfo().GetType() != ateapipb.SnapshotType_SNAPSHOT_TYPE_UNSPECIFIED {
slog.InfoContext(ctx, "Actor has snapshot; Restoring from snapshot")

Expand Down Expand Up @@ -329,6 +336,33 @@ func (s *CallAteletRestoreStep) Execute(ctx context.Context, input *ResumeInput,

func (s *CallAteletRestoreStep) RetryBackoff() *wait.Backoff { return nil }

// validateVolumeRefs checks that every volume referenced by the ActorTemplate
// exists in the assigned WorkerPool's storageVolumes.
func (s *CallAteletRestoreStep) validateVolumeRefs(ctx context.Context, state *ResumeState) error {
if len(state.ActorTemplate.Spec.Volumes) == 0 {
return nil
}
pool, err := s.workerPoolLister.WorkerPools(state.Actor.GetAteomPodNamespace()).Get(state.Actor.GetWorkerPoolName())
if err != nil {
return fmt.Errorf("while getting worker pool for volume validation: %w", err)
}
poolVolumes := make(map[string]bool, len(pool.Spec.StorageVolumes))
for _, sv := range pool.Spec.StorageVolumes {
poolVolumes[sv.Name] = true
}
for _, v := range state.ActorTemplate.Spec.Volumes {
if !poolVolumes[v.Name] {
return status.Errorf(codes.FailedPrecondition,
"actor template references volume %q not provided by worker pool %s",
v.Name, pool.Name)
}
if err := resources.ValidateVolumeMountPath(v.MountPath); err != nil {
return status.Errorf(codes.InvalidArgument, "invalid volume mount: %v", err)
}
}
return nil
}

type FinalizeRunningStep struct {
store store.Interface
}
Expand Down
10 changes: 10 additions & 0 deletions cmd/ateapi/internal/controlapi/workload_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,16 @@ func workloadSpecFromActorTemplate(ctx context.Context, kubeClient kubernetes.In
workloadSpec.Containers = append(workloadSpec.Containers, ateletCtr)
}

// Convert ActorTemplate volumes to proto VolumeMounts.
for _, v := range actorTemplate.Spec.Volumes {
workloadSpec.VolumeMounts = append(workloadSpec.VolumeMounts, &ateletpb.VolumeMount{
Name: v.Name,
MountPath: v.MountPath,
ReadOnly: v.ReadOnly,
SubPath: v.SubPath,
})
}

return workloadSpec, nil
}

Expand Down
35 changes: 35 additions & 0 deletions cmd/atecontroller/internal/controllers/workerpool_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package controllers

import (
"path/filepath"

corev1 "k8s.io/api/core/v1"
appsv1ac "k8s.io/client-go/applyconfigurations/apps/v1"
corev1ac "k8s.io/client-go/applyconfigurations/core/v1"
Expand Down Expand Up @@ -59,6 +61,39 @@ func buildDeploymentApplyConfig(wp *atev1alpha1.WorkerPool) *appsv1ac.Deployment
WithPath(ateompath.BasePath).
WithType(corev1.HostPathDirectoryOrCreate)))

// Add storage volumes from WorkerPool spec. Each volume is mounted into
// the ateom container at a well-known path so atelet can create OCI bind
// mounts from it into actor sandboxes.
for _, sv := range wp.Spec.StorageVolumes {
vol := corev1ac.Volume().WithName(sv.Name)
switch {
case sv.NFS != nil:
vol.WithNFS(corev1ac.NFSVolumeSource().
WithServer(sv.NFS.Server).
WithPath(sv.NFS.Path).
WithReadOnly(sv.NFS.ReadOnly))
case sv.PersistentVolumeClaim != nil:
vol.WithPersistentVolumeClaim(corev1ac.PersistentVolumeClaimVolumeSource().
WithClaimName(sv.PersistentVolumeClaim.ClaimName).
WithReadOnly(sv.PersistentVolumeClaim.ReadOnly))
case sv.HostPath != nil:
hp := corev1ac.HostPathVolumeSource().WithPath(sv.HostPath.Path)
if sv.HostPath.Type != nil {
hp.WithType(*sv.HostPath.Type)
}
vol.WithHostPath(hp)
}
podSpecAC.WithVolumes(vol)

// Mount the volume into the ateom container at StorageVolumesPath/<name>
// so atelet can reference it when building OCI bind mounts.
mountPath := filepath.Join(ateompath.StorageVolumesPath, sv.Name)
containerAC.WithVolumeMounts(
corev1ac.VolumeMount().
WithName(sv.Name).
WithMountPath(mountPath))
}

applyWorkerPoolPodTemplate(podSpecAC, wp.Spec.Template)
podSpecAC.WithContainers(containerAC)

Expand Down
32 changes: 31 additions & 1 deletion cmd/atelet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,9 @@ func (s *AteomHerder) prepareOCIBundles(
return fmt.Errorf("while writing actor identity file: %w", err)
}

// Resolve volume mounts from proto to host-path-based configs.
volumeMounts := resolveVolumeMounts(spec.GetVolumeMounts(), actorID)

g, gCtx := errgroup.WithContext(ctx)

// Pause container.
Expand All @@ -625,7 +628,8 @@ func (s *AteomHerder) prepareOCIBundles(
"io.kubernetes.cri.container-name": "pause",
},
netnsPath,
"", // pause is sandbox infra; it gets no actor identity mount.
"", // pause is sandbox infra; it gets no actor identity mount.
nil, // pause gets no storage volume mounts.
); err != nil {
return fmt.Errorf("while creating pause OCI bundle: %w", err)
}
Expand Down Expand Up @@ -655,6 +659,7 @@ func (s *AteomHerder) prepareOCIBundles(
},
netnsPath,
identityDir,
volumeMounts,
); err != nil {
return fmt.Errorf("while creating %q OCI bundle: %w", ctr.GetName(), err)
}
Expand All @@ -665,6 +670,31 @@ func (s *AteomHerder) prepareOCIBundles(
return g.Wait()
}

// resolveVolumeMounts converts proto VolumeMounts to host-path-based mount
// configs. The host path is {ateompath.StorageVolumesPath}/{volumeName},
// which is where the WorkerPool controller mounted the volume in the ateom
// container. The ${ACTOR_ID} placeholder in SubPath is replaced with the
// actor's actual ID.
func resolveVolumeMounts(protoMounts []*ateletpb.VolumeMount, actorID string) []VolumeMountConfig {
if len(protoMounts) == 0 {
return nil
}
mounts := make([]VolumeMountConfig, 0, len(protoMounts))
for _, m := range protoMounts {
hostPath := filepath.Join(ateompath.StorageVolumesPath, m.GetName())
subPath := m.GetSubPath()
subPath = strings.ReplaceAll(subPath, "${ACTOR_ID}", actorID)
mounts = append(mounts, VolumeMountConfig{
Name: m.GetName(),
MountPath: m.GetMountPath(),
HostPath: hostPath,
SubPath: subPath,
ReadOnly: m.GetReadOnly(),
})
}
return mounts
}

// dialAteom opens (or reuses) the gRPC connection to the target ateom
// pod and returns an ateom client.
func (s *AteomHerder) dialAteom(ctx context.Context, targetAteomUid string) (ateompb.AteomClient, error) {
Expand Down
44 changes: 41 additions & 3 deletions cmd/atelet/oci.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,19 @@ const (
ActorIDFileName = "actor-id"
)

func prepareOCIDirectory(ctx context.Context, pullCache *memorypullcache.MemoryPullCache, actorTemplateNamespace, actorTemplateName, actorID, containerName, ref string, args []string, env []string, annotations map[string]string, netns string, identityDir string) error {
// VolumeMountConfig holds the resolved configuration for a single volume
// mount to be added to an actor container's OCI spec. HostPath is the
// absolute path on the host (inside the ateom container) where the storage
// volume is mounted.
type VolumeMountConfig struct {
Name string
MountPath string
HostPath string
SubPath string
ReadOnly bool
}

func prepareOCIDirectory(ctx context.Context, pullCache *memorypullcache.MemoryPullCache, actorTemplateNamespace, actorTemplateName, actorID, containerName, ref string, args []string, env []string, annotations map[string]string, netns string, identityDir string, volumeMounts []VolumeMountConfig) error {
tracer := otel.Tracer("prepareOCIDirectory")

ctx, span := tracer.Start(ctx, "prepareOCIDirectory")
Expand Down Expand Up @@ -88,7 +100,14 @@ func prepareOCIDirectory(ctx context.Context, pullCache *memorypullcache.MemoryP
}
}

ociSpec := buildActorOCISpec(args, env, annotations, netns, identityDir)
// Create mount point directories in rootfs for each storage volume mount.
for _, vm := range volumeMounts {
if err := createMountPoint(rootPath, vm.MountPath); err != nil {
return fmt.Errorf("while creating volume mount point %q: %w", vm.MountPath, err)
}
}

ociSpec := buildActorOCISpec(args, env, annotations, netns, identityDir, volumeMounts)
ociSpecBytes, err := json.MarshalIndent(ociSpec, "", " ")
if err != nil {
return fmt.Errorf("while marshaling OCI spec: %w", err)
Expand All @@ -105,7 +124,8 @@ func prepareOCIDirectory(ctx context.Context, pullCache *memorypullcache.MemoryP
// When identityDir is non-empty it adds a read-only bind mount of that host
// directory at IdentityMountPath so the actor can read its own ID (see
// IdentityMountPath for why this is a bind mount rather than env vars).
func buildActorOCISpec(args []string, env []string, annotations map[string]string, netns string, identityDir string) *specs.Spec {
// volumeMounts are additional bind mounts from WorkerPool storage volumes.
func buildActorOCISpec(args []string, env []string, annotations map[string]string, netns string, identityDir string, volumeMounts []VolumeMountConfig) *specs.Spec {
envVars := []string{
"PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin",
}
Expand Down Expand Up @@ -149,6 +169,24 @@ func buildActorOCISpec(args []string, env []string, annotations map[string]strin
})
}

// Add storage volume bind mounts from WorkerPool.
for _, vm := range volumeMounts {
options := []string{"bind"}
if vm.ReadOnly {
options = append(options, "ro")
}
source := vm.HostPath
if vm.SubPath != "" {
source = filepath.Join(source, vm.SubPath)
}
mounts = append(mounts, specs.Mount{
Destination: vm.MountPath,
Type: "bind",
Source: source,
Options: options,
})
}

return &specs.Spec{
Process: &specs.Process{
User: specs.User{
Expand Down
Loading