summaryrefslogtreecommitdiffstats
path: root/lib/private/Files
diff options
context:
space:
mode:
authorJulius Härtl <jus@bitgrid.net>2021-05-06 18:26:42 +0200
committerJulius Härtl <jus@bitgrid.net>2023-03-08 14:00:04 +0100
commite23aa8883ec0dff03b973fb0bf690cb8482218cf (patch)
treea0ced21511e1dc90d3bd450ea39d88cda0d729b2 /lib/private/Files
parent80e12cf72608b7c5776f02f04da98d7a5968bc73 (diff)
downloadnextcloud-server-e23aa8883ec0dff03b973fb0bf690cb8482218cf.tar.gz
nextcloud-server-e23aa8883ec0dff03b973fb0bf690cb8482218cf.zip
feat(s3): Use multipart upload for chunked uploading
This allows to stream file chunks directly to S3 during upload. Signed-off-by: Julius Härtl <jus@bitgrid.net>
Diffstat (limited to 'lib/private/Files')
-rw-r--r--lib/private/Files/ObjectStore/ObjectStoreStorage.php76
-rw-r--r--lib/private/Files/ObjectStore/S3.php60
2 files changed, 133 insertions, 3 deletions
diff --git a/lib/private/Files/ObjectStore/ObjectStoreStorage.php b/lib/private/Files/ObjectStore/ObjectStoreStorage.php
index d0c5bd14b38..4ca00cf6a16 100644
--- a/lib/private/Files/ObjectStore/ObjectStoreStorage.php
+++ b/lib/private/Files/ObjectStore/ObjectStoreStorage.php
@@ -29,6 +29,8 @@
*/
namespace OC\Files\ObjectStore;
+use Aws\S3\Exception\S3Exception;
+use Aws\S3\Exception\S3MultipartUploadException;
use Icewind\Streams\CallbackWrapper;
use Icewind\Streams\CountWrapper;
use Icewind\Streams\IteratorDirectory;
@@ -37,11 +39,14 @@ use OC\Files\Cache\CacheEntry;
use OC\Files\Storage\PolyFill\CopyDirectory;
use OCP\Files\Cache\ICacheEntry;
use OCP\Files\FileInfo;
+use OCP\Files\GenericFileException;
use OCP\Files\NotFoundException;
use OCP\Files\ObjectStore\IObjectStore;
+use OCP\Files\ObjectStore\IObjectStoreMultiPartUpload;
+use OCP\Files\Storage\IChunkedFileWrite;
use OCP\Files\Storage\IStorage;
-class ObjectStoreStorage extends \OC\Files\Storage\Common {
+class ObjectStoreStorage extends \OC\Files\Storage\Common implements IChunkedFileWrite {
use CopyDirectory;
/**
@@ -91,7 +96,6 @@ class ObjectStoreStorage extends \OC\Files\Storage\Common {
public function mkdir($path) {
$path = $this->normalizePath($path);
-
if ($this->file_exists($path)) {
return false;
}
@@ -627,4 +631,72 @@ class ObjectStoreStorage extends \OC\Files\Storage\Common {
throw $e;
}
}
+
+ public function startChunkedWrite(string $targetPath): string {
+ if (!$this->objectStore instanceof IObjectStoreMultiPartUpload) {
+ throw new GenericFileException('Object store does not support multipart upload');
+ }
+ $cacheEntry = $this->getCache()->get($targetPath);
+ $urn = $this->getURN($cacheEntry->getId());
+ return $this->objectStore->initiateMultipartUpload($urn);
+ }
+
+ /**
+ *
+ * @throws GenericFileException
+ */
+ public function putChunkedWritePart(string $targetPath, string $writeToken, string $chunkId, $data, $size = null): ?array {
+ if (!$this->objectStore instanceof IObjectStoreMultiPartUpload) {
+ throw new GenericFileException('Object store does not support multipart upload');
+ }
+ $cacheEntry = $this->getCache()->get($targetPath);
+ $urn = $this->getURN($cacheEntry->getId());
+
+ $result = $this->objectStore->uploadMultipartPart($urn, $writeToken, (int)$chunkId, $data, $size);
+
+ $parts[$chunkId] = [
+ 'PartNumber' => $chunkId,
+ 'ETag' => trim($result->get('ETag'), '"')
+ ];
+ return $parts[$chunkId];
+ }
+
+ public function completeChunkedWrite(string $targetPath, string $writeToken): int {
+ if (!$this->objectStore instanceof IObjectStoreMultiPartUpload) {
+ throw new GenericFileException('Object store does not support multipart upload');
+ }
+ $cacheEntry = $this->getCache()->get($targetPath);
+ $urn = $this->getURN($cacheEntry->getId());
+ $parts = $this->objectStore->getMultipartUploads($urn, $writeToken);
+ $sortedParts = array_values($parts);
+ sort($sortedParts);
+ try {
+ $size = $this->objectStore->completeMultipartUpload($urn, $writeToken, $sortedParts);
+ $stat = $this->stat($targetPath);
+ $mtime = time();
+ if (is_array($stat)) {
+ $stat['size'] = $size;
+ $stat['mtime'] = $mtime;
+ $stat['mimetype'] = $this->getMimeType($targetPath);
+ $this->getCache()->update($stat['fileid'], $stat);
+ }
+ } catch (S3MultipartUploadException | S3Exception $e) {
+ $this->objectStore->abortMultipartUpload($urn, $writeToken);
+ $this->logger->logException($e, [
+ 'app' => 'objectstore',
+ 'message' => 'Could not compete multipart upload ' . $urn. ' with uploadId ' . $writeToken
+ ]);
+ throw new GenericFileException('Could not write chunked file');
+ }
+ return $size;
+ }
+
+ public function cancelChunkedWrite(string $targetPath, string $writeToken): void {
+ if (!$this->objectStore instanceof IObjectStoreMultiPartUpload) {
+ throw new GenericFileException('Object store does not support multipart upload');
+ }
+ $cacheEntry = $this->getCache()->get($targetPath);
+ $urn = $this->getURN($cacheEntry->getId());
+ $this->objectStore->abortMultipartUpload($urn, $writeToken);
+ }
}
diff --git a/lib/private/Files/ObjectStore/S3.php b/lib/private/Files/ObjectStore/S3.php
index 6492145fb63..ebc8886f12d 100644
--- a/lib/private/Files/ObjectStore/S3.php
+++ b/lib/private/Files/ObjectStore/S3.php
@@ -23,9 +23,12 @@
*/
namespace OC\Files\ObjectStore;
+use Aws\Result;
+use Exception;
use OCP\Files\ObjectStore\IObjectStore;
+use OCP\Files\ObjectStore\IObjectStoreMultiPartUpload;
-class S3 implements IObjectStore {
+class S3 implements IObjectStore, IObjectStoreMultiPartUpload {
use S3ConnectionTrait;
use S3ObjectTrait;
@@ -41,4 +44,59 @@ class S3 implements IObjectStore {
public function getStorageId() {
return $this->id;
}
+
+ public function initiateMultipartUpload(string $urn): string {
+ $upload = $this->getConnection()->createMultipartUpload([
+ 'Bucket' => $this->bucket,
+ 'Key' => $urn,
+ ]);
+ $uploadId = $upload->get('UploadId');
+ if ($uploadId === null) {
+ throw new Exception('No upload id returned');
+ }
+ return (string)$uploadId;
+ }
+
+ public function uploadMultipartPart(string $urn, string $uploadId, int $partId, $stream, $size): Result {
+ return $this->getConnection()->uploadPart([
+ 'Body' => $stream,
+ 'Bucket' => $this->bucket,
+ 'Key' => $urn,
+ 'ContentLength' => $size,
+ 'PartNumber' => $partId,
+ 'UploadId' => $uploadId,
+ ]);
+ }
+
+ public function getMultipartUploads(string $urn, string $uploadId): array {
+ $parts = $this->getConnection()->listParts([
+ 'Bucket' => $this->bucket,
+ 'Key' => $urn,
+ 'UploadId' => $uploadId,
+ 'MaxParts' => 10000
+ ]);
+ return $parts->get('Parts') ?? [];
+ }
+
+ public function completeMultipartUpload(string $urn, string $uploadId, array $result): int {
+ $this->getConnection()->completeMultipartUpload([
+ 'Bucket' => $this->bucket,
+ 'Key' => $urn,
+ 'UploadId' => $uploadId,
+ 'MultipartUpload' => ['Parts' => $result],
+ ]);
+ $stat = $this->getConnection()->headObject([
+ 'Bucket' => $this->bucket,
+ 'Key' => $urn,
+ ]);
+ return (int)$stat->get('ContentLength');
+ }
+
+ public function abortMultipartUpload($urn, $uploadId): void {
+ $this->getConnection()->abortMultipartUpload([
+ 'Bucket' => $this->bucket,
+ 'Key' => $urn,
+ 'UploadId' => $uploadId
+ ]);
+ }
}