diff options
author | Maxence Lange <maxence@artificial-owl.com> | 2025-04-28 19:57:56 -0100 |
---|---|---|
committer | Maxence Lange <maxence@artificial-owl.com> | 2025-04-28 19:58:06 -0100 |
commit | 6d272ff894fa969e5a526caf6a91d60eda115c53 (patch) | |
tree | df90f826a29f7169dd9b550f2e478a1b09d29e71 /lib/private/Async | |
parent | 10a01423ecfc7e028960eee8058bd177ad28d484 (diff) | |
download | nextcloud-server-enh/noid/async-process-run.tar.gz nextcloud-server-enh/noid/async-process-run.zip |
feat(async): AsyncProcessenh/noid/async-process-run
Signed-off-by: Maxence Lange <maxence@artificial-owl.com>
Diffstat (limited to 'lib/private/Async')
20 files changed, 2166 insertions, 0 deletions
diff --git a/lib/private/Async/ABlockWrapper.php b/lib/private/Async/ABlockWrapper.php new file mode 100644 index 00000000000..a5dd1c3b629 --- /dev/null +++ b/lib/private/Async/ABlockWrapper.php @@ -0,0 +1,40 @@ +<?php + +declare(strict_types=1); + +/** + * SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +namespace OC\Async; + +use OC\Async\Model\Block; +use OC\Async\Model\SessionInterface; +use OCP\Async\Enum\BlockActivity; + +abstract class ABlockWrapper { + abstract public function session(array $metadata): void; + abstract public function init(): void; + abstract public function activity(BlockActivity $activity, string $line = ''): void; + abstract public function end(string $line = ''): void; + + protected Block $block; + private SessionInterface $sessionInterface; + + public function setBlock(Block $block): void { + $this->block = $block; + } + + public function getSessionInterface(): SessionInterface { + return $this->sessionInterface; + } + + public function setSessionInterface(SessionInterface $iface): void { + $this->sessionInterface = $iface; + } + + public function getReplayCount(): int { + return $this->block->getReplayCount(); + } +} diff --git a/lib/private/Async/AsyncManager.php b/lib/private/Async/AsyncManager.php new file mode 100644 index 00000000000..955037f356d --- /dev/null +++ b/lib/private/Async/AsyncManager.php @@ -0,0 +1,222 @@ +<?php + +namespace OC\Async; + +use OC\Async\Db\BlockMapper; +use OC\Async\Enum\BlockType; +use OC\Async\Model\Block; +use OC\Async\Model\BlockInterface; +use OC\Config\Lexicon\CoreConfigLexicon; +use OCP\Async\Enum\ProcessExecutionTime; +use OCP\Async\Enum\BlockStatus; +use OCP\IAppConfig; +use ReflectionFunctionAbstract; + +class AsyncManager { + private ?string $sessionToken = null; + /** @var BlockInterface[] */ + private array $interfaces = []; + /** @var array<string, ReflectionFunctionAbstract> } */ + private array $blocksReflexion = []; + + public function __construct( + private IAppConfig $appConfig, + private BlockMapper $blockMapper, + private ForkManager $forkManager, + ) { + } + + /** + * @param BlockType $type + * @param string $serializedBlock + * @param ReflectionFunctionAbstract $reflection + * @param array $params + * @param array $allowedClasses + * + * @return IBlockInterface + * @see \OCP\Async\IAsyncProcess + * @internal prefer using public interface {@see \OCP\Async\IAsyncProcess} + */ + public function asyncBlock( + BlockType $type, + string $serializedBlock, // serialize() on the code - className if type is ::CLASS + \ReflectionFunctionAbstract $reflection, // reflection about the method/function called as unserialization + array $params, // parameters to use at unserialization + array $allowedClasses = [] // list of class included in the serialization of the 'block' + ): IBlockInterface { + $this->sessionToken ??= $this->generateToken(); + + $data = $this->serializeReflectionParams($reflection, $params); + $data['blockClasses'] = $allowedClasses; + if ($type === BlockType::CLASSNAME) { + $data['className'] = $serializedBlock; + } + + $token = $this->generateToken(); + $block = new Block(); + $block->setToken($token); + $block->setSessionToken($this->sessionToken); + $block->setBlockType($type); + $block->setBlockStatus(BlockStatus::PREP); + $block->setCode($serializedBlock); + $block->setParams($data); + $block->setOrig([]); + $block->setCreation(time()); + + + // needed !? +// $process->addMetadataEntry('_links', $this->interfaces); + +// $this->processMapper->insert($process); + + $iface = new BlockInterface($this, $block); + $this->interfaces[] = $iface; + $this->blocksReflexion[$token] = $reflection; + + return $iface; + } + +// public function updateDataset(ProcessInfo $processInfo) { +// $token = $processInfo->getToken(); +// $dataset = []; +// foreach($processInfo->getDataset() as $params) { +// $dataset[] = $this->serializeReflectionParams($this->processesReflexion[$token], $params); +// } +// $this->processMapper->updateDataset($token, $dataset); +// } + + private function endSession(ProcessExecutionTime $time): ?string { + if ($this->sessionToken === null) { + return null; + } + + $current = $this->sessionToken; + foreach ($this->interfaces as $iface) { + $process = $iface->getBlock(); + $process->addMetadataEntry('_iface', $iface->jsonSerialize()); + $process->setProcessExecutionTime($time); + + $dataset = []; + foreach ($iface->getDataset() as $params) { + $dataset[] = $this->serializeReflectionParams( + $this->blocksReflexion[$process->getToken()], $params + ); + } + $process->setDataset($dataset); + + $this->blockMapper->insert($process); + } + + $this->blockMapper->updateSessionStatus( + $this->sessionToken, BlockStatus::STANDBY, BlockStatus::PREP + ); + $this->processes = $this->interfaces = []; + $this->sessionToken = null; + + return $current; + } + + /** + * @internal + */ + public function async(ProcessExecutionTime $time): string { + $current = $this->endSession($time); + if ($current === null) { + return ''; + } + + if ($time === ProcessExecutionTime::NOW) { + $this->forkManager->forkSession($current); + } + + return $current; + } + + private function serializeReflectionParams(ReflectionFunctionAbstract $reflection, array $params): array { + $i = 0; + $processWrapper = false; + $filteredParams = []; + + foreach ($reflection->getParameters() as $arg) { + $argType = $arg->getType(); + $param = $params[$i]; + if ($argType !== null) { + if ($i === 0 && $argType->getName() === ABlockWrapper::class) { + $processWrapper = true; + continue; // we ignore IProcessWrapper as first argument, as it will be filled at execution time + } + + // TODO: compare $argType with $param +// foreach($argType->getTypes() as $t) { +// echo '> ' . $t . "\n"; +// } +// $arg->allowsNull() +// $arg->isOptional() + + } + + // TODO: we might want to filter some params ? + $filteredParams[] = $param; + +// echo '..1. ' . json_encode($param) . "\n"; +// echo '..2. ' . serialize($param) . "\n"; +// echo '? ' . gettype($param) . "\n"; + + $i++; + } + + try { + $serializedParams = serialize($params); + } catch (\Exception $e) { + throw $e; + } + + return [ + 'params' => $serializedParams, + 'paramsClasses' => $this->extractClassFromArray($filteredParams), + 'processWrapper' => $processWrapper, + ]; + } + +// +// public function unserializeParams(array $data): array { +// return $data; +// } +// + + private function extractClassFromArray(array $arr, array &$classes = []): array { + foreach ($arr as $entry) { + if (is_array($entry)) { + $this->extractClassFromArray($entry, $classes); + } + + if (is_object($entry)) { + $class = get_class($entry); + if (!in_array($class, $classes, true)) { + $classes[] = $class; + } + } + } + + return $classes; + } + + private function generateToken(int $length = 15): string { + $result = ''; + for ($i = 0; $i < $length; $i++) { + $result .= 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890'[random_int(0, 61)]; + } + + return $result; + } + + public function dropAllBlocks(): void { + $this->blockMapper->deleteAll(); + } + + public function resetConfig(): void { + $this->appConfig->deleteKey('core', CoreConfigLexicon::ASYNC_LOOPBACK_ADDRESS); + $this->appConfig->deleteKey('core', CoreConfigLexicon::ASYNC_LOOPBACK_PING); + $this->appConfig->deleteKey('core', CoreConfigLexicon::ASYNC_LOOPBACK_TEST); + } +} diff --git a/lib/private/Async/AsyncProcess.php b/lib/private/Async/AsyncProcess.php new file mode 100644 index 00000000000..44d990749d7 --- /dev/null +++ b/lib/private/Async/AsyncProcess.php @@ -0,0 +1,64 @@ +<?php + +namespace OC\Async; + +use Laravel\SerializableClosure\SerializableClosure; +use Laravel\SerializableClosure\Serializers\Native; +use OC\Async\Enum\BlockType; +use OCP\Async\Enum\ProcessExecutionTime; +use OCP\Async\IAsyncProcess; +use ReflectionFunction; +use ReflectionMethod; + +class AsyncProcess implements IAsyncProcess { + public function __construct( + private AsyncManager $asyncManager, + private readonly ForkManager $forkManager, + ) { + } + + public function exec(\Closure $closure, ...$params): IBlockInterface { + return $this->asyncManager->asyncBlock( + BlockType::CLOSURE, + serialize(new SerializableClosure($closure)), + new ReflectionFunction($closure), + $params, + [SerializableClosure::class, Native::class] + ); + } + + public function invoke(callable $obj, ...$params): IBlockInterface { + return $this->asyncManager->asyncBlock( + BlockType::INVOKABLE, + serialize($obj), + new ReflectionMethod($obj, '__invoke'), + $params, + [$obj::class] + ); + } + + public function call(string $class, ...$params): IBlockInterface { + // abstract ? + if (!method_exists($class, 'async')) { + throw new \Exception('class ' . $class . ' is missing async() method'); + } + + return $this->asyncManager->asyncBlock( + BlockType::CLASSNAME, + $class, + new ReflectionMethod($class, 'async'), + $params, + ); + } + + /** + * close the creation of the session and start async as soon as possible + * + * @param ProcessExecutionTime $time preferred urgency to start the async process + * + * @return string session token, empty if no opened session + */ + public function async(ProcessExecutionTime $time = ProcessExecutionTime::NOW): string { + return $this->asyncManager->async($time); + } +} diff --git a/lib/private/Async/Db/BlockMapper.php b/lib/private/Async/Db/BlockMapper.php new file mode 100644 index 00000000000..6d6409327a4 --- /dev/null +++ b/lib/private/Async/Db/BlockMapper.php @@ -0,0 +1,260 @@ +<?php + +declare(strict_types=1); +/** + * SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +namespace OC\Async\Db; + +use OC\Async\Exceptions\BlockNotFoundException; +use OC\Async\Model\Block; +use OCP\AppFramework\Db\DoesNotExistException; +use OCP\AppFramework\Db\QBMapper; +use OCP\Async\Enum\BlockStatus; +use OCP\DB\QueryBuilder\IQueryBuilder; +use OCP\IDBConnection; +use Psr\Log\LoggerInterface; + +/** + * @template-extends QBMapper<Block> + */ +class BlockMapper extends QBMapper { + public const TABLE = 'async_process'; + + public function __construct( + IDBConnection $db, + private LoggerInterface $logger, + ) { + parent::__construct($db, self::TABLE, Block::class); + } + + /** + * @return Block[] + */ + public function getBySession(string $sessionToken): array { + $qb = $this->db->getQueryBuilder(); + $qb->select('*') + ->from($this->getTableName()) + ->where($qb->expr()->eq('session_token', $qb->createNamedParameter($sessionToken))) + ->orderBy('id', 'asc'); + + return $this->findEntities($qb); + } + + /** + * @return string[] + */ + public function getSessions(): array { + $qb = $this->db->getQueryBuilder(); + $qb->selectDistinct('session_token') + ->from($this->getTableName()); + + $result = $qb->executeQuery(); + + $sessions = []; + while ($row = $result->fetch()) { + $sessions[] = $row['session_token']; + } + $result->closeCursor(); + + return $sessions; + } + + + /** + * returns list of sessionId that contains process with: + * - at least one is in STANDBY with an older timestamp in next_run + * - none as RUNNING or BLOCKER + * + * @return string[] + */ + public function getSessionOnStandBy(): array { + $qb = $this->db->getQueryBuilder(); + $qb->selectDistinct('t1.session_token') + ->from($this->getTableName(), 't1') + ->leftJoin( + 't1', $this->getTableName(), 't2', + $qb->expr()->andX( + $qb->expr()->eq('t1.session_token', 't2.session_token'), + $qb->expr()->in('t2.status', $qb->createNamedParameter([BlockStatus::PREP->value, BlockStatus::RUNNING->value, BlockStatus::BLOCKER->value], IQueryBuilder::PARAM_INT_ARRAY)) + ) + ) + ->where($qb->expr()->eq('t1.status', $qb->createNamedParameter(BlockStatus::STANDBY->value, IQueryBuilder::PARAM_INT))) + ->andWhere($qb->expr()->lt('t1.next_run', $qb->createNamedParameter(time(), IQueryBuilder::PARAM_INT))) + ->andWhere($qb->expr()->isNull('t2.status')); + + $result = $qb->executeQuery(); + + $sessions = []; + while ($row = $result->fetch()) { + $sessions[] = $row['session_token']; + } + $result->closeCursor(); + + return $sessions; + } + + /** + * reset to STANDBY all process: + * - marked as ERROR or BLOCKER, + * - next_run in an older timestamp + * - next_run not at zero + */ + public function resetFailedBlock(): int { + $qb = $this->db->getQueryBuilder(); + $qb->update($this->getTableName()) + ->set('status', $qb->createNamedParameter(BlockStatus::STANDBY->value, IQueryBuilder::PARAM_INT)) + ->where($qb->expr()->in('status', $qb->createNamedParameter([BlockStatus::ERROR->value, BlockStatus::BLOCKER->value], IQueryBuilder::PARAM_INT_ARRAY))) + ->andWhere($qb->expr()->lt('next_run', $qb->createNamedParameter(time(), IQueryBuilder::PARAM_INT))) + ->andWhere($qb->expr()->neq('next_run', $qb->createNamedParameter(0, IQueryBuilder::PARAM_INT))); + return $qb->executeStatement(); + } + + /** + * delete sessions that contain only process with status at ::SUCCESS + */ + public function removeSuccessfulBlock(): int { + $qb = $this->db->getQueryBuilder(); + $qb->selectDistinct('t1.session_token') + ->from($this->getTableName(), 't1') + ->leftJoin( + 't1', $this->getTableName(), 't2', + $qb->expr()->andX( + $qb->expr()->eq('t1.session_token', 't2.session_token'), + $qb->expr()->neq('t2.status', $qb->createNamedParameter(BlockStatus::SUCCESS->value, IQueryBuilder::PARAM_INT)) + ) + ) + ->where($qb->expr()->eq('t1.status', $qb->createNamedParameter(BlockStatus::SUCCESS->value, IQueryBuilder::PARAM_INT))) + ->andWhere($qb->expr()->lt('t1.next_run', $qb->createNamedParameter(time(), IQueryBuilder::PARAM_INT))) + ->andWhere($qb->expr()->isNull('t2.status')); + + $result = $qb->executeQuery(); + $sessions = []; + while ($row = $result->fetch()) { + $sessions[] = $row['session_token']; + } + $result->closeCursor(); + + $count = 0; + $chunks = array_chunk($sessions, 30); + foreach($chunks as $chunk) { + $delete = $this->db->getQueryBuilder(); + $delete->delete($this->getTableName()) + ->where($qb->expr()->in('session_token', $delete->createNamedParameter($chunk, IQueryBuilder::PARAM_INT_ARRAY))); + $count += $delete->executeStatement(); + unset($delete); + } + + return $count; + } + + /** + * return a Block from its token + * + * @throws BlockNotFoundException + */ + public function getByToken(string $token): Block { + $qb = $this->db->getQueryBuilder(); + $qb->select('*') + ->from($this->getTableName()) + ->where($qb->expr()->eq('token', $qb->createNamedParameter($token))); + + try { + return $this->findEntity($qb); + } catch (DoesNotExistException) { + throw new BlockNotFoundException('no process found'); + } + } + + public function updateStatus(Block $block, ?BlockStatus $prevStatus = null, string $lockToken = ''): bool { + $qb = $this->db->getQueryBuilder(); + $qb->update($this->getTableName()) + ->set('status', $qb->createNamedParameter($block->getStatus())) + ->where($qb->expr()->andX( + $qb->expr()->eq('session_token', $qb->createNamedParameter($block->getSessionToken())), + $qb->expr()->eq('token', $qb->createNamedParameter($block->getToken())) + ) + ); + + // if process contain a lockToken, we conflict with the stored one + if ($block->getLockToken() !== '') { + $qb->andWhere($qb->expr()->eq('lock_token', $qb->createNamedParameter($block->getLockToken()))); + } + + // if status is switching from standby to running, we set last_run to detect timeout + if ($block->getBlockStatus() === BlockStatus::RUNNING && $prevStatus === BlockStatus::STANDBY) { + $qb->set('last_run', $qb->createNamedParameter(time(), IQueryBuilder::PARAM_INT)); + } + + // if status is updated to success, error or blocker, we store result (or exception) + if (in_array($block->getBlockStatus(), [BlockStatus::SUCCESS, BlockStatus::ERROR, BlockStatus::BLOCKER], true)) { + try { + $qb->set('result', $qb->createNamedParameter(json_encode($block->getResult(), JSON_THROW_ON_ERROR))); + $qb->set('metadata', $qb->createNamedParameter(json_encode($block->getMetadata(), JSON_THROW_ON_ERROR))); + $qb->set('next_run', $qb->createNamedParameter($block->getNextRun(), IQueryBuilder::PARAM_INT)); + } catch (\JsonException $e) { + $this->logger->warning('could not json_encode process result', ['exception' => $e]); + } + } + + if ($lockToken !== '') { + $qb->set('lock_token', $qb->createNamedParameter($lockToken)); + } + + if ($prevStatus !== null) { + $qb->andWhere($qb->expr()->eq('status', $qb->createNamedParameter($prevStatus->value, IQueryBuilder::PARAM_INT))); + } + + return ($qb->executeStatement() === 1); + } + + public function updateSessionStatus(string $sessionToken, BlockStatus $status, ?BlockStatus $prevStatus = null): int { + $qb = $this->db->getQueryBuilder(); + $qb->update($this->getTableName()) + ->set('status', $qb->createNamedParameter($status->value, IQueryBuilder::PARAM_INT)) + ->where($qb->expr()->eq('session_token', $qb->createNamedParameter($sessionToken))); + + if ($prevStatus !== null) { + $qb->andWhere($qb->expr()->eq('status', $qb->createNamedParameter($prevStatus->value, IQueryBuilder::PARAM_INT))); + } + + return $qb->executeStatement(); + } + + + + /** + * set next_run to current time if next_run>0 in relation to session_token. + * Force process queued for replay to be run as soon as possible. + */ + public function resetSessionNextRun(string $sessionToken): void { + $qb = $this->db->getQueryBuilder(); + $qb->update($this->getTableName()) + ->set('next_run', $qb->createNamedParameter(time(), IQueryBuilder::PARAM_INT)) + ->where( + $qb->expr()->eq('session_token', $qb->createNamedParameter($sessionToken)), + $qb->expr()->gt('next_run', $qb->createNamedParameter(0, IQueryBuilder::PARAM_INT)), + ); + + $qb->executeStatement(); + } + + +// public function updateDataset(string $token, array $dataset): void { +// $qb = $this->db->getQueryBuilder(); +// $qb->update($this->getTableName()) +// ->set('dataset', $qb->createNamedParameter(json_encode($dataset))) +// ->where($qb->expr()->eq('token', $qb->createNamedParameter($token))); +// +// $qb->executeStatement(); +// } + + public function deleteAll() { + $qb = $this->db->getQueryBuilder(); + $qb->delete($this->getTableName()); + $qb->executeStatement(); + } + +} diff --git a/lib/private/Async/Enum/BlockType.php b/lib/private/Async/Enum/BlockType.php new file mode 100644 index 00000000000..839d0d24f05 --- /dev/null +++ b/lib/private/Async/Enum/BlockType.php @@ -0,0 +1,16 @@ +<?php + +declare(strict_types=1); + +/** + * SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +namespace OC\Async\Enum; + +enum BlockType: int { + case CLOSURE = 1; + case INVOKABLE = 2; + case CLASSNAME = 3; +} diff --git a/lib/private/Async/Exceptions/AsyncProcessException.php b/lib/private/Async/Exceptions/AsyncProcessException.php new file mode 100644 index 00000000000..b394a95df7c --- /dev/null +++ b/lib/private/Async/Exceptions/AsyncProcessException.php @@ -0,0 +1,14 @@ +<?php + +declare(strict_types=1); +/** + * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +namespace OC\Async\Exceptions; + +use Exception; + +class AsyncProcessException extends Exception { +} diff --git a/lib/private/Async/Exceptions/BlockAlreadyRunningException.php b/lib/private/Async/Exceptions/BlockAlreadyRunningException.php new file mode 100644 index 00000000000..4c671d91d55 --- /dev/null +++ b/lib/private/Async/Exceptions/BlockAlreadyRunningException.php @@ -0,0 +1,12 @@ +<?php + +declare(strict_types=1); +/** + * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +namespace OC\Async\Exceptions; + +class BlockAlreadyRunningException extends AsyncProcessException { +} diff --git a/lib/private/Async/Exceptions/BlockNotFoundException.php b/lib/private/Async/Exceptions/BlockNotFoundException.php new file mode 100644 index 00000000000..6e25415f7e3 --- /dev/null +++ b/lib/private/Async/Exceptions/BlockNotFoundException.php @@ -0,0 +1,12 @@ +<?php + +declare(strict_types=1); +/** + * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +namespace OC\Async\Exceptions; + +class BlockNotFoundException extends AsyncProcessException { +} diff --git a/lib/private/Async/Exceptions/LoopbackEndpointException.php b/lib/private/Async/Exceptions/LoopbackEndpointException.php new file mode 100644 index 00000000000..2a7bea78239 --- /dev/null +++ b/lib/private/Async/Exceptions/LoopbackEndpointException.php @@ -0,0 +1,14 @@ +<?php + +declare(strict_types=1); +/** + * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +namespace OC\Async\Exceptions; + +use Exception; + +class LoopbackEndpointException extends AsyncProcessException { +} diff --git a/lib/private/Async/Exceptions/SessionBlockedException.php b/lib/private/Async/Exceptions/SessionBlockedException.php new file mode 100644 index 00000000000..9c6d85dc4fc --- /dev/null +++ b/lib/private/Async/Exceptions/SessionBlockedException.php @@ -0,0 +1,14 @@ +<?php + +declare(strict_types=1); +/** + * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +namespace OC\Async\Exceptions; + +use Exception; + +class SessionBlockedException extends AsyncProcessException { +} diff --git a/lib/private/Async/ForkManager.php b/lib/private/Async/ForkManager.php new file mode 100644 index 00000000000..6978a601bd4 --- /dev/null +++ b/lib/private/Async/ForkManager.php @@ -0,0 +1,485 @@ +<?php + +namespace OC\Async; + +use OC\Async\Db\BlockMapper; +use OC\Async\Enum\BlockType; +use OC\Async\Exceptions\AsyncProcessException; +use OC\Async\Exceptions\LoopbackEndpointException; +use OC\Async\Exceptions\BlockAlreadyRunningException; +use OC\Async\Exceptions\SessionBlockedException; +use OC\Async\Model\Block; +use OC\Async\Model\BlockInterface; +use OC\Async\Model\SessionInterface; +use OC\Async\Wrappers\DummyBlockWrapper; +use OC\Async\Wrappers\LoggerBlockWrapper; +use OC\Config\Lexicon\CoreConfigLexicon; +use OC\DB\Connection; +use OCP\Async\Enum\BlockActivity; +use OCP\Async\Enum\ProcessExecutionTime; +use OCP\Async\Enum\BlockStatus; +use OCP\Async\IAsyncProcess; +use OCP\Http\Client\IClientService; +use OCP\IAppConfig; +use OCP\IConfig; +use OCP\IURLGenerator; +use Psr\Log\LoggerInterface; +use Symfony\Component\Console\Output\OutputInterface; + +class ForkManager { + private ?ABlockWrapper $wrapper; + + /** @var int[] */ + private array $forks = []; + private const FORK_LIMIT = 3; // maximum number of child process + private const FORK_SLEEP = 500000; // wait for msec when too many fork have been created + + public function __construct( + private BlockMapper $blockMapper, + private Connection $conn, + private IAppConfig $appConfig, + private IConfig $config, + private IClientService $clientService, + private IURLGenerator $urlGenerator, + LoggerBlockWrapper $loggerProcessWrapper, + private LoggerInterface $logger, + ) { + $this->wrapper = $loggerProcessWrapper; + } + + public function setWrapper(?ABlockWrapper $wrapper): void { + $this->wrapper = $wrapper; + } + + + /** + * @throws BlockAlreadyRunningException + * @throws AsyncProcessException + */ + public function runSession(string $token, array $metadata = []): void { + $sessionBlocks = $this->blockMapper->getBySession($token); + $metadata['sessionToken'] = $token; + + if ($this->wrapper !== null) { + $wrapper = clone $this->wrapper; + } else { + $wrapper = null; + } + + // might be need to avoid some conflict/race condition + // usleep(10000); + $sessionIface = new SessionInterface(BlockInterface::asBlockInterfaces($sessionBlocks)); + + if ($sessionIface->getGlobalStatus() !== BlockStatus::STANDBY) { + throw new AsyncProcessException(); + } + + $wrapper?->setSessionInterface($sessionIface); + $wrapper?->session($metadata); + + try { + foreach ($sessionBlocks as $block) { + if (!$this->confirmBlockRequirement($wrapper, $sessionIface, $block)) { + $block->replay(); + $this->blockMapper->update($block); + continue; + } + + $block->addMetadata($metadata); + $this->runBlock($block, $wrapper); + + if ($block->getBlockStatus() === BlockStatus::BLOCKER) { + $wrapper?->end('Fail process ' . $block->getToken() . ' block the rest of the session'); + throw new SessionBlockedException(); + } + } + $wrapper?->end(); + } catch (BlockAlreadyRunningException) { + $wrapper?->end('already running'); + throw new BlockAlreadyRunningException(); + } + } + + private function confirmBlockRequirement( + ?ABlockWrapper $wrapper, + SessionInterface $sessionIface, + Block $block + ): bool { + $procIface = new BlockInterface(null, $block); + foreach ($procIface->getRequire() as $requiredProcessId) { + $requiredBlock = $sessionIface->byId($requiredProcessId); + if ($requiredBlock === null) { + $wrapper?->activity(BlockActivity::NOTICE, 'could not initiated block as it requires block ' . $requiredProcessId . ' which is not defined'); + return false; + } + if ($requiredBlock?->getStatus() !== BlockStatus::SUCCESS) { + $wrapper?->activity(BlockActivity::NOTICE, 'could not initiated block as it requires block ' . $requiredProcessId . ' to be executed and successful'); + return false; + } + } + return true; + } + + /** + * @throws BlockAlreadyRunningException + */ + private function runBlock(Block $block, ?ABlockWrapper $wrapper = null): void { + if ($block->getBlockStatus() !== BlockStatus::STANDBY) { + return; + } + + $this->lockBlock($block); + + $data = $block->getParams(); + $serialized = $block->getCode(); + $params = unserialize($data['params'], ['allowed_classes' => $data['paramsClasses']]); + $obj = unserialize($serialized, ['allowed_classes' => $data['blockClasses'] ?? []]); + + if ($data['processWrapper'] ?? false) { + array_unshift($params, ($wrapper ?? new DummyBlockWrapper())); + } + + $wrapper?->setBlock($block); + $wrapper?->init(); + $wrapper?->activity(BlockActivity::STARTING); + $result = [ + 'executionTime' => ProcessExecutionTime::NOW->value, + 'startTime' => time(), + ]; + $iface = new BlockInterface(null, $block); + try { + $returnedData = null; + switch ($block->getBlockType()) { + case BlockType::CLOSURE: + $c = $obj->getClosure(); + $returnedData = $c(...$params); + break; + + case BlockType::INVOKABLE: + $returnedData = $obj(...$params); + break; + + case BlockType::CLASSNAME: + $obj = new $data['className'](); + $returnedData = $obj->async(...$params); + break; + } + if (is_array($returnedData)) { + $result['result'] = $returnedData; + } + $block->setBlockStatus(BlockStatus::SUCCESS); + if ($block->getReplayCount() > 0) { + // in case of success after multiple tentative, we reset next run to right now + // on all block waiting for replay. Easiest solution to find block dependant of + // this current successful run + $this->blockMapper->resetSessionNextRun($block->getSessionToken()); + } + } catch (\Exception $e) { + $wrapper?->activity(BlockActivity::ERROR, $e->getMessage()); + $result['error'] = [ + 'exception' => get_class($e), + 'message' => $e->getMessage(), + 'trace' => $e->getTrace(), + 'code' => $e->getCode() + ]; + + if ($iface->isReplayable()) { + $block->replay(); // we mark the block as able to be back to STANDBY status + } else { + $block->setNextRun(0); + } + if ($iface->isBlocker()) { + $block->setBlockStatus(BlockStatus::BLOCKER); + } else { + $block->setBlockStatus(BlockStatus::ERROR); + } + } finally { + $result['endTime'] = time(); + } + + $block->setResult($result); + $wrapper?->activity(BlockActivity::ENDING); + $this->blockMapper->updateStatus($block, BlockStatus::RUNNING); + } + + /** + * @throws BlockAlreadyRunningException + */ + private function lockBlock(Block $block): void { + if ($block->getBlockStatus() !== BlockStatus::STANDBY) { + throw new BlockAlreadyRunningException('block not in standby'); + } + $lockToken = $this->generateToken(7); + $block->setBlockStatus(BlockStatus::RUNNING); + if (!$this->blockMapper->updateStatus($block, BlockStatus::STANDBY, $lockToken)) { + throw new BlockAlreadyRunningException('block is locked'); + } + $block->setLockToken($lockToken); + } + + public function forkSession(string $session, array $metadata = []): void { + if (\OC::$CLI) { + $useWebIfNeeded = $metadata['useWebIfNeeded'] ?? false; + if ($useWebIfNeeded && !extension_loaded('posix')) { + try { + $this->forkSessionLoopback($session); + return; + } catch (\Exception) { + } + } + + $this->forkSessionCli($session, $metadata); + return; + } + + try { + $this->forkSessionLoopback($session); + } catch (LoopbackEndpointException) { + // session will be processed later + } + } + + + private function forkSessionCli(string $session, array $metadata = []): void { + if (!extension_loaded('posix')) { + // log/notice that posix is not loaded + return; + } + $slot = $this->getFirstAvailableSlot(); + $metadata += [ + '_cli' => [ + 'slot' => $slot, + 'forkCount' => count($this->forks), + 'forkLimit' => self::FORK_LIMIT, + ] + ]; + + $pid = pcntl_fork(); + + // work around as the parent database connection is inherited by the child. + // when child process is over, parent process database connection will drop. + // The drop can happen anytime, even in the middle of a running request. + // work around is to close the connection as soon as possible after forking. + $this->conn->close(); + + if ($pid === -1) { + // TODO: manage issue while forking + } else if ($pid === 0) { + // forked process + try { + $this->runSession($session, $metadata); + } catch (AsyncProcessException) { + // failure to run session can be part of the process + } + exit(); + } else { + // store slot+pid + $this->forks[$slot] = $pid; + + // when fork limit is reach, cycle until space freed + while (true) { + $exitedPid = pcntl_waitpid(0, $status, WNOHANG); + if ($exitedPid > 0) { + $slot = array_search($exitedPid, $this->forks, true); + if ($slot) { + unset($this->forks[$slot]); + } + } + if (count($this->forks) < self::FORK_LIMIT) { + return; + } + usleep(self::FORK_SLEEP); + } + } + } + + /** + * Request local loopback endpoint. + * We expect the request to be closed remotely. + * + * Ignored if: + * - the endpoint is not fully configured and tested, + * - the server is on heavy load (timeout at 1 second) + * + * @return string result from the loopback endpoint + * @throws LoopbackEndpointException if not configured + */ + private function forkSessionLoopback(string $session, ?string $loopbackEndpoint = null): string { + $client = $this->clientService->newClient(); + try { + $response = $client->post( + $loopbackEndpoint ?? $this->linkToLoopbackEndpoint(), + [ + 'headers' => [], + 'verify' => false, + 'connect_timeout' => 1.0, + 'timeout' => 1.0, + 'http_errors' => true, + 'body' => ['token' => $session], + 'nextcloud' => [ + 'allow_local_address' => true, + 'allow_redirects' => true, + ] + ] + ); + + return (string)$response->getBody(); + } catch (LoopbackEndpointException $e) { + $this->logger->debug('loopback endpoint not configured', ['exception' => $e]); + throw $e; + } catch (\Exception $e) { + $this->logger->warning('could not reach loopback endpoint to initiate fork', ['exception' => $e]); + throw new LoopbackEndpointException('loopback endpoint cannot be reach', previous: $e); + } + } + + + /** + * return full (absolute) link to the web-loopback endpoint + * + * @param string|null $instance if null, stored loopback address will be used. + * + * @throws LoopbackEndpointException if $instance is null and no stored configuration + */ + public function linkToLoopbackEndpoint(?string $instance = null): string { + return rtrim($instance ?? $this->getLoopbackInstance(), '/') . $this->urlGenerator->linkToRoute('core.AsyncProcess.processFork'); + } + + /** + * return loopback address stored in configuration + * + * @return string + * @throws LoopbackEndpointException if config is not set or empty + */ + public function getLoopbackInstance(): string { + if (!$this->appConfig->hasKey('core', CoreConfigLexicon::ASYNC_LOOPBACK_ADDRESS, true)) { + throw new LoopbackEndpointException('loopback not configured'); + } + + $instance = $this->appConfig->getValueString('core', CoreConfigLexicon::ASYNC_LOOPBACK_ADDRESS); + if ($instance === '') { + throw new LoopbackEndpointException('empty config'); + } + + return $instance; + } + + + public function discoverLoopbackEndpoint(?OutputInterface $output = null): ?string { + $cliUrl = $this->config->getSystemValueString('overwrite.cli.url', ''); + $output?->write('- testing value from \'overwrite.cli.url\' (<comment>' . $cliUrl . '</comment>)... '); + + $reason = ''; + if ($this->testLoopbackInstance($cliUrl, $reason)) { + $output?->writeln('<info>ok</info>'); + return $cliUrl; + } + + $output?->writeln('<error>' . $reason . '</error>'); + + foreach($this->config->getSystemValue('trusted_domains', []) as $url) { + $url = 'https://' . $url; + $output?->write('- testing entry from \'trusted_domains\' (<comment>' . $url . '</comment>)... '); + if ($this->testLoopbackInstance($url, $reason)) { + $output?->writeln('<info>ok</info>'); + return $url; + } + $output?->writeln('<error>' . $reason . '</error>'); + } + + return null; + } + + + public function testLoopbackInstance(string $url, string &$reason = ''): bool { + $url = rtrim($url, '/'); + if (!$this->pingLoopbackInstance($url)) { + $reason = 'failed ping'; + return false; + } + + $token = $this->generateToken(); + $asyncProcess = \OCP\Server::get(IAsyncProcess::class); + $asyncProcess->exec(function(string $token) { + sleep(1); // enforce a delay to confirm asynchronicity + $appConfig = \OCP\Server::get(IAppConfig::class); + $appConfig->setValueString('core', CoreConfigLexicon::ASYNC_LOOPBACK_TEST, $token); + }, $token)->name('test loopback instance')->async(); + + $this->appConfig->clearCache(true); + if ($token === $this->appConfig->getValueString('core', CoreConfigLexicon::ASYNC_LOOPBACK_TEST)) { + $reason = 'async process already executed'; + return false; + } + + sleep(3); + $this->appConfig->clearCache(true); + $result = ($token === $this->appConfig->getValueString('core', CoreConfigLexicon::ASYNC_LOOPBACK_TEST)); + $this->appConfig->deleteKey('core', CoreConfigLexicon::ASYNC_LOOPBACK_TEST); + + return $result; + } + + private function pingLoopbackInstance(string $url): bool { + $pingLoopback = $this->generateToken(); + $this->appConfig->setValueString('core', CoreConfigLexicon::ASYNC_LOOPBACK_PING, $pingLoopback); + try { + $result = $this->forkSessionLoopback('__ping__', $this->linkToLoopbackEndpoint($url)); + $result = json_decode($result, true, flags: JSON_THROW_ON_ERROR); + } catch (\JsonException|LoopbackEndpointException $e) { + $this->logger->debug('could not ping loopback endpoint', ['exception' => $e]); + } + + $this->appConfig->deleteKey('core', CoreConfigLexicon::ASYNC_LOOPBACK_PING); + return (($result['ping'] ?? '') === $pingLoopback); + } + + + /** + * note that the fact we fork process to run a session of processes before doing + * a check on the fact that maybe one of the process of the session is already + * running can create a small glitch when choosing the first available slot as + * a previous fork running said check is already made and will exit shortly. + * + * @return int + */ + private function getFirstAvailableSlot(): int { + $slot = -1; + for ($i = 0; $i < self::FORK_LIMIT; $i++) { + if (!array_key_exists($i, $this->forks)) { + return $i; + } + + // we confirm child process still exists + if (pcntl_waitpid($this->forks[$i], $status, WNOHANG) > 0) { + return $i; + } + } + + if ($slot === -1) { + // TODO: should not happens: log warning + } + + return -1; + } + + private function generateToken(int $length = 15): string { + $result = ''; + for ($i = 0; $i < $length; $i++) { + $result .= 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890'[random_int(0, 61)]; + } + + return $result; + } + + /** + * we wait until all child process are done + * + * @noinspection PhpStatementHasEmptyBodyInspection + */ + public function waitChildProcess(): void { + while (pcntl_waitpid(0, $status) != -1) { + } + } + +} diff --git a/lib/private/Async/IBlockInterface.php b/lib/private/Async/IBlockInterface.php new file mode 100644 index 00000000000..33b046b42ff --- /dev/null +++ b/lib/private/Async/IBlockInterface.php @@ -0,0 +1,36 @@ +<?php + +declare(strict_types=1); + +/** + * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +namespace OC\Async; + +use OCP\Async\Enum\ProcessExecutionTime; +use OCP\Async\Enum\BlockStatus; + +interface IBlockInterface { + public function getToken(): string; + public function id(string $id): self; + public function getId(): string; + public function name(string $name): self; + public function getName(): string; + public function require(string $id): self; + public function getRequire(): array; + public function delay(int $delay): self; + public function getDelay(): int; + public function getExecutionTime(): ?ProcessExecutionTime; + public function blocker(bool $blocker = true): self; + public function isBlocker(): bool; + public function replayable(bool $replayable = true): self; + public function isReplayable(): bool; + public function getStatus(): BlockStatus; + public function getResult(): ?array; + public function getError(): ?array; + public function dataset(array $dataset): self; + public function getDataset(): array; + public function async(ProcessExecutionTime $time): self; +} diff --git a/lib/private/Async/ISessionInterface.php b/lib/private/Async/ISessionInterface.php new file mode 100644 index 00000000000..19307f915bb --- /dev/null +++ b/lib/private/Async/ISessionInterface.php @@ -0,0 +1,21 @@ +<?php + +declare(strict_types=1); + +/** + * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +namespace OC\Async; + +use OC\Async\Model\BlockInterface; +use OCP\Async\Enum\BlockStatus; + +interface ISessionInterface { + public function getAll(): array; + public function byToken(string $token): ?BlockInterface; + public function byId(string $id): ?BlockInterface; + public function getGlobalStatus(): BlockStatus; + +} diff --git a/lib/private/Async/Model/Block.php b/lib/private/Async/Model/Block.php new file mode 100644 index 00000000000..9c66ec13ad8 --- /dev/null +++ b/lib/private/Async/Model/Block.php @@ -0,0 +1,142 @@ +<?php + +declare(strict_types=1); + +/** + * SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +namespace OC\Async\Model; + +use OC\Async\Enum\BlockType; +use OCP\AppFramework\Db\Entity; +use OCP\Async\Enum\ProcessExecutionTime; +use OCP\Async\Enum\BlockStatus; + +/** + * @method void setToken(string $token) + * @method string getToken() + * @method void setSessionToken(string $sessionToken) + * @method string getSessionToken() + * @method void setType(int $type) + * @method int getType() + * @method void setCode(string $code) + * @method string getCode() + * @method void setParams(array $params) + * @method ?array getParams() + * @method void setDataset(array $dataset) + * @method ?array getDataset() + * @method void setMetadata(array $metadata) + * @method ?array getMetadata() + * @method void setLinks(array $params) + * @method ?array getLinks() + * @method void setOrig(array $orig) + * @method ?array getOrig() + * @method void setResult(array $result) + * @method ?array getResult() + * @method void setStatus(int $status) + * @method int getStatus() + * @method void setExecutionTime(int $status) + * @method int getExecutionTime() + * @method void setLockToken(string $lockToken) + * @method string getLockToken() + * @method void setCreation(int $creation) + * @method int getCreation() + * @method void setLastRun(int $lastRun) + * @method int getLastRun() + * @method void setNextRun(int $nextRun) + * @method int getNextRun() + * @psalm-suppress PropertyNotSetInConstructor + */ +class Block extends Entity { + protected string $token = ''; + protected string $sessionToken = ''; + protected int $type = 0; + protected string $code = ''; + protected ?array $params = []; + protected ?array $dataset = []; + protected ?array $metadata = []; + protected ?array $links = []; + protected ?array $orig = []; + protected ?array $result = []; + protected int $status = 0; + protected int $executionTime = 0; + protected string $lockToken = ''; + protected int $creation = 0; + protected int $lastRun = 0; + protected int $nextRun = 0; + + public function __construct() { + $this->addType('token', 'string'); + $this->addType('sessionToken', 'string'); + $this->addType('type', 'integer'); + $this->addType('code', 'string'); + $this->addType('params', 'json'); + $this->addType('dataset', 'json'); + $this->addType('metadata', 'json'); + $this->addType('links', 'json'); + $this->addType('orig', 'json'); + $this->addType('result', 'json'); + $this->addType('status', 'integer'); + $this->addType('executionTime', 'integer'); + $this->addType('lockToken', 'string'); + $this->addType('creation', 'integer'); + $this->addType('lastRun', 'integer'); + $this->addType('nextRun', 'integer'); + } + + public function setBlockType(BlockType $type): void { + $this->setType($type->value); + } + + public function getBlockType(): BlockType { + return BlockType::from($this->getType()); + } + + public function setBlockStatus(BlockStatus $status): void { + $this->setStatus($status->value); + } + + public function getBlockStatus(): BlockStatus { + return BlockStatus::from($this->getStatus()); + } + + public function setProcessExecutionTime(ProcessExecutionTime $time): void { + $this->setExecutionTime($time->value); + } + + public function getProcessExecutionTime(): ProcessExecutionTime { + return ProcessExecutionTime::from($this->getExecutionTime()); + } + + public function addMetadata(array $metadata): self { + $metadata = array_merge($this->metadata, $metadata); + $this->setMetadata($metadata); + return $this; + } + + public function addMetadataEntry(string $key, string|int|array|bool $value): self { + $metadata = $this->metadata; + $metadata[$key] = $value; + $this->setMetadata($metadata); + return $this; + } + + public function replay(bool $now = false): void { + $count = $this->getMetadata()['_replay'] ?? 0; + $next = time() + floor(6 ** ($count + 1)); // calculate delay based on count of retry + $count = min(++$count, 5); // limit to 6^6 seconds (13h) + + if ($now) { + $next = time(); + } + + $this->addMetadataEntry('_replay', $count); + $this->setNextRun($next); + } + + public function getReplayCount(): int { + return $this->getMetadata()['_replay'] ?? 0; + } +} diff --git a/lib/private/Async/Model/BlockInterface.php b/lib/private/Async/Model/BlockInterface.php new file mode 100644 index 00000000000..36ba58194f7 --- /dev/null +++ b/lib/private/Async/Model/BlockInterface.php @@ -0,0 +1,183 @@ +<?php + +declare(strict_types=1); + +/** + * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +namespace OC\Async\Model; + +use OC\Async\AsyncManager; +use OC\Async\IBlockInterface; +use OCP\Async\Enum\ProcessExecutionTime; +use OCP\Async\Enum\BlockStatus; + +class BlockInterface implements IBlockInterface, \JsonSerializable { + private string $id = ''; + private string $name = ''; + private bool $blocker = false; + private bool $replayable = false; + + /** @var string[] */ + private array $require = []; + private int $delay = 0; + private ?ProcessExecutionTime $executionTime = null; + private array $dataset = []; + + public function __construct( + private readonly ?AsyncManager $asyncManager, + private readonly Block $block, + ) { + $this->import($block->getMetadata()['_iface'] ?? []); + } + + public function getBlock(): Block { + return $this->block; + } + + public function getToken(): string { + return $this->block->getToken(); + } + + public function getResult(): ?array { + if ($this->block->getBlockStatus() !== BlockStatus::SUCCESS) { + return null; + } + + return $this->block->getResult()['result']; + } + + public function getError(): ?array { + if ($this->block->getBlockStatus() !== BlockStatus::ERROR) { + return null; + } + + return $this->block->getResult()['error']; + } + + public function getStatus(): BlockStatus { + return $this->block->getBlockStatus(); + } + + public function id(string $id): self { + $this->id = strtoupper($id); + return $this; + } + + public function getId(): string { + return $this->id; + } + public function name(string $name): self { + $this->name = $name; + return $this; + } + + public function getName(): string { + return $this->name; + } + + public function blocker(bool $blocker = true): self { + $this->blocker = $blocker; + return $this; + } + + public function isBlocker(): bool { + return $this->blocker; + } + + public function replayable(bool $replayable = true): self { + $this->replayable = $replayable; + return $this; + } + + public function isReplayable(): bool { + return $this->replayable; + } + + public function require(string $id): self { + $this->require[] = strtoupper($id); + return $this; + } + + public function getRequire(): array { + return $this->require; + } + + public function delay(int $delay): self { + $this->delay = $delay; + return $this; + } + + public function getDelay(): int { + return $this->delay; + } + + public function getExecutionTime(): ?ProcessExecutionTime { + return $this->executionTime; + } + + /** + * @param array<array> $dataset + * + * @return $this + */ + public function dataset(array $dataset): self { + $this->dataset = $dataset; + return $this; + } + + public function getDataset(): array { + return $this->dataset; + } + +// public function getReplayCount(): int { +// return $this->get +// } + /** + * only available during the creation of the session + * + * @return $this + */ + public function async(ProcessExecutionTime $time = ProcessExecutionTime::NOW): self { + $this->asyncManager?->async($time); + return $this; + } + + public function import(array $data): void { + $this->token = $data['token'] ?? ''; + $this->id($data['id'] ?? ''); + $this->name($data['name'] ?? ''); + $this->delay($data['delay'] ?? 0); + $this->require = $data['require'] ?? []; + $this->blocker($data['blocker'] ?? false); + $this->replayable($data['replayable'] ?? false); + } + + public function jsonSerialize(): array { + return [ + 'token' => $this->getToken(), + 'id' => $this->getId(), + 'name' => $this->getName(), + 'require' => $this->getRequire(), + 'delay' => $this->getDelay(), + 'blocker' => $this->isBlocker(), + 'replayable' => $this->isReplayable(), + ]; + } + + /** + * @param Block[] $processes + * + * @return BlockInterface[] + */ + public static function asBlockInterfaces(array $processes): array { + $interfaces = []; + foreach ($processes as $process) { + $interfaces[] = new BlockInterface(null, $process); + } + + return $interfaces; + } +} diff --git a/lib/private/Async/Model/SessionInterface.php b/lib/private/Async/Model/SessionInterface.php new file mode 100644 index 00000000000..c481fc545f6 --- /dev/null +++ b/lib/private/Async/Model/SessionInterface.php @@ -0,0 +1,83 @@ +<?php + +declare(strict_types=1); + +/** + * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +namespace OC\Async\Model; + +use OC\Async\ISessionInterface; +use OCP\Async\Enum\BlockStatus; + +class SessionInterface implements ISessionInterface { + public function __construct( + /** @var BlockInterface[] */ + private array $interfaces, + ) { + } + + public function getAll(): array { + return $this->interfaces; + } + + public function byToken(string $token): ?BlockInterface { + foreach ($this->interfaces as $iface) { + if ($iface->getToken() === $token) { + return $iface; + } + } + + return null; + } + + public function byId(string $id): ?BlockInterface { + foreach($this->interfaces as $iface) { + if ($iface->getId() === $id) { + return $iface; + } + } + + return null; + } + + /** + * return a global status of the session based on the status of each process + * - if one entry is still at ::PREP stage, we return ::PREP + * - if one ::BLOCKER, returns ::BLOCKER. + * - if all ::SUCCESS, returns ::SUCCESS, else ignored. + * - if all ::ERROR+::SUCCESS, returns ::ERROR, else ignored. + * - session status is the biggest value between all left process status (::STANDBY, ::RUNNING) + */ + public function getGlobalStatus(): BlockStatus { + $current = -1; + $groupedStatus = []; + foreach($this->interfaces as $iface) { + // returns ::PREP if one process is still ::PREP + if ($iface->getStatus() === BlockStatus::PREP) { + return BlockStatus::PREP; + } + // returns ::BLOCKER if one process is marked ::BLOCKER + if ($iface->getStatus() === BlockStatus::BLOCKER) { + return BlockStatus::BLOCKER; + } + // we keep trace if process is marked as ::ERROR or ::SUCCESS + // if not, we keep the highest value + if (in_array($iface->getStatus(), [BlockStatus::ERROR, BlockStatus::SUCCESS], true)) { + $groupedStatus[$iface->getStatus()->value] = true; + } else { + $current = max($current, $iface->getStatus()->value); + } + } + + // in case the all interface were ::ERROR or ::SUCCESS, we check + // if there was at least one ::ERROR. if none we return ::SUCCESS + if ($current === -1) { + return (array_key_exists(BlockStatus::ERROR->value, $groupedStatus)) ? BlockStatus::ERROR : BlockStatus::SUCCESS; + } + + return BlockStatus::from($current); + } +} diff --git a/lib/private/Async/README.md b/lib/private/Async/README.md new file mode 100644 index 00000000000..185047f52db --- /dev/null +++ b/lib/private/Async/README.md @@ -0,0 +1,379 @@ + + +# AsyncProcess + +Using `IAsyncProcess` allow the execution of code on a separated process in order to improve the quality of the user experience. + + +## Concept + +To shorten the hanging time on heavy process that reflect on the user experience and to avoid delay between +instruction and execution, this API allows to prepare instructions to be executed on a parallel thread. + +This is obtained by creating a loopback HTTP request, initiating a fresh PHP process that will execute the +prepared instruction after emulating a connection termination, freeing the main process. + +#### Technology + +The logic is to: + +- store a serialized version of the code to be executed on another process into database. +- start a new process as soon as possible that will retrieve the code from database and execute it. + +#### Setup + +The feature require setting a loopback url. +This is done automatically by a background job. The automatic process uses `'overwrite.cli.url'` and `'trusted_domains'` from _config/config.php_ to find a list of domain name to test. +It can be initiated via `occ`: + +> ./occ async:setup --discover + +Or manually set with: + +> ./occ async:setup --loopback https://cloud.example.net/ + + + +## Blocks & Sessions + +- We will define as _Block_ complete part of unsplittable code. +- A list of _Blocks_ can be grouped in _Sessions_. +- While _Sessions_ are independent of each other, interactions can be set between _Blocks_ of the same _Session_. + +**Interactions** + +- _Blocks_ are executed in the order they have been created. +- It is possible for a _Block_ to get results from a previous process from the session. +- A _Block_ defined as blocker will stop further process from that session on failure. +- A _Block_ can require a previous process to be successful before being executed. + +**Replayability** + +- A block can be set as _replayable_, meaning that in case of failure it can be run multiple time until it end properly. + +**Quick example** + +```php + +// define all part of the code that can be async +$this->asyncProcess->invoke($myInvoke1)->id('block1')->replayable(); // block1 can be replayed until successful +$this->asyncProcess->invoke($myInvoke2)->id('block2')->require('block1'); // block2 will not be executed until block1 has not been successful +$this->asyncProcess->invoke($myInvoke3)->id('block3')->blocker(); // block3 will run whatever happened to block1 and block2 and its suc1cess is mandatory to continue with the session +$this->asyncProcess->invoke($myInvoke4)->id('block4'); + +$this->asyncProcess->async(); // close the session and initiate async execution +``` + + +## ProcessExecutionTime + +Code is to be executed as soon as defined, with alternative fallback solutions in that order: + +- `::NOW` - main process will fork and execute the code in parallel (instantly) +- `::ASAP` - process will be executed by an optional live service (a second later) +- `::LATER` - process will be executed at the next execution of the cron tasks (within the next 10 minutes) +- `::ON_REQUEST` - process needs to be executed manually + + +## IAsyncProcess + +`IAsyncProcess` is the public interface that contains few methods to prepare the code to be processed on a parallel process. + + +#### Closure + +The code can be directly written in a closure by calling `exec()`: + +```php +$this->asyncProcess->exec(function (int $value, string $line, MyObject $obj): void { + // long process +}, + random_int(10000, 99999), + 'this is a line', + $myObj +)->async(); +``` + + +#### Invokable + +Within the magic method `__invoke()` in a configured objects + +```php +class MyInvoke { + public function __construct( + private array $data = [] + ) { + } + + public function __invoke(int $n): void { + // do long process + } +} + + +$myInvoke = new MyInvoke(['123']); +$this->asyncProcess->invoke($myInvoke, random_int(10000, 99999))->async(); +``` + + +#### PHP Class + +Via the method `async()` from a class + +```php +<?php + +namespace OCA\MyApp; + +class MyObj { + public function __construct( + ) { + } + + public function async(int $n): void { + // run heavy stuff + } +} +``` + +```php +$this->asyncProcess->call(\OCA\MyApp\MyObj::class, random_int(10000, 99999))->async(); +``` + + + +## IBlockInterface + +When storing a new _Block_ via `IAsyncProcess::call()`,`IAsyncProcess::invoke()` or `IAsyncProcess::async()`, will be returned a `IBlockInterface` to provided details about the _Block_. + + +### name(string) + +Identification and/or description of the _Block_ for better understanding when debugging + +```php +$this->asyncProcess->call(\OCA\MyApp\MyObj::class)->name('my process'); +``` + +### id(string) + +Identification of the _Block_ for future interaction between _Blocks_ within the same _Session_ + +```php +$this->asyncProcess->call(\OCA\MyApp\MyObj::class)->id('my_process'); +``` + +As an example, `id` are to be used to obtain `IBlockInterface` from a specific _Block_: +```php +ISessionInterface::byId('my_process'); // returns IBlockInterface +``` + + +### blocker() + +Set current _Block_ as _Blocker_, meaning that further _Blocks_ of the _Session_ are lock until this process does not run successfully + +```php +$this->asyncProcess->call(\OCA\MyApp\MyObj::class)->blocker(); +``` + + +### require(string) + +Define that the _Block_ can only be executed if set _Block_, identified by its `id`, ran successfully. +Multiple _Blocks_ can be set a required. + +```php +$this->asyncProcess->call(\OCA\MyApp\MyObj::class)->require('other_block_1')->require('other_block_2'); +``` + +### replayable() + +The _Block_ is configured as replayable, meaning that it will be restarted until it runs correctly + +```php +$this->asyncProcess->call(\OCA\MyApp\MyObj::class)->replayable(); +``` + +The delay is calculated using 6 (six) exponent current retry, capped at 6: + +- 1st retry after few seconds, +- 2nd retry after 30 seconds, +- 3rd retry after 3 minutes, +- 4th retry after 20 minutes, +- 5th retry after 2 hours, +- 6th retry after 12 hours, +- other retries every 12 hours. + +### delay(int) + +Only try to initiate the process n seconds after current time. + +### dataset(array) + +It is possible to set a list of arguments to be applied to the same _Block_. +The _Block_ will be executed for each defined set of data + +```php +$this->asyncProcess->call(\OCA\MyApp\MyObj::class)->dataset( + [ + ['this is a string', 1], + ['this is another string', 12], + ['and another value as first parameter', 42], + ] +); +``` + + +### post-execution + +Post execution of a _Block_, its `IBlockInterface` can be used to get details about it: + +- **getExecutionTime()** returns the `ProcessExecutionTime` that initiated the process, +- **getResult()** returns the array returned by the process in case of success, +- **getError()** returns the error in case of failure + + + +## ISessionInterface + +`ISessionInterface` is available to your code via `ABlockWrapper` and helps interaction between all the _Blocks_ of the same _Session_ + +### getAll() + +returns all `IBlockInterface` from the _Session_ + +### byToken(string) + +returns a `IBockInterface` using its token + +### byId(string) + +returns a `IBockInterface` using its `id` + +### getGlobalStatus() + +return a `BlockStatus` (Enum) based on every status of all _Blocks_ of the _Session_: + +- returns `::PREP` if one block is still at prep stage, +- return `::BLOCKER` if at least one block is set as `blocker` and is failing, +- returns `::SUCCESS` if all blocks are successful, +- returns `::ERROR` if all process have failed, +- returns `::STANDBY` or `::RUNNING` if none of the previous condition are met. `::RUNNING` if at least one `Block` is currently processed. + + + + +## ABlockWrapper + +This abstract class helps to interface with other _Blocks_ from the same _Session_. +It will be generated and passed as argument to the defined block if the first parameter is an `AprocessWrapper` is expected as first parameter: + + +As a _Closure_ +```php +$this->asyncProcess->exec(function (ABlockWrapper $wrapper, array $data): array { + $resultFromProc1 = $wrapper->getSessionInterface()->byId('block1')?->getResult(); // can be null if 'block1' is not success, unless using require() + $wrapper->activity(BlockActivity::NOTICE, 'result from previous process: ' . json_encode($resultFromProc1)) +}, + ['mydata' => true] + )->id('block2'); +``` + +When using `invoke()` +```php +class MyInvoke { + public function __construct( + ) { + } + + public function __invoke(ABlockWrapper $wrapper, int $n): void { + $data = $wrapper->getSessionInterface()->byId('block1')?->getResult(); // can be null if 'block1' is not success + } +} + +$myInvoke = new MyInvoke(); +$this->asyncProcess->invoke($myInvoke, random_int(10000, 99999))->requipe('block1'); // require ensure block1 has run successfully before this one +``` + +Syntax is the same with `call()`, when defining the `async()` method + +```php +class MyObj { + public function __construct( + ) { + } + + public function async(ABlockWrapper $wrapper): void { + } +} +``` + +#### Abstract methods + +`ABlockWrapper` is an abstract class with a list of interfaced methods that have different behavior on the _BlockWrapper_ sent by the framework. + +- **DummyBlockWrapper** will do nothing, +- **CliBlockWrapper** will generate/manage console output, +- **LoggerBlockWrapper** will only create new nextcloud logs entry. + +List of usefull methods: + +- **activity(BlockActivity $activity, string $line = '');** can be used to update details about current part of your code during its process +- **getSessionInterface()** return the `ISessionInterface` for the session +- **getReplayCount()** returns the number of retry + + +## Other tools + +#### Live Service + +This will cycle every few seconds to check for any session in stand-by mode and will execute its blocks +> ./occ async:live + + +#### Manage sessions and blocks + +Get resume about current session still in database. + +> `./occ async:manage` + +Get details about a session. + +> `./occ async:manage --session <sessionId> + +Get details about a block + +> `./occ async:manage --details <blockId> + +Get excessive details about a block + +> `./occ async:manage --details <blockId> --full-details + +Replay a not successful block + +> `./occ async:manage --replay <blockId> + +#### Mocking process + +`./occ async:setup` allow an admin to generate fake processes to emulate the feature: + +- **--mock-session _int_** create _n_ sessions +- **--mock-block _int_** create _n_ blocks +- **--fail-process _string_** create failing process + + +# Work in Progress + +missing element of the feature: + +- [ ] discussing the need of signing the PHP code stored in database to ensure its authenticity before execution, +- [ ] ability to overwrite the code or arguments of a process to fix a failing process, +- [ ] Check and confirm value type compatibility between parameters and arguments when storing new block +- [ ] implementing dataset(), +- [ ] implementing delay(), +- [ ] full documentation, +- [ ] tests, tests, tests. + + diff --git a/lib/private/Async/Wrappers/CliBlockWrapper.php b/lib/private/Async/Wrappers/CliBlockWrapper.php new file mode 100644 index 00000000000..4c38c7fc995 --- /dev/null +++ b/lib/private/Async/Wrappers/CliBlockWrapper.php @@ -0,0 +1,103 @@ +<?php + +declare(strict_types=1); + +/** + * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +namespace OC\Async\Wrappers; + +use OC\Async\ABlockWrapper; +use OC\Async\Model\Block; +use OCP\Async\Enum\BlockActivity; +use Symfony\Component\Console\Formatter\OutputFormatterStyle; +use Symfony\Component\Console\Output\OutputInterface; + +class CliBlockWrapper extends ABlockWrapper { + private const TAB_SIZE = 12; + private const HEXA = '258ad'; + private int $hexaLength = 0; + private string $prefix = ''; + private int $slot = 0; + public function __construct( + private ?OutputInterface $output = null, + ) { + $this->hexaLength = strlen(self::HEXA); + } + + public function session(array $metadata): void { + $this->slot = $metadata['_cli']['slot'] ?? 0; + $this->output->writeln( + str_repeat(' ', $this->slot * self::TAB_SIZE) . ' ' . + '<open>++</>' . ' initiating session ' . + ($metadata['sessionToken'] ?? '') + ); + } + + public function init(): void { + $this->prefix = $this->generatePrefix($this->block->getToken()); + } + + public function activity(BlockActivity $activity, string $line = ''): void { + $act = match ($activity) { + BlockActivity::STARTING => '<open>>></>', + BlockActivity::ENDING => '<close><<</>', + BlockActivity::DEBUG, BlockActivity::NOTICE => ' ', + BlockActivity::WARNING => '<warn>!!</>', + BlockActivity::ERROR => '<error>!!</>', + }; + + $this->output->writeln( + str_repeat(' ', $this->slot * self::TAB_SIZE) . ' ' . + $act . ' ' . + $this->prefix . ' ' . + $line + ); + } + + public function end(string $line = ''): void { + if ($line === '') { + $this->output->writeln(''); + return; + } + + $this->output->writeln( + str_repeat(' ', $this->slot * self::TAB_SIZE) . ' ' . + '<warn>--</>' . ' ' . $line + ); + } + + private function generatePrefix(string $token): string { + $color = 's' . random_int(1, $this->hexaLength - 1) . + random_int(1, $this->hexaLength - 1) . + random_int(1, $this->hexaLength - 1); + + return '<' . $color . '>' . $token . '</>'; + } + + public static function initStyle(OutputInterface $output): void { + $output->getFormatter()->setStyle('open', new OutputFormatterStyle('#0f0', '', ['bold'])); + $output->getFormatter()->setStyle('close', new OutputFormatterStyle('#f00', '', ['bold'])); + $output->getFormatter()->setStyle('warn', new OutputFormatterStyle('#f00', '', [])); + $output->getFormatter()->setStyle('error', new OutputFormatterStyle('#f00', '', ['bold'])); + + $hexaLength = strlen(self::HEXA); + for ($i = 0; $i < $hexaLength; $i++) { + for ($j = 0; $j < $hexaLength; $j++) { + for ($k = 0; $k < $hexaLength; $k++) { + $output->getFormatter()->setStyle( + 's' . $i . $j . $k, + new OutputFormatterStyle( + '#' . self::HEXA[$i] . self::HEXA[$j] . self::HEXA[$k], + '', + ['bold'] + ) + ); + } + } + } + } + +} diff --git a/lib/private/Async/Wrappers/DummyBlockWrapper.php b/lib/private/Async/Wrappers/DummyBlockWrapper.php new file mode 100644 index 00000000000..e719e4f83e5 --- /dev/null +++ b/lib/private/Async/Wrappers/DummyBlockWrapper.php @@ -0,0 +1,27 @@ +<?php + +declare(strict_types=1); + +/** + * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +namespace OC\Async\Wrappers; + +use OC\Async\ABlockWrapper; +use OC\Async\Model\Block; +use OCP\Async\Enum\BlockActivity; + +class DummyBlockWrapper extends ABlockWrapper { + public function session(array $metadata): void { + } + public function init(): void { + } + + public function activity(BlockActivity $activity, string $line = ''): void { + } + + public function end(string $line = ''): void { + } +} diff --git a/lib/private/Async/Wrappers/LoggerBlockWrapper.php b/lib/private/Async/Wrappers/LoggerBlockWrapper.php new file mode 100644 index 00000000000..66d9c80007c --- /dev/null +++ b/lib/private/Async/Wrappers/LoggerBlockWrapper.php @@ -0,0 +1,39 @@ +<?php + +declare(strict_types=1); + +/** + * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ +namespace OC\Async\Wrappers; + +use OC\Async\ABlockWrapper; +use OC\Async\Model\Block; +use OCP\Async\Enum\BlockActivity; +use Psr\Log\LoggerInterface; + +class LoggerBlockWrapper extends ABlockWrapper { + private array $metadata = []; + public function __construct( + private LoggerInterface $logger, + ) { + } + + public function session(array $metadata): void { + $this->metadata = $metadata; + $this->logger->debug('session: ' . json_encode($metadata)); + } + + public function init(): void { + $this->logger->debug('process: ' . $this->metadata['sessionToken'] . ' ' . $this->block->getToken()); + } + + public function activity(BlockActivity $activity, string $line = ''): void { + $this->logger->debug('activity (' . $activity->value . ') ' . $line); + } + + public function end(string $line = ''): void { + $this->logger->debug('end session - ' . $line); + } +} |