diff --git a/adapter/admin_grpc.go b/adapter/admin_grpc.go new file mode 100644 index 00000000..be43baca --- /dev/null +++ b/adapter/admin_grpc.go @@ -0,0 +1,192 @@ +package adapter + +import ( + "context" + "crypto/subtle" + "strings" + "sync" + + "github.com/bootjp/elastickv/internal/raftengine" + pb "github.com/bootjp/elastickv/proto" + "github.com/cockroachdb/errors" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" +) + +// AdminGroup exposes per-Raft-group state to the Admin service. It is a narrow +// subset of raftengine.Engine so tests can supply an in-memory fake without +// standing up a real Raft cluster. +type AdminGroup interface { + Status() raftengine.Status +} + +// NodeIdentity is the value form of the protobuf NodeIdentity message used for +// AdminServer configuration. It avoids copying pb.NodeIdentity, which embeds a +// protoimpl.MessageState (and a mutex). +type NodeIdentity struct { + NodeID string + GRPCAddress string +} + +func (n NodeIdentity) toProto() *pb.NodeIdentity { + return &pb.NodeIdentity{NodeId: n.NodeID, GrpcAddress: n.GRPCAddress} +} + +// AdminServer implements the node-side Admin gRPC service described in +// docs/admin_ui_key_visualizer_design.md §4 (Layer A). Phase 0 only implements +// GetClusterOverview and GetRaftGroups; remaining RPCs return Unimplemented so +// the generated client can still compile against older nodes during rollout. +type AdminServer struct { + self NodeIdentity + members []NodeIdentity + + groupsMu sync.RWMutex + groups map[uint64]AdminGroup + + pb.UnimplementedAdminServer +} + +// NewAdminServer constructs an AdminServer. `self` identifies the local node +// for responses that return node identity. `members` is the static membership +// snapshot shipped to the admin binary; callers that already have a membership +// source may pass nil and let the admin binary's fan-out layer discover peers +// by other means. 
+func NewAdminServer(self NodeIdentity, members []NodeIdentity) *AdminServer { + cloned := append([]NodeIdentity(nil), members...) + return &AdminServer{ + self: self, + members: cloned, + groups: make(map[uint64]AdminGroup), + } +} + +// RegisterGroup binds a Raft group ID to its engine so the Admin service can +// report leader and log state for that group. +func (s *AdminServer) RegisterGroup(groupID uint64, g AdminGroup) { + if g == nil { + return + } + s.groupsMu.Lock() + s.groups[groupID] = g + s.groupsMu.Unlock() +} + +// GetClusterOverview returns the local node identity, the configured member +// list, and per-group leader identity collected from the engines registered +// via RegisterGroup. +func (s *AdminServer) GetClusterOverview( + _ context.Context, + _ *pb.GetClusterOverviewRequest, +) (*pb.GetClusterOverviewResponse, error) { + leaders := s.snapshotLeaders() + members := make([]*pb.NodeIdentity, 0, len(s.members)) + for _, m := range s.members { + members = append(members, m.toProto()) + } + return &pb.GetClusterOverviewResponse{ + Self: s.self.toProto(), + Members: members, + GroupLeaders: leaders, + }, nil +} + +// GetRaftGroups returns per-group state snapshots. Phase 0 wires commit/applied +// indices only; per-follower contact and term history land in later phases. 
+func (s *AdminServer) GetRaftGroups( + _ context.Context, + _ *pb.GetRaftGroupsRequest, +) (*pb.GetRaftGroupsResponse, error) { + s.groupsMu.RLock() + out := make([]*pb.RaftGroupState, 0, len(s.groups)) + for id, g := range s.groups { + st := g.Status() + out = append(out, &pb.RaftGroupState{ + RaftGroupId: id, + LeaderNodeId: st.Leader.ID, + LeaderTerm: st.Term, + CommitIndex: st.CommitIndex, + AppliedIndex: st.AppliedIndex, + }) + } + s.groupsMu.RUnlock() + return &pb.GetRaftGroupsResponse{Groups: out}, nil +} + +func (s *AdminServer) snapshotLeaders() []*pb.GroupLeader { + s.groupsMu.RLock() + defer s.groupsMu.RUnlock() + out := make([]*pb.GroupLeader, 0, len(s.groups)) + for id, g := range s.groups { + st := g.Status() + out = append(out, &pb.GroupLeader{ + RaftGroupId: id, + LeaderNodeId: st.Leader.ID, + LeaderTerm: st.Term, + }) + } + return out +} + +// AdminTokenAuth builds a gRPC unary+stream interceptor pair enforcing +// "authorization: Bearer " metadata against the supplied token. An +// empty token disables enforcement; callers should pair that mode with a +// --adminInsecureNoAuth flag so operators knowingly opt in. 
+func AdminTokenAuth(token string) (grpc.UnaryServerInterceptor, grpc.StreamServerInterceptor) { + if token == "" { + return nil, nil + } + expected := []byte(token) + check := func(ctx context.Context) error { + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + return status.Error(codes.Unauthenticated, "missing authorization metadata") + } + values := md.Get("authorization") + if len(values) == 0 { + return status.Error(codes.Unauthenticated, "missing authorization header") + } + got, ok := strings.CutPrefix(values[0], "Bearer ") + if !ok { + return status.Error(codes.Unauthenticated, "authorization is not a bearer token") + } + if subtle.ConstantTimeCompare([]byte(got), expected) != 1 { + return status.Error(codes.Unauthenticated, "invalid admin token") + } + return nil + } + unary := func( + ctx context.Context, + req any, + info *grpc.UnaryServerInfo, + handler grpc.UnaryHandler, + ) (any, error) { + if !strings.HasPrefix(info.FullMethod, "/Admin/") { + return handler(ctx, req) + } + if err := check(ctx); err != nil { + return nil, err + } + return handler(ctx, req) + } + stream := func( + srv any, + ss grpc.ServerStream, + info *grpc.StreamServerInfo, + handler grpc.StreamHandler, + ) error { + if !strings.HasPrefix(info.FullMethod, "/Admin/") { + return handler(srv, ss) + } + if err := check(ss.Context()); err != nil { + return err + } + return handler(srv, ss) + } + return unary, stream +} + +// ErrAdminTokenRequired is returned by NewAdminServer helpers when the operator +// failed to supply a token and also did not opt into insecure mode. 
+var ErrAdminTokenRequired = errors.New("admin token file required; pass --adminInsecureNoAuth to run without") diff --git a/adapter/admin_grpc_test.go b/adapter/admin_grpc_test.go new file mode 100644 index 00000000..3001ebec --- /dev/null +++ b/adapter/admin_grpc_test.go @@ -0,0 +1,141 @@ +package adapter + +import ( + "context" + "testing" + + "github.com/bootjp/elastickv/internal/raftengine" + pb "github.com/bootjp/elastickv/proto" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" +) + +type fakeGroup struct { + leaderID string + term uint64 + commit uint64 + applied uint64 +} + +func (f fakeGroup) Status() raftengine.Status { + return raftengine.Status{ + Leader: raftengine.LeaderInfo{ID: f.leaderID}, + Term: f.term, + CommitIndex: f.commit, + AppliedIndex: f.applied, + } +} + +func TestGetClusterOverviewReturnsSelfAndLeaders(t *testing.T) { + t.Parallel() + srv := NewAdminServer( + NodeIdentity{NodeID: "node-a", GRPCAddress: "127.0.0.1:50051"}, + []NodeIdentity{{NodeID: "node-b", GRPCAddress: "127.0.0.1:50052"}}, + ) + srv.RegisterGroup(1, fakeGroup{leaderID: "node-a", term: 7}) + srv.RegisterGroup(2, fakeGroup{leaderID: "node-b", term: 3}) + + resp, err := srv.GetClusterOverview(context.Background(), &pb.GetClusterOverviewRequest{}) + if err != nil { + t.Fatalf("GetClusterOverview: %v", err) + } + if resp.Self.NodeId != "node-a" { + t.Fatalf("self = %q, want node-a", resp.Self.NodeId) + } + if len(resp.Members) != 1 || resp.Members[0].NodeId != "node-b" { + t.Fatalf("members = %v, want [node-b]", resp.Members) + } + if len(resp.GroupLeaders) != 2 { + t.Fatalf("group_leaders count = %d, want 2", len(resp.GroupLeaders)) + } +} + +func TestGetRaftGroupsExposesCommitApplied(t *testing.T) { + t.Parallel() + srv := NewAdminServer(NodeIdentity{NodeID: "n1"}, nil) + srv.RegisterGroup(1, fakeGroup{leaderID: "n1", term: 2, commit: 99, applied: 97}) + + resp, err := 
srv.GetRaftGroups(context.Background(), &pb.GetRaftGroupsRequest{}) + if err != nil { + t.Fatalf("GetRaftGroups: %v", err) + } + if len(resp.Groups) != 1 { + t.Fatalf("groups = %d, want 1", len(resp.Groups)) + } + g := resp.Groups[0] + if g.CommitIndex != 99 || g.AppliedIndex != 97 || g.LeaderTerm != 2 { + t.Fatalf("unexpected state %+v", g) + } +} + +func TestAdminTokenAuth(t *testing.T) { + t.Parallel() + unary, _ := AdminTokenAuth("s3cret") + if unary == nil { + t.Fatal("interceptor should be non-nil for configured token") + } + + info := &grpc.UnaryServerInfo{FullMethod: "/Admin/GetClusterOverview"} + handler := func(_ context.Context, _ any) (any, error) { return "ok", nil } + + cases := []struct { + name string + md metadata.MD + code codes.Code + call bool + }{ + {"missing metadata", nil, codes.Unauthenticated, false}, + {"missing header", metadata.Pairs(), codes.Unauthenticated, false}, + {"wrong scheme", metadata.Pairs("authorization", "Basic zzz"), codes.Unauthenticated, false}, + {"wrong token", metadata.Pairs("authorization", "Bearer nope"), codes.Unauthenticated, false}, + {"correct", metadata.Pairs("authorization", "Bearer s3cret"), codes.OK, true}, + } + for _, tc := range cases { + + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + ctx := context.Background() + if tc.md != nil { + ctx = metadata.NewIncomingContext(ctx, tc.md) + } + resp, err := unary(ctx, nil, info, handler) + if tc.code == codes.OK { + if err != nil { + t.Fatalf("want OK, got %v", err) + } + if resp != "ok" { + t.Fatalf("handler not called: resp=%v", resp) + } + return + } + if status.Code(err) != tc.code { + t.Fatalf("code = %v, want %v (err=%v)", status.Code(err), tc.code, err) + } + }) + } +} + +func TestAdminTokenAuthSkipsOtherServices(t *testing.T) { + t.Parallel() + unary, _ := AdminTokenAuth("s3cret") + info := &grpc.UnaryServerInfo{FullMethod: "/RawKV/Get"} + handler := func(_ context.Context, _ any) (any, error) { return "ok", nil } + + resp, err := 
unary(context.Background(), nil, info, handler) + if err != nil { + t.Fatalf("non-admin method should not be gated: %v", err) + } + if resp != "ok" { + t.Fatalf("handler not called: resp=%v", resp) + } +} + +func TestAdminTokenAuthEmptyTokenDisabled(t *testing.T) { + t.Parallel() + unary, stream := AdminTokenAuth("") + if unary != nil || stream != nil { + t.Fatal("empty token should disable interceptors") + } +} diff --git a/cmd/elastickv-admin/main.go b/cmd/elastickv-admin/main.go new file mode 100644 index 00000000..4c9a2c1c --- /dev/null +++ b/cmd/elastickv-admin/main.go @@ -0,0 +1,257 @@ +// Command elastickv-admin serves the Elastickv admin Web UI described in +// docs/admin_ui_key_visualizer_design.md. Phase 0: token-protected passthrough +// of Admin.GetClusterOverview at /api/cluster/overview, no SPA yet. +package main + +import ( + "context" + "encoding/json" + "errors" + "flag" + "fmt" + "log" + "net/http" + "os" + "os/signal" + "path/filepath" + "strings" + "sync" + "syscall" + "time" + + pb "github.com/bootjp/elastickv/proto" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/metadata" +) + +const ( + defaultBindAddr = "127.0.0.1:8080" + defaultNodesRefreshInterval = 15 * time.Second + defaultGRPCRequestTimeout = 10 * time.Second + readHeaderTimeout = 5 * time.Second + shutdownTimeout = 5 * time.Second +) + +var ( + bindAddr = flag.String("bindAddr", defaultBindAddr, "HTTP bind address for the admin UI") + nodes = flag.String("nodes", "", "Comma-separated list of elastickv node gRPC addresses") + nodeTokenFile = flag.String("nodeTokenFile", "", "File containing the bearer token sent to nodes' Admin service") + nodesRefreshInterval = flag.Duration("nodesRefreshInterval", defaultNodesRefreshInterval, "Duration to cache cluster membership before re-fetching") + insecureNoAuth = flag.Bool("adminInsecureNoAuth", false, "Skip bearer token authentication; development only") +) + +func main() { + flag.Parse() + 
if err := run(); err != nil { + log.Fatal(err) + } +} + +func run() error { + seeds := splitNodes(*nodes) + if len(seeds) == 0 { + return errors.New("--nodes is required (comma-separated gRPC addresses)") + } + + token, err := loadToken(*nodeTokenFile, *insecureNoAuth) + if err != nil { + return err + } + + fan := newFanout(seeds, token, *nodesRefreshInterval) + defer fan.Close() + + mux := http.NewServeMux() + mux.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + }) + mux.HandleFunc("/api/cluster/overview", fan.handleOverview) + mux.HandleFunc("/api/", func(w http.ResponseWriter, _ *http.Request) { + writeJSONError(w, http.StatusServiceUnavailable, "endpoint not implemented in phase 0") + }) + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/" { + http.NotFound(w, r) + return + } + w.Header().Set("Content-Type", "text/plain; charset=utf-8") + _, _ = w.Write([]byte("elastickv-admin: phase 0 — SPA not yet embedded\n")) + }) + + srv := &http.Server{ + Addr: *bindAddr, + Handler: mux, + ReadHeaderTimeout: readHeaderTimeout, + } + + ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer cancel() + + errCh := make(chan error, 1) + go func() { + log.Printf("elastickv-admin listening on %s (seeds=%v)", *bindAddr, seeds) + if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + errCh <- err + return + } + errCh <- nil + }() + + select { + case <-ctx.Done(): + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), shutdownTimeout) + defer shutdownCancel() + if err := srv.Shutdown(shutdownCtx); err != nil { + return fmt.Errorf("shutdown: %w", err) + } + return nil + case err := <-errCh: + return err + } +} + +func splitNodes(raw string) []string { + parts := strings.Split(raw, ",") + out := make([]string, 0, len(parts)) + for _, p := range parts { + p = strings.TrimSpace(p) + if p != "" { + 
out = append(out, p) + } + } + return out +} + +func loadToken(path string, insecureMode bool) (string, error) { + if path == "" { + if insecureMode { + return "", nil + } + return "", errors.New("--nodeTokenFile is required; pass --adminInsecureNoAuth for insecure dev mode") + } + if insecureMode { + return "", errors.New("--adminInsecureNoAuth and --nodeTokenFile are mutually exclusive") + } + abs, err := filepath.Abs(path) + if err != nil { + return "", fmt.Errorf("resolve token path: %w", err) + } + b, err := os.ReadFile(abs) + if err != nil { + return "", fmt.Errorf("read token file: %w", err) + } + token := strings.TrimSpace(string(b)) + if token == "" { + return "", errors.New("token file is empty") + } + return token, nil +} + +type nodeClient struct { + addr string + conn *grpc.ClientConn + client pb.AdminClient +} + +type fanout struct { + seeds []string + token string + refreshInterval time.Duration + + mu sync.Mutex + clients map[string]*nodeClient +} + +func newFanout(seeds []string, token string, refreshInterval time.Duration) *fanout { + if refreshInterval <= 0 { + refreshInterval = defaultNodesRefreshInterval + } + return &fanout{ + seeds: seeds, + token: token, + refreshInterval: refreshInterval, + clients: make(map[string]*nodeClient), + } +} + +func (f *fanout) Close() { + f.mu.Lock() + defer f.mu.Unlock() + for _, c := range f.clients { + _ = c.conn.Close() + } + f.clients = nil +} + +func (f *fanout) clientFor(addr string) (*nodeClient, error) { + f.mu.Lock() + defer f.mu.Unlock() + if c, ok := f.clients[addr]; ok { + return c, nil + } + conn, err := grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return nil, fmt.Errorf("dial %s: %w", addr, err) + } + c := &nodeClient{addr: addr, conn: conn, client: pb.NewAdminClient(conn)} + f.clients[addr] = c + return c, nil +} + +func (f *fanout) outgoingCtx(parent context.Context) context.Context { + if f.token == "" { + return parent + } + return 
metadata.AppendToOutgoingContext(parent, "authorization", "Bearer "+f.token) +} + +func (f *fanout) handleOverview(w http.ResponseWriter, r *http.Request) { + ctx, cancel := context.WithTimeout(r.Context(), defaultGRPCRequestTimeout) + defer cancel() + + type perNode struct { + Node string `json:"node"` + OK bool `json:"ok"` + Error string `json:"error,omitempty"` + Data *pb.GetClusterOverviewResponse `json:"data,omitempty"` + } + + results := make([]perNode, len(f.seeds)) + var wg sync.WaitGroup + for i, addr := range f.seeds { + wg.Add(1) + go func(i int, addr string) { + defer wg.Done() + entry := perNode{Node: addr} + cli, err := f.clientFor(addr) + if err != nil { + entry.Error = err.Error() + results[i] = entry + return + } + resp, err := cli.client.GetClusterOverview(f.outgoingCtx(ctx), &pb.GetClusterOverviewRequest{}) + if err != nil { + entry.Error = err.Error() + results[i] = entry + return + } + entry.OK = true + entry.Data = resp + results[i] = entry + }(i, addr) + } + wg.Wait() + + writeJSON(w, http.StatusOK, map[string]any{"nodes": results}) +} + +func writeJSON(w http.ResponseWriter, code int, body any) { + w.Header().Set("Content-Type", "application/json; charset=utf-8") + w.WriteHeader(code) + _ = json.NewEncoder(w).Encode(body) +} + +func writeJSONError(w http.ResponseWriter, code int, msg string) { + writeJSON(w, code, map[string]any{"code": code, "message": msg}) +} diff --git a/cmd/elastickv-admin/main_test.go b/cmd/elastickv-admin/main_test.go new file mode 100644 index 00000000..e33d167c --- /dev/null +++ b/cmd/elastickv-admin/main_test.go @@ -0,0 +1,77 @@ +package main + +import ( + "os" + "path/filepath" + "strings" + "testing" +) + +func TestSplitNodesTrimsAndDrops(t *testing.T) { + t.Parallel() + got := splitNodes(" host-a:50051 ,,host-b:50051 ,") + want := []string{"host-a:50051", "host-b:50051"} + if len(got) != len(want) { + t.Fatalf("len = %d, want %d (%v)", len(got), len(want), got) + } + for i, w := range want { + if got[i] != w { + 
t.Fatalf("[%d] = %q, want %q", i, got[i], w) + } + } +} + +func TestLoadTokenRequiresFileOrInsecure(t *testing.T) { + t.Parallel() + if _, err := loadToken("", false); err == nil { + t.Fatal("expected error when neither token nor insecure mode supplied") + } + tok, err := loadToken("", true) + if err != nil { + t.Fatalf("insecure-mode empty path should succeed: %v", err) + } + if tok != "" { + t.Fatalf("insecure-mode token = %q, want empty", tok) + } +} + +func TestLoadTokenReadsAndTrims(t *testing.T) { + t.Parallel() + dir := t.TempDir() + path := filepath.Join(dir, "token") + if err := os.WriteFile(path, []byte("\n s3cret \n"), 0o600); err != nil { + t.Fatal(err) + } + tok, err := loadToken(path, false) + if err != nil { + t.Fatalf("loadToken: %v", err) + } + if tok != "s3cret" { + t.Fatalf("tok = %q, want s3cret", tok) + } +} + +func TestLoadTokenRejectsEmptyFile(t *testing.T) { + t.Parallel() + dir := t.TempDir() + path := filepath.Join(dir, "empty") + if err := os.WriteFile(path, []byte(" \n"), 0o600); err != nil { + t.Fatal(err) + } + _, err := loadToken(path, false) + if err == nil || !strings.Contains(err.Error(), "empty") { + t.Fatalf("expected empty-file error, got %v", err) + } +} + +func TestLoadTokenRejectsInsecureWithFile(t *testing.T) { + t.Parallel() + dir := t.TempDir() + path := filepath.Join(dir, "tok") + if err := os.WriteFile(path, []byte("x"), 0o600); err != nil { + t.Fatal(err) + } + if _, err := loadToken(path, true); err == nil { + t.Fatal("expected mutual-exclusion error when both supplied") + } +} diff --git a/docs/admin_ui_key_visualizer_design.md b/docs/admin_ui_key_visualizer_design.md new file mode 100644 index 00000000..d6e9dfa2 --- /dev/null +++ b/docs/admin_ui_key_visualizer_design.md @@ -0,0 +1,323 @@ +# Admin UI and Key Visualizer Design for Elastickv + +## 1. 
Background + +Elastickv currently exposes four data-plane surfaces (gRPC `RawKV`/`TransactionalKV`, Redis, DynamoDB, S3) and one control-plane surface (`Distribution.ListRoutes`, `SplitRange`). Operational insight is provided today by: + +- Prometheus metrics on `--metricsAddress` (default `:9090`), backed by `monitoring.Registry` (`monitoring/registry.go:12`). +- Pre-built Grafana dashboards under `monitoring/grafana/`. +- `grpcurl` against the `Distribution` and `RaftAdmin` services. +- `cmd/raftadmin` and `cmd/client` CLIs. + +There is no first-party Web UI, and — critically — no per-key or per-route traffic signal. Operators cannot answer questions such as "which key range is hot right now?", "is the load skewed across Raft groups?", or "did the last `SplitRange` actually relieve the hotspot?" without building ad-hoc Prometheus queries, and even those queries cannot drill below the Raft-group aggregate. + +This document proposes a built-in admin Web UI, shipped as a separate binary `cmd/elastickv-admin`, and a TiKV-style **Key Visualizer** that renders a time × key-range heatmap of load. The design reuses existing control-plane gRPC APIs (routes, Raft status) and adds a minimal, hot-path-safe sampler for per-route traffic. The initial milestones intentionally avoid depending on the Prometheus client library so that the admin binary remains independently buildable and shippable. + +## 2. Goals and Non-goals + +### 2.1 Goals + +1. Ship a standalone admin binary `cmd/elastickv-admin` that connects to one or more elastickv nodes over gRPC and serves a Web UI. +2. Provide a single UI that covers cluster overview, routes, Raft groups, adapter throughput, and the key visualizer. +3. Produce a time × key-space heatmap with at least four switchable series: read count, write count, read bytes, write bytes. +4. Follow hotspot shards across `SplitRange` / merge events so the heatmap stays continuous. +5. 
Keep the sampler's hot-path overhead within the measurement noise floor of `BenchmarkCoordinatorDispatch`. Accuracy is expressed as a bound on the **estimator's relative error**, not a raw capture rate (see §5.2). +6. Stay off the Prometheus client library in Phases 0–3. Traffic counters used by the UI are maintained by the in-process sampler and a small adapter-side aggregator that already exists on the hot path. +7. Make the admin binary easy to deploy: a single Go binary with the SPA embedded via `go:embed`, producing one artifact per platform in CI. +8. Protect the node-side `Admin` gRPC service from Phase 0. The UI may bind to localhost, but the nodes expose metadata on their data-plane gRPC port, so read-only admin RPCs require an operator token by default. + +### 2.2 Non-goals + +1. Replacement of the existing Grafana dashboards. The admin UI focuses on cluster state and the keyspace view; long-horizon trend analysis remains a Prometheus/Grafana concern. +2. Per-individual-key statistics. The visualizer operates on route-level buckets, not on a `GET` / `PUT` trace. +3. Full multi-user RBAC, identity federation, or browser login flows. Phase 0 only requires a shared read-only admin token for the node-side gRPC service; richer auth remains deferred. +4. Query console (SQL/Redis/DynamoDB REPL) inside the UI. Deferred. +5. Multi-cluster federation. Scope is a single cluster; the admin binary may target any single node. + +## 3. 
High-level Architecture + +```mermaid +flowchart LR + Browser["Browser (Svelte SPA, embedded)"] + + subgraph AdminHost["Operator machine or sidecar"] + Admin["cmd/elastickv-admin :8080"] + end + + subgraph Cluster["Elastickv Cluster"] + Node1["Node A"] + Node2["Node B"] + Node3["Node C"] + end + + Browser -- "HTTP/JSON + WebSocket" --> Admin + Admin -- "gRPC: Distribution, RaftAdmin, Admin.KeyViz" --> Node1 + Admin -- "gRPC" --> Node2 + Admin -- "gRPC" --> Node3 + + subgraph NodeInternal["Inside each Node"] + Sampler["keyviz.Sampler"] + Coord["kv.ShardedCoordinator"] + Dist["distribution.Engine"] + Raft["raftengine.StatusReader"] + AdminSvc["Admin gRPC Service"] + end + + Coord -- "Observe(routeID, op, size)" --> Sampler + AdminSvc --> Sampler + AdminSvc --> Dist + AdminSvc --> Raft +``` + +The admin binary holds no authoritative state. All data is fetched on demand from nodes via a new `Admin` gRPC service. The sampler's ring buffer lives inside each node's process, rebuildable after restart once Phase 3 persistence is enabled (see §5.6). + +### 3.1 Why a separate binary + +- Release cadence for the UI is decoupled from the data plane. +- The admin binary can be placed on an operator workstation or a sidecar pod, so a compromised UI does not imply a compromised data node. +- Node binaries remain free of the Prometheus client (goal §2.1-6) and of any SPA assets. +- `cmd/elastickv-admin --nodes=host:50051 --nodeTokenFile=/etc/elastickv/admin.token` is the full invocation; no multi-file config bundle is required for the default use case. + +## 4. API Surface + +Two layers: + +**Layer A — gRPC, node → admin binary.** A new `Admin` service on each node, registered on the same gRPC port as `RawKV` (`--address`, default `:50051`). All methods are read-only in Phases 0–3 and require `authorization: Bearer ` metadata. Nodes load the token from `--adminTokenFile`; the admin binary sends it from `--nodeTokenFile`. 
An explicit `--adminInsecureNoAuth` flag exists only for local development and logs a warning at startup. + +| RPC | Purpose | +|---|---| +| `GetClusterOverview` | Node identity, Raft leader map per group, aggregate QPS | +| `ListRoutes` | Existing `Distribution.ListRoutes` (reused, not duplicated) | +| `GetRaftGroups` | Per-group state (leader, term, commit/applied, last contact) | +| `GetAdapterSummary` | Per-adapter QPS and latency quantiles from the in-process aggregator | +| `GetKeyVizMatrix` | Heatmap matrix for **this node's locally observed samples**: leader writes plus reads served locally, including follower-local reads (see §5.1). The admin binary fans out and merges. | +| `GetRouteDetail` | Time series for one route or virtual bucket (drill-down). The admin binary fans out because reads may be observed by followers. | +| `StreamEvents` | Server-stream of route-state transitions and fresh matrix columns | + +**Layer B — HTTP/JSON, browser → admin binary.** Thin pass-through wrappers over the gRPC calls, plus static asset serving. + +| Method | Path | Purpose | +|---|---|---| +| GET | `/` (and `/assets/*`) | Embedded SPA | +| GET | `/api/cluster/overview` | Wraps `GetClusterOverview` | +| GET | `/api/routes` | Wraps `ListRoutes` + derived size/leader | +| GET | `/api/raft/groups` | Wraps `GetRaftGroups` | +| GET | `/api/adapters/summary` | Wraps `GetAdapterSummary` | +| GET | `/api/keyviz/matrix` | Wraps `GetKeyVizMatrix` | +| GET | `/api/keyviz/buckets/{bucketID}` | Wraps `GetRouteDetail` for a real route bucket or coarsened virtual bucket | +| WS | `/api/stream` | Multiplexes `StreamEvents` from all targeted nodes | + +HTTP errors use a minimal `{code, message}` envelope. No caching headers on read endpoints. 
+ +### 4.1 `GetKeyVizMatrix` parameters + +| Field | Type | Default | Notes | +|---|---|---|---| +| `series` | enum(`reads`,`writes`,`readBytes`,`writeBytes`) | `writes` | Selects which counter is returned | +| `from` | timestamp | now−1h | Inclusive | +| `to` | timestamp | now | Exclusive | +| `rows` | int | 256 | Target Y-axis resolution (server may return fewer) | + +Response matrix format: `matrix[i][j]` is the value for bucket `i` at time column `j`. Keys in `start`/`end` are raw bytes; the server supplies `label` as a printable preview (§5.6). Each row also carries bucket metadata: + +| Field | Meaning | +|---|---| +| `bucketID` | Stable UI identifier, either `route:` or `virtual:`. | +| `aggregate` | `true` when multiple routes were coarsened into this row. | +| `routeIDs` / `routeCount` | Exact route IDs for small aggregates, plus total count. Large aggregates may truncate `routeIDs` and set `routeIDsTruncated=true`. | +| `sampleRoles` | Which roles contributed: `leaderWrite`, `leaderRead`, `followerRead`. | +| `lineageID` | Present for persisted Phase 3 rows so the UI can track continuity across split/merge events. | + +## 5. Key Visualizer + +### 5.1 Sampling point + +A single call site is added at the dispatch entry of `kv.ShardedCoordinator` (see `kv/sharded_coordinator.go`), immediately after the request is resolved to a `RouteID`: + +```go +sampler.Observe(routeID, op, keyLen, valueLen) +``` + +`sampler` is an interface; the default implementation is nil-safe (a nil sampler compiles to one branch and no allocation). The hook runs *before* Raft proposal so it measures offered load, not applied load. + +Writes are sampled exactly once by the current Raft leader before proposal. Reads are sampled by the node that actually serves the read: leader reads are marked `leaderRead`, and lease/follower-local reads are marked `followerRead`. Requests forwarded between nodes carry an internal "already sampled" marker so a logical operation is not counted twice. 
Because read load can be spread across followers, a cluster-wide heatmap requires the admin binary to fan out and merge across nodes (§9.1) — pointing at a single node would produce a partial view. + +**Leadership loss.** Each sample carries the `(raftGroupID, leaderTerm)` under which it was recorded. When the node's lease-loss callback fires for a group, the sampler stamps all `leaderWrite` samples for that group in the current and previous step window with `staleLeader=true` rather than deleting them — keeping them visible on the heatmap helps operators diagnose rapid leadership churn, and they remain authoritative for the window in which this node was in fact the leader. The admin fan-out (§9.1) merges writes by `(bucketID, raftGroupID, leaderTerm, windowStart)`, so the stale samples from an old leader and the fresh samples from a new leader never double-count: distinct terms are summed (each term's leader only saw its own term's writes), and within a single term the one leader's samples are authoritative. If fan-out receives `staleLeader=true` samples that conflict with a concurrent newer-term sample for the same window, the cell is flagged `conflict=true` and rendered hatched. + +The hot path uses lock-free reads for route lookup and counter increments. The data structures used are: + +- **Current-window counters**: `routes` is an immutable `routeTable` published through `atomic.Pointer[routeTable]`. `routeTable` owns `map[RouteID]*routeSlot`; each `routeSlot` owns an `atomic.Pointer[routeCounters]`. `Observe` loads the current table, performs a plain map lookup against that immutable snapshot, loads the slot's counter pointer, and uses `atomic.AddUint64` on the counter fields. Adding a new `RouteID` or replacing split/merge mappings performs a copy-on-write table update under a non-hot-path `routesMu`, then publishes the new table with one atomic store. No `Observe` call ever runs against a Go map that can be mutated concurrently. 
+- **Flush**: instead of holding a long write lock, the flush goroutine **atomically swaps** the `*routeCounters` pointer for each key using `atomic.Pointer[routeCounters]`, then reads the old pointer's frozen counters to build the new matrix column. `Observe` that loaded the old pointer before the swap completes its increments against the (now-retired) old counters, which the next flush will harvest. No counts are lost; at most one step-boundary's worth of counts land in the next column instead of the current one. +- **Split/merge** (§5.4): the route-watch callback creates the new child slots and publishes a new immutable `routeTable` *before* the `distribution.Engine` exposes the new `RouteID` to the coordinator, so by the time `Observe` sees the new `RouteID` the counter already exists and the callback does not race with the hot path. + +### 5.2 Adaptive sub-sampling and the accuracy SLO + +Observing every call is cheap but not free. To stay under the benchmark noise floor at very high per-route QPS, the sampler may sub-sample via **adaptive 1-in-N per route**. Counters remain unbiased estimators because each accepted sample increments by `sampleRate`. + +The capture rate itself is not the SLO — at `sampleRate = 8` the raw capture rate is 12.5%, but the estimator is still unbiased. What the UI cares about is the **relative error of the bucket total** shown in the heatmap. The SLO is therefore: + +> For every bucket displayed in the response, the estimated total is within **±5% of the true value with 95% confidence**, over the bucket's full step window (default 60 s). + +For Poisson-ish traffic, the relative error of the Horvitz–Thompson estimator is approximately `1 / sqrt(acceptedSamples)` for 1-in-N sub-sampling where N > 1. Setting this ≤0.05 at 95% CI gives a required `acceptedSamples ≥ (1.96 / 0.05)² ≈ 1537`, independent of the current 1-in-N rate. Buckets sampled at `sampleRate = 1` are exact and do not need the bound. 
The adaptive controller enforces this by never raising `sampleRate` past the point where the most recent window's `acceptedSamples` falls below that bound; if a burst violates the bound the affected buckets are flagged in the response and the UI renders them hatched so the operator knows the estimate is soft. + +`sampleRate` only rises at all when the previous flush window's estimated `Observe` cost crosses a measured threshold. To avoid adding profiling overhead to the hot path, the cost is estimated with a **synthetic model** (no runtime profiler involved): at startup `BenchmarkCoordinatorDispatch` with the sampler enabled records `costPerObserveNs` once, and each flush window computes `estimatedObserveCPU = Σ_routes(observeCount × costPerObserveNs)` directly from the counters already being harvested. This is exact up to the benchmarked cost constant and zero-overhead at runtime. In steady state with moderate per-route QPS, `sampleRate` stays at 1 and every op is counted. + +Benchmark gate in CI: `BenchmarkCoordinatorDispatch` with sampler off vs on; the delta must stay within run-to-run variance. Separately, a correctness test drives a known synthetic workload through a sub-sampling sampler and asserts the ±5% / 95%-CI bound holds across 1000 trials. + +### 5.3 In-memory representation and the route budget + +``` +Sampler + ├─ routes atomic.Pointer[routeTable] // immutable map[RouteID]*routeSlot, COW-updated off the hot path + │ each routeSlot points to (reads, writes, readBytes, writeBytes, sampleRate) + └─ history *ringBuffer[matrixColumn] // one column per stepSeconds (default 60s) +``` + +Every `stepSeconds` a flush goroutine swaps each route's counter pointer (§5.1) and drops a new column into the ring buffer. + +**Route budget and memory cap.** Naïve sizing (`columns × routes × series × 8B`) does not scale: 1 M routes × 1440 columns × 4 series × 8 B = ~46 GiB. Unbounded growth is unacceptable. 
The sampler enforces a hard budget on tracked routes: + +- A new flag `--keyvizMaxTrackedRoutes` (default **10 000** per node) caps the size of `routes`. +- When `ListRoutes` exceeds the cap, the sampler **coarsens adjacent routes into virtual tracking buckets** sized to fit the budget. The admin binary still sees real `RouteID`s in `ListRoutes`, but their `Observe` calls land in the shared bucket. The matrix response never pretends that such a row is a single route: it sets `aggregate=true`, returns a `virtual:*` `bucketID`, includes `routeCount` and the constituent `routeIDs` when small enough, and labels the range `[start-of-first, end-of-last)`. +- Coarsening is greedy on sorted `start` with merge priority given to **lowest recent activity**, so hot routes stay 1:1 until the budget is exhausted. +- Compacted storage: columns older than 1 hour are re-bucketed into 5-minute aggregates, and columns older than 6 hours into 1-hour aggregates. The resulting steady-state footprint is: + +| Tracked routes | Ring-buffer retention | Footprint (4 series × 8 B) | +|---|---|---| +| 10 000 (default cap) | 24 h (1440 × 60 s) | ~440 MiB raw, **~120 MiB** after tiered compaction | +| 10 000 | 1 h only | **~18 MiB** | +| 1 000 | 24 h compacted | ~12 MiB | + +If an operator needs higher fidelity across more routes than the cap allows, they raise `--keyvizMaxTrackedRoutes` knowingly; the log emits an `INFO` at startup stating the selected cap and projected memory. If the cap is hit at runtime, an `INFO` fires once per hour naming which adjacent routes were coalesced. + +### 5.4 Keeping up with splits and merges + +`distribution.Engine` already emits a watch stream on route-state transitions. The sampler subscribes and, on a split, copies the parent route's historical column values into both children so the heatmap stays visually continuous across the event. On a merge, child columns are summed into the surviving parent.
Current-window updates use the immutable-table, pointer-swap scheme from §5.1: child `routeSlot`s and `routeCounters` are installed in a freshly copied `routeTable` **before** the `distribution.Engine` publishes the new `RouteID` to the coordinator, so `Observe` never dereferences a missing route. Counts that raced a transition are attributed to whichever `RouteID` the coordinator resolved — acceptable because the loss is bounded by a single step window. + +### 5.5 Bucketing for the response + +The API's `rows` parameter is a *target*, not a guarantee. The server walks the route list in lexicographic order of `start` and greedily merges adjacent routes until the row count fits. Merge priority: lowest total activity across the requested window, so hotspots stay un-merged and visible. + +### 5.6 Persistence + +Phases 0–2 keep history in memory only. Restart loses the heatmap — acceptable for an MVP and keeps the Raft critical path untouched. Phase 3 changes that contract: persisted lineage records are the source of truth and the sampler rebuilds `RouteID → lineageID` state from them on restart. + +Phase 3 persists compacted columns **distributed across the user Raft groups themselves, not the default group**. Concentrating KeyViz writes on the default group would centralise I/O and Raft-log growth onto a single group, creating exactly the kind of hotspot this feature is built to surface. Instead: + +- Each compacted KeyViz column is written to the **Raft group that owns its key range**, under a group-local admin namespace `!admin|keyviz|range|<rangeStart>|<windowStart>`; the prefix is not routed through the default group.
Phase 3 also adds an explicit system-namespace filter so every user-plane read and timestamp-selection path — `pebbleStore.ScanAt`, `ReverseScanAt`, `GetAt`, `ExistsAt`, and `ShardedCoordinator.maxLatestCommitTS` — ignores `!admin|*` records; point reads that target an `!admin|*` key return `NotFound` as if the key did not exist, so an attacker cannot distinguish "hidden" from "missing". The current `isPebbleMetaKey` exact-match check (`store/lsm_store.go:299`) is widened to a prefix check on `!admin|`, and the same check is applied in `nextScannableUserKey` / `prevScannableUserKey` so internal KeyViz records are skipped during user-plane scans. To prevent the inverse leak, every data-plane adapter (gRPC `RawKV`/`TransactionalKV`, Redis, DynamoDB, S3) rejects user-plane writes — `Put`, `Delete`, transactional mutations, and Redis equivalents — whose key starts with `!admin|`. The check is centralised in `kv.ShardedCoordinator` so adapters cannot forget it; a write attempting an `!admin|*` key returns `InvalidArgument` and is recorded in the audit metric. +- `lineageID` is generated **exactly once, by the Raft leader proposing the split/merge**, as part of the route-transition command itself, and then stored in the Raft log — so every replica reads the same value instead of regenerating it. This avoids violating the repository invariant that persistence timestamps must originate from the Raft leader, not from a node-local clock. The transition HLC used is the **leader-issued HLC stamped onto the `SplitRange`/`MergeRange` Raft proposal** (same HLC that backs OCC decisions), never a node-local snapshot; followers observe the lineageID by replaying the committed command. 
+- The UUIDv7 is derived deterministically from that leader-issued HLC plus the proposal's Raft log index so the same transition yields the same lineageID on every replica and on re-proposal: the 48-bit `unix_ts_ms` field gets the HLC physical part (ms resolution), and the full 16-bit HLC logical counter is packed across `rand_a` (12 bits) and the top nibble of `rand_b` — logical bits `[15:4]` into `rand_a`, logical bits `[3:0]` into the top 4 bits of `rand_b`, so no logical bits are dropped. The remaining 58 bits of `rand_b` are filled from `BLAKE2b-256(raftGroupID || raftLogIndex || proposalBytes)` truncated to 58 bits — deterministic across replicas, collision-resistant across transitions, and no runtime RNG dependency. The lineage record stores `{start, end, routeID, validFromHLC, validToHLC, parentLineageIDs, raftLogIndex}` with `validFromHLC` carrying the full HLC so the reader can re-sort authoritatively; `RouteID` is recorded only as the current routing hint, never as the primary history key. +- Split and merge events append small group-local lineage records under `!admin|keyviz|lineage|` and mark closed branches with `validToHLC` so retention GC can later prune them. On split, both children point back to the parent lineage and inherit the parent's compacted history for continuity. On merge, the survivor records both child lineage IDs and the reader sums overlapping intervals. If a node sees historical rows without a lineage record during an upgrade, the admin reader falls back to overlap on the persisted `[start, end)` range before using `RouteID`. +- On startup, the sampler rebuilds its in-memory `RouteID → lineageID` map by scanning the group-local lineage index for routes currently owned by the node's groups and matching active `[start, end)` ranges from `ListRoutes`. If a route exists without a matching lineage record, the node creates a new lineage record with a parent pointer to the best overlapping retained range. 
This makes rolling restarts and upgrades preserve historical continuity. +- Writes are batched per group on a configurable interval (`--keyvizPersistInterval`, **default 5 min**, max 1 h) and dispatched as a single low-priority Raft proposal per group, keeping the write amplification proportional to the group's own traffic. Hourly was rejected as the default because a node crash between flushes would lose up to one hour of heatmap; 5 min bounds worst-case loss while still amortising Raft cost. As a defence-in-depth against single-point loss, each node also keeps the most recent unflushed window in a small **append-only WAL file** (`<dataDir>/keyviz/wal-<raftGroupID>.log`) under the same retention contract, with two hard bounds to keep restart fast: the WAL is **size-capped at `--keyvizWALMaxBytes` (default 64 MiB)** and **checkpointed every `--keyvizPersistInterval`** — when a batch is persisted to Raft, the corresponding WAL prefix is truncated. This caps worst-case replay at one interval's worth of data (at the default, tens of MiB at most), with a target recovery budget of **≤1 s replay time at 1 M ops/s**. If the WAL exceeds its size cap before the next flush — indicating the node is behind on persistence — the sampler drops the oldest records and records a `keyviz_wal_shed_total` metric instead of blocking the hot path. On startup the sampler fast-loads the WAL without running the adaptive controller, then resumes normal operation; readiness is gated on WAL replay completion so rolling upgrades do not route traffic to a node that is still rebuilding state. Operators that want stricter durability set `--keyvizPersistInterval=30s`; those that want faster restart at the cost of more write amplification set a smaller `--keyvizWALMaxBytes`. +- Retention is enforced by a KeyViz-specific GC pass, not by assuming ordinary HLC expiry will delete the latest MVCC version.
Phase 3 prefers a **Pebble `CompactionFilter`** that drops expired `!admin|keyviz|*` versions during normal background compactions — this avoids the I/O and CPU cost of an out-of-band scan-and-delete sweep, since the work happens during compactions that would run anyway. As a fallback for store flavours where a CompactionFilter is unavailable, an opt-in maintenance pass tombstones expired column and lineage records using a bounded, time-budgeted scan (default ≤5% of disk read bandwidth). Persistence refuses to enable if neither path is available, avoiding unbounded growth. +- Lineage records are retained while any column in the 7-day retention window references them. The same GC pass prunes closed lineage branches whose `validToHLC` and descendants are older than retention, so frequent split/merge clusters do not accumulate an unbounded lineage tree. +- The admin binary, on a history query, fans out to all groups' leaders (§9.1), reconstructs the range timeline from lineage metadata, and merges returned slices by time × key-range overlap. This keeps a hotspot visually continuous even when its serving `RouteID` changed across a `SplitRange` or merge. +- For coarsened virtual buckets (§5.3), the column is written to the group owning the bucket's **first** constituent route, with a small index entry under `!admin|keyviz|index|` on the same group so the fan-out reader can discover it. The index entry is the only per-hour write that is shared — but its size is bounded by the route-budget cap, not by total traffic. + +This keeps the data-plane Raft-log overhead bounded by per-group load and fails independently when a single group is unavailable. + +### 5.7 Key preview labels + +Raw keys are binary. The UI needs a printable hint per bucket. Strategy: + +1. If all keys in the bucket's `[start, end)` are valid UTF-8 with no control characters, return the common byte prefix truncated to 24 chars. +2. Otherwise, return a hex preview of the common prefix plus `…`. +3. 
Internal reserved prefixes (`!txn|`, `!dist|*`, `!admin|*`) are labelled explicitly and rendered with a distinct color in the UI, so system traffic is never confused with user traffic. + +## 6. Adapter Summary Without Prometheus + +The existing `monitoring.Registry` observers record into Prometheus counters/histograms — useful for Grafana, but not readable back without pulling in the Prometheus client library. To keep the admin binary and node binary free of that dependency during Phases 0–3: + +- A small sibling struct `monitoring.LiveSummary` is added alongside each observer. It maintains, in parallel with the existing Prometheus writes, an in-process rolling window (10-second buckets, 5-minute history) of request count and latency per adapter and per operation. +- Latency is tracked with a **fixed-bucket log-linear histogram** (256 pre-sized buckets covering 1 µs – 10 s, similar to the Prometheus default schema but owned in-process). Each observation is a single `atomic.AddUint64` on the bucket's counter — no sort, no merge, no locks, predictable nanosecond cost. Quantiles (p50/p95/p99) are interpolated at read time by `GetAdapterSummary`. A t-digest was considered but rejected because its centroid merge cost is not bounded on the hot path and is hard to make concurrent without a lock. +- Count, in-flight, and byte totals are plain `atomic.Uint64`. +- `GetAdapterSummary` reads directly from `LiveSummary`. The Prometheus exposition remains unchanged and untouched. + +This adds roughly a dozen integer fields per tracked operation and avoids both the Prometheus dependency and the need to scrape `/metrics` from within the admin binary. + +## 7. Frontend + +- **Stack**: SvelteKit (static adapter) + TypeScript + Tailwind + ECharts (`heatmap` series). +- **Why Svelte**: smaller bundle (~150 KB gzipped for the full app vs ~350 KB for React + equivalent libs), fewer transitive dependency updates to audit, trivial static build that embeds cleanly with `go:embed`. 
Selected explicitly to favour maintenance simplicity and deployment size. +- **Layout**: left nav with Overview / Routes / Raft / Adapters / Key Visualizer. +- **Key Visualizer page**: + - X-axis time, Y-axis route buckets, brush-to-zoom on both axes. + - Series switcher (reads / writes / readBytes / writeBytes). + - Range selection opens a drawer with the underlying route list, current leader(s), size, and a link to the Raft group page. For `aggregate=true` rows, the drawer explicitly says the row is a coarsened virtual bucket and lists the constituent routes or the truncated route count. + - Live mode: a WebSocket push appends a new column every `stepSeconds` without refetching history. + - Buckets that miss the ±5% / 95%-CI estimator bound are hatched to signal estimation uncertainty. +- **Build**: `web/` at repo root, `pnpm build` output copied to `cmd/elastickv-admin/dist/`, embedded with `//go:embed dist`. +- **Dev flow**: Vite dev server on `:5173` proxies `/api` and `/stream` to a locally running `cmd/elastickv-admin`. + +## 8. Integration Points + +| File | Change | +|---|---| +| `cmd/elastickv-admin/` (new) | Main, HTTP server, gRPC clients, embedded SPA. | +| `adapter/admin_grpc.go` (new) | Server-side implementation of the `Admin` gRPC service, registered in `main.go`. | +| `proto/admin.proto` (new) | Service definition for `Admin`. | +| `kv/sharded_coordinator.go` | One-line `sampler.Observe(...)` at dispatch entry; `sampler` is `keyviz.Sampler` injected via constructor, nil-safe. Phase 3 also filters `!admin|*` from `maxLatestCommitTS` and rejects user-plane writes (`Put`/`Delete`/transactional mutations) targeting `!admin|*` with `InvalidArgument`, so adapters (gRPC, Redis, DynamoDB, S3) cannot bypass the isolation. | +| `keyviz/` (new) | `Sampler`, adaptive sub-sampler, ring buffer, route-watch subscriber, WAL replay, preview logic, tests. | +| `monitoring/live_summary.go` (new) | Rolling-window adapter counters, hooked into existing observers. 
| +| `store/lsm_store.go` | Phase 3 widens `isPebbleMetaKey` from exact-match to a prefix check on `!admin|` so `nextScannableUserKey` / `prevScannableUserKey` skip all internal KeyViz records during user-plane scans; adds retention GC (Pebble `CompactionFilter` preferred, time-budgeted maintenance sweep fallback) for expired `!admin|keyviz|*` columns and lineage records. | +| `main.go` | Register token-protected `Admin` gRPC service; wire `keyviz.Sampler` into the coordinator; wire `LiveSummary` into observers; add `--adminTokenFile`, `--adminInsecureNoAuth`, `--keyvizMaxTrackedRoutes`, `--keyvizPersistInterval`, and `--keyvizWALMaxBytes`. | +| `web/` (new) | Svelte SPA source. | + +No changes to Raft or FSM are required. Data-plane protocol adapters only receive the sampler call site and the `LiveSummary` hook that sits next to existing Prometheus writes. Phase 3 intentionally touches the store/coordinator read paths to keep `!admin|keyviz|*` metadata out of user scans and timestamp selection. + +## 9. Deployment and Operation + +- The admin binary is not intended to be exposed on the public network in its initial form. Default bind is `127.0.0.1:8080`; browser login and RBAC are deferred, but node-side `Admin` gRPC calls require the shared read-only token from §4. +- Typical operator workflow: `ssh -L 8080:localhost:8080 operator@host` then `elastickv-admin --nodes=host1:50051,host2:50051,host3:50051 --nodeTokenFile=/etc/elastickv/admin.token`, or run the binary on a laptop and point it at any reachable subset of nodes. +- The admin binary is stateless; it can be killed and restarted without coordination. +- CI produces release artifacts for `linux/amd64`, `linux/arm64`, `darwin/arm64`, and `windows/amd64`. + +### 9.1 Cluster-wide fan-out + +Because writes are recorded by Raft leaders and follower-local reads are recorded by the followers that serve them (§5.1), pointing the admin binary at a single node produces a **partial heatmap**. 
To give operators a complete view by default, the admin binary runs in **fan-out mode**: + +- `--nodes` accepts a comma-separated list of seed addresses. The admin binary calls `GetClusterOverview` on any reachable seed to discover the current full membership (node → gRPC endpoint, plus per-group leader identity). Membership is cached for `--nodesRefreshInterval` (**default 15 s**) so a stampede of concurrent browser requests hits at most one `GetClusterOverview` per interval per seed, while scale-out events are still reflected within seconds. The cache is refreshed lazily on the first request after expiry and invalidated immediately on any per-node `Unavailable` error, so removed or replaced nodes are dropped on the next request instead of waiting for the next tick. +- For each query (`GetKeyVizMatrix`, `GetRouteDetail`, `GetAdapterSummary`), the admin binary issues parallel gRPC calls to every known node and merges results server-side before sending one combined JSON payload to the browser. +- Merging rule for the heatmap: rows are grouped by `bucketID`/`lineageID` and time step. Read samples from multiple nodes are **summed**, because they represent distinct locally served reads. Write samples are grouped by `(bucketID, raftGroupID, leaderTerm, sourceNode, windowStart)` and summed across distinct leader terms during leadership transitions; exact duplicate source keys are deduplicated. The admin binary never uses "later timestamp wins" to overwrite a previous leader's complete window with a new leader's partial window. If two leaders claim overlapping terms for the same group, the cell is returned with `conflict=true` and rendered hatched rather than silently dropping data. +- Degraded mode: if any node is unreachable, the admin binary returns a partial result with a per-node `{node, ok, error}` status array so the UI can surface "3 of 4 nodes responded" instead of silently hiding ranges. The heatmap hatches rows or time windows whose expected source node failed. 
+- A single-node mode (`--nodes=one:50051 --no-fanout`) is retained for operators who explicitly want the partial view. + +## 10. Performance Considerations + +- Sampler fast path on a hit: `atomic.Pointer[routeTable].Load`, immutable map lookup by `RouteID`, `atomic.Pointer[routeCounters].Load`, then `atomic.AddUint64` on the four counters. No allocation per call, no mutex acquisition, no global lock. +- The coordinator already holds the `RouteID` at the hook site, so the sampler does not re-resolve. +- The flush goroutine performs atomic pointer swaps per tracked route; there is no write lock covering `Observe` calls. Splits and merges publish a copied immutable route table with child counters before publishing the new `RouteID` (§5.4), so the callback does not race with the hot path. +- API endpoints cap `to − from` at 7 days and `rows` at 1024 to bound server work. +- `LiveSummary` adds a second atomic increment alongside each existing Prometheus `Inc()`, plus one atomic increment on a fixed-bucket histogram counter. Cost is on the order of a nanosecond and well below the noise floor in §5.2. +- Fan-out cost (§9.1) is N parallel gRPC calls; each node serves only its locally observed samples, so the response size is distributed and the aggregate wall-clock is bounded by the slowest node, not the sum. + +## 11. Testing + +1. Unit tests for `keyviz.Sampler`: concurrent `Observe` under the `-race` detector while copy-on-write route-table updates run, flush correctness via the pointer-swap protocol, split/merge reshaping, forwarded-read "already sampled" deduplication, and the **accuracy SLO** (1000 trials of synthetic workload must satisfy ±5% relative error at 95% CI per §5.2). +2. Route-budget test: generate more than `--keyvizMaxTrackedRoutes` routes and assert that coarsening preserves total observed traffic, keeps hot routes un-merged, and returns `aggregate`, `bucketID`, `routeCount`, and constituent route metadata correctly. +3. 
Integration test in `kv/` that drives synthetic traffic through the coordinator and asserts the matrix reflects the skew. +4. gRPC handler tests with a fake engine and fake Raft status reader. +5. Fan-out test: admin binary against a 3-node fake cluster, including follower-local reads, one unreachable node, and a leadership transfer in the middle of a step window; the merged response must sum non-duplicate samples, preserve the partial-status array, and flag ambiguous overlap. +6. Persistence test: write compacted columns to per-range groups, perform split and merge transitions, restart a node, take a leadership transfer, run KeyViz GC, and verify the lineage reader reconstructs complete history across groups without relying on stable `RouteID`s. +7. Namespace isolation test: user `ScanAt`, `ReverseScanAt`, and `maxLatestCommitTS` must ignore `!admin|keyviz|*` records, and user-plane `Put` / `Delete` / transactional writes to any `!admin|*` key must be rejected with `InvalidArgument` by every adapter (gRPC `RawKV`/`TransactionalKV`, Redis, DynamoDB, S3). +8. Auth test: `Admin` gRPC methods reject missing or wrong tokens and accept the configured read-only token. +9. Benchmark gate: `BenchmarkCoordinatorDispatch` with sampler off vs on. CI fails if the difference exceeds the benchmark's own run-to-run variance. +10. Playwright smoke test against the embedded SPA to catch build-time regressions. + +## 12. Phased Delivery + +| Phase | Scope | Exit criteria | +|---|---|---| +| 0 | `cmd/elastickv-admin` skeleton, token-protected `Admin` gRPC service stub, empty SPA shell, CI wiring. | Binary builds, `/api/cluster/overview` returns live data from a real node only when the configured admin token is supplied. | +| 1 | Overview, Routes, Raft Groups, Adapters pages. `LiveSummary` added. No sampler. | All read-only pages match `grpcurl` ground truth.
| +| 2 | Key Visualizer MVP: in-memory sampler with adaptive sub-sampling, leader writes, leader/follower reads, fan-out across nodes, static matrix API with virtual-bucket metadata. | Benchmark gate green; heatmap shows synthetic hotspot within 2 s of load; ±5% / 95%-CI accuracy SLO holds under synthetic bursts; fan-out returns complete view with 1 node down. | +| 3 | Bytes series, drill-down, split/merge continuity, namespace-isolated persistence of compacted columns distributed **per owning Raft group**, lineage recovery, and retention GC. | Heatmap remains continuous across a live `SplitRange`; restart preserves last 7 days; expired data and stale lineage records are collected; no single Raft group sees more than its share of KeyViz writes. | +| 4 (deferred) | Mutating admin operations (`SplitRange` from UI), browser login, RBAC, and identity-provider integration. Out of scope for this design; a follow-up design will cover it. | — | + +Phases 0–2 are the minimum operationally useful product; Phase 3 is the "ship-quality" target. + +## 13. Open Questions + +1. Default value of `--keyvizMaxTrackedRoutes`. 10 000 is conservative; operators with very large clusters may prefer a higher default paired with shorter retention. Settle during Phase 2 benchmarking. +2. For the Phase 3 persistence schema, should KeyViz writes share a transaction with other per-group low-priority maintenance (compaction metadata, etc.) to amortise Raft cost, or remain a dedicated batch for easier rollback? diff --git a/proto/Makefile b/proto/Makefile index c329a70b..8f811e88 100644 --- a/proto/Makefile +++ b/proto/Makefile @@ -30,6 +30,9 @@ gen: check-tools protoc --go_out=. --go_opt=paths=source_relative \ --go-grpc_out=. --go-grpc_opt=paths=source_relative \ distribution.proto + protoc --go_out=. --go_opt=paths=source_relative \ + --go-grpc_out=. --go-grpc_opt=paths=source_relative \ + admin.proto protoc --go_out=. --go_opt=paths=source_relative \ dynamodb_internal.proto protoc --go_out=. 
--go_opt=paths=source_relative \ diff --git a/proto/admin.pb.go b/proto/admin.pb.go new file mode 100644 index 00000000..5845642e --- /dev/null +++ b/proto/admin.pb.go @@ -0,0 +1,1529 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.36.11 +// protoc v7.34.0 +// source: admin.proto + +package proto + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" + unsafe "unsafe" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type KeyVizSeries int32 + +const ( + KeyVizSeries_KEYVIZ_SERIES_UNSPECIFIED KeyVizSeries = 0 + KeyVizSeries_KEYVIZ_SERIES_READS KeyVizSeries = 1 + KeyVizSeries_KEYVIZ_SERIES_WRITES KeyVizSeries = 2 + KeyVizSeries_KEYVIZ_SERIES_READ_BYTES KeyVizSeries = 3 + KeyVizSeries_KEYVIZ_SERIES_WRITE_BYTES KeyVizSeries = 4 +) + +// Enum value maps for KeyVizSeries. 
+var ( + KeyVizSeries_name = map[int32]string{ + 0: "KEYVIZ_SERIES_UNSPECIFIED", + 1: "KEYVIZ_SERIES_READS", + 2: "KEYVIZ_SERIES_WRITES", + 3: "KEYVIZ_SERIES_READ_BYTES", + 4: "KEYVIZ_SERIES_WRITE_BYTES", + } + KeyVizSeries_value = map[string]int32{ + "KEYVIZ_SERIES_UNSPECIFIED": 0, + "KEYVIZ_SERIES_READS": 1, + "KEYVIZ_SERIES_WRITES": 2, + "KEYVIZ_SERIES_READ_BYTES": 3, + "KEYVIZ_SERIES_WRITE_BYTES": 4, + } +) + +func (x KeyVizSeries) Enum() *KeyVizSeries { + p := new(KeyVizSeries) + *p = x + return p +} + +func (x KeyVizSeries) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (KeyVizSeries) Descriptor() protoreflect.EnumDescriptor { + return file_admin_proto_enumTypes[0].Descriptor() +} + +func (KeyVizSeries) Type() protoreflect.EnumType { + return &file_admin_proto_enumTypes[0] +} + +func (x KeyVizSeries) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use KeyVizSeries.Descriptor instead. +func (KeyVizSeries) EnumDescriptor() ([]byte, []int) { + return file_admin_proto_rawDescGZIP(), []int{0} +} + +type SampleRole int32 + +const ( + SampleRole_SAMPLE_ROLE_UNSPECIFIED SampleRole = 0 + SampleRole_SAMPLE_ROLE_LEADER_WRITE SampleRole = 1 + SampleRole_SAMPLE_ROLE_LEADER_READ SampleRole = 2 + SampleRole_SAMPLE_ROLE_FOLLOWER_READ SampleRole = 3 +) + +// Enum value maps for SampleRole. 
+var ( + SampleRole_name = map[int32]string{ + 0: "SAMPLE_ROLE_UNSPECIFIED", + 1: "SAMPLE_ROLE_LEADER_WRITE", + 2: "SAMPLE_ROLE_LEADER_READ", + 3: "SAMPLE_ROLE_FOLLOWER_READ", + } + SampleRole_value = map[string]int32{ + "SAMPLE_ROLE_UNSPECIFIED": 0, + "SAMPLE_ROLE_LEADER_WRITE": 1, + "SAMPLE_ROLE_LEADER_READ": 2, + "SAMPLE_ROLE_FOLLOWER_READ": 3, + } +) + +func (x SampleRole) Enum() *SampleRole { + p := new(SampleRole) + *p = x + return p +} + +func (x SampleRole) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (SampleRole) Descriptor() protoreflect.EnumDescriptor { + return file_admin_proto_enumTypes[1].Descriptor() +} + +func (SampleRole) Type() protoreflect.EnumType { + return &file_admin_proto_enumTypes[1] +} + +func (x SampleRole) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use SampleRole.Descriptor instead. +func (SampleRole) EnumDescriptor() ([]byte, []int) { + return file_admin_proto_rawDescGZIP(), []int{1} +} + +type NodeIdentity struct { + state protoimpl.MessageState `protogen:"open.v1"` + NodeId string `protobuf:"bytes,1,opt,name=node_id,json=nodeId,proto3" json:"node_id,omitempty"` + GrpcAddress string `protobuf:"bytes,2,opt,name=grpc_address,json=grpcAddress,proto3" json:"grpc_address,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *NodeIdentity) Reset() { + *x = NodeIdentity{} + mi := &file_admin_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *NodeIdentity) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*NodeIdentity) ProtoMessage() {} + +func (x *NodeIdentity) ProtoReflect() protoreflect.Message { + mi := &file_admin_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return 
mi.MessageOf(x) +} + +// Deprecated: Use NodeIdentity.ProtoReflect.Descriptor instead. +func (*NodeIdentity) Descriptor() ([]byte, []int) { + return file_admin_proto_rawDescGZIP(), []int{0} +} + +func (x *NodeIdentity) GetNodeId() string { + if x != nil { + return x.NodeId + } + return "" +} + +func (x *NodeIdentity) GetGrpcAddress() string { + if x != nil { + return x.GrpcAddress + } + return "" +} + +type GroupLeader struct { + state protoimpl.MessageState `protogen:"open.v1"` + RaftGroupId uint64 `protobuf:"varint,1,opt,name=raft_group_id,json=raftGroupId,proto3" json:"raft_group_id,omitempty"` + LeaderNodeId string `protobuf:"bytes,2,opt,name=leader_node_id,json=leaderNodeId,proto3" json:"leader_node_id,omitempty"` + LeaderTerm uint64 `protobuf:"varint,3,opt,name=leader_term,json=leaderTerm,proto3" json:"leader_term,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *GroupLeader) Reset() { + *x = GroupLeader{} + mi := &file_admin_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *GroupLeader) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GroupLeader) ProtoMessage() {} + +func (x *GroupLeader) ProtoReflect() protoreflect.Message { + mi := &file_admin_proto_msgTypes[1] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GroupLeader.ProtoReflect.Descriptor instead. 
+func (*GroupLeader) Descriptor() ([]byte, []int) { + return file_admin_proto_rawDescGZIP(), []int{1} +} + +func (x *GroupLeader) GetRaftGroupId() uint64 { + if x != nil { + return x.RaftGroupId + } + return 0 +} + +func (x *GroupLeader) GetLeaderNodeId() string { + if x != nil { + return x.LeaderNodeId + } + return "" +} + +func (x *GroupLeader) GetLeaderTerm() uint64 { + if x != nil { + return x.LeaderTerm + } + return 0 +} + +type GetClusterOverviewRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *GetClusterOverviewRequest) Reset() { + *x = GetClusterOverviewRequest{} + mi := &file_admin_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *GetClusterOverviewRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetClusterOverviewRequest) ProtoMessage() {} + +func (x *GetClusterOverviewRequest) ProtoReflect() protoreflect.Message { + mi := &file_admin_proto_msgTypes[2] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetClusterOverviewRequest.ProtoReflect.Descriptor instead. 
+func (*GetClusterOverviewRequest) Descriptor() ([]byte, []int) { + return file_admin_proto_rawDescGZIP(), []int{2} +} + +type GetClusterOverviewResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + Self *NodeIdentity `protobuf:"bytes,1,opt,name=self,proto3" json:"self,omitempty"` + Members []*NodeIdentity `protobuf:"bytes,2,rep,name=members,proto3" json:"members,omitempty"` + GroupLeaders []*GroupLeader `protobuf:"bytes,3,rep,name=group_leaders,json=groupLeaders,proto3" json:"group_leaders,omitempty"` + AggregateQps uint64 `protobuf:"varint,4,opt,name=aggregate_qps,json=aggregateQps,proto3" json:"aggregate_qps,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *GetClusterOverviewResponse) Reset() { + *x = GetClusterOverviewResponse{} + mi := &file_admin_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *GetClusterOverviewResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetClusterOverviewResponse) ProtoMessage() {} + +func (x *GetClusterOverviewResponse) ProtoReflect() protoreflect.Message { + mi := &file_admin_proto_msgTypes[3] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetClusterOverviewResponse.ProtoReflect.Descriptor instead. 
+func (*GetClusterOverviewResponse) Descriptor() ([]byte, []int) { + return file_admin_proto_rawDescGZIP(), []int{3} +} + +func (x *GetClusterOverviewResponse) GetSelf() *NodeIdentity { + if x != nil { + return x.Self + } + return nil +} + +func (x *GetClusterOverviewResponse) GetMembers() []*NodeIdentity { + if x != nil { + return x.Members + } + return nil +} + +func (x *GetClusterOverviewResponse) GetGroupLeaders() []*GroupLeader { + if x != nil { + return x.GroupLeaders + } + return nil +} + +func (x *GetClusterOverviewResponse) GetAggregateQps() uint64 { + if x != nil { + return x.AggregateQps + } + return 0 +} + +type RaftGroupState struct { + state protoimpl.MessageState `protogen:"open.v1"` + RaftGroupId uint64 `protobuf:"varint,1,opt,name=raft_group_id,json=raftGroupId,proto3" json:"raft_group_id,omitempty"` + LeaderNodeId string `protobuf:"bytes,2,opt,name=leader_node_id,json=leaderNodeId,proto3" json:"leader_node_id,omitempty"` + LeaderTerm uint64 `protobuf:"varint,3,opt,name=leader_term,json=leaderTerm,proto3" json:"leader_term,omitempty"` + CommitIndex uint64 `protobuf:"varint,4,opt,name=commit_index,json=commitIndex,proto3" json:"commit_index,omitempty"` + AppliedIndex uint64 `protobuf:"varint,5,opt,name=applied_index,json=appliedIndex,proto3" json:"applied_index,omitempty"` + LastContactUnixMs int64 `protobuf:"varint,6,opt,name=last_contact_unix_ms,json=lastContactUnixMs,proto3" json:"last_contact_unix_ms,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *RaftGroupState) Reset() { + *x = RaftGroupState{} + mi := &file_admin_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *RaftGroupState) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RaftGroupState) ProtoMessage() {} + +func (x *RaftGroupState) ProtoReflect() protoreflect.Message { + mi := &file_admin_proto_msgTypes[4] + if x != nil { + ms := 
protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RaftGroupState.ProtoReflect.Descriptor instead. +func (*RaftGroupState) Descriptor() ([]byte, []int) { + return file_admin_proto_rawDescGZIP(), []int{4} +} + +func (x *RaftGroupState) GetRaftGroupId() uint64 { + if x != nil { + return x.RaftGroupId + } + return 0 +} + +func (x *RaftGroupState) GetLeaderNodeId() string { + if x != nil { + return x.LeaderNodeId + } + return "" +} + +func (x *RaftGroupState) GetLeaderTerm() uint64 { + if x != nil { + return x.LeaderTerm + } + return 0 +} + +func (x *RaftGroupState) GetCommitIndex() uint64 { + if x != nil { + return x.CommitIndex + } + return 0 +} + +func (x *RaftGroupState) GetAppliedIndex() uint64 { + if x != nil { + return x.AppliedIndex + } + return 0 +} + +func (x *RaftGroupState) GetLastContactUnixMs() int64 { + if x != nil { + return x.LastContactUnixMs + } + return 0 +} + +type GetRaftGroupsRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *GetRaftGroupsRequest) Reset() { + *x = GetRaftGroupsRequest{} + mi := &file_admin_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *GetRaftGroupsRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetRaftGroupsRequest) ProtoMessage() {} + +func (x *GetRaftGroupsRequest) ProtoReflect() protoreflect.Message { + mi := &file_admin_proto_msgTypes[5] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetRaftGroupsRequest.ProtoReflect.Descriptor instead. 
+func (*GetRaftGroupsRequest) Descriptor() ([]byte, []int) { + return file_admin_proto_rawDescGZIP(), []int{5} +} + +type GetRaftGroupsResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + Groups []*RaftGroupState `protobuf:"bytes,1,rep,name=groups,proto3" json:"groups,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *GetRaftGroupsResponse) Reset() { + *x = GetRaftGroupsResponse{} + mi := &file_admin_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *GetRaftGroupsResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetRaftGroupsResponse) ProtoMessage() {} + +func (x *GetRaftGroupsResponse) ProtoReflect() protoreflect.Message { + mi := &file_admin_proto_msgTypes[6] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetRaftGroupsResponse.ProtoReflect.Descriptor instead. 
+func (*GetRaftGroupsResponse) Descriptor() ([]byte, []int) { + return file_admin_proto_rawDescGZIP(), []int{6} +} + +func (x *GetRaftGroupsResponse) GetGroups() []*RaftGroupState { + if x != nil { + return x.Groups + } + return nil +} + +type AdapterSummary struct { + state protoimpl.MessageState `protogen:"open.v1"` + Adapter string `protobuf:"bytes,1,opt,name=adapter,proto3" json:"adapter,omitempty"` + Operation string `protobuf:"bytes,2,opt,name=operation,proto3" json:"operation,omitempty"` + Requests uint64 `protobuf:"varint,3,opt,name=requests,proto3" json:"requests,omitempty"` + InFlight uint64 `protobuf:"varint,4,opt,name=in_flight,json=inFlight,proto3" json:"in_flight,omitempty"` + BytesIn uint64 `protobuf:"varint,5,opt,name=bytes_in,json=bytesIn,proto3" json:"bytes_in,omitempty"` + BytesOut uint64 `protobuf:"varint,6,opt,name=bytes_out,json=bytesOut,proto3" json:"bytes_out,omitempty"` + P50Ns float64 `protobuf:"fixed64,7,opt,name=p50_ns,json=p50Ns,proto3" json:"p50_ns,omitempty"` + P95Ns float64 `protobuf:"fixed64,8,opt,name=p95_ns,json=p95Ns,proto3" json:"p95_ns,omitempty"` + P99Ns float64 `protobuf:"fixed64,9,opt,name=p99_ns,json=p99Ns,proto3" json:"p99_ns,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *AdapterSummary) Reset() { + *x = AdapterSummary{} + mi := &file_admin_proto_msgTypes[7] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *AdapterSummary) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*AdapterSummary) ProtoMessage() {} + +func (x *AdapterSummary) ProtoReflect() protoreflect.Message { + mi := &file_admin_proto_msgTypes[7] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use AdapterSummary.ProtoReflect.Descriptor instead. 
+func (*AdapterSummary) Descriptor() ([]byte, []int) { + return file_admin_proto_rawDescGZIP(), []int{7} +} + +func (x *AdapterSummary) GetAdapter() string { + if x != nil { + return x.Adapter + } + return "" +} + +func (x *AdapterSummary) GetOperation() string { + if x != nil { + return x.Operation + } + return "" +} + +func (x *AdapterSummary) GetRequests() uint64 { + if x != nil { + return x.Requests + } + return 0 +} + +func (x *AdapterSummary) GetInFlight() uint64 { + if x != nil { + return x.InFlight + } + return 0 +} + +func (x *AdapterSummary) GetBytesIn() uint64 { + if x != nil { + return x.BytesIn + } + return 0 +} + +func (x *AdapterSummary) GetBytesOut() uint64 { + if x != nil { + return x.BytesOut + } + return 0 +} + +func (x *AdapterSummary) GetP50Ns() float64 { + if x != nil { + return x.P50Ns + } + return 0 +} + +func (x *AdapterSummary) GetP95Ns() float64 { + if x != nil { + return x.P95Ns + } + return 0 +} + +func (x *AdapterSummary) GetP99Ns() float64 { + if x != nil { + return x.P99Ns + } + return 0 +} + +type GetAdapterSummaryRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *GetAdapterSummaryRequest) Reset() { + *x = GetAdapterSummaryRequest{} + mi := &file_admin_proto_msgTypes[8] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *GetAdapterSummaryRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetAdapterSummaryRequest) ProtoMessage() {} + +func (x *GetAdapterSummaryRequest) ProtoReflect() protoreflect.Message { + mi := &file_admin_proto_msgTypes[8] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetAdapterSummaryRequest.ProtoReflect.Descriptor instead. 
+func (*GetAdapterSummaryRequest) Descriptor() ([]byte, []int) { + return file_admin_proto_rawDescGZIP(), []int{8} +} + +type GetAdapterSummaryResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + Summaries []*AdapterSummary `protobuf:"bytes,1,rep,name=summaries,proto3" json:"summaries,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *GetAdapterSummaryResponse) Reset() { + *x = GetAdapterSummaryResponse{} + mi := &file_admin_proto_msgTypes[9] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *GetAdapterSummaryResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetAdapterSummaryResponse) ProtoMessage() {} + +func (x *GetAdapterSummaryResponse) ProtoReflect() protoreflect.Message { + mi := &file_admin_proto_msgTypes[9] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetAdapterSummaryResponse.ProtoReflect.Descriptor instead. +func (*GetAdapterSummaryResponse) Descriptor() ([]byte, []int) { + return file_admin_proto_rawDescGZIP(), []int{9} +} + +func (x *GetAdapterSummaryResponse) GetSummaries() []*AdapterSummary { + if x != nil { + return x.Summaries + } + return nil +} + +type KeyVizRow struct { + state protoimpl.MessageState `protogen:"open.v1"` + // bucket_id is either "route:" or "virtual:". 
+ BucketId string `protobuf:"bytes,1,opt,name=bucket_id,json=bucketId,proto3" json:"bucket_id,omitempty"` + Start []byte `protobuf:"bytes,2,opt,name=start,proto3" json:"start,omitempty"` + End []byte `protobuf:"bytes,3,opt,name=end,proto3" json:"end,omitempty"` + Label string `protobuf:"bytes,4,opt,name=label,proto3" json:"label,omitempty"` + Aggregate bool `protobuf:"varint,5,opt,name=aggregate,proto3" json:"aggregate,omitempty"` + RouteIds []uint64 `protobuf:"varint,6,rep,packed,name=route_ids,json=routeIds,proto3" json:"route_ids,omitempty"` + RouteIdsTruncated bool `protobuf:"varint,7,opt,name=route_ids_truncated,json=routeIdsTruncated,proto3" json:"route_ids_truncated,omitempty"` + RouteCount uint64 `protobuf:"varint,8,opt,name=route_count,json=routeCount,proto3" json:"route_count,omitempty"` + SampleRoles []SampleRole `protobuf:"varint,9,rep,packed,name=sample_roles,json=sampleRoles,proto3,enum=SampleRole" json:"sample_roles,omitempty"` + LineageId string `protobuf:"bytes,10,opt,name=lineage_id,json=lineageId,proto3" json:"lineage_id,omitempty"` + // values[j] is the series value at time column j. + Values []uint64 `protobuf:"varint,11,rep,packed,name=values,proto3" json:"values,omitempty"` + // soft_columns[j] is true when the j-th column missed the estimator SLO. 
+ SoftColumns []bool `protobuf:"varint,12,rep,packed,name=soft_columns,json=softColumns,proto3" json:"soft_columns,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *KeyVizRow) Reset() { + *x = KeyVizRow{} + mi := &file_admin_proto_msgTypes[10] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *KeyVizRow) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*KeyVizRow) ProtoMessage() {} + +func (x *KeyVizRow) ProtoReflect() protoreflect.Message { + mi := &file_admin_proto_msgTypes[10] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use KeyVizRow.ProtoReflect.Descriptor instead. +func (*KeyVizRow) Descriptor() ([]byte, []int) { + return file_admin_proto_rawDescGZIP(), []int{10} +} + +func (x *KeyVizRow) GetBucketId() string { + if x != nil { + return x.BucketId + } + return "" +} + +func (x *KeyVizRow) GetStart() []byte { + if x != nil { + return x.Start + } + return nil +} + +func (x *KeyVizRow) GetEnd() []byte { + if x != nil { + return x.End + } + return nil +} + +func (x *KeyVizRow) GetLabel() string { + if x != nil { + return x.Label + } + return "" +} + +func (x *KeyVizRow) GetAggregate() bool { + if x != nil { + return x.Aggregate + } + return false +} + +func (x *KeyVizRow) GetRouteIds() []uint64 { + if x != nil { + return x.RouteIds + } + return nil +} + +func (x *KeyVizRow) GetRouteIdsTruncated() bool { + if x != nil { + return x.RouteIdsTruncated + } + return false +} + +func (x *KeyVizRow) GetRouteCount() uint64 { + if x != nil { + return x.RouteCount + } + return 0 +} + +func (x *KeyVizRow) GetSampleRoles() []SampleRole { + if x != nil { + return x.SampleRoles + } + return nil +} + +func (x *KeyVizRow) GetLineageId() string { + if x != nil { + return x.LineageId + } + return "" +} + +func (x 
*KeyVizRow) GetValues() []uint64 { + if x != nil { + return x.Values + } + return nil +} + +func (x *KeyVizRow) GetSoftColumns() []bool { + if x != nil { + return x.SoftColumns + } + return nil +} + +type GetKeyVizMatrixRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + Series KeyVizSeries `protobuf:"varint,1,opt,name=series,proto3,enum=KeyVizSeries" json:"series,omitempty"` + FromUnixMs int64 `protobuf:"varint,2,opt,name=from_unix_ms,json=fromUnixMs,proto3" json:"from_unix_ms,omitempty"` + ToUnixMs int64 `protobuf:"varint,3,opt,name=to_unix_ms,json=toUnixMs,proto3" json:"to_unix_ms,omitempty"` + Rows uint32 `protobuf:"varint,4,opt,name=rows,proto3" json:"rows,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *GetKeyVizMatrixRequest) Reset() { + *x = GetKeyVizMatrixRequest{} + mi := &file_admin_proto_msgTypes[11] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *GetKeyVizMatrixRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetKeyVizMatrixRequest) ProtoMessage() {} + +func (x *GetKeyVizMatrixRequest) ProtoReflect() protoreflect.Message { + mi := &file_admin_proto_msgTypes[11] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetKeyVizMatrixRequest.ProtoReflect.Descriptor instead. 
+func (*GetKeyVizMatrixRequest) Descriptor() ([]byte, []int) { + return file_admin_proto_rawDescGZIP(), []int{11} +} + +func (x *GetKeyVizMatrixRequest) GetSeries() KeyVizSeries { + if x != nil { + return x.Series + } + return KeyVizSeries_KEYVIZ_SERIES_UNSPECIFIED +} + +func (x *GetKeyVizMatrixRequest) GetFromUnixMs() int64 { + if x != nil { + return x.FromUnixMs + } + return 0 +} + +func (x *GetKeyVizMatrixRequest) GetToUnixMs() int64 { + if x != nil { + return x.ToUnixMs + } + return 0 +} + +func (x *GetKeyVizMatrixRequest) GetRows() uint32 { + if x != nil { + return x.Rows + } + return 0 +} + +type GetKeyVizMatrixResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + ColumnUnixMs []int64 `protobuf:"varint,1,rep,packed,name=column_unix_ms,json=columnUnixMs,proto3" json:"column_unix_ms,omitempty"` + Rows []*KeyVizRow `protobuf:"bytes,2,rep,name=rows,proto3" json:"rows,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *GetKeyVizMatrixResponse) Reset() { + *x = GetKeyVizMatrixResponse{} + mi := &file_admin_proto_msgTypes[12] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *GetKeyVizMatrixResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetKeyVizMatrixResponse) ProtoMessage() {} + +func (x *GetKeyVizMatrixResponse) ProtoReflect() protoreflect.Message { + mi := &file_admin_proto_msgTypes[12] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetKeyVizMatrixResponse.ProtoReflect.Descriptor instead. 
+func (*GetKeyVizMatrixResponse) Descriptor() ([]byte, []int) { + return file_admin_proto_rawDescGZIP(), []int{12} +} + +func (x *GetKeyVizMatrixResponse) GetColumnUnixMs() []int64 { + if x != nil { + return x.ColumnUnixMs + } + return nil +} + +func (x *GetKeyVizMatrixResponse) GetRows() []*KeyVizRow { + if x != nil { + return x.Rows + } + return nil +} + +type GetRouteDetailRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + // Either a concrete route: or a virtual: emitted in a previous + // GetKeyVizMatrix response. + BucketId string `protobuf:"bytes,1,opt,name=bucket_id,json=bucketId,proto3" json:"bucket_id,omitempty"` + FromUnixMs int64 `protobuf:"varint,2,opt,name=from_unix_ms,json=fromUnixMs,proto3" json:"from_unix_ms,omitempty"` + ToUnixMs int64 `protobuf:"varint,3,opt,name=to_unix_ms,json=toUnixMs,proto3" json:"to_unix_ms,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *GetRouteDetailRequest) Reset() { + *x = GetRouteDetailRequest{} + mi := &file_admin_proto_msgTypes[13] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *GetRouteDetailRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetRouteDetailRequest) ProtoMessage() {} + +func (x *GetRouteDetailRequest) ProtoReflect() protoreflect.Message { + mi := &file_admin_proto_msgTypes[13] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetRouteDetailRequest.ProtoReflect.Descriptor instead. 
+func (*GetRouteDetailRequest) Descriptor() ([]byte, []int) { + return file_admin_proto_rawDescGZIP(), []int{13} +} + +func (x *GetRouteDetailRequest) GetBucketId() string { + if x != nil { + return x.BucketId + } + return "" +} + +func (x *GetRouteDetailRequest) GetFromUnixMs() int64 { + if x != nil { + return x.FromUnixMs + } + return 0 +} + +func (x *GetRouteDetailRequest) GetToUnixMs() int64 { + if x != nil { + return x.ToUnixMs + } + return 0 +} + +type GetRouteDetailResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + Row *KeyVizRow `protobuf:"bytes,1,opt,name=row,proto3" json:"row,omitempty"` + PerAdapter []*AdapterSummary `protobuf:"bytes,2,rep,name=per_adapter,json=perAdapter,proto3" json:"per_adapter,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *GetRouteDetailResponse) Reset() { + *x = GetRouteDetailResponse{} + mi := &file_admin_proto_msgTypes[14] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *GetRouteDetailResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetRouteDetailResponse) ProtoMessage() {} + +func (x *GetRouteDetailResponse) ProtoReflect() protoreflect.Message { + mi := &file_admin_proto_msgTypes[14] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetRouteDetailResponse.ProtoReflect.Descriptor instead. 
+func (*GetRouteDetailResponse) Descriptor() ([]byte, []int) { + return file_admin_proto_rawDescGZIP(), []int{14} +} + +func (x *GetRouteDetailResponse) GetRow() *KeyVizRow { + if x != nil { + return x.Row + } + return nil +} + +func (x *GetRouteDetailResponse) GetPerAdapter() []*AdapterSummary { + if x != nil { + return x.PerAdapter + } + return nil +} + +type StreamEventsRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *StreamEventsRequest) Reset() { + *x = StreamEventsRequest{} + mi := &file_admin_proto_msgTypes[15] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *StreamEventsRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StreamEventsRequest) ProtoMessage() {} + +func (x *StreamEventsRequest) ProtoReflect() protoreflect.Message { + mi := &file_admin_proto_msgTypes[15] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StreamEventsRequest.ProtoReflect.Descriptor instead. 
+func (*StreamEventsRequest) Descriptor() ([]byte, []int) { + return file_admin_proto_rawDescGZIP(), []int{15} +} + +type StreamEventsEvent struct { + state protoimpl.MessageState `protogen:"open.v1"` + // Types that are valid to be assigned to Event: + // + // *StreamEventsEvent_RouteTransition + // *StreamEventsEvent_KeyvizColumn + Event isStreamEventsEvent_Event `protobuf_oneof:"event"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *StreamEventsEvent) Reset() { + *x = StreamEventsEvent{} + mi := &file_admin_proto_msgTypes[16] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *StreamEventsEvent) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StreamEventsEvent) ProtoMessage() {} + +func (x *StreamEventsEvent) ProtoReflect() protoreflect.Message { + mi := &file_admin_proto_msgTypes[16] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StreamEventsEvent.ProtoReflect.Descriptor instead. 
+func (*StreamEventsEvent) Descriptor() ([]byte, []int) { + return file_admin_proto_rawDescGZIP(), []int{16} +} + +func (x *StreamEventsEvent) GetEvent() isStreamEventsEvent_Event { + if x != nil { + return x.Event + } + return nil +} + +func (x *StreamEventsEvent) GetRouteTransition() *RouteTransition { + if x != nil { + if x, ok := x.Event.(*StreamEventsEvent_RouteTransition); ok { + return x.RouteTransition + } + } + return nil +} + +func (x *StreamEventsEvent) GetKeyvizColumn() *KeyVizColumn { + if x != nil { + if x, ok := x.Event.(*StreamEventsEvent_KeyvizColumn); ok { + return x.KeyvizColumn + } + } + return nil +} + +type isStreamEventsEvent_Event interface { + isStreamEventsEvent_Event() +} + +type StreamEventsEvent_RouteTransition struct { + RouteTransition *RouteTransition `protobuf:"bytes,1,opt,name=route_transition,json=routeTransition,proto3,oneof"` +} + +type StreamEventsEvent_KeyvizColumn struct { + KeyvizColumn *KeyVizColumn `protobuf:"bytes,2,opt,name=keyviz_column,json=keyvizColumn,proto3,oneof"` +} + +func (*StreamEventsEvent_RouteTransition) isStreamEventsEvent_Event() {} + +func (*StreamEventsEvent_KeyvizColumn) isStreamEventsEvent_Event() {} + +type RouteTransition struct { + state protoimpl.MessageState `protogen:"open.v1"` + ParentRouteId uint64 `protobuf:"varint,1,opt,name=parent_route_id,json=parentRouteId,proto3" json:"parent_route_id,omitempty"` + ChildRouteIds []uint64 `protobuf:"varint,2,rep,packed,name=child_route_ids,json=childRouteIds,proto3" json:"child_route_ids,omitempty"` + LineageId string `protobuf:"bytes,3,opt,name=lineage_id,json=lineageId,proto3" json:"lineage_id,omitempty"` + UnixMs int64 `protobuf:"varint,4,opt,name=unix_ms,json=unixMs,proto3" json:"unix_ms,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *RouteTransition) Reset() { + *x = RouteTransition{} + mi := &file_admin_proto_msgTypes[17] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + 
ms.StoreMessageInfo(mi) +} + +func (x *RouteTransition) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RouteTransition) ProtoMessage() {} + +func (x *RouteTransition) ProtoReflect() protoreflect.Message { + mi := &file_admin_proto_msgTypes[17] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RouteTransition.ProtoReflect.Descriptor instead. +func (*RouteTransition) Descriptor() ([]byte, []int) { + return file_admin_proto_rawDescGZIP(), []int{17} +} + +func (x *RouteTransition) GetParentRouteId() uint64 { + if x != nil { + return x.ParentRouteId + } + return 0 +} + +func (x *RouteTransition) GetChildRouteIds() []uint64 { + if x != nil { + return x.ChildRouteIds + } + return nil +} + +func (x *RouteTransition) GetLineageId() string { + if x != nil { + return x.LineageId + } + return "" +} + +func (x *RouteTransition) GetUnixMs() int64 { + if x != nil { + return x.UnixMs + } + return 0 +} + +type KeyVizColumn struct { + state protoimpl.MessageState `protogen:"open.v1"` + ColumnUnixMs int64 `protobuf:"varint,1,opt,name=column_unix_ms,json=columnUnixMs,proto3" json:"column_unix_ms,omitempty"` + Series KeyVizSeries `protobuf:"varint,2,opt,name=series,proto3,enum=KeyVizSeries" json:"series,omitempty"` + Rows []*KeyVizRow `protobuf:"bytes,3,rep,name=rows,proto3" json:"rows,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *KeyVizColumn) Reset() { + *x = KeyVizColumn{} + mi := &file_admin_proto_msgTypes[18] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *KeyVizColumn) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*KeyVizColumn) ProtoMessage() {} + +func (x *KeyVizColumn) ProtoReflect() protoreflect.Message { + mi := &file_admin_proto_msgTypes[18] + if x != nil { + ms := 
protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use KeyVizColumn.ProtoReflect.Descriptor instead. +func (*KeyVizColumn) Descriptor() ([]byte, []int) { + return file_admin_proto_rawDescGZIP(), []int{18} +} + +func (x *KeyVizColumn) GetColumnUnixMs() int64 { + if x != nil { + return x.ColumnUnixMs + } + return 0 +} + +func (x *KeyVizColumn) GetSeries() KeyVizSeries { + if x != nil { + return x.Series + } + return KeyVizSeries_KEYVIZ_SERIES_UNSPECIFIED +} + +func (x *KeyVizColumn) GetRows() []*KeyVizRow { + if x != nil { + return x.Rows + } + return nil +} + +var File_admin_proto protoreflect.FileDescriptor + +const file_admin_proto_rawDesc = "" + + "\n" + + "\vadmin.proto\"J\n" + + "\fNodeIdentity\x12\x17\n" + + "\anode_id\x18\x01 \x01(\tR\x06nodeId\x12!\n" + + "\fgrpc_address\x18\x02 \x01(\tR\vgrpcAddress\"x\n" + + "\vGroupLeader\x12\"\n" + + "\rraft_group_id\x18\x01 \x01(\x04R\vraftGroupId\x12$\n" + + "\x0eleader_node_id\x18\x02 \x01(\tR\fleaderNodeId\x12\x1f\n" + + "\vleader_term\x18\x03 \x01(\x04R\n" + + "leaderTerm\"\x1b\n" + + "\x19GetClusterOverviewRequest\"\xc0\x01\n" + + "\x1aGetClusterOverviewResponse\x12!\n" + + "\x04self\x18\x01 \x01(\v2\r.NodeIdentityR\x04self\x12'\n" + + "\amembers\x18\x02 \x03(\v2\r.NodeIdentityR\amembers\x121\n" + + "\rgroup_leaders\x18\x03 \x03(\v2\f.GroupLeaderR\fgroupLeaders\x12#\n" + + "\raggregate_qps\x18\x04 \x01(\x04R\faggregateQps\"\xf4\x01\n" + + "\x0eRaftGroupState\x12\"\n" + + "\rraft_group_id\x18\x01 \x01(\x04R\vraftGroupId\x12$\n" + + "\x0eleader_node_id\x18\x02 \x01(\tR\fleaderNodeId\x12\x1f\n" + + "\vleader_term\x18\x03 \x01(\x04R\n" + + "leaderTerm\x12!\n" + + "\fcommit_index\x18\x04 \x01(\x04R\vcommitIndex\x12#\n" + + "\rapplied_index\x18\x05 \x01(\x04R\fappliedIndex\x12/\n" + + "\x14last_contact_unix_ms\x18\x06 \x01(\x03R\x11lastContactUnixMs\"\x16\n" + + "\x14GetRaftGroupsRequest\"@\n" + + 
"\x15GetRaftGroupsResponse\x12'\n" + + "\x06groups\x18\x01 \x03(\v2\x0f.RaftGroupStateR\x06groups\"\xfe\x01\n" + + "\x0eAdapterSummary\x12\x18\n" + + "\aadapter\x18\x01 \x01(\tR\aadapter\x12\x1c\n" + + "\toperation\x18\x02 \x01(\tR\toperation\x12\x1a\n" + + "\brequests\x18\x03 \x01(\x04R\brequests\x12\x1b\n" + + "\tin_flight\x18\x04 \x01(\x04R\binFlight\x12\x19\n" + + "\bbytes_in\x18\x05 \x01(\x04R\abytesIn\x12\x1b\n" + + "\tbytes_out\x18\x06 \x01(\x04R\bbytesOut\x12\x15\n" + + "\x06p50_ns\x18\a \x01(\x01R\x05p50Ns\x12\x15\n" + + "\x06p95_ns\x18\b \x01(\x01R\x05p95Ns\x12\x15\n" + + "\x06p99_ns\x18\t \x01(\x01R\x05p99Ns\"\x1a\n" + + "\x18GetAdapterSummaryRequest\"J\n" + + "\x19GetAdapterSummaryResponse\x12-\n" + + "\tsummaries\x18\x01 \x03(\v2\x0f.AdapterSummaryR\tsummaries\"\xfc\x02\n" + + "\tKeyVizRow\x12\x1b\n" + + "\tbucket_id\x18\x01 \x01(\tR\bbucketId\x12\x14\n" + + "\x05start\x18\x02 \x01(\fR\x05start\x12\x10\n" + + "\x03end\x18\x03 \x01(\fR\x03end\x12\x14\n" + + "\x05label\x18\x04 \x01(\tR\x05label\x12\x1c\n" + + "\taggregate\x18\x05 \x01(\bR\taggregate\x12\x1b\n" + + "\troute_ids\x18\x06 \x03(\x04R\brouteIds\x12.\n" + + "\x13route_ids_truncated\x18\a \x01(\bR\x11routeIdsTruncated\x12\x1f\n" + + "\vroute_count\x18\b \x01(\x04R\n" + + "routeCount\x12.\n" + + "\fsample_roles\x18\t \x03(\x0e2\v.SampleRoleR\vsampleRoles\x12\x1d\n" + + "\n" + + "lineage_id\x18\n" + + " \x01(\tR\tlineageId\x12\x16\n" + + "\x06values\x18\v \x03(\x04R\x06values\x12!\n" + + "\fsoft_columns\x18\f \x03(\bR\vsoftColumns\"\x93\x01\n" + + "\x16GetKeyVizMatrixRequest\x12%\n" + + "\x06series\x18\x01 \x01(\x0e2\r.KeyVizSeriesR\x06series\x12 \n" + + "\ffrom_unix_ms\x18\x02 \x01(\x03R\n" + + "fromUnixMs\x12\x1c\n" + + "\n" + + "to_unix_ms\x18\x03 \x01(\x03R\btoUnixMs\x12\x12\n" + + "\x04rows\x18\x04 \x01(\rR\x04rows\"_\n" + + "\x17GetKeyVizMatrixResponse\x12$\n" + + "\x0ecolumn_unix_ms\x18\x01 \x03(\x03R\fcolumnUnixMs\x12\x1e\n" + + "\x04rows\x18\x02 \x03(\v2\n" + + ".KeyVizRowR\x04rows\"t\n" 
+ + "\x15GetRouteDetailRequest\x12\x1b\n" + + "\tbucket_id\x18\x01 \x01(\tR\bbucketId\x12 \n" + + "\ffrom_unix_ms\x18\x02 \x01(\x03R\n" + + "fromUnixMs\x12\x1c\n" + + "\n" + + "to_unix_ms\x18\x03 \x01(\x03R\btoUnixMs\"h\n" + + "\x16GetRouteDetailResponse\x12\x1c\n" + + "\x03row\x18\x01 \x01(\v2\n" + + ".KeyVizRowR\x03row\x120\n" + + "\vper_adapter\x18\x02 \x03(\v2\x0f.AdapterSummaryR\n" + + "perAdapter\"\x15\n" + + "\x13StreamEventsRequest\"\x91\x01\n" + + "\x11StreamEventsEvent\x12=\n" + + "\x10route_transition\x18\x01 \x01(\v2\x10.RouteTransitionH\x00R\x0frouteTransition\x124\n" + + "\rkeyviz_column\x18\x02 \x01(\v2\r.KeyVizColumnH\x00R\fkeyvizColumnB\a\n" + + "\x05event\"\x99\x01\n" + + "\x0fRouteTransition\x12&\n" + + "\x0fparent_route_id\x18\x01 \x01(\x04R\rparentRouteId\x12&\n" + + "\x0fchild_route_ids\x18\x02 \x03(\x04R\rchildRouteIds\x12\x1d\n" + + "\n" + + "lineage_id\x18\x03 \x01(\tR\tlineageId\x12\x17\n" + + "\aunix_ms\x18\x04 \x01(\x03R\x06unixMs\"{\n" + + "\fKeyVizColumn\x12$\n" + + "\x0ecolumn_unix_ms\x18\x01 \x01(\x03R\fcolumnUnixMs\x12%\n" + + "\x06series\x18\x02 \x01(\x0e2\r.KeyVizSeriesR\x06series\x12\x1e\n" + + "\x04rows\x18\x03 \x03(\v2\n" + + ".KeyVizRowR\x04rows*\x9d\x01\n" + + "\fKeyVizSeries\x12\x1d\n" + + "\x19KEYVIZ_SERIES_UNSPECIFIED\x10\x00\x12\x17\n" + + "\x13KEYVIZ_SERIES_READS\x10\x01\x12\x18\n" + + "\x14KEYVIZ_SERIES_WRITES\x10\x02\x12\x1c\n" + + "\x18KEYVIZ_SERIES_READ_BYTES\x10\x03\x12\x1d\n" + + "\x19KEYVIZ_SERIES_WRITE_BYTES\x10\x04*\x83\x01\n" + + "\n" + + "SampleRole\x12\x1b\n" + + "\x17SAMPLE_ROLE_UNSPECIFIED\x10\x00\x12\x1c\n" + + "\x18SAMPLE_ROLE_LEADER_WRITE\x10\x01\x12\x1b\n" + + "\x17SAMPLE_ROLE_LEADER_READ\x10\x02\x12\x1d\n" + + "\x19SAMPLE_ROLE_FOLLOWER_READ\x10\x032\xb3\x03\n" + + "\x05Admin\x12O\n" + + "\x12GetClusterOverview\x12\x1a.GetClusterOverviewRequest\x1a\x1b.GetClusterOverviewResponse\"\x00\x12@\n" + + "\rGetRaftGroups\x12\x15.GetRaftGroupsRequest\x1a\x16.GetRaftGroupsResponse\"\x00\x12L\n" + + 
"\x11GetAdapterSummary\x12\x19.GetAdapterSummaryRequest\x1a\x1a.GetAdapterSummaryResponse\"\x00\x12F\n" + + "\x0fGetKeyVizMatrix\x12\x17.GetKeyVizMatrixRequest\x1a\x18.GetKeyVizMatrixResponse\"\x00\x12C\n" + + "\x0eGetRouteDetail\x12\x16.GetRouteDetailRequest\x1a\x17.GetRouteDetailResponse\"\x00\x12<\n" + + "\fStreamEvents\x12\x14.StreamEventsRequest\x1a\x12.StreamEventsEvent\"\x000\x01B#Z!github.com/bootjp/elastickv/protob\x06proto3" + +var ( + file_admin_proto_rawDescOnce sync.Once + file_admin_proto_rawDescData []byte +) + +func file_admin_proto_rawDescGZIP() []byte { + file_admin_proto_rawDescOnce.Do(func() { + file_admin_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_admin_proto_rawDesc), len(file_admin_proto_rawDesc))) + }) + return file_admin_proto_rawDescData +} + +var file_admin_proto_enumTypes = make([]protoimpl.EnumInfo, 2) +var file_admin_proto_msgTypes = make([]protoimpl.MessageInfo, 19) +var file_admin_proto_goTypes = []any{ + (KeyVizSeries)(0), // 0: KeyVizSeries + (SampleRole)(0), // 1: SampleRole + (*NodeIdentity)(nil), // 2: NodeIdentity + (*GroupLeader)(nil), // 3: GroupLeader + (*GetClusterOverviewRequest)(nil), // 4: GetClusterOverviewRequest + (*GetClusterOverviewResponse)(nil), // 5: GetClusterOverviewResponse + (*RaftGroupState)(nil), // 6: RaftGroupState + (*GetRaftGroupsRequest)(nil), // 7: GetRaftGroupsRequest + (*GetRaftGroupsResponse)(nil), // 8: GetRaftGroupsResponse + (*AdapterSummary)(nil), // 9: AdapterSummary + (*GetAdapterSummaryRequest)(nil), // 10: GetAdapterSummaryRequest + (*GetAdapterSummaryResponse)(nil), // 11: GetAdapterSummaryResponse + (*KeyVizRow)(nil), // 12: KeyVizRow + (*GetKeyVizMatrixRequest)(nil), // 13: GetKeyVizMatrixRequest + (*GetKeyVizMatrixResponse)(nil), // 14: GetKeyVizMatrixResponse + (*GetRouteDetailRequest)(nil), // 15: GetRouteDetailRequest + (*GetRouteDetailResponse)(nil), // 16: GetRouteDetailResponse + (*StreamEventsRequest)(nil), // 17: StreamEventsRequest + 
(*StreamEventsEvent)(nil), // 18: StreamEventsEvent + (*RouteTransition)(nil), // 19: RouteTransition + (*KeyVizColumn)(nil), // 20: KeyVizColumn +} +var file_admin_proto_depIdxs = []int32{ + 2, // 0: GetClusterOverviewResponse.self:type_name -> NodeIdentity + 2, // 1: GetClusterOverviewResponse.members:type_name -> NodeIdentity + 3, // 2: GetClusterOverviewResponse.group_leaders:type_name -> GroupLeader + 6, // 3: GetRaftGroupsResponse.groups:type_name -> RaftGroupState + 9, // 4: GetAdapterSummaryResponse.summaries:type_name -> AdapterSummary + 1, // 5: KeyVizRow.sample_roles:type_name -> SampleRole + 0, // 6: GetKeyVizMatrixRequest.series:type_name -> KeyVizSeries + 12, // 7: GetKeyVizMatrixResponse.rows:type_name -> KeyVizRow + 12, // 8: GetRouteDetailResponse.row:type_name -> KeyVizRow + 9, // 9: GetRouteDetailResponse.per_adapter:type_name -> AdapterSummary + 19, // 10: StreamEventsEvent.route_transition:type_name -> RouteTransition + 20, // 11: StreamEventsEvent.keyviz_column:type_name -> KeyVizColumn + 0, // 12: KeyVizColumn.series:type_name -> KeyVizSeries + 12, // 13: KeyVizColumn.rows:type_name -> KeyVizRow + 4, // 14: Admin.GetClusterOverview:input_type -> GetClusterOverviewRequest + 7, // 15: Admin.GetRaftGroups:input_type -> GetRaftGroupsRequest + 10, // 16: Admin.GetAdapterSummary:input_type -> GetAdapterSummaryRequest + 13, // 17: Admin.GetKeyVizMatrix:input_type -> GetKeyVizMatrixRequest + 15, // 18: Admin.GetRouteDetail:input_type -> GetRouteDetailRequest + 17, // 19: Admin.StreamEvents:input_type -> StreamEventsRequest + 5, // 20: Admin.GetClusterOverview:output_type -> GetClusterOverviewResponse + 8, // 21: Admin.GetRaftGroups:output_type -> GetRaftGroupsResponse + 11, // 22: Admin.GetAdapterSummary:output_type -> GetAdapterSummaryResponse + 14, // 23: Admin.GetKeyVizMatrix:output_type -> GetKeyVizMatrixResponse + 16, // 24: Admin.GetRouteDetail:output_type -> GetRouteDetailResponse + 18, // 25: Admin.StreamEvents:output_type -> 
StreamEventsEvent + 20, // [20:26] is the sub-list for method output_type + 14, // [14:20] is the sub-list for method input_type + 14, // [14:14] is the sub-list for extension type_name + 14, // [14:14] is the sub-list for extension extendee + 0, // [0:14] is the sub-list for field type_name +} + +func init() { file_admin_proto_init() } +func file_admin_proto_init() { + if File_admin_proto != nil { + return + } + file_admin_proto_msgTypes[16].OneofWrappers = []any{ + (*StreamEventsEvent_RouteTransition)(nil), + (*StreamEventsEvent_KeyvizColumn)(nil), + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: unsafe.Slice(unsafe.StringData(file_admin_proto_rawDesc), len(file_admin_proto_rawDesc)), + NumEnums: 2, + NumMessages: 19, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_admin_proto_goTypes, + DependencyIndexes: file_admin_proto_depIdxs, + EnumInfos: file_admin_proto_enumTypes, + MessageInfos: file_admin_proto_msgTypes, + }.Build() + File_admin_proto = out.File + file_admin_proto_goTypes = nil + file_admin_proto_depIdxs = nil +} diff --git a/proto/admin.proto b/proto/admin.proto new file mode 100644 index 00000000..3b476166 --- /dev/null +++ b/proto/admin.proto @@ -0,0 +1,149 @@ +syntax = "proto3"; + +option go_package = "github.com/bootjp/elastickv/proto"; + +// Admin is the node-side read-only admin gRPC service consumed by +// cmd/elastickv-admin. Every method requires "authorization: Bearer " +// metadata unless the node was started with --adminInsecureNoAuth. +// See docs/admin_ui_key_visualizer_design.md §4 (Layer A). 
+service Admin { + rpc GetClusterOverview (GetClusterOverviewRequest) returns (GetClusterOverviewResponse) {} + rpc GetRaftGroups (GetRaftGroupsRequest) returns (GetRaftGroupsResponse) {} + rpc GetAdapterSummary (GetAdapterSummaryRequest) returns (GetAdapterSummaryResponse) {} + rpc GetKeyVizMatrix (GetKeyVizMatrixRequest) returns (GetKeyVizMatrixResponse) {} + rpc GetRouteDetail (GetRouteDetailRequest) returns (GetRouteDetailResponse) {} + rpc StreamEvents (StreamEventsRequest) returns (stream StreamEventsEvent) {} +} + +message NodeIdentity { + string node_id = 1; + string grpc_address = 2; +} + +message GroupLeader { + uint64 raft_group_id = 1; + string leader_node_id = 2; + uint64 leader_term = 3; +} + +message GetClusterOverviewRequest {} + +message GetClusterOverviewResponse { + NodeIdentity self = 1; + repeated NodeIdentity members = 2; + repeated GroupLeader group_leaders = 3; + uint64 aggregate_qps = 4; +} + +message RaftGroupState { + uint64 raft_group_id = 1; + string leader_node_id = 2; + uint64 leader_term = 3; + uint64 commit_index = 4; + uint64 applied_index = 5; + int64 last_contact_unix_ms = 6; +} + +message GetRaftGroupsRequest {} + +message GetRaftGroupsResponse { + repeated RaftGroupState groups = 1; +} + +message AdapterSummary { + string adapter = 1; + string operation = 2; + uint64 requests = 3; + uint64 in_flight = 4; + uint64 bytes_in = 5; + uint64 bytes_out = 6; + double p50_ns = 7; + double p95_ns = 8; + double p99_ns = 9; +} + +message GetAdapterSummaryRequest {} + +message GetAdapterSummaryResponse { + repeated AdapterSummary summaries = 1; +} + +enum KeyVizSeries { + KEYVIZ_SERIES_UNSPECIFIED = 0; + KEYVIZ_SERIES_READS = 1; + KEYVIZ_SERIES_WRITES = 2; + KEYVIZ_SERIES_READ_BYTES = 3; + KEYVIZ_SERIES_WRITE_BYTES = 4; +} + +enum SampleRole { + SAMPLE_ROLE_UNSPECIFIED = 0; + SAMPLE_ROLE_LEADER_WRITE = 1; + SAMPLE_ROLE_LEADER_READ = 2; + SAMPLE_ROLE_FOLLOWER_READ = 3; +} + +message KeyVizRow { + // bucket_id is either "route:" or 
"virtual:". + string bucket_id = 1; + bytes start = 2; + bytes end = 3; + string label = 4; + bool aggregate = 5; + repeated uint64 route_ids = 6; + bool route_ids_truncated = 7; + uint64 route_count = 8; + repeated SampleRole sample_roles = 9; + string lineage_id = 10; + // values[j] is the series value at time column j. + repeated uint64 values = 11; + // soft_columns[j] is true when the j-th column missed the estimator SLO. + repeated bool soft_columns = 12; +} + +message GetKeyVizMatrixRequest { + KeyVizSeries series = 1; + int64 from_unix_ms = 2; + int64 to_unix_ms = 3; + uint32 rows = 4; +} + +message GetKeyVizMatrixResponse { + repeated int64 column_unix_ms = 1; + repeated KeyVizRow rows = 2; +} + +message GetRouteDetailRequest { + // Either a concrete route: or a virtual: emitted in a previous + // GetKeyVizMatrix response. + string bucket_id = 1; + int64 from_unix_ms = 2; + int64 to_unix_ms = 3; +} + +message GetRouteDetailResponse { + KeyVizRow row = 1; + repeated AdapterSummary per_adapter = 2; +} + +message StreamEventsRequest {} + +message StreamEventsEvent { + oneof event { + RouteTransition route_transition = 1; + KeyVizColumn keyviz_column = 2; + } +} + +message RouteTransition { + uint64 parent_route_id = 1; + repeated uint64 child_route_ids = 2; + string lineage_id = 3; + int64 unix_ms = 4; +} + +message KeyVizColumn { + int64 column_unix_ms = 1; + KeyVizSeries series = 2; + repeated KeyVizRow rows = 3; +} diff --git a/proto/admin_grpc.pb.go b/proto/admin_grpc.pb.go new file mode 100644 index 00000000..379805d3 --- /dev/null +++ b/proto/admin_grpc.pb.go @@ -0,0 +1,325 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. 
+// versions: +// - protoc-gen-go-grpc v1.6.1 +// - protoc v7.34.0 +// source: admin.proto + +package proto + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.64.0 or later. +const _ = grpc.SupportPackageIsVersion9 + +const ( + Admin_GetClusterOverview_FullMethodName = "/Admin/GetClusterOverview" + Admin_GetRaftGroups_FullMethodName = "/Admin/GetRaftGroups" + Admin_GetAdapterSummary_FullMethodName = "/Admin/GetAdapterSummary" + Admin_GetKeyVizMatrix_FullMethodName = "/Admin/GetKeyVizMatrix" + Admin_GetRouteDetail_FullMethodName = "/Admin/GetRouteDetail" + Admin_StreamEvents_FullMethodName = "/Admin/StreamEvents" +) + +// AdminClient is the client API for Admin service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +// +// Admin is the node-side read-only admin gRPC service consumed by +// cmd/elastickv-admin. Every method requires "authorization: Bearer " +// metadata unless the node was started with --adminInsecureNoAuth. +// See docs/admin_ui_key_visualizer_design.md §4 (Layer A). 
+type AdminClient interface { + GetClusterOverview(ctx context.Context, in *GetClusterOverviewRequest, opts ...grpc.CallOption) (*GetClusterOverviewResponse, error) + GetRaftGroups(ctx context.Context, in *GetRaftGroupsRequest, opts ...grpc.CallOption) (*GetRaftGroupsResponse, error) + GetAdapterSummary(ctx context.Context, in *GetAdapterSummaryRequest, opts ...grpc.CallOption) (*GetAdapterSummaryResponse, error) + GetKeyVizMatrix(ctx context.Context, in *GetKeyVizMatrixRequest, opts ...grpc.CallOption) (*GetKeyVizMatrixResponse, error) + GetRouteDetail(ctx context.Context, in *GetRouteDetailRequest, opts ...grpc.CallOption) (*GetRouteDetailResponse, error) + StreamEvents(ctx context.Context, in *StreamEventsRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[StreamEventsEvent], error) +} + +type adminClient struct { + cc grpc.ClientConnInterface +} + +func NewAdminClient(cc grpc.ClientConnInterface) AdminClient { + return &adminClient{cc} +} + +func (c *adminClient) GetClusterOverview(ctx context.Context, in *GetClusterOverviewRequest, opts ...grpc.CallOption) (*GetClusterOverviewResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(GetClusterOverviewResponse) + err := c.cc.Invoke(ctx, Admin_GetClusterOverview_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *adminClient) GetRaftGroups(ctx context.Context, in *GetRaftGroupsRequest, opts ...grpc.CallOption) (*GetRaftGroupsResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(GetRaftGroupsResponse) + err := c.cc.Invoke(ctx, Admin_GetRaftGroups_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *adminClient) GetAdapterSummary(ctx context.Context, in *GetAdapterSummaryRequest, opts ...grpc.CallOption) (*GetAdapterSummaryResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) 
+ out := new(GetAdapterSummaryResponse) + err := c.cc.Invoke(ctx, Admin_GetAdapterSummary_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *adminClient) GetKeyVizMatrix(ctx context.Context, in *GetKeyVizMatrixRequest, opts ...grpc.CallOption) (*GetKeyVizMatrixResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(GetKeyVizMatrixResponse) + err := c.cc.Invoke(ctx, Admin_GetKeyVizMatrix_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *adminClient) GetRouteDetail(ctx context.Context, in *GetRouteDetailRequest, opts ...grpc.CallOption) (*GetRouteDetailResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(GetRouteDetailResponse) + err := c.cc.Invoke(ctx, Admin_GetRouteDetail_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *adminClient) StreamEvents(ctx context.Context, in *StreamEventsRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[StreamEventsEvent], error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + stream, err := c.cc.NewStream(ctx, &Admin_ServiceDesc.Streams[0], Admin_StreamEvents_FullMethodName, cOpts...) + if err != nil { + return nil, err + } + x := &grpc.GenericClientStream[StreamEventsRequest, StreamEventsEvent]{ClientStream: stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type Admin_StreamEventsClient = grpc.ServerStreamingClient[StreamEventsEvent] + +// AdminServer is the server API for Admin service. +// All implementations must embed UnimplementedAdminServer +// for forward compatibility. 
+// +// Admin is the node-side read-only admin gRPC service consumed by +// cmd/elastickv-admin. Every method requires "authorization: Bearer " +// metadata unless the node was started with --adminInsecureNoAuth. +// See docs/admin_ui_key_visualizer_design.md §4 (Layer A). +type AdminServer interface { + GetClusterOverview(context.Context, *GetClusterOverviewRequest) (*GetClusterOverviewResponse, error) + GetRaftGroups(context.Context, *GetRaftGroupsRequest) (*GetRaftGroupsResponse, error) + GetAdapterSummary(context.Context, *GetAdapterSummaryRequest) (*GetAdapterSummaryResponse, error) + GetKeyVizMatrix(context.Context, *GetKeyVizMatrixRequest) (*GetKeyVizMatrixResponse, error) + GetRouteDetail(context.Context, *GetRouteDetailRequest) (*GetRouteDetailResponse, error) + StreamEvents(*StreamEventsRequest, grpc.ServerStreamingServer[StreamEventsEvent]) error + mustEmbedUnimplementedAdminServer() +} + +// UnimplementedAdminServer must be embedded to have +// forward compatible implementations. +// +// NOTE: this should be embedded by value instead of pointer to avoid a nil +// pointer dereference when methods are called. 
+type UnimplementedAdminServer struct{} + +func (UnimplementedAdminServer) GetClusterOverview(context.Context, *GetClusterOverviewRequest) (*GetClusterOverviewResponse, error) { + return nil, status.Error(codes.Unimplemented, "method GetClusterOverview not implemented") +} +func (UnimplementedAdminServer) GetRaftGroups(context.Context, *GetRaftGroupsRequest) (*GetRaftGroupsResponse, error) { + return nil, status.Error(codes.Unimplemented, "method GetRaftGroups not implemented") +} +func (UnimplementedAdminServer) GetAdapterSummary(context.Context, *GetAdapterSummaryRequest) (*GetAdapterSummaryResponse, error) { + return nil, status.Error(codes.Unimplemented, "method GetAdapterSummary not implemented") +} +func (UnimplementedAdminServer) GetKeyVizMatrix(context.Context, *GetKeyVizMatrixRequest) (*GetKeyVizMatrixResponse, error) { + return nil, status.Error(codes.Unimplemented, "method GetKeyVizMatrix not implemented") +} +func (UnimplementedAdminServer) GetRouteDetail(context.Context, *GetRouteDetailRequest) (*GetRouteDetailResponse, error) { + return nil, status.Error(codes.Unimplemented, "method GetRouteDetail not implemented") +} +func (UnimplementedAdminServer) StreamEvents(*StreamEventsRequest, grpc.ServerStreamingServer[StreamEventsEvent]) error { + return status.Error(codes.Unimplemented, "method StreamEvents not implemented") +} +func (UnimplementedAdminServer) mustEmbedUnimplementedAdminServer() {} +func (UnimplementedAdminServer) testEmbeddedByValue() {} + +// UnsafeAdminServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to AdminServer will +// result in compilation errors. +type UnsafeAdminServer interface { + mustEmbedUnimplementedAdminServer() +} + +func RegisterAdminServer(s grpc.ServiceRegistrar, srv AdminServer) { + // If the following call panics, it indicates UnimplementedAdminServer was + // embedded by pointer and is nil. 
This will cause panics if an + // unimplemented method is ever invoked, so we test this at initialization + // time to prevent it from happening at runtime later due to I/O. + if t, ok := srv.(interface{ testEmbeddedByValue() }); ok { + t.testEmbeddedByValue() + } + s.RegisterService(&Admin_ServiceDesc, srv) +} + +func _Admin_GetClusterOverview_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetClusterOverviewRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(AdminServer).GetClusterOverview(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: Admin_GetClusterOverview_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(AdminServer).GetClusterOverview(ctx, req.(*GetClusterOverviewRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Admin_GetRaftGroups_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetRaftGroupsRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(AdminServer).GetRaftGroups(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: Admin_GetRaftGroups_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(AdminServer).GetRaftGroups(ctx, req.(*GetRaftGroupsRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Admin_GetAdapterSummary_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetAdapterSummaryRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(AdminServer).GetAdapterSummary(ctx, in) + } + info := 
&grpc.UnaryServerInfo{ + Server: srv, + FullMethod: Admin_GetAdapterSummary_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(AdminServer).GetAdapterSummary(ctx, req.(*GetAdapterSummaryRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Admin_GetKeyVizMatrix_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetKeyVizMatrixRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(AdminServer).GetKeyVizMatrix(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: Admin_GetKeyVizMatrix_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(AdminServer).GetKeyVizMatrix(ctx, req.(*GetKeyVizMatrixRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Admin_GetRouteDetail_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetRouteDetailRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(AdminServer).GetRouteDetail(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: Admin_GetRouteDetail_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(AdminServer).GetRouteDetail(ctx, req.(*GetRouteDetailRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Admin_StreamEvents_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(StreamEventsRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(AdminServer).StreamEvents(m, &grpc.GenericServerStream[StreamEventsRequest, StreamEventsEvent]{ServerStream: stream}) +} + +// This type alias is provided for backwards compatibility 
with existing code that references the prior non-generic stream type by name. +type Admin_StreamEventsServer = grpc.ServerStreamingServer[StreamEventsEvent] + +// Admin_ServiceDesc is the grpc.ServiceDesc for Admin service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var Admin_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "Admin", + HandlerType: (*AdminServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "GetClusterOverview", + Handler: _Admin_GetClusterOverview_Handler, + }, + { + MethodName: "GetRaftGroups", + Handler: _Admin_GetRaftGroups_Handler, + }, + { + MethodName: "GetAdapterSummary", + Handler: _Admin_GetAdapterSummary_Handler, + }, + { + MethodName: "GetKeyVizMatrix", + Handler: _Admin_GetKeyVizMatrix_Handler, + }, + { + MethodName: "GetRouteDetail", + Handler: _Admin_GetRouteDetail_Handler, + }, + }, + Streams: []grpc.StreamDesc{ + { + StreamName: "StreamEvents", + Handler: _Admin_StreamEvents_Handler, + ServerStreams: true, + }, + }, + Metadata: "admin.proto", +}