summaryrefslogtreecommitdiffstats
path: root/lib
diff options
context:
space:
mode:
authorRobin Appelman <robin@icewind.nl>2020-03-19 14:32:25 +0100
committerRobin Appelman <robin@icewind.nl>2020-04-01 15:21:05 +0200
commit7b07e7251c8a92e95da922f34dde158ddffbeeee (patch)
tree3cae032a5965aeb2604896c5aa51f263c33103fb /lib
parent14401efb0f6792415f15c92c1db07fe9e25ea466 (diff)
downloadnextcloud-server-7b07e7251c8a92e95da922f34dde158ddffbeeee.tar.gz
nextcloud-server-7b07e7251c8a92e95da922f34dde158ddffbeeee.zip
make seekable s3 stream generic
Signed-off-by: Robin Appelman <robin@icewind.nl>
Diffstat (limited to 'lib')
-rw-r--r--lib/private/Files/ObjectStore/S3ObjectTrait.php36
-rw-r--r--lib/private/Files/Stream/SeekableHttpStream.php (renamed from lib/private/Files/ObjectStore/S3SeekableReadStream.php)127
2 files changed, 106 insertions, 57 deletions
diff --git a/lib/private/Files/ObjectStore/S3ObjectTrait.php b/lib/private/Files/ObjectStore/S3ObjectTrait.php
index 1e9d095b9eb..0939dd2a23c 100644
--- a/lib/private/Files/ObjectStore/S3ObjectTrait.php
+++ b/lib/private/Files/ObjectStore/S3ObjectTrait.php
@@ -30,6 +30,7 @@ use Aws\S3\MultipartUploader;
use Aws\S3\ObjectUploader;
use Aws\S3\S3Client;
use Icewind\Streams\CallbackWrapper;
+use OC\Files\Stream\SeekableHttpStream;
const S3_UPLOAD_PART_SIZE = 524288000; // 500MB
@@ -49,16 +50,29 @@ trait S3ObjectTrait {
* @since 7.0.0
*/
function readObject($urn) {
- $context = stream_context_create([
- 's3seek' => [
- 'client' => $this->getConnection(),
- 'bucket' => $this->bucket,
- 'urn' => $urn,
- ],
- ]);
+ return SeekableHttpStream::open(function ($range) use ($urn) {
+ $command = $this->getConnection()->getCommand('GetObject', [
+ 'Bucket' => $this->bucket,
+ 'Key' => $urn,
+ 'Range' => 'bytes=' . $range,
+ ]);
+ $request = \Aws\serialize($command);
+ $headers = [];
+ foreach ($request->getHeaders() as $key => $values) {
+ foreach ($values as $value) {
+ $headers[] = "$key: $value";
+ }
+ }
+ $opts = [
+ 'http' => [
+ 'protocol_version' => 1.1,
+ 'header' => $headers,
+ ],
+ ];
- S3SeekableReadStream::registerIfNeeded();
- return fopen('s3seek://', 'r', false, $context);
+ $context = stream_context_create($opts);
+ return fopen($request->getUri(), 'r', false, $context);
+ });
}
/**
@@ -76,7 +90,7 @@ trait S3ObjectTrait {
$uploader = new MultipartUploader($this->getConnection(), $countStream, [
'bucket' => $this->bucket,
'key' => $urn,
- 'part_size' => S3_UPLOAD_PART_SIZE
+ 'part_size' => S3_UPLOAD_PART_SIZE,
]);
try {
@@ -103,7 +117,7 @@ trait S3ObjectTrait {
function deleteObject($urn) {
$this->getConnection()->deleteObject([
'Bucket' => $this->bucket,
- 'Key' => $urn
+ 'Key' => $urn,
]);
}
diff --git a/lib/private/Files/ObjectStore/S3SeekableReadStream.php b/lib/private/Files/Stream/SeekableHttpStream.php
index 70855537034..80f05f7ea09 100644
--- a/lib/private/Files/ObjectStore/S3SeekableReadStream.php
+++ b/lib/private/Files/Stream/SeekableHttpStream.php
@@ -20,34 +20,60 @@
*
*/
-namespace OC\Files\ObjectStore;
+namespace OC\Files\Stream;
+
+use Icewind\Streams\File;
/**
- * A stream wrapper that uses http range requests to provide a seekable
- * stream of a file in S3 storage.
+ * A stream wrapper that uses http range requests to provide a seekable stream for http reading
*/
-class S3SeekableReadStream {
+class SeekableHttpStream implements File {
+ private const PROTOCOL = 'httpseek';
+
private static $registered = false;
/**
- * Registers the stream wrapper using the `s3seek://` url scheme
+ * Registers the stream wrapper using the `httpseek://` url scheme
* $return void
*/
- public static function registerIfNeeded() {
+ private static function registerIfNeeded() {
if (!self::$registered) {
stream_wrapper_register(
- 's3seek',
- 'OC\Files\ObjectStore\S3SeekableReadStream'
+ self::PROTOCOL,
+ self::class
);
self::$registered = true;
}
}
- private $client;
- private $bucket;
- private $urn;
+ /**
+ * Open a readonly-seekable http stream
+ *
+ * The provided callback will be called with byte range and should return an http stream for the requested range
+ *
+ * @param callable $callback
+ * @return false|resource
+ */
+ public static function open(callable $callback) {
+ $context = stream_context_create([
+ SeekableHttpStream::PROTOCOL => [
+ 'callback' => $callback
+ ],
+ ]);
+
+ SeekableHttpStream::registerIfNeeded();
+ return fopen(SeekableHttpStream::PROTOCOL . '://', 'r', false, $context);
+ }
+
+ /** @var resource */
+ public $context;
+ /** @var callable */
+ private $openCallback;
+
+ /** @var resource */
private $current;
+ /** @var int */
private $offset = 0;
private function reconnect($range) {
@@ -55,29 +81,11 @@ class S3SeekableReadStream {
fclose($this->current);
}
- $command = $this->client->getCommand('GetObject', [
- 'Bucket' => $this->bucket,
- 'Key' => $this->urn,
- 'Range' => 'bytes=' . $range,
- ]);
- $request = \Aws\serialize($command);
- $headers = [];
- foreach ($request->getHeaders() as $key => $values) {
- foreach ($values as $value) {
- $headers[] = "$key: $value";
- }
- }
- $opts = [
- 'http' => [
- 'protocol_version' => 1.1,
- 'header' => $headers,
- ],
- ];
-
- $context = stream_context_create($opts);
- $this->current = fopen($request->getUri(), 'r', false, $context);
+ $this->current = ($this->openCallback)($range);
- if ($this->current === false) {return false;}
+ if ($this->current === false) {
+ return false;
+ }
$responseHead = stream_get_meta_data($this->current)['wrapper_data'];
$contentRange = array_values(array_filter($responseHead, function ($v) {
@@ -93,30 +101,35 @@ class S3SeekableReadStream {
}
function stream_open($path, $mode, $options, &$opened_path) {
- $o = stream_context_get_options($this->context)['s3seek'];
- $this->bucket = $o['bucket'];
- $this->urn = $o['urn'];
- $this->client = $o['client'];
+ $options = stream_context_get_options($this->context)[self::PROTOCOL];
+ $this->openCallback = $options['callback'];
return $this->reconnect('0-');
}
function stream_read($count) {
+ if (!$this->current) {
+ return false;
+ }
$ret = fread($this->current, $count);
$this->offset += strlen($ret);
return $ret;
}
- function stream_seek($offset, $whence) {
+ function stream_seek($offset, $whence = SEEK_SET) {
switch ($whence) {
- case SEEK_SET:
- if ($offset === $this->offset) {return true;}
- return $this->reconnect($offset . '-');
- case SEEK_CUR:
- if ($offset === 0) {return true;}
- return $this->reconnect(($this->offset + $offset) . '-');
- case SEEK_END:
- return false;
+ case SEEK_SET:
+ if ($offset === $this->offset) {
+ return true;
+ }
+ return $this->reconnect($offset . '-');
+ case SEEK_CUR:
+ if ($offset === 0) {
+ return true;
+ }
+ return $this->reconnect(($this->offset + $offset) . '-');
+ case SEEK_END:
+ return false;
}
return false;
}
@@ -136,4 +149,26 @@ class S3SeekableReadStream {
function stream_close() {
fclose($this->current);
}
+
+ public function stream_write($data) {
+ return false;
+ }
+
+ public function stream_set_option($option, $arg1, $arg2) {
+ return false;
+ }
+
+ public function stream_truncate($size) {
+ return false;
+ }
+
+ public function stream_lock($operation) {
+ return false;
+ }
+
+ public function stream_flush() {
+ return; //noop because readonly stream
+ }
+
+
}