diff --git a/README.md b/README.md index 7e72bbe..1030c73 100644 --- a/README.md +++ b/README.md @@ -60,6 +60,8 @@ prometheus-nats-exporter url Get health metrics. -gatewayz Get gateway metrics. + -accstatz + Get accstatz metrics. -leafz Get leaf metrics. -http_pass string diff --git a/collector/accstatz.go b/collector/accstatz.go new file mode 100644 index 0000000..34247c0 --- /dev/null +++ b/collector/accstatz.go @@ -0,0 +1,193 @@ +// Copyright 2017-2019 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package collector has various collector utilities and implementations. +package collector + +import ( + "net/http" + "sync" + + "github.com/prometheus/client_golang/prometheus" +) + +func isAccstatzEndpoint(system, endpoint string) bool { + return system == CoreSystem && endpoint == "accstatz" +} + +type accstatzCollector struct { + sync.Mutex + + httpClient *http.Client + servers []*CollectedServer + accountMetrics *accountMetrics +} + +func newAccstatzCollector(system, endpoint string, servers []*CollectedServer) prometheus.Collector { + nc := &accstatzCollector{ + httpClient: http.DefaultClient, + accountMetrics: newAccountMetrics(system, endpoint), + } + + nc.servers = make([]*CollectedServer, len(servers)) + for i, s := range servers { + nc.servers[i] = &CollectedServer{ + ID: s.ID, + URL: s.URL + "/accstatz?unused=1", + } + } + + return nc +} + +func (nc *accstatzCollector) Describe(ch chan<- *prometheus.Desc) { + nc.accountMetrics.Describe(ch) +} + +// Collect gathers the server accstatz metrics. +func (nc *accstatzCollector) Collect(ch chan<- prometheus.Metric) { + for _, server := range nc.servers { + var resp Accstatz + if err := getMetricURL(nc.httpClient, server.URL, &resp); err != nil { + Debugf("ignoring server %s: %v", server.ID, err) + continue + } + + for _, acc := range resp.Accounts { + nc.accountMetrics.Collect(server, acc, ch) + } + } +} + +// accountMetrics has all of the prometheus descriptors related to +// each of the accounts of the server being scraped. +type accountMetrics struct { + connections *prometheus.Desc + totalConnections *prometheus.Desc + leafNodes *prometheus.Desc + sentMsgs *prometheus.Desc + sentBytes *prometheus.Desc + receivedMsgs *prometheus.Desc + receivedBytes *prometheus.Desc + slowConsumers *prometheus.Desc +} + +// newAccountMetrics initializes a new instance of accountMetrics. +func newAccountMetrics(system, endpoint string) *accountMetrics { + account := &accountMetrics{ + connections: prometheus.NewDesc( + prometheus.BuildFQName(system, endpoint, "current_connections"), + "current_connections", + []string{"server_id", "account"}, + nil), + totalConnections: prometheus.NewDesc( + prometheus.BuildFQName(system, endpoint, "total_connections"), + "total_connections", + []string{"server_id", "account"}, + nil), + leafNodes: prometheus.NewDesc( + prometheus.BuildFQName(system, endpoint, "leaf_nodes"), + "leaf_nodes", + []string{"server_id", "account"}, + nil), + sentMsgs: prometheus.NewDesc( + prometheus.BuildFQName(system, endpoint, "sent_messages"), + "sent_messages", + []string{"server_id", "account"}, + nil), + sentBytes: prometheus.NewDesc( + prometheus.BuildFQName(system, endpoint, "sent_bytes"), + "sent_bytes", + []string{"server_id", "account"}, + nil), + receivedMsgs: prometheus.NewDesc( + prometheus.BuildFQName(system, endpoint, "received_messages"), + "received_messages", + []string{"server_id", "account"}, + nil), + receivedBytes: prometheus.NewDesc( + prometheus.BuildFQName(system, endpoint, "received_bytes"), + "received_bytes", + []string{"server_id", "account"}, + nil), + slowConsumers: prometheus.NewDesc( + prometheus.BuildFQName(system, endpoint, "slow_consumers"), + "slow_consumers", + []string{"server_id", "account"}, + nil), + } + + return account +} + +// Describe +func (am *accountMetrics) Describe(ch chan<- *prometheus.Desc) { + ch <- am.connections + ch <- am.totalConnections + ch <- am.leafNodes + ch <- am.sentMsgs + ch <- am.sentBytes + ch <- am.receivedMsgs + ch <- am.receivedBytes + ch <- am.slowConsumers +} + +// Collect collects all the metrics about an account. +func (am *accountMetrics) Collect(server *CollectedServer, acc *Account, ch chan<- prometheus.Metric) { + + ch <- prometheus.MustNewConstMetric(am.connections, prometheus.GaugeValue, float64(acc.Connections), + server.ID, acc.AccountId) + + ch <- prometheus.MustNewConstMetric(am.totalConnections, prometheus.GaugeValue, float64(acc.TotalConnections), + server.ID, acc.AccountId) + + ch <- prometheus.MustNewConstMetric(am.leafNodes, prometheus.GaugeValue, float64(acc.LeafNodes), + server.ID, acc.AccountId) + + ch <- prometheus.MustNewConstMetric(am.sentMsgs, prometheus.GaugeValue, float64(acc.Sent.Messages), + server.ID, acc.AccountId) + + ch <- prometheus.MustNewConstMetric(am.sentBytes, prometheus.GaugeValue, float64(acc.Sent.Bytes), + server.ID, acc.AccountId) + + ch <- prometheus.MustNewConstMetric(am.receivedMsgs, prometheus.GaugeValue, float64(acc.Received.Messages), + server.ID, acc.AccountId) + + ch <- prometheus.MustNewConstMetric(am.receivedBytes, prometheus.GaugeValue, float64(acc.Received.Bytes), + server.ID, acc.AccountId) + + ch <- prometheus.MustNewConstMetric(am.slowConsumers, prometheus.GaugeValue, float64(acc.SlowConsumers), + server.ID, acc.AccountId) +} + +// Accstatz output +type Accstatz struct { + Accounts []*Account `json:"account_statz"` +} + +// Leaf output +type Account struct { + AccountId string `json:"acc"` + Connections int `json:"conns"` + LeafNodes int `json:"leafnodes"` + TotalConnections int `json:"total_conns"` + Sent Data `json:"sent"` + Received Data `json:"received"` + SlowConsumers int `json:"slow_consumers"` +} + +// Data output +type Data struct { + Messages int `json:"msgs"` + Bytes int `json:"bytes"` +} diff --git a/collector/collector.go b/collector/collector.go index c269676..32df3ee 100644 --- a/collector/collector.go +++ b/collector/collector.go @@ -428,6 +428,9 @@ func NewCollector(system, endpoint, prefix string, servers []*CollectedServer) p if isGatewayzEndpoint(system, endpoint) { return newGatewayzCollector(getSystem(system, prefix), endpoint, servers) } + if isAccstatzEndpoint(system, endpoint) { + return newAccstatzCollector(getSystem(system, prefix), endpoint, servers) + } if isLeafzEndpoint(system, endpoint) { return newLeafzCollector(getSystem(system, prefix), endpoint, servers) } diff --git a/exporter/exporter.go b/exporter/exporter.go index 62f72b5..bda47f7 100644 --- a/exporter/exporter.go +++ b/exporter/exporter.go @@ -51,6 +51,7 @@ type NATSExporterOptions struct { GetSubz bool GetRoutez bool GetGatewayz bool + GetAccstatz bool GetLeafz bool GetReplicatorVarz bool GetStreamingChannelz bool @@ -177,8 +178,9 @@ func (ne *NATSExporter) InitializeCollectors() error { getJsz := opts.GetJszFilter != "" if !opts.GetHealthz && !opts.GetConnz && !opts.GetConnzDetailed && !opts.GetRoutez && - !opts.GetSubz && !opts.GetVarz && !opts.GetGatewayz && !opts.GetLeafz && - !opts.GetStreamingChannelz && !opts.GetStreamingServerz && !opts.GetReplicatorVarz && !getJsz { + !opts.GetSubz && !opts.GetVarz && !opts.GetGatewayz && !opts.GetAccstatz && !opts.GetLeafz && + !opts.GetStreamingChannelz && !opts.GetStreamingServerz && + !opts.GetReplicatorVarz && !getJsz { return fmt.Errorf("no Collectors specfied") } if opts.GetReplicatorVarz && opts.GetVarz { @@ -201,6 +203,9 @@ func (ne *NATSExporter) InitializeCollectors() error { if opts.GetGatewayz { ne.createCollector(collector.CoreSystem, "gatewayz") } + if opts.GetAccstatz { + ne.createCollector(collector.CoreSystem, "accstatz") + } if opts.GetLeafz { ne.createCollector(collector.CoreSystem, "leafz") } diff --git a/exporter/exporter_test.go b/exporter/exporter_test.go index 6d2bc75..04401d5 100644 --- a/exporter/exporter_test.go +++ b/exporter/exporter_test.go @@ -644,6 +644,30 @@ func TestExporterGatewayz(t *testing.T) { } } +func TestExporterAccstatz(t *testing.T) { + opts := getStaticExporterTestOptions() + opts.ListenAddress = "localhost" + opts.ListenPort = 0 + opts.GetAccstatz = true + + serverExit := &sync.WaitGroup{} + + serverExit.Add(1) + s := pet.RunAccstatzStaticServer(serverExit) + defer s.Shutdown(context.TODO()) + + exp := NewExporter(opts) + if err := exp.Start(); err != nil { + t.Fatalf("%v", err) + } + defer exp.Stop() + + _, err := checkExporterForResult(exp.http.Addr().String(), "gnatsd_accstatz_current_connections") + if err != nil { + t.Fatalf("%v", err) + } +} + func TestExporterLeafz(t *testing.T) { opts := getStaticExporterTestOptions() opts.ListenAddress = "localhost" diff --git a/main.go b/main.go index d76a496..441f7ef 100644 --- a/main.go +++ b/main.go @@ -77,7 +77,7 @@ func updateOptions(debugAndTrace, useSysLog bool, opts *exporter.NATSExporterOpt } metricsSpecified := opts.GetConnz || opts.GetVarz || opts.GetSubz || opts.GetHealthz || - opts.GetRoutez || opts.GetGatewayz || opts.GetLeafz || opts.GetStreamingChannelz || + opts.GetRoutez || opts.GetGatewayz || opts.GetAccstatz || opts.GetLeafz || opts.GetStreamingChannelz || opts.GetStreamingServerz || opts.GetReplicatorVarz || opts.GetJszFilter == "" if !metricsSpecified { // No logger setup yet, so use fmt @@ -118,6 +118,7 @@ func main() { flag.BoolVar(&opts.GetHealthz, "healthz", false, "Get health metrics.") flag.BoolVar(&opts.GetReplicatorVarz, "replicatorVarz", false, "Get replicator general metrics.") flag.BoolVar(&opts.GetGatewayz, "gatewayz", false, "Get gateway metrics.") + flag.BoolVar(&opts.GetAccstatz, "accstatz", false, "Get accstatz metrics.") flag.BoolVar(&opts.GetLeafz, "leafz", false, "Get leaf metrics.") flag.BoolVar(&opts.GetRoutez, "routez", false, "Get route metrics.") flag.BoolVar(&opts.GetSubz, "subz", false, "Get subscription metrics.") diff --git a/test/data.go b/test/data.go index ecc7ace..79af8c1 100644 --- a/test/data.go +++ b/test/data.go @@ -107,6 +107,48 @@ func GatewayzTestResponse() string { } +// AccstatzTestResponse is static data for tests +func AccstatzTestResponse() string { + return `{ + "server_id": "SERVER_ID", + "now": "2021-05-07T18:13:47.70796395Z", + "account_statz": [ + { + "acc": "$G", + "conns": 100, + "leafnodes": 0, + "total_conns": 1000, + "sent": { + "msgs": 0, + "bytes": 0 + }, + "received": { + "msgs": 35922, + "bytes": 4574155 + }, + "slow_consumers": 0 + }, + { + "acc": "$A", + "conns": 0, + "leafnodes": 0, + "total_conns": 0, + "sent": { + "msgs": 0, + "bytes": 0 + }, + "received": { + "msgs": 0, + "bytes": 0 + }, + "slow_consumers": 0 + } + ] +} +` + +} + func leafzTestResponse() string { return `{ "server_id": "NC2FJCRMPBE5RI5OSRN7TKUCWQONCKNXHKJXCJIDVSAZ6727M7MQFVT3", diff --git a/test/test.go b/test/test.go index ac85d99..692a4f8 100644 --- a/test/test.go +++ b/test/test.go @@ -74,6 +74,20 @@ func RunGatewayzStaticServer(wg *sync.WaitGroup) *http.Server { return srv } +// RunAccstatzStaticServer starts an http server with static content +func RunAccstatzStaticServer(wg *sync.WaitGroup) *http.Server { + srv := &http.Server{Addr: ":" + strconv.Itoa(StaticPort)} + http.Handle("/accstatz", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprint(w, AccstatzTestResponse()) + })) + + go func() { + defer wg.Done() + srv.ListenAndServe() + }() + return srv +} + // RunLeafzStaticServer runs a leafz static server. func RunLeafzStaticServer(wg *sync.WaitGroup) *http.Server { srv := &http.Server{Addr: ":" + strconv.Itoa(StaticPort)}