* SPDX-License-Identifier: AGPL-3.0-or-later
 */

namespace OC\DB\QueryBuilder\Sharded;

use OCP\ICacheFactory;
use OCP\IMemcache;
use OCP\IMemcacheTTL;

/**
 * Hands out auto increment values for a sharded table in an atomic way
 *
 * Every shard is its own database that knows nothing about the keys used in the other shards,
 * so the database's auto-increment can't be used. The counter is kept in a distributed cache instead.
 */
class AutoIncrementHandler {
	public const MIN_VALID_KEY = 1000;
	public const TTL = 365 * 24 * 60 * 60;

	private ?IMemcache $cache = null;

	/**
	 * @throws \Exception when running on a php build with integers smaller than 8 bytes
	 */
	public function __construct(
		private ICacheFactory $cacheFactory,
		private ShardConnectionManager $shardConnectionManager,
	) {
		if (PHP_INT_SIZE < 8) {
			throw new \Exception('sharding is only supported with 64bit php');
		}
	}

	/**
	 * Lazily create the distributed cache that holds the counters
	 *
	 * @throws \Exception when the distributed cache does not implement IMemcache
	 */
	private function getCache(): IMemcache {
		if ($this->cache !== null) {
			return $this->cache;
		}

		$candidate = $this->cacheFactory->createDistributed('shared_autoincrement');
		if (!($candidate instanceof IMemcache)) {
			throw new \Exception('Distributed cache ' . get_class($candidate) . ' is not suitable');
		}

		$this->cache = $candidate;
		return $candidate;
	}

	/**
	 * Get the next value for the given shard definition
	 *
	 * The returned key is unique and incrementing, but not sequential.
	 * The counter is shifted left by 8 bits and the shard id is stored in the lowest 8 bits of the returned value
	 *
	 * @param ShardDefinition $shardDefinition
	 * @param int $shard id of the shard the row gets inserted into
	 * @return int
	 * @throws \Exception when the maximum primary key is exceeded or no key could be obtained within 5 attempts
	 */
	public function getNextPrimaryKey(ShardDefinition $shardDefinition, int $shard): int {
		for ($attempt = 0; $attempt < 5; $attempt++) {
			$counter = $this->getNextInner($shardDefinition);
			if ($counter === null) {
				// lost a race against another request, try again
				continue;
			}

			if ($counter > ShardDefinition::MAX_PRIMARY_KEY) {
				throw new \Exception('Max primary key of ' . ShardDefinition::MAX_PRIMARY_KEY . ' exceeded');
			}

			// the shard the row was originally inserted into is encoded in the key, to allow guessing the shard by primary key later on
			return ($counter << 8) | $shard;
		}

		throw new \Exception('Failed to get next primary key');
	}

	/**
	 * A single attempt at the auto increment logic, retrying is left to the caller
	 *
	 * @param ShardDefinition $shardDefinition
	 * @return int|null either the next primary key or null if the call needs to be retried
	 */
	private function getNextInner(ShardDefinition $shardDefinition): ?int {
		$cache = $this->getCache();
		$key = $shardDefinition->table;

		// This method is expected to run concurrently in different requests, and the cached value can be
		// cleared, invalidated or re-calculated at any point between two of our cache calls.
		// The order of the cache calls below is what keeps the logic resilient against those races.
		//
		// Ideal case: the last primary key is in the cache and a single `inc` is enough.
		// Otherwise: look up the highest used id in the database, increment it and store it in the cache.

		// storing a non-numeric placeholder prevents `inc` from returning `1` when the key doesn't exist yet
		$cache->add($key, 'empty-placeholder', self::TTL);
		$candidate = $cache->inc($key);

		if ($cache instanceof IMemcacheTTL) {
			$cache->setTTL($key, self::TTL);
		}

		// "add + inc" isn't strictly atomic: the stored value can disappear between the two calls,
		// so as a safety any result that is too small gets rejected
		if (is_int($candidate)) {
			if ($candidate >= self::MIN_VALID_KEY) {
				return $candidate;
			}

			// we hit the edge case, so invalidate the cached value
			if (!$cache->cas($key, $candidate, 'empty-placeholder')) {
				// someone else is changing the value concurrently, give up and retry
				return null;
			}
		}

		// the maximum read from the database has the encoded shard already discarded
		$seed = max($this->getMaxFromDb($shardDefinition), self::MIN_VALID_KEY) + 1;
		if ($cache->cas($key, 'empty-placeholder', $seed)) {
			return $seed;
		}

		// another request set the cached value before us, so we should just be able to inc
		$candidate = $cache->inc($key);
		if (!is_int($candidate)) {
			// cleanup any non-numeric value other than the placeholder if that got stored somehow
			$cache->ncad($key, 'empty-placeholder');
			// retry
			return null;
		}

		if ($candidate >= self::MIN_VALID_KEY) {
			return $candidate;
		}

		// key got cleared, invalidate and retry
		$cache->cas($key, $candidate, 'empty-placeholder');
		return null;
	}

