aboutsummaryrefslogtreecommitdiffstats
path: root/lib/private/Files/ObjectStore/S3ObjectTrait.php
diff options
context:
space:
mode:
Diffstat (limited to 'lib/private/Files/ObjectStore/S3ObjectTrait.php')
-rw-r--r--lib/private/Files/ObjectStore/S3ObjectTrait.php227
1 files changed, 158 insertions, 69 deletions
diff --git a/lib/private/Files/ObjectStore/S3ObjectTrait.php b/lib/private/Files/ObjectStore/S3ObjectTrait.php
index 8fa6d67faa3..89405de2e8e 100644
--- a/lib/private/Files/ObjectStore/S3ObjectTrait.php
+++ b/lib/private/Files/ObjectStore/S3ObjectTrait.php
@@ -1,32 +1,15 @@
<?php
+
/**
- * @copyright Copyright (c) 2017 Robin Appelman <robin@icewind.nl>
- *
- * @author Christoph Wurst <christoph@winzerhof-wurst.at>
- * @author Florent <florent@coppint.com>
- * @author Morris Jobke <hey@morrisjobke.de>
- * @author Robin Appelman <robin@icewind.nl>
- * @author Roeland Jago Douma <roeland@famdouma.nl>
- *
- * @license GNU AGPL version 3 or any later version
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as
- * published by the Free Software Foundation, either version 3 of the
- * License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
+ * SPDX-FileCopyrightText: 2017 Nextcloud GmbH and Nextcloud contributors
+ * SPDX-License-Identifier: AGPL-3.0-or-later
*/
namespace OC\Files\ObjectStore;
+use Aws\Command;
+use Aws\Exception\MultipartUploadException;
use Aws\S3\Exception\S3MultipartUploadException;
+use Aws\S3\MultipartCopy;
use Aws\S3\MultipartUploader;
use Aws\S3\S3Client;
use GuzzleHttp\Psr7;
@@ -35,6 +18,8 @@ use OC\Files\Stream\SeekableHttpStream;
use Psr\Http\Message\StreamInterface;
trait S3ObjectTrait {
+ use S3ConfigTrait;
+
/**
* Returns the connection
*
@@ -54,7 +39,7 @@ trait S3ObjectTrait {
* @since 7.0.0
*/
public function readObject($urn) {
- return SeekableHttpStream::open(function ($range) use ($urn) {
+ $fh = SeekableHttpStream::open(function ($range) use ($urn) {
$command = $this->getConnection()->getCommand('GetObject', [
'Bucket' => $this->bucket,
'Key' => $urn,
@@ -88,26 +73,48 @@ trait S3ObjectTrait {
$context = stream_context_create($opts);
return fopen($request->getUri(), 'r', false, $context);
});
+ if (!$fh) {
+ throw new \Exception("Failed to read object $urn");
+ }
+ return $fh;
}
+ private function buildS3Metadata(array $metadata): array {
+ $result = [];
+ foreach ($metadata as $key => $value) {
+ $result['x-amz-meta-' . $key] = $value;
+ }
+ return $result;
+ }
/**
* Single object put helper
*
* @param string $urn the unified resource name used to identify the object
* @param StreamInterface $stream stream with the data to write
- * @param string|null $mimetype the mimetype to set for the remove object @since 22.0.0
+ * @param array $metaData the metadata to set for the object
* @throws \Exception when something goes wrong, message will be logged
*/
- protected function writeSingle(string $urn, StreamInterface $stream, string $mimetype = null): void {
- $this->getConnection()->putObject([
+ protected function writeSingle(string $urn, StreamInterface $stream, array $metaData): void {
+ $mimetype = $metaData['mimetype'] ?? null;
+ unset($metaData['mimetype']);
+ unset($metaData['size']);
+
+ $args = [
'Bucket' => $this->bucket,
'Key' => $urn,
'Body' => $stream,
'ACL' => 'private',
'ContentType' => $mimetype,
+ 'Metadata' => $this->buildS3Metadata($metaData),
'StorageClass' => $this->storageClass,
- ] + $this->getSSECParameters());
+ ] + $this->getSSECParameters();
+
+ if ($size = $stream->getSize()) {
+ $args['ContentLength'] = $size;
+ }
+
+ $this->getConnection()->putObject($args);
}
@@ -116,56 +123,116 @@ trait S3ObjectTrait {
*
* @param string $urn the unified resource name used to identify the object
* @param StreamInterface $stream stream with the data to write
- * @param string|null $mimetype the mimetype to set for the remove object
+ * @param array $metaData the metadata to set for the object
* @throws \Exception when something goes wrong, message will be logged
*/
- protected function writeMultiPart(string $urn, StreamInterface $stream, string $mimetype = null): void {
- $uploader = new MultipartUploader($this->getConnection(), $stream, [
- 'bucket' => $this->bucket,
- 'key' => $urn,
- 'part_size' => $this->uploadPartSize,
- 'params' => [
- 'ContentType' => $mimetype,
- 'StorageClass' => $this->storageClass,
- ] + $this->getSSECParameters(),
- ]);
+ protected function writeMultiPart(string $urn, StreamInterface $stream, array $metaData): void {
+ $mimetype = $metaData['mimetype'] ?? null;
+ unset($metaData['mimetype']);
+ unset($metaData['size']);
+
+ $attempts = 0;
+ $uploaded = false;
+ $concurrency = $this->concurrency;
+ $exception = null;
+ $state = null;
+ $size = $stream->getSize();
+ $totalWritten = 0;
+
+ // retry multipart upload once with concurrency at half on failure
+ while (!$uploaded && $attempts <= 1) {
+ $uploader = new MultipartUploader($this->getConnection(), $stream, [
+ 'bucket' => $this->bucket,
+ 'concurrency' => $concurrency,
+ 'key' => $urn,
+ 'part_size' => $this->uploadPartSize,
+ 'state' => $state,
+ 'params' => [
+ 'ContentType' => $mimetype,
+ 'Metadata' => $this->buildS3Metadata($metaData),
+ 'StorageClass' => $this->storageClass,
+ ] + $this->getSSECParameters(),
+ 'before_upload' => function (Command $command) use (&$totalWritten) {
+ $totalWritten += $command['ContentLength'];
+ },
+ 'before_complete' => function ($_command) use (&$totalWritten, $size, &$uploader, &$attempts) {
+ if ($size !== null && $totalWritten != $size) {
+ $e = new \Exception('Incomplete multi part upload, expected ' . $size . ' bytes, wrote ' . $totalWritten);
+ throw new MultipartUploadException($uploader->getState(), $e);
+ }
+ },
+ ]);
+
+ try {
+ $uploader->upload();
+ $uploaded = true;
+ } catch (S3MultipartUploadException $e) {
+ $exception = $e;
+ $attempts++;
+
+ if ($concurrency > 1) {
+ $concurrency = round($concurrency / 2);
+ }
+
+ if ($stream->isSeekable()) {
+ $stream->rewind();
+ }
+ } catch (MultipartUploadException $e) {
+ $exception = $e;
+ break;
+ }
+ }
- try {
- $uploader->upload();
- } catch (S3MultipartUploadException $e) {
+ if (!$uploaded) {
// if anything goes wrong with multipart, make sure that you don“t poison and
// slow down s3 bucket with orphaned fragments
- $uploadInfo = $e->getState()->getId();
- if ($e->getState()->isInitiated() && (array_key_exists('UploadId', $uploadInfo))) {
+ $uploadInfo = $exception->getState()->getId();
+ if ($exception->getState()->isInitiated() && (array_key_exists('UploadId', $uploadInfo))) {
$this->getConnection()->abortMultipartUpload($uploadInfo);
}
- throw new \OCA\DAV\Connector\Sabre\Exception\BadGateway("Error while uploading to S3 bucket", 0, $e);
+
+ throw new \OCA\DAV\Connector\Sabre\Exception\BadGateway('Error while uploading to S3 bucket', 0, $exception);
}
}
+ public function writeObject($urn, $stream, ?string $mimetype = null) {
+ $metaData = [];
+ if ($mimetype) {
+ $metaData['mimetype'] = $mimetype;
+ }
+ $this->writeObjectWithMetaData($urn, $stream, $metaData);
+ }
- /**
- * @param string $urn the unified resource name used to identify the object
- * @param resource $stream stream with the data to write
- * @param string|null $mimetype the mimetype to set for the remove object @since 22.0.0
- * @throws \Exception when something goes wrong, message will be logged
- * @since 7.0.0
- */
- public function writeObject($urn, $stream, string $mimetype = null) {
- $psrStream = Utils::streamFor($stream);
-
- // ($psrStream->isSeekable() && $psrStream->getSize() !== null) evaluates to true for a On-Seekable stream
- // so the optimisation does not apply
- $buffer = new Psr7\Stream(fopen("php://memory", 'rwb+'));
- Utils::copyToStream($psrStream, $buffer, $this->putSizeLimit);
- $buffer->seek(0);
- if ($buffer->getSize() < $this->putSizeLimit) {
- // buffer is fully seekable, so use it directly for the small upload
- $this->writeSingle($urn, $buffer, $mimetype);
+ public function writeObjectWithMetaData(string $urn, $stream, array $metaData): void {
+ $canSeek = fseek($stream, 0, SEEK_CUR) === 0;
+ $psrStream = Utils::streamFor($stream, [
+ 'size' => $metaData['size'] ?? null,
+ ]);
+
+
+ $size = $psrStream->getSize();
+ if ($size === null || !$canSeek) {
+ // The s3 single-part upload requires the size to be known for the stream.
+ // So for input streams that don't have a known size, we need to copy (part of)
+ // the input into a temporary stream so the size can be determined
+ $buffer = new Psr7\Stream(fopen('php://temp', 'rw+'));
+ Utils::copyToStream($psrStream, $buffer, $this->putSizeLimit);
+ $buffer->seek(0);
+ if ($buffer->getSize() < $this->putSizeLimit) {
+ // buffer is fully seekable, so use it directly for the small upload
+ $this->writeSingle($urn, $buffer, $metaData);
+ } else {
+ $loadStream = new Psr7\AppendStream([$buffer, $psrStream]);
+ $this->writeMultiPart($urn, $loadStream, $metaData);
+ }
} else {
- $loadStream = new Psr7\AppendStream([$buffer, $psrStream]);
- $this->writeMultiPart($urn, $loadStream, $mimetype);
+ if ($size < $this->putSizeLimit) {
+ $this->writeSingle($urn, $psrStream, $metaData);
+ } else {
+ $this->writeMultiPart($urn, $psrStream, $metaData);
+ }
}
+ $psrStream->close();
}
/**
@@ -185,9 +252,31 @@ trait S3ObjectTrait {
return $this->getConnection()->doesObjectExist($this->bucket, $urn, $this->getSSECParameters());
}
- public function copyObject($from, $to) {
- $this->getConnection()->copy($this->getBucket(), $from, $this->getBucket(), $to, 'private', [
- 'params' => $this->getSSECParameters() + $this->getSSECParameters(true)
- ]);
+ public function copyObject($from, $to, array $options = []) {
+ $sourceMetadata = $this->getConnection()->headObject([
+ 'Bucket' => $this->getBucket(),
+ 'Key' => $from,
+ ] + $this->getSSECParameters());
+
+ $size = (int)($sourceMetadata->get('Size') ?? $sourceMetadata->get('ContentLength'));
+
+ if ($this->useMultipartCopy && $size > $this->copySizeLimit) {
+ $copy = new MultipartCopy($this->getConnection(), [
+ 'source_bucket' => $this->getBucket(),
+ 'source_key' => $from
+ ], array_merge([
+ 'bucket' => $this->getBucket(),
+ 'key' => $to,
+ 'acl' => 'private',
+ 'params' => $this->getSSECParameters() + $this->getSSECParameters(true),
+ 'source_metadata' => $sourceMetadata
+ ], $options));
+ $copy->copy();
+ } else {
+ $this->getConnection()->copy($this->getBucket(), $from, $this->getBucket(), $to, 'private', array_merge([
+ 'params' => $this->getSSECParameters() + $this->getSSECParameters(true),
+ 'mup_threshold' => PHP_INT_MAX,
+ ], $options));
+ }
}
}