Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Preparing for 2.17.1 release. #31

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ build
build-idea/
out/

volumes/
7 changes: 4 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
FROM opensearchstaging/opensearch:2.16.0
FROM opensearchstaging/opensearch:2.17.1

COPY ./build/distributions/opensearch-ubi-2.16.0.0.zip /tmp/
COPY ./build/distributions/opensearch-ubi-2.17.1.0.zip /tmp/

RUN /usr/share/opensearch/bin/opensearch-plugin install --batch file:/tmp/opensearch-ubi-2.16.0.0.zip
RUN /usr/share/opensearch/bin/opensearch-plugin install --batch telemetry-otel
RUN /usr/share/opensearch/bin/opensearch-plugin install --batch file:/tmp/opensearch-ubi-2.17.1.0.zip
11 changes: 10 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,17 @@ For details on the JSON Schema used by UBI to send and receive queries and event
* [Query Response Schema](https://o19s.github.io/ubi/docs/html/query.response.schema.html)
* [Event Schema](https://o19s.github.io/ubi/docs/html/event.schema.html)

## UBI, Data Prepper, and Open Telemetry

The UBI plugin can store UBI query data in one of three ways:

- By directly indexing the UBI query data in the `ubi_queries` index in the same OpenSearch cluster as the plugin.
- By sending the UBI query data as JSON to a Data Prepper [http](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/http/) source. The Data Prepper endpoint is provided via the `ubi.dataprepper.url` setting.
- By sending the UBI query data as Open Telemetry traces. This utilizes the native OpenSearch OTel capabilities which are exposed via the `TelemetryAwarePlugin` interface. As UBI queries are received, trace events will be generated. OpenSearch must be configured as described in [Distributed tracing](https://opensearch.org/docs/latest/observing-your-data/trace/distributed-tracing/) for the events to be sent.

## Getting Help

* Start with the [Documentation](https://opensearch.org/docs/latest/search-plugins/ubi/index/) site to how to use this plugin.
* For questions or help getting started, please find us in the [OpenSearch Slack](https://opensearch.org/slack.html) in the `#plugins` channel.
* For bugs or feature requests, please create [a new issue](https://github.com/o19s/opensearch-ubi/issues/new/choose).

Expand All @@ -37,7 +46,7 @@ To get started, download the plugin zip file from the [releases](https://github.
bin/opensearch-plugin install file:/opensearch-ubi-1.0.0-os2.14.0.zip
```

You will be prompted while installing the plugin beacuse the plugin defines additional security permissions. These permissions allow the plugin to serialize query requests to JSON for storing and to allow the plugin to send query requests to Data Prepper. You can skip the prompt by adding the `--batch` argument to the above command.
You will be prompted while installing the plugin because the plugin defines additional security permissions. These permissions allow the plugin to serialize query requests to JSON for storing and to allow the plugin to send query requests to Data Prepper. You can skip the prompt by adding the `--batch` argument to the above command.

To create the UBI indexes called `ubi_queries` and `ubi_events`, send a query to an OpenSearch index with the `ubi` query block added:

Expand Down
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,13 @@ thirdPartyAudit.enabled = false

dependencies {
runtimeOnly "org.apache.logging.log4j:log4j-core:${versions.log4j}"
api "com.fasterxml.jackson.core:jackson-core:${versions.jackson}"
api "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}"
api "com.fasterxml.jackson.core:jackson-databind:${versions.jackson_databind}"
api "org.apache.httpcomponents:httpcore:${versions.httpcore}"
api "org.apache.httpcomponents:httpclient:${versions.httpclient}"
api "commons-logging:commons-logging:${versions.commonslogging}"

yamlRestTestImplementation "org.apache.logging.log4j:log4j-core:${versions.log4j}"
}

Expand Down
53 changes: 24 additions & 29 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
services:

dataprepper-dev-os:
depends_on:
- ubi-dev-os
container_name: dataprepper
image: opensearchproject/data-prepper:2.8.0
ports:
- 4900:4900
- 2021:2021
volumes:
- ./dataprepper/pipelines.yaml:/usr/share/data-prepper/pipelines/pipelines.yaml
- ./dataprepper/data-prepper-config.yaml:/usr/share/data-prepper/config/data-prepper-config.yaml
networks:
- ubi-dev-os-net
# Uncomment to use OTel or Data Prepper -> OpenSearch pipelines.
# dataprepper-dev-os:
# depends_on:
# - ubi-dev-os
# container_name: dataprepper
# image: opensearchproject/data-prepper:2.8.0
# ports:
# - 4900:4900
# - 2021:2021
# volumes:
# - ./dataprepper/pipelines.yaml:/usr/share/data-prepper/pipelines/pipelines.yaml
# - ./dataprepper/data-prepper-config.yaml:/usr/share/data-prepper/config/data-prepper-config.yaml
# networks:
# - ubi-dev-os-net

ubi-dev-os:
build: ./
Expand All @@ -23,7 +24,16 @@ services:
plugins.security.disabled: "true"
logger.level: info
OPENSEARCH_INITIAL_ADMIN_PASSWORD: SuperSecretPassword_123
#ubi.dataprepper.url: "http://dataprepper-dev-os:2021/log/ingest"
# Requires the Data Prepper container:
# ubi.dataprepper.url: "http://dataprepper-dev-os:2021/log/ingest"
# Requires the OTel plugin to be installed.
#telemetry.feature.tracer.enabled: true
#telemetry.tracer.enabled: true
#telemetry.tracer.sampler.probability: 1.0
#opensearch.experimental.feature.telemetry.enabled: true
#telemetry.otel.tracer.span.exporter.class: io.opentelemetry.exporter.logging.LoggingSpanExporter
#telemetry.otel.tracer.exporter.batch_size: 1
#telemetry.otel.tracer.exporter.max_queue_size: 3
ulimits:
memlock:
soft: -1
Expand All @@ -40,21 +50,6 @@ services:
networks:
- ubi-dev-os-net

# ubi-dev-os-dashboards:
# image: opensearchproject/opensearch-dashboards:2.12.0
# container_name: ubi-dev-os-dashboards
# ports:
# - 5601:5601
# expose:
# - 5601
# environment:
# OPENSEARCH_HOSTS: '["http://ubi-dev-os:9200"]'
# DISABLE_SECURITY_DASHBOARDS_PLUGIN: "true"
# depends_on:
# - ubi-dev-os
# networks:
# - ubi-dev-os-net

networks:
ubi-dev-os-net:
driver: bridge
4 changes: 2 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
opensearchVersion = 2.16.0-SNAPSHOT
ubiVersion = 2.16.0.0
opensearchVersion = 2.17.1
ubiVersion = 2.17.1.0
1 change: 0 additions & 1 deletion licenses/jackson-annotations-2.17.1.jar.sha1

This file was deleted.

1 change: 1 addition & 0 deletions licenses/jackson-annotations-2.17.2.jar.sha1
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
147b7b9412ffff24339f8aba080b292448e08698
1 change: 0 additions & 1 deletion licenses/jackson-databind-2.17.1.jar.sha1

This file was deleted.

1 change: 1 addition & 0 deletions licenses/jackson-databind-2.17.2.jar.sha1
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
e6deb029e5901e027c129341fac39e515066b68c
Empty file modified scripts/get-indexed-queries.sh
100644 → 100755
Empty file.
8 changes: 8 additions & 0 deletions scripts/msearch.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#!/bin/bash -e

curl -s -X GET "http://localhost:9200/_msearch" -H 'Content-Type: application/json' -d'
{ "index": "ecommerce"}
{ "query": { "match_all": {} }, "ext": { "ubi": { "query_id": "11111" } } }
{ "index": "ecommerce"}
{ "query": { "match_all": {} }, "ext": { "ubi": { "query_id": "22222" } } }
'
153 changes: 98 additions & 55 deletions src/main/java/org/opensearch/ubi/UbiActionFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,6 @@

package org.opensearch.ubi;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
Expand All @@ -25,6 +21,8 @@
import org.opensearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.MultiSearchRequest;
import org.opensearch.action.search.MultiSearchResponse;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.ActionFilter;
Expand All @@ -39,6 +37,9 @@
import org.opensearch.env.Environment;
import org.opensearch.search.SearchHit;
import org.opensearch.tasks.Task;
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.SpanBuilder;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.ubi.ext.UbiParameters;

import java.io.ByteArrayOutputStream;
Expand Down Expand Up @@ -69,15 +70,18 @@ public class UbiActionFilter implements ActionFilter {

private final Client client;
private final Environment environment;
private final Tracer tracer;

/**
* Creates a new filter.
* @param client An OpenSearch {@link Client}.
* @param environment The OpenSearch {@link Environment}.
* @param tracer An Open Telemetry {@link Tracer tracer}.
*/
public UbiActionFilter(Client client, Environment environment) {
public UbiActionFilter(Client client, Environment environment, Tracer tracer) {
this.client = client;
this.environment = environment;
this.tracer = tracer;
}

@Override
Expand All @@ -94,7 +98,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void app
ActionFilterChain<Request, Response> chain
) {

if (!(request instanceof SearchRequest)) {
if (!(request instanceof SearchRequest || request instanceof MultiSearchRequest)) {
chain.proceed(task, action, request, listener);
return;
}
Expand All @@ -104,82 +108,99 @@ public <Request extends ActionRequest, Response extends ActionResponse> void app
@Override
public void onResponse(Response response) {

final SearchRequest searchRequest = (SearchRequest) request;
if (request instanceof MultiSearchRequest) {

if (response instanceof SearchResponse) {
final MultiSearchRequest multiSearchRequest = (MultiSearchRequest) request;

final UbiParameters ubiParameters = UbiParameters.getUbiParameters(searchRequest);
for(final SearchRequest searchRequest : multiSearchRequest.requests()) {
handleSearchRequest(searchRequest, response);
}

if (ubiParameters != null) {
}

final String queryId = ubiParameters.getQueryId();
final String userQuery = ubiParameters.getUserQuery();
final String userId = ubiParameters.getClientId();
final String objectIdField = ubiParameters.getObjectIdField();
final Map<String, String> queryAttributes = ubiParameters.getQueryAttributes();
if(request instanceof SearchRequest) {
response = (Response) handleSearchRequest((SearchRequest) request, response);
}

listener.onResponse(response);

}

@Override
public void onFailure(Exception ex) {
listener.onFailure(ex);
}

// TODO: Ignore the UBI in ext.
final String query = searchRequest.source().toString();
});

final List<String> queryResponseHitIds = new LinkedList<>();
}

for (final SearchHit hit : ((SearchResponse) response).getHits()) {
private ActionResponse handleSearchRequest(final SearchRequest searchRequest, ActionResponse response) {

if (objectIdField == null || objectIdField.isEmpty()) {
// Use the result's docId since no object_id was given for the search.
queryResponseHitIds.add(String.valueOf(hit.docId()));
} else {
final Map<String, Object> source = hit.getSourceAsMap();
queryResponseHitIds.add((String) source.get(objectIdField));
}
if (response instanceof SearchResponse) {

}
final UbiParameters ubiParameters = UbiParameters.getUbiParameters(searchRequest);

final String queryResponseId = UUID.randomUUID().toString();
final QueryResponse queryResponse = new QueryResponse(queryId, queryResponseId, queryResponseHitIds);
final QueryRequest queryRequest = new QueryRequest(queryId, userQuery, userId, query, queryAttributes, queryResponse);
if (ubiParameters != null) {

final String dataPrepperUrl = environment.settings().get(UbiSettings.DATA_PREPPER_URL);
if(dataPrepperUrl != null) {
sendToDataPrepper(dataPrepperUrl, queryRequest);
} else {
indexQuery(queryRequest);
}
final String queryId = ubiParameters.getQueryId();
final String userQuery = ubiParameters.getUserQuery();
final String userId = ubiParameters.getClientId();
final String objectIdField = ubiParameters.getObjectIdField();
final Map<String, String> queryAttributes = ubiParameters.getQueryAttributes();

final String query = searchRequest.source().toString();

SearchResponse searchResponse = (SearchResponse) response;
final List<String> queryResponseHitIds = new LinkedList<>();

response = (Response) new UbiSearchResponse(
searchResponse.getInternalResponse(),
searchResponse.getScrollId(),
searchResponse.getTotalShards(),
searchResponse.getSuccessfulShards(),
searchResponse.getSkippedShards(),
searchResponse.getTook().millis(),
searchResponse.getShardFailures(),
searchResponse.getClusters(),
queryId
);
for (final SearchHit hit : ((SearchResponse) response).getHits()) {

if (objectIdField == null || objectIdField.isEmpty()) {
// Use the result's docId since no object_id was given for the search.
queryResponseHitIds.add(String.valueOf(hit.docId()));
} else {
final Map<String, Object> source = hit.getSourceAsMap();
queryResponseHitIds.add((String) source.get(objectIdField));
}

}

listener.onResponse(response);
final String queryResponseId = UUID.randomUUID().toString();
final QueryResponse queryResponse = new QueryResponse(queryId, queryResponseId, queryResponseHitIds);
final QueryRequest queryRequest = new QueryRequest(queryId, userQuery, userId, query, queryAttributes, queryResponse);

}
final String dataPrepperUrl = environment.settings().get(UbiSettings.DATA_PREPPER_URL);
if (dataPrepperUrl != null) {
sendToDataPrepper(dataPrepperUrl, queryRequest);
} else {
indexQuery(queryRequest);
}

final SearchResponse searchResponse = (SearchResponse) response;

response = new UbiSearchResponse(
searchResponse.getInternalResponse(),
searchResponse.getScrollId(),
searchResponse.getTotalShards(),
searchResponse.getSuccessfulShards(),
searchResponse.getSkippedShards(),
searchResponse.getTook().millis(),
searchResponse.getShardFailures(),
searchResponse.getClusters(),
queryId
);

@Override
public void onFailure(Exception ex) {
listener.onFailure(ex);
}

});
}

return response;

}

private void sendToDataPrepper(final String dataPrepperUrl, final QueryRequest queryRequest) {

LOGGER.debug("Sending query to DataPrepper at " + dataPrepperUrl);
LOGGER.debug("Sending query to DataPrepper at {}", dataPrepperUrl);

// TODO: Do this in a background thread?
try {
Expand Down Expand Up @@ -303,4 +324,26 @@ private String getResourceFile(final String fileName) {
}
}

private void sendOtelTrace(final Task task, final Tracer tracer, final QueryRequest queryRequest) {

final Span span = tracer.startSpan(SpanBuilder.from(task, "ubi_search"));

span.addAttribute("ubi.user_id", queryRequest.getQueryId());
span.addAttribute("ubi.query", queryRequest.getQuery());
span.addAttribute("ubi.user_query", queryRequest.getUserQuery());
span.addAttribute("ubi.client_id", queryRequest.getClientId());
span.addAttribute("ubi.timestamp", queryRequest.getTimestamp());

for (final String key : queryRequest.getQueryAttributes().keySet()) {
span.addAttribute("ubi.attribute." + key, queryRequest.getQueryAttributes().get(key));
}

span.addAttribute("ubi.query_response.response_id", queryRequest.getQueryResponse().getQueryResponseId());
span.addAttribute("ubi.query_response.query_id", queryRequest.getQueryResponse().getQueryId());
span.addAttribute("ubi.query_response.response_id", String.join(",", queryRequest.getQueryResponse().getQueryResponseObjectIds()));

span.endSpan();

}

}
Loading
Loading