Fix observability (#285)

* Fix observability

* fix split branches

* fix linter

* fix split branches

* fix linter

* Update pkg/telemetry/events/events.go

Co-authored-by: Edouard Schweisguth <[email protected]>

* rework event api

* PR comment

* unifying events API

---------

Co-authored-by: Edouard Schweisguth <[email protected]>
jt-dd and edznux-dd authored Nov 14, 2024
1 parent 80d8a55 · commit 2d1e729
Showing 19 changed files with 275 additions and 129 deletions.
10 changes: 5 additions & 5 deletions datadog/span-metrics.tf
@@ -31,7 +31,7 @@ resource "datadog_spans_metric" "kubehound_ingest_duration" {
}

filter {
query = "service:kubehound operation_name:kubehound.ingestData"
query = "service:kubehound-ingestor operation_name:kubehound.ingestData"
}

dynamic "group_by" {
@@ -63,7 +63,7 @@ resource "datadog_spans_metric" "kubehound_graph_duration" {
}

filter {
query = "service:kubehound operation_name:kubehound.buildGraph"
query = "service:kubehound-ingestor operation_name:kubehound.buildGraph"
}

dynamic "group_by" {
@@ -103,7 +103,7 @@ resource "datadog_spans_metric" "kubehound_collector_stream_duration" {
}

filter {
query = "service:kubehound operation_name:kubehound.collector.stream"
query = "service:kubehound-collector operation_name:kubehound.collector.stream"
}

dynamic "group_by" {
@@ -144,7 +144,7 @@ resource "datadog_spans_metric" "kubehound_graph_builder_edge_duration" {
}

filter {
query = "service:kubehound operation_name:kubehound.graph.builder.edge"
query = "service:kubehound-ingestor operation_name:kubehound.graph.builder.edge"
}

dynamic "group_by" {
@@ -154,4 +154,4 @@ resource "datadog_spans_metric" "kubehound_graph_builder_edge_duration" {
path = group_by.value
}
}
}
}
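
Note on the filter changes above: the single service:kubehound query is split into per-binary services, so each span metric only matches traces from the component that actually emits the operation (kubehound-ingestor for ingest and graph building, kubehound-collector for collection). As a hedged illustration only, this is roughly where such a service name is attached with standard dd-trace-go tracer options; the actual wiring inside KubeHound may differ:

```go
package main

import (
	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

// startTracing shows how a per-binary service name (the value the span-metric
// filters above match on) is typically set when the tracer starts.
func startTracing(service string) {
	tracer.Start(
		tracer.WithService(service), // e.g. "kubehound-ingestor" or "kubehound-collector"
	)
}

func main() {
	startTracing("kubehound-ingestor")
	defer tracer.Stop()
}
```
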
14 changes: 7 additions & 7 deletions pkg/collector/file.go
@@ -124,7 +124,7 @@ func (c *FileCollector) streamPodsNamespace(ctx context.Context, fp string, inge
}

for _, item := range list.Items {
_ = statsd.Incr(metric.CollectorCount, c.tags.pod, 1)
_ = statsd.Incr(ctx, metric.CollectorCount, c.tags.pod, 1)
i := item
err = ingestor.IngestPod(ctx, &i)
if err != nil {
@@ -176,7 +176,7 @@ func (c *FileCollector) streamRolesNamespace(ctx context.Context, fp string, ing
}

for _, item := range list.Items {
_ = statsd.Incr(metric.CollectorCount, c.tags.role, 1)
_ = statsd.Incr(ctx, metric.CollectorCount, c.tags.role, 1)
i := item
err = ingestor.IngestRole(ctx, &i)
if err != nil {
@@ -228,7 +228,7 @@ func (c *FileCollector) streamRoleBindingsNamespace(ctx context.Context, fp stri
}

for _, item := range list.Items {
_ = statsd.Incr(metric.CollectorCount, c.tags.rolebinding, 1)
_ = statsd.Incr(ctx, metric.CollectorCount, c.tags.rolebinding, 1)
i := item
err = ingestor.IngestRoleBinding(ctx, &i)
if err != nil {
@@ -280,7 +280,7 @@ func (c *FileCollector) streamEndpointsNamespace(ctx context.Context, fp string,
}

for _, item := range list.Items {
_ = statsd.Incr(metric.CollectorCount, c.tags.endpoint, 1)
_ = statsd.Incr(ctx, metric.CollectorCount, c.tags.endpoint, 1)
i := item
err = ingestor.IngestEndpoint(ctx, &i)
if err != nil {
@@ -339,7 +339,7 @@ func (c *FileCollector) StreamNodes(ctx context.Context, ingestor NodeIngestor)
}

for _, item := range list.Items {
_ = statsd.Incr(metric.CollectorCount, c.tags.node, 1)
_ = statsd.Incr(ctx, metric.CollectorCount, c.tags.node, 1)
i := item
err = ingestor.IngestNode(ctx, &i)
if err != nil {
@@ -366,7 +366,7 @@ func (c *FileCollector) StreamClusterRoles(ctx context.Context, ingestor Cluster
}

for _, item := range list.Items {
_ = statsd.Incr(metric.CollectorCount, c.tags.clusterrole, 1)
_ = statsd.Incr(ctx, metric.CollectorCount, c.tags.clusterrole, 1)
i := item
err = ingestor.IngestClusterRole(ctx, &i)
if err != nil {
@@ -393,7 +393,7 @@ func (c *FileCollector) StreamClusterRoleBindings(ctx context.Context, ingestor
}

for _, item := range list.Items {
_ = statsd.Incr(metric.CollectorCount, c.tags.clusterrolebinding, 1)
_ = statsd.Incr(ctx, metric.CollectorCount, c.tags.clusterrolebinding, 1)
i := item
err = ingestor.IngestClusterRoleBinding(ctx, &i)
if err != nil {
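
The file collector changes above all follow one pattern: every statsd call now takes the context as its first argument, so run-scoped tags (cluster name, run ID) can be derived from the context rather than from global state. A minimal sketch of what a context-aware wrapper such as pkg/telemetry/statsd could look like, assuming a hypothetical tagsFromContext helper; the real implementation may differ:

```go
package statsd

import (
	"context"

	ddstatsd "github.com/DataDog/datadog-go/v5/statsd"
)

// client would normally be initialised from the KubeHound configuration;
// a no-op client keeps this sketch self-contained.
var client ddstatsd.ClientInterface = &ddstatsd.NoOpClient{}

// tagsFromContext is hypothetical: it stands in for whatever helper reads the
// cluster name and run ID previously stored on the context.
func tagsFromContext(ctx context.Context) []string {
	return nil
}

// Incr mirrors the new call shape seen above: context first, then the metric
// name, the per-call tags and the sample rate.
func Incr(ctx context.Context, name string, tags []string, rate float64) error {
	return client.Incr(name, append(tags, tagsFromContext(ctx)...), rate)
}
```
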
22 changes: 11 additions & 11 deletions pkg/collector/k8s_api.go
@@ -162,7 +162,7 @@ func (c *k8sAPICollector) wait(ctx context.Context, resourceType string, tags []
}

// entity := tag.Entity(resourceType)
err := statsd.Gauge(metric.CollectorWait, float64(c.waitTime[resourceType]), tags, 1)
err := statsd.Gauge(ctx, metric.CollectorWait, float64(c.waitTime[resourceType]), tags, 1)
if err != nil {
l.Error("could not send gauge", log.ErrorField(err))
}
@@ -213,19 +213,19 @@ func (c *k8sAPICollector) computeMetrics(ctx context.Context) (Metrics, error) {
}

runDuration := time.Since(c.startTime)
err := statsd.Gauge(metric.CollectorRunWait, float64(runTotalWaitTime), c.tags.baseTags, 1)
err := statsd.Gauge(ctx, metric.CollectorRunWait, float64(runTotalWaitTime), c.tags.baseTags, 1)
if err != nil {
errMetric = errors.Join(errMetric, err)
l.Error("could not send gauge", log.ErrorField(err))
}
err = statsd.Gauge(metric.CollectorRunDuration, float64(runDuration), c.tags.baseTags, 1)
err = statsd.Gauge(ctx, metric.CollectorRunDuration, float64(runDuration), c.tags.baseTags, 1)
if err != nil {
errMetric = errors.Join(errMetric, err)
l.Error("could not send gauge", log.ErrorField(err))
}

runThrottlingPercentage := 1 - (float64(runDuration-runTotalWaitTime) / float64(runDuration))
err = statsd.Gauge(metric.CollectorRunThrottling, runThrottlingPercentage, c.tags.baseTags, 1)
err = statsd.Gauge(ctx, metric.CollectorRunThrottling, runThrottlingPercentage, c.tags.baseTags, 1)
if err != nil {
errMetric = errors.Join(errMetric, err)
l.Error("could not send gauge", log.ErrorField(err))
@@ -288,7 +288,7 @@ func (c *k8sAPICollector) streamPodsNamespace(ctx context.Context, namespace str
c.setPagerConfig(pager)

return pager.EachListItem(ctx, opts, func(obj runtime.Object) error {
_ = statsd.Incr(metric.CollectorCount, c.tags.pod, 1)
_ = statsd.Incr(ctx, metric.CollectorCount, c.tags.pod, 1)
c.wait(ctx, entity, c.tags.pod)
item, ok := obj.(*corev1.Pod)
if !ok {
@@ -343,7 +343,7 @@ func (c *k8sAPICollector) streamRolesNamespace(ctx context.Context, namespace st
c.setPagerConfig(pager)

return pager.EachListItem(ctx, opts, func(obj runtime.Object) error {
_ = statsd.Incr(metric.CollectorCount, c.tags.role, 1)
_ = statsd.Incr(ctx, metric.CollectorCount, c.tags.role, 1)
c.wait(ctx, entity, c.tags.role)
item, ok := obj.(*rbacv1.Role)
if !ok {
@@ -398,7 +398,7 @@ func (c *k8sAPICollector) streamRoleBindingsNamespace(ctx context.Context, names
c.setPagerConfig(pager)

return pager.EachListItem(ctx, opts, func(obj runtime.Object) error {
_ = statsd.Incr(metric.CollectorCount, c.tags.rolebinding, 1)
_ = statsd.Incr(ctx, metric.CollectorCount, c.tags.rolebinding, 1)
c.wait(ctx, entity, c.tags.rolebinding)
item, ok := obj.(*rbacv1.RoleBinding)
if !ok {
@@ -453,7 +453,7 @@ func (c *k8sAPICollector) streamEndpointsNamespace(ctx context.Context, namespac
c.setPagerConfig(pager)

return pager.EachListItem(ctx, opts, func(obj runtime.Object) error {
_ = statsd.Incr(metric.CollectorCount, c.tags.endpoint, 1)
_ = statsd.Incr(ctx, metric.CollectorCount, c.tags.endpoint, 1)
c.wait(ctx, entity, c.tags.endpoint)
item, ok := obj.(*discoveryv1.EndpointSlice)
if !ok {
@@ -507,7 +507,7 @@ func (c *k8sAPICollector) StreamNodes(ctx context.Context, ingestor NodeIngestor
c.setPagerConfig(pager)

err = pager.EachListItem(ctx, opts, func(obj runtime.Object) error {
_ = statsd.Incr(metric.CollectorCount, c.tags.node, 1)
_ = statsd.Incr(ctx, metric.CollectorCount, c.tags.node, 1)
c.wait(ctx, entity, c.tags.node)
item, ok := obj.(*corev1.Node)
if !ok {
@@ -550,7 +550,7 @@ func (c *k8sAPICollector) StreamClusterRoles(ctx context.Context, ingestor Clust
c.setPagerConfig(pager)

err = pager.EachListItem(ctx, opts, func(obj runtime.Object) error {
_ = statsd.Incr(metric.CollectorCount, c.tags.clusterrole, 1)
_ = statsd.Incr(ctx, metric.CollectorCount, c.tags.clusterrole, 1)
c.wait(ctx, entity, c.tags.clusterrole)
item, ok := obj.(*rbacv1.ClusterRole)
if !ok {
@@ -593,7 +593,7 @@ func (c *k8sAPICollector) StreamClusterRoleBindings(ctx context.Context, ingesto
c.setPagerConfig(pager)

err = pager.EachListItem(ctx, opts, func(obj runtime.Object) error {
_ = statsd.Incr(metric.CollectorCount, c.tags.clusterrolebinding, 1)
_ = statsd.Incr(ctx, metric.CollectorCount, c.tags.clusterrolebinding, 1)
c.wait(ctx, entity, c.tags.clusterrolebinding)
item, ok := obj.(*rbacv1.ClusterRoleBinding)
if !ok {
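
Besides threading ctx into every Gauge and Incr call, this file also reports a throttling gauge computed as 1 - (active time / total run time), i.e. the fraction of the run spent waiting on client-side rate limiting. A small self-contained restatement of that computation (the numbers in main are hypothetical):

```go
package main

import (
	"fmt"
	"time"
)

// throttlingPercentage reproduces the gauge computed in computeMetrics above:
// 1 - (active / total) is simply the share of the run spent waiting.
func throttlingPercentage(runDuration, totalWait time.Duration) float64 {
	return 1 - float64(runDuration-totalWait)/float64(runDuration)
}

func main() {
	// Hypothetical run: 10 minutes of collection, 90 seconds spent rate-limited.
	fmt.Printf("%.2f\n", throttlingPercentage(10*time.Minute, 90*time.Second)) // prints 0.15
}
```
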
33 changes: 12 additions & 21 deletions pkg/ingestor/api/api.go
@@ -11,21 +11,19 @@ import (
"github.com/DataDog/KubeHound/pkg/collector"
"github.com/DataDog/KubeHound/pkg/config"
"github.com/DataDog/KubeHound/pkg/dump"
"github.com/DataDog/KubeHound/pkg/ingestor"
grpc "github.com/DataDog/KubeHound/pkg/ingestor/api/grpc/pb"
"github.com/DataDog/KubeHound/pkg/ingestor/notifier"
"github.com/DataDog/KubeHound/pkg/ingestor/puller"
"github.com/DataDog/KubeHound/pkg/kubehound/graph"
"github.com/DataDog/KubeHound/pkg/kubehound/graph/adapter"
"github.com/DataDog/KubeHound/pkg/kubehound/providers"
"github.com/DataDog/KubeHound/pkg/kubehound/store/collections"
"github.com/DataDog/KubeHound/pkg/telemetry/events"
"github.com/DataDog/KubeHound/pkg/telemetry/log"
"github.com/DataDog/KubeHound/pkg/telemetry/span"
"github.com/DataDog/KubeHound/pkg/telemetry/tag"
gremlingo "github.com/apache/tinkerpop/gremlin-go/v3/driver"
"go.mongodb.org/mongo-driver/bson"
"google.golang.org/protobuf/types/known/timestamppb"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

@@ -153,27 +151,24 @@ func (g *IngestorAPI) Ingest(ctx context.Context, path string) error {
runCtx := context.Background()
runCtx = context.WithValue(runCtx, log.ContextFieldCluster, clusterName)
runCtx = context.WithValue(runCtx, log.ContextFieldRunID, runID)
l = log.Logger(runCtx) //nolint: contextcheck
spanJob, runCtx := span.SpanRunFromContext(runCtx, span.IngestorStartJob)
defer func() { spanJob.Finish(tracer.WithError(err)) }()

events.PushEvent(
fmt.Sprintf("Ingesting cluster %s with runID %s", clusterName, runID),
fmt.Sprintf("Ingesting cluster %s with runID %s", clusterName, runID),
[]string{
tag.IngestionRunID(runID),
},
)

l = log.Logger(runCtx) //nolint: contextcheck
alreadyIngested, err := g.isAlreadyIngestedInGraph(runCtx, clusterName, runID) //nolint: contextcheck
if err != nil {
return err
}

if alreadyIngested {
_ = events.PushEvent(runCtx, events.IngestSkip, "") //nolint: contextcheck

return fmt.Errorf("%w [%s:%s]", ErrAlreadyIngested, clusterName, runID)
}

spanJob, runCtx := span.SpanRunFromContext(runCtx, span.IngestorStartJob)
spanJob.SetTag(ext.ManualKeep, true)
defer func() { spanJob.Finish(tracer.WithError(err)) }()

_ = events.PushEvent(runCtx, events.IngestStarted, "") //nolint: contextcheck

// We need to flush the cache to prevent warnings/errors when overwriting elements in cache from the previous ingestion
// This avoids conflicts from a previous ingestion (there is no need to reuse the cache from a previous ingestion)
l.Info("Preparing cache provider")
@@ -209,15 +204,11 @@ func (g *IngestorAPI) Ingest(ctx context.Context, path string) error {
}
}

err = ingestor.IngestData(runCtx, runCfg, collect, g.providers.CacheProvider, g.providers.StoreProvider, g.providers.GraphProvider) //nolint: contextcheck
if err != nil {
return fmt.Errorf("raw data ingest: %w", err)
}

err = graph.BuildGraph(runCtx, runCfg, g.providers.StoreProvider, g.providers.GraphProvider, g.providers.CacheProvider) //nolint: contextcheck
err = g.providers.IngestBuildData(runCtx, runCfg) //nolint: contextcheck
if err != nil {
return err
}

err = g.notifier.Notify(runCtx, clusterName, runID) //nolint: contextcheck
if err != nil {
return fmt.Errorf("notifying: %w", err)
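
The API changes above unify event emission behind events.PushEvent(ctx, eventType, text) and collapse the separate ingestor.IngestData and graph.BuildGraph calls into a single g.providers.IngestBuildData call. A rough sketch of what an event API keyed by typed constants can look like; the names here (EventAction, titles) are assumptions for illustration and not the actual pkg/telemetry/events types:

```go
package events

import (
	"context"
	"fmt"
)

// EventAction and titles stand in for the constants used above
// (events.IngestStarted, events.IngestSkip); the real types may differ.
type EventAction int

const (
	IngestStarted EventAction = iota
	IngestSkip
)

var titles = map[EventAction]string{
	IngestStarted: "Ingestion started",
	IngestSkip:    "Ingestion skipped (already ingested)",
}

// PushEvent is the unified entry point: call sites pass a typed action plus an
// optional free-text detail, and the cluster name and run ID tags would be
// read from ctx instead of being formatted at every call site.
func PushEvent(ctx context.Context, action EventAction, text string) error {
	_ = ctx // the real implementation would derive tags from the context here
	fmt.Printf("event: %s %s\n", titles[action], text)
	return nil
}
```
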
48 changes: 33 additions & 15 deletions pkg/ingestor/puller/blob/blob.go
@@ -23,6 +23,8 @@ import (
_ "gocloud.dev/blob/gcsblob"
_ "gocloud.dev/blob/memblob"
"gocloud.dev/blob/s3blob"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

@@ -51,6 +53,9 @@ func NewBlobStorage(cfg *config.KubehoundConfig, blobConfig *config.BlobConfig)
}

func (bs *BlobStore) openBucket(ctx context.Context) (*blob.Bucket, error) {
l := log.Logger(ctx)
l.Info("Opening bucket", log.String("bucket_name", bs.bucketName))

urlStruct, err := url.Parse(bs.bucketName)
if err != nil {
return nil, err
@@ -132,18 +137,23 @@ func (bs *BlobStore) ListFiles(ctx context.Context, prefix string, recursive boo
// Put pushes the archive at archivePath to the blob store (e.g: s3) under a key derived from the cluster name and run ID
func (bs *BlobStore) Put(outer context.Context, archivePath string, clusterName string, runID string) error {
l := log.Logger(outer)
l.Info("Putting data on blob store bucket", log.String("bucket_name", bs.bucketName), log.String(log.FieldClusterKey, clusterName), log.String(log.FieldRunIDKey, runID))
spanPut, ctx := span.SpanRunFromContext(outer, span.IngestorBlobPull)
var err error
defer func() { spanPut.Finish(tracer.WithError(err)) }()

// Triggering a span only when it is an actual run and not the rehydration process (download the kubehound dump to get the metadata)
if log.GetRunIDFromContext(outer) != "" {
var spanPut ddtrace.Span
spanPut, outer = span.SpanRunFromContext(outer, span.IngestorBlobPull)
defer func() { spanPut.Finish(tracer.WithError(err)) }()
}
l.Info("Putting data on blob store bucket", log.String("bucket_name", bs.bucketName), log.String(log.FieldClusterKey, clusterName), log.String(log.FieldRunIDKey, runID))

dumpResult, err := dump.NewDumpResult(clusterName, runID, true)
if err != nil {
return err
}
key := dumpResult.GetFullPath()
l.Info("Opening bucket", log.String("bucket_name", bs.bucketName))
b, err := bs.openBucket(ctx)
b, err := bs.openBucket(outer)
if err != nil {
return err
}
@@ -157,7 +167,7 @@ func (bs *BlobStore) Put(outer context.Context, archivePath string, clusterName

l.Info("Uploading archive from blob store", log.String("key", key))
w := bufio.NewReader(f)
err = b.Upload(ctx, key, w, &blob.WriterOptions{
err = b.Upload(outer, key, w, &blob.WriterOptions{
ContentType: "application/gzip",
})
if err != nil {
@@ -175,13 +185,15 @@
// Pull pulls the data from the blob store (e.g: s3) and returns the path of the folder containing the archive
func (bs *BlobStore) Pull(outer context.Context, key string) (string, error) {
l := log.Logger(outer)
l.Info("Pulling data from blob store bucket", log.String("bucket_name", bs.bucketName), log.String("key", key))
spanPull, ctx := span.SpanRunFromContext(outer, span.IngestorBlobPull)
var err error
defer func() { spanPull.Finish(tracer.WithError(err)) }()
if log.GetRunIDFromContext(outer) != "" {
var spanPull ddtrace.Span
spanPull, outer = span.SpanRunFromContext(outer, span.IngestorBlobPull)
defer func() { spanPull.Finish(tracer.WithError(err)) }()
}
l.Info("Pulling data from blob store bucket", log.String("bucket_name", bs.bucketName), log.String("key", key))

l.Info("Opening bucket", log.String("bucket_name", bs.bucketName))
b, err := bs.openBucket(ctx)
b, err := bs.openBucket(outer)
if err != nil {
return "", err
}
@@ -207,7 +219,7 @@

l.Info("Downloading archive from blob store", log.String("key", key))
w := bufio.NewWriter(f)
err = b.Download(ctx, key, w, nil)
err = b.Download(outer, key, w, nil)
if err != nil {
return archivePath, err
}
@@ -221,9 +233,12 @@
}

func (bs *BlobStore) Extract(ctx context.Context, archivePath string) error {
spanExtract, _ := span.SpanRunFromContext(ctx, span.IngestorBlobExtract)
var err error
defer func() { spanExtract.Finish(tracer.WithError(err)) }()
if log.GetRunIDFromContext(ctx) != "" {
var spanPull ddtrace.Span
spanPull, ctx = span.SpanRunFromContext(ctx, span.IngestorBlobExtract)
defer func() { spanPull.Finish(tracer.WithError(err)) }()
}

basePath := filepath.Dir(archivePath)
err = puller.CheckSanePath(archivePath, basePath)
@@ -243,9 +258,12 @@
// Once downloaded and processed, we should cleanup the disk so we can reduce the disk usage
// required for large infrastructure
func (bs *BlobStore) Close(ctx context.Context, archivePath string) error {
spanClose, _ := span.SpanRunFromContext(ctx, span.IngestorBlobClose)
var err error
defer func() { spanClose.Finish(tracer.WithError(err)) }()
if log.GetRunIDFromContext(ctx) != "" {
var spanClose ddtrace.Span
spanClose, _ = span.SpanRunFromContext(ctx, span.IngestorBlobClose)
defer func() { spanClose.Finish(tracer.WithError(err)) }()
}

path := filepath.Dir(archivePath)
err = puller.CheckSanePath(archivePath, bs.cfg.Ingestor.TempDir)
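
The blob store changes gate span creation on the presence of a run ID in the context, so the rehydration path (downloading a dump only to read its metadata) does not emit APM spans, and they consistently pass the outer context into openBucket, Upload and Download. A minimal sketch of that guard pattern using plain dd-trace-go calls; runIDFromContext and the operation name are stand-ins for KubeHound's own helpers in pkg/telemetry:

```go
package main

import (
	"context"

	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

type ctxKey string

const runIDKey ctxKey = "run_id" // hypothetical context key

// runIDFromContext stands in for log.GetRunIDFromContext in KubeHound.
func runIDFromContext(ctx context.Context) string {
	v, _ := ctx.Value(runIDKey).(string)
	return v
}

// pull shows the guard used in Put/Pull/Extract/Close above: open a span only
// when the context carries a run ID, so the metadata-only rehydration path
// stays out of APM. The operation name is illustrative.
func pull(ctx context.Context) (err error) {
	if runIDFromContext(ctx) != "" {
		var span ddtrace.Span
		span, ctx = tracer.StartSpanFromContext(ctx, "kubehound.ingestor.blob.pull")
		defer func() { span.Finish(tracer.WithError(err)) }()
	}
	// ... perform the blob operation with ctx ...
	_ = ctx
	return nil
}

func main() {
	_ = pull(context.WithValue(context.Background(), runIDKey, "test-run-id"))
}
```
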