diff options
author | Julius Härtl <jus@bitgrid.net> | 2021-05-06 18:26:42 +0200 |
---|---|---|
committer | Julius Härtl <jus@bitgrid.net> | 2023-03-08 14:00:04 +0100 |
commit | e23aa8883ec0dff03b973fb0bf690cb8482218cf (patch) | |
tree | a0ced21511e1dc90d3bd450ea39d88cda0d729b2 /lib/private/Files | |
parent | 80e12cf72608b7c5776f02f04da98d7a5968bc73 (diff) | |
download | nextcloud-server-e23aa8883ec0dff03b973fb0bf690cb8482218cf.tar.gz nextcloud-server-e23aa8883ec0dff03b973fb0bf690cb8482218cf.zip |
feat(s3): Use multipart upload for chunked uploading
This allows to stream file chunks directly to S3 during upload.
Signed-off-by: Julius Härtl <jus@bitgrid.net>
Diffstat (limited to 'lib/private/Files')
-rw-r--r-- | lib/private/Files/ObjectStore/ObjectStoreStorage.php | 76 | ||||
-rw-r--r-- | lib/private/Files/ObjectStore/S3.php | 60 |
2 files changed, 133 insertions, 3 deletions
diff --git a/lib/private/Files/ObjectStore/ObjectStoreStorage.php b/lib/private/Files/ObjectStore/ObjectStoreStorage.php index d0c5bd14b38..4ca00cf6a16 100644 --- a/lib/private/Files/ObjectStore/ObjectStoreStorage.php +++ b/lib/private/Files/ObjectStore/ObjectStoreStorage.php @@ -29,6 +29,8 @@ */ namespace OC\Files\ObjectStore; +use Aws\S3\Exception\S3Exception; +use Aws\S3\Exception\S3MultipartUploadException; use Icewind\Streams\CallbackWrapper; use Icewind\Streams\CountWrapper; use Icewind\Streams\IteratorDirectory; @@ -37,11 +39,14 @@ use OC\Files\Cache\CacheEntry; use OC\Files\Storage\PolyFill\CopyDirectory; use OCP\Files\Cache\ICacheEntry; use OCP\Files\FileInfo; +use OCP\Files\GenericFileException; use OCP\Files\NotFoundException; use OCP\Files\ObjectStore\IObjectStore; +use OCP\Files\ObjectStore\IObjectStoreMultiPartUpload; +use OCP\Files\Storage\IChunkedFileWrite; use OCP\Files\Storage\IStorage; -class ObjectStoreStorage extends \OC\Files\Storage\Common { +class ObjectStoreStorage extends \OC\Files\Storage\Common implements IChunkedFileWrite { use CopyDirectory; /** @@ -91,7 +96,6 @@ class ObjectStoreStorage extends \OC\Files\Storage\Common { public function mkdir($path) { $path = $this->normalizePath($path); - if ($this->file_exists($path)) { return false; } @@ -627,4 +631,72 @@ class ObjectStoreStorage extends \OC\Files\Storage\Common { throw $e; } } + + public function startChunkedWrite(string $targetPath): string { + if (!$this->objectStore instanceof IObjectStoreMultiPartUpload) { + throw new GenericFileException('Object store does not support multipart upload'); + } + $cacheEntry = $this->getCache()->get($targetPath); + $urn = $this->getURN($cacheEntry->getId()); + return $this->objectStore->initiateMultipartUpload($urn); + } + + /** + * + * @throws GenericFileException + */ + public function putChunkedWritePart(string $targetPath, string $writeToken, string $chunkId, $data, $size = null): ?array { + if (!$this->objectStore instanceof IObjectStoreMultiPartUpload) { + throw new GenericFileException('Object store does not support multipart upload'); + } + $cacheEntry = $this->getCache()->get($targetPath); + $urn = $this->getURN($cacheEntry->getId()); + + $result = $this->objectStore->uploadMultipartPart($urn, $writeToken, (int)$chunkId, $data, $size); + + $parts[$chunkId] = [ + 'PartNumber' => $chunkId, + 'ETag' => trim($result->get('ETag'), '"') + ]; + return $parts[$chunkId]; + } + + public function completeChunkedWrite(string $targetPath, string $writeToken): int { + if (!$this->objectStore instanceof IObjectStoreMultiPartUpload) { + throw new GenericFileException('Object store does not support multipart upload'); + } + $cacheEntry = $this->getCache()->get($targetPath); + $urn = $this->getURN($cacheEntry->getId()); + $parts = $this->objectStore->getMultipartUploads($urn, $writeToken); + $sortedParts = array_values($parts); + sort($sortedParts); + try { + $size = $this->objectStore->completeMultipartUpload($urn, $writeToken, $sortedParts); + $stat = $this->stat($targetPath); + $mtime = time(); + if (is_array($stat)) { + $stat['size'] = $size; + $stat['mtime'] = $mtime; + $stat['mimetype'] = $this->getMimeType($targetPath); + $this->getCache()->update($stat['fileid'], $stat); + } + } catch (S3MultipartUploadException | S3Exception $e) { + $this->objectStore->abortMultipartUpload($urn, $writeToken); + $this->logger->logException($e, [ + 'app' => 'objectstore', + 'message' => 'Could not compete multipart upload ' . $urn. ' with uploadId ' . $writeToken + ]); + throw new GenericFileException('Could not write chunked file'); + } + return $size; + } + + public function cancelChunkedWrite(string $targetPath, string $writeToken): void { + if (!$this->objectStore instanceof IObjectStoreMultiPartUpload) { + throw new GenericFileException('Object store does not support multipart upload'); + } + $cacheEntry = $this->getCache()->get($targetPath); + $urn = $this->getURN($cacheEntry->getId()); + $this->objectStore->abortMultipartUpload($urn, $writeToken); + } } diff --git a/lib/private/Files/ObjectStore/S3.php b/lib/private/Files/ObjectStore/S3.php index 6492145fb63..ebc8886f12d 100644 --- a/lib/private/Files/ObjectStore/S3.php +++ b/lib/private/Files/ObjectStore/S3.php @@ -23,9 +23,12 @@ */ namespace OC\Files\ObjectStore; +use Aws\Result; +use Exception; use OCP\Files\ObjectStore\IObjectStore; +use OCP\Files\ObjectStore\IObjectStoreMultiPartUpload; -class S3 implements IObjectStore { +class S3 implements IObjectStore, IObjectStoreMultiPartUpload { use S3ConnectionTrait; use S3ObjectTrait; @@ -41,4 +44,59 @@ class S3 implements IObjectStore { public function getStorageId() { return $this->id; } + + public function initiateMultipartUpload(string $urn): string { + $upload = $this->getConnection()->createMultipartUpload([ + 'Bucket' => $this->bucket, + 'Key' => $urn, + ]); + $uploadId = $upload->get('UploadId'); + if ($uploadId === null) { + throw new Exception('No upload id returned'); + } + return (string)$uploadId; + } + + public function uploadMultipartPart(string $urn, string $uploadId, int $partId, $stream, $size): Result { + return $this->getConnection()->uploadPart([ + 'Body' => $stream, + 'Bucket' => $this->bucket, + 'Key' => $urn, + 'ContentLength' => $size, + 'PartNumber' => $partId, + 'UploadId' => $uploadId, + ]); + } + + public function getMultipartUploads(string $urn, string $uploadId): array { + $parts = $this->getConnection()->listParts([ + 'Bucket' => $this->bucket, + 'Key' => $urn, + 'UploadId' => $uploadId, + 'MaxParts' => 10000 + ]); + return $parts->get('Parts') ?? []; + } + + public function completeMultipartUpload(string $urn, string $uploadId, array $result): int { + $this->getConnection()->completeMultipartUpload([ + 'Bucket' => $this->bucket, + 'Key' => $urn, + 'UploadId' => $uploadId, + 'MultipartUpload' => ['Parts' => $result], + ]); + $stat = $this->getConnection()->headObject([ + 'Bucket' => $this->bucket, + 'Key' => $urn, + ]); + return (int)$stat->get('ContentLength'); + } + + public function abortMultipartUpload($urn, $uploadId): void { + $this->getConnection()->abortMultipartUpload([ + 'Bucket' => $this->bucket, + 'Key' => $urn, + 'UploadId' => $uploadId + ]); + } } |