diff options
author | Maxence Lange <maxence@artificial-owl.com> | 2025-04-28 19:57:56 -0100 |
---|---|---|
committer | Maxence Lange <maxence@artificial-owl.com> | 2025-04-28 19:58:06 -0100 |
commit | 6d272ff894fa969e5a526caf6a91d60eda115c53 (patch) | |
tree | df90f826a29f7169dd9b550f2e478a1b09d29e71 | |
parent | 10a01423ecfc7e028960eee8058bd177ad28d484 (diff) | |
download | nextcloud-server-enh/noid/async-process-run.tar.gz nextcloud-server-enh/noid/async-process-run.zip |
feat(async): AsyncProcessenh/noid/async-process-run
Signed-off-by: Maxence Lange <maxence@artificial-owl.com>
38 files changed, 3213 insertions, 1 deletions
diff --git a/core/BackgroundJobs/AsyncProcessJob.php b/core/BackgroundJobs/AsyncProcessJob.php new file mode 100644 index 00000000000..3fd43abf5ab --- /dev/null +++ b/core/BackgroundJobs/AsyncProcessJob.php @@ -0,0 +1,62 @@ +<?php + +declare(strict_types=1); + +/** + * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ +namespace OC\Core\BackgroundJobs; + +use OC\Async\Db\BlockMapper; +use OC\Async\ForkManager; +use OC\Async\Wrappers\LoggerBlockWrapper; +use OC\Config\Lexicon\CoreConfigLexicon; +use OCP\AppFramework\Utility\ITimeFactory; +use OCP\Async\Enum\ProcessExecutionTime; +use OCP\BackgroundJob\TimedJob; +use OCP\IAppConfig; + +class AsyncProcessJob extends TimedJob { + public function __construct( + ITimeFactory $time, + private IAppConfig $appConfig, + private ForkManager $forkManager, + private BlockMapper $blockMapper, + private LoggerBlockWrapper $loggerProcessWrapper, + ) { + parent::__construct($time); + + $this->setTimeSensitivity(self::TIME_SENSITIVE); +// $this->setInterval(60 * 5); + $this->setInterval(1); + } + + protected function run(mixed $argument): void { + $this->discoverLoopAddress(); + + $this->forkManager->setWrapper($this->loggerProcessWrapper); + + $this->blockMapper->resetFailedBlock(); + + $metadata = ['executionTime' => ProcessExecutionTime::LATER]; + foreach ($this->blockMapper->getSessionOnStandBy() as $session) { + $this->forkManager->forkSession($session, $metadata); + } + + $this->blockMapper->removeSuccessfulBlock(); + + $this->forkManager->waitChildProcess(); + } + + private function discoverLoopAddress(): void { + if ($this->appConfig->hasKey('core', CoreConfigLexicon::ASYNC_LOOPBACK_ADDRESS, true)) { + return; + } + + $found = $this->forkManager->discoverLoopbackEndpoint(); + if ($found !== null) { + $this->appConfig->setValueString('core', CoreConfigLexicon::ASYNC_LOOPBACK_ADDRESS, $found); + } + } +} diff --git a/core/Command/Async/Live.php b/core/Command/Async/Live.php new file mode 100644 index 00000000000..89a16a3165d --- /dev/null +++ b/core/Command/Async/Live.php @@ -0,0 +1,54 @@ +<?php + +declare(strict_types=1); + +/** + * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ +namespace OC\Core\Command\Async; + +use OC\Async\AsyncManager; +use OC\Async\AsyncProcess; +use OC\Async\Db\BlockMapper; +use OC\Async\ForkManager; +use OC\Async\Wrappers\CliBlockWrapper; +use OCP\Async\Enum\ProcessExecutionTime; +use Symfony\Component\Console\Command\Command; +use Symfony\Component\Console\Input\InputInterface; +use Symfony\Component\Console\Output\OutputInterface; + +class Live extends Command { + public function __construct( + private readonly BlockMapper $blockMapper, + private readonly ForkManager $forkManager, + ) { + parent::__construct(); + } + + protected function configure() { + parent::configure(); + $this->setName('async:live') + ->setDescription('test'); + } + + protected function execute(InputInterface $input, OutputInterface $output): int { + CliBlockWrapper::initStyle($output); + $this->forkManager->setWrapper(new CliBlockWrapper($output)); + + $metadata = ['_processExecutionTime' => ProcessExecutionTime::ASAP]; + while(true) { + $this->blockMapper->resetFailedBlock(); + + foreach ($this->blockMapper->getSessionOnStandBy() as $session) { + $this->forkManager->forkSession($session, $metadata); + } + + sleep(3); + } + + $this->forkManager->waitChildProcess(); + + return 0; + } +} diff --git a/core/Command/Async/Manage.php b/core/Command/Async/Manage.php new file mode 100644 index 00000000000..9be46b335c8 --- /dev/null +++ b/core/Command/Async/Manage.php @@ -0,0 +1,232 @@ +<?php + +declare(strict_types=1); + +/** + * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +namespace OC\Core\Command\Async; + +use OC\Async\AsyncManager; +use OC\Async\Db\BlockMapper; +use OC\Async\ForkManager; +use OC\Async\Model\Block; +use OC\Async\Model\BlockInterface; +use OC\Async\Model\SessionInterface; +use OCP\Async\Enum\BlockExecutionTime; +use OCP\Async\Enum\BlockStatus; +use Symfony\Component\Console\Command\Command; +use Symfony\Component\Console\Formatter\OutputFormatterStyle; +use Symfony\Component\Console\Helper\QuestionHelper; +use Symfony\Component\Console\Input\InputInterface; +use Symfony\Component\Console\Input\InputOption; +use Symfony\Component\Console\Output\OutputInterface; +use Symfony\Component\Console\Question\ConfirmationQuestion; + +class Manage extends Command { + private bool $noCrop = false; + + public function __construct( + private AsyncManager $asyncManager, + private ForkManager $forkManager, + private BlockMapper $blockMapper, + ) { + parent::__construct(); + } + + protected function configure() { + parent::configure(); + $this->setName('async:manage') + ->addOption('clean', '', InputOption::VALUE_NONE, 'remove successful session') + ->addOption('session', '', InputOption::VALUE_REQUIRED, 'list all blocks from a session', '') + ->addOption( 'details', '', InputOption::VALUE_REQUIRED, 'get details about a specific block', '') + ->addOption( 'full-details', '', InputOption::VALUE_NONE, 'get full details') + ->addOption( 'replay', '', InputOption::VALUE_REQUIRED, 'replay a specific block', '') + ->setDescription('manage'); + } + + protected function execute(InputInterface $input, OutputInterface $output): int { + if ($input->getOption('clean')) { + $count = $this->blockMapper->removeSuccessfulBlock(); + $output->writeln('deleted ' . $count . ' blocks'); + return 0; + } + + $replay = $input->getOption('replay'); + if ($replay !== '') { + $this->replayBlock($input, $output, $replay); + return 0; + } + + $this->noCrop = $input->getOption('full-details'); + $output->getFormatter()->setStyle('data', new OutputFormatterStyle('#666', '', [])); + $output->getFormatter()->setStyle('prep', new OutputFormatterStyle('#ccc', '', [])); + $output->getFormatter()->setStyle('standby', new OutputFormatterStyle('#aaa', '', ['bold'])); + $output->getFormatter()->setStyle('running', new OutputFormatterStyle('#0a0', '', [])); + $output->getFormatter()->setStyle('blocker', new OutputFormatterStyle('#c00', '', ['bold'])); + $output->getFormatter()->setStyle('error', new OutputFormatterStyle('#d00', '', [])); + $output->getFormatter()->setStyle('success', new OutputFormatterStyle('#0c0', '', ['bold'])); + + $details = $input->getOption('details'); + if ($details !== '') { + $this->displayBlock($output, $details); + return 0; + } + + $session = $input->getOption('session'); + if ($session !== '') { + $this->displaySession($output, $session); + return 0; + } + + $this->summary($output); + return 0; + } + + private function summary(OutputInterface $output): void { + $statuses = []; + foreach ($this->blockMapper->getSessions() as $token) { + $sessionBlockes = $this->blockMapper->getBySession($token); + $sessionIface = new SessionInterface(BlockInterface::asBlockInterfaces($sessionBlockes)); + + $status = $sessionIface->getGlobalStatus()->value; + if (!array_key_exists($status, $statuses)) { + $statuses[$status] = []; + } + $statuses[$status][] = $token; + } + + if (!empty($success = $statuses[(string)BlockStatus::SUCCESS->value] ?? [])) { + $output->writeln('<comment>Successful</comment> session to be removed next cron (' . count($success) . '): <info>' . implode('</info>,<info> ', $success) . '</info>'); + } + + if (!empty($prep = $statuses[(string)BlockStatus::PREP->value] ?? [])) { + $output->writeln('Session with <comment>PREP</comment> status (' . count($prep) . '): <info>' . implode('</info>,<info> ', $prep) . '</info>'); + } + + if (!empty($stand = $statuses[BlockStatus::STANDBY->value] ?? [])) { + $output->writeln('Session in <comment>stand-by</comment> (' . count($stand) . '): <info>' . implode('</info>,<info> ', $stand) . '</info>'); + } + + if (!empty($running = $statuses[BlockStatus::RUNNING->value] ?? [])) { + $output->writeln('Currently <comment>running</comment> session (' . count($running) . '): <info>' . implode('</info>,<info> ', $running) . '</info>'); + } + + if (!empty($err = $statuses[BlockStatus::ERROR->value] ?? [])) { + $output->writeln('<comment>Erroneous</comment> session (' . count($err) . '): <info>' . implode('</info>,<info> ', $err) . '</info>'); + } + + if (!empty($blocker = $statuses[BlockStatus::BLOCKER->value] ?? [])) { + $output->writeln('<comment>Blocked</comment> session (' . count($blocker) . '): <info>' . implode('</info>,<info> ', $blocker) . '</info>'); + } + } + + + private function displaySession(OutputInterface $output, string $token): void { + foreach ($this->blockMapper->getBySession($token) as $block) { + $output->writeln('BlockToken: <data>' . $block->getToken() . '</data>'); + $output->writeln('BlockType: <data>' . $block->getBlockType()->name . '</data>'); + $output->writeln('Interface: <data>' . $this->displayInterface($block) . '</data>'); + $output->writeln('BlockStatus: ' . $this->displayStatus($block)); + $output->writeln('Replay: <data>' . $block->getReplayCount() . '</data>'); + + $output->writeln(''); + } + + } + + + private function displayBlock(OutputInterface $output, string $token): void { + $block = $this->blockMapper->getByToken($token); + + $output->writeln('SessionToken: <data>' . $block->getSessionToken() . '</data>'); + $output->writeln('BlockToken: <data>' . $block->getToken() . '</data>'); + $output->writeln('BlockType: <data>' . $block->getBlockType()->name . '</data>'); + $output->writeln('Interface: <data>' . $this->displayInterface($block) . '</data>'); + + $output->writeln('Code: <data>' . $this->cropContent($block->getCode(), 3000, 15) . '</data>'); + $output->writeln('Params: <data>' . $this->cropContent(json_encode($block->getParams()), 3000, 15) . '</data>'); + $output->writeln('Metadata: <data>' . $this->cropContent(json_encode($block->getMetadata()), 3000, 15) . '</data>'); + $output->writeln('Result: <data>' . $this->cropContent(json_encode($block->getResult()), 3000, 15) . '</data>'); + + $output->writeln('Dataset: <data>' . $this->cropContent(json_encode($block->getDataset()), 3000, 15) . '</data>'); + $output->writeln('Links: <data>' . $this->cropContent(json_encode($block->getLinks()), 3000, 15) . '</data>'); + $output->writeln('Orig: <data>' . $this->cropContent(json_encode($block->getOrig()), 3000, 15) . '</data>'); + + $output->writeln('ExecutionTime: <data>' . BlockExecutionTime::tryFrom($block->getExecutionTime())?->name . '</data>'); + $output->writeln('Creation: <data>' . $block->getCreation() . '</data>'); + $output->writeln('LastRun: <data>' . $block->getLastRun() . '</data>'); + $output->writeln('NextRun: <data>' . $block->getNextRun() . '</data>'); + $output->writeln('BlockStatus: ' . $this->displayStatus($block)); + $output->writeln('Replay: <data>' . $block->getReplayCount() . '</data>'); + } + + + private function displayStatus(Block $block): string { + $name = $block->getBlockStatus()->name; + $color = strtolower($name); + return '<' . $color . '>' . $name . '</' . $color . '>'; + } + + private function displayInterface(Block $block): string { + $iface = new BlockInterface(null, $block); + + $data = []; + $data[] = ($iface->getId() === '') ? '' : 'id=' . $iface->getId(); + $data[] = ($iface->getName() === '') ? '' : 'name=' . $iface->getName(); + $data[] = (!$iface->isReplayable()) ? '' : 'replayable=true'; + $data[] = (!$iface->isBlocker()) ? '' : 'blocker=true'; + $data[] = (empty($iface->getRequire())) ? '' : 'require=' . implode('.', $iface->getRequire()); + + return implode(', ', array_filter($data)); + } + + + + + private function replayBlock(InputInterface $input, OutputInterface $output, string $token): void { + $block = $this->blockMapper->getByToken($token); + if (!in_array($block->getBlockStatus(), [BlockStatus::ERROR, BlockStatus::BLOCKER], true)) { + $output->writeln('only Block set as ERROR or BLOCKER can be replayed'); + return; + } + + $iface = new BlockInterface(null, $block); + if (!$iface->isReplayable()) { + $output->writeln(''); + $output->writeln('Block is not set as <comment>replayable</comment>.'); + $output->writeln('Replaying this Block can create issues.'); + $output->writeln(''); + $question = new ConfirmationQuestion( + '<comment>Do you really want to replay the Block ' . $token . ' ?</comment> (y/N) ', + false, + '/^(y|Y)/i' + ); + + /** @var QuestionHelper $helper */ + $helper = $this->getHelper('question'); + if (!$helper->ask($input, $output, $question)) { + $output->writeln('aborted.'); + return; + } + } + + $block->replay(true); + $this->blockMapper->update($block); + } + + + /** + * crop content after n lines or n chars + */ + private function cropContent(string $content, int $maxChars, int $maxLines): string { + if ($this->noCrop) { + return $content; + } + preg_match_all("/\\n/", utf8_decode($content), $matches, PREG_OFFSET_CAPTURE); + return substr($content, 0, min($matches[0][$maxLines-1][1] ?? 99999, $maxChars)); + } +} + diff --git a/core/Command/Async/Setup.php b/core/Command/Async/Setup.php new file mode 100644 index 00000000000..d5b94a9b732 --- /dev/null +++ b/core/Command/Async/Setup.php @@ -0,0 +1,435 @@ +<?php + +declare(strict_types=1); + +/** + * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +namespace OC\Core\Command\Async; + +use OC\Async\ABlockWrapper; +use OC\Async\AsyncManager; +use OC\Async\Exceptions\LoopbackEndpointException; +use OC\Async\ForkManager; +use OC\Config\Lexicon\CoreConfigLexicon; +use OCP\Async\Enum\BlockActivity; +use OCP\Async\Enum\ProcessExecutionTime; +use OCP\Async\IAsyncProcess; +use OCP\IAppConfig; +use Symfony\Component\Console\Command\Command; +use Symfony\Component\Console\Helper\QuestionHelper; +use Symfony\Component\Console\Input\InputInterface; +use Symfony\Component\Console\Input\InputOption; +use Symfony\Component\Console\Output\OutputInterface; +use Symfony\Component\Console\Question\ConfirmationQuestion; + +class Setup extends Command { + public function __construct( + private readonly IAppConfig $appConfig, + private readonly IAsyncProcess $asyncProcess, + private readonly AsyncManager $asyncManager, + private readonly ForkManager $forkManager, + ) { + parent::__construct(); + } + + protected function configure() { + parent::configure(); + $this->setName('async:setup') + ->addOption('reset', '', InputOption::VALUE_NONE, 'reset all data related to the AsyncProcess feature') + ->addOption('drop', '', InputOption::VALUE_NONE, 'drop processes data') + ->addOption('loopback', '', InputOption::VALUE_REQUIRED, 'set loopback address') + ->addOption('discover', '', InputOption::VALUE_NONE, 'initiate the search for a possible loopback address') + ->addOption('yes', '', InputOption::VALUE_NONE, 'answer yes to any confirmation box') + ->addOption('exception-if-no-config', '', InputOption::VALUE_NONE, 'return an exception if existing configuration cannot be used') + ->addOption('mock-session', '', InputOption::VALUE_OPTIONAL, 'create n sessions', '0') + ->addOption('mock-block', '', InputOption::VALUE_OPTIONAL, 'create n blocks', '0') + ->addOption('fail-process', '', InputOption::VALUE_REQUIRED, 'create fail process', '') + ->setDescription('setup'); + } + + /** + * @throws LoopbackEndpointException + */ + protected function execute(InputInterface $input, OutputInterface $output): int { + $reset = $input->getOption('reset'); + $drop = $input->getOption('drop'); + if ($reset || $drop) { + if (!$input->getOption('yes')) { + $question = new ConfirmationQuestion( + ($reset) ? '<comment>Do you want to reset all data and configuration related to AsyncProcess ?</comment> (y/N) ' + : '<comment>Do you want to drop all data related to AsyncProcess ?</comment> (y/N) ', + false, + '/^(y|Y)/i' + ); + + /** @var QuestionHelper $helper */ + $helper = $this->getHelper('question'); + if (!$helper->ask($input, $output, $question)) { + $output->writeln('aborted.'); + + return 0; + } + } + + $this->asyncManager->dropAllBlocks(); + if ($reset) { + $this->asyncManager->resetConfig(); + } + + $output->writeln('done.'); + + return 0; + } + + $failureType = $input->getOption('fail-process'); + if ($failureType !== '') { + try { + match ($failureType) { + 'static-required' => $this->createFaultyProcessStaticRequired(), + 'static-blocker' => $this->createFaultyProcessStaticBlocker(), + 'dynamic-required' => $this->createFaultyProcessDynamicRequired(), + 'dynamic-blocker' => $this->createFaultyProcessDynamicBlocker(), + 'auto-required' => $this->createFaultyProcessAutoRequired(), + 'auto-blocker' => $this->createFaultyProcessAutoBlocker(), + }; + } catch (\UnhandledMatchError) { + $output->writeln( + 'list of fail: static-required, static-blocker, dynamic-blocker, dynamic-required, auto-blocker, auto-required' + ); + } + + return 0; + } + + $session = (int)($input->getOption('mock-session') ?? 0); + $proc = (int)($input->getOption('mock-block') ?? 0); + if ($session > 0 || $proc > 0) { + $this->createRandomProcess($output, $session, $proc); + + $this->forkManager->waitChildProcess(); + + return 0; + } + + if ($input->getOption('discover')) { + $output->writeln('<info>Searching for loopback address</info>'); + $found = $this->forkManager->discoverLoopbackEndpoint($output); + $output->writeln('found a working loopback address: <info>' . $found . '</info>'); + $this->confirmSave($input, $output, $found); + + return 0; + } + + $inputLoopback = $input->getOption('loopback'); + if ($inputLoopback) { + $this->parseAddress($inputLoopback); + $output->write('- testing <comment>' . $inputLoopback . '</comment>... '); + + $reason = ''; + if (!$this->forkManager->testLoopbackInstance($inputLoopback, $reason)) { + $output->writeln('<error>' . $reason . '</error>'); + + return 0; + } + + $output->writeln('<info>ok</info>'); + $this->confirmSave($input, $output, $inputLoopback); + + return 0; + } + + try { + $currentLoopback = $this->forkManager->getLoopbackInstance(); + $output->writeln('Current loopback instance: <info>' . $currentLoopback . '</info>'); + $output->write('Testing async process using loopback endpoint... '); + $reason = ''; + if (!$this->forkManager->testLoopbackInstance($currentLoopback, $reason)) { + $output->writeln('<error>' . $reason . '</error>'); + } else { + $output->writeln('<info>ok</info>'); + } + + return 0; + } catch (LoopbackEndpointException $e) { + if ($input->getOption('exception-if-no-config')) { + throw $e; + } + } + + $output->writeln('<info>Notes:</info>'); + $output->writeln('no loopback instance currently set.'); + $output->writeln('use --loopback <address> to manually configure a loopback instance'); + $output->writeln('or use --discover for an automated process'); + + return 0; + } + + private function confirmSave(InputInterface $input, OutputInterface $output, string $instance): void { + if (!$input->getOption('yes')) { + $output->writeln(''); + $question = new ConfirmationQuestion( + '<comment>Do you want to save loopback address \'' . $instance . '\' ?</comment> (y/N) ', + false, + '/^(y|Y)/i' + ); + + /** @var QuestionHelper $helper */ + $helper = $this->getHelper('question'); + if (!$helper->ask($input, $output, $question)) { + $output->writeln('aborted.'); + + return; + } + } + + $this->appConfig->setValueString('core', CoreConfigLexicon::ASYNC_LOOPBACK_ADDRESS, $instance); + } + + /** + * confirm format of typed address + */ + private function parseAddress(string $test): void { + $scheme = parse_url($test, PHP_URL_SCHEME); + $cloudId = parse_url($test, PHP_URL_HOST); + if (is_bool($scheme) || is_bool($cloudId) || is_null($scheme) || is_null($cloudId)) { + throw new LoopbackEndpointException('format must be http[s]://domain.name[:post][/path]'); + } + } + + /** + * create a list of fake/mockup process + * + * @param int $session number of session to generate + * @param int $processes number of process per session to generate + */ + private function createRandomProcess(OutputInterface $output, int $session, int $processes): void { + $session = ($session > 0) ? $session : 1; + $processes = ($processes > 0) ? $processes : 1; + + for ($i = 0; $i < $session; $i++) { + for ($j = 0; $j < $processes; $j++) { + $process = $this->asyncProcess->exec( + function (ABlockWrapper $wrapper, int $data, int $j): array { + if ($j > 0) { + $wrapper->activity( + BlockActivity::NOTICE, 'result from first process of the session: ' + . $wrapper->getSessionInterface() + ->byId('mock_process_0') + ?->getResult()['result'] + ); + } + sleep(random_int(2, 8)); + $wrapper->activity( + BlockActivity::NOTICE, 'mocked process is now over with data=' . $data + ); + + return ['result' => $data]; + }, + random_int(1, 5000), + $j + )->id('mock_process_' . $j)->blocker(); + $output->writeln(' > creating process <info>' . $process->getToken() . '</info>'); + } + + $token = $this->asyncProcess->async(ProcessExecutionTime::LATER); + + $output->writeln('- session <info>' . $token . '</info>'); + $output->writeln(''); + } + } + + private function createFaultyProcessStaticRequired(): void { + $this->asyncProcess->exec( + function (ABlockWrapper $wrapper, int $n): array { + $wrapper->activity( + BlockActivity::NOTICE, '(1) this process will crash in ' . $n . ' seconds' + ); + sleep($n); + throw new \Exception('crash'); + }, + random_int(2, 8) + )->id('mock_process_1'); + + $this->asyncProcess->exec( + function (ABlockWrapper $wrapper): void { + $wrapper->activity(BlockActivity::ERROR, '(2) this process should NOT run'); + }, + )->id('mock_process_2')->require('mock_process_1'); + + $this->asyncProcess->exec( + function (ABlockWrapper $wrapper): array { + $wrapper->activity(BlockActivity::NOTICE, '(3) this process should run!'); + + return ['ok']; + }, + )->id('mock_process_3'); + + $this->asyncProcess->async(ProcessExecutionTime::LATER); + } + + private function createFaultyProcessStaticBlocker(): void { + $this->asyncProcess->exec( + function (ABlockWrapper $wrapper, int $n): array { + $wrapper->activity( + BlockActivity::NOTICE, '(1) this process will crash in ' . $n . ' seconds' + ); + sleep($n); + throw new \Exception('crash'); + }, + random_int(2, 8) + )->id('mock_process_1')->blocker(); + + $this->asyncProcess->exec( + function (ABlockWrapper $wrapper): void { + $wrapper->activity(BlockActivity::ERROR, '(2) this process should NOT run'); + }, + )->id('mock_process_2'); + + $this->asyncProcess->exec( + function (ABlockWrapper $wrapper): array { + $wrapper->activity(BlockActivity::ERROR, '(3) this process should NOT run'); + + return ['ok']; + }, + )->id('mock_process_3'); + + $this->asyncProcess->async(ProcessExecutionTime::LATER); + } + + + private function createFaultyProcessDynamicRequired(): void { + $this->asyncProcess->exec( + function (ABlockWrapper $wrapper, int $n): array { + if ($wrapper->getReplayCount() > 1) { + $wrapper->activity(BlockActivity::NOTICE, '(1) this process will not crash anymore'); + return ['dynamic-required']; + } + + $wrapper->activity(BlockActivity::NOTICE, '(1) this process will crash in ' . $n . ' seconds'); + sleep($n); + throw new \Exception('crash'); + }, + random_int(2, 8) + )->id('mock_process_1'); + + $this->asyncProcess->exec( + function (ABlockWrapper $wrapper): void { + $wrapper->activity(BlockActivity::ERROR, '(2) this process should only run after few replay from MOCK_PROCESS_1'); + }, + )->id('mock_process_2')->require('mock_process_1'); + + $this->asyncProcess->exec( + function (ABlockWrapper $wrapper): array { + $wrapper->activity(BlockActivity::NOTICE, '(3) this process should run!'); + + return ['ok']; + }, + )->id('mock_process_3'); + + $this->asyncProcess->async(ProcessExecutionTime::LATER); + } + + + private function createFaultyProcessDynamicBlocker(): void { + $this->asyncProcess->exec( + function (ABlockWrapper $wrapper, int $n): array { + if ($wrapper->getReplayCount() > 1) { + $wrapper->activity(BlockActivity::NOTICE, '(1) this process will not crash anymore'); + return ['dynamic-blocker']; + } + + $wrapper->activity(BlockActivity::NOTICE, '(1) this process will crash in ' . $n . ' seconds'); + sleep($n); + throw new \Exception('crash'); + }, + random_int(2, 8) + )->id('mock_process_1')->blocker(); + + $this->asyncProcess->exec( + function (ABlockWrapper $wrapper): void { + $wrapper->activity(BlockActivity::ERROR, '(2) this process should only run after few replay from MOCK_PROCESS_1'); + }, + )->id('mock_process_2'); + + $this->asyncProcess->exec( + function (ABlockWrapper $wrapper): array { + $wrapper->activity(BlockActivity::ERROR, '(3) this process should only run after few replay from MOCK_PROCESS_1'); + + return ['ok']; + }, + )->id('mock_process_3'); + + $this->asyncProcess->async(ProcessExecutionTime::LATER); + } + + + + private function createFaultyProcessAutoRequired(): void { + $this->asyncProcess->exec( + function (ABlockWrapper $wrapper, int $n): array { + if ($wrapper->getReplayCount() > 1) { + $wrapper->activity(BlockActivity::NOTICE, '(1) this process will not crash anymore'); + return ['dynamic-required']; + } + + $wrapper->activity(BlockActivity::NOTICE, '(1) this process will crash in ' . $n . ' seconds'); + sleep($n); + throw new \Exception('crash'); + }, + random_int(2, 8) + )->id('mock_process_1')->replayable(); + + $this->asyncProcess->exec( + function (ABlockWrapper $wrapper): void { + $wrapper->activity(BlockActivity::ERROR, '(2) this process should only run after few replay from MOCK_PROCESS_1'); + }, + )->id('mock_process_2')->require('mock_process_1'); + + $this->asyncProcess->exec( + function (ABlockWrapper $wrapper): array { + $wrapper->activity(BlockActivity::NOTICE, '(3) this process should run!'); + + return ['ok']; + }, + )->id('mock_process_3'); + + $this->asyncProcess->async(ProcessExecutionTime::LATER); + } + + + private function createFaultyProcessAutoBlocker(): void { + $this->asyncProcess->exec( + function (ABlockWrapper $wrapper, int $n): array { + if ($wrapper->getReplayCount() > 1) { + $wrapper->activity(BlockActivity::NOTICE, '(1) this process will not crash anymore'); + return ['dynamic-blocker']; + } + + $wrapper->activity(BlockActivity::NOTICE, '(1) this process will crash in ' . $n . ' seconds'); + sleep($n); + throw new \Exception('crash'); + }, + random_int(2, 8) + )->id('mock_process_1')->blocker()->replayable(); + + $this->asyncProcess->exec( + function (ABlockWrapper $wrapper): void { + $wrapper->activity(BlockActivity::ERROR, '(2) this process should only run after few replay from MOCK_PROCESS_1'); + }, + )->id('mock_process_2'); + + $this->asyncProcess->exec( + function (ABlockWrapper $wrapper): array { + $wrapper->activity(BlockActivity::ERROR, '(3) this process should only run after few replay from MOCK_PROCESS_1'); + + return ['ok']; + }, + )->id('mock_process_3'); + + $this->asyncProcess->async(ProcessExecutionTime::LATER); + } + +} diff --git a/core/Controller/AsyncProcessController.php b/core/Controller/AsyncProcessController.php new file mode 100644 index 00000000000..f644e3a7851 --- /dev/null +++ b/core/Controller/AsyncProcessController.php @@ -0,0 +1,77 @@ +<?php + +declare(strict_types=1); + +/** + * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ +namespace OC\Core\Controller; + +use OC\Async\Exceptions\BlockAlreadyRunningException; +use OC\Async\ForkManager; +use OC\Config\Lexicon\CoreConfigLexicon; +use OCP\AppFramework\Controller; +use OCP\AppFramework\Http\Attribute\FrontpageRoute; +use OCP\AppFramework\Http\Attribute\NoAdminRequired; +use OCP\AppFramework\Http\Attribute\NoCSRFRequired; +use OCP\AppFramework\Http\Attribute\PublicPage; +use OCP\AppFramework\Http\DataResponse; +use OCP\Async\Enum\ProcessExecutionTime; +use OCP\IAppConfig; +use OCP\IRequest; + +class AsyncProcessController extends Controller { + public function __construct( + string $appName, + IRequest $request, + private IAppConfig $appConfig, + private ForkManager $forkManager, + ) { + parent::__construct($appName, $request); + } + + #[NoAdminRequired] + #[NoCSRFRequired] + #[PublicPage] + #[FrontpageRoute(verb: 'POST', url: '/core/asyncProcessFork')] + public function processFork(string $token): DataResponse { + $metadata = ['executionTime' => ProcessExecutionTime::NOW]; + + // remote might only need a confirmation that this is a valid loopback endpoint + if ($token === '__ping__') { + return new DataResponse(['ping' => $this->appConfig->getValueString('core', CoreConfigLexicon::ASYNC_LOOPBACK_PING)]); + } + + $this->async($token); + + try { + $this->forkManager->runSession($token, $metadata); + } catch (BlockAlreadyRunningException) { + // TODO: debug() ? + } + + exit(); + } + + private function async(string $result = ''): void { + if (ob_get_contents() !== false) { + ob_end_clean(); + } + + header('Connection: close'); + header('Content-Encoding: none'); + ignore_user_abort(); + $timeLimit = 100; + set_time_limit(max($timeLimit, 0)); + ob_start(); + + echo($result); + + $size = ob_get_length(); + header('Content-Length: ' . $size); + ob_end_flush(); + flush(); + } + +} diff --git a/core/Migrations/Version32000Date20250307222601.php b/core/Migrations/Version32000Date20250307222601.php new file mode 100644 index 00000000000..179645e0d2c --- /dev/null +++ b/core/Migrations/Version32000Date20250307222601.php @@ -0,0 +1,64 @@ +<?php + +declare(strict_types=1); +/** + * SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +namespace OC\Core\Migrations; + +use Closure; +use OCP\DB\ISchemaWrapper; +use OCP\DB\Types; +use OCP\Migration\Attributes\AddColumn; +use OCP\Migration\Attributes\AddIndex; +use OCP\Migration\Attributes\ColumnType; +use OCP\Migration\Attributes\CreateTable; +use OCP\Migration\Attributes\DropIndex; +use OCP\Migration\Attributes\IndexType; +use OCP\Migration\IOutput; +use OCP\Migration\SimpleMigrationStep; + +/** + * Create new column and index for lazy loading in preferences for the new IUserPreferences API. + */ +#[CreateTable(table: 'async_processes', columns: ['id', 'token', 'type', 'code', 'params', 'orig', 'result', 'status'], description: 'async task and status')] +class Version32000Date20250307222601 extends SimpleMigrationStep { + public function changeSchema(IOutput $output, Closure $schemaClosure, array $options): ?ISchemaWrapper { + /** @var ISchemaWrapper $schema */ + $schema = $schemaClosure(); + + if ($schema->hasTable('async_process')) { + return null; + } + + $table = $schema->createTable('async_process'); + $table->addColumn('id', Types::BIGINT, [ 'notnull' => true, 'length' => 64, 'autoincrement' => true, 'unsigned' => true]); + $table->addColumn('token', Types::STRING, ['notnull' => true, 'default' => '', 'length' => 15]); + $table->addColumn('session_token', Types::STRING, ['notnull' => true, 'default' => '', 'length' => 15]); + $table->addColumn('type', Types::SMALLINT, ['notnull' => true, 'default' => 0, 'unsigned' => true]); + $table->addColumn('code', Types::TEXT, ['notnull' => true, 'default' => '']); + $table->addColumn('params', Types::TEXT, ['notnull' => true, 'default' => '[]']); + $table->addColumn('dataset', Types::TEXT, ['notnull' => true, 'default' => '[]']); + $table->addColumn('metadata', Types::TEXT, ['notnull' => true, 'default' => '[]']); + $table->addColumn('links', Types::TEXT, ['notnull' => true, 'default' => '[]']); + $table->addColumn('orig', Types::TEXT, ['notnull' => true, 'default' => '[]']); + $table->addColumn('result', Types::TEXT, ['notnull' => true, 'default' => '']); + $table->addColumn('status', Types::SMALLINT, ['notnull' => true, 'default' => 0, 'unsigned' => true]); + $table->addColumn('execution_time', Types::SMALLINT, ['notnull' => true, 'default' => 0, 'unsigned' => true]); + $table->addColumn('lock_token', Types::STRING, ['notnull' => true, 'default' => '', 'length' => 7]); + $table->addColumn('creation', Types::INTEGER, ['notnull' => true, 'default' => 0, 'unsigned' => true]); + $table->addColumn('last_run', Types::INTEGER, ['notnull' => true, 'default' => 0, 'unsigned' => true]); + $table->addColumn('next_run', Types::INTEGER, ['notnull' => true, 'default' => 0, 'unsigned' => true]); + + $table->setPrimaryKey(['id'], 'asy_prc_id'); + + $table->addIndex(['token'], 'asy_prc_tkn'); + $table->addIndex(['status'], 'asy_prc_sts'); + $table->addIndex(['next_run'], 'asy_prc_nxt'); + $table->addIndex(['status', 'id', 'next_run'], 'asy_prc_sin'); + + return $schema; + } +} diff --git a/core/register_command.php b/core/register_command.php index 62305d75a30..e0c201920a9 100644 --- a/core/register_command.php +++ b/core/register_command.php @@ -152,6 +152,10 @@ if ($config->getSystemValueBool('installed', false)) { $application->add(Server::get(Command\TaskProcessing\Statistics::class)); $application->add(Server::get(Command\Memcache\RedisCommand::class)); + + $application->add(Server::get(Command\Async\Live::class)); + $application->add(Server::get(Command\Async\Manage::class)); + $application->add(Server::get(Command\Async\Setup::class)); } else { $application->add(Server::get(Command\Maintenance\Install::class)); } diff --git a/lib/base.php b/lib/base.php index aa463e206a3..7f880db6df8 100644 --- a/lib/base.php +++ b/lib/base.php @@ -582,6 +582,7 @@ class OC { self::setRequiredIniValues(); self::handleAuthHeaders(); + // prevent any XML processing from loading external entities libxml_set_external_entity_loader(static function () { return null; diff --git a/lib/private/Async/ABlockWrapper.php b/lib/private/Async/ABlockWrapper.php new file mode 100644 index 00000000000..a5dd1c3b629 --- /dev/null +++ b/lib/private/Async/ABlockWrapper.php @@ -0,0 +1,40 @@ +<?php + +declare(strict_types=1); + +/** + * SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +namespace OC\Async; + +use OC\Async\Model\Block; +use OC\Async\Model\SessionInterface; +use OCP\Async\Enum\BlockActivity; + +abstract class ABlockWrapper { + abstract public function session(array $metadata): void; + abstract public function init(): void; + abstract public function activity(BlockActivity $activity, string $line = ''): void; + abstract public function end(string $line = ''): void; + + protected Block $block; + private SessionInterface $sessionInterface; + + public function setBlock(Block $block): void { + $this->block = $block; + } + + public function getSessionInterface(): SessionInterface { + return $this->sessionInterface; + } + + public function setSessionInterface(SessionInterface $iface): void { + $this->sessionInterface = $iface; + } + + public function getReplayCount(): int { + return $this->block->getReplayCount(); + } +} diff --git a/lib/private/Async/AsyncManager.php b/lib/private/Async/AsyncManager.php new file mode 100644 index 00000000000..955037f356d --- /dev/null +++ b/lib/private/Async/AsyncManager.php @@ -0,0 +1,222 @@ +<?php + +namespace OC\Async; + +use OC\Async\Db\BlockMapper; +use OC\Async\Enum\BlockType; +use OC\Async\Model\Block; +use OC\Async\Model\BlockInterface; +use OC\Config\Lexicon\CoreConfigLexicon; +use OCP\Async\Enum\ProcessExecutionTime; +use OCP\Async\Enum\BlockStatus; +use OCP\IAppConfig; +use ReflectionFunctionAbstract; + +class AsyncManager { + private ?string $sessionToken = null; + /** @var BlockInterface[] */ + private array $interfaces = []; + /** @var array<string, ReflectionFunctionAbstract> } */ + private array $blocksReflexion = []; + + public function __construct( + private IAppConfig $appConfig, + private BlockMapper $blockMapper, + private ForkManager $forkManager, + ) { + } + + /** + * @param BlockType $type + * @param string $serializedBlock + * @param ReflectionFunctionAbstract $reflection + * @param array $params + * @param array $allowedClasses + * + * @return IBlockInterface + * @see \OCP\Async\IAsyncProcess + * @internal prefer using public interface {@see \OCP\Async\IAsyncProcess} + */ + public function asyncBlock( + BlockType $type, + string $serializedBlock, // serialize() on the code - className if type is ::CLASS + \ReflectionFunctionAbstract $reflection, // reflection about the method/function called as unserialization + array $params, // parameters to use at unserialization + array $allowedClasses = [] // list of class included in the serialization of the 'block' + ): IBlockInterface { + $this->sessionToken ??= $this->generateToken(); + + $data = $this->serializeReflectionParams($reflection, $params); + $data['blockClasses'] = $allowedClasses; + if ($type === BlockType::CLASSNAME) { + $data['className'] = $serializedBlock; + } + + $token = $this->generateToken(); + $block = new Block(); + $block->setToken($token); + $block->setSessionToken($this->sessionToken); + $block->setBlockType($type); + $block->setBlockStatus(BlockStatus::PREP); + $block->setCode($serializedBlock); + $block->setParams($data); + $block->setOrig([]); + $block->setCreation(time()); + + + // needed !? +// $process->addMetadataEntry('_links', $this->interfaces); + +// $this->processMapper->insert($process); + + $iface = new BlockInterface($this, $block); + $this->interfaces[] = $iface; + $this->blocksReflexion[$token] = $reflection; + + return $iface; + } + +// public function updateDataset(ProcessInfo $processInfo) { +// $token = $processInfo->getToken(); +// $dataset = []; +// foreach($processInfo->getDataset() as $params) { +// $dataset[] = $this->serializeReflectionParams($this->processesReflexion[$token], $params); +// } +// $this->processMapper->updateDataset($token, $dataset); +// } + + private function endSession(ProcessExecutionTime $time): ?string { + if ($this->sessionToken === null) { + return null; + } + + $current = $this->sessionToken; + foreach ($this->interfaces as $iface) { + $process = $iface->getBlock(); + $process->addMetadataEntry('_iface', $iface->jsonSerialize()); + $process->setProcessExecutionTime($time); + + $dataset = []; + foreach ($iface->getDataset() as $params) { + $dataset[] = $this->serializeReflectionParams( + $this->blocksReflexion[$process->getToken()], $params + ); + } + $process->setDataset($dataset); + + $this->blockMapper->insert($process); + } + + $this->blockMapper->updateSessionStatus( + $this->sessionToken, BlockStatus::STANDBY, BlockStatus::PREP + ); + $this->processes = $this->interfaces = []; + $this->sessionToken = null; + + return $current; + } + + /** + * @internal + */ + public function async(ProcessExecutionTime $time): string { + $current = $this->endSession($time); + if ($current === null) { + return ''; + } + + if ($time === ProcessExecutionTime::NOW) { + $this->forkManager->forkSession($current); + } + + return $current; + } + + private function serializeReflectionParams(ReflectionFunctionAbstract $reflection, array $params): array { + $i = 0; + $processWrapper = false; + $filteredParams = []; + + foreach ($reflection->getParameters() as $arg) { + $argType = $arg->getType(); + $param = $params[$i]; + if ($argType !== null) { + if ($i === 0 && $argType->getName() === ABlockWrapper::class) { + $processWrapper = true; + continue; // we ignore IProcessWrapper as first argument, as it will be filled at execution time + } + + // TODO: compare $argType with $param +// foreach($argType->getTypes() as $t) { +// echo '> ' . $t . "\n"; +// } +// $arg->allowsNull() +// $arg->isOptional() + + } + + // TODO: we might want to filter some params ? + $filteredParams[] = $param; + +// echo '..1. ' . json_encode($param) . "\n"; +// echo '..2. ' . serialize($param) . "\n"; +// echo '? ' . gettype($param) . "\n"; + + $i++; + } + + try { + $serializedParams = serialize($params); + } catch (\Exception $e) { + throw $e; + } + + return [ + 'params' => $serializedParams, + 'paramsClasses' => $this->extractClassFromArray($filteredParams), + 'processWrapper' => $processWrapper, + ]; + } + +// +// public function unserializeParams(array $data): array { +// return $data; +// } +// + + private function extractClassFromArray(array $arr, array &$classes = []): array { + foreach ($arr as $entry) { + if (is_array($entry)) { + $this->extractClassFromArray($entry, $classes); + } + + if (is_object($entry)) { + $class = get_class($entry); + if (!in_array($class, $classes, true)) { + $classes[] = $class; + } + } + } + + return $classes; + } + + private function generateToken(int $length = 15): string { + $result = ''; + for ($i = 0; $i < $length; $i++) { + $result .= 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890'[random_int(0, 61)]; + } + + return $result; + } + + public function dropAllBlocks(): void { + $this->blockMapper->deleteAll(); + } + + public function resetConfig(): void { + $this->appConfig->deleteKey('core', CoreConfigLexicon::ASYNC_LOOPBACK_ADDRESS); + $this->appConfig->deleteKey('core', CoreConfigLexicon::ASYNC_LOOPBACK_PING); + $this->appConfig->deleteKey('core', CoreConfigLexicon::ASYNC_LOOPBACK_TEST); + } +} diff --git a/lib/private/Async/AsyncProcess.php b/lib/private/Async/AsyncProcess.php new file mode 100644 index 00000000000..44d990749d7 --- /dev/null +++ b/lib/private/Async/AsyncProcess.php @@ -0,0 +1,64 @@ +<?php + +namespace OC\Async; + +use Laravel\SerializableClosure\SerializableClosure; +use Laravel\SerializableClosure\Serializers\Native; +use OC\Async\Enum\BlockType; +use OCP\Async\Enum\ProcessExecutionTime; +use OCP\Async\IAsyncProcess; +use ReflectionFunction; +use ReflectionMethod; + +class AsyncProcess implements IAsyncProcess { + public function __construct( + private AsyncManager $asyncManager, + private readonly ForkManager $forkManager, + ) { + } + + public function exec(\Closure $closure, ...$params): IBlockInterface { + return $this->asyncManager->asyncBlock( + BlockType::CLOSURE, + serialize(new SerializableClosure($closure)), + new ReflectionFunction($closure), + $params, + [SerializableClosure::class, Native::class] + ); + } + + public function invoke(callable $obj, ...$params): IBlockInterface { + return $this->asyncManager->asyncBlock( + BlockType::INVOKABLE, + serialize($obj), + new ReflectionMethod($obj, '__invoke'), + $params, + [$obj::class] + ); + } + + public function call(string $class, ...$params): IBlockInterface { + // abstract ? + if (!method_exists($class, 'async')) { + throw new \Exception('class ' . $class . ' is missing async() method'); + } + + return $this->asyncManager->asyncBlock( + BlockType::CLASSNAME, + $class, + new ReflectionMethod($class, 'async'), + $params, + ); + } + + /** + * close the creation of the session and start async as soon as possible + * + * @param ProcessExecutionTime $time preferred urgency to start the async process + * + * @return string session token, empty if no opened session + */ + public function async(ProcessExecutionTime $time = ProcessExecutionTime::NOW): string { + return $this->asyncManager->async($time); + } +} diff --git a/lib/private/Async/Db/BlockMapper.php b/lib/private/Async/Db/BlockMapper.php new file mode 100644 index 00000000000..6d6409327a4 --- /dev/null +++ b/lib/private/Async/Db/BlockMapper.php @@ -0,0 +1,260 @@ +<?php + +declare(strict_types=1); +/** + * SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +namespace OC\Async\Db; + +use OC\Async\Exceptions\BlockNotFoundException; +use OC\Async\Model\Block; +use OCP\AppFramework\Db\DoesNotExistException; +use OCP\AppFramework\Db\QBMapper; +use OCP\Async\Enum\BlockStatus; +use OCP\DB\QueryBuilder\IQueryBuilder; +use OCP\IDBConnection; +use Psr\Log\LoggerInterface; + +/** + * @template-extends QBMapper<Block> + */ +class BlockMapper extends QBMapper { + public const TABLE = 'async_process'; + + public function __construct( + IDBConnection $db, + private LoggerInterface $logger, + ) { + parent::__construct($db, self::TABLE, Block::class); + } + + /** + * @return Block[] + */ + public function getBySession(string $sessionToken): array { + $qb = $this->db->getQueryBuilder(); + $qb->select('*') + ->from($this->getTableName()) + ->where($qb->expr()->eq('session_token', $qb->createNamedParameter($sessionToken))) + ->orderBy('id', 'asc'); + + return $this->findEntities($qb); + } + + /** + * @return string[] + */ + public function getSessions(): array { + $qb = $this->db->getQueryBuilder(); + $qb->selectDistinct('session_token') + ->from($this->getTableName()); + + $result = $qb->executeQuery(); + + $sessions = []; + while ($row = $result->fetch()) { + $sessions[] = $row['session_token']; + } + $result->closeCursor(); + + return $sessions; + } + + + /** + * returns list of sessionId that contains process with: + * - at least one is in STANDBY with an older timestamp in next_run + * - none as RUNNING or BLOCKER + * + * @return string[] + */ + public function getSessionOnStandBy(): array { + $qb = $this->db->getQueryBuilder(); + $qb->selectDistinct('t1.session_token') + ->from($this->getTableName(), 't1') + ->leftJoin( + 't1', $this->getTableName(), 't2', + $qb->expr()->andX( + $qb->expr()->eq('t1.session_token', 't2.session_token'), + $qb->expr()->in('t2.status', $qb->createNamedParameter([BlockStatus::PREP->value, BlockStatus::RUNNING->value, BlockStatus::BLOCKER->value], IQueryBuilder::PARAM_INT_ARRAY)) + ) + ) + ->where($qb->expr()->eq('t1.status', $qb->createNamedParameter(BlockStatus::STANDBY->value, IQueryBuilder::PARAM_INT))) + ->andWhere($qb->expr()->lt('t1.next_run', $qb->createNamedParameter(time(), IQueryBuilder::PARAM_INT))) + ->andWhere($qb->expr()->isNull('t2.status')); + + $result = $qb->executeQuery(); + + $sessions = []; + while ($row = $result->fetch()) { + $sessions[] = $row['session_token']; + } + $result->closeCursor(); + + return $sessions; + } + + /** + * reset to STANDBY all process: + * - marked as ERROR or BLOCKER, + * - next_run in an older timestamp + * - next_run not at zero + */ + public function resetFailedBlock(): int { + $qb = $this->db->getQueryBuilder(); + $qb->update($this->getTableName()) + ->set('status', $qb->createNamedParameter(BlockStatus::STANDBY->value, IQueryBuilder::PARAM_INT)) + ->where($qb->expr()->in('status', $qb->createNamedParameter([BlockStatus::ERROR->value, BlockStatus::BLOCKER->value], IQueryBuilder::PARAM_INT_ARRAY))) + ->andWhere($qb->expr()->lt('next_run', $qb->createNamedParameter(time(), IQueryBuilder::PARAM_INT))) + ->andWhere($qb->expr()->neq('next_run', $qb->createNamedParameter(0, IQueryBuilder::PARAM_INT))); + return $qb->executeStatement(); + } + + /** + * delete sessions that contain only process with status at ::SUCCESS + */ + public function removeSuccessfulBlock(): int { + $qb = $this->db->getQueryBuilder(); + $qb->selectDistinct('t1.session_token') + ->from($this->getTableName(), 't1') + ->leftJoin( + 't1', $this->getTableName(), 't2', + $qb->expr()->andX( + $qb->expr()->eq('t1.session_token', 't2.session_token'), + $qb->expr()->neq('t2.status', $qb->createNamedParameter(BlockStatus::SUCCESS->value, IQueryBuilder::PARAM_INT)) + ) + ) + ->where($qb->expr()->eq('t1.status', $qb->createNamedParameter(BlockStatus::SUCCESS->value, IQueryBuilder::PARAM_INT))) + ->andWhere($qb->expr()->lt('t1.next_run', $qb->createNamedParameter(time(), IQueryBuilder::PARAM_INT))) + ->andWhere($qb->expr()->isNull('t2.status')); + + $result = $qb->executeQuery(); + $sessions = []; + while ($row = $result->fetch()) { + $sessions[] = $row['session_token']; + } + $result->closeCursor(); + + $count = 0; + $chunks = array_chunk($sessions, 30); + foreach($chunks as $chunk) { + $delete = $this->db->getQueryBuilder(); + $delete->delete($this->getTableName()) + ->where($qb->expr()->in('session_token', $delete->createNamedParameter($chunk, IQueryBuilder::PARAM_INT_ARRAY))); + $count += $delete->executeStatement(); + unset($delete); + } + + return $count; + } + + /** + * return a Block from its token + * + * @throws BlockNotFoundException + */ + public function getByToken(string $token): Block { + $qb = $this->db->getQueryBuilder(); + $qb->select('*') + ->from($this->getTableName()) + ->where($qb->expr()->eq('token', $qb->createNamedParameter($token))); + + try { + return $this->findEntity($qb); + } catch (DoesNotExistException) { + throw new BlockNotFoundException('no process found'); + } + } + + public function updateStatus(Block $block, ?BlockStatus $prevStatus = null, string $lockToken = ''): bool { + $qb = $this->db->getQueryBuilder(); + $qb->update($this->getTableName()) + ->set('status', $qb->createNamedParameter($block->getStatus())) + ->where($qb->expr()->andX( + $qb->expr()->eq('session_token', $qb->createNamedParameter($block->getSessionToken())), + $qb->expr()->eq('token', $qb->createNamedParameter($block->getToken())) + ) + ); + + // if process contain a lockToken, we conflict with the stored one + if ($block->getLockToken() !== '') { + $qb->andWhere($qb->expr()->eq('lock_token', $qb->createNamedParameter($block->getLockToken()))); + } + + // if status is switching from standby to running, we set last_run to detect timeout + if ($block->getBlockStatus() === BlockStatus::RUNNING && $prevStatus === BlockStatus::STANDBY) { + $qb->set('last_run', $qb->createNamedParameter(time(), IQueryBuilder::PARAM_INT)); + } + + // if status is updated to success, error or blocker, we store result (or exception) + if (in_array($block->getBlockStatus(), [BlockStatus::SUCCESS, BlockStatus::ERROR, BlockStatus::BLOCKER], true)) { + try { + $qb->set('result', $qb->createNamedParameter(json_encode($block->getResult(), JSON_THROW_ON_ERROR))); + $qb->set('metadata', $qb->createNamedParameter(json_encode($block->getMetadata(), JSON_THROW_ON_ERROR))); + $qb->set('next_run', $qb->createNamedParameter($block->getNextRun(), IQueryBuilder::PARAM_INT)); + } catch (\JsonException $e) { + $this->logger->warning('could not json_encode process result', ['exception' => $e]); + } + } + + if ($lockToken !== '') { + $qb->set('lock_token', $qb->createNamedParameter($lockToken)); + } + + if ($prevStatus !== null) { + $qb->andWhere($qb->expr()->eq('status', $qb->createNamedParameter($prevStatus->value, IQueryBuilder::PARAM_INT))); + } + + return ($qb->executeStatement() === 1); + } + + public function updateSessionStatus(string $sessionToken, BlockStatus $status, ?BlockStatus $prevStatus = null): int { + $qb = $this->db->getQueryBuilder(); + $qb->update($this->getTableName()) + ->set('status', $qb->createNamedParameter($status->value, IQueryBuilder::PARAM_INT)) + ->where($qb->expr()->eq('session_token', $qb->createNamedParameter($sessionToken))); + + if ($prevStatus !== null) { + $qb->andWhere($qb->expr()->eq('status', $qb->createNamedParameter($prevStatus->value, IQueryBuilder::PARAM_INT))); + } + + return $qb->executeStatement(); + } + + + + /** + * set next_run to current time if next_run>0 in relation to session_token. + * Force process queued for replay to be run as soon as possible. + */ + public function resetSessionNextRun(string $sessionToken): void { + $qb = $this->db->getQueryBuilder(); + $qb->update($this->getTableName()) + ->set('next_run', $qb->createNamedParameter(time(), IQueryBuilder::PARAM_INT)) + ->where( + $qb->expr()->eq('session_token', $qb->createNamedParameter($sessionToken)), + $qb->expr()->gt('next_run', $qb->createNamedParameter(0, IQueryBuilder::PARAM_INT)), + ); + + $qb->executeStatement(); + } + + +// public function updateDataset(string $token, array $dataset): void { +// $qb = $this->db->getQueryBuilder(); +// $qb->update($this->getTableName()) +// ->set('dataset', $qb->createNamedParameter(json_encode($dataset))) +// ->where($qb->expr()->eq('token', $qb->createNamedParameter($token))); +// +// $qb->executeStatement(); +// } + + public function deleteAll() { + $qb = $this->db->getQueryBuilder(); + $qb->delete($this->getTableName()); + $qb->executeStatement(); + } + +} diff --git a/lib/private/Async/Enum/BlockType.php b/lib/private/Async/Enum/BlockType.php new file mode 100644 index 00000000000..839d0d24f05 --- /dev/null +++ b/lib/private/Async/Enum/BlockType.php @@ -0,0 +1,16 @@ +<?php + +declare(strict_types=1); + +/** + * SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +namespace OC\Async\Enum; + +enum BlockType: int { + case CLOSURE = 1; + case INVOKABLE = 2; + case CLASSNAME = 3; +} diff --git a/lib/private/Async/Exceptions/AsyncProcessException.php b/lib/private/Async/Exceptions/AsyncProcessException.php new file mode 100644 index 00000000000..b394a95df7c --- /dev/null +++ b/lib/private/Async/Exceptions/AsyncProcessException.php @@ -0,0 +1,14 @@ +<?php + +declare(strict_types=1); +/** + * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +namespace OC\Async\Exceptions; + +use Exception; + +class AsyncProcessException extends Exception { +} diff --git a/lib/private/Async/Exceptions/BlockAlreadyRunningException.php b/lib/private/Async/Exceptions/BlockAlreadyRunningException.php new file mode 100644 index 00000000000..4c671d91d55 --- /dev/null +++ b/lib/private/Async/Exceptions/BlockAlreadyRunningException.php @@ -0,0 +1,12 @@ +<?php + +declare(strict_types=1); +/** + * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +namespace OC\Async\Exceptions; + +class BlockAlreadyRunningException extends AsyncProcessException { +} diff --git a/lib/private/Async/Exceptions/BlockNotFoundException.php b/lib/private/Async/Exceptions/BlockNotFoundException.php new file mode 100644 index 00000000000..6e25415f7e3 --- /dev/null +++ b/lib/private/Async/Exceptions/BlockNotFoundException.php @@ -0,0 +1,12 @@ +<?php + +declare(strict_types=1); +/** + * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +namespace OC\Async\Exceptions; + +class BlockNotFoundException extends AsyncProcessException { +} diff --git a/lib/private/Async/Exceptions/LoopbackEndpointException.php b/lib/private/Async/Exceptions/LoopbackEndpointException.php new file mode 100644 index 00000000000..2a7bea78239 --- /dev/null +++ b/lib/private/Async/Exceptions/LoopbackEndpointException.php @@ -0,0 +1,14 @@ +<?php + +declare(strict_types=1); +/** + * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +namespace OC\Async\Exceptions; + +use Exception; + +class LoopbackEndpointException extends AsyncProcessException { +} diff --git a/lib/private/Async/Exceptions/SessionBlockedException.php b/lib/private/Async/Exceptions/SessionBlockedException.php new file mode 100644 index 00000000000..9c6d85dc4fc --- /dev/null +++ b/lib/private/Async/Exceptions/SessionBlockedException.php @@ -0,0 +1,14 @@ +<?php + +declare(strict_types=1); +/** + * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +namespace OC\Async\Exceptions; + +use Exception; + +class SessionBlockedException extends AsyncProcessException { +} diff --git a/lib/private/Async/ForkManager.php b/lib/private/Async/ForkManager.php new file mode 100644 index 00000000000..6978a601bd4 --- /dev/null +++ b/lib/private/Async/ForkManager.php @@ -0,0 +1,485 @@ +<?php + +namespace OC\Async; + +use OC\Async\Db\BlockMapper; +use OC\Async\Enum\BlockType; +use OC\Async\Exceptions\AsyncProcessException; +use OC\Async\Exceptions\LoopbackEndpointException; +use OC\Async\Exceptions\BlockAlreadyRunningException; +use OC\Async\Exceptions\SessionBlockedException; +use OC\Async\Model\Block; +use OC\Async\Model\BlockInterface; +use OC\Async\Model\SessionInterface; +use OC\Async\Wrappers\DummyBlockWrapper; +use OC\Async\Wrappers\LoggerBlockWrapper; +use OC\Config\Lexicon\CoreConfigLexicon; +use OC\DB\Connection; +use OCP\Async\Enum\BlockActivity; +use OCP\Async\Enum\ProcessExecutionTime; +use OCP\Async\Enum\BlockStatus; +use OCP\Async\IAsyncProcess; +use OCP\Http\Client\IClientService; +use OCP\IAppConfig; +use OCP\IConfig; +use OCP\IURLGenerator; +use Psr\Log\LoggerInterface; +use Symfony\Component\Console\Output\OutputInterface; + +class ForkManager { + private ?ABlockWrapper $wrapper; + + /** @var int[] */ + private array $forks = []; + private const FORK_LIMIT = 3; // maximum number of child process + private const FORK_SLEEP = 500000; // wait for msec when too many fork have been created + + public function __construct( + private BlockMapper $blockMapper, + private Connection $conn, + private IAppConfig $appConfig, + private IConfig $config, + private IClientService $clientService, + private IURLGenerator $urlGenerator, + LoggerBlockWrapper $loggerProcessWrapper, + private LoggerInterface $logger, + ) { + $this->wrapper = $loggerProcessWrapper; + } + + public function setWrapper(?ABlockWrapper $wrapper): void { + $this->wrapper = $wrapper; + } + + + /** + * @throws BlockAlreadyRunningException + * @throws AsyncProcessException + */ + public function runSession(string $token, array $metadata = []): void { + $sessionBlocks = $this->blockMapper->getBySession($token); + $metadata['sessionToken'] = $token; + + if ($this->wrapper !== null) { + $wrapper = clone $this->wrapper; + } else { + $wrapper = null; + } + + // might be need to avoid some conflict/race condition + // usleep(10000); + $sessionIface = new SessionInterface(BlockInterface::asBlockInterfaces($sessionBlocks)); + + if ($sessionIface->getGlobalStatus() !== BlockStatus::STANDBY) { + throw new AsyncProcessException(); + } + + $wrapper?->setSessionInterface($sessionIface); + $wrapper?->session($metadata); + + try { + foreach ($sessionBlocks as $block) { + if (!$this->confirmBlockRequirement($wrapper, $sessionIface, $block)) { + $block->replay(); + $this->blockMapper->update($block); + continue; + } + + $block->addMetadata($metadata); + $this->runBlock($block, $wrapper); + + if ($block->getBlockStatus() === BlockStatus::BLOCKER) { + $wrapper?->end('Fail process ' . $block->getToken() . ' block the rest of the session'); + throw new SessionBlockedException(); + } + } + $wrapper?->end(); + } catch (BlockAlreadyRunningException) { + $wrapper?->end('already running'); + throw new BlockAlreadyRunningException(); + } + } + + private function confirmBlockRequirement( + ?ABlockWrapper $wrapper, + SessionInterface $sessionIface, + Block $block + ): bool { + $procIface = new BlockInterface(null, $block); + foreach ($procIface->getRequire() as $requiredProcessId) { + $requiredBlock = $sessionIface->byId($requiredProcessId); + if ($requiredBlock === null) { + $wrapper?->activity(BlockActivity::NOTICE, 'could not initiated block as it requires block ' . $requiredProcessId . ' which is not defined'); + return false; + } + if ($requiredBlock?->getStatus() !== BlockStatus::SUCCESS) { + $wrapper?->activity(BlockActivity::NOTICE, 'could not initiated block as it requires block ' . $requiredProcessId . ' to be executed and successful'); + return false; + } + } + return true; + } + + /** + * @throws BlockAlreadyRunningException + */ + private function runBlock(Block $block, ?ABlockWrapper $wrapper = null): void { + if ($block->getBlockStatus() !== BlockStatus::STANDBY) { + return; + } + + $this->lockBlock($block); + + $data = $block->getParams(); + $serialized = $block->getCode(); + $params = unserialize($data['params'], ['allowed_classes' => $data['paramsClasses']]); + $obj = unserialize($serialized, ['allowed_classes' => $data['blockClasses'] ?? []]); + + if ($data['processWrapper'] ?? false) { + array_unshift($params, ($wrapper ?? new DummyBlockWrapper())); + } + + $wrapper?->setBlock($block); + $wrapper?->init(); + $wrapper?->activity(BlockActivity::STARTING); + $result = [ + 'executionTime' => ProcessExecutionTime::NOW->value, + 'startTime' => time(), + ]; + $iface = new BlockInterface(null, $block); + try { + $returnedData = null; + switch ($block->getBlockType()) { + case BlockType::CLOSURE: + $c = $obj->getClosure(); + $returnedData = $c(...$params); + break; + + case BlockType::INVOKABLE: + $returnedData = $obj(...$params); + break; + + case BlockType::CLASSNAME: + $obj = new $data['className'](); + $returnedData = $obj->async(...$params); + break; + } + if (is_array($returnedData)) { + $result['result'] = $returnedData; + } + $block->setBlockStatus(BlockStatus::SUCCESS); + if ($block->getReplayCount() > 0) { + // in case of success after multiple tentative, we reset next run to right now + // on all block waiting for replay. Easiest solution to find block dependant of + // this current successful run + $this->blockMapper->resetSessionNextRun($block->getSessionToken()); + } + } catch (\Exception $e) { + $wrapper?->activity(BlockActivity::ERROR, $e->getMessage()); + $result['error'] = [ + 'exception' => get_class($e), + 'message' => $e->getMessage(), + 'trace' => $e->getTrace(), + 'code' => $e->getCode() + ]; + + if ($iface->isReplayable()) { + $block->replay(); // we mark the block as able to be back to STANDBY status + } else { + $block->setNextRun(0); + } + if ($iface->isBlocker()) { + $block->setBlockStatus(BlockStatus::BLOCKER); + } else { + $block->setBlockStatus(BlockStatus::ERROR); + } + } finally { + $result['endTime'] = time(); + } + + $block->setResult($result); + $wrapper?->activity(BlockActivity::ENDING); + $this->blockMapper->updateStatus($block, BlockStatus::RUNNING); + } + + /** + * @throws BlockAlreadyRunningException + */ + private function lockBlock(Block $block): void { + if ($block->getBlockStatus() !== BlockStatus::STANDBY) { + throw new BlockAlreadyRunningException('block not in standby'); + } + $lockToken = $this->generateToken(7); + $block->setBlockStatus(BlockStatus::RUNNING); + if (!$this->blockMapper->updateStatus($block, BlockStatus::STANDBY, $lockToken)) { + throw new BlockAlreadyRunningException('block is locked'); + } + $block->setLockToken($lockToken); + } + + public function forkSession(string $session, array $metadata = []): void { + if (\OC::$CLI) { + $useWebIfNeeded = $metadata['useWebIfNeeded'] ?? false; + if ($useWebIfNeeded && !extension_loaded('posix')) { + try { + $this->forkSessionLoopback($session); + return; + } catch (\Exception) { + } + } + + $this->forkSessionCli($session, $metadata); + return; + } + + try { + $this->forkSessionLoopback($session); + } catch (LoopbackEndpointException) { + // session will be processed later + } + } + + + private function forkSessionCli(string $session, array $metadata = []): void { + if (!extension_loaded('posix')) { + // log/notice that posix is not loaded + return; + } + $slot = $this->getFirstAvailableSlot(); + $metadata += [ + '_cli' => [ + 'slot' => $slot, + 'forkCount' => count($this->forks), + 'forkLimit' => self::FORK_LIMIT, + ] + ]; + + $pid = pcntl_fork(); + + // work around as the parent database connection is inherited by the child. + // when child process is over, parent process database connection will drop. + // The drop can happen anytime, even in the middle of a running request. + // work around is to close the connection as soon as possible after forking. + $this->conn->close(); + + if ($pid === -1) { + // TODO: manage issue while forking + } else if ($pid === 0) { + // forked process + try { + $this->runSession($session, $metadata); + } catch (AsyncProcessException) { + // failure to run session can be part of the process + } + exit(); + } else { + // store slot+pid + $this->forks[$slot] = $pid; + + // when fork limit is reach, cycle until space freed + while (true) { + $exitedPid = pcntl_waitpid(0, $status, WNOHANG); + if ($exitedPid > 0) { + $slot = array_search($exitedPid, $this->forks, true); + if ($slot) { + unset($this->forks[$slot]); + } + } + if (count($this->forks) < self::FORK_LIMIT) { + return; + } + usleep(self::FORK_SLEEP); + } + } + } + + /** + * Request local loopback endpoint. + * We expect the request to be closed remotely. + * + * Ignored if: + * - the endpoint is not fully configured and tested, + * - the server is on heavy load (timeout at 1 second) + * + * @return string result from the loopback endpoint + * @throws LoopbackEndpointException if not configured + */ + private function forkSessionLoopback(string $session, ?string $loopbackEndpoint = null): string { + $client = $this->clientService->newClient(); + try { + $response = $client->post( + $loopbackEndpoint ?? $this->linkToLoopbackEndpoint(), + [ + 'headers' => [], + 'verify' => false, + 'connect_timeout' => 1.0, + 'timeout' => 1.0, + 'http_errors' => true, + 'body' => ['token' => $session], + 'nextcloud' => [ + 'allow_local_address' => true, + 'allow_redirects' => true, + ] + ] + ); + + return (string)$response->getBody(); + } catch (LoopbackEndpointException $e) { + $this->logger->debug('loopback endpoint not configured', ['exception' => $e]); + throw $e; + } catch (\Exception $e) { + $this->logger->warning('could not reach loopback endpoint to initiate fork', ['exception' => $e]); + throw new LoopbackEndpointException('loopback endpoint cannot be reach', previous: $e); + } + } + + + /** + * return full (absolute) link to the web-loopback endpoint + * + * @param string|null $instance if null, stored loopback address will be used. + * + * @throws LoopbackEndpointException if $instance is null and no stored configuration + */ + public function linkToLoopbackEndpoint(?string $instance = null): string { + return rtrim($instance ?? $this->getLoopbackInstance(), '/') . $this->urlGenerator->linkToRoute('core.AsyncProcess.processFork'); + } + + /** + * return loopback address stored in configuration + * + * @return string + * @throws LoopbackEndpointException if config is not set or empty + */ + public function getLoopbackInstance(): string { + if (!$this->appConfig->hasKey('core', CoreConfigLexicon::ASYNC_LOOPBACK_ADDRESS, true)) { + throw new LoopbackEndpointException('loopback not configured'); + } + + $instance = $this->appConfig->getValueString('core', CoreConfigLexicon::ASYNC_LOOPBACK_ADDRESS); + if ($instance === '') { + throw new LoopbackEndpointException('empty config'); + } + + return $instance; + } + + + public function discoverLoopbackEndpoint(?OutputInterface $output = null): ?string { + $cliUrl = $this->config->getSystemValueString('overwrite.cli.url', ''); + $output?->write('- testing value from \'overwrite.cli.url\' (<comment>' . $cliUrl . '</comment>)... '); + + $reason = ''; + if ($this->testLoopbackInstance($cliUrl, $reason)) { + $output?->writeln('<info>ok</info>'); + return $cliUrl; + } + + $output?->writeln('<error>' . $reason . '</error>'); + + foreach($this->config->getSystemValue('trusted_domains', []) as $url) { + $url = 'https://' . $url; + $output?->write('- testing entry from \'trusted_domains\' (<comment>' . $url . '</comment>)... '); + if ($this->testLoopbackInstance($url, $reason)) { + $output?->writeln('<info>ok</info>'); + return $url; + } + $output?->writeln('<error>' . $reason . '</error>'); + } + + return null; + } + + + public function testLoopbackInstance(string $url, string &$reason = ''): bool { + $url = rtrim($url, '/'); + if (!$this->pingLoopbackInstance($url)) { + $reason = 'failed ping'; + return false; + } + + $token = $this->generateToken(); + $asyncProcess = \OCP\Server::get(IAsyncProcess::class); + $asyncProcess->exec(function(string $token) { + sleep(1); // enforce a delay to confirm asynchronicity + $appConfig = \OCP\Server::get(IAppConfig::class); + $appConfig->setValueString('core', CoreConfigLexicon::ASYNC_LOOPBACK_TEST, $token); + }, $token)->name('test loopback instance')->async(); + + $this->appConfig->clearCache(true); + if ($token === $this->appConfig->getValueString('core', CoreConfigLexicon::ASYNC_LOOPBACK_TEST)) { + $reason = 'async process already executed'; + return false; + } + + sleep(3); + $this->appConfig->clearCache(true); + $result = ($token === $this->appConfig->getValueString('core', CoreConfigLexicon::ASYNC_LOOPBACK_TEST)); + $this->appConfig->deleteKey('core', CoreConfigLexicon::ASYNC_LOOPBACK_TEST); + + return $result; + } + + private function pingLoopbackInstance(string $url): bool { + $pingLoopback = $this->generateToken(); + $this->appConfig->setValueString('core', CoreConfigLexicon::ASYNC_LOOPBACK_PING, $pingLoopback); + try { + $result = $this->forkSessionLoopback('__ping__', $this->linkToLoopbackEndpoint($url)); + $result = json_decode($result, true, flags: JSON_THROW_ON_ERROR); + } catch (\JsonException|LoopbackEndpointException $e) { + $this->logger->debug('could not ping loopback endpoint', ['exception' => $e]); + } + + $this->appConfig->deleteKey('core', CoreConfigLexicon::ASYNC_LOOPBACK_PING); + return (($result['ping'] ?? '') === $pingLoopback); + } + + + /** + * note that the fact we fork process to run a session of processes before doing + * a check on the fact that maybe one of the process of the session is already + * running can create a small glitch when choosing the first available slot as + * a previous fork running said check is already made and will exit shortly. + * + * @return int + */ + private function getFirstAvailableSlot(): int { + $slot = -1; + for ($i = 0; $i < self::FORK_LIMIT; $i++) { + if (!array_key_exists($i, $this->forks)) { + return $i; + } + + // we confirm child process still exists + if (pcntl_waitpid($this->forks[$i], $status, WNOHANG) > 0) { + return $i; + } + } + + if ($slot === -1) { + // TODO: should not happens: log warning + } + + return -1; + } + + private function generateToken(int $length = 15): string { + $result = ''; + for ($i = 0; $i < $length; $i++) { + $result .= 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890'[random_int(0, 61)]; + } + + return $result; + } + + /** + * we wait until all child process are done + * + * @noinspection PhpStatementHasEmptyBodyInspection + */ + public function waitChildProcess(): void { + while (pcntl_waitpid(0, $status) != -1) { + } + } + +} diff --git a/lib/private/Async/IBlockInterface.php b/lib/private/Async/IBlockInterface.php new file mode 100644 index 00000000000..33b046b42ff --- /dev/null +++ b/lib/private/Async/IBlockInterface.php @@ -0,0 +1,36 @@ +<?php + +declare(strict_types=1); + +/** + * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +namespace OC\Async; + +use OCP\Async\Enum\ProcessExecutionTime; +use OCP\Async\Enum\BlockStatus; + +interface IBlockInterface { + public function getToken(): string; + public function id(string $id): self; + public function getId(): string; + public function name(string $name): self; + public function getName(): string; + public function require(string $id): self; + public function getRequire(): array; + public function delay(int $delay): self; + public function getDelay(): int; + public function getExecutionTime(): ?ProcessExecutionTime; + public function blocker(bool $blocker = true): self; + public function isBlocker(): bool; + public function replayable(bool $replayable = true): self; + public function isReplayable(): bool; + public function getStatus(): BlockStatus; + public function getResult(): ?array; + public function getError(): ?array; + public function dataset(array $dataset): self; + public function getDataset(): array; + public function async(ProcessExecutionTime $time): self; +} diff --git a/lib/private/Async/ISessionInterface.php b/lib/private/Async/ISessionInterface.php new file mode 100644 index 00000000000..19307f915bb --- /dev/null +++ b/lib/private/Async/ISessionInterface.php @@ -0,0 +1,21 @@ +<?php + +declare(strict_types=1); + +/** + * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +namespace OC\Async; + +use OC\Async\Model\BlockInterface; +use OCP\Async\Enum\BlockStatus; + +interface ISessionInterface { + public function getAll(): array; + public function byToken(string $token): ?BlockInterface; + public function byId(string $id): ?BlockInterface; + public function getGlobalStatus(): BlockStatus; + +} diff --git a/lib/private/Async/Model/Block.php b/lib/private/Async/Model/Block.php new file mode 100644 index 00000000000..9c66ec13ad8 --- /dev/null +++ b/lib/private/Async/Model/Block.php @@ -0,0 +1,142 @@ +<?php + +declare(strict_types=1); + +/** + * SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +namespace OC\Async\Model; + +use OC\Async\Enum\BlockType; +use OCP\AppFramework\Db\Entity; +use OCP\Async\Enum\ProcessExecutionTime; +use OCP\Async\Enum\BlockStatus; + +/** + * @method void setToken(string $token) + * @method string getToken() + * @method void setSessionToken(string $sessionToken) + * @method string getSessionToken() + * @method void setType(int $type) + * @method int getType() + * @method void setCode(string $code) + * @method string getCode() + * @method void setParams(array $params) + * @method ?array getParams() + * @method void setDataset(array $dataset) + * @method ?array getDataset() + * @method void setMetadata(array $metadata) + * @method ?array getMetadata() + * @method void setLinks(array $params) + * @method ?array getLinks() + * @method void setOrig(array $orig) + * @method ?array getOrig() + * @method void setResult(array $result) + * @method ?array getResult() + * @method void setStatus(int $status) + * @method int getStatus() + * @method void setExecutionTime(int $status) + * @method int getExecutionTime() + * @method void setLockToken(string $lockToken) + * @method string getLockToken() + * @method void setCreation(int $creation) + * @method int getCreation() + * @method void setLastRun(int $lastRun) + * @method int getLastRun() + * @method void setNextRun(int $nextRun) + * @method int getNextRun() + * @psalm-suppress PropertyNotSetInConstructor + */ +class Block extends Entity { + protected string $token = ''; + protected string $sessionToken = ''; + protected int $type = 0; + protected string $code = ''; + protected ?array $params = []; + protected ?array $dataset = []; + protected ?array $metadata = []; + protected ?array $links = []; + protected ?array $orig = []; + protected ?array $result = []; + protected int $status = 0; + protected int $executionTime = 0; + protected string $lockToken = ''; + protected int $creation = 0; + protected int $lastRun = 0; + protected int $nextRun = 0; + + public function __construct() { + $this->addType('token', 'string'); + $this->addType('sessionToken', 'string'); + $this->addType('type', 'integer'); + $this->addType('code', 'string'); + $this->addType('params', 'json'); + $this->addType('dataset', 'json'); + $this->addType('metadata', 'json'); + $this->addType('links', 'json'); + $this->addType('orig', 'json'); + $this->addType('result', 'json'); + $this->addType('status', 'integer'); + $this->addType('executionTime', 'integer'); + $this->addType('lockToken', 'string'); + $this->addType('creation', 'integer'); + $this->addType('lastRun', 'integer'); + $this->addType('nextRun', 'integer'); + } + + public function setBlockType(BlockType $type): void { + $this->setType($type->value); + } + + public function getBlockType(): BlockType { + return BlockType::from($this->getType()); + } + + public function setBlockStatus(BlockStatus $status): void { + $this->setStatus($status->value); + } + + public function getBlockStatus(): BlockStatus { + return BlockStatus::from($this->getStatus()); + } + + public function setProcessExecutionTime(ProcessExecutionTime $time): void { + $this->setExecutionTime($time->value); + } + + public function getProcessExecutionTime(): ProcessExecutionTime { + return ProcessExecutionTime::from($this->getExecutionTime()); + } + + public function addMetadata(array $metadata): self { + $metadata = array_merge($this->metadata, $metadata); + $this->setMetadata($metadata); + return $this; + } + + public function addMetadataEntry(string $key, string|int|array|bool $value): self { + $metadata = $this->metadata; + $metadata[$key] = $value; + $this->setMetadata($metadata); + return $this; + } + + public function replay(bool $now = false): void { + $count = $this->getMetadata()['_replay'] ?? 0; + $next = time() + floor(6 ** ($count + 1)); // calculate delay based on count of retry + $count = min(++$count, 5); // limit to 6^6 seconds (13h) + + if ($now) { + $next = time(); + } + + $this->addMetadataEntry('_replay', $count); + $this->setNextRun($next); + } + + public function getReplayCount(): int { + return $this->getMetadata()['_replay'] ?? 0; + } +} diff --git a/lib/private/Async/Model/BlockInterface.php b/lib/private/Async/Model/BlockInterface.php new file mode 100644 index 00000000000..36ba58194f7 --- /dev/null +++ b/lib/private/Async/Model/BlockInterface.php @@ -0,0 +1,183 @@ +<?php + +declare(strict_types=1); + +/** + * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +namespace OC\Async\Model; + +use OC\Async\AsyncManager; +use OC\Async\IBlockInterface; +use OCP\Async\Enum\ProcessExecutionTime; +use OCP\Async\Enum\BlockStatus; + +class BlockInterface implements IBlockInterface, \JsonSerializable { + private string $id = ''; + private string $name = ''; + private bool $blocker = false; + private bool $replayable = false; + + /** @var string[] */ + private array $require = []; + private int $delay = 0; + private ?ProcessExecutionTime $executionTime = null; + private array $dataset = []; + + public function __construct( + private readonly ?AsyncManager $asyncManager, + private readonly Block $block, + ) { + $this->import($block->getMetadata()['_iface'] ?? []); + } + + public function getBlock(): Block { + return $this->block; + } + + public function getToken(): string { + return $this->block->getToken(); + } + + public function getResult(): ?array { + if ($this->block->getBlockStatus() !== BlockStatus::SUCCESS) { + return null; + } + + return $this->block->getResult()['result']; + } + + public function getError(): ?array { + if ($this->block->getBlockStatus() !== BlockStatus::ERROR) { + return null; + } + + return $this->block->getResult()['error']; + } + + public function getStatus(): BlockStatus { + return $this->block->getBlockStatus(); + } + + public function id(string $id): self { + $this->id = strtoupper($id); + return $this; + } + + public function getId(): string { + return $this->id; + } + public function name(string $name): self { + $this->name = $name; + return $this; + } + + public function getName(): string { + return $this->name; + } + + public function blocker(bool $blocker = true): self { + $this->blocker = $blocker; + return $this; + } + + public function isBlocker(): bool { + return $this->blocker; + } + + public function replayable(bool $replayable = true): self { + $this->replayable = $replayable; + return $this; + } + + public function isReplayable(): bool { + return $this->replayable; + } + + public function require(string $id): self { + $this->require[] = strtoupper($id); + return $this; + } + + public function getRequire(): array { + return $this->require; + } + + public function delay(int $delay): self { + $this->delay = $delay; + return $this; + } + + public function getDelay(): int { + return $this->delay; + } + + public function getExecutionTime(): ?ProcessExecutionTime { + return $this->executionTime; + } + + /** + * @param array<array> $dataset + * + * @return $this + */ + public function dataset(array $dataset): self { + $this->dataset = $dataset; + return $this; + } + + public function getDataset(): array { + return $this->dataset; + } + +// public function getReplayCount(): int { +// return $this->get +// } + /** + * only available during the creation of the session + * + * @return $this + */ + public function async(ProcessExecutionTime $time = ProcessExecutionTime::NOW): self { + $this->asyncManager?->async($time); + return $this; + } + + public function import(array $data): void { + $this->token = $data['token'] ?? ''; + $this->id($data['id'] ?? ''); + $this->name($data['name'] ?? ''); + $this->delay($data['delay'] ?? 0); + $this->require = $data['require'] ?? []; + $this->blocker($data['blocker'] ?? false); + $this->replayable($data['replayable'] ?? false); + } + + public function jsonSerialize(): array { + return [ + 'token' => $this->getToken(), + 'id' => $this->getId(), + 'name' => $this->getName(), + 'require' => $this->getRequire(), + 'delay' => $this->getDelay(), + 'blocker' => $this->isBlocker(), + 'replayable' => $this->isReplayable(), + ]; + } + + /** + * @param Block[] $processes + * + * @return BlockInterface[] + */ + public static function asBlockInterfaces(array $processes): array { + $interfaces = []; + foreach ($processes as $process) { + $interfaces[] = new BlockInterface(null, $process); + } + + return $interfaces; + } +} diff --git a/lib/private/Async/Model/SessionInterface.php b/lib/private/Async/Model/SessionInterface.php new file mode 100644 index 00000000000..c481fc545f6 --- /dev/null +++ b/lib/private/Async/Model/SessionInterface.php @@ -0,0 +1,83 @@ +<?php + +declare(strict_types=1); + +/** + * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +namespace OC\Async\Model; + +use OC\Async\ISessionInterface; +use OCP\Async\Enum\BlockStatus; + +class SessionInterface implements ISessionInterface { + public function __construct( + /** @var BlockInterface[] */ + private array $interfaces, + ) { + } + + public function getAll(): array { + return $this->interfaces; + } + + public function byToken(string $token): ?BlockInterface { + foreach ($this->interfaces as $iface) { + if ($iface->getToken() === $token) { + return $iface; + } + } + + return null; + } + + public function byId(string $id): ?BlockInterface { + foreach($this->interfaces as $iface) { + if ($iface->getId() === $id) { + return $iface; + } + } + + return null; + } + + /** + * return a global status of the session based on the status of each process + * - if one entry is still at ::PREP stage, we return ::PREP + * - if one ::BLOCKER, returns ::BLOCKER. + * - if all ::SUCCESS, returns ::SUCCESS, else ignored. + * - if all ::ERROR+::SUCCESS, returns ::ERROR, else ignored. + * - session status is the biggest value between all left process status (::STANDBY, ::RUNNING) + */ + public function getGlobalStatus(): BlockStatus { + $current = -1; + $groupedStatus = []; + foreach($this->interfaces as $iface) { + // returns ::PREP if one process is still ::PREP + if ($iface->getStatus() === BlockStatus::PREP) { + return BlockStatus::PREP; + } + // returns ::BLOCKER if one process is marked ::BLOCKER + if ($iface->getStatus() === BlockStatus::BLOCKER) { + return BlockStatus::BLOCKER; + } + // we keep trace if process is marked as ::ERROR or ::SUCCESS + // if not, we keep the highest value + if (in_array($iface->getStatus(), [BlockStatus::ERROR, BlockStatus::SUCCESS], true)) { + $groupedStatus[$iface->getStatus()->value] = true; + } else { + $current = max($current, $iface->getStatus()->value); + } + } + + // in case the all interface were ::ERROR or ::SUCCESS, we check + // if there was at least one ::ERROR. if none we return ::SUCCESS + if ($current === -1) { + return (array_key_exists(BlockStatus::ERROR->value, $groupedStatus)) ? BlockStatus::ERROR : BlockStatus::SUCCESS; + } + + return BlockStatus::from($current); + } +} diff --git a/lib/private/Async/README.md b/lib/private/Async/README.md new file mode 100644 index 00000000000..185047f52db --- /dev/null +++ b/lib/private/Async/README.md @@ -0,0 +1,379 @@ + + +# AsyncProcess + +Using `IAsyncProcess` allow the execution of code on a separated process in order to improve the quality of the user experience. + + +## Concept + +To shorten the hanging time on heavy process that reflect on the user experience and to avoid delay between +instruction and execution, this API allows to prepare instructions to be executed on a parallel thread. + +This is obtained by creating a loopback HTTP request, initiating a fresh PHP process that will execute the +prepared instruction after emulating a connection termination, freeing the main process. + +#### Technology + +The logic is to: + +- store a serialized version of the code to be executed on another process into database. +- start a new process as soon as possible that will retrieve the code from database and execute it. + +#### Setup + +The feature require setting a loopback url. +This is done automatically by a background job. The automatic process uses `'overwrite.cli.url'` and `'trusted_domains'` from _config/config.php_ to find a list of domain name to test. +It can be initiated via `occ`: + +> ./occ async:setup --discover + +Or manually set with: + +> ./occ async:setup --loopback https://cloud.example.net/ + + + +## Blocks & Sessions + +- We will define as _Block_ complete part of unsplittable code. +- A list of _Blocks_ can be grouped in _Sessions_. +- While _Sessions_ are independent of each other, interactions can be set between _Blocks_ of the same _Session_. + +**Interactions** + +- _Blocks_ are executed in the order they have been created. +- It is possible for a _Block_ to get results from a previous process from the session. +- A _Block_ defined as blocker will stop further process from that session on failure. +- A _Block_ can require a previous process to be successful before being executed. + +**Replayability** + +- A block can be set as _replayable_, meaning that in case of failure it can be run multiple time until it end properly. + +**Quick example** + +```php + +// define all part of the code that can be async +$this->asyncProcess->invoke($myInvoke1)->id('block1')->replayable(); // block1 can be replayed until successful +$this->asyncProcess->invoke($myInvoke2)->id('block2')->require('block1'); // block2 will not be executed until block1 has not been successful +$this->asyncProcess->invoke($myInvoke3)->id('block3')->blocker(); // block3 will run whatever happened to block1 and block2 and its suc1cess is mandatory to continue with the session +$this->asyncProcess->invoke($myInvoke4)->id('block4'); + +$this->asyncProcess->async(); // close the session and initiate async execution +``` + + +## ProcessExecutionTime + +Code is to be executed as soon as defined, with alternative fallback solutions in that order: + +- `::NOW` - main process will fork and execute the code in parallel (instantly) +- `::ASAP` - process will be executed by an optional live service (a second later) +- `::LATER` - process will be executed at the next execution of the cron tasks (within the next 10 minutes) +- `::ON_REQUEST` - process needs to be executed manually + + +## IAsyncProcess + +`IAsyncProcess` is the public interface that contains few methods to prepare the code to be processed on a parallel process. + + +#### Closure + +The code can be directly written in a closure by calling `exec()`: + +```php +$this->asyncProcess->exec(function (int $value, string $line, MyObject $obj): void { + // long process +}, + random_int(10000, 99999), + 'this is a line', + $myObj +)->async(); +``` + + +#### Invokable + +Within the magic method `__invoke()` in a configured objects + +```php +class MyInvoke { + public function __construct( + private array $data = [] + ) { + } + + public function __invoke(int $n): void { + // do long process + } +} + + +$myInvoke = new MyInvoke(['123']); +$this->asyncProcess->invoke($myInvoke, random_int(10000, 99999))->async(); +``` + + +#### PHP Class + +Via the method `async()` from a class + +```php +<?php + +namespace OCA\MyApp; + +class MyObj { + public function __construct( + ) { + } + + public function async(int $n): void { + // run heavy stuff + } +} +``` + +```php +$this->asyncProcess->call(\OCA\MyApp\MyObj::class, random_int(10000, 99999))->async(); +``` + + + +## IBlockInterface + +When storing a new _Block_ via `IAsyncProcess::call()`,`IAsyncProcess::invoke()` or `IAsyncProcess::async()`, will be returned a `IBlockInterface` to provided details about the _Block_. + + +### name(string) + +Identification and/or description of the _Block_ for better understanding when debugging + +```php +$this->asyncProcess->call(\OCA\MyApp\MyObj::class)->name('my process'); +``` + +### id(string) + +Identification of the _Block_ for future interaction between _Blocks_ within the same _Session_ + +```php +$this->asyncProcess->call(\OCA\MyApp\MyObj::class)->id('my_process'); +``` + +As an example, `id` are to be used to obtain `IBlockInterface` from a specific _Block_: +```php +ISessionInterface::byId('my_process'); // returns IBlockInterface +``` + + +### blocker() + +Set current _Block_ as _Blocker_, meaning that further _Blocks_ of the _Session_ are lock until this process does not run successfully + +```php +$this->asyncProcess->call(\OCA\MyApp\MyObj::class)->blocker(); +``` + + +### require(string) + +Define that the _Block_ can only be executed if set _Block_, identified by its `id`, ran successfully. +Multiple _Blocks_ can be set a required. + +```php +$this->asyncProcess->call(\OCA\MyApp\MyObj::class)->require('other_block_1')->require('other_block_2'); +``` + +### replayable() + +The _Block_ is configured as replayable, meaning that it will be restarted until it runs correctly + +```php +$this->asyncProcess->call(\OCA\MyApp\MyObj::class)->replayable(); +``` + +The delay is calculated using 6 (six) exponent current retry, capped at 6: + +- 1st retry after few seconds, +- 2nd retry after 30 seconds, +- 3rd retry after 3 minutes, +- 4th retry after 20 minutes, +- 5th retry after 2 hours, +- 6th retry after 12 hours, +- other retries every 12 hours. + +### delay(int) + +Only try to initiate the process n seconds after current time. + +### dataset(array) + +It is possible to set a list of arguments to be applied to the same _Block_. +The _Block_ will be executed for each defined set of data + +```php +$this->asyncProcess->call(\OCA\MyApp\MyObj::class)->dataset( + [ + ['this is a string', 1], + ['this is another string', 12], + ['and another value as first parameter', 42], + ] +); +``` + + +### post-execution + +Post execution of a _Block_, its `IBlockInterface` can be used to get details about it: + +- **getExecutionTime()** returns the `ProcessExecutionTime` that initiated the process, +- **getResult()** returns the array returned by the process in case of success, +- **getError()** returns the error in case of failure + + + +## ISessionInterface + +`ISessionInterface` is available to your code via `ABlockWrapper` and helps interaction between all the _Blocks_ of the same _Session_ + +### getAll() + +returns all `IBlockInterface` from the _Session_ + +### byToken(string) + +returns a `IBockInterface` using its token + +### byId(string) + +returns a `IBockInterface` using its `id` + +### getGlobalStatus() + +return a `BlockStatus` (Enum) based on every status of all _Blocks_ of the _Session_: + +- returns `::PREP` if one block is still at prep stage, +- return `::BLOCKER` if at least one block is set as `blocker` and is failing, +- returns `::SUCCESS` if all blocks are successful, +- returns `::ERROR` if all process have failed, +- returns `::STANDBY` or `::RUNNING` if none of the previous condition are met. `::RUNNING` if at least one `Block` is currently processed. + + + + +## ABlockWrapper + +This abstract class helps to interface with other _Blocks_ from the same _Session_. +It will be generated and passed as argument to the defined block if the first parameter is an `AprocessWrapper` is expected as first parameter: + + +As a _Closure_ +```php +$this->asyncProcess->exec(function (ABlockWrapper $wrapper, array $data): array { + $resultFromProc1 = $wrapper->getSessionInterface()->byId('block1')?->getResult(); // can be null if 'block1' is not success, unless using require() + $wrapper->activity(BlockActivity::NOTICE, 'result from previous process: ' . json_encode($resultFromProc1)) +}, + ['mydata' => true] + )->id('block2'); +``` + +When using `invoke()` +```php +class MyInvoke { + public function __construct( + ) { + } + + public function __invoke(ABlockWrapper $wrapper, int $n): void { + $data = $wrapper->getSessionInterface()->byId('block1')?->getResult(); // can be null if 'block1' is not success + } +} + +$myInvoke = new MyInvoke(); +$this->asyncProcess->invoke($myInvoke, random_int(10000, 99999))->requipe('block1'); // require ensure block1 has run successfully before this one +``` + +Syntax is the same with `call()`, when defining the `async()` method + +```php +class MyObj { + public function __construct( + ) { + } + + public function async(ABlockWrapper $wrapper): void { + } +} +``` + +#### Abstract methods + +`ABlockWrapper` is an abstract class with a list of interfaced methods that have different behavior on the _BlockWrapper_ sent by the framework. + +- **DummyBlockWrapper** will do nothing, +- **CliBlockWrapper** will generate/manage console output, +- **LoggerBlockWrapper** will only create new nextcloud logs entry. + +List of usefull methods: + +- **activity(BlockActivity $activity, string $line = '');** can be used to update details about current part of your code during its process +- **getSessionInterface()** return the `ISessionInterface` for the session +- **getReplayCount()** returns the number of retry + + +## Other tools + +#### Live Service + +This will cycle every few seconds to check for any session in stand-by mode and will execute its blocks +> ./occ async:live + + +#### Manage sessions and blocks + +Get resume about current session still in database. + +> `./occ async:manage` + +Get details about a session. + +> `./occ async:manage --session <sessionId> + +Get details about a block + +> `./occ async:manage --details <blockId> + +Get excessive details about a block + +> `./occ async:manage --details <blockId> --full-details + +Replay a not successful block + +> `./occ async:manage --replay <blockId> + +#### Mocking process + +`./occ async:setup` allow an admin to generate fake processes to emulate the feature: + +- **--mock-session _int_** create _n_ sessions +- **--mock-block _int_** create _n_ blocks +- **--fail-process _string_** create failing process + + +# Work in Progress + +missing element of the feature: + +- [ ] discussing the need of signing the PHP code stored in database to ensure its authenticity before execution, +- [ ] ability to overwrite the code or arguments of a process to fix a failing process, +- [ ] Check and confirm value type compatibility between parameters and arguments when storing new block +- [ ] implementing dataset(), +- [ ] implementing delay(), +- [ ] full documentation, +- [ ] tests, tests, tests. + + diff --git a/lib/private/Async/Wrappers/CliBlockWrapper.php b/lib/private/Async/Wrappers/CliBlockWrapper.php new file mode 100644 index 00000000000..4c38c7fc995 --- /dev/null +++ b/lib/private/Async/Wrappers/CliBlockWrapper.php @@ -0,0 +1,103 @@ +<?php + +declare(strict_types=1); + +/** + * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +namespace OC\Async\Wrappers; + +use OC\Async\ABlockWrapper; +use OC\Async\Model\Block; +use OCP\Async\Enum\BlockActivity; +use Symfony\Component\Console\Formatter\OutputFormatterStyle; +use Symfony\Component\Console\Output\OutputInterface; + +class CliBlockWrapper extends ABlockWrapper { + private const TAB_SIZE = 12; + private const HEXA = '258ad'; + private int $hexaLength = 0; + private string $prefix = ''; + private int $slot = 0; + public function __construct( + private ?OutputInterface $output = null, + ) { + $this->hexaLength = strlen(self::HEXA); + } + + public function session(array $metadata): void { + $this->slot = $metadata['_cli']['slot'] ?? 0; + $this->output->writeln( + str_repeat(' ', $this->slot * self::TAB_SIZE) . ' ' . + '<open>++</>' . ' initiating session ' . + ($metadata['sessionToken'] ?? '') + ); + } + + public function init(): void { + $this->prefix = $this->generatePrefix($this->block->getToken()); + } + + public function activity(BlockActivity $activity, string $line = ''): void { + $act = match ($activity) { + BlockActivity::STARTING => '<open>>></>', + BlockActivity::ENDING => '<close><<</>', + BlockActivity::DEBUG, BlockActivity::NOTICE => ' ', + BlockActivity::WARNING => '<warn>!!</>', + BlockActivity::ERROR => '<error>!!</>', + }; + + $this->output->writeln( + str_repeat(' ', $this->slot * self::TAB_SIZE) . ' ' . + $act . ' ' . + $this->prefix . ' ' . + $line + ); + } + + public function end(string $line = ''): void { + if ($line === '') { + $this->output->writeln(''); + return; + } + + $this->output->writeln( + str_repeat(' ', $this->slot * self::TAB_SIZE) . ' ' . + '<warn>--</>' . ' ' . $line + ); + } + + private function generatePrefix(string $token): string { + $color = 's' . random_int(1, $this->hexaLength - 1) . + random_int(1, $this->hexaLength - 1) . + random_int(1, $this->hexaLength - 1); + + return '<' . $color . '>' . $token . '</>'; + } + + public static function initStyle(OutputInterface $output): void { + $output->getFormatter()->setStyle('open', new OutputFormatterStyle('#0f0', '', ['bold'])); + $output->getFormatter()->setStyle('close', new OutputFormatterStyle('#f00', '', ['bold'])); + $output->getFormatter()->setStyle('warn', new OutputFormatterStyle('#f00', '', [])); + $output->getFormatter()->setStyle('error', new OutputFormatterStyle('#f00', '', ['bold'])); + + $hexaLength = strlen(self::HEXA); + for ($i = 0; $i < $hexaLength; $i++) { + for ($j = 0; $j < $hexaLength; $j++) { + for ($k = 0; $k < $hexaLength; $k++) { + $output->getFormatter()->setStyle( + 's' . $i . $j . $k, + new OutputFormatterStyle( + '#' . self::HEXA[$i] . self::HEXA[$j] . self::HEXA[$k], + '', + ['bold'] + ) + ); + } + } + } + } + +} diff --git a/lib/private/Async/Wrappers/DummyBlockWrapper.php b/lib/private/Async/Wrappers/DummyBlockWrapper.php new file mode 100644 index 00000000000..e719e4f83e5 --- /dev/null +++ b/lib/private/Async/Wrappers/DummyBlockWrapper.php @@ -0,0 +1,27 @@ +<?php + +declare(strict_types=1); + +/** + * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +namespace OC\Async\Wrappers; + +use OC\Async\ABlockWrapper; +use OC\Async\Model\Block; +use OCP\Async\Enum\BlockActivity; + +class DummyBlockWrapper extends ABlockWrapper { + public function session(array $metadata): void { + } + public function init(): void { + } + + public function activity(BlockActivity $activity, string $line = ''): void { + } + + public function end(string $line = ''): void { + } +} diff --git a/lib/private/Async/Wrappers/LoggerBlockWrapper.php b/lib/private/Async/Wrappers/LoggerBlockWrapper.php new file mode 100644 index 00000000000..66d9c80007c --- /dev/null +++ b/lib/private/Async/Wrappers/LoggerBlockWrapper.php @@ -0,0 +1,39 @@ +<?php + +declare(strict_types=1); + +/** + * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ +namespace OC\Async\Wrappers; + +use OC\Async\ABlockWrapper; +use OC\Async\Model\Block; +use OCP\Async\Enum\BlockActivity; +use Psr\Log\LoggerInterface; + +class LoggerBlockWrapper extends ABlockWrapper { + private array $metadata = []; + public function __construct( + private LoggerInterface $logger, + ) { + } + + public function session(array $metadata): void { + $this->metadata = $metadata; + $this->logger->debug('session: ' . json_encode($metadata)); + } + + public function init(): void { + $this->logger->debug('process: ' . $this->metadata['sessionToken'] . ' ' . $this->block->getToken()); + } + + public function activity(BlockActivity $activity, string $line = ''): void { + $this->logger->debug('activity (' . $activity->value . ') ' . $line); + } + + public function end(string $line = ''): void { + $this->logger->debug('end session - ' . $line); + } +} diff --git a/lib/private/Config/Lexicon/CoreConfigLexicon.php b/lib/private/Config/Lexicon/CoreConfigLexicon.php index 34a0b883c54..05de9378932 100644 --- a/lib/private/Config/Lexicon/CoreConfigLexicon.php +++ b/lib/private/Config/Lexicon/CoreConfigLexicon.php @@ -17,6 +17,10 @@ use NCU\Config\ValueType; * ConfigLexicon for 'core' app/user configs */ class CoreConfigLexicon implements IConfigLexicon { + public const ASYNC_LOOPBACK_ADDRESS = 'loopback_address'; + public const ASYNC_LOOPBACK_PING = 'async_loopback_ping'; + public const ASYNC_LOOPBACK_TEST = 'async_loopback_test'; + public function getStrictness(): ConfigLexiconStrictness { return ConfigLexiconStrictness::IGNORE; } @@ -28,6 +32,9 @@ class CoreConfigLexicon implements IConfigLexicon { public function getAppConfigs(): array { return [ new ConfigLexiconEntry('lastcron', ValueType::INT, 0, 'timestamp of last cron execution'), + new ConfigLexiconEntry(self::ASYNC_LOOPBACK_ADDRESS, ValueType::STRING, '', 'local address of the instance to initiate async process via web request', true), + new ConfigLexiconEntry(self::ASYNC_LOOPBACK_PING, ValueType::STRING, '', 'temporary random string used to confirm web-async loopback endpoint is valid', true), + new ConfigLexiconEntry(self::ASYNC_LOOPBACK_TEST, ValueType::STRING, '', 'temporary random string used to confirm web-async is fully functional', true), ]; } diff --git a/lib/private/Repair.php b/lib/private/Repair.php index c5069bff48e..74525164a79 100644 --- a/lib/private/Repair.php +++ b/lib/private/Repair.php @@ -9,6 +9,7 @@ namespace OC; use OC\DB\ConnectionAdapter; use OC\Repair\AddAppConfigLazyMigration; +use OC\Repair\AddAsyncProcessJob; use OC\Repair\AddBruteForceCleanupJob; use OC\Repair\AddCleanupDeletedUsersBackgroundJob; use OC\Repair\AddCleanupUpdaterBackupsJob; @@ -191,6 +192,7 @@ class Repair implements IOutput { \OCP\Server::get(AddRemoveOldTasksBackgroundJob::class), \OCP\Server::get(AddMetadataGenerationJob::class), \OCP\Server::get(AddAppConfigLazyMigration::class), + \OCP\Server::get(AddAsyncProcessJob::class), \OCP\Server::get(RepairLogoDimension::class), \OCP\Server::get(RemoveLegacyDatadirFile::class), \OCP\Server::get(AddCleanupDeletedUsersBackgroundJob::class), diff --git a/lib/private/Repair/AddAsyncProcessJob.php b/lib/private/Repair/AddAsyncProcessJob.php new file mode 100644 index 00000000000..1a471fb993a --- /dev/null +++ b/lib/private/Repair/AddAsyncProcessJob.php @@ -0,0 +1,30 @@ +<?php + +/** + * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ +namespace OC\Repair; + +use OC\Async\ForkManager; +use OC\Config\Lexicon\CoreConfigLexicon; +use OC\Core\BackgroundJobs\AsyncProcessJob; +use OCP\BackgroundJob\IJobList; +use OCP\IAppConfig; +use OCP\Migration\IOutput; +use OCP\Migration\IRepairStep; + +class AddAsyncProcessJob implements IRepairStep { + public function __construct( + private IJobList $jobList, + ) { + } + + public function getName() { + return 'Setup AsyncProcess and queue job to periodically manage the feature'; + } + + public function run(IOutput $output) { + $this->jobList->add(AsyncProcessJob::class); + } +} diff --git a/lib/private/Server.php b/lib/private/Server.php index 545ceacbe81..ef237178eb7 100644 --- a/lib/private/Server.php +++ b/lib/private/Server.php @@ -575,6 +575,7 @@ class Server extends ServerContainer implements IServerContainer { $this->registerAlias(IAppConfig::class, \OC\AppConfig::class); $this->registerAlias(IUserConfig::class, \OC\Config\UserConfig::class); + $this->registerAlias(\OCP\Async\IAsyncProcess::class, \OC\Async\AsyncProcess::class); $this->registerService(IFactory::class, function (Server $c) { return new \OC\L10N\Factory( diff --git a/lib/private/Setup.php b/lib/private/Setup.php index 6d3021aff71..efad8caa7b6 100644 --- a/lib/private/Setup.php +++ b/lib/private/Setup.php @@ -14,6 +14,7 @@ use Exception; use InvalidArgumentException; use OC\Authentication\Token\PublicKeyTokenProvider; use OC\Authentication\Token\TokenCleanupJob; +use OC\Core\BackgroundJobs\AsyncProcessJob; use OC\Log\Rotate; use OC\Preview\BackgroundCleanupJob; use OC\TextProcessing\RemoveOldTasksBackgroundJob; @@ -495,6 +496,7 @@ class Setup { $jobList->add(BackgroundCleanupJob::class); $jobList->add(RemoveOldTasksBackgroundJob::class); $jobList->add(CleanupDeletedUsers::class); + $jobList->add(AsyncProcessJob::class); } /** diff --git a/lib/public/Async/Enum/BlockActivity.php b/lib/public/Async/Enum/BlockActivity.php new file mode 100644 index 00000000000..ee8cf12d825 --- /dev/null +++ b/lib/public/Async/Enum/BlockActivity.php @@ -0,0 +1,18 @@ +<?php + +declare(strict_types=1); +/** + * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +namespace OCP\Async\Enum; + +enum BlockActivity: int { + case STARTING = 0; + case DEBUG = 1; + case NOTICE = 2; + case WARNING = 3; + case ERROR = 4; + case ENDING = 9; +} diff --git a/lib/public/Async/Enum/BlockStatus.php b/lib/public/Async/Enum/BlockStatus.php new file mode 100644 index 00000000000..83485880a8e --- /dev/null +++ b/lib/public/Async/Enum/BlockStatus.php @@ -0,0 +1,20 @@ +<?php + +declare(strict_types=1); + +/** + * SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +namespace OCP\Async\Enum; + +enum BlockStatus: int { + case PREP = 0; + case STANDBY = 1; + case RUNNING = 4; + + case BLOCKER = 7; + case ERROR = 8; + case SUCCESS = 9; +} diff --git a/lib/public/Async/Enum/ProcessExecutionTime.php b/lib/public/Async/Enum/ProcessExecutionTime.php new file mode 100644 index 00000000000..600bdce3c91 --- /dev/null +++ b/lib/public/Async/Enum/ProcessExecutionTime.php @@ -0,0 +1,17 @@ +<?php + +declare(strict_types=1); + +/** + * SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +namespace OCP\Async\Enum; + +enum ProcessExecutionTime: int { + case NOW = 0; + case ASAP = 1; + case LATER = 2; + case ON_REQUEST = 9; +} diff --git a/lib/public/Async/IAsyncProcess.php b/lib/public/Async/IAsyncProcess.php new file mode 100644 index 00000000000..9e32178a1cf --- /dev/null +++ b/lib/public/Async/IAsyncProcess.php @@ -0,0 +1,20 @@ +<?php + +/** + * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ +namespace OCP\Async; + +use OC\Async\IBlockInterface; +use OCP\Async\Enum\ProcessExecutionTime; + +interface IAsyncProcess { + public function exec(\Closure $closure, ...$params): IBlockInterface; + + public function invoke(callable $obj, ...$params): IBlockInterface; + + public function call(string $class, ...$params): IBlockInterface; + + public function async(ProcessExecutionTime $time = ProcessExecutionTime::NOW): string; +} diff --git a/version.php b/version.php index 0b7a87cc96b..bb36c6f5831 100644 --- a/version.php +++ b/version.php @@ -9,7 +9,7 @@ // between betas, final and RCs. This is _not_ the public version number. Reset minor/patch level // when updating major/minor version number. -$OC_Version = [32, 0, 0, 0]; +$OC_Version = [32, 0, 0, 1]; // The human-readable string $OC_VersionString = '32.0.0 dev'; |