Skip to content

Commit

Permalink
Merge pull request #1086 from AndreKurait/JmesPathPreconditions
Browse files Browse the repository at this point in the history
Implement Transformation JMES Path Predicates
  • Loading branch information
AndreKurait authored Oct 28, 2024
2 parents 773c23d + 18824d0 commit 4769ba4
Show file tree
Hide file tree
Showing 34 changed files with 528 additions and 94 deletions.
2 changes: 1 addition & 1 deletion TrafficCapture/trafficReplayer/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ dependencies {
implementation project(':TrafficCapture:captureProtobufs')
implementation project(':coreUtilities')
implementation project(':awsUtilities')

implementation project(':TrafficCapture:transformationPlugins:jsonMessageTransformers:jsonMessageTransformerLoaders')
implementation project(':TrafficCapture:transformationPlugins:jsonMessageTransformers:jsonMessageTransformerInterface')
runtimeOnly project(':TrafficCapture:transformationPlugins:jsonMessageTransformers:jsonJMESPathMessageTransformerProvider')
runtimeOnly project(':TrafficCapture:transformationPlugins:jsonMessageTransformers:jsonJoltMessageTransformerProvider')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import org.opensearch.migrations.replay.datatypes.UniqueSourceRequestKey;
import org.opensearch.migrations.transform.IJsonTransformer;
import org.opensearch.migrations.transform.TransformationLoader;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.ByteBuf;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.opensearch.migrations.transform.RemovingAuthTransformerFactory;
import org.opensearch.migrations.transform.SigV4AuthTransformerFactory;
import org.opensearch.migrations.transform.StaticAuthTransformerFactory;
import org.opensearch.migrations.transform.TransformationLoader;
import org.opensearch.migrations.utils.ProcessHelpers;

import com.beust.jcommander.JCommander;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.migrations.tracing.InstrumentationTest;
import org.opensearch.migrations.tracing.TestContext;
import org.opensearch.migrations.transform.IJsonTransformer;
import org.opensearch.migrations.transform.TransformationLoader;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.opensearch.migrations.trafficcapture.protos.TrafficObservation;
import org.opensearch.migrations.trafficcapture.protos.TrafficStream;
import org.opensearch.migrations.trafficcapture.protos.WriteObservation;
import org.opensearch.migrations.transform.TransformationLoader;

import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.opensearch.migrations.replay.ReplayUtils;
import org.opensearch.migrations.replay.RequestTransformerAndSender;
import org.opensearch.migrations.replay.TimeShifter;
import org.opensearch.migrations.replay.TransformationLoader;
import org.opensearch.migrations.replay.datatypes.ConnectionReplaySession;
import org.opensearch.migrations.replay.http.retries.NoRetryEvaluatorFactory;
import org.opensearch.migrations.replay.traffic.source.BufferedFlowController;
Expand All @@ -35,6 +34,7 @@
import org.opensearch.migrations.testutils.WrapWithNettyLeakDetection;
import org.opensearch.migrations.tracing.InstrumentationTest;
import org.opensearch.migrations.tracing.TestContext;
import org.opensearch.migrations.transform.TransformationLoader;

import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.handler.codec.http.HttpHeaderNames;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import org.opensearch.migrations.replay.AggregatedRawResponse;
import org.opensearch.migrations.replay.TestCapturePacketToHttpHandler;
import org.opensearch.migrations.replay.TestUtils;
import org.opensearch.migrations.replay.TransformationLoader;
import org.opensearch.migrations.replay.datatypes.HttpRequestTransformationStatus;
import org.opensearch.migrations.replay.util.TrackedFuture;
import org.opensearch.migrations.testutils.WrapWithNettyLeakDetection;
Expand All @@ -24,6 +23,7 @@
import org.opensearch.migrations.transform.JsonCompositeTransformer;
import org.opensearch.migrations.transform.JsonKeysForHttpMessage;
import org.opensearch.migrations.transform.RemovingAuthTransformerFactory;
import org.opensearch.migrations.transform.TransformationLoader;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.opensearch.migrations.replay.RootReplayerConstructorExtensions;
import org.opensearch.migrations.replay.TestHttpServerContext;
import org.opensearch.migrations.replay.TimeShifter;
import org.opensearch.migrations.replay.TransformationLoader;
import org.opensearch.migrations.replay.traffic.source.ArrayCursorTrafficCaptureSource;
import org.opensearch.migrations.replay.traffic.source.ArrayCursorTrafficSourceContext;
import org.opensearch.migrations.replay.traffic.source.BlockingTrafficSource;
Expand All @@ -28,6 +27,7 @@
import org.opensearch.migrations.trafficcapture.protos.TrafficStream;
import org.opensearch.migrations.trafficcapture.protos.WriteObservation;
import org.opensearch.migrations.transform.StaticAuthTransformerFactory;
import org.opensearch.migrations.transform.TransformationLoader;

import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.opensearch.migrations.replay.TestHttpServerContext;
import org.opensearch.migrations.replay.TimeShifter;
import org.opensearch.migrations.replay.TrafficReplayerTopLevel;
import org.opensearch.migrations.replay.TransformationLoader;
import org.opensearch.migrations.replay.datatypes.ITrafficStreamKey;
import org.opensearch.migrations.replay.datatypes.PojoTrafficStreamAndKey;
import org.opensearch.migrations.replay.datatypes.PojoTrafficStreamKeyAndContext;
Expand All @@ -46,6 +45,7 @@
import org.opensearch.migrations.transform.IAuthTransformerFactory;
import org.opensearch.migrations.transform.IJsonTransformer;
import org.opensearch.migrations.transform.StaticAuthTransformerFactory;
import org.opensearch.migrations.transform.TransformationLoader;

import lombok.Lombok;
import lombok.SneakyThrows;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import org.opensearch.migrations.replay.RootReplayerConstructorExtensions;
import org.opensearch.migrations.replay.TimeShifter;
import org.opensearch.migrations.replay.TransformationLoader;
import org.opensearch.migrations.replay.traffic.source.ArrayCursorTrafficCaptureSource;
import org.opensearch.migrations.replay.traffic.source.ArrayCursorTrafficSourceContext;
import org.opensearch.migrations.replay.traffic.source.BlockingTrafficSource;
Expand All @@ -31,6 +30,7 @@
import org.opensearch.migrations.trafficcapture.protos.TrafficObservation;
import org.opensearch.migrations.trafficcapture.protos.TrafficStream;
import org.opensearch.migrations.transform.StaticAuthTransformerFactory;
import org.opensearch.migrations.transform.TransformationLoader;

import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@
import org.opensearch.migrations.replay.TimeShifter;
import org.opensearch.migrations.replay.TrafficReplayer;
import org.opensearch.migrations.replay.TrafficReplayerTopLevel;
import org.opensearch.migrations.replay.TransformationLoader;
import org.opensearch.migrations.replay.datatypes.ISourceTrafficChannelKey;
import org.opensearch.migrations.replay.tracing.IRootReplayerContext;
import org.opensearch.migrations.replay.traffic.source.BlockingTrafficSource;
import org.opensearch.migrations.replay.traffic.source.ISimpleTrafficCaptureSource;
import org.opensearch.migrations.tracing.TestContext;
import org.opensearch.migrations.transform.StaticAuthTransformerFactory;
import org.opensearch.migrations.transform.TransformationLoader;

import com.google.common.base.Strings;
import lombok.AllArgsConstructor;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.opensearch.migrations.replay;
package org.opensearch.migrations.replay.transform;

import java.time.Duration;
import java.util.AbstractMap;
Expand All @@ -10,12 +10,16 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.opensearch.migrations.replay.AggregatedRawResponse;
import org.opensearch.migrations.replay.TestCapturePacketToHttpHandler;
import org.opensearch.migrations.replay.TestUtils;
import org.opensearch.migrations.replay.datahandlers.http.HttpJsonTransformingConsumer;
import org.opensearch.migrations.replay.datatypes.HttpRequestTransformationStatus;
import org.opensearch.migrations.replay.util.TrackedFuture;
import org.opensearch.migrations.testutils.WrapWithNettyLeakDetection;
import org.opensearch.migrations.tracing.InstrumentationTest;
import org.opensearch.migrations.transform.StaticAuthTransformerFactory;
import org.opensearch.migrations.transform.TransformationLoader;

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Assertions;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package org.opensearch.migrations.transform;

import java.util.Map;

import io.burt.jmespath.BaseRuntime;
import io.burt.jmespath.Expression;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class JsonJMESPathPredicate implements IJsonPredicate {

Expression<Object> expression;

public JsonJMESPathPredicate(BaseRuntime<Object> runtime, String script) {
this.expression = runtime.compile(script);
}

@Override
public boolean test(Map<String, Object> incomingJson) {
var output = expression.search(incomingJson);
log.atDebug().setMessage("output={}").addArgument(output).log();
return (Boolean) output;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package org.opensearch.migrations.transform;

import java.util.Map;

import io.burt.jmespath.BaseRuntime;
import io.burt.jmespath.jcf.JcfRuntime;

public class JsonJMESPathPredicateProvider implements IJsonPredicateProvider {

public static final String SCRIPT_KEY = "script";
private BaseRuntime<Object> adapterRuntime;

public JsonJMESPathPredicateProvider() {
this.adapterRuntime = new JcfRuntime();
}

@Override
public IJsonPredicate createPredicate(Object jsonConfig) {
try {
if (jsonConfig instanceof Map) {
@SuppressWarnings("unchecked")
var jsonConfigMap = (Map<String, Object>) jsonConfig;
if (jsonConfigMap.size() != 1) {
throw new IllegalArgumentException(getConfigUsageStr());
}
var scriptValue = jsonConfigMap.get(SCRIPT_KEY);
if (!(scriptValue instanceof String)) {
throw new IllegalArgumentException(getConfigUsageStr());
}
return new JsonJMESPathPredicate(adapterRuntime, (String) scriptValue);
}
throw new IllegalArgumentException(getConfigUsageStr());
} catch (ClassCastException e) {
throw new IllegalArgumentException(getConfigUsageStr(), e);
}
}

private String getConfigUsageStr() {
return this.getClass().getName()
+ " expects the incoming configuration "
+ "to be a Map<String,Object>. "
+ "Each of the Maps should have one key-value of \"script\": \"...\". "
+ "Script values should be a fully-formed inlined JsonPath queries.";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.opensearch.migrations.transform.JsonJMESPathPredicateProvider

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.opensearch.migrations.replay;
package org.opensearch.migrations.transform.replay;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
Expand All @@ -12,6 +12,9 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.GZIPInputStream;

import org.opensearch.migrations.replay.AggregatedRawResponse;
import org.opensearch.migrations.replay.TestCapturePacketToHttpHandler;
import org.opensearch.migrations.replay.Utils;
import org.opensearch.migrations.replay.datahandlers.http.HttpJsonTransformingConsumer;
import org.opensearch.migrations.testutils.WrapWithNettyLeakDetection;
import org.opensearch.migrations.tracing.InstrumentationTest;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.opensearch.migrations.replay;
package org.opensearch.migrations.transform.replay;

import java.util.ArrayList;
import java.util.LinkedHashMap;
Expand All @@ -8,6 +8,7 @@
import java.util.stream.IntStream;
import java.util.stream.Stream;

import org.opensearch.migrations.replay.TestUtils;
import org.opensearch.migrations.testutils.WrapWithNettyLeakDetection;
import org.opensearch.migrations.tracing.InstrumentationTest;
import org.opensearch.migrations.transform.JsonJoltTransformBuilder;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.opensearch.migrations.transform;

import java.util.Map;
import java.util.function.Predicate;

public interface IJsonPredicate extends Predicate<Map<String, Object>> {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.opensearch.migrations.transform;

import lombok.NonNull;

public interface IJsonPredicateProvider {
/**
* Create a new Predicate from the given configuration. This Predicate
* will be used repeatedly and concurrently from different threads against
* messages.
* @param jsonConfig is a List, Map, String, or null that should be used to configure the
* IJsonPredicate that is being created
* @return
*/
IJsonPredicate createPredicate(Object jsonConfig);

/**
* Friendly name that can be used as a key to identify Predicate providers.
* @return
*/
default @NonNull String getName() {
return this.getClass().getSimpleName();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package org.opensearch.migrations.transform;

import java.util.Map;

public class JsonConditionalTransformer implements IJsonTransformer {
IJsonPredicate jsonPredicate;
IJsonTransformer jsonTransformer;

public JsonConditionalTransformer(IJsonPredicate jsonPredicate, IJsonTransformer jsonTransformer) {
this.jsonPredicate = jsonPredicate;
this.jsonTransformer = jsonTransformer;
}

@Override
public Map<String, Object> transformJson(Map<String, Object> incomingJson) {
if (jsonPredicate.test(incomingJson)) {
return jsonTransformer.transformJson(incomingJson);
}
return incomingJson;
}
}
Loading

0 comments on commit 4769ba4

Please sign in to comment.