diff options
Diffstat (limited to 'core')
-rw-r--r-- | core/BackgroundJobs/AsyncProcessJob.php | 62 | ||||
-rw-r--r-- | core/Command/Async/Live.php | 54 | ||||
-rw-r--r-- | core/Command/Async/Manage.php | 232 | ||||
-rw-r--r-- | core/Command/Async/Setup.php | 435 | ||||
-rw-r--r-- | core/Controller/AsyncProcessController.php | 77 | ||||
-rw-r--r-- | core/Migrations/Version32000Date20250307222601.php | 64 | ||||
-rw-r--r-- | core/register_command.php | 4 |
7 files changed, 928 insertions, 0 deletions
diff --git a/core/BackgroundJobs/AsyncProcessJob.php b/core/BackgroundJobs/AsyncProcessJob.php new file mode 100644 index 00000000000..3fd43abf5ab --- /dev/null +++ b/core/BackgroundJobs/AsyncProcessJob.php @@ -0,0 +1,62 @@ +<?php + +declare(strict_types=1); + +/** + * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ +namespace OC\Core\BackgroundJobs; + +use OC\Async\Db\BlockMapper; +use OC\Async\ForkManager; +use OC\Async\Wrappers\LoggerBlockWrapper; +use OC\Config\Lexicon\CoreConfigLexicon; +use OCP\AppFramework\Utility\ITimeFactory; +use OCP\Async\Enum\ProcessExecutionTime; +use OCP\BackgroundJob\TimedJob; +use OCP\IAppConfig; + +class AsyncProcessJob extends TimedJob { + public function __construct( + ITimeFactory $time, + private IAppConfig $appConfig, + private ForkManager $forkManager, + private BlockMapper $blockMapper, + private LoggerBlockWrapper $loggerProcessWrapper, + ) { + parent::__construct($time); + + $this->setTimeSensitivity(self::TIME_SENSITIVE); +// $this->setInterval(60 * 5); + $this->setInterval(1); + } + + protected function run(mixed $argument): void { + $this->discoverLoopAddress(); + + $this->forkManager->setWrapper($this->loggerProcessWrapper); + + $this->blockMapper->resetFailedBlock(); + + $metadata = ['executionTime' => ProcessExecutionTime::LATER]; + foreach ($this->blockMapper->getSessionOnStandBy() as $session) { + $this->forkManager->forkSession($session, $metadata); + } + + $this->blockMapper->removeSuccessfulBlock(); + + $this->forkManager->waitChildProcess(); + } + + private function discoverLoopAddress(): void { + if ($this->appConfig->hasKey('core', CoreConfigLexicon::ASYNC_LOOPBACK_ADDRESS, true)) { + return; + } + + $found = $this->forkManager->discoverLoopbackEndpoint(); + if ($found !== null) { + $this->appConfig->setValueString('core', CoreConfigLexicon::ASYNC_LOOPBACK_ADDRESS, $found); + } + } +} diff --git a/core/Command/Async/Live.php b/core/Command/Async/Live.php new file mode 100644 index 00000000000..89a16a3165d --- /dev/null +++ b/core/Command/Async/Live.php @@ -0,0 +1,54 @@ +<?php + +declare(strict_types=1); + +/** + * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ +namespace OC\Core\Command\Async; + +use OC\Async\AsyncManager; +use OC\Async\AsyncProcess; +use OC\Async\Db\BlockMapper; +use OC\Async\ForkManager; +use OC\Async\Wrappers\CliBlockWrapper; +use OCP\Async\Enum\ProcessExecutionTime; +use Symfony\Component\Console\Command\Command; +use Symfony\Component\Console\Input\InputInterface; +use Symfony\Component\Console\Output\OutputInterface; + +class Live extends Command { + public function __construct( + private readonly BlockMapper $blockMapper, + private readonly ForkManager $forkManager, + ) { + parent::__construct(); + } + + protected function configure() { + parent::configure(); + $this->setName('async:live') + ->setDescription('test'); + } + + protected function execute(InputInterface $input, OutputInterface $output): int { + CliBlockWrapper::initStyle($output); + $this->forkManager->setWrapper(new CliBlockWrapper($output)); + + $metadata = ['_processExecutionTime' => ProcessExecutionTime::ASAP]; + while(true) { + $this->blockMapper->resetFailedBlock(); + + foreach ($this->blockMapper->getSessionOnStandBy() as $session) { + $this->forkManager->forkSession($session, $metadata); + } + + sleep(3); + } + + $this->forkManager->waitChildProcess(); + + return 0; + } +} diff --git a/core/Command/Async/Manage.php b/core/Command/Async/Manage.php new file mode 100644 index 00000000000..9be46b335c8 --- /dev/null +++ b/core/Command/Async/Manage.php @@ -0,0 +1,232 @@ +<?php + +declare(strict_types=1); + +/** + * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +namespace OC\Core\Command\Async; + +use OC\Async\AsyncManager; +use OC\Async\Db\BlockMapper; +use OC\Async\ForkManager; +use OC\Async\Model\Block; +use OC\Async\Model\BlockInterface; +use OC\Async\Model\SessionInterface; +use OCP\Async\Enum\BlockExecutionTime; +use OCP\Async\Enum\BlockStatus; +use Symfony\Component\Console\Command\Command; +use Symfony\Component\Console\Formatter\OutputFormatterStyle; +use Symfony\Component\Console\Helper\QuestionHelper; +use Symfony\Component\Console\Input\InputInterface; +use Symfony\Component\Console\Input\InputOption; +use Symfony\Component\Console\Output\OutputInterface; +use Symfony\Component\Console\Question\ConfirmationQuestion; + +class Manage extends Command { + private bool $noCrop = false; + + public function __construct( + private AsyncManager $asyncManager, + private ForkManager $forkManager, + private BlockMapper $blockMapper, + ) { + parent::__construct(); + } + + protected function configure() { + parent::configure(); + $this->setName('async:manage') + ->addOption('clean', '', InputOption::VALUE_NONE, 'remove successful session') + ->addOption('session', '', InputOption::VALUE_REQUIRED, 'list all blocks from a session', '') + ->addOption( 'details', '', InputOption::VALUE_REQUIRED, 'get details about a specific block', '') + ->addOption( 'full-details', '', InputOption::VALUE_NONE, 'get full details') + ->addOption( 'replay', '', InputOption::VALUE_REQUIRED, 'replay a specific block', '') + ->setDescription('manage'); + } + + protected function execute(InputInterface $input, OutputInterface $output): int { + if ($input->getOption('clean')) { + $count = $this->blockMapper->removeSuccessfulBlock(); + $output->writeln('deleted ' . $count . ' blocks'); + return 0; + } + + $replay = $input->getOption('replay'); + if ($replay !== '') { + $this->replayBlock($input, $output, $replay); + return 0; + } + + $this->noCrop = $input->getOption('full-details'); + $output->getFormatter()->setStyle('data', new OutputFormatterStyle('#666', '', [])); + $output->getFormatter()->setStyle('prep', new OutputFormatterStyle('#ccc', '', [])); + $output->getFormatter()->setStyle('standby', new OutputFormatterStyle('#aaa', '', ['bold'])); + $output->getFormatter()->setStyle('running', new OutputFormatterStyle('#0a0', '', [])); + $output->getFormatter()->setStyle('blocker', new OutputFormatterStyle('#c00', '', ['bold'])); + $output->getFormatter()->setStyle('error', new OutputFormatterStyle('#d00', '', [])); + $output->getFormatter()->setStyle('success', new OutputFormatterStyle('#0c0', '', ['bold'])); + + $details = $input->getOption('details'); + if ($details !== '') { + $this->displayBlock($output, $details); + return 0; + } + + $session = $input->getOption('session'); + if ($session !== '') { + $this->displaySession($output, $session); + return 0; + } + + $this->summary($output); + return 0; + } + + private function summary(OutputInterface $output): void { + $statuses = []; + foreach ($this->blockMapper->getSessions() as $token) { + $sessionBlockes = $this->blockMapper->getBySession($token); + $sessionIface = new SessionInterface(BlockInterface::asBlockInterfaces($sessionBlockes)); + + $status = $sessionIface->getGlobalStatus()->value; + if (!array_key_exists($status, $statuses)) { + $statuses[$status] = []; + } + $statuses[$status][] = $token; + } + + if (!empty($success = $statuses[(string)BlockStatus::SUCCESS->value] ?? [])) { + $output->writeln('<comment>Successful</comment> session to be removed next cron (' . count($success) . '): <info>' . implode('</info>,<info> ', $success) . '</info>'); + } + + if (!empty($prep = $statuses[(string)BlockStatus::PREP->value] ?? [])) { + $output->writeln('Session with <comment>PREP</comment> status (' . count($prep) . '): <info>' . implode('</info>,<info> ', $prep) . '</info>'); + } + + if (!empty($stand = $statuses[BlockStatus::STANDBY->value] ?? [])) { + $output->writeln('Session in <comment>stand-by</comment> (' . count($stand) . '): <info>' . implode('</info>,<info> ', $stand) . '</info>'); + } + + if (!empty($running = $statuses[BlockStatus::RUNNING->value] ?? [])) { + $output->writeln('Currently <comment>running</comment> session (' . count($running) . '): <info>' . implode('</info>,<info> ', $running) . '</info>'); + } + + if (!empty($err = $statuses[BlockStatus::ERROR->value] ?? [])) { + $output->writeln('<comment>Erroneous</comment> session (' . count($err) . '): <info>' . implode('</info>,<info> ', $err) . '</info>'); + } + + if (!empty($blocker = $statuses[BlockStatus::BLOCKER->value] ?? [])) { + $output->writeln('<comment>Blocked</comment> session (' . count($blocker) . '): <info>' . implode('</info>,<info> ', $blocker) . '</info>'); + } + } + + + private function displaySession(OutputInterface $output, string $token): void { + foreach ($this->blockMapper->getBySession($token) as $block) { + $output->writeln('BlockToken: <data>' . $block->getToken() . '</data>'); + $output->writeln('BlockType: <data>' . $block->getBlockType()->name . '</data>'); + $output->writeln('Interface: <data>' . $this->displayInterface($block) . '</data>'); + $output->writeln('BlockStatus: ' . $this->displayStatus($block)); + $output->writeln('Replay: <data>' . $block->getReplayCount() . '</data>'); + + $output->writeln(''); + } + + } + + + private function displayBlock(OutputInterface $output, string $token): void { + $block = $this->blockMapper->getByToken($token); + + $output->writeln('SessionToken: <data>' . $block->getSessionToken() . '</data>'); + $output->writeln('BlockToken: <data>' . $block->getToken() . '</data>'); + $output->writeln('BlockType: <data>' . $block->getBlockType()->name . '</data>'); + $output->writeln('Interface: <data>' . $this->displayInterface($block) . '</data>'); + + $output->writeln('Code: <data>' . $this->cropContent($block->getCode(), 3000, 15) . '</data>'); + $output->writeln('Params: <data>' . $this->cropContent(json_encode($block->getParams()), 3000, 15) . '</data>'); + $output->writeln('Metadata: <data>' . $this->cropContent(json_encode($block->getMetadata()), 3000, 15) . '</data>'); + $output->writeln('Result: <data>' . $this->cropContent(json_encode($block->getResult()), 3000, 15) . '</data>'); + + $output->writeln('Dataset: <data>' . $this->cropContent(json_encode($block->getDataset()), 3000, 15) . '</data>'); + $output->writeln('Links: <data>' . $this->cropContent(json_encode($block->getLinks()), 3000, 15) . '</data>'); + $output->writeln('Orig: <data>' . $this->cropContent(json_encode($block->getOrig()), 3000, 15) . '</data>'); + + $output->writeln('ExecutionTime: <data>' . BlockExecutionTime::tryFrom($block->getExecutionTime())?->name . '</data>'); + $output->writeln('Creation: <data>' . $block->getCreation() . '</data>'); + $output->writeln('LastRun: <data>' . $block->getLastRun() . '</data>'); + $output->writeln('NextRun: <data>' . $block->getNextRun() . '</data>'); + $output->writeln('BlockStatus: ' . $this->displayStatus($block)); + $output->writeln('Replay: <data>' . $block->getReplayCount() . '</data>'); + } + + + private function displayStatus(Block $block): string { + $name = $block->getBlockStatus()->name; + $color = strtolower($name); + return '<' . $color . '>' . $name . '</' . $color . '>'; + } + + private function displayInterface(Block $block): string { + $iface = new BlockInterface(null, $block); + + $data = []; + $data[] = ($iface->getId() === '') ? '' : 'id=' . $iface->getId(); + $data[] = ($iface->getName() === '') ? '' : 'name=' . $iface->getName(); + $data[] = (!$iface->isReplayable()) ? '' : 'replayable=true'; + $data[] = (!$iface->isBlocker()) ? '' : 'blocker=true'; + $data[] = (empty($iface->getRequire())) ? '' : 'require=' . implode('.', $iface->getRequire()); + + return implode(', ', array_filter($data)); + } + + + + + private function replayBlock(InputInterface $input, OutputInterface $output, string $token): void { + $block = $this->blockMapper->getByToken($token); + if (!in_array($block->getBlockStatus(), [BlockStatus::ERROR, BlockStatus::BLOCKER], true)) { + $output->writeln('only Block set as ERROR or BLOCKER can be replayed'); + return; + } + + $iface = new BlockInterface(null, $block); + if (!$iface->isReplayable()) { + $output->writeln(''); + $output->writeln('Block is not set as <comment>replayable</comment>.'); + $output->writeln('Replaying this Block can create issues.'); + $output->writeln(''); + $question = new ConfirmationQuestion( + '<comment>Do you really want to replay the Block ' . $token . ' ?</comment> (y/N) ', + false, + '/^(y|Y)/i' + ); + + /** @var QuestionHelper $helper */ + $helper = $this->getHelper('question'); + if (!$helper->ask($input, $output, $question)) { + $output->writeln('aborted.'); + return; + } + } + + $block->replay(true); + $this->blockMapper->update($block); + } + + + /** + * crop content after n lines or n chars + */ + private function cropContent(string $content, int $maxChars, int $maxLines): string { + if ($this->noCrop) { + return $content; + } + preg_match_all("/\\n/", utf8_decode($content), $matches, PREG_OFFSET_CAPTURE); + return substr($content, 0, min($matches[0][$maxLines-1][1] ?? 99999, $maxChars)); + } +} + diff --git a/core/Command/Async/Setup.php b/core/Command/Async/Setup.php new file mode 100644 index 00000000000..d5b94a9b732 --- /dev/null +++ b/core/Command/Async/Setup.php @@ -0,0 +1,435 @@ +<?php + +declare(strict_types=1); + +/** + * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +namespace OC\Core\Command\Async; + +use OC\Async\ABlockWrapper; +use OC\Async\AsyncManager; +use OC\Async\Exceptions\LoopbackEndpointException; +use OC\Async\ForkManager; +use OC\Config\Lexicon\CoreConfigLexicon; +use OCP\Async\Enum\BlockActivity; +use OCP\Async\Enum\ProcessExecutionTime; +use OCP\Async\IAsyncProcess; +use OCP\IAppConfig; +use Symfony\Component\Console\Command\Command; +use Symfony\Component\Console\Helper\QuestionHelper; +use Symfony\Component\Console\Input\InputInterface; +use Symfony\Component\Console\Input\InputOption; +use Symfony\Component\Console\Output\OutputInterface; +use Symfony\Component\Console\Question\ConfirmationQuestion; + +class Setup extends Command { + public function __construct( + private readonly IAppConfig $appConfig, + private readonly IAsyncProcess $asyncProcess, + private readonly AsyncManager $asyncManager, + private readonly ForkManager $forkManager, + ) { + parent::__construct(); + } + + protected function configure() { + parent::configure(); + $this->setName('async:setup') + ->addOption('reset', '', InputOption::VALUE_NONE, 'reset all data related to the AsyncProcess feature') + ->addOption('drop', '', InputOption::VALUE_NONE, 'drop processes data') + ->addOption('loopback', '', InputOption::VALUE_REQUIRED, 'set loopback address') + ->addOption('discover', '', InputOption::VALUE_NONE, 'initiate the search for a possible loopback address') + ->addOption('yes', '', InputOption::VALUE_NONE, 'answer yes to any confirmation box') + ->addOption('exception-if-no-config', '', InputOption::VALUE_NONE, 'return an exception if existing configuration cannot be used') + ->addOption('mock-session', '', InputOption::VALUE_OPTIONAL, 'create n sessions', '0') + ->addOption('mock-block', '', InputOption::VALUE_OPTIONAL, 'create n blocks', '0') + ->addOption('fail-process', '', InputOption::VALUE_REQUIRED, 'create fail process', '') + ->setDescription('setup'); + } + + /** + * @throws LoopbackEndpointException + */ + protected function execute(InputInterface $input, OutputInterface $output): int { + $reset = $input->getOption('reset'); + $drop = $input->getOption('drop'); + if ($reset || $drop) { + if (!$input->getOption('yes')) { + $question = new ConfirmationQuestion( + ($reset) ? '<comment>Do you want to reset all data and configuration related to AsyncProcess ?</comment> (y/N) ' + : '<comment>Do you want to drop all data related to AsyncProcess ?</comment> (y/N) ', + false, + '/^(y|Y)/i' + ); + + /** @var QuestionHelper $helper */ + $helper = $this->getHelper('question'); + if (!$helper->ask($input, $output, $question)) { + $output->writeln('aborted.'); + + return 0; + } + } + + $this->asyncManager->dropAllBlocks(); + if ($reset) { + $this->asyncManager->resetConfig(); + } + + $output->writeln('done.'); + + return 0; + } + + $failureType = $input->getOption('fail-process'); + if ($failureType !== '') { + try { + match ($failureType) { + 'static-required' => $this->createFaultyProcessStaticRequired(), + 'static-blocker' => $this->createFaultyProcessStaticBlocker(), + 'dynamic-required' => $this->createFaultyProcessDynamicRequired(), + 'dynamic-blocker' => $this->createFaultyProcessDynamicBlocker(), + 'auto-required' => $this->createFaultyProcessAutoRequired(), + 'auto-blocker' => $this->createFaultyProcessAutoBlocker(), + }; + } catch (\UnhandledMatchError) { + $output->writeln( + 'list of fail: static-required, static-blocker, dynamic-blocker, dynamic-required, auto-blocker, auto-required' + ); + } + + return 0; + } + + $session = (int)($input->getOption('mock-session') ?? 0); + $proc = (int)($input->getOption('mock-block') ?? 0); + if ($session > 0 || $proc > 0) { + $this->createRandomProcess($output, $session, $proc); + + $this->forkManager->waitChildProcess(); + + return 0; + } + + if ($input->getOption('discover')) { + $output->writeln('<info>Searching for loopback address</info>'); + $found = $this->forkManager->discoverLoopbackEndpoint($output); + $output->writeln('found a working loopback address: <info>' . $found . '</info>'); + $this->confirmSave($input, $output, $found); + + return 0; + } + + $inputLoopback = $input->getOption('loopback'); + if ($inputLoopback) { + $this->parseAddress($inputLoopback); + $output->write('- testing <comment>' . $inputLoopback . '</comment>... '); + + $reason = ''; + if (!$this->forkManager->testLoopbackInstance($inputLoopback, $reason)) { + $output->writeln('<error>' . $reason . '</error>'); + + return 0; + } + + $output->writeln('<info>ok</info>'); + $this->confirmSave($input, $output, $inputLoopback); + + return 0; + } + + try { + $currentLoopback = $this->forkManager->getLoopbackInstance(); + $output->writeln('Current loopback instance: <info>' . $currentLoopback . '</info>'); + $output->write('Testing async process using loopback endpoint... '); + $reason = ''; + if (!$this->forkManager->testLoopbackInstance($currentLoopback, $reason)) { + $output->writeln('<error>' . $reason . '</error>'); + } else { + $output->writeln('<info>ok</info>'); + } + + return 0; + } catch (LoopbackEndpointException $e) { + if ($input->getOption('exception-if-no-config')) { + throw $e; + } + } + + $output->writeln('<info>Notes:</info>'); + $output->writeln('no loopback instance currently set.'); + $output->writeln('use --loopback <address> to manually configure a loopback instance'); + $output->writeln('or use --discover for an automated process'); + + return 0; + } + + private function confirmSave(InputInterface $input, OutputInterface $output, string $instance): void { + if (!$input->getOption('yes')) { + $output->writeln(''); + $question = new ConfirmationQuestion( + '<comment>Do you want to save loopback address \'' . $instance . '\' ?</comment> (y/N) ', + false, + '/^(y|Y)/i' + ); + + /** @var QuestionHelper $helper */ + $helper = $this->getHelper('question'); + if (!$helper->ask($input, $output, $question)) { + $output->writeln('aborted.'); + + return; + } + } + + $this->appConfig->setValueString('core', CoreConfigLexicon::ASYNC_LOOPBACK_ADDRESS, $instance); + } + + /** + * confirm format of typed address + */ + private function parseAddress(string $test): void { + $scheme = parse_url($test, PHP_URL_SCHEME); + $cloudId = parse_url($test, PHP_URL_HOST); + if (is_bool($scheme) || is_bool($cloudId) || is_null($scheme) || is_null($cloudId)) { + throw new LoopbackEndpointException('format must be http[s]://domain.name[:post][/path]'); + } + } + + /** + * create a list of fake/mockup process + * + * @param int $session number of session to generate + * @param int $processes number of process per session to generate + */ + private function createRandomProcess(OutputInterface $output, int $session, int $processes): void { + $session = ($session > 0) ? $session : 1; + $processes = ($processes > 0) ? $processes : 1; + + for ($i = 0; $i < $session; $i++) { + for ($j = 0; $j < $processes; $j++) { + $process = $this->asyncProcess->exec( + function (ABlockWrapper $wrapper, int $data, int $j): array { + if ($j > 0) { + $wrapper->activity( + BlockActivity::NOTICE, 'result from first process of the session: ' + . $wrapper->getSessionInterface() + ->byId('mock_process_0') + ?->getResult()['result'] + ); + } + sleep(random_int(2, 8)); + $wrapper->activity( + BlockActivity::NOTICE, 'mocked process is now over with data=' . $data + ); + + return ['result' => $data]; + }, + random_int(1, 5000), + $j + )->id('mock_process_' . $j)->blocker(); + $output->writeln(' > creating process <info>' . $process->getToken() . '</info>'); + } + + $token = $this->asyncProcess->async(ProcessExecutionTime::LATER); + + $output->writeln('- session <info>' . $token . '</info>'); + $output->writeln(''); + } + } + + private function createFaultyProcessStaticRequired(): void { + $this->asyncProcess->exec( + function (ABlockWrapper $wrapper, int $n): array { + $wrapper->activity( + BlockActivity::NOTICE, '(1) this process will crash in ' . $n . ' seconds' + ); + sleep($n); + throw new \Exception('crash'); + }, + random_int(2, 8) + )->id('mock_process_1'); + + $this->asyncProcess->exec( + function (ABlockWrapper $wrapper): void { + $wrapper->activity(BlockActivity::ERROR, '(2) this process should NOT run'); + }, + )->id('mock_process_2')->require('mock_process_1'); + + $this->asyncProcess->exec( + function (ABlockWrapper $wrapper): array { + $wrapper->activity(BlockActivity::NOTICE, '(3) this process should run!'); + + return ['ok']; + }, + )->id('mock_process_3'); + + $this->asyncProcess->async(ProcessExecutionTime::LATER); + } + + private function createFaultyProcessStaticBlocker(): void { + $this->asyncProcess->exec( + function (ABlockWrapper $wrapper, int $n): array { + $wrapper->activity( + BlockActivity::NOTICE, '(1) this process will crash in ' . $n . ' seconds' + ); + sleep($n); + throw new \Exception('crash'); + }, + random_int(2, 8) + )->id('mock_process_1')->blocker(); + + $this->asyncProcess->exec( + function (ABlockWrapper $wrapper): void { + $wrapper->activity(BlockActivity::ERROR, '(2) this process should NOT run'); + }, + )->id('mock_process_2'); + + $this->asyncProcess->exec( + function (ABlockWrapper $wrapper): array { + $wrapper->activity(BlockActivity::ERROR, '(3) this process should NOT run'); + + return ['ok']; + }, + )->id('mock_process_3'); + + $this->asyncProcess->async(ProcessExecutionTime::LATER); + } + + + private function createFaultyProcessDynamicRequired(): void { + $this->asyncProcess->exec( + function (ABlockWrapper $wrapper, int $n): array { + if ($wrapper->getReplayCount() > 1) { + $wrapper->activity(BlockActivity::NOTICE, '(1) this process will not crash anymore'); + return ['dynamic-required']; + } + + $wrapper->activity(BlockActivity::NOTICE, '(1) this process will crash in ' . $n . ' seconds'); + sleep($n); + throw new \Exception('crash'); + }, + random_int(2, 8) + )->id('mock_process_1'); + + $this->asyncProcess->exec( + function (ABlockWrapper $wrapper): void { + $wrapper->activity(BlockActivity::ERROR, '(2) this process should only run after few replay from MOCK_PROCESS_1'); + }, + )->id('mock_process_2')->require('mock_process_1'); + + $this->asyncProcess->exec( + function (ABlockWrapper $wrapper): array { + $wrapper->activity(BlockActivity::NOTICE, '(3) this process should run!'); + + return ['ok']; + }, + )->id('mock_process_3'); + + $this->asyncProcess->async(ProcessExecutionTime::LATER); + } + + + private function createFaultyProcessDynamicBlocker(): void { + $this->asyncProcess->exec( + function (ABlockWrapper $wrapper, int $n): array { + if ($wrapper->getReplayCount() > 1) { + $wrapper->activity(BlockActivity::NOTICE, '(1) this process will not crash anymore'); + return ['dynamic-blocker']; + } + + $wrapper->activity(BlockActivity::NOTICE, '(1) this process will crash in ' . $n . ' seconds'); + sleep($n); + throw new \Exception('crash'); + }, + random_int(2, 8) + )->id('mock_process_1')->blocker(); + + $this->asyncProcess->exec( + function (ABlockWrapper $wrapper): void { + $wrapper->activity(BlockActivity::ERROR, '(2) this process should only run after few replay from MOCK_PROCESS_1'); + }, + )->id('mock_process_2'); + + $this->asyncProcess->exec( + function (ABlockWrapper $wrapper): array { + $wrapper->activity(BlockActivity::ERROR, '(3) this process should only run after few replay from MOCK_PROCESS_1'); + + return ['ok']; + }, + )->id('mock_process_3'); + + $this->asyncProcess->async(ProcessExecutionTime::LATER); + } + + + + private function createFaultyProcessAutoRequired(): void { + $this->asyncProcess->exec( + function (ABlockWrapper $wrapper, int $n): array { + if ($wrapper->getReplayCount() > 1) { + $wrapper->activity(BlockActivity::NOTICE, '(1) this process will not crash anymore'); + return ['dynamic-required']; + } + + $wrapper->activity(BlockActivity::NOTICE, '(1) this process will crash in ' . $n . ' seconds'); + sleep($n); + throw new \Exception('crash'); + }, + random_int(2, 8) + )->id('mock_process_1')->replayable(); + + $this->asyncProcess->exec( + function (ABlockWrapper $wrapper): void { + $wrapper->activity(BlockActivity::ERROR, '(2) this process should only run after few replay from MOCK_PROCESS_1'); + }, + )->id('mock_process_2')->require('mock_process_1'); + + $this->asyncProcess->exec( + function (ABlockWrapper $wrapper): array { + $wrapper->activity(BlockActivity::NOTICE, '(3) this process should run!'); + + return ['ok']; + }, + )->id('mock_process_3'); + + $this->asyncProcess->async(ProcessExecutionTime::LATER); + } + + + private function createFaultyProcessAutoBlocker(): void { + $this->asyncProcess->exec( + function (ABlockWrapper $wrapper, int $n): array { + if ($wrapper->getReplayCount() > 1) { + $wrapper->activity(BlockActivity::NOTICE, '(1) this process will not crash anymore'); + return ['dynamic-blocker']; + } + + $wrapper->activity(BlockActivity::NOTICE, '(1) this process will crash in ' . $n . ' seconds'); + sleep($n); + throw new \Exception('crash'); + }, + random_int(2, 8) + )->id('mock_process_1')->blocker()->replayable(); + + $this->asyncProcess->exec( + function (ABlockWrapper $wrapper): void { + $wrapper->activity(BlockActivity::ERROR, '(2) this process should only run after few replay from MOCK_PROCESS_1'); + }, + )->id('mock_process_2'); + + $this->asyncProcess->exec( + function (ABlockWrapper $wrapper): array { + $wrapper->activity(BlockActivity::ERROR, '(3) this process should only run after few replay from MOCK_PROCESS_1'); + + return ['ok']; + }, + )->id('mock_process_3'); + + $this->asyncProcess->async(ProcessExecutionTime::LATER); + } + +} diff --git a/core/Controller/AsyncProcessController.php b/core/Controller/AsyncProcessController.php new file mode 100644 index 00000000000..f644e3a7851 --- /dev/null +++ b/core/Controller/AsyncProcessController.php @@ -0,0 +1,77 @@ +<?php + +declare(strict_types=1); + +/** + * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ +namespace OC\Core\Controller; + +use OC\Async\Exceptions\BlockAlreadyRunningException; +use OC\Async\ForkManager; +use OC\Config\Lexicon\CoreConfigLexicon; +use OCP\AppFramework\Controller; +use OCP\AppFramework\Http\Attribute\FrontpageRoute; +use OCP\AppFramework\Http\Attribute\NoAdminRequired; +use OCP\AppFramework\Http\Attribute\NoCSRFRequired; +use OCP\AppFramework\Http\Attribute\PublicPage; +use OCP\AppFramework\Http\DataResponse; +use OCP\Async\Enum\ProcessExecutionTime; +use OCP\IAppConfig; +use OCP\IRequest; + +class AsyncProcessController extends Controller { + public function __construct( + string $appName, + IRequest $request, + private IAppConfig $appConfig, + private ForkManager $forkManager, + ) { + parent::__construct($appName, $request); + } + + #[NoAdminRequired] + #[NoCSRFRequired] + #[PublicPage] + #[FrontpageRoute(verb: 'POST', url: '/core/asyncProcessFork')] + public function processFork(string $token): DataResponse { + $metadata = ['executionTime' => ProcessExecutionTime::NOW]; + + // remote might only need a confirmation that this is a valid loopback endpoint + if ($token === '__ping__') { + return new DataResponse(['ping' => $this->appConfig->getValueString('core', CoreConfigLexicon::ASYNC_LOOPBACK_PING)]); + } + + $this->async($token); + + try { + $this->forkManager->runSession($token, $metadata); + } catch (BlockAlreadyRunningException) { + // TODO: debug() ? + } + + exit(); + } + + private function async(string $result = ''): void { + if (ob_get_contents() !== false) { + ob_end_clean(); + } + + header('Connection: close'); + header('Content-Encoding: none'); + ignore_user_abort(); + $timeLimit = 100; + set_time_limit(max($timeLimit, 0)); + ob_start(); + + echo($result); + + $size = ob_get_length(); + header('Content-Length: ' . $size); + ob_end_flush(); + flush(); + } + +} diff --git a/core/Migrations/Version32000Date20250307222601.php b/core/Migrations/Version32000Date20250307222601.php new file mode 100644 index 00000000000..179645e0d2c --- /dev/null +++ b/core/Migrations/Version32000Date20250307222601.php @@ -0,0 +1,64 @@ +<?php + +declare(strict_types=1); +/** + * SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +namespace OC\Core\Migrations; + +use Closure; +use OCP\DB\ISchemaWrapper; +use OCP\DB\Types; +use OCP\Migration\Attributes\AddColumn; +use OCP\Migration\Attributes\AddIndex; +use OCP\Migration\Attributes\ColumnType; +use OCP\Migration\Attributes\CreateTable; +use OCP\Migration\Attributes\DropIndex; +use OCP\Migration\Attributes\IndexType; +use OCP\Migration\IOutput; +use OCP\Migration\SimpleMigrationStep; + +/** + * Create new column and index for lazy loading in preferences for the new IUserPreferences API. + */ +#[CreateTable(table: 'async_processes', columns: ['id', 'token', 'type', 'code', 'params', 'orig', 'result', 'status'], description: 'async task and status')] +class Version32000Date20250307222601 extends SimpleMigrationStep { + public function changeSchema(IOutput $output, Closure $schemaClosure, array $options): ?ISchemaWrapper { + /** @var ISchemaWrapper $schema */ + $schema = $schemaClosure(); + + if ($schema->hasTable('async_process')) { + return null; + } + + $table = $schema->createTable('async_process'); + $table->addColumn('id', Types::BIGINT, [ 'notnull' => true, 'length' => 64, 'autoincrement' => true, 'unsigned' => true]); + $table->addColumn('token', Types::STRING, ['notnull' => true, 'default' => '', 'length' => 15]); + $table->addColumn('session_token', Types::STRING, ['notnull' => true, 'default' => '', 'length' => 15]); + $table->addColumn('type', Types::SMALLINT, ['notnull' => true, 'default' => 0, 'unsigned' => true]); + $table->addColumn('code', Types::TEXT, ['notnull' => true, 'default' => '']); + $table->addColumn('params', Types::TEXT, ['notnull' => true, 'default' => '[]']); + $table->addColumn('dataset', Types::TEXT, ['notnull' => true, 'default' => '[]']); + $table->addColumn('metadata', Types::TEXT, ['notnull' => true, 'default' => '[]']); + $table->addColumn('links', Types::TEXT, ['notnull' => true, 'default' => '[]']); + $table->addColumn('orig', Types::TEXT, ['notnull' => true, 'default' => '[]']); + $table->addColumn('result', Types::TEXT, ['notnull' => true, 'default' => '']); + $table->addColumn('status', Types::SMALLINT, ['notnull' => true, 'default' => 0, 'unsigned' => true]); + $table->addColumn('execution_time', Types::SMALLINT, ['notnull' => true, 'default' => 0, 'unsigned' => true]); + $table->addColumn('lock_token', Types::STRING, ['notnull' => true, 'default' => '', 'length' => 7]); + $table->addColumn('creation', Types::INTEGER, ['notnull' => true, 'default' => 0, 'unsigned' => true]); + $table->addColumn('last_run', Types::INTEGER, ['notnull' => true, 'default' => 0, 'unsigned' => true]); + $table->addColumn('next_run', Types::INTEGER, ['notnull' => true, 'default' => 0, 'unsigned' => true]); + + $table->setPrimaryKey(['id'], 'asy_prc_id'); + + $table->addIndex(['token'], 'asy_prc_tkn'); + $table->addIndex(['status'], 'asy_prc_sts'); + $table->addIndex(['next_run'], 'asy_prc_nxt'); + $table->addIndex(['status', 'id', 'next_run'], 'asy_prc_sin'); + + return $schema; + } +} diff --git a/core/register_command.php b/core/register_command.php index 62305d75a30..e0c201920a9 100644 --- a/core/register_command.php +++ b/core/register_command.php @@ -152,6 +152,10 @@ if ($config->getSystemValueBool('installed', false)) { $application->add(Server::get(Command\TaskProcessing\Statistics::class)); $application->add(Server::get(Command\Memcache\RedisCommand::class)); + + $application->add(Server::get(Command\Async\Live::class)); + $application->add(Server::get(Command\Async\Manage::class)); + $application->add(Server::get(Command\Async\Setup::class)); } else { $application->add(Server::get(Command\Maintenance\Install::class)); } |