]> source.dussan.org Git - nextcloud-server.git/commitdiff
make seekable s3 stream generic
authorRobin Appelman <robin@icewind.nl>
Thu, 19 Mar 2020 13:32:25 +0000 (14:32 +0100)
committerMorris Jobke <hey@morrisjobke.de>
Fri, 22 May 2020 13:31:06 +0000 (15:31 +0200)
Signed-off-by: Robin Appelman <robin@icewind.nl>
lib/private/Files/ObjectStore/S3ObjectTrait.php
lib/private/Files/ObjectStore/S3SeekableReadStream.php [deleted file]
lib/private/Files/Stream/SeekableHttpStream.php [new file with mode: 0644]

index 79a0c0a87afd228b87d3daca6a43c5116530e0a1..84c766c1df5eb9e673eadf4d384adf6091846463 100644 (file)
@@ -28,6 +28,7 @@ use Aws\S3\MultipartUploader;
 use Aws\S3\ObjectUploader;
 use Aws\S3\S3Client;
 use Icewind\Streams\CallbackWrapper;
+use OC\Files\Stream\SeekableHttpStream;
 
 const S3_UPLOAD_PART_SIZE = 524288000; // 500MB
 
@@ -47,16 +48,29 @@ trait S3ObjectTrait {
         * @since 7.0.0
         */
        function readObject($urn) {
-               $context = stream_context_create([
-                       's3seek' => [
-                               'client' => $this->getConnection(),
-                               'bucket' => $this->bucket,
-                               'urn' => $urn,
-                       ],
-               ]);
+               return SeekableHttpStream::open(function ($range) use ($urn) {
+                       $command = $this->getConnection()->getCommand('GetObject', [
+                               'Bucket' => $this->bucket,
+                               'Key' => $urn,
+                               'Range' => 'bytes=' . $range,
+                       ]);
+                       $request = \Aws\serialize($command);
+                       $headers = [];
+                       foreach ($request->getHeaders() as $key => $values) {
+                               foreach ($values as $value) {
+                                       $headers[] = "$key: $value";
+                               }
+                       }
+                       $opts = [
+                               'http' => [
+                                       'protocol_version' => 1.1,
+                                       'header' => $headers,
+                               ],
+                       ];
 
-               S3SeekableReadStream::registerIfNeeded();
-               return fopen('s3seek://', 'r', false, $context);
+                       $context = stream_context_create($opts);
+                       return fopen($request->getUri(), 'r', false, $context);
+               });
        }
 
        /**
@@ -74,7 +88,7 @@ trait S3ObjectTrait {
                $uploader = new MultipartUploader($this->getConnection(), $countStream, [
                        'bucket' => $this->bucket,
                        'key' => $urn,
-                       'part_size' => S3_UPLOAD_PART_SIZE
+                       'part_size' => S3_UPLOAD_PART_SIZE,
                ]);
 
                try {
@@ -101,7 +115,7 @@ trait S3ObjectTrait {
        function deleteObject($urn) {
                $this->getConnection()->deleteObject([
                        'Bucket' => $this->bucket,
-                       'Key' => $urn
+                       'Key' => $urn,
                ]);
        }
 
diff --git a/lib/private/Files/ObjectStore/S3SeekableReadStream.php b/lib/private/Files/ObjectStore/S3SeekableReadStream.php
deleted file mode 100644 (file)
index 7085553..0000000
+++ /dev/null
@@ -1,139 +0,0 @@
-<?php
-/**
- *
- * @copyright Copyright (c) 2020, Lukas Stabe (lukas@stabe.de)
- *
- * @license GNU AGPL version 3 or any later version
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as
- * published by the Free Software Foundation, either version 3 of the
- * License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program.  If not, see <http://www.gnu.org/licenses/>.
- *
- */
-
-namespace OC\Files\ObjectStore;
-
-/**
- * A stream wrapper that uses http range requests to provide a seekable
- * stream of a file in S3 storage.
- */
-class S3SeekableReadStream {
-       private static $registered = false;
-
-       /**
-        * Registers the stream wrapper using the `s3seek://` url scheme
-        * $return void
-        */
-       public static function registerIfNeeded() {
-               if (!self::$registered) {
-                       stream_wrapper_register(
-                               's3seek',
-                               'OC\Files\ObjectStore\S3SeekableReadStream'
-                       );
-                       self::$registered = true;
-               }
-       }
-
-       private $client;
-       private $bucket;
-       private $urn;
-
-       private $current;
-       private $offset = 0;
-
-       private function reconnect($range) {
-               if ($this->current != null) {
-                       fclose($this->current);
-               }
-
-               $command = $this->client->getCommand('GetObject', [
-                       'Bucket' => $this->bucket,
-                       'Key' => $this->urn,
-                       'Range' => 'bytes=' . $range,
-               ]);
-               $request = \Aws\serialize($command);
-               $headers = [];
-               foreach ($request->getHeaders() as $key => $values) {
-                       foreach ($values as $value) {
-                               $headers[] = "$key: $value";
-                       }
-               }
-               $opts = [
-                       'http' => [
-                               'protocol_version' => 1.1,
-                               'header' => $headers,
-                       ],
-               ];
-
-               $context = stream_context_create($opts);
-               $this->current = fopen($request->getUri(), 'r', false, $context);
-
-               if ($this->current === false) {return false;}
-
-               $responseHead = stream_get_meta_data($this->current)['wrapper_data'];
-               $contentRange = array_values(array_filter($responseHead, function ($v) {
-                       return preg_match('#^content-range:#i', $v) === 1;
-               }))[0];
-
-               $content = trim(explode(':', $contentRange)[1]);
-               $range = trim(explode(' ', $content)[1]);
-               $begin = explode('-', $range)[0];
-               $this->offset = intval($begin);
-
-               return true;
-       }
-
-       function stream_open($path, $mode, $options, &$opened_path) {
-               $o = stream_context_get_options($this->context)['s3seek'];
-               $this->bucket = $o['bucket'];
-               $this->urn = $o['urn'];
-               $this->client = $o['client'];
-
-               return $this->reconnect('0-');
-       }
-
-       function stream_read($count) {
-               $ret = fread($this->current, $count);
-               $this->offset += strlen($ret);
-               return $ret;
-       }
-
-       function stream_seek($offset, $whence) {
-               switch ($whence) {
-               case SEEK_SET:
-                       if ($offset === $this->offset) {return true;}
-                       return $this->reconnect($offset . '-');
-               case SEEK_CUR:
-                       if ($offset === 0) {return true;}
-                       return $this->reconnect(($this->offset + $offset) . '-');
-               case SEEK_END:
-                       return false;
-               }
-               return false;
-       }
-
-       function stream_tell() {
-               return $this->offset;
-       }
-
-       function stream_stat() {
-               return fstat($this->current);
-       }
-
-       function stream_eof() {
-               return feof($this->current);
-       }
-
-       function stream_close() {
-               fclose($this->current);
-       }
-}
diff --git a/lib/private/Files/Stream/SeekableHttpStream.php b/lib/private/Files/Stream/SeekableHttpStream.php
new file mode 100644 (file)
index 0000000..80f05f7
--- /dev/null
@@ -0,0 +1,174 @@
+<?php
+/**
+ *
+ * @copyright Copyright (c) 2020, Lukas Stabe (lukas@stabe.de)
+ *
+ * @license GNU AGPL version 3 or any later version
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+namespace OC\Files\Stream;
+
+use Icewind\Streams\File;
+
+/**
+ * A stream wrapper that uses http range requests to provide a seekable stream for http reading
+ */
+class SeekableHttpStream implements File {
+       private const PROTOCOL = 'httpseek';
+
+       private static $registered = false;
+
+       /**
+        * Registers the stream wrapper using the `httpseek://` url scheme
+        * $return void
+        */
+       private static function registerIfNeeded() {
+               if (!self::$registered) {
+                       stream_wrapper_register(
+                               self::PROTOCOL,
+                               self::class
+                       );
+                       self::$registered = true;
+               }
+       }
+
+       /**
+        * Open a readonly-seekable http stream
+        *
+        * The provided callback will be called with byte range and should return an http stream for the requested range
+        *
+        * @param callable $callback
+        * @return false|resource
+        */
+       public static function open(callable $callback) {
+               $context = stream_context_create([
+                       SeekableHttpStream::PROTOCOL => [
+                               'callback' => $callback
+                       ],
+               ]);
+
+               SeekableHttpStream::registerIfNeeded();
+               return fopen(SeekableHttpStream::PROTOCOL . '://', 'r', false, $context);
+       }
+
+       /** @var resource */
+       public $context;
+
+       /** @var callable */
+       private $openCallback;
+
+       /** @var resource */
+       private $current;
+       /** @var int */
+       private $offset = 0;
+
+       private function reconnect($range) {
+               if ($this->current != null) {
+                       fclose($this->current);
+               }
+
+               $this->current = ($this->openCallback)($range);
+
+               if ($this->current === false) {
+                       return false;
+               }
+
+               $responseHead = stream_get_meta_data($this->current)['wrapper_data'];
+               $contentRange = array_values(array_filter($responseHead, function ($v) {
+                       return preg_match('#^content-range:#i', $v) === 1;
+               }))[0];
+
+               $content = trim(explode(':', $contentRange)[1]);
+               $range = trim(explode(' ', $content)[1]);
+               $begin = explode('-', $range)[0];
+               $this->offset = intval($begin);
+
+               return true;
+       }
+
+       function stream_open($path, $mode, $options, &$opened_path) {
+               $options = stream_context_get_options($this->context)[self::PROTOCOL];
+               $this->openCallback = $options['callback'];
+
+               return $this->reconnect('0-');
+       }
+
+       function stream_read($count) {
+               if (!$this->current) {
+                       return false;
+               }
+               $ret = fread($this->current, $count);
+               $this->offset += strlen($ret);
+               return $ret;
+       }
+
+       function stream_seek($offset, $whence = SEEK_SET) {
+               switch ($whence) {
+                       case SEEK_SET:
+                               if ($offset === $this->offset) {
+                                       return true;
+                               }
+                               return $this->reconnect($offset . '-');
+                       case SEEK_CUR:
+                               if ($offset === 0) {
+                                       return true;
+                               }
+                               return $this->reconnect(($this->offset + $offset) . '-');
+                       case SEEK_END:
+                               return false;
+               }
+               return false;
+       }
+
+       function stream_tell() {
+               return $this->offset;
+       }
+
+       function stream_stat() {
+               return fstat($this->current);
+       }
+
+       function stream_eof() {
+               return feof($this->current);
+       }
+
+       function stream_close() {
+               fclose($this->current);
+       }
+
+       public function stream_write($data) {
+               return false;
+       }
+
+       public function stream_set_option($option, $arg1, $arg2) {
+               return false;
+       }
+
+       public function stream_truncate($size) {
+               return false;
+       }
+
+       public function stream_lock($operation) {
+               return false;
+       }
+
+       public function stream_flush() {
+               return; //noop because readonly stream
+       }
+
+
+}