Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion operator/pkg/reconciler/backup_daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,11 @@ func (r *BackupDaemonReconciler) Reconcile() error {
return err
}

backupDaemonService := reconcileService(deployment.BackupDaemon, deployment.BackupDaemonLabels,
backupDaemonServiceLabels := util.Merge(
deployment.BackupDaemonLabels,
map[string]string{"cloud-backuper.netcracker.com/data-validation-enabled": "true"},
)
backupDaemonService := reconcileService(deployment.BackupDaemon, backupDaemonServiceLabels,
deployment.BackupDaemonLabels, deployment.GetPortsForBackupService(), false)
// TLS section
if cr.Spec.Tls != nil && cr.Spec.Tls.Enabled {
Expand Down
114 changes: 114 additions & 0 deletions services/backup-daemon/docker/postgres/endpoints/data_validation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
# Copyright 2024-2025 NetCracker Technology Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Endpoints for physical restore data-validation marker.

POST /api/v1/data-validation/marker — write (overwrite) the marker
GET /api/v1/data-validation/marker — read the current marker
"""

import logging
import os

import psycopg2
import utils
from flask import request
from flask_httpauth import HTTPBasicAuth
from flask_restful import Resource

log = logging.getLogger("DataValidationEndpoint")

auth = HTTPBasicAuth()


@auth.verify_password
def verify(username, password):
return utils.validate_user(username, password)


def _connection_props():
return {
'host': os.getenv('POSTGRES_HOST', 'localhost'),
'port': os.getenv('POSTGRES_PORT', '5432'),
'user': os.getenv('POSTGRES_USER', 'postgres'),
'password': os.getenv('POSTGRES_PASSWORD'),
'database': 'postgres',
'connect_timeout': int(os.getenv('CONNECT_TIMEOUT', '5')),
}


_CREATE_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS backup_restore_markers (
sentinel TEXT PRIMARY KEY,
marker TEXT NOT NULL,
written_at TIMESTAMPTZ DEFAULT NOW()
)
"""

_UPSERT_SQL = """
INSERT INTO backup_restore_markers (sentinel, marker, written_at)
VALUES ('current', %s, NOW())
ON CONFLICT (sentinel) DO UPDATE
SET marker = EXCLUDED.marker, written_at = NOW()
"""

_SELECT_SQL = "SELECT marker FROM backup_restore_markers WHERE sentinel = 'current'"


class MarkerResource(Resource):

@auth.login_required
def post(self):
body = request.get_json(silent=True)
if not body or 'marker' not in body:
return {'error': 'Request body must contain "marker" field'}, 400

marker = body['marker']
if not isinstance(marker, str) or not marker:
return {'error': '"marker" must be a non-empty string'}, 400

try:
conn = psycopg2.connect(**_connection_props())
try:
with conn:
with conn.cursor() as cur:
cur.execute(_CREATE_TABLE_SQL)
cur.execute(_UPSERT_SQL, (marker,))
finally:
conn.close()
except Exception:
log.exception("Failed to write marker to database")
return {'error': 'Internal server error'}, 500

return None, 201

@auth.login_required
def get(self):
try:
conn = psycopg2.connect(**_connection_props())
try:
with conn.cursor() as cur:
cur.execute(_SELECT_SQL)
row = cur.fetchone()
finally:
conn.close()
except Exception:
log.exception("Failed to read marker from database")
return {'error': 'Internal server error'}, 500

if row is None:
return {'error': 'No marker found'}, 404

return {'marker': row[0]}, 200
69 changes: 49 additions & 20 deletions services/backup-daemon/docker/postgres/gunicorn/public.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,49 +12,78 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from flask_restful import Api
import os
from flask import Flask

import configs
import endpoints.backup
import endpoints.data_validation
import endpoints.restore
import endpoints.status
import storage

from flask import Flask
from flask_restful import Api
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.flask import FlaskInstrumentor
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource


app = Flask("PublicEndpoints")
collector_endpoint = os.getenv("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", "")
if collector_endpoint != "":
collector_endpoint = "http://" + collector_endpoint
NAMESPACE_PATH = '/var/run/secrets/kubernetes.io/serviceaccount/namespace'
NAMESPACE_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
ns = open(NAMESPACE_PATH).read()
resource = Resource(attributes={
SERVICE_NAME: "postgresql-backup-daemon-" + ns
})
resource = Resource(attributes={SERVICE_NAME: "postgresql-backup-daemon-" + ns})
provider = TracerProvider(resource=resource)
processor = BatchSpanProcessor(OTLPSpanExporter(endpoint=collector_endpoint, insecure=True))
processor = BatchSpanProcessor(
OTLPSpanExporter(endpoint=collector_endpoint, insecure=True)
)
provider.add_span_processor(processor)
FlaskInstrumentor().instrument_app(app=app, tracer_provider=provider, excluded_urls="health,/health,v2/health,/v2/health")
FlaskInstrumentor().instrument_app(
app=app,
tracer_provider=provider,
excluded_urls="health,/health,v2/health,/v2/health",
)
api = Api(app)

conf = configs.load_configs()
storage_instance = storage.init_storage(storageRoot=conf['storage'])
storage_instance = storage.init_storage(storageRoot=conf["storage"])

endpoints.restore.ExternalRestoreRequest.cleanup_restore_status(storage_instance)

api.add_resource(endpoints.status.Status, *endpoints.status.Status.get_endpoints(), resource_class_args=(storage_instance,))
api.add_resource(endpoints.backup.BackupRequest, *endpoints.backup.BackupRequest.get_endpoints())
api.add_resource(endpoints.status.Health, *endpoints.status.Health.get_endpoints(), resource_class_args=(storage_instance,))
api.add_resource(endpoints.status.BackupStatus, *endpoints.status.BackupStatus.get_endpoints(), resource_class_args=(storage_instance,))
api.add_resource(endpoints.status.ExternalRestoreStatus, *endpoints.status.ExternalRestoreStatus.get_endpoints(), resource_class_args=(storage_instance,))
api.add_resource(endpoints.restore.ExternalRestoreRequest, *endpoints.restore.ExternalRestoreRequest.get_endpoints(), resource_class_args=(storage_instance,))
api.add_resource(
endpoints.status.Status,
*endpoints.status.Status.get_endpoints(),
resource_class_args=(storage_instance,),
)
api.add_resource(
endpoints.backup.BackupRequest, *endpoints.backup.BackupRequest.get_endpoints()
)
api.add_resource(
endpoints.status.Health,
*endpoints.status.Health.get_endpoints(),
resource_class_args=(storage_instance,),
)
api.add_resource(
endpoints.status.BackupStatus,
*endpoints.status.BackupStatus.get_endpoints(),
resource_class_args=(storage_instance,),
)
api.add_resource(
endpoints.status.ExternalRestoreStatus,
*endpoints.status.ExternalRestoreStatus.get_endpoints(),
resource_class_args=(storage_instance,),
)
api.add_resource(
endpoints.restore.ExternalRestoreRequest,
*endpoints.restore.ExternalRestoreRequest.get_endpoints(),
resource_class_args=(storage_instance,),
)
api.add_resource(
endpoints.data_validation.MarkerResource,
'/api/v1/data-validation/marker',
)

if __name__ == '__main__':
if __name__ == "__main__":
app.run()
Loading