diff --git a/docs/public/installation.md b/docs/public/installation.md index 242a0c17..380f7817 100644 --- a/docs/public/installation.md +++ b/docs/public/installation.md @@ -418,7 +418,15 @@ This sections describes all possible deploy parameters for PostgreSQL Backup Dae | backupDaemon.externalPv.storageClass | string | no | n/a | Specifies StorageClass of External PV. | | backupDaemon.priorityClassName | string | no | n/a | Specifies [Priority Class](https://kubernetes.io/docs/concepts/scheduling-eviction/pod-priority-preemption/#priorityclass). | | backupDaemon.affinity | json | no | n/a | Specifies the affinity scheduling rules. | -| backupDaemon.podLabels | yaml | no | n/a | Specifies custom pod labels. | +| backupDaemon.podLabels | yaml | no | n/a | Specifies custom pod labels. | +| backupDaemon.s3Aliases | list | no | [] | Array of S3 storage alias configurations. All entries are stored in a single Kubernetes Secret named s3-aliases, where each alias name is a separate data key with a JSON payload describing the S3 connection. Automatically filled from CLOUD_BACKUP_STORAGE_LOCATION if global.cloudIntegrationEnabled is enabled. | +| backupDaemon.s3Aliases[].name | string | yes | n/a | Unique alias name. Used as a top-level key inside `s3_aliases.json`. | +| backupDaemon.s3Aliases[].spec.storageBucket | string | yes | n/a | Specifies the name of the S3 bucket. | +| backupDaemon.s3Aliases[].spec.storageProvider | string | no | n/a | Specifies the storage provider type, for example `aws` or `minio`. | +| backupDaemon.s3Aliases[].spec.storageRegion | string | no | us-east-1 | Specifies the name of the region associated with the client. | +| backupDaemon.s3Aliases[].spec.storageServerUrl | string | yes | n/a | Specifies the URL address to S3 storage. | +| backupDaemon.s3Aliases[].spec.storageUsername | string | yes | n/a | Specifies S3 accessKeyId credential. | +| backupDaemon.s3Aliases[].secretContent.storagePassword | string | yes | n/a | Specifies S3 secretAccessKey credential. | ## metricCollector diff --git a/operator/api/apps/v1/postgresservice_types.go b/operator/api/apps/v1/postgresservice_types.go index d961aecf..ff0a6ea3 100644 --- a/operator/api/apps/v1/postgresservice_types.go +++ b/operator/api/apps/v1/postgresservice_types.go @@ -300,6 +300,7 @@ type BackupDaemon struct { SecurityContext v1.PodSecurityContext `json:"securityContext,omitempty"` PriorityClassName string `json:"priorityClassName,omitempty"` S3Storage *S3Storage `json:"s3Storage,omitempty"` + S3AliasesUsed bool `json:"s3AliasesUsed,omitempty"` PodLabels map[string]string `json:"podLabels,omitempty"` ExternalPv *ExternalPv `json:"externalPv,omitempty"` SslMode string `json:"sslMode,omitempty"` diff --git a/operator/charts/patroni-services/crds/netcracker.com_patroniservices.yaml b/operator/charts/patroni-services/crds/netcracker.com_patroniservices.yaml index e2143f0a..b631ca90 100644 --- a/operator/charts/patroni-services/crds/netcracker.com_patroniservices.yaml +++ b/operator/charts/patroni-services/crds/netcracker.com_patroniservices.yaml @@ -1073,6 +1073,8 @@ spec: type: object retainArchiveSettings: type: boolean + s3AliasesUsed: + type: boolean s3Storage: properties: accessKeyId: diff --git a/operator/charts/patroni-services/templates/_helpers.tpl b/operator/charts/patroni-services/templates/_helpers.tpl index 9ab3fbba..d194d98e 100644 --- a/operator/charts/patroni-services/templates/_helpers.tpl +++ b/operator/charts/patroni-services/templates/_helpers.tpl @@ -265,6 +265,22 @@ pg-{{ default "patroni" .Values.patroni.clusterName }}-direct {{- end }} {{- end -}} +{{/* +Effective backup daemon S3 aliases wrapped in a map: { items: [...] }. +When CLOUD_BACKUP_STORAGE_LOCATION is set and global.cloudIntegrationEnabled is true, +use cloud payload; otherwise use backupDaemon.s3Aliases from values. +Usage: (fromYaml (include "backupDaemon.s3Aliases" .)).items +*/}} +{{- define "backupDaemon.s3Aliases" -}} +{{- if and .Values.CLOUD_BACKUP_STORAGE_LOCATION .Values.global.cloudIntegrationEnabled -}} +items: {{ toYaml .Values.CLOUD_BACKUP_STORAGE_LOCATION | nindent 2 }} +{{- else if .Values.backupDaemon.s3Aliases -}} +items: {{ toYaml .Values.backupDaemon.s3Aliases | nindent 2 }} +{{- else -}} +items: [] +{{- end -}} +{{- end -}} + {{/* Postgres host for DBaaS adapter */}} diff --git a/operator/charts/patroni-services/templates/cr.yaml b/operator/charts/patroni-services/templates/cr.yaml index 89f9e5d7..7e46716d 100644 --- a/operator/charts/patroni-services/templates/cr.yaml +++ b/operator/charts/patroni-services/templates/cr.yaml @@ -120,6 +120,9 @@ spec: untrustedCert: {{ default "true" .Values.backupDaemon.s3Storage.untrustedCert }} region: {{ default "us-east-1" .Values.backupDaemon.s3Storage.region }} {{ end }} + {{- if (fromYaml (include "backupDaemon.s3Aliases" .)).items }} + s3AliasesUsed: true + {{- end }} {{ if .Values.backupDaemon.externalPv }} externalPv: {{ toYaml .Values.backupDaemon.externalPv | nindent 6 }} {{ end }} diff --git a/operator/charts/patroni-services/templates/secrets/s3-aliases-secret.yaml b/operator/charts/patroni-services/templates/secrets/s3-aliases-secret.yaml new file mode 100644 index 00000000..b2dbc011 --- /dev/null +++ b/operator/charts/patroni-services/templates/secrets/s3-aliases-secret.yaml @@ -0,0 +1,37 @@ +{{- if .Values.backupDaemon.install }} +{{- $s3Data := fromYaml (include "backupDaemon.s3Aliases" .) }} +{{- if $s3Data.items }} +{{- $aliases := dict }} +{{- range $s3Data.items }} +{{- $out := dict }} + +{{- if .spec }} +{{- $out = merge $out (omit .spec "storageBucket" "storageUsername" "storageRegion" "storageServerUrl") }} +{{- if .spec.storageBucket }}{{- $out = set $out "bucketName" .spec.storageBucket }}{{- end }} +{{- if .spec.storageUsername }}{{- $out = set $out "accessKeyId" .spec.storageUsername }}{{- end }} +{{- $out = set $out "region" (default "us-east-1" .spec.storageRegion) }} +{{- if .spec.storageServerUrl }}{{- $out = set $out "s3Url" .spec.storageServerUrl }}{{- end }} +{{- end }} + +{{- if .secretContent }} +{{- $out = merge $out (omit .secretContent "storagePassword") }} +{{- if .secretContent.storagePassword }}{{- $out = set $out "accessKeySecret" .secretContent.storagePassword }}{{- end }} +{{- end }} + +{{- $aliases = set $aliases .name $out }} +{{- end }} + +apiVersion: v1 +kind: Secret +metadata: + name: s3-aliases + labels: + app: postgres-backup-daemon + name: postgres-backup-daemon + {{- include "kubernetes.labels" . | nindent 4 }} +type: Opaque +stringData: + s3_aliases.json: | +{{ $aliases | toPrettyJson | indent 4 }} +{{- end }} +{{- end }} \ No newline at end of file diff --git a/operator/charts/patroni-services/values.yaml b/operator/charts/patroni-services/values.yaml index ad9d11af..889b5101 100644 --- a/operator/charts/patroni-services/values.yaml +++ b/operator/charts/patroni-services/values.yaml @@ -224,6 +224,7 @@ backupDaemon: # - postgresql-backup-pv-1 # The array of node-selectors that will be used for deployment. # storage.nodes can be used only if storage.type is set to PV + s3Aliases: [] # nodes: # - db-backup-node1 diff --git a/operator/pkg/deployment/backup.go b/operator/pkg/deployment/backup.go index 72271f47..93e5c3e4 100644 --- a/operator/pkg/deployment/backup.go +++ b/operator/pkg/deployment/backup.go @@ -296,6 +296,36 @@ func NewBackupDaemonDeployment(backupDaemon *netcrackerv1.BackupDaemon, pgCluste }, } } + if backupDaemon.S3AliasesUsed { + deployment.Spec.Template.Spec.Containers[0].Env = append( + deployment.Spec.Template.Spec.Containers[0].Env, + corev1.EnvVar{ + Name: "S3_ALIASES_USED", + Value: "true", + }, + ) + + deployment.Spec.Template.Spec.Volumes = append( + deployment.Spec.Template.Spec.Volumes, + corev1.Volume{ + Name: "s3-aliases", + VolumeSource: corev1.VolumeSource{ + Secret: &corev1.SecretVolumeSource{ + SecretName: "s3-aliases", + }, + }, + }, + ) + + deployment.Spec.Template.Spec.Containers[0].VolumeMounts = append( + deployment.Spec.Template.Spec.Containers[0].VolumeMounts, + corev1.VolumeMount{ + Name: "s3-aliases", + MountPath: "/aliases/", + ReadOnly: true, + }, + ) + } if backupDaemon.ExternalPv != nil { deployment.Spec.Template.Spec.Volumes = append(deployment.Spec.Template.Spec.Volumes, getExternalBackupVolume()) diff --git a/operator/pkg/reconciler/backup_daemon.go b/operator/pkg/reconciler/backup_daemon.go index 8323523c..3fc23a63 100644 --- a/operator/pkg/reconciler/backup_daemon.go +++ b/operator/pkg/reconciler/backup_daemon.go @@ -92,7 +92,12 @@ func (r *BackupDaemonReconciler) Reconcile() error { } // Add Secret Hash - err := manager.AddCredHashToPodTemplate(credentials.PostgresSecretNames, &backupDaemonDeployment.Spec.Template) + secretNames := append([]string{}, credentials.PostgresSecretNames...) + + if bdSpec.S3AliasesUsed { + secretNames = append(secretNames, "s3-aliases") + } + err := manager.AddCredHashToPodTemplate(secretNames, &backupDaemonDeployment.Spec.Template) if err != nil { logger.Error(fmt.Sprintf("can't add secret HASH to annotations for %s", backupDaemonDeployment.Name), zap.Error(err)) return err diff --git a/services/backup-daemon/docker/granular/granular.py b/services/backup-daemon/docker/granular/granular.py index a57e4c82..c34cbc8b 100644 --- a/services/backup-daemon/docker/granular/granular.py +++ b/services/backup-daemon/docker/granular/granular.py @@ -1064,7 +1064,7 @@ class NewBackup(flask_restful.Resource): def __init__(self): self.log = logging.getLogger("NewBackup") self.allowed_fields = ["storageName", "blobPath", "databases"] - self.s3 = storage_s3.AwsS3Vault(prefix="") + # self.s3 = storage_s3.AwsS3Vault(prefix="") @staticmethod def get_endpoints(): @@ -1072,14 +1072,16 @@ def get_endpoints(): @auth.login_required def post(self): - if not self.s3: - return "S3 is not configured for backup daemon", http.client.FORBIDDEN - body = request.get_json(silent=True) or {} storage_name = body.get("storageName") blob_path = body.get("blobPath") databases = body.get("databases") or [] + try: + self.s3 = storage_s3.AwsS3Vault(storage_name=storage_name, prefix="") + except Exception as e: + return {"message": str(e)}, http.client.BAD_REQUEST + if not blob_path: return {"message": "blobPath is required"}, http.client.BAD_REQUEST if databases and not isinstance(databases, (list, tuple)): @@ -1142,7 +1144,7 @@ class NewBackupStatus(flask_restful.Resource): def __init__(self): self.log = logging.getLogger("NewBackupStatus") - self.s3 = storage_s3.AwsS3Vault(prefix="") + # self.s3 = storage_s3.AwsS3Vault(prefix="") @staticmethod def get_endpoints(): @@ -1150,20 +1152,30 @@ def get_endpoints(): @auth.login_required def get(self, backup_id): - if not self.s3: - return "S3 is not configured for backup daemon", http.client.FORBIDDEN - if not backup_id: return "Backup ID is not specified.", http.client.BAD_REQUEST namespace = request.args.get("namespace") or configs.default_namespace() - if not backups.is_valid_namespace(namespace): - return "Invalid namespace name: %s." % namespace.encode("utf-8"), http.client.BAD_REQUEST - blob_path = normalize_blobPath(request.args.get("blobPath")) - status_path = backups.build_backup_status_file_path(backup_id, blob_path=blob_path) + meta = load_backup_metadata_from_local_status(backup_id, namespace) + if not meta: + return { + "message": "Backup metadata is not found for backupId %s" % backup_id + }, http.client.NOT_FOUND + + storage_name = meta.get("storageName") + blob_path = normalize_blobPath(meta.get("blobPath")) + + try: + self.s3 = storage_s3.AwsS3Vault(storage_name=storage_name, prefix="") + except Exception as e: + return {"message": str(e)}, http.client.BAD_REQUEST + + status_path = backups.build_backup_status_file_path( + backup_id, + blob_path=blob_path + ) - try: raw = json.loads(self.s3.read_object(status_path)) except Exception: @@ -1171,51 +1183,49 @@ def get(self, backup_id): if blob_path and not raw.get("blobPath"): raw["blobPath"] = blob_path + if storage_name and not raw.get("storageName"): + raw["storageName"] = storage_name return backups.transform_backup_status_v1(raw), http.client.OK @auth.login_required @superuser_authorization def delete(self, backup_id): - if not self.s3: - return "S3 is not configured for backup daemon", http.client.FORBIDDEN - if not backup_id: - return {"backupId": backup_id, "message": "Backup ID is not specified", "status": "Failed"}, http.client.BAD_REQUEST + return { + "backupId": backup_id, + "message": "Backup ID is not specified", + "status": "Failed" + }, http.client.BAD_REQUEST - req_ns = request.args.get("namespace") - blob_path = request.args.get("blobPath") - if not blob_path: - return {"backupId": backup_id, - "message": "blobPath query parameter is required (e.g. ?blobPath=tmp/a/b/c).", - "status": "Failed"}, http.client.BAD_REQUEST - blob_path = normalize_blobPath(blob_path) + namespace = request.args.get("namespace") or configs.default_namespace() - def _exists(p: str) -> bool: - return self.s3.is_file_exists(p) + meta = load_backup_metadata_from_local_status(backup_id, namespace) + if not meta: + return { + "backupId": backup_id, + "message": "Backup metadata is not found.", + "status": "Failed" + }, http.client.NOT_FOUND - namespace = req_ns - if not namespace: - candidates = [] - try: - candidates.append(configs.default_namespace()) - except Exception: - pass - if "default" not in candidates: - candidates.append("default") - for cand in candidates: - status_try = backups.build_backup_status_file_path(backup_id, cand, blob_path=blob_path) - if _exists(status_try): - namespace = cand - break - if not namespace: - namespace = configs.default_namespace() - - status_path = backups.build_backup_status_file_path(backup_id, namespace, blob_path=blob_path) - existed_before = _exists(status_path) + storage_name = meta.get("storageName") + blob_path = normalize_blobPath(meta.get("blobPath")) + + try: + self.s3 = storage_s3.AwsS3Vault(storage_name=storage_name, prefix="") + except Exception as e: + return {"message": str(e)}, http.client.BAD_REQUEST + + status_path = backups.build_backup_status_file_path( + backup_id, + blob_path=blob_path + ) + + existed_before = self.s3.is_file_exists(status_path) resp = TerminateBackupEndpoint().post(backup_id) term_body, term_code = None, None + try: if isinstance(resp, Response): term_body = resp.get_data(as_text=True) @@ -1241,18 +1251,40 @@ def _exists(p: str) -> bool: self.log.info("Terminate response for %s: code=%s body=%s", backup_id, term_code, term_body) try: - target_dir = backups.build_backup_path(backup_id, blob_path=blob_path) + target_dir = backups.build_backup_path( + backup_id, + blob_path=blob_path + ) self.s3.delete_objects(target_dir) except Exception as e: if not existed_before and term_code == http.client.NOT_FOUND: - return {"backupId": backup_id, "message": "Backup is not found.", "status": "Failed"}, http.client.NOT_FOUND + return { + "backupId": backup_id, + "message": "Backup is not found.", + "status": "Failed" + }, http.client.NOT_FOUND + self.log.exception("Delete failed for %s: %s", backup_id, e) - return {"backupId": backup_id, - "message": f"An error occurred while deleting backup: {e}", - "status": "Failed"}, http.client.INTERNAL_SERVER_ERROR + return { + "backupId": backup_id, + "message": f"An error occurred while deleting backup: {e}", + "status": "Failed" + }, http.client.INTERNAL_SERVER_ERROR if not existed_before and term_code == http.client.NOT_FOUND: - return {"backupId": backup_id, "message": "Backup is not found.", "status": "Failed"}, http.client.NOT_FOUND + return { + "backupId": backup_id, + "message": "Backup is not found.", + "status": "Failed" + }, http.client.NOT_FOUND + + # Optional: remove local metadata/status directory too + try: + local_dir = backups.build_backup_path(backup_id, namespace) + if os.path.exists(local_dir): + shutil.rmtree(local_dir) + except Exception: + self.log.warning("Failed to remove local backup metadata for %s", backup_id) if term_code and 200 <= term_code < 300: msg = "Backup terminated successfully. Cleanup completed." @@ -1273,7 +1305,7 @@ class NewRestore(flask_restful.Resource): def __init__(self): self.log = logging.getLogger("NewRestore") - self.s3 = storage_s3.AwsS3Vault(prefix="") + # self.s3 = storage_s3.AwsS3Vault(prefix="") @staticmethod def get_endpoints(): @@ -1282,12 +1314,15 @@ def get_endpoints(): @auth.login_required @superuser_authorization def post(self, backup_id): - if not self.s3: - return "S3 is not configured for backup daemon", http.client.FORBIDDEN - body = request.get_json(silent=True) or {} blob_path = body.get("blobPath") pairs = body.get("databases") or [] + storage_name = body.get("storageName") + + try: + self.s3 = storage_s3.AwsS3Vault(storage_name=storage_name, prefix="") + except Exception as e: + return {"message": str(e)}, http.client.BAD_REQUEST dry_run = body.get("dryRun") if dry_run: @@ -1365,7 +1400,7 @@ def post(self, backup_id): if not dry_run: worker = pg_restore.PostgreSQLRestoreWorker( requested, force, - {"backupId": backup_id, "namespace": namespace, "trackingId": tracking_id}, + {"backupId": backup_id, "namespace": namespace, "trackingId": tracking_id, "storageName": storage_name}, databases_mapping, owners_mapping, restore_roles, single_transaction, body.get("dbaasClone"), blob_path ) worker.start() @@ -1376,8 +1411,6 @@ def post(self, backup_id): except Exception: created_iso = "" - storage_name = body.get("storageName") or "" - dbs_out = [] for prev in (requested or []): prev_name = prev or "" @@ -1444,7 +1477,7 @@ class NewRestoreStatus(flask_restful.Resource): def __init__(self): self.log = logging.getLogger("NewRestoreStatus") - self.s3 = storage_s3.AwsS3Vault(prefix="") + # self.s3 = storage_s3.AwsS3Vault(prefix="") @staticmethod def get_endpoints(): @@ -1453,14 +1486,21 @@ def get_endpoints(): @auth.login_required @superuser_authorization def get(self, restore_id): - if not self.s3: - return "S3 is not configured for backup daemon", http.client.FORBIDDEN - if not restore_id: return "Restore tracking ID is not specified.", http.client.BAD_REQUEST + meta = load_restore_metadata_from_local_status(restore_id) + if not meta: + return { + "message": "Restore metadata is not found for restoreId %s" % restore_id + }, http.client.NOT_FOUND + + backup_id = meta.get("sourceBackupId") or meta.get("backupId") + storage_name = meta.get("storageName") + blob_path = normalize_blobPath(meta.get("blobPath")) + try: - backup_id, namespace = backups.extract_backup_id_from_tracking_id(restore_id) + _, namespace = backups.extract_backup_id_from_tracking_id(restore_id) except Exception as e: self.log.exception(e) return "Malformed restore tracking ID.", http.client.BAD_REQUEST @@ -1468,45 +1508,70 @@ def get(self, restore_id): if not backups.is_valid_namespace(namespace): return "Invalid namespace name: %s." % namespace.encode("utf-8"), http.client.BAD_REQUEST - blob_path = normalize_blobPath(request.args.get("blobPath")) - storage_name = request.args.get("storageName") or os.environ.get("STORAGE_NAME") - status_path = backups.build_restore_status_file_path(backup_id, restore_id, blob_path=blob_path) + try: + self.s3 = storage_s3.AwsS3Vault(storage_name=storage_name, prefix="") + except Exception as e: + return {"message": str(e)}, http.client.BAD_REQUEST + + status_path = backups.build_restore_status_file_path( + backup_id, + restore_id, + blob_path=blob_path + ) try: raw = json.loads(self.s3.read_object(status_path)) except Exception: - return "Backup in bucket is not found.", http.client.NOT_FOUND + return "Restore status in bucket is not found.", http.client.NOT_FOUND if blob_path and not raw.get("blobPath"): raw["blobPath"] = blob_path if storage_name and not raw.get("storageName"): raw["storageName"] = storage_name + if backup_id and not raw.get("sourceBackupId"): + raw["sourceBackupId"] = backup_id return backups.transform_restore_status_v1(raw), http.client.OK @auth.login_required @superuser_authorization def delete(self, restore_id): - if not self.s3: - return "S3 is not configured for backup daemon", http.client.FORBIDDEN - if not restore_id: - return {"restoreId": restore_id, "message": "Restore ID is not specified", "status": "Failed"}, http.client.BAD_REQUEST + return { + "restoreId": restore_id, + "message": "Restore ID is not specified", + "status": "Failed" + }, http.client.BAD_REQUEST + + meta = load_restore_metadata_from_local_status(restore_id) + if not meta: + return { + "restoreId": restore_id, + "message": "Restore metadata is not found.", + "status": "Failed" + }, http.client.NOT_FOUND + + backup_id = meta.get("sourceBackupId") or meta.get("backupId") + storage_name = meta.get("storageName") + blob_path = normalize_blobPath(meta.get("blobPath")) try: - backup_id, namespace = backups.extract_backup_id_from_tracking_id(restore_id) + _, namespace = backups.extract_backup_id_from_tracking_id(restore_id) except Exception as e: self.log.exception(e) resp = TerminateRestoreEndpoint().post(restore_id) term_code, term_body = None, None + try: if isinstance(resp, Response): term_body = resp.get_data(as_text=True) term_code = getattr(resp, "status_code", None) elif isinstance(resp, tuple) and len(resp) >= 2: term_body = resp[0] - try: term_code = int(resp[1]) - except Exception: term_code = None + try: + term_code = int(resp[1]) + except Exception: + term_code = None elif isinstance(resp, dict): term_body = json.dumps(resp) term_code = http.client.OK @@ -1524,19 +1589,28 @@ def delete(self, restore_id): "termination": {"code": term_code, "body": term_body} }, http.client.OK - blob_path = request.args.get("blobPath") - if not blob_path: + if not backups.is_valid_namespace(namespace): return { "restoreId": restore_id, - "message": "blobPath query parameter is required for cleanup (e.g. ?blobPath=tmp/a/b/c).", + "message": "Invalid namespace name: %s." % namespace, "status": "Failed" }, http.client.BAD_REQUEST - blob_path = normalize_blobPath(blob_path) - status_path = backups.build_restore_status_file_path(backup_id, restore_id, blob_path=blob_path) - backup_base = backups.build_backup_path(backup_id, blob_path=blob_path) + try: + self.s3 = storage_s3.AwsS3Vault(storage_name=storage_name, prefix="") + except Exception as e: + return {"message": str(e)}, http.client.BAD_REQUEST + + status_path = backups.build_restore_status_file_path( + backup_id, + restore_id, + blob_path=blob_path + ) + backup_base = backups.build_backup_path( + backup_id, + blob_path=blob_path + ) pattern_name = f"{restore_id}" - pattern_glob = os.path.join(backup_base, pattern_name + "*") def _exists_file(p: str) -> bool: try: @@ -1561,10 +1635,15 @@ def _prefix_exists() -> bool: TerminateRestoreEndpoint().post(restore_id) except Exception: pass - return {"restoreId": restore_id, "message": "Restore is not found.", "status": "Failed"}, http.client.NOT_FOUND + return { + "restoreId": restore_id, + "message": "Restore is not found.", + "status": "Failed" + }, http.client.NOT_FOUND resp = TerminateRestoreEndpoint().post(restore_id) term_body, term_code = None, None + try: if isinstance(resp, Response): term_body = resp.get_data(as_text=True) @@ -1576,7 +1655,7 @@ def _prefix_exists() -> bool: except Exception: term_code = None elif isinstance(resp, dict): - term_body = json.dumps(resp) + term_body = json.dumps(resp) term_code = http.client.OK elif isinstance(resp, (bytes, bytearray)): term_body = resp.decode("utf-8", "replace") @@ -1591,7 +1670,7 @@ def _prefix_exists() -> bool: try: prefix = os.path.join(backup_base, pattern_name).rstrip("/") - self.s3.delete_objects(prefix if prefix.endswith("/") else prefix) + self.s3.delete_objects(prefix) except Exception as e: self.log.exception("Restore cleanup failed for %s: %s", restore_id, e) return { @@ -1601,13 +1680,29 @@ def _prefix_exists() -> bool: "termination": {"code": term_code, "body": term_body} }, http.client.INTERNAL_SERVER_ERROR + # Optional: remove local restore metadata/status file + try: + local_status_path = backups.build_restore_status_file_path( + backup_id, + restore_id, + namespace + ) + if os.path.isfile(local_status_path): + os.remove(local_status_path) + except Exception: + self.log.warning("Failed to remove local restore metadata for %s", restore_id) + if term_code and 200 <= term_code < 300: msg = "Restore terminated successfully. Cleanup completed." else: msg = "Termination attempted. Cleanup completed." - return {"restoreId": restore_id, "message": msg, "status": "Successful", - "termination": {"code": term_code, "body": term_body}}, http.client.OK + return { + "restoreId": restore_id, + "message": msg, + "status": "Successful", + "termination": {"code": term_code, "body": term_body} + }, http.client.OK def normalize_blobPath(blob_path): # Normalize blob_path by removing a single leading and trailing slash @@ -1622,6 +1717,42 @@ def normalize_blobPath(blob_path): blob_path = blob_path[:-1] return blob_path +def load_backup_metadata_from_local_status(backup_id, namespace=None): + namespace = namespace or configs.default_namespace() + + local_status_path = backups.build_backup_status_file_path( + backup_id, + namespace + ) + + if not os.path.isfile(local_status_path): + return None + + try: + return utils.get_json_by_path(local_status_path) + except Exception: + return None + +def load_restore_metadata_from_local_status(restore_id): + try: + backup_id, namespace = backups.extract_backup_id_from_tracking_id(restore_id) + except Exception: + return None + + local_status_path = backups.build_restore_status_file_path( + backup_id, + restore_id, + namespace + ) + + if not os.path.isfile(local_status_path): + return None + + try: + return utils.get_json_by_path(local_status_path) + except Exception: + return None + def get_pgbackrest_service(): if os.getenv("BACKUP_FROM_STANDBY") == "true": try: diff --git a/services/backup-daemon/docker/granular/pg_backup.py b/services/backup-daemon/docker/granular/pg_backup.py index 6a3fcccc..367bb489 100644 --- a/services/backup-daemon/docker/granular/pg_backup.py +++ b/services/backup-daemon/docker/granular/pg_backup.py @@ -53,10 +53,12 @@ def __init__(self, databases, backup_request, blob_path=None): self.blob_path = blob_path self.backup_dir = backups.build_backup_path(self.backup_id, self.namespace, self.external_backup_root) self.create_backup_dir() + self.storage_name = backup_request.get('storageName') or "" + if blob_path: - self.s3 = storage_s3.AwsS3Vault(prefix="") - else: - self.s3 = storage_s3.AwsS3Vault() if os.environ['STORAGE_TYPE'] == "s3" else None + self.s3 = storage_s3.AwsS3Vault(storage_name=self.storage_name, prefix="") + else: + self.s3 = storage_s3.AwsS3Vault(storage_name=self.storage_name) if os.environ['STORAGE_TYPE'] == "s3" else None self._cancel_event = Event() if configs.get_encryption(): @@ -67,12 +69,12 @@ def __init__(self, databases, backup_request, blob_path=None): else: self.encryption = False - self.storage_name = backup_request.get('storageName') or "" self.status = { 'backupId': self.backup_id, 'namespace': self.namespace, 'status': backups.BackupStatus.PLANNED, 'storageName': self.storage_name, + 'blobPath': self.blob_path, } self.pg_dump_proc = None diff --git a/services/backup-daemon/docker/granular/pg_restore.py b/services/backup-daemon/docker/granular/pg_restore.py index a020e5b0..a2957675 100644 --- a/services/backup-daemon/docker/granular/pg_restore.py +++ b/services/backup-daemon/docker/granular/pg_restore.py @@ -63,10 +63,11 @@ def __init__(self, databases, force, restore_request, databases_mapping, owners_ self.owners_mapping = owners_mapping self.bin_path = configs.get_pgsql_bin_path(self.postgres_version) self.parallel_jobs = configs.get_parallel_jobs() + self.storage_name = restore_request.get('storageName') or "" if blobPath: - self.s3 = storage_s3.AwsS3Vault(prefix="") + self.s3 = storage_s3.AwsS3Vault(storage_name=self.storage_name,prefix="") else: - self.s3 = storage_s3.AwsS3Vault() if os.environ['STORAGE_TYPE'] == "s3" else None + self.s3 = storage_s3.AwsS3Vault(storage_name=self.storage_name) if os.environ['STORAGE_TYPE'] == "s3" else None self.blob_path = blobPath self.backup_dir = backups.build_backup_path(self.backup_id, self.namespace, self.external_backup_root) self.create_backup_dir(self.backup_dir) @@ -80,9 +81,13 @@ def __init__(self, databases, force, restore_request, databases_mapping, owners_ self.databases = list(databases_mapping.keys()) self.status = { 'trackingId': self.tracking_id, + 'restoreId': self.tracking_id, 'namespace': self.namespace, 'backupId': self.backup_id, - 'status': backups.BackupStatus.PLANNED + 'sourceBackupId': self.backup_id, + 'status': backups.BackupStatus.PLANNED, + 'storageName': self.storage_name, + 'blobPath': self.blob_path, } self._cancel_event = Event() self.pg_restore_proc = None @@ -503,8 +508,7 @@ def run(self): flush=True) self.update_status('status', backups.BackupStatus.SUCCESSFUL, flush=True) self.log.info(self.log_msg("Backup has been successfully restored.")) - if self.s3: - shutil.rmtree(self.backup_dir) + self.cleanup_restore_temp_files() except Exception as e: self.log.exception(self.log_msg("Restore request processing has failed.")) self.update_status('details', str(e)) @@ -512,8 +516,7 @@ def run(self): datetime.datetime.utcnow().isoformat(), flush=True) self.update_status('status', backups.BackupStatus.FAILED, flush=True) - if self.s3: - shutil.rmtree(self.backup_dir) + self.cleanup_restore_temp_files() finally: if self.is_cancelled(): self.on_cancel() @@ -713,3 +716,23 @@ def drop_lookup_func_for_db(db_name): finally: if conn: conn.close() + + def cleanup_restore_temp_files(self): + if not self.s3: + return + + for file_name in os.listdir(self.backup_dir): + if file_name.endswith(".json"): + continue + + file_path = os.path.join(self.backup_dir, file_name) + + try: + if os.path.isfile(file_path) or os.path.islink(file_path): + os.remove(file_path) + elif os.path.isdir(file_path): + shutil.rmtree(file_path) + except Exception as e: + self.log.warning( + self.log_msg("Failed to remove temporary restore file {}: {}".format(file_path, e)) + ) diff --git a/services/backup-daemon/docker/granular/storage_s3.py b/services/backup-daemon/docker/granular/storage_s3.py index 4876949c..6bf663fc 100644 --- a/services/backup-daemon/docker/granular/storage_s3.py +++ b/services/backup-daemon/docker/granular/storage_s3.py @@ -20,6 +20,7 @@ import os import logging import configs +import json from retrying import retry try: @@ -38,10 +39,38 @@ class AwsS3Vault: __log = logging.getLogger("AwsS3Granular") - def __init__(self, cluster_name=None, cache_enabled=False, + @staticmethod + def get_s3_alias_config(storage_name=None): + if os.getenv("S3_ALIASES_USED", "false").lower() != "true": + return None + + with open("/aliases/s3_aliases.json", "r") as f: + aliases = json.load(f) + + if not aliases: + raise Exception("S3 aliases are enabled, but /aliases/s3_aliases.json is empty") + + if not storage_name: + raise Exception("storageName is required when S3 aliases are enabled") + + alias = aliases.get(storage_name) + if not alias: + raise Exception(f"S3 alias '{storage_name}' is not found in /aliases/s3_aliases.json") + + return alias + + @staticmethod + def get_s3_bucket_name(alias=None): + if alias: + return alias.get("bucketName") + return os.getenv("CONTAINER") or os.getenv("AWS_S3_BUCKET") or os.getenv("S3_BUCKET") + + def __init__(self, storage_name=None, cluster_name=None, cache_enabled=False, aws_s3_bucket_listing=None, prefix=None): - self.bucket = bucket or os.getenv("CONTAINER") or os.getenv("AWS_S3_BUCKET") or os.getenv("S3_BUCKET") + self.storage_name = storage_name + self.alias = AwsS3Vault.get_s3_alias_config(storage_name) + self.bucket = AwsS3Vault.get_s3_bucket_name(self.alias) self.console = None self.cluster_name = cluster_name self.cache_enabled = cache_enabled @@ -56,12 +85,15 @@ def __init__(self, cluster_name=None, cache_enabled=False, raise ValueError("S3 bucket is not configured. Set one of CONTAINER, AWS_S3_BUCKET, or S3_BUCKET.") def get_s3_client(self): - return boto3.client("s3", - region_name=os.getenv("AWS_DEFAULT_REGION") if os.getenv("AWS_DEFAULT_REGION") else None, - endpoint_url=os.getenv("AWS_S3_ENDPOINT_URL"), - aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"), - aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"), - verify=(False if os.getenv("AWS_S3_UNTRUSTED_CERT", "false").lower() == "true" else None)) + alias = self.alias + return boto3.client( + "s3", + region_name=alias.get("region") if alias else (os.getenv("AWS_DEFAULT_REGION") if os.getenv("AWS_DEFAULT_REGION") else None), + endpoint_url=alias.get("s3Url") if alias else os.getenv("AWS_S3_ENDPOINT_URL"), + aws_access_key_id=alias.get("accessKeyId") if alias else os.getenv("AWS_ACCESS_KEY_ID"), + aws_secret_access_key=alias.get("accessKeySecret") if alias else os.getenv("AWS_SECRET_ACCESS_KEY"), + verify=(False if os.getenv("AWS_S3_UNTRUSTED_CERT", "false").lower() == "true" else None), + ) @retry(stop_max_attempt_number=RETRY_COUNT, wait_fixed=RETRY_WAIT) def upload_file(self, file_path, blob_path=None, backup_id=None): diff --git a/services/backup-daemon/docker/postgres/storage_s3.py b/services/backup-daemon/docker/postgres/storage_s3.py index 8451173f..6221498e 100644 --- a/services/backup-daemon/docker/postgres/storage_s3.py +++ b/services/backup-daemon/docker/postgres/storage_s3.py @@ -114,21 +114,21 @@ def prot_put_as_stream(self, filename, stream): self.__log.info("Start uploading: %s" % filename) # todo[anin] replace implementation # AwsS3Vault.get_s3_client().upload_fileobj(data, CONTAINER, filename) - AwsS3Vault.get_s3_client().upload_file(fs_filename, CONTAINER, filename) + AwsS3Vault.get_s3_client().upload_file(fs_filename, AwsS3Vault.get_s3_bucket_name(), filename) os.remove(fs_filename) return sha256.hexdigest() @retry(stop_max_attempt_number=RETRY_COUNT, wait_fixed=RETRY_WAIT) def prot_get_as_stream(self, filename): self.__log.info("Get stream request for file: %s" % self.aws_prefix + filename) - object_body = AwsS3Vault.get_s3_resource().Bucket(CONTAINER).Object(self.aws_prefix + filename).get()['Body'] + object_body = AwsS3Vault.get_s3_resource().Bucket(AwsS3Vault.get_s3_bucket_name()).Object(self.aws_prefix + filename).get()['Body'] return StreamWrapper(object_body) @retry(stop_max_attempt_number=RETRY_COUNT, wait_fixed=RETRY_WAIT) def prot_delete_bundle(self, filename): - objects_to_delete = AwsS3Vault.get_s3_client().list_objects(Bucket=CONTAINER, Prefix=self.aws_prefix + filename) + objects_to_delete = AwsS3Vault.get_s3_client().list_objects(Bucket=AwsS3Vault.get_s3_bucket_name(), Prefix=self.aws_prefix + filename) for obj in objects_to_delete.get('Contents', []): - AwsS3Vault.get_s3_client().delete_object(Bucket=CONTAINER, Key=obj['Key']) + AwsS3Vault.get_s3_client().delete_object(Bucket=AwsS3Vault.get_s3_bucket_name(), Key=obj['Key']) def prot_delete(self, filename): self.prot_delete_bundle(filename) @@ -136,7 +136,7 @@ def prot_delete(self, filename): def prot_is_file_exists(self, filename): exists = True try: - AwsS3Vault.get_s3_resource().Object(CONTAINER, self.aws_prefix + filename).get() + AwsS3Vault.get_s3_resource().Object(AwsS3Vault.get_s3_bucket_name(), self.aws_prefix + filename).get() except botocore.exceptions.ClientError as e: if e.response['Error']['Code'] == 'NoSuchKey': exists = False @@ -147,7 +147,7 @@ def prot_is_file_exists(self, filename): def prot_get_file_size(self, filename): if self.prot_is_file_exists(filename): - return int(AwsS3Vault.get_s3_resource().Object(CONTAINER, filename).get()['Size']) + return int(AwsS3Vault.get_s3_resource().Object(AwsS3Vault.get_s3_bucket_name(), filename).get()['Size']) return 0 def is_valid_backup_id(self, backup_id): @@ -159,7 +159,7 @@ def is_valid_backup_id(self, backup_id): @retry(stop_max_attempt_number=RETRY_COUNT, wait_fixed=RETRY_WAIT) def list(self): - bucket = AwsS3Vault.get_s3_client().list_objects(Bucket=CONTAINER) + bucket = AwsS3Vault.get_s3_client().list_objects(Bucket=AwsS3Vault.get_s3_bucket_name()) aws_s3_vault_listing = [] if 'Contents' in bucket: # Collect backups ids only @@ -169,7 +169,7 @@ def list(self): vaults = [ AwsS3Vault(backup_id - , bucket=CONTAINER + , bucket=AwsS3Vault.get_s3_bucket_name() , cluster_name=PG_CLUSTER_NAME , cache_enabled=True , aws_s3_bucket_listing=(bucket['Contents'] if 'Contents' in bucket else None)) @@ -180,7 +180,7 @@ def list(self): def size(self): """ Returns whole storage size in bytes """ total_size = 0 - bucket = AwsS3Vault.get_s3_client().list_objects(Bucket=CONTAINER) + bucket = AwsS3Vault.get_s3_client().list_objects(Bucket=AwsS3Vault.get_s3_bucket_name()) if 'Contents' not in bucket: return 0 @@ -192,7 +192,7 @@ def size(self): def archive_size(self): """ Returns whole storage size in bytes """ total_size = 0 - bucket = AwsS3Vault.get_s3_client().list_objects(Bucket=CONTAINER, Prefix="archive/") + bucket = AwsS3Vault.get_s3_client().list_objects(Bucket=AwsS3Vault.get_s3_bucket_name(), Prefix="archive/") if 'Contents' not in bucket: return 0 @@ -211,7 +211,7 @@ def open_vault(self, backup_id): :return: :rtype: (str, dict, StringIO) """ - return AwsS3Vault("%s" % (datetime.now().strftime(VAULT_NAME_FORMAT)), CONTAINER, cluster_name=PG_CLUSTER_NAME) + return AwsS3Vault("%s" % (datetime.now().strftime(VAULT_NAME_FORMAT)), AwsS3Vault.get_s3_bucket_name(), cluster_name=PG_CLUSTER_NAME) def evict_vault(self, vault): self.__log.info("Evict vault: %s" % vault) @@ -224,7 +224,7 @@ def evict_vault(self, vault): return "Not Found" def prot_list_archive(self): - bucket = AwsS3Vault.get_s3_client().list_objects(Bucket=CONTAINER, Prefix="archive/", Delimiter="/") + bucket = AwsS3Vault.get_s3_client().list_objects(Bucket=AwsS3Vault.get_s3_bucket_name(), Prefix="archive/", Delimiter="/") aws_s3_archive_listing = [] if 'Contents' in bucket: # Collect archive ids only @@ -256,14 +256,38 @@ class AwsS3VaultCreationException(Exception): class AwsS3Vault(storage.Vault): __log = logging.getLogger("AwsS3Vault") + @staticmethod + def get_s3_alias_config(): + if os.getenv("S3_ALIASES_USED", "false").lower() != "true": + return None + + with open("/aliases/s3_aliases.json", "r") as f: + aliases = json.load(f) + + if not aliases: + raise Exception("S3 aliases are enabled, but /aliases/s3_aliases.json is empty") + + alias_name = next(iter(aliases)) + return aliases[alias_name] + + @staticmethod + def get_s3_bucket_name(): + alias = AwsS3Vault.get_s3_alias_config() + if alias: + return alias.get("bucketName") + return CONTAINER + @staticmethod def get_s3_resource(): - return boto3.resource("s3", - region_name=os.getenv("AWS_DEFAULT_REGION") if os.getenv("AWS_DEFAULT_REGION") else None, - endpoint_url=os.getenv("AWS_S3_ENDPOINT_URL"), - aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"), - aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"), - verify=(False if os.getenv("AWS_S3_UNTRUSTED_CERT", "false").lower() == "true" else None)) + alias = AwsS3Vault.get_s3_alias_config() + return boto3.resource( + "s3", + region_name=alias.get("region") if alias else (os.getenv("AWS_DEFAULT_REGION") if os.getenv("AWS_DEFAULT_REGION") else None), + endpoint_url=alias.get("s3Url") if alias else os.getenv("AWS_S3_ENDPOINT_URL"), + aws_access_key_id=alias.get("accessKeyId") if alias else os.getenv("AWS_ACCESS_KEY_ID"), + aws_secret_access_key=alias.get("accessKeySecret") if alias else os.getenv("AWS_SECRET_ACCESS_KEY"), + verify=(False if os.getenv("AWS_S3_UNTRUSTED_CERT", "false").lower() == "true" else None), + ) @staticmethod def get_s3_client(): @@ -379,7 +403,7 @@ def is_failed(self): return self.__is_file_exists(self.bucket, self.__failed_filepath()) def is_done(self): - if not self.__is_file_exists(CONTAINER, self.__metrics_filepath()): + if not self.__is_file_exists(self.bucket, self.__metrics_filepath()): self.__log.info(self.__is_file_exists) return False j = self.__load_metrics_from_s3()