aboutsummaryrefslogtreecommitdiffstats
path: root/lib/private/DB/QueryBuilder/Sharded
diff options
context:
space:
mode:
Diffstat (limited to 'lib/private/DB/QueryBuilder/Sharded')
-rw-r--r--lib/private/DB/QueryBuilder/Sharded/AutoIncrementHandler.php155
-rw-r--r--lib/private/DB/QueryBuilder/Sharded/CrossShardMoveHelper.php162
-rw-r--r--lib/private/DB/QueryBuilder/Sharded/HashShardMapper.php21
-rw-r--r--lib/private/DB/QueryBuilder/Sharded/InvalidShardedQueryException.php29
-rw-r--r--lib/private/DB/QueryBuilder/Sharded/RoundRobinShardMapper.php20
-rw-r--r--lib/private/DB/QueryBuilder/Sharded/ShardConnectionManager.php52
-rw-r--r--lib/private/DB/QueryBuilder/Sharded/ShardDefinition.php80
-rw-r--r--lib/private/DB/QueryBuilder/Sharded/ShardQueryRunner.php200
-rw-r--r--lib/private/DB/QueryBuilder/Sharded/ShardedQueryBuilder.php407
9 files changed, 1126 insertions, 0 deletions
diff --git a/lib/private/DB/QueryBuilder/Sharded/AutoIncrementHandler.php b/lib/private/DB/QueryBuilder/Sharded/AutoIncrementHandler.php
new file mode 100644
index 00000000000..3a230ea544d
--- /dev/null
+++ b/lib/private/DB/QueryBuilder/Sharded/AutoIncrementHandler.php
@@ -0,0 +1,155 @@
+<?php
+
+declare(strict_types=1);
+/**
+ * SPDX-FileCopyrightText: 2024 Robin Appelman <robin@icewind.nl>
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+namespace OC\DB\QueryBuilder\Sharded;
+
+use OCP\ICacheFactory;
+use OCP\IMemcache;
+use OCP\IMemcacheTTL;
+
+/**
+ * A helper to atomically determine the next auto increment value for a sharded table
+ *
+ * Since we can't use the database's auto-increment (since each db doesn't know about the keys in the other shards)
+ * we need external logic for doing the auto increment
+ */
+class AutoIncrementHandler {
+ public const MIN_VALID_KEY = 1000;
+ public const TTL = 365 * 24 * 60 * 60;
+
+ private ?IMemcache $cache = null;
+
+ public function __construct(
+ private ICacheFactory $cacheFactory,
+ private ShardConnectionManager $shardConnectionManager,
+ ) {
+ if (PHP_INT_SIZE < 8) {
+ throw new \Exception('sharding is only supported with 64bit php');
+ }
+ }
+
+ private function getCache(): IMemcache {
+ if (is_null($this->cache)) {
+ $cache = $this->cacheFactory->createDistributed('shared_autoincrement');
+ if ($cache instanceof IMemcache) {
+ $this->cache = $cache;
+ } else {
+ throw new \Exception('Distributed cache ' . get_class($cache) . ' is not suitable');
+ }
+ }
+ return $this->cache;
+ }
+
+ /**
+ * Get the next value for the given shard definition
+ *
+ * The returned key is unique and incrementing, but not sequential.
+ * The shard id is encoded in the first byte of the returned value
+ *
+ * @param ShardDefinition $shardDefinition
+ * @return int
+ * @throws \Exception
+ */
+ public function getNextPrimaryKey(ShardDefinition $shardDefinition, int $shard): int {
+ $retries = 0;
+ while ($retries < 5) {
+ $next = $this->getNextInner($shardDefinition);
+ if ($next !== null) {
+ if ($next > ShardDefinition::MAX_PRIMARY_KEY) {
+ throw new \Exception('Max primary key of ' . ShardDefinition::MAX_PRIMARY_KEY . ' exceeded');
+ }
+ // we encode the shard the primary key was originally inserted into to allow guessing the shard by primary key later on
+ return ($next << 8) | $shard;
+ } else {
+ $retries++;
+ }
+ }
+ throw new \Exception('Failed to get next primary key');
+ }
+
+ /**
+ * auto increment logic without retry
+ *
+ * @param ShardDefinition $shardDefinition
+ * @return int|null either the next primary key or null if the call needs to be retried
+ */
+ private function getNextInner(ShardDefinition $shardDefinition): ?int {
+ $cache = $this->getCache();
+ // because this function will likely be called concurrently from different requests
+ // the implementation needs to ensure that the cached value can be cleared, invalidated or re-calculated at any point between our cache calls
+ // care must be taken that the logic remains fully resilient against race conditions
+
+ // in the ideal case, the last primary key is stored in the cache and we can just do an `inc`
+ // if that is not the case we find the highest used id in the database increment it, and save it in the cache
+
+ // prevent inc from returning `1` if the key doesn't exist by setting it to a non-numeric value
+ $cache->add($shardDefinition->table, 'empty-placeholder', self::TTL);
+ $next = $cache->inc($shardDefinition->table);
+
+ if ($cache instanceof IMemcacheTTL) {
+ $cache->setTTL($shardDefinition->table, self::TTL);
+ }
+
+ // the "add + inc" trick above isn't strictly atomic, so as a safety we reject any result that to small
+ // to handle the edge case of the stored value disappearing between the add and inc
+ if (is_int($next) && $next >= self::MIN_VALID_KEY) {
+ return $next;
+ } elseif (is_int($next)) {
+ // we hit the edge case, so invalidate the cached value
+ if (!$cache->cas($shardDefinition->table, $next, 'empty-placeholder')) {
+ // someone else is changing the value concurrently, give up and retry
+ return null;
+ }
+ }
+
+ // discard the encoded initial shard
+ $current = $this->getMaxFromDb($shardDefinition);
+ $next = max($current, self::MIN_VALID_KEY) + 1;
+ if ($cache->cas($shardDefinition->table, 'empty-placeholder', $next)) {
+ return $next;
+ }
+
+ // another request set the cached value before us, so we should just be able to inc
+ $next = $cache->inc($shardDefinition->table);
+ if (is_int($next) && $next >= self::MIN_VALID_KEY) {
+ return $next;
+ } elseif (is_int($next)) {
+ // key got cleared, invalidate and retry
+ $cache->cas($shardDefinition->table, $next, 'empty-placeholder');
+ return null;
+ } else {
+ // cleanup any non-numeric value other than the placeholder if that got stored somehow
+ $cache->ncad($shardDefinition->table, 'empty-placeholder');
+ // retry
+ return null;
+ }
+ }
+
+ /**
+ * Get the maximum primary key value from the shards, note that this has already stripped any embedded shard id
+ */
+ private function getMaxFromDb(ShardDefinition $shardDefinition): int {
+ $max = $shardDefinition->fromFileId;
+ $query = $this->shardConnectionManager->getConnection($shardDefinition, 0)->getQueryBuilder();
+ $query->select($shardDefinition->primaryKey)
+ ->from($shardDefinition->table)
+ ->orderBy($shardDefinition->primaryKey, 'DESC')
+ ->setMaxResults(1);
+ foreach ($shardDefinition->getAllShards() as $shard) {
+ $connection = $this->shardConnectionManager->getConnection($shardDefinition, $shard);
+ $result = $query->executeQuery($connection)->fetchOne();
+ if ($result) {
+ if ($result > $shardDefinition->fromFileId) {
+ $result = $result >> 8;
+ }
+ $max = max($max, $result);
+ }
+ }
+ return $max;
+ }
+}
diff --git a/lib/private/DB/QueryBuilder/Sharded/CrossShardMoveHelper.php b/lib/private/DB/QueryBuilder/Sharded/CrossShardMoveHelper.php
new file mode 100644
index 00000000000..81530b56725
--- /dev/null
+++ b/lib/private/DB/QueryBuilder/Sharded/CrossShardMoveHelper.php
@@ -0,0 +1,162 @@
+<?php
+
+declare(strict_types=1);
+/**
+ * SPDX-FileCopyrightText: 2024 Robin Appelman <robin@icewind.nl>
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+namespace OC\DB\QueryBuilder\Sharded;
+
+use OCP\DB\QueryBuilder\IQueryBuilder;
+use OCP\IDBConnection;
+
+/**
+ * Utility methods for implementing logic that moves data across shards
+ */
+class CrossShardMoveHelper {
+ public function __construct(
+ private ShardConnectionManager $connectionManager,
+ ) {
+ }
+
+ public function getConnection(ShardDefinition $shardDefinition, int $shardKey): IDBConnection {
+ return $this->connectionManager->getConnection($shardDefinition, $shardDefinition->getShardForKey($shardKey));
+ }
+
+ /**
+ * Update the shard key of a set of rows, moving them to a different shard if needed
+ *
+ * @param ShardDefinition $shardDefinition
+ * @param string $table
+ * @param string $shardColumn
+ * @param int $sourceShardKey
+ * @param int $targetShardKey
+ * @param string $primaryColumn
+ * @param int[] $primaryKeys
+ * @return void
+ */
+ public function moveCrossShards(ShardDefinition $shardDefinition, string $table, string $shardColumn, int $sourceShardKey, int $targetShardKey, string $primaryColumn, array $primaryKeys): void {
+ $sourceShard = $shardDefinition->getShardForKey($sourceShardKey);
+ $targetShard = $shardDefinition->getShardForKey($targetShardKey);
+ $sourceConnection = $this->connectionManager->getConnection($shardDefinition, $sourceShard);
+ if ($sourceShard === $targetShard) {
+ $this->updateItems($sourceConnection, $table, $shardColumn, $targetShardKey, $primaryColumn, $primaryKeys);
+
+ return;
+ }
+ $targetConnection = $this->connectionManager->getConnection($shardDefinition, $targetShard);
+
+ $sourceItems = $this->loadItems($sourceConnection, $table, $primaryColumn, $primaryKeys);
+ foreach ($sourceItems as &$sourceItem) {
+ $sourceItem[$shardColumn] = $targetShardKey;
+ }
+ if (!$sourceItems) {
+ return;
+ }
+
+ $sourceConnection->beginTransaction();
+ $targetConnection->beginTransaction();
+ try {
+ $this->saveItems($targetConnection, $table, $sourceItems);
+ $this->deleteItems($sourceConnection, $table, $primaryColumn, $primaryKeys);
+
+ $targetConnection->commit();
+ $sourceConnection->commit();
+ } catch (\Exception $e) {
+ $sourceConnection->rollback();
+ $targetConnection->rollback();
+ throw $e;
+ }
+ }
+
+ /**
+ * Load rows from a table to move
+ *
+ * @param IDBConnection $connection
+ * @param string $table
+ * @param string $primaryColumn
+ * @param int[] $primaryKeys
+ * @return array[]
+ */
+ public function loadItems(IDBConnection $connection, string $table, string $primaryColumn, array $primaryKeys): array {
+ $query = $connection->getQueryBuilder();
+ $query->select('*')
+ ->from($table)
+ ->where($query->expr()->in($primaryColumn, $query->createParameter('keys')));
+
+ $chunks = array_chunk($primaryKeys, 1000);
+
+ $results = [];
+ foreach ($chunks as $chunk) {
+ $query->setParameter('keys', $chunk, IQueryBuilder::PARAM_INT_ARRAY);
+ $results = array_merge($results, $query->execute()->fetchAll());
+ }
+
+ return $results;
+ }
+
+ /**
+ * Save modified rows
+ *
+ * @param IDBConnection $connection
+ * @param string $table
+ * @param array[] $items
+ * @return void
+ */
+ public function saveItems(IDBConnection $connection, string $table, array $items): void {
+ if (count($items) === 0) {
+ return;
+ }
+ $query = $connection->getQueryBuilder();
+ $query->insert($table);
+ foreach ($items[0] as $column => $value) {
+ $query->setValue($column, $query->createParameter($column));
+ }
+
+ foreach ($items as $item) {
+ foreach ($item as $column => $value) {
+ if (is_int($column)) {
+ $query->setParameter($column, $value, IQueryBuilder::PARAM_INT);
+ } else {
+ $query->setParameter($column, $value);
+ }
+ }
+ $query->executeStatement();
+ }
+ }
+
+ /**
+ * @param IDBConnection $connection
+ * @param string $table
+ * @param string $primaryColumn
+ * @param int[] $primaryKeys
+ * @return void
+ */
+ public function updateItems(IDBConnection $connection, string $table, string $shardColumn, int $targetShardKey, string $primaryColumn, array $primaryKeys): void {
+ $query = $connection->getQueryBuilder();
+ $query->update($table)
+ ->set($shardColumn, $query->createNamedParameter($targetShardKey, IQueryBuilder::PARAM_INT))
+ ->where($query->expr()->in($primaryColumn, $query->createNamedParameter($primaryKeys, IQueryBuilder::PARAM_INT_ARRAY)));
+ $query->executeQuery()->fetchAll();
+ }
+
+ /**
+ * @param IDBConnection $connection
+ * @param string $table
+ * @param string $primaryColumn
+ * @param int[] $primaryKeys
+ * @return void
+ */
+ public function deleteItems(IDBConnection $connection, string $table, string $primaryColumn, array $primaryKeys): void {
+ $query = $connection->getQueryBuilder();
+ $query->delete($table)
+ ->where($query->expr()->in($primaryColumn, $query->createParameter('keys')));
+ $chunks = array_chunk($primaryKeys, 1000);
+
+ foreach ($chunks as $chunk) {
+ $query->setParameter('keys', $chunk, IQueryBuilder::PARAM_INT_ARRAY);
+ $query->executeStatement();
+ }
+ }
+}
diff --git a/lib/private/DB/QueryBuilder/Sharded/HashShardMapper.php b/lib/private/DB/QueryBuilder/Sharded/HashShardMapper.php
new file mode 100644
index 00000000000..af778489a2d
--- /dev/null
+++ b/lib/private/DB/QueryBuilder/Sharded/HashShardMapper.php
@@ -0,0 +1,21 @@
+<?php
+
+declare(strict_types=1);
+/**
+ * SPDX-FileCopyrightText: 2024 Robin Appelman <robin@icewind.nl>
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+namespace OC\DB\QueryBuilder\Sharded;
+
+use OCP\DB\QueryBuilder\Sharded\IShardMapper;
+
+/**
+ * Map string key to an int-range by hashing the key
+ */
+class HashShardMapper implements IShardMapper {
+ public function getShardForKey(int $key, int $count): int {
+ $int = unpack('L', substr(md5((string)$key, true), 0, 4))[1];
+ return $int % $count;
+ }
+}
diff --git a/lib/private/DB/QueryBuilder/Sharded/InvalidShardedQueryException.php b/lib/private/DB/QueryBuilder/Sharded/InvalidShardedQueryException.php
new file mode 100644
index 00000000000..733a6acaf9d
--- /dev/null
+++ b/lib/private/DB/QueryBuilder/Sharded/InvalidShardedQueryException.php
@@ -0,0 +1,29 @@
+<?php
+
+declare(strict_types=1);
+/**
+ * SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors
+ * SPDX-License-Identifier: AGPL-3.0-only
+ */
+
+namespace OC\DB\QueryBuilder\Sharded;
+
+/**
+ * Queries on sharded table has the following limitations:
+ *
+ * 1. Either the shard key (e.g. "storage") or primary key (e.g. "fileid") must be mentioned in the query.
+ * Or the query must be explicitly marked as running across all shards.
+ *
+ * For queries where it isn't possible to set one of these keys in the query normally, you can set it using `hintShardKey`
+ *
+ * 2. Insert statements must always explicitly set the shard key
+ * 3. A query on a sharded table is not allowed to join on the same table
+ * 4. Right joins are not allowed on sharded tables
+ * 5. Updating the shard key where the new shard key maps to a different shard is not allowed
+ *
+ * Moving rows to a different shard needs to be implemented manually. `CrossShardMoveHelper` provides
+ * some tools to help make this easier.
+ */
+class InvalidShardedQueryException extends \Exception {
+
+}
diff --git a/lib/private/DB/QueryBuilder/Sharded/RoundRobinShardMapper.php b/lib/private/DB/QueryBuilder/Sharded/RoundRobinShardMapper.php
new file mode 100644
index 00000000000..a5694b06507
--- /dev/null
+++ b/lib/private/DB/QueryBuilder/Sharded/RoundRobinShardMapper.php
@@ -0,0 +1,20 @@
+<?php
+
+declare(strict_types=1);
+/**
+ * SPDX-FileCopyrightText: 2024 Robin Appelman <robin@icewind.nl>
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+namespace OC\DB\QueryBuilder\Sharded;
+
+use OCP\DB\QueryBuilder\Sharded\IShardMapper;
+
+/**
+ * Map string key to an int-range by hashing the key
+ */
+class RoundRobinShardMapper implements IShardMapper {
+ public function getShardForKey(int $key, int $count): int {
+ return $key % $count;
+ }
+}
diff --git a/lib/private/DB/QueryBuilder/Sharded/ShardConnectionManager.php b/lib/private/DB/QueryBuilder/Sharded/ShardConnectionManager.php
new file mode 100644
index 00000000000..74358e3ca96
--- /dev/null
+++ b/lib/private/DB/QueryBuilder/Sharded/ShardConnectionManager.php
@@ -0,0 +1,52 @@
+<?php
+
+declare(strict_types=1);
+/**
+ * SPDX-FileCopyrightText: 2024 Robin Appelman <robin@icewind.nl>
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+namespace OC\DB\QueryBuilder\Sharded;
+
+use OC\DB\ConnectionAdapter;
+use OC\DB\ConnectionFactory;
+use OC\SystemConfig;
+use OCP\IDBConnection;
+
+/**
+ * Keeps track of the db connections to the various shards
+ */
+class ShardConnectionManager {
+ /** @var array<string, IDBConnection> */
+ private array $connections = [];
+
+ public function __construct(
+ private SystemConfig $config,
+ private ConnectionFactory $factory,
+ ) {
+ }
+
+ public function getConnection(ShardDefinition $shardDefinition, int $shard): IDBConnection {
+ $connectionKey = $shardDefinition->table . '_' . $shard;
+
+ if (isset($this->connections[$connectionKey])) {
+ return $this->connections[$connectionKey];
+ }
+
+ if ($shard === ShardDefinition::MIGRATION_SHARD) {
+ $this->connections[$connectionKey] = \OC::$server->get(IDBConnection::class);
+ } elseif (isset($shardDefinition->shards[$shard])) {
+ $this->connections[$connectionKey] = $this->createConnection($shardDefinition->shards[$shard]);
+ } else {
+ throw new \InvalidArgumentException("invalid shard key $shard only " . count($shardDefinition->shards) . ' configured');
+ }
+
+ return $this->connections[$connectionKey];
+ }
+
+ private function createConnection(array $shardConfig): IDBConnection {
+ $shardConfig['sharding'] = [];
+ $type = $this->config->getValue('dbtype', 'sqlite');
+ return new ConnectionAdapter($this->factory->getConnection($type, $shardConfig));
+ }
+}
diff --git a/lib/private/DB/QueryBuilder/Sharded/ShardDefinition.php b/lib/private/DB/QueryBuilder/Sharded/ShardDefinition.php
new file mode 100644
index 00000000000..4f98079d92d
--- /dev/null
+++ b/lib/private/DB/QueryBuilder/Sharded/ShardDefinition.php
@@ -0,0 +1,80 @@
+<?php
+
+declare(strict_types=1);
+/**
+ * SPDX-FileCopyrightText: 2024 Robin Appelman <robin@icewind.nl>
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+namespace OC\DB\QueryBuilder\Sharded;
+
+use OCP\DB\QueryBuilder\Sharded\IShardMapper;
+
+/**
+ * Configuration for a shard setup
+ */
+class ShardDefinition {
+ // we reserve the bottom byte of the primary key for the initial shard, so the total shard count is limited to what we can fit there
+ // additionally, shard id 255 is reserved for migration purposes
+ public const MAX_SHARDS = 255;
+ public const MIGRATION_SHARD = 255;
+
+ public const PRIMARY_KEY_MASK = 0x7F_FF_FF_FF_FF_FF_FF_00;
+ public const PRIMARY_KEY_SHARD_MASK = 0x00_00_00_00_00_00_00_FF;
+ // since we reserve 1 byte for the shard index, we only have 56 bits of primary key space
+ public const MAX_PRIMARY_KEY = PHP_INT_MAX >> 8;
+
+ /**
+ * @param string $table
+ * @param string $primaryKey
+ * @param string $shardKey
+ * @param string[] $companionKeys
+ * @param IShardMapper $shardMapper
+ * @param string[] $companionTables
+ * @param array $shards
+ */
+ public function __construct(
+ public string $table,
+ public string $primaryKey,
+ public array $companionKeys,
+ public string $shardKey,
+ public IShardMapper $shardMapper,
+ public array $companionTables,
+ public array $shards,
+ public int $fromFileId,
+ public int $fromStorageId,
+ ) {
+ if (count($this->shards) >= self::MAX_SHARDS) {
+ throw new \Exception('Only allowed maximum of ' . self::MAX_SHARDS . ' shards allowed');
+ }
+ }
+
+ public function hasTable(string $table): bool {
+ if ($this->table === $table) {
+ return true;
+ }
+ return in_array($table, $this->companionTables);
+ }
+
+ public function getShardForKey(int $key): int {
+ if ($key < $this->fromStorageId) {
+ return self::MIGRATION_SHARD;
+ }
+ return $this->shardMapper->getShardForKey($key, count($this->shards));
+ }
+
+ /**
+ * @return list<int>
+ */
+ public function getAllShards(): array {
+ if ($this->fromStorageId !== 0) {
+ return array_merge(array_keys($this->shards), [self::MIGRATION_SHARD]);
+ } else {
+ return array_keys($this->shards);
+ }
+ }
+
+ public function isKey(string $column): bool {
+ return $column === $this->primaryKey || in_array($column, $this->companionKeys);
+ }
+}
diff --git a/lib/private/DB/QueryBuilder/Sharded/ShardQueryRunner.php b/lib/private/DB/QueryBuilder/Sharded/ShardQueryRunner.php
new file mode 100644
index 00000000000..25e2a3d5f2d
--- /dev/null
+++ b/lib/private/DB/QueryBuilder/Sharded/ShardQueryRunner.php
@@ -0,0 +1,200 @@
+<?php
+
+declare(strict_types=1);
+/**
+ * SPDX-FileCopyrightText: 2024 Robin Appelman <robin@icewind.nl>
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+namespace OC\DB\QueryBuilder\Sharded;
+
+use OC\DB\ArrayResult;
+use OCP\DB\IResult;
+use OCP\DB\QueryBuilder\IQueryBuilder;
+
+/**
+ * Logic for running a query across a number of shards, combining the results
+ */
+class ShardQueryRunner {
+ public function __construct(
+ private ShardConnectionManager $shardConnectionManager,
+ private ShardDefinition $shardDefinition,
+ ) {
+ }
+
+ /**
+ * Get the shards for a specific query or null if the shards aren't known in advance
+ *
+ * @param bool $allShards
+ * @param int[] $shardKeys
+ * @return null|int[]
+ */
+ public function getShards(bool $allShards, array $shardKeys): ?array {
+ if ($allShards) {
+ return $this->shardDefinition->getAllShards();
+ }
+ $allConfiguredShards = $this->shardDefinition->getAllShards();
+ if (count($allConfiguredShards) === 1) {
+ return $allConfiguredShards;
+ }
+ if (empty($shardKeys)) {
+ return null;
+ }
+ $shards = array_map(function ($shardKey) {
+ return $this->shardDefinition->getShardForKey((int)$shardKey);
+ }, $shardKeys);
+ return array_values(array_unique($shards));
+ }
+
+ /**
+ * Try to get the shards that the keys are likely to be in, based on the shard the row was created
+ *
+ * @param int[] $primaryKeys
+ * @return int[]
+ */
+ private function getLikelyShards(array $primaryKeys): array {
+ $shards = [];
+ foreach ($primaryKeys as $primaryKey) {
+ if ($primaryKey < $this->shardDefinition->fromFileId && !in_array(ShardDefinition::MIGRATION_SHARD, $shards)) {
+ $shards[] = ShardDefinition::MIGRATION_SHARD;
+ }
+ $encodedShard = $primaryKey & ShardDefinition::PRIMARY_KEY_SHARD_MASK;
+ if ($encodedShard < count($this->shardDefinition->shards) && !in_array($encodedShard, $shards)) {
+ $shards[] = $encodedShard;
+ }
+ }
+ return $shards;
+ }
+
+ /**
+ * Execute a SELECT statement across the configured shards
+ *
+ * @param IQueryBuilder $query
+ * @param bool $allShards
+ * @param int[] $shardKeys
+ * @param int[] $primaryKeys
+ * @param array{column: string, order: string}[] $sortList
+ * @param int|null $limit
+ * @param int|null $offset
+ * @return IResult
+ */
+ public function executeQuery(
+ IQueryBuilder $query,
+ bool $allShards,
+ array $shardKeys,
+ array $primaryKeys,
+ ?array $sortList = null,
+ ?int $limit = null,
+ ?int $offset = null,
+ ): IResult {
+ $shards = $this->getShards($allShards, $shardKeys);
+ $results = [];
+ if ($shards && count($shards) === 1) {
+ // trivial case
+ return $query->executeQuery($this->shardConnectionManager->getConnection($this->shardDefinition, $shards[0]));
+ }
+ // we have to emulate limit and offset, so we select offset+limit from all shards to ensure we have enough rows
+ // and then filter them down after we merged the results
+ if ($limit !== null && $offset !== null) {
+ $query->setMaxResults($limit + $offset);
+ }
+
+ if ($shards) {
+ // we know exactly what shards we need to query
+ foreach ($shards as $shard) {
+ $shardConnection = $this->shardConnectionManager->getConnection($this->shardDefinition, $shard);
+ $subResult = $query->executeQuery($shardConnection);
+ $results = array_merge($results, $subResult->fetchAll());
+ $subResult->closeCursor();
+ }
+ } else {
+ // we don't know for sure what shards we need to query,
+ // we first try the shards that are "likely" to have the rows we want, based on the shard that the row was
+ // originally created in. If we then still haven't found all rows we try the rest of the shards
+ $likelyShards = $this->getLikelyShards($primaryKeys);
+ $unlikelyShards = array_diff($this->shardDefinition->getAllShards(), $likelyShards);
+ $shards = array_merge($likelyShards, $unlikelyShards);
+
+ foreach ($shards as $shard) {
+ $shardConnection = $this->shardConnectionManager->getConnection($this->shardDefinition, $shard);
+ $subResult = $query->executeQuery($shardConnection);
+ $rows = $subResult->fetchAll();
+ $results = array_merge($results, $rows);
+ $subResult->closeCursor();
+
+ if (count($rows) >= count($primaryKeys)) {
+ // we have all the rows we're looking for
+ break;
+ }
+ }
+ }
+
+ if ($sortList) {
+ usort($results, function ($a, $b) use ($sortList) {
+ foreach ($sortList as $sort) {
+ $valueA = $a[$sort['column']] ?? null;
+ $valueB = $b[$sort['column']] ?? null;
+ $cmp = $valueA <=> $valueB;
+ if ($cmp === 0) {
+ continue;
+ }
+ if ($sort['order'] === 'DESC') {
+ $cmp = -$cmp;
+ }
+ return $cmp;
+ }
+ });
+ }
+
+ if ($limit !== null && $offset !== null) {
+ $results = array_slice($results, $offset, $limit);
+ } elseif ($limit !== null) {
+ $results = array_slice($results, 0, $limit);
+ } elseif ($offset !== null) {
+ $results = array_slice($results, $offset);
+ }
+
+ return new ArrayResult($results);
+ }
+
+ /**
+ * Execute an UPDATE or DELETE statement
+ *
+ * @param IQueryBuilder $query
+ * @param bool $allShards
+ * @param int[] $shardKeys
+ * @param int[] $primaryKeys
+ * @return int
+ * @throws \OCP\DB\Exception
+ */
+ public function executeStatement(IQueryBuilder $query, bool $allShards, array $shardKeys, array $primaryKeys): int {
+ if ($query->getType() === \Doctrine\DBAL\Query\QueryBuilder::INSERT) {
+ throw new \Exception('insert queries need special handling');
+ }
+
+ $shards = $this->getShards($allShards, $shardKeys);
+ $maxCount = count($primaryKeys);
+ if ($shards && count($shards) === 1) {
+ return $query->executeStatement($this->shardConnectionManager->getConnection($this->shardDefinition, $shards[0]));
+ } elseif ($shards) {
+ $maxCount = PHP_INT_MAX;
+ } else {
+ // sort the likely shards before the rest, similar logic to `self::executeQuery`
+ $likelyShards = $this->getLikelyShards($primaryKeys);
+ $unlikelyShards = array_diff($this->shardDefinition->getAllShards(), $likelyShards);
+ $shards = array_merge($likelyShards, $unlikelyShards);
+ }
+
+ $count = 0;
+
+ foreach ($shards as $shard) {
+ $shardConnection = $this->shardConnectionManager->getConnection($this->shardDefinition, $shard);
+ $count += $query->executeStatement($shardConnection);
+
+ if ($count >= $maxCount) {
+ break;
+ }
+ }
+ return $count;
+ }
+}
diff --git a/lib/private/DB/QueryBuilder/Sharded/ShardedQueryBuilder.php b/lib/private/DB/QueryBuilder/Sharded/ShardedQueryBuilder.php
new file mode 100644
index 00000000000..04082f76ae8
--- /dev/null
+++ b/lib/private/DB/QueryBuilder/Sharded/ShardedQueryBuilder.php
@@ -0,0 +1,407 @@
+<?php
+
+declare(strict_types=1);
+/**
+ * SPDX-FileCopyrightText: 2024 Robin Appelman <robin@icewind.nl>
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+namespace OC\DB\QueryBuilder\Sharded;
+
+use OC\DB\QueryBuilder\CompositeExpression;
+use OC\DB\QueryBuilder\ExtendedQueryBuilder;
+use OC\DB\QueryBuilder\Parameter;
+use OCP\DB\IResult;
+use OCP\DB\QueryBuilder\IQueryBuilder;
+use OCP\IDBConnection;
+
+/**
+ * A special query builder that automatically distributes queries over multiple database shards.
+ *
+ * This relies on `PartitionedQueryBuilder` to handle splitting of parts of the query that touch the sharded tables
+ * from the non-sharded tables. So the query build here should only either touch only sharded table or only non-sharded tables.
+ *
+ * Most of the logic in this class is concerned with extracting either the shard key (e.g. "storage") or primary key (e.g. "fileid")
+ * from the query. The logic for actually running the query across the shards is mostly delegated to `ShardQueryRunner`.
+ */
+class ShardedQueryBuilder extends ExtendedQueryBuilder {
+ private array $shardKeys = [];
+ private array $primaryKeys = [];
+ private ?ShardDefinition $shardDefinition = null;
+ /** @var bool Run the query across all shards */
+ private bool $allShards = false;
+ private ?string $insertTable = null;
+ private mixed $lastInsertId = null;
+ private ?IDBConnection $lastInsertConnection = null;
+ private ?int $updateShardKey = null;
+ private ?int $limit = null;
+ private ?int $offset = null;
+ /** @var array{column: string, order: string}[] */
+ private array $sortList = [];
+ private string $mainTable = '';
+
+ public function __construct(
+ IQueryBuilder $builder,
+ protected array $shardDefinitions,
+ protected ShardConnectionManager $shardConnectionManager,
+ protected AutoIncrementHandler $autoIncrementHandler,
+ ) {
+ parent::__construct($builder);
+ }
+
+ public function getShardKeys(): array {
+ return $this->getKeyValues($this->shardKeys);
+ }
+
+ public function getPrimaryKeys(): array {
+ return $this->getKeyValues($this->primaryKeys);
+ }
+
+ private function getKeyValues(array $keys): array {
+ $values = [];
+ foreach ($keys as $key) {
+ $values = array_merge($values, $this->getKeyValue($key));
+ }
+ return array_values(array_unique($values));
+ }
+
+ private function getKeyValue($value): array {
+ if ($value instanceof Parameter) {
+ $value = (string)$value;
+ }
+ if (is_string($value) && str_starts_with($value, ':')) {
+ $param = $this->getParameter(substr($value, 1));
+ if (is_array($param)) {
+ return $param;
+ } else {
+ return [$param];
+ }
+ } elseif ($value !== null) {
+ return [$value];
+ } else {
+ return [];
+ }
+ }
+
+ public function where(...$predicates) {
+ return $this->andWhere(...$predicates);
+ }
+
+ public function andWhere(...$where) {
+ if ($where) {
+ foreach ($where as $predicate) {
+ $this->tryLoadShardKey($predicate);
+ }
+ parent::andWhere(...$where);
+ }
+ return $this;
+ }
+
+ private function tryLoadShardKey($predicate): void {
+ if (!$this->shardDefinition) {
+ return;
+ }
+ if ($keys = $this->tryExtractShardKeys($predicate, $this->shardDefinition->shardKey)) {
+ $this->shardKeys += $keys;
+ }
+ if ($keys = $this->tryExtractShardKeys($predicate, $this->shardDefinition->primaryKey)) {
+ $this->primaryKeys += $keys;
+ }
+ foreach ($this->shardDefinition->companionKeys as $companionKey) {
+ if ($keys = $this->tryExtractShardKeys($predicate, $companionKey)) {
+ $this->primaryKeys += $keys;
+ }
+ }
+ }
+
+ /**
+ * @param $predicate
+ * @param string $column
+ * @return string[]
+ */
+ private function tryExtractShardKeys($predicate, string $column): array {
+ if ($predicate instanceof CompositeExpression) {
+ $values = [];
+ foreach ($predicate->getParts() as $part) {
+ $partValues = $this->tryExtractShardKeys($part, $column);
+ // for OR expressions, we can only rely on the predicate if all parts contain the comparison
+ if ($predicate->getType() === CompositeExpression::TYPE_OR && !$partValues) {
+ return [];
+ }
+ $values = array_merge($values, $partValues);
+ }
+ return $values;
+ }
+ $predicate = (string)$predicate;
+ // expect a condition in the form of 'alias1.column1 = placeholder' or 'alias1.column1 in placeholder'
+ if (substr_count($predicate, ' ') > 2) {
+ return [];
+ }
+ if (str_contains($predicate, ' = ')) {
+ $parts = explode(' = ', $predicate);
+ if ($parts[0] === "`{$column}`" || str_ends_with($parts[0], "`.`{$column}`")) {
+ return [$parts[1]];
+ } else {
+ return [];
+ }
+ }
+
+ if (str_contains($predicate, ' IN ')) {
+ $parts = explode(' IN ', $predicate);
+ if ($parts[0] === "`{$column}`" || str_ends_with($parts[0], "`.`{$column}`")) {
+ return [trim(trim($parts[1], '('), ')')];
+ } else {
+ return [];
+ }
+ }
+
+ return [];
+ }
+
+ public function set($key, $value) {
+ if ($this->shardDefinition && $key === $this->shardDefinition->shardKey) {
+ $updateShardKey = $value;
+ }
+ return parent::set($key, $value);
+ }
+
+ public function setValue($column, $value) {
+ if ($this->shardDefinition) {
+ if ($this->shardDefinition->isKey($column)) {
+ $this->primaryKeys[] = $value;
+ }
+ if ($column === $this->shardDefinition->shardKey) {
+ $this->shardKeys[] = $value;
+ }
+ }
+ return parent::setValue($column, $value);
+ }
+
+ public function values(array $values) {
+ foreach ($values as $column => $value) {
+ $this->setValue($column, $value);
+ }
+ return $this;
+ }
+
+ private function actOnTable(string $table): void {
+ $this->mainTable = $table;
+ foreach ($this->shardDefinitions as $shardDefinition) {
+ if ($shardDefinition->hasTable($table)) {
+ $this->shardDefinition = $shardDefinition;
+ }
+ }
+ }
+
+ public function from($from, $alias = null) {
+ if (is_string($from) && $from) {
+ $this->actOnTable($from);
+ }
+ return parent::from($from, $alias);
+ }
+
+ public function update($update = null, $alias = null) {
+ if (is_string($update) && $update) {
+ $this->actOnTable($update);
+ }
+ return parent::update($update, $alias);
+ }
+
+ public function insert($insert = null) {
+ if (is_string($insert) && $insert) {
+ $this->insertTable = $insert;
+ $this->actOnTable($insert);
+ }
+ return parent::insert($insert);
+ }
+
+ public function delete($delete = null, $alias = null) {
+ if (is_string($delete) && $delete) {
+ $this->actOnTable($delete);
+ }
+ return parent::delete($delete, $alias);
+ }
+
+ private function checkJoin(string $table): void {
+ if ($this->shardDefinition) {
+ if ($table === $this->mainTable) {
+ throw new InvalidShardedQueryException("Sharded query on {$this->mainTable} isn't allowed to join on itself");
+ }
+ if (!$this->shardDefinition->hasTable($table)) {
+ // this generally shouldn't happen as the partitioning logic should prevent this
+ // but the check is here just in case
+ throw new InvalidShardedQueryException("Sharded query on {$this->shardDefinition->table} isn't allowed to join on $table");
+ }
+ }
+ }
+
+ public function innerJoin($fromAlias, $join, $alias, $condition = null) {
+ $this->checkJoin($join);
+ return parent::innerJoin($fromAlias, $join, $alias, $condition);
+ }
+
+ public function leftJoin($fromAlias, $join, $alias, $condition = null) {
+ $this->checkJoin($join);
+ return parent::leftJoin($fromAlias, $join, $alias, $condition);
+ }
+
+ public function rightJoin($fromAlias, $join, $alias, $condition = null) {
+ if ($this->shardDefinition) {
+ throw new InvalidShardedQueryException("Sharded query on {$this->shardDefinition->table} isn't allowed to right join");
+ }
+ return parent::rightJoin($fromAlias, $join, $alias, $condition);
+ }
+
+ public function join($fromAlias, $join, $alias, $condition = null) {
+ return $this->innerJoin($fromAlias, $join, $alias, $condition);
+ }
+
+ public function setMaxResults($maxResults) {
+ if ($maxResults > 0) {
+ $this->limit = (int)$maxResults;
+ }
+ return parent::setMaxResults($maxResults);
+ }
+
+ public function setFirstResult($firstResult) {
+ if ($firstResult > 0) {
+ $this->offset = (int)$firstResult;
+ }
+ if ($this->shardDefinition && count($this->shardDefinition->shards) > 1) {
+ // we have to emulate offset
+ return $this;
+ } else {
+ return parent::setFirstResult($firstResult);
+ }
+ }
+
+ public function addOrderBy($sort, $order = null) {
+ $this->registerOrder((string)$sort, (string)$order ?? 'ASC');
+ return parent::addOrderBy($sort, $order);
+ }
+
+ public function orderBy($sort, $order = null) {
+ $this->sortList = [];
+ $this->registerOrder((string)$sort, (string)$order ?? 'ASC');
+ return parent::orderBy($sort, $order);
+ }
+
+ private function registerOrder(string $column, string $order): void {
+ // handle `mime + 0` and similar by just sorting on the first part of the expression
+ [$column] = explode(' ', $column);
+ $column = trim($column, '`');
+ $this->sortList[] = [
+ 'column' => $column,
+ 'order' => strtoupper($order),
+ ];
+ }
+
+ public function hintShardKey(string $column, mixed $value, bool $overwrite = false): self {
+ if ($overwrite) {
+ $this->primaryKeys = [];
+ $this->shardKeys = [];
+ }
+ if ($this->shardDefinition?->isKey($column)) {
+ $this->primaryKeys[] = $value;
+ }
+ if ($column === $this->shardDefinition?->shardKey) {
+ $this->shardKeys[] = $value;
+ }
+ return $this;
+ }
+
+ public function runAcrossAllShards(): self {
+ $this->allShards = true;
+ return $this;
+ }
+
+ /**
+ * @throws InvalidShardedQueryException
+ */
+ public function validate(): void {
+ if ($this->shardDefinition && $this->insertTable) {
+ if ($this->allShards) {
+ throw new InvalidShardedQueryException("Can't insert across all shards");
+ }
+ if (empty($this->getShardKeys())) {
+ throw new InvalidShardedQueryException("Can't insert without shard key");
+ }
+ }
+ if ($this->shardDefinition && !$this->allShards) {
+ if (empty($this->getShardKeys()) && empty($this->getPrimaryKeys())) {
+ throw new InvalidShardedQueryException('No shard key or primary key set for query');
+ }
+ }
+ if ($this->shardDefinition && $this->updateShardKey) {
+ $newShardKey = $this->getKeyValue($this->updateShardKey);
+ $oldShardKeys = $this->getShardKeys();
+ if (count($newShardKey) !== 1) {
+ throw new InvalidShardedQueryException("Can't set shard key to an array");
+ }
+ $newShardKey = current($newShardKey);
+ if (empty($oldShardKeys)) {
+ throw new InvalidShardedQueryException("Can't update without shard key");
+ }
+ $oldShards = array_values(array_unique(array_map(function ($shardKey) {
+ return $this->shardDefinition->getShardForKey((int)$shardKey);
+ }, $oldShardKeys)));
+ $newShard = $this->shardDefinition->getShardForKey((int)$newShardKey);
+ if ($oldShards === [$newShard]) {
+ throw new InvalidShardedQueryException('Update statement would move rows to a different shard');
+ }
+ }
+ }
+
+ public function executeQuery(?IDBConnection $connection = null): IResult {
+ $this->validate();
+ if ($this->shardDefinition) {
+ $runner = new ShardQueryRunner($this->shardConnectionManager, $this->shardDefinition);
+ return $runner->executeQuery($this->builder, $this->allShards, $this->getShardKeys(), $this->getPrimaryKeys(), $this->sortList, $this->limit, $this->offset);
+ }
+ return parent::executeQuery($connection);
+ }
+
+ public function executeStatement(?IDBConnection $connection = null): int {
+ $this->validate();
+ if ($this->shardDefinition) {
+ $runner = new ShardQueryRunner($this->shardConnectionManager, $this->shardDefinition);
+ if ($this->insertTable) {
+ $shards = $runner->getShards($this->allShards, $this->getShardKeys());
+ if (!$shards) {
+ throw new InvalidShardedQueryException("Can't insert without shard key");
+ }
+ $count = 0;
+ foreach ($shards as $shard) {
+ $shardConnection = $this->shardConnectionManager->getConnection($this->shardDefinition, $shard);
+ if (!$this->primaryKeys && $this->shardDefinition->table === $this->insertTable) {
+ $id = $this->autoIncrementHandler->getNextPrimaryKey($this->shardDefinition, $shard);
+ parent::setValue($this->shardDefinition->primaryKey, $this->createParameter('__generated_primary_key'));
+ $this->setParameter('__generated_primary_key', $id, self::PARAM_INT);
+ $this->lastInsertId = $id;
+ }
+ $count += parent::executeStatement($shardConnection);
+
+ $this->lastInsertConnection = $shardConnection;
+ }
+ return $count;
+ } else {
+ return $runner->executeStatement($this->builder, $this->allShards, $this->getShardKeys(), $this->getPrimaryKeys());
+ }
+ }
+ return parent::executeStatement($connection);
+ }
+
+ public function getLastInsertId(): int {
+ if ($this->lastInsertId) {
+ return $this->lastInsertId;
+ }
+ if ($this->lastInsertConnection) {
+ $table = $this->builder->prefixTableName($this->insertTable);
+ return $this->lastInsertConnection->lastInsertId($table);
+ } else {
+ return parent::getLastInsertId();
+ }
+ }
+
+
+}