Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Convert-Index-To-Remote Action for issue #808 #1301

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import org.opensearch.core.xcontent.XContentParserUtils
import org.opensearch.indexmanagement.indexstatemanagement.action.AliasActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.AllocationActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.CloseActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.ConvertIndexToRemoteActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.DeleteActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.ForceMergeActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.IndexPriorityActionParser
Expand Down Expand Up @@ -52,6 +53,7 @@ class ISMActionsParser private constructor() {
ShrinkActionParser(),
SnapshotActionParser(),
TransformActionParser(),
ConvertIndexToRemoteActionParser(),
)

val customActionExtensionMap = mutableMapOf<String, String>()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.indexstatemanagement.action

import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.indexmanagement.indexstatemanagement.step.restore.AttemptRestoreStep
import org.opensearch.indexmanagement.indexstatemanagement.step.restore.WaitForRestoreStep
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext

class ConvertIndexToRemoteAction(
val repository: String,
index: Int,
) : Action(name, index) {

companion object {
const val name = "convert_index_to_remote"
const val REPOSITORY_FIELD = "repository"

@JvmStatic
fun fromStreamInput(si: StreamInput): ConvertIndexToRemoteAction {
val repository = si.readString()
val index = si.readInt()
return ConvertIndexToRemoteAction(repository, index)
}
}

private val attemptRestoreStep = AttemptRestoreStep(this)
private val waitForRestoreStep = WaitForRestoreStep()

private val steps = listOf(attemptRestoreStep, waitForRestoreStep)

@Suppress("ReturnCount")
override fun getStepToExecute(context: StepContext): Step {
// If stepMetaData is null, return the first step (attemptRestoreStep)
val stepMetaData = context.metadata.stepMetaData ?: return attemptRestoreStep

// If the current step has completed, return the next step
if (stepMetaData.stepStatus == Step.StepStatus.COMPLETED) {
return when (stepMetaData.name) {
AttemptRestoreStep.name -> waitForRestoreStep
else -> attemptRestoreStep // Default to the first step
}
}

// If the current step is not completed, continue executing it
return when (stepMetaData.name) {
AttemptRestoreStep.name -> attemptRestoreStep
else -> waitForRestoreStep
}
}

override fun getSteps(): List<Step> = steps

override fun populateAction(builder: XContentBuilder, params: ToXContent.Params) {
builder.startObject(type)
builder.field(REPOSITORY_FIELD, repository)
builder.endObject()
}

override fun populateAction(out: StreamOutput) {
out.writeString(repository)
out.writeInt(actionIndex)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.indexstatemanagement.action

import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.core.xcontent.XContentParser.Token
import org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken
import org.opensearch.indexmanagement.indexstatemanagement.action.ConvertIndexToRemoteAction.Companion.REPOSITORY_FIELD
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser

class ConvertIndexToRemoteActionParser : ActionParser() {
override fun fromStreamInput(sin: StreamInput): Action {
return ConvertIndexToRemoteAction.fromStreamInput(sin)
}

override fun fromXContent(xcp: XContentParser, index: Int): Action {
var repository: String? = null

ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()

when (fieldName) {
REPOSITORY_FIELD -> repository = xcp.text()
else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in ConvertIndexToRemoteAction.")
}
}

return ConvertIndexToRemoteAction(
repository = requireNotNull(repository) { "ConvertIndexToRemoteAction repository must be specified" },
index = index,
)
}

override fun getActionType(): String {
return ConvertIndexToRemoteAction.name
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.indexstatemanagement.step.restore

import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest
import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse
import org.opensearch.core.rest.RestStatus
import org.opensearch.indexmanagement.indexstatemanagement.action.ConvertIndexToRemoteAction
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionProperties
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
import org.opensearch.snapshots.SnapshotException
import org.opensearch.snapshots.SnapshotState
import org.opensearch.transport.RemoteTransportException

class AttemptRestoreStep(private val action: ConvertIndexToRemoteAction) : Step(name) {

private val logger = LogManager.getLogger(javaClass)
private var stepStatus = StepStatus.STARTING
private var info: Map<String, Any>? = null
private var snapshotName: String? = null

@Suppress("TooGenericExceptionCaught", "ComplexMethod", "ReturnCount", "LongMethod")
override suspend fun execute(): Step {
val context = this.context ?: return this
val indexName = context.metadata.index
val repository = action.repository

try {
val mutableInfo = mutableMapOf<String, String>()

// List snapshots matching the pattern
val getSnapshotsRequest = GetSnapshotsRequest()
.repository(repository)
.snapshots(arrayOf("$indexName*"))
.ignoreUnavailable(true)
.verbose(true)

val getSnapshotsResponse: GetSnapshotsResponse = context.client.admin().cluster().suspendUntil {
getSnapshots(getSnapshotsRequest, it)
}
val snapshots = getSnapshotsResponse.snapshots
if (snapshots.isNullOrEmpty()) {
val message = getFailedMessage(indexName, "No snapshots found matching pattern [$indexName*]")
stepStatus = StepStatus.FAILED
info = mapOf("message" to message)
return this
}

val successfulSnapshots = snapshots.filter { it.state() == SnapshotState.SUCCESS }

if (successfulSnapshots.isEmpty()) {
val message = getFailedMessage(
indexName,
"No successful snapshots found matching pattern [$indexName*]",
)
stepStatus = StepStatus.FAILED
info = mapOf("message" to message)
return this
}

// Select the latest snapshot
val latestSnapshotInfo = successfulSnapshots.maxByOrNull { it.endTime() }!!
logger.info("Restoring snapshot info: $latestSnapshotInfo")

// Use the snapshot name from the selected SnapshotInfo
snapshotName = latestSnapshotInfo.snapshotId().name

// Proceed with the restore operation
val restoreSnapshotRequest = RestoreSnapshotRequest(repository, snapshotName)
.indices("*")
.storageType(RestoreSnapshotRequest.StorageType.REMOTE_SNAPSHOT)
.renamePattern("^(.*)\$")
.renameReplacement("$1_remote")
.waitForCompletion(false)
val response: RestoreSnapshotResponse = context.client.admin().cluster().suspendUntil {
restoreSnapshot(restoreSnapshotRequest, it)
}

when (response.status()) {
RestStatus.ACCEPTED, RestStatus.OK -> {
stepStatus = StepStatus.COMPLETED
mutableInfo["message"] = getSuccessMessage(indexName)
}
else -> {
val message = getFailedMessage(indexName, "Unexpected response status: ${response.status()}")
logger.warn("$message - $response")
stepStatus = StepStatus.FAILED
mutableInfo["message"] = message
mutableInfo["cause"] = response.toString()
}
}
info = mutableInfo.toMap()
} catch (e: RemoteTransportException) {
val cause = ExceptionsHelper.unwrapCause(e)
if (cause is SnapshotException) {
handleRestoreException(indexName, cause)
} else {
handleException(indexName, cause as Exception)
}
} catch (e: SnapshotException) {
handleRestoreException(indexName, e)
} catch (e: Exception) {
handleException(indexName, e)
}

return this
}

private fun handleRestoreException(indexName: String, e: SnapshotException) {
val message = getFailedRestoreMessage(indexName)
logger.debug(message, e)
stepStatus = StepStatus.FAILED
val mutableInfo = mutableMapOf<String, Any>("message" to message)
val errorMessage = e.message
if (errorMessage != null) mutableInfo["cause"] = errorMessage
info = mutableInfo.toMap()
}

private fun handleException(indexName: String, e: Exception) {
val message = getFailedMessage(indexName, e.message ?: "Unknown error")
logger.error(message, e)
stepStatus = StepStatus.FAILED
val mutableInfo = mutableMapOf<String, Any>("message" to message)
val errorMessage = e.message
if (errorMessage != null) mutableInfo["cause"] = errorMessage
info = mutableInfo.toMap()
}

override fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData {
val currentActionMetaData = currentMetadata.actionMetaData
return currentMetadata.copy(
actionMetaData = currentActionMetaData?.copy(actionProperties = ActionProperties(snapshotName = snapshotName)),
stepMetaData = StepMetaData(name, getStepStartTime(currentMetadata).toEpochMilli(), stepStatus),
transitionTo = null,
info = info,
)
}

override fun isIdempotent(): Boolean = false

companion object {
val validTopContextFields = setOf("index", "indexUuid")
const val name = "attempt_restore"
fun getFailedMessage(index: String, cause: String) = "Failed to start restore for [index=$index], cause: $cause"
fun getFailedRestoreMessage(index: String) = "Failed to start restore due to concurrent restore or snapshot in progress [index=$index]"
fun getSuccessMessage(index: String) = "Successfully started restore for [index=$index]"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.indexstatemanagement.step.restore

import org.apache.logging.log4j.LogManager
import org.opensearch.cluster.RestoreInProgress
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
import org.opensearch.transport.RemoteTransportException

class WaitForRestoreStep : Step(name) {

private val logger = LogManager.getLogger(javaClass)
private var stepStatus = StepStatus.STARTING
private var info: Map<String, Any>? = null

override suspend fun execute(): Step {
val context = this.context ?: return this
val indexName = context.metadata.index

try {
val clusterState = context.clusterService.state()
val restoreInProgress = clusterState.custom<RestoreInProgress>(RestoreInProgress.TYPE)

val restoreOngoing = restoreInProgress?.let { rip ->
rip.any { entry ->
entry.indices().contains(indexName)
}
} ?: false

if (restoreOngoing) {
stepStatus = StepStatus.CONDITION_NOT_MET
info = mapOf("message" to getPendingMessage(indexName))
} else {
// Restore is complete
stepStatus = StepStatus.COMPLETED
info = mapOf("message" to getSuccessMessage(indexName))
}
} catch (e: Exception) {
handleException(indexName, e)
}

return this
}

private fun handleException(indexName: String, e: Exception) {
val message = getFailedMessage(indexName, e.message ?: "Unknown error")
logger.error(message, e)
stepStatus = StepStatus.FAILED
val mutableInfo = mutableMapOf("message" to message)
val cause = (e as? RemoteTransportException)?.cause

val errorMessage = cause?.message ?: e.message
if (errorMessage != null) mutableInfo["cause"] = errorMessage
info = mutableInfo.toMap()
}

override fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData {
return currentMetadata.copy(
stepMetaData = StepMetaData(name, getStepStartTime(currentMetadata).toEpochMilli(), stepStatus),
transitionTo = null,
info = info,
)
}

override fun isIdempotent(): Boolean = true

companion object {
const val name = "wait_for_restore"
fun getFailedMessage(index: String, cause: String) = "Failed to check restore status for [index=$index], cause: $cause"
fun getPendingMessage(index: String) = "Restore not complete for [index=$index], retrying..."
fun getSuccessMessage(index: String) = "Restore complete for [index=$index]"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class ActionValidation(
"read_write" -> ValidateReadWrite(settings, clusterService, jvmService).execute(indexName)
"replica_count" -> ValidateReplicaCount(settings, clusterService, jvmService).execute(indexName)
"snapshot" -> ValidateSnapshot(settings, clusterService, jvmService).execute(indexName)
"convert_index_to_remote" -> ValidateConvertIndexToRemote(settings, clusterService, jvmService).execute(indexName)
"transition" -> ValidateTransition(settings, clusterService, jvmService).execute(indexName)
"close" -> ValidateClose(settings, clusterService, jvmService).execute(indexName)
"index_priority" -> ValidateIndexPriority(settings, clusterService, jvmService).execute(indexName)
Expand Down
Loading