From 4a21a370d1ad25f6f079c39c1cd980690bb796d6 Mon Sep 17 00:00:00 2001 From: Siavash Safi Date: Fri, 29 May 2026 11:46:15 +0200 Subject: [PATCH 1/2] cmd/alertmanager: extract main into internal/app package Move the body of run() from cmd/alertmanager/main.go into a new internal/app package so Alertmanager can be embedded in tests and other binaries without shelling out to a compiled binary. Resolves the long-standing TODO from #406. cmd/alertmanager/main.go shrinks from 724 to 196 lines and is now responsible only for kingpin flag parsing, logger construction, versioncollector registration, feature-flag / GOMEMLIMIT side effects, and translating OS signals into context cancellation (SIGINT/SIGTERM) plus reload events (SIGHUP) consumed by app.Run. The new internal/app package is split into: * options.go - Options struct, validate(), DefaultClusterAddr * app.go - Run(ctx, opts) error * metrics.go - per-instance Prometheus metrics struct * cluster.go - clusterWait helper * url.go - extURL helper (+ url_test.go for TestExternalURL) The six previously package-level promauto.NewXxx variables in cmd/alertmanager/main.go are now constructed per Run() invocation against opts.Registerer. Combined with threading the registerer through every collaborator (versioncollector excepted, which stays in main.go as a process-global), this unblocks running multiple Alertmanager instances in the same process without duplicate- registration panics. Behavioural notes: * prometheus.DefaultRegisterer is no longer referenced inside app.Run; the binary still passes it in via Options.Registerer so on-disk behaviour is identical. * app.Run defers srv.Shutdown(5s) on exit. Previously the deferred srv.Close lived inside the listen goroutine and never ran in practice because os.Exit killed the process first. Behaviour for the binary is unchanged; embedded callers now get clean HTTP teardown. * --cluster.listen-address default moved from a const in cmd/alertmanager to the exported app.DefaultClusterAddr. Known follow-ups intentionally out of scope: * matcher/compat.InitFromFlags still mutates package-level state; multi-instance tests with different feature flags will collide. * Richer App lifecycle (New/Start/Addr/Reload/Stop) for tests that need :0-port discovery or programmatic reload. * Migrating the v2 acceptance harness to use app.Run directly instead of building and spawning the binary. Verification: `go build ./...`, `go vet ./...`, and `go test -count=1 ./...` all pass, including the existing test/with_api_v2/acceptance suite which continues to build and spawn the binary end-to-end. Closes #406 Signed-off-by: Siavash Safi --- cmd/alertmanager/main.go | 659 ++---------------- internal/app/app.go | 502 +++++++++++++ internal/app/cluster.go | 29 + internal/app/metrics.go | 98 +++ internal/app/options.go | 104 +++ internal/app/url.go | 57 ++ .../main_test.go => internal/app/url_test.go | 2 +- 7 files changed, 863 insertions(+), 588 deletions(-) create mode 100644 internal/app/app.go create mode 100644 internal/app/cluster.go create mode 100644 internal/app/metrics.go create mode 100644 internal/app/options.go create mode 100644 internal/app/url.go rename cmd/alertmanager/main_test.go => internal/app/url_test.go (99%) diff --git a/cmd/alertmanager/main.go b/cmd/alertmanager/main.go index 4707cfb8ef..8d590ac4ea 100644 --- a/cmd/alertmanager/main.go +++ b/cmd/alertmanager/main.go @@ -15,120 +15,28 @@ package main import ( "context" - "errors" "fmt" - "log/slog" - "net" - "net/http" - "net/url" "os" "os/signal" - "path/filepath" "runtime" "strings" - "sync" - "sync/atomic" "syscall" - "time" "github.com/KimMachineGun/automemlimit/memlimit" "github.com/alecthomas/kingpin/v2" "github.com/prometheus/client_golang/prometheus" versioncollector "github.com/prometheus/client_golang/prometheus/collectors/version" - "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/prometheus/client_golang/prometheus/promhttp" - "github.com/prometheus/common/model" "github.com/prometheus/common/promslog" promslogflag "github.com/prometheus/common/promslog/flag" - "github.com/prometheus/common/route" "github.com/prometheus/common/version" - "github.com/prometheus/exporter-toolkit/web" webflag "github.com/prometheus/exporter-toolkit/web/kingpinflag" - "github.com/prometheus/alertmanager/alert" - "github.com/prometheus/alertmanager/api" "github.com/prometheus/alertmanager/cluster" - "github.com/prometheus/alertmanager/config" - "github.com/prometheus/alertmanager/config/receiver" - "github.com/prometheus/alertmanager/dispatch" - "github.com/prometheus/alertmanager/eventrecorder" - "github.com/prometheus/alertmanager/eventrecorder/eventrecorderpb" "github.com/prometheus/alertmanager/featurecontrol" - "github.com/prometheus/alertmanager/httpserver" - "github.com/prometheus/alertmanager/inhibit" - "github.com/prometheus/alertmanager/marker" + "github.com/prometheus/alertmanager/internal/app" "github.com/prometheus/alertmanager/matcher/compat" - "github.com/prometheus/alertmanager/nflog" - "github.com/prometheus/alertmanager/notify" - "github.com/prometheus/alertmanager/provider/mem" - "github.com/prometheus/alertmanager/silence" - "github.com/prometheus/alertmanager/template" - "github.com/prometheus/alertmanager/timeinterval" - "github.com/prometheus/alertmanager/tracing" - "github.com/prometheus/alertmanager/ui" ) -var ( - requestDuration = promauto.NewHistogramVec( - prometheus.HistogramOpts{ - Name: "alertmanager_http_request_duration_seconds", - Help: "Histogram of latencies for HTTP requests.", - Buckets: prometheus.DefBuckets, - NativeHistogramBucketFactor: 1.1, - NativeHistogramMaxBucketNumber: 100, - NativeHistogramMinResetDuration: 1 * time.Hour, - }, - []string{"handler", "method", "code"}, - ) - responseSize = promauto.NewHistogramVec( - prometheus.HistogramOpts{ - Name: "alertmanager_http_response_size_bytes", - Help: "Histogram of response size for HTTP requests.", - Buckets: prometheus.ExponentialBuckets(100, 10, 7), - }, - []string{"handler", "method"}, - ) - clusterEnabled = promauto.NewGauge( - prometheus.GaugeOpts{ - Name: "alertmanager_cluster_enabled", - Help: "Indicates whether the clustering is enabled or not.", - }, - ) - configuredReceivers = promauto.NewGauge( - prometheus.GaugeOpts{ - Name: "alertmanager_receivers", - Help: "Number of configured receivers.", - }, - ) - configuredIntegrations = promauto.NewGauge( - prometheus.GaugeOpts{ - Name: "alertmanager_integrations", - Help: "Number of configured integrations.", - }, - ) - configuredInhibitionRules = promauto.NewGauge( - prometheus.GaugeOpts{ - Name: "alertmanager_inhibition_rules", - Help: "Number of configured inhibition rules.", - }, - ) - - promslogConfig = promslog.Config{} -) - -func instrumentHandler(handlerName string, handler http.HandlerFunc) http.HandlerFunc { - handlerLabel := prometheus.Labels{"handler": handlerName} - return promhttp.InstrumentHandlerDuration( - requestDuration.MustCurryWith(handlerLabel), - promhttp.InstrumentHandlerResponseSize( - responseSize.MustCurryWith(handlerLabel), - handler, - ), - ) -} - -const defaultClusterAddr = "0.0.0.0:9094" - func main() { os.Exit(run()) } @@ -150,7 +58,7 @@ func run() int { alertGCInterval = kingpin.Flag("alerts.gc-interval", "Interval between alert GC.").Default("30m").Duration() perAlertNameLimit = kingpin.Flag("alerts.per-alertname-limit", "Maximum number of alerts per alertname. If negative or zero, no limit is set.").Default("0").Int() dispatchMaintenanceInterval = kingpin.Flag("dispatch.maintenance-interval", "Interval between maintenance of aggregation groups in the dispatcher.").Default("30s").Duration() - DispatchStartDelay = kingpin.Flag("dispatch.start-delay", "Minimum amount of time to wait before dispatching alerts. This option should be synced with value of --rules.alert.resend-delay on Prometheus.").Default("0s").Duration() + dispatchStartDelay = kingpin.Flag("dispatch.start-delay", "Minimum amount of time to wait before dispatching alerts. This option should be synced with value of --rules.alert.resend-delay on Prometheus.").Default("0s").Duration() webConfig = webflag.AddFlags(kingpin.CommandLine, ":9093") externalURL = kingpin.Flag("web.external-url", "The URL under which Alertmanager is externally reachable (for example, if Alertmanager is served via a reverse proxy). Used for generating relative and absolute links back to Alertmanager itself. If the URL has a path portion, it will be used to prefix all HTTP endpoints served by Alertmanager. If omitted, relevant URL components will be derived automatically.").String() @@ -162,7 +70,7 @@ func run() int { Default("0.9").Float64() clusterBindAddr = kingpin.Flag("cluster.listen-address", "Listen address for cluster. Set to empty string to disable HA mode."). - Default(defaultClusterAddr).String() + Default(app.DefaultClusterAddr).String() clusterAdvertiseAddr = kingpin.Flag("cluster.advertise-address", "Explicit address to advertise in cluster.").String() clusterPeerName = kingpin.Flag("cluster.peer-name", "Explicit name of the peer, rather than generating a random one").Default("").String() peers = kingpin.Flag("cluster.peer", "Initial peers (may be repeated).").Strings() @@ -182,21 +90,16 @@ func run() int { featureFlags = kingpin.Flag("enable-feature", fmt.Sprintf("Comma-separated experimental features to enable. Valid options: %s", strings.Join(featurecontrol.AllowedFlags, ", "))).Default("").String() ) - prometheus.MustRegister(versioncollector.NewCollector("alertmanager")) - + promslogConfig := promslog.Config{} promslogflag.AddFlags(kingpin.CommandLine, &promslogConfig) kingpin.CommandLine.UsageWriter(os.Stdout) - kingpin.Version(version.Print("alertmanager")) kingpin.CommandLine.GetFlag("help").Short('h') kingpin.Parse() logger := promslog.New(&promslogConfig) - logger.Info("Starting Alertmanager", "version", version.Info()) - startTime := time.Now() - - logger.Info("Build context", "build_context", version.BuildContext()) + prometheus.MustRegister(versioncollector.NewCollector("alertmanager")) ff, err := featurecontrol.NewFlags(logger, *featureFlags) if err != nil { @@ -210,7 +113,6 @@ func run() int { logger.Error("--auto-gomemlimit.ratio must be greater than 0 and less than or equal to 1.") return 1 } - if _, err := memlimit.SetGoMemLimitWithOpts( memlimit.WithRatio(*memlimitRatio), memlimit.WithProvider( @@ -228,495 +130,78 @@ func run() int { logger.Warn("automaxprocs", "msg", "This flag is deprecated and will be removed in the next release") } - err = os.MkdirAll(*dataDir, 0o777) - if err != nil { - logger.Error("Unable to create data directory", "err", err) - return 1 - } - - tlsTransportConfig, err := cluster.GetTLSTransportConfig(*tlsConfigFile) - if err != nil { - logger.Error("unable to initialize TLS transport configuration for gossip mesh", "err", err) - return 1 - } - var peer *cluster.Peer - if *clusterBindAddr != "" { - peer, err = cluster.Create( - logger.With("component", "cluster"), - prometheus.DefaultRegisterer, - *clusterBindAddr, - *clusterAdvertiseAddr, - *peers, - true, - *pushPullInterval, - *gossipInterval, - *tcpTimeout, - *peersResolveTimeout, - *probeTimeout, - *probeInterval, - tlsTransportConfig, - *allowInsecureAdvertise, - *label, - *clusterPeerName, - ) - if err != nil { - logger.Error("unable to initialize gossip mesh", "err", err) - return 1 - } - clusterEnabled.Set(1) - } - - stopc := make(chan struct{}) - var wg sync.WaitGroup - - // Load config once for both event recorder initialization and the first - // coordinator apply. Subsequent reloads (SIGHUP, /-/reload) go - // through configCoordinator.Reload() which reads the file again. - initialConf, err := config.LoadFile(*configFile) - if err != nil { - logger.Error("error loading configuration file", "err", err) - return 1 - } - - hostname, _ := os.Hostname() - var eventRec eventrecorder.Recorder - if ff.EnableEventRecorder() { - eventRec = eventrecorder.NewRecorderFromConfig(initialConf.EventRecorder, hostname, logger.With("component", "eventrecorder"), prometheus.DefaultRegisterer) - } - defer eventRec.Close() - - recordCtx := eventrecorder.WithEventRecording(context.Background()) - eventRec.RecordEvent(recordCtx, &eventrecorderpb.EventData{ - EventType: &eventrecorderpb.EventData_AlertmanagerStartupEvent{ - AlertmanagerStartupEvent: &eventrecorderpb.AlertmanagerStartupEvent{ - Version: version.Version, - BuildContext: version.BuildContext(), - }, - }, - }) - defer func() { - eventRec.RecordEvent(recordCtx, &eventrecorderpb.EventData{ - EventType: &eventrecorderpb.EventData_AlertmanagerShutdownEvent{ - AlertmanagerShutdownEvent: &eventrecorderpb.AlertmanagerShutdownEvent{}, - }, - }) - }() - - notificationLogOpts := nflog.Options{ - SnapshotFile: filepath.Join(*dataDir, "nflog"), - Retention: *retention, - Logger: logger.With("component", "nflog"), - Metrics: prometheus.DefaultRegisterer, - } - - notificationLog, err := nflog.New(notificationLogOpts) - if err != nil { - logger.Error("error creating notification log", "err", err) - return 1 - } - if peer != nil { - c := peer.AddState("nfl", notificationLog, prometheus.DefaultRegisterer) - notificationLog.SetBroadcast(c.Broadcast) - } - - wg.Go(func() { - notificationLog.Maintenance(*maintenanceInterval, filepath.Join(*dataDir, "nflog"), stopc, nil) - }) - - marker := marker.NewGroupMarker() - - silenceOpts := silence.Options{ - SnapshotFile: filepath.Join(*dataDir, "silences"), - Retention: *retention, - Limits: silence.Limits{ - MaxSilences: func() int { return *maxSilences }, - MaxSilenceSizeBytes: func() int { return *maxSilenceSizeBytes }, - }, - Logger: logger.With("component", "silences"), - Metrics: prometheus.DefaultRegisterer, - Logging: *silenceLogging, - EventRecorder: eventRec, - } - - silences, err := silence.New(silenceOpts) - if err != nil { - logger.Error("error creating silence", "err", err) - return 1 - } - if peer != nil { - c := peer.AddState("sil", silences, prometheus.DefaultRegisterer) - silences.SetBroadcast(c.Broadcast) - } - - // Start providers before router potentially sends updates. - wg.Go(func() { - silences.Maintenance(*maintenanceInterval, filepath.Join(*dataDir, "silences"), stopc, nil) - }) - - defer func() { - close(stopc) - wg.Wait() - }() - - silencer := silence.NewSilencer(silences, logger, eventRec) - - // Peer state listeners have been registered, now we can join and get the initial state. - if peer != nil { - err = peer.Join( - *reconnectInterval, - *peerReconnectTimeout, - ) - if err != nil { - logger.Warn("unable to join gossip mesh", "err", err) - } - ctx, cancel := context.WithTimeout(context.Background(), *settleTimeout) - defer func() { - cancel() - if err := peer.Leave(10 * time.Second); err != nil { - logger.Warn("unable to leave gossip mesh", "err", err) - } - }() - go peer.Settle(ctx, *gossipInterval*10) - eventRec.SetClusterPeer(peer) - } - - alerts, err := mem.NewAlerts( - context.Background(), - *alertGCInterval, - *perAlertNameLimit, - silencer, - logger, - eventRec, - prometheus.DefaultRegisterer, - ff, - ) - if err != nil { - logger.Error("error creating memory provider", "err", err) - return 1 - } - defer alerts.Close() - - var disp atomic.Pointer[dispatch.Dispatcher] - defer func() { - disp.Load().Stop() - }() - - groupFn := func(ctx context.Context, routeFilter func(*dispatch.Route) bool, alertFilter func(*alert.Alert, time.Time) bool) (dispatch.AlertGroups, map[model.Fingerprint][]string, error) { - return disp.Load().Groups(ctx, routeFilter, alertFilter) - } - - // An interface value that holds a nil concrete value is non-nil. - // Therefore we explicly pass an empty interface, to detect if the - // cluster is not enabled in notify. - var clusterPeer cluster.ClusterPeer - if peer != nil { - clusterPeer = peer - } - - api, err := api.New(api.Options{ - Alerts: alerts, - Silences: silences, - GroupMutedFunc: marker.Muted, - Peer: clusterPeer, - Timeout: *httpTimeout, - Concurrency: *getConcurrency, - Logger: logger.With("component", "api"), - Registry: prometheus.DefaultRegisterer, - RequestDuration: requestDuration, - GroupFunc: groupFn, - }) - if err != nil { - logger.Error("failed to create API", "err", err) - return 1 - } - - amURL, err := extURL(logger, os.Hostname, (*webConfig.WebListenAddresses)[0], *externalURL) - if err != nil { - logger.Error("failed to determine external URL", "err", err) - return 1 - } - logger.Debug("external url", "externalUrl", amURL.String()) - - waitFunc := func() time.Duration { return 0 } - if peer != nil { - waitFunc = clusterWait(peer, *peerTimeout) - } - timeoutFunc := func(d time.Duration) time.Duration { - if d < notify.MinTimeout { - d = notify.MinTimeout - } - return d + waitFunc() - } - - tracingManager := tracing.NewManager(logger.With("component", "tracing")) - - var ( - inhibitor atomic.Pointer[inhibit.Inhibitor] - tmpl *template.Template - ) - - dispMetrics := dispatch.NewDispatcherMetrics(false, prometheus.DefaultRegisterer, ff) - pipelineBuilder := notify.NewPipelineBuilder(prometheus.DefaultRegisterer, ff, eventRec) - configLogger := logger.With("component", "configuration") - configCoordinator := config.NewCoordinator( - *configFile, - prometheus.DefaultRegisterer, - configLogger, - ) - configCoordinator.Subscribe(func(conf *config.Config) error { - // Reload event recorder outputs first so events emitted during - // the rest of this callback (e.g., by stopping the old - // dispatcher) go to the new outputs. - eventRec.ApplyConfig(conf.EventRecorder) - - tmpl, err = template.FromGlobs(conf.Templates) - if err != nil { - return fmt.Errorf("failed to parse templates: %w", err) - } - tmpl.ExternalURL = amURL - - // Build the routing tree and record which receivers are used. - routes := dispatch.NewRoute(conf.Route, nil) - activeReceivers := make(map[string]struct{}) - routes.Walk(func(r *dispatch.Route) { - activeReceivers[r.RouteOpts.Receiver] = struct{}{} - }) - - // Build the map of receiver to integrations. - receivers := make(map[string][]notify.Integration, len(activeReceivers)) - var integrationsNum int - for _, rcv := range conf.Receivers { - if _, found := activeReceivers[rcv.Name]; !found { - // No need to build a receiver if no route is using it. - configLogger.Info("skipping creation of receiver not referenced by any route", "receiver", rcv.Name) - continue - } - integrations, err := receiver.BuildReceiverIntegrations(rcv, tmpl, logger) - if err != nil { - return err - } - // rcv.Name is guaranteed to be unique across all receivers. - receivers[rcv.Name] = integrations - integrationsNum += len(integrations) - } - - // Build the map of time interval names to time interval definitions. - timeIntervals := make(map[string][]timeinterval.TimeInterval, len(conf.MuteTimeIntervals)+len(conf.TimeIntervals)) - for _, ti := range conf.MuteTimeIntervals { - timeIntervals[ti.Name] = ti.TimeIntervals - } - - for _, ti := range conf.TimeIntervals { - timeIntervals[ti.Name] = ti.TimeIntervals - } - - intervener := timeinterval.NewIntervener(timeIntervals) - - inhibitor.Load().Stop() - disp.Load().Stop() - - newInhibitor := inhibit.NewInhibitor(alerts, conf.InhibitRules, logger, eventRec) - inhibitor.Store(newInhibitor) - - // An interface value that holds a nil concrete value is non-nil. - // Therefore we explicly pass an empty interface, to detect if the - // cluster is not enabled in notify. - var pipelinePeer notify.Peer - if peer != nil { - pipelinePeer = peer - } - - pipeline := pipelineBuilder.New( - receivers, - waitFunc, - newInhibitor, - silencer, - intervener, - marker, - notificationLog, - pipelinePeer, - ) - - configuredReceivers.Set(float64(len(activeReceivers))) - configuredIntegrations.Set(float64(integrationsNum)) - configuredInhibitionRules.Set(float64(len(conf.InhibitRules))) - - api.Update(conf, func(ctx context.Context, labels model.LabelSet) { - inhibitor.Load().Mutes(ctx, labels) - silencer.Mutes(ctx, labels) - }) - - newDisp := dispatch.NewDispatcher( - alerts, - routes, - pipeline, - marker, - timeoutFunc, - *dispatchMaintenanceInterval, - nil, - logger, - eventRec, - dispMetrics, - ) - routes.Walk(func(r *dispatch.Route) { - if r.RouteOpts.RepeatInterval > *retention { - configLogger.Warn( - "repeat_interval is greater than the data retention period. It can lead to notifications being repeated more often than expected.", - "repeat_interval", - r.RouteOpts.RepeatInterval, - "retention", - *retention, - "route", - r.Key(), - ) - } - - if r.RouteOpts.RepeatInterval < r.RouteOpts.GroupInterval { - configLogger.Warn( - "repeat_interval is less than group_interval. Notifications will not repeat until the next group_interval.", - "repeat_interval", - r.RouteOpts.RepeatInterval, - "group_interval", - r.RouteOpts.GroupInterval, - "route", - r.Key(), - ) - } - }) - - // first, start the inhibitor so the inhibition cache can populate - // wait for this to load alerts before starting the dispatcher so - // we don't accidentially notify for an alert that will be inhibited - go newInhibitor.Run() - newInhibitor.WaitForLoading() - - // next, start the dispatcher and wait for it to load before swapping the disp pointer. - // This ensures that the API doesn't see the new dispatcher before it finishes populating - // the aggrGroups - go newDisp.Run(startTime.Add(*DispatchStartDelay)) - newDisp.WaitForLoading() - disp.Store(newDisp) - - err = tracingManager.ApplyConfig(conf.TracingConfig) - if err != nil { - return fmt.Errorf("failed to apply tracing config: %w", err) - } - - go tracingManager.Run() - - return nil - }) - - if err := configCoordinator.ApplyConfig(initialConf); err != nil { - return 1 - } - - // Make routePrefix default to externalURL path if empty string. - if *routePrefix == "" { - *routePrefix = amURL.Path - } - *routePrefix = "/" + strings.Trim(*routePrefix, "/") - logger.Debug("route prefix", "routePrefix", *routePrefix) - - router := route.New().WithInstrumentation(instrumentHandler) - if *routePrefix != "/" { - router.Get("/", func(w http.ResponseWriter, r *http.Request) { - http.Redirect(w, r, *routePrefix, http.StatusFound) - }) - router = router.WithPrefix(*routePrefix) - } - - webReload := make(chan chan error) - - ui.Register(router) - httpserver.Register(router, webReload) - - mux := api.Register(router, *routePrefix) + // Translate OS signals into context cancellation (SIGINT/SIGTERM) and + // reload events (SIGHUP). The app package no longer touches signals + // directly so that it can be embedded in tests. + ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer cancel() - srv := &http.Server{ - // instrument all handlers with tracing - Handler: tracing.Middleware(mux), - } - srvc := make(chan struct{}) + hup := make(chan os.Signal, 1) + signal.Notify(hup, syscall.SIGHUP) + defer signal.Stop(hup) + reload := make(chan struct{}, 1) go func() { - if err := web.ListenAndServe(srv, webConfig, logger); !errors.Is(err, http.ErrServerClosed) { - logger.Error("Listen error", "err", err) - close(srvc) - } - defer func() { - if err := srv.Close(); err != nil { - logger.Error("Error on closing the server", "err", err) + for { + select { + case <-hup: + select { + case reload <- struct{}{}: + default: + } + case <-ctx.Done(): + return } - }() - }() - - var ( - hup = make(chan os.Signal, 1) - term = make(chan os.Signal, 1) - ) - signal.Notify(hup, syscall.SIGHUP) - signal.Notify(term, os.Interrupt, syscall.SIGTERM) - - for { - select { - case <-hup: - // ignore error, already logged in `reload()` - _ = configCoordinator.Reload() - case errc := <-webReload: - errc <- configCoordinator.Reload() - case <-term: - logger.Info("Received SIGTERM, exiting gracefully...") - - // shut down the tracing manager to flush any remaining spans. - // this blocks for up to 5s - tracingManager.Stop() - - return 0 - case <-srvc: - return 1 - } - } -} - -// clusterWait returns a function that inspects the current peer state and returns -// a duration of one base timeout for each peer with a higher ID than ourselves. -func clusterWait(p *cluster.Peer, timeout time.Duration) func() time.Duration { - return func() time.Duration { - return time.Duration(p.Position()) * timeout - } -} - -func extURL(logger *slog.Logger, hostnamef func() (string, error), listen, external string) (*url.URL, error) { - if external == "" { - hostname, err := hostnamef() - if err != nil { - return nil, err - } - _, port, err := net.SplitHostPort(listen) - if err != nil { - return nil, err } - if port == "" { - logger.Warn("no port found for listen address", "address", listen) - } - - external = fmt.Sprintf("http://%s:%s/", hostname, port) - } - - u, err := url.Parse(external) - if err != nil { - return nil, err - } - if u.Scheme != "http" && u.Scheme != "https" { - return nil, fmt.Errorf("%q: invalid %q scheme, only 'http' and 'https' are supported", u.String(), u.Scheme) - } + }() - ppref := strings.TrimRight(u.Path, "/") - if ppref != "" && !strings.HasPrefix(ppref, "/") { - ppref = "/" + ppref + opts := app.Options{ + ConfigFile: *configFile, + DataDir: *dataDir, + Retention: *retention, + MaintenanceInterval: *maintenanceInterval, + MaxSilences: *maxSilences, + MaxSilenceSizeBytes: *maxSilenceSizeBytes, + SilenceLogging: *silenceLogging, + AlertGCInterval: *alertGCInterval, + PerAlertNameLimit: *perAlertNameLimit, + DispatchMaintenanceInterval: *dispatchMaintenanceInterval, + DispatchStartDelay: *dispatchStartDelay, + + WebConfig: webConfig, + ExternalURL: *externalURL, + RoutePrefix: *routePrefix, + GetConcurrency: *getConcurrency, + HTTPTimeout: *httpTimeout, + + ClusterBindAddr: *clusterBindAddr, + ClusterAdvertiseAddr: *clusterAdvertiseAddr, + ClusterPeerName: *clusterPeerName, + Peers: *peers, + PeerTimeout: *peerTimeout, + PeersResolveTimeout: *peersResolveTimeout, + GossipInterval: *gossipInterval, + PushPullInterval: *pushPullInterval, + TCPTimeout: *tcpTimeout, + ProbeTimeout: *probeTimeout, + ProbeInterval: *probeInterval, + SettleTimeout: *settleTimeout, + ReconnectInterval: *reconnectInterval, + PeerReconnectTimeout: *peerReconnectTimeout, + TLSConfigFile: *tlsConfigFile, + AllowInsecureAdvertise: *allowInsecureAdvertise, + Label: *label, + + Logger: logger, + Registerer: prometheus.DefaultRegisterer, + Flagger: ff, + Reload: reload, + } + + if err := app.Run(ctx, opts); err != nil { + logger.Error("alertmanager exited with error", "err", err) + return 1 } - u.Path = ppref - - return u, nil + logger.Info("Received shutdown signal, exiting gracefully...") + return 0 } diff --git a/internal/app/app.go b/internal/app/app.go new file mode 100644 index 0000000000..cdb6008a1d --- /dev/null +++ b/internal/app/app.go @@ -0,0 +1,502 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package app contains the Alertmanager process logic extracted from +// cmd/alertmanager so that tests and other binaries can embed +// Alertmanager in-process instead of shelling out to a compiled binary. +// See https://github.com/prometheus/alertmanager/issues/406. +package app + +import ( + "context" + "errors" + "fmt" + "net/http" + "os" + "path/filepath" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/prometheus/common/model" + "github.com/prometheus/common/route" + "github.com/prometheus/common/version" + "github.com/prometheus/exporter-toolkit/web" + + "github.com/prometheus/alertmanager/alert" + "github.com/prometheus/alertmanager/api" + "github.com/prometheus/alertmanager/cluster" + "github.com/prometheus/alertmanager/config" + "github.com/prometheus/alertmanager/config/receiver" + "github.com/prometheus/alertmanager/dispatch" + "github.com/prometheus/alertmanager/eventrecorder" + "github.com/prometheus/alertmanager/eventrecorder/eventrecorderpb" + "github.com/prometheus/alertmanager/httpserver" + "github.com/prometheus/alertmanager/inhibit" + "github.com/prometheus/alertmanager/marker" + "github.com/prometheus/alertmanager/nflog" + "github.com/prometheus/alertmanager/notify" + "github.com/prometheus/alertmanager/provider/mem" + "github.com/prometheus/alertmanager/silence" + "github.com/prometheus/alertmanager/template" + "github.com/prometheus/alertmanager/timeinterval" + "github.com/prometheus/alertmanager/tracing" + "github.com/prometheus/alertmanager/ui" +) + +// Run starts an Alertmanager instance using opts and blocks until ctx is +// cancelled or an unrecoverable error occurs. It is the package-level +// equivalent of what cmd/alertmanager used to do inline. +func Run(ctx context.Context, opts Options) error { + if err := opts.validate(); err != nil { + return err + } + + logger := opts.Logger + reg := opts.Registerer + ff := opts.Flagger + m := newMetrics(reg) + + logger.Info("Starting Alertmanager", "version", version.Info()) + startTime := time.Now() + logger.Info("Build context", "build_context", version.BuildContext()) + + if err := os.MkdirAll(opts.DataDir, 0o777); err != nil { + return fmt.Errorf("unable to create data directory: %w", err) + } + + tlsTransportConfig, err := cluster.GetTLSTransportConfig(opts.TLSConfigFile) + if err != nil { + return fmt.Errorf("unable to initialize TLS transport configuration for gossip mesh: %w", err) + } + + var peer *cluster.Peer + if opts.ClusterBindAddr != "" { + peer, err = cluster.Create( + logger.With("component", "cluster"), + reg, + opts.ClusterBindAddr, + opts.ClusterAdvertiseAddr, + opts.Peers, + true, + opts.PushPullInterval, + opts.GossipInterval, + opts.TCPTimeout, + opts.PeersResolveTimeout, + opts.ProbeTimeout, + opts.ProbeInterval, + tlsTransportConfig, + opts.AllowInsecureAdvertise, + opts.Label, + opts.ClusterPeerName, + ) + if err != nil { + return fmt.Errorf("unable to initialize gossip mesh: %w", err) + } + m.clusterEnabled.Set(1) + } + + stopc := make(chan struct{}) + var wg sync.WaitGroup + + // Load config once for both event recorder initialization and the + // first coordinator apply. Subsequent reloads go through + // configCoordinator.Reload() which reads the file again. + initialConf, err := config.LoadFile(opts.ConfigFile) + if err != nil { + return fmt.Errorf("error loading configuration file: %w", err) + } + + hostname, _ := os.Hostname() + var eventRec eventrecorder.Recorder + if ff.EnableEventRecorder() { + eventRec = eventrecorder.NewRecorderFromConfig(initialConf.EventRecorder, hostname, logger.With("component", "eventrecorder"), reg) + } + defer eventRec.Close() + + recordCtx := eventrecorder.WithEventRecording(context.Background()) + eventRec.RecordEvent(recordCtx, &eventrecorderpb.EventData{ + EventType: &eventrecorderpb.EventData_AlertmanagerStartupEvent{ + AlertmanagerStartupEvent: &eventrecorderpb.AlertmanagerStartupEvent{ + Version: version.Version, + BuildContext: version.BuildContext(), + }, + }, + }) + defer func() { + eventRec.RecordEvent(recordCtx, &eventrecorderpb.EventData{ + EventType: &eventrecorderpb.EventData_AlertmanagerShutdownEvent{ + AlertmanagerShutdownEvent: &eventrecorderpb.AlertmanagerShutdownEvent{}, + }, + }) + }() + + notificationLogOpts := nflog.Options{ + SnapshotFile: filepath.Join(opts.DataDir, "nflog"), + Retention: opts.Retention, + Logger: logger.With("component", "nflog"), + Metrics: reg, + } + notificationLog, err := nflog.New(notificationLogOpts) + if err != nil { + return fmt.Errorf("error creating notification log: %w", err) + } + if peer != nil { + c := peer.AddState("nfl", notificationLog, reg) + notificationLog.SetBroadcast(c.Broadcast) + } + + wg.Go(func() { + notificationLog.Maintenance(opts.MaintenanceInterval, filepath.Join(opts.DataDir, "nflog"), stopc, nil) + }) + + groupMarker := marker.NewGroupMarker() + + silenceOpts := silence.Options{ + SnapshotFile: filepath.Join(opts.DataDir, "silences"), + Retention: opts.Retention, + Limits: silence.Limits{ + MaxSilences: func() int { return opts.MaxSilences }, + MaxSilenceSizeBytes: func() int { return opts.MaxSilenceSizeBytes }, + }, + Logger: logger.With("component", "silences"), + Metrics: reg, + Logging: opts.SilenceLogging, + EventRecorder: eventRec, + } + silences, err := silence.New(silenceOpts) + if err != nil { + return fmt.Errorf("error creating silence: %w", err) + } + if peer != nil { + c := peer.AddState("sil", silences, reg) + silences.SetBroadcast(c.Broadcast) + } + + // Start providers before router potentially sends updates. + wg.Go(func() { + silences.Maintenance(opts.MaintenanceInterval, filepath.Join(opts.DataDir, "silences"), stopc, nil) + }) + + defer func() { + close(stopc) + wg.Wait() + }() + + silencer := silence.NewSilencer(silences, logger, eventRec) + + // Peer state listeners have been registered, now we can join and get the initial state. + if peer != nil { + if err := peer.Join(opts.ReconnectInterval, opts.PeerReconnectTimeout); err != nil { + logger.Warn("unable to join gossip mesh", "err", err) + } + settleCtx, cancel := context.WithTimeout(context.Background(), opts.SettleTimeout) + defer func() { + cancel() + if err := peer.Leave(10 * time.Second); err != nil { + logger.Warn("unable to leave gossip mesh", "err", err) + } + }() + go peer.Settle(settleCtx, opts.GossipInterval*10) + eventRec.SetClusterPeer(peer) + } + + alerts, err := mem.NewAlerts( + context.Background(), + opts.AlertGCInterval, + opts.PerAlertNameLimit, + silencer, + logger, + eventRec, + reg, + ff, + ) + if err != nil { + return fmt.Errorf("error creating memory provider: %w", err) + } + defer alerts.Close() + + var disp atomic.Pointer[dispatch.Dispatcher] + defer func() { + if d := disp.Load(); d != nil { + d.Stop() + } + }() + + groupFn := func(ctx context.Context, routeFilter func(*dispatch.Route) bool, alertFilter func(*alert.Alert, time.Time) bool) (dispatch.AlertGroups, map[model.Fingerprint][]string, error) { + return disp.Load().Groups(ctx, routeFilter, alertFilter) + } + + // An interface value that holds a nil concrete value is non-nil. + // Therefore we explicitly pass an empty interface, to detect if the + // cluster is not enabled in notify. + var clusterPeer cluster.ClusterPeer + if peer != nil { + clusterPeer = peer + } + + apih, err := api.New(api.Options{ + Alerts: alerts, + Silences: silences, + GroupMutedFunc: groupMarker.Muted, + Peer: clusterPeer, + Timeout: opts.HTTPTimeout, + Concurrency: opts.GetConcurrency, + Logger: logger.With("component", "api"), + Registry: reg, + RequestDuration: m.requestDuration, + GroupFunc: groupFn, + }) + if err != nil { + return fmt.Errorf("failed to create API: %w", err) + } + + amURL, err := extURL(logger, os.Hostname, (*opts.WebConfig.WebListenAddresses)[0], opts.ExternalURL) + if err != nil { + return fmt.Errorf("failed to determine external URL: %w", err) + } + logger.Debug("external url", "externalUrl", amURL.String()) + + waitFunc := func() time.Duration { return 0 } + if peer != nil { + waitFunc = clusterWait(peer, opts.PeerTimeout) + } + timeoutFunc := func(d time.Duration) time.Duration { + if d < notify.MinTimeout { + d = notify.MinTimeout + } + return d + waitFunc() + } + + tracingManager := tracing.NewManager(logger.With("component", "tracing")) + + var ( + inhibitor atomic.Pointer[inhibit.Inhibitor] + tmpl *template.Template + ) + + dispMetrics := dispatch.NewDispatcherMetrics(false, reg, ff) + pipelineBuilder := notify.NewPipelineBuilder(reg, ff, eventRec) + configLogger := logger.With("component", "configuration") + configCoordinator := config.NewCoordinator( + opts.ConfigFile, + reg, + configLogger, + ) + + configCoordinator.Subscribe(func(conf *config.Config) error { + // Reload event recorder outputs first so events emitted during + // the rest of this callback (e.g., by stopping the old + // dispatcher) go to the new outputs. + eventRec.ApplyConfig(conf.EventRecorder) + + tmpl, err = template.FromGlobs(conf.Templates) + if err != nil { + return fmt.Errorf("failed to parse templates: %w", err) + } + tmpl.ExternalURL = amURL + + // Build the routing tree and record which receivers are used. + routes := dispatch.NewRoute(conf.Route, nil) + activeReceivers := make(map[string]struct{}) + routes.Walk(func(r *dispatch.Route) { + activeReceivers[r.RouteOpts.Receiver] = struct{}{} + }) + + // Build the map of receiver to integrations. + receivers := make(map[string][]notify.Integration, len(activeReceivers)) + var integrationsNum int + for _, rcv := range conf.Receivers { + if _, found := activeReceivers[rcv.Name]; !found { + // No need to build a receiver if no route is using it. + configLogger.Info("skipping creation of receiver not referenced by any route", "receiver", rcv.Name) + continue + } + integrations, err := receiver.BuildReceiverIntegrations(rcv, tmpl, logger) + if err != nil { + return err + } + // rcv.Name is guaranteed to be unique across all receivers. + receivers[rcv.Name] = integrations + integrationsNum += len(integrations) + } + + // Build the map of time interval names to time interval definitions. + timeIntervals := make(map[string][]timeinterval.TimeInterval, len(conf.MuteTimeIntervals)+len(conf.TimeIntervals)) + for _, ti := range conf.MuteTimeIntervals { + timeIntervals[ti.Name] = ti.TimeIntervals + } + for _, ti := range conf.TimeIntervals { + timeIntervals[ti.Name] = ti.TimeIntervals + } + + intervener := timeinterval.NewIntervener(timeIntervals) + + if old := inhibitor.Load(); old != nil { + old.Stop() + } + if old := disp.Load(); old != nil { + old.Stop() + } + + newInhibitor := inhibit.NewInhibitor(alerts, conf.InhibitRules, logger, eventRec) + inhibitor.Store(newInhibitor) + + // An interface value that holds a nil concrete value is non-nil. + // Therefore we explicitly pass an empty interface, to detect if the + // cluster is not enabled in notify. + var pipelinePeer notify.Peer + if peer != nil { + pipelinePeer = peer + } + + pipeline := pipelineBuilder.New( + receivers, + waitFunc, + newInhibitor, + silencer, + intervener, + groupMarker, + notificationLog, + pipelinePeer, + ) + + m.configuredReceivers.Set(float64(len(activeReceivers))) + m.configuredIntegrations.Set(float64(integrationsNum)) + m.configuredInhibitionRules.Set(float64(len(conf.InhibitRules))) + + apih.Update(conf, func(ctx context.Context, labels model.LabelSet) { + inhibitor.Load().Mutes(ctx, labels) + silencer.Mutes(ctx, labels) + }) + + newDisp := dispatch.NewDispatcher( + alerts, + routes, + pipeline, + groupMarker, + timeoutFunc, + opts.DispatchMaintenanceInterval, + nil, + logger, + eventRec, + dispMetrics, + ) + routes.Walk(func(r *dispatch.Route) { + if r.RouteOpts.RepeatInterval > opts.Retention { + configLogger.Warn( + "repeat_interval is greater than the data retention period. It can lead to notifications being repeated more often than expected.", + "repeat_interval", r.RouteOpts.RepeatInterval, + "retention", opts.Retention, + "route", r.Key(), + ) + } + if r.RouteOpts.RepeatInterval < r.RouteOpts.GroupInterval { + configLogger.Warn( + "repeat_interval is less than group_interval. Notifications will not repeat until the next group_interval.", + "repeat_interval", r.RouteOpts.RepeatInterval, + "group_interval", r.RouteOpts.GroupInterval, + "route", r.Key(), + ) + } + }) + + // First, start the inhibitor so the inhibition cache can populate. + // Wait for this to load alerts before starting the dispatcher so + // we don't accidentally notify for an alert that will be inhibited. + go newInhibitor.Run() + newInhibitor.WaitForLoading() + + // Next, start the dispatcher and wait for it to load before swapping + // the disp pointer. This ensures that the API doesn't see the new + // dispatcher before it finishes populating the aggrGroups. + go newDisp.Run(startTime.Add(opts.DispatchStartDelay)) + newDisp.WaitForLoading() + disp.Store(newDisp) + + if err := tracingManager.ApplyConfig(conf.TracingConfig); err != nil { + return fmt.Errorf("failed to apply tracing config: %w", err) + } + + go tracingManager.Run() + + return nil + }) + + if err := configCoordinator.ApplyConfig(initialConf); err != nil { + return fmt.Errorf("failed to apply initial configuration: %w", err) + } + + // Default routePrefix to externalURL path if empty. + routePrefix := opts.RoutePrefix + if routePrefix == "" { + routePrefix = amURL.Path + } + routePrefix = "/" + strings.Trim(routePrefix, "/") + logger.Debug("route prefix", "routePrefix", routePrefix) + + router := route.New().WithInstrumentation(m.instrumentHandler) + if routePrefix != "/" { + prefix := routePrefix + router.Get("/", func(w http.ResponseWriter, r *http.Request) { + http.Redirect(w, r, prefix, http.StatusFound) + }) + router = router.WithPrefix(routePrefix) + } + + webReload := make(chan chan error) + + ui.Register(router) + httpserver.Register(router, webReload) + + mux := apih.Register(router, routePrefix) + + srv := &http.Server{ + // Instrument all handlers with tracing. + Handler: tracing.Middleware(mux), + } + srvc := make(chan struct{}) + + go func() { + if err := web.ListenAndServe(srv, opts.WebConfig, logger); !errors.Is(err, http.ErrServerClosed) { + logger.Error("Listen error", "err", err) + close(srvc) + } + }() + + defer func() { + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := srv.Shutdown(shutdownCtx); err != nil { + logger.Warn("graceful HTTP shutdown failed", "err", err) + } + }() + + for { + select { + case <-opts.Reload: + if err := configCoordinator.Reload(); err != nil { + logger.Error("configuration reload failed", "err", err) + } + case errc := <-webReload: + errc <- configCoordinator.Reload() + case <-ctx.Done(): + logger.Info("Shutting down gracefully") + tracingManager.Stop() + return nil + case <-srvc: + return errors.New("alertmanager: HTTP listener failed") + } + } +} diff --git a/internal/app/cluster.go b/internal/app/cluster.go new file mode 100644 index 0000000000..28538e2d94 --- /dev/null +++ b/internal/app/cluster.go @@ -0,0 +1,29 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package app + +import ( + "time" + + "github.com/prometheus/alertmanager/cluster" +) + +// clusterWait returns a function that inspects the current peer state and +// returns a duration of one base timeout for each peer with a higher ID +// than ourselves. +func clusterWait(p *cluster.Peer, timeout time.Duration) func() time.Duration { + return func() time.Duration { + return time.Duration(p.Position()) * timeout + } +} diff --git a/internal/app/metrics.go b/internal/app/metrics.go new file mode 100644 index 0000000000..e2b69437d8 --- /dev/null +++ b/internal/app/metrics.go @@ -0,0 +1,98 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package app + +import ( + "net/http" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +// metrics bundles the process-level Prometheus metrics owned by the app +// package. They used to live as package-level variables in +// cmd/alertmanager/main.go and were registered against +// prometheus.DefaultRegisterer at init time. They are now constructed per +// app.Run invocation against the registerer supplied via Options so that +// multiple instances can coexist within a single process (e.g. tests). +type metrics struct { + requestDuration *prometheus.HistogramVec + responseSize *prometheus.HistogramVec + clusterEnabled prometheus.Gauge + configuredReceivers prometheus.Gauge + configuredIntegrations prometheus.Gauge + configuredInhibitionRules prometheus.Gauge +} + +func newMetrics(reg prometheus.Registerer) *metrics { + f := promauto.With(reg) + return &metrics{ + requestDuration: f.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "alertmanager_http_request_duration_seconds", + Help: "Histogram of latencies for HTTP requests.", + Buckets: prometheus.DefBuckets, + NativeHistogramBucketFactor: 1.1, + NativeHistogramMaxBucketNumber: 100, + NativeHistogramMinResetDuration: 1 * time.Hour, + }, + []string{"handler", "method", "code"}, + ), + responseSize: f.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "alertmanager_http_response_size_bytes", + Help: "Histogram of response size for HTTP requests.", + Buckets: prometheus.ExponentialBuckets(100, 10, 7), + }, + []string{"handler", "method"}, + ), + clusterEnabled: f.NewGauge( + prometheus.GaugeOpts{ + Name: "alertmanager_cluster_enabled", + Help: "Indicates whether the clustering is enabled or not.", + }, + ), + configuredReceivers: f.NewGauge( + prometheus.GaugeOpts{ + Name: "alertmanager_receivers", + Help: "Number of configured receivers.", + }, + ), + configuredIntegrations: f.NewGauge( + prometheus.GaugeOpts{ + Name: "alertmanager_integrations", + Help: "Number of configured integrations.", + }, + ), + configuredInhibitionRules: f.NewGauge( + prometheus.GaugeOpts{ + Name: "alertmanager_inhibition_rules", + Help: "Number of configured inhibition rules.", + }, + ), + } +} + +func (m *metrics) instrumentHandler(handlerName string, handler http.HandlerFunc) http.HandlerFunc { + handlerLabel := prometheus.Labels{"handler": handlerName} + return promhttp.InstrumentHandlerDuration( + m.requestDuration.MustCurryWith(handlerLabel), + promhttp.InstrumentHandlerResponseSize( + m.responseSize.MustCurryWith(handlerLabel), + handler, + ), + ) +} diff --git a/internal/app/options.go b/internal/app/options.go new file mode 100644 index 0000000000..3541f80fe6 --- /dev/null +++ b/internal/app/options.go @@ -0,0 +1,104 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package app + +import ( + "errors" + "log/slog" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/exporter-toolkit/web" + + "github.com/prometheus/alertmanager/featurecontrol" +) + +// DefaultClusterAddr is the default listen address used when the operator +// does not pass --cluster.listen-address. +const DefaultClusterAddr = "0.0.0.0:9094" + +// Options carries the resolved configuration for a single Alertmanager +// instance. Field names follow the kingpin flags in cmd/alertmanager/main.go +// so that mapping between the two is straightforward. +// +// Logger, Registerer and Flagger are required dependencies; the remaining +// fields default to their zero value (which generally matches the kingpin +// flag default). +type Options struct { + // Storage and lifecycle. + ConfigFile string + DataDir string + Retention time.Duration + MaintenanceInterval time.Duration + MaxSilences int + MaxSilenceSizeBytes int + SilenceLogging bool + AlertGCInterval time.Duration + PerAlertNameLimit int + DispatchMaintenanceInterval time.Duration + DispatchStartDelay time.Duration + + // Web server. + WebConfig *web.FlagConfig + ExternalURL string + RoutePrefix string + GetConcurrency int + HTTPTimeout time.Duration + + // Cluster. + ClusterBindAddr string + ClusterAdvertiseAddr string + ClusterPeerName string + Peers []string + PeerTimeout time.Duration + PeersResolveTimeout time.Duration + GossipInterval time.Duration + PushPullInterval time.Duration + TCPTimeout time.Duration + ProbeTimeout time.Duration + ProbeInterval time.Duration + SettleTimeout time.Duration + ReconnectInterval time.Duration + PeerReconnectTimeout time.Duration + TLSConfigFile string + AllowInsecureAdvertise bool + Label string + + // Injected dependencies. + Logger *slog.Logger + Registerer prometheus.Registerer + Flagger featurecontrol.Flagger + + // Reload triggers a configuration reload each time it receives a + // value. The binary translates SIGHUP into sends on this channel; + // callers can also drive reloads programmatically. A nil channel + // disables external reloads (the /-/reload HTTP endpoint still works). + Reload <-chan struct{} +} + +func (o *Options) validate() error { + if o.Logger == nil { + return errors.New("alertmanager/app: Options.Logger is required") + } + if o.Registerer == nil { + return errors.New("alertmanager/app: Options.Registerer is required") + } + if o.Flagger == nil { + return errors.New("alertmanager/app: Options.Flagger is required") + } + if o.WebConfig == nil || o.WebConfig.WebListenAddresses == nil || len(*o.WebConfig.WebListenAddresses) == 0 { + return errors.New("alertmanager/app: Options.WebConfig must contain at least one listen address") + } + return nil +} diff --git a/internal/app/url.go b/internal/app/url.go new file mode 100644 index 0000000000..0f478e2c27 --- /dev/null +++ b/internal/app/url.go @@ -0,0 +1,57 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package app + +import ( + "fmt" + "log/slog" + "net" + "net/url" + "strings" +) + +// extURL resolves the externally visible URL of this Alertmanager instance, +// defaulting to http://: when external is empty. +func extURL(logger *slog.Logger, hostnamef func() (string, error), listen, external string) (*url.URL, error) { + if external == "" { + hostname, err := hostnamef() + if err != nil { + return nil, err + } + _, port, err := net.SplitHostPort(listen) + if err != nil { + return nil, err + } + if port == "" { + logger.Warn("no port found for listen address", "address", listen) + } + external = fmt.Sprintf("http://%s:%s/", hostname, port) + } + + u, err := url.Parse(external) + if err != nil { + return nil, err + } + if u.Scheme != "http" && u.Scheme != "https" { + return nil, fmt.Errorf("%q: invalid %q scheme, only 'http' and 'https' are supported", u.String(), u.Scheme) + } + + ppref := strings.TrimRight(u.Path, "/") + if ppref != "" && !strings.HasPrefix(ppref, "/") { + ppref = "/" + ppref + } + u.Path = ppref + + return u, nil +} diff --git a/cmd/alertmanager/main_test.go b/internal/app/url_test.go similarity index 99% rename from cmd/alertmanager/main_test.go rename to internal/app/url_test.go index 28f21bdc2b..b7df64deac 100644 --- a/cmd/alertmanager/main_test.go +++ b/internal/app/url_test.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package main +package app import ( "fmt" From cdd92306144eb90a70db0bb162a3751595819b95 Mon Sep 17 00:00:00 2001 From: Siavash Safi Date: Fri, 29 May 2026 12:19:01 +0200 Subject: [PATCH 2/2] internal/app: add App lifecycle (New/Start/Addr/Reload/Stop) introduce an App lifecycle so tests and embedders can drive Alertmanager without OS signals or os/exec, and discover the bound HTTP address even when listening on ":0". API: New(opts) (*App, error) (*App).Start() error (*App).Addr() string // first listener (*App).Addrs() []string // all listeners (*App).Reload(ctx) error (*App).Stop(ctx) error Run is preserved as a thin wrapper (New + Start + serveLoop + Stop) with a deferred Stop on a fresh 30s context so cleanup also runs on panic, matching the implicit panic-safety of the previous defer- based implementation. Internally, setup uses a cleanup stack (a.onStop) that Stop drains in LIFO order, mirroring Go's defer semantics so the source order of the old `defer X` lines in Run is preserved verbatim and the shutdown ordering does not depend on hand-written reverse-order code. Listeners are bound at New time via a new listenAll helper that calls net.Listen directly (so Addr is meaningful before Start); web.ServeMultiple is then invoked in Start. Systemd socket activation is not supported when embedding and returns an explicit error pointing callers back to cmd/alertmanager. Stop honors its context parameter for the HTTP shutdown step, capped at 5s, so callers passing a tighter deadline get faster teardown and callers passing context.Background get the default. Tests cover: single instance round-trip; two sequential instances in the same process (guards the Phase A metrics-per-Registerer fix against duplicate-registration panics); two concurrent instances on distinct ephemeral ports; and the Run wrapper end-to-end with ctx cancellation. All pass under -race. Signed-off-by: Siavash Safi --- internal/app/app.go | 118 ++++++++------ internal/app/lifecycle.go | 284 +++++++++++++++++++++++++++++++++ internal/app/lifecycle_test.go | 221 +++++++++++++++++++++++++ 3 files changed, 571 insertions(+), 52 deletions(-) create mode 100644 internal/app/lifecycle.go create mode 100644 internal/app/lifecycle_test.go diff --git a/internal/app/app.go b/internal/app/app.go index cdb6008a1d..4f98064f8c 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -19,7 +19,6 @@ package app import ( "context" - "errors" "fmt" "net/http" "os" @@ -32,7 +31,6 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/common/route" "github.com/prometheus/common/version" - "github.com/prometheus/exporter-toolkit/web" "github.com/prometheus/alertmanager/alert" "github.com/prometheus/alertmanager/api" @@ -56,9 +54,45 @@ import ( ) // Run starts an Alertmanager instance using opts and blocks until ctx is -// cancelled or an unrecoverable error occurs. It is the package-level -// equivalent of what cmd/alertmanager used to do inline. +// cancelled or an unrecoverable error occurs. It is a thin wrapper over +// New + Start + serveLoop + Stop intended for callers that don't need +// the richer lifecycle API. +// +// The deferred Stop also ensures cleanup runs on panic, matching the +// implicit panic-safety of the original defer-based implementation. func Run(ctx context.Context, opts Options) error { + a, err := New(opts) + if err != nil { + return err + } + defer func() { + stopCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + _ = a.Stop(stopCtx) + }() + if err := a.Start(); err != nil { + return err + } + return a.serveLoop(ctx) +} + +// setup wires every Alertmanager subsystem and registers their teardown +// hooks on a.cleanups via a.onStop. Stop drains those hooks in LIFO order +// so the shutdown sequence matches the implicit ordering of the original +// defer-based Run implementation. +// +// The body is deliberately one long function rather than a chain of helpers. +// Nearly every step depends on locals produced by earlier steps (peer, +// eventRec, silences, alerts, groupMarker, silencer, notificationLog, +// disp, inhibitor, tmpl, configCoordinator, waitFunc, timeoutFunc, ...), +// and the configCoordinator.Subscribe callback closes over most of them. +// Splitting setup into helpers would force us to either thread a wide +// state struct between them or promote those locals to App fields; both +// obscure the dataflow without simplifying anything. +// +//nolint:gocyclo // intentional, see comment above. +func (a *App) setup() error { + opts := a.opts if err := opts.validate(); err != nil { return err } @@ -68,6 +102,8 @@ func Run(ctx context.Context, opts Options) error { ff := opts.Flagger m := newMetrics(reg) + a.logger = logger + logger.Info("Starting Alertmanager", "version", version.Info()) startTime := time.Now() logger.Info("Build context", "build_context", version.BuildContext()) @@ -123,7 +159,7 @@ func Run(ctx context.Context, opts Options) error { if ff.EnableEventRecorder() { eventRec = eventrecorder.NewRecorderFromConfig(initialConf.EventRecorder, hostname, logger.With("component", "eventrecorder"), reg) } - defer eventRec.Close() + a.onStop(func() { eventRec.Close() }) recordCtx := eventrecorder.WithEventRecording(context.Background()) eventRec.RecordEvent(recordCtx, &eventrecorderpb.EventData{ @@ -134,13 +170,13 @@ func Run(ctx context.Context, opts Options) error { }, }, }) - defer func() { + a.onStop(func() { eventRec.RecordEvent(recordCtx, &eventrecorderpb.EventData{ EventType: &eventrecorderpb.EventData_AlertmanagerShutdownEvent{ AlertmanagerShutdownEvent: &eventrecorderpb.AlertmanagerShutdownEvent{}, }, }) - }() + }) notificationLogOpts := nflog.Options{ SnapshotFile: filepath.Join(opts.DataDir, "nflog"), @@ -184,15 +220,15 @@ func Run(ctx context.Context, opts Options) error { silences.SetBroadcast(c.Broadcast) } - // Start providers before router potentially sends updates. + // Start providers before the router potentially sends updates. wg.Go(func() { silences.Maintenance(opts.MaintenanceInterval, filepath.Join(opts.DataDir, "silences"), stopc, nil) }) - defer func() { + a.onStop(func() { close(stopc) wg.Wait() - }() + }) silencer := silence.NewSilencer(silences, logger, eventRec) @@ -201,13 +237,13 @@ func Run(ctx context.Context, opts Options) error { if err := peer.Join(opts.ReconnectInterval, opts.PeerReconnectTimeout); err != nil { logger.Warn("unable to join gossip mesh", "err", err) } - settleCtx, cancel := context.WithTimeout(context.Background(), opts.SettleTimeout) - defer func() { - cancel() + settleCtx, settleCancel := context.WithTimeout(context.Background(), opts.SettleTimeout) + a.onStop(func() { + settleCancel() if err := peer.Leave(10 * time.Second); err != nil { logger.Warn("unable to leave gossip mesh", "err", err) } - }() + }) go peer.Settle(settleCtx, opts.GossipInterval*10) eventRec.SetClusterPeer(peer) } @@ -225,14 +261,14 @@ func Run(ctx context.Context, opts Options) error { if err != nil { return fmt.Errorf("error creating memory provider: %w", err) } - defer alerts.Close() + a.onStop(alerts.Close) var disp atomic.Pointer[dispatch.Dispatcher] - defer func() { + a.onStop(func() { if d := disp.Load(); d != nil { d.Stop() } - }() + }) groupFn := func(ctx context.Context, routeFilter func(*dispatch.Route) bool, alertFilter func(*alert.Alert, time.Time) bool) (dispatch.AlertGroups, map[model.Fingerprint][]string, error) { return disp.Load().Groups(ctx, routeFilter, alertFilter) @@ -280,6 +316,8 @@ func Run(ctx context.Context, opts Options) error { } tracingManager := tracing.NewManager(logger.With("component", "tracing")) + a.tracing = tracingManager + a.onStop(tracingManager.Stop) var ( inhibitor atomic.Pointer[inhibit.Inhibitor] @@ -294,6 +332,7 @@ func Run(ctx context.Context, opts Options) error { reg, configLogger, ) + a.coord = configCoordinator configCoordinator.Subscribe(func(conf *config.Config) error { // Reload event recorder outputs first so events emitted during @@ -455,48 +494,23 @@ func Run(ctx context.Context, opts Options) error { router = router.WithPrefix(routePrefix) } - webReload := make(chan chan error) - ui.Register(router) - httpserver.Register(router, webReload) + httpserver.Register(router, a.webReload) mux := apih.Register(router, routePrefix) - srv := &http.Server{ + a.srv = &http.Server{ // Instrument all handlers with tracing. Handler: tracing.Middleware(mux), } - srvc := make(chan struct{}) - go func() { - if err := web.ListenAndServe(srv, opts.WebConfig, logger); !errors.Is(err, http.ErrServerClosed) { - logger.Error("Listen error", "err", err) - close(srvc) - } - }() - - defer func() { - shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - if err := srv.Shutdown(shutdownCtx); err != nil { - logger.Warn("graceful HTTP shutdown failed", "err", err) - } - }() - - for { - select { - case <-opts.Reload: - if err := configCoordinator.Reload(); err != nil { - logger.Error("configuration reload failed", "err", err) - } - case errc := <-webReload: - errc <- configCoordinator.Reload() - case <-ctx.Done(): - logger.Info("Shutting down gracefully") - tracingManager.Stop() - return nil - case <-srvc: - return errors.New("alertmanager: HTTP listener failed") - } + // Bind listeners now so Addr is meaningful before Start runs and + // ":0" ports can be discovered by callers. + listeners, err := listenAll(opts.WebConfig) + if err != nil { + return err } + a.listeners = listeners + + return nil } diff --git a/internal/app/lifecycle.go b/internal/app/lifecycle.go new file mode 100644 index 0000000000..850ed1184b --- /dev/null +++ b/internal/app/lifecycle.go @@ -0,0 +1,284 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package app + +import ( + "context" + "errors" + "fmt" + "log/slog" + "net" + "net/http" + "sync" + "time" + + "github.com/prometheus/exporter-toolkit/web" + + "github.com/prometheus/alertmanager/config" + "github.com/prometheus/alertmanager/tracing" +) + +// App is a running (or runnable) Alertmanager instance built from Options. +// +// Compared to the top-level Run function, App exposes lifecycle hooks +// (Start, Stop, Addr, Reload) so callers — typically tests — can drive an +// instance without OS signals and discover the actually-bound HTTP +// address (useful when listening on ":0"). +// +// Construct an App with New, then call Start to begin serving HTTP. The +// caller is responsible for calling Stop, ideally via a deferred call so +// teardown also runs on panic. An App is single-use: calling Start more +// than once is an error. +type App struct { + opts Options + logger *slog.Logger + + // Lifecycle dependencies retained for use by Start, Reload, and Stop. + coord *config.Coordinator + tracing *tracing.Manager + srv *http.Server + listeners []net.Listener + + // webReload is the channel exposed by httpserver.Register for the + // /-/reload HTTP endpoint. We read from it in serveLoop. + webReload chan chan error + + // srvc carries errors from the HTTP serve goroutine. It is closed + // when the goroutine exits cleanly. + srvc chan error + + // cleanups is the LIFO teardown stack: New (via setup) registers + // cleanups in source order; Stop drains them in reverse so that + // shutdown order mirrors the original `defer` chain in Run. + cleanups []func() + + startedOnce sync.Once + startErr error + // started records whether the serve goroutine in Start was actually + // launched. Stop uses this to decide whether draining a.srvc is + // meaningful — if Start never ran, nothing will ever close srvc and + // the drain would deadlock (e.g., during setup-failure rollback). + started bool + + // routerQuit signals the reload-routing goroutine (started by Start) + // to exit; routerDone is closed by that goroutine on exit. Both are + // allocated in Start and only used when a.started is true. + routerQuit chan struct{} + routerDone chan struct{} + + stoppedOnce sync.Once +} + +// New wires every Alertmanager subsystem according to opts but does not +// start serving HTTP yet. On error, partial setup is rolled back via the +// same cleanup stack that Stop would drain on success. +func New(opts Options) (*App, error) { + a := &App{ + opts: opts, + srvc: make(chan error, 1), + webReload: make(chan chan error), + } + if err := a.setup(); err != nil { + // Roll back partial setup (Stop is idempotent and nil-safe). + _ = a.Stop(context.Background()) + return nil, err + } + return a, nil +} + +// Start begins serving HTTP traffic on the listeners established by New. +// It returns immediately; the listen goroutine signals any error via the +// channel drained by serveLoop. Subsequent calls are no-ops. +func (a *App) Start() error { + a.startedOnce.Do(func() { + if a.srv == nil || len(a.listeners) == 0 { + a.startErr = errors.New("alertmanager/app: App.Start called before successful New") + return + } + a.started = true + a.routerQuit = make(chan struct{}) + a.routerDone = make(chan struct{}) + + // reloadRouter consumes /-/reload requests and opts.Reload sends so + // they trigger reloads regardless of whether the caller is using + // Run (which also runs serveLoop) or the lifecycle API directly + // (which doesn't). Without this goroutine the /-/reload HTTP + // handler would block forever in embedded mode because its send + // on an unbuffered channel has no receiver. + go a.reloadRouter() + + go func() { + err := web.ServeMultiple(a.listeners, a.srv, a.opts.WebConfig, a.logger) + if err != nil && !errors.Is(err, http.ErrServerClosed) { + a.logger.Error("Listen error", "err", err) + a.srvc <- err + } + close(a.srvc) + }() + }) + return a.startErr +} + +// reloadRouter forwards reload triggers (HTTP /-/reload and opts.Reload) +// to the config coordinator until routerQuit closes. It is started by +// Start and stopped by Stop after the HTTP server has finished draining, +// so that any in-flight /-/reload handlers can complete their +// send/receive cycle through this goroutine. +func (a *App) reloadRouter() { + defer close(a.routerDone) + for { + select { + case <-a.routerQuit: + return + case <-a.opts.Reload: + if err := a.coord.Reload(); err != nil { + a.logger.Error("configuration reload failed", "err", err) + } + case errc := <-a.webReload: + errc <- a.coord.Reload() + } + } +} + +// Addr returns the address of the first bound listener, suitable for +// dialing a single-listener instance (the common case for tests that +// bind ":0"). Use Addrs if configured with multiple listen addresses. +func (a *App) Addr() string { + if len(a.listeners) == 0 { + return "" + } + return a.listeners[0].Addr().String() +} + +// Addrs returns all bound listener addresses in the order given by +// Options.WebConfig.WebListenAddresses. +func (a *App) Addrs() []string { + out := make([]string, len(a.listeners)) + for i, l := range a.listeners { + out[i] = l.Addr().String() + } + return out +} + +// Reload triggers a configuration reload (the programmatic equivalent of +// SIGHUP). Safe to call concurrently with serveLoop. +func (a *App) Reload(_ context.Context) error { + if a.coord == nil { + return errors.New("alertmanager/app: App.Reload called before successful New") + } + return a.coord.Reload() +} + +// Stop gracefully shuts down the App, draining cleanups in reverse +// registration order so that teardown ordering matches the original +// defer chain in Run. Safe to call multiple times; safe to call before +// Start (it will then merely roll back what setup registered). +func (a *App) Stop(ctx context.Context) error { + a.stoppedOnce.Do(func() { + // Stop accepting new HTTP traffic first so in-flight handlers + // don't observe collaborators being torn down underneath them. + // The 5s cap is derived from ctx so callers can request faster + // shutdown via a tighter deadline. The reload router is still + // running at this point so any in-flight /-/reload handler can + // complete its send/receive cycle and unblock Shutdown. + if a.srv != nil { + shutdownCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + if err := a.srv.Shutdown(shutdownCtx); err != nil { + a.logger.Warn("graceful HTTP shutdown failed", "err", err) + } + } + // HTTP is fully drained; no new /-/reload requests can arrive. + // Terminate the reload router and wait for it to exit before + // running cleanups (Coordinator is among them). + if a.started { + close(a.routerQuit) + <-a.routerDone + } + // Drain srvc so the listen goroutine, if any, exits before we + // release listener resources. ServeMultiple returns once all + // per-listener Serve calls return (which happens once Shutdown + // completes), so this drain is bounded. + // + // Guard on `started` because srvc is allocated in New (so it can + // be non-nil here) but only closed by Start's serve goroutine — + // without this guard, Stop would deadlock when called from New's + // rollback path on setup failure. + if a.started && a.srvc != nil { + for range a.srvc { + // no-op + } + } + // Run remaining cleanups in reverse-registration (LIFO) order, + // mirroring Go's `defer` semantics so the in-place transform + // from `defer X` to `a.onStop(X)` in setup is order-preserving. + for i := len(a.cleanups) - 1; i >= 0; i-- { + a.cleanups[i]() + } + }) + return nil +} + +// onStop registers fn to run when Stop is called. Cleanups run LIFO. +func (a *App) onStop(fn func()) { + a.cleanups = append(a.cleanups, fn) +} + +// serveLoop blocks until ctx is cancelled or an HTTP listener fails. It +// is used by Run only; reload routing is handled by reloadRouter, which +// is started directly from Start so it is also active for embedders that +// drive the App lifecycle without using Run. +func (a *App) serveLoop(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + a.logger.Info("Shutting down gracefully") + return nil + case err, ok := <-a.srvc: + if !ok { + // Channel closed without an error report — the serve + // goroutine exited cleanly (ErrServerClosed). Treat + // this as graceful shutdown. + return nil + } + return fmt.Errorf("alertmanager: HTTP listener failed: %w", err) + } + } +} + +// listenAll binds TCP listeners for every address in +// flags.WebListenAddresses. Embedders that need systemd socket +// activation or non-TCP listeners (vsock, etc.) should drive +// Alertmanager via cmd/alertmanager instead. +func listenAll(flags *web.FlagConfig) ([]net.Listener, error) { + if flags.WebSystemdSocket != nil && *flags.WebSystemdSocket { + return nil, errors.New("alertmanager/app: systemd socket activation is not supported when embedding; use cmd/alertmanager directly") + } + if flags.WebListenAddresses == nil || len(*flags.WebListenAddresses) == 0 { + return nil, web.ErrNoListeners + } + addrs := *flags.WebListenAddresses + listeners := make([]net.Listener, 0, len(addrs)) + for _, addr := range addrs { + l, err := net.Listen("tcp", addr) + if err != nil { + for _, prev := range listeners { + _ = prev.Close() + } + return nil, fmt.Errorf("alertmanager/app: listen %q: %w", addr, err) + } + listeners = append(listeners, l) + } + return listeners, nil +} diff --git a/internal/app/lifecycle_test.go b/internal/app/lifecycle_test.go new file mode 100644 index 0000000000..a60b1f06fe --- /dev/null +++ b/internal/app/lifecycle_test.go @@ -0,0 +1,221 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package app + +import ( + "context" + "io" + "net/http" + "os" + "path/filepath" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/promslog" + "github.com/prometheus/exporter-toolkit/web" + "github.com/stretchr/testify/require" + + "github.com/prometheus/alertmanager/featurecontrol" + "github.com/prometheus/alertmanager/matcher/compat" +) + +const minimalConfig = `route: + receiver: default +receivers: + - name: default +` + +// testOptions returns an Options value that is sufficient to bring up an +// Alertmanager instance bound to an ephemeral port with clustering +// disabled. +func testOptions(t *testing.T) Options { + t.Helper() + + dir := t.TempDir() + configPath := filepath.Join(dir, "alertmanager.yml") + require.NoError(t, os.WriteFile(configPath, []byte(minimalConfig), 0o600)) + + logger := promslog.NewNopLogger() + ff, err := featurecontrol.NewFlags(logger, "") + require.NoError(t, err) + // compat.InitFromFlags mutates package-global state; safe because all + // tests in this package use the same (empty) feature flag set. + compat.InitFromFlags(logger, ff) + + addrs := []string{"127.0.0.1:0"} + systemd := false + webCfg := "" + + return Options{ + ConfigFile: configPath, + DataDir: dir, + Retention: 120 * time.Hour, + MaintenanceInterval: 15 * time.Minute, + AlertGCInterval: 30 * time.Minute, + DispatchMaintenanceInterval: 30 * time.Second, + WebConfig: &web.FlagConfig{ + WebListenAddresses: &addrs, + WebSystemdSocket: &systemd, + WebConfigFile: &webCfg, + }, + PeerTimeout: 15 * time.Second, + PeersResolveTimeout: 15 * time.Second, + GossipInterval: 200 * time.Millisecond, + PushPullInterval: 60 * time.Second, + TCPTimeout: 10 * time.Second, + ProbeTimeout: 500 * time.Millisecond, + ProbeInterval: 1 * time.Second, + SettleTimeout: 0, + ReconnectInterval: 10 * time.Second, + PeerReconnectTimeout: 6 * time.Hour, + // Empty disables clustering — essential when running multiple + // instances in one process. + ClusterBindAddr: "", + + Logger: logger, + Registerer: prometheus.NewRegistry(), + Flagger: ff, + } +} + +func TestApp_StartStop(t *testing.T) { + a, err := New(testOptions(t)) + require.NoError(t, err) + require.NoError(t, a.Start()) + + addr := a.Addr() + require.NotEmpty(t, addr, "Addr should be populated after Start") + + // Probe /-/healthy with a short retry loop to absorb listener warmup. + url := "http://" + addr + "/-/healthy" + require.Eventually(t, func() bool { + resp, err := http.Get(url) + if err != nil { + return false + } + _, _ = io.Copy(io.Discard, resp.Body) + _ = resp.Body.Close() + return resp.StatusCode == http.StatusOK + }, 5*time.Second, 50*time.Millisecond, "instance never became healthy") + + require.NoError(t, a.Stop(t.Context())) + + // Stop is idempotent. + require.NoError(t, a.Stop(t.Context())) +} + +func TestApp_TwoSequentialInstances(t *testing.T) { + // Validates that per-instance Registerer + cleanup-stack teardown + // allow constructing a second App in the same process without + // duplicate-registration panics or leaked goroutines. + for i := range 2 { + a, err := New(testOptions(t)) + require.NoError(t, err, "iteration %d", i) + require.NoError(t, a.Start(), "iteration %d", i) + require.NotEmpty(t, a.Addr(), "iteration %d", i) + require.NoError(t, a.Stop(t.Context()), "iteration %d", i) + } +} + +func TestApp_TwoConcurrentInstances(t *testing.T) { + // Two live instances on different ephemeral ports, sharing the + // same process. This exercises the metrics-per-Registerer change + // from Phase A and ensures no shutdown-ordering bugs surface when + // Stop runs on one instance while another is still serving. + a1, err := New(testOptions(t)) + require.NoError(t, err) + require.NoError(t, a1.Start()) + defer func() { _ = a1.Stop(t.Context()) }() + + a2, err := New(testOptions(t)) + require.NoError(t, err) + require.NoError(t, a2.Start()) + defer func() { _ = a2.Stop(t.Context()) }() + + require.NotEqual(t, a1.Addr(), a2.Addr(), "instances should bind distinct ports") +} + +func TestApp_EmbeddedReloadDoesNotDeadlock(t *testing.T) { + // Regression: when callers use the lifecycle API (New + Start + Stop) + // without Run, the /-/reload HTTP handler must not block forever on + // the unbuffered a.webReload channel. The reload-routing goroutine + // started by Start is the consumer. + a, err := New(testOptions(t)) + require.NoError(t, err) + require.NoError(t, a.Start()) + defer func() { _ = a.Stop(t.Context()) }() + + type reloadResult struct { + err error + status int + } + resultCh := make(chan reloadResult, 1) + go func() { + resp, err := http.Post("http://"+a.Addr()+"/-/reload", "", nil) + if err != nil { + resultCh <- reloadResult{err: err} + return + } + _, _ = io.Copy(io.Discard, resp.Body) + _ = resp.Body.Close() + resultCh <- reloadResult{status: resp.StatusCode} + }() + select { + case r := <-resultCh: + require.NoError(t, r.err) + require.Equal(t, http.StatusOK, r.status) + case <-time.After(5 * time.Second): + t.Fatal("/-/reload deadlocked in embedded mode") + } +} + +func TestApp_New_SetupFailureDoesNotDeadlock(t *testing.T) { + // Regression: setup failure in New triggers the rollback path which + // calls Stop. Stop must not block draining a.srvc because Start has + // not run and nothing will ever close that channel. + errCh := make(chan error, 1) + go func() { + // Empty Options fails validate (Logger required), exercising + // the earliest setup-failure path. + _, err := New(Options{}) + errCh <- err + }() + select { + case err := <-errCh: + require.Error(t, err) + case <-time.After(5 * time.Second): + t.Fatal("New deadlocked on setup-failure rollback") + } +} + +func TestApp_Run_ContextCancel(t *testing.T) { + // Exercises the Run wrapper end-to-end: cancel ctx and assert it + // returns without error and cleanup has run. + ctx, cancel := context.WithCancel(t.Context()) + done := make(chan error, 1) + go func() { done <- Run(ctx, testOptions(t)) }() + + // Give Run a moment to bind. We can't peek inside it for Addr, but + // cancelling is unconditionally safe. + time.Sleep(200 * time.Millisecond) + cancel() + + select { + case err := <-done: + require.NoError(t, err) + case <-time.After(10 * time.Second): + t.Fatal("Run did not return after ctx cancel") + } +}