aboutsummaryrefslogtreecommitdiffstats
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/BackgroundJobs/AsyncProcessJob.php62
-rw-r--r--core/Command/Async/Live.php54
-rw-r--r--core/Command/Async/Manage.php232
-rw-r--r--core/Command/Async/Setup.php435
-rw-r--r--core/Controller/AsyncProcessController.php77
-rw-r--r--core/Migrations/Version32000Date20250307222601.php64
-rw-r--r--core/register_command.php4
7 files changed, 928 insertions, 0 deletions
diff --git a/core/BackgroundJobs/AsyncProcessJob.php b/core/BackgroundJobs/AsyncProcessJob.php
new file mode 100644
index 00000000000..3fd43abf5ab
--- /dev/null
+++ b/core/BackgroundJobs/AsyncProcessJob.php
@@ -0,0 +1,62 @@
+<?php
+
+declare(strict_types=1);
+
+/**
+ * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+namespace OC\Core\BackgroundJobs;
+
+use OC\Async\Db\BlockMapper;
+use OC\Async\ForkManager;
+use OC\Async\Wrappers\LoggerBlockWrapper;
+use OC\Config\Lexicon\CoreConfigLexicon;
+use OCP\AppFramework\Utility\ITimeFactory;
+use OCP\Async\Enum\ProcessExecutionTime;
+use OCP\BackgroundJob\TimedJob;
+use OCP\IAppConfig;
+
+class AsyncProcessJob extends TimedJob {
+ public function __construct(
+ ITimeFactory $time,
+ private IAppConfig $appConfig,
+ private ForkManager $forkManager,
+ private BlockMapper $blockMapper,
+ private LoggerBlockWrapper $loggerProcessWrapper,
+ ) {
+ parent::__construct($time);
+
+ $this->setTimeSensitivity(self::TIME_SENSITIVE);
+// $this->setInterval(60 * 5);
+ $this->setInterval(1);
+ }
+
+ protected function run(mixed $argument): void {
+ $this->discoverLoopAddress();
+
+ $this->forkManager->setWrapper($this->loggerProcessWrapper);
+
+ $this->blockMapper->resetFailedBlock();
+
+ $metadata = ['executionTime' => ProcessExecutionTime::LATER];
+ foreach ($this->blockMapper->getSessionOnStandBy() as $session) {
+ $this->forkManager->forkSession($session, $metadata);
+ }
+
+ $this->blockMapper->removeSuccessfulBlock();
+
+ $this->forkManager->waitChildProcess();
+ }
+
+ private function discoverLoopAddress(): void {
+ if ($this->appConfig->hasKey('core', CoreConfigLexicon::ASYNC_LOOPBACK_ADDRESS, true)) {
+ return;
+ }
+
+ $found = $this->forkManager->discoverLoopbackEndpoint();
+ if ($found !== null) {
+ $this->appConfig->setValueString('core', CoreConfigLexicon::ASYNC_LOOPBACK_ADDRESS, $found);
+ }
+ }
+}
diff --git a/core/Command/Async/Live.php b/core/Command/Async/Live.php
new file mode 100644
index 00000000000..89a16a3165d
--- /dev/null
+++ b/core/Command/Async/Live.php
@@ -0,0 +1,54 @@
+<?php
+
+declare(strict_types=1);
+
+/**
+ * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+namespace OC\Core\Command\Async;
+
+use OC\Async\AsyncManager;
+use OC\Async\AsyncProcess;
+use OC\Async\Db\BlockMapper;
+use OC\Async\ForkManager;
+use OC\Async\Wrappers\CliBlockWrapper;
+use OCP\Async\Enum\ProcessExecutionTime;
+use Symfony\Component\Console\Command\Command;
+use Symfony\Component\Console\Input\InputInterface;
+use Symfony\Component\Console\Output\OutputInterface;
+
+class Live extends Command {
+ public function __construct(
+ private readonly BlockMapper $blockMapper,
+ private readonly ForkManager $forkManager,
+ ) {
+ parent::__construct();
+ }
+
+ protected function configure() {
+ parent::configure();
+ $this->setName('async:live')
+ ->setDescription('test');
+ }
+
+ protected function execute(InputInterface $input, OutputInterface $output): int {
+ CliBlockWrapper::initStyle($output);
+ $this->forkManager->setWrapper(new CliBlockWrapper($output));
+
+ $metadata = ['_processExecutionTime' => ProcessExecutionTime::ASAP];
+ while(true) {
+ $this->blockMapper->resetFailedBlock();
+
+ foreach ($this->blockMapper->getSessionOnStandBy() as $session) {
+ $this->forkManager->forkSession($session, $metadata);
+ }
+
+ sleep(3);
+ }
+
+ $this->forkManager->waitChildProcess();
+
+ return 0;
+ }
+}
diff --git a/core/Command/Async/Manage.php b/core/Command/Async/Manage.php
new file mode 100644
index 00000000000..9be46b335c8
--- /dev/null
+++ b/core/Command/Async/Manage.php
@@ -0,0 +1,232 @@
+<?php
+
+declare(strict_types=1);
+
+/**
+ * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+namespace OC\Core\Command\Async;
+
+use OC\Async\AsyncManager;
+use OC\Async\Db\BlockMapper;
+use OC\Async\ForkManager;
+use OC\Async\Model\Block;
+use OC\Async\Model\BlockInterface;
+use OC\Async\Model\SessionInterface;
+use OCP\Async\Enum\BlockExecutionTime;
+use OCP\Async\Enum\BlockStatus;
+use Symfony\Component\Console\Command\Command;
+use Symfony\Component\Console\Formatter\OutputFormatterStyle;
+use Symfony\Component\Console\Helper\QuestionHelper;
+use Symfony\Component\Console\Input\InputInterface;
+use Symfony\Component\Console\Input\InputOption;
+use Symfony\Component\Console\Output\OutputInterface;
+use Symfony\Component\Console\Question\ConfirmationQuestion;
+
+class Manage extends Command {
+ private bool $noCrop = false;
+
+ public function __construct(
+ private AsyncManager $asyncManager,
+ private ForkManager $forkManager,
+ private BlockMapper $blockMapper,
+ ) {
+ parent::__construct();
+ }
+
+ protected function configure() {
+ parent::configure();
+ $this->setName('async:manage')
+ ->addOption('clean', '', InputOption::VALUE_NONE, 'remove successful session')
+ ->addOption('session', '', InputOption::VALUE_REQUIRED, 'list all blocks from a session', '')
+ ->addOption( 'details', '', InputOption::VALUE_REQUIRED, 'get details about a specific block', '')
+ ->addOption( 'full-details', '', InputOption::VALUE_NONE, 'get full details')
+ ->addOption( 'replay', '', InputOption::VALUE_REQUIRED, 'replay a specific block', '')
+ ->setDescription('manage');
+ }
+
+ protected function execute(InputInterface $input, OutputInterface $output): int {
+ if ($input->getOption('clean')) {
+ $count = $this->blockMapper->removeSuccessfulBlock();
+ $output->writeln('deleted ' . $count . ' blocks');
+ return 0;
+ }
+
+ $replay = $input->getOption('replay');
+ if ($replay !== '') {
+ $this->replayBlock($input, $output, $replay);
+ return 0;
+ }
+
+ $this->noCrop = $input->getOption('full-details');
+ $output->getFormatter()->setStyle('data', new OutputFormatterStyle('#666', '', []));
+ $output->getFormatter()->setStyle('prep', new OutputFormatterStyle('#ccc', '', []));
+ $output->getFormatter()->setStyle('standby', new OutputFormatterStyle('#aaa', '', ['bold']));
+ $output->getFormatter()->setStyle('running', new OutputFormatterStyle('#0a0', '', []));
+ $output->getFormatter()->setStyle('blocker', new OutputFormatterStyle('#c00', '', ['bold']));
+ $output->getFormatter()->setStyle('error', new OutputFormatterStyle('#d00', '', []));
+ $output->getFormatter()->setStyle('success', new OutputFormatterStyle('#0c0', '', ['bold']));
+
+ $details = $input->getOption('details');
+ if ($details !== '') {
+ $this->displayBlock($output, $details);
+ return 0;
+ }
+
+ $session = $input->getOption('session');
+ if ($session !== '') {
+ $this->displaySession($output, $session);
+ return 0;
+ }
+
+ $this->summary($output);
+ return 0;
+ }
+
+ private function summary(OutputInterface $output): void {
+ $statuses = [];
+ foreach ($this->blockMapper->getSessions() as $token) {
+ $sessionBlockes = $this->blockMapper->getBySession($token);
+ $sessionIface = new SessionInterface(BlockInterface::asBlockInterfaces($sessionBlockes));
+
+ $status = $sessionIface->getGlobalStatus()->value;
+ if (!array_key_exists($status, $statuses)) {
+ $statuses[$status] = [];
+ }
+ $statuses[$status][] = $token;
+ }
+
+ if (!empty($success = $statuses[(string)BlockStatus::SUCCESS->value] ?? [])) {
+ $output->writeln('<comment>Successful</comment> session to be removed next cron (' . count($success) . '): <info>' . implode('</info>,<info> ', $success) . '</info>');
+ }
+
+ if (!empty($prep = $statuses[(string)BlockStatus::PREP->value] ?? [])) {
+ $output->writeln('Session with <comment>PREP</comment> status (' . count($prep) . '): <info>' . implode('</info>,<info> ', $prep) . '</info>');
+ }
+
+ if (!empty($stand = $statuses[BlockStatus::STANDBY->value] ?? [])) {
+ $output->writeln('Session in <comment>stand-by</comment> (' . count($stand) . '): <info>' . implode('</info>,<info> ', $stand) . '</info>');
+ }
+
+ if (!empty($running = $statuses[BlockStatus::RUNNING->value] ?? [])) {
+ $output->writeln('Currently <comment>running</comment> session (' . count($running) . '): <info>' . implode('</info>,<info> ', $running) . '</info>');
+ }
+
+ if (!empty($err = $statuses[BlockStatus::ERROR->value] ?? [])) {
+ $output->writeln('<comment>Erroneous</comment> session (' . count($err) . '): <info>' . implode('</info>,<info> ', $err) . '</info>');
+ }
+
+ if (!empty($blocker = $statuses[BlockStatus::BLOCKER->value] ?? [])) {
+ $output->writeln('<comment>Blocked</comment> session (' . count($blocker) . '): <info>' . implode('</info>,<info> ', $blocker) . '</info>');
+ }
+ }
+
+
+ private function displaySession(OutputInterface $output, string $token): void {
+ foreach ($this->blockMapper->getBySession($token) as $block) {
+ $output->writeln('BlockToken: <data>' . $block->getToken() . '</data>');
+ $output->writeln('BlockType: <data>' . $block->getBlockType()->name . '</data>');
+ $output->writeln('Interface: <data>' . $this->displayInterface($block) . '</data>');
+ $output->writeln('BlockStatus: ' . $this->displayStatus($block));
+ $output->writeln('Replay: <data>' . $block->getReplayCount() . '</data>');
+
+ $output->writeln('');
+ }
+
+ }
+
+
+ private function displayBlock(OutputInterface $output, string $token): void {
+ $block = $this->blockMapper->getByToken($token);
+
+ $output->writeln('SessionToken: <data>' . $block->getSessionToken() . '</data>');
+ $output->writeln('BlockToken: <data>' . $block->getToken() . '</data>');
+ $output->writeln('BlockType: <data>' . $block->getBlockType()->name . '</data>');
+ $output->writeln('Interface: <data>' . $this->displayInterface($block) . '</data>');
+
+ $output->writeln('Code: <data>' . $this->cropContent($block->getCode(), 3000, 15) . '</data>');
+ $output->writeln('Params: <data>' . $this->cropContent(json_encode($block->getParams()), 3000, 15) . '</data>');
+ $output->writeln('Metadata: <data>' . $this->cropContent(json_encode($block->getMetadata()), 3000, 15) . '</data>');
+ $output->writeln('Result: <data>' . $this->cropContent(json_encode($block->getResult()), 3000, 15) . '</data>');
+
+ $output->writeln('Dataset: <data>' . $this->cropContent(json_encode($block->getDataset()), 3000, 15) . '</data>');
+ $output->writeln('Links: <data>' . $this->cropContent(json_encode($block->getLinks()), 3000, 15) . '</data>');
+ $output->writeln('Orig: <data>' . $this->cropContent(json_encode($block->getOrig()), 3000, 15) . '</data>');
+
+ $output->writeln('ExecutionTime: <data>' . BlockExecutionTime::tryFrom($block->getExecutionTime())?->name . '</data>');
+ $output->writeln('Creation: <data>' . $block->getCreation() . '</data>');
+ $output->writeln('LastRun: <data>' . $block->getLastRun() . '</data>');
+ $output->writeln('NextRun: <data>' . $block->getNextRun() . '</data>');
+ $output->writeln('BlockStatus: ' . $this->displayStatus($block));
+ $output->writeln('Replay: <data>' . $block->getReplayCount() . '</data>');
+ }
+
+
+ private function displayStatus(Block $block): string {
+ $name = $block->getBlockStatus()->name;
+ $color = strtolower($name);
+ return '<' . $color . '>' . $name . '</' . $color . '>';
+ }
+
+ private function displayInterface(Block $block): string {
+ $iface = new BlockInterface(null, $block);
+
+ $data = [];
+ $data[] = ($iface->getId() === '') ? '' : 'id=' . $iface->getId();
+ $data[] = ($iface->getName() === '') ? '' : 'name=' . $iface->getName();
+ $data[] = (!$iface->isReplayable()) ? '' : 'replayable=true';
+ $data[] = (!$iface->isBlocker()) ? '' : 'blocker=true';
+ $data[] = (empty($iface->getRequire())) ? '' : 'require=' . implode('.', $iface->getRequire());
+
+ return implode(', ', array_filter($data));
+ }
+
+
+
+
+ private function replayBlock(InputInterface $input, OutputInterface $output, string $token): void {
+ $block = $this->blockMapper->getByToken($token);
+ if (!in_array($block->getBlockStatus(), [BlockStatus::ERROR, BlockStatus::BLOCKER], true)) {
+ $output->writeln('only Block set as ERROR or BLOCKER can be replayed');
+ return;
+ }
+
+ $iface = new BlockInterface(null, $block);
+ if (!$iface->isReplayable()) {
+ $output->writeln('');
+ $output->writeln('Block is not set as <comment>replayable</comment>.');
+ $output->writeln('Replaying this Block can create issues.');
+ $output->writeln('');
+ $question = new ConfirmationQuestion(
+ '<comment>Do you really want to replay the Block ' . $token . ' ?</comment> (y/N) ',
+ false,
+ '/^(y|Y)/i'
+ );
+
+ /** @var QuestionHelper $helper */
+ $helper = $this->getHelper('question');
+ if (!$helper->ask($input, $output, $question)) {
+ $output->writeln('aborted.');
+ return;
+ }
+ }
+
+ $block->replay(true);
+ $this->blockMapper->update($block);
+ }
+
+
+ /**
+ * crop content after n lines or n chars
+ */
+ private function cropContent(string $content, int $maxChars, int $maxLines): string {
+ if ($this->noCrop) {
+ return $content;
+ }
+ preg_match_all("/\\n/", utf8_decode($content), $matches, PREG_OFFSET_CAPTURE);
+ return substr($content, 0, min($matches[0][$maxLines-1][1] ?? 99999, $maxChars));
+ }
+}
+
diff --git a/core/Command/Async/Setup.php b/core/Command/Async/Setup.php
new file mode 100644
index 00000000000..d5b94a9b732
--- /dev/null
+++ b/core/Command/Async/Setup.php
@@ -0,0 +1,435 @@
+<?php
+
+declare(strict_types=1);
+
+/**
+ * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+namespace OC\Core\Command\Async;
+
+use OC\Async\ABlockWrapper;
+use OC\Async\AsyncManager;
+use OC\Async\Exceptions\LoopbackEndpointException;
+use OC\Async\ForkManager;
+use OC\Config\Lexicon\CoreConfigLexicon;
+use OCP\Async\Enum\BlockActivity;
+use OCP\Async\Enum\ProcessExecutionTime;
+use OCP\Async\IAsyncProcess;
+use OCP\IAppConfig;
+use Symfony\Component\Console\Command\Command;
+use Symfony\Component\Console\Helper\QuestionHelper;
+use Symfony\Component\Console\Input\InputInterface;
+use Symfony\Component\Console\Input\InputOption;
+use Symfony\Component\Console\Output\OutputInterface;
+use Symfony\Component\Console\Question\ConfirmationQuestion;
+
+class Setup extends Command {
+ public function __construct(
+ private readonly IAppConfig $appConfig,
+ private readonly IAsyncProcess $asyncProcess,
+ private readonly AsyncManager $asyncManager,
+ private readonly ForkManager $forkManager,
+ ) {
+ parent::__construct();
+ }
+
+ protected function configure() {
+ parent::configure();
+ $this->setName('async:setup')
+ ->addOption('reset', '', InputOption::VALUE_NONE, 'reset all data related to the AsyncProcess feature')
+ ->addOption('drop', '', InputOption::VALUE_NONE, 'drop processes data')
+ ->addOption('loopback', '', InputOption::VALUE_REQUIRED, 'set loopback address')
+ ->addOption('discover', '', InputOption::VALUE_NONE, 'initiate the search for a possible loopback address')
+ ->addOption('yes', '', InputOption::VALUE_NONE, 'answer yes to any confirmation box')
+ ->addOption('exception-if-no-config', '', InputOption::VALUE_NONE, 'return an exception if existing configuration cannot be used')
+ ->addOption('mock-session', '', InputOption::VALUE_OPTIONAL, 'create n sessions', '0')
+ ->addOption('mock-block', '', InputOption::VALUE_OPTIONAL, 'create n blocks', '0')
+ ->addOption('fail-process', '', InputOption::VALUE_REQUIRED, 'create fail process', '')
+ ->setDescription('setup');
+ }
+
+ /**
+ * @throws LoopbackEndpointException
+ */
+ protected function execute(InputInterface $input, OutputInterface $output): int {
+ $reset = $input->getOption('reset');
+ $drop = $input->getOption('drop');
+ if ($reset || $drop) {
+ if (!$input->getOption('yes')) {
+ $question = new ConfirmationQuestion(
+ ($reset) ? '<comment>Do you want to reset all data and configuration related to AsyncProcess ?</comment> (y/N) '
+ : '<comment>Do you want to drop all data related to AsyncProcess ?</comment> (y/N) ',
+ false,
+ '/^(y|Y)/i'
+ );
+
+ /** @var QuestionHelper $helper */
+ $helper = $this->getHelper('question');
+ if (!$helper->ask($input, $output, $question)) {
+ $output->writeln('aborted.');
+
+ return 0;
+ }
+ }
+
+ $this->asyncManager->dropAllBlocks();
+ if ($reset) {
+ $this->asyncManager->resetConfig();
+ }
+
+ $output->writeln('done.');
+
+ return 0;
+ }
+
+ $failureType = $input->getOption('fail-process');
+ if ($failureType !== '') {
+ try {
+ match ($failureType) {
+ 'static-required' => $this->createFaultyProcessStaticRequired(),
+ 'static-blocker' => $this->createFaultyProcessStaticBlocker(),
+ 'dynamic-required' => $this->createFaultyProcessDynamicRequired(),
+ 'dynamic-blocker' => $this->createFaultyProcessDynamicBlocker(),
+ 'auto-required' => $this->createFaultyProcessAutoRequired(),
+ 'auto-blocker' => $this->createFaultyProcessAutoBlocker(),
+ };
+ } catch (\UnhandledMatchError) {
+ $output->writeln(
+ 'list of fail: static-required, static-blocker, dynamic-blocker, dynamic-required, auto-blocker, auto-required'
+ );
+ }
+
+ return 0;
+ }
+
+ $session = (int)($input->getOption('mock-session') ?? 0);
+ $proc = (int)($input->getOption('mock-block') ?? 0);
+ if ($session > 0 || $proc > 0) {
+ $this->createRandomProcess($output, $session, $proc);
+
+ $this->forkManager->waitChildProcess();
+
+ return 0;
+ }
+
+ if ($input->getOption('discover')) {
+ $output->writeln('<info>Searching for loopback address</info>');
+ $found = $this->forkManager->discoverLoopbackEndpoint($output);
+ $output->writeln('found a working loopback address: <info>' . $found . '</info>');
+ $this->confirmSave($input, $output, $found);
+
+ return 0;
+ }
+
+ $inputLoopback = $input->getOption('loopback');
+ if ($inputLoopback) {
+ $this->parseAddress($inputLoopback);
+ $output->write('- testing <comment>' . $inputLoopback . '</comment>... ');
+
+ $reason = '';
+ if (!$this->forkManager->testLoopbackInstance($inputLoopback, $reason)) {
+ $output->writeln('<error>' . $reason . '</error>');
+
+ return 0;
+ }
+
+ $output->writeln('<info>ok</info>');
+ $this->confirmSave($input, $output, $inputLoopback);
+
+ return 0;
+ }
+
+ try {
+ $currentLoopback = $this->forkManager->getLoopbackInstance();
+ $output->writeln('Current loopback instance: <info>' . $currentLoopback . '</info>');
+ $output->write('Testing async process using loopback endpoint... ');
+ $reason = '';
+ if (!$this->forkManager->testLoopbackInstance($currentLoopback, $reason)) {
+ $output->writeln('<error>' . $reason . '</error>');
+ } else {
+ $output->writeln('<info>ok</info>');
+ }
+
+ return 0;
+ } catch (LoopbackEndpointException $e) {
+ if ($input->getOption('exception-if-no-config')) {
+ throw $e;
+ }
+ }
+
+ $output->writeln('<info>Notes:</info>');
+ $output->writeln('no loopback instance currently set.');
+ $output->writeln('use --loopback <address> to manually configure a loopback instance');
+ $output->writeln('or use --discover for an automated process');
+
+ return 0;
+ }
+
+ private function confirmSave(InputInterface $input, OutputInterface $output, string $instance): void {
+ if (!$input->getOption('yes')) {
+ $output->writeln('');
+ $question = new ConfirmationQuestion(
+ '<comment>Do you want to save loopback address \'' . $instance . '\' ?</comment> (y/N) ',
+ false,
+ '/^(y|Y)/i'
+ );
+
+ /** @var QuestionHelper $helper */
+ $helper = $this->getHelper('question');
+ if (!$helper->ask($input, $output, $question)) {
+ $output->writeln('aborted.');
+
+ return;
+ }
+ }
+
+ $this->appConfig->setValueString('core', CoreConfigLexicon::ASYNC_LOOPBACK_ADDRESS, $instance);
+ }
+
+ /**
+ * confirm format of typed address
+ */
+ private function parseAddress(string $test): void {
+ $scheme = parse_url($test, PHP_URL_SCHEME);
+ $cloudId = parse_url($test, PHP_URL_HOST);
+ if (is_bool($scheme) || is_bool($cloudId) || is_null($scheme) || is_null($cloudId)) {
+ throw new LoopbackEndpointException('format must be http[s]://domain.name[:post][/path]');
+ }
+ }
+
+ /**
+ * create a list of fake/mockup process
+ *
+ * @param int $session number of session to generate
+ * @param int $processes number of process per session to generate
+ */
+ private function createRandomProcess(OutputInterface $output, int $session, int $processes): void {
+ $session = ($session > 0) ? $session : 1;
+ $processes = ($processes > 0) ? $processes : 1;
+
+ for ($i = 0; $i < $session; $i++) {
+ for ($j = 0; $j < $processes; $j++) {
+ $process = $this->asyncProcess->exec(
+ function (ABlockWrapper $wrapper, int $data, int $j): array {
+ if ($j > 0) {
+ $wrapper->activity(
+ BlockActivity::NOTICE, 'result from first process of the session: '
+ . $wrapper->getSessionInterface()
+ ->byId('mock_process_0')
+ ?->getResult()['result']
+ );
+ }
+ sleep(random_int(2, 8));
+ $wrapper->activity(
+ BlockActivity::NOTICE, 'mocked process is now over with data=' . $data
+ );
+
+ return ['result' => $data];
+ },
+ random_int(1, 5000),
+ $j
+ )->id('mock_process_' . $j)->blocker();
+ $output->writeln(' > creating process <info>' . $process->getToken() . '</info>');
+ }
+
+ $token = $this->asyncProcess->async(ProcessExecutionTime::LATER);
+
+ $output->writeln('- session <info>' . $token . '</info>');
+ $output->writeln('');
+ }
+ }
+
+ private function createFaultyProcessStaticRequired(): void {
+ $this->asyncProcess->exec(
+ function (ABlockWrapper $wrapper, int $n): array {
+ $wrapper->activity(
+ BlockActivity::NOTICE, '(1) this process will crash in ' . $n . ' seconds'
+ );
+ sleep($n);
+ throw new \Exception('crash');
+ },
+ random_int(2, 8)
+ )->id('mock_process_1');
+
+ $this->asyncProcess->exec(
+ function (ABlockWrapper $wrapper): void {
+ $wrapper->activity(BlockActivity::ERROR, '(2) this process should NOT run');
+ },
+ )->id('mock_process_2')->require('mock_process_1');
+
+ $this->asyncProcess->exec(
+ function (ABlockWrapper $wrapper): array {
+ $wrapper->activity(BlockActivity::NOTICE, '(3) this process should run!');
+
+ return ['ok'];
+ },
+ )->id('mock_process_3');
+
+ $this->asyncProcess->async(ProcessExecutionTime::LATER);
+ }
+
+ private function createFaultyProcessStaticBlocker(): void {
+ $this->asyncProcess->exec(
+ function (ABlockWrapper $wrapper, int $n): array {
+ $wrapper->activity(
+ BlockActivity::NOTICE, '(1) this process will crash in ' . $n . ' seconds'
+ );
+ sleep($n);
+ throw new \Exception('crash');
+ },
+ random_int(2, 8)
+ )->id('mock_process_1')->blocker();
+
+ $this->asyncProcess->exec(
+ function (ABlockWrapper $wrapper): void {
+ $wrapper->activity(BlockActivity::ERROR, '(2) this process should NOT run');
+ },
+ )->id('mock_process_2');
+
+ $this->asyncProcess->exec(
+ function (ABlockWrapper $wrapper): array {
+ $wrapper->activity(BlockActivity::ERROR, '(3) this process should NOT run');
+
+ return ['ok'];
+ },
+ )->id('mock_process_3');
+
+ $this->asyncProcess->async(ProcessExecutionTime::LATER);
+ }
+
+
+ private function createFaultyProcessDynamicRequired(): void {
+ $this->asyncProcess->exec(
+ function (ABlockWrapper $wrapper, int $n): array {
+ if ($wrapper->getReplayCount() > 1) {
+ $wrapper->activity(BlockActivity::NOTICE, '(1) this process will not crash anymore');
+ return ['dynamic-required'];
+ }
+
+ $wrapper->activity(BlockActivity::NOTICE, '(1) this process will crash in ' . $n . ' seconds');
+ sleep($n);
+ throw new \Exception('crash');
+ },
+ random_int(2, 8)
+ )->id('mock_process_1');
+
+ $this->asyncProcess->exec(
+ function (ABlockWrapper $wrapper): void {
+ $wrapper->activity(BlockActivity::ERROR, '(2) this process should only run after few replay from MOCK_PROCESS_1');
+ },
+ )->id('mock_process_2')->require('mock_process_1');
+
+ $this->asyncProcess->exec(
+ function (ABlockWrapper $wrapper): array {
+ $wrapper->activity(BlockActivity::NOTICE, '(3) this process should run!');
+
+ return ['ok'];
+ },
+ )->id('mock_process_3');
+
+ $this->asyncProcess->async(ProcessExecutionTime::LATER);
+ }
+
+
+ private function createFaultyProcessDynamicBlocker(): void {
+ $this->asyncProcess->exec(
+ function (ABlockWrapper $wrapper, int $n): array {
+ if ($wrapper->getReplayCount() > 1) {
+ $wrapper->activity(BlockActivity::NOTICE, '(1) this process will not crash anymore');
+ return ['dynamic-blocker'];
+ }
+
+ $wrapper->activity(BlockActivity::NOTICE, '(1) this process will crash in ' . $n . ' seconds');
+ sleep($n);
+ throw new \Exception('crash');
+ },
+ random_int(2, 8)
+ )->id('mock_process_1')->blocker();
+
+ $this->asyncProcess->exec(
+ function (ABlockWrapper $wrapper): void {
+ $wrapper->activity(BlockActivity::ERROR, '(2) this process should only run after few replay from MOCK_PROCESS_1');
+ },
+ )->id('mock_process_2');
+
+ $this->asyncProcess->exec(
+ function (ABlockWrapper $wrapper): array {
+ $wrapper->activity(BlockActivity::ERROR, '(3) this process should only run after few replay from MOCK_PROCESS_1');
+
+ return ['ok'];
+ },
+ )->id('mock_process_3');
+
+ $this->asyncProcess->async(ProcessExecutionTime::LATER);
+ }
+
+
+
+ private function createFaultyProcessAutoRequired(): void {
+ $this->asyncProcess->exec(
+ function (ABlockWrapper $wrapper, int $n): array {
+ if ($wrapper->getReplayCount() > 1) {
+ $wrapper->activity(BlockActivity::NOTICE, '(1) this process will not crash anymore');
+ return ['dynamic-required'];
+ }
+
+ $wrapper->activity(BlockActivity::NOTICE, '(1) this process will crash in ' . $n . ' seconds');
+ sleep($n);
+ throw new \Exception('crash');
+ },
+ random_int(2, 8)
+ )->id('mock_process_1')->replayable();
+
+ $this->asyncProcess->exec(
+ function (ABlockWrapper $wrapper): void {
+ $wrapper->activity(BlockActivity::ERROR, '(2) this process should only run after few replay from MOCK_PROCESS_1');
+ },
+ )->id('mock_process_2')->require('mock_process_1');
+
+ $this->asyncProcess->exec(
+ function (ABlockWrapper $wrapper): array {
+ $wrapper->activity(BlockActivity::NOTICE, '(3) this process should run!');
+
+ return ['ok'];
+ },
+ )->id('mock_process_3');
+
+ $this->asyncProcess->async(ProcessExecutionTime::LATER);
+ }
+
+
+ private function createFaultyProcessAutoBlocker(): void {
+ $this->asyncProcess->exec(
+ function (ABlockWrapper $wrapper, int $n): array {
+ if ($wrapper->getReplayCount() > 1) {
+ $wrapper->activity(BlockActivity::NOTICE, '(1) this process will not crash anymore');
+ return ['dynamic-blocker'];
+ }
+
+ $wrapper->activity(BlockActivity::NOTICE, '(1) this process will crash in ' . $n . ' seconds');
+ sleep($n);
+ throw new \Exception('crash');
+ },
+ random_int(2, 8)
+ )->id('mock_process_1')->blocker()->replayable();
+
+ $this->asyncProcess->exec(
+ function (ABlockWrapper $wrapper): void {
+ $wrapper->activity(BlockActivity::ERROR, '(2) this process should only run after few replay from MOCK_PROCESS_1');
+ },
+ )->id('mock_process_2');
+
+ $this->asyncProcess->exec(
+ function (ABlockWrapper $wrapper): array {
+ $wrapper->activity(BlockActivity::ERROR, '(3) this process should only run after few replay from MOCK_PROCESS_1');
+
+ return ['ok'];
+ },
+ )->id('mock_process_3');
+
+ $this->asyncProcess->async(ProcessExecutionTime::LATER);
+ }
+
+}
diff --git a/core/Controller/AsyncProcessController.php b/core/Controller/AsyncProcessController.php
new file mode 100644
index 00000000000..f644e3a7851
--- /dev/null
+++ b/core/Controller/AsyncProcessController.php
@@ -0,0 +1,77 @@
+<?php
+
+declare(strict_types=1);
+
+/**
+ * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+namespace OC\Core\Controller;
+
+use OC\Async\Exceptions\BlockAlreadyRunningException;
+use OC\Async\ForkManager;
+use OC\Config\Lexicon\CoreConfigLexicon;
+use OCP\AppFramework\Controller;
+use OCP\AppFramework\Http\Attribute\FrontpageRoute;
+use OCP\AppFramework\Http\Attribute\NoAdminRequired;
+use OCP\AppFramework\Http\Attribute\NoCSRFRequired;
+use OCP\AppFramework\Http\Attribute\PublicPage;
+use OCP\AppFramework\Http\DataResponse;
+use OCP\Async\Enum\ProcessExecutionTime;
+use OCP\IAppConfig;
+use OCP\IRequest;
+
+class AsyncProcessController extends Controller {
+ public function __construct(
+ string $appName,
+ IRequest $request,
+ private IAppConfig $appConfig,
+ private ForkManager $forkManager,
+ ) {
+ parent::__construct($appName, $request);
+ }
+
+ #[NoAdminRequired]
+ #[NoCSRFRequired]
+ #[PublicPage]
+ #[FrontpageRoute(verb: 'POST', url: '/core/asyncProcessFork')]
+ public function processFork(string $token): DataResponse {
+ $metadata = ['executionTime' => ProcessExecutionTime::NOW];
+
+ // remote might only need a confirmation that this is a valid loopback endpoint
+ if ($token === '__ping__') {
+ return new DataResponse(['ping' => $this->appConfig->getValueString('core', CoreConfigLexicon::ASYNC_LOOPBACK_PING)]);
+ }
+
+ $this->async($token);
+
+ try {
+ $this->forkManager->runSession($token, $metadata);
+ } catch (BlockAlreadyRunningException) {
+ // TODO: debug() ?
+ }
+
+ exit();
+ }
+
+ private function async(string $result = ''): void {
+ if (ob_get_contents() !== false) {
+ ob_end_clean();
+ }
+
+ header('Connection: close');
+ header('Content-Encoding: none');
+ ignore_user_abort();
+ $timeLimit = 100;
+ set_time_limit(max($timeLimit, 0));
+ ob_start();
+
+ echo($result);
+
+ $size = ob_get_length();
+ header('Content-Length: ' . $size);
+ ob_end_flush();
+ flush();
+ }
+
+}
diff --git a/core/Migrations/Version32000Date20250307222601.php b/core/Migrations/Version32000Date20250307222601.php
new file mode 100644
index 00000000000..179645e0d2c
--- /dev/null
+++ b/core/Migrations/Version32000Date20250307222601.php
@@ -0,0 +1,64 @@
+<?php
+
+declare(strict_types=1);
+/**
+ * SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+namespace OC\Core\Migrations;
+
+use Closure;
+use OCP\DB\ISchemaWrapper;
+use OCP\DB\Types;
+use OCP\Migration\Attributes\AddColumn;
+use OCP\Migration\Attributes\AddIndex;
+use OCP\Migration\Attributes\ColumnType;
+use OCP\Migration\Attributes\CreateTable;
+use OCP\Migration\Attributes\DropIndex;
+use OCP\Migration\Attributes\IndexType;
+use OCP\Migration\IOutput;
+use OCP\Migration\SimpleMigrationStep;
+
+/**
+ * Create new column and index for lazy loading in preferences for the new IUserPreferences API.
+ */
+#[CreateTable(table: 'async_processes', columns: ['id', 'token', 'type', 'code', 'params', 'orig', 'result', 'status'], description: 'async task and status')]
+class Version32000Date20250307222601 extends SimpleMigrationStep {
+ public function changeSchema(IOutput $output, Closure $schemaClosure, array $options): ?ISchemaWrapper {
+ /** @var ISchemaWrapper $schema */
+ $schema = $schemaClosure();
+
+ if ($schema->hasTable('async_process')) {
+ return null;
+ }
+
+ $table = $schema->createTable('async_process');
+ $table->addColumn('id', Types::BIGINT, [ 'notnull' => true, 'length' => 64, 'autoincrement' => true, 'unsigned' => true]);
+ $table->addColumn('token', Types::STRING, ['notnull' => true, 'default' => '', 'length' => 15]);
+ $table->addColumn('session_token', Types::STRING, ['notnull' => true, 'default' => '', 'length' => 15]);
+ $table->addColumn('type', Types::SMALLINT, ['notnull' => true, 'default' => 0, 'unsigned' => true]);
+ $table->addColumn('code', Types::TEXT, ['notnull' => true, 'default' => '']);
+ $table->addColumn('params', Types::TEXT, ['notnull' => true, 'default' => '[]']);
+ $table->addColumn('dataset', Types::TEXT, ['notnull' => true, 'default' => '[]']);
+ $table->addColumn('metadata', Types::TEXT, ['notnull' => true, 'default' => '[]']);
+ $table->addColumn('links', Types::TEXT, ['notnull' => true, 'default' => '[]']);
+ $table->addColumn('orig', Types::TEXT, ['notnull' => true, 'default' => '[]']);
+ $table->addColumn('result', Types::TEXT, ['notnull' => true, 'default' => '']);
+ $table->addColumn('status', Types::SMALLINT, ['notnull' => true, 'default' => 0, 'unsigned' => true]);
+ $table->addColumn('execution_time', Types::SMALLINT, ['notnull' => true, 'default' => 0, 'unsigned' => true]);
+ $table->addColumn('lock_token', Types::STRING, ['notnull' => true, 'default' => '', 'length' => 7]);
+ $table->addColumn('creation', Types::INTEGER, ['notnull' => true, 'default' => 0, 'unsigned' => true]);
+ $table->addColumn('last_run', Types::INTEGER, ['notnull' => true, 'default' => 0, 'unsigned' => true]);
+ $table->addColumn('next_run', Types::INTEGER, ['notnull' => true, 'default' => 0, 'unsigned' => true]);
+
+ $table->setPrimaryKey(['id'], 'asy_prc_id');
+
+ $table->addIndex(['token'], 'asy_prc_tkn');
+ $table->addIndex(['status'], 'asy_prc_sts');
+ $table->addIndex(['next_run'], 'asy_prc_nxt');
+ $table->addIndex(['status', 'id', 'next_run'], 'asy_prc_sin');
+
+ return $schema;
+ }
+}
diff --git a/core/register_command.php b/core/register_command.php
index 62305d75a30..e0c201920a9 100644
--- a/core/register_command.php
+++ b/core/register_command.php
@@ -152,6 +152,10 @@ if ($config->getSystemValueBool('installed', false)) {
$application->add(Server::get(Command\TaskProcessing\Statistics::class));
$application->add(Server::get(Command\Memcache\RedisCommand::class));
+
+ $application->add(Server::get(Command\Async\Live::class));
+ $application->add(Server::get(Command\Async\Manage::class));
+ $application->add(Server::get(Command\Async\Setup::class));
} else {
$application->add(Server::get(Command\Maintenance\Install::class));
}