diff --git a/pkg/clusterresource/controller.go b/pkg/clusterresource/controller.go index 348d61c5e..36b62caa0 100644 --- a/pkg/clusterresource/controller.go +++ b/pkg/clusterresource/controller.go @@ -273,8 +273,8 @@ func prepareDynamicCreate(target executioncluster.ExecutionTarget, config string // a) read template file // b) substitute templatized variables with their resolved values // 2. create the resource on the kubernetes cluster and cache successful outcomes -func (c *controller) syncNamespace(ctx context.Context, project *admin.Project, domain *admin.Domain, namespace NamespaceName, - templateValues, customTemplateValues templateValuesType) error { +func (c *controller) syncNamespace(ctx context.Context, project *admin.Project, domain *admin.Domain, + namespace NamespaceName, templateValues, customTemplateValues templateValuesType) (ResourceSyncStats, error) { templateDir := c.config.ClusterResourceConfiguration().GetTemplatePath() if c.lastAppliedTemplateDir != templateDir { // Invalidate all caches @@ -283,12 +283,13 @@ func (c *controller) syncNamespace(ctx context.Context, project *admin.Project, } templateFiles, err := ioutil.ReadDir(templateDir) if err != nil { - return errors.NewFlyteAdminErrorf(codes.Internal, + return ResourceSyncStats{}, errors.NewFlyteAdminErrorf(codes.Internal, "Failed to read config template dir [%s] for namespace [%s] with err: %v", namespace, templateDir, err) } collectedErrs := make([]error, 0) + stats := ResourceSyncStats{} for _, templateFile := range templateFiles { templateFileName := templateFile.Name() if filepath.Ext(templateFileName) != ".yaml" { @@ -309,6 +310,7 @@ func (c *controller) syncNamespace(ctx context.Context, project *admin.Project, if c.templateAlreadyApplied(namespace, templateFileName, checksum) { // nothing to do. logger.Debugf(ctx, "syncing namespace [%s]: templateFile [%s] already applied, nothing to do.", namespace, templateFile.Name()) + stats.AlreadyThere++ continue } @@ -320,6 +322,7 @@ func (c *controller) syncNamespace(ctx context.Context, project *admin.Project, "into a dynamic unstructured mapping with err: %v, manifest: %v", namespace, err, k8sManifest) collectedErrs = append(collectedErrs, err) c.metrics.KubernetesResourcesCreateErrors.Inc() + stats.Errored++ continue } @@ -341,6 +344,7 @@ func (c *controller) syncNamespace(ctx context.Context, project *admin.Project, logger.Warningf(ctx, "Failed to get current resource from server [%+v] in namespace [%s] with err: %v", dynamicObj.obj.GetKind(), namespace, err) collectedErrs = append(collectedErrs, err) + stats.Errored++ continue } @@ -350,6 +354,7 @@ func (c *controller) syncNamespace(ctx context.Context, project *admin.Project, logger.Warningf(ctx, "Failed to marshal resource [%+v] in namespace [%s] to json with err: %v", dynamicObj.obj.GetKind(), namespace, err) collectedErrs = append(collectedErrs, err) + stats.Errored++ continue } @@ -359,12 +364,14 @@ func (c *controller) syncNamespace(ctx context.Context, project *admin.Project, logger.Warningf(ctx, "Failed to create patch for resource [%+v] in namespace [%s] err: %v", dynamicObj.obj.GetKind(), namespace, err) collectedErrs = append(collectedErrs, err) + stats.Errored++ continue } if string(patch) == noChange { logger.Infof(ctx, "Resource [%+v] in namespace [%s] is not modified", dynamicObj.obj.GetKind(), namespace) + stats.AlreadyThere++ continue } @@ -375,9 +382,11 @@ func (c *controller) syncNamespace(ctx context.Context, project *admin.Project, logger.Warningf(ctx, "Failed to patch resource [%+v] in namespace [%s] with err: %v", dynamicObj.obj.GetKind(), namespace, err) collectedErrs = append(collectedErrs, err) + stats.Errored++ continue } + stats.Updated++ logger.Debugf(ctx, "Successfully updated resource [%+v] in namespace [%s]", dynamicObj.obj.GetKind(), namespace) c.setTemplateChecksum(namespace, templateFileName, checksum) @@ -389,9 +398,11 @@ func (c *controller) syncNamespace(ctx context.Context, project *admin.Project, err := errors.NewFlyteAdminErrorf(codes.Internal, "Failed to create kubernetes object from config template [%s] for namespace [%s] with err: %v", templateFileName, namespace, err) + stats.Errored++ collectedErrs = append(collectedErrs, err) } } else { + stats.Created++ logger.Debugf(ctx, "Created resource [%+v] for namespace [%s] in kubernetes", dynamicObj.obj.GetKind(), namespace) c.metrics.KubernetesResourcesCreated.Inc() @@ -400,9 +411,10 @@ func (c *controller) syncNamespace(ctx context.Context, project *admin.Project, } } if len(collectedErrs) > 0 { - return errors.NewCollectedFlyteAdminError(codes.Internal, collectedErrs) + return stats, errors.NewCollectedFlyteAdminError(codes.Internal, collectedErrs) } - return nil + + return stats, nil } var metadataAccessor = meta.NewAccessor() @@ -573,16 +585,19 @@ func (c *controller) Sync(ctx context.Context) error { errs = append(errs, err) } + stats := ResourceSyncStats{} + for _, project := range projects.Projects { for _, domain := range project.Domains { namespace := common.GetNamespaceName(c.config.NamespaceMappingConfiguration().GetNamespaceTemplate(), project.Id, domain.Name) customTemplateValues, err := c.getCustomTemplateValues( ctx, project.Id, domain.Id, domainTemplateValues[domain.Id]) if err != nil { - logger.Warningf(ctx, "Failed to get custom template values for %s with err: %v", namespace, err) + logger.Errorf(ctx, "Failed to get custom template values for %s with err: %v", namespace, err) errs = append(errs, err) } - err = c.syncNamespace(ctx, project, domain, namespace, templateValues, customTemplateValues) + + newStats, err := c.syncNamespace(ctx, project, domain, namespace, templateValues, customTemplateValues) if err != nil { logger.Warningf(ctx, "Failed to create cluster resources for namespace [%s] with err: %v", namespace, err) c.metrics.ResourceAddErrors.Inc() @@ -590,23 +605,32 @@ func (c *controller) Sync(ctx context.Context) error { } else { c.metrics.ResourcesAdded.Inc() logger.Debugf(ctx, "Successfully created kubernetes resources for [%s]", namespace) + stats.Add(newStats) } + + logger.Infof(ctx, "Completed cluster resource creation loop for namespace [%s] with stats: [%+v]", namespace, newStats) } } + + logger.Infof(ctx, "Completed cluster resource creation loop with stats: [%+v]", stats) + if len(errs) > 0 { return errors.NewCollectedFlyteAdminError(codes.Internal, errs) } + return nil } func (c *controller) Run() { ctx := context.Background() - logger.Debugf(ctx, "Running ClusterResourceController") + logger.Info(ctx, "Running ClusterResourceController") interval := c.config.ClusterResourceConfiguration().GetRefreshInterval() wait.Forever(func() { err := c.Sync(ctx) if err != nil { - logger.Warningf(ctx, "Failed cluster resource creation loop with: %v", err) + logger.Errorf(ctx, "Failed cluster resource creation loop with: %v", err) + } else { + logger.Infof(ctx, "Successfully completed cluster resource creation loop") } }, interval) } diff --git a/pkg/clusterresource/sync_stats.go b/pkg/clusterresource/sync_stats.go new file mode 100644 index 000000000..38d960919 --- /dev/null +++ b/pkg/clusterresource/sync_stats.go @@ -0,0 +1,17 @@ +package clusterresource + +// ResourceSyncStats is a simple struct to track the number of resources created, updated, already there, and errored +type ResourceSyncStats struct { + Created int + Updated int + AlreadyThere int + Errored int +} + +// Add adds the values of the other ResourceSyncStats to this one +func (m *ResourceSyncStats) Add(other ResourceSyncStats) { + m.Created += other.Created + m.Updated += other.Updated + m.AlreadyThere += other.AlreadyThere + m.Errored += other.Errored +}