Emit node job info platform events#22175
Conversation
|
I see you updated files related to
|
|
✅ No conflicts with other open PRs targeting |
6317f74 to
7cc621c
Compare
There was a problem hiding this comment.
Pull request overview
Risk Rating: MEDIUM — introduces a new always-on background service that periodically queries the jobs table and emits telemetry events.
Updates node-platform telemetry emission to support the new NodeJobInfo.submitter_addresses proto shape and exposes a new NodePlatformJobInfo health check alongside existing node-platform build info emission.
Changes:
- Add
NodePlatformJobInfoServiceto emitcommon.v1.NodeJobInfoplatform events with source-aware submitter address entries derived from job specs and pipeline ETHTx tasks. - Wire the new service into
NewApplicationand update health endpoint golden test fixtures to includeNodePlatformJobInfo. - Bump
chainlink-protos/node-platformdependency (root + nested Go modules) to the commit that contains the new proto shape.
Areas requiring scrupulous human review:
core/services/chainlink/node_platform.go: the job enumeration strategy insidesubmitterAddresses()(DB load / runtime impact) and the address extraction logic from job/pipeline fields.
Reviewed changes
Copilot reviewed 16 out of 23 changed files in this pull request and generated 1 comment.
Show a summary per file
| File | Description |
|---|---|
| testdata/scripts/health/multi-chain.txtar | Updates golden health output to include NodePlatformJobInfo. |
| testdata/scripts/health/multi-chain-loopp.txtar | Updates golden health output to include NodePlatformJobInfo. |
| testdata/scripts/health/default.txtar | Updates golden health output to include NodePlatformJobInfo. |
| core/web/testdata/body/health.txt | Updates plain-text health fixture with NodePlatformJobInfo. |
| core/web/testdata/body/health.json | Updates JSON health fixture with NodePlatformJobInfo check block. |
| core/web/testdata/body/health.html | Updates HTML health fixture with NodePlatformJobInfo section. |
| core/services/chainlink/node_platform.go | Implements NodePlatformJobInfoService + submitter-address extraction helpers; renames build-info entity const. |
| core/services/chainlink/node_platform_test.go | Adds unit test covering submitter address extraction/emission behavior. |
| core/services/chainlink/application.go | Wires NodePlatformJobInfoService into application service list. |
| go.mod | Bumps chainlink-protos/node-platform version. |
| go.sum | Updates checksums for new chainlink-protos/node-platform version. |
| deployment/go.mod | Bumps chainlink-protos/node-platform (indirect). |
| deployment/go.sum | Updates checksums for dependency bump. |
| core/scripts/go.mod | Bumps chainlink-protos/node-platform (indirect). |
| core/scripts/go.sum | Updates checksums for dependency bump. |
| integration-tests/go.mod | Bumps chainlink-protos/node-platform (indirect). |
| integration-tests/go.sum | Updates checksums for dependency bump. |
| integration-tests/load/go.mod | Bumps chainlink-protos/node-platform (indirect). |
| integration-tests/load/go.sum | Updates checksums for dependency bump. |
| system-tests/lib/go.mod | Bumps chainlink-protos/node-platform (indirect). |
| system-tests/lib/go.sum | Updates checksums for dependency bump. |
| system-tests/tests/go.mod | Bumps chainlink-protos/node-platform (indirect). |
| system-tests/tests/go.sum | Updates checksums for dependency bump. |
| func (s *NodePlatformJobInfoService) submitterAddresses(ctx context.Context) []*commonv1.NodeSubmitterAddress { | ||
| if s.opts.JobReader == nil { | ||
| return nil | ||
| } | ||
|
|
||
| jobs, _, err := s.opts.JobReader.FindJobs(ctx, 0, math.MaxInt) | ||
| if err != nil { | ||
| s.eng.Warnw("failed to resolve node-platform submitter addresses", "err", err) | ||
| return nil | ||
| } | ||
|
|
||
| return nodeSubmitterAddressesFromJobs(jobs) | ||
| } |
There was a problem hiding this comment.
submitterAddresses calls FindJobs(ctx, 0, math.MaxInt), which loads all jobs (and FindJobs also loads job type details) every beat. On nodes with many jobs this can cause large DB queries, high memory usage, and long emit latencies. Consider paging through jobs with a reasonable limit (e.g., 500/1000) using the returned count, or adding a lighter-weight query on the JobReader that returns only the fields needed for submitter address extraction.
|
| nodePlatformBuildInfoEntity = "common.v1.NodeBuildInfo" | ||
| nodePlatformJobInfoEntity = "common.v1.NodeJobInfo" | ||
| nodePlatformDataSchema = "/node-platform/common/v1" |
There was a problem hiding this comment.
What does common mean in this context?
There was a problem hiding this comment.
Is there a an example of the raw structure being emitted? I'm having trouble reading all the nested helper funcs.
| addNodeSubmitterAddress(bySource, spec.OracleFactory.ChainID, jobType(jb, job.StandardCapabilities), "", nodeSubmitterFieldOracleFactoryTransmitterID, spec.OracleFactory.TransmitterID) | ||
| } | ||
|
|
||
| func addPipelineETHTxSubmitterAddresses(bySource map[nodeSubmitterAddressKey]map[string]struct{}, jb job.Job) { |
There was a problem hiding this comment.
Can we find a way to have these funcs produce a result instead of modify an argument? If you are worried about efficiency, then iter.Seq2s can be used. Otherwise, a basic builder type with methods would help.
| addresses := make([]string, 0, len(bySource[key])) | ||
| for address := range bySource[key] { | ||
| addresses = append(addresses, address) | ||
| } | ||
| sort.Strings(addresses) | ||
| if len(addresses) == 0 { | ||
| continue | ||
| } |
There was a problem hiding this comment.
| addresses := make([]string, 0, len(bySource[key])) | |
| for address := range bySource[key] { | |
| addresses = append(addresses, address) | |
| } | |
| sort.Strings(addresses) | |
| if len(addresses) == 0 { | |
| continue | |
| } | |
| if len(bySource[key]) == 0 { | |
| continue | |
| } | |
| addresses := slices.Sorted(maps.Keys(bySource[key]) |




Summary
Updates the node-platform job info emitter to use the new
NodeJobInfo.submitter_addressesproto shape from the mergedchainlink-protos/node-platformcommitacccf17ece83099c7e4d91bd3e40b2d65b47b356.The emitted entries preserve the Chainlink job source field names through
field_path, including:transmitterAddresstransmitterIDrelayConfig.sendingKeysrelayConfig.dualTransmission.transmitterAddressfromAddressesoracle_factory.transmitter_idobservationSource.ethtx.fromThe implementation emits one source-aware submitter address entry per chain/job/plugin/field path, with deduped sorted addresses.
Notes
transmitterIDvalues are excluded because those are offchain transmitter identities, not gas-paying on-chain submitter addresses.ethtx.frompipeline values.develop, so the old KeeperfromAddresspath is no longer emitted.chainlink-protos/node-platformdependency was updated across root and nested Go modules viamake gomodtidy.Validation
make gomodtidyGOTOOLCHAIN=auto GOCACHE=/tmp/go-build go test -count=1 ./core/services/chainlink -run 'NodePlatform(BuildInfo|JobInfo)'GOTOOLCHAIN=auto GOCACHE=/tmp/go-build go test -count=1 ./core/services/chainlink -run '^$'cd system-tests/tests && GOTOOLCHAIN=auto GOCACHE=/tmp/go-build go test -run '^$' ./...git diff --check