diff --git a/serverless-fleet-worker-registration/.dockerignore b/serverless-fleet-worker-registration/.dockerignore new file mode 100644 index 00000000..b06bada8 --- /dev/null +++ b/serverless-fleet-worker-registration/.dockerignore @@ -0,0 +1,35 @@ +# Binaries +*.exe +*.exe~ +*.dll +*.so +*.dylib +fleet-register + +# Test files +*_test.go + +# Output files +*.csv + +# Git +.git +.gitignore + +# IDE +.vscode +.idea +*.swp +*.swo +*~ + +# Documentation +README.md + +# Docker +Dockerfile +.dockerignore + +# OS +.DS_Store +Thumbs.db \ No newline at end of file diff --git a/serverless-fleet-worker-registration/Dockerfile b/serverless-fleet-worker-registration/Dockerfile new file mode 100644 index 00000000..c4d131fd --- /dev/null +++ b/serverless-fleet-worker-registration/Dockerfile @@ -0,0 +1,39 @@ +# Stage 1: Build stage +FROM quay.io/projectquay/golang:1.25 AS builder + +# Set working directory +WORKDIR /build + +# Copy go mod files +COPY go.mod go.sum ./ + +# Download dependencies +RUN go mod download + +# Copy source code +COPY main.go ./ + +# Build the application +# CGO_ENABLED=0 for static binary +# -ldflags="-w -s" to reduce binary size +RUN CGO_ENABLED=0 GOOS=linux go build -ldflags="-w -s" -o fleet-register . + +# Stage 2: Runtime stage +FROM registry.access.redhat.com/ubi10/ubi-minimal + +# Copy binary from builder +COPY --from=builder /build/fleet-register /app/fleet-register + +# Set working directory +WORKDIR /app + +# Expose port +EXPOSE 8080 + +RUN mkdir /fleet-workers + +# Set environment variable for the CSV path +ENV CSV_PATH=/fleet-workers/fleet-register.csv + +# Run the application +CMD ["/app/fleet-register"] \ No newline at end of file diff --git a/serverless-fleet-worker-registration/README.md b/serverless-fleet-worker-registration/README.md new file mode 100644 index 00000000..030edfb0 --- /dev/null +++ b/serverless-fleet-worker-registration/README.md @@ -0,0 +1,138 @@ +# Fleet Register + +A lightweight Go HTTP server that monitors IBM Cloud Code Engine fleet workers by tracking their lifecycle events in a CSV file. + +## Use Case + +This application is deployed as a Code Engine application to monitor fleet workers. Fleet workers call the `/register` endpoint when they start and the `/deregister` endpoint before they shut down. The application maintains a persistent CSV file tracking all worker activity, including registration and completion timestamps. + +**Deployment Flow:** +1. Deploy this app as a Code Engine application with persistent storage +2. Configure the fleet with register/deregister hooks pointing to this app +3. Fleet workers automatically report their status throughout their lifecycle + +## Endpoints + +- `POST /register` + Creates a row in `fleet-register.csv` with status `running` and records the registration timestamp. + +- `POST /deregister` + Updates the matching row by `worker_name` and `worker_ip` to `completed` and records the completion timestamp. + If no row exists, it creates a new row with status `completed`. + +- `GET /download` + Downloads the CSV file. + +## Request body + +```json +{ + "worker_name": "worker-01", + "worker_ip": "192.168.1.10" +} +``` + +## Deployment + +### Code Engine (Production) + +1. **Deploy the application with persistent storage:** + ```bash + ibmcloud ce app create --name fleet-workers \ + --build-dockerfile Dockerfile \ + --build-source . \ + --mount-data-store /fleet-workers=fleet-data + ``` + +2. **Run fleet with hooks:** + + The `run` script automatically retrieves the app URL and creates a fleet with register/deregister hooks: + ```bash + ./run + ``` + + Or manually create a fleet: + ```bash + APP_URL=$(ibmcloud ce app get --name fleet-workers -o url) + + PREHOOK="curl -X POST \${APP_URL}/register -H 'Content-Type: application/json' -d '{\"worker_name\":\"\${CE_WORKER_NAME}\",\"worker_ip\":\"\${CE_WORKER_IP}\"}'" + + POSTHOOK="curl -X POST \${APP_URL}/deregister -H 'Content-Type: application/json' -d '{\"worker_name\":\"\${CE_WORKER_NAME}\",\"worker_ip\":\"\${CE_WORKER_IP}\"}'" + + ibmcloud code-engine fleet create --name my-fleet \ + --tasks-state-store fleet-task-store \ + --subnetpool-name fleet-subnetpool \ + --image registry.access.redhat.com/ubi10/ubi-minimal \ + --max-scale 100 \ + --command="sleep" \ + --arg "30" \ + --tasks 100 \ + --env __CE_INTERNAL_HOOK_AFTER_STARTUP="${PREHOOK}" \ + --env __CE_INTERNAL_HOOK_AFTER_STARTUP_RETRY_LIMIT=3 \ + --env __CE_INTERNAL_HOOK_AFTER_STARTUP_MAX_EXECUTION_TIME=30m \ + --env __CE_INTERNAL_HOOK_BEFORE_SHUTDOWN="${POSTHOOK}" \ + --env __CE_INTERNAL_HOOK_BEFORE_SHUTDOWN_RETRY_LIMIT=3 \ + --env __CE_INTERNAL_HOOK_BEFORE_SHUTDOWN_MAX_EXECUTION_TIME=30m \ + --env APP_URL="${APP_URL}" \ + --cpu 2 \ + --memory 4G + ``` + +### Local Development + +```bash +go run . +``` + +Server listens on `:8080`. + +### Docker + +```bash +docker build -t fleet-register . +docker run -p 8080:8080 -v fleet-data:/fleet-workers fleet-register +``` + +The CSV file is saved to `/fleet-workers/fleet-register.csv` and persisted in the volume. + +## Examples + +Register a worker: + +```bash +curl -X POST http://localhost:8080/register \ + -H "Content-Type: application/json" \ + -d '{ + "worker_name": "worker-01", + "worker_ip": "192.168.1.10" + }' +``` + +Deregister a worker: + +```bash +curl -X POST http://localhost:8080/deregister \ + -H "Content-Type: application/json" \ + -d '{ + "worker_name": "worker-01", + "worker_ip": "192.168.1.10" + }' +``` + +Download the CSV file: + +```bash +curl -O -J http://localhost:8080/download +``` + +## CSV file + +The server automatically creates `fleet-register.csv` in the project root (or `/fleet-workers/` in Docker) if it does not exist. + +The CSV file contains the following columns: + +- `worker_name` - Name of the worker +- `worker_ip` - IP address of the worker +- `status` - Current status (running/completed) +- `registered_at` - Timestamp when the worker was registered (ISO 8601 format) +- `completed_at` - Timestamp when the worker completed (ISO 8601 format) \ No newline at end of file diff --git a/serverless-fleet-worker-registration/go.mod b/serverless-fleet-worker-registration/go.mod new file mode 100644 index 00000000..24b45f57 --- /dev/null +++ b/serverless-fleet-worker-registration/go.mod @@ -0,0 +1,3 @@ +module fleet-register + +go 1.22 diff --git a/serverless-fleet-worker-registration/go.sum b/serverless-fleet-worker-registration/go.sum new file mode 100644 index 00000000..e69de29b diff --git a/serverless-fleet-worker-registration/main.go b/serverless-fleet-worker-registration/main.go new file mode 100644 index 00000000..45553974 --- /dev/null +++ b/serverless-fleet-worker-registration/main.go @@ -0,0 +1,265 @@ +package main + +import ( + "encoding/csv" + "encoding/json" + "errors" + "fmt" + "log" + "net/http" + "os" + "strings" + "sync" + "time" +) + +const ( + serverAddr = ":8080" +) + +var csvPath = getCSVPath() + +func getCSVPath() string { + if path := os.Getenv("CSV_PATH"); path != "" { + return path + } + return "fleet-register.csv" +} + +var csvMu sync.Mutex + +type workerRequest struct { + WorkerName string `json:"worker_name"` + WorkerIP string `json:"worker_ip"` +} + +type apiResponse struct { + Message string `json:"message"` + WorkerName string `json:"worker_name,omitempty"` + WorkerIP string `json:"worker_ip,omitempty"` + Status string `json:"status,omitempty"` + RegisteredAt string `json:"registered_at,omitempty"` + CompletedAt string `json:"completed_at,omitempty"` + File string `json:"file,omitempty"` +} + +func main() { + mux := http.NewServeMux() + mux.HandleFunc("POST /register", registerHandler) + mux.HandleFunc("POST /deregister", deregisterHandler) + mux.HandleFunc("GET /download", downloadHandler) + + log.Printf("server listening on %s", serverAddr) + if err := http.ListenAndServe(serverAddr, mux); err != nil { + log.Fatal(err) + } +} + +func registerHandler(w http.ResponseWriter, r *http.Request) { + req, err := decodeWorkerRequest(r) + if err != nil { + writeError(w, http.StatusBadRequest, err.Error()) + return + } + + csvMu.Lock() + defer csvMu.Unlock() + + if err := appendWorkerRow(req, "running"); err != nil { + writeError(w, http.StatusInternalServerError, fmt.Sprintf("failed to register worker: %v", err)) + return + } + + writeJSON(w, http.StatusCreated, apiResponse{ + Message: "worker registered", + WorkerName: req.WorkerName, + WorkerIP: req.WorkerIP, + Status: "running", + File: csvPath, + }) +} + +func deregisterHandler(w http.ResponseWriter, r *http.Request) { + req, err := decodeWorkerRequest(r) + if err != nil { + writeError(w, http.StatusBadRequest, err.Error()) + return + } + + csvMu.Lock() + defer csvMu.Unlock() + + updated, err := completeWorker(req) + if err != nil { + writeError(w, http.StatusInternalServerError, fmt.Sprintf("failed to deregister worker: %v", err)) + return + } + + message := "worker deregistered" + if !updated { + message = "worker deregistration added" + } + + writeJSON(w, http.StatusOK, apiResponse{ + Message: message, + WorkerName: req.WorkerName, + WorkerIP: req.WorkerIP, + Status: "completed", + File: csvPath, + }) +} + +func downloadHandler(w http.ResponseWriter, r *http.Request) { + csvMu.Lock() + defer csvMu.Unlock() + + if err := ensureCSV(); err != nil { + writeError(w, http.StatusInternalServerError, fmt.Sprintf("failed to prepare CSV: %v", err)) + return + } + + content, err := os.ReadFile(csvPath) + if err != nil { + writeError(w, http.StatusInternalServerError, fmt.Sprintf("failed to read CSV: %v", err)) + return + } + + w.Header().Set("Content-Type", "text/csv") + w.Header().Set("Content-Disposition", fmt.Sprintf(`attachment; filename="%s"`, csvPath)) + w.Header().Set("Content-Length", fmt.Sprintf("%d", len(content))) + w.WriteHeader(http.StatusOK) + + if _, err := w.Write(content); err != nil { + log.Printf("failed to write download response: %v", err) + } +} + +func decodeWorkerRequest(r *http.Request) (workerRequest, error) { + defer r.Body.Close() + + var req workerRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + return workerRequest{}, errors.New("invalid JSON body") + } + + req.WorkerName = strings.TrimSpace(req.WorkerName) + req.WorkerIP = strings.TrimSpace(req.WorkerIP) + + if req.WorkerName == "" { + return workerRequest{}, errors.New("field \"worker_name\" is required") + } + + if req.WorkerIP == "" { + return workerRequest{}, errors.New("field \"worker_ip\" is required") + } + + return req, nil +} + +func appendWorkerRow(req workerRequest, status string) error { + if err := ensureCSV(); err != nil { + return err + } + + file, err := os.OpenFile(csvPath, os.O_APPEND|os.O_WRONLY, 0644) + if err != nil { + return err + } + defer file.Close() + + writer := csv.NewWriter(file) + defer writer.Flush() + + registeredAt := time.Now().Format(time.RFC3339) + record := []string{req.WorkerName, req.WorkerIP, status, registeredAt, ""} + + return writer.Write(record) +} + +func completeWorker(req workerRequest) (bool, error) { + if err := ensureCSV(); err != nil { + return false, err + } + + // Read all records + file, err := os.Open(csvPath) + if err != nil { + return false, err + } + + reader := csv.NewReader(file) + records, err := reader.ReadAll() + file.Close() + if err != nil { + return false, err + } + + completedAt := time.Now().Format(time.RFC3339) + updated := false + + // Update matching record + for i := 1; i < len(records); i++ { + if len(records[i]) >= 2 && records[i][0] == req.WorkerName && records[i][1] == req.WorkerIP { + records[i][2] = "completed" + records[i][4] = completedAt + updated = true + break + } + } + + // If not found, add new record + if !updated { + records = append(records, []string{req.WorkerName, req.WorkerIP, "completed", "", completedAt}) + } + + // Write all records back + file, err = os.Create(csvPath) + if err != nil { + return false, err + } + defer file.Close() + + writer := csv.NewWriter(file) + defer writer.Flush() + + for _, record := range records { + if err := writer.Write(record); err != nil { + return false, err + } + } + + return updated, nil +} + +func ensureCSV() error { + if _, err := os.Stat(csvPath); err == nil { + return nil + } else if !os.IsNotExist(err) { + return err + } + + file, err := os.Create(csvPath) + if err != nil { + return err + } + defer file.Close() + + writer := csv.NewWriter(file) + defer writer.Flush() + + headers := []string{"worker_name", "worker_ip", "status", "registered_at", "completed_at"} + return writer.Write(headers) +} + +func writeJSON(w http.ResponseWriter, statusCode int, payload apiResponse) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(statusCode) + + if err := json.NewEncoder(w).Encode(payload); err != nil { + log.Printf("failed to encode JSON response: %v", err) + } +} + +func writeError(w http.ResponseWriter, statusCode int, message string) { + writeJSON(w, statusCode, apiResponse{Message: message}) +} diff --git a/serverless-fleet-worker-registration/run b/serverless-fleet-worker-registration/run new file mode 100755 index 00000000..e3766c63 --- /dev/null +++ b/serverless-fleet-worker-registration/run @@ -0,0 +1,67 @@ +#!/bin/bash + +set -e + +uuid=$(uuidgen | tr '[:upper:]' '[:lower:]' | awk -F- '{print $1}') + +APP_URL="$(ibmcloud ce app get --name fleet-workers -o url)" + +PREHOOK=$(cat <