diff --git a/README.md b/README.md
index ba3b467e..c928436e 100644
--- a/README.md
+++ b/README.md
@@ -106,7 +106,7 @@ The tus protocol supports optional [extensions][]. Below is a table of the suppo
 | ------------------------ | --------------------------------- | ----------------------------- | ------------------------------- |
 | [Creation][] | ✅ | ✅ | ✅ |
 | [Creation With Upload][] | ✅ | ✅ | ✅ |
-| [Expiration][] | ✅ | ❌ | ❌ |
+| [Expiration][] | ✅ | ✅ | ❌ |
 | [Checksum][] | ❌ | ❌ | ❌ |
 | [Termination][] | ✅ | ✅ | ❌ |
 | [Concatenation][] | ❌ | ❌ | ❌ |
diff --git a/packages/s3-store/README.md b/packages/s3-store/README.md
index 20c25504..5aec2d00 100644
--- a/packages/s3-store/README.md
+++ b/packages/s3-store/README.md
@@ -79,7 +79,7 @@ The tus protocol supports optional [extensions][]. Below is a table of the suppo
 | ------------------------ | --------------- |
 | [Creation][] | ✅ |
 | [Creation With Upload][] | ✅ |
-| [Expiration][] | ❌ |
+| [Expiration][] | ✅ |
 | [Checksum][] | ❌ |
 | [Termination][] | ✅ |
 | [Concatenation][] | ❌ |
@@ -88,6 +88,32 @@ The tus protocol supports optional [extensions][]. Below is a table of the suppo

 After a multipart upload is aborted, no additional parts can be uploaded using that upload ID. The storage consumed by any previously uploaded parts will be freed. However, if any part uploads are currently in progress, those part uploads might or might not succeed. As a result, it might be necessary to set an [S3 Lifecycle configuration](https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpu-abort-incomplete-mpu-lifecycle-config.html) to abort incomplete multipart uploads.

+### Expiration
+
+Unlike other stores, the expiration extension on the S3 store does not need [`server.cleanUpExpiredUploads()`][cleanExpiredUploads] to be called.
+The store adds a `Tus-Completed` tag to all objects, including `.part` and `.info` files, to indicate whether an upload is finished.
+This means you can set up a [lifecycle][] policy to clean them up automatically, without a cron job.
+
+```json
+{
+  "Rules": [
+    {
+      "Filter": {
+        "Tag": {
+          "Key": "Tus-Completed",
+          "Value": "false"
+        }
+      },
+      "Expiration": {
+        "Days": 2
+      }
+    }
+  ]
+}
+```
+
+If you want more granularity, you can still configure a cron job to call [`server.cleanUpExpiredUploads()`][cleanExpiredUploads] yourself.
+
 ## Examples

 ### Example: using `credentials` to fetch credentials inside a AWS container
@@ -137,3 +163,5 @@ See [`contributing.md`](https://github.com/tus/tus-node-server/blob/main/.github
 [checksum]: https://tus.io/protocols/resumable-upload.html#checksum
 [termination]: https://tus.io/protocols/resumable-upload.html#termination
 [concatenation]: https://tus.io/protocols/resumable-upload.html#concatenation
+[cleanExpiredUploads]: https://github.com/tus/tus-node-server/tree/main/packages/server#servercleanupexpireduploads
+[lifecycle]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-lifecycle-mgmt.html
diff --git a/packages/s3-store/index.ts b/packages/s3-store/index.ts
index e81c5dd7..bcb412ad 100644
--- a/packages/s3-store/index.ts
+++ b/packages/s3-store/index.ts
@@ -22,6 +22,7 @@ type Options = {
   // The server calculates the optimal part size, which takes this size into account,
   // but may increase it to not exceed the S3 10K parts limit.
   partSize?: number
+  expirationPeriodInMilliseconds?: number
   // Options to pass to the AWS S3 SDK.
   s3ClientConfig: S3ClientConfig & {bucket: string}
 }
@@ -69,6 +70,7 @@ export class S3Store extends DataStore {
   private cache: Map<string, MetadataValue> = new Map()
   private client: S3
   private preferredPartSize: number
+  private expirationPeriodInMilliseconds = 0
   public maxMultipartParts = 10_000 as const
   public minPartSize = 5_242_880 as const // 5MiB
   public maxUploadSize = 5_497_558_138_880 as const // 5TiB
@@ -82,9 +84,11 @@ export class S3Store extends DataStore {
       'creation-with-upload',
       'creation-defer-length',
       'termination',
+      'expiration',
     ]
     this.bucket = bucket
     this.preferredPartSize = partSize || 8 * 1024 * 1024
+    this.expirationPeriodInMilliseconds = options.expirationPeriodInMilliseconds ?? 0
     this.client = new S3(restS3ClientConfig)
   }
@@ -98,8 +102,9 @@ export class S3Store extends DataStore {
     log(`[${upload.id}] saving metadata`)
     await this.client.putObject({
       Bucket: this.bucket,
-      Key: `${upload.id}.info`,
+      Key: this.infoKey(upload.id),
       Body: JSON.stringify(upload),
+      Tagging: `Tus-Completed=false`,
       Metadata: {
         'upload-id': uploadId,
         'tus-version': TUS_RESUMABLE,
       },
     })
     log(`[${upload.id}] metadata file saved`)
   }
@@ -108,6 +113,20 @@
+  private async completeMetadata(upload: Upload) {
+    const {'upload-id': uploadId} = await this.getMetadata(upload.id)
+    await this.client.putObject({
+      Bucket: this.bucket,
+      Key: this.infoKey(upload.id),
+      Body: JSON.stringify(upload),
+      Tagging: `Tus-Completed=true`,
+      Metadata: {
+        'upload-id': uploadId,
+        'tus-version': TUS_RESUMABLE,
+      },
+    })
+  }
+
   /**
    * Retrieves upload metadata previously saved in `${file_id}.info`.
    * There's a small and simple caching mechanism to avoid multiple
@@ -121,7 +140,7 @@

     const {Metadata, Body} = await this.client.getObject({
       Bucket: this.bucket,
-      Key: `${id}.info`,
+      Key: this.infoKey(id),
     })
     const file = JSON.parse((await Body?.transformToString()) as string)
     this.cache.set(id, {
@@ -132,11 +151,16 @@
         size: file.size ? Number.parseInt(file.size, 10) : undefined,
         offset: Number.parseInt(file.offset, 10),
         metadata: file.metadata,
+        creation_date: file.creation_date,
       }),
     })
     return this.cache.get(id) as MetadataValue
   }

+  private infoKey(id: string) {
+    return `${id}.info`
+  }
+
   private partKey(id: string, isIncomplete = false) {
     if (isIncomplete) {
       id += '.part'
     }
@@ -173,6 +197,7 @@
       Bucket: this.bucket,
       Key: this.partKey(id, true),
       Body: readStream,
+      Tagging: 'Tus-Completed=false',
     })
     log(`[${id}] finished uploading incomplete part`)
     return data.ETag as string
@@ -452,6 +477,8 @@
       request.ContentType = upload.metadata.contentType
     }

+    upload.creation_date = new Date().toISOString()
+
     const res = await this.client.createMultipartUpload(request)
     await this.saveMetadata(upload, res.UploadId as string)
     log(`[${upload.id}] multipart upload created (${res.UploadId})`)
@@ -495,6 +522,7 @@
       try {
         const parts = await this.retrieveParts(id)
         await this.finishMultipartUpload(metadata, parts)
+        await this.completeMetadata(metadata.file)
         this.clearCache(id)
       } catch (error) {
         log(`[${id}] failed to finish upload`, error)
@@ -558,7 +586,7 @@

     file.size = upload_length

-    this.saveMetadata(file, uploadId)
+    await this.saveMetadata(file, uploadId)
   }

   public async remove(id: string): Promise<void> {
@@ -582,10 +610,101 @@
     await this.client.deleteObjects({
       Bucket: this.bucket,
       Delete: {
-        Objects: [{Key: id}, {Key: `${id}.info`}],
+        Objects: [{Key: id}, {Key: this.infoKey(id)}],
       },
     })

     this.clearCache(id)
   }
+
+  protected getExpirationDate(created_at: string) {
+    const date = new Date(created_at)
+
+    return new Date(date.getTime() + this.getExpiration())
+  }
+
+  getExpiration(): number {
+    return this.expirationPeriodInMilliseconds
+  }
+
+  async deleteExpired(): Promise<number> {
+    if (this.getExpiration() === 0) {
+      return 0
+    }
+
+    let keyMarker: string | undefined = undefined
+    let uploadIdMarker: string | undefined = undefined
+    let isTruncated = true
+    let deleted = 0
+
+    while (isTruncated) {
+      const listResponse: AWS.ListMultipartUploadsCommandOutput =
+        await this.client.listMultipartUploads({
+          Bucket: this.bucket,
+          KeyMarker: keyMarker,
+          UploadIdMarker: uploadIdMarker,
+        })
+
+      const expiredUploads =
+        listResponse.Uploads?.filter((multiPartUpload) => {
+          const initiatedDate = multiPartUpload.Initiated
+          return (
+            initiatedDate &&
+            new Date().getTime() >
+              this.getExpirationDate(initiatedDate.toISOString()).getTime()
+          )
+        }) || []
+
+      const objectsToDelete = expiredUploads.reduce((all, expiredUpload) => {
+        all.push(
+          {
+            key: this.infoKey(expiredUpload.Key as string),
+          },
+          {
+            key: this.partKey(expiredUpload.Key as string, true),
+          }
+        )
+        return all
+      }, [] as {key: string}[])
+
+      const deletions: Promise<AWS.DeleteObjectsCommandOutput>[] = []
+
+      // Batch delete 1000 items at a time
+      while (objectsToDelete.length > 0) {
+        const objects = objectsToDelete.splice(0, 1000)
+        deletions.push(
+          this.client.deleteObjects({
+            Bucket: this.bucket,
+            Delete: {
+              Objects: objects.map((object) => ({
+                Key: object.key,
+              })),
+            },
+          })
+        )
+      }
+
+      const [objectsDeleted] = await Promise.all([
+        Promise.all(deletions),
+        ...expiredUploads.map((expiredUpload) => {
+          return this.client.abortMultipartUpload({
+            Bucket: this.bucket,
+            Key: expiredUpload.Key,
+            UploadId: expiredUpload.UploadId,
+          })
+        }),
+      ])
+
+      deleted += objectsDeleted.reduce((all, acc) => all + (acc.Deleted?.length ?? 0), 0)
+
+      isTruncated = Boolean(listResponse.IsTruncated)
+
+      if (isTruncated) {
+        keyMarker = listResponse.NextKeyMarker
+        uploadIdMarker = listResponse.NextUploadIdMarker
+      }
+    }
+
+    return deleted
+  }
 }
diff --git a/test/package.json b/test/package.json
index 4f15a025..e5833471 100644
--- a/test/package.json
+++ b/test/package.json
@@ -4,7 +4,7 @@
   "private": true,
   "scripts": {
     "build": "tsc",
-    "test": "mocha e2e.test.ts --timeout 40000 --exit --extension ts --require ts-node/register"
+    "test": "mocha e2e.test.ts s3.e2e.ts --timeout 40000 --exit --extension ts --require ts-node/register"
   },
   "dependencies": {
     "@tus/file-store": "workspace:^",
diff --git a/test/s3.e2e.ts b/test/s3.e2e.ts
new file mode 100644
index 00000000..7b1f21a1
--- /dev/null
+++ b/test/s3.e2e.ts
@@ -0,0 +1,207 @@
+import {S3Store} from '@tus/s3-store'
+import {Server, TUS_RESUMABLE} from '@tus/server'
+import {SuperAgentTest} from 'supertest'
+import request from 'supertest'
+import http from 'node:http'
+import {describe} from 'node:test'
+import {strict as assert} from 'node:assert'
+import {S3, S3ServiceException} from '@aws-sdk/client-s3'
+
+const STORE_PATH = '/upload'
+
+interface S3Options {
+  partSize?: number
+  expirationPeriodInMilliseconds?: number
+}
+
+const s3Credentials = {
+  bucket: process.env.AWS_BUCKET as string,
+  region: process.env.AWS_REGION,
+  credentials: {
+    accessKeyId: process.env.AWS_ACCESS_KEY_ID as string,
+    secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY as string,
+  },
+}
+
+const s3Client = new S3(s3Credentials)
+
+const createStore = (options: S3Options = {}) =>
+  new S3Store({
+    ...options,
+    s3ClientConfig: s3Credentials,
+  })
+
+const createUpload = async (agent: SuperAgentTest, uploadLength: number) => {
+  const response = await agent
+    .post(STORE_PATH)
+    .set('Tus-Resumable', TUS_RESUMABLE)
+    .set('Upload-Length', uploadLength.toString())
+    .expect(201)
+
+  assert(Boolean(response.headers.location), 'location not returned')
+  const uploadId = response.headers.location.split('/').pop()
+  return {uploadId: uploadId as string, expires: response.headers['upload-expires']}
+}
+
+const allocMB = (mb: number) => Buffer.alloc(1024 * 1024 * mb)
+const wait = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms))
+
+const patchUpload = async (
+  agent: SuperAgentTest,
+  uploadId: string,
+  data: Buffer,
+  offset = 0
+) => {
+  const res = await agent
+    .patch(`${STORE_PATH}/${uploadId}`)
+    .set('Tus-Resumable', TUS_RESUMABLE)
+    .set('Upload-Offset', offset.toString())
+    .set('Content-Type', 'application/offset+octet-stream')
+    .send(data)
+    .expect(204)
+
+  return {offset: parseInt(res.headers['upload-offset'], 10)}
+}
+
+describe('S3 Store E2E', () => {
+  describe('Expiration extension', () => {
+    let server: Server
+    let listener: http.Server
+    let agent: SuperAgentTest
+    let store: S3Store
+
+    before((done) => {
+      store = createStore({
+        expirationPeriodInMilliseconds: 5000,
+        partSize: 5_242_880,
+      })
+      server = new Server({
+        path: STORE_PATH,
+        datastore: store,
+      })
+      listener = server.listen()
+      agent = request.agent(listener)
+      done()
+    })
+
+    after((done) => {
+      listener.close(done)
+    })
+
+    it('should set Tus-Completed=false when the upload is not completed', async () => {
+      const data = allocMB(11)
+      const {uploadId} = await createUpload(agent, data.length)
+      await patchUpload(agent, uploadId, data.subarray(0, 1024 * 1024 * 6))
+
+      const {TagSet} = await s3Client.getObjectTagging({
+        Bucket: s3Credentials.bucket,
+        Key: uploadId + '.info',
+      })
+
+      assert(
+        TagSet?.find((tag) => tag.Key === 'Tus-Completed')?.Value === 'false',
+        'object tag Tus-Completed not set to "false"'
+      )
+    })
+
+    it('should set Tus-Completed=true when the upload is completed', async () => {
+      const data = allocMB(11)
+      const {uploadId} = await createUpload(agent, data.length)
+      const {offset} = await patchUpload(
+        agent,
+        uploadId,
+        data.subarray(0, 1024 * 1024 * 6)
+      )
+
+      await patchUpload(agent, uploadId, data.subarray(offset), offset)
+
+      const {TagSet} = await s3Client.getObjectTagging({
+        Bucket: s3Credentials.bucket,
+        Key: uploadId + '.info',
+      })
+
+      assert(
+        TagSet?.find((tag) => tag.Key === 'Tus-Completed')?.Value === 'true',
+        'object tag Tus-Completed not set to "true"'
+      )
+    })
+
+    it('calling deleteExpired will delete all expired objects', async () => {
+      const data = allocMB(11)
+      const {uploadId} = await createUpload(agent, data.length)
+      await patchUpload(agent, uploadId, data.subarray(0, 1024 * 1024 * 6))
+
+      const [infoFile, partFile] = await Promise.all([
+        s3Client.getObject({
+          Bucket: s3Credentials.bucket,
+          Key: uploadId + '.info',
+        }),
+        s3Client.getObject({
+          Bucket: s3Credentials.bucket,
+          Key: uploadId + '.part',
+        }),
+      ])
+
+      await store.deleteExpired()
+
+      // make sure the files are not deleted
+      assert(infoFile.$metadata.httpStatusCode === 200)
+      assert(partFile.$metadata.httpStatusCode === 200)
+
+      await wait(5000)
+
+      // .info file and .part should be deleted since now they should be expired
+      const deletedFiles = await store.deleteExpired()
+      assert(deletedFiles === 2, `not all parts were deleted, deleted ${deletedFiles}`)
+
+      const files = await Promise.allSettled([
+        s3Client.getObject({
+          Bucket: s3Credentials.bucket,
+          Key: uploadId + '.info',
+        }),
+        s3Client.getObject({
+          Bucket: s3Credentials.bucket,
+          Key: uploadId + '.part',
+        }),
+      ])
+
+      assert(
+        files.every((p) => p.status === 'rejected') === true,
+        'fetching deleted object succeeded'
+      )
+
+      assert(
+        files.every((p) => {
+          assert(p.status === 'rejected')
+          assert(
+            p.reason instanceof S3ServiceException,
+            'error is not of type S3ServiceException'
+          )
+
+          return p.reason.$metadata.httpStatusCode === 404
+        }) === true,
+        'not all rejections were 404'
+      )
+    })
+
+    it('will not allow to upload to an expired url', async () => {
+      const data = allocMB(11)
+      const {uploadId} = await createUpload(agent, data.length)
+      const {offset} = await patchUpload(
+        agent,
+        uploadId,
+        data.subarray(0, 1024 * 1024 * 6)
+      )
+
+      await wait(5000)
+
+      await agent
+        .patch(`${STORE_PATH}/${uploadId}`)
+        .set('Tus-Resumable', TUS_RESUMABLE)
+        .set('Upload-Offset', offset.toString())
+        .set('Content-Type', 'application/offset+octet-stream')
+        .send(data.subarray(offset))
+        .expect(410)
+    })
+  })
+})
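For reference, a minimal sketch (not part of the diff) of how the new `expirationPeriodInMilliseconds` option and `store.deleteExpired()` might be wired up, using the `@tus/server` APIs exercised in the test above. The bucket, credentials, port, and hourly interval are illustrative assumptions.

```ts
// Illustrative sketch only — not part of this patch.
// Bucket, credentials, port, and the hourly interval are assumptions.
import {S3Store} from '@tus/s3-store'
import {Server} from '@tus/server'

const store = new S3Store({
  // Uploads older than this are treated as expired (option added by this patch).
  expirationPeriodInMilliseconds: 24 * 60 * 60 * 1000, // 24 hours
  s3ClientConfig: {
    bucket: process.env.AWS_BUCKET as string,
    region: process.env.AWS_REGION,
    credentials: {
      accessKeyId: process.env.AWS_ACCESS_KEY_ID as string,
      secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY as string,
    },
  },
})

const server = new Server({path: '/upload', datastore: store})

// Optional: clean up expired uploads on an interval instead of (or in addition to)
// the S3 lifecycle rule shown in the README.
setInterval(() => {
  store
    .deleteExpired()
    .then((removed) => console.log(`removed ${removed} expired upload objects`))
    .catch(console.error)
}, 60 * 60 * 1000)

server.listen({port: 1080})
```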