diff --git a/operator/pkg/reconciler/backup_daemon.go b/operator/pkg/reconciler/backup_daemon.go index 8323523c..77dcca2c 100644 --- a/operator/pkg/reconciler/backup_daemon.go +++ b/operator/pkg/reconciler/backup_daemon.go @@ -235,7 +235,11 @@ func (r *BackupDaemonReconciler) Reconcile() error { return err } - backupDaemonService := reconcileService(deployment.BackupDaemon, deployment.BackupDaemonLabels, + backupDaemonServiceLabels := util.Merge( + deployment.BackupDaemonLabels, + map[string]string{"cloud-backuper.netcracker.com/data-validation-enabled": "true"}, + ) + backupDaemonService := reconcileService(deployment.BackupDaemon, backupDaemonServiceLabels, deployment.BackupDaemonLabels, deployment.GetPortsForBackupService(), false) // TLS section if cr.Spec.Tls != nil && cr.Spec.Tls.Enabled { diff --git a/services/backup-daemon/docker/postgres/endpoints/data_validation.py b/services/backup-daemon/docker/postgres/endpoints/data_validation.py new file mode 100644 index 00000000..ea461cda --- /dev/null +++ b/services/backup-daemon/docker/postgres/endpoints/data_validation.py @@ -0,0 +1,114 @@ +# Copyright 2024-2025 NetCracker Technology Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Endpoints for physical restore data-validation marker. + +POST /api/v1/data-validation/marker — write (overwrite) the marker +GET /api/v1/data-validation/marker — read the current marker +""" + +import logging +import os + +import psycopg2 +import utils +from flask import request +from flask_httpauth import HTTPBasicAuth +from flask_restful import Resource + +log = logging.getLogger("DataValidationEndpoint") + +auth = HTTPBasicAuth() + + +@auth.verify_password +def verify(username, password): + return utils.validate_user(username, password) + + +def _connection_props(): + return { + 'host': os.getenv('POSTGRES_HOST', 'localhost'), + 'port': os.getenv('POSTGRES_PORT', '5432'), + 'user': os.getenv('POSTGRES_USER', 'postgres'), + 'password': os.getenv('POSTGRES_PASSWORD'), + 'database': 'postgres', + 'connect_timeout': int(os.getenv('CONNECT_TIMEOUT', '5')), + } + + +_CREATE_TABLE_SQL = """ +CREATE TABLE IF NOT EXISTS backup_restore_markers ( + sentinel TEXT PRIMARY KEY, + marker TEXT NOT NULL, + written_at TIMESTAMPTZ DEFAULT NOW() +) +""" + +_UPSERT_SQL = """ +INSERT INTO backup_restore_markers (sentinel, marker, written_at) +VALUES ('current', %s, NOW()) +ON CONFLICT (sentinel) DO UPDATE + SET marker = EXCLUDED.marker, written_at = NOW() +""" + +_SELECT_SQL = "SELECT marker FROM backup_restore_markers WHERE sentinel = 'current'" + + +class MarkerResource(Resource): + + @auth.login_required + def post(self): + body = request.get_json(silent=True) + if not body or 'marker' not in body: + return {'error': 'Request body must contain "marker" field'}, 400 + + marker = body['marker'] + if not isinstance(marker, str) or not marker: + return {'error': '"marker" must be a non-empty string'}, 400 + + try: + conn = psycopg2.connect(**_connection_props()) + try: + with conn: + with conn.cursor() as cur: + cur.execute(_CREATE_TABLE_SQL) + cur.execute(_UPSERT_SQL, (marker,)) + finally: + conn.close() + except Exception: + log.exception("Failed to write marker to database") + return {'error': 'Internal server error'}, 500 + + return None, 201 + + @auth.login_required + def get(self): + try: + conn = psycopg2.connect(**_connection_props()) + try: + with conn.cursor() as cur: + cur.execute(_SELECT_SQL) + row = cur.fetchone() + finally: + conn.close() + except Exception: + log.exception("Failed to read marker from database") + return {'error': 'Internal server error'}, 500 + + if row is None: + return {'error': 'No marker found'}, 404 + + return {'marker': row[0]}, 200 diff --git a/services/backup-daemon/docker/postgres/gunicorn/public.py b/services/backup-daemon/docker/postgres/gunicorn/public.py index 861298d6..6610e8d7 100644 --- a/services/backup-daemon/docker/postgres/gunicorn/public.py +++ b/services/backup-daemon/docker/postgres/gunicorn/public.py @@ -12,49 +12,78 @@ # See the License for the specific language governing permissions and # limitations under the License. -from flask_restful import Api import os -from flask import Flask import configs import endpoints.backup +import endpoints.data_validation import endpoints.restore import endpoints.status import storage - +from flask import Flask +from flask_restful import Api +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter from opentelemetry.instrumentation.flask import FlaskInstrumentor +from opentelemetry.sdk.resources import SERVICE_NAME, Resource from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor -from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter -from opentelemetry.sdk.resources import SERVICE_NAME, Resource - app = Flask("PublicEndpoints") collector_endpoint = os.getenv("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", "") if collector_endpoint != "": collector_endpoint = "http://" + collector_endpoint - NAMESPACE_PATH = '/var/run/secrets/kubernetes.io/serviceaccount/namespace' + NAMESPACE_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/namespace" ns = open(NAMESPACE_PATH).read() - resource = Resource(attributes={ - SERVICE_NAME: "postgresql-backup-daemon-" + ns - }) + resource = Resource(attributes={SERVICE_NAME: "postgresql-backup-daemon-" + ns}) provider = TracerProvider(resource=resource) - processor = BatchSpanProcessor(OTLPSpanExporter(endpoint=collector_endpoint, insecure=True)) + processor = BatchSpanProcessor( + OTLPSpanExporter(endpoint=collector_endpoint, insecure=True) + ) provider.add_span_processor(processor) - FlaskInstrumentor().instrument_app(app=app, tracer_provider=provider, excluded_urls="health,/health,v2/health,/v2/health") + FlaskInstrumentor().instrument_app( + app=app, + tracer_provider=provider, + excluded_urls="health,/health,v2/health,/v2/health", + ) api = Api(app) conf = configs.load_configs() -storage_instance = storage.init_storage(storageRoot=conf['storage']) +storage_instance = storage.init_storage(storageRoot=conf["storage"]) endpoints.restore.ExternalRestoreRequest.cleanup_restore_status(storage_instance) -api.add_resource(endpoints.status.Status, *endpoints.status.Status.get_endpoints(), resource_class_args=(storage_instance,)) -api.add_resource(endpoints.backup.BackupRequest, *endpoints.backup.BackupRequest.get_endpoints()) -api.add_resource(endpoints.status.Health, *endpoints.status.Health.get_endpoints(), resource_class_args=(storage_instance,)) -api.add_resource(endpoints.status.BackupStatus, *endpoints.status.BackupStatus.get_endpoints(), resource_class_args=(storage_instance,)) -api.add_resource(endpoints.status.ExternalRestoreStatus, *endpoints.status.ExternalRestoreStatus.get_endpoints(), resource_class_args=(storage_instance,)) -api.add_resource(endpoints.restore.ExternalRestoreRequest, *endpoints.restore.ExternalRestoreRequest.get_endpoints(), resource_class_args=(storage_instance,)) +api.add_resource( + endpoints.status.Status, + *endpoints.status.Status.get_endpoints(), + resource_class_args=(storage_instance,), +) +api.add_resource( + endpoints.backup.BackupRequest, *endpoints.backup.BackupRequest.get_endpoints() +) +api.add_resource( + endpoints.status.Health, + *endpoints.status.Health.get_endpoints(), + resource_class_args=(storage_instance,), +) +api.add_resource( + endpoints.status.BackupStatus, + *endpoints.status.BackupStatus.get_endpoints(), + resource_class_args=(storage_instance,), +) +api.add_resource( + endpoints.status.ExternalRestoreStatus, + *endpoints.status.ExternalRestoreStatus.get_endpoints(), + resource_class_args=(storage_instance,), +) +api.add_resource( + endpoints.restore.ExternalRestoreRequest, + *endpoints.restore.ExternalRestoreRequest.get_endpoints(), + resource_class_args=(storage_instance,), +) +api.add_resource( + endpoints.data_validation.MarkerResource, + '/api/v1/data-validation/marker', +) -if __name__ == '__main__': +if __name__ == "__main__": app.run()