Skip to content

Commit

Permalink
add /accstatz metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
tharkunns authored and piotrpio committed Oct 25, 2023
1 parent 0fd6d26 commit 6546411
Show file tree
Hide file tree
Showing 8 changed files with 287 additions and 3 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ prometheus-nats-exporter <flags> url
Get health metrics.
-gatewayz
Get gateway metrics.
-accstatz
Get accstatz metrics.
-leafz
Get leaf metrics.
-http_pass string
Expand Down
193 changes: 193 additions & 0 deletions collector/accstatz.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
// Copyright 2017-2019 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package collector has various collector utilities and implementations.
package collector

import (
"net/http"
"sync"

"github.com/prometheus/client_golang/prometheus"
)

func isAccstatzEndpoint(system, endpoint string) bool {
return system == CoreSystem && endpoint == "accstatz"
}

type accstatzCollector struct {
sync.Mutex

httpClient *http.Client
servers []*CollectedServer
accountMetrics *accountMetrics
}

func newAccstatzCollector(system, endpoint string, servers []*CollectedServer) prometheus.Collector {
nc := &accstatzCollector{
httpClient: http.DefaultClient,
accountMetrics: newAccountMetrics(system, endpoint),
}

nc.servers = make([]*CollectedServer, len(servers))
for i, s := range servers {
nc.servers[i] = &CollectedServer{
ID: s.ID,
URL: s.URL + "/accstatz?unused=1",
}
}

return nc
}

func (nc *accstatzCollector) Describe(ch chan<- *prometheus.Desc) {
nc.accountMetrics.Describe(ch)
}

// Collect gathers the server accstatz metrics.
func (nc *accstatzCollector) Collect(ch chan<- prometheus.Metric) {
for _, server := range nc.servers {
var resp Accstatz
if err := getMetricURL(nc.httpClient, server.URL, &resp); err != nil {
Debugf("ignoring server %s: %v", server.ID, err)
continue
}

for _, acc := range resp.Accounts {
nc.accountMetrics.Collect(server, acc, ch)
}
}
}

// accountMetrics has all of the prometheus descriptors related to
// each of the accounts of the server being scraped.
type accountMetrics struct {
connections *prometheus.Desc
totalConnections *prometheus.Desc
leafNodes *prometheus.Desc
sentMsgs *prometheus.Desc
sentBytes *prometheus.Desc
receivedMsgs *prometheus.Desc
receivedBytes *prometheus.Desc
slowConsumers *prometheus.Desc
}

// newAccountMetrics initializes a new instance of accountMetrics.
func newAccountMetrics(system, endpoint string) *accountMetrics {
account := &accountMetrics{
connections: prometheus.NewDesc(
prometheus.BuildFQName(system, endpoint, "current_connections"),
"current_connections",
[]string{"server_id", "account"},
nil),
totalConnections: prometheus.NewDesc(
prometheus.BuildFQName(system, endpoint, "total_connections"),
"total_connections",
[]string{"server_id", "account"},
nil),
leafNodes: prometheus.NewDesc(
prometheus.BuildFQName(system, endpoint, "leaf_nodes"),
"leaf_nodes",
[]string{"server_id", "account"},
nil),
sentMsgs: prometheus.NewDesc(
prometheus.BuildFQName(system, endpoint, "sent_messages"),
"sent_messages",
[]string{"server_id", "account"},
nil),
sentBytes: prometheus.NewDesc(
prometheus.BuildFQName(system, endpoint, "sent_bytes"),
"sent_bytes",
[]string{"server_id", "account"},
nil),
receivedMsgs: prometheus.NewDesc(
prometheus.BuildFQName(system, endpoint, "received_messages"),
"received_messages",
[]string{"server_id", "account"},
nil),
receivedBytes: prometheus.NewDesc(
prometheus.BuildFQName(system, endpoint, "received_bytes"),
"received_bytes",
[]string{"server_id", "account"},
nil),
slowConsumers: prometheus.NewDesc(
prometheus.BuildFQName(system, endpoint, "slow_consumers"),
"slow_consumers",
[]string{"server_id", "account"},
nil),
}

return account
}

// Describe
func (am *accountMetrics) Describe(ch chan<- *prometheus.Desc) {
ch <- am.connections
ch <- am.totalConnections
ch <- am.leafNodes
ch <- am.sentMsgs
ch <- am.sentBytes
ch <- am.receivedMsgs
ch <- am.receivedBytes
ch <- am.slowConsumers
}

