diff options
Diffstat (limited to 'lib/private/DB/Connection.php')
-rw-r--r-- | lib/private/DB/Connection.php | 74 |
1 files changed, 71 insertions, 3 deletions
diff --git a/lib/private/DB/Connection.php b/lib/private/DB/Connection.php index 74cdba3e218..447c164c1a4 100644 --- a/lib/private/DB/Connection.php +++ b/lib/private/DB/Connection.php @@ -23,12 +23,19 @@ use Doctrine\DBAL\Platforms\SqlitePlatform; use Doctrine\DBAL\Result; use Doctrine\DBAL\Schema\Schema; use Doctrine\DBAL\Statement; -use OC\DB\QueryBuilder\Partitioned\PartitionSplit; use OC\DB\QueryBuilder\Partitioned\PartitionedQueryBuilder; +use OC\DB\QueryBuilder\Partitioned\PartitionSplit; use OC\DB\QueryBuilder\QueryBuilder; +use OC\DB\QueryBuilder\Sharded\AutoIncrementHandler; +use OC\DB\QueryBuilder\Sharded\CrossShardMoveHelper; +use OC\DB\QueryBuilder\Sharded\RoundRobinShardMapper; +use OC\DB\QueryBuilder\Sharded\ShardConnectionManager; +use OC\DB\QueryBuilder\Sharded\ShardDefinition; use OC\SystemConfig; use OCP\DB\QueryBuilder\IQueryBuilder; +use OCP\DB\QueryBuilder\Sharded\IShardMapper; use OCP\Diagnostics\IEventLogger; +use OCP\ICacheFactory; use OCP\IDBConnection; use OCP\ILogger; use OCP\IRequestId; @@ -80,6 +87,10 @@ class Connection extends PrimaryReadReplicaConnection { /** @var array<string, list<string>> */ protected array $partitions; + /** @var ShardDefinition[] */ + protected array $shards = []; + protected ShardConnectionManager $shardConnectionManager; + protected AutoIncrementHandler $autoIncrementHandler; /** * Initializes a new instance of the Connection class. @@ -105,6 +116,13 @@ class Connection extends PrimaryReadReplicaConnection { $this->adapter = new $params['adapter']($this); $this->tablePrefix = $params['tablePrefix']; + /** @psalm-suppress InvalidArrayOffset */ + $this->shardConnectionManager = $this->params['shard_connection_manager'] ?? Server::get(ShardConnectionManager::class); + /** @psalm-suppress InvalidArrayOffset */ + $this->autoIncrementHandler = $this->params['auto_increment_handler'] ?? new AutoIncrementHandler( + Server::get(ICacheFactory::class), + $this->shardConnectionManager, + ); $this->systemConfig = \OC::$server->getSystemConfig(); $this->clock = Server::get(ClockInterface::class); $this->logger = Server::get(LoggerInterface::class); @@ -123,12 +141,45 @@ class Connection extends PrimaryReadReplicaConnection { $this->_config->setSQLLogger($debugStack); } - $this->partitions = $this->systemConfig->getValue('db.partitions', []); + // todo: only allow specific, pre-defined shard configurations, the current config exists for easy testing setup + $this->shards = array_map(function (array $config) { + $shardMapperClass = $config['mapper'] ?? RoundRobinShardMapper::class; + $shardMapper = Server::get($shardMapperClass); + if (!$shardMapper instanceof IShardMapper) { + throw new \Exception("Invalid shard mapper: $shardMapperClass"); + } + return new ShardDefinition( + $config['table'], + $config['primary_key'], + $config['companion_keys'], + $config['shard_key'], + $shardMapper, + $config['companion_tables'], + $config['shards'] + ); + }, $this->params['sharding']); + $this->partitions = array_map(function (ShardDefinition $shard) { + return array_merge([$shard->table], $shard->companionTables); + }, $this->shards); $this->setNestTransactionsWithSavepoints(true); } /** + * @return IDBConnection[] + */ + public function getShardConnections(): array { + $connections = []; + foreach ($this->shards as $shardDefinition) { + foreach ($shardDefinition->getAllShards() as $shard) { + /** @var ConnectionAdapter $connection */ + $connections[] = $this->shardConnectionManager->getConnection($shardDefinition, $shard); + } + } + return $connections; + } + + /** * @throws Exception */ public function connect($connectionName = null) { @@ -176,13 +227,19 @@ class Connection extends PrimaryReadReplicaConnection { */ public function getQueryBuilder(): IQueryBuilder { $this->queriesBuilt++; + $builder = new QueryBuilder( new ConnectionAdapter($this), $this->systemConfig, $this->logger ); if (count($this->partitions) > 0) { - $builder = new PartitionedQueryBuilder($builder); + $builder = new PartitionedQueryBuilder( + $builder, + $this->shards, + $this->shardConnectionManager, + $this->autoIncrementHandler, + ); foreach ($this->partitions as $name => $tables) { $partition = new PartitionSplit($name, $tables); $builder->addPartition($partition); @@ -704,6 +761,9 @@ class Connection extends PrimaryReadReplicaConnection { return $migrator->generateChangeScript($toSchema); } else { $migrator->migrate($toSchema); + foreach ($this->getShardConnections() as $shardConnection) { + $shardConnection->migrateToSchema($toSchema); + } } } @@ -846,4 +906,12 @@ class Connection extends PrimaryReadReplicaConnection { } } } + + public function getShardDefinition(string $name): ?ShardDefinition { + return $this->shards[$name] ?? null; + } + + public function getCrossShardMoveHelper(): CrossShardMoveHelper { + return new CrossShardMoveHelper($this->shardConnectionManager); + } } |