-
Notifications
You must be signed in to change notification settings - Fork 60
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adding provision to invoke stop replication from other plugins #1391
base: main
Are you sure you want to change the base?
Changes from all commits
7ad58f9
6091081
afc7963
a807977
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
This file was deleted.
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
package org.opensearch.replication.action.stop | ||
|
||
import kotlinx.coroutines.CoroutineScope | ||
import kotlinx.coroutines.Dispatchers | ||
import kotlinx.coroutines.GlobalScope | ||
import kotlinx.coroutines.launch | ||
import org.apache.logging.log4j.LogManager | ||
import org.opensearch.action.support.HandledTransportAction | ||
import org.opensearch.action.ActionRequest | ||
import org.opensearch.action.support.ActionFilters | ||
import org.opensearch.action.support.master.AcknowledgedResponse | ||
import org.opensearch.client.Client | ||
import org.opensearch.common.inject.Inject | ||
import org.opensearch.cluster.metadata.IndexNameExpressionResolver | ||
import org.opensearch.cluster.service.ClusterService | ||
import org.opensearch.commons.replication.action.ReplicationActions.STOP_REPLICATION_ACTION_TYPE | ||
import org.opensearch.commons.replication.action.StopIndexReplicationRequest | ||
import org.opensearch.commons.replication.action.ReplicationActions.UNFOLLOW_REPLICATION_ACTION_NAME | ||
import org.opensearch.commons.utils.recreateObject | ||
import org.opensearch.core.action.ActionListener | ||
import org.opensearch.replication.metadata.ReplicationMetadataManager | ||
import org.opensearch.replication.util.coroutineContext | ||
import org.opensearch.replication.util.stackTraceToString | ||
import org.opensearch.replication.util.suspendExecute | ||
import org.opensearch.tasks.Task | ||
import org.opensearch.threadpool.ThreadPool | ||
import org.opensearch.transport.TransportService | ||
|
||
|
||
/** | ||
* This action transforms the request from ActionRequest type to StopIndexReplicationRequest | ||
* and performs the TransportStopIndexReplicationAction on it. | ||
* While TransportStopIndexReplicationAction is used directly by the _stop replication REST API, | ||
* this action is used for inter-plugin communication by ism plugin to unfollow i.e. stop replication. | ||
*/ | ||
class TransportUnfollowIndexReplicationAction @Inject constructor ( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. let's rename is to something like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure thanks. Suggestions were given in the ISM PR to name this class as InternalTransportStopIndexReplicationAction and the action name as "indices:internal/plugins/replication/index/stop", so the name is differentiated to be something internally invoked from plugins. We hadn't added the name ism, so it remains generic to be used by something else too in future. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
val name: String, | ||
val transportService: TransportService, | ||
val clusterService: ClusterService, | ||
val threadPool: ThreadPool, | ||
val client: Client, | ||
val actionFilters: ActionFilters, | ||
val indexNameExpressionResolver: IndexNameExpressionResolver, | ||
val replicationMetadataManager: ReplicationMetadataManager, | ||
): HandledTransportAction<ActionRequest, AcknowledgedResponse> (UNFOLLOW_REPLICATION_ACTION_NAME, transportService, actionFilters, ::StopIndexReplicationRequest), | ||
CoroutineScope by GlobalScope { | ||
companion object { | ||
private val log = LogManager.getLogger(TransportUnfollowIndexReplicationAction::class.java) | ||
} | ||
|
||
@Throws(Exception::class) | ||
override fun doExecute(task: Task?, request: ActionRequest, listener: ActionListener<AcknowledgedResponse>?) { | ||
launch(Dispatchers.Unconfined + threadPool.coroutineContext()) { | ||
val transformedRequest = request as? StopIndexReplicationRequest | ||
?: request.let { recreateObject(it) { StopIndexReplicationRequest(it) } } | ||
try { | ||
|
||
var response = client.suspendExecute(STOP_REPLICATION_ACTION_TYPE, transformedRequest, true) | ||
log.info("Stop replication successful for index[${transformedRequest.indexName}] with response: " + response.isAcknowledged) | ||
listener?.onResponse(AcknowledgedResponse(true)) | ||
} catch (e: Exception) { | ||
log.error("Stop replication failed for index[${transformedRequest.indexName}] with error ${e.stackTraceToString()}") | ||
listener?.onFailure(e) | ||
throw e | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Probably no need to rethrow the exception here, as the exception is sent back to the caller to handle? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, you are right. I had corrected that change in my local, but missed pushing it. My bad! |
||
} | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,4 @@ | ||
/* | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add a comment here explaining that the stop replication action class in moved to common utils and link this PR here?