Skip to content

Commit

Permalink
Merge pull request #256 from nats-io/accountsz-metrics
Browse files Browse the repository at this point in the history
Add accountz metrics
  • Loading branch information
wallyqs authored Oct 26, 2023
2 parents dcc53f7 + 78f05c6 commit 7bd645a
Show file tree
Hide file tree
Showing 16 changed files with 467 additions and 160 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ prometheus-nats-exporter <flags> url
Get health metrics.
-gatewayz
Get gateway metrics.
-accstatz
Get accstatz metrics.
-leafz
Get leaf metrics.
-http_pass string
Expand Down
204 changes: 204 additions & 0 deletions collector/accstatz.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
// Copyright 2022-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package collector has various collector utilities and implementations.
package collector

import (
"net/http"
"sync"

"github.com/prometheus/client_golang/prometheus"
)

func isAccstatzEndpoint(system, endpoint string) bool {
return system == CoreSystem && endpoint == "accstatz"
}

type accstatzCollector struct {
sync.Mutex

httpClient *http.Client
servers []*CollectedServer
accountMetrics *accountMetrics
}

func newAccstatzCollector(system, endpoint string, servers []*CollectedServer) prometheus.Collector {
nc := &accstatzCollector{
httpClient: http.DefaultClient,
accountMetrics: newAccountMetrics(system, endpoint),
}

nc.servers = make([]*CollectedServer, len(servers))
for i, s := range servers {
nc.servers[i] = &CollectedServer{
ID: s.ID,
URL: s.URL + "/accstatz?unused=1",
}
}

return nc
}

func (nc *accstatzCollector) Describe(ch chan<- *prometheus.Desc) {
nc.accountMetrics.Describe(ch)
}

// Collect gathers the server accstatz metrics.
func (nc *accstatzCollector) Collect(ch chan<- prometheus.Metric) {
for _, server := range nc.servers {
var resp Accstatz
if err := getMetricURL(nc.httpClient, server.URL, &resp); err != nil {
Debugf("ignoring server %s: %v", server.ID, err)
continue
}

for _, acc := range resp.Accounts {
nc.accountMetrics.Collect(server, acc, ch)
}
}
}

// accountMetrics has all of the prometheus descriptors related to
// each of the accounts of the server being scraped.
type accountMetrics struct {
connections *prometheus.Desc
totalConnections *prometheus.Desc
numSubs *prometheus.Desc
leafNodes *prometheus.Desc
sentMsgs *prometheus.Desc
sentBytes *prometheus.Desc
receivedMsgs *prometheus.Desc
receivedBytes *prometheus.Desc
slowConsumers *prometheus.Desc
}

// newAccountMetrics initializes a new instance of accountMetrics.
func newAccountMetrics(system, endpoint string) *accountMetrics {
account := &accountMetrics{
connections: prometheus.NewDesc(
prometheus.BuildFQName(system, endpoint, "current_connections"),
"current_connections",
[]string{"server_id", "account"},
nil),
totalConnections: prometheus.NewDesc(
prometheus.BuildFQName(system, endpoint, "total_connections"),
"total_connections",
[]string{"server_id", "account"},
nil),
numSubs: prometheus.NewDesc(
prometheus.BuildFQName(system, endpoint, "subscriptions"),
"subscriptions",
[]string{"server_id", "account"},
nil),
leafNodes: prometheus.NewDesc(
prometheus.BuildFQName(system, endpoint, "leaf_nodes"),
"leaf_nodes",
[]string{"server_id", "account"},
nil),
sentMsgs: prometheus.NewDesc(
prometheus.BuildFQName(system, endpoint, "sent_messages"),
"sent_messages",
[]string{"server_id", "account"},
nil),
sentBytes: prometheus.NewDesc(
prometheus.BuildFQName(system, endpoint, "sent_bytes"),
"sent_bytes",
[]string{"server_id", "account"},
nil),
receivedMsgs: prometheus.NewDesc(
prometheus.BuildFQName(system, endpoint, "received_messages"),
"received_messages",
[]string{"server_id", "account"},
nil),
receivedBytes: prometheus.NewDesc(
prometheus.BuildFQName(system, endpoint, "received_bytes"),
"received_bytes",
[]string{"server_id", "account"},
nil),
slowConsumers: prometheus.NewDesc(
prometheus.BuildFQName(system, endpoint, "slow_consumers"),
"slow_consumers",
[]string{"server_id", "account"},
nil),
}

return account
}

// Describe
func (am *accountMetrics) Describe(ch chan<- *prometheus.Desc) {
ch <- am.connections
ch <- am.totalConnections
ch <- am.numSubs
ch <- am.leafNodes
ch <- am.sentMsgs
ch <- am.sentBytes
ch <- am.receivedMsgs
ch <- am.receivedBytes
ch <- am.slowConsumers
}

