From 64d0edf52b9fd726e5dc9ef5b8307b911b4edb95 Mon Sep 17 00:00:00 2001 From: chenggui53 Date: Mon, 22 Jun 2026 11:47:50 +0800 Subject: [PATCH] feat: add network storage volume mount support for actors Add the ability to mount network storage (NFS, PVC, HostPath) into actor sandboxes when starting actors. This enables actors to access shared persistent storage. ## Changes ### CRD Types - WorkerPool: Add StorageVolume type and storageVolumes field to declare available storage infrastructure (NFS, PVC, HostPath sources) - ActorTemplate: Add ActorVolume type and volumes field to declare mount requirements (name, mountPath, subPath, readOnly) ### Proto - atelet.proto: Add VolumeMount message and volume_mounts field to WorkloadSpec ### Controller - WorkerPool controller injects storageVolumes as K8s volumes into worker pods and mounts them at /var/lib/ateom-storage/ ### API Server - workloadSpecFromActorTemplate converts ActorTemplate volumes to proto - CallAteletRestoreStep validates volume refs against WorkerPool ### Atelet (OCI) - resolveVolumeMounts converts proto mounts to host-path configs - buildActorOCISpec creates OCI bind mounts for storage volumes - createMountPoint creates mount point directories in rootfs - Supports ${ACTOR_ID} placeholder in SubPath for per-actor isolation ### Validation - ValidateVolumeMountPath ensures mount paths are absolute and safe ## Usage WorkerPool (infrastructure): storageVolumes: - name: shared-data nfs: {server: nfs.example.com, path: /exports/data} ActorTemplate (workload): volumes: - name: shared-data mountPath: /workspace subPath: '${ACTOR_ID}/workspace' --- .../internal/controlapi/workflow_resume.go | 34 +++ .../internal/controlapi/workload_spec.go | 10 + .../internal/controllers/workerpool_apply.go | 35 +++ cmd/atelet/main.go | 32 ++- cmd/atelet/oci.go | 44 +++- cmd/atelet/oci_test.go | 172 ++++++++++++- internal/ateompath/ateompath.go | 5 + internal/proto/ateletpb/atelet.pb.go | 235 ++++++++++++------ internal/proto/ateletpb/atelet.proto | 14 +- internal/resources/validate.go | 16 ++ .../generated/ate.dev_actortemplates.yaml | 40 +++ .../generated/ate.dev_workerpools.yaml | 85 +++++++ pkg/api/v1alpha1/actortemplate_types.go | 40 +++ pkg/api/v1alpha1/workerpool_types.go | 39 +++ pkg/api/v1alpha1/zz_generated.deepcopy.go | 57 +++++ 15 files changed, 777 insertions(+), 81 deletions(-) diff --git a/cmd/ateapi/internal/controlapi/workflow_resume.go b/cmd/ateapi/internal/controlapi/workflow_resume.go index a604e1709..eaf660896 100644 --- a/cmd/ateapi/internal/controlapi/workflow_resume.go +++ b/cmd/ateapi/internal/controlapi/workflow_resume.go @@ -25,6 +25,7 @@ import ( "github.com/agent-substrate/substrate/cmd/ateapi/internal/store" "github.com/agent-substrate/substrate/internal/proto/ateletpb" + "github.com/agent-substrate/substrate/internal/resources" atev1alpha1 "github.com/agent-substrate/substrate/pkg/api/v1alpha1" listersv1alpha1 "github.com/agent-substrate/substrate/pkg/client/listers/api/v1alpha1" "github.com/agent-substrate/substrate/pkg/proto/ateapipb" @@ -241,6 +242,12 @@ func (s *CallAteletRestoreStep) Execute(ctx context.Context, input *ResumeInput, return err } + // Validate that the ActorTemplate's volume references are satisfied by the + // assigned WorkerPool's storageVolumes. + if err := s.validateVolumeRefs(ctx, state); err != nil { + return err + } + if state.Actor.GetLatestSnapshotInfo().GetType() != ateapipb.SnapshotType_SNAPSHOT_TYPE_UNSPECIFIED { slog.InfoContext(ctx, "Actor has snapshot; Restoring from snapshot") @@ -329,6 +336,33 @@ func (s *CallAteletRestoreStep) Execute(ctx context.Context, input *ResumeInput, func (s *CallAteletRestoreStep) RetryBackoff() *wait.Backoff { return nil } +// validateVolumeRefs checks that every volume referenced by the ActorTemplate +// exists in the assigned WorkerPool's storageVolumes. +func (s *CallAteletRestoreStep) validateVolumeRefs(ctx context.Context, state *ResumeState) error { + if len(state.ActorTemplate.Spec.Volumes) == 0 { + return nil + } + pool, err := s.workerPoolLister.WorkerPools(state.Actor.GetAteomPodNamespace()).Get(state.Actor.GetWorkerPoolName()) + if err != nil { + return fmt.Errorf("while getting worker pool for volume validation: %w", err) + } + poolVolumes := make(map[string]bool, len(pool.Spec.StorageVolumes)) + for _, sv := range pool.Spec.StorageVolumes { + poolVolumes[sv.Name] = true + } + for _, v := range state.ActorTemplate.Spec.Volumes { + if !poolVolumes[v.Name] { + return status.Errorf(codes.FailedPrecondition, + "actor template references volume %q not provided by worker pool %s", + v.Name, pool.Name) + } + if err := resources.ValidateVolumeMountPath(v.MountPath); err != nil { + return status.Errorf(codes.InvalidArgument, "invalid volume mount: %v", err) + } + } + return nil +} + type FinalizeRunningStep struct { store store.Interface } diff --git a/cmd/ateapi/internal/controlapi/workload_spec.go b/cmd/ateapi/internal/controlapi/workload_spec.go index 018fb444f..3fba07756 100644 --- a/cmd/ateapi/internal/controlapi/workload_spec.go +++ b/cmd/ateapi/internal/controlapi/workload_spec.go @@ -60,6 +60,16 @@ func workloadSpecFromActorTemplate(ctx context.Context, kubeClient kubernetes.In workloadSpec.Containers = append(workloadSpec.Containers, ateletCtr) } + // Convert ActorTemplate volumes to proto VolumeMounts. + for _, v := range actorTemplate.Spec.Volumes { + workloadSpec.VolumeMounts = append(workloadSpec.VolumeMounts, &ateletpb.VolumeMount{ + Name: v.Name, + MountPath: v.MountPath, + ReadOnly: v.ReadOnly, + SubPath: v.SubPath, + }) + } + return workloadSpec, nil } diff --git a/cmd/atecontroller/internal/controllers/workerpool_apply.go b/cmd/atecontroller/internal/controllers/workerpool_apply.go index e9ce1f7a1..e656f6b8f 100644 --- a/cmd/atecontroller/internal/controllers/workerpool_apply.go +++ b/cmd/atecontroller/internal/controllers/workerpool_apply.go @@ -15,6 +15,8 @@ package controllers import ( + "path/filepath" + corev1 "k8s.io/api/core/v1" appsv1ac "k8s.io/client-go/applyconfigurations/apps/v1" corev1ac "k8s.io/client-go/applyconfigurations/core/v1" @@ -59,6 +61,39 @@ func buildDeploymentApplyConfig(wp *atev1alpha1.WorkerPool) *appsv1ac.Deployment WithPath(ateompath.BasePath). WithType(corev1.HostPathDirectoryOrCreate))) + // Add storage volumes from WorkerPool spec. Each volume is mounted into + // the ateom container at a well-known path so atelet can create OCI bind + // mounts from it into actor sandboxes. + for _, sv := range wp.Spec.StorageVolumes { + vol := corev1ac.Volume().WithName(sv.Name) + switch { + case sv.NFS != nil: + vol.WithNFS(corev1ac.NFSVolumeSource(). + WithServer(sv.NFS.Server). + WithPath(sv.NFS.Path). + WithReadOnly(sv.NFS.ReadOnly)) + case sv.PersistentVolumeClaim != nil: + vol.WithPersistentVolumeClaim(corev1ac.PersistentVolumeClaimVolumeSource(). + WithClaimName(sv.PersistentVolumeClaim.ClaimName). + WithReadOnly(sv.PersistentVolumeClaim.ReadOnly)) + case sv.HostPath != nil: + hp := corev1ac.HostPathVolumeSource().WithPath(sv.HostPath.Path) + if sv.HostPath.Type != nil { + hp.WithType(*sv.HostPath.Type) + } + vol.WithHostPath(hp) + } + podSpecAC.WithVolumes(vol) + + // Mount the volume into the ateom container at StorageVolumesPath/ + // so atelet can reference it when building OCI bind mounts. + mountPath := filepath.Join(ateompath.StorageVolumesPath, sv.Name) + containerAC.WithVolumeMounts( + corev1ac.VolumeMount(). + WithName(sv.Name). + WithMountPath(mountPath)) + } + applyWorkerPoolPodTemplate(podSpecAC, wp.Spec.Template) podSpecAC.WithContainers(containerAC) diff --git a/cmd/atelet/main.go b/cmd/atelet/main.go index 7094f7315..bf0587c26 100644 --- a/cmd/atelet/main.go +++ b/cmd/atelet/main.go @@ -608,6 +608,9 @@ func (s *AteomHerder) prepareOCIBundles( return fmt.Errorf("while writing actor identity file: %w", err) } + // Resolve volume mounts from proto to host-path-based configs. + volumeMounts := resolveVolumeMounts(spec.GetVolumeMounts(), actorID) + g, gCtx := errgroup.WithContext(ctx) // Pause container. @@ -625,7 +628,8 @@ func (s *AteomHerder) prepareOCIBundles( "io.kubernetes.cri.container-name": "pause", }, netnsPath, - "", // pause is sandbox infra; it gets no actor identity mount. + "", // pause is sandbox infra; it gets no actor identity mount. + nil, // pause gets no storage volume mounts. ); err != nil { return fmt.Errorf("while creating pause OCI bundle: %w", err) } @@ -655,6 +659,7 @@ func (s *AteomHerder) prepareOCIBundles( }, netnsPath, identityDir, + volumeMounts, ); err != nil { return fmt.Errorf("while creating %q OCI bundle: %w", ctr.GetName(), err) } @@ -665,6 +670,31 @@ func (s *AteomHerder) prepareOCIBundles( return g.Wait() } +// resolveVolumeMounts converts proto VolumeMounts to host-path-based mount +// configs. The host path is {ateompath.StorageVolumesPath}/{volumeName}, +// which is where the WorkerPool controller mounted the volume in the ateom +// container. The ${ACTOR_ID} placeholder in SubPath is replaced with the +// actor's actual ID. +func resolveVolumeMounts(protoMounts []*ateletpb.VolumeMount, actorID string) []VolumeMountConfig { + if len(protoMounts) == 0 { + return nil + } + mounts := make([]VolumeMountConfig, 0, len(protoMounts)) + for _, m := range protoMounts { + hostPath := filepath.Join(ateompath.StorageVolumesPath, m.GetName()) + subPath := m.GetSubPath() + subPath = strings.ReplaceAll(subPath, "${ACTOR_ID}", actorID) + mounts = append(mounts, VolumeMountConfig{ + Name: m.GetName(), + MountPath: m.GetMountPath(), + HostPath: hostPath, + SubPath: subPath, + ReadOnly: m.GetReadOnly(), + }) + } + return mounts +} + // dialAteom opens (or reuses) the gRPC connection to the target ateom // pod and returns an ateom client. func (s *AteomHerder) dialAteom(ctx context.Context, targetAteomUid string) (ateompb.AteomClient, error) { diff --git a/cmd/atelet/oci.go b/cmd/atelet/oci.go index 4451f6362..af6975b51 100644 --- a/cmd/atelet/oci.go +++ b/cmd/atelet/oci.go @@ -51,7 +51,19 @@ const ( ActorIDFileName = "actor-id" ) -func prepareOCIDirectory(ctx context.Context, pullCache *memorypullcache.MemoryPullCache, actorTemplateNamespace, actorTemplateName, actorID, containerName, ref string, args []string, env []string, annotations map[string]string, netns string, identityDir string) error { +// VolumeMountConfig holds the resolved configuration for a single volume +// mount to be added to an actor container's OCI spec. HostPath is the +// absolute path on the host (inside the ateom container) where the storage +// volume is mounted. +type VolumeMountConfig struct { + Name string + MountPath string + HostPath string + SubPath string + ReadOnly bool +} + +func prepareOCIDirectory(ctx context.Context, pullCache *memorypullcache.MemoryPullCache, actorTemplateNamespace, actorTemplateName, actorID, containerName, ref string, args []string, env []string, annotations map[string]string, netns string, identityDir string, volumeMounts []VolumeMountConfig) error { tracer := otel.Tracer("prepareOCIDirectory") ctx, span := tracer.Start(ctx, "prepareOCIDirectory") @@ -88,7 +100,14 @@ func prepareOCIDirectory(ctx context.Context, pullCache *memorypullcache.MemoryP } } - ociSpec := buildActorOCISpec(args, env, annotations, netns, identityDir) + // Create mount point directories in rootfs for each storage volume mount. + for _, vm := range volumeMounts { + if err := createMountPoint(rootPath, vm.MountPath); err != nil { + return fmt.Errorf("while creating volume mount point %q: %w", vm.MountPath, err) + } + } + + ociSpec := buildActorOCISpec(args, env, annotations, netns, identityDir, volumeMounts) ociSpecBytes, err := json.MarshalIndent(ociSpec, "", " ") if err != nil { return fmt.Errorf("while marshaling OCI spec: %w", err) @@ -105,7 +124,8 @@ func prepareOCIDirectory(ctx context.Context, pullCache *memorypullcache.MemoryP // When identityDir is non-empty it adds a read-only bind mount of that host // directory at IdentityMountPath so the actor can read its own ID (see // IdentityMountPath for why this is a bind mount rather than env vars). -func buildActorOCISpec(args []string, env []string, annotations map[string]string, netns string, identityDir string) *specs.Spec { +// volumeMounts are additional bind mounts from WorkerPool storage volumes. +func buildActorOCISpec(args []string, env []string, annotations map[string]string, netns string, identityDir string, volumeMounts []VolumeMountConfig) *specs.Spec { envVars := []string{ "PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin", } @@ -149,6 +169,24 @@ func buildActorOCISpec(args []string, env []string, annotations map[string]strin }) } + // Add storage volume bind mounts from WorkerPool. + for _, vm := range volumeMounts { + options := []string{"bind"} + if vm.ReadOnly { + options = append(options, "ro") + } + source := vm.HostPath + if vm.SubPath != "" { + source = filepath.Join(source, vm.SubPath) + } + mounts = append(mounts, specs.Mount{ + Destination: vm.MountPath, + Type: "bind", + Source: source, + Options: options, + }) + } + return &specs.Spec{ Process: &specs.Process{ User: specs.User{ diff --git a/cmd/atelet/oci_test.go b/cmd/atelet/oci_test.go index 5b53f2520..e2bda6ad3 100644 --- a/cmd/atelet/oci_test.go +++ b/cmd/atelet/oci_test.go @@ -89,6 +89,7 @@ func TestBuildActorOCISpec_IdentityMount(t *testing.T) { map[string]string{"k": "v"}, "/run/netns/x", "/host/actors/ns:tmpl:id/identity", + nil, ) found := false for _, m := range spec.Mounts { @@ -113,7 +114,7 @@ func TestBuildActorOCISpec_IdentityMount(t *testing.T) { // Without an identity dir (the pause container), no identity mount appears. func TestBuildActorOCISpec_NoIdentityMountForPause(t *testing.T) { - bare := buildActorOCISpec([]string{"/pause"}, nil, nil, "/run/netns/x", "") + bare := buildActorOCISpec([]string{"/pause"}, nil, nil, "/run/netns/x", "", nil) for _, m := range bare.Mounts { if m.Destination == IdentityMountPath { t.Errorf("identity mount must be absent when identityDir is empty") @@ -526,3 +527,172 @@ func TestUntar_TruncatedArchive(t *testing.T) { t.Errorf("error = %v, want it to surface the underlying tar/copy error", err) } } + +// --- Volume mount tests --- + +func TestBuildActorOCISpec_VolumeMounts(t *testing.T) { + volumeMounts := []VolumeMountConfig{ + { + Name: "shared-data", + MountPath: "/workspace", + HostPath: "/var/lib/ateom-storage/shared-data", + ReadOnly: false, + }, + { + Name: "model-cache", + MountPath: "/models", + HostPath: "/var/lib/ateom-storage/model-cache", + ReadOnly: true, + }, + } + + spec := buildActorOCISpec( + []string{"/app"}, + nil, + nil, + "/run/netns/test", + "/var/lib/ateom-gvisor/actors/ns:tmpl:id/identity", + volumeMounts, + ) + + // 4 standard + 1 identity + 2 volume mounts = 7 + if got := len(spec.Mounts); got != 7 { + t.Fatalf("mount count = %d, want 7; mounts: %+v", got, spec.Mounts) + } + + // Check shared-data mount (read-write) + sharedDataMount := spec.Mounts[5] + if sharedDataMount.Destination != "/workspace" { + t.Errorf("shared-data mount destination = %q, want %q", sharedDataMount.Destination, "/workspace") + } + if sharedDataMount.Source != "/var/lib/ateom-storage/shared-data" { + t.Errorf("shared-data mount source = %q, want %q", sharedDataMount.Source, "/var/lib/ateom-storage/shared-data") + } + if sharedDataMount.Type != "bind" { + t.Errorf("shared-data mount type = %q, want %q", sharedDataMount.Type, "bind") + } + if slices.Contains(sharedDataMount.Options, "ro") { + t.Errorf("shared-data mount should NOT be read-only, options=%v", sharedDataMount.Options) + } + + // Check model-cache mount (read-only) + modelCacheMount := spec.Mounts[6] + if modelCacheMount.Destination != "/models" { + t.Errorf("model-cache mount destination = %q, want %q", modelCacheMount.Destination, "/models") + } + if !slices.Contains(modelCacheMount.Options, "ro") { + t.Errorf("model-cache mount must be read-only, options=%v", modelCacheMount.Options) + } +} + +func TestBuildActorOCISpec_VolumeMountWithSubPath(t *testing.T) { + volumeMounts := []VolumeMountConfig{ + { + Name: "shared-data", + MountPath: "/workspace", + HostPath: "/var/lib/ateom-storage/shared-data", + SubPath: "actor-123/workspace", + ReadOnly: false, + }, + } + + spec := buildActorOCISpec( + []string{"/app"}, + nil, + nil, + "/run/netns/test", + "", + volumeMounts, + ) + + // 4 standard + 1 volume mount = 5 + mount := spec.Mounts[4] + wantSource := "/var/lib/ateom-storage/shared-data/actor-123/workspace" + if mount.Source != wantSource { + t.Errorf("mount source = %q, want %q", mount.Source, wantSource) + } + if mount.Destination != "/workspace" { + t.Errorf("mount destination = %q, want %q", mount.Destination, "/workspace") + } +} + +func TestBuildActorOCISpec_NoVolumeMounts(t *testing.T) { + spec := buildActorOCISpec( + []string{"/app"}, + nil, + nil, + "/run/netns/test", + "", + nil, + ) + + // 4 standard mounts only + if got := len(spec.Mounts); got != 4 { + t.Errorf("mount count = %d, want 4", got) + } + for _, m := range spec.Mounts { + if m.Destination == "/workspace" || m.Destination == "/models" { + t.Errorf("unexpected volume mount at %q", m.Destination) + } + } +} + +func TestBuildActorOCISpec_MountOrder(t *testing.T) { + volumeMounts := []VolumeMountConfig{ + {Name: "vol1", MountPath: "/a", HostPath: "/host/a"}, + {Name: "vol2", MountPath: "/b", HostPath: "/host/b"}, + } + + spec := buildActorOCISpec( + []string{"/app"}, + nil, + nil, + "/run/netns/test", + "/identity/dir", + volumeMounts, + ) + + // Expected order: proc, dev, sys, resolv.conf, identity, vol1, vol2 + wantDests := []string{ + "/proc", "/dev", "/sys", "/etc/resolv.conf", + IdentityMountPath, "/a", "/b", + } + + if len(spec.Mounts) != len(wantDests) { + t.Fatalf("mount count = %d, want %d", len(spec.Mounts), len(wantDests)) + } + + for i, want := range wantDests { + if spec.Mounts[i].Destination != want { + t.Errorf("mount[%d].Destination = %q, want %q", i, spec.Mounts[i].Destination, want) + } + } +} + +func TestCreateMountPoint_VolumePath(t *testing.T) { + root := t.TempDir() + if err := createMountPoint(root, "/workspace"); err != nil { + t.Fatalf("createMountPoint: %v", err) + } + info, err := os.Stat(filepath.Join(root, "workspace")) + if err != nil { + t.Fatalf("mount point not created: %v", err) + } + if !info.IsDir() { + t.Errorf("mount point must be a directory") + } +} + +func TestCreateMountPoint_NestedVolumePath(t *testing.T) { + root := t.TempDir() + if err := createMountPoint(root, "/data/nested/path"); err != nil { + t.Fatalf("createMountPoint: %v", err) + } + info, err := os.Stat(filepath.Join(root, "data", "nested", "path")) + if err != nil { + t.Fatalf("nested mount point not created: %v", err) + } + if !info.IsDir() { + t.Errorf("nested mount point must be a directory") + } +} diff --git a/internal/ateompath/ateompath.go b/internal/ateompath/ateompath.go index a779755c4..a611f918f 100644 --- a/internal/ateompath/ateompath.go +++ b/internal/ateompath/ateompath.go @@ -23,6 +23,11 @@ const ( // The base path. This is both the path of the root shared folder on the // host filesystem, and when it is mounted into ateom and atelet containers. BasePath = "/var/lib/ateom-gvisor" + + // StorageVolumesPath is the base directory inside the ateom container where + // WorkerPool storage volumes are mounted. Each named volume from the + // WorkerPool's storageVolumes appears as a subdirectory here. + StorageVolumesPath = "/var/lib/ateom-storage" ) var ( diff --git a/internal/proto/ateletpb/atelet.pb.go b/internal/proto/ateletpb/atelet.pb.go index 03b0fe750..b88007e6a 100644 --- a/internal/proto/ateletpb/atelet.pb.go +++ b/internal/proto/ateletpb/atelet.pb.go @@ -332,18 +332,89 @@ func (x *SandboxAssets) GetAssets() map[string]*ArchAssets { return nil } +// VolumeMount describes a single storage volume mount for an actor container. +// The volume name references a storage volume defined on the WorkerPool. +type VolumeMount struct { + state protoimpl.MessageState `protogen:"open.v1"` + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` // References a WorkerPool storageVolume name + MountPath string `protobuf:"bytes,2,opt,name=mount_path,json=mountPath,proto3" json:"mount_path,omitempty"` // Absolute path inside the container + ReadOnly bool `protobuf:"varint,3,opt,name=read_only,json=readOnly,proto3" json:"read_only,omitempty"` // Mount as read-only + SubPath string `protobuf:"bytes,4,opt,name=sub_path,json=subPath,proto3" json:"sub_path,omitempty"` // Optional sub-directory within the volume + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *VolumeMount) Reset() { + *x = VolumeMount{} + mi := &file_atelet_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *VolumeMount) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*VolumeMount) ProtoMessage() {} + +func (x *VolumeMount) ProtoReflect() protoreflect.Message { + mi := &file_atelet_proto_msgTypes[4] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use VolumeMount.ProtoReflect.Descriptor instead. +func (*VolumeMount) Descriptor() ([]byte, []int) { + return file_atelet_proto_rawDescGZIP(), []int{4} +} + +func (x *VolumeMount) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *VolumeMount) GetMountPath() string { + if x != nil { + return x.MountPath + } + return "" +} + +func (x *VolumeMount) GetReadOnly() bool { + if x != nil { + return x.ReadOnly + } + return false +} + +func (x *VolumeMount) GetSubPath() string { + if x != nil { + return x.SubPath + } + return "" +} + // WorkloadSpec parallels Pod, but with far fewer configurable fields. type WorkloadSpec struct { state protoimpl.MessageState `protogen:"open.v1"` Containers []*Container `protobuf:"bytes,1,rep,name=containers,proto3" json:"containers,omitempty"` PauseImage string `protobuf:"bytes,2,opt,name=pause_image,json=pauseImage,proto3" json:"pause_image,omitempty"` + VolumeMounts []*VolumeMount `protobuf:"bytes,3,rep,name=volume_mounts,json=volumeMounts,proto3" json:"volume_mounts,omitempty"` // Storage volume mounts unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } func (x *WorkloadSpec) Reset() { *x = WorkloadSpec{} - mi := &file_atelet_proto_msgTypes[4] + mi := &file_atelet_proto_msgTypes[5] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -355,7 +426,7 @@ func (x *WorkloadSpec) String() string { func (*WorkloadSpec) ProtoMessage() {} func (x *WorkloadSpec) ProtoReflect() protoreflect.Message { - mi := &file_atelet_proto_msgTypes[4] + mi := &file_atelet_proto_msgTypes[5] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -368,7 +439,7 @@ func (x *WorkloadSpec) ProtoReflect() protoreflect.Message { // Deprecated: Use WorkloadSpec.ProtoReflect.Descriptor instead. func (*WorkloadSpec) Descriptor() ([]byte, []int) { - return file_atelet_proto_rawDescGZIP(), []int{4} + return file_atelet_proto_rawDescGZIP(), []int{5} } func (x *WorkloadSpec) GetContainers() []*Container { @@ -385,6 +456,13 @@ func (x *WorkloadSpec) GetPauseImage() string { return "" } +func (x *WorkloadSpec) GetVolumeMounts() []*VolumeMount { + if x != nil { + return x.VolumeMounts + } + return nil +} + type Container struct { state protoimpl.MessageState `protogen:"open.v1"` Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` @@ -397,7 +475,7 @@ type Container struct { func (x *Container) Reset() { *x = Container{} - mi := &file_atelet_proto_msgTypes[5] + mi := &file_atelet_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -409,7 +487,7 @@ func (x *Container) String() string { func (*Container) ProtoMessage() {} func (x *Container) ProtoReflect() protoreflect.Message { - mi := &file_atelet_proto_msgTypes[5] + mi := &file_atelet_proto_msgTypes[6] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -422,7 +500,7 @@ func (x *Container) ProtoReflect() protoreflect.Message { // Deprecated: Use Container.ProtoReflect.Descriptor instead. func (*Container) Descriptor() ([]byte, []int) { - return file_atelet_proto_rawDescGZIP(), []int{5} + return file_atelet_proto_rawDescGZIP(), []int{6} } func (x *Container) GetName() string { @@ -463,7 +541,7 @@ type EnvEntry struct { func (x *EnvEntry) Reset() { *x = EnvEntry{} - mi := &file_atelet_proto_msgTypes[6] + mi := &file_atelet_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -475,7 +553,7 @@ func (x *EnvEntry) String() string { func (*EnvEntry) ProtoMessage() {} func (x *EnvEntry) ProtoReflect() protoreflect.Message { - mi := &file_atelet_proto_msgTypes[6] + mi := &file_atelet_proto_msgTypes[7] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -488,7 +566,7 @@ func (x *EnvEntry) ProtoReflect() protoreflect.Message { // Deprecated: Use EnvEntry.ProtoReflect.Descriptor instead. func (*EnvEntry) Descriptor() ([]byte, []int) { - return file_atelet_proto_rawDescGZIP(), []int{6} + return file_atelet_proto_rawDescGZIP(), []int{7} } func (x *EnvEntry) GetName() string { @@ -513,7 +591,7 @@ type RunResponse struct { func (x *RunResponse) Reset() { *x = RunResponse{} - mi := &file_atelet_proto_msgTypes[7] + mi := &file_atelet_proto_msgTypes[8] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -525,7 +603,7 @@ func (x *RunResponse) String() string { func (*RunResponse) ProtoMessage() {} func (x *RunResponse) ProtoReflect() protoreflect.Message { - mi := &file_atelet_proto_msgTypes[7] + mi := &file_atelet_proto_msgTypes[8] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -538,7 +616,7 @@ func (x *RunResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use RunResponse.ProtoReflect.Descriptor instead. func (*RunResponse) Descriptor() ([]byte, []int) { - return file_atelet_proto_rawDescGZIP(), []int{7} + return file_atelet_proto_rawDescGZIP(), []int{8} } type LocalCheckpointConfiguration struct { @@ -552,7 +630,7 @@ type LocalCheckpointConfiguration struct { func (x *LocalCheckpointConfiguration) Reset() { *x = LocalCheckpointConfiguration{} - mi := &file_atelet_proto_msgTypes[8] + mi := &file_atelet_proto_msgTypes[9] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -564,7 +642,7 @@ func (x *LocalCheckpointConfiguration) String() string { func (*LocalCheckpointConfiguration) ProtoMessage() {} func (x *LocalCheckpointConfiguration) ProtoReflect() protoreflect.Message { - mi := &file_atelet_proto_msgTypes[8] + mi := &file_atelet_proto_msgTypes[9] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -577,7 +655,7 @@ func (x *LocalCheckpointConfiguration) ProtoReflect() protoreflect.Message { // Deprecated: Use LocalCheckpointConfiguration.ProtoReflect.Descriptor instead. func (*LocalCheckpointConfiguration) Descriptor() ([]byte, []int) { - return file_atelet_proto_rawDescGZIP(), []int{8} + return file_atelet_proto_rawDescGZIP(), []int{9} } func (x *LocalCheckpointConfiguration) GetSnapshotPrefix() string { @@ -606,7 +684,7 @@ type ExternalCheckpointConfiguration struct { func (x *ExternalCheckpointConfiguration) Reset() { *x = ExternalCheckpointConfiguration{} - mi := &file_atelet_proto_msgTypes[9] + mi := &file_atelet_proto_msgTypes[10] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -618,7 +696,7 @@ func (x *ExternalCheckpointConfiguration) String() string { func (*ExternalCheckpointConfiguration) ProtoMessage() {} func (x *ExternalCheckpointConfiguration) ProtoReflect() protoreflect.Message { - mi := &file_atelet_proto_msgTypes[9] + mi := &file_atelet_proto_msgTypes[10] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -631,7 +709,7 @@ func (x *ExternalCheckpointConfiguration) ProtoReflect() protoreflect.Message { // Deprecated: Use ExternalCheckpointConfiguration.ProtoReflect.Descriptor instead. func (*ExternalCheckpointConfiguration) Descriptor() ([]byte, []int) { - return file_atelet_proto_rawDescGZIP(), []int{9} + return file_atelet_proto_rawDescGZIP(), []int{10} } func (x *ExternalCheckpointConfiguration) GetSnapshotUriPrefix() string { @@ -662,7 +740,7 @@ type CheckpointRequest struct { func (x *CheckpointRequest) Reset() { *x = CheckpointRequest{} - mi := &file_atelet_proto_msgTypes[10] + mi := &file_atelet_proto_msgTypes[11] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -674,7 +752,7 @@ func (x *CheckpointRequest) String() string { func (*CheckpointRequest) ProtoMessage() {} func (x *CheckpointRequest) ProtoReflect() protoreflect.Message { - mi := &file_atelet_proto_msgTypes[10] + mi := &file_atelet_proto_msgTypes[11] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -687,7 +765,7 @@ func (x *CheckpointRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use CheckpointRequest.ProtoReflect.Descriptor instead. func (*CheckpointRequest) Descriptor() ([]byte, []int) { - return file_atelet_proto_rawDescGZIP(), []int{10} + return file_atelet_proto_rawDescGZIP(), []int{11} } func (x *CheckpointRequest) GetTargetAteomUid() string { @@ -781,7 +859,7 @@ type CheckpointResponse struct { func (x *CheckpointResponse) Reset() { *x = CheckpointResponse{} - mi := &file_atelet_proto_msgTypes[11] + mi := &file_atelet_proto_msgTypes[12] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -793,7 +871,7 @@ func (x *CheckpointResponse) String() string { func (*CheckpointResponse) ProtoMessage() {} func (x *CheckpointResponse) ProtoReflect() protoreflect.Message { - mi := &file_atelet_proto_msgTypes[11] + mi := &file_atelet_proto_msgTypes[12] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -806,7 +884,7 @@ func (x *CheckpointResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use CheckpointResponse.ProtoReflect.Descriptor instead. func (*CheckpointResponse) Descriptor() ([]byte, []int) { - return file_atelet_proto_rawDescGZIP(), []int{11} + return file_atelet_proto_rawDescGZIP(), []int{12} } type RestoreRequest struct { @@ -830,7 +908,7 @@ type RestoreRequest struct { func (x *RestoreRequest) Reset() { *x = RestoreRequest{} - mi := &file_atelet_proto_msgTypes[12] + mi := &file_atelet_proto_msgTypes[13] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -842,7 +920,7 @@ func (x *RestoreRequest) String() string { func (*RestoreRequest) ProtoMessage() {} func (x *RestoreRequest) ProtoReflect() protoreflect.Message { - mi := &file_atelet_proto_msgTypes[12] + mi := &file_atelet_proto_msgTypes[13] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -855,7 +933,7 @@ func (x *RestoreRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use RestoreRequest.ProtoReflect.Descriptor instead. func (*RestoreRequest) Descriptor() ([]byte, []int) { - return file_atelet_proto_rawDescGZIP(), []int{12} + return file_atelet_proto_rawDescGZIP(), []int{13} } func (x *RestoreRequest) GetTargetAteomUid() string { @@ -949,7 +1027,7 @@ type RestoreResponse struct { func (x *RestoreResponse) Reset() { *x = RestoreResponse{} - mi := &file_atelet_proto_msgTypes[13] + mi := &file_atelet_proto_msgTypes[14] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -961,7 +1039,7 @@ func (x *RestoreResponse) String() string { func (*RestoreResponse) ProtoMessage() {} func (x *RestoreResponse) ProtoReflect() protoreflect.Message { - mi := &file_atelet_proto_msgTypes[13] + mi := &file_atelet_proto_msgTypes[14] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -974,7 +1052,7 @@ func (x *RestoreResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use RestoreResponse.ProtoReflect.Descriptor instead. func (*RestoreResponse) Descriptor() ([]byte, []int) { - return file_atelet_proto_rawDescGZIP(), []int{13} + return file_atelet_proto_rawDescGZIP(), []int{14} } var File_atelet_proto protoreflect.FileDescriptor @@ -1005,13 +1083,20 @@ const file_atelet_proto_rawDesc = "" + "\x06assets\x18\x02 \x03(\v2!.atelet.SandboxAssets.AssetsEntryR\x06assets\x1aM\n" + "\vAssetsEntry\x12\x10\n" + "\x03key\x18\x01 \x01(\tR\x03key\x12(\n" + - "\x05value\x18\x02 \x01(\v2\x12.atelet.ArchAssetsR\x05value:\x028\x01\"b\n" + + "\x05value\x18\x02 \x01(\v2\x12.atelet.ArchAssetsR\x05value:\x028\x01\"x\n" + + "\vVolumeMount\x12\x12\n" + + "\x04name\x18\x01 \x01(\tR\x04name\x12\x1d\n" + + "\n" + + "mount_path\x18\x02 \x01(\tR\tmountPath\x12\x1b\n" + + "\tread_only\x18\x03 \x01(\bR\breadOnly\x12\x19\n" + + "\bsub_path\x18\x04 \x01(\tR\asubPath\"\x9c\x01\n" + "\fWorkloadSpec\x121\n" + "\n" + "containers\x18\x01 \x03(\v2\x11.atelet.ContainerR\n" + "containers\x12\x1f\n" + "\vpause_image\x18\x02 \x01(\tR\n" + - "pauseImage\"s\n" + + "pauseImage\x128\n" + + "\rvolume_mounts\x18\x03 \x03(\v2\x13.atelet.VolumeMountR\fvolumeMounts\"s\n" + "\tContainer\x12\x12\n" + "\x04name\x18\x01 \x01(\tR\x04name\x12\x14\n" + "\x05image\x18\x02 \x01(\tR\x05image\x12\x18\n" + @@ -1072,54 +1157,56 @@ func file_atelet_proto_rawDescGZIP() []byte { } var file_atelet_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_atelet_proto_msgTypes = make([]protoimpl.MessageInfo, 16) +var file_atelet_proto_msgTypes = make([]protoimpl.MessageInfo, 17) var file_atelet_proto_goTypes = []any{ (CheckpointType)(0), // 0: atelet.CheckpointType (*RunRequest)(nil), // 1: atelet.RunRequest (*AssetFile)(nil), // 2: atelet.AssetFile (*ArchAssets)(nil), // 3: atelet.ArchAssets (*SandboxAssets)(nil), // 4: atelet.SandboxAssets - (*WorkloadSpec)(nil), // 5: atelet.WorkloadSpec - (*Container)(nil), // 6: atelet.Container - (*EnvEntry)(nil), // 7: atelet.EnvEntry - (*RunResponse)(nil), // 8: atelet.RunResponse - (*LocalCheckpointConfiguration)(nil), // 9: atelet.LocalCheckpointConfiguration - (*ExternalCheckpointConfiguration)(nil), // 10: atelet.ExternalCheckpointConfiguration - (*CheckpointRequest)(nil), // 11: atelet.CheckpointRequest - (*CheckpointResponse)(nil), // 12: atelet.CheckpointResponse - (*RestoreRequest)(nil), // 13: atelet.RestoreRequest - (*RestoreResponse)(nil), // 14: atelet.RestoreResponse - nil, // 15: atelet.ArchAssets.FilesEntry - nil, // 16: atelet.SandboxAssets.AssetsEntry + (*VolumeMount)(nil), // 5: atelet.VolumeMount + (*WorkloadSpec)(nil), // 6: atelet.WorkloadSpec + (*Container)(nil), // 7: atelet.Container + (*EnvEntry)(nil), // 8: atelet.EnvEntry + (*RunResponse)(nil), // 9: atelet.RunResponse + (*LocalCheckpointConfiguration)(nil), // 10: atelet.LocalCheckpointConfiguration + (*ExternalCheckpointConfiguration)(nil), // 11: atelet.ExternalCheckpointConfiguration + (*CheckpointRequest)(nil), // 12: atelet.CheckpointRequest + (*CheckpointResponse)(nil), // 13: atelet.CheckpointResponse + (*RestoreRequest)(nil), // 14: atelet.RestoreRequest + (*RestoreResponse)(nil), // 15: atelet.RestoreResponse + nil, // 16: atelet.ArchAssets.FilesEntry + nil, // 17: atelet.SandboxAssets.AssetsEntry } var file_atelet_proto_depIdxs = []int32{ - 5, // 0: atelet.RunRequest.spec:type_name -> atelet.WorkloadSpec + 6, // 0: atelet.RunRequest.spec:type_name -> atelet.WorkloadSpec 4, // 1: atelet.RunRequest.sandbox_assets:type_name -> atelet.SandboxAssets - 15, // 2: atelet.ArchAssets.files:type_name -> atelet.ArchAssets.FilesEntry - 16, // 3: atelet.SandboxAssets.assets:type_name -> atelet.SandboxAssets.AssetsEntry - 6, // 4: atelet.WorkloadSpec.containers:type_name -> atelet.Container - 7, // 5: atelet.Container.env:type_name -> atelet.EnvEntry - 5, // 6: atelet.CheckpointRequest.spec:type_name -> atelet.WorkloadSpec - 0, // 7: atelet.CheckpointRequest.type:type_name -> atelet.CheckpointType - 9, // 8: atelet.CheckpointRequest.local_config:type_name -> atelet.LocalCheckpointConfiguration - 10, // 9: atelet.CheckpointRequest.external_config:type_name -> atelet.ExternalCheckpointConfiguration - 5, // 10: atelet.RestoreRequest.spec:type_name -> atelet.WorkloadSpec - 0, // 11: atelet.RestoreRequest.type:type_name -> atelet.CheckpointType - 9, // 12: atelet.RestoreRequest.local_config:type_name -> atelet.LocalCheckpointConfiguration - 10, // 13: atelet.RestoreRequest.external_config:type_name -> atelet.ExternalCheckpointConfiguration - 2, // 14: atelet.ArchAssets.FilesEntry.value:type_name -> atelet.AssetFile - 3, // 15: atelet.SandboxAssets.AssetsEntry.value:type_name -> atelet.ArchAssets - 1, // 16: atelet.AteomHerder.Run:input_type -> atelet.RunRequest - 11, // 17: atelet.AteomHerder.Checkpoint:input_type -> atelet.CheckpointRequest - 13, // 18: atelet.AteomHerder.Restore:input_type -> atelet.RestoreRequest - 8, // 19: atelet.AteomHerder.Run:output_type -> atelet.RunResponse - 12, // 20: atelet.AteomHerder.Checkpoint:output_type -> atelet.CheckpointResponse - 14, // 21: atelet.AteomHerder.Restore:output_type -> atelet.RestoreResponse - 19, // [19:22] is the sub-list for method output_type - 16, // [16:19] is the sub-list for method input_type - 16, // [16:16] is the sub-list for extension type_name - 16, // [16:16] is the sub-list for extension extendee - 0, // [0:16] is the sub-list for field type_name + 16, // 2: atelet.ArchAssets.files:type_name -> atelet.ArchAssets.FilesEntry + 17, // 3: atelet.SandboxAssets.assets:type_name -> atelet.SandboxAssets.AssetsEntry + 7, // 4: atelet.WorkloadSpec.containers:type_name -> atelet.Container + 5, // 5: atelet.WorkloadSpec.volume_mounts:type_name -> atelet.VolumeMount + 8, // 6: atelet.Container.env:type_name -> atelet.EnvEntry + 6, // 7: atelet.CheckpointRequest.spec:type_name -> atelet.WorkloadSpec + 0, // 8: atelet.CheckpointRequest.type:type_name -> atelet.CheckpointType + 10, // 9: atelet.CheckpointRequest.local_config:type_name -> atelet.LocalCheckpointConfiguration + 11, // 10: atelet.CheckpointRequest.external_config:type_name -> atelet.ExternalCheckpointConfiguration + 6, // 11: atelet.RestoreRequest.spec:type_name -> atelet.WorkloadSpec + 0, // 12: atelet.RestoreRequest.type:type_name -> atelet.CheckpointType + 10, // 13: atelet.RestoreRequest.local_config:type_name -> atelet.LocalCheckpointConfiguration + 11, // 14: atelet.RestoreRequest.external_config:type_name -> atelet.ExternalCheckpointConfiguration + 2, // 15: atelet.ArchAssets.FilesEntry.value:type_name -> atelet.AssetFile + 3, // 16: atelet.SandboxAssets.AssetsEntry.value:type_name -> atelet.ArchAssets + 1, // 17: atelet.AteomHerder.Run:input_type -> atelet.RunRequest + 12, // 18: atelet.AteomHerder.Checkpoint:input_type -> atelet.CheckpointRequest + 14, // 19: atelet.AteomHerder.Restore:input_type -> atelet.RestoreRequest + 9, // 20: atelet.AteomHerder.Run:output_type -> atelet.RunResponse + 13, // 21: atelet.AteomHerder.Checkpoint:output_type -> atelet.CheckpointResponse + 15, // 22: atelet.AteomHerder.Restore:output_type -> atelet.RestoreResponse + 20, // [20:23] is the sub-list for method output_type + 17, // [17:20] is the sub-list for method input_type + 17, // [17:17] is the sub-list for extension type_name + 17, // [17:17] is the sub-list for extension extendee + 0, // [0:17] is the sub-list for field type_name } func init() { file_atelet_proto_init() } @@ -1127,11 +1214,11 @@ func file_atelet_proto_init() { if File_atelet_proto != nil { return } - file_atelet_proto_msgTypes[10].OneofWrappers = []any{ + file_atelet_proto_msgTypes[11].OneofWrappers = []any{ (*CheckpointRequest_LocalConfig)(nil), (*CheckpointRequest_ExternalConfig)(nil), } - file_atelet_proto_msgTypes[12].OneofWrappers = []any{ + file_atelet_proto_msgTypes[13].OneofWrappers = []any{ (*RestoreRequest_LocalConfig)(nil), (*RestoreRequest_ExternalConfig)(nil), } @@ -1141,7 +1228,7 @@ func file_atelet_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_atelet_proto_rawDesc), len(file_atelet_proto_rawDesc)), NumEnums: 1, - NumMessages: 16, + NumMessages: 17, NumExtensions: 0, NumServices: 1, }, diff --git a/internal/proto/ateletpb/atelet.proto b/internal/proto/ateletpb/atelet.proto index d210656d6..a965888ec 100644 --- a/internal/proto/ateletpb/atelet.proto +++ b/internal/proto/ateletpb/atelet.proto @@ -73,10 +73,20 @@ message SandboxAssets { map assets = 2; // arch -> {name -> file} } +// VolumeMount describes a single storage volume mount for an actor container. +// The volume name references a storage volume defined on the WorkerPool. +message VolumeMount { + string name = 1; // References a WorkerPool storageVolume name + string mount_path = 2; // Absolute path inside the container + bool read_only = 3; // Mount as read-only + string sub_path = 4; // Optional sub-directory within the volume +} + // WorkloadSpec parallels Pod, but with far fewer configurable fields. message WorkloadSpec { - repeated Container containers = 1; - string pause_image = 2; + repeated Container containers = 1; + string pause_image = 2; + repeated VolumeMount volume_mounts = 3; // Storage volume mounts } message Container { diff --git a/internal/resources/validate.go b/internal/resources/validate.go index 3855fe856..147fdcf97 100644 --- a/internal/resources/validate.go +++ b/internal/resources/validate.go @@ -136,3 +136,19 @@ func ValidateSnapshotURIPrefix(prefix string) error { } return nil } + +// ValidateVolumeMountPath ensures a volume mount path is an absolute path that +// does not contain traversal sequences. The path is used as the mount +// destination inside the OCI bundle rootfs, so it must be safe. +func ValidateVolumeMountPath(mountPath string) error { + if mountPath == "" { + return fmt.Errorf("volume mount path must not be empty") + } + if mountPath[0] != '/' { + return fmt.Errorf("volume mount path %q must be absolute", mountPath) + } + if strings.Contains(mountPath, "..") { + return fmt.Errorf("volume mount path %q must not contain '..'", mountPath) + } + return nil +} diff --git a/manifests/ate-install/generated/ate.dev_actortemplates.yaml b/manifests/ate-install/generated/ate.dev_actortemplates.yaml index 4fcb59ac2..f81b1bf80 100644 --- a/manifests/ate-install/generated/ate.dev_actortemplates.yaml +++ b/manifests/ate-install/generated/ate.dev_actortemplates.yaml @@ -177,6 +177,46 @@ spec: required: - location type: object + volumes: + description: |- + Volumes defines storage volumes available to this actor's containers. + Each volume references a name defined in the assigned WorkerPool's + storageVolumes. The actor cannot request storage the pool doesn't provide. + items: + description: |- + ActorVolume maps a WorkerPool storage volume into the actor's containers. + The name must match a volume defined in the assigned WorkerPool's + storageVolumes field. + properties: + mountPath: + description: |- + MountPath is the absolute path inside the actor's containers where + the volume will be mounted. + pattern: ^/ + type: string + name: + description: Name of the volume (must match a WorkerPool storageVolume + name). + minLength: 1 + type: string + readOnly: + description: ReadOnly forces the volume to be read-only inside + the actor. + type: boolean + subPath: + description: |- + SubPath is an optional sub-directory within the volume to mount + instead of the whole volume. The literal "${ACTOR_ID}" is replaced + with the actor's ID at mount time, enabling per-actor subdirectories + on shared storage. + type: string + required: + - mountPath + - name + type: object + maxItems: 16 + type: array + x-kubernetes-list-type: atomic workerSelector: description: |- WorkerSelector restricts which worker pools actors from this template may diff --git a/manifests/ate-install/generated/ate.dev_workerpools.yaml b/manifests/ate-install/generated/ate.dev_workerpools.yaml index 5714891ab..5f0fdb2cd 100644 --- a/manifests/ate-install/generated/ate.dev_workerpools.yaml +++ b/manifests/ate-install/generated/ate.dev_workerpools.yaml @@ -94,6 +94,91 @@ spec: this pool's SandboxClass. If empty, the default SandboxConfig for the SandboxClass is used. type: string + storageVolumes: + description: |- + StorageVolumes defines network or persistent storage volumes that will be + mounted into each worker pod. Actors can reference these by name via their + ActorTemplate's volumes field. + items: + description: |- + StorageVolume describes a network or persistent storage volume to be mounted + into each worker pod. Actors reference these by name via their ActorTemplate's + Volumes field. Exactly one source (NFS, PersistentVolumeClaim, or HostPath) + must be specified. + properties: + hostPath: + description: |- + HostPath represents a pre-existing host path (e.g., NFS already + mounted at a well-known path on all nodes). + properties: + path: + description: |- + path of the directory on the host. + If the path is a symlink, it will follow the link to the real path. + More info: https://kubernetes.io/docs/concepts/storage/volumes#hostpath + type: string + type: + description: |- + type for HostPath Volume + Defaults to "" + More info: https://kubernetes.io/docs/concepts/storage/volumes#hostpath + type: string + required: + - path + type: object + name: + description: Name of the volume. Referenced by ActorTemplate + volume entries. + maxLength: 63 + minLength: 1 + pattern: ^[a-zA-Z][a-zA-Z0-9-]*$ + type: string + nfs: + description: NFS represents an NFS mount on the host. + properties: + path: + description: |- + path that is exported by the NFS server. + More info: https://kubernetes.io/docs/concepts/storage/volumes#nfs + type: string + readOnly: + description: |- + readOnly here will force the NFS export to be mounted with read-only permissions. + Defaults to false. + More info: https://kubernetes.io/docs/concepts/storage/volumes#nfs + type: boolean + server: + description: |- + server is the hostname or IP address of the NFS server. + More info: https://kubernetes.io/docs/concepts/storage/volumes#nfs + type: string + required: + - path + - server + type: object + persistentVolumeClaim: + description: PersistentVolumeClaim references a PVC in the worker + pool's namespace. + properties: + claimName: + description: |- + claimName is the name of a PersistentVolumeClaim in the same namespace as the pod using this volume. + More info: https://kubernetes.io/docs/concepts/storage/persistent-volumes#persistentvolumeclaims + type: string + readOnly: + description: |- + readOnly Will force the ReadOnly setting in VolumeMounts. + Default false. + type: boolean + required: + - claimName + type: object + required: + - name + type: object + maxItems: 16 + type: array + x-kubernetes-list-type: atomic template: description: Template holds optional pod scheduling and resource settings for worker pods. diff --git a/pkg/api/v1alpha1/actortemplate_types.go b/pkg/api/v1alpha1/actortemplate_types.go index 38b777b71..42d8dd5dd 100644 --- a/pkg/api/v1alpha1/actortemplate_types.go +++ b/pkg/api/v1alpha1/actortemplate_types.go @@ -125,6 +125,37 @@ type SecretKeySelector struct { Optional *bool `json:"optional,omitempty"` } +// ActorVolume maps a WorkerPool storage volume into the actor's containers. +// The name must match a volume defined in the assigned WorkerPool's +// storageVolumes field. +type ActorVolume struct { + // Name of the volume (must match a WorkerPool storageVolume name). + // + // +required + // +kubebuilder:validation:MinLength=1 + Name string `json:"name"` + + // MountPath is the absolute path inside the actor's containers where + // the volume will be mounted. + // + // +required + // +kubebuilder:validation:Pattern=`^/` + MountPath string `json:"mountPath"` + + // ReadOnly forces the volume to be read-only inside the actor. + // + // +optional + ReadOnly bool `json:"readOnly,omitempty"` + + // SubPath is an optional sub-directory within the volume to mount + // instead of the whole volume. The literal "${ACTOR_ID}" is replaced + // with the actor's ID at mount time, enabling per-actor subdirectories + // on shared storage. + // + // +optional + SubPath string `json:"subPath,omitempty"` +} + type SnapshotsConfig struct { // Location to store snapshots in. // @@ -165,6 +196,15 @@ type ActorTemplateSpec struct { // // +optional WorkerSelector *metav1.LabelSelector `json:"workerSelector,omitempty"` + + // Volumes defines storage volumes available to this actor's containers. + // Each volume references a name defined in the assigned WorkerPool's + // storageVolumes. The actor cannot request storage the pool doesn't provide. + // + // +optional + // +kubebuilder:validation:MaxItems=16 + // +listType=atomic + Volumes []ActorVolume `json:"volumes,omitempty"` } // TODO: add validation diff --git a/pkg/api/v1alpha1/workerpool_types.go b/pkg/api/v1alpha1/workerpool_types.go index cb0818274..629290314 100644 --- a/pkg/api/v1alpha1/workerpool_types.go +++ b/pkg/api/v1alpha1/workerpool_types.go @@ -46,6 +46,36 @@ type WorkerPoolPodTemplate struct { NodeAffinity *corev1.NodeAffinity `json:"nodeAffinity,omitempty"` } +// StorageVolume describes a network or persistent storage volume to be mounted +// into each worker pod. Actors reference these by name via their ActorTemplate's +// Volumes field. Exactly one source (NFS, PersistentVolumeClaim, or HostPath) +// must be specified. +type StorageVolume struct { + // Name of the volume. Referenced by ActorTemplate volume entries. + // + // +required + // +kubebuilder:validation:MinLength=1 + // +kubebuilder:validation:MaxLength=63 + // +kubebuilder:validation:Pattern=`^[a-zA-Z][a-zA-Z0-9-]*$` + Name string `json:"name"` + + // NFS represents an NFS mount on the host. + // + // +optional + NFS *corev1.NFSVolumeSource `json:"nfs,omitempty"` + + // PersistentVolumeClaim references a PVC in the worker pool's namespace. + // + // +optional + PersistentVolumeClaim *corev1.PersistentVolumeClaimVolumeSource `json:"persistentVolumeClaim,omitempty"` + + // HostPath represents a pre-existing host path (e.g., NFS already + // mounted at a well-known path on all nodes). + // + // +optional + HostPath *corev1.HostPathVolumeSource `json:"hostPath,omitempty"` +} + type WorkerPoolSpec struct { // Replicas is the number of worker pods to run. // +required @@ -78,6 +108,15 @@ type WorkerPoolSpec struct { // SandboxClass is used. // +optional SandboxConfigName string `json:"sandboxConfigName,omitempty"` + + // StorageVolumes defines network or persistent storage volumes that will be + // mounted into each worker pod. Actors can reference these by name via their + // ActorTemplate's volumes field. + // + // +optional + // +kubebuilder:validation:MaxItems=16 + // +listType=atomic + StorageVolumes []StorageVolume `json:"storageVolumes,omitempty"` } type WorkerPoolStatus struct { diff --git a/pkg/api/v1alpha1/zz_generated.deepcopy.go b/pkg/api/v1alpha1/zz_generated.deepcopy.go index 65cd9cdbb..1027f1800 100644 --- a/pkg/api/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/api/v1alpha1/zz_generated.deepcopy.go @@ -99,6 +99,11 @@ func (in *ActorTemplateSpec) DeepCopyInto(out *ActorTemplateSpec) { *out = new(v1.LabelSelector) (*in).DeepCopyInto(*out) } + if in.Volumes != nil { + in, out := &in.Volumes, &out.Volumes + *out = make([]ActorVolume, len(*in)) + copy(*out, *in) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ActorTemplateSpec. @@ -134,6 +139,21 @@ func (in *ActorTemplateStatus) DeepCopy() *ActorTemplateStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ActorVolume) DeepCopyInto(out *ActorVolume) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ActorVolume. +func (in *ActorVolume) DeepCopy() *ActorVolume { + if in == nil { + return nil + } + out := new(ActorVolume) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *AssetFile) DeepCopyInto(out *AssetFile) { *out = *in @@ -347,6 +367,36 @@ func (in *SnapshotsConfig) DeepCopy() *SnapshotsConfig { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *StorageVolume) DeepCopyInto(out *StorageVolume) { + *out = *in + if in.NFS != nil { + in, out := &in.NFS, &out.NFS + *out = new(corev1.NFSVolumeSource) + **out = **in + } + if in.PersistentVolumeClaim != nil { + in, out := &in.PersistentVolumeClaim, &out.PersistentVolumeClaim + *out = new(corev1.PersistentVolumeClaimVolumeSource) + **out = **in + } + if in.HostPath != nil { + in, out := &in.HostPath, &out.HostPath + *out = new(corev1.HostPathVolumeSource) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new StorageVolume. +func (in *StorageVolume) DeepCopy() *StorageVolume { + if in == nil { + return nil + } + out := new(StorageVolume) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *WorkerPool) DeepCopyInto(out *WorkerPool) { *out = *in @@ -448,6 +498,13 @@ func (in *WorkerPoolSpec) DeepCopyInto(out *WorkerPoolSpec) { *out = new(WorkerPoolPodTemplate) (*in).DeepCopyInto(*out) } + if in.StorageVolumes != nil { + in, out := &in.StorageVolumes, &out.StorageVolumes + *out = make([]StorageVolume, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new WorkerPoolSpec.