Skip to content

Commit

Permalink
Add doc removal transform
Browse files Browse the repository at this point in the history
Signed-off-by: Andre Kurait <[email protected]>
  • Loading branch information
AndreKurait committed Oct 30, 2024
1 parent ff52d88 commit 94b5d94
Show file tree
Hide file tree
Showing 8 changed files with 72 additions and 49 deletions.
1 change: 1 addition & 0 deletions DocumentsFromSnapshotMigration/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ dependencies {
implementation project(":RFS")
implementation project(":transformation")
implementation project(':transformation:transformationPlugins:jsonMessageTransformers:jsonMessageTransformerLoaders')
runtimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:openSearch23PlusTargetTransformerProvider')

implementation group: 'org.apache.logging.log4j', name: 'log4j-api'
implementation group: 'org.apache.logging.log4j', name: 'log4j-core'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ public class RfsMigrateDocuments {
public static final int TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS = 5;
public static final String LOGGING_MDC_WORKER_ID = "workerId";

/**
 * Transformer configuration that main() falls back to when no document
 * transformation config is resolved from the command-line arguments.
 * It is a JSON array holding a single entry keyed by
 * JsonTransformerForDocumentTypeRemovalProvider, with an empty-string config value.
 */
private static final String DEFAULT_DOCUMENT_TRANSFORMATION_CONFIG = "[" +
" {" +
" \"JsonTransformerForDocumentTypeRemovalProvider\":\"\"" +
" }" +
"]";

public static class DurationConverter implements IStringConverter<Duration> {
@Override
public Duration convert(String value) {
Expand Down Expand Up @@ -254,12 +260,15 @@ public static void main(String[] args) throws Exception {


String docTransformerConfig = TransformerConfigUtils.getTransformerConfig(arguments.docTransformationParams);
IJsonTransformer docTransformer = null;
if (docTransformerConfig != null) {
log.atInfo().setMessage("Doc Transformations config string: {}")
.addArgument(docTransformerConfig).log();
docTransformer = new TransformationLoader().getTransformerFactoryLoader(docTransformerConfig);
} else {
log.atInfo().setMessage("Using default transformation config: {}")
.addArgument(DEFAULT_DOCUMENT_TRANSFORMATION_CONFIG).log();
docTransformerConfig = DEFAULT_DOCUMENT_TRANSFORMATION_CONFIG;
}
IJsonTransformer docTransformer = new TransformationLoader().getTransformerFactoryLoader(docTransformerConfig);

try (var processManager = new LeaseExpireTrigger(RfsMigrateDocuments::exitOnLeaseTimeout, Clock.systemUTC());
var workCoordinator = new OpenSearchWorkCoordinator(
Expand Down
1 change: 1 addition & 0 deletions RFS/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ dependencies {
testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter'

testImplementation project(':transformation:transformationPlugins:jsonMessageTransformers:jsonMessageTransformerLoaders')
testRuntimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:openSearch23PlusTargetTransformerProvider')

testRuntimeOnly group: 'org.junit.jupiter', name: 'junit-jupiter-engine'

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public class BulkDocSection {
private final ObjectNode indexCommand;
private final ObjectNode source;

private StringBuilder bulkDocSectionStringCache = null;

public BulkDocSection(String id, String indexName, String type, String docBody) {
this.docId = id;
this.indexCommand = createIndexCommand(id, indexName, type);
Expand All @@ -47,7 +49,9 @@ private static ObjectNode createIndexCommand(final String docId, final String in
ObjectNode indexNode = OBJECT_MAPPER.createObjectNode();
ObjectNode metadataNode = OBJECT_MAPPER.createObjectNode();
metadataNode.put(FIELD_INDEX, indexName);
metadataNode.put(FIELD_TYPE, type);
if(type != null) {
metadataNode.put(FIELD_TYPE, type);
}
metadataNode.put(FIELD_ID, docId);
indexNode.set(COMMAND_INDEX, metadataNode);
return indexNode;
Expand All @@ -66,12 +70,17 @@ public static String convertToBulkRequestBody(Collection<BulkDocSection> bulkSec
return builder.toString();
}

public StringBuilder asStringBuilder() {
private StringBuilder asStringBuilder() {
if (this.bulkDocSectionStringCache != null) {
return this.bulkDocSectionStringCache;
}

StringBuilder builder = new StringBuilder();
try {
String indexCommand = asBulkIndex();
String sourceJson = asBulkSource();
String indexCommand = asBulkIndexString();
String sourceJson = asBulkSourceString();
builder.append(indexCommand).append(NEWLINE).append(sourceJson);
bulkDocSectionStringCache = builder;
return builder;
} catch (JsonProcessingException e) {
throw new RuntimeException(SERIALIZATION_ERROR_MESSAGE, e);
Expand All @@ -82,18 +91,18 @@ public String asString() {
return asStringBuilder().toString();
}

private String asString(ObjectNode node) throws JsonProcessingException {
return OBJECT_MAPPER.writeValueAsString(node);
}

private String asBulkIndex() throws JsonProcessingException {
private String asBulkIndexString() throws JsonProcessingException {
return asString(this.indexCommand);
}

private String asBulkSource() throws JsonProcessingException {
private String asBulkSourceString() throws JsonProcessingException {
return asString(this.source);
}

private String asString(ObjectNode node) throws JsonProcessingException {
return OBJECT_MAPPER.writeValueAsString(node);
}

@SuppressWarnings("unchecked")
public Map<String, Object> toMap() {
var indexMap = OBJECT_MAPPER.convertValue(this.indexCommand, HashMap.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ void reindex_shouldTrimAndRemoveNewlineFromSource() {

var capturedBulkRequests = bulkRequestCaptor.getValue();
assertEquals(1, capturedBulkRequests.size(), "Should contain 1 document");
assertEquals("{\"index\":{\"_id\":\"MQAA\", \"_type\":\"_doc\", \"_index\":\"test-index\"}}\n{\"field\":\"value\"}", capturedBulkRequests.get(0).asString()); }
assertEquals("{\"index\":{\"_id\":\"MQAA\",\"_index\":\"test-index\"}}\n{\"field\":\"value\"}", capturedBulkRequests.get(0).asString()); }

private RfsLuceneDocument createTestDocument(String id) {
return new RfsLuceneDocument(id, null, "{\"field\":\"value\"}");
Expand Down Expand Up @@ -215,27 +215,7 @@ void reindex_shouldTransformDocuments() {
// Define the transformation configuration
final String CONFIG = "[" +
" {" +
" \"JsonConditionalTransformerProvider\": [" +
" {" +
" \"JsonJMESPathPredicateProvider\": {" +
" \"script\": \"index._type != '_doc'\"" +
" }" +
" }," +
" [" +
" {" +
" \"JsonJoltTransformerProvider\": {" +
" \"script\": {" +
" \"operation\": \"modify-overwrite-beta\"," +
" \"spec\": {" +
" \"index\": {" +
" \"\\\\_type\": \"_doc\"" +
" }" +
" }" +
" }" +
" }" +
" }" +
" ]" +
" ]" +
" \"JsonTransformerForDocumentTypeRemovalProvider\":\"\"" +
" }" +
"]";

Expand All @@ -248,7 +228,7 @@ void reindex_shouldTransformDocuments() {
// Create a stream of documents, some requiring transformation and some not
Flux<RfsLuceneDocument> documentStream = Flux.just(
createTestDocumentWithType("1", "_type1"),
createTestDocumentWithType("2", "_doc"),
createTestDocumentWithType("2", null),
createTestDocumentWithType("3", "_type3")
);

Expand Down Expand Up @@ -278,12 +258,12 @@ void reindex_shouldTransformDocuments() {
BulkDocSection transformedDoc2 = capturedBulkRequests.get(1);
BulkDocSection transformedDoc3 = capturedBulkRequests.get(2);

assertEquals("{\"index\":{\"_index\":\"test-index\",\"_type\":\"_doc\",\"_id\":\"1\"}}\n{\"field\":\"value\"}", transformedDoc1.asString(),
"Document 1 should have _type transformed to '_doc'");
assertEquals("{\"index\":{\"_index\":\"test-index\",\"_type\":\"_doc\",\"_id\":\"2\"}}\n{\"field\":\"value\"}", transformedDoc2.asString(),
"Document 2 should remain unchanged as _type is already '_doc'");
assertEquals("{\"index\":{\"_index\":\"test-index\",\"_type\":\"_doc\",\"_id\":\"3\"}}\n{\"field\":\"value\"}", transformedDoc3.asString(),
"Document 3 should have _type transformed to '_doc'");
assertEquals("{\"index\":{\"_index\":\"test-index\",\"_id\":\"1\"}}\n{\"field\":\"value\"}", transformedDoc1.asString(),
"Document 1 should have _type removed");
assertEquals("{\"index\":{\"_index\":\"test-index\",\"_id\":\"2\"}}\n{\"field\":\"value\"}", transformedDoc2.asString(),
"Document 2 should remain unchanged as _type is not defined");
assertEquals("{\"index\":{\"_index\":\"test-index\",\"_id\":\"3\"}}\n{\"field\":\"value\"}", transformedDoc3.asString(),
"Document 3 should have _type removed");
}

/**
Expand Down
14 changes: 7 additions & 7 deletions TrafficCapture/trafficReplayer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,12 @@ target URI.
## Transformations

Transformations are performed via a simple interface defined by
[IJsonTransformer](../transformationPlugins/jsonMessageTransformers/jsonMessageTransformerInterface/src/main/java/org/opensearch/migrations/transform/IJsonTransformer.java) ('transformer'). They are loaded dynamically and are designed to allow for easy extension
[IJsonTransformer](../../transformation/transformationPlugins/jsonMessageTransformers/jsonMessageTransformerInterface/src/main/java/org/opensearch/migrations/transform/IJsonTransformer.java) ('transformer'). They are loaded dynamically and are designed to allow for easy extension
of the TrafficReplayer to support a diverse set of needs.

The input to the transformer is an HTTP message represented as a json-like `Map<String,Object>` with
top-level key-value pairs defined in
[JsonKeysForHttpMessage.java](../transformationPlugins/jsonMessageTransformers/jsonMessageTransformerInterface/src/main/java/org/opensearch/migrations/transform/JsonKeysForHttpMessage.java).
[JsonKeysForHttpMessage.java](../../transformation/transformationPlugins/jsonMessageTransformers/jsonMessageTransformerInterface/src/main/java/org/opensearch/migrations/transform/JsonKeysForHttpMessage.java).
Bodies that are json-formatted will be accessible via the path `payload.inlinedJsonBody` and they will be accessible
as a fully-parsed Map. Newline-delimited json (ndjson) sequences will be accessible via
`payload.inlinedJsonSequenceBodies` as a List of json Maps. These two payload entries are mutually exclusive.
Expand All @@ -98,7 +98,7 @@ Transformers may be used simultaneously from concurrent threads over the lifetim
a message will only be processed by one transformer at a time.

Transformer implementations are loaded via [Java's ServiceLoader](https://docs.oracle.com/javase/8/docs/api/java/util/ServiceLoader.html)
by loading a jarfile that implements the [IJsonTransformerProvider](../transformationPlugins/jsonMessageTransformers/jsonMessageTransformerInterface/src/main/java/org/opensearch/migrations/transform/IJsonTransformerProvider.java).
by loading a jarfile that implements the [IJsonTransformerProvider](../../transformation/transformationPlugins/jsonMessageTransformers/jsonMessageTransformerInterface/src/main/java/org/opensearch/migrations/transform/IJsonTransformerProvider.java).
That jarfile will be loaded by specifying the provider jarfile (and any of its dependencies) in the classpath.
For the ServiceLoader to load the IJsonTransformerProvider, the provided jarfile needs
to supply a _provider-configuration_ file (`META-INF/services/org.opensearch.migrations.transform.IJsonTransformerProvider`)
Expand All @@ -117,17 +117,17 @@ The name is defined by the `IJsonTransformerProvider::getName()`, which unless o
IJsonTransformer object.

The jsonMessageTransformerInterface package includes [JsonCompositeTransformer.java]
(../transformationPlugins/jsonMessageTransformers/jsonMessageTransformerInterface/src/main/java/org/opensearch/migrations/transform/JsonCompositeTransformer.java),
(../../transformation/transformationPlugins/jsonMessageTransformers/jsonMessageTransformerInterface/src/main/java/org/opensearch/migrations/transform/JsonCompositeTransformer.java),
which runs configured transformers in serial.
That composite transformer is also utilized by the TrafficReplayer to combine the
list of loaded transformations with a transformer to rewrite the 'Host' header. That host transformation changes the
host header of every HTTP message to use the target domain-name rather than the source's. That will be run after
all loaded/specified transformations.

Currently, there are multiple, nascent implementations included in the repository. The
[JsonJMESPathTransformerProvider](../transformationPlugins/jsonMessageTransformers/jsonJMESPathMessageTransformerProvider)
[JsonJMESPathTransformerProvider](../../transformation/transformationPlugins/jsonMessageTransformers/jsonJMESPathMessageTransformerProvider)
package uses JMESPath expressions to transform requests and the
[jsonJoltMessageTransformerProvider](../transformationPlugins/jsonMessageTransformers/jsonJoltMessageTransformerProvider)
[jsonJoltMessageTransformerProvider](../../transformation/transformationPlugins/jsonMessageTransformers/jsonJoltMessageTransformerProvider)
package uses [JOLT](https://github.com/bazaarvoice/jolt) to perform transforms. The JMESPathTransformer takes an inlined script as shown below.
The Jolt transformer can be configured to apply a full script or to use a "canned" transform whose script is
already included with the library.
Expand All @@ -153,7 +153,7 @@ also pass the script as an argument via `--transformer-config-base64`. Each of
is mutually exclusive.

Some simple transformations are included to change headers to add compression or to force an HTTP message payload to
be chunked. Another transformer, [JsonTypeMappingTransformer.java](../transformationPlugins/jsonMessageTransformers/openSearch23PlusTargetTransformerProvider/src/main/java/org/opensearch/migrations/transform/JsonTypeMappingTransformer.java),
be chunked. Another transformer, [JsonTypeMappingTransformer.java](../../transformation/transformationPlugins/jsonMessageTransformers/openSearch23PlusTargetTransformerProvider/src/main/java/org/opensearch/migrations/transform/JsonTypeMappingTransformer.java),
is a work-in-progress to excise type mapping references from URIs and message payloads since versions of OpenSearch
greater than 2.3 do not support them.

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package org.opensearch.migrations.transform;

import java.util.Map;

/**
 * Provider of a JSON transformer for document transforms that removes the
 * {@code _type} entry from a document's {@code index} command.
 * Used for ES7+ and OS (OpenSearch) targets.
 */
public class JsonTransformerForDocumentTypeRemovalProvider implements IJsonTransformerProvider {
    /**
     * Creates the type-removal transformer.
     *
     * @param jsonConfig provider configuration; it is ignored because this transformer has no options
     * @return a new transformer instance
     */
    @Override
    public IJsonTransformer createTransformer(Object jsonConfig) {
        return new Transformer();
    }

    /** Transformer that drops {@code _type} from the {@code index} sub-map, when one is present. */
    private static class Transformer implements IJsonTransformer {
        private static final String INDEX_COMMAND_KEY = "index";
        private static final String TYPE_KEY = "_type";

        /**
         * Removes {@code _type} from the map stored under {@code index}, mutating the input in place.
         *
         * @param incomingJson the document representation to transform
         * @return the same {@code incomingJson} instance that was passed in
         */
        @Override
        public Map<String, Object> transformJson(Map<String, Object> incomingJson) {
            Object indexCommand = incomingJson.get(INDEX_COMMAND_KEY);
            // Only touch the entry when it really is a map; a missing or differently
            // shaped "index" value has no type to remove, so pass the input through
            // untouched instead of failing with an NPE or ClassCastException.
            if (indexCommand instanceof Map) {
                ((Map<?, ?>) indexCommand).remove(TYPE_KEY);
            }
            return incomingJson;
        }
    }
}
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
org.opensearch.migrations.transform.JsonTransformerForOpenSearch23PlusTargetTransformerProvider
org.opensearch.migrations.transform.JsonTransformerForOpenSearch23PlusTargetTransformerProvider
org.opensearch.migrations.transform.JsonTransformerForDocumentTypeRemovalProvider

0 comments on commit 94b5d94

Please sign in to comment.