diff options
Diffstat (limited to 'lib/private/Files')
-rw-r--r-- | lib/private/Files/ObjectStore/S3ObjectTrait.php | 45 | ||||
-rw-r--r-- | lib/private/Files/Stream/SeekableHttpStream.php | 182 |
2 files changed, 206 insertions, 21 deletions
diff --git a/lib/private/Files/ObjectStore/S3ObjectTrait.php b/lib/private/Files/ObjectStore/S3ObjectTrait.php index 6188357521b..0939dd2a23c 100644 --- a/lib/private/Files/ObjectStore/S3ObjectTrait.php +++ b/lib/private/Files/ObjectStore/S3ObjectTrait.php @@ -30,6 +30,7 @@ use Aws\S3\MultipartUploader; use Aws\S3\ObjectUploader; use Aws\S3\S3Client; use Icewind\Streams\CallbackWrapper; +use OC\Files\Stream\SeekableHttpStream; const S3_UPLOAD_PART_SIZE = 524288000; // 500MB @@ -49,27 +50,29 @@ trait S3ObjectTrait { * @since 7.0.0 */ function readObject($urn) { - $client = $this->getConnection(); - $command = $client->getCommand('GetObject', [ - 'Bucket' => $this->bucket, - 'Key' => $urn - ]); - $request = \Aws\serialize($command); - $headers = []; - foreach ($request->getHeaders() as $key => $values) { - foreach ($values as $value) { - $headers[] = "$key: $value"; + return SeekableHttpStream::open(function ($range) use ($urn) { + $command = $this->getConnection()->getCommand('GetObject', [ + 'Bucket' => $this->bucket, + 'Key' => $urn, + 'Range' => 'bytes=' . $range, + ]); + $request = \Aws\serialize($command); + $headers = []; + foreach ($request->getHeaders() as $key => $values) { + foreach ($values as $value) { + $headers[] = "$key: $value"; + } } - } - $opts = [ - 'http' => [ - 'protocol_version' => 1.1, - 'header' => $headers - ] - ]; + $opts = [ + 'http' => [ + 'protocol_version' => 1.1, + 'header' => $headers, + ], + ]; - $context = stream_context_create($opts); - return fopen($request->getUri(), 'r', false, $context); + $context = stream_context_create($opts); + return fopen($request->getUri(), 'r', false, $context); + }); } /** @@ -87,7 +90,7 @@ trait S3ObjectTrait { $uploader = new MultipartUploader($this->getConnection(), $countStream, [ 'bucket' => $this->bucket, 'key' => $urn, - 'part_size' => S3_UPLOAD_PART_SIZE + 'part_size' => S3_UPLOAD_PART_SIZE, ]); try { @@ -114,7 +117,7 @@ trait S3ObjectTrait { function deleteObject($urn) { $this->getConnection()->deleteObject([ 'Bucket' => $this->bucket, - 'Key' => $urn + 'Key' => $urn, ]); } diff --git a/lib/private/Files/Stream/SeekableHttpStream.php b/lib/private/Files/Stream/SeekableHttpStream.php new file mode 100644 index 00000000000..fdcd9ea8cfb --- /dev/null +++ b/lib/private/Files/Stream/SeekableHttpStream.php @@ -0,0 +1,182 @@ +<?php +/** + * + * @copyright Copyright (c) 2020, Lukas Stabe (lukas@stabe.de) + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + */ + +namespace OC\Files\Stream; + +use Icewind\Streams\File; + +/** + * A stream wrapper that uses http range requests to provide a seekable stream for http reading + */ +class SeekableHttpStream implements File { + private const PROTOCOL = 'httpseek'; + + private static $registered = false; + + /** + * Registers the stream wrapper using the `httpseek://` url scheme + * $return void + */ + private static function registerIfNeeded() { + if (!self::$registered) { + stream_wrapper_register( + self::PROTOCOL, + self::class + ); + self::$registered = true; + } + } + + /** + * Open a readonly-seekable http stream + * + * The provided callback will be called with byte range and should return an http stream for the requested range + * + * @param callable $callback + * @return false|resource + */ + public static function open(callable $callback) { + $context = stream_context_create([ + SeekableHttpStream::PROTOCOL => [ + 'callback' => $callback + ], + ]); + + SeekableHttpStream::registerIfNeeded(); + return fopen(SeekableHttpStream::PROTOCOL . '://', 'r', false, $context); + } + + /** @var resource */ + public $context; + + /** @var callable */ + private $openCallback; + + /** @var resource */ + private $current; + /** @var int */ + private $offset = 0; + + private function reconnect(int $start) { + $range = $start . '-'; + if ($this->current != null) { + fclose($this->current); + } + + $this->current = ($this->openCallback)($range); + + if ($this->current === false) { + return false; + } + + $responseHead = stream_get_meta_data($this->current)['wrapper_data']; + $rangeHeaders = array_values(array_filter($responseHead, function ($v) { + return preg_match('#^content-range:#i', $v) === 1; + })); + if (!$rangeHeaders) { + return false; + } + $contentRange = $rangeHeaders[0]; + + $content = trim(explode(':', $contentRange)[1]); + $range = trim(explode(' ', $content)[1]); + $begin = intval(explode('-', $range)[0]); + + if ($begin !== $start) { + return false; + } + + $this->offset = $begin; + + return true; + } + + public function stream_open($path, $mode, $options, &$opened_path) { + $options = stream_context_get_options($this->context)[self::PROTOCOL]; + $this->openCallback = $options['callback']; + + return $this->reconnect(0); + } + + public function stream_read($count) { + if (!$this->current) { + return false; + } + $ret = fread($this->current, $count); + $this->offset += strlen($ret); + return $ret; + } + + public function stream_seek($offset, $whence = SEEK_SET) { + switch ($whence) { + case SEEK_SET: + if ($offset === $this->offset) { + return true; + } + return $this->reconnect($offset); + case SEEK_CUR: + if ($offset === 0) { + return true; + } + return $this->reconnect($this->offset + $offset); + case SEEK_END: + return false; + } + return false; + } + + public function stream_tell() { + return $this->offset; + } + + public function stream_stat() { + return fstat($this->current); + } + + public function stream_eof() { + return feof($this->current); + } + + public function stream_close() { + fclose($this->current); + } + + public function stream_write($data) { + return false; + } + + public function stream_set_option($option, $arg1, $arg2) { + return false; + } + + public function stream_truncate($size) { + return false; + } + + public function stream_lock($operation) { + return false; + } + + public function stream_flush() { + return; //noop because readonly stream + } +} |