Skip to content

Commit

Permalink
Add capability to restrict async durability mode for remote indexes (o…
Browse files Browse the repository at this point in the history
…pensearch-project#10189)

Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 authored and vikasvb90 committed Oct 10, 2023
1 parent 93cc81b commit 9542e59
Show file tree
Hide file tree
Showing 7 changed files with 211 additions and 4 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Async blob read support for S3 plugin ([#9694](https://github.com/opensearch-project/OpenSearch/pull/9694))
- [Telemetry-Otel] Added support for OtlpGrpcSpanExporter exporter ([#9666](https://github.com/opensearch-project/OpenSearch/pull/9666))
- Async blob read support for encrypted containers ([#10131](https://github.com/opensearch-project/OpenSearch/pull/10131))
-
- Add capability to restrict async durability mode for remote indexes ([#10189](https://github.com/opensearch-project/OpenSearch/pull/10189))

### Dependencies
- Bump `peter-evans/create-or-update-comment` from 2 to 3 ([#9575](https://github.com/opensearch-project/OpenSearch/pull/9575))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.opensearch.index.IndexService;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.translog.Translog.Durability;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.plugins.Plugin;
Expand Down Expand Up @@ -323,6 +324,86 @@ public void testClusterBufferIntervalValidation() {
);
}

public void testRequestDurabilityWhenRestrictSettingExplicitFalse() throws ExecutionException, InterruptedException {
// Explicit node settings and request durability
testRestrictSettingFalse(true, Durability.REQUEST);
}

public void testAsyncDurabilityWhenRestrictSettingExplicitFalse() throws ExecutionException, InterruptedException {
// Explicit node settings and async durability
testRestrictSettingFalse(true, Durability.ASYNC);
}

public void testRequestDurabilityWhenRestrictSettingImplicitFalse() throws ExecutionException, InterruptedException {
// No node settings and request durability
testRestrictSettingFalse(false, Durability.REQUEST);
}

public void testAsyncDurabilityWhenRestrictSettingImplicitFalse() throws ExecutionException, InterruptedException {
// No node settings and async durability
testRestrictSettingFalse(false, Durability.ASYNC);
}

private void testRestrictSettingFalse(boolean setRestrictFalse, Durability durability) throws ExecutionException, InterruptedException {
String clusterManagerName;
if (setRestrictFalse) {
clusterManagerName = internalCluster().startClusterManagerOnlyNode(
Settings.builder().put(IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING.getKey(), false).build()
);
} else {
clusterManagerName = internalCluster().startClusterManagerOnlyNode();
}
String dataNode = internalCluster().startDataOnlyNodes(1).get(0);
Settings indexSettings = Settings.builder()
.put(indexSettings())
.put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), durability)
.build();
createIndex(INDEX_NAME, indexSettings);
IndexShard indexShard = getIndexShard(dataNode);
assertEquals(durability, indexShard.indexSettings().getTranslogDurability());

durability = randomFrom(Durability.values());
client(clusterManagerName).admin()
.indices()
.updateSettings(
new UpdateSettingsRequest(INDEX_NAME).settings(
Settings.builder().put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), durability)
)
)
.get();
assertEquals(durability, indexShard.indexSettings().getTranslogDurability());
}

public void testAsyncDurabilityThrowsExceptionWhenRestrictSettingTrue() throws ExecutionException, InterruptedException {
String expectedExceptionMsg =
"index setting [index.translog.durability=async] is not allowed as cluster setting [cluster.remote_store.index.restrict.async-durability=true]";
String clusterManagerName = internalCluster().startClusterManagerOnlyNode(
Settings.builder().put(IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING.getKey(), true).build()
);
String dataNode = internalCluster().startDataOnlyNodes(1).get(0);

// Case 1 - Test create index fails
Settings indexSettings = Settings.builder()
.put(indexSettings())
.put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Durability.ASYNC)
.build();
IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> createIndex(INDEX_NAME, indexSettings));
assertEquals(expectedExceptionMsg, exception.getMessage());

// Case 2 - Test update index fails
createIndex(INDEX_NAME);
IndexShard indexShard = getIndexShard(dataNode);
assertEquals(Durability.REQUEST, indexShard.indexSettings().getTranslogDurability());
exception = assertThrows(
IllegalArgumentException.class,
() -> client(clusterManagerName).admin()
.indices()
.updateSettings(new UpdateSettingsRequest(INDEX_NAME).settings(indexSettings))
.actionGet()
);
assertEquals(expectedExceptionMsg, exception.getMessage());
}

private IndexShard getIndexShard(String dataNode) throws ExecutionException, InterruptedException {
String clusterManagerName = internalCluster().getClusterManagerName();
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, dataNode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import org.opensearch.index.mapper.MapperService.MergeReason;
import org.opensearch.index.query.QueryShardContext;
import org.opensearch.index.shard.IndexSettingProvider;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.IndexCreationException;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.InvalidIndexNameException;
Expand Down Expand Up @@ -922,6 +923,7 @@ static Settings aggregateIndexSettings(
validateTranslogRetentionSettings(indexSettings);
validateStoreTypeSettings(indexSettings);
validateRefreshIntervalSettings(request.settings(), clusterSettings);
validateTranslogDurabilitySettings(request.settings(), clusterSettings, settings);

return indexSettings;
}
Expand Down Expand Up @@ -1491,7 +1493,7 @@ public static void validateTranslogRetentionSettings(Settings indexSettings) {
/**
* Validates {@code index.refresh_interval} is equal or below the {@code cluster.minimum.index.refresh_interval}.
*
* @param requestSettings settings passed in during index create request
* @param requestSettings settings passed in during index create/update request
* @param clusterSettings cluster setting
*/
static void validateRefreshIntervalSettings(Settings requestSettings, ClusterSettings clusterSettings) {
Expand All @@ -1510,4 +1512,27 @@ static void validateRefreshIntervalSettings(Settings requestSettings, ClusterSet
);
}
}

/**
* Validates {@code index.translog.durability} is not async if the {@code cluster.remote_store.index.restrict.async-durability} is set to true.
*
* @param requestSettings settings passed in during index create/update request
* @param clusterSettings cluster setting
*/
static void validateTranslogDurabilitySettings(Settings requestSettings, ClusterSettings clusterSettings, Settings settings) {
if (isRemoteStoreAttributePresent(settings) == false
|| IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.exists(requestSettings) == false
|| clusterSettings.get(IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING) == false) {
return;
}
Translog.Durability durability = IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.get(requestSettings);
if (durability.equals(Translog.Durability.ASYNC)) {
throw new IllegalArgumentException(
"index setting [index.translog.durability=async] is not allowed as cluster setting ["
+ IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING.getKey()
+ "=true]"
);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@
import java.util.Set;

import static org.opensearch.action.support.ContextPreservingActionListener.wrapPreservingContext;
import static org.opensearch.cluster.metadata.MetadataCreateIndexService.validateRefreshIntervalSettings;
import static org.opensearch.cluster.metadata.MetadataCreateIndexService.validateTranslogDurabilitySettings;
import static org.opensearch.common.settings.AbstractScopedSettings.ARCHIVED_SETTINGS_PREFIX;
import static org.opensearch.index.IndexSettings.same;

Expand Down Expand Up @@ -127,7 +129,8 @@ public void updateSettings(
.normalizePrefix(IndexMetadata.INDEX_SETTING_PREFIX)
.build();

MetadataCreateIndexService.validateRefreshIntervalSettings(normalizedSettings, clusterService.getClusterSettings());
validateRefreshIntervalSettings(normalizedSettings, clusterService.getClusterSettings());
validateTranslogDurabilitySettings(normalizedSettings, clusterService.getClusterSettings(), clusterService.getSettings());

Settings.Builder settingsForClosedIndices = Settings.builder();
Settings.Builder settingsForOpenIndices = Settings.builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,8 @@ public void apply(Settings value, Settings current, Settings previous) {
// Remote cluster state settings
RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING,
RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING,
IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING
IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,
IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING
)
)
);
Expand Down
12 changes: 12 additions & 0 deletions server/src/main/java/org/opensearch/indices/IndicesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,18 @@ public class IndicesService extends AbstractLifecycleComponent
Property.Dynamic
);

/**
* This setting is used to restrict creation or updation of index where the `index.translog.durability` index setting
* is set as ASYNC if enabled. If disabled, any of the durability mode can be used and switched at any later time from
* one to another.
*/
public static final Setting<Boolean> CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING = Setting.boolSetting(
"cluster.remote_store.index.restrict.async-durability",
false,
Property.NodeScope,
Property.Final
);

/**
* The node's settings.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.query.QueryShardContext;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.InvalidAliasNameException;
import org.opensearch.indices.InvalidIndexNameException;
import org.opensearch.indices.ShardLimitValidator;
Expand Down Expand Up @@ -132,8 +134,10 @@
import static org.opensearch.index.IndexSettings.INDEX_REFRESH_INTERVAL_SETTING;
import static org.opensearch.index.IndexSettings.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING;
import static org.opensearch.index.IndexSettings.INDEX_SOFT_DELETES_SETTING;
import static org.opensearch.index.IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING;
import static org.opensearch.indices.IndicesService.CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING;
import static org.opensearch.indices.IndicesService.CLUSTER_MINIMUM_INDEX_REFRESH_INTERVAL_SETTING;
import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING;
import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING;
import static org.opensearch.indices.ShardLimitValidatorTests.createTestShardLimitService;
import static org.opensearch.node.Node.NODE_ATTRIBUTES;
Expand Down Expand Up @@ -1725,6 +1729,87 @@ public void testRefreshIntervalValidationFailureWithIndexSetting() {
);
}

public void testAnyTranslogDurabilityWhenRestrictSettingFalse() {
// This checks that aggregateIndexSettings works for the case when the cluster setting
// cluster.remote_store.index.restrict.async-durability is false or not set, it allows all types of durability modes
request = new CreateIndexClusterStateUpdateRequest("create index", "test", "test");
final Settings.Builder requestSettings = Settings.builder();
Translog.Durability durability = randomFrom(Translog.Durability.values());
requestSettings.put(INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), durability);
request.settings(requestSettings.build());
if (randomBoolean()) {
Settings settings = Settings.builder().put(CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING.getKey(), false).build();
clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
}
Settings indexSettings = aggregateIndexSettings(
ClusterState.EMPTY_STATE,
request,
Settings.EMPTY,
null,
Settings.EMPTY,
IndexScopedSettings.DEFAULT_SCOPED_SETTINGS,
randomShardLimitService(),
Collections.emptySet(),
clusterSettings
);
assertFalse(clusterSettings.get(IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING));
assertEquals(durability, INDEX_TRANSLOG_DURABILITY_SETTING.get(indexSettings));
}

public void testAsyncDurabilityThrowsExceptionWhenRestrictSettingTrue() {
// This checks that aggregateIndexSettings works for the case when the cluster setting
// cluster.remote_store.index.restrict.async-durability is false or not set, it allows all types of durability modes
request = new CreateIndexClusterStateUpdateRequest("create index", "test", "test");
final Settings.Builder requestSettings = Settings.builder();
requestSettings.put(INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.ASYNC);
request.settings(requestSettings.build());
Settings settings = Settings.builder().put(CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING.getKey(), true).build();
clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
IllegalArgumentException exception = assertThrows(
IllegalArgumentException.class,
() -> aggregateIndexSettings(
ClusterState.EMPTY_STATE,
request,
Settings.EMPTY,
null,
Settings.builder().put("node.attr.remote_store.setting", "test").build(),
IndexScopedSettings.DEFAULT_SCOPED_SETTINGS,
randomShardLimitService(),
Collections.emptySet(),
clusterSettings
)
);
// verify that the message is as expected
assertEquals(
"index setting [index.translog.durability=async] is not allowed as cluster setting [cluster.remote_store.index.restrict.async-durability=true]",
exception.getMessage()
);
}

public void testRequestDurabilityWhenRestrictSettingTrue() {
// This checks that aggregateIndexSettings works for the case when the cluster setting
// cluster.remote_store.index.restrict.async-durability is false or not set, it allows all types of durability modes
request = new CreateIndexClusterStateUpdateRequest("create index", "test", "test");
final Settings.Builder requestSettings = Settings.builder();
requestSettings.put(INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.REQUEST);
request.settings(requestSettings.build());
Settings settings = Settings.builder().put(CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING.getKey(), true).build();
clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
Settings indexSettings = aggregateIndexSettings(
ClusterState.EMPTY_STATE,
request,
Settings.EMPTY,
null,
Settings.EMPTY,
IndexScopedSettings.DEFAULT_SCOPED_SETTINGS,
randomShardLimitService(),
Collections.emptySet(),
clusterSettings
);
assertTrue(clusterSettings.get(IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING));
assertEquals(Translog.Durability.REQUEST, INDEX_TRANSLOG_DURABILITY_SETTING.get(indexSettings));
}

private IndexTemplateMetadata addMatchingTemplate(Consumer<IndexTemplateMetadata.Builder> configurator) {
IndexTemplateMetadata.Builder builder = templateMetadataBuilder("template1", "te*");
configurator.accept(builder);
Expand Down

0 comments on commit 9542e59

Please sign in to comment.