From 2ecf23a4dca154c6e0415bacd0a779c5db830667 Mon Sep 17 00:00:00 2001 From: Robin Appelman Date: Thu, 19 Mar 2020 14:32:25 +0100 Subject: [PATCH] make seekable s3 stream generic Signed-off-by: Robin Appelman --- .../Files/ObjectStore/S3ObjectTrait.php | 36 +++-- .../SeekableHttpStream.php} | 127 +++++++++++------- 2 files changed, 106 insertions(+), 57 deletions(-) rename lib/private/Files/{ObjectStore/S3SeekableReadStream.php => Stream/SeekableHttpStream.php} (53%) diff --git a/lib/private/Files/ObjectStore/S3ObjectTrait.php b/lib/private/Files/ObjectStore/S3ObjectTrait.php index 79a0c0a87af..84c766c1df5 100644 --- a/lib/private/Files/ObjectStore/S3ObjectTrait.php +++ b/lib/private/Files/ObjectStore/S3ObjectTrait.php @@ -28,6 +28,7 @@ use Aws\S3\MultipartUploader; use Aws\S3\ObjectUploader; use Aws\S3\S3Client; use Icewind\Streams\CallbackWrapper; +use OC\Files\Stream\SeekableHttpStream; const S3_UPLOAD_PART_SIZE = 524288000; // 500MB @@ -47,16 +48,29 @@ trait S3ObjectTrait { * @since 7.0.0 */ function readObject($urn) { - $context = stream_context_create([ - 's3seek' => [ - 'client' => $this->getConnection(), - 'bucket' => $this->bucket, - 'urn' => $urn, - ], - ]); + return SeekableHttpStream::open(function ($range) use ($urn) { + $command = $this->getConnection()->getCommand('GetObject', [ + 'Bucket' => $this->bucket, + 'Key' => $urn, + 'Range' => 'bytes=' . $range, + ]); + $request = \Aws\serialize($command); + $headers = []; + foreach ($request->getHeaders() as $key => $values) { + foreach ($values as $value) { + $headers[] = "$key: $value"; + } + } + $opts = [ + 'http' => [ + 'protocol_version' => 1.1, + 'header' => $headers, + ], + ]; - S3SeekableReadStream::registerIfNeeded(); - return fopen('s3seek://', 'r', false, $context); + $context = stream_context_create($opts); + return fopen($request->getUri(), 'r', false, $context); + }); } /** @@ -74,7 +88,7 @@ trait S3ObjectTrait { $uploader = new MultipartUploader($this->getConnection(), $countStream, [ 'bucket' => $this->bucket, 'key' => $urn, - 'part_size' => S3_UPLOAD_PART_SIZE + 'part_size' => S3_UPLOAD_PART_SIZE, ]); try { @@ -101,7 +115,7 @@ trait S3ObjectTrait { function deleteObject($urn) { $this->getConnection()->deleteObject([ 'Bucket' => $this->bucket, - 'Key' => $urn + 'Key' => $urn, ]); } diff --git a/lib/private/Files/ObjectStore/S3SeekableReadStream.php b/lib/private/Files/Stream/SeekableHttpStream.php similarity index 53% rename from lib/private/Files/ObjectStore/S3SeekableReadStream.php rename to lib/private/Files/Stream/SeekableHttpStream.php index 70855537034..80f05f7ea09 100644 --- a/lib/private/Files/ObjectStore/S3SeekableReadStream.php +++ b/lib/private/Files/Stream/SeekableHttpStream.php @@ -20,34 +20,60 @@ * */ -namespace OC\Files\ObjectStore; +namespace OC\Files\Stream; + +use Icewind\Streams\File; /** - * A stream wrapper that uses http range requests to provide a seekable - * stream of a file in S3 storage. + * A stream wrapper that uses http range requests to provide a seekable stream for http reading */ -class S3SeekableReadStream { +class SeekableHttpStream implements File { + private const PROTOCOL = 'httpseek'; + private static $registered = false; /** - * Registers the stream wrapper using the `s3seek://` url scheme + * Registers the stream wrapper using the `httpseek://` url scheme * $return void */ - public static function registerIfNeeded() { + private static function registerIfNeeded() { if (!self::$registered) { stream_wrapper_register( - 's3seek', - 'OC\Files\ObjectStore\S3SeekableReadStream' + self::PROTOCOL, + self::class ); self::$registered = true; } } - private $client; - private $bucket; - private $urn; + /** + * Open a readonly-seekable http stream + * + * The provided callback will be called with byte range and should return an http stream for the requested range + * + * @param callable $callback + * @return false|resource + */ + public static function open(callable $callback) { + $context = stream_context_create([ + SeekableHttpStream::PROTOCOL => [ + 'callback' => $callback + ], + ]); + + SeekableHttpStream::registerIfNeeded(); + return fopen(SeekableHttpStream::PROTOCOL . '://', 'r', false, $context); + } + + /** @var resource */ + public $context; + /** @var callable */ + private $openCallback; + + /** @var resource */ private $current; + /** @var int */ private $offset = 0; private function reconnect($range) { @@ -55,29 +81,11 @@ class S3SeekableReadStream { fclose($this->current); } - $command = $this->client->getCommand('GetObject', [ - 'Bucket' => $this->bucket, - 'Key' => $this->urn, - 'Range' => 'bytes=' . $range, - ]); - $request = \Aws\serialize($command); - $headers = []; - foreach ($request->getHeaders() as $key => $values) { - foreach ($values as $value) { - $headers[] = "$key: $value"; - } - } - $opts = [ - 'http' => [ - 'protocol_version' => 1.1, - 'header' => $headers, - ], - ]; - - $context = stream_context_create($opts); - $this->current = fopen($request->getUri(), 'r', false, $context); + $this->current = ($this->openCallback)($range); - if ($this->current === false) {return false;} + if ($this->current === false) { + return false; + } $responseHead = stream_get_meta_data($this->current)['wrapper_data']; $contentRange = array_values(array_filter($responseHead, function ($v) { @@ -93,30 +101,35 @@ class S3SeekableReadStream { } function stream_open($path, $mode, $options, &$opened_path) { - $o = stream_context_get_options($this->context)['s3seek']; - $this->bucket = $o['bucket']; - $this->urn = $o['urn']; - $this->client = $o['client']; + $options = stream_context_get_options($this->context)[self::PROTOCOL]; + $this->openCallback = $options['callback']; return $this->reconnect('0-'); } function stream_read($count) { + if (!$this->current) { + return false; + } $ret = fread($this->current, $count); $this->offset += strlen($ret); return $ret; } - function stream_seek($offset, $whence) { + function stream_seek($offset, $whence = SEEK_SET) { switch ($whence) { - case SEEK_SET: - if ($offset === $this->offset) {return true;} - return $this->reconnect($offset . '-'); - case SEEK_CUR: - if ($offset === 0) {return true;} - return $this->reconnect(($this->offset + $offset) . '-'); - case SEEK_END: - return false; + case SEEK_SET: + if ($offset === $this->offset) { + return true; + } + return $this->reconnect($offset . '-'); + case SEEK_CUR: + if ($offset === 0) { + return true; + } + return $this->reconnect(($this->offset + $offset) . '-'); + case SEEK_END: + return false; } return false; } @@ -136,4 +149,26 @@ class S3SeekableReadStream { function stream_close() { fclose($this->current); } + + public function stream_write($data) { + return false; + } + + public function stream_set_option($option, $arg1, $arg2) { + return false; + } + + public function stream_truncate($size) { + return false; + } + + public function stream_lock($operation) { + return false; + } + + public function stream_flush() { + return; //noop because readonly stream + } + + } -- 2.39.5