// Collect collects all the metrics about an account.
func (am *accountMetrics) Collect(server *CollectedServer, acc *Account, ch chan<- prometheus.Metric) {

ch <- prometheus.MustNewConstMetric(am.connections, prometheus.GaugeValue, float64(acc.Conns),
server.ID, acc.Account)

ch <- prometheus.MustNewConstMetric(am.totalConnections, prometheus.GaugeValue, float64(acc.TotalConns),
server.ID, acc.Account)

ch <- prometheus.MustNewConstMetric(am.numSubs, prometheus.GaugeValue, float64(acc.NumSubs),
server.ID, acc.Account)

ch <- prometheus.MustNewConstMetric(am.leafNodes, prometheus.GaugeValue, float64(acc.LeafNodes),
server.ID, acc.Account)

ch <- prometheus.MustNewConstMetric(am.sentMsgs, prometheus.GaugeValue, float64(acc.Sent.Msgs),
server.ID, acc.Account)

ch <- prometheus.MustNewConstMetric(am.sentBytes, prometheus.GaugeValue, float64(acc.Sent.Bytes),
server.ID, acc.Account)

ch <- prometheus.MustNewConstMetric(am.receivedMsgs, prometheus.GaugeValue, float64(acc.Received.Msgs),
server.ID, acc.Account)

ch <- prometheus.MustNewConstMetric(am.receivedBytes, prometheus.GaugeValue, float64(acc.Received.Bytes),
server.ID, acc.Account)

ch <- prometheus.MustNewConstMetric(am.slowConsumers, prometheus.GaugeValue, float64(acc.SlowConsumers),
server.ID, acc.Account)
}

// Accstatz output
type Accstatz struct {
Accounts []*Account `json:"account_statz"`
}

// Account stats output
type Account struct {
Account string `json:"acc"`
Conns int `json:"conns"`
LeafNodes int `json:"leafnodes"`
TotalConns int `json:"total_conns"`
NumSubs uint32 `json:"num_subscriptions"`
Sent DataStats `json:"sent"`
Received DataStats `json:"received"`
SlowConsumers int64 `json:"slow_consumers"`
}

// DataStats output
type DataStats struct {
Msgs int64 `json:"msgs"`
Bytes int64 `json:"bytes"`
}
5 changes: 4 additions & 1 deletion collector/collector.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2017-2019 The NATS Authors
// Copyright 2017-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -428,6 +428,9 @@ func NewCollector(system, endpoint, prefix string, servers []*CollectedServer) p
if isGatewayzEndpoint(system, endpoint) {
return newGatewayzCollector(getSystem(system, prefix), endpoint, servers)
}
if isAccstatzEndpoint(system, endpoint) {
return newAccstatzCollector(getSystem(system, prefix), endpoint, servers)
}
if isLeafzEndpoint(system, endpoint) {
return newLeafzCollector(getSystem(system, prefix), endpoint, servers)
}
Expand Down
8 changes: 4 additions & 4 deletions collector/collector_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2017-2018 The NATS Authors
// Copyright 2017-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -207,7 +207,7 @@ func TestVarz(t *testing.T) {
"gnatsd_varz_out_msgs": 1,
"gnatsd_varz_in_bytes": 5,
"gnatsd_varz_out_bytes": 5,
"gnatsd_varz_subscriptions": 44,
"gnatsd_varz_subscriptions": 55,
}

