diff options
author | Robin Appelman <robin@icewind.nl> | 2024-07-31 18:41:11 +0200 |
---|---|---|
committer | Louis Chemineau <louis@chmn.me> | 2024-08-28 10:21:19 +0200 |
commit | 62f8b6517f4492b220ebd9df415f2b134735768b (patch) | |
tree | 4b1bee39c02d7f2cfeb7c975e4be88579ff3d932 /lib/private/DB | |
parent | f5b348674449d023367b5e5f84cb4ac1de98605b (diff) | |
download | nextcloud-server-62f8b6517f4492b220ebd9df415f2b134735768b.tar.gz nextcloud-server-62f8b6517f4492b220ebd9df415f2b134735768b.zip |
feat: implement distributing partitioned queries over multiple shards
Signed-off-by: Robin Appelman <robin@icewind.nl>
Diffstat (limited to 'lib/private/DB')
18 files changed, 1247 insertions, 21 deletions
diff --git a/lib/private/DB/Connection.php b/lib/private/DB/Connection.php index 74cdba3e218..447c164c1a4 100644 --- a/lib/private/DB/Connection.php +++ b/lib/private/DB/Connection.php @@ -23,12 +23,19 @@ use Doctrine\DBAL\Platforms\SqlitePlatform; use Doctrine\DBAL\Result; use Doctrine\DBAL\Schema\Schema; use Doctrine\DBAL\Statement; -use OC\DB\QueryBuilder\Partitioned\PartitionSplit; use OC\DB\QueryBuilder\Partitioned\PartitionedQueryBuilder; +use OC\DB\QueryBuilder\Partitioned\PartitionSplit; use OC\DB\QueryBuilder\QueryBuilder; +use OC\DB\QueryBuilder\Sharded\AutoIncrementHandler; +use OC\DB\QueryBuilder\Sharded\CrossShardMoveHelper; +use OC\DB\QueryBuilder\Sharded\RoundRobinShardMapper; +use OC\DB\QueryBuilder\Sharded\ShardConnectionManager; +use OC\DB\QueryBuilder\Sharded\ShardDefinition; use OC\SystemConfig; use OCP\DB\QueryBuilder\IQueryBuilder; +use OCP\DB\QueryBuilder\Sharded\IShardMapper; use OCP\Diagnostics\IEventLogger; +use OCP\ICacheFactory; use OCP\IDBConnection; use OCP\ILogger; use OCP\IRequestId; @@ -80,6 +87,10 @@ class Connection extends PrimaryReadReplicaConnection { /** @var array<string, list<string>> */ protected array $partitions; + /** @var ShardDefinition[] */ + protected array $shards = []; + protected ShardConnectionManager $shardConnectionManager; + protected AutoIncrementHandler $autoIncrementHandler; /** * Initializes a new instance of the Connection class. @@ -105,6 +116,13 @@ class Connection extends PrimaryReadReplicaConnection { $this->adapter = new $params['adapter']($this); $this->tablePrefix = $params['tablePrefix']; + /** @psalm-suppress InvalidArrayOffset */ + $this->shardConnectionManager = $this->params['shard_connection_manager'] ?? Server::get(ShardConnectionManager::class); + /** @psalm-suppress InvalidArrayOffset */ + $this->autoIncrementHandler = $this->params['auto_increment_handler'] ?? new AutoIncrementHandler( + Server::get(ICacheFactory::class), + $this->shardConnectionManager, + ); $this->systemConfig = \OC::$server->getSystemConfig(); $this->clock = Server::get(ClockInterface::class); $this->logger = Server::get(LoggerInterface::class); @@ -123,12 +141,45 @@ class Connection extends PrimaryReadReplicaConnection { $this->_config->setSQLLogger($debugStack); } - $this->partitions = $this->systemConfig->getValue('db.partitions', []); + // todo: only allow specific, pre-defined shard configurations, the current config exists for easy testing setup + $this->shards = array_map(function (array $config) { + $shardMapperClass = $config['mapper'] ?? RoundRobinShardMapper::class; + $shardMapper = Server::get($shardMapperClass); + if (!$shardMapper instanceof IShardMapper) { + throw new \Exception("Invalid shard mapper: $shardMapperClass"); + } + return new ShardDefinition( + $config['table'], + $config['primary_key'], + $config['companion_keys'], + $config['shard_key'], + $shardMapper, + $config['companion_tables'], + $config['shards'] + ); + }, $this->params['sharding']); + $this->partitions = array_map(function (ShardDefinition $shard) { + return array_merge([$shard->table], $shard->companionTables); + }, $this->shards); $this->setNestTransactionsWithSavepoints(true); } /** + * @return IDBConnection[] + */ + public function getShardConnections(): array { + $connections = []; + foreach ($this->shards as $shardDefinition) { + foreach ($shardDefinition->getAllShards() as $shard) { + /** @var ConnectionAdapter $connection */ + $connections[] = $this->shardConnectionManager->getConnection($shardDefinition, $shard); + } + } + return $connections; + } + + /** * @throws Exception */ public function connect($connectionName = null) { @@ -176,13 +227,19 @@ class Connection extends PrimaryReadReplicaConnection { */ public function getQueryBuilder(): IQueryBuilder { $this->queriesBuilt++; + $builder = new QueryBuilder( new ConnectionAdapter($this), $this->systemConfig, $this->logger ); if (count($this->partitions) > 0) { - $builder = new PartitionedQueryBuilder($builder); + $builder = new PartitionedQueryBuilder( + $builder, + $this->shards, + $this->shardConnectionManager, + $this->autoIncrementHandler, + ); foreach ($this->partitions as $name => $tables) { $partition = new PartitionSplit($name, $tables); $builder->addPartition($partition); @@ -704,6 +761,9 @@ class Connection extends PrimaryReadReplicaConnection { return $migrator->generateChangeScript($toSchema); } else { $migrator->migrate($toSchema); + foreach ($this->getShardConnections() as $shardConnection) { + $shardConnection->migrateToSchema($toSchema); + } } } @@ -846,4 +906,12 @@ class Connection extends PrimaryReadReplicaConnection { } } } + + public function getShardDefinition(string $name): ?ShardDefinition { + return $this->shards[$name] ?? null; + } + + public function getCrossShardMoveHelper(): CrossShardMoveHelper { + return new CrossShardMoveHelper($this->shardConnectionManager); + } } diff --git a/lib/private/DB/ConnectionAdapter.php b/lib/private/DB/ConnectionAdapter.php index 88083711195..2baeda9cfb7 100644 --- a/lib/private/DB/ConnectionAdapter.php +++ b/lib/private/DB/ConnectionAdapter.php @@ -12,6 +12,8 @@ use Doctrine\DBAL\Exception; use Doctrine\DBAL\Platforms\AbstractPlatform; use Doctrine\DBAL\Schema\Schema; use OC\DB\Exceptions\DbalException; +use OC\DB\QueryBuilder\Sharded\CrossShardMoveHelper; +use OC\DB\QueryBuilder\Sharded\ShardDefinition; use OCP\DB\IPreparedStatement; use OCP\DB\IResult; use OCP\DB\QueryBuilder\IQueryBuilder; @@ -244,4 +246,12 @@ class ConnectionAdapter implements IDBConnection { public function logDatabaseException(\Exception $exception) { $this->inner->logDatabaseException($exception); } + + public function getShardDefinition(string $name): ?ShardDefinition { + return $this->inner->getShardDefinition($name); + } + + public function getCrossShardMoveHelper(): CrossShardMoveHelper { + return $this->inner->getCrossShardMoveHelper(); + } } diff --git a/lib/private/DB/ConnectionFactory.php b/lib/private/DB/ConnectionFactory.php index af182243787..8f161b68ecb 100644 --- a/lib/private/DB/ConnectionFactory.php +++ b/lib/private/DB/ConnectionFactory.php @@ -11,7 +11,11 @@ use Doctrine\Common\EventManager; use Doctrine\DBAL\Configuration; use Doctrine\DBAL\DriverManager; use Doctrine\DBAL\Event\Listeners\OracleSessionInit; +use OC\DB\QueryBuilder\Sharded\AutoIncrementHandler; +use OC\DB\QueryBuilder\Sharded\ShardConnectionManager; use OC\SystemConfig; +use OCP\ICacheFactory; +use OCP\Server; /** * Takes care of creating and configuring Doctrine connections. @@ -54,9 +58,12 @@ class ConnectionFactory { ], ]; + private ShardConnectionManager $shardConnectionManager; + private ICacheFactory $cacheFactory; public function __construct( - private SystemConfig $config + private SystemConfig $config, + ?ICacheFactory $cacheFactory = null, ) { if ($this->config->getValue('mysql.utf8mb4', false)) { $this->defaultConnectionParams['mysql']['charset'] = 'utf8mb4'; @@ -65,6 +72,8 @@ class ConnectionFactory { if ($collationOverride) { $this->defaultConnectionParams['mysql']['collation'] = $collationOverride; } + $this->shardConnectionManager = new ShardConnectionManager($this->config, $this); + $this->cacheFactory = $cacheFactory ?? Server::get(ICacheFactory::class); } /** @@ -214,6 +223,14 @@ class ConnectionFactory { if ($this->config->getValue('dbpersistent', false)) { $connectionParams['persistent'] = true; } + + $connectionParams['sharding'] = $this->config->getValue('dbsharding', []); + $connectionParams['shard_connection_manager'] = $this->shardConnectionManager; + $connectionParams['auto_increment_handler'] = new AutoIncrementHandler( + $this->cacheFactory, + $this->shardConnectionManager, + ); + $connectionParams = array_merge($connectionParams, $additionalConnectionParams); $replica = $this->config->getValue($configPrefix . 'dbreplica', $this->config->getValue('dbreplica', [])) ?: [$connectionParams]; diff --git a/lib/private/DB/QueryBuilder/ExtendedQueryBuilder.php b/lib/private/DB/QueryBuilder/ExtendedQueryBuilder.php index f96ed76f6bd..8ed88198c19 100644 --- a/lib/private/DB/QueryBuilder/ExtendedQueryBuilder.php +++ b/lib/private/DB/QueryBuilder/ExtendedQueryBuilder.php @@ -289,7 +289,21 @@ abstract class ExtendedQueryBuilder implements IQueryBuilder { return $this->builder->executeStatement($connection); } + public function hintShardKey(string $column, mixed $value) { + $this->builder->hintShardKey($column, $value); + return $this; + } + + public function runAcrossAllShards() { + $this->builder->runAcrossAllShards(); + return $this; + } + public function getOutputColumns(): array { return $this->builder->getOutputColumns(); } + + public function prefixTableName(string $table): string { + return $this->builder->prefixTableName($table); + } } diff --git a/lib/private/DB/QueryBuilder/Partitioned/JoinCondition.php b/lib/private/DB/QueryBuilder/Partitioned/JoinCondition.php index 54d913251d4..ff4e1da70b9 100644 --- a/lib/private/DB/QueryBuilder/Partitioned/JoinCondition.php +++ b/lib/private/DB/QueryBuilder/Partitioned/JoinCondition.php @@ -44,7 +44,7 @@ class JoinCondition { $fromConditions = []; $toConditions = []; foreach ($conditions as $condition) { - if (($condition->fromColumn && $fromColumn) ||($condition->toColumn && $toColumn)) { + if (($condition->fromColumn && $fromColumn) || ($condition->toColumn && $toColumn)) { throw new InvalidPartitionedQueryException("Can't join from {$condition->fromColumn} to {$condition->toColumn} as it already join froms {$fromColumn} to {$toColumn}"); } if ($condition->fromColumn) { diff --git a/lib/private/DB/QueryBuilder/Partitioned/PartitionQuery.php b/lib/private/DB/QueryBuilder/Partitioned/PartitionQuery.php index 8504c62d6d1..a5024b478d3 100644 --- a/lib/private/DB/QueryBuilder/Partitioned/PartitionQuery.php +++ b/lib/private/DB/QueryBuilder/Partitioned/PartitionQuery.php @@ -14,12 +14,12 @@ use OCP\DB\QueryBuilder\IQueryBuilder; * A sub-query from a partitioned join */ class PartitionQuery { - const JOIN_MODE_INNER = 'inner'; - const JOIN_MODE_LEFT = 'left'; + public const JOIN_MODE_INNER = 'inner'; + public const JOIN_MODE_LEFT = 'left'; // left-join where the left side IS NULL - const JOIN_MODE_LEFT_NULL = 'left_null'; + public const JOIN_MODE_LEFT_NULL = 'left_null'; - const JOIN_MODE_RIGHT = 'right'; + public const JOIN_MODE_RIGHT = 'right'; public function __construct( public IQueryBuilder $query, diff --git a/lib/private/DB/QueryBuilder/Partitioned/PartitionedQueryBuilder.php b/lib/private/DB/QueryBuilder/Partitioned/PartitionedQueryBuilder.php index 1ce2bb67620..8fcde0d24ae 100644 --- a/lib/private/DB/QueryBuilder/Partitioned/PartitionedQueryBuilder.php +++ b/lib/private/DB/QueryBuilder/Partitioned/PartitionedQueryBuilder.php @@ -8,18 +8,15 @@ declare(strict_types=1); namespace OC\DB\QueryBuilder\Partitioned; -use OC\DB\ConnectionAdapter; use OC\DB\QueryBuilder\CompositeExpression; -use OC\DB\QueryBuilder\ExtendedQueryBuilder; use OC\DB\QueryBuilder\QuoteHelper; +use OC\DB\QueryBuilder\Sharded\AutoIncrementHandler; use OC\DB\QueryBuilder\Sharded\ShardConnectionManager; use OC\DB\QueryBuilder\Sharded\ShardedQueryBuilder; -use OC\SystemConfig; use OCP\DB\IResult; use OCP\DB\QueryBuilder\IQueryBuilder; use OCP\DB\QueryBuilder\IQueryFunction; use OCP\IDBConnection; -use Psr\Log\LoggerInterface; /** * A special query builder that automatically splits queries that span across multiple database partitions[1]. @@ -38,7 +35,7 @@ use Psr\Log\LoggerInterface; * * [1]: A set of tables which can't be queried together with the rest of the tables, such as when sharding is used. */ -class PartitionedQueryBuilder extends ExtendedQueryBuilder { +class PartitionedQueryBuilder extends ShardedQueryBuilder { /** @var array<string, PartitionQuery> $splitQueries */ private array $splitQueries = []; /** @var list<PartitionSplit> */ @@ -53,14 +50,28 @@ class PartitionedQueryBuilder extends ExtendedQueryBuilder { private ?int $offset = null; public function __construct( - IQueryBuilder $builder, + IQueryBuilder $builder, + array $shardDefinitions, + ShardConnectionManager $shardConnectionManager, + AutoIncrementHandler $autoIncrementHandler, ) { - parent::__construct($builder); + parent::__construct($builder, $shardDefinitions, $shardConnectionManager, $autoIncrementHandler); $this->quoteHelper = new QuoteHelper(); } private function newQuery(): IQueryBuilder { - return $this->builder->getConnection()->getQueryBuilder(); + // get a fresh, non-partitioning query builder + $builder = $this->builder->getConnection()->getQueryBuilder(); + if ($builder instanceof PartitionedQueryBuilder) { + $builder = $builder->builder; + } + + return new ShardedQueryBuilder( + $builder, + $this->shardDefinitions, + $this->shardConnectionManager, + $this->autoIncrementHandler, + ); } // we need to save selects until we know all the table aliases @@ -70,8 +81,8 @@ class PartitionedQueryBuilder extends ExtendedQueryBuilder { return $this; } - public function addSelect(...$selects) { - $selects = array_map(function($select) { + public function addSelect(...$select) { + $select = array_map(function ($select) { return ['select' => $select, 'alias' => null]; }, $select); $this->selects = array_merge($this->selects, $select); @@ -281,7 +292,7 @@ class PartitionedQueryBuilder extends ExtendedQueryBuilder { $partitionPredicates = []; foreach ($predicates as $predicate) { - $partition = $this->getPartitionForPredicate((string) $predicate); + $partition = $this->getPartitionForPredicate((string)$predicate); if ($this->mainPartition === $partition) { $partitionPredicates[''][] = $predicate; } elseif ($partition) { diff --git a/lib/private/DB/QueryBuilder/QueryBuilder.php b/lib/private/DB/QueryBuilder/QueryBuilder.php index a07af842987..76fa9f42bab 100644 --- a/lib/private/DB/QueryBuilder/QueryBuilder.php +++ b/lib/private/DB/QueryBuilder/QueryBuilder.php @@ -1329,7 +1329,7 @@ class QueryBuilder implements IQueryBuilder { * @param string $table * @return string */ - public function prefixTableName($table) { + public function prefixTableName(string $table): string { if ($this->automaticTablePrefix === false || str_starts_with($table, '*PREFIX*')) { return $table; } @@ -1369,4 +1369,14 @@ class QueryBuilder implements IQueryBuilder { public function escapeLikeParameter(string $parameter): string { return $this->connection->escapeLikeParameter($parameter); } + + public function hintShardKey(string $column, mixed $value) { + return $this; + } + + public function runAcrossAllShards() { + // noop + return $this; + } + } diff --git a/lib/private/DB/QueryBuilder/Sharded/AutoIncrementHandler.php b/lib/private/DB/QueryBuilder/Sharded/AutoIncrementHandler.php new file mode 100644 index 00000000000..553644def4e --- /dev/null +++ b/lib/private/DB/QueryBuilder/Sharded/AutoIncrementHandler.php @@ -0,0 +1,152 @@ +<?php + +declare(strict_types=1); +/** + * SPDX-FileCopyrightText: 2024 Robin Appelman <robin@icewind.nl> + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +namespace OC\DB\QueryBuilder\Sharded; + +use OCP\ICacheFactory; +use OCP\IMemcache; +use OCP\IMemcacheTTL; + +/** + * A helper to atomically determine the next auto increment value for a sharded table + * + * Since we can't use the database's auto-increment (since each db doesn't know about the keys in the other shards) + * we need external logic for doing the auto increment + */ +class AutoIncrementHandler { + public const MIN_VALID_KEY = 1000; + public const TTL = 365 * 24 * 60 * 60; + + private ?IMemcache $cache = null; + + public function __construct( + private ICacheFactory $cacheFactory, + private ShardConnectionManager $shardConnectionManager, + ) { + if (PHP_INT_SIZE < 8) { + throw new \Exception("sharding is only supported with 64bit php"); + } + } + + private function getCache(): IMemcache { + if(is_null($this->cache)) { + $cache = $this->cacheFactory->createDistributed("shared_autoincrement"); + if ($cache instanceof IMemcache) { + $this->cache = $cache; + } else { + throw new \Exception('Distributed cache ' . get_class($cache) . ' is not suitable'); + } + } + return $this->cache; + } + + /** + * Get the next value for the given shard definition + * + * The returned key is unique and incrementing, but not sequential. + * The shard id is encoded in the first byte of the returned value + * + * @param ShardDefinition $shardDefinition + * @return int + * @throws \Exception + */ + public function getNextPrimaryKey(ShardDefinition $shardDefinition, int $shard): int { + $retries = 0; + while ($retries < 5) { + $next = $this->getNextInner($shardDefinition); + if ($next !== null) { + if ($next > ShardDefinition::MAX_PRIMARY_KEY) { + throw new \Exception("Max primary key of " . ShardDefinition::MAX_PRIMARY_KEY . " exceeded"); + } + // we encode the shard the primary key was originally inserted into to allow guessing the shard by primary key later on + return ($next << 8) | $shard; + } else { + $retries++; + } + } + throw new \Exception("Failed to get next primary key"); + } + + /** + * auto increment logic without retry + * + * @param ShardDefinition $shardDefinition + * @return int|null either the next primary key or null if the call needs to be retried + */ + private function getNextInner(ShardDefinition $shardDefinition): ?int { + $cache = $this->getCache(); + // because this function will likely be called concurrently from different requests + // the implementation needs to ensure that the cached value can be cleared, invalidated or re-calculated at any point between our cache calls + // care must be taken that the logic remains fully resilient against race conditions + + // in the ideal case, the last primary key is stored in the cache and we can just do an `inc` + // if that is not the case we find the highest used id in the database increment it, and save it in the cache + + // prevent inc from returning `1` if the key doesn't exist by setting it to a non-numeric value + $cache->add($shardDefinition->table, "empty-placeholder", self::TTL); + $next = $cache->inc($shardDefinition->table); + + if ($cache instanceof IMemcacheTTL) { + $cache->setTTL($shardDefinition->table, self::TTL); + } + + // the "add + inc" trick above isn't strictly atomic, so as a safety we reject any result that to small + // to handle the edge case of the stored value disappearing between the add and inc + if (is_int($next) && $next >= self::MIN_VALID_KEY) { + return $next; + } elseif (is_int($next)) { + // we hit the edge case, so invalidate the cached value + if (!$cache->cas($shardDefinition->table, $next, "empty-placeholder")) { + // someone else is changing the value concurrently, give up and retry + return null; + } + } + + // discard the encoded initial shard + $current = $this->getMaxFromDb($shardDefinition) >> 8; + $next = max($current, self::MIN_VALID_KEY) + 1; + if ($cache->cas($shardDefinition->table, "empty-placeholder", $next)) { + return $next; + } + + // another request set the cached value before us, so we should just be able to inc + $next = $cache->inc($shardDefinition->table); + if (is_int($next) && $next >= self::MIN_VALID_KEY) { + return $next; + } elseif(is_int($next)) { + // key got cleared, invalidate and retry + $cache->cas($shardDefinition->table, $next, "empty-placeholder"); + return null; + } else { + // cleanup any non-numeric value other than the placeholder if that got stored somehow + $cache->ncad($shardDefinition->table, "empty-placeholder"); + // retry + return null; + } + } + + /** + * Get the maximum primary key value from the shards + */ + private function getMaxFromDb(ShardDefinition $shardDefinition): int { + $max = 0; + foreach ($shardDefinition->getAllShards() as $shard) { + $connection = $this->shardConnectionManager->getConnection($shardDefinition, $shard); + $query = $connection->getQueryBuilder(); + $query->select($shardDefinition->primaryKey) + ->from($shardDefinition->table) + ->orderBy($shardDefinition->primaryKey, "DESC") + ->setMaxResults(1); + $result = $query->executeQuery()->fetchOne(); + if ($result) { + $max = max($max, $result); + } + } + return $max; + } +} diff --git a/lib/private/DB/QueryBuilder/Sharded/CrossShardMoveHelper.php b/lib/private/DB/QueryBuilder/Sharded/CrossShardMoveHelper.php new file mode 100644 index 00000000000..ffc95e4e54c --- /dev/null +++ b/lib/private/DB/QueryBuilder/Sharded/CrossShardMoveHelper.php @@ -0,0 +1,162 @@ +<?php + +declare(strict_types=1); +/** + * SPDX-FileCopyrightText: 2024 Robin Appelman <robin@icewind.nl> + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +namespace OC\DB\QueryBuilder\Sharded; + +use OCP\DB\QueryBuilder\IQueryBuilder; +use OCP\IDBConnection; + +/** + * Utility methods for implementing logic that moves data across shards + */ +class CrossShardMoveHelper { + public function __construct( + private ShardConnectionManager $connectionManager + ) { + } + + public function getConnection(ShardDefinition $shardDefinition, int $shardKey): IDBConnection { + return $this->connectionManager->getConnection($shardDefinition, $shardDefinition->getShardForKey($shardKey)); + } + + /** + * Update the shard key of a set of rows, moving them to a different shard if needed + * + * @param ShardDefinition $shardDefinition + * @param string $table + * @param string $shardColumn + * @param int $sourceShardKey + * @param int $targetShardKey + * @param string $primaryColumn + * @param int[] $primaryKeys + * @return void + */ + public function moveCrossShards(ShardDefinition $shardDefinition, string $table, string $shardColumn, int $sourceShardKey, int $targetShardKey, string $primaryColumn, array $primaryKeys): void { + $sourceShard = $shardDefinition->getShardForKey($sourceShardKey); + $targetShard = $shardDefinition->getShardForKey($targetShardKey); + $sourceConnection = $this->connectionManager->getConnection($shardDefinition, $sourceShard); + if ($sourceShard === $targetShard) { + $this->updateItems($sourceConnection, $table, $shardColumn, $targetShardKey, $primaryColumn, $primaryKeys); + + return; + } + $targetConnection = $this->connectionManager->getConnection($shardDefinition, $targetShard); + + $sourceItems = $this->loadItems($sourceConnection, $table, $primaryColumn, $primaryKeys); + foreach ($sourceItems as &$sourceItem) { + $sourceItem[$shardColumn] = $targetShardKey; + } + if (!$sourceItems) { + return; + } + + $sourceConnection->beginTransaction(); + $targetConnection->beginTransaction(); + try { + $this->saveItems($targetConnection, $table, $sourceItems); + $this->deleteItems($sourceConnection, $table, $primaryColumn, $primaryKeys); + + $targetConnection->commit(); + $sourceConnection->commit(); + } catch (\Exception $e) { + $sourceConnection->rollback(); + $targetConnection->rollback(); + throw $e; + } + } + + /** + * Load rows from a table to move + * + * @param IDBConnection $connection + * @param string $table + * @param string $primaryColumn + * @param int[] $primaryKeys + * @return array[] + */ + public function loadItems(IDBConnection $connection, string $table, string $primaryColumn, array $primaryKeys): array { + $query = $connection->getQueryBuilder(); + $query->select('*') + ->from($table) + ->where($query->expr()->in($primaryColumn, $query->createParameter("keys"))); + + $chunks = array_chunk($primaryKeys, 1000); + + $results = []; + foreach ($chunks as $chunk) { + $query->setParameter("keys", $chunk, IQueryBuilder::PARAM_INT_ARRAY); + $results = array_merge($results, $query->execute()->fetchAll()); + } + + return $results; + } + + /** + * Save modified rows + * + * @param IDBConnection $connection + * @param string $table + * @param array[] $items + * @return void + */ + public function saveItems(IDBConnection $connection, string $table, array $items): void { + if (count($items) === 0) { + return; + } + $query = $connection->getQueryBuilder(); + $query->insert($table); + foreach ($items[0] as $column => $value) { + $query->setValue($column, $query->createParameter($column)); + } + + foreach ($items as $item) { + foreach ($item as $column => $value) { + if (is_int($column)) { + $query->setParameter($column, $value, IQueryBuilder::PARAM_INT); + } else { + $query->setParameter($column, $value); + } + } + $query->executeStatement(); + } + } + + /** + * @param IDBConnection $connection + * @param string $table + * @param string $primaryColumn + * @param int[] $primaryKeys + * @return void + */ + public function updateItems(IDBConnection $connection, string $table, string $shardColumn, int $targetShardKey, string $primaryColumn, array $primaryKeys): void { + $query = $connection->getQueryBuilder(); + $query->update($table) + ->set($shardColumn, $query->createNamedParameter($targetShardKey, IQueryBuilder::PARAM_INT)) + ->where($query->expr()->in($primaryColumn, $query->createNamedParameter($primaryKeys, IQueryBuilder::PARAM_INT_ARRAY))); + $query->executeQuery()->fetchAll(); + } + + /** + * @param IDBConnection $connection + * @param string $table + * @param string $primaryColumn + * @param int[] $primaryKeys + * @return void + */ + public function deleteItems(IDBConnection $connection, string $table, string $primaryColumn, array $primaryKeys): void { + $query = $connection->getQueryBuilder(); + $query->delete($table) + ->where($query->expr()->in($primaryColumn, $query->createParameter("keys"))); + $chunks = array_chunk($primaryKeys, 1000); + + foreach ($chunks as $chunk) { + $query->setParameter("keys", $chunk, IQueryBuilder::PARAM_INT_ARRAY); + $query->executeStatement(); + } + } +} diff --git a/lib/private/DB/QueryBuilder/Sharded/HashShardMapper.php b/lib/private/DB/QueryBuilder/Sharded/HashShardMapper.php new file mode 100644 index 00000000000..af778489a2d --- /dev/null +++ b/lib/private/DB/QueryBuilder/Sharded/HashShardMapper.php @@ -0,0 +1,21 @@ +<?php + +declare(strict_types=1); +/** + * SPDX-FileCopyrightText: 2024 Robin Appelman <robin@icewind.nl> + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +namespace OC\DB\QueryBuilder\Sharded; + +use OCP\DB\QueryBuilder\Sharded\IShardMapper; + +/** + * Map string key to an int-range by hashing the key + */ +class HashShardMapper implements IShardMapper { + public function getShardForKey(int $key, int $count): int { + $int = unpack('L', substr(md5((string)$key, true), 0, 4))[1]; + return $int % $count; + } +} diff --git a/lib/private/DB/QueryBuilder/Sharded/InvalidShardedQueryException.php b/lib/private/DB/QueryBuilder/Sharded/InvalidShardedQueryException.php new file mode 100644 index 00000000000..733a6acaf9d --- /dev/null +++ b/lib/private/DB/QueryBuilder/Sharded/InvalidShardedQueryException.php @@ -0,0 +1,29 @@ +<?php + +declare(strict_types=1); +/** + * SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-only + */ + +namespace OC\DB\QueryBuilder\Sharded; + +/** + * Queries on sharded table has the following limitations: + * + * 1. Either the shard key (e.g. "storage") or primary key (e.g. "fileid") must be mentioned in the query. + * Or the query must be explicitly marked as running across all shards. + * + * For queries where it isn't possible to set one of these keys in the query normally, you can set it using `hintShardKey` + * + * 2. Insert statements must always explicitly set the shard key + * 3. A query on a sharded table is not allowed to join on the same table + * 4. Right joins are not allowed on sharded tables + * 5. Updating the shard key where the new shard key maps to a different shard is not allowed + * + * Moving rows to a different shard needs to be implemented manually. `CrossShardMoveHelper` provides + * some tools to help make this easier. + */ +class InvalidShardedQueryException extends \Exception { + +} diff --git a/lib/private/DB/QueryBuilder/Sharded/RoundRobinShardMapper.php b/lib/private/DB/QueryBuilder/Sharded/RoundRobinShardMapper.php new file mode 100644 index 00000000000..a5694b06507 --- /dev/null +++ b/lib/private/DB/QueryBuilder/Sharded/RoundRobinShardMapper.php @@ -0,0 +1,20 @@ +<?php + +declare(strict_types=1); +/** + * SPDX-FileCopyrightText: 2024 Robin Appelman <robin@icewind.nl> + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +namespace OC\DB\QueryBuilder\Sharded; + +use OCP\DB\QueryBuilder\Sharded\IShardMapper; + +/** + * Map string key to an int-range by hashing the key + */ +class RoundRobinShardMapper implements IShardMapper { + public function getShardForKey(int $key, int $count): int { + return $key % $count; + } +} diff --git a/lib/private/DB/QueryBuilder/Sharded/ShardConnectionManager.php b/lib/private/DB/QueryBuilder/Sharded/ShardConnectionManager.php new file mode 100644 index 00000000000..87cac58bc57 --- /dev/null +++ b/lib/private/DB/QueryBuilder/Sharded/ShardConnectionManager.php @@ -0,0 +1,43 @@ +<?php + +declare(strict_types=1); +/** + * SPDX-FileCopyrightText: 2024 Robin Appelman <robin@icewind.nl> + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +namespace OC\DB\QueryBuilder\Sharded; + +use OC\DB\ConnectionAdapter; +use OC\DB\ConnectionFactory; +use OC\SystemConfig; +use OCP\IDBConnection; + +/** + * Keeps track of the db connections to the various shards + */ +class ShardConnectionManager { + /** @var array<string, IDBConnection> */ + private array $connections = []; + + public function __construct( + private SystemConfig $config, + private ConnectionFactory $factory, + ) { + } + + public function getConnection(ShardDefinition $shardDefinition, int $shard): IDBConnection { + $connectionKey = $shardDefinition->table . '_' . $shard; + if (!isset($this->connections[$connectionKey])) { + $this->connections[$connectionKey] = $this->createConnection($shardDefinition->shards[$shard]); + } + + return $this->connections[$connectionKey]; + } + + private function createConnection(array $shardConfig): IDBConnection { + $shardConfig['sharding'] = []; + $type = $this->config->getValue('dbtype', 'sqlite'); + return new ConnectionAdapter($this->factory->getConnection($type, $shardConfig)); + } +} diff --git a/lib/private/DB/QueryBuilder/Sharded/ShardDefinition.php b/lib/private/DB/QueryBuilder/Sharded/ShardDefinition.php new file mode 100644 index 00000000000..5661ca079e1 --- /dev/null +++ b/lib/private/DB/QueryBuilder/Sharded/ShardDefinition.php @@ -0,0 +1,66 @@ +<?php + +declare(strict_types=1); +/** + * SPDX-FileCopyrightText: 2024 Robin Appelman <robin@icewind.nl> + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +namespace OC\DB\QueryBuilder\Sharded; + +use OCP\DB\QueryBuilder\Sharded\IShardMapper; + +/** + * Configuration for a shard setup + */ +class ShardDefinition { + // we reserve the bottom byte of the primary key for the initial shard, so the total shard count is limited to what we can fit there + public const MAX_SHARDS = 256; + + public const PRIMARY_KEY_MASK = 0x7F_FF_FF_FF_FF_FF_FF_00; + public const PRIMARY_KEY_SHARD_MASK = 0x00_00_00_00_00_00_00_FF; + // since we reserve 1 byte for the shard index, we only have 56 bits of primary key space + public const MAX_PRIMARY_KEY = PHP_INT_MAX >> 8; + + /** + * @param string $table + * @param string $primaryKey + * @param string $shardKey + * @param string[] $companionKeys + * @param IShardMapper $shardMapper + * @param string[] $companionTables + * @param array $shards + */ + public function __construct( + public string $table, + public string $primaryKey, + public array $companionKeys, + public string $shardKey, + public IShardMapper $shardMapper, + public array $companionTables = [], + public array $shards = [], + ) { + if (count($this->shards) >= self::MAX_SHARDS) { + throw new \Exception("Only allowed maximum of " . self::MAX_SHARDS . " shards allowed"); + } + } + + public function hasTable(string $table): bool { + if ($this->table === $table) { + return true; + } + return in_array($table, $this->companionTables); + } + + public function getShardForKey(int $key): int { + return $this->shardMapper->getShardForKey($key, count($this->shards)); + } + + public function getAllShards(): array { + return array_keys($this->shards); + } + + public function isKey(string $column): bool { + return $column === $this->primaryKey || in_array($column, $this->companionKeys); + } +} diff --git a/lib/private/DB/QueryBuilder/Sharded/ShardQueryRunner.php b/lib/private/DB/QueryBuilder/Sharded/ShardQueryRunner.php new file mode 100644 index 00000000000..22b86a018b3 --- /dev/null +++ b/lib/private/DB/QueryBuilder/Sharded/ShardQueryRunner.php @@ -0,0 +1,197 @@ +<?php + +declare(strict_types=1); +/** + * SPDX-FileCopyrightText: 2024 Robin Appelman <robin@icewind.nl> + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +namespace OC\DB\QueryBuilder\Sharded; + +use OC\DB\ArrayResult; +use OCP\DB\IResult; +use OCP\DB\QueryBuilder\IQueryBuilder; + +/** + * Logic for running a query across a number of shards, combining the results + */ +class ShardQueryRunner { + public function __construct( + private ShardConnectionManager $shardConnectionManager, + private ShardDefinition $shardDefinition, + ) { + } + + /** + * Get the shards for a specific query or null if the shards aren't known in advance + * + * @param bool $allShards + * @param int[] $shardKeys + * @return null|int[] + */ + public function getShards(bool $allShards, array $shardKeys): ?array { + if ($allShards) { + return $this->shardDefinition->getAllShards(); + } + $allConfiguredShards = $this->shardDefinition->getAllShards(); + if (count($allConfiguredShards) === 1) { + return $allConfiguredShards; + } + if (empty($shardKeys)) { + return null; + } + $shards = array_map(function ($shardKey) { + return $this->shardDefinition->getShardForKey((int)$shardKey); + }, $shardKeys); + return array_values(array_unique($shards)); + } + + /** + * Try to get the shards that the keys are likely to be in, based on the shard the row was created + * + * @param int[] $primaryKeys + * @return int[] + */ + private function getLikelyShards(array $primaryKeys): array { + $shards = []; + foreach ($primaryKeys as $primaryKey) { + $encodedShard = $primaryKey & ShardDefinition::PRIMARY_KEY_SHARD_MASK; + if ($encodedShard < count($this->shardDefinition->shards) && !in_array($encodedShard, $shards)) { + $shards[] = $encodedShard; + } + } + return $shards; + } + + /** + * Execute a SELECT statement across the configured shards + * + * @param IQueryBuilder $query + * @param bool $allShards + * @param int[] $shardKeys + * @param int[] $primaryKeys + * @param array{column: string, order: string}[] $sortList + * @param int|null $limit + * @param int|null $offset + * @return IResult + */ + public function executeQuery( + IQueryBuilder $query, + bool $allShards, + array $shardKeys, + array $primaryKeys, + ?array $sortList = null, + ?int $limit = null, + ?int $offset = null, + ): IResult { + $shards = $this->getShards($allShards, $shardKeys); + $results = []; + if ($shards && count($shards) === 1) { + // trivial case + return $query->executeQuery($this->shardConnectionManager->getConnection($this->shardDefinition, $shards[0])); + } + // we have to emulate limit and offset, so we select offset+limit from all shards to ensure we have enough rows + // and then filter them down after we merged the results + if ($limit !== null && $offset !== null) { + $query->setMaxResults($limit + $offset); + } + + if ($shards) { + // we know exactly what shards we need to query + foreach ($shards as $shard) { + $shardConnection = $this->shardConnectionManager->getConnection($this->shardDefinition, $shard); + $subResult = $query->executeQuery($shardConnection); + $results = array_merge($results, $subResult->fetchAll()); + $subResult->closeCursor(); + } + } else { + // we don't know for sure what shards we need to query, + // we first try the shards that are "likely" to have the rows we want, based on the shard that the row was + // originally created in. If we then still haven't found all rows we try the rest of the shards + $likelyShards = $this->getLikelyShards($primaryKeys); + $unlikelyShards = array_diff($this->shardDefinition->getAllShards(), $likelyShards); + $shards = array_merge($likelyShards, $unlikelyShards); + + foreach ($shards as $shard) { + $shardConnection = $this->shardConnectionManager->getConnection($this->shardDefinition, $shard); + $subResult = $query->executeQuery($shardConnection); + $rows = $subResult->fetchAll(); + $results = array_merge($results, $rows); + $subResult->closeCursor(); + + if (count($rows) >= count($primaryKeys)) { + // we have all the rows we're looking for + break; + } + } + } + + if ($sortList) { + usort($results, function ($a, $b) use ($sortList) { + foreach ($sortList as $sort) { + $valueA = $a[$sort['column']] ?? null; + $valueB = $b[$sort['column']] ?? null; + $cmp = $valueA <=> $valueB; + if ($cmp === 0) { + continue; + } + if ($sort['order'] === "DESC") { + $cmp = -$cmp; + } + return $cmp; + } + }); + } + + if ($limit !== null && $offset !== null) { + $results = array_slice($results, $offset, $limit); + } elseif ($limit !== null) { + $results = array_slice($results, 0, $limit); + } elseif ($offset !== null) { + $results = array_slice($results, $offset); + } + + return new ArrayResult($results); + } + + /** + * Execute an UPDATE or DELETE statement + * + * @param IQueryBuilder $query + * @param bool $allShards + * @param int[] $shardKeys + * @param int[] $primaryKeys + * @return int + * @throws \OCP\DB\Exception + */ + public function executeStatement(IQueryBuilder $query, bool $allShards, array $shardKeys, array $primaryKeys): int { + if ($query->getType() === \Doctrine\DBAL\Query\QueryBuilder::INSERT) { + throw new \Exception("insert queries need special handling"); + } + + $shards = $this->getShards($allShards, $shardKeys); + $maxCount = count($primaryKeys); + if ($shards && count($shards) === 1) { + return $query->executeStatement($this->shardConnectionManager->getConnection($this->shardDefinition, $shards[0])); + } elseif ($shards) { + $maxCount = PHP_INT_MAX; + } else { + // sort the likely shards before the rest, similar logic to `self::executeQuery` + $likelyShards = $this->getLikelyShards($primaryKeys); + $unlikelyShards = array_diff($this->shardDefinition->getAllShards(), $likelyShards); + $shards = array_merge($likelyShards, $unlikelyShards); + } + + $count = 0; + + foreach ($shards as $shard) { + $shardConnection = $this->shardConnectionManager->getConnection($this->shardDefinition, $shard); + $count += $query->executeStatement($shardConnection); + + if ($count >= $maxCount) { + break; + } + } + return $count; + } +} diff --git a/lib/private/DB/QueryBuilder/Sharded/ShardedQueryBuilder.php b/lib/private/DB/QueryBuilder/Sharded/ShardedQueryBuilder.php new file mode 100644 index 00000000000..6496453a1a6 --- /dev/null +++ b/lib/private/DB/QueryBuilder/Sharded/ShardedQueryBuilder.php @@ -0,0 +1,403 @@ +<?php + +declare(strict_types=1); +/** + * SPDX-FileCopyrightText: 2024 Robin Appelman <robin@icewind.nl> + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +namespace OC\DB\QueryBuilder\Sharded; + +use OC\DB\QueryBuilder\CompositeExpression; +use OC\DB\QueryBuilder\ExtendedQueryBuilder; +use OC\DB\QueryBuilder\Parameter; +use OCP\DB\IResult; +use OCP\DB\QueryBuilder\IQueryBuilder; +use OCP\IDBConnection; + +/** + * A special query builder that automatically distributes queries over multiple database shards. + * + * This relies on `PartitionedQueryBuilder` to handle splitting of parts of the query that touch the sharded tables + * from the non-sharded tables. So the query build here should only either touch only sharded table or only non-sharded tables. + * + * Most of the logic in this class is concerned with extracting either the shard key (e.g. "storage") or primary key (e.g. "fileid") + * from the query. The logic for actually running the query across the shards is mostly delegated to `ShardQueryRunner`. + */ +class ShardedQueryBuilder extends ExtendedQueryBuilder { + private array $shardKeys = []; + private array $primaryKeys = []; + private ?ShardDefinition $shardDefinition = null; + /** @var bool Run the query across all shards */ + private bool $allShards = false; + private ?string $insertTable = null; + private mixed $lastInsertId = null; + private ?IDBConnection $lastInsertConnection = null; + private ?int $updateShardKey = null; + private ?int $limit = null; + private ?int $offset = null; + /** @var array{column: string, order: string}[] */ + private array $sortList = []; + private string $mainTable = ''; + + public function __construct( + IQueryBuilder $builder, + protected array $shardDefinitions, + protected ShardConnectionManager $shardConnectionManager, + protected AutoIncrementHandler $autoIncrementHandler, + ) { + parent::__construct($builder); + } + + public function getShardKeys(): array { + return $this->getKeyValues($this->shardKeys); + } + + public function getPrimaryKeys(): array { + return $this->getKeyValues($this->primaryKeys); + } + + private function getKeyValues(array $keys): array { + $values = []; + foreach ($keys as $key) { + $values = array_merge($values, $this->getKeyValue($key)); + } + return array_values(array_unique($values)); + } + + private function getKeyValue($value): array { + if ($value instanceof Parameter) { + $value = (string)$value; + } + if (is_string($value) && str_starts_with($value, ':')) { + $param = $this->getParameter(substr($value, 1)); + if (is_array($param)) { + return $param; + } else { + return [$param]; + } + } elseif ($value !== null) { + return [$value]; + } else { + return []; + } + } + + public function where(...$predicates) { + return $this->andWhere(...$predicates); + } + + public function andWhere(...$where) { + if ($where) { + foreach ($where as $predicate) { + $this->tryLoadShardKey($predicate); + } + parent::andWhere(...$where); + } + return $this; + } + + private function tryLoadShardKey($predicate): void { + if (!$this->shardDefinition) { + return; + } + if ($keys = $this->tryExtractShardKeys($predicate, $this->shardDefinition->shardKey)) { + $this->shardKeys += $keys; + } + if ($keys = $this->tryExtractShardKeys($predicate, $this->shardDefinition->primaryKey)) { + $this->primaryKeys += $keys; + } + foreach ($this->shardDefinition->companionKeys as $companionKey) { + if ($keys = $this->tryExtractShardKeys($predicate, $companionKey)) { + $this->primaryKeys += $keys; + } + } + } + + /** + * @param $predicate + * @param string $column + * @return string[] + */ + private function tryExtractShardKeys($predicate, string $column): array { + if ($predicate instanceof CompositeExpression) { + $values = []; + foreach ($predicate->getParts() as $part) { + $partValues = $this->tryExtractShardKeys($part, $column); + // for OR expressions, we can only rely on the predicate if all parts contain the comparison + if ($predicate->getType() === CompositeExpression::TYPE_OR && !$partValues) { + return []; + } + $values = array_merge($values, $partValues); + } + return $values; + } + $predicate = (string)$predicate; + // expect a condition in the form of 'alias1.column1 = placeholder' or 'alias1.column1 in placeholder' + if (substr_count($predicate, ' ') > 2) { + return []; + } + if (str_contains($predicate, ' = ')) { + $parts = explode(' = ', $predicate); + if ($parts[0] === "`{$column}`" || str_ends_with($parts[0], "`.`{$column}`")) { + return [$parts[1]]; + } else { + return []; + } + } + + if (str_contains($predicate, ' IN ')) { + $parts = explode(' IN ', $predicate); + if ($parts[0] === "`{$column}`" || str_ends_with($parts[0], "`.`{$column}`")) { + return [trim(trim($parts[1], '('), ')')]; + } else { + return []; + } + } + + return []; + } + + public function set($key, $value) { + if ($this->shardDefinition && $key === $this->shardDefinition->shardKey) { + $updateShardKey = $value; + } + return parent::set($key, $value); + } + + public function setValue($column, $value) { + if ($this->shardDefinition) { + if ($this->shardDefinition->isKey($column)) { + $this->primaryKeys[] = $value; + } + if ($column === $this->shardDefinition->shardKey) { + $this->shardKeys[] = $value; + } + } + return parent::setValue($column, $value); + } + + public function values(array $values) { + foreach ($values as $column => $value) { + $this->setValue($column, $value); + } + return $this; + } + + private function actOnTable(string $table): void { + $this->mainTable = $table; + foreach ($this->shardDefinitions as $shardDefinition) { + if ($shardDefinition->hasTable($table)) { + $this->shardDefinition = $shardDefinition; + } + } + } + + public function from($from, $alias = null) { + if (is_string($from) && $from) { + $this->actOnTable($from); + } + return parent::from($from, $alias); + } + + public function update($update = null, $alias = null) { + if (is_string($update) && $update) { + $this->actOnTable($update); + } + return parent::update($update, $alias); + } + + public function insert($insert = null) { + if (is_string($insert) && $insert) { + $this->insertTable = $insert; + $this->actOnTable($insert); + } + return parent::insert($insert); + } + + public function delete($delete = null, $alias = null) { + if (is_string($delete) && $delete) { + $this->actOnTable($delete); + } + return parent::delete($delete, $alias); + } + + private function checkJoin(string $table): void { + if ($this->shardDefinition) { + if ($table === $this->mainTable) { + throw new InvalidShardedQueryException("Sharded query on {$this->mainTable} isn't allowed to join on itself"); + } + if (!$this->shardDefinition->hasTable($table)) { + // this generally shouldn't happen as the partitioning logic should prevent this + // but the check is here just in case + throw new InvalidShardedQueryException("Sharded query on {$this->shardDefinition->table} isn't allowed to join on $table"); + } + } + } + + public function innerJoin($fromAlias, $join, $alias, $condition = null) { + $this->checkJoin($join); + return parent::innerJoin($fromAlias, $join, $alias, $condition); + } + + public function leftJoin($fromAlias, $join, $alias, $condition = null) { + $this->checkJoin($join); + return parent::leftJoin($fromAlias, $join, $alias, $condition); + } + + public function rightJoin($fromAlias, $join, $alias, $condition = null) { + if ($this->shardDefinition) { + throw new InvalidShardedQueryException("Sharded query on {$this->shardDefinition->table} isn't allowed to right join"); + } + return parent::rightJoin($fromAlias, $join, $alias, $condition); + } + + public function join($fromAlias, $join, $alias, $condition = null) { + return $this->innerJoin($fromAlias, $join, $alias, $condition); + } + + public function setMaxResults($maxResults) { + if ($maxResults > 0) { + $this->limit = (int)$maxResults; + } + return parent::setMaxResults($maxResults); + } + + public function setFirstResult($firstResult) { + if ($firstResult > 0) { + $this->offset = (int)$firstResult; + } + if ($this->shardDefinition && count($this->shardDefinition->shards) > 1) { + // we have to emulate offset + return $this; + } else { + return parent::setFirstResult($firstResult); + } + } + + public function addOrderBy($sort, $order = null) { + $this->registerOrder((string) $sort, (string)$order ?? "ASC"); + return parent::orderBy($sort, $order); + } + + public function orderBy($sort, $order = null) { + $this->sortList = []; + $this->registerOrder((string) $sort, (string)$order ?? "ASC"); + return parent::orderBy($sort, $order); + } + + private function registerOrder(string $column, string $order): void { + // handle `mime + 0` and similar by just sorting on the first part of the expression + [$column] = explode(' ', $column); + $column = trim($column, '`'); + $this->sortList[] = [ + 'column' => $column, + 'order' => strtoupper($order), + ]; + } + + public function hintShardKey(string $column, mixed $value) { + if ($this->shardDefinition?->isKey($column)) { + $this->primaryKeys[] = $value; + } + if ($column === $this->shardDefinition?->shardKey) { + $this->shardKeys[] = $value; + } + return $this; + } + + public function runAcrossAllShards() { + $this->allShards = true; + return $this; + } + + /** + * @throws InvalidShardedQueryException + */ + public function validate(): void { + if ($this->shardDefinition && $this->insertTable) { + if ($this->allShards) { + throw new InvalidShardedQueryException("Can't insert across all shards"); + } + if (empty($this->getShardKeys())) { + throw new InvalidShardedQueryException("Can't insert without shard key"); + } + } + if ($this->shardDefinition && !$this->allShards) { + if (empty($this->getShardKeys()) && empty($this->getPrimaryKeys())) { + throw new InvalidShardedQueryException("No shard key or primary key set for query"); + } + } + if ($this->shardDefinition && $this->updateShardKey) { + $newShardKey = $this->getKeyValue($this->updateShardKey); + $oldShardKeys = $this->getShardKeys(); + if (count($newShardKey) !== 1) { + throw new InvalidShardedQueryException("Can't set shard key to an array"); + } + $newShardKey = current($newShardKey); + if (empty($oldShardKeys)) { + throw new InvalidShardedQueryException("Can't update without shard key"); + } + $oldShards = array_values(array_unique(array_map(function ($shardKey) { + return $this->shardDefinition->getShardForKey((int)$shardKey); + }, $oldShardKeys))); + $newShard = $this->shardDefinition->getShardForKey((int)$newShardKey); + if ($oldShards === [$newShard]) { + throw new InvalidShardedQueryException("Update statement would move rows to a different shard"); + } + } + } + + public function executeQuery(?IDBConnection $connection = null): IResult { + $this->validate(); + if ($this->shardDefinition) { + $runner = new ShardQueryRunner($this->shardConnectionManager, $this->shardDefinition); + return $runner->executeQuery($this->builder, $this->allShards, $this->getShardKeys(), $this->getPrimaryKeys(), $this->sortList, $this->limit, $this->offset); + } + return parent::executeQuery($connection); + } + + public function executeStatement(?IDBConnection $connection = null): int { + $this->validate(); + if ($this->shardDefinition) { + $runner = new ShardQueryRunner($this->shardConnectionManager, $this->shardDefinition); + if ($this->insertTable) { + $shards = $runner->getShards($this->allShards, $this->getShardKeys()); + if (!$shards) { + throw new InvalidShardedQueryException("Can't insert without shard key"); + } + $count = 0; + foreach ($shards as $shard) { + $shardConnection = $this->shardConnectionManager->getConnection($this->shardDefinition, $shard); + if (!$this->primaryKeys && $this->shardDefinition->table === $this->insertTable) { + $id = $this->autoIncrementHandler->getNextPrimaryKey($this->shardDefinition, $shard); + parent::setValue($this->shardDefinition->primaryKey, $this->createParameter('__generated_primary_key')); + $this->setParameter('__generated_primary_key', $id, self::PARAM_INT); + $this->lastInsertId = $id; + } + $count += parent::executeStatement($shardConnection); + + $this->lastInsertConnection = $shardConnection; + } + return $count; + } else { + return $runner->executeStatement($this->builder, $this->allShards, $this->getShardKeys(), $this->getPrimaryKeys()); + } + } + return parent::executeStatement($connection); + } + + public function getLastInsertId(): int { + if ($this->lastInsertId) { + return $this->lastInsertId; + } + if ($this->lastInsertConnection) { + $table = $this->builder->prefixTableName($this->insertTable); + return $this->lastInsertConnection->lastInsertId($table); + } else { + return parent::getLastInsertId(); + } + } + + +} diff --git a/lib/private/DB/SchemaWrapper.php b/lib/private/DB/SchemaWrapper.php index 5720e10fbdb..473c0009237 100644 --- a/lib/private/DB/SchemaWrapper.php +++ b/lib/private/DB/SchemaWrapper.php @@ -36,6 +36,9 @@ class SchemaWrapper implements ISchemaWrapper { public function performDropTableCalls() { foreach ($this->tablesToDelete as $tableName => $true) { $this->connection->dropTable($tableName); + foreach ($this->connection->getShardConnections() as $shardConnection) { + $shardConnection->dropTable($tableName); + } unset($this->tablesToDelete[$tableName]); } } |