Skip to content

Commit

Permalink
Add reindexFromSnapshotWorkerSize to cdk with default and maximum mod…
Browse files Browse the repository at this point in the history
…es (opensearch-project#1085)

Signed-off-by: Andre Kurait <[email protected]>
  • Loading branch information
AndreKurait authored and gregschohn committed Oct 24, 2024
1 parent db0075e commit 2d34f19
Show file tree
Hide file tree
Showing 15 changed files with 296 additions and 299 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.opensearch.migrations.data;

public enum ElasticsearchType {
DATE("date"),
GEO_POINT("geo_point"),
INTEGER("integer"),
KEYWORD("keyword"),
LONG("long"),
TEXT("text"),
SCALED_FLOAT("scaled_float"),
IP("ip"),
NESTED("nested");

private final String value;

ElasticsearchType(String value) {
this.value = value;
}

public String getValue() {
return value;
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package org.opensearch.migrations.data;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

/**
* Helpers to build fields for index mappings.
*/
public interface IFieldCreator {
ObjectMapper mapper = new ObjectMapper();

default ObjectNode createField(ElasticsearchType type) {
return mapper.createObjectNode().put("type", type.getValue());
}

default ObjectNode fieldGeoPoint() { return createField(ElasticsearchType.GEO_POINT); }
default ObjectNode fieldInt() { return createField(ElasticsearchType.INTEGER); }
default ObjectNode fieldIP() { return createField(ElasticsearchType.IP); }
default ObjectNode fieldKeyword() { return createField(ElasticsearchType.KEYWORD); }
default ObjectNode fieldLong() { return createField(ElasticsearchType.LONG); }
default ObjectNode fieldNested() { return createField(ElasticsearchType.NESTED); }
default ObjectNode fieldText() { return createField(ElasticsearchType.TEXT); }

default ObjectNode fieldRawTextKeyword() {
return mapper.createObjectNode()
.put("type", "text")
.set("fields", mapper.createObjectNode()
.set("raw", createField(ElasticsearchType.KEYWORD)));
}

default ObjectNode fieldScaledFloat(int scalingFactor) {
return createField(ElasticsearchType.SCALED_FLOAT)
.put("scaling_factor", scalingFactor);
}
default ObjectNode fieldScaledFloat() { return fieldScaledFloat(100); }

default ObjectNode fieldDate() { return createField(ElasticsearchType.DATE); }
default ObjectNode fieldDateISO() {
return fieldDate().put("format", "yyyy-MM-dd HH:mm:ss");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package org.opensearch.migrations.data;

import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Random;

/** Shared ways to build random data */
public interface IRandomDataBuilders {
ZoneId UTC_ZONE = ZoneId.of("UTC");
DateTimeFormatter SIMPLE_DATE_PATTERN = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
int ONE_DAY_IN_MILLIS = 24 * 60 * 60 * 1000;

default long randomTime(long timeFrom, Random random) {
return timeFrom - random.nextInt(ONE_DAY_IN_MILLIS);
}

default String randomTimeISOString(long timeFrom, Random random) {
var timeMillis = randomTime(timeFrom, random);
var timeInstant = Instant.ofEpochMilli(timeMillis).atZone(UTC_ZONE);
return SIMPLE_DATE_PATTERN.format(timeInstant);
}

default double randomDouble(Random random, double min, double max) {
return min + (max - min) * random.nextDouble();
}

default String randomElement(String[] elements, Random random) {
return elements[random.nextInt(elements.length)];
}

default int randomElement(int[] elements, Random random) {
return elements[random.nextInt(elements.length)];
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ public class IndexOptions {
private static final ObjectMapper mapper = new ObjectMapper();

/** Improvement to add more flexibility with these values */
public ObjectNode indexSettings = mapper.createObjectNode()
public final ObjectNode indexSettings = mapper.createObjectNode()
.put("index.number_of_shards", 5)
.put("index.number_of_replicas", 0)
.put("index.queries.cache.enabled", false)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import org.opensearch.migrations.bulkload.common.OpenSearchClient;
import org.opensearch.migrations.data.workloads.Workload;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -43,11 +45,25 @@ public void generate(WorkloadOptions options) {

private List<CompletableFuture<?>> generateDocs(String indexName, Workload workload, WorkloadOptions options) {
// This happens inline to be sure the index exists before docs are indexed on it
client.createIndex(indexName, workload.createIndex(options.index.indexSettings.deepCopy()), null);
var indexRequestDoc = workload.createIndex(options.index.indexSettings.deepCopy());
log.atInfo().setMessage("Creating index {} with {}").addArgument(indexName).addArgument(indexRequestDoc).log();
client.createIndex(indexName, indexRequestDoc, null);

var docIdCounter = new AtomicInteger(0);
var allDocs = workload.createDocs(options.totalDocs)
.map(doc -> new DocumentReindexer.BulkDocSection(indexName + "_ " + docIdCounter.incrementAndGet(), doc.toString()))
.peek(n -> {
log.atTrace().setMessage("Constructed doc for index {}: {}")
.addArgument(indexName)
.addArgument(() -> {
try {
return new ObjectMapper().writeValueAsString(n);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
).log();
})
.map(doc -> new DocumentReindexer.BulkDocSection(indexName + "_" + docIdCounter.incrementAndGet(), doc.toString()))
.collect(Collectors.toList());

var bulkDocGroups = new ArrayList<List<DocumentReindexer.BulkDocSection>>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ public class WorkloadOptions {
@Parameter(names = { "--max-bulk-request-batch-count" }, description = "The maximum batch count for bulk requests")
public int maxBulkBatchSize = 50;

public IndexOptions index = new IndexOptions();
public final IndexOptions index = new IndexOptions();
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,18 @@
import java.util.stream.IntStream;
import java.util.stream.Stream;

import org.opensearch.migrations.data.IFieldCreator;
import org.opensearch.migrations.data.IRandomDataBuilders;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

import static org.opensearch.migrations.data.FieldBuilders.createField;
import static org.opensearch.migrations.data.FieldBuilders.createFieldTextRawKeyword;
import static org.opensearch.migrations.data.RandomDataBuilders.randomDouble;
import static org.opensearch.migrations.data.RandomDataBuilders.randomElement;

/**
* Workload based off of Geonames
* https://github.com/opensearch-project/opensearch-benchmark-workloads/tree/main/geonames
*/
public class Geonames implements Workload {
public class Geonames implements Workload, IFieldCreator, IRandomDataBuilders {

private static final ObjectMapper mapper = new ObjectMapper();
private static final String[] COUNTRY_CODES = { "US", "DE", "FR", "GB", "CN", "IN", "BR" };
Expand All @@ -34,36 +32,29 @@ public List<String> indexNames() {
*/
@Override
public ObjectNode createIndex(ObjectNode defaultSettings) {
var properties = mapper.createObjectNode();
properties.set("geonameid", createField("long"));
properties.set("name", createFieldTextRawKeyword());
properties.set("asciiname", createFieldTextRawKeyword());
properties.set("alternatenames", createFieldTextRawKeyword());
properties.set("feature_class", createFieldTextRawKeyword());
properties.set("feature_code", createFieldTextRawKeyword());
properties.set("cc2", createFieldTextRawKeyword());
properties.set("admin1_code", createFieldTextRawKeyword());
properties.set("admin2_code", createFieldTextRawKeyword());
properties.set("admin3_code", createFieldTextRawKeyword());
properties.set("admin4_code", createFieldTextRawKeyword());
properties.set("elevation", createField("integer"));
properties.set("population", createField("long"));
properties.set("dem", createFieldTextRawKeyword());
properties.set("timezone", createFieldTextRawKeyword());
properties.set("location", createField("geo_point"));

var countryCodeField = createFieldTextRawKeyword();
countryCodeField.put("fielddata", true);
properties.set("country_code", countryCodeField);

var mappings = mapper.createObjectNode();
mappings.put("dynamic", "strict");
mappings.set("properties", properties);

var index = mapper.createObjectNode();
index.set("mappings", mappings);
index.set("settings", defaultSettings);
return index;
return mapper.createObjectNode()
.<ObjectNode>set("mappings", mapper.createObjectNode()
.<ObjectNode>put("dynamic", "strict")
.<ObjectNode>set("properties", mapper.createObjectNode()
.<ObjectNode>set("geonameid", fieldLong())
.<ObjectNode>set("name", fieldRawTextKeyword())
.<ObjectNode>set("asciiname", fieldRawTextKeyword())
.<ObjectNode>set("alternatenames", fieldRawTextKeyword())
.<ObjectNode>set("feature_class", fieldRawTextKeyword())
.<ObjectNode>set("feature_code", fieldRawTextKeyword())
.<ObjectNode>set("cc2", fieldRawTextKeyword())
.<ObjectNode>set("admin1_code", fieldRawTextKeyword())
.<ObjectNode>set("admin2_code", fieldRawTextKeyword())
.<ObjectNode>set("admin3_code", fieldRawTextKeyword())
.<ObjectNode>set("admin4_code", fieldRawTextKeyword())
.<ObjectNode>set("elevation", fieldInt())
.<ObjectNode>set("population", fieldLong())
.<ObjectNode>set("dem", fieldRawTextKeyword())
.<ObjectNode>set("timezone", fieldRawTextKeyword())
.<ObjectNode>set("location", fieldGeoPoint())
.<ObjectNode>set("country_code", fieldRawTextKeyword()
.put("fielddata", true))))
.<ObjectNode>set("settings", defaultSettings);
}

/**
Expand Down Expand Up @@ -94,33 +85,32 @@ public Stream<ObjectNode> createDocs(int numDocs) {
// These documents are have a low degree of uniqueness,
// there is an opportunity to augment them by using Random more.
var random = new Random(i);
var doc = mapper.createObjectNode();
doc.put("geonameid", i + 1000);
doc.put("name", "City" + (i + 1));
doc.put("asciiname", "City" + (i + 1));
doc.put("alternatenames", "City" + (i + 1));
doc.put("feature_class", "FCl" + (i + 1));
doc.put("feature_code", "FCo" + (i + 1));
doc.put("country_code", randomCountryCode(random));
doc.put("cc2", "cc2" + (i + 1));
doc.put("admin1_code", "admin" + (i + 1));
doc.put("population", random.nextInt(1000));
doc.put("dem", random.nextInt(1000) + "");
doc.put("timezone", "TZ" + (i + 1));
doc.set("location", randomLocation(random));
return doc;
return mapper.createObjectNode()
.<ObjectNode>put("geonameid", i + 1000)
.<ObjectNode>put("name", "City" + (i + 1))
.<ObjectNode>put("asciiname", "City" + (i + 1))
.<ObjectNode>put("alternatenames", "City" + (i + 1))
.<ObjectNode>put("feature_class", "FCl" + (i + 1))
.<ObjectNode>put("feature_code", "FCo" + (i + 1))
.<ObjectNode>put("country_code", randomCountryCode(random))
.<ObjectNode>put("cc2", "cc2" + (i + 1))
.<ObjectNode>put("admin1_code", "admin" + (i + 1))
.<ObjectNode>put("population", random.nextInt(1000))
.<ObjectNode>put("dem", random.nextInt(1000) + "")
.<ObjectNode>put("timezone", "TZ" + (i + 1))
.<ObjectNode>set("location", randomLocation(random));
}
);
}

private static ArrayNode randomLocation(Random random) {
private ArrayNode randomLocation(Random random) {
var location = mapper.createArrayNode();
location.add(randomDouble(random, -180, 180)); // Longitude
location.add(randomDouble(random, -90, 90)); // Latitude
return location;
}

private static String randomCountryCode(Random random) {
private String randomCountryCode(Random random) {
return randomElement(COUNTRY_CODES, random);
}
}
Loading

0 comments on commit 2d34f19

Please sign in to comment.