Skip to content

Commit

Permalink
Addressed PR feedback
Browse files Browse the repository at this point in the history
Signed-off-by: Asif Sohail Mohammed <[email protected]>
  • Loading branch information
asifsmohammed committed Jul 26, 2023
1 parent b14dd35 commit eb8d0a3
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 19 deletions.
6 changes: 3 additions & 3 deletions data-prepper-plugins/s3-source/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ All Duration values are a string that represents a duration. They support ISO_86

* `disable_bucket_ownership_validation` (Optional) : Boolean - If set to true, then the S3 Source will not attempt to validate that the bucket is owned by the expected account. The only expected account is the same account which owns the SQS queue. Defaults to `false`.

* `delete_s3_objects` (Optional) : Boolean - If set to true, then the S3 Source will attempt to delete S3 objects after processing. If `acknowledgments` is enabled, S3 objects will be delete only if positive acknowledgment is received by S3 source. Defaults to `false`.
* `delete_s3_objects` (Optional) : Boolean - If set to true, then the S3 Source will attempt to delete S3 objects after processing. If `acknowledgments` is enabled, S3 objects will be deleted only if positive acknowledgment is received by S3 source. Defaults to `false`.

### <a name="s3_select_configuration">S3 Select Configuration</a>

Expand Down Expand Up @@ -171,10 +171,10 @@ Schedule frequency and amount of times an object should be processed when using
a `rate` of `PT1H` and a `job_count` of 3 would result in each object getting processed 3 times, starting after source is ready
and then every hour after the first time the object is processed.

* `rate` (Optional) : A String that indicates the rate to process an index based on the `job_count`.
* `rate` (Optional) : A String that indicates the rate to process an S3 object based on the `job_count`.
Supports ISO_8601 notation Strings ("PT20.345S", "PT15M", etc.) as well as simple notation Strings for seconds ("60s") and milliseconds ("1500ms").
Defaults to 8 hours, and is only applicable when `job_count` is greater than 1.
* `job_count` (Optional) : An Integer that specifies how many times each index should be processed. Defaults to 1.
* `job_count` (Optional) : An Integer that specifies how many times each S3 object should be processed. Defaults to 1.


### <a name="aws_configuration">AWS Configuration</a>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.plugins.source.S3ObjectDeleteWorker.S3_OBJECTS_DELETED_METRIC_NAME;
import static org.opensearch.dataprepper.plugins.source.S3ObjectDeleteWorker.S3_OBJECTS_DELETE_FAILED_METRIC_NAME;
import static org.opensearch.dataprepper.plugins.source.ScanObjectWorker.ACKNOWLEDGEMENT_SET_CALLACK_METRIC_NAME;
import static org.opensearch.dataprepper.plugins.source.ScanObjectWorker.ACKNOWLEDGEMENT_SET_CALLBACK_METRIC_NAME;

@ExtendWith(MockitoExtension.class)
public class S3ScanObjectWorkerIT {
Expand Down Expand Up @@ -123,20 +123,13 @@ private S3ObjectHandler createObjectUnderTest(final S3ObjectRequest s3ObjectRequ

@BeforeEach
void setUp() {
// s3Client = S3Client.builder()
// .region(Region.of(System.getProperty("tests.s3source.region")))
// .build();
// s3AsyncClient = S3AsyncClient.builder()
// .region(Region.of(System.getProperty("tests.s3source.region")))
// .build();
// bucket = System.getProperty("tests.s3source.bucket");
s3Client = S3Client.builder()
.region(Region.US_EAST_1)
.region(Region.of(System.getProperty("tests.s3source.region")))
.build();
s3AsyncClient = S3AsyncClient.builder()
.region(Region.US_EAST_1)
.region(Region.of(System.getProperty("tests.s3source.region")))
.build();
bucket = "s3-logs-nsifmoh";
bucket = System.getProperty("tests.s3source.bucket");
s3ObjectGenerator = new S3ObjectGenerator(s3Client, bucket);
eventMetadataModifier = new EventMetadataModifier(S3SourceConfig.DEFAULT_METADATA_ROOT_KEY);

Expand Down Expand Up @@ -206,7 +199,7 @@ private ScanObjectWorker createObjectUnderTest(final RecordsGenerator recordsGen
.compressionType(shouldCompress ? CompressionType.GZIP : CompressionType.NONE)
.s3SelectResponseHandlerFactory(new S3SelectResponseHandlerFactory()).build();

when(pluginMetrics.counter(ACKNOWLEDGEMENT_SET_CALLACK_METRIC_NAME)).thenReturn(acknowledgementCounter);
when(pluginMetrics.counter(ACKNOWLEDGEMENT_SET_CALLBACK_METRIC_NAME)).thenReturn(acknowledgementCounter);
when(pluginMetrics.counter(S3_OBJECTS_DELETED_METRIC_NAME)).thenReturn(s3DeletedCounter);
when(pluginMetrics.counter(S3_OBJECTS_DELETE_FAILED_METRIC_NAME)).thenReturn(s3DeleteFailedCounter);
S3ObjectDeleteWorker s3ObjectDeleteWorker = new S3ObjectDeleteWorker(s3Client, pluginMetrics);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,8 @@ public class ScanObjectWorker implements Runnable{

private static final int STANDARD_BACKOFF_MILLIS = 30_000;

// Keeping this same as source coordinator ownership time
private static final int ACKNOWLEDGEMENT_SET_TIMEOUT_SECONDS = 10_000;
static final String ACKNOWLEDGEMENT_SET_CALLACK_METRIC_NAME = "acknowledgementSetCallbackCounter";
private static final int ACKNOWLEDGEMENT_SET_TIMEOUT_SECONDS = Integer.MAX_VALUE;
static final String ACKNOWLEDGEMENT_SET_CALLBACK_METRIC_NAME = "acknowledgementSetCallbackCounter";

private final S3Client s3Client;

Expand Down Expand Up @@ -91,7 +90,7 @@ public ScanObjectWorker(final S3Client s3Client,
this.deleteS3Objects = s3SourceConfig.isDeleteS3Objects();
this.s3ObjectDeleteWorker = s3ObjectDeleteWorker;
this.pluginMetrics = pluginMetrics;
acknowledgementSetCallbackCounter = pluginMetrics.counter(ACKNOWLEDGEMENT_SET_CALLACK_METRIC_NAME);
acknowledgementSetCallbackCounter = pluginMetrics.counter(ACKNOWLEDGEMENT_SET_CALLBACK_METRIC_NAME);
this.sourceCoordinator.initialize();

this.partitionCreationSupplier = new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsBuilderList);
Expand Down

0 comments on commit eb8d0a3

Please sign in to comment.