Skip to content

Commit

Permalink
Add catching and logging of exceptions for s3 scan worker (opensearch…
Browse files Browse the repository at this point in the history
…-project#3159)

Signed-off-by: Taylor Gray <[email protected]>
  • Loading branch information
graytaylor0 authored Aug 15, 2023
1 parent f11d882 commit 252a0dd
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,14 @@ private List<PartitionIdentifier> listFilteredS3ObjectsForBucket(final List<Stri
.map(objectKey -> PartitionIdentifier.builder().withPartitionKey(String.format(BUCKET_OBJECT_PARTITION_KEY_FORMAT, bucket, objectKey)).build())
.collect(Collectors.toList()));

LOG.info("Found page of {} objects from bucket {}", listObjectsV2Response.keyCount(), bucket);

mostRecentLastModifiedTimestamp = getMostRecentLastModifiedTimestamp(listObjectsV2Response, mostRecentLastModifiedTimestamp);
} while (listObjectsV2Response.isTruncated());

globalStateMap.put(bucket, Objects.nonNull(mostRecentLastModifiedTimestamp) ? mostRecentLastModifiedTimestamp.toString() : null);

LOG.info("Returning partitions for {} S3 objects from bucket {}", allPartitionIdentifiers.size(), bucket);
return allPartitionIdentifiers;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class ScanObjectWorker implements Runnable{
private static final Logger LOG = LoggerFactory.getLogger(ScanObjectWorker.class);

private static final int STANDARD_BACKOFF_MILLIS = 30_000;
private static final int RETRY_BACKOFF_ON_EXCEPTION_MILLIS = 5_000;

static final int ACKNOWLEDGEMENT_SET_TIMEOUT_SECONDS = Integer.MAX_VALUE;
static final String ACKNOWLEDGEMENT_SET_CALLBACK_METRIC_NAME = "acknowledgementSetCallbackCounter";
Expand Down Expand Up @@ -99,7 +100,18 @@ public ScanObjectWorker(final S3Client s3Client,
@Override
public void run() {
while (!shouldStopProcessing) {
startProcessingObject(STANDARD_BACKOFF_MILLIS);

try {
startProcessingObject(STANDARD_BACKOFF_MILLIS);
} catch (final Exception e) {
LOG.error("Received an exception while processing S3 objects, backing off and retrying", e);
try {
Thread.sleep(RETRY_BACKOFF_ON_EXCEPTION_MILLIS);
} catch (InterruptedException ex) {
LOG.error("S3 Scan worker thread interrupted while backing off.", ex);
}
}

}
}

Expand Down

0 comments on commit 252a0dd

Please sign in to comment.