diff --git a/cmd/ateapi/internal/controlapi/workflow_resume.go b/cmd/ateapi/internal/controlapi/workflow_resume.go index a604e1709..eaf660896 100644 --- a/cmd/ateapi/internal/controlapi/workflow_resume.go +++ b/cmd/ateapi/internal/controlapi/workflow_resume.go @@ -25,6 +25,7 @@ import ( "github.com/agent-substrate/substrate/cmd/ateapi/internal/store" "github.com/agent-substrate/substrate/internal/proto/ateletpb" + "github.com/agent-substrate/substrate/internal/resources" atev1alpha1 "github.com/agent-substrate/substrate/pkg/api/v1alpha1" listersv1alpha1 "github.com/agent-substrate/substrate/pkg/client/listers/api/v1alpha1" "github.com/agent-substrate/substrate/pkg/proto/ateapipb" @@ -241,6 +242,12 @@ func (s *CallAteletRestoreStep) Execute(ctx context.Context, input *ResumeInput, return err } + // Validate that the ActorTemplate's volume references are satisfied by the + // assigned WorkerPool's storageVolumes. + if err := s.validateVolumeRefs(ctx, state); err != nil { + return err + } + if state.Actor.GetLatestSnapshotInfo().GetType() != ateapipb.SnapshotType_SNAPSHOT_TYPE_UNSPECIFIED { slog.InfoContext(ctx, "Actor has snapshot; Restoring from snapshot") @@ -329,6 +336,33 @@ func (s *CallAteletRestoreStep) Execute(ctx context.Context, input *ResumeInput, func (s *CallAteletRestoreStep) RetryBackoff() *wait.Backoff { return nil } +// validateVolumeRefs checks that every volume referenced by the ActorTemplate +// exists in the assigned WorkerPool's storageVolumes. +func (s *CallAteletRestoreStep) validateVolumeRefs(ctx context.Context, state *ResumeState) error { + if len(state.ActorTemplate.Spec.Volumes) == 0 { + return nil + } + pool, err := s.workerPoolLister.WorkerPools(state.Actor.GetAteomPodNamespace()).Get(state.Actor.GetWorkerPoolName()) + if err != nil { + return fmt.Errorf("while getting worker pool for volume validation: %w", err) + } + poolVolumes := make(map[string]bool, len(pool.Spec.StorageVolumes)) + for _, sv := range pool.Spec.StorageVolumes { + poolVolumes[sv.Name] = true + } + for _, v := range state.ActorTemplate.Spec.Volumes { + if !poolVolumes[v.Name] { + return status.Errorf(codes.FailedPrecondition, + "actor template references volume %q not provided by worker pool %s", + v.Name, pool.Name) + } + if err := resources.ValidateVolumeMountPath(v.MountPath); err != nil { + return status.Errorf(codes.InvalidArgument, "invalid volume mount: %v", err) + } + } + return nil +} + type FinalizeRunningStep struct { store store.Interface } diff --git a/cmd/ateapi/internal/controlapi/workload_spec.go b/cmd/ateapi/internal/controlapi/workload_spec.go index 018fb444f..3fba07756 100644 --- a/cmd/ateapi/internal/controlapi/workload_spec.go +++ b/cmd/ateapi/internal/controlapi/workload_spec.go @@ -60,6 +60,16 @@ func workloadSpecFromActorTemplate(ctx context.Context, kubeClient kubernetes.In workloadSpec.Containers = append(workloadSpec.Containers, ateletCtr) } + // Convert ActorTemplate volumes to proto VolumeMounts. + for _, v := range actorTemplate.Spec.Volumes { + workloadSpec.VolumeMounts = append(workloadSpec.VolumeMounts, &ateletpb.VolumeMount{ + Name: v.Name, + MountPath: v.MountPath, + ReadOnly: v.ReadOnly, + SubPath: v.SubPath, + }) + } + return workloadSpec, nil } diff --git a/cmd/atecontroller/internal/controllers/workerpool_apply.go b/cmd/atecontroller/internal/controllers/workerpool_apply.go index e9ce1f7a1..e656f6b8f 100644 --- a/cmd/atecontroller/internal/controllers/workerpool_apply.go +++ b/cmd/atecontroller/internal/controllers/workerpool_apply.go @@ -15,6 +15,8 @@ package controllers import ( + "path/filepath" + corev1 "k8s.io/api/core/v1" appsv1ac "k8s.io/client-go/applyconfigurations/apps/v1" corev1ac "k8s.io/client-go/applyconfigurations/core/v1" @@ -59,6 +61,39 @@ func buildDeploymentApplyConfig(wp *atev1alpha1.WorkerPool) *appsv1ac.Deployment WithPath(ateompath.BasePath). WithType(corev1.HostPathDirectoryOrCreate))) + // Add storage volumes from WorkerPool spec. Each volume is mounted into + // the ateom container at a well-known path so atelet can create OCI bind + // mounts from it into actor sandboxes. + for _, sv := range wp.Spec.StorageVolumes { + vol := corev1ac.Volume().WithName(sv.Name) + switch { + case sv.NFS != nil: + vol.WithNFS(corev1ac.NFSVolumeSource(). + WithServer(sv.NFS.Server). + WithPath(sv.NFS.Path). + WithReadOnly(sv.NFS.ReadOnly)) + case sv.PersistentVolumeClaim != nil: + vol.WithPersistentVolumeClaim(corev1ac.PersistentVolumeClaimVolumeSource(). + WithClaimName(sv.PersistentVolumeClaim.ClaimName). + WithReadOnly(sv.PersistentVolumeClaim.ReadOnly)) + case sv.HostPath != nil: + hp := corev1ac.HostPathVolumeSource().WithPath(sv.HostPath.Path) + if sv.HostPath.Type != nil { + hp.WithType(*sv.HostPath.Type) + } + vol.WithHostPath(hp) + } + podSpecAC.WithVolumes(vol) + + // Mount the volume into the ateom container at StorageVolumesPath/ + // so atelet can reference it when building OCI bind mounts. + mountPath := filepath.Join(ateompath.StorageVolumesPath, sv.Name) + containerAC.WithVolumeMounts( + corev1ac.VolumeMount(). + WithName(sv.Name). + WithMountPath(mountPath)) + } + applyWorkerPoolPodTemplate(podSpecAC, wp.Spec.Template) podSpecAC.WithContainers(containerAC) diff --git a/cmd/atelet/main.go b/cmd/atelet/main.go index 7094f7315..bf0587c26 100644 --- a/cmd/atelet/main.go +++ b/cmd/atelet/main.go @@ -608,6 +608,9 @@ func (s *AteomHerder) prepareOCIBundles( return fmt.Errorf("while writing actor identity file: %w", err) } + // Resolve volume mounts from proto to host-path-based configs. + volumeMounts := resolveVolumeMounts(spec.GetVolumeMounts(), actorID) + g, gCtx := errgroup.WithContext(ctx) // Pause container. @@ -625,7 +628,8 @@ func (s *AteomHerder) prepareOCIBundles( "io.kubernetes.cri.container-name": "pause", }, netnsPath, - "", // pause is sandbox infra; it gets no actor identity mount. + "", // pause is sandbox infra; it gets no actor identity mount. + nil, // pause gets no storage volume mounts. ); err != nil { return fmt.Errorf("while creating pause OCI bundle: %w", err) } @@ -655,6 +659,7 @@ func (s *AteomHerder) prepareOCIBundles( }, netnsPath, identityDir, + volumeMounts, ); err != nil { return fmt.Errorf("while creating %q OCI bundle: %w", ctr.GetName(), err) } @@ -665,6 +670,31 @@ func (s *AteomHerder) prepareOCIBundles( return g.Wait() } +// resolveVolumeMounts converts proto VolumeMounts to host-path-based mount +// configs. The host path is {ateompath.StorageVolumesPath}/{volumeName}, +// which is where the WorkerPool controller mounted the volume in the ateom +// container. The ${ACTOR_ID} placeholder in SubPath is replaced with the +// actor's actual ID. +func resolveVolumeMounts(protoMounts []*ateletpb.VolumeMount, actorID string) []VolumeMountConfig { + if len(protoMounts) == 0 { + return nil + } + mounts := make([]VolumeMountConfig, 0, len(protoMounts)) + for _, m := range protoMounts { + hostPath := filepath.Join(ateompath.StorageVolumesPath, m.GetName()) + subPath := m.GetSubPath() + subPath = strings.ReplaceAll(subPath, "${ACTOR_ID}", actorID) + mounts = append(mounts, VolumeMountConfig{ + Name: m.GetName(), + MountPath: m.GetMountPath(), + HostPath: hostPath, + SubPath: subPath, + ReadOnly: m.GetReadOnly(), + }) + } + return mounts +} + // dialAteom opens (or reuses) the gRPC connection to the target ateom // pod and returns an ateom client. func (s *AteomHerder) dialAteom(ctx context.Context, targetAteomUid string) (ateompb.AteomClient, error) { diff --git a/cmd/atelet/oci.go b/cmd/atelet/oci.go index 4451f6362..af6975b51 100644 --- a/cmd/atelet/oci.go +++ b/cmd/atelet/oci.go @@ -51,7 +51,19 @@ const ( ActorIDFileName = "actor-id" ) -func prepareOCIDirectory(ctx context.Context, pullCache *memorypullcache.MemoryPullCache, actorTemplateNamespace, actorTemplateName, actorID, containerName, ref string, args []string, env []string, annotations map[string]string, netns string, identityDir string) error { +// VolumeMountConfig holds the resolved configuration for a single volume +// mount to be added to an actor container's OCI spec. HostPath is the +// absolute path on the host (inside the ateom container) where the storage +// volume is mounted. +type VolumeMountConfig struct { + Name string + MountPath string + HostPath string + SubPath string + ReadOnly bool +} + +func prepareOCIDirectory(ctx context.Context, pullCache *memorypullcache.MemoryPullCache, actorTemplateNamespace, actorTemplateName, actorID, containerName, ref string, args []string, env []string, annotations map[string]string, netns string, identityDir string, volumeMounts []VolumeMountConfig) error { tracer := otel.Tracer("prepareOCIDirectory") ctx, span := tracer.Start(ctx, "prepareOCIDirectory") @@ -88,7 +100,14 @@ func prepareOCIDirectory(ctx context.Context, pullCache *memorypullcache.MemoryP } } - ociSpec := buildActorOCISpec(args, env, annotations, netns, identityDir) + // Create mount point directories in rootfs for each storage volume mount. + for _, vm := range volumeMounts { + if err := createMountPoint(rootPath, vm.MountPath); err != nil { + return fmt.Errorf("while creating volume mount point %q: %w", vm.MountPath, err) + } + } + + ociSpec := buildActorOCISpec(args, env, annotations, netns, identityDir, volumeMounts) ociSpecBytes, err := json.MarshalIndent(ociSpec, "", " ") if err != nil { return fmt.Errorf("while marshaling OCI spec: %w", err) @@ -105,7 +124,8 @@ func prepareOCIDirectory(ctx context.Context, pullCache *memorypullcache.MemoryP // When identityDir is non-empty it adds a read-only bind mount of that host // directory at IdentityMountPath so the actor can read its own ID (see // IdentityMountPath for why this is a bind mount rather than env vars). -func buildActorOCISpec(args []string, env []string, annotations map[string]string, netns string, identityDir string) *specs.Spec { +// volumeMounts are additional bind mounts from WorkerPool storage volumes. +func buildActorOCISpec(args []string, env []string, annotations map[string]string, netns string, identityDir string, volumeMounts []VolumeMountConfig) *specs.Spec { envVars := []string{ "PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin", } @@ -149,6 +169,24 @@ func buildActorOCISpec(args []string, env []string, annotations map[string]strin }) } + // Add storage volume bind mounts from WorkerPool. + for _, vm := range volumeMounts { + options := []string{"bind"} + if vm.ReadOnly { + options = append(options, "ro") + } + source := vm.HostPath + if vm.SubPath != "" { + source = filepath.Join(source, vm.SubPath) + } + mounts = append(mounts, specs.Mount{ + Destination: vm.MountPath, + Type: "bind", + Source: source, + Options: options, + }) + } + return &specs.Spec{ Process: &specs.Process{ User: specs.User{ diff --git a/cmd/atelet/oci_test.go b/cmd/atelet/oci_test.go index 5b53f2520..e2bda6ad3 100644 --- a/cmd/atelet/oci_test.go +++ b/cmd/atelet/oci_test.go @@ -89,6 +89,7 @@ func TestBuildActorOCISpec_IdentityMount(t *testing.T) { map[string]string{"k": "v"}, "/run/netns/x", "/host/actors/ns:tmpl:id/identity", + nil, ) found := false for _, m := range spec.Mounts { @@ -113,7 +114,7 @@ func TestBuildActorOCISpec_IdentityMount(t *testing.T) { // Without an identity dir (the pause container), no identity mount appears. func TestBuildActorOCISpec_NoIdentityMountForPause(t *testing.T) { - bare := buildActorOCISpec([]string{"/pause"}, nil, nil, "/run/netns/x", "") + bare := buildActorOCISpec([]string{"/pause"}, nil, nil, "/run/netns/x", "", nil) for _, m := range bare.Mounts { if m.Destination == IdentityMountPath { t.Errorf("identity mount must be absent when identityDir is empty") @@ -526,3 +527,172 @@ func TestUntar_TruncatedArchive(t *testing.T) { t.Errorf("error = %v, want it to surface the underlying tar/copy error", err) } } + +// --- Volume mount tests --- + +func TestBuildActorOCISpec_VolumeMounts(t *testing.T) { + volumeMounts := []VolumeMountConfig{ + { + Name: "shared-data", + MountPath: "/workspace", + HostPath: "/var/lib/ateom-storage/shared-data", + ReadOnly: false, + }, + { + Name: "model-cache", + MountPath: "/models", + HostPath: "/var/lib/ateom-storage/model-cache", + ReadOnly: true, + }, + } + + spec := buildActorOCISpec( + []string{"/app"}, + nil, + nil, + "/run/netns/test", + "/var/lib/ateom-gvisor/actors/ns:tmpl:id/identity", + volumeMounts, + ) + + // 4 standard + 1 identity + 2 volume mounts = 7 + if got := len(spec.Mounts); got != 7 { + t.Fatalf("mount count = %d, want 7; mounts: %+v", got, spec.Mounts) + } + + // Check shared-data mount (read-write) + sharedDataMount := spec.Mounts[5] + if sharedDataMount.Destination != "/workspace" { + t.Errorf("shared-data mount destination = %q, want %q", sharedDataMount.Destination, "/workspace") + } + if sharedDataMount.Source != "/var/lib/ateom-storage/shared-data" { + t.Errorf("shared-data mount source = %q, want %q", sharedDataMount.Source, "/var/lib/ateom-storage/shared-data") + } + if sharedDataMount.Type != "bind" { + t.Errorf("shared-data mount type = %q, want %q", sharedDataMount.Type, "bind") + } + if slices.Contains(sharedDataMount.Options, "ro") { + t.Errorf("shared-data mount should NOT be read-only, options=%v", sharedDataMount.Options) + } + + // Check model-cache mount (read-only) + modelCacheMount := spec.Mounts[6] + if modelCacheMount.Destination != "/models" { + t.Errorf("model-cache mount destination = %q, want %q", modelCacheMount.Destination, "/models") + } + if !slices.Contains(modelCacheMount.Options, "ro") { + t.Errorf("model-cache mount must be read-only, options=%v", modelCacheMount.Options) + } +} + +func TestBuildActorOCISpec_VolumeMountWithSubPath(t *testing.T) { + volumeMounts := []VolumeMountConfig{ + { + Name: "shared-data", + MountPath: "/workspace", + HostPath: "/var/lib/ateom-storage/shared-data", + SubPath: "actor-123/workspace", + ReadOnly: false, + }, + } + + spec := buildActorOCISpec( + []string{"/app"}, + nil, + nil, + "/run/netns/test", + "", + volumeMounts, + ) + + // 4 standard + 1 volume mount = 5 + mount := spec.Mounts[4] + wantSource := "/var/lib/ateom-storage/shared-data/actor-123/workspace" + if mount.Source != wantSource { + t.Errorf("mount source = %q, want %q", mount.Source, wantSource) + } + if mount.Destination != "/workspace" { + t.Errorf("mount destination = %q, want %q", mount.Destination, "/workspace") + } +} + +func TestBuildActorOCISpec_NoVolumeMounts(t *testing.T) { + spec := buildActorOCISpec( + []string{"/app"}, + nil, + nil, + "/run/netns/test", + "", + nil, + ) + + // 4 standard mounts only + if got := len(spec.Mounts); got != 4 { + t.Errorf("mount count = %d, want 4", got) + } + for _, m := range spec.Mounts { + if m.Destination == "/workspace" || m.Destination == "/models" { + t.Errorf("unexpected volume mount at %q", m.Destination) + } + } +} + +func TestBuildActorOCISpec_MountOrder(t *testing.T) { + volumeMounts := []VolumeMountConfig{ + {Name: "vol1", MountPath: "/a", HostPath: "/host/a"}, + {Name: "vol2", MountPath: "/b", HostPath: "/host/b"}, + } + + spec := buildActorOCISpec( + []string{"/app"}, + nil, + nil, + "/run/netns/test", + "/identity/dir", + volumeMounts, + ) + + // Expected order: proc, dev, sys, resolv.conf, identity, vol1, vol2 + wantDests := []string{ + "/proc", "/dev", "/sys", "/etc/resolv.conf", + IdentityMountPath, "/a", "/b", + } + + if len(spec.Mounts) != len(wantDests) { + t.Fatalf("mount count = %d, want %d", len(spec.Mounts), len(wantDests)) + } + + for i, want := range wantDests { + if spec.Mounts[i].Destination != want { + t.Errorf("mount[%d].Destination = %q, want %q", i, spec.Mounts[i].Destination, want) + } + } +} + +func TestCreateMountPoint_VolumePath(t *testing.T) { + root := t.TempDir() + if err := createMountPoint(root, "/workspace"); err != nil { + t.Fatalf("createMountPoint: %v", err) + } + info, err := os.Stat(filepath.Join(root, "workspace")) + if err != nil { + t.Fatalf("mount point not created: %v", err) + } + if !info.IsDir() { + t.Errorf("mount point must be a directory") + } +} + +func TestCreateMountPoint_NestedVolumePath(t *testing.T) { + root := t.TempDir() + if err := createMountPoint(root, "/data/nested/path"); err != nil { + t.Fatalf("createMountPoint: %v", err) + } + info, err := os.Stat(filepath.Join(root, "data", "nested", "path")) + if err != nil { + t.Fatalf("nested mount point not created: %v", err) + } + if !info.IsDir() { + t.Errorf("nested mount point must be a directory") + } +} diff --git a/internal/ateompath/ateompath.go b/internal/ateompath/ateompath.go index a779755c4..a611f918f 100644 --- a/internal/ateompath/ateompath.go +++ b/internal/ateompath/ateompath.go @@ -23,6 +23,11 @@ const ( // The base path. This is both the path of the root shared folder on the // host filesystem, and when it is mounted into ateom and atelet containers. BasePath = "/var/lib/ateom-gvisor" + + // StorageVolumesPath is the base directory inside the ateom container where + // WorkerPool storage volumes are mounted. Each named volume from the + // WorkerPool's storageVolumes appears as a subdirectory here. + StorageVolumesPath = "/var/lib/ateom-storage" ) var ( diff --git a/internal/proto/ateletpb/atelet.pb.go b/internal/proto/ateletpb/atelet.pb.go index 03b0fe750..b88007e6a 100644 --- a/internal/proto/ateletpb/atelet.pb.go +++ b/internal/proto/ateletpb/atelet.pb.go @@ -332,18 +332,89 @@ func (x *SandboxAssets) GetAssets() map[string]*ArchAssets { return nil } +// VolumeMount describes a single storage volume mount for an actor container. +// The volume name references a storage volume defined on the WorkerPool. +type VolumeMount struct { + state protoimpl.MessageState `protogen:"open.v1"` + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` // References a WorkerPool storageVolume name + MountPath string `protobuf:"bytes,2,opt,name=mount_path,json=mountPath,proto3" json:"mount_path,omitempty"` // Absolute path inside the container + ReadOnly bool `protobuf:"varint,3,opt,name=read_only,json=readOnly,proto3" json:"read_only,omitempty"` // Mount as read-only + SubPath string `protobuf:"bytes,4,opt,name=sub_path,json=subPath,proto3" json:"sub_path,omitempty"` // Optional sub-directory within the volume + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *VolumeMount) Reset() { + *x = VolumeMount{} + mi := &file_atelet_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *VolumeMount) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*VolumeMount) ProtoMessage() {} + +func (x *VolumeMount) ProtoReflect() protoreflect.Message { + mi := &file_atelet_proto_msgTypes[4] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use VolumeMount.ProtoReflect.Descriptor instead. +func (*VolumeMount) Descriptor() ([]byte, []int) { + return file_atelet_proto_rawDescGZIP(), []int{4} +} + +func (x *VolumeMount) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *VolumeMount) GetMountPath() string { + if x != nil { + return x.MountPath + } + return "" +} + +func (x *VolumeMount) GetReadOnly() bool { + if x != nil { + return x.ReadOnly + } + return false +} + +func (x *VolumeMount) GetSubPath() string { + if x != nil { + return x.SubPath + } + return "" +} + // WorkloadSpec parallels Pod, but with far fewer configurable fields. type WorkloadSpec struct { state protoimpl.MessageState `protogen:"open.v1"` Containers []*Container `protobuf:"bytes,1,rep,name=containers,proto3" json:"containers,omitempty"` PauseImage string `protobuf:"bytes,2,opt,name=pause_image,json=pauseImage,proto3" json:"pause_image,omitempty"` + VolumeMounts []*VolumeMount `protobuf:"bytes,3,rep,name=volume_mounts,json=volumeMounts,proto3" json:"volume_mounts,omitempty"` // Storage volume mounts unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } func (x *WorkloadSpec) Reset() { *x = WorkloadSpec{} - mi := &file_atelet_proto_msgTypes[4] + mi := &file_atelet_proto_msgTypes[5] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -355,7 +426,7 @@ func (x *WorkloadSpec) String() string { func (*WorkloadSpec) ProtoMessage() {} func (x *WorkloadSpec) ProtoReflect() protoreflect.Message { - mi := &file_atelet_proto_msgTypes[4] + mi := &file_atelet_proto_msgTypes[5] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -368,7 +439,7 @@ func (x *WorkloadSpec) ProtoReflect() protoreflect.Message { // Deprecated: Use WorkloadSpec.ProtoReflect.Descriptor instead. func (*WorkloadSpec) Descriptor() ([]byte, []int) { - return file_atelet_proto_rawDescGZIP(), []int{4} + return file_atelet_proto_rawDescGZIP(), []int{5} } func (x *WorkloadSpec) GetContainers() []*Container { @@ -385,6 +456,13 @@ func (x *WorkloadSpec) GetPauseImage() string { return "" } +func (x *WorkloadSpec) GetVolumeMounts() []*VolumeMount { + if x != nil { + return x.VolumeMounts + } + return nil +} + type Container struct { state protoimpl.MessageState `protogen:"open.v1"` Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` @@ -397,7 +475,7 @@ type Container struct { func (x *Container) Reset() { *x = Container{} - mi := &file_atelet_proto_msgTypes[5] + mi := &file_atelet_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -409,7 +487,7 @@ func (x *Container) String() string { func (*Container) ProtoMessage() {} func (x *Container) ProtoReflect() protoreflect.Message { - mi := &file_atelet_proto_msgTypes[5] + mi := &file_atelet_proto_msgTypes[6] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -422,7 +500,7 @@ func (x *Container) ProtoReflect() protoreflect.Message { // Deprecated: Use Container.ProtoReflect.Descriptor instead. func (*Container) Descriptor() ([]byte, []int) { - return file_atelet_proto_rawDescGZIP(), []int{5} + return file_atelet_proto_rawDescGZIP(), []int{6} } func (x *Container) GetName() string { @@ -463,7 +541,7 @@ type EnvEntry struct { func (x *EnvEntry) Reset() { *x = EnvEntry{} - mi := &file_atelet_proto_msgTypes[6] + mi := &file_atelet_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -475,7 +553,7 @@ func (x *EnvEntry) String() string { func (*EnvEntry) ProtoMessage() {} func (x *EnvEntry) ProtoReflect() protoreflect.Message { - mi := &file_atelet_proto_msgTypes[6] + mi := &file_atelet_proto_msgTypes[7] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -488,7 +566,7 @@ func (x *EnvEntry) ProtoReflect() protoreflect.Message { // Deprecated: Use EnvEntry.ProtoReflect.Descriptor instead. func (*EnvEntry) Descriptor() ([]byte, []int) { - return file_atelet_proto_rawDescGZIP(), []int{6} + return file_atelet_proto_rawDescGZIP(), []int{7} } func (x *EnvEntry) GetName() string { @@ -513,7 +591,7 @@ type RunResponse struct { func (x *RunResponse) Reset() { *x = RunResponse{} - mi := &file_atelet_proto_msgTypes[7] + mi := &file_atelet_proto_msgTypes[8] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -525,7 +603,7 @@ func (x *RunResponse) String() string { func (*RunResponse) ProtoMessage() {} func (x *RunResponse) ProtoReflect() protoreflect.Message { - mi := &file_atelet_proto_msgTypes[7] + mi := &file_atelet_proto_msgTypes[8] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -538,7 +616,7 @@ func (x *RunResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use RunResponse.ProtoReflect.Descriptor instead. func (*RunResponse) Descriptor() ([]byte, []int) { - return file_atelet_proto_rawDescGZIP(), []int{7} + return file_atelet_proto_rawDescGZIP(), []int{8} } type LocalCheckpointConfiguration struct { @@ -552,7 +630,7 @@ type LocalCheckpointConfiguration struct { func (x *LocalCheckpointConfiguration) Reset() { *x = LocalCheckpointConfiguration{} - mi := &file_atelet_proto_msgTypes[8] + mi := &file_atelet_proto_msgTypes[9] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -564,7 +642,7 @@ func (x *LocalCheckpointConfiguration) String() string { func (*LocalCheckpointConfiguration) ProtoMessage() {} func (x *LocalCheckpointConfiguration) ProtoReflect() protoreflect.Message { - mi := &file_atelet_proto_msgTypes[8] + mi := &file_atelet_proto_msgTypes[9] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -577,7 +655,7 @@ func (x *LocalCheckpointConfiguration) ProtoReflect() protoreflect.Message { // Deprecated: Use LocalCheckpointConfiguration.ProtoReflect.Descriptor instead. func (*LocalCheckpointConfiguration) Descriptor() ([]byte, []int) { - return file_atelet_proto_rawDescGZIP(), []int{8} + return file_atelet_proto_rawDescGZIP(), []int{9} } func (x *LocalCheckpointConfiguration) GetSnapshotPrefix() string { @@ -606,7 +684,7 @@ type ExternalCheckpointConfiguration struct { func (x *ExternalCheckpointConfiguration) Reset() { *x = ExternalCheckpointConfiguration{} - mi := &file_atelet_proto_msgTypes[9] + mi := &file_atelet_proto_msgTypes[10] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -618,7 +696,7 @@ func (x *ExternalCheckpointConfiguration) String() string { func (*ExternalCheckpointConfiguration) ProtoMessage() {} func (x *ExternalCheckpointConfiguration) ProtoReflect() protoreflect.Message { - mi := &file_atelet_proto_msgTypes[9] + mi := &file_atelet_proto_msgTypes[10] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -631,7 +709,7 @@ func (x *ExternalCheckpointConfiguration) ProtoReflect() protoreflect.Message { // Deprecated: Use ExternalCheckpointConfiguration.ProtoReflect.Descriptor instead. func (*ExternalCheckpointConfiguration) Descriptor() ([]byte, []int) { - return file_atelet_proto_rawDescGZIP(), []int{9} + return file_atelet_proto_rawDescGZIP(), []int{10} } func (x *ExternalCheckpointConfiguration) GetSnapshotUriPrefix() string { @@ -662,7 +740,7 @@ type CheckpointRequest struct { func (x *CheckpointRequest) Reset() { *x = CheckpointRequest{} - mi := &file_atelet_proto_msgTypes[10] + mi := &file_atelet_proto_msgTypes[11] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -674,7 +752,7 @@ func (x *CheckpointRequest) String() string { func (*CheckpointRequest) ProtoMessage() {} func (x *CheckpointRequest) ProtoReflect() protoreflect.Message { - mi := &file_atelet_proto_msgTypes[10] + mi := &file_atelet_proto_msgTypes[11] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -687,7 +765,7 @@ func (x *CheckpointRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use CheckpointRequest.ProtoReflect.Descriptor instead. func (*CheckpointRequest) Descriptor() ([]byte, []int) { - return file_atelet_proto_rawDescGZIP(), []int{10} + return file_atelet_proto_rawDescGZIP(), []int{11} } func (x *CheckpointRequest) GetTargetAteomUid() string { @@ -781,7 +859,7 @@ type CheckpointResponse struct { func (x *CheckpointResponse) Reset() { *x = CheckpointResponse{} - mi := &file_atelet_proto_msgTypes[11] + mi := &file_atelet_proto_msgTypes[12] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -793,7 +871,7 @@ func (x *CheckpointResponse) String() string { func (*CheckpointResponse) ProtoMessage() {} func (x *CheckpointResponse) ProtoReflect() protoreflect.Message { - mi := &file_atelet_proto_msgTypes[11] + mi := &file_atelet_proto_msgTypes[12] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -806,7 +884,7 @@ func (x *CheckpointResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use CheckpointResponse.ProtoReflect.Descriptor instead. func (*CheckpointResponse) Descriptor() ([]byte, []int) { - return file_atelet_proto_rawDescGZIP(), []int{11} + return file_atelet_proto_rawDescGZIP(), []int{12} } type RestoreRequest struct { @@ -830,7 +908,7 @@ type RestoreRequest struct { func (x *RestoreRequest) Reset() { *x = RestoreRequest{} - mi := &file_atelet_proto_msgTypes[12] + mi := &file_atelet_proto_msgTypes[13] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -842,7 +920,7 @@ func (x *RestoreRequest) String() string { func (*RestoreRequest) ProtoMessage() {} func (x *RestoreRequest) ProtoReflect() protoreflect.Message { - mi := &file_atelet_proto_msgTypes[12] + mi := &file_atelet_proto_msgTypes[13] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -855,7 +933,7 @@ func (x *RestoreRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use RestoreRequest.ProtoReflect.Descriptor instead. func (*RestoreRequest) Descriptor() ([]byte, []int) { - return file_atelet_proto_rawDescGZIP(), []int{12} + return file_atelet_proto_rawDescGZIP(), []int{13} } func (x *RestoreRequest) GetTargetAteomUid() string { @@ -949,7 +1027,7 @@ type RestoreResponse struct { func (x *RestoreResponse) Reset() { *x = RestoreResponse{} - mi := &file_atelet_proto_msgTypes[13] + mi := &file_atelet_proto_msgTypes[14] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -961,7 +1039,7 @@ func (x *RestoreResponse) String() string { func (*RestoreResponse) ProtoMessage() {} func (x *RestoreResponse) ProtoReflect() protoreflect.Message { - mi := &file_atelet_proto_msgTypes[13] + mi := &file_atelet_proto_msgTypes[14] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -974,7 +1052,7 @@ func (x *RestoreResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use RestoreResponse.ProtoReflect.Descriptor instead. func (*RestoreResponse) Descriptor() ([]byte, []int) { - return file_atelet_proto_rawDescGZIP(), []int{13} + return file_atelet_proto_rawDescGZIP(), []int{14} } var File_atelet_proto protoreflect.FileDescriptor @@ -1005,13 +1083,20 @@ const file_atelet_proto_rawDesc = "" + "\x06assets\x18\x02 \x03(\v2!.atelet.SandboxAssets.AssetsEntryR\x06assets\x1aM\n" + "\vAssetsEntry\x12\x10\n" + "\x03key\x18\x01 \x01(\tR\x03key\x12(\n" + - "\x05value\x18\x02 \x01(\v2\x12.atelet.ArchAssetsR\x05value:\x028\x01\"b\n" + + "\x05value\x18\x02 \x01(\v2\x12.atelet.ArchAssetsR\x05value:\x028\x01\"x\n" + + "\vVolumeMount\x12\x12\n" + + "\x04name\x18\x01 \x01(\tR\x04name\x12\x1d\n" + + "\n" + + "mount_path\x18\x02 \x01(\tR\tmountPath\x12\x1b\n" + + "\tread_only\x18\x03 \x01(\bR\breadOnly\x12\x19\n" + + "\bsub_path\x18\x04 \x01(\tR\asubPath\"\x9c\x01\n" + "\fWorkloadSpec\x121\n" + "\n" + "containers\x18\x01 \x03(\v2\x11.atelet.ContainerR\n" + "containers\x12\x1f\n" + "\vpause_image\x18\x02 \x01(\tR\n" + - "pauseImage\"s\n" + + "pauseImage\x128\n" + + "\rvolume_mounts\x18\x03 \x03(\v2\x13.atelet.VolumeMountR\fvolumeMounts\"s\n" + "\tContainer\x12\x12\n" + "\x04name\x18\x01 \x01(\tR\x04name\x12\x14\n" + "\x05image\x18\x02 \x01(\tR\x05image\x12\x18\n" + @@ -1072,54 +1157,56 @@ func file_atelet_proto_rawDescGZIP() []byte { } var file_atelet_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_atelet_proto_msgTypes = make([]protoimpl.MessageInfo, 16) +var file_atelet_proto_msgTypes = make([]protoimpl.MessageInfo, 17) var file_atelet_proto_goTypes = []any{ (CheckpointType)(0), // 0: atelet.CheckpointType (*RunRequest)(nil), // 1: atelet.RunRequest (*AssetFile)(nil), // 2: atelet.AssetFile (*ArchAssets)(nil), // 3: atelet.ArchAssets (*SandboxAssets)(nil), // 4: atelet.SandboxAssets - (*WorkloadSpec)(nil), // 5: atelet.WorkloadSpec - (*Container)(nil), // 6: atelet.Container - (*EnvEntry)(nil), // 7: atelet.EnvEntry - (*RunResponse)(nil), // 8: atelet.RunResponse - (*LocalCheckpointConfiguration)(nil), // 9: atelet.LocalCheckpointConfiguration - (*ExternalCheckpointConfiguration)(nil), // 10: atelet.ExternalCheckpointConfiguration - (*CheckpointRequest)(nil), // 11: atelet.CheckpointRequest - (*CheckpointResponse)(nil), // 12: atelet.CheckpointResponse - (*RestoreRequest)(nil), // 13: atelet.RestoreRequest - (*RestoreResponse)(nil), // 14: atelet.RestoreResponse - nil, // 15: atelet.ArchAssets.FilesEntry - nil, // 16: atelet.SandboxAssets.AssetsEntry + (*VolumeMount)(nil), // 5: atelet.VolumeMount + (*WorkloadSpec)(nil), // 6: atelet.WorkloadSpec + (*Container)(nil), // 7: atelet.Container + (*EnvEntry)(nil), // 8: atelet.EnvEntry + (*RunResponse)(nil), // 9: atelet.RunResponse + (*LocalCheckpointConfiguration)(nil), // 10: atelet.LocalCheckpointConfiguration + (*ExternalCheckpointConfiguration)(nil), // 11: atelet.ExternalCheckpointConfiguration + (*CheckpointRequest)(nil), // 12: atelet.CheckpointRequest + (*CheckpointResponse)(nil), // 13: atelet.CheckpointResponse + (*RestoreRequest)(nil), // 14: atelet.RestoreRequest + (*RestoreResponse)(nil), // 15: atelet.RestoreResponse + nil, // 16: atelet.ArchAssets.FilesEntry + nil, // 17: atelet.SandboxAssets.AssetsEntry } var file_atelet_proto_depIdxs = []int32{ - 5, // 0: atelet.RunRequest.spec:type_name -> atelet.WorkloadSpec + 6, // 0: atelet.RunRequest.spec:type_name -> atelet.WorkloadSpec 4, // 1: atelet.RunRequest.sandbox_assets:type_name -> atelet.SandboxAssets - 15, // 2: atelet.ArchAssets.files:type_name -> atelet.ArchAssets.FilesEntry - 16, // 3: atelet.SandboxAssets.assets:type_name -> atelet.SandboxAssets.AssetsEntry - 6, // 4: atelet.WorkloadSpec.containers:type_name -> atelet.Container - 7, // 5: atelet.Container.env:type_name -> atelet.EnvEntry - 5, // 6: atelet.CheckpointRequest.spec:type_name -> atelet.WorkloadSpec - 0, // 7: atelet.CheckpointRequest.type:type_name -> atelet.CheckpointType - 9, // 8: atelet.CheckpointRequest.local_config:type_name -> atelet.LocalCheckpointConfiguration - 10, // 9: atelet.CheckpointRequest.external_config:type_name -> atelet.ExternalCheckpointConfiguration - 5, // 10: atelet.RestoreRequest.spec:type_name -> atelet.WorkloadSpec - 0, // 11: atelet.RestoreRequest.type:type_name -> atelet.CheckpointType - 9, // 12: atelet.RestoreRequest.local_config:type_name -> atelet.LocalCheckpointConfiguration - 10, // 13: atelet.RestoreRequest.external_config:type_name -> atelet.ExternalCheckpointConfiguration - 2, // 14: atelet.ArchAssets.FilesEntry.value:type_name -> atelet.AssetFile - 3, // 15: atelet.SandboxAssets.AssetsEntry.value:type_name -> atelet.ArchAssets - 1, // 16: atelet.AteomHerder.Run:input_type -> atelet.RunRequest - 11, // 17: atelet.AteomHerder.Checkpoint:input_type -> atelet.CheckpointRequest - 13, // 18: atelet.AteomHerder.Restore:input_type -> atelet.RestoreRequest - 8, // 19: atelet.AteomHerder.Run:output_type -> atelet.RunResponse - 12, // 20: atelet.AteomHerder.Checkpoint:output_type -> atelet.CheckpointResponse - 14, // 21: atelet.AteomHerder.Restore:output_type -> atelet.RestoreResponse - 19, // [19:22] is the sub-list for method output_type - 16, // [16:19] is the sub-list for method input_type - 16, // [16:16] is the sub-list for extension type_name - 16, // [16:16] is the sub-list for extension extendee - 0, // [0:16] is the sub-list for field type_name + 16, // 2: atelet.ArchAssets.files:type_name -> atelet.ArchAssets.FilesEntry + 17, // 3: atelet.SandboxAssets.assets:type_name -> atelet.SandboxAssets.AssetsEntry + 7, // 4: atelet.WorkloadSpec.containers:type_name -> atelet.Container + 5, // 5: atelet.WorkloadSpec.volume_mounts:type_name -> atelet.VolumeMount + 8, // 6: atelet.Container.env:type_name -> atelet.EnvEntry + 6, // 7: atelet.CheckpointRequest.spec:type_name -> atelet.WorkloadSpec + 0, // 8: atelet.CheckpointRequest.type:type_name -> atelet.CheckpointType + 10, // 9: atelet.CheckpointRequest.local_config:type_name -> atelet.LocalCheckpointConfiguration + 11, // 10: atelet.CheckpointRequest.external_config:type_name -> atelet.ExternalCheckpointConfiguration + 6, // 11: atelet.RestoreRequest.spec:type_name -> atelet.WorkloadSpec + 0, // 12: atelet.RestoreRequest.type:type_name -> atelet.CheckpointType + 10, // 13: atelet.RestoreRequest.local_config:type_name -> atelet.LocalCheckpointConfiguration + 11, // 14: atelet.RestoreRequest.external_config:type_name -> atelet.ExternalCheckpointConfiguration + 2, // 15: atelet.ArchAssets.FilesEntry.value:type_name -> atelet.AssetFile + 3, // 16: atelet.SandboxAssets.AssetsEntry.value:type_name -> atelet.ArchAssets + 1, // 17: atelet.AteomHerder.Run:input_type -> atelet.RunRequest + 12, // 18: atelet.AteomHerder.Checkpoint:input_type -> atelet.CheckpointRequest + 14, // 19: atelet.AteomHerder.Restore:input_type -> atelet.RestoreRequest + 9, // 20: atelet.AteomHerder.Run:output_type -> atelet.RunResponse + 13, // 21: atelet.AteomHerder.Checkpoint:output_type -> atelet.CheckpointResponse + 15, // 22: atelet.AteomHerder.Restore:output_type -> atelet.RestoreResponse + 20, // [20:23] is the sub-list for method output_type + 17, // [17:20] is the sub-list for method input_type + 17, // [17:17] is the sub-list for extension type_name + 17, // [17:17] is the sub-list for extension extendee + 0, // [0:17] is the sub-list for field type_name } func init() { file_atelet_proto_init() } @@ -1127,11 +1214,11 @@ func file_atelet_proto_init() { if File_atelet_proto != nil { return } - file_atelet_proto_msgTypes[10].OneofWrappers = []any{ + file_atelet_proto_msgTypes[11].OneofWrappers = []any{ (*CheckpointRequest_LocalConfig)(nil), (*CheckpointRequest_ExternalConfig)(nil), } - file_atelet_proto_msgTypes[12].OneofWrappers = []any{ + file_atelet_proto_msgTypes[13].OneofWrappers = []any{ (*RestoreRequest_LocalConfig)(nil), (*RestoreRequest_ExternalConfig)(nil), } @@ -1141,7 +1228,7 @@ func file_atelet_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_atelet_proto_rawDesc), len(file_atelet_proto_rawDesc)), NumEnums: 1, - NumMessages: 16, + NumMessages: 17, NumExtensions: 0, NumServices: 1, }, diff --git a/internal/proto/ateletpb/atelet.proto b/internal/proto/ateletpb/atelet.proto index d210656d6..a965888ec 100644 --- a/internal/proto/ateletpb/atelet.proto +++ b/internal/proto/ateletpb/atelet.proto @@ -73,10 +73,20 @@ message SandboxAssets { map assets = 2; // arch -> {name -> file} } +// VolumeMount describes a single storage volume mount for an actor container. +// The volume name references a storage volume defined on the WorkerPool. +message VolumeMount { + string name = 1; // References a WorkerPool storageVolume name + string mount_path = 2; // Absolute path inside the container + bool read_only = 3; // Mount as read-only + string sub_path = 4; // Optional sub-directory within the volume +} + // WorkloadSpec parallels Pod, but with far fewer configurable fields. message WorkloadSpec { - repeated Container containers = 1; - string pause_image = 2; + repeated Container containers = 1; + string pause_image = 2; + repeated VolumeMount volume_mounts = 3; // Storage volume mounts } message Container { diff --git a/internal/resources/validate.go b/internal/resources/validate.go index 3855fe856..147fdcf97 100644 --- a/internal/resources/validate.go +++ b/internal/resources/validate.go @@ -136,3 +136,19 @@ func ValidateSnapshotURIPrefix(prefix string) error { } return nil } + +// ValidateVolumeMountPath ensures a volume mount path is an absolute path that +// does not contain traversal sequences. The path is used as the mount +// destination inside the OCI bundle rootfs, so it must be safe. +func ValidateVolumeMountPath(mountPath string) error { + if mountPath == "" { + return fmt.Errorf("volume mount path must not be empty") + } + if mountPath[0] != '/' { + return fmt.Errorf("volume mount path %q must be absolute", mountPath) + } + if strings.Contains(mountPath, "..") { + return fmt.Errorf("volume mount path %q must not contain '..'", mountPath) + } + return nil +} diff --git a/manifests/ate-install/generated/ate.dev_actortemplates.yaml b/manifests/ate-install/generated/ate.dev_actortemplates.yaml index 4fcb59ac2..f81b1bf80 100644 --- a/manifests/ate-install/generated/ate.dev_actortemplates.yaml +++ b/manifests/ate-install/generated/ate.dev_actortemplates.yaml @@ -177,6 +177,46 @@ spec: required: - location type: object + volumes: + description: |- + Volumes defines storage volumes available to this actor's containers. + Each volume references a name defined in the assigned WorkerPool's + storageVolumes. The actor cannot request storage the pool doesn't provide. + items: + description: |- + ActorVolume maps a WorkerPool storage volume into the actor's containers. + The name must match a volume defined in the assigned WorkerPool's + storageVolumes field. + properties: + mountPath: + description: |- + MountPath is the absolute path inside the actor's containers where + the volume will be mounted. + pattern: ^/ + type: string + name: + description: Name of the volume (must match a WorkerPool storageVolume + name). + minLength: 1 + type: string + readOnly: + description: ReadOnly forces the volume to be read-only inside + the actor. + type: boolean + subPath: + description: |- + SubPath is an optional sub-directory within the volume to mount + instead of the whole volume. The literal "${ACTOR_ID}" is replaced + with the actor's ID at mount time, enabling per-actor subdirectories + on shared storage. + type: string + required: + - mountPath + - name + type: object + maxItems: 16 + type: array + x-kubernetes-list-type: atomic workerSelector: description: |- WorkerSelector restricts which worker pools actors from this template may diff --git a/manifests/ate-install/generated/ate.dev_workerpools.yaml b/manifests/ate-install/generated/ate.dev_workerpools.yaml index 5714891ab..5f0fdb2cd 100644 --- a/manifests/ate-install/generated/ate.dev_workerpools.yaml +++ b/manifests/ate-install/generated/ate.dev_workerpools.yaml @@ -94,6 +94,91 @@ spec: this pool's SandboxClass. If empty, the default SandboxConfig for the SandboxClass is used. type: string + storageVolumes: + description: |- + StorageVolumes defines network or persistent storage volumes that will be + mounted into each worker pod. Actors can reference these by name via their + ActorTemplate's volumes field. + items: + description: |- + StorageVolume describes a network or persistent storage volume to be mounted + into each worker pod. Actors reference these by name via their ActorTemplate's + Volumes field. Exactly one source (NFS, PersistentVolumeClaim, or HostPath) + must be specified. + properties: + hostPath: + description: |- + HostPath represents a pre-existing host path (e.g., NFS already + mounted at a well-known path on all nodes). + properties: + path: + description: |- + path of the directory on the host. + If the path is a symlink, it will follow the link to the real path. + More info: https://kubernetes.io/docs/concepts/storage/volumes#hostpath + type: string + type: + description: |- + type for HostPath Volume + Defaults to "" + More info: https://kubernetes.io/docs/concepts/storage/volumes#hostpath + type: string + required: + - path + type: object + name: + description: Name of the volume. Referenced by ActorTemplate + volume entries. + maxLength: 63 + minLength: 1 + pattern: ^[a-zA-Z][a-zA-Z0-9-]*$ + type: string + nfs: + description: NFS represents an NFS mount on the host. + properties: + path: + description: |- + path that is exported by the NFS server. + More info: https://kubernetes.io/docs/concepts/storage/volumes#nfs + type: string + readOnly: + description: |- + readOnly here will force the NFS export to be mounted with read-only permissions. + Defaults to false. + More info: https://kubernetes.io/docs/concepts/storage/volumes#nfs + type: boolean + server: + description: |- + server is the hostname or IP address of the NFS server. + More info: https://kubernetes.io/docs/concepts/storage/volumes#nfs + type: string + required: + - path + - server + type: object + persistentVolumeClaim: + description: PersistentVolumeClaim references a PVC in the worker + pool's namespace. + properties: + claimName: + description: |- + claimName is the name of a PersistentVolumeClaim in the same namespace as the pod using this volume. + More info: https://kubernetes.io/docs/concepts/storage/persistent-volumes#persistentvolumeclaims + type: string + readOnly: + description: |- + readOnly Will force the ReadOnly setting in VolumeMounts. + Default false. + type: boolean + required: + - claimName + type: object + required: + - name + type: object + maxItems: 16 + type: array + x-kubernetes-list-type: atomic template: description: Template holds optional pod scheduling and resource settings for worker pods. diff --git a/pkg/api/v1alpha1/actortemplate_types.go b/pkg/api/v1alpha1/actortemplate_types.go index 38b777b71..42d8dd5dd 100644 --- a/pkg/api/v1alpha1/actortemplate_types.go +++ b/pkg/api/v1alpha1/actortemplate_types.go @@ -125,6 +125,37 @@ type SecretKeySelector struct { Optional *bool `json:"optional,omitempty"` } +// ActorVolume maps a WorkerPool storage volume into the actor's containers. +// The name must match a volume defined in the assigned WorkerPool's +// storageVolumes field. +type ActorVolume struct { + // Name of the volume (must match a WorkerPool storageVolume name). + // + // +required + // +kubebuilder:validation:MinLength=1 + Name string `json:"name"` + + // MountPath is the absolute path inside the actor's containers where + // the volume will be mounted. + // + // +required + // +kubebuilder:validation:Pattern=`^/` + MountPath string `json:"mountPath"` + + // ReadOnly forces the volume to be read-only inside the actor. + // + // +optional + ReadOnly bool `json:"readOnly,omitempty"` + + // SubPath is an optional sub-directory within the volume to mount + // instead of the whole volume. The literal "${ACTOR_ID}" is replaced + // with the actor's ID at mount time, enabling per-actor subdirectories + // on shared storage. + // + // +optional + SubPath string `json:"subPath,omitempty"` +} + type SnapshotsConfig struct { // Location to store snapshots in. // @@ -165,6 +196,15 @@ type ActorTemplateSpec struct { // // +optional WorkerSelector *metav1.LabelSelector `json:"workerSelector,omitempty"` + + // Volumes defines storage volumes available to this actor's containers. + // Each volume references a name defined in the assigned WorkerPool's + // storageVolumes. The actor cannot request storage the pool doesn't provide. + // + // +optional + // +kubebuilder:validation:MaxItems=16 + // +listType=atomic + Volumes []ActorVolume `json:"volumes,omitempty"` } // TODO: add validation diff --git a/pkg/api/v1alpha1/workerpool_types.go b/pkg/api/v1alpha1/workerpool_types.go index cb0818274..629290314 100644 --- a/pkg/api/v1alpha1/workerpool_types.go +++ b/pkg/api/v1alpha1/workerpool_types.go @@ -46,6 +46,36 @@ type WorkerPoolPodTemplate struct { NodeAffinity *corev1.NodeAffinity `json:"nodeAffinity,omitempty"` } +// StorageVolume describes a network or persistent storage volume to be mounted +// into each worker pod. Actors reference these by name via their ActorTemplate's +// Volumes field. Exactly one source (NFS, PersistentVolumeClaim, or HostPath) +// must be specified. +type StorageVolume struct { + // Name of the volume. Referenced by ActorTemplate volume entries. + // + // +required + // +kubebuilder:validation:MinLength=1 + // +kubebuilder:validation:MaxLength=63 + // +kubebuilder:validation:Pattern=`^[a-zA-Z][a-zA-Z0-9-]*$` + Name string `json:"name"` + + // NFS represents an NFS mount on the host. + // + // +optional + NFS *corev1.NFSVolumeSource `json:"nfs,omitempty"` + + // PersistentVolumeClaim references a PVC in the worker pool's namespace. + // + // +optional + PersistentVolumeClaim *corev1.PersistentVolumeClaimVolumeSource `json:"persistentVolumeClaim,omitempty"` + + // HostPath represents a pre-existing host path (e.g., NFS already + // mounted at a well-known path on all nodes). + // + // +optional + HostPath *corev1.HostPathVolumeSource `json:"hostPath,omitempty"` +} + type WorkerPoolSpec struct { // Replicas is the number of worker pods to run. // +required @@ -78,6 +108,15 @@ type WorkerPoolSpec struct { // SandboxClass is used. // +optional SandboxConfigName string `json:"sandboxConfigName,omitempty"` + + // StorageVolumes defines network or persistent storage volumes that will be + // mounted into each worker pod. Actors can reference these by name via their + // ActorTemplate's volumes field. + // + // +optional + // +kubebuilder:validation:MaxItems=16 + // +listType=atomic + StorageVolumes []StorageVolume `json:"storageVolumes,omitempty"` } type WorkerPoolStatus struct { diff --git a/pkg/api/v1alpha1/zz_generated.deepcopy.go b/pkg/api/v1alpha1/zz_generated.deepcopy.go index 65cd9cdbb..1027f1800 100644 --- a/pkg/api/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/api/v1alpha1/zz_generated.deepcopy.go @@ -99,6 +99,11 @@ func (in *ActorTemplateSpec) DeepCopyInto(out *ActorTemplateSpec) { *out = new(v1.LabelSelector) (*in).DeepCopyInto(*out) } + if in.Volumes != nil { + in, out := &in.Volumes, &out.Volumes + *out = make([]ActorVolume, len(*in)) + copy(*out, *in) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ActorTemplateSpec. @@ -134,6 +139,21 @@ func (in *ActorTemplateStatus) DeepCopy() *ActorTemplateStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ActorVolume) DeepCopyInto(out *ActorVolume) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ActorVolume. +func (in *ActorVolume) DeepCopy() *ActorVolume { + if in == nil { + return nil + } + out := new(ActorVolume) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *AssetFile) DeepCopyInto(out *AssetFile) { *out = *in @@ -347,6 +367,36 @@ func (in *SnapshotsConfig) DeepCopy() *SnapshotsConfig { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *StorageVolume) DeepCopyInto(out *StorageVolume) { + *out = *in + if in.NFS != nil { + in, out := &in.NFS, &out.NFS + *out = new(corev1.NFSVolumeSource) + **out = **in + } + if in.PersistentVolumeClaim != nil { + in, out := &in.PersistentVolumeClaim, &out.PersistentVolumeClaim + *out = new(corev1.PersistentVolumeClaimVolumeSource) + **out = **in + } + if in.HostPath != nil { + in, out := &in.HostPath, &out.HostPath + *out = new(corev1.HostPathVolumeSource) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new StorageVolume. +func (in *StorageVolume) DeepCopy() *StorageVolume { + if in == nil { + return nil + } + out := new(StorageVolume) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *WorkerPool) DeepCopyInto(out *WorkerPool) { *out = *in @@ -448,6 +498,13 @@ func (in *WorkerPoolSpec) DeepCopyInto(out *WorkerPoolSpec) { *out = new(WorkerPoolPodTemplate) (*in).DeepCopyInto(*out) } + if in.StorageVolumes != nil { + in, out := &in.StorageVolumes, &out.StorageVolumes + *out = make([]StorageVolume, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new WorkerPoolSpec.