HIVE-28600: Iceberg: Check that table/partition requires compaction b… #5529

Open
wants to merge 2 commits into base: master
10 changes: 8 additions & 2 deletions common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2229,12 +2229,18 @@ public static enum ConfVars {
HIVE_ICEBERG_EXPIRE_SNAPSHOT_NUMTHREADS("hive.iceberg.expire.snapshot.numthreads", 4,
"The number of threads to be used for deleting files during expire snapshot. If set to 0 or below it uses the" +
" defult DirectExecutorService"),

HIVE_ICEBERG_MASK_DEFAULT_LOCATION("hive.iceberg.mask.default.location", false,
"If this is set to true the URI for auth will have the default location masked with DEFAULT_TABLE_LOCATION"),
HIVE_ICEBERG_ALLOW_DATAFILES_IN_TABLE_LOCATION_ONLY("hive.iceberg.allow.datafiles.in.table.location.only", false,
"If this is set to true, then all the data files being read should be withing the table location"),

HIVE_ICEBERG_MAJOR_COMPACTION_FILE_SIZE_THRESHOLD("hive.iceberg.major.compaction.file.size.threshold", "96mb",
new SizeValidator(), "Iceberg data file size in megabytes below which a file needs to be compacted."),
HIVE_ICEBERG_MINOR_COMPACTION_FILE_SIZE_THRESHOLD("hive.iceberg.minor.compaction.file.size.threshold", "16mb",
new SizeValidator(), "Iceberg data file size in megabytes below which a file needs to be compacted."),
ICEBERG_COMPACTION_FILE_SIZE_RATIO("hive.iceberg.compaction.file.size.ratio", 0.1f,
"Ratio of # data files below threshold / # data files above threshold above which compaction is needed"),
ICEBERG_COMPACTION_DELETE_RECORDS_THRESHOLD("hive.iceberg.delete.records.threshold", 100,
"Number of delete records in a table/partition above which a file needs to be compacted."),
HIVE_USE_EXPLICIT_RCFILE_HEADER("hive.exec.rcfile.use.explicit.header", true,
"If this is set the header for RCFiles will simply be RCF. If this is not\n" +
"set the header will be that borrowed from sequence files, e.g. SEQ- followed\n" +
@@ -63,6 +63,7 @@
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
@@ -2232,4 +2233,59 @@ private static List<FieldSchema> schema(List<VirtualColumn> exprs) {
private static List<FieldSchema> orderBy(VirtualColumn... exprs) {
return schema(Arrays.asList(exprs));
}

@Override
public boolean canCompact(HiveConf hiveConf, org.apache.hadoop.hive.ql.metadata.Table table, String partitionPath,
CompactionType compactionType) throws HiveException {
Table icebergTable = IcebergTableUtil.getTable(hiveConf, table.getTTable());
return canCompact(hiveConf, icebergTable, partitionPath, compactionType);
}

@VisibleForTesting
boolean canCompact(HiveConf hiveConf, Table icebergTable, String partitionPath,
CompactionType compactionType) throws HiveException {

if (icebergTable.currentSnapshot() == null) {
LOG.info("Table {}{} doesn't require compaction because it is empty", icebergTable,
partitionPath == null ? "" : " partition " + partitionPath);
return false;
}

int deleteRecordsThreshold = HiveConf.getIntVar(hiveConf, ConfVars.ICEBERG_COMPACTION_DELETE_RECORDS_THRESHOLD);
long deleteRecordsCount = IcebergTableUtil.countDeleteRecords(icebergTable, partitionPath);
if (deleteRecordsCount > deleteRecordsThreshold) {
return true;
}

int dataFilesCount = IcebergTableUtil.getDataFiles(icebergTable, partitionPath).size();
if (dataFilesCount < 2) {
LOG.info("Table {}{} doesn't require compaction because it has less than 2 data files", icebergTable,
partitionPath == null ? "" : " partition " + partitionPath);
return false;
}

long fileSizeInBytesThreshold;
switch (compactionType) {
case MAJOR:
fileSizeInBytesThreshold = HiveConf.getSizeVar(hiveConf,
ConfVars.HIVE_ICEBERG_MAJOR_COMPACTION_FILE_SIZE_THRESHOLD);
break;
case MINOR:
fileSizeInBytesThreshold = HiveConf.getSizeVar(hiveConf,
ConfVars.HIVE_ICEBERG_MINOR_COMPACTION_FILE_SIZE_THRESHOLD);
break;
default:
throw new HiveException(String.format("Invalid compaction type: %s", compactionType.name()));
}

float fileSizeRatioThreshold = HiveConf.getFloatVar(hiveConf, ConfVars.ICEBERG_COMPACTION_FILE_SIZE_RATIO);
float fileSizeRatio = IcebergTableUtil.getUncompactedRatio(icebergTable, partitionPath, fileSizeInBytesThreshold);
if (fileSizeRatio > fileSizeRatioThreshold) {
return true;
}
LOG.info("Table {}{} doesn't require compaction because its uncompacted ratio of {} is below the threshold of {}",
icebergTable, partitionPath == null ? "" : " partition " + partitionPath, fileSizeRatio,
fileSizeRatioThreshold);
return false;
}
}
@@ -458,6 +458,52 @@ public static List<DeleteFile> getDeleteFiles(Table table, String partitionPath)
t -> ((PositionDeletesScanTask) t).file()));
}

public static float getUncompactedRatio(Table table, String partitionPath, long fileSizeInBytesThreshold) {
long uncompactedFilesCount = getDataFileCount(table, partitionPath, fileSizeInBytesThreshold, true);
long compactedFilesCount = getDataFileCount(table, partitionPath, fileSizeInBytesThreshold, false);
Contributor:
Why is fileSizeInBytesThreshold used as the comparator for deciding whether a file is compacted or uncompacted?
Isn't there a better approach for this check?

Contributor Author (@difin), Nov 14, 2024:
A file is classified as compacted or uncompacted by comparing its actual size with the threshold defined in Conf for the given compaction type:

HIVE_ICEBERG_MAJOR_COMPACTION_FILE_SIZE_THRESHOLD("hive.iceberg.major.compaction.file.size.threshold", "96mb",
        new SizeValidator(), "Iceberg data file size in megabytes below which a file needs to be compacted."),
HIVE_ICEBERG_MINOR_COMPACTION_FILE_SIZE_THRESHOLD("hive.iceberg.minor.compaction.file.size.threshold", "16mb",
        new SizeValidator(), "Iceberg data file size in megabytes below which a file needs to be compacted."),

fileSizeInBytesThreshold gets the value from these configs depending on compaction type.
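
A minimal sketch (not part of the PR's diff) of the classification described above, using only calls that already appear in this change; dataFile stands for any org.apache.iceberg.DataFile returned by the scan, and the MAJOR threshold is assumed (the MINOR case is identical):

// Resolve the size threshold for the chosen compaction type.
long fileSizeInBytesThreshold = HiveConf.getSizeVar(hiveConf,
    ConfVars.HIVE_ICEBERG_MAJOR_COMPACTION_FILE_SIZE_THRESHOLD);
// A data file counts as uncompacted when its size is below the configured threshold.
boolean uncompacted = dataFile.fileSizeInBytes() < fileSizeInBytesThreshold;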


if (uncompactedFilesCount == 0) {
return 0;
} else if (compactedFilesCount == 0) {
return 1;
} else {
return uncompactedFilesCount * 1.0f / (uncompactedFilesCount + compactedFilesCount);
}
}

private static long getDataFileCount(Table table, String partitionPath, long fileSizeInBytesThreshold,
boolean isLess) {
CloseableIterable<FileScanTask> fileScanTasks =
table.newScan().useSnapshot(table.currentSnapshot().snapshotId()).ignoreResiduals().planFiles();
CloseableIterable<FileScanTask> filteredFileScanTasks =
CloseableIterable.filter(fileScanTasks, t -> {
DataFile file = t.asFileScanTask().file();
return (!table.spec().isPartitioned() ||
partitionPath == null && file.specId() != table.spec().specId() ||
Contributor:
Please use better separation between operands to make it readable/understandable and reliable, since now the precedence can cause issues.

Like: ((a==b && c!=2) && (a!=b && c=2) && (...) )

Contributor Author (@difin), Nov 14, 2024:
Previously I used braces in a very similar case, and @deniskuzZ asked to remove them: #5389 (comment)

The precedence won't cause issues because in Java the && operator has higher precedence than ||, so the expression is evaluated deterministically (see the explicitly parenthesized restatement after this method).

partitionPath != null &&
table.specs().get(file.specId()).partitionToPath(file.partition()).equals(partitionPath)) &&
(isLess ? file.fileSizeInBytes() < fileSizeInBytesThreshold :
file.fileSizeInBytes() >= fileSizeInBytesThreshold);
});
return Lists.newArrayList(filteredFileScanTasks).size();
}
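
For readers following the precedence discussion above, here is the same filter predicate with the grouping that Java's operator precedence already implies written out explicitly; this is a readability aid only, not a change proposed in the PR:

DataFile file = t.asFileScanTask().file();
return (!table.spec().isPartitioned()
    || (partitionPath == null && file.specId() != table.spec().specId())
    || (partitionPath != null
        && table.specs().get(file.specId()).partitionToPath(file.partition()).equals(partitionPath)))
    && (isLess ? file.fileSizeInBytes() < fileSizeInBytesThreshold
               : file.fileSizeInBytes() >= fileSizeInBytesThreshold);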

public static long countDeleteRecords(Table table, String partitionPath) {
Table deletesTable =
MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);
CloseableIterable<ScanTask> deletesScanTasks = deletesTable.newBatchScan().planFiles();
CloseableIterable<ScanTask> filteredDeletesScanTasks =
CloseableIterable.filter(deletesScanTasks, t -> {
DeleteFile file = ((PositionDeletesScanTask) t).file();
return !table.spec().isPartitioned() ||
Contributor:
Please use better separation between operands to make it readable/understandable and reliable, since now the precedence can cause issues.

Like: ((a==b && c!=2) && (a!=b && c=2) && (...) )

Contributor Author (@difin), Nov 14, 2024:
As in the previous comment, I used braces in a very similar case before, and @deniskuzZ asked to remove them: #5389 (comment)

The precedence won't cause issues because in Java the && operator has higher precedence than ||, so the expression is evaluated deterministically.

Contributor:
Highly disagree. Please check this:

boolean x = false;
boolean y = true;
boolean z = true;
if(x && y ||z){
  System.out.println("true");
}else{
  System.out.println("false");
}

if(x && (y ||z)){
  System.out.println("true");
}else{
  System.out.println("false");
}

The result for the first is true, for the second it is false.

Contributor Author (@difin), Nov 14, 2024:
There seems to be no contradiction; your test confirms that && has higher precedence than ||, as it gets evaluated first.

First case:

x && y || z
false && true || true
false || true
true

Second case:

x && (y || z)
false && (true || true)
false && (true)
false

partitionPath == null && file.specId() != table.spec().specId() ||
partitionPath != null &&
table.specs().get(file.specId()).partitionToPath(file.partition()).equals(partitionPath);
});
return Lists.newArrayList(CloseableIterable.transform(filteredDeletesScanTasks,
t -> ((PositionDeletesScanTask) t).file().recordCount())).stream().mapToLong(Long::longValue).sum();
}

public static Expression generateExpressionFromPartitionSpec(Table table, Map<String, String> partitionSpec)
throws SemanticException {
Map<String, PartitionField> partitionFieldMap = getPartitionFields(table).stream()