From 7409e169b8dc368288386b3415d24dcc1cbfed1d Mon Sep 17 00:00:00 2001 From: Jiri Kraml Date: Tue, 12 Nov 2024 09:14:15 +0100 Subject: [PATCH] Allow exporting JS metadata --- Makefile | 4 +- collector/collector.go | 11 ++- collector/collector_test.go | 133 +++++++++++++++++++++++++++++++++--- collector/jsz.go | 74 +++++++++++++++----- exporter/exporter.go | 21 +++++- main.go | 4 ++ test/test.go | 2 +- 7 files changed, 219 insertions(+), 30 deletions(-) diff --git a/Makefile b/Makefile index 3a88fcb..a365c69 100644 --- a/Makefile +++ b/Makefile @@ -27,10 +27,10 @@ test-cov: .PHONY: lint lint: - @PATH=$(shell go env GOPATH)/bin:$(PATH) + @PATH=$(or $(shell go env GOBIN),$(shell go env GOPATH)/bin):$(PATH) @if ! which golangci-lint >/dev/null; then \ echo "golangci-lint is required and was not found"; \ exit 1; \ fi go vet ./... - $(shell go env GOPATH)/bin/golangci-lint run ./... + $(or $(shell go env GOBIN),$(shell go env GOPATH)/bin)/golangci-lint run ./... diff --git a/collector/collector.go b/collector/collector.go index 881fa87..ef3553d 100644 --- a/collector/collector.go +++ b/collector/collector.go @@ -494,7 +494,16 @@ func NewCollector(system, endpoint, prefix string, servers []*CollectedServer) p return newReplicatorCollector(getSystem(system, prefix), servers) } if isJszEndpoint(system) { - return newJszCollector(getSystem(system, prefix), endpoint, servers) + return newJszCollector(getSystem(system, prefix), endpoint, servers, []string{}, []string{}) } return newNatsCollector(getSystem(system, prefix), endpoint, servers) } + +// NewJszCollector creates a new NATS JetStream Collector that exports the given metadata keys as labels.
+func NewJszCollector( + endpoint, prefix string, + servers []*CollectedServer, + streamMetaKeys, consumerMetaKeys []string, +) prometheus.Collector { + return newJszCollector(getSystem(JetStreamSystem, prefix), endpoint, servers, streamMetaKeys, consumerMetaKeys) +} diff --git a/collector/collector_test.go b/collector/collector_test.go index 60f68da..d7c9415 100644 --- a/collector/collector_test.go +++ b/collector/collector_test.go @@ -42,8 +42,22 @@ func verifyCollector(system, url string, endpoint string, cases map[string]float URL: url, } coll := NewCollector(system, endpoint, "", servers) + verifySpecificCollector(cases, coll, t) +} - // now collect the metrics +func verifyJszCollector(url string, endpoint string, cases map[string]float64, t *testing.T) { + // create a new collector. + servers := make([]*CollectedServer, 1) + servers[0] = &CollectedServer{ + ID: "id", + URL: url, + } + coll := NewJszCollector(endpoint, "", servers, []string{}, []string{}) + + verifySpecificCollector(cases, coll, t) +} + +func verifySpecificCollector(cases map[string]float64, coll prometheus.Collector, t *testing.T) { c := make(chan prometheus.Metric) go coll.Collect(c) for { @@ -107,6 +121,34 @@ func verifyStreamingCollector(url string, endpoint string, cases map[string]floa // To account for the metrics that share the same descriptor but differ in their variable label values, // return a list of lists of label pairs for each of the supplied metric names. func getLabelValues(system, url, endpoint string, metricNames []string) (map[string][]map[string]string, error) { + servers := make([]*CollectedServer, 1) + servers[0] = &CollectedServer{ + ID: "id", + URL: url, + } + coll := NewCollector(system, endpoint, "", servers) + return getLabelValuesFromCollector(metricNames, coll) +} + +// getJszLabelValues behaves like getLabelValues but builds the collector with NewJszCollector so that +// the given stream and consumer metadata keys are passed through to it.
+func getJszLabelValues( + url, endpoint string, + streamMetaKeys, consumerMetaKeys, metricNames []string, +) (map[string][]map[string]string, error) { + servers := make([]*CollectedServer, 1) + servers[0] = &CollectedServer{ + ID: "id", + URL: url, + } + coll := NewJszCollector(endpoint, "", servers, streamMetaKeys, consumerMetaKeys) + return getLabelValuesFromCollector(metricNames, coll) +} + +func getLabelValuesFromCollector( + metricNames []string, + coll prometheus.Collector, +) (map[string][]map[string]string, error) { labelValues := make(map[string][]map[string]string) namesMap := make(map[string]bool) for _, metricName := range metricNames { @@ -148,13 +190,7 @@ func getLabelValues(system, url, endpoint string, metricNames []string) (map[str } }() - // create a new collector and collect - servers := make([]*CollectedServer, 1) - servers[0] = &CollectedServer{ - ID: "id", - URL: url, - } - coll := NewCollector(system, endpoint, "", servers) + // collect metrics coll.Collect(metrics) close(metrics) @@ -746,7 +782,86 @@ func TestJetStreamMetrics(t *testing.T) { "jetstream_server_total_streams": 1, "jetstream_server_total_consumers": 1, } - verifyCollector(JetStreamSystem, url, "jsz", cases, t) + verifyJszCollector(url, "all", cases, t) +} + +func TestJetStreamMetricLabels(t *testing.T) { + clientPort := 4229 + monitorPort := 8229 + s := pet.RunJetStreamServerWithPorts(clientPort, monitorPort, "ABC") + + defer s.Shutdown() + + url := fmt.Sprintf("http://127.0.0.1:%d/", monitorPort) + nc, err := nats.Connect(fmt.Sprintf("nats://localhost:%d", clientPort)) + if err != nil { + t.Fatal(err) + } + defer nc.Close() + + js, err := nc.JetStream() + if err != nil { + t.Fatal(err) + } + + streamName := "myStr" + streamK := "streamFoo" + streamV := "bar" + _, err = js.AddStream(&nats.StreamConfig{ + Name: streamName, + Metadata: map[string]string{streamK: streamV}, + }) + if err != nil { + t.Fatal(err) + } + + consumerName := "myCon" + consumerK := "consFoo" + consumerV
:= "baz" + consumerConfig := nats.ConsumerConfig{Name: consumerName, Metadata: map[string]string{consumerK: consumerV}} + _, err = js.AddConsumer(streamName, &consumerConfig) + if err != nil { + t.Fatal(err) + } + + // expected label keys + streamLabelKey := "stream_meta_" + streamK + consumerLabelKey := "consumer_meta_" + consumerK + + streamMetric := "jetstream_stream_total_bytes" + consumerMetric := "jetstream_consumer_num_ack_pending" + labelValues, err := getJszLabelValues( + url, + "all", + []string{streamK}, + []string{consumerK}, + []string{streamMetric, consumerMetric}, + ) + if err != nil { + t.Fatalf("Unexpected error getting labels for %s and %s metrics: %v", streamMetric, consumerMetric, err) + } + + streamMaps, found := labelValues[streamMetric] + if !found || len(streamMaps) != 1 { + t.Fatalf("Expected exactly one label set for metric %v, got %d", streamMetric, len(streamMaps)) + } + streamLabels := streamMaps[0] + if streamLabels[streamLabelKey] != streamV { + t.Fatalf("Value of stream label %s has unexpected value \"%s\"", streamLabelKey, streamLabels[streamLabelKey]) + } + + consumerMaps, found := labelValues[consumerMetric] + if !found || len(consumerMaps) != 1 { + t.Fatalf("Expected exactly one label set for metric %v, got %d", consumerMetric, len(consumerMaps)) + } + consumerLabels := consumerMaps[0] + + if consumerLabels[streamLabelKey] != streamV { + t.Fatalf("Value of consumer label %s has unexpected value \"%s\"", streamLabelKey, consumerLabels[streamLabelKey]) + } + if consumerLabels[consumerLabelKey] != consumerV { + t.Fatalf("Value of consumer label %s has unexpected value \"%s\"", consumerLabelKey, consumerLabels[consumerLabelKey]) + } } func TestReplicatorMetrics(t *testing.T) { diff --git a/collector/jsz.go b/collector/jsz.go index a9271b7..9278a8c 100644 --- a/collector/jsz.go +++ b/collector/jsz.go @@ -56,29 +56,61 @@ type jszCollector struct { consumerNumPending *prometheus.Desc consumerAckFloorStreamSeq *prometheus.Desc consumerAckFloorConsumerSeq *prometheus.Desc + + // metadata extractors + streamMetricExtractors
[]func(nats.StreamDetail) string + consumerMetricExtractors []func(*nats.ConsumerInfo) string } func isJszEndpoint(system string) bool { return system == JetStreamSystem } -func newJszCollector(system, endpoint string, servers []*CollectedServer) prometheus.Collector { +func newJszCollector( + system, endpoint string, + servers []*CollectedServer, + streamMetaKeys, consumerMetaKeys []string, +) prometheus.Collector { serverLabels := []string{"server_id", "server_name", "cluster", "domain", "meta_leader", "is_meta_leader"} - var streamLabels []string + var streamLabels = make([]string, 0) streamLabels = append(streamLabels, serverLabels...) streamLabels = append(streamLabels, "account") streamLabels = append(streamLabels, "account_id") streamLabels = append(streamLabels, "stream_name") streamLabels = append(streamLabels, "stream_leader") streamLabels = append(streamLabels, "is_stream_leader") + // Register one label and one value extractor per requested stream metadata key. + streamMetricExtractors := make([]func(nats.StreamDetail) string, 0, len(streamMetaKeys)) + for _, k := range streamMetaKeys { + k := k // per-iteration copy: closures share the loop variable before Go 1.22 + streamLabels = append(streamLabels, "stream_meta_"+k) + streamMetricExtractors = append(streamMetricExtractors, func(s nats.StreamDetail) string { + if s.Config == nil { + return "" + } + return s.Config.Metadata[k] // a missing key yields the empty string + }) + } - var consumerLabels []string + var consumerLabels = make([]string, 0) consumerLabels = append(consumerLabels, streamLabels...)
consumerLabels = append(consumerLabels, "consumer_name") consumerLabels = append(consumerLabels, "consumer_leader") consumerLabels = append(consumerLabels, "is_consumer_leader") consumerLabels = append(consumerLabels, "consumer_desc") + // Register one label and one value extractor per requested consumer metadata key. + consumerMetricExtractors := make([]func(*nats.ConsumerInfo) string, 0, len(consumerMetaKeys)) + for _, k := range consumerMetaKeys { + k := k // per-iteration copy: closures share the loop variable before Go 1.22 + consumerLabels = append(consumerLabels, "consumer_meta_"+k) + consumerMetricExtractors = append(consumerMetricExtractors, func(c *nats.ConsumerInfo) string { + if c == nil || c.Config == nil { + return "" + } + return c.Config.Metadata[k] // a missing key yields the empty string + }) + } nc := &jszCollector{ httpClient: &http.Client{ @@ -230,6 +262,8 @@ func newJszCollector(system, endpoint string, servers []*CollectedServer) promet consumerLabels, nil, ), + streamMetricExtractors: streamMetricExtractors, + consumerMetricExtractors: consumerMetricExtractors, } // Use the endpoint @@ -351,12 +385,18 @@ func (nc *jszCollector) Collect(ch chan<- prometheus.Metric) { } else { isStreamLeader = "true" } + streamLabelValues := []string{ + // Server Labels + serverID, serverName, clusterName, jsDomain, clusterLeader, isMetaLeader, + // Stream Labels + accountName, accountID, streamName, streamLeader, isStreamLeader, + } + for _, extractor := range nc.streamMetricExtractors { + value := extractor(stream) + streamLabelValues = append(streamLabelValues, value) + } streamMetric := func(key *prometheus.Desc, value float64) prometheus.Metric { - return prometheus.MustNewConstMetric(key, prometheus.GaugeValue, value, - // Server Labels - serverID, serverName, clusterName, jsDomain, clusterLeader, isMetaLeader, - // Stream Labels - accountName, accountID, streamName, streamLeader, isStreamLeader) + return prometheus.MustNewConstMetric(key, prometheus.GaugeValue, value, streamLabelValues...)
} ch <- streamMetric(nc.streamMessages, float64(stream.State.Msgs)) ch <- streamMetric(nc.streamBytes, float64(stream.State.Bytes)) @@ -381,15 +421,17 @@ func (nc *jszCollector) Collect(ch chan<- prometheus.Metric) { } else { isConsumerLeader = "true" } + consumerLabelValues := streamLabelValues[:len(streamLabelValues):len(streamLabelValues)] // clip cap: appends must copy + consumerLabelValues = append(consumerLabelValues, + // Consumer Labels + consumerName, consumerLeader, isConsumerLeader, consumerDesc, + ) + for _, extractor := range nc.consumerMetricExtractors { + value := extractor(consumer) + consumerLabelValues = append(consumerLabelValues, value) + } consumerMetric := func(key *prometheus.Desc, value float64) prometheus.Metric { - return prometheus.MustNewConstMetric(key, prometheus.GaugeValue, value, - // Server Labels - serverID, serverName, clusterName, jsDomain, clusterLeader, isMetaLeader, - // Stream Labels - accountName, accountID, streamName, streamLeader, isStreamLeader, - // Consumer Labels - consumerName, consumerLeader, isConsumerLeader, consumerDesc, - ) + return prometheus.MustNewConstMetric(key, prometheus.GaugeValue, value, consumerLabelValues...)
} ch <- consumerMetric(nc.consumerDeliveredConsumerSeq, float64(consumer.Delivered.Consumer)) ch <- consumerMetric(nc.consumerDeliveredStreamSeq, float64(consumer.Delivered.Stream)) diff --git a/exporter/exporter.go b/exporter/exporter.go index 51b62e5..7fd171c 100644 --- a/exporter/exporter.go +++ b/exporter/exporter.go @@ -60,6 +60,8 @@ type NATSExporterOptions struct { GetStreamingChannelz bool GetStreamingServerz bool GetJszFilter string + JszStreamMetaKeys string // comma-separated stream metadata keys to export as labels + JszConsumerMetaKeys string // comma-separated consumer metadata keys to export as labels RetryInterval time.Duration CertFile string KeyFile string @@ -135,6 +137,15 @@ func (ne *NATSExporter) createCollector(system, endpoint string) { ne.servers)) } +func (ne *NATSExporter) createJszCollector(endpoint string, streamMetaKeys, consumerMetaKeys []string) { + ne.registerCollector(collector.JetStreamSystem, endpoint, + collector.NewJszCollector(endpoint, + ne.opts.Prefix, + ne.servers, + streamMetaKeys, + consumerMetaKeys)) +} + func (ne *NATSExporter) registerCollector(system, endpoint string, nc prometheus.Collector) { if err := ne.registry.Register(nc); err != nil { if _, ok := err.(prometheus.AlreadyRegisteredError); ok { @@ -225,7 +236,15 @@ func (ne *NATSExporter) InitializeCollectors() error { default: return fmt.Errorf("invalid jsz filter %q", opts.GetJszFilter) } - ne.createCollector(collector.JetStreamSystem, opts.GetJszFilter) + splitOrEmpty := func(s string) []string { + if s == "" { // strings.Split("", ",") would yield one empty key + return []string{} + } + return strings.Split(s, ",") + } + streamMetaKeys := splitOrEmpty(opts.JszStreamMetaKeys) + consumerMetaKeys := splitOrEmpty(opts.JszConsumerMetaKeys) + ne.createJszCollector(opts.GetJszFilter, streamMetaKeys, consumerMetaKeys) } if len(ne.Collectors) == 0 { return fmt.Errorf("no Collectors specified") diff --git a/main.go b/main.go index f5f5a14..06ba936 100644 --- a/main.go +++ b/main.go @@ -130,6 +130,10 @@ func main() { flag.BoolVar(&opts.GetStreamingServerz, "serverz", false, "Get streaming server metrics.") flag.BoolVar(&opts.GetVarz, "varz",
false, "Get general metrics.") flag.StringVar(&opts.GetJszFilter, "jsz", "", "Select JetStream metrics to filter (e.g streams, accounts, consumers)") + flag.StringVar(&opts.JszStreamMetaKeys, "jsz_stream_meta_keys", "", + "Select JetStream stream metadata to output (comma separated)") + flag.StringVar(&opts.JszConsumerMetaKeys, "jsz_consumer_meta_keys", "", + "Select JetStream consumer metadata to output (comma separated)") flag.StringVar(&opts.CertFile, "tlscert", "", "Server certificate file (Enables HTTPS).") flag.StringVar(&opts.KeyFile, "tlskey", "", "Private key for server certificate (used with HTTPS).") flag.StringVar(&opts.CaFile, "tlscacert", "", "Client certificate CA for verification (used with HTTPS).") diff --git a/test/test.go b/test/test.go index 972adb3..0d191d6 100644 --- a/test/test.go +++ b/test/test.go @@ -201,7 +201,7 @@ func RunJetStreamServerWithPorts(port, monitorPort int, domain string) *server.S opts.JetStream = true opts.JetStreamDomain = domain tdir, _ := os.MkdirTemp(tempRoot, "js-storedir-") - opts.StoreDir = filepath.Dir(tdir) + opts.StoreDir = tdir opts.HTTPHost = "127.0.0.1" opts.HTTPPort = monitorPort