diff --git a/cmd/alertmanager/main.go b/cmd/alertmanager/main.go index 4707cfb8ef..8d590ac4ea 100644 --- a/cmd/alertmanager/main.go +++ b/cmd/alertmanager/main.go @@ -15,120 +15,28 @@ package main import ( "context" - "errors" "fmt" - "log/slog" - "net" - "net/http" - "net/url" "os" "os/signal" - "path/filepath" "runtime" "strings" - "sync" - "sync/atomic" "syscall" - "time" "github.com/KimMachineGun/automemlimit/memlimit" "github.com/alecthomas/kingpin/v2" "github.com/prometheus/client_golang/prometheus" versioncollector "github.com/prometheus/client_golang/prometheus/collectors/version" - "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/prometheus/client_golang/prometheus/promhttp" - "github.com/prometheus/common/model" "github.com/prometheus/common/promslog" promslogflag "github.com/prometheus/common/promslog/flag" - "github.com/prometheus/common/route" "github.com/prometheus/common/version" - "github.com/prometheus/exporter-toolkit/web" webflag "github.com/prometheus/exporter-toolkit/web/kingpinflag" - "github.com/prometheus/alertmanager/alert" - "github.com/prometheus/alertmanager/api" "github.com/prometheus/alertmanager/cluster" - "github.com/prometheus/alertmanager/config" - "github.com/prometheus/alertmanager/config/receiver" - "github.com/prometheus/alertmanager/dispatch" - "github.com/prometheus/alertmanager/eventrecorder" - "github.com/prometheus/alertmanager/eventrecorder/eventrecorderpb" "github.com/prometheus/alertmanager/featurecontrol" - "github.com/prometheus/alertmanager/httpserver" - "github.com/prometheus/alertmanager/inhibit" - "github.com/prometheus/alertmanager/marker" + "github.com/prometheus/alertmanager/internal/app" "github.com/prometheus/alertmanager/matcher/compat" - "github.com/prometheus/alertmanager/nflog" - "github.com/prometheus/alertmanager/notify" - "github.com/prometheus/alertmanager/provider/mem" - "github.com/prometheus/alertmanager/silence" - "github.com/prometheus/alertmanager/template" - 
"github.com/prometheus/alertmanager/timeinterval" - "github.com/prometheus/alertmanager/tracing" - "github.com/prometheus/alertmanager/ui" ) -var ( - requestDuration = promauto.NewHistogramVec( - prometheus.HistogramOpts{ - Name: "alertmanager_http_request_duration_seconds", - Help: "Histogram of latencies for HTTP requests.", - Buckets: prometheus.DefBuckets, - NativeHistogramBucketFactor: 1.1, - NativeHistogramMaxBucketNumber: 100, - NativeHistogramMinResetDuration: 1 * time.Hour, - }, - []string{"handler", "method", "code"}, - ) - responseSize = promauto.NewHistogramVec( - prometheus.HistogramOpts{ - Name: "alertmanager_http_response_size_bytes", - Help: "Histogram of response size for HTTP requests.", - Buckets: prometheus.ExponentialBuckets(100, 10, 7), - }, - []string{"handler", "method"}, - ) - clusterEnabled = promauto.NewGauge( - prometheus.GaugeOpts{ - Name: "alertmanager_cluster_enabled", - Help: "Indicates whether the clustering is enabled or not.", - }, - ) - configuredReceivers = promauto.NewGauge( - prometheus.GaugeOpts{ - Name: "alertmanager_receivers", - Help: "Number of configured receivers.", - }, - ) - configuredIntegrations = promauto.NewGauge( - prometheus.GaugeOpts{ - Name: "alertmanager_integrations", - Help: "Number of configured integrations.", - }, - ) - configuredInhibitionRules = promauto.NewGauge( - prometheus.GaugeOpts{ - Name: "alertmanager_inhibition_rules", - Help: "Number of configured inhibition rules.", - }, - ) - - promslogConfig = promslog.Config{} -) - -func instrumentHandler(handlerName string, handler http.HandlerFunc) http.HandlerFunc { - handlerLabel := prometheus.Labels{"handler": handlerName} - return promhttp.InstrumentHandlerDuration( - requestDuration.MustCurryWith(handlerLabel), - promhttp.InstrumentHandlerResponseSize( - responseSize.MustCurryWith(handlerLabel), - handler, - ), - ) -} - -const defaultClusterAddr = "0.0.0.0:9094" - func main() { os.Exit(run()) } @@ -150,7 +58,7 @@ func run() int { alertGCInterval = 
kingpin.Flag("alerts.gc-interval", "Interval between alert GC.").Default("30m").Duration() perAlertNameLimit = kingpin.Flag("alerts.per-alertname-limit", "Maximum number of alerts per alertname. If negative or zero, no limit is set.").Default("0").Int() dispatchMaintenanceInterval = kingpin.Flag("dispatch.maintenance-interval", "Interval between maintenance of aggregation groups in the dispatcher.").Default("30s").Duration() - DispatchStartDelay = kingpin.Flag("dispatch.start-delay", "Minimum amount of time to wait before dispatching alerts. This option should be synced with value of --rules.alert.resend-delay on Prometheus.").Default("0s").Duration() + dispatchStartDelay = kingpin.Flag("dispatch.start-delay", "Minimum amount of time to wait before dispatching alerts. This option should be synced with value of --rules.alert.resend-delay on Prometheus.").Default("0s").Duration() webConfig = webflag.AddFlags(kingpin.CommandLine, ":9093") externalURL = kingpin.Flag("web.external-url", "The URL under which Alertmanager is externally reachable (for example, if Alertmanager is served via a reverse proxy). Used for generating relative and absolute links back to Alertmanager itself. If the URL has a path portion, it will be used to prefix all HTTP endpoints served by Alertmanager. If omitted, relevant URL components will be derived automatically.").String() @@ -162,7 +70,7 @@ func run() int { Default("0.9").Float64() clusterBindAddr = kingpin.Flag("cluster.listen-address", "Listen address for cluster. Set to empty string to disable HA mode."). 
- Default(defaultClusterAddr).String() + Default(app.DefaultClusterAddr).String() clusterAdvertiseAddr = kingpin.Flag("cluster.advertise-address", "Explicit address to advertise in cluster.").String() clusterPeerName = kingpin.Flag("cluster.peer-name", "Explicit name of the peer, rather than generating a random one").Default("").String() peers = kingpin.Flag("cluster.peer", "Initial peers (may be repeated).").Strings() @@ -182,21 +90,16 @@ func run() int { featureFlags = kingpin.Flag("enable-feature", fmt.Sprintf("Comma-separated experimental features to enable. Valid options: %s", strings.Join(featurecontrol.AllowedFlags, ", "))).Default("").String() ) - prometheus.MustRegister(versioncollector.NewCollector("alertmanager")) - + promslogConfig := promslog.Config{} promslogflag.AddFlags(kingpin.CommandLine, &promslogConfig) kingpin.CommandLine.UsageWriter(os.Stdout) - kingpin.Version(version.Print("alertmanager")) kingpin.CommandLine.GetFlag("help").Short('h') kingpin.Parse() logger := promslog.New(&promslogConfig) - logger.Info("Starting Alertmanager", "version", version.Info()) - startTime := time.Now() - - logger.Info("Build context", "build_context", version.BuildContext()) + prometheus.MustRegister(versioncollector.NewCollector("alertmanager")) ff, err := featurecontrol.NewFlags(logger, *featureFlags) if err != nil { @@ -210,7 +113,6 @@ func run() int { logger.Error("--auto-gomemlimit.ratio must be greater than 0 and less than or equal to 1.") return 1 } - if _, err := memlimit.SetGoMemLimitWithOpts( memlimit.WithRatio(*memlimitRatio), memlimit.WithProvider( @@ -228,495 +130,78 @@ func run() int { logger.Warn("automaxprocs", "msg", "This flag is deprecated and will be removed in the next release") } - err = os.MkdirAll(*dataDir, 0o777) - if err != nil { - logger.Error("Unable to create data directory", "err", err) - return 1 - } - - tlsTransportConfig, err := cluster.GetTLSTransportConfig(*tlsConfigFile) - if err != nil { - logger.Error("unable to initialize 
TLS transport configuration for gossip mesh", "err", err) - return 1 - } - var peer *cluster.Peer - if *clusterBindAddr != "" { - peer, err = cluster.Create( - logger.With("component", "cluster"), - prometheus.DefaultRegisterer, - *clusterBindAddr, - *clusterAdvertiseAddr, - *peers, - true, - *pushPullInterval, - *gossipInterval, - *tcpTimeout, - *peersResolveTimeout, - *probeTimeout, - *probeInterval, - tlsTransportConfig, - *allowInsecureAdvertise, - *label, - *clusterPeerName, - ) - if err != nil { - logger.Error("unable to initialize gossip mesh", "err", err) - return 1 - } - clusterEnabled.Set(1) - } - - stopc := make(chan struct{}) - var wg sync.WaitGroup - - // Load config once for both event recorder initialization and the first - // coordinator apply. Subsequent reloads (SIGHUP, /-/reload) go - // through configCoordinator.Reload() which reads the file again. - initialConf, err := config.LoadFile(*configFile) - if err != nil { - logger.Error("error loading configuration file", "err", err) - return 1 - } - - hostname, _ := os.Hostname() - var eventRec eventrecorder.Recorder - if ff.EnableEventRecorder() { - eventRec = eventrecorder.NewRecorderFromConfig(initialConf.EventRecorder, hostname, logger.With("component", "eventrecorder"), prometheus.DefaultRegisterer) - } - defer eventRec.Close() - - recordCtx := eventrecorder.WithEventRecording(context.Background()) - eventRec.RecordEvent(recordCtx, &eventrecorderpb.EventData{ - EventType: &eventrecorderpb.EventData_AlertmanagerStartupEvent{ - AlertmanagerStartupEvent: &eventrecorderpb.AlertmanagerStartupEvent{ - Version: version.Version, - BuildContext: version.BuildContext(), - }, - }, - }) - defer func() { - eventRec.RecordEvent(recordCtx, &eventrecorderpb.EventData{ - EventType: &eventrecorderpb.EventData_AlertmanagerShutdownEvent{ - AlertmanagerShutdownEvent: &eventrecorderpb.AlertmanagerShutdownEvent{}, - }, - }) - }() - - notificationLogOpts := nflog.Options{ - SnapshotFile: filepath.Join(*dataDir, 
"nflog"), - Retention: *retention, - Logger: logger.With("component", "nflog"), - Metrics: prometheus.DefaultRegisterer, - } - - notificationLog, err := nflog.New(notificationLogOpts) - if err != nil { - logger.Error("error creating notification log", "err", err) - return 1 - } - if peer != nil { - c := peer.AddState("nfl", notificationLog, prometheus.DefaultRegisterer) - notificationLog.SetBroadcast(c.Broadcast) - } - - wg.Go(func() { - notificationLog.Maintenance(*maintenanceInterval, filepath.Join(*dataDir, "nflog"), stopc, nil) - }) - - marker := marker.NewGroupMarker() - - silenceOpts := silence.Options{ - SnapshotFile: filepath.Join(*dataDir, "silences"), - Retention: *retention, - Limits: silence.Limits{ - MaxSilences: func() int { return *maxSilences }, - MaxSilenceSizeBytes: func() int { return *maxSilenceSizeBytes }, - }, - Logger: logger.With("component", "silences"), - Metrics: prometheus.DefaultRegisterer, - Logging: *silenceLogging, - EventRecorder: eventRec, - } - - silences, err := silence.New(silenceOpts) - if err != nil { - logger.Error("error creating silence", "err", err) - return 1 - } - if peer != nil { - c := peer.AddState("sil", silences, prometheus.DefaultRegisterer) - silences.SetBroadcast(c.Broadcast) - } - - // Start providers before router potentially sends updates. - wg.Go(func() { - silences.Maintenance(*maintenanceInterval, filepath.Join(*dataDir, "silences"), stopc, nil) - }) - - defer func() { - close(stopc) - wg.Wait() - }() - - silencer := silence.NewSilencer(silences, logger, eventRec) - - // Peer state listeners have been registered, now we can join and get the initial state. 
- if peer != nil { - err = peer.Join( - *reconnectInterval, - *peerReconnectTimeout, - ) - if err != nil { - logger.Warn("unable to join gossip mesh", "err", err) - } - ctx, cancel := context.WithTimeout(context.Background(), *settleTimeout) - defer func() { - cancel() - if err := peer.Leave(10 * time.Second); err != nil { - logger.Warn("unable to leave gossip mesh", "err", err) - } - }() - go peer.Settle(ctx, *gossipInterval*10) - eventRec.SetClusterPeer(peer) - } - - alerts, err := mem.NewAlerts( - context.Background(), - *alertGCInterval, - *perAlertNameLimit, - silencer, - logger, - eventRec, - prometheus.DefaultRegisterer, - ff, - ) - if err != nil { - logger.Error("error creating memory provider", "err", err) - return 1 - } - defer alerts.Close() - - var disp atomic.Pointer[dispatch.Dispatcher] - defer func() { - disp.Load().Stop() - }() - - groupFn := func(ctx context.Context, routeFilter func(*dispatch.Route) bool, alertFilter func(*alert.Alert, time.Time) bool) (dispatch.AlertGroups, map[model.Fingerprint][]string, error) { - return disp.Load().Groups(ctx, routeFilter, alertFilter) - } - - // An interface value that holds a nil concrete value is non-nil. - // Therefore we explicly pass an empty interface, to detect if the - // cluster is not enabled in notify. 
- var clusterPeer cluster.ClusterPeer - if peer != nil { - clusterPeer = peer - } - - api, err := api.New(api.Options{ - Alerts: alerts, - Silences: silences, - GroupMutedFunc: marker.Muted, - Peer: clusterPeer, - Timeout: *httpTimeout, - Concurrency: *getConcurrency, - Logger: logger.With("component", "api"), - Registry: prometheus.DefaultRegisterer, - RequestDuration: requestDuration, - GroupFunc: groupFn, - }) - if err != nil { - logger.Error("failed to create API", "err", err) - return 1 - } - - amURL, err := extURL(logger, os.Hostname, (*webConfig.WebListenAddresses)[0], *externalURL) - if err != nil { - logger.Error("failed to determine external URL", "err", err) - return 1 - } - logger.Debug("external url", "externalUrl", amURL.String()) - - waitFunc := func() time.Duration { return 0 } - if peer != nil { - waitFunc = clusterWait(peer, *peerTimeout) - } - timeoutFunc := func(d time.Duration) time.Duration { - if d < notify.MinTimeout { - d = notify.MinTimeout - } - return d + waitFunc() - } - - tracingManager := tracing.NewManager(logger.With("component", "tracing")) - - var ( - inhibitor atomic.Pointer[inhibit.Inhibitor] - tmpl *template.Template - ) - - dispMetrics := dispatch.NewDispatcherMetrics(false, prometheus.DefaultRegisterer, ff) - pipelineBuilder := notify.NewPipelineBuilder(prometheus.DefaultRegisterer, ff, eventRec) - configLogger := logger.With("component", "configuration") - configCoordinator := config.NewCoordinator( - *configFile, - prometheus.DefaultRegisterer, - configLogger, - ) - configCoordinator.Subscribe(func(conf *config.Config) error { - // Reload event recorder outputs first so events emitted during - // the rest of this callback (e.g., by stopping the old - // dispatcher) go to the new outputs. 
- eventRec.ApplyConfig(conf.EventRecorder) - - tmpl, err = template.FromGlobs(conf.Templates) - if err != nil { - return fmt.Errorf("failed to parse templates: %w", err) - } - tmpl.ExternalURL = amURL - - // Build the routing tree and record which receivers are used. - routes := dispatch.NewRoute(conf.Route, nil) - activeReceivers := make(map[string]struct{}) - routes.Walk(func(r *dispatch.Route) { - activeReceivers[r.RouteOpts.Receiver] = struct{}{} - }) - - // Build the map of receiver to integrations. - receivers := make(map[string][]notify.Integration, len(activeReceivers)) - var integrationsNum int - for _, rcv := range conf.Receivers { - if _, found := activeReceivers[rcv.Name]; !found { - // No need to build a receiver if no route is using it. - configLogger.Info("skipping creation of receiver not referenced by any route", "receiver", rcv.Name) - continue - } - integrations, err := receiver.BuildReceiverIntegrations(rcv, tmpl, logger) - if err != nil { - return err - } - // rcv.Name is guaranteed to be unique across all receivers. - receivers[rcv.Name] = integrations - integrationsNum += len(integrations) - } - - // Build the map of time interval names to time interval definitions. - timeIntervals := make(map[string][]timeinterval.TimeInterval, len(conf.MuteTimeIntervals)+len(conf.TimeIntervals)) - for _, ti := range conf.MuteTimeIntervals { - timeIntervals[ti.Name] = ti.TimeIntervals - } - - for _, ti := range conf.TimeIntervals { - timeIntervals[ti.Name] = ti.TimeIntervals - } - - intervener := timeinterval.NewIntervener(timeIntervals) - - inhibitor.Load().Stop() - disp.Load().Stop() - - newInhibitor := inhibit.NewInhibitor(alerts, conf.InhibitRules, logger, eventRec) - inhibitor.Store(newInhibitor) - - // An interface value that holds a nil concrete value is non-nil. - // Therefore we explicly pass an empty interface, to detect if the - // cluster is not enabled in notify. 
- var pipelinePeer notify.Peer - if peer != nil { - pipelinePeer = peer - } - - pipeline := pipelineBuilder.New( - receivers, - waitFunc, - newInhibitor, - silencer, - intervener, - marker, - notificationLog, - pipelinePeer, - ) - - configuredReceivers.Set(float64(len(activeReceivers))) - configuredIntegrations.Set(float64(integrationsNum)) - configuredInhibitionRules.Set(float64(len(conf.InhibitRules))) - - api.Update(conf, func(ctx context.Context, labels model.LabelSet) { - inhibitor.Load().Mutes(ctx, labels) - silencer.Mutes(ctx, labels) - }) - - newDisp := dispatch.NewDispatcher( - alerts, - routes, - pipeline, - marker, - timeoutFunc, - *dispatchMaintenanceInterval, - nil, - logger, - eventRec, - dispMetrics, - ) - routes.Walk(func(r *dispatch.Route) { - if r.RouteOpts.RepeatInterval > *retention { - configLogger.Warn( - "repeat_interval is greater than the data retention period. It can lead to notifications being repeated more often than expected.", - "repeat_interval", - r.RouteOpts.RepeatInterval, - "retention", - *retention, - "route", - r.Key(), - ) - } - - if r.RouteOpts.RepeatInterval < r.RouteOpts.GroupInterval { - configLogger.Warn( - "repeat_interval is less than group_interval. Notifications will not repeat until the next group_interval.", - "repeat_interval", - r.RouteOpts.RepeatInterval, - "group_interval", - r.RouteOpts.GroupInterval, - "route", - r.Key(), - ) - } - }) - - // first, start the inhibitor so the inhibition cache can populate - // wait for this to load alerts before starting the dispatcher so - // we don't accidentially notify for an alert that will be inhibited - go newInhibitor.Run() - newInhibitor.WaitForLoading() - - // next, start the dispatcher and wait for it to load before swapping the disp pointer. 
- // This ensures that the API doesn't see the new dispatcher before it finishes populating - // the aggrGroups - go newDisp.Run(startTime.Add(*DispatchStartDelay)) - newDisp.WaitForLoading() - disp.Store(newDisp) - - err = tracingManager.ApplyConfig(conf.TracingConfig) - if err != nil { - return fmt.Errorf("failed to apply tracing config: %w", err) - } - - go tracingManager.Run() - - return nil - }) - - if err := configCoordinator.ApplyConfig(initialConf); err != nil { - return 1 - } - - // Make routePrefix default to externalURL path if empty string. - if *routePrefix == "" { - *routePrefix = amURL.Path - } - *routePrefix = "/" + strings.Trim(*routePrefix, "/") - logger.Debug("route prefix", "routePrefix", *routePrefix) - - router := route.New().WithInstrumentation(instrumentHandler) - if *routePrefix != "/" { - router.Get("/", func(w http.ResponseWriter, r *http.Request) { - http.Redirect(w, r, *routePrefix, http.StatusFound) - }) - router = router.WithPrefix(*routePrefix) - } - - webReload := make(chan chan error) - - ui.Register(router) - httpserver.Register(router, webReload) - - mux := api.Register(router, *routePrefix) + // Translate OS signals into context cancellation (SIGINT/SIGTERM) and + // reload events (SIGHUP). The app package no longer touches signals + // directly so that it can be embedded in tests. 
+ ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer cancel() - srv := &http.Server{ - // instrument all handlers with tracing - Handler: tracing.Middleware(mux), - } - srvc := make(chan struct{}) + hup := make(chan os.Signal, 1) + signal.Notify(hup, syscall.SIGHUP) + defer signal.Stop(hup) + reload := make(chan struct{}, 1) go func() { - if err := web.ListenAndServe(srv, webConfig, logger); !errors.Is(err, http.ErrServerClosed) { - logger.Error("Listen error", "err", err) - close(srvc) - } - defer func() { - if err := srv.Close(); err != nil { - logger.Error("Error on closing the server", "err", err) + for { + select { + case <-hup: + select { + case reload <- struct{}{}: + default: + } + case <-ctx.Done(): + return } - }() - }() - - var ( - hup = make(chan os.Signal, 1) - term = make(chan os.Signal, 1) - ) - signal.Notify(hup, syscall.SIGHUP) - signal.Notify(term, os.Interrupt, syscall.SIGTERM) - - for { - select { - case <-hup: - // ignore error, already logged in `reload()` - _ = configCoordinator.Reload() - case errc := <-webReload: - errc <- configCoordinator.Reload() - case <-term: - logger.Info("Received SIGTERM, exiting gracefully...") - - // shut down the tracing manager to flush any remaining spans. - // this blocks for up to 5s - tracingManager.Stop() - - return 0 - case <-srvc: - return 1 - } - } -} - -// clusterWait returns a function that inspects the current peer state and returns -// a duration of one base timeout for each peer with a higher ID than ourselves. 
-func clusterWait(p *cluster.Peer, timeout time.Duration) func() time.Duration { - return func() time.Duration { - return time.Duration(p.Position()) * timeout - } -} - -func extURL(logger *slog.Logger, hostnamef func() (string, error), listen, external string) (*url.URL, error) { - if external == "" { - hostname, err := hostnamef() - if err != nil { - return nil, err - } - _, port, err := net.SplitHostPort(listen) - if err != nil { - return nil, err } - if port == "" { - logger.Warn("no port found for listen address", "address", listen) - } - - external = fmt.Sprintf("http://%s:%s/", hostname, port) - } - - u, err := url.Parse(external) - if err != nil { - return nil, err - } - if u.Scheme != "http" && u.Scheme != "https" { - return nil, fmt.Errorf("%q: invalid %q scheme, only 'http' and 'https' are supported", u.String(), u.Scheme) - } + }() - ppref := strings.TrimRight(u.Path, "/") - if ppref != "" && !strings.HasPrefix(ppref, "/") { - ppref = "/" + ppref + opts := app.Options{ + ConfigFile: *configFile, + DataDir: *dataDir, + Retention: *retention, + MaintenanceInterval: *maintenanceInterval, + MaxSilences: *maxSilences, + MaxSilenceSizeBytes: *maxSilenceSizeBytes, + SilenceLogging: *silenceLogging, + AlertGCInterval: *alertGCInterval, + PerAlertNameLimit: *perAlertNameLimit, + DispatchMaintenanceInterval: *dispatchMaintenanceInterval, + DispatchStartDelay: *dispatchStartDelay, + + WebConfig: webConfig, + ExternalURL: *externalURL, + RoutePrefix: *routePrefix, + GetConcurrency: *getConcurrency, + HTTPTimeout: *httpTimeout, + + ClusterBindAddr: *clusterBindAddr, + ClusterAdvertiseAddr: *clusterAdvertiseAddr, + ClusterPeerName: *clusterPeerName, + Peers: *peers, + PeerTimeout: *peerTimeout, + PeersResolveTimeout: *peersResolveTimeout, + GossipInterval: *gossipInterval, + PushPullInterval: *pushPullInterval, + TCPTimeout: *tcpTimeout, + ProbeTimeout: *probeTimeout, + ProbeInterval: *probeInterval, + SettleTimeout: *settleTimeout, + ReconnectInterval: 
*reconnectInterval, + PeerReconnectTimeout: *peerReconnectTimeout, + TLSConfigFile: *tlsConfigFile, + AllowInsecureAdvertise: *allowInsecureAdvertise, + Label: *label, + + Logger: logger, + Registerer: prometheus.DefaultRegisterer, + Flagger: ff, + Reload: reload, + } + + if err := app.Run(ctx, opts); err != nil { + logger.Error("alertmanager exited with error", "err", err) + return 1 } - u.Path = ppref - - return u, nil + logger.Info("Received shutdown signal, exiting gracefully...") + return 0 } diff --git a/internal/app/app.go b/internal/app/app.go new file mode 100644 index 0000000000..4f98064f8c --- /dev/null +++ b/internal/app/app.go @@ -0,0 +1,516 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package app contains the Alertmanager process logic extracted from +// cmd/alertmanager so that tests and other binaries can embed +// Alertmanager in-process instead of shelling out to a compiled binary. +// See https://github.com/prometheus/alertmanager/issues/406. 
+package app + +import ( + "context" + "fmt" + "net/http" + "os" + "path/filepath" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/prometheus/common/model" + "github.com/prometheus/common/route" + "github.com/prometheus/common/version" + + "github.com/prometheus/alertmanager/alert" + "github.com/prometheus/alertmanager/api" + "github.com/prometheus/alertmanager/cluster" + "github.com/prometheus/alertmanager/config" + "github.com/prometheus/alertmanager/config/receiver" + "github.com/prometheus/alertmanager/dispatch" + "github.com/prometheus/alertmanager/eventrecorder" + "github.com/prometheus/alertmanager/eventrecorder/eventrecorderpb" + "github.com/prometheus/alertmanager/httpserver" + "github.com/prometheus/alertmanager/inhibit" + "github.com/prometheus/alertmanager/marker" + "github.com/prometheus/alertmanager/nflog" + "github.com/prometheus/alertmanager/notify" + "github.com/prometheus/alertmanager/provider/mem" + "github.com/prometheus/alertmanager/silence" + "github.com/prometheus/alertmanager/template" + "github.com/prometheus/alertmanager/timeinterval" + "github.com/prometheus/alertmanager/tracing" + "github.com/prometheus/alertmanager/ui" +) + +// Run starts an Alertmanager instance using opts and blocks until ctx is +// cancelled or an unrecoverable error occurs. It is a thin wrapper over +// New + Start + serveLoop + Stop intended for callers that don't need +// the richer lifecycle API. +// +// The deferred Stop also ensures cleanup runs on panic, matching the +// implicit panic-safety of the original defer-based implementation. 
+func Run(ctx context.Context, opts Options) error { + a, err := New(opts) + if err != nil { + return err + } + defer func() { + stopCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + _ = a.Stop(stopCtx) + }() + if err := a.Start(); err != nil { + return err + } + return a.serveLoop(ctx) +} + +// setup wires every Alertmanager subsystem and registers their teardown +// hooks on a.cleanups via a.onStop. Stop drains those hooks in LIFO order +// so the shutdown sequence matches the implicit ordering of the original +// defer-based Run implementation. +// +// The body is deliberately one long function rather than a chain of helpers. +// Nearly every step depends on locals produced by earlier steps (peer, +// eventRec, silences, alerts, groupMarker, silencer, notificationLog, +// disp, inhibitor, tmpl, configCoordinator, waitFunc, timeoutFunc, ...), +// and the configCoordinator.Subscribe callback closes over most of them. +// Splitting setup into helpers would force us to either thread a wide +// state struct between them or promote those locals to App fields; both +// obscure the dataflow without simplifying anything. +// +//nolint:gocyclo // intentional, see comment above. 
+func (a *App) setup() error { + opts := a.opts + if err := opts.validate(); err != nil { + return err + } + + logger := opts.Logger + reg := opts.Registerer + ff := opts.Flagger + m := newMetrics(reg) + + a.logger = logger + + logger.Info("Starting Alertmanager", "version", version.Info()) + startTime := time.Now() + logger.Info("Build context", "build_context", version.BuildContext()) + + if err := os.MkdirAll(opts.DataDir, 0o777); err != nil { + return fmt.Errorf("unable to create data directory: %w", err) + } + + tlsTransportConfig, err := cluster.GetTLSTransportConfig(opts.TLSConfigFile) + if err != nil { + return fmt.Errorf("unable to initialize TLS transport configuration for gossip mesh: %w", err) + } + + var peer *cluster.Peer + if opts.ClusterBindAddr != "" { + peer, err = cluster.Create( + logger.With("component", "cluster"), + reg, + opts.ClusterBindAddr, + opts.ClusterAdvertiseAddr, + opts.Peers, + true, + opts.PushPullInterval, + opts.GossipInterval, + opts.TCPTimeout, + opts.PeersResolveTimeout, + opts.ProbeTimeout, + opts.ProbeInterval, + tlsTransportConfig, + opts.AllowInsecureAdvertise, + opts.Label, + opts.ClusterPeerName, + ) + if err != nil { + return fmt.Errorf("unable to initialize gossip mesh: %w", err) + } + m.clusterEnabled.Set(1) + } + + stopc := make(chan struct{}) + var wg sync.WaitGroup + + // Load config once for both event recorder initialization and the + // first coordinator apply. Subsequent reloads go through + // configCoordinator.Reload() which reads the file again. 
+ initialConf, err := config.LoadFile(opts.ConfigFile) + if err != nil { + return fmt.Errorf("error loading configuration file: %w", err) + } + + hostname, _ := os.Hostname() + var eventRec eventrecorder.Recorder + if ff.EnableEventRecorder() { + eventRec = eventrecorder.NewRecorderFromConfig(initialConf.EventRecorder, hostname, logger.With("component", "eventrecorder"), reg) + } + a.onStop(func() { eventRec.Close() }) + + recordCtx := eventrecorder.WithEventRecording(context.Background()) + eventRec.RecordEvent(recordCtx, &eventrecorderpb.EventData{ + EventType: &eventrecorderpb.EventData_AlertmanagerStartupEvent{ + AlertmanagerStartupEvent: &eventrecorderpb.AlertmanagerStartupEvent{ + Version: version.Version, + BuildContext: version.BuildContext(), + }, + }, + }) + a.onStop(func() { + eventRec.RecordEvent(recordCtx, &eventrecorderpb.EventData{ + EventType: &eventrecorderpb.EventData_AlertmanagerShutdownEvent{ + AlertmanagerShutdownEvent: &eventrecorderpb.AlertmanagerShutdownEvent{}, + }, + }) + }) + + notificationLogOpts := nflog.Options{ + SnapshotFile: filepath.Join(opts.DataDir, "nflog"), + Retention: opts.Retention, + Logger: logger.With("component", "nflog"), + Metrics: reg, + } + notificationLog, err := nflog.New(notificationLogOpts) + if err != nil { + return fmt.Errorf("error creating notification log: %w", err) + } + if peer != nil { + c := peer.AddState("nfl", notificationLog, reg) + notificationLog.SetBroadcast(c.Broadcast) + } + + wg.Go(func() { + notificationLog.Maintenance(opts.MaintenanceInterval, filepath.Join(opts.DataDir, "nflog"), stopc, nil) + }) + + groupMarker := marker.NewGroupMarker() + + silenceOpts := silence.Options{ + SnapshotFile: filepath.Join(opts.DataDir, "silences"), + Retention: opts.Retention, + Limits: silence.Limits{ + MaxSilences: func() int { return opts.MaxSilences }, + MaxSilenceSizeBytes: func() int { return opts.MaxSilenceSizeBytes }, + }, + Logger: logger.With("component", "silences"), + Metrics: reg, + Logging: 
opts.SilenceLogging, + EventRecorder: eventRec, + } + silences, err := silence.New(silenceOpts) + if err != nil { + return fmt.Errorf("error creating silence: %w", err) + } + if peer != nil { + c := peer.AddState("sil", silences, reg) + silences.SetBroadcast(c.Broadcast) + } + + // Start providers before the router potentially sends updates. + wg.Go(func() { + silences.Maintenance(opts.MaintenanceInterval, filepath.Join(opts.DataDir, "silences"), stopc, nil) + }) + + a.onStop(func() { + close(stopc) + wg.Wait() + }) + + silencer := silence.NewSilencer(silences, logger, eventRec) + + // Peer state listeners have been registered, now we can join and get the initial state. + if peer != nil { + if err := peer.Join(opts.ReconnectInterval, opts.PeerReconnectTimeout); err != nil { + logger.Warn("unable to join gossip mesh", "err", err) + } + settleCtx, settleCancel := context.WithTimeout(context.Background(), opts.SettleTimeout) + a.onStop(func() { + settleCancel() + if err := peer.Leave(10 * time.Second); err != nil { + logger.Warn("unable to leave gossip mesh", "err", err) + } + }) + go peer.Settle(settleCtx, opts.GossipInterval*10) + eventRec.SetClusterPeer(peer) + } + + alerts, err := mem.NewAlerts( + context.Background(), + opts.AlertGCInterval, + opts.PerAlertNameLimit, + silencer, + logger, + eventRec, + reg, + ff, + ) + if err != nil { + return fmt.Errorf("error creating memory provider: %w", err) + } + a.onStop(alerts.Close) + + var disp atomic.Pointer[dispatch.Dispatcher] + a.onStop(func() { + if d := disp.Load(); d != nil { + d.Stop() + } + }) + + groupFn := func(ctx context.Context, routeFilter func(*dispatch.Route) bool, alertFilter func(*alert.Alert, time.Time) bool) (dispatch.AlertGroups, map[model.Fingerprint][]string, error) { + return disp.Load().Groups(ctx, routeFilter, alertFilter) + } + + // An interface value that holds a nil concrete value is non-nil. 
+ // Therefore we explicitly pass an empty interface, to detect if the + // cluster is not enabled in notify. + var clusterPeer cluster.ClusterPeer + if peer != nil { + clusterPeer = peer + } + + apih, err := api.New(api.Options{ + Alerts: alerts, + Silences: silences, + GroupMutedFunc: groupMarker.Muted, + Peer: clusterPeer, + Timeout: opts.HTTPTimeout, + Concurrency: opts.GetConcurrency, + Logger: logger.With("component", "api"), + Registry: reg, + RequestDuration: m.requestDuration, + GroupFunc: groupFn, + }) + if err != nil { + return fmt.Errorf("failed to create API: %w", err) + } + + amURL, err := extURL(logger, os.Hostname, (*opts.WebConfig.WebListenAddresses)[0], opts.ExternalURL) + if err != nil { + return fmt.Errorf("failed to determine external URL: %w", err) + } + logger.Debug("external url", "externalUrl", amURL.String()) + + waitFunc := func() time.Duration { return 0 } + if peer != nil { + waitFunc = clusterWait(peer, opts.PeerTimeout) + } + timeoutFunc := func(d time.Duration) time.Duration { + if d < notify.MinTimeout { + d = notify.MinTimeout + } + return d + waitFunc() + } + + tracingManager := tracing.NewManager(logger.With("component", "tracing")) + a.tracing = tracingManager + a.onStop(tracingManager.Stop) + + var ( + inhibitor atomic.Pointer[inhibit.Inhibitor] + tmpl *template.Template + ) + + dispMetrics := dispatch.NewDispatcherMetrics(false, reg, ff) + pipelineBuilder := notify.NewPipelineBuilder(reg, ff, eventRec) + configLogger := logger.With("component", "configuration") + configCoordinator := config.NewCoordinator( + opts.ConfigFile, + reg, + configLogger, + ) + a.coord = configCoordinator + + configCoordinator.Subscribe(func(conf *config.Config) error { + // Reload event recorder outputs first so events emitted during + // the rest of this callback (e.g., by stopping the old + // dispatcher) go to the new outputs. 
+ eventRec.ApplyConfig(conf.EventRecorder) + + tmpl, err = template.FromGlobs(conf.Templates) + if err != nil { + return fmt.Errorf("failed to parse templates: %w", err) + } + tmpl.ExternalURL = amURL + + // Build the routing tree and record which receivers are used. + routes := dispatch.NewRoute(conf.Route, nil) + activeReceivers := make(map[string]struct{}) + routes.Walk(func(r *dispatch.Route) { + activeReceivers[r.RouteOpts.Receiver] = struct{}{} + }) + + // Build the map of receiver to integrations. + receivers := make(map[string][]notify.Integration, len(activeReceivers)) + var integrationsNum int + for _, rcv := range conf.Receivers { + if _, found := activeReceivers[rcv.Name]; !found { + // No need to build a receiver if no route is using it. + configLogger.Info("skipping creation of receiver not referenced by any route", "receiver", rcv.Name) + continue + } + integrations, err := receiver.BuildReceiverIntegrations(rcv, tmpl, logger) + if err != nil { + return err + } + // rcv.Name is guaranteed to be unique across all receivers. + receivers[rcv.Name] = integrations + integrationsNum += len(integrations) + } + + // Build the map of time interval names to time interval definitions. + timeIntervals := make(map[string][]timeinterval.TimeInterval, len(conf.MuteTimeIntervals)+len(conf.TimeIntervals)) + for _, ti := range conf.MuteTimeIntervals { + timeIntervals[ti.Name] = ti.TimeIntervals + } + for _, ti := range conf.TimeIntervals { + timeIntervals[ti.Name] = ti.TimeIntervals + } + + intervener := timeinterval.NewIntervener(timeIntervals) + + if old := inhibitor.Load(); old != nil { + old.Stop() + } + if old := disp.Load(); old != nil { + old.Stop() + } + + newInhibitor := inhibit.NewInhibitor(alerts, conf.InhibitRules, logger, eventRec) + inhibitor.Store(newInhibitor) + + // An interface value that holds a nil concrete value is non-nil. + // Therefore we explicitly pass an empty interface, to detect if the + // cluster is not enabled in notify. 
+ var pipelinePeer notify.Peer + if peer != nil { + pipelinePeer = peer + } + + pipeline := pipelineBuilder.New( + receivers, + waitFunc, + newInhibitor, + silencer, + intervener, + groupMarker, + notificationLog, + pipelinePeer, + ) + + m.configuredReceivers.Set(float64(len(activeReceivers))) + m.configuredIntegrations.Set(float64(integrationsNum)) + m.configuredInhibitionRules.Set(float64(len(conf.InhibitRules))) + + apih.Update(conf, func(ctx context.Context, labels model.LabelSet) { + inhibitor.Load().Mutes(ctx, labels) + silencer.Mutes(ctx, labels) + }) + + newDisp := dispatch.NewDispatcher( + alerts, + routes, + pipeline, + groupMarker, + timeoutFunc, + opts.DispatchMaintenanceInterval, + nil, + logger, + eventRec, + dispMetrics, + ) + routes.Walk(func(r *dispatch.Route) { + if r.RouteOpts.RepeatInterval > opts.Retention { + configLogger.Warn( + "repeat_interval is greater than the data retention period. It can lead to notifications being repeated more often than expected.", + "repeat_interval", r.RouteOpts.RepeatInterval, + "retention", opts.Retention, + "route", r.Key(), + ) + } + if r.RouteOpts.RepeatInterval < r.RouteOpts.GroupInterval { + configLogger.Warn( + "repeat_interval is less than group_interval. Notifications will not repeat until the next group_interval.", + "repeat_interval", r.RouteOpts.RepeatInterval, + "group_interval", r.RouteOpts.GroupInterval, + "route", r.Key(), + ) + } + }) + + // First, start the inhibitor so the inhibition cache can populate. + // Wait for this to load alerts before starting the dispatcher so + // we don't accidentally notify for an alert that will be inhibited. + go newInhibitor.Run() + newInhibitor.WaitForLoading() + + // Next, start the dispatcher and wait for it to load before swapping + // the disp pointer. This ensures that the API doesn't see the new + // dispatcher before it finishes populating the aggrGroups. 
+ go newDisp.Run(startTime.Add(opts.DispatchStartDelay)) + newDisp.WaitForLoading() + disp.Store(newDisp) + + if err := tracingManager.ApplyConfig(conf.TracingConfig); err != nil { + return fmt.Errorf("failed to apply tracing config: %w", err) + } + + go tracingManager.Run() + + return nil + }) + + if err := configCoordinator.ApplyConfig(initialConf); err != nil { + return fmt.Errorf("failed to apply initial configuration: %w", err) + } + + // Default routePrefix to externalURL path if empty. + routePrefix := opts.RoutePrefix + if routePrefix == "" { + routePrefix = amURL.Path + } + routePrefix = "/" + strings.Trim(routePrefix, "/") + logger.Debug("route prefix", "routePrefix", routePrefix) + + router := route.New().WithInstrumentation(m.instrumentHandler) + if routePrefix != "/" { + prefix := routePrefix + router.Get("/", func(w http.ResponseWriter, r *http.Request) { + http.Redirect(w, r, prefix, http.StatusFound) + }) + router = router.WithPrefix(routePrefix) + } + + ui.Register(router) + httpserver.Register(router, a.webReload) + + mux := apih.Register(router, routePrefix) + + a.srv = &http.Server{ + // Instrument all handlers with tracing. + Handler: tracing.Middleware(mux), + } + + // Bind listeners now so Addr is meaningful before Start runs and + // ":0" ports can be discovered by callers. + listeners, err := listenAll(opts.WebConfig) + if err != nil { + return err + } + a.listeners = listeners + + return nil +} diff --git a/internal/app/cluster.go b/internal/app/cluster.go new file mode 100644 index 0000000000..28538e2d94 --- /dev/null +++ b/internal/app/cluster.go @@ -0,0 +1,29 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package app + +import ( + "time" + + "github.com/prometheus/alertmanager/cluster" +) + +// clusterWait returns a function that inspects the current peer state and +// returns a duration of one base timeout for each peer with a higher ID +// than ourselves. +func clusterWait(p *cluster.Peer, timeout time.Duration) func() time.Duration { + return func() time.Duration { + return time.Duration(p.Position()) * timeout + } +} diff --git a/internal/app/lifecycle.go b/internal/app/lifecycle.go new file mode 100644 index 0000000000..850ed1184b --- /dev/null +++ b/internal/app/lifecycle.go @@ -0,0 +1,284 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package app + +import ( + "context" + "errors" + "fmt" + "log/slog" + "net" + "net/http" + "sync" + "time" + + "github.com/prometheus/exporter-toolkit/web" + + "github.com/prometheus/alertmanager/config" + "github.com/prometheus/alertmanager/tracing" +) + +// App is a running (or runnable) Alertmanager instance built from Options. 
+// +// Compared to the top-level Run function, App exposes lifecycle hooks +// (Start, Stop, Addr, Reload) so callers — typically tests — can drive an +// instance without OS signals and discover the actually-bound HTTP +// address (useful when listening on ":0"). +// +// Construct an App with New, then call Start to begin serving HTTP. The +// caller is responsible for calling Stop, ideally via a deferred call so +// teardown also runs on panic. An App is single-use: calling Start more +// than once is an error. +type App struct { + opts Options + logger *slog.Logger + + // Lifecycle dependencies retained for use by Start, Reload, and Stop. + coord *config.Coordinator + tracing *tracing.Manager + srv *http.Server + listeners []net.Listener + + // webReload is the channel exposed by httpserver.Register for the + // /-/reload HTTP endpoint. We read from it in serveLoop. + webReload chan chan error + + // srvc carries errors from the HTTP serve goroutine. It is closed + // when the goroutine exits cleanly. + srvc chan error + + // cleanups is the LIFO teardown stack: New (via setup) registers + // cleanups in source order; Stop drains them in reverse so that + // shutdown order mirrors the original `defer` chain in Run. + cleanups []func() + + startedOnce sync.Once + startErr error + // started records whether the serve goroutine in Start was actually + // launched. Stop uses this to decide whether draining a.srvc is + // meaningful — if Start never ran, nothing will ever close srvc and + // the drain would deadlock (e.g., during setup-failure rollback). + started bool + + // routerQuit signals the reload-routing goroutine (started by Start) + // to exit; routerDone is closed by that goroutine on exit. Both are + // allocated in Start and only used when a.started is true. + routerQuit chan struct{} + routerDone chan struct{} + + stoppedOnce sync.Once +} + +// New wires every Alertmanager subsystem according to opts but does not +// start serving HTTP yet. 
On error, partial setup is rolled back via the +// same cleanup stack that Stop would drain on success. +func New(opts Options) (*App, error) { + a := &App{ + opts: opts, + srvc: make(chan error, 1), + webReload: make(chan chan error), + } + if err := a.setup(); err != nil { + // Roll back partial setup (Stop is idempotent and nil-safe). + _ = a.Stop(context.Background()) + return nil, err + } + return a, nil +} + +// Start begins serving HTTP traffic on the listeners established by New. +// It returns immediately; the listen goroutine signals any error via the +// channel drained by serveLoop. Subsequent calls are no-ops. +func (a *App) Start() error { + a.startedOnce.Do(func() { + if a.srv == nil || len(a.listeners) == 0 { + a.startErr = errors.New("alertmanager/app: App.Start called before successful New") + return + } + a.started = true + a.routerQuit = make(chan struct{}) + a.routerDone = make(chan struct{}) + + // reloadRouter consumes /-/reload requests and opts.Reload sends so + // they trigger reloads regardless of whether the caller is using + // Run (which also runs serveLoop) or the lifecycle API directly + // (which doesn't). Without this goroutine the /-/reload HTTP + // handler would block forever in embedded mode because its send + // on an unbuffered channel has no receiver. + go a.reloadRouter() + + go func() { + err := web.ServeMultiple(a.listeners, a.srv, a.opts.WebConfig, a.logger) + if err != nil && !errors.Is(err, http.ErrServerClosed) { + a.logger.Error("Listen error", "err", err) + a.srvc <- err + } + close(a.srvc) + }() + }) + return a.startErr +} + +// reloadRouter forwards reload triggers (HTTP /-/reload and opts.Reload) +// to the config coordinator until routerQuit closes. It is started by +// Start and stopped by Stop after the HTTP server has finished draining, +// so that any in-flight /-/reload handlers can complete their +// send/receive cycle through this goroutine. 
+func (a *App) reloadRouter() { + defer close(a.routerDone) + for { + select { + case <-a.routerQuit: + return + case <-a.opts.Reload: + if err := a.coord.Reload(); err != nil { + a.logger.Error("configuration reload failed", "err", err) + } + case errc := <-a.webReload: + errc <- a.coord.Reload() + } + } +} + +// Addr returns the address of the first bound listener, suitable for +// dialing a single-listener instance (the common case for tests that +// bind ":0"). Use Addrs if configured with multiple listen addresses. +func (a *App) Addr() string { + if len(a.listeners) == 0 { + return "" + } + return a.listeners[0].Addr().String() +} + +// Addrs returns all bound listener addresses in the order given by +// Options.WebConfig.WebListenAddresses. +func (a *App) Addrs() []string { + out := make([]string, len(a.listeners)) + for i, l := range a.listeners { + out[i] = l.Addr().String() + } + return out +} + +// Reload triggers a configuration reload (the programmatic equivalent of +// SIGHUP). Safe to call concurrently with serveLoop. +func (a *App) Reload(_ context.Context) error { + if a.coord == nil { + return errors.New("alertmanager/app: App.Reload called before successful New") + } + return a.coord.Reload() +} + +// Stop gracefully shuts down the App, draining cleanups in reverse +// registration order so that teardown ordering matches the original +// defer chain in Run. Safe to call multiple times; safe to call before +// Start (it will then merely roll back what setup registered). +func (a *App) Stop(ctx context.Context) error { + a.stoppedOnce.Do(func() { + // Stop accepting new HTTP traffic first so in-flight handlers + // don't observe collaborators being torn down underneath them. + // The 5s cap is derived from ctx so callers can request faster + // shutdown via a tighter deadline. The reload router is still + // running at this point so any in-flight /-/reload handler can + // complete its send/receive cycle and unblock Shutdown. 
+ if a.srv != nil { + shutdownCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + if err := a.srv.Shutdown(shutdownCtx); err != nil { + a.logger.Warn("graceful HTTP shutdown failed", "err", err) + } + } + // HTTP is fully drained; no new /-/reload requests can arrive. + // Terminate the reload router and wait for it to exit before + // running cleanups (Coordinator is among them). + if a.started { + close(a.routerQuit) + <-a.routerDone + } + // Drain srvc so the listen goroutine, if any, exits before we + // release listener resources. ServeMultiple returns once all + // per-listener Serve calls return (which happens once Shutdown + // completes), so this drain is bounded. + // + // Guard on `started` because srvc is allocated in New (so it can + // be non-nil here) but only closed by Start's serve goroutine — + // without this guard, Stop would deadlock when called from New's + // rollback path on setup failure. + if a.started && a.srvc != nil { + for range a.srvc { + // no-op + } + } + // Run remaining cleanups in reverse-registration (LIFO) order, + // mirroring Go's `defer` semantics so the in-place transform + // from `defer X` to `a.onStop(X)` in setup is order-preserving. + for i := len(a.cleanups) - 1; i >= 0; i-- { + a.cleanups[i]() + } + }) + return nil +} + +// onStop registers fn to run when Stop is called. Cleanups run LIFO. +func (a *App) onStop(fn func()) { + a.cleanups = append(a.cleanups, fn) +} + +// serveLoop blocks until ctx is cancelled or an HTTP listener fails. It +// is used by Run only; reload routing is handled by reloadRouter, which +// is started directly from Start so it is also active for embedders that +// drive the App lifecycle without using Run. 
+func (a *App) serveLoop(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + a.logger.Info("Shutting down gracefully") + return nil + case err, ok := <-a.srvc: + if !ok { + // Channel closed without an error report — the serve + // goroutine exited cleanly (ErrServerClosed). Treat + // this as graceful shutdown. + return nil + } + return fmt.Errorf("alertmanager: HTTP listener failed: %w", err) + } + } +} + +// listenAll binds TCP listeners for every address in +// flags.WebListenAddresses. Embedders that need systemd socket +// activation or non-TCP listeners (vsock, etc.) should drive +// Alertmanager via cmd/alertmanager instead. +func listenAll(flags *web.FlagConfig) ([]net.Listener, error) { + if flags.WebSystemdSocket != nil && *flags.WebSystemdSocket { + return nil, errors.New("alertmanager/app: systemd socket activation is not supported when embedding; use cmd/alertmanager directly") + } + if flags.WebListenAddresses == nil || len(*flags.WebListenAddresses) == 0 { + return nil, web.ErrNoListeners + } + addrs := *flags.WebListenAddresses + listeners := make([]net.Listener, 0, len(addrs)) + for _, addr := range addrs { + l, err := net.Listen("tcp", addr) + if err != nil { + for _, prev := range listeners { + _ = prev.Close() + } + return nil, fmt.Errorf("alertmanager/app: listen %q: %w", addr, err) + } + listeners = append(listeners, l) + } + return listeners, nil +} diff --git a/internal/app/lifecycle_test.go b/internal/app/lifecycle_test.go new file mode 100644 index 0000000000..a60b1f06fe --- /dev/null +++ b/internal/app/lifecycle_test.go @@ -0,0 +1,221 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package app + +import ( + "context" + "io" + "net/http" + "os" + "path/filepath" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/promslog" + "github.com/prometheus/exporter-toolkit/web" + "github.com/stretchr/testify/require" + + "github.com/prometheus/alertmanager/featurecontrol" + "github.com/prometheus/alertmanager/matcher/compat" +) + +const minimalConfig = `route: + receiver: default +receivers: + - name: default +` + +// testOptions returns an Options value that is sufficient to bring up an +// Alertmanager instance bound to an ephemeral port with clustering +// disabled. +func testOptions(t *testing.T) Options { + t.Helper() + + dir := t.TempDir() + configPath := filepath.Join(dir, "alertmanager.yml") + require.NoError(t, os.WriteFile(configPath, []byte(minimalConfig), 0o600)) + + logger := promslog.NewNopLogger() + ff, err := featurecontrol.NewFlags(logger, "") + require.NoError(t, err) + // compat.InitFromFlags mutates package-global state; safe because all + // tests in this package use the same (empty) feature flag set. 
+ compat.InitFromFlags(logger, ff) + + addrs := []string{"127.0.0.1:0"} + systemd := false + webCfg := "" + + return Options{ + ConfigFile: configPath, + DataDir: dir, + Retention: 120 * time.Hour, + MaintenanceInterval: 15 * time.Minute, + AlertGCInterval: 30 * time.Minute, + DispatchMaintenanceInterval: 30 * time.Second, + WebConfig: &web.FlagConfig{ + WebListenAddresses: &addrs, + WebSystemdSocket: &systemd, + WebConfigFile: &webCfg, + }, + PeerTimeout: 15 * time.Second, + PeersResolveTimeout: 15 * time.Second, + GossipInterval: 200 * time.Millisecond, + PushPullInterval: 60 * time.Second, + TCPTimeout: 10 * time.Second, + ProbeTimeout: 500 * time.Millisecond, + ProbeInterval: 1 * time.Second, + SettleTimeout: 0, + ReconnectInterval: 10 * time.Second, + PeerReconnectTimeout: 6 * time.Hour, + // Empty disables clustering — essential when running multiple + // instances in one process. + ClusterBindAddr: "", + + Logger: logger, + Registerer: prometheus.NewRegistry(), + Flagger: ff, + } +} + +func TestApp_StartStop(t *testing.T) { + a, err := New(testOptions(t)) + require.NoError(t, err) + require.NoError(t, a.Start()) + + addr := a.Addr() + require.NotEmpty(t, addr, "Addr should be populated after Start") + + // Probe /-/healthy with a short retry loop to absorb listener warmup. + url := "http://" + addr + "/-/healthy" + require.Eventually(t, func() bool { + resp, err := http.Get(url) + if err != nil { + return false + } + _, _ = io.Copy(io.Discard, resp.Body) + _ = resp.Body.Close() + return resp.StatusCode == http.StatusOK + }, 5*time.Second, 50*time.Millisecond, "instance never became healthy") + + require.NoError(t, a.Stop(t.Context())) + + // Stop is idempotent. + require.NoError(t, a.Stop(t.Context())) +} + +func TestApp_TwoSequentialInstances(t *testing.T) { + // Validates that per-instance Registerer + cleanup-stack teardown + // allow constructing a second App in the same process without + // duplicate-registration panics or leaked goroutines. 
+ for i := range 2 { + a, err := New(testOptions(t)) + require.NoError(t, err, "iteration %d", i) + require.NoError(t, a.Start(), "iteration %d", i) + require.NotEmpty(t, a.Addr(), "iteration %d", i) + require.NoError(t, a.Stop(t.Context()), "iteration %d", i) + } +} + +func TestApp_TwoConcurrentInstances(t *testing.T) { + // Two live instances on different ephemeral ports, sharing the + // same process. This exercises the metrics-per-Registerer change + // from Phase A and ensures no shutdown-ordering bugs surface when + // Stop runs on one instance while another is still serving. + a1, err := New(testOptions(t)) + require.NoError(t, err) + require.NoError(t, a1.Start()) + defer func() { _ = a1.Stop(t.Context()) }() + + a2, err := New(testOptions(t)) + require.NoError(t, err) + require.NoError(t, a2.Start()) + defer func() { _ = a2.Stop(t.Context()) }() + + require.NotEqual(t, a1.Addr(), a2.Addr(), "instances should bind distinct ports") +} + +func TestApp_EmbeddedReloadDoesNotDeadlock(t *testing.T) { + // Regression: when callers use the lifecycle API (New + Start + Stop) + // without Run, the /-/reload HTTP handler must not block forever on + // the unbuffered a.webReload channel. The reload-routing goroutine + // started by Start is the consumer. 
+ a, err := New(testOptions(t)) + require.NoError(t, err) + require.NoError(t, a.Start()) + defer func() { _ = a.Stop(t.Context()) }() + + type reloadResult struct { + err error + status int + } + resultCh := make(chan reloadResult, 1) + go func() { + resp, err := http.Post("http://"+a.Addr()+"/-/reload", "", nil) + if err != nil { + resultCh <- reloadResult{err: err} + return + } + _, _ = io.Copy(io.Discard, resp.Body) + _ = resp.Body.Close() + resultCh <- reloadResult{status: resp.StatusCode} + }() + select { + case r := <-resultCh: + require.NoError(t, r.err) + require.Equal(t, http.StatusOK, r.status) + case <-time.After(5 * time.Second): + t.Fatal("/-/reload deadlocked in embedded mode") + } +} + +func TestApp_New_SetupFailureDoesNotDeadlock(t *testing.T) { + // Regression: setup failure in New triggers the rollback path which + // calls Stop. Stop must not block draining a.srvc because Start has + // not run and nothing will ever close that channel. + errCh := make(chan error, 1) + go func() { + // Empty Options fails validate (Logger required), exercising + // the earliest setup-failure path. + _, err := New(Options{}) + errCh <- err + }() + select { + case err := <-errCh: + require.Error(t, err) + case <-time.After(5 * time.Second): + t.Fatal("New deadlocked on setup-failure rollback") + } +} + +func TestApp_Run_ContextCancel(t *testing.T) { + // Exercises the Run wrapper end-to-end: cancel ctx and assert it + // returns without error and cleanup has run. + ctx, cancel := context.WithCancel(t.Context()) + done := make(chan error, 1) + go func() { done <- Run(ctx, testOptions(t)) }() + + // Give Run a moment to bind. We can't peek inside it for Addr, but + // cancelling is unconditionally safe. 
+ time.Sleep(200 * time.Millisecond) + cancel() + + select { + case err := <-done: + require.NoError(t, err) + case <-time.After(10 * time.Second): + t.Fatal("Run did not return after ctx cancel") + } +} diff --git a/internal/app/metrics.go b/internal/app/metrics.go new file mode 100644 index 0000000000..e2b69437d8 --- /dev/null +++ b/internal/app/metrics.go @@ -0,0 +1,98 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package app + +import ( + "net/http" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +// metrics bundles the process-level Prometheus metrics owned by the app +// package. They used to live as package-level variables in +// cmd/alertmanager/main.go and were registered against +// prometheus.DefaultRegisterer at init time. They are now constructed per +// app.Run invocation against the registerer supplied via Options so that +// multiple instances can coexist within a single process (e.g. tests). 
+type metrics struct { + requestDuration *prometheus.HistogramVec + responseSize *prometheus.HistogramVec + clusterEnabled prometheus.Gauge + configuredReceivers prometheus.Gauge + configuredIntegrations prometheus.Gauge + configuredInhibitionRules prometheus.Gauge +} + +func newMetrics(reg prometheus.Registerer) *metrics { + f := promauto.With(reg) + return &metrics{ + requestDuration: f.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "alertmanager_http_request_duration_seconds", + Help: "Histogram of latencies for HTTP requests.", + Buckets: prometheus.DefBuckets, + NativeHistogramBucketFactor: 1.1, + NativeHistogramMaxBucketNumber: 100, + NativeHistogramMinResetDuration: 1 * time.Hour, + }, + []string{"handler", "method", "code"}, + ), + responseSize: f.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "alertmanager_http_response_size_bytes", + Help: "Histogram of response size for HTTP requests.", + Buckets: prometheus.ExponentialBuckets(100, 10, 7), + }, + []string{"handler", "method"}, + ), + clusterEnabled: f.NewGauge( + prometheus.GaugeOpts{ + Name: "alertmanager_cluster_enabled", + Help: "Indicates whether the clustering is enabled or not.", + }, + ), + configuredReceivers: f.NewGauge( + prometheus.GaugeOpts{ + Name: "alertmanager_receivers", + Help: "Number of configured receivers.", + }, + ), + configuredIntegrations: f.NewGauge( + prometheus.GaugeOpts{ + Name: "alertmanager_integrations", + Help: "Number of configured integrations.", + }, + ), + configuredInhibitionRules: f.NewGauge( + prometheus.GaugeOpts{ + Name: "alertmanager_inhibition_rules", + Help: "Number of configured inhibition rules.", + }, + ), + } +} + +func (m *metrics) instrumentHandler(handlerName string, handler http.HandlerFunc) http.HandlerFunc { + handlerLabel := prometheus.Labels{"handler": handlerName} + return promhttp.InstrumentHandlerDuration( + m.requestDuration.MustCurryWith(handlerLabel), + promhttp.InstrumentHandlerResponseSize( + 
m.responseSize.MustCurryWith(handlerLabel), + handler, + ), + ) +} diff --git a/internal/app/options.go b/internal/app/options.go new file mode 100644 index 0000000000..3541f80fe6 --- /dev/null +++ b/internal/app/options.go @@ -0,0 +1,104 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package app + +import ( + "errors" + "log/slog" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/exporter-toolkit/web" + + "github.com/prometheus/alertmanager/featurecontrol" +) + +// DefaultClusterAddr is the default listen address used when the operator +// does not pass --cluster.listen-address. +const DefaultClusterAddr = "0.0.0.0:9094" + +// Options carries the resolved configuration for a single Alertmanager +// instance. Field names follow the kingpin flags in cmd/alertmanager/main.go +// so that mapping between the two is straightforward. +// +// Logger, Registerer and Flagger are required dependencies; the remaining +// fields default to their zero value (which generally matches the kingpin +// flag default). +type Options struct { + // Storage and lifecycle. + ConfigFile string + DataDir string + Retention time.Duration + MaintenanceInterval time.Duration + MaxSilences int + MaxSilenceSizeBytes int + SilenceLogging bool + AlertGCInterval time.Duration + PerAlertNameLimit int + DispatchMaintenanceInterval time.Duration + DispatchStartDelay time.Duration + + // Web server. 
+ WebConfig *web.FlagConfig + ExternalURL string + RoutePrefix string + GetConcurrency int + HTTPTimeout time.Duration + + // Cluster. + ClusterBindAddr string + ClusterAdvertiseAddr string + ClusterPeerName string + Peers []string + PeerTimeout time.Duration + PeersResolveTimeout time.Duration + GossipInterval time.Duration + PushPullInterval time.Duration + TCPTimeout time.Duration + ProbeTimeout time.Duration + ProbeInterval time.Duration + SettleTimeout time.Duration + ReconnectInterval time.Duration + PeerReconnectTimeout time.Duration + TLSConfigFile string + AllowInsecureAdvertise bool + Label string + + // Injected dependencies. + Logger *slog.Logger + Registerer prometheus.Registerer + Flagger featurecontrol.Flagger + + // Reload triggers a configuration reload each time it receives a + // value. The binary translates SIGHUP into sends on this channel; + // callers can also drive reloads programmatically. A nil channel + // disables external reloads (the /-/reload HTTP endpoint still works). + Reload <-chan struct{} +} + +func (o *Options) validate() error { + if o.Logger == nil { + return errors.New("alertmanager/app: Options.Logger is required") + } + if o.Registerer == nil { + return errors.New("alertmanager/app: Options.Registerer is required") + } + if o.Flagger == nil { + return errors.New("alertmanager/app: Options.Flagger is required") + } + if o.WebConfig == nil || o.WebConfig.WebListenAddresses == nil || len(*o.WebConfig.WebListenAddresses) == 0 { + return errors.New("alertmanager/app: Options.WebConfig must contain at least one listen address") + } + return nil +} diff --git a/internal/app/url.go b/internal/app/url.go new file mode 100644 index 0000000000..0f478e2c27 --- /dev/null +++ b/internal/app/url.go @@ -0,0 +1,57 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
// extURL resolves the externally visible URL of this Alertmanager instance.
//
// When external is empty, the URL defaults to http://<hostname>:<port>/,
// with the hostname taken from hostnamef and the port from the listen
// address. A listen address without a port is logged as a warning and
// still produces a URL with an empty port.
//
// Only the http and https schemes are accepted. Trailing slashes are
// trimmed from the path, and a non-empty path is made to start with "/".
//
// It returns an error when hostnamef fails, when listen cannot be split
// into host and port, when the URL does not parse, or when the scheme is
// not supported.
func extURL(logger *slog.Logger, hostnamef func() (string, error), listen, external string) (*url.URL, error) {
	if external == "" {
		hostname, err := hostnamef()
		if err != nil {
			return nil, err
		}
		_, port, err := net.SplitHostPort(listen)
		if err != nil {
			return nil, err
		}
		if port == "" {
			logger.Warn("no port found for listen address", "address", listen)
		}
		// JoinHostPort puts brackets around a host that contains a
		// colon, so an IPv6 literal yields a well-formed authority.
		// For ordinary hostnames the result is plain "host:port".
		external = "http://" + net.JoinHostPort(hostname, port) + "/"
	}

	u, err := url.Parse(external)
	if err != nil {
		return nil, err
	}
	if u.Scheme != "http" && u.Scheme != "https" {
		return nil, fmt.Errorf("%q: invalid %q scheme, only 'http' and 'https' are supported", u.String(), u.Scheme)
	}

	ppref := strings.TrimRight(u.Path, "/")
	if ppref != "" && !strings.HasPrefix(ppref, "/") {
		ppref = "/" + ppref
	}
	u.Path = ppref

	return u, nil
}