	/**
	 * Get the maximum primary key value over all shards
	 *
	 * Values greater than the definition's `fromFileId` are shifted right by 8 bits,
	 * so the returned value no longer contains an embedded shard id
	 */
	private function getMaxFromDb(ShardDefinition $shardDefinition): int {
		$highest = $shardDefinition->fromFileId;

		// the query is built once and then executed against the connection of every shard
		$query = $this->shardConnectionManager->getConnection($shardDefinition, 0)->getQueryBuilder();
		$query->select($shardDefinition->primaryKey)
			->from($shardDefinition->table)
			->orderBy($shardDefinition->primaryKey, 'DESC')
			->setMaxResults(1);

		foreach ($shardDefinition->getAllShards() as $shard) {
			$connection = $this->shardConnectionManager->getConnection($shardDefinition, $shard);
			$value = $query->executeQuery($connection)->fetchOne();
			if (!$value) {
				// nothing usable came back from this shard
				continue;
			}

			if ($value > $shardDefinition->fromFileId) {
				$value >>= 8;
			}
			$highest = max($highest, $value);
		}

		return $highest;
	}
}
// c.php Nextcloud server, a safe home for all your data: https://github.com/nextcloud/serverwww-data
aboutsummaryrefslogtreecommitdiffstats
path: root/lib/private/Hooks/EmitterTrait.php
blob: 58b8242ec5489ca92f3103a4be0fb32cf1decaff (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
<?php
/**
 * @copyright Copyright (c) 2016, ownCloud, Inc.
 *
 * @author Morris Jobke <hey@morrisjobke.de>
 * @author Robin Appelman <robin@icewind.nl>
 *
 * @license AGPL-3.0
 *
 * This code is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License, version 3,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License, version 3,
 * along with this program. If not, see <http://www.gnu.org/licenses/>
 *
 */

namespace OC\Hooks;

/**
 * Simple in-process event emitter.
 *
 * Callbacks are stored per event name, where the event name is the scope and
 * the method joined by `::`.
 */
trait EmitterTrait {

	/**
	 * Registered callbacks, keyed by event name (`<scope>::<method>`).
	 *
	 * @var callable[][] $listeners
	 */
	protected $listeners = [];

	/**
	 * Register a callback for an event.
	 *
	 * Registering the identical callback a second time for the same event
	 * has no effect.
	 *
	 * @param string $scope
	 * @param string $method
	 * @param callable $callback
	 */
	public function listen($scope, $method, callable $callback) {
		$eventName = $scope . '::' . $method;
		if (!isset($this->listeners[$eventName])) {
			$this->listeners[$eventName] = [];
		}
		// strict search, so only the very same callback counts as already registered
		if (array_search($callback, $this->listeners[$eventName], true) === false) {
			$this->listeners[$eventName][] = $callback;
		}
	}

	/**
	 * Remove listeners, optionally narrowed down by scope, method and callback.
	 *
	 * A scope or method that is empty (falsy) is treated as "not given" and
	 * matches every scope or method. Without a callback all listeners of the
	 * matched events are removed, otherwise only that callback is removed.
	 *
	 * Event names are split at the first `::`, so a scope that itself
	 * contains `::` can only be matched by passing both scope and method.
	 *
	 * @param string $scope optional
	 * @param string $method optional
	 * @param callable $callback optional
	 */
	public function removeListener($scope = null, $method = null, ?callable $callback = null) {
		$names = [];
		$allNames = array_keys($this->listeners);
		if ($scope && $method) {
			$name = $scope . '::' . $method;
			if (isset($this->listeners[$name])) {
				$names[] = $name;
			}
		} elseif ($scope) {
			foreach ($allNames as $name) {
				$parts = explode('::', (string)$name, 2);
				// compare as strings: a loose comparison treats numeric strings
				// such as '1e3' and '1000' as equal and would remove unrelated listeners
				if ($parts[0] === (string)$scope) {
					$names[] = $name;
				}
			}
		} elseif ($method) {
			foreach ($allNames as $name) {
				$parts = explode('::', (string)$name, 2);
				// a name without '::' has no method part and can never match
				if (isset($parts[1]) && $parts[1] === (string)$method) {
					$names[] = $name;
				}
			}
		} else {
			$names = $allNames;
		}

		foreach ($names as $name) {
			if ($callback !== null) {
				$index = array_search($callback, $this->listeners[$name], true);
				if ($index !== false) {
					unset($this->listeners[$name][$index]);
				}
			} else {
				$this->listeners[$name] = [];
			}
		}
	}

	/**
	 * Call every callback registered for the event with the given arguments.
	 *
	 * Return values of the callbacks are ignored.
	 *
	 * @param string $scope
	 * @param string $method
	 * @param array $arguments optional
	 */
	protected function emit($scope, $method, array $arguments = []) {
		$eventName = $scope . '::' . $method;
		if (isset($this->listeners[$eventName])) {
			foreach ($this->listeners[$eventName] as $callback) {
				call_user_func_array($callback, $arguments);
			}
		}
	}
}