// Collect collects all the metrics about an account.
func (am *accountMetrics) Collect(server *CollectedServer, acc *Account, ch chan<- prometheus.Metric) {

ch <- prometheus.MustNewConstMetric(am.connections, prometheus.GaugeValue, float64(acc.Connections),
server.ID, acc.AccountId)

ch <- prometheus.MustNewConstMetric(am.totalConnections, prometheus.GaugeValue, float64(acc.TotalConnections),
server.ID, acc.AccountId)

ch <- prometheus.MustNewConstMetric(am.leafNodes, prometheus.GaugeValue, float64(acc.LeafNodes),
server.ID, acc.AccountId)

ch <- prometheus.MustNewConstMetric(am.sentMsgs, prometheus.GaugeValue, float64(acc.Sent.Messages),
server.ID, acc.AccountId)

ch <- prometheus.MustNewConstMetric(am.sentBytes, prometheus.GaugeValue, float64(acc.Sent.Bytes),
server.ID, acc.AccountId)

ch <- prometheus.MustNewConstMetric(am.receivedMsgs, prometheus.GaugeValue, float64(acc.Received.Messages),
server.ID, acc.AccountId)

ch <- prometheus.MustNewConstMetric(am.receivedBytes, prometheus.GaugeValue, float64(acc.Received.Bytes),
server.ID, acc.AccountId)

ch <- prometheus.MustNewConstMetric(am.slowConsumers, prometheus.GaugeValue, float64(acc.SlowConsumers),
server.ID, acc.AccountId)
}

// Accstatz output
type Accstatz struct {
Accounts []*Account `json:"account_statz"`
}

// Leaf output
type Account struct {
AccountId string `json:"acc"`
Connections int `json:"conns"`
LeafNodes int `json:"leafnodes"`
TotalConnections int `json:"total_conns"`
Sent Data `json:"sent"`
Received Data `json:"received"`
SlowConsumers int `json:"slow_consumers"`
}