verifyCollector(CoreSystem, url, "varz", cases, t)
Expand Down Expand Up @@ -325,7 +325,7 @@ func TestAllEndpoints(t *testing.T) {
verifyCollector(CoreSystem, url, "routez", cases, t)

cases = map[string]float64{
"gnatsd_subsz_num_subscriptions": 44,
"gnatsd_subsz_num_subscriptions": 55,
}
verifyCollector(CoreSystem, url, "subsz", cases, t)

Expand Down Expand Up @@ -377,7 +377,7 @@ func TestStreamingVarz(t *testing.T) {
"gnatsd_varz_out_msgs": 44,
"gnatsd_varz_in_bytes": 1594,
"gnatsd_varz_out_bytes": 1549,
"gnatsd_varz_subscriptions": 57,
"gnatsd_varz_subscriptions": 68,
}

verifyCollector(CoreSystem, url, "varz", cases, t)
Expand Down
15 changes: 8 additions & 7 deletions exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type NATSExporterOptions struct {
GetSubz bool
GetRoutez bool
GetGatewayz bool
GetAccstatz bool
GetLeafz bool
GetReplicatorVarz bool
GetStreamingChannelz bool
Expand Down Expand Up @@ -175,12 +176,6 @@ func (ne *NATSExporter) InitializeCollectors() error {
return fmt.Errorf("no servers configured to obtain metrics")
}

getJsz := opts.GetJszFilter != ""
if !opts.GetHealthz && !opts.GetConnz && !opts.GetConnzDetailed && !opts.GetRoutez &&
!opts.GetSubz && !opts.GetVarz && !opts.GetGatewayz && !opts.GetLeafz &&
!opts.GetStreamingChannelz && !opts.GetStreamingServerz && !opts.GetReplicatorVarz && !getJsz {
return fmt.Errorf("no Collectors specfied")
}
if opts.GetReplicatorVarz && opts.GetVarz {
return fmt.Errorf("replicatorVarz cannot be used with varz")
}
Expand All @@ -201,6 +196,9 @@ func (ne *NATSExporter) InitializeCollectors() error {
if opts.GetGatewayz {
ne.createCollector(collector.CoreSystem, "gatewayz")
}
if opts.GetAccstatz {
ne.createCollector(collector.CoreSystem, "accstatz")
}
if opts.GetLeafz {
ne.createCollector(collector.CoreSystem, "leafz")
}
Expand All @@ -216,14 +214,17 @@ func (ne *NATSExporter) InitializeCollectors() error {
if opts.GetReplicatorVarz {
ne.createCollector(collector.ReplicatorSystem, "varz")
}
if getJsz {
if opts.GetJszFilter != "" {
switch strings.ToLower(opts.GetJszFilter) {
case "account", "accounts", "consumer", "consumers", "all", "stream", "streams":
default:
return fmt.Errorf("invalid jsz filter %q", opts.GetJszFilter)
}
ne.createCollector(collector.JetStreamSystem, opts.GetJszFilter)
}
if len(ne.Collectors) == 0 {
return fmt.Errorf("no Collectors specified")
}

return nil
}
Expand Down
26 changes: 25 additions & 1 deletion exporter/exporter_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2017-2018 The NATS Authors
// Copyright 2017-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -644,6 +644,30 @@ func TestExporterGatewayz(t *testing.T) {
}
}

func TestExporterAccstatz(t *testing.T) {
opts := getStaticExporterTestOptions()
opts.ListenAddress = "localhost"
opts.ListenPort = 0
opts.GetAccstatz = true

serverExit := &sync.WaitGroup{}

serverExit.Add(1)
s := pet.RunAccstatzStaticServer(serverExit)
defer s.Shutdown(context.TODO())

exp := NewExporter(opts)
if err := exp.Start(); err != nil {
t.Fatalf("%v", err)
}
defer exp.Stop()

_, err := checkExporterForResult(exp.http.Addr().String(), "gnatsd_accstatz_current_connections")
if err != nil {
t.Fatalf("%v", err)
}
}

func TestExporterLeafz(t *testing.T) {
opts := getStaticExporterTestOptions()
opts.ListenAddress = "localhost"
Expand Down
14 changes: 7 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ go 1.20

require (
github.com/nats-io/nats-replicator v0.1.0
github.com/nats-io/nats-server/v2 v2.9.19
github.com/nats-io/nats-server/v2 v2.10.3
github.com/nats-io/nats-streaming-server v0.25.5
github.com/nats-io/nats.go v1.27.1
github.com/nats-io/nats.go v1.30.2
github.com/nats-io/stan.go v0.10.4
github.com/prometheus/client_golang v1.16.0
github.com/prometheus/client_model v0.4.0
golang.org/x/crypto v0.10.0
golang.org/x/crypto v0.13.0
)

require (
Expand All @@ -26,18 +26,18 @@ require (
github.com/hashicorp/go-msgpack/v2 v2.1.0 // indirect
github.com/hashicorp/golang-lru v0.5.0 // indirect
github.com/hashicorp/raft v1.5.0 // indirect
github.com/klauspost/compress v1.16.5 // indirect
github.com/klauspost/compress v1.17.0 // indirect
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/minio/highwayhash v1.0.2 // indirect
github.com/nats-io/jwt/v2 v2.4.1 // indirect
github.com/nats-io/nkeys v0.4.4 // indirect
github.com/nats-io/jwt/v2 v2.5.2 // indirect
github.com/nats-io/nkeys v0.4.5 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/procfs v0.11.0 // indirect
go.etcd.io/bbolt v1.3.7 // indirect
golang.org/x/sys v0.9.0 // indirect
golang.org/x/sys v0.12.0 // indirect
golang.org/x/time v0.3.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
)
Loading

0 comments on commit 7bd645a

Please sign in to comment.