From 7a62514c3144c3f35f098944566a55c5641bfa2d Mon Sep 17 00:00:00 2001 From: Lukas Stabe Date: Wed, 22 Jan 2020 09:45:41 +0100 Subject: Enable fseek for files in S3 storage Signed-off-by: Lukas Stabe --- lib/private/Files/ObjectStore/S3ObjectTrait.php | 27 ++-- .../Files/ObjectStore/S3SeekableReadStream.php | 139 +++++++++++++++++++++ 2 files changed, 147 insertions(+), 19 deletions(-) create mode 100644 lib/private/Files/ObjectStore/S3SeekableReadStream.php (limited to 'lib/private/Files') diff --git a/lib/private/Files/ObjectStore/S3ObjectTrait.php b/lib/private/Files/ObjectStore/S3ObjectTrait.php index 6188357521b..1e9d095b9eb 100644 --- a/lib/private/Files/ObjectStore/S3ObjectTrait.php +++ b/lib/private/Files/ObjectStore/S3ObjectTrait.php @@ -49,27 +49,16 @@ trait S3ObjectTrait { * @since 7.0.0 */ function readObject($urn) { - $client = $this->getConnection(); - $command = $client->getCommand('GetObject', [ - 'Bucket' => $this->bucket, - 'Key' => $urn + $context = stream_context_create([ + 's3seek' => [ + 'client' => $this->getConnection(), + 'bucket' => $this->bucket, + 'urn' => $urn, + ], ]); - $request = \Aws\serialize($command); - $headers = []; - foreach ($request->getHeaders() as $key => $values) { - foreach ($values as $value) { - $headers[] = "$key: $value"; - } - } - $opts = [ - 'http' => [ - 'protocol_version' => 1.1, - 'header' => $headers - ] - ]; - $context = stream_context_create($opts); - return fopen($request->getUri(), 'r', false, $context); + S3SeekableReadStream::registerIfNeeded(); + return fopen('s3seek://', 'r', false, $context); } /** diff --git a/lib/private/Files/ObjectStore/S3SeekableReadStream.php b/lib/private/Files/ObjectStore/S3SeekableReadStream.php new file mode 100644 index 00000000000..70855537034 --- /dev/null +++ b/lib/private/Files/ObjectStore/S3SeekableReadStream.php @@ -0,0 +1,139 @@ +. + * + */ + +namespace OC\Files\ObjectStore; + +/** + * A stream wrapper that uses http range requests to provide a seekable + * stream of a file in S3 storage. + */ +class S3SeekableReadStream { + private static $registered = false; + + /** + * Registers the stream wrapper using the `s3seek://` url scheme + * $return void + */ + public static function registerIfNeeded() { + if (!self::$registered) { + stream_wrapper_register( + 's3seek', + 'OC\Files\ObjectStore\S3SeekableReadStream' + ); + self::$registered = true; + } + } + + private $client; + private $bucket; + private $urn; + + private $current; + private $offset = 0; + + private function reconnect($range) { + if ($this->current != null) { + fclose($this->current); + } + + $command = $this->client->getCommand('GetObject', [ + 'Bucket' => $this->bucket, + 'Key' => $this->urn, + 'Range' => 'bytes=' . $range, + ]); + $request = \Aws\serialize($command); + $headers = []; + foreach ($request->getHeaders() as $key => $values) { + foreach ($values as $value) { + $headers[] = "$key: $value"; + } + } + $opts = [ + 'http' => [ + 'protocol_version' => 1.1, + 'header' => $headers, + ], + ]; + + $context = stream_context_create($opts); + $this->current = fopen($request->getUri(), 'r', false, $context); + + if ($this->current === false) {return false;} + + $responseHead = stream_get_meta_data($this->current)['wrapper_data']; + $contentRange = array_values(array_filter($responseHead, function ($v) { + return preg_match('#^content-range:#i', $v) === 1; + }))[0]; + + $content = trim(explode(':', $contentRange)[1]); + $range = trim(explode(' ', $content)[1]); + $begin = explode('-', $range)[0]; + $this->offset = intval($begin); + + return true; + } + + function stream_open($path, $mode, $options, &$opened_path) { + $o = stream_context_get_options($this->context)['s3seek']; + $this->bucket = $o['bucket']; + $this->urn = $o['urn']; + $this->client = $o['client']; + + return $this->reconnect('0-'); + } + + function stream_read($count) { + $ret = fread($this->current, $count); + $this->offset += strlen($ret); + return $ret; + } + + function stream_seek($offset, $whence) { + switch ($whence) { + case SEEK_SET: + if ($offset === $this->offset) {return true;} + return $this->reconnect($offset . '-'); + case SEEK_CUR: + if ($offset === 0) {return true;} + return $this->reconnect(($this->offset + $offset) . '-'); + case SEEK_END: + return false; + } + return false; + } + + function stream_tell() { + return $this->offset; + } + + function stream_stat() { + return fstat($this->current); + } + + function stream_eof() { + return feof($this->current); + } + + function stream_close() { + fclose($this->current); + } +} -- cgit v1.2.3 From 8434d0af9f0a0df726b21214ec02afb10863d374 Mon Sep 17 00:00:00 2001 From: Robin Appelman Date: Thu, 19 Mar 2020 14:32:25 +0100 Subject: make seekable s3 stream generic Signed-off-by: Robin Appelman --- lib/private/Files/ObjectStore/S3ObjectTrait.php | 36 +++-- .../Files/ObjectStore/S3SeekableReadStream.php | 139 ---------------- lib/private/Files/Stream/SeekableHttpStream.php | 174 +++++++++++++++++++++ 3 files changed, 199 insertions(+), 150 deletions(-) delete mode 100644 lib/private/Files/ObjectStore/S3SeekableReadStream.php create mode 100644 lib/private/Files/Stream/SeekableHttpStream.php (limited to 'lib/private/Files') diff --git a/lib/private/Files/ObjectStore/S3ObjectTrait.php b/lib/private/Files/ObjectStore/S3ObjectTrait.php index 1e9d095b9eb..0939dd2a23c 100644 --- a/lib/private/Files/ObjectStore/S3ObjectTrait.php +++ b/lib/private/Files/ObjectStore/S3ObjectTrait.php @@ -30,6 +30,7 @@ use Aws\S3\MultipartUploader; use Aws\S3\ObjectUploader; use Aws\S3\S3Client; use Icewind\Streams\CallbackWrapper; +use OC\Files\Stream\SeekableHttpStream; const S3_UPLOAD_PART_SIZE = 524288000; // 500MB @@ -49,16 +50,29 @@ trait S3ObjectTrait { * @since 7.0.0 */ function readObject($urn) { - $context = stream_context_create([ - 's3seek' => [ - 'client' => $this->getConnection(), - 'bucket' => $this->bucket, - 'urn' => $urn, - ], - ]); + return SeekableHttpStream::open(function ($range) use ($urn) { + $command = $this->getConnection()->getCommand('GetObject', [ + 'Bucket' => $this->bucket, + 'Key' => $urn, + 'Range' => 'bytes=' . $range, + ]); + $request = \Aws\serialize($command); + $headers = []; + foreach ($request->getHeaders() as $key => $values) { + foreach ($values as $value) { + $headers[] = "$key: $value"; + } + } + $opts = [ + 'http' => [ + 'protocol_version' => 1.1, + 'header' => $headers, + ], + ]; - S3SeekableReadStream::registerIfNeeded(); - return fopen('s3seek://', 'r', false, $context); + $context = stream_context_create($opts); + return fopen($request->getUri(), 'r', false, $context); + }); } /** @@ -76,7 +90,7 @@ trait S3ObjectTrait { $uploader = new MultipartUploader($this->getConnection(), $countStream, [ 'bucket' => $this->bucket, 'key' => $urn, - 'part_size' => S3_UPLOAD_PART_SIZE + 'part_size' => S3_UPLOAD_PART_SIZE, ]); try { @@ -103,7 +117,7 @@ trait S3ObjectTrait { function deleteObject($urn) { $this->getConnection()->deleteObject([ 'Bucket' => $this->bucket, - 'Key' => $urn + 'Key' => $urn, ]); } diff --git a/lib/private/Files/ObjectStore/S3SeekableReadStream.php b/lib/private/Files/ObjectStore/S3SeekableReadStream.php deleted file mode 100644 index 70855537034..00000000000 --- a/lib/private/Files/ObjectStore/S3SeekableReadStream.php +++ /dev/null @@ -1,139 +0,0 @@ -. - * - */ - -namespace OC\Files\ObjectStore; - -/** - * A stream wrapper that uses http range requests to provide a seekable - * stream of a file in S3 storage. - */ -class S3SeekableReadStream { - private static $registered = false; - - /** - * Registers the stream wrapper using the `s3seek://` url scheme - * $return void - */ - public static function registerIfNeeded() { - if (!self::$registered) { - stream_wrapper_register( - 's3seek', - 'OC\Files\ObjectStore\S3SeekableReadStream' - ); - self::$registered = true; - } - } - - private $client; - private $bucket; - private $urn; - - private $current; - private $offset = 0; - - private function reconnect($range) { - if ($this->current != null) { - fclose($this->current); - } - - $command = $this->client->getCommand('GetObject', [ - 'Bucket' => $this->bucket, - 'Key' => $this->urn, - 'Range' => 'bytes=' . $range, - ]); - $request = \Aws\serialize($command); - $headers = []; - foreach ($request->getHeaders() as $key => $values) { - foreach ($values as $value) { - $headers[] = "$key: $value"; - } - } - $opts = [ - 'http' => [ - 'protocol_version' => 1.1, - 'header' => $headers, - ], - ]; - - $context = stream_context_create($opts); - $this->current = fopen($request->getUri(), 'r', false, $context); - - if ($this->current === false) {return false;} - - $responseHead = stream_get_meta_data($this->current)['wrapper_data']; - $contentRange = array_values(array_filter($responseHead, function ($v) { - return preg_match('#^content-range:#i', $v) === 1; - }))[0]; - - $content = trim(explode(':', $contentRange)[1]); - $range = trim(explode(' ', $content)[1]); - $begin = explode('-', $range)[0]; - $this->offset = intval($begin); - - return true; - } - - function stream_open($path, $mode, $options, &$opened_path) { - $o = stream_context_get_options($this->context)['s3seek']; - $this->bucket = $o['bucket']; - $this->urn = $o['urn']; - $this->client = $o['client']; - - return $this->reconnect('0-'); - } - - function stream_read($count) { - $ret = fread($this->current, $count); - $this->offset += strlen($ret); - return $ret; - } - - function stream_seek($offset, $whence) { - switch ($whence) { - case SEEK_SET: - if ($offset === $this->offset) {return true;} - return $this->reconnect($offset . '-'); - case SEEK_CUR: - if ($offset === 0) {return true;} - return $this->reconnect(($this->offset + $offset) . '-'); - case SEEK_END: - return false; - } - return false; - } - - function stream_tell() { - return $this->offset; - } - - function stream_stat() { - return fstat($this->current); - } - - function stream_eof() { - return feof($this->current); - } - - function stream_close() { - fclose($this->current); - } -} diff --git a/lib/private/Files/Stream/SeekableHttpStream.php b/lib/private/Files/Stream/SeekableHttpStream.php new file mode 100644 index 00000000000..80f05f7ea09 --- /dev/null +++ b/lib/private/Files/Stream/SeekableHttpStream.php @@ -0,0 +1,174 @@ +. + * + */ + +namespace OC\Files\Stream; + +use Icewind\Streams\File; + +/** + * A stream wrapper that uses http range requests to provide a seekable stream for http reading + */ +class SeekableHttpStream implements File { + private const PROTOCOL = 'httpseek'; + + private static $registered = false; + + /** + * Registers the stream wrapper using the `httpseek://` url scheme + * $return void + */ + private static function registerIfNeeded() { + if (!self::$registered) { + stream_wrapper_register( + self::PROTOCOL, + self::class + ); + self::$registered = true; + } + } + + /** + * Open a readonly-seekable http stream + * + * The provided callback will be called with byte range and should return an http stream for the requested range + * + * @param callable $callback + * @return false|resource + */ + public static function open(callable $callback) { + $context = stream_context_create([ + SeekableHttpStream::PROTOCOL => [ + 'callback' => $callback + ], + ]); + + SeekableHttpStream::registerIfNeeded(); + return fopen(SeekableHttpStream::PROTOCOL . '://', 'r', false, $context); + } + + /** @var resource */ + public $context; + + /** @var callable */ + private $openCallback; + + /** @var resource */ + private $current; + /** @var int */ + private $offset = 0; + + private function reconnect($range) { + if ($this->current != null) { + fclose($this->current); + } + + $this->current = ($this->openCallback)($range); + + if ($this->current === false) { + return false; + } + + $responseHead = stream_get_meta_data($this->current)['wrapper_data']; + $contentRange = array_values(array_filter($responseHead, function ($v) { + return preg_match('#^content-range:#i', $v) === 1; + }))[0]; + + $content = trim(explode(':', $contentRange)[1]); + $range = trim(explode(' ', $content)[1]); + $begin = explode('-', $range)[0]; + $this->offset = intval($begin); + + return true; + } + + function stream_open($path, $mode, $options, &$opened_path) { + $options = stream_context_get_options($this->context)[self::PROTOCOL]; + $this->openCallback = $options['callback']; + + return $this->reconnect('0-'); + } + + function stream_read($count) { + if (!$this->current) { + return false; + } + $ret = fread($this->current, $count); + $this->offset += strlen($ret); + return $ret; + } + + function stream_seek($offset, $whence = SEEK_SET) { + switch ($whence) { + case SEEK_SET: + if ($offset === $this->offset) { + return true; + } + return $this->reconnect($offset . '-'); + case SEEK_CUR: + if ($offset === 0) { + return true; + } + return $this->reconnect(($this->offset + $offset) . '-'); + case SEEK_END: + return false; + } + return false; + } + + function stream_tell() { + return $this->offset; + } + + function stream_stat() { + return fstat($this->current); + } + + function stream_eof() { + return feof($this->current); + } + + function stream_close() { + fclose($this->current); + } + + public function stream_write($data) { + return false; + } + + public function stream_set_option($option, $arg1, $arg2) { + return false; + } + + public function stream_truncate($size) { + return false; + } + + public function stream_lock($operation) { + return false; + } + + public function stream_flush() { + return; //noop because readonly stream + } + + +} -- cgit v1.2.3 From 136a716df0cbdf9e02573d2b2939ab28494b2d9d Mon Sep 17 00:00:00 2001 From: Robin Appelman Date: Thu, 19 Mar 2020 15:28:02 +0100 Subject: add basic tests for s3 seeking and add some error handling if reopen return the wrong range Signed-off-by: Robin Appelman --- lib/private/Files/Stream/SeekableHttpStream.php | 26 +++++++++++++++++-------- tests/lib/Files/ObjectStore/ObjectStoreTest.php | 2 +- tests/lib/Files/ObjectStore/S3Test.php | 18 ++++++++++++++++- 3 files changed, 36 insertions(+), 10 deletions(-) (limited to 'lib/private/Files') diff --git a/lib/private/Files/Stream/SeekableHttpStream.php b/lib/private/Files/Stream/SeekableHttpStream.php index 80f05f7ea09..8fe54839e25 100644 --- a/lib/private/Files/Stream/SeekableHttpStream.php +++ b/lib/private/Files/Stream/SeekableHttpStream.php @@ -76,7 +76,8 @@ class SeekableHttpStream implements File { /** @var int */ private $offset = 0; - private function reconnect($range) { + private function reconnect(int $start) { + $range = $start . '-'; if ($this->current != null) { fclose($this->current); } @@ -88,14 +89,23 @@ class SeekableHttpStream implements File { } $responseHead = stream_get_meta_data($this->current)['wrapper_data']; - $contentRange = array_values(array_filter($responseHead, function ($v) { + $rangeHeaders = array_values(array_filter($responseHead, function ($v) { return preg_match('#^content-range:#i', $v) === 1; - }))[0]; + })); + if (!$rangeHeaders) { + return false; + } + $contentRange = $rangeHeaders[0]; $content = trim(explode(':', $contentRange)[1]); $range = trim(explode(' ', $content)[1]); - $begin = explode('-', $range)[0]; - $this->offset = intval($begin); + $begin = intval(explode('-', $range)[0]); + + if ($begin !== $start) { + return false; + } + + $this->offset = $begin; return true; } @@ -104,7 +114,7 @@ class SeekableHttpStream implements File { $options = stream_context_get_options($this->context)[self::PROTOCOL]; $this->openCallback = $options['callback']; - return $this->reconnect('0-'); + return $this->reconnect(0); } function stream_read($count) { @@ -122,12 +132,12 @@ class SeekableHttpStream implements File { if ($offset === $this->offset) { return true; } - return $this->reconnect($offset . '-'); + return $this->reconnect($offset); case SEEK_CUR: if ($offset === 0) { return true; } - return $this->reconnect(($this->offset + $offset) . '-'); + return $this->reconnect($this->offset + $offset); case SEEK_END: return false; } diff --git a/tests/lib/Files/ObjectStore/ObjectStoreTest.php b/tests/lib/Files/ObjectStore/ObjectStoreTest.php index 1383c0149a2..67c41eb7ccc 100644 --- a/tests/lib/Files/ObjectStore/ObjectStoreTest.php +++ b/tests/lib/Files/ObjectStore/ObjectStoreTest.php @@ -31,7 +31,7 @@ abstract class ObjectStoreTest extends TestCase { */ abstract protected function getInstance(); - private function stringToStream($data) { + protected function stringToStream($data) { $stream = fopen('php://temp', 'w+'); fwrite($stream, $data); rewind($stream); diff --git a/tests/lib/Files/ObjectStore/S3Test.php b/tests/lib/Files/ObjectStore/S3Test.php index 91b24d8b615..b3d65d9eff4 100644 --- a/tests/lib/Files/ObjectStore/S3Test.php +++ b/tests/lib/Files/ObjectStore/S3Test.php @@ -27,7 +27,7 @@ use OC\Files\ObjectStore\S3; class MultiPartUploadS3 extends S3 { function writeObject($urn, $stream) { $this->getConnection()->upload($this->bucket, $urn, $stream, 'private', [ - 'mup_threshold' => 1 + 'mup_threshold' => 1, ]); } } @@ -83,4 +83,20 @@ class S3Test extends ObjectStoreTest { $this->assertEquals(file_get_contents(__FILE__), stream_get_contents($result)); } + + public function testSeek() { + $data = file_get_contents(__FILE__); + + $instance = $this->getInstance(); + $instance->writeObject('seek', $this->stringToStream($data)); + + $read = $instance->readObject('seek'); + $this->assertEquals(substr($data, 0, 100), fread($read, 100)); + + fseek($read, 10); + $this->assertEquals(substr($data, 10, 100), fread($read, 100)); + + fseek($read, 100, SEEK_CUR); + $this->assertEquals(substr($data, 210, 100), fread($read, 100)); + } } -- cgit v1.2.3 From 25f5a5e5757f1a1ac6af0bd94bf78066163d927f Mon Sep 17 00:00:00 2001 From: Robin Appelman Date: Wed, 1 Apr 2020 18:48:40 +0200 Subject: update autoloader Signed-off-by: Robin Appelman --- lib/composer/composer/autoload_classmap.php | 1 + lib/composer/composer/autoload_static.php | 1 + lib/private/Files/Stream/SeekableHttpStream.php | 16 +++++++--------- 3 files changed, 9 insertions(+), 9 deletions(-) (limited to 'lib/private/Files') diff --git a/lib/composer/composer/autoload_classmap.php b/lib/composer/composer/autoload_classmap.php index d66039ae1fd..998fe176967 100644 --- a/lib/composer/composer/autoload_classmap.php +++ b/lib/composer/composer/autoload_classmap.php @@ -1007,6 +1007,7 @@ return array( 'OC\\Files\\Storage\\Wrapper\\Wrapper' => $baseDir . '/lib/private/Files/Storage/Wrapper/Wrapper.php', 'OC\\Files\\Stream\\Encryption' => $baseDir . '/lib/private/Files/Stream/Encryption.php', 'OC\\Files\\Stream\\Quota' => $baseDir . '/lib/private/Files/Stream/Quota.php', + 'OC\\Files\\Stream\\SeekableHttpStream' => $baseDir . '/lib/private/Files/Stream/SeekableHttpStream.php', 'OC\\Files\\Type\\Detection' => $baseDir . '/lib/private/Files/Type/Detection.php', 'OC\\Files\\Type\\Loader' => $baseDir . '/lib/private/Files/Type/Loader.php', 'OC\\Files\\Type\\TemplateManager' => $baseDir . '/lib/private/Files/Type/TemplateManager.php', diff --git a/lib/composer/composer/autoload_static.php b/lib/composer/composer/autoload_static.php index 0a65eae24bb..7dd0e15e51d 100644 --- a/lib/composer/composer/autoload_static.php +++ b/lib/composer/composer/autoload_static.php @@ -1036,6 +1036,7 @@ class ComposerStaticInit53792487c5a8370acc0b06b1a864ff4c 'OC\\Files\\Storage\\Wrapper\\Wrapper' => __DIR__ . '/../../..' . '/lib/private/Files/Storage/Wrapper/Wrapper.php', 'OC\\Files\\Stream\\Encryption' => __DIR__ . '/../../..' . '/lib/private/Files/Stream/Encryption.php', 'OC\\Files\\Stream\\Quota' => __DIR__ . '/../../..' . '/lib/private/Files/Stream/Quota.php', + 'OC\\Files\\Stream\\SeekableHttpStream' => __DIR__ . '/../../..' . '/lib/private/Files/Stream/SeekableHttpStream.php', 'OC\\Files\\Type\\Detection' => __DIR__ . '/../../..' . '/lib/private/Files/Type/Detection.php', 'OC\\Files\\Type\\Loader' => __DIR__ . '/../../..' . '/lib/private/Files/Type/Loader.php', 'OC\\Files\\Type\\TemplateManager' => __DIR__ . '/../../..' . '/lib/private/Files/Type/TemplateManager.php', diff --git a/lib/private/Files/Stream/SeekableHttpStream.php b/lib/private/Files/Stream/SeekableHttpStream.php index 8fe54839e25..fdcd9ea8cfb 100644 --- a/lib/private/Files/Stream/SeekableHttpStream.php +++ b/lib/private/Files/Stream/SeekableHttpStream.php @@ -110,14 +110,14 @@ class SeekableHttpStream implements File { return true; } - function stream_open($path, $mode, $options, &$opened_path) { + public function stream_open($path, $mode, $options, &$opened_path) { $options = stream_context_get_options($this->context)[self::PROTOCOL]; $this->openCallback = $options['callback']; return $this->reconnect(0); } - function stream_read($count) { + public function stream_read($count) { if (!$this->current) { return false; } @@ -126,7 +126,7 @@ class SeekableHttpStream implements File { return $ret; } - function stream_seek($offset, $whence = SEEK_SET) { + public function stream_seek($offset, $whence = SEEK_SET) { switch ($whence) { case SEEK_SET: if ($offset === $this->offset) { @@ -144,19 +144,19 @@ class SeekableHttpStream implements File { return false; } - function stream_tell() { + public function stream_tell() { return $this->offset; } - function stream_stat() { + public function stream_stat() { return fstat($this->current); } - function stream_eof() { + public function stream_eof() { return feof($this->current); } - function stream_close() { + public function stream_close() { fclose($this->current); } @@ -179,6 +179,4 @@ class SeekableHttpStream implements File { public function stream_flush() { return; //noop because readonly stream } - - } -- cgit v1.2.3 From 3d3ee1bfaefb16a5fe912932abff90bf972f3736 Mon Sep 17 00:00:00 2001 From: Robin Appelman Date: Thu, 9 Apr 2020 14:37:01 +0200 Subject: harden seekable http stream a bit against failures Signed-off-by: Robin Appelman --- lib/private/Files/Stream/SeekableHttpStream.php | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) (limited to 'lib/private/Files') diff --git a/lib/private/Files/Stream/SeekableHttpStream.php b/lib/private/Files/Stream/SeekableHttpStream.php index fdcd9ea8cfb..113ba19a17a 100644 --- a/lib/private/Files/Stream/SeekableHttpStream.php +++ b/lib/private/Files/Stream/SeekableHttpStream.php @@ -149,15 +149,25 @@ class SeekableHttpStream implements File { } public function stream_stat() { - return fstat($this->current); + if (is_resource($this->current)) { + return fstat($this->current); + } else { + return false; + } } public function stream_eof() { - return feof($this->current); + if (is_resource($this->current)) { + return feof($this->current); + } else { + return true; + } } public function stream_close() { - fclose($this->current); + if (is_resource($this->current)) { + fclose($this->current); + } } public function stream_write($data) { -- cgit v1.2.3