// Data output
type Data struct {
Messages int `json:"msgs"`
Bytes int `json:"bytes"`
}
3 changes: 3 additions & 0 deletions collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,9 @@ func NewCollector(system, endpoint, prefix string, servers []*CollectedServer) p
if isGatewayzEndpoint(system, endpoint) {
return newGatewayzCollector(getSystem(system, prefix), endpoint, servers)
}
if isAccstatzEndpoint(system, endpoint) {
return newAccstatzCollector(getSystem(system, prefix), endpoint, servers)
}
if isLeafzEndpoint(system, endpoint) {
return newLeafzCollector(getSystem(system, prefix), endpoint, servers)
}
Expand Down
9 changes: 7 additions & 2 deletions exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type NATSExporterOptions struct {
GetSubz bool
GetRoutez bool
GetGatewayz bool
GetAccstatz bool
GetLeafz bool
GetReplicatorVarz bool
GetStreamingChannelz bool
Expand Down Expand Up @@ -177,8 +178,9 @@ func (ne *NATSExporter) InitializeCollectors() error {

getJsz := opts.GetJszFilter != ""
if !opts.GetHealthz && !opts.GetConnz && !opts.GetConnzDetailed && !opts.GetRoutez &&
!opts.GetSubz && !opts.GetVarz && !opts.GetGatewayz && !opts.GetLeafz &&
!opts.GetStreamingChannelz && !opts.GetStreamingServerz && !opts.GetReplicatorVarz && !getJsz {
!opts.GetSubz && !opts.GetVarz && !opts.GetGatewayz && !opts.GetAccstatz && !opts.GetLeafz &&
!opts.GetStreamingChannelz && !opts.GetStreamingServerz &&
!opts.GetReplicatorVarz && !getJsz {
return fmt.Errorf("no Collectors specfied")
}
if opts.GetReplicatorVarz && opts.GetVarz {
Expand All @@ -201,6 +203,9 @@ func (ne *NATSExporter) InitializeCollectors() error {
if opts.GetGatewayz {
ne.createCollector(collector.CoreSystem, "gatewayz")
}
if opts.GetAccstatz {
ne.createCollector(collector.CoreSystem, "accstatz")
}
if opts.GetLeafz {
ne.createCollector(collector.CoreSystem, "leafz")
}
Expand Down
24 changes: 24 additions & 0 deletions exporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,30 @@ func TestExporterGatewayz(t *testing.T) {
}
}

func TestExporterAccstatz(t *testing.T) {
opts := getStaticExporterTestOptions()
opts.ListenAddress = "localhost"
opts.ListenPort = 0
opts.GetAccstatz = true

serverExit := &sync.WaitGroup{}

serverExit.Add(1)
s := pet.RunAccstatzStaticServer(serverExit)
defer s.Shutdown(context.TODO())

exp := NewExporter(opts)
if err := exp.Start(); err != nil {
t.Fatalf("%v", err)
}
defer exp.Stop()

_, err := checkExporterForResult(exp.http.Addr().String(), "gnatsd_accstatz_current_connections")
if err != nil {
t.Fatalf("%v", err)
}
}

func TestExporterLeafz(t *testing.T) {
opts := getStaticExporterTestOptions()
opts.ListenAddress = "localhost"
Expand Down
3 changes: 2 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func updateOptions(debugAndTrace, useSysLog bool, opts *exporter.NATSExporterOpt
}

metricsSpecified := opts.GetConnz || opts.GetVarz || opts.GetSubz || opts.GetHealthz ||
opts.GetRoutez || opts.GetGatewayz || opts.GetLeafz || opts.GetStreamingChannelz ||
opts.GetRoutez || opts.GetGatewayz || opts.GetAccstatz || opts.GetLeafz || opts.GetStreamingChannelz ||
opts.GetStreamingServerz || opts.GetReplicatorVarz || opts.GetJszFilter == ""
if !metricsSpecified {
// No logger setup yet, so use fmt
Expand Down Expand Up @@ -118,6 +118,7 @@ func main() {
flag.BoolVar(&opts.GetHealthz, "healthz", false, "Get health metrics.")
flag.BoolVar(&opts.GetReplicatorVarz, "replicatorVarz", false, "Get replicator general metrics.")
flag.BoolVar(&opts.GetGatewayz, "gatewayz", false, "Get gateway metrics.")
flag.BoolVar(&opts.GetAccstatz, "accstatz", false, "Get accstatz metrics.")
flag.BoolVar(&opts.GetLeafz, "leafz", false, "Get leaf metrics.")
flag.BoolVar(&opts.GetRoutez, "routez", false, "Get route metrics.")
flag.BoolVar(&opts.GetSubz, "subz", false, "Get subscription metrics.")
Expand Down
42 changes: 42 additions & 0 deletions test/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,48 @@ func GatewayzTestResponse() string {

}

// AccstatzTestResponse is static data for tests
func AccstatzTestResponse() string {
return `{
"server_id": "SERVER_ID",
"now": "2021-05-07T18:13:47.70796395Z",
"account_statz": [
{
"acc": "$G",
"conns": 100,
"leafnodes": 0,
"total_conns": 1000,
"sent": {
"msgs": 0,
"bytes": 0
},
"received": {
"msgs": 35922,
"bytes": 4574155
},
"slow_consumers": 0
},
{
"acc": "$A",
"conns": 0,
"leafnodes": 0,
"total_conns": 0,
"sent": {
"msgs": 0,
"bytes": 0
},
"received": {
"msgs": 0,
"bytes": 0
},
"slow_consumers": 0
}
]
}
`

}

func leafzTestResponse() string {
return `{
"server_id": "NC2FJCRMPBE5RI5OSRN7TKUCWQONCKNXHKJXCJIDVSAZ6727M7MQFVT3",
Expand Down
14 changes: 14 additions & 0 deletions test/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,20 @@ func RunGatewayzStaticServer(wg *sync.WaitGroup) *http.Server {
return srv
}

// RunAccstatzStaticServer starts an http server with static content
func RunAccstatzStaticServer(wg *sync.WaitGroup) *http.Server {
srv := &http.Server{Addr: ":" + strconv.Itoa(StaticPort)}
http.Handle("/accstatz", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprint(w, AccstatzTestResponse())
}))

go func() {
defer wg.Done()
srv.ListenAndServe()
}()
return srv
}

// RunLeafzStaticServer runs a leafz static server.
func RunLeafzStaticServer(wg *sync.WaitGroup) *http.Server {
srv := &http.Server{Addr: ":" + strconv.Itoa(StaticPort)}
Expand Down

0 comments on commit 6546411

Please sign in to comment.