aboutsummaryrefslogtreecommitdiffstats
path: root/lib/private/DB/Connection.php
diff options
context:
space:
mode:
Diffstat (limited to 'lib/private/DB/Connection.php')
-rw-r--r--lib/private/DB/Connection.php74
1 files changed, 71 insertions, 3 deletions
diff --git a/lib/private/DB/Connection.php b/lib/private/DB/Connection.php
index 74cdba3e218..447c164c1a4 100644
--- a/lib/private/DB/Connection.php
+++ b/lib/private/DB/Connection.php
@@ -23,12 +23,19 @@ use Doctrine\DBAL\Platforms\SqlitePlatform;
use Doctrine\DBAL\Result;
use Doctrine\DBAL\Schema\Schema;
use Doctrine\DBAL\Statement;
-use OC\DB\QueryBuilder\Partitioned\PartitionSplit;
use OC\DB\QueryBuilder\Partitioned\PartitionedQueryBuilder;
+use OC\DB\QueryBuilder\Partitioned\PartitionSplit;
use OC\DB\QueryBuilder\QueryBuilder;
+use OC\DB\QueryBuilder\Sharded\AutoIncrementHandler;
+use OC\DB\QueryBuilder\Sharded\CrossShardMoveHelper;
+use OC\DB\QueryBuilder\Sharded\RoundRobinShardMapper;
+use OC\DB\QueryBuilder\Sharded\ShardConnectionManager;
+use OC\DB\QueryBuilder\Sharded\ShardDefinition;
use OC\SystemConfig;
use OCP\DB\QueryBuilder\IQueryBuilder;
+use OCP\DB\QueryBuilder\Sharded\IShardMapper;
use OCP\Diagnostics\IEventLogger;
+use OCP\ICacheFactory;
use OCP\IDBConnection;
use OCP\ILogger;
use OCP\IRequestId;
@@ -80,6 +87,10 @@ class Connection extends PrimaryReadReplicaConnection {
/** @var array<string, list<string>> */
protected array $partitions;
+ /** @var ShardDefinition[] */
+ protected array $shards = [];
+ protected ShardConnectionManager $shardConnectionManager;
+ protected AutoIncrementHandler $autoIncrementHandler;
/**
* Initializes a new instance of the Connection class.
@@ -105,6 +116,13 @@ class Connection extends PrimaryReadReplicaConnection {
$this->adapter = new $params['adapter']($this);
$this->tablePrefix = $params['tablePrefix'];
+ /** @psalm-suppress InvalidArrayOffset */
+ $this->shardConnectionManager = $this->params['shard_connection_manager'] ?? Server::get(ShardConnectionManager::class);
+ /** @psalm-suppress InvalidArrayOffset */
+ $this->autoIncrementHandler = $this->params['auto_increment_handler'] ?? new AutoIncrementHandler(
+ Server::get(ICacheFactory::class),
+ $this->shardConnectionManager,
+ );
$this->systemConfig = \OC::$server->getSystemConfig();
$this->clock = Server::get(ClockInterface::class);
$this->logger = Server::get(LoggerInterface::class);
@@ -123,12 +141,45 @@ class Connection extends PrimaryReadReplicaConnection {
$this->_config->setSQLLogger($debugStack);
}
- $this->partitions = $this->systemConfig->getValue('db.partitions', []);
+ // todo: only allow specific, pre-defined shard configurations, the current config exists for easy testing setup
+ $this->shards = array_map(function (array $config) {
+ $shardMapperClass = $config['mapper'] ?? RoundRobinShardMapper::class;
+ $shardMapper = Server::get($shardMapperClass);
+ if (!$shardMapper instanceof IShardMapper) {
+ throw new \Exception("Invalid shard mapper: $shardMapperClass");
+ }
+ return new ShardDefinition(
+ $config['table'],
+ $config['primary_key'],
+ $config['companion_keys'],
+ $config['shard_key'],
+ $shardMapper,
+ $config['companion_tables'],
+ $config['shards']
+ );
+ }, $this->params['sharding']);
+ $this->partitions = array_map(function (ShardDefinition $shard) {
+ return array_merge([$shard->table], $shard->companionTables);
+ }, $this->shards);
$this->setNestTransactionsWithSavepoints(true);
}
/**
+ * @return IDBConnection[]
+ */
+ public function getShardConnections(): array {
+ $connections = [];
+ foreach ($this->shards as $shardDefinition) {
+ foreach ($shardDefinition->getAllShards() as $shard) {
+ /** @var ConnectionAdapter $connection */
+ $connections[] = $this->shardConnectionManager->getConnection($shardDefinition, $shard);
+ }
+ }
+ return $connections;
+ }
+
+ /**
* @throws Exception
*/
public function connect($connectionName = null) {
@@ -176,13 +227,19 @@ class Connection extends PrimaryReadReplicaConnection {
*/
public function getQueryBuilder(): IQueryBuilder {
$this->queriesBuilt++;
+
$builder = new QueryBuilder(
new ConnectionAdapter($this),
$this->systemConfig,
$this->logger
);
if (count($this->partitions) > 0) {
- $builder = new PartitionedQueryBuilder($builder);
+ $builder = new PartitionedQueryBuilder(
+ $builder,
+ $this->shards,
+ $this->shardConnectionManager,
+ $this->autoIncrementHandler,
+ );
foreach ($this->partitions as $name => $tables) {
$partition = new PartitionSplit($name, $tables);
$builder->addPartition($partition);
@@ -704,6 +761,9 @@ class Connection extends PrimaryReadReplicaConnection {
return $migrator->generateChangeScript($toSchema);
} else {
$migrator->migrate($toSchema);
+ foreach ($this->getShardConnections() as $shardConnection) {
+ $shardConnection->migrateToSchema($toSchema);
+ }
}
}
@@ -846,4 +906,12 @@ class Connection extends PrimaryReadReplicaConnection {
}
}
}
+
+ public function getShardDefinition(string $name): ?ShardDefinition {
+ return $this->shards[$name] ?? null;
+ }
+
+ public function getCrossShardMoveHelper(): CrossShardMoveHelper {
+ return new CrossShardMoveHelper($this->shardConnectionManager);
+ }
}