diff options
Diffstat (limited to 'lib/private/Files/ObjectStore/S3ObjectTrait.php')
-rw-r--r-- | lib/private/Files/ObjectStore/S3ObjectTrait.php | 227 |
1 files changed, 158 insertions, 69 deletions
diff --git a/lib/private/Files/ObjectStore/S3ObjectTrait.php b/lib/private/Files/ObjectStore/S3ObjectTrait.php index 8fa6d67faa3..89405de2e8e 100644 --- a/lib/private/Files/ObjectStore/S3ObjectTrait.php +++ b/lib/private/Files/ObjectStore/S3ObjectTrait.php @@ -1,32 +1,15 @@ <?php + /** - * @copyright Copyright (c) 2017 Robin Appelman <robin@icewind.nl> - * - * @author Christoph Wurst <christoph@winzerhof-wurst.at> - * @author Florent <florent@coppint.com> - * @author Morris Jobke <hey@morrisjobke.de> - * @author Robin Appelman <robin@icewind.nl> - * @author Roeland Jago Douma <roeland@famdouma.nl> - * - * @license GNU AGPL version 3 or any later version - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * + * SPDX-FileCopyrightText: 2017 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later */ namespace OC\Files\ObjectStore; +use Aws\Command; +use Aws\Exception\MultipartUploadException; use Aws\S3\Exception\S3MultipartUploadException; +use Aws\S3\MultipartCopy; use Aws\S3\MultipartUploader; use Aws\S3\S3Client; use GuzzleHttp\Psr7; @@ -35,6 +18,8 @@ use OC\Files\Stream\SeekableHttpStream; use Psr\Http\Message\StreamInterface; trait S3ObjectTrait { + use S3ConfigTrait; + /** * Returns the connection * @@ -54,7 +39,7 @@ trait S3ObjectTrait { * @since 7.0.0 */ public function readObject($urn) { - return SeekableHttpStream::open(function ($range) use ($urn) { + $fh = SeekableHttpStream::open(function ($range) use ($urn) { $command = $this->getConnection()->getCommand('GetObject', [ 'Bucket' => $this->bucket, 'Key' => $urn, @@ -88,26 +73,48 @@ trait S3ObjectTrait { $context = stream_context_create($opts); return fopen($request->getUri(), 'r', false, $context); }); + if (!$fh) { + throw new \Exception("Failed to read object $urn"); + } + return $fh; } + private function buildS3Metadata(array $metadata): array { + $result = []; + foreach ($metadata as $key => $value) { + $result['x-amz-meta-' . $key] = $value; + } + return $result; + } /** * Single object put helper * * @param string $urn the unified resource name used to identify the object * @param StreamInterface $stream stream with the data to write - * @param string|null $mimetype the mimetype to set for the remove object @since 22.0.0 + * @param array $metaData the metadata to set for the object * @throws \Exception when something goes wrong, message will be logged */ - protected function writeSingle(string $urn, StreamInterface $stream, string $mimetype = null): void { - $this->getConnection()->putObject([ + protected function writeSingle(string $urn, StreamInterface $stream, array $metaData): void { + $mimetype = $metaData['mimetype'] ?? null; + unset($metaData['mimetype']); + unset($metaData['size']); + + $args = [ 'Bucket' => $this->bucket, 'Key' => $urn, 'Body' => $stream, 'ACL' => 'private', 'ContentType' => $mimetype, + 'Metadata' => $this->buildS3Metadata($metaData), 'StorageClass' => $this->storageClass, - ] + $this->getSSECParameters()); + ] + $this->getSSECParameters(); + + if ($size = $stream->getSize()) { + $args['ContentLength'] = $size; + } + + $this->getConnection()->putObject($args); } @@ -116,56 +123,116 @@ trait S3ObjectTrait { * * @param string $urn the unified resource name used to identify the object * @param StreamInterface $stream stream with the data to write - * @param string|null $mimetype the mimetype to set for the remove object + * @param array $metaData the metadata to set for the object * @throws \Exception when something goes wrong, message will be logged */ - protected function writeMultiPart(string $urn, StreamInterface $stream, string $mimetype = null): void { - $uploader = new MultipartUploader($this->getConnection(), $stream, [ - 'bucket' => $this->bucket, - 'key' => $urn, - 'part_size' => $this->uploadPartSize, - 'params' => [ - 'ContentType' => $mimetype, - 'StorageClass' => $this->storageClass, - ] + $this->getSSECParameters(), - ]); + protected function writeMultiPart(string $urn, StreamInterface $stream, array $metaData): void { + $mimetype = $metaData['mimetype'] ?? null; + unset($metaData['mimetype']); + unset($metaData['size']); + + $attempts = 0; + $uploaded = false; + $concurrency = $this->concurrency; + $exception = null; + $state = null; + $size = $stream->getSize(); + $totalWritten = 0; + + // retry multipart upload once with concurrency at half on failure + while (!$uploaded && $attempts <= 1) { + $uploader = new MultipartUploader($this->getConnection(), $stream, [ + 'bucket' => $this->bucket, + 'concurrency' => $concurrency, + 'key' => $urn, + 'part_size' => $this->uploadPartSize, + 'state' => $state, + 'params' => [ + 'ContentType' => $mimetype, + 'Metadata' => $this->buildS3Metadata($metaData), + 'StorageClass' => $this->storageClass, + ] + $this->getSSECParameters(), + 'before_upload' => function (Command $command) use (&$totalWritten) { + $totalWritten += $command['ContentLength']; + }, + 'before_complete' => function ($_command) use (&$totalWritten, $size, &$uploader, &$attempts) { + if ($size !== null && $totalWritten != $size) { + $e = new \Exception('Incomplete multi part upload, expected ' . $size . ' bytes, wrote ' . $totalWritten); + throw new MultipartUploadException($uploader->getState(), $e); + } + }, + ]); + + try { + $uploader->upload(); + $uploaded = true; + } catch (S3MultipartUploadException $e) { + $exception = $e; + $attempts++; + + if ($concurrency > 1) { + $concurrency = round($concurrency / 2); + } + + if ($stream->isSeekable()) { + $stream->rewind(); + } + } catch (MultipartUploadException $e) { + $exception = $e; + break; + } + } - try { - $uploader->upload(); - } catch (S3MultipartUploadException $e) { + if (!$uploaded) { // if anything goes wrong with multipart, make sure that you don“t poison and // slow down s3 bucket with orphaned fragments - $uploadInfo = $e->getState()->getId(); - if ($e->getState()->isInitiated() && (array_key_exists('UploadId', $uploadInfo))) { + $uploadInfo = $exception->getState()->getId(); + if ($exception->getState()->isInitiated() && (array_key_exists('UploadId', $uploadInfo))) { $this->getConnection()->abortMultipartUpload($uploadInfo); } - throw new \OCA\DAV\Connector\Sabre\Exception\BadGateway("Error while uploading to S3 bucket", 0, $e); + + throw new \OCA\DAV\Connector\Sabre\Exception\BadGateway('Error while uploading to S3 bucket', 0, $exception); } } + public function writeObject($urn, $stream, ?string $mimetype = null) { + $metaData = []; + if ($mimetype) { + $metaData['mimetype'] = $mimetype; + } + $this->writeObjectWithMetaData($urn, $stream, $metaData); + } - /** - * @param string $urn the unified resource name used to identify the object - * @param resource $stream stream with the data to write - * @param string|null $mimetype the mimetype to set for the remove object @since 22.0.0 - * @throws \Exception when something goes wrong, message will be logged - * @since 7.0.0 - */ - public function writeObject($urn, $stream, string $mimetype = null) { - $psrStream = Utils::streamFor($stream); - - // ($psrStream->isSeekable() && $psrStream->getSize() !== null) evaluates to true for a On-Seekable stream - // so the optimisation does not apply - $buffer = new Psr7\Stream(fopen("php://memory", 'rwb+')); - Utils::copyToStream($psrStream, $buffer, $this->putSizeLimit); - $buffer->seek(0); - if ($buffer->getSize() < $this->putSizeLimit) { - // buffer is fully seekable, so use it directly for the small upload - $this->writeSingle($urn, $buffer, $mimetype); + public function writeObjectWithMetaData(string $urn, $stream, array $metaData): void { + $canSeek = fseek($stream, 0, SEEK_CUR) === 0; + $psrStream = Utils::streamFor($stream, [ + 'size' => $metaData['size'] ?? null, + ]); + + + $size = $psrStream->getSize(); + if ($size === null || !$canSeek) { + // The s3 single-part upload requires the size to be known for the stream. + // So for input streams that don't have a known size, we need to copy (part of) + // the input into a temporary stream so the size can be determined + $buffer = new Psr7\Stream(fopen('php://temp', 'rw+')); + Utils::copyToStream($psrStream, $buffer, $this->putSizeLimit); + $buffer->seek(0); + if ($buffer->getSize() < $this->putSizeLimit) { + // buffer is fully seekable, so use it directly for the small upload + $this->writeSingle($urn, $buffer, $metaData); + } else { + $loadStream = new Psr7\AppendStream([$buffer, $psrStream]); + $this->writeMultiPart($urn, $loadStream, $metaData); + } } else { - $loadStream = new Psr7\AppendStream([$buffer, $psrStream]); - $this->writeMultiPart($urn, $loadStream, $mimetype); + if ($size < $this->putSizeLimit) { + $this->writeSingle($urn, $psrStream, $metaData); + } else { + $this->writeMultiPart($urn, $psrStream, $metaData); + } } + $psrStream->close(); } /** @@ -185,9 +252,31 @@ trait S3ObjectTrait { return $this->getConnection()->doesObjectExist($this->bucket, $urn, $this->getSSECParameters()); } - public function copyObject($from, $to) { - $this->getConnection()->copy($this->getBucket(), $from, $this->getBucket(), $to, 'private', [ - 'params' => $this->getSSECParameters() + $this->getSSECParameters(true) - ]); + public function copyObject($from, $to, array $options = []) { + $sourceMetadata = $this->getConnection()->headObject([ + 'Bucket' => $this->getBucket(), + 'Key' => $from, + ] + $this->getSSECParameters()); + + $size = (int)($sourceMetadata->get('Size') ?? $sourceMetadata->get('ContentLength')); + + if ($this->useMultipartCopy && $size > $this->copySizeLimit) { + $copy = new MultipartCopy($this->getConnection(), [ + 'source_bucket' => $this->getBucket(), + 'source_key' => $from + ], array_merge([ + 'bucket' => $this->getBucket(), + 'key' => $to, + 'acl' => 'private', + 'params' => $this->getSSECParameters() + $this->getSSECParameters(true), + 'source_metadata' => $sourceMetadata + ], $options)); + $copy->copy(); + } else { + $this->getConnection()->copy($this->getBucket(), $from, $this->getBucket(), $to, 'private', array_merge([ + 'params' => $this->getSSECParameters() + $this->getSSECParameters(true), + 'mup_threshold' => PHP_INT_MAX, + ], $options)); + } } } |