Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add the AddSchemaPrefix operator #498

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,11 @@ private Operator mergeRawData(Map<TimeInterval, List<FragmentMeta>> fragments, L
Operator operator = OperatorUtils.unionOperators(unionList);
if (!dummyFragments.isEmpty()) {
List<Operator> joinList = new ArrayList<>();
dummyFragments.forEach(meta -> joinList.add(new Project(new FragmentSource(meta),
pathMatchPrefix(pathList,meta.getTsInterval().getTimeSeries(), meta.getTsInterval().getSchemaPrefix()), tagFilter)));
dummyFragments.forEach(meta -> {
String schemaPrefix = meta.getTsInterval().getSchemaPrefix();
joinList.add(new AddSchemaPrefix(new OperatorSource(new Project(new FragmentSource(meta),
pathMatchPrefix(pathList, meta.getTsInterval().getTimeSeries(), schemaPrefix), tagFilter)), schemaPrefix));
});
joinList.add(operator);
operator = OperatorUtils.joinOperatorsByTime(joinList);
}
Expand All @@ -346,13 +349,30 @@ private Pair<Map<TimeInterval, List<FragmentMeta>>, List<FragmentMeta>> getFragm
return keyFromTSIntervalToTimeInterval(fragmentsByTSInterval);
}

// 筛选出满足 dataPrefix前缀,并且去除 schemaPrefix
private List<String> pathMatchPrefix(List<String> pathList, String prefix, String schemaPrefix) {
if (prefix == null) return pathList;
if (schemaPrefix != null) prefix = schemaPrefix + "." + prefix; // deal with the schemaPrefix
if (prefix == null && schemaPrefix == null) return pathList;
List<String> ans = new ArrayList<>();

if (prefix == null) { // deal with the schemaPrefix
for(String path : pathList) {
if (path.equals("*.*") || path.equals("*")) {
ans.add(path);
} else if (path.indexOf(schemaPrefix) == 0) {
path = path.substring(schemaPrefix.length() + 1);
ans.add(path);
}
}
return ans;
}
// if (schemaPrefix != null) prefix = schemaPrefix + "." + prefix;

for(String path : pathList) {
if (path.equals("*.*")) {
ans.add(path);
if (schemaPrefix != null && path.indexOf(schemaPrefix) == 0) {
path = path.substring(schemaPrefix.length() + 1);
}
if (path.equals("*.*") || path.equals("*")) {
ans.add(prefix + ".*");
} else if (path.charAt(path.length()-1) == '*' && path.length() != 1) { // 通配符匹配,例如 a.b.*
String queryPrefix = path.substring(0,path.length()-2) + ".(.*)";
if (prefix.matches(queryPrefix)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ public RowStream executeUnaryOperator(UnaryOperator operator, RowStream stream)
return executeRename((Rename) operator, transformToTable(stream));
case Reorder:
return executeReorder((Reorder) operator, transformToTable(stream));
case AddSchemaPrefix:
return executeAddSchemaPrefix((AddSchemaPrefix) operator, transformToTable(stream));
default:
throw new UnexpectedOperatorException("unknown unary operator: " + operator.getType());
}
Expand Down Expand Up @@ -331,6 +333,32 @@ private RowStream executeRename(Rename rename, Table table) throws PhysicalExcep
return new Table(newHeader, rows);
}

private RowStream executeAddSchemaPrefix(AddSchemaPrefix addSchemaPrefix, Table table) throws PhysicalException {
Header header = table.getHeader();
String schemaPrefix = addSchemaPrefix.getSchemaPrefix();

List<Field> fields = new ArrayList<>();
header.getFields().forEach(field -> {
if (schemaPrefix != null)
fields.add(new Field(schemaPrefix + "." + field.getName(), field.getType(), field.getTags()));
else
fields.add(new Field(field.getName(), field.getType(), field.getTags()));
});

Header newHeader = new Header(header.getKey(), fields);

List<Row> rows = new ArrayList<>();
table.getRows().forEach(row -> {
if (newHeader.hasKey()) {
rows.add(new Row(newHeader, row.getKey(), row.getValues()));
} else {
rows.add(new Row(newHeader, row.getValues()));
}
});

return new Table(newHeader, rows);
}

private RowStream executeReorder(Reorder reorder, Table table) throws PhysicalException {
List<String> patterns = reorder.getPatterns();
Header header = table.getHeader();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package cn.edu.tsinghua.iginx.engine.physical.memory.execute.stream;

import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalException;
import cn.edu.tsinghua.iginx.engine.shared.data.read.Field;
import cn.edu.tsinghua.iginx.engine.shared.data.read.Header;
import cn.edu.tsinghua.iginx.engine.shared.data.read.Row;
import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStream;
import cn.edu.tsinghua.iginx.engine.shared.operator.AddSchemaPrefix;
import cn.edu.tsinghua.iginx.engine.shared.operator.Rename;
import cn.edu.tsinghua.iginx.utils.StringUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

public class AddSchemaPrefixLazyStream extends UnaryLazyStream {

private final AddSchemaPrefix addSchemaPrefix;

private Header header;

public AddSchemaPrefixLazyStream(AddSchemaPrefix addSchemaPrefix, RowStream stream) {
super(stream);
this.addSchemaPrefix = addSchemaPrefix;
}

@Override
public Header getHeader() throws PhysicalException {
if (header == null) {
Header header = stream.getHeader();
String schemaPrefix = addSchemaPrefix.getSchemaPrefix();

List<Field> fields = new ArrayList<>();
header.getFields().forEach(field -> {
if (schemaPrefix != null)
fields.add(new Field(schemaPrefix + "." + field.getName(), field.getType(), field.getTags()));
else
fields.add(new Field(field.getName(), field.getType(), field.getTags()));
});

this.header = new Header(header.getKey(), fields);
}
return header;
}

@Override
public boolean hasNext() throws PhysicalException {
return stream.hasNext();
}

@Override
public Row next() throws PhysicalException {
if (!hasNext()) {
throw new IllegalStateException("row stream doesn't have more data!");
}

Row row = stream.next();
if (header.hasKey()) {
return new Row(header, row.getKey(), row.getValues());
} else {
return new Row(header, row.getValues());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ public RowStream executeUnaryOperator(UnaryOperator operator, RowStream stream)
return executeRename((Rename) operator, stream);
case Reorder:
return executeReorder((Reorder) operator, stream);
case AddSchemaPrefix:
return executeAddSchemaPrefix((AddSchemaPrefix) operator, stream);
default:
throw new UnexpectedOperatorException("unknown unary operator: " + operator.getType());
}
Expand Down Expand Up @@ -128,6 +130,10 @@ private RowStream executeReorder(Reorder reorder, RowStream stream) {
return new ReorderLazyStream(reorder, stream);
}

private RowStream executeAddSchemaPrefix(AddSchemaPrefix addSchemaPrefix, RowStream stream) {
return new AddSchemaPrefixLazyStream(addSchemaPrefix, stream);
}

private RowStream executeJoin(Join join, RowStream streamA, RowStream streamB) throws PhysicalException {
if (!join.getJoinBy().equals(Constants.KEY) && !join.getJoinBy().equals(Constants.ORDINAL)) {
throw new InvalidOperatorParameterException("join operator is not support for field " + join.getJoinBy() + " except for " + Constants.KEY
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package cn.edu.tsinghua.iginx.engine.shared.operator;

import cn.edu.tsinghua.iginx.engine.shared.operator.type.OperatorType;
import cn.edu.tsinghua.iginx.engine.shared.source.Source;

import java.util.ArrayList;
import java.util.List;

public class AddSchemaPrefix extends AbstractUnaryOperator {

private final String schemaPrefix;// 可以为 null

public AddSchemaPrefix(Source source, String schemaPrefix) {
super(OperatorType.AddSchemaPrefix, source);
this.schemaPrefix = schemaPrefix;
}

@Override
public Operator copy() {
return new AddSchemaPrefix(getSource().copy(), schemaPrefix);
}

public String getSchemaPrefix() {
return schemaPrefix;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public enum OperatorType {
Rename,

Reorder,
AddSchemaPrefix,

Delete,
Insert,
Expand All @@ -55,7 +56,7 @@ public static boolean isBinaryOperator(OperatorType op) {
}

public static boolean isUnaryOperator(OperatorType op) {
return op == Project || op == Select || op == Sort || op == Limit || op == Downsample || op == RowTransform || op == SetTransform || op == MappingTransform || op == Delete || op == Insert || op == Rename || op == Reorder;
return op == Project || op == Select || op == Sort || op == Limit || op == Downsample || op == RowTransform || op == SetTransform || op == MappingTransform || op == Delete || op == Insert || op == Rename || op == Reorder || op == AddSchemaPrefix;
RemHero marked this conversation as resolved.
Show resolved Hide resolved
}

public static boolean isMultipleOperator(OperatorType op) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,18 +236,11 @@ public TaskExecuteResult execute(StoragePhysicalTask task) {
return new TaskExecuteResult(new NonExecutablePhysicalTaskException("unsupported physical task"));
}

private String getRealPathWithoutPrefix(String oriPath, String prefix) {
if (prefix != null && !prefix.isEmpty() && oriPath.contains(prefix)) {
return oriPath.substring(oriPath.indexOf(prefix) + prefix.length() + 1);
}
return oriPath;
}

private TaskExecuteResult executeHistoryProjectTask(TimeSeriesRange timeSeriesInterval, TimeInterval timeInterval, Project project) {
Map<String, String> bucketQueries = new HashMap<>();
TagFilter tagFilter = project.getTagFilter();
for (String pattern: project.getPatterns()) {
Pair<String, String> pair = SchemaTransformer.processPatternForQuery(getRealPathWithoutPrefix(pattern, timeSeriesInterval.getSchemaPrefix()), tagFilter);
Pair<String, String> pair = SchemaTransformer.processPatternForQuery(pattern, tagFilter);
String bucketName = pair.k;
String query = pair.v;
String fullQuery = "";
Expand Down Expand Up @@ -276,7 +269,7 @@ private TaskExecuteResult executeHistoryProjectTask(TimeSeriesRange timeSeriesIn
bucketQueryResults.put(bucket, client.getQueryApi().query(statement, organization.getId()));
}

InfluxDBHistoryQueryRowStream rowStream = new InfluxDBHistoryQueryRowStream(bucketQueryResults, project.getPatterns(), timeSeriesInterval.getSchemaPrefix());
InfluxDBHistoryQueryRowStream rowStream = new InfluxDBHistoryQueryRowStream(bucketQueryResults, project.getPatterns());
return new TaskExecuteResult(rowStream);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,8 @@ public class InfluxDBHistoryQueryRowStream implements RowStream {
private int hasMoreRecords;

private int size;
public InfluxDBHistoryQueryRowStream(Map<String, List<FluxTable>> bucketQueryResults, List<String> patterns) {
this(bucketQueryResults, patterns, null);
}

public InfluxDBHistoryQueryRowStream(Map<String, List<FluxTable>> bucketQueryResults, List<String> patterns, String prefix) {
public InfluxDBHistoryQueryRowStream(Map<String, List<FluxTable>> bucketQueryResults, List<String> patterns) {
this.bucketQueryResults = new ArrayList<>(bucketQueryResults.entrySet());
this.indexList = new ArrayList<>();
List<Field> fields = new ArrayList<>();
Expand All @@ -58,7 +55,7 @@ public InfluxDBHistoryQueryRowStream(Map<String, List<FluxTable>> bucketQueryRes
List<FluxTable> tables = this.bucketQueryResults.get(i).getValue();
this.indexList.add(new int[tables.size()]);
for (FluxTable table: tables) {
fields.add(SchemaTransformer.toField(bucket, table, prefix));
fields.add(SchemaTransformer.toField(bucket, table));
this.hasMoreRecords++;
this.size++;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@
public class SchemaTransformer {

public static Field toField(String bucket, FluxTable table) {
return toField(bucket, table, null);
}

public static Field toField(String bucket, FluxTable table, String prefix) {
FluxRecord record = table.getRecords().get(0);
String measurement = record.getMeasurement();
String field = record.getField();
Expand All @@ -36,10 +32,6 @@ public static Field toField(String bucket, FluxTable table, String prefix) {
DataType dataType = fromInfluxDB(table.getColumns().stream().filter(x -> x.getLabel().equals("_value")).collect(Collectors.toList()).get(0).getDataType());

StringBuilder pathBuilder = new StringBuilder();
if (prefix != null) {
pathBuilder.append(prefix);
pathBuilder.append('.');
}
pathBuilder.append(bucket);
pathBuilder.append('.');
pathBuilder.append(measurement);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,12 +319,12 @@ private TaskExecuteResult executeQueryHistoryTask(TimeSeriesRange timeSeriesInte
try {
StringBuilder builder = new StringBuilder();
for (String path : project.getPatterns()) {
builder.append(getRealPathWithoutPrefix(path, timeSeriesInterval.getSchemaPrefix()));
builder.append(path);
builder.append(',');
}
String statement = String.format(QUERY_HISTORY_DATA, builder.deleteCharAt(builder.length() - 1).toString(), FilterTransformer.toString(filter));
logger.info("[Query] execute query: " + statement);
RowStream rowStream = new ClearEmptyRowStreamWrapper(new IoTDBQueryRowStream(sessionPool.executeQueryStatement(statement), false, project, timeSeriesInterval.getSchemaPrefix()));
RowStream rowStream = new ClearEmptyRowStreamWrapper(new IoTDBQueryRowStream(sessionPool.executeQueryStatement(statement), false, project));
return new TaskExecuteResult(rowStream);
} catch (IoTDBConnectionException | StatementExecutionException e) {
logger.error(e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,6 @@ enum State {
private State state;

public IoTDBQueryRowStream(SessionDataSetWrapper dataset, boolean trimStorageUnit, Project project) {
this(dataset, trimStorageUnit, project, null);
}

public IoTDBQueryRowStream(SessionDataSetWrapper dataset, boolean trimStorageUnit, Project project, String prefix) {
this.dataset = dataset;
this.trimStorageUnit = trimStorageUnit;
this.filterByTags = project.getTagFilter() != null;
Expand All @@ -91,7 +87,7 @@ public IoTDBQueryRowStream(SessionDataSetWrapper dataset, boolean trimStorageUni
}
name = transformColumnName(name);
Pair<String, Map<String, String>> pair = TagKVUtils.splitFullName(name);
Field field = new Field(prefix==null? pair.getK() : prefix + "." + pair.getK(), DataTypeTransformer.fromIoTDB(type), pair.getV());
Field field = new Field(pair.getK(), DataTypeTransformer.fromIoTDB(type), pair.getV());
if (!this.trimStorageUnit && field.getFullName().startsWith(UNIT)) {
filterList.add(true);
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,12 +317,12 @@ private TaskExecuteResult executeQueryHistoryTask(TimeSeriesRange timeSeriesInte
try {
StringBuilder builder = new StringBuilder();
for (String path : project.getPatterns()) {
builder.append(getRealPathWithoutPrefix(path, timeSeriesInterval.getSchemaPrefix()));
builder.append(path);
builder.append(',');
}
String statement = String.format(QUERY_HISTORY_DATA, builder.deleteCharAt(builder.length() - 1).toString(), FilterTransformer.toString(filter));
logger.info("[Query] execute query: " + statement);
RowStream rowStream = new ClearEmptyRowStreamWrapper(new IoTDBQueryRowStream(sessionPool.executeQueryStatement(statement), false, project, timeSeriesInterval.getSchemaPrefix()));
RowStream rowStream = new ClearEmptyRowStreamWrapper(new IoTDBQueryRowStream(sessionPool.executeQueryStatement(statement), false, project));
return new TaskExecuteResult(rowStream);
} catch (IoTDBConnectionException | StatementExecutionException e) {
logger.error(e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,6 @@ enum State {
private State state;

public IoTDBQueryRowStream(SessionDataSetWrapper dataset, boolean trimStorageUnit, Project project) {
this(dataset, trimStorageUnit, project, null);
}

public IoTDBQueryRowStream(SessionDataSetWrapper dataset, boolean trimStorageUnit, Project project, String prefix) {
this.dataset = dataset;
this.trimStorageUnit = trimStorageUnit;
this.filterByTags = project.getTagFilter() != null;
Expand All @@ -90,7 +86,7 @@ public IoTDBQueryRowStream(SessionDataSetWrapper dataset, boolean trimStorageUni
}
name = transformColumnName(name);
Pair<String, Map<String, String>> pair = TagKVUtils.splitFullName(name);
Field field = new Field(prefix==null? pair.getK() : prefix + "." + pair.getK(), DataTypeTransformer.strFromIoTDB(type), pair.getV());
Field field = new Field(pair.getK(), DataTypeTransformer.strFromIoTDB(type), pair.getV());
if (!this.trimStorageUnit && field.getFullName().startsWith(UNIT)) {
filterList.add(true);
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,7 @@ public TaskExecuteResult execute(StoragePhysicalTask task) {
project.getTagFilter(),
FilterTransformer.toString(filter),
storageUnit,
isDummyStorageUnit,
task.getTargetFragment().getTsInterval().getSchemaPrefix());
isDummyStorageUnit);
} else if (op.getType() == OperatorType.Insert) {
Insert insert = (Insert) op;
return executor.executeInsertTask(
Expand Down
Loading