diff --git a/README.md b/README.md index e6b85acb..fa993daf 100644 --- a/README.md +++ b/README.md @@ -61,34 +61,41 @@ spec: scaleDownThresholdDurationSeconds: 1800 scaleDownCooldownSeconds: 3600 diskUsagePercentScaledownWatermark: 80 + experimental: + draining: + maxRetries: 999 + maximumWaitTimeDurationSeconds: 30 + minimumWaitTimeDurationSeconds: 10 ``` ### Custom resource properties - -| Key | Description | Type | -|----------|---------------|---------| -| spec.replicas | Initial size of the StatefulSet. If auto-scaling is disabled, this is your desired cluster size. | Int | -| spec.excludeSystemIndices | Enable or disable inclusion of system indices like '.kibana' when calculating shard-per-node ratio and scaling index replica counts. Those are usually managed by Elasticsearch internally. Default is false for backwards compatibility | Boolean | -| spec.skipDraining | Allows the ES Operator to terminate an Elasticsearch node without re-allocating its data. This is useful for persistent disk setups, like EBS volumes. Beware that the ES Operator does not verify that you have more than one copy of your indices and therefore wouldn't protect you from potential data loss. (default=false) | Boolean | -| spec.scaling.enabled | Enable or disable auto-scaling. May be necessary to enforce manual scaling. | Boolean | -| spec.scaling.minReplicas | Minimum Pod replicas. Lower bound (inclusive) when scaling down. | Int | -| spec.scaling.maxReplicas | Maximum Pod replicas. Upper bound (inclusive) when scaling up. | Int | -| spec.scaling.minIndexReplicas | Minimum index replicas. Lower bound (inclusive) when reducing index copies. (reminder: total copies is replicas+1 in Elasticsearch) | Int | -| spec.scaling.maxIndexReplicas | Maximum index replicas. Upper bound (inclusive) when increasing index copies. | Int | -| spec.scaling.minShardsPerNode | Minimum shard per node ratio. When reached, scaling up also requires adding more index replicas. | Int | -| spec.scaling.maxShardsPerNode | Maximum shard per node ratio. Boundary for scaling down. | Int | -| spec.scaling.scaleUpCPUBoundary | (Median) CPU consumption/request ratio to consistently exceed in order to trigger scale up. | Int | -| spec.scaling.scaleUpThresholdDurationSeconds | Duration in seconds required to meet the scale-up criteria before scaling. | Int | -| spec.scaling.scaleUpCooldownSeconds | Minimum duration in seconds between two scale up operations. | Int | -| spec.scaling.scaleDownCPUBoundary | (Median) CPU consumption/request ratio to consistently fall below in order to trigger scale down. | Int | -| spec.scaling.scaleDownThresholdDurationSeconds | Duration in seconds required to meet the scale-down criteria before scaling. | Int | -| spec.scaling.scaleDownCooldownSeconds | Minimum duration in seconds between two scale-down operations. | Int | -| spec.scaling.diskUsagePercentScaledownWatermark | If disk usage on one of the nodes exceeds this threshold, scaling down will be prevented. 
| Float | -| status.lastScaleUpStarted | Timestamp of start of last scale-up activity | Timestamp | -| status.lastScaleUpEnded | Timestamp of end of last scale-up activity | Timestamp | -| status.lastScaleDownStarted | Timestamp of start of last scale-down activity | Timestamp | -| status.lastScaleDownEnded | Timestamp of end of last scale-down activity | Timestamp | +| Key | Description | Type | +|-----------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------| +| spec.replicas | Initial size of the StatefulSet. If auto-scaling is disabled, this is your desired cluster size. | Int | +| spec.excludeSystemIndices | Enable or disable inclusion of system indices like '.kibana' when calculating shard-per-node ratio and scaling index replica counts. Those are usually managed by Elasticsearch internally. Default is false for backwards compatibility | Boolean | +| spec.skipDraining | Allows the ES Operator to terminate an Elasticsearch node without re-allocating its data. This is useful for persistent disk setups, like EBS volumes. Beware that the ES Operator does not verify that you have more than one copy of your indices and therefore wouldn't protect you from potential data loss. (default=false) | Boolean | +| spec.scaling.enabled | Enable or disable auto-scaling. May be necessary to enforce manual scaling. | Boolean | +| spec.scaling.minReplicas | Minimum Pod replicas. Lower bound (inclusive) when scaling down. | Int | +| spec.scaling.maxReplicas | Maximum Pod replicas. Upper bound (inclusive) when scaling up. | Int | +| spec.scaling.minIndexReplicas | Minimum index replicas. Lower bound (inclusive) when reducing index copies. (reminder: total copies is replicas+1 in Elasticsearch) | Int | +| spec.scaling.maxIndexReplicas | Maximum index replicas. Upper bound (inclusive) when increasing index copies. | Int | +| spec.scaling.minShardsPerNode | Minimum shard per node ratio. When reached, scaling up also requires adding more index replicas. | Int | +| spec.scaling.maxShardsPerNode | Maximum shard per node ratio. Boundary for scaling down. | Int | +| spec.scaling.scaleUpCPUBoundary | (Median) CPU consumption/request ratio to consistently exceed in order to trigger scale up. | Int | +| spec.scaling.scaleUpThresholdDurationSeconds | Duration in seconds required to meet the scale-up criteria before scaling. | Int | +| spec.scaling.scaleUpCooldownSeconds | Minimum duration in seconds between two scale up operations. | Int | +| spec.scaling.scaleDownCPUBoundary | (Median) CPU consumption/request ratio to consistently fall below in order to trigger scale down. | Int | +| spec.scaling.scaleDownThresholdDurationSeconds | Duration in seconds required to meet the scale-down criteria before scaling. | Int | +| spec.scaling.scaleDownCooldownSeconds | Minimum duration in seconds between two scale-down operations. | Int | +| spec.scaling.diskUsagePercentScaledownWatermark | If disk usage on one of the nodes exceeds this threshold, scaling down will be prevented. | Float | +| spec.experimental.draining.maxRetries | MaxRetries specifies the maximum number of attempts to drain a node. 
| Int |
+| spec.experimental.draining.maximumWaitTimeDurationSeconds | MaximumWaitTimeDurationSeconds specifies the maximum wait time in seconds between retry attempts after a failed node drain. | Int |
+| spec.experimental.draining.minimumWaitTimeDurationSeconds | MinimumWaitTimeDurationSeconds specifies the minimum wait time in seconds between retry attempts after a failed node drain. | Int |
+| status.lastScaleUpStarted | Timestamp of start of last scale-up activity | Timestamp |
+| status.lastScaleUpEnded | Timestamp of end of last scale-up activity | Timestamp |
+| status.lastScaleDownStarted | Timestamp of start of last scale-down activity | Timestamp |
+| status.lastScaleDownEnded | Timestamp of end of last scale-down activity | Timestamp |
 
 ## How it scales
diff --git a/cmd/e2e/test_environment.go b/cmd/e2e/test_environment.go
index 69976b00..e695f794 100644
--- a/cmd/e2e/test_environment.go
+++ b/cmd/e2e/test_environment.go
@@ -4,6 +4,7 @@ import (
 	"fmt"
 	"net/url"
 	"os"
+	"time"
 
 	"github.com/sirupsen/logrus"
 	"github.com/zalando-incubator/es-operator/operator"
@@ -90,5 +91,10 @@ func setupESClient(defaultServiceEndpoint, version string) (*operator.ESClient,
 	if err != nil {
 		return nil, err
 	}
-	return &operator.ESClient{Endpoint: endpoint}, nil
+	config := &operator.DrainingConfig{
+		MaxRetries:      999,
+		MinimumWaitTime: 10 * time.Second,
+		MaximumWaitTime: 30 * time.Second,
+	}
+	return &operator.ESClient{Endpoint: endpoint, DrainingConfig: config}, nil
 }
diff --git a/docs/zalando.org_elasticsearchdatasets.yaml b/docs/zalando.org_elasticsearchdatasets.yaml
index 201f6fcc..e2f1745c 100644
--- a/docs/zalando.org_elasticsearchdatasets.yaml
+++ b/docs/zalando.org_elasticsearchdatasets.yaml
@@ -58,6 +58,44 @@ spec:
                 description: Exclude management of System Indices on this Data Set.
                   Defaults to false
                 type: boolean
+              experimental:
+                description: Experimental represents configurations marked as experimental
+                  that may change in future releases. Currently, it manages the
+                  draining behavior.
+                properties:
+                  draining:
+                    description: Draining controls behaviour of the EDS while draining
+                      nodes
+                    properties:
+                      maxRetries:
+                        default: 999
+                        description: MaxRetries specifies the maximum number of attempts
+                          to drain a node. The default value is 999.
+                        format: int32
+                        minimum: 0
+                        type: integer
+                      maximumWaitTimeDurationSeconds:
+                        default: 30
+                        description: MaximumWaitTimeDurationSeconds specifies the
+                          maximum wait time in seconds between retry attempts after
+                          a failed node drain. The default value is 30 seconds.
+                        format: int64
+                        minimum: 0
+                        type: integer
+                      minimumWaitTimeDurationSeconds:
+                        default: 10
+                        description: MinimumWaitTimeDurationSeconds specifies the
+                          minimum wait time in seconds between retry attempts after
+                          a failed node drain. The default value is 10 seconds.
+                        format: int64
+                        minimum: 0
+                        type: integer
+                    required:
+                    - maxRetries
+                    - maximumWaitTimeDurationSeconds
+                    - minimumWaitTimeDurationSeconds
+                    type: object
+                type: object
               replicas:
                 description: |-
                   Number of desired pods. This is a pointer to distinguish between explicit
diff --git a/operator/elasticsearch.go b/operator/elasticsearch.go
index 9edfcaf2..956be538 100644
--- a/operator/elasticsearch.go
+++ b/operator/elasticsearch.go
@@ -56,6 +56,13 @@ type operatingEntry struct {
 	logger *log.Entry
 }
 
+// DrainingConfig specifies the configuration settings for the behavior of draining Elasticsearch nodes.
+type DrainingConfig struct {
+	MaxRetries      int
+	MinimumWaitTime time.Duration
+	MaximumWaitTime time.Duration
+}
+
 // NewElasticsearchOperator initializes a new ElasticsearchDataSet operator instance.
 func NewElasticsearchOperator(
 	client *clientset.Clientset,
@@ -241,10 +248,10 @@ func (o *ElasticsearchOperator) runAutoscaler(ctx context.Context) {
 	for _, es := range resources {
 		if es.ElasticsearchDataSet.Spec.Scaling != nil && es.ElasticsearchDataSet.Spec.Scaling.Enabled {
 			endpoint := o.getElasticsearchEndpoint(es.ElasticsearchDataSet)
-
 			client := &ESClient{
 				Endpoint:             endpoint,
 				excludeSystemIndices: es.ElasticsearchDataSet.Spec.ExcludeSystemIndices,
+				DrainingConfig:       o.getDrainingConfig(es.ElasticsearchDataSet),
 			}
 
 			err := o.scaleEDS(ctx, es.ElasticsearchDataSet, es, client)
@@ -676,7 +683,8 @@ func (o *ElasticsearchOperator) operateEDS(eds *zv1.ElasticsearchDataSet, delete
 
 	// TODO: abstract this
 	client := &ESClient{
-		Endpoint: endpoint,
+		Endpoint:       endpoint,
+		DrainingConfig: o.getDrainingConfig(eds),
 	}
 
 	operator := &Operator{
@@ -731,6 +739,23 @@ func (o *ElasticsearchOperator) getElasticsearchEndpoint(eds *zv1.ElasticsearchD
 	}
 }
 
+// getDrainingConfig returns the draining configuration, which controls how draining of Elasticsearch nodes is handled.
+func (o *ElasticsearchOperator) getDrainingConfig(eds *zv1.ElasticsearchDataSet) *DrainingConfig {
+	// Fall back to the default configuration if no draining configuration is specified.
+	if eds.Spec.Experimental == nil || eds.Spec.Experimental.Draining == nil {
+		return &DrainingConfig{
+			MaxRetries:      999,
+			MinimumWaitTime: 10 * time.Second,
+			MaximumWaitTime: 30 * time.Second,
+		}
+	}
+	return &DrainingConfig{
+		MaxRetries:      int(eds.Spec.Experimental.Draining.MaxRetries),
+		MinimumWaitTime: time.Duration(eds.Spec.Experimental.Draining.MinimumWaitTimeDurationSeconds) * time.Second,
+		MaximumWaitTime: time.Duration(eds.Spec.Experimental.Draining.MaximumWaitTimeDurationSeconds) * time.Second,
+	}
+}
+
 type ESResource struct {
 	ElasticsearchDataSet *zv1.ElasticsearchDataSet
 	StatefulSet          *appsv1.StatefulSet
diff --git a/operator/elasticsearch_test.go b/operator/elasticsearch_test.go
index 9e085db9..34718e2c 100644
--- a/operator/elasticsearch_test.go
+++ b/operator/elasticsearch_test.go
@@ -64,6 +64,55 @@ func TestGetElasticsearchEndpoint(t *testing.T) {
 	assert.Equal(t, customURL, url.String())
 }
 
+func TestGetEmptyElasticSearchDrainingSpec(t *testing.T) {
+	faker := &clientset.Clientset{
+		Interface: fake.NewSimpleClientset(),
+	}
+	esOperator := NewElasticsearchOperator(faker, nil, 1*time.Second, 1*time.Second, "", "", "cluster.local.", nil)
+
+	eds := &zv1.ElasticsearchDataSet{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "foo",
+			Namespace: "bar",
+		},
+	}
+
+	config := esOperator.getDrainingConfig(eds)
+	assert.NotNil(t, config)
+	assert.Equal(t, config.MaxRetries, 999)
+	assert.Equal(t, config.MinimumWaitTime, 10*time.Second)
+	assert.Equal(t, config.MaximumWaitTime, 30*time.Second)
+}
+
+func TestGetNotEmptyElasticSearchDrainingSpec(t *testing.T) {
+	faker := &clientset.Clientset{
+		Interface: fake.NewSimpleClientset(),
+	}
+	esOperator := NewElasticsearchOperator(faker, nil, 1*time.Second, 1*time.Second, "", "", "cluster.local.", nil)
+
+	eds := &zv1.ElasticsearchDataSet{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "foo",
+			Namespace: "bar",
+		},
+		Spec: zv1.ElasticsearchDataSetSpec{
+			Experimental: &zv1.ExperimentalSpec{
+				Draining: &zv1.ElasticsearchDataSetDraining{
+					MaxRetries:                     7,
+					MinimumWaitTimeDurationSeconds: 2,
+					MaximumWaitTimeDurationSeconds: 34,
+				},
+
}, + }, + } + + config := esOperator.getDrainingConfig(eds) + assert.NotNil(t, config) + assert.Equal(t, config.MaxRetries, 7) + assert.Equal(t, config.MinimumWaitTime, 2*time.Second) + assert.Equal(t, config.MaximumWaitTime, 34*time.Second) +} + func TestGetOwnerUID(t *testing.T) { objectMeta := metav1.ObjectMeta{ OwnerReferences: []metav1.OwnerReference{ diff --git a/operator/es_client.go b/operator/es_client.go index bee481ba..65fa7278 100644 --- a/operator/es_client.go +++ b/operator/es_client.go @@ -10,7 +10,6 @@ import ( "strconv" "strings" "sync" - "time" log "github.com/sirupsen/logrus" "github.com/zalando-incubator/es-operator/operator/null" @@ -18,18 +17,12 @@ import ( v1 "k8s.io/api/core/v1" ) -// TODO make configurable as flags. -var ( - defaultRetryCount = 999 - defaultRetryWaitTime = 10 * time.Second - defaultRetryMaxWaitTime = 30 * time.Second -) - // ESClient is a pod drainer which can drain data from Elasticsearch pods. type ESClient struct { Endpoint *url.URL mux sync.Mutex excludeSystemIndices bool + DrainingConfig *DrainingConfig } // ESIndex represent an index to be used in public APIs @@ -358,9 +351,9 @@ func (esSettings *ESSettings) updateRebalance(value string) { func (c *ESClient) waitForEmptyEsNode(ctx context.Context, pod *v1.Pod) error { podIP := pod.Status.PodIP _, err := resty.New(). - SetRetryCount(defaultRetryCount). - SetRetryWaitTime(defaultRetryWaitTime). - SetRetryMaxWaitTime(defaultRetryMaxWaitTime). + SetRetryCount(c.DrainingConfig.MaxRetries). + SetRetryWaitTime(c.DrainingConfig.MinimumWaitTime). + SetRetryMaxWaitTime(c.DrainingConfig.MaximumWaitTime). AddRetryCondition( // It is expected to return (bool, error) pair. Resty will retry // in case condition returns true or non nil error. diff --git a/operator/es_client_test.go b/operator/es_client_test.go index 9dce6e47..9793e4da 100644 --- a/operator/es_client_test.go +++ b/operator/es_client_test.go @@ -7,6 +7,7 @@ import ( "net/http" "net/url" "testing" + "time" "github.com/jarcoal/httpmock" "github.com/stretchr/testify/assert" @@ -29,10 +30,16 @@ func TestDrain(t *testing.T) { httpmock.NewStringResponder(200, `[{"index":"a","ip":"10.2.19.5"},{"index":"b","ip":"10.2.10.2"},{"index":"c","ip":"10.2.16.2"}]`)) esUrl, _ := url.Parse("http://elasticsearch:9200") - systemUnderTest := &ESClient{ - Endpoint: esUrl, + config := &DrainingConfig{ + MaxRetries: 999, + MinimumWaitTime: 10 * time.Second, + MaximumWaitTime: 30 * time.Second, } - err := systemUnderTest.Drain(context.TODO(), &v1.Pod{ + client := &ESClient{ + Endpoint: esUrl, + DrainingConfig: config, + } + err := client.Drain(context.TODO(), &v1.Pod{ Status: v1.PodStatus{ PodIP: "1.2.3.4", }, @@ -89,10 +96,16 @@ func TestDrainWithTransientSettings(t *testing.T) { httpmock.NewStringResponder(200, `[{"index":"a","ip":"10.2.19.5"},{"index":"b","ip":"10.2.10.2"},{"index":"c","ip":"10.2.16.2"}]`)) esUrl, _ := url.Parse("http://elasticsearch:9200") - systemUnderTest := &ESClient{ - Endpoint: esUrl, + config := &DrainingConfig{ + MaxRetries: 999, + MinimumWaitTime: 10 * time.Second, + MaximumWaitTime: 30 * time.Second, + } + client := &ESClient{ + Endpoint: esUrl, + DrainingConfig: config, } - err := systemUnderTest.Drain(context.TODO(), &v1.Pod{ + err := client.Drain(context.TODO(), &v1.Pod{ Status: v1.PodStatus{ PodIP: "1.2.3.4", }, @@ -107,6 +120,104 @@ func TestDrainWithTransientSettings(t *testing.T) { require.EqualValues(t, 1, info["GET http://elasticsearch:9200/_cat/shards"]) } +func TestDrainRetriesUntilMax(t *testing.T) { + + // Given + 
httpmock.Activate() + defer httpmock.DeactivateAndReset() + + // Mock responses for the Elasticsearch endpoints + httpmock.RegisterResponder("GET", "http://elasticsearch:9200/_cluster/settings", + httpmock.NewStringResponder(http.StatusOK, `{"persistent":{"cluster":{"routing":{"rebalance":{"enable":"all"}}}}}`)) + httpmock.RegisterResponder("PUT", "http://elasticsearch:9200/_cluster/settings", + httpmock.NewStringResponder(http.StatusOK, `{}`)) + httpmock.RegisterResponder("GET", "http://elasticsearch:9200/_cluster/health", + httpmock.NewStringResponder(http.StatusOK, `{"status":"green"}`)) + httpmock.RegisterResponder("GET", "http://elasticsearch:9200/_cat/shards", + httpmock.NewStringResponder(http.StatusInternalServerError, `{}`)) + + // Configuration for draining client + esUrl, _ := url.Parse("http://elasticsearch:9200") + config := &DrainingConfig{ + MaxRetries: 5, + MinimumWaitTime: 1 * time.Millisecond, + MaximumWaitTime: 3 * time.Millisecond, + } + client := &ESClient{ + Endpoint: esUrl, + DrainingConfig: config, + } + + // When + err := client.Drain(context.TODO(), &v1.Pod{ + Status: v1.PodStatus{ + PodIP: "1.2.3.4", + }, + }) + + // Then + assert.NoError(t, err) + + // Verify the number of calls made to each endpoint + info := httpmock.GetCallCountInfo() + require.EqualValues(t, 1, info["GET http://elasticsearch:9200/_cluster/health"]) + require.EqualValues(t, 2, info["PUT http://elasticsearch:9200/_cluster/settings"]) + require.EqualValues(t, 2, info["GET http://elasticsearch:9200/_cluster/settings"]) + require.EqualValues(t, 5, info["GET http://elasticsearch:9200/_cat/shards"]) +} + +func TestDrainRetriesUntilSuccess(t *testing.T) { + + // Given + httpmock.Activate() + defer httpmock.DeactivateAndReset() + + // Mock responses for the Elasticsearch endpoints + httpmock.RegisterResponder("GET", "http://elasticsearch:9200/_cluster/settings", + httpmock.NewStringResponder(http.StatusOK, `{"persistent":{"cluster":{"routing":{"rebalance":{"enable":"all"}}}}}`)) + httpmock.RegisterResponder("PUT", "http://elasticsearch:9200/_cluster/settings", + httpmock.NewStringResponder(http.StatusOK, `{}`)) + httpmock.RegisterResponder("GET", "http://elasticsearch:9200/_cluster/health", + httpmock.NewStringResponder(http.StatusOK, `{"status":"green"}`)) + numCall := 0 + httpmock.RegisterResponder("GET", "http://elasticsearch:9200/_cat/shards", func(request *http.Request) (*http.Response, error) { + numCall += 1 + if numCall <= 2 { + return httpmock.NewStringResponse(http.StatusInternalServerError, `{}`), nil + } + return httpmock.NewStringResponse(http.StatusOK, `[]`), nil + }) + + // Configuration for draining client + esUrl, _ := url.Parse("http://elasticsearch:9200") + config := &DrainingConfig{ + MaxRetries: 5, + MinimumWaitTime: 1 * time.Millisecond, + MaximumWaitTime: 3 * time.Millisecond, + } + client := &ESClient{ + Endpoint: esUrl, + DrainingConfig: config, + } + + // When + err := client.Drain(context.TODO(), &v1.Pod{ + Status: v1.PodStatus{ + PodIP: "1.2.3.4", + }, + }) + + // Then + assert.NoError(t, err) + + // Verify the number of calls made to each endpoint + info := httpmock.GetCallCountInfo() + require.EqualValues(t, 1, info["GET http://elasticsearch:9200/_cluster/health"]) + require.EqualValues(t, 2, info["PUT http://elasticsearch:9200/_cluster/settings"]) + require.EqualValues(t, 2, info["GET http://elasticsearch:9200/_cluster/settings"]) + require.EqualValues(t, 3, info["GET http://elasticsearch:9200/_cat/shards"]) +} + func TestCleanup(t *testing.T) { httpmock.Activate() 
defer httpmock.DeactivateAndReset() @@ -119,11 +230,17 @@ func TestCleanup(t *testing.T) { httpmock.NewStringResponder(200, `{}`)) url, _ := url.Parse("http://elasticsearch:9200") - systemUnderTest := &ESClient{ - Endpoint: url, + config := &DrainingConfig{ + MaxRetries: 999, + MinimumWaitTime: 10 * time.Second, + MaximumWaitTime: 30 * time.Second, + } + client := &ESClient{ + Endpoint: url, + DrainingConfig: config, } - err := systemUnderTest.Cleanup(context.TODO()) + err := client.Cleanup(context.TODO()) assert.NoError(t, err) @@ -141,11 +258,17 @@ func TestGetNodes(t *testing.T) { httpmock.NewStringResponder(200, `[{"ip":"10.2.10.2","dup":"22.92"},{"ip":"10.2.16.2","dup":"11.17"},{"ip":"10.2.23.2","dup":"10.97"},{"ip":"10.2.11.3","dup":"18.95"},{"ip":"10.2.25.4","dup":"21.26"},{"ip":"10.2.4.21","dup":"33.19"},{"ip":"10.2.60.19","dup":"21.60"},{"ip":"10.2.19.5","dup":"16.55"},{"ip":"10.2.27.11","dup":"29.80"},{"ip":"10.2.24.13","dup":"31.25"},{"ip":"10.2.18.2","dup":"12.94"}]`)) url, _ := url.Parse("http://elasticsearch:9200") - systemUnderTest := &ESClient{ - Endpoint: url, + config := &DrainingConfig{ + MaxRetries: 999, + MinimumWaitTime: 10 * time.Second, + MaximumWaitTime: 30 * time.Second, + } + client := &ESClient{ + Endpoint: url, + DrainingConfig: config, } - nodes, err := systemUnderTest.GetNodes() + nodes, err := client.GetNodes() assert.NoError(t, err) @@ -163,11 +286,11 @@ func TestGetShards(t *testing.T) { httpmock.NewStringResponder(200, `[{"index":"a","ip":"10.2.19.5"},{"index":"b","ip":"10.2.10.2"},{"index":"c","ip":"10.2.16.2"}]`)) url, _ := url.Parse("http://elasticsearch:9200") - systemUnderTest := &ESClient{ + client := &ESClient{ Endpoint: url, } - shards, err := systemUnderTest.GetShards() + shards, err := client.GetShards() assert.NoError(t, err) @@ -185,11 +308,11 @@ func TestGetIndices(t *testing.T) { httpmock.NewStringResponder(200, `[{"index":"a","pri":"2","rep":"1"},{"index":"b","pri":"3","rep":"1"},{"index":"c","pri":"6","rep":"1"}]`)) url, _ := url.Parse("http://elasticsearch:9200") - systemUnderTest := &ESClient{ + client := &ESClient{ Endpoint: url, } - indices, err := systemUnderTest.GetIndices() + indices, err := client.GetIndices() assert.NoError(t, err) @@ -210,8 +333,14 @@ func TestUpdateIndexSettings(t *testing.T) { httpmock.NewStringResponder(200, `{}`)) url, _ := url.Parse("http://elasticsearch:9200") - systemUnderTest := &ESClient{ - Endpoint: url, + config := &DrainingConfig{ + MaxRetries: 999, + MinimumWaitTime: 10 * time.Second, + MaximumWaitTime: 30 * time.Second, + } + client := &ESClient{ + Endpoint: url, + DrainingConfig: config, } indices := make([]ESIndex, 0, 1) @@ -220,7 +349,7 @@ func TestUpdateIndexSettings(t *testing.T) { Replicas: 1, Index: "myindex", }) - err := systemUnderTest.UpdateIndexSettings(indices) + err := client.UpdateIndexSettings(indices) assert.NoError(t, err) } @@ -235,8 +364,14 @@ func TestUpdateIndexSettingsIgnoresUnknownIndex(t *testing.T) { httpmock.NewStringResponder(404, `{}`)) url, _ := url.Parse("http://elasticsearch:9200") - systemUnderTest := &ESClient{ - Endpoint: url, + config := &DrainingConfig{ + MaxRetries: 999, + MinimumWaitTime: 10 * time.Second, + MaximumWaitTime: 30 * time.Second, + } + client := &ESClient{ + Endpoint: url, + DrainingConfig: config, } indices := make([]ESIndex, 0, 1) @@ -245,7 +380,7 @@ func TestUpdateIndexSettingsIgnoresUnknownIndex(t *testing.T) { Replicas: 1, Index: "myindex", }) - err := systemUnderTest.UpdateIndexSettings(indices) + err := client.UpdateIndexSettings(indices) 
info := httpmock.GetCallCountInfo() assert.NoError(t, err) @@ -260,11 +395,17 @@ func TestCreateIndex(t *testing.T) { httpmock.NewStringResponder(200, `{}`)) url, _ := url.Parse("http://elasticsearch:9200") - systemUnderTest := &ESClient{ - Endpoint: url, + config := &DrainingConfig{ + MaxRetries: 999, + MinimumWaitTime: 10 * time.Second, + MaximumWaitTime: 30 * time.Second, + } + client := &ESClient{ + Endpoint: url, + DrainingConfig: config, } - err := systemUnderTest.CreateIndex("myindex", "mygroup", 2, 2) + err := client.CreateIndex("myindex", "mygroup", 2, 2) assert.NoError(t, err) @@ -278,11 +419,17 @@ func TestDeleteIndex(t *testing.T) { httpmock.NewStringResponder(200, `{}`)) url, _ := url.Parse("http://elasticsearch:9200") - systemUnderTest := &ESClient{ - Endpoint: url, + config := &DrainingConfig{ + MaxRetries: 999, + MinimumWaitTime: 10 * time.Second, + MaximumWaitTime: 30 * time.Second, + } + client := &ESClient{ + Endpoint: url, + DrainingConfig: config, } - err := systemUnderTest.DeleteIndex("myindex") + err := client.DeleteIndex("myindex") assert.NoError(t, err) } @@ -295,11 +442,17 @@ func TestEnsureGreenClusterState(t *testing.T) { httpmock.NewStringResponder(200, `{"status":"yellow"}`)) url, _ := url.Parse("http://elasticsearch:9200") - systemUnderTest := &ESClient{ - Endpoint: url, + config := &DrainingConfig{ + MaxRetries: 999, + MinimumWaitTime: 10 * time.Second, + MaximumWaitTime: 30 * time.Second, + } + client := &ESClient{ + Endpoint: url, + DrainingConfig: config, } - err := systemUnderTest.ensureGreenClusterState() + err := client.ensureGreenClusterState() assert.Error(t, err) } @@ -312,11 +465,17 @@ func TestExcludeSystemIndices(t *testing.T) { httpmock.NewStringResponder(200, `[{"index":".system","pri":"1","rep":"1"},{"index":"a","pri":"1","rep":"1"}]`)) url, _ := url.Parse("http://elasticsearch:9200") - systemUnderTest := &ESClient{ + config := &DrainingConfig{ + MaxRetries: 999, + MinimumWaitTime: 10 * time.Second, + MaximumWaitTime: 30 * time.Second, + } + client := &ESClient{ Endpoint: url, excludeSystemIndices: true, + DrainingConfig: config, } - indices, err := systemUnderTest.GetIndices() + indices, err := client.GetIndices() assert.NoError(t, err) assert.Equal(t, 1, len(indices), indices) diff --git a/operator/operator.go b/operator/operator.go index 639ee946..e0e69ec1 100644 --- a/operator/operator.go +++ b/operator/operator.go @@ -58,7 +58,6 @@ type StatefulResource interface { Labels() map[string]string // LabelSelector returns a set of labels to be used for label selecting. LabelSelector() map[string]string - // Replicas returns the desired replicas of the resource. Replicas() int32 // PodTemplateSpec returns the pod template spec of the resource. This @@ -67,6 +66,7 @@ type StatefulResource interface { // VolumeClaimTemplates returns the volume claim templates of the // resource. This is added to the underlying StatefulSet. 
 	VolumeClaimTemplates() []v1.PersistentVolumeClaim
+	Self() runtime.Object
 
 	// EnsureResources
diff --git a/operator/operator_test.go b/operator/operator_test.go
index 59bd8640..27d3a5eb 100644
--- a/operator/operator_test.go
+++ b/operator/operator_test.go
@@ -15,17 +15,16 @@ import (
 )
 
 type mockResource struct {
-	apiVersion    string
-	kind          string
-	name          string
-	namespace     string
-	uid           types.UID
-	generation    int64
-	labels        map[string]string
-	labelSelector map[string]string
-	replicas      int32
-	eds           *zv1.ElasticsearchDataSet
-
+	apiVersion           string
+	kind                 string
+	name                 string
+	namespace            string
+	uid                  types.UID
+	generation           int64
+	labels               map[string]string
+	labelSelector        map[string]string
+	replicas             int32
+	eds                  *zv1.ElasticsearchDataSet
 	podTemplateSpec      *v1.PodTemplateSpec
 	volumeClaimTemplates []v1.PersistentVolumeClaim
 }
diff --git a/pkg/apis/zalando.org/v1/types.go b/pkg/apis/zalando.org/v1/types.go
index 383e5a59..551cd905 100644
--- a/pkg/apis/zalando.org/v1/types.go
+++ b/pkg/apis/zalando.org/v1/types.go
@@ -57,6 +57,42 @@ type ElasticsearchDataSetSpec struct {
 
 	// Template describe the volumeClaimTemplates
 	VolumeClaimTemplates []PersistentVolumeClaim `json:"volumeClaimTemplates,omitempty" protobuf:"bytes,4,rep,name=volumeClaimTemplates"`
+
+	// Experimental represents configurations marked as experimental that may change in future releases.
+	// Currently, it manages the draining behavior.
+	// +optional
+	Experimental *ExperimentalSpec `json:"experimental,omitempty"`
+}
+
+// ExperimentalSpec represents the configurations that might change in the future.
+// IMPORTANT: These fields might change in a non-backward-compatible manner.
+// +k8s:deepcopy-gen=true
+type ExperimentalSpec struct {
+	// Draining controls behaviour of the EDS while draining nodes
+	// +optional
+	Draining *ElasticsearchDataSetDraining `json:"draining,omitempty"`
+}
+
+// ElasticsearchDataSetDraining represents the configuration for draining nodes within an ElasticsearchDataSet.
+// +k8s:deepcopy-gen=true
+type ElasticsearchDataSetDraining struct {
+
+	// MaxRetries specifies the maximum number of attempts to drain a node. The default value is 999.
+	// +kubebuilder:validation:Minimum=0
+	// +kubebuilder:default=999
+	MaxRetries int32 `json:"maxRetries"`
+
+	// MinimumWaitTimeDurationSeconds specifies the minimum wait time in seconds between retry attempts after a failed node drain.
+	// The default value is 10 seconds.
+	// +kubebuilder:validation:Minimum=0
+	// +kubebuilder:default=10
+	MinimumWaitTimeDurationSeconds int64 `json:"minimumWaitTimeDurationSeconds"`
+
+	// MaximumWaitTimeDurationSeconds specifies the maximum wait time in seconds between retry attempts after a failed node drain.
+	// The default value is 30 seconds.
+	// +kubebuilder:validation:Minimum=0
+	// +kubebuilder:default=30
+	MaximumWaitTimeDurationSeconds int64 `json:"maximumWaitTimeDurationSeconds"`
 }
 
 // PersistentVolumeClaim is a user's request for and claim to a persistent volume
diff --git a/pkg/apis/zalando.org/v1/zz_generated.deepcopy.go b/pkg/apis/zalando.org/v1/zz_generated.deepcopy.go
index d9a4f221..9205c860 100644
--- a/pkg/apis/zalando.org/v1/zz_generated.deepcopy.go
+++ b/pkg/apis/zalando.org/v1/zz_generated.deepcopy.go
@@ -53,6 +53,22 @@ func (in *ElasticsearchDataSet) DeepCopyObject() runtime.Object {
 	return nil
 }
 
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *ElasticsearchDataSetDraining) DeepCopyInto(out *ElasticsearchDataSetDraining) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ElasticsearchDataSetDraining. +func (in *ElasticsearchDataSetDraining) DeepCopy() *ElasticsearchDataSetDraining { + if in == nil { + return nil + } + out := new(ElasticsearchDataSetDraining) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ElasticsearchDataSetList) DeepCopyInto(out *ElasticsearchDataSetList) { *out = *in @@ -123,6 +139,11 @@ func (in *ElasticsearchDataSetSpec) DeepCopyInto(out *ElasticsearchDataSetSpec) (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.Experimental != nil { + in, out := &in.Experimental, &out.Experimental + *out = new(ExperimentalSpec) + (*in).DeepCopyInto(*out) + } return } @@ -316,6 +337,27 @@ func (in *EmbeddedObjectMetaWithName) DeepCopy() *EmbeddedObjectMetaWithName { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ExperimentalSpec) DeepCopyInto(out *ExperimentalSpec) { + *out = *in + if in.Draining != nil { + in, out := &in.Draining, &out.Draining + *out = new(ElasticsearchDataSetDraining) + **out = **in + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ExperimentalSpec. +func (in *ExperimentalSpec) DeepCopy() *ExperimentalSpec { + if in == nil { + return nil + } + out := new(ExperimentalSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *PersistentVolumeClaim) DeepCopyInto(out *PersistentVolumeClaim) { *out = *in