HIVE-28600: Iceberg: Check that table/partition requires compaction before compacting
Dmitriy Fingerman committed Nov 6, 2024
1 parent 18f34e7 commit 485cb6f
Showing 17 changed files with 464 additions and 107 deletions.
10 changes: 8 additions & 2 deletions common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2229,12 +2229,18 @@ public static enum ConfVars {
HIVE_ICEBERG_EXPIRE_SNAPSHOT_NUMTHREADS("hive.iceberg.expire.snapshot.numthreads", 4,
"The number of threads to be used for deleting files during expire snapshot. If set to 0 or below it uses the" +
" defult DirectExecutorService"),

HIVE_ICEBERG_MASK_DEFAULT_LOCATION("hive.iceberg.mask.default.location", false,
"If this is set to true, the URI for auth will have the default location masked with DEFAULT_TABLE_LOCATION"),
HIVE_ICEBERG_ALLOW_DATAFILES_IN_TABLE_LOCATION_ONLY("hive.iceberg.allow.datafiles.in.table.location.only", false,
"If this is set to true, then all the data files being read should be within the table location"),

HIVE_ICEBERG_MAJOR_COMPACTION_FILE_SIZE_THRESHOLD("hive.iceberg.major.compaction.file.size.threshold", "96mb",
new SizeValidator(), "Iceberg data file size below which a file is considered for major compaction."),
HIVE_ICEBERG_MINOR_COMPACTION_FILE_SIZE_THRESHOLD("hive.iceberg.minor.compaction.file.size.threshold", "16mb",
new SizeValidator(), "Iceberg data file size below which a file is considered for minor compaction."),
ICEBERG_COMPACTION_FILE_SIZE_RATIO("hive.iceberg.compaction.file.size.ratio", 0.1f,
"Ratio of the number of data files below the size threshold to the total number of data files, above which compaction is needed"),
ICEBERG_COMPACTION_DELETE_RECORDS_THRESHOLD("hive.iceberg.delete.records.threshold", 100,
"Number of delete records in a table/partition above which the table/partition needs to be compacted."),
HIVE_USE_EXPLICIT_RCFILE_HEADER("hive.exec.rcfile.use.explicit.header", true,
"If this is set the header for RCFiles will simply be RCF. If this is not\n" +
"set the header will be that borrowed from sequence files, e.g. SEQ- followed\n" +
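For reference, a minimal sketch of how these new thresholds might be overridden on a HiveConf instance, following the same pattern the new tests use; the values below are arbitrary illustrations, not recommended defaults:

// Illustrative only: override the new Iceberg compaction thresholds on a HiveConf.
HiveConf conf = new HiveConf();
// Treat data files smaller than 64 MB as candidates for major compaction (default is 96mb).
conf.set(HiveConf.ConfVars.HIVE_ICEBERG_MAJOR_COMPACTION_FILE_SIZE_THRESHOLD.varname, "64mb");
// Trigger compaction once more than 20% of the data files fall below the size threshold (default is 0.1).
conf.setFloatVar(HiveConf.ConfVars.ICEBERG_COMPACTION_FILE_SIZE_RATIO, 0.2f);
// Trigger compaction once more than 50 delete records accumulate in a table/partition (default is 100).
conf.setIntVar(HiveConf.ConfVars.ICEBERG_COMPACTION_DELETE_RECORDS_THRESHOLD, 50);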
@@ -63,6 +63,7 @@
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
@@ -2232,4 +2233,52 @@ private static List<FieldSchema> schema(List<VirtualColumn> exprs) {
private static List<FieldSchema> orderBy(VirtualColumn... exprs) {
return schema(Arrays.asList(exprs));
}

@Override
public boolean canCompact(HiveConf hiveConf, org.apache.hadoop.hive.ql.metadata.Table table, String partitionPath,
CompactionType compactionType) throws HiveException {
Table icebergTable = IcebergTableUtil.getTable(hiveConf, table.getTTable());
return canCompact(hiveConf, icebergTable, partitionPath, compactionType);
}

@VisibleForTesting
boolean canCompact(HiveConf hiveConf, Table icebergTable, String partitionPath,
CompactionType compactionType) throws HiveException {

// Nothing to compact until the table has at least one snapshot.
if (icebergTable.currentSnapshot() == null) {
return false;
}

// Compact if the number of delete records in the table/partition exceeds the configured threshold.
int deleteRecordsThreshold = HiveConf.getIntVar(hiveConf, ConfVars.ICEBERG_COMPACTION_DELETE_RECORDS_THRESHOLD);
long deleteRecordsCount = IcebergTableUtil.countDeleteRecords(icebergTable, partitionPath);
if (deleteRecordsCount > deleteRecordsThreshold) {
return true;
}

// With fewer than two data files there is nothing to merge.
int dataFilesCount = IcebergTableUtil.getDataFiles(icebergTable, partitionPath).size();
if (dataFilesCount < 2) {
return false;
}

long fileSizeInBytesThreshold;
switch (compactionType) {
case MAJOR:
fileSizeInBytesThreshold = HiveConf.getSizeVar(hiveConf,
ConfVars.HIVE_ICEBERG_MAJOR_COMPACTION_FILE_SIZE_THRESHOLD);
break;
case MINOR:
fileSizeInBytesThreshold = HiveConf.getSizeVar(hiveConf,
ConfVars.HIVE_ICEBERG_MINOR_COMPACTION_FILE_SIZE_THRESHOLD);
break;
default:
throw new HiveException(String.format("Invalid compaction type: %s", compactionType.name()));
}

// Compact if the fraction of data files below the size threshold exceeds the configured ratio.
float fileSizeRatioThreshold = HiveConf.getFloatVar(hiveConf, ConfVars.ICEBERG_COMPACTION_FILE_SIZE_RATIO);
float fileSizeRatio = IcebergTableUtil.getFileSizeRatio(icebergTable, partitionPath, fileSizeInBytesThreshold);
return fileSizeRatio > fileSizeRatioThreshold;
}
}
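Taken together, canCompact short-circuits on tables with no snapshot, compacts when the delete-record count exceeds hive.iceberg.delete.records.threshold, requires at least two data files, and otherwise compacts when the fraction of small data files exceeds hive.iceberg.compaction.file.size.ratio. A hedged usage sketch of how a caller might consult it before scheduling a compaction (the table name, partition path, and Hive.get lookup are illustrative; imports and error handling are trimmed):

// Sketch only: decide whether a MAJOR compaction of one partition is worthwhile.
HiveIcebergStorageHandler handler = new HiveIcebergStorageHandler();
handler.setConf(conf);
org.apache.hadoop.hive.ql.metadata.Table hiveTable =
org.apache.hadoop.hive.ql.metadata.Hive.get(conf).getTable("default", "customers");
if (handler.canCompact(conf, hiveTable, "last_name=Brown", CompactionType.MAJOR)) {
// enough small files or delete records: schedule the MAJOR compaction for this partition
}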
@@ -458,6 +458,52 @@ public static List<DeleteFile> getDeleteFiles(Table table, String partitionPath)
t -> ((PositionDeletesScanTask) t).file()));
}

// Returns the fraction of data files in the table/partition whose size is below the given threshold.
public static float getFileSizeRatio(Table table, String partitionPath, long fileSizeInBytesThreshold) {
long uncompactedFilesCount = getDataFileCount(table, partitionPath, fileSizeInBytesThreshold, true);
long compactedFilesCount = getDataFileCount(table, partitionPath, fileSizeInBytesThreshold, false);

if (uncompactedFilesCount == 0) {
return 0;
} else if (compactedFilesCount == 0) {
return 1;
} else {
return uncompactedFilesCount * 1.0f / (uncompactedFilesCount + compactedFilesCount);
}
}

// Counts data files whose size is below (isLess == true) or at/above (isLess == false) the threshold.
// Unpartitioned tables count all files; for partitioned tables a non-null partitionPath restricts the
// count to that partition, while a null partitionPath counts only files written under a non-current spec.
private static long getDataFileCount(Table table, String partitionPath, long fileSizeInBytesThreshold,
boolean isLess) {
CloseableIterable<FileScanTask> fileScanTasks =
table.newScan().useSnapshot(table.currentSnapshot().snapshotId()).ignoreResiduals().planFiles();
CloseableIterable<FileScanTask> filteredFileScanTasks =
CloseableIterable.filter(fileScanTasks, t -> {
DataFile file = t.asFileScanTask().file();
return (!table.spec().isPartitioned() ||
partitionPath == null && file.specId() != table.spec().specId() ||
partitionPath != null &&
table.specs().get(file.specId()).partitionToPath(file.partition()).equals(partitionPath)) &&
(isLess ? file.fileSizeInBytes() < fileSizeInBytesThreshold :
file.fileSizeInBytes() >= fileSizeInBytesThreshold);
});
return Lists.newArrayList(filteredFileScanTasks).size();
}

// Sums the record counts of all position delete files for the table/partition,
// using the same partition filtering as getDataFileCount.
public static long countDeleteRecords(Table table, String partitionPath) {
Table deletesTable =
MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);
CloseableIterable<ScanTask> deletesScanTasks = deletesTable.newBatchScan().planFiles();
CloseableIterable<ScanTask> filteredDeletesScanTasks =
CloseableIterable.filter(deletesScanTasks, t -> {
DeleteFile file = ((PositionDeletesScanTask) t).file();
return !table.spec().isPartitioned() ||
partitionPath == null && file.specId() != table.spec().specId() ||
partitionPath != null &&
table.specs().get(file.specId()).partitionToPath(file.partition()).equals(partitionPath);
});
return Lists.newArrayList(CloseableIterable.transform(filteredDeletesScanTasks,
t -> ((PositionDeletesScanTask) t).file().recordCount())).stream().mapToLong(Long::longValue).sum();
}

public static Expression generateExpressionFromPartitionSpec(Table table, Map<String, String> partitionSpec)
throws SemanticException {
Map<String, PartitionField> partitionFieldMap = getPartitionFields(table).stream()
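To make the file-size ratio concrete: with the 477-byte threshold used in the tests below and two data files of 479 and 476 bytes, one file is below the threshold and one is not, so getFileSizeRatio returns 1 / (1 + 1) = 0.5, which exceeds the default ratio of 0.1 and triggers compaction. A self-contained sketch of the same counting logic, without an Iceberg table scan (the numbers are taken from the test comments below):

// Illustrative arithmetic only; mirrors getFileSizeRatio's counting, not its Iceberg scan.
long[] fileSizes = {479L, 476L}; // data file sizes from the tests below
long threshold = 477L; // hive.iceberg.major.compaction.file.size.threshold
long below = java.util.Arrays.stream(fileSizes).filter(s -> s < threshold).count();
long atOrAbove = fileSizes.length - below;
float ratio = below == 0 ? 0f : atOrAbove == 0 ? 1f : below * 1.0f / (below + atOrAbove);
boolean needsCompaction = ratio > 0.1f; // hive.iceberg.compaction.file.size.ratio
// ratio == 0.5f, needsCompaction == true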
@@ -0,0 +1,228 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.mr.hive;

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.mr.TestHelper;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

public class TestHiveIcebergCompaction {

private static TestHiveShell shell;
private TestTables testTables;
@Rule
public TemporaryFolder temp = new TemporaryFolder();

static final List<Record> CUSTOMER_RECORDS_1 = TestHelper.RecordsBuilder.newInstance(
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
.add(0L, "Alice", "Brown")
.add(1L, "Bob", "Green")
.build();

static final List<Record> CUSTOMER_RECORDS_2 = TestHelper.RecordsBuilder.newInstance(
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
.add(2L, "Bruce", "Brown")
.add(3L, "Trudy", "Green")
.add(4L, "Alex", "Pink")
.build();

static final List<Record> CUSTOMER_RECORDS_3 = TestHelper.RecordsBuilder.newInstance(
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
.add(5L, "Bruce", "Blue")
.add(6L, "Trudy", "Blue")
.build();

static final List<Record> CUSTOMER_RECORDS_4 = TestHelper.RecordsBuilder.newInstance(
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
.add(0L, "Alice", "Brown").build();

static final List<Record> CUSTOMER_RECORDS_5 = TestHelper.RecordsBuilder.newInstance(
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
.add(2L, "Bruce", "Brown")
.build();

@BeforeClass
public static void beforeClass() {
shell = HiveIcebergStorageHandlerTestUtils.shell();
}

@AfterClass
public static void afterClass() throws Exception {
shell.stop();
}

@Before
public void before() throws IOException {
testTables = HiveIcebergStorageHandlerTestUtils.testTables(shell, TestTables.TestTableType.HIVE_CATALOG, temp);
HiveIcebergStorageHandlerTestUtils.init(shell, testTables, temp, "tez");
HiveConf.setBoolVar(shell.getHiveConf(), HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
}

@After
public void after() throws Exception {
HiveIcebergStorageHandlerTestUtils.close(shell);
ExecMapper.setDone(false);
}

@Test
public void testCanCompactPartitioned() {
PartitionSpec spec = PartitionSpec.builderFor(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
.identity("last_name").build();

Table table = testTables.createTable(shell, "customers",
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, spec, FileFormat.ORC, CUSTOMER_RECORDS_1, 2);

shell.executeStatement(testTables.getInsertQuery(CUSTOMER_RECORDS_2,
TableIdentifier.of("default", "customers"), false));

shell.executeStatement("DELETE FROM customers WHERE customer_id=3");

List<Object[]> objects = shell.executeStatement("SELECT * FROM customers ORDER BY customer_id");
Assert.assertEquals(4, objects.size());
List<Record> expected = TestHelper.RecordsBuilder.newInstance(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
.add(0L, "Alice", "Brown")
.add(1L, "Bob", "Green")
.add(2L, "Bruce", "Brown")
.add(4L, "Alex", "Pink")
.build();
HiveIcebergTestUtils.validateData(expected,
HiveIcebergTestUtils.valueForRow(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, objects), 0);

HiveConf conf = new HiveConf();
conf.setIntVar(HiveConf.ConfVars.ICEBERG_COMPACTION_DELETE_RECORDS_THRESHOLD, 1);
HiveIcebergStorageHandler storageHandler = new HiveIcebergStorageHandler();
storageHandler.setConf(conf);
table.refresh();

try {
/*
* Partition: 'last_name=Brown'.
* 2 data files, 0 delete files.
* Existing data size of the partition: 955 bytes
* 1st file size in bytes: 479
* 2nd file size in bytes: 476
*/

// Does not need compaction: 0% of the data files are below the size threshold, all files are already compacted.
conf.set(HiveConf.ConfVars.HIVE_ICEBERG_MAJOR_COMPACTION_FILE_SIZE_THRESHOLD.varname, "10bytes");
assertFalse(storageHandler.canCompact(conf, table, "last_name=Brown", CompactionType.MAJOR));

// Needs compaction: 50% of the data files are below the size threshold, above the allowed ratio of 10%.
conf.set(HiveConf.ConfVars.HIVE_ICEBERG_MAJOR_COMPACTION_FILE_SIZE_THRESHOLD.varname, "477bytes");
assertTrue(storageHandler.canCompact(conf, table, "last_name=Brown", CompactionType.MAJOR));

// Needs compaction: 100% of the data files are below the size threshold, all files are uncompacted.
conf.set(HiveConf.ConfVars.HIVE_ICEBERG_MAJOR_COMPACTION_FILE_SIZE_THRESHOLD.varname, "1Mb");
assertTrue(storageHandler.canCompact(conf, table, "last_name=Brown", CompactionType.MAJOR));

// Delete record count does not exceed the threshold and there is only 1 data file, so compaction is not needed.
assertFalse(storageHandler.canCompact(conf, table, "last_name=Green", CompactionType.MAJOR));

// No delete files and only one data file, so compaction is not needed.
assertFalse(storageHandler.canCompact(conf, table, "last_name=Pink", CompactionType.MAJOR));

// Needs compaction because the number of delete records (2) exceeds the threshold (1).
shell.executeStatement(testTables.getInsertQuery(CUSTOMER_RECORDS_3,
TableIdentifier.of("default", "customers"), false));
shell.executeStatement("DELETE FROM customers WHERE customer_id=5");
shell.executeStatement("DELETE FROM customers WHERE customer_id=6");
table.refresh();
assertTrue(storageHandler.canCompact(conf, table, "last_name=Blue", CompactionType.MAJOR));
} catch (Exception e) {
fail("Unexpected exception: " + e.getMessage());
}
}

@Test
public void testCanCompactUnpartitioned() {
PartitionSpec spec = PartitionSpec.builderFor(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA).build();

Table table = testTables.createTable(shell, "customers",
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, spec, FileFormat.ORC, null, 2);

HiveConf conf = new HiveConf();
conf.setIntVar(HiveConf.ConfVars.ICEBERG_COMPACTION_DELETE_RECORDS_THRESHOLD, 1);
HiveIcebergStorageHandler storageHandler = new HiveIcebergStorageHandler();
storageHandler.setConf(conf);

try {
// Zero data/delete files - cannot compact
assertFalse(storageHandler.canCompact(conf, table, null, CompactionType.MAJOR));

shell.executeStatement(testTables.getInsertQuery(CUSTOMER_RECORDS_4,
TableIdentifier.of("default", "customers"), false));
shell.executeStatement(testTables.getInsertQuery(CUSTOMER_RECORDS_5,
TableIdentifier.of("default", "customers"), false));
table.refresh();

/*
* 2 data files, 0 delete files.
* Existing data size of the table: 955 bytes
* 1st file size in bytes: 479
* 2nd file size in bytes: 476
*/

// Does not need compaction: 0% of the data files are below the size threshold, all files are already compacted.
conf.set(HiveConf.ConfVars.HIVE_ICEBERG_MAJOR_COMPACTION_FILE_SIZE_THRESHOLD.varname, "10bytes");
assertFalse(storageHandler.canCompact(conf, table, null, CompactionType.MAJOR));

// Needs compaction: 50% of the data files are below the size threshold, above the allowed ratio of 10%.
conf.set(HiveConf.ConfVars.HIVE_ICEBERG_MAJOR_COMPACTION_FILE_SIZE_THRESHOLD.varname, "477bytes");
assertTrue(storageHandler.canCompact(conf, table, null, CompactionType.MAJOR));

// Needs compaction: 100% of the data files are below the size threshold, all files are uncompacted.
conf.set(HiveConf.ConfVars.HIVE_ICEBERG_MAJOR_COMPACTION_FILE_SIZE_THRESHOLD.varname, "1Mb");
assertTrue(storageHandler.canCompact(conf, table, null, CompactionType.MAJOR));

shell.executeStatement(testTables.getInsertQuery(CUSTOMER_RECORDS_1,
TableIdentifier.of("default", "customers"), false));
shell.executeStatement(testTables.getInsertQuery(CUSTOMER_RECORDS_2,
TableIdentifier.of("default", "customers"), false));
shell.executeStatement("DELETE FROM customers where customer_id=0");
shell.executeStatement("DELETE FROM customers where customer_id=1");
table.refresh();

// Needs compaction because the number of delete records (3) exceeds the threshold (1).
assertTrue(storageHandler.canCompact(conf, table, null, CompactionType.MAJOR));
} catch (Exception e) {
fail("Unexpected exception: " + e.getMessage());
}
}
}