HIVE-28586 Support write order for Iceberg tables at CREATE TABLE
Change-Id: Ia9a0a92d19d33693887137c797e0662088a314db
zratkai committed Nov 14, 2024
1 parent d4cef17 commit ca442ed
Showing 20 changed files with 5,804 additions and 5,278 deletions.
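The patch lets a CREATE TABLE statement declare an Iceberg write order, which Hive records as the table's default sort order and applies when writing data. The new clause, as exercised by the q-test added in this commit:

create table ice_orc_sorted (id int, text string)
write ordered by id desc nulls first, text asc nulls last
stored by iceberg stored as orc;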
@@ -30,13 +30,16 @@
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.SortOrderParser;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.parquet.Strings;

/**
* Class for catalog resolution and accessing the common functions for {@link Catalog} API.
@@ -140,15 +143,23 @@ public static Table createTable(Configuration conf, Properties props) {
Map<String, String> map = filterIcebergTableProperties(props);

Optional<Catalog> catalog = loadCatalog(conf, catalogName);

SortOrder sortOrder = getSortOrder(props, schema);
if (catalog.isPresent()) {
String name = props.getProperty(NAME);
Preconditions.checkNotNull(name, "Table identifier not set");
return catalog.get().createTable(TableIdentifier.parse(name), schema, spec, location, map);
return catalog.get().buildTable(TableIdentifier.parse(name), schema).withPartitionSpec(spec)
.withLocation(location).withProperties(map).withSortOrder(sortOrder).create();
}

Preconditions.checkNotNull(location, "Table location not set");
return new HadoopTables(conf).create(schema, spec, map, location);
return new HadoopTables(conf).create(schema, spec, sortOrder, map, location);
}

private static SortOrder getSortOrder(Properties props, Schema schema) {
String sortOrderJsonString = props.getProperty(InputFormatConfig.INSERT_WRITE_ORDER);
SortOrder sortOrder = Strings.isNullOrEmpty(sortOrderJsonString) ?
SortOrder.unsorted() : SortOrderParser.fromJson(schema, sortOrderJsonString);
return sortOrder;
}

/**
Expand Down Expand Up @@ -215,9 +226,9 @@ public static Table registerTable(Configuration conf, Properties props, String m
Preconditions.checkNotNull(name, "Table identifier not set");
return catalog.get().registerTable(TableIdentifier.parse(name), metadataLocation);
}

Preconditions.checkNotNull(location, "Table location not set");
return new HadoopTables(conf).create(schema, spec, map, location);
SortOrder sortOrder = getSortOrder(props, schema);
return new HadoopTables(conf).create(schema, spec, sortOrder, map, location);
}

public static void renameTable(Configuration conf, Properties props, TableIdentifier to) {
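The write order travels from the metastore hook to table creation as JSON in the new iceberg.write-order property (INSERT_WRITE_ORDER, added below). A standalone sketch — not part of the patch, using an illustrative schema that mirrors the q-test table — of the SortOrder/JSON round trip that the metahook's addSortOrder() (further down) and Catalogs.getSortOrder() (above) perform with Iceberg's SortOrderParser:

import org.apache.iceberg.NullOrder;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.SortOrderParser;
import org.apache.iceberg.types.Types;

public class WriteOrderRoundTripSketch {
  public static void main(String[] args) {
    // Illustrative schema matching the q-test table (id int, text string).
    Schema schema = new Schema(
        Types.NestedField.optional(1, "id", Types.IntegerType.get()),
        Types.NestedField.optional(2, "text", Types.StringType.get()));

    // Build the write order the same way the metahook does:
    // id DESC NULLS FIRST, text ASC NULLS LAST.
    SortOrder sortOrder = SortOrder.builderFor(schema)
        .desc("id", NullOrder.NULLS_FIRST)
        .asc("text", NullOrder.NULLS_LAST)
        .build();

    // Serialize to the JSON stored under iceberg.write-order ...
    String json = SortOrderParser.toJson(sortOrder);

    // ... and parse it back against the schema, as getSortOrder() does at create time.
    SortOrder parsed = SortOrderParser.fromJson(schema, json);
    System.out.println(json);
    System.out.println(parsed);
  }
}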
@@ -88,6 +88,7 @@ private InputFormatConfig() {
public static final String CATALOG_CLASS_TEMPLATE = "iceberg.catalog.%s.catalog-impl";
public static final String CATALOG_DEFAULT_CONFIG_PREFIX = "iceberg.catalog-default.";
public static final String QUERY_FILTERS = "iceberg.query.filters";
public static final String INSERT_WRITE_ORDER = "iceberg.write-order";

public enum InMemoryDataModel {
PIG,
@@ -47,6 +47,7 @@
import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NullOrderingType;
import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
@@ -82,13 +83,16 @@
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.MetadataTableUtils;
import org.apache.iceberg.NullOrder;
import org.apache.iceberg.PartitionData;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.PartitionsTable;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.SortOrderParser;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
@@ -271,6 +275,23 @@ public void preCreateTable(CreateTableRequest request) {
setOrcOnlyFilesParam(hmsTable);
// Remove hive primary key columns from table request, as iceberg doesn't support hive primary key.
request.setPrimaryKeys(null);
addSortOrder(hmsTable, schema, catalogProperties);
}

private void addSortOrder(org.apache.hadoop.hive.metastore.api.Table hmsTable, Schema schema,
Properties properties) {
SortOrder.Builder sortOrderBuilder = SortOrder.builderFor(schema);
hmsTable.getSd().getSortCols().forEach(item -> {
NullOrder nullOrder = item.getNullOrdering() == NullOrderingType.NULLS_FIRST ?
NullOrder.NULLS_FIRST : NullOrder.NULLS_LAST;
if (item.getOrder() == 0) {
sortOrderBuilder.desc(item.getCol(), nullOrder);
} else {
sortOrderBuilder.asc(item.getCol(), nullOrder);
}
});
properties.put(InputFormatConfig.INSERT_WRITE_ORDER, SortOrderParser.toJson(sortOrderBuilder.build()));
}

@Override
@@ -781,7 +802,7 @@ private void setCommonHmsTablePropertiesForIceberg(org.apache.hadoop.hive.metast
* @param hmsTable Table for which we are calculating the properties
* @return The properties we can provide for Iceberg functions, like {@link Catalogs}
*/
private static Properties getCatalogProperties(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
private Properties getCatalogProperties(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
Properties properties = new Properties();

hmsTable.getParameters().entrySet().stream().filter(e -> e.getKey() != null && e.getValue() != null).forEach(e -> {
@@ -0,0 +1,14 @@
set hive.llap.io.enabled=true;
set hive.vectorized.execution.enabled=true;
set hive.optimize.shared.work.merge.ts.schema=true;


create table ice_orc_sorted (id int, text string) write ordered by id desc nulls first, text asc nulls last stored by iceberg stored as orc;

insert into ice_orc_sorted values (3, "3"),(2, "2"),(4, "4"),(5, "5"),(1, "1"),(2, "3"),(3,null),(2,null),(null,"a");

describe formatted ice_orc_sorted;
describe extended ice_orc_sorted;

select * from ice_orc_sorted;

@@ -0,0 +1,96 @@
PREHOOK: query: create table ice_orc_sorted (id int, text string) write ordered by id desc nulls first, text asc nulls last stored by iceberg stored as orc
PREHOOK: type: CREATETABLE
PREHOOK: Output: database:default
PREHOOK: Output: default@ice_orc_sorted
POSTHOOK: query: create table ice_orc_sorted (id int, text string) write ordered by id desc nulls first, text asc nulls last stored by iceberg stored as orc
POSTHOOK: type: CREATETABLE
POSTHOOK: Output: database:default
POSTHOOK: Output: default@ice_orc_sorted
PREHOOK: query: insert into ice_orc_sorted values (3, "3"),(2, "2"),(4, "4"),(5, "5"),(1, "1"),(2, "3"),(3,null),(2,null),(null,"a")
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@ice_orc_sorted
POSTHOOK: query: insert into ice_orc_sorted values (3, "3"),(2, "2"),(4, "4"),(5, "5"),(1, "1"),(2, "3"),(3,null),(2,null),(null,"a")
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@ice_orc_sorted
PREHOOK: query: describe formatted ice_orc_sorted
PREHOOK: type: DESCTABLE
PREHOOK: Input: default@ice_orc_sorted
POSTHOOK: query: describe formatted ice_orc_sorted
POSTHOOK: type: DESCTABLE
POSTHOOK: Input: default@ice_orc_sorted
# col_name data_type comment
id int
text string

# Detailed Table Information
Database: default
#### A masked pattern was here ####
Retention: 0
#### A masked pattern was here ####
Table Type: EXTERNAL_TABLE
Table Parameters:
COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"id\":\"true\",\"text\":\"true\"}}
EXTERNAL TRUE
SORTBUCKETCOLSPREFIX TRUE
bucketing_version 2
current-schema {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"id\",\"required\":false,\"type\":\"int\"},{\"id\":2,\"name\":\"text\",\"required\":false,\"type\":\"string\"}]}
current-snapshot-id 7320236589871429354
current-snapshot-summary {\"added-data-files\":\"1\",\"added-records\":\"9\",\"added-files-size\":\"373\",\"changed-partition-count\":\"1\",\"total-records\":\"9\",\"total-files-size\":\"373\",\"total-data-files\":\"1\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\"total-equality-deletes\":\"0\",\"iceberg-version\":\"Apache Iceberg 1.6.1 (commit 8e9d59d299be42b0bca9461457cd1e95dbaad086)\"}
current-snapshot-timestamp-ms 1731506391808
default-sort-order {\"order-id\":1,\"fields\":[{\"transform\":\"identity\",\"source-id\":1,\"direction\":\"desc\",\"null-order\":\"nulls-first\"},{\"transform\":\"identity\",\"source-id\":2,\"direction\":\"asc\",\"null-order\":\"nulls-last\"}]}
format-version 2
iceberg.orc.files.only true
iceberg.write-order {\"order-id\":1,\"fields\":[{\"transform\":\"identity\",\"source-id\":1,\"direction\":\"desc\",\"null-order\":\"nulls-first\"},{\"transform\":\"identity\",\"source-id\":2,\"direction\":\"asc\",\"null-order\":\"nulls-last\"}]}
#### A masked pattern was here ####
numFiles 1
numRows 9
parquet.compression zstd
#### A masked pattern was here ####
rawDataSize 0
serialization.format 1
snapshot-count 1
storage_handler org.apache.iceberg.mr.hive.HiveIcebergStorageHandler
table_type ICEBERG
totalSize #Masked#
#### A masked pattern was here ####
uuid fb098a98-2597-4f22-b933-27b71f674f08
write.delete.mode merge-on-read
write.format.default orc
write.merge.mode merge-on-read
write.update.mode merge-on-read

# Storage Information
SerDe Library: org.apache.iceberg.mr.hive.HiveIcebergSerDe
InputFormat: org.apache.iceberg.mr.hive.HiveIcebergInputFormat
OutputFormat: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat
Compressed: No
Sort Columns: [FieldSchema(name:id, type:int, comment:Transform: identity, Sort direction: DESC, Null sort order: NULLS_FIRST), FieldSchema(name:text, type:string, comment:Transform: identity, Sort direction: ASC, Null sort order: NULLS_LAST)]
PREHOOK: query: describe extended ice_orc_sorted
PREHOOK: type: DESCTABLE
PREHOOK: Input: default@ice_orc_sorted
POSTHOOK: query: describe extended ice_orc_sorted
POSTHOOK: type: DESCTABLE
POSTHOOK: Input: default@ice_orc_sorted
id int
text string

#### A masked pattern was here ####
PREHOOK: query: select * from ice_orc_sorted
PREHOOK: type: QUERY
PREHOOK: Input: default@ice_orc_sorted
#### A masked pattern was here ####
POSTHOOK: query: select * from ice_orc_sorted
POSTHOOK: type: QUERY
POSTHOOK: Input: default@ice_orc_sorted
#### A masked pattern was here ####
NULL a
5 5
4 4
3 3
3 NULL
2 2
2 3
2 NULL
1 1
@@ -64,6 +64,7 @@ createTableStatement
tableComment?
createTablePartitionSpec?
tableBuckets?
tableWriteOrdered?
tableSkewed?
tableRowFormat?
tableFileFormat?
@@ -77,6 +78,7 @@ createTableStatement
tableComment?
createTablePartitionSpec?
tableBuckets?
tableWriteOrdered?
tableSkewed?
tableRowFormat?
tableFileFormat?
@@ -94,6 +96,7 @@ createTableStatement
tableComment?
createTablePartitionSpec?
tableBuckets?
tableWriteOrdered?
tableSkewed?
tableRowFormat?
tableFileFormat?
@@ -107,6 +110,7 @@ createTableStatement
tableComment?
createTablePartitionSpec?
tableBuckets?
tableWriteOrdered?
tableSkewed?
tableRowFormat?
tableFileFormat?
@@ -37,6 +37,7 @@ KW_DESC : 'DESC';
KW_NULLS : 'NULLS';
KW_LAST : 'LAST';
KW_ORDER : 'ORDER';
KW_ORDERED : 'ORDERED';
KW_GROUP : 'GROUP';
KW_BY : 'BY';
KW_HAVING : 'HAVING';
11 changes: 11 additions & 0 deletions parser/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
@@ -558,6 +558,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
xlateMap.put("KW_NULLS", "NULLS");
xlateMap.put("KW_LAST", "LAST");
xlateMap.put("KW_ORDER", "ORDER");
xlateMap.put("KW_ORDERED", "ORDERED");
xlateMap.put("KW_BY", "BY");
xlateMap.put("KW_GROUP", "GROUP");
xlateMap.put("KW_WHERE", "WHERE");
@@ -1840,6 +1841,14 @@ tableImplBuckets
-> ^(TOK_ALTERTABLE_BUCKETS $num)
;
tableWriteOrdered
@init { pushMsg("table sorted specification", state); }
@after { popMsg(state); }
:
KW_WRITE KW_ORDERED KW_BY sortCols=columnNameOrderList
-> ^(TOK_ALTERTABLE_BUCKETS $sortCols?)
;
tableSkewed
@init { pushMsg("table skewed specification", state); }
@after { popMsg(state); }
@@ -2201,6 +2210,8 @@ columnNameOrder
^(TOK_TABSORTCOLNAMEDESC ^(TOK_NULLS_LAST identifier))
-> {$orderSpec.tree.getType()==HiveParser.KW_ASC}?
^(TOK_TABSORTCOLNAMEASC ^($nullSpec identifier))
-> {$orderSpec.tree.getType()==HiveParser.KW_DESC}?
^(TOK_TABSORTCOLNAMEDESC ^($nullSpec identifier))
-> ^(TOK_TABSORTCOLNAMEDESC ^($nullSpec identifier))
;
@@ -49,6 +49,7 @@
import org.apache.hadoop.hive.metastore.api.DataConnector;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.NullOrderingType;
import org.apache.hadoop.hive.metastore.api.Order;
import org.apache.hadoop.hive.metastore.api.SQLCheckConstraint;
import org.apache.hadoop.hive.metastore.api.SQLDefaultConstraint;
@@ -966,15 +967,10 @@ protected List<Order> getColumnNamesOrder(ASTNode ast) throws SemanticException
ASTNode child = (ASTNode) ast.getChild(i);
int directionCode = DirectionUtils.tokenToCode(child.getToken().getType());
child = (ASTNode) child.getChild(0);
if (child.getToken().getType() != HiveParser.TOK_NULLS_FIRST && directionCode == DirectionUtils.ASCENDING_CODE) {
throw new SemanticException(
"create/alter bucketed table: not supported NULLS LAST for SORTED BY in ASC order");
}
if (child.getToken().getType() != HiveParser.TOK_NULLS_LAST && directionCode == DirectionUtils.DESCENDING_CODE) {
throw new SemanticException(
"create/alter bucketed table: not supported NULLS FIRST for SORTED BY in DESC order");
}
colList.add(new Order(unescapeIdentifier(child.getChild(0).getText()).toLowerCase(), directionCode));
Order order = new Order(unescapeIdentifier(child.getChild(0).getText()).toLowerCase(), directionCode);
order.setNullOrdering(child.getToken().getType()== HiveParser.TOK_NULLS_FIRST?
NullOrderingType.NULLS_FIRST : NullOrderingType.NULLS_LAST);
colList.add(order);
}
return colList;
}
37 changes: 31 additions & 6 deletions ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -101,6 +101,7 @@
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NullOrderingType;
import org.apache.hadoop.hive.metastore.api.Order;
import org.apache.hadoop.hive.metastore.api.SQLCheckConstraint;
import org.apache.hadoop.hive.metastore.api.SQLDefaultConstraint;
@@ -14119,12 +14120,19 @@ ASTNode analyzeCreateTable(
partColNames = getColumnNames(child);
break;
case HiveParser.TOK_ALTERTABLE_BUCKETS:
bucketCols = getColumnNames((ASTNode) child.getChild(0));
if (child.getChildCount() == 2) {
numBuckets = Integer.parseInt(child.getChild(1).getText());
} else {
sortCols = getColumnNamesOrder((ASTNode) child.getChild(1));
numBuckets = Integer.parseInt(child.getChild(2).getText());
switch(child.getChildCount()){
case 1:
sortCols = getColumnNamesOrder((ASTNode) child.getChild(0));
break;
case 2:
bucketCols = getColumnNames((ASTNode) child.getChild(0));
numBuckets = Integer.parseInt(child.getChild(1).getText());
break;
case 3:
bucketCols = getColumnNames((ASTNode) child.getChild(0));
sortCols = getColumnNamesOrder((ASTNode) child.getChild(1));
numBuckets = Integer.parseInt(child.getChild(2).getText());
break;
}
break;
case HiveParser.TOK_TABLEROWFORMAT:
@@ -14249,6 +14257,7 @@ ASTNode analyzeCreateTable(
throw new SemanticException(
"Partition columns can only declared using their name and types in regular CREATE TABLE statements");
}
checkSortCols(sortCols, storageFormat);
tblProps = validateAndAddDefaultProperties(
tblProps, isExt, storageFormat, dbDotTab, sortCols, isMaterialization, isTemporary,
isTransactional, isManaged, new String[] {qualifiedTabName.getDb(), qualifiedTabName.getTable()}, isDefaultTableTypeChanged);
@@ -14444,6 +14453,22 @@ ASTNode analyzeCreateTable(
return null;
}

private void checkSortCols(List<Order> sortCols, StorageFormat storageFormat) throws SemanticException{
if("org.apache.iceberg.mr.hive.HiveIcebergStorageHandler".equalsIgnoreCase(storageFormat.getStorageHandler())){
return;
}
for (Order sortCol : sortCols) {
if (sortCol.getNullOrdering() != NullOrderingType.NULLS_FIRST && sortCol.getOrder() == DirectionUtils.ASCENDING_CODE) {
throw new SemanticException(
"create/alter bucketed table: not supported NULLS LAST for SORTED BY in ASC order");
}
if (sortCol.getNullOrdering() != NullOrderingType.NULLS_LAST && sortCol.getOrder() == DirectionUtils.DESCENDING_CODE) {
throw new SemanticException(
"create/alter bucketed table: not supported NULLS FIRST for SORTED BY in DESC order");
}
}
}

private String getDefaultLocation(String dbName, String tableName, boolean isExt) throws SemanticException {
String tblLocation;
try {

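With checkSortCols in place, the NULLS-ordering restriction that previously lived in getColumnNamesOrder applies only to tables that are not backed by the Iceberg storage handler, so an Iceberg write order may combine either sort direction with either null ordering. Two illustrative statements (table names are hypothetical, not taken from the patch):

-- Accepted: an Iceberg write order may pair ASC with NULLS LAST.
create table ice_t (id int) write ordered by id asc nulls last stored by iceberg;

-- Still rejected for a plain bucketed table:
-- "create/alter bucketed table: not supported NULLS LAST for SORTED BY in ASC order"
create table bucketed_t (id int, name string) clustered by (name) sorted by (id asc nulls last) into 4 buckets;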