aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMaxence Lange <maxence@artificial-owl.com>2025-04-28 19:57:56 -0100
committerMaxence Lange <maxence@artificial-owl.com>2025-04-28 19:58:06 -0100
commit6d272ff894fa969e5a526caf6a91d60eda115c53 (patch)
treedf90f826a29f7169dd9b550f2e478a1b09d29e71
parent10a01423ecfc7e028960eee8058bd177ad28d484 (diff)
downloadnextcloud-server-enh/noid/async-process-run.tar.gz
nextcloud-server-enh/noid/async-process-run.zip
feat(async): AsyncProcessenh/noid/async-process-run
Signed-off-by: Maxence Lange <maxence@artificial-owl.com>
-rw-r--r--core/BackgroundJobs/AsyncProcessJob.php62
-rw-r--r--core/Command/Async/Live.php54
-rw-r--r--core/Command/Async/Manage.php232
-rw-r--r--core/Command/Async/Setup.php435
-rw-r--r--core/Controller/AsyncProcessController.php77
-rw-r--r--core/Migrations/Version32000Date20250307222601.php64
-rw-r--r--core/register_command.php4
-rw-r--r--lib/base.php1
-rw-r--r--lib/private/Async/ABlockWrapper.php40
-rw-r--r--lib/private/Async/AsyncManager.php222
-rw-r--r--lib/private/Async/AsyncProcess.php64
-rw-r--r--lib/private/Async/Db/BlockMapper.php260
-rw-r--r--lib/private/Async/Enum/BlockType.php16
-rw-r--r--lib/private/Async/Exceptions/AsyncProcessException.php14
-rw-r--r--lib/private/Async/Exceptions/BlockAlreadyRunningException.php12
-rw-r--r--lib/private/Async/Exceptions/BlockNotFoundException.php12
-rw-r--r--lib/private/Async/Exceptions/LoopbackEndpointException.php14
-rw-r--r--lib/private/Async/Exceptions/SessionBlockedException.php14
-rw-r--r--lib/private/Async/ForkManager.php485
-rw-r--r--lib/private/Async/IBlockInterface.php36
-rw-r--r--lib/private/Async/ISessionInterface.php21
-rw-r--r--lib/private/Async/Model/Block.php142
-rw-r--r--lib/private/Async/Model/BlockInterface.php183
-rw-r--r--lib/private/Async/Model/SessionInterface.php83
-rw-r--r--lib/private/Async/README.md379
-rw-r--r--lib/private/Async/Wrappers/CliBlockWrapper.php103
-rw-r--r--lib/private/Async/Wrappers/DummyBlockWrapper.php27
-rw-r--r--lib/private/Async/Wrappers/LoggerBlockWrapper.php39
-rw-r--r--lib/private/Config/Lexicon/CoreConfigLexicon.php7
-rw-r--r--lib/private/Repair.php2
-rw-r--r--lib/private/Repair/AddAsyncProcessJob.php30
-rw-r--r--lib/private/Server.php1
-rw-r--r--lib/private/Setup.php2
-rw-r--r--lib/public/Async/Enum/BlockActivity.php18
-rw-r--r--lib/public/Async/Enum/BlockStatus.php20
-rw-r--r--lib/public/Async/Enum/ProcessExecutionTime.php17
-rw-r--r--lib/public/Async/IAsyncProcess.php20
-rw-r--r--version.php2
38 files changed, 3213 insertions, 1 deletions
diff --git a/core/BackgroundJobs/AsyncProcessJob.php b/core/BackgroundJobs/AsyncProcessJob.php
new file mode 100644
index 00000000000..3fd43abf5ab
--- /dev/null
+++ b/core/BackgroundJobs/AsyncProcessJob.php
@@ -0,0 +1,62 @@
+<?php
+
+declare(strict_types=1);
+
+/**
+ * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+namespace OC\Core\BackgroundJobs;
+
+use OC\Async\Db\BlockMapper;
+use OC\Async\ForkManager;
+use OC\Async\Wrappers\LoggerBlockWrapper;
+use OC\Config\Lexicon\CoreConfigLexicon;
+use OCP\AppFramework\Utility\ITimeFactory;
+use OCP\Async\Enum\ProcessExecutionTime;
+use OCP\BackgroundJob\TimedJob;
+use OCP\IAppConfig;
+
+class AsyncProcessJob extends TimedJob {
+ public function __construct(
+ ITimeFactory $time,
+ private IAppConfig $appConfig,
+ private ForkManager $forkManager,
+ private BlockMapper $blockMapper,
+ private LoggerBlockWrapper $loggerProcessWrapper,
+ ) {
+ parent::__construct($time);
+
+ $this->setTimeSensitivity(self::TIME_SENSITIVE);
+// $this->setInterval(60 * 5);
+ $this->setInterval(1);
+ }
+
+ protected function run(mixed $argument): void {
+ $this->discoverLoopAddress();
+
+ $this->forkManager->setWrapper($this->loggerProcessWrapper);
+
+ $this->blockMapper->resetFailedBlock();
+
+ $metadata = ['executionTime' => ProcessExecutionTime::LATER];
+ foreach ($this->blockMapper->getSessionOnStandBy() as $session) {
+ $this->forkManager->forkSession($session, $metadata);
+ }
+
+ $this->blockMapper->removeSuccessfulBlock();
+
+ $this->forkManager->waitChildProcess();
+ }
+
+ private function discoverLoopAddress(): void {
+ if ($this->appConfig->hasKey('core', CoreConfigLexicon::ASYNC_LOOPBACK_ADDRESS, true)) {
+ return;
+ }
+
+ $found = $this->forkManager->discoverLoopbackEndpoint();
+ if ($found !== null) {
+ $this->appConfig->setValueString('core', CoreConfigLexicon::ASYNC_LOOPBACK_ADDRESS, $found);
+ }
+ }
+}
diff --git a/core/Command/Async/Live.php b/core/Command/Async/Live.php
new file mode 100644
index 00000000000..89a16a3165d
--- /dev/null
+++ b/core/Command/Async/Live.php
@@ -0,0 +1,54 @@
+<?php
+
+declare(strict_types=1);
+
+/**
+ * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+namespace OC\Core\Command\Async;
+
+use OC\Async\AsyncManager;
+use OC\Async\AsyncProcess;
+use OC\Async\Db\BlockMapper;
+use OC\Async\ForkManager;
+use OC\Async\Wrappers\CliBlockWrapper;
+use OCP\Async\Enum\ProcessExecutionTime;
+use Symfony\Component\Console\Command\Command;
+use Symfony\Component\Console\Input\InputInterface;
+use Symfony\Component\Console\Output\OutputInterface;
+
+class Live extends Command {
+ public function __construct(
+ private readonly BlockMapper $blockMapper,
+ private readonly ForkManager $forkManager,
+ ) {
+ parent::__construct();
+ }
+
+ protected function configure() {
+ parent::configure();
+ $this->setName('async:live')
+ ->setDescription('test');
+ }
+
+ protected function execute(InputInterface $input, OutputInterface $output): int {
+ CliBlockWrapper::initStyle($output);
+ $this->forkManager->setWrapper(new CliBlockWrapper($output));
+
+ $metadata = ['_processExecutionTime' => ProcessExecutionTime::ASAP];
+ while(true) {
+ $this->blockMapper->resetFailedBlock();
+
+ foreach ($this->blockMapper->getSessionOnStandBy() as $session) {
+ $this->forkManager->forkSession($session, $metadata);
+ }
+
+ sleep(3);
+ }
+
+ $this->forkManager->waitChildProcess();
+
+ return 0;
+ }
+}
diff --git a/core/Command/Async/Manage.php b/core/Command/Async/Manage.php
new file mode 100644
index 00000000000..9be46b335c8
--- /dev/null
+++ b/core/Command/Async/Manage.php
@@ -0,0 +1,232 @@
+<?php
+
+declare(strict_types=1);
+
+/**
+ * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+namespace OC\Core\Command\Async;
+
+use OC\Async\AsyncManager;
+use OC\Async\Db\BlockMapper;
+use OC\Async\ForkManager;
+use OC\Async\Model\Block;
+use OC\Async\Model\BlockInterface;
+use OC\Async\Model\SessionInterface;
+use OCP\Async\Enum\BlockExecutionTime;
+use OCP\Async\Enum\BlockStatus;
+use Symfony\Component\Console\Command\Command;
+use Symfony\Component\Console\Formatter\OutputFormatterStyle;
+use Symfony\Component\Console\Helper\QuestionHelper;
+use Symfony\Component\Console\Input\InputInterface;
+use Symfony\Component\Console\Input\InputOption;
+use Symfony\Component\Console\Output\OutputInterface;
+use Symfony\Component\Console\Question\ConfirmationQuestion;
+
+class Manage extends Command {
+ private bool $noCrop = false;
+
+ public function __construct(
+ private AsyncManager $asyncManager,
+ private ForkManager $forkManager,
+ private BlockMapper $blockMapper,
+ ) {
+ parent::__construct();
+ }
+
+ protected function configure() {
+ parent::configure();
+ $this->setName('async:manage')
+ ->addOption('clean', '', InputOption::VALUE_NONE, 'remove successful session')
+ ->addOption('session', '', InputOption::VALUE_REQUIRED, 'list all blocks from a session', '')
+ ->addOption( 'details', '', InputOption::VALUE_REQUIRED, 'get details about a specific block', '')
+ ->addOption( 'full-details', '', InputOption::VALUE_NONE, 'get full details')
+ ->addOption( 'replay', '', InputOption::VALUE_REQUIRED, 'replay a specific block', '')
+ ->setDescription('manage');
+ }
+
+ protected function execute(InputInterface $input, OutputInterface $output): int {
+ if ($input->getOption('clean')) {
+ $count = $this->blockMapper->removeSuccessfulBlock();
+ $output->writeln('deleted ' . $count . ' blocks');
+ return 0;
+ }
+
+ $replay = $input->getOption('replay');
+ if ($replay !== '') {
+ $this->replayBlock($input, $output, $replay);
+ return 0;
+ }
+
+ $this->noCrop = $input->getOption('full-details');
+ $output->getFormatter()->setStyle('data', new OutputFormatterStyle('#666', '', []));
+ $output->getFormatter()->setStyle('prep', new OutputFormatterStyle('#ccc', '', []));
+ $output->getFormatter()->setStyle('standby', new OutputFormatterStyle('#aaa', '', ['bold']));
+ $output->getFormatter()->setStyle('running', new OutputFormatterStyle('#0a0', '', []));
+ $output->getFormatter()->setStyle('blocker', new OutputFormatterStyle('#c00', '', ['bold']));
+ $output->getFormatter()->setStyle('error', new OutputFormatterStyle('#d00', '', []));
+ $output->getFormatter()->setStyle('success', new OutputFormatterStyle('#0c0', '', ['bold']));
+
+ $details = $input->getOption('details');
+ if ($details !== '') {
+ $this->displayBlock($output, $details);
+ return 0;
+ }
+
+ $session = $input->getOption('session');
+ if ($session !== '') {
+ $this->displaySession($output, $session);
+ return 0;
+ }
+
+ $this->summary($output);
+ return 0;
+ }
+
+ private function summary(OutputInterface $output): void {
+ $statuses = [];
+ foreach ($this->blockMapper->getSessions() as $token) {
+ $sessionBlockes = $this->blockMapper->getBySession($token);
+ $sessionIface = new SessionInterface(BlockInterface::asBlockInterfaces($sessionBlockes));
+
+ $status = $sessionIface->getGlobalStatus()->value;
+ if (!array_key_exists($status, $statuses)) {
+ $statuses[$status] = [];
+ }
+ $statuses[$status][] = $token;
+ }
+
+ if (!empty($success = $statuses[(string)BlockStatus::SUCCESS->value] ?? [])) {
+ $output->writeln('<comment>Successful</comment> session to be removed next cron (' . count($success) . '): <info>' . implode('</info>,<info> ', $success) . '</info>');
+ }
+
+ if (!empty($prep = $statuses[(string)BlockStatus::PREP->value] ?? [])) {
+ $output->writeln('Session with <comment>PREP</comment> status (' . count($prep) . '): <info>' . implode('</info>,<info> ', $prep) . '</info>');
+ }
+
+ if (!empty($stand = $statuses[BlockStatus::STANDBY->value] ?? [])) {
+ $output->writeln('Session in <comment>stand-by</comment> (' . count($stand) . '): <info>' . implode('</info>,<info> ', $stand) . '</info>');
+ }
+
+ if (!empty($running = $statuses[BlockStatus::RUNNING->value] ?? [])) {
+ $output->writeln('Currently <comment>running</comment> session (' . count($running) . '): <info>' . implode('</info>,<info> ', $running) . '</info>');
+ }
+
+ if (!empty($err = $statuses[BlockStatus::ERROR->value] ?? [])) {
+ $output->writeln('<comment>Erroneous</comment> session (' . count($err) . '): <info>' . implode('</info>,<info> ', $err) . '</info>');
+ }
+
+ if (!empty($blocker = $statuses[BlockStatus::BLOCKER->value] ?? [])) {
+ $output->writeln('<comment>Blocked</comment> session (' . count($blocker) . '): <info>' . implode('</info>,<info> ', $blocker) . '</info>');
+ }
+ }
+
+
+ private function displaySession(OutputInterface $output, string $token): void {
+ foreach ($this->blockMapper->getBySession($token) as $block) {
+ $output->writeln('BlockToken: <data>' . $block->getToken() . '</data>');
+ $output->writeln('BlockType: <data>' . $block->getBlockType()->name . '</data>');
+ $output->writeln('Interface: <data>' . $this->displayInterface($block) . '</data>');
+ $output->writeln('BlockStatus: ' . $this->displayStatus($block));
+ $output->writeln('Replay: <data>' . $block->getReplayCount() . '</data>');
+
+ $output->writeln('');
+ }
+
+ }
+
+
+ private function displayBlock(OutputInterface $output, string $token): void {
+ $block = $this->blockMapper->getByToken($token);
+
+ $output->writeln('SessionToken: <data>' . $block->getSessionToken() . '</data>');
+ $output->writeln('BlockToken: <data>' . $block->getToken() . '</data>');
+ $output->writeln('BlockType: <data>' . $block->getBlockType()->name . '</data>');
+ $output->writeln('Interface: <data>' . $this->displayInterface($block) . '</data>');
+
+ $output->writeln('Code: <data>' . $this->cropContent($block->getCode(), 3000, 15) . '</data>');
+ $output->writeln('Params: <data>' . $this->cropContent(json_encode($block->getParams()), 3000, 15) . '</data>');
+ $output->writeln('Metadata: <data>' . $this->cropContent(json_encode($block->getMetadata()), 3000, 15) . '</data>');
+ $output->writeln('Result: <data>' . $this->cropContent(json_encode($block->getResult()), 3000, 15) . '</data>');
+
+ $output->writeln('Dataset: <data>' . $this->cropContent(json_encode($block->getDataset()), 3000, 15) . '</data>');
+ $output->writeln('Links: <data>' . $this->cropContent(json_encode($block->getLinks()), 3000, 15) . '</data>');
+ $output->writeln('Orig: <data>' . $this->cropContent(json_encode($block->getOrig()), 3000, 15) . '</data>');
+
+ $output->writeln('ExecutionTime: <data>' . BlockExecutionTime::tryFrom($block->getExecutionTime())?->name . '</data>');
+ $output->writeln('Creation: <data>' . $block->getCreation() . '</data>');
+ $output->writeln('LastRun: <data>' . $block->getLastRun() . '</data>');
+ $output->writeln('NextRun: <data>' . $block->getNextRun() . '</data>');
+ $output->writeln('BlockStatus: ' . $this->displayStatus($block));
+ $output->writeln('Replay: <data>' . $block->getReplayCount() . '</data>');
+ }
+
+
+ private function displayStatus(Block $block): string {
+ $name = $block->getBlockStatus()->name;
+ $color = strtolower($name);
+ return '<' . $color . '>' . $name . '</' . $color . '>';
+ }
+
+ private function displayInterface(Block $block): string {
+ $iface = new BlockInterface(null, $block);
+
+ $data = [];
+ $data[] = ($iface->getId() === '') ? '' : 'id=' . $iface->getId();
+ $data[] = ($iface->getName() === '') ? '' : 'name=' . $iface->getName();
+ $data[] = (!$iface->isReplayable()) ? '' : 'replayable=true';
+ $data[] = (!$iface->isBlocker()) ? '' : 'blocker=true';
+ $data[] = (empty($iface->getRequire())) ? '' : 'require=' . implode('.', $iface->getRequire());
+
+ return implode(', ', array_filter($data));
+ }
+
+
+
+
+ private function replayBlock(InputInterface $input, OutputInterface $output, string $token): void {
+ $block = $this->blockMapper->getByToken($token);
+ if (!in_array($block->getBlockStatus(), [BlockStatus::ERROR, BlockStatus::BLOCKER], true)) {
+ $output->writeln('only Block set as ERROR or BLOCKER can be replayed');
+ return;
+ }
+
+ $iface = new BlockInterface(null, $block);
+ if (!$iface->isReplayable()) {
+ $output->writeln('');
+ $output->writeln('Block is not set as <comment>replayable</comment>.');
+ $output->writeln('Replaying this Block can create issues.');
+ $output->writeln('');
+ $question = new ConfirmationQuestion(
+ '<comment>Do you really want to replay the Block ' . $token . ' ?</comment> (y/N) ',
+ false,
+ '/^(y|Y)/i'
+ );
+
+ /** @var QuestionHelper $helper */
+ $helper = $this->getHelper('question');
+ if (!$helper->ask($input, $output, $question)) {
+ $output->writeln('aborted.');
+ return;
+ }
+ }
+
+ $block->replay(true);
+ $this->blockMapper->update($block);
+ }
+
+
+ /**
+ * crop content after n lines or n chars
+ */
+ private function cropContent(string $content, int $maxChars, int $maxLines): string {
+ if ($this->noCrop) {
+ return $content;
+ }
+ preg_match_all("/\\n/", utf8_decode($content), $matches, PREG_OFFSET_CAPTURE);
+ return substr($content, 0, min($matches[0][$maxLines-1][1] ?? 99999, $maxChars));
+ }
+}
+
diff --git a/core/Command/Async/Setup.php b/core/Command/Async/Setup.php
new file mode 100644
index 00000000000..d5b94a9b732
--- /dev/null
+++ b/core/Command/Async/Setup.php
@@ -0,0 +1,435 @@
+<?php
+
+declare(strict_types=1);
+
+/**
+ * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+namespace OC\Core\Command\Async;
+
+use OC\Async\ABlockWrapper;
+use OC\Async\AsyncManager;
+use OC\Async\Exceptions\LoopbackEndpointException;
+use OC\Async\ForkManager;
+use OC\Config\Lexicon\CoreConfigLexicon;
+use OCP\Async\Enum\BlockActivity;
+use OCP\Async\Enum\ProcessExecutionTime;
+use OCP\Async\IAsyncProcess;
+use OCP\IAppConfig;
+use Symfony\Component\Console\Command\Command;
+use Symfony\Component\Console\Helper\QuestionHelper;
+use Symfony\Component\Console\Input\InputInterface;
+use Symfony\Component\Console\Input\InputOption;
+use Symfony\Component\Console\Output\OutputInterface;
+use Symfony\Component\Console\Question\ConfirmationQuestion;
+
+class Setup extends Command {
+ public function __construct(
+ private readonly IAppConfig $appConfig,
+ private readonly IAsyncProcess $asyncProcess,
+ private readonly AsyncManager $asyncManager,
+ private readonly ForkManager $forkManager,
+ ) {
+ parent::__construct();
+ }
+
+ protected function configure() {
+ parent::configure();
+ $this->setName('async:setup')
+ ->addOption('reset', '', InputOption::VALUE_NONE, 'reset all data related to the AsyncProcess feature')
+ ->addOption('drop', '', InputOption::VALUE_NONE, 'drop processes data')
+ ->addOption('loopback', '', InputOption::VALUE_REQUIRED, 'set loopback address')
+ ->addOption('discover', '', InputOption::VALUE_NONE, 'initiate the search for a possible loopback address')
+ ->addOption('yes', '', InputOption::VALUE_NONE, 'answer yes to any confirmation box')
+ ->addOption('exception-if-no-config', '', InputOption::VALUE_NONE, 'return an exception if existing configuration cannot be used')
+ ->addOption('mock-session', '', InputOption::VALUE_OPTIONAL, 'create n sessions', '0')
+ ->addOption('mock-block', '', InputOption::VALUE_OPTIONAL, 'create n blocks', '0')
+ ->addOption('fail-process', '', InputOption::VALUE_REQUIRED, 'create fail process', '')
+ ->setDescription('setup');
+ }
+
+ /**
+ * @throws LoopbackEndpointException
+ */
+ protected function execute(InputInterface $input, OutputInterface $output): int {
+ $reset = $input->getOption('reset');
+ $drop = $input->getOption('drop');
+ if ($reset || $drop) {
+ if (!$input->getOption('yes')) {
+ $question = new ConfirmationQuestion(
+ ($reset) ? '<comment>Do you want to reset all data and configuration related to AsyncProcess ?</comment> (y/N) '
+ : '<comment>Do you want to drop all data related to AsyncProcess ?</comment> (y/N) ',
+ false,
+ '/^(y|Y)/i'
+ );
+
+ /** @var QuestionHelper $helper */
+ $helper = $this->getHelper('question');
+ if (!$helper->ask($input, $output, $question)) {
+ $output->writeln('aborted.');
+
+ return 0;
+ }
+ }
+
+ $this->asyncManager->dropAllBlocks();
+ if ($reset) {
+ $this->asyncManager->resetConfig();
+ }
+
+ $output->writeln('done.');
+
+ return 0;
+ }
+
+ $failureType = $input->getOption('fail-process');
+ if ($failureType !== '') {
+ try {
+ match ($failureType) {
+ 'static-required' => $this->createFaultyProcessStaticRequired(),
+ 'static-blocker' => $this->createFaultyProcessStaticBlocker(),
+ 'dynamic-required' => $this->createFaultyProcessDynamicRequired(),
+ 'dynamic-blocker' => $this->createFaultyProcessDynamicBlocker(),
+ 'auto-required' => $this->createFaultyProcessAutoRequired(),
+ 'auto-blocker' => $this->createFaultyProcessAutoBlocker(),
+ };
+ } catch (\UnhandledMatchError) {
+ $output->writeln(
+ 'list of fail: static-required, static-blocker, dynamic-blocker, dynamic-required, auto-blocker, auto-required'
+ );
+ }
+
+ return 0;
+ }
+
+ $session = (int)($input->getOption('mock-session') ?? 0);
+ $proc = (int)($input->getOption('mock-block') ?? 0);
+ if ($session > 0 || $proc > 0) {
+ $this->createRandomProcess($output, $session, $proc);
+
+ $this->forkManager->waitChildProcess();
+
+ return 0;
+ }
+
+ if ($input->getOption('discover')) {
+ $output->writeln('<info>Searching for loopback address</info>');
+ $found = $this->forkManager->discoverLoopbackEndpoint($output);
+ $output->writeln('found a working loopback address: <info>' . $found . '</info>');
+ $this->confirmSave($input, $output, $found);
+
+ return 0;
+ }
+
+ $inputLoopback = $input->getOption('loopback');
+ if ($inputLoopback) {
+ $this->parseAddress($inputLoopback);
+ $output->write('- testing <comment>' . $inputLoopback . '</comment>... ');
+
+ $reason = '';
+ if (!$this->forkManager->testLoopbackInstance($inputLoopback, $reason)) {
+ $output->writeln('<error>' . $reason . '</error>');
+
+ return 0;
+ }
+
+ $output->writeln('<info>ok</info>');
+ $this->confirmSave($input, $output, $inputLoopback);
+
+ return 0;
+ }
+
+ try {
+ $currentLoopback = $this->forkManager->getLoopbackInstance();
+ $output->writeln('Current loopback instance: <info>' . $currentLoopback . '</info>');
+ $output->write('Testing async process using loopback endpoint... ');
+ $reason = '';
+ if (!$this->forkManager->testLoopbackInstance($currentLoopback, $reason)) {
+ $output->writeln('<error>' . $reason . '</error>');
+ } else {
+ $output->writeln('<info>ok</info>');
+ }
+
+ return 0;
+ } catch (LoopbackEndpointException $e) {
+ if ($input->getOption('exception-if-no-config')) {
+ throw $e;
+ }
+ }
+
+ $output->writeln('<info>Notes:</info>');
+ $output->writeln('no loopback instance currently set.');
+ $output->writeln('use --loopback <address> to manually configure a loopback instance');
+ $output->writeln('or use --discover for an automated process');
+
+ return 0;
+ }
+
+ private function confirmSave(InputInterface $input, OutputInterface $output, string $instance): void {
+ if (!$input->getOption('yes')) {
+ $output->writeln('');
+ $question = new ConfirmationQuestion(
+ '<comment>Do you want to save loopback address \'' . $instance . '\' ?</comment> (y/N) ',
+ false,
+ '/^(y|Y)/i'
+ );
+
+ /** @var QuestionHelper $helper */
+ $helper = $this->getHelper('question');
+ if (!$helper->ask($input, $output, $question)) {
+ $output->writeln('aborted.');
+
+ return;
+ }
+ }
+
+ $this->appConfig->setValueString('core', CoreConfigLexicon::ASYNC_LOOPBACK_ADDRESS, $instance);
+ }
+
+ /**
+ * confirm format of typed address
+ */
+ private function parseAddress(string $test): void {
+ $scheme = parse_url($test, PHP_URL_SCHEME);
+ $cloudId = parse_url($test, PHP_URL_HOST);
+ if (is_bool($scheme) || is_bool($cloudId) || is_null($scheme) || is_null($cloudId)) {
+ throw new LoopbackEndpointException('format must be http[s]://domain.name[:post][/path]');
+ }
+ }
+
+ /**
+ * create a list of fake/mockup process
+ *
+ * @param int $session number of session to generate
+ * @param int $processes number of process per session to generate
+ */
+ private function createRandomProcess(OutputInterface $output, int $session, int $processes): void {
+ $session = ($session > 0) ? $session : 1;
+ $processes = ($processes > 0) ? $processes : 1;
+
+ for ($i = 0; $i < $session; $i++) {
+ for ($j = 0; $j < $processes; $j++) {
+ $process = $this->asyncProcess->exec(
+ function (ABlockWrapper $wrapper, int $data, int $j): array {
+ if ($j > 0) {
+ $wrapper->activity(
+ BlockActivity::NOTICE, 'result from first process of the session: '
+ . $wrapper->getSessionInterface()
+ ->byId('mock_process_0')
+ ?->getResult()['result']
+ );
+ }
+ sleep(random_int(2, 8));
+ $wrapper->activity(
+ BlockActivity::NOTICE, 'mocked process is now over with data=' . $data
+ );
+
+ return ['result' => $data];
+ },
+ random_int(1, 5000),
+ $j
+ )->id('mock_process_' . $j)->blocker();
+ $output->writeln(' > creating process <info>' . $process->getToken() . '</info>');
+ }
+
+ $token = $this->asyncProcess->async(ProcessExecutionTime::LATER);
+
+ $output->writeln('- session <info>' . $token . '</info>');
+ $output->writeln('');
+ }
+ }
+
+ private function createFaultyProcessStaticRequired(): void {
+ $this->asyncProcess->exec(
+ function (ABlockWrapper $wrapper, int $n): array {
+ $wrapper->activity(
+ BlockActivity::NOTICE, '(1) this process will crash in ' . $n . ' seconds'
+ );
+ sleep($n);
+ throw new \Exception('crash');
+ },
+ random_int(2, 8)
+ )->id('mock_process_1');
+
+ $this->asyncProcess->exec(
+ function (ABlockWrapper $wrapper): void {
+ $wrapper->activity(BlockActivity::ERROR, '(2) this process should NOT run');
+ },
+ )->id('mock_process_2')->require('mock_process_1');
+
+ $this->asyncProcess->exec(
+ function (ABlockWrapper $wrapper): array {
+ $wrapper->activity(BlockActivity::NOTICE, '(3) this process should run!');
+
+ return ['ok'];
+ },
+ )->id('mock_process_3');
+
+ $this->asyncProcess->async(ProcessExecutionTime::LATER);
+ }
+
+ private function createFaultyProcessStaticBlocker(): void {
+ $this->asyncProcess->exec(
+ function (ABlockWrapper $wrapper, int $n): array {
+ $wrapper->activity(
+ BlockActivity::NOTICE, '(1) this process will crash in ' . $n . ' seconds'
+ );
+ sleep($n);
+ throw new \Exception('crash');
+ },
+ random_int(2, 8)
+ )->id('mock_process_1')->blocker();
+
+ $this->asyncProcess->exec(
+ function (ABlockWrapper $wrapper): void {
+ $wrapper->activity(BlockActivity::ERROR, '(2) this process should NOT run');
+ },
+ )->id('mock_process_2');
+
+ $this->asyncProcess->exec(
+ function (ABlockWrapper $wrapper): array {
+ $wrapper->activity(BlockActivity::ERROR, '(3) this process should NOT run');
+
+ return ['ok'];
+ },
+ )->id('mock_process_3');
+
+ $this->asyncProcess->async(ProcessExecutionTime::LATER);
+ }
+
+
+ private function createFaultyProcessDynamicRequired(): void {
+ $this->asyncProcess->exec(
+ function (ABlockWrapper $wrapper, int $n): array {
+ if ($wrapper->getReplayCount() > 1) {
+ $wrapper->activity(BlockActivity::NOTICE, '(1) this process will not crash anymore');
+ return ['dynamic-required'];
+ }
+
+ $wrapper->activity(BlockActivity::NOTICE, '(1) this process will crash in ' . $n . ' seconds');
+ sleep($n);
+ throw new \Exception('crash');
+ },
+ random_int(2, 8)
+ )->id('mock_process_1');
+
+ $this->asyncProcess->exec(
+ function (ABlockWrapper $wrapper): void {
+ $wrapper->activity(BlockActivity::ERROR, '(2) this process should only run after few replay from MOCK_PROCESS_1');
+ },
+ )->id('mock_process_2')->require('mock_process_1');
+
+ $this->asyncProcess->exec(
+ function (ABlockWrapper $wrapper): array {
+ $wrapper->activity(BlockActivity::NOTICE, '(3) this process should run!');
+
+ return ['ok'];
+ },
+ )->id('mock_process_3');
+
+ $this->asyncProcess->async(ProcessExecutionTime::LATER);
+ }
+
+
+ private function createFaultyProcessDynamicBlocker(): void {
+ $this->asyncProcess->exec(
+ function (ABlockWrapper $wrapper, int $n): array {
+ if ($wrapper->getReplayCount() > 1) {
+ $wrapper->activity(BlockActivity::NOTICE, '(1) this process will not crash anymore');
+ return ['dynamic-blocker'];
+ }
+
+ $wrapper->activity(BlockActivity::NOTICE, '(1) this process will crash in ' . $n . ' seconds');
+ sleep($n);
+ throw new \Exception('crash');
+ },
+ random_int(2, 8)
+ )->id('mock_process_1')->blocker();
+
+ $this->asyncProcess->exec(
+ function (ABlockWrapper $wrapper): void {
+ $wrapper->activity(BlockActivity::ERROR, '(2) this process should only run after few replay from MOCK_PROCESS_1');
+ },
+ )->id('mock_process_2');
+
+ $this->asyncProcess->exec(
+ function (ABlockWrapper $wrapper): array {
+ $wrapper->activity(BlockActivity::ERROR, '(3) this process should only run after few replay from MOCK_PROCESS_1');
+
+ return ['ok'];
+ },
+ )->id('mock_process_3');
+
+ $this->asyncProcess->async(ProcessExecutionTime::LATER);
+ }
+
+
+
+ private function createFaultyProcessAutoRequired(): void {
+ $this->asyncProcess->exec(
+ function (ABlockWrapper $wrapper, int $n): array {
+ if ($wrapper->getReplayCount() > 1) {
+ $wrapper->activity(BlockActivity::NOTICE, '(1) this process will not crash anymore');
+ return ['dynamic-required'];
+ }
+
+ $wrapper->activity(BlockActivity::NOTICE, '(1) this process will crash in ' . $n . ' seconds');
+ sleep($n);
+ throw new \Exception('crash');
+ },
+ random_int(2, 8)
+ )->id('mock_process_1')->replayable();
+
+ $this->asyncProcess->exec(
+ function (ABlockWrapper $wrapper): void {
+ $wrapper->activity(BlockActivity::ERROR, '(2) this process should only run after few replay from MOCK_PROCESS_1');
+ },
+ )->id('mock_process_2')->require('mock_process_1');
+
+ $this->asyncProcess->exec(
+ function (ABlockWrapper $wrapper): array {
+ $wrapper->activity(BlockActivity::NOTICE, '(3) this process should run!');
+
+ return ['ok'];
+ },
+ )->id('mock_process_3');
+
+ $this->asyncProcess->async(ProcessExecutionTime::LATER);
+ }
+
+
+ private function createFaultyProcessAutoBlocker(): void {
+ $this->asyncProcess->exec(
+ function (ABlockWrapper $wrapper, int $n): array {
+ if ($wrapper->getReplayCount() > 1) {
+ $wrapper->activity(BlockActivity::NOTICE, '(1) this process will not crash anymore');
+ return ['dynamic-blocker'];
+ }
+
+ $wrapper->activity(BlockActivity::NOTICE, '(1) this process will crash in ' . $n . ' seconds');
+ sleep($n);
+ throw new \Exception('crash');
+ },
+ random_int(2, 8)
+ )->id('mock_process_1')->blocker()->replayable();
+
+ $this->asyncProcess->exec(
+ function (ABlockWrapper $wrapper): void {
+ $wrapper->activity(BlockActivity::ERROR, '(2) this process should only run after few replay from MOCK_PROCESS_1');
+ },
+ )->id('mock_process_2');
+
+ $this->asyncProcess->exec(
+ function (ABlockWrapper $wrapper): array {
+ $wrapper->activity(BlockActivity::ERROR, '(3) this process should only run after few replay from MOCK_PROCESS_1');
+
+ return ['ok'];
+ },
+ )->id('mock_process_3');
+
+ $this->asyncProcess->async(ProcessExecutionTime::LATER);
+ }
+
+}
diff --git a/core/Controller/AsyncProcessController.php b/core/Controller/AsyncProcessController.php
new file mode 100644
index 00000000000..f644e3a7851
--- /dev/null
+++ b/core/Controller/AsyncProcessController.php
@@ -0,0 +1,77 @@
+<?php
+
+declare(strict_types=1);
+
+/**
+ * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+namespace OC\Core\Controller;
+
+use OC\Async\Exceptions\BlockAlreadyRunningException;
+use OC\Async\ForkManager;
+use OC\Config\Lexicon\CoreConfigLexicon;
+use OCP\AppFramework\Controller;
+use OCP\AppFramework\Http\Attribute\FrontpageRoute;
+use OCP\AppFramework\Http\Attribute\NoAdminRequired;
+use OCP\AppFramework\Http\Attribute\NoCSRFRequired;
+use OCP\AppFramework\Http\Attribute\PublicPage;
+use OCP\AppFramework\Http\DataResponse;
+use OCP\Async\Enum\ProcessExecutionTime;
+use OCP\IAppConfig;
+use OCP\IRequest;
+
+class AsyncProcessController extends Controller {
+ public function __construct(
+ string $appName,
+ IRequest $request,
+ private IAppConfig $appConfig,
+ private ForkManager $forkManager,
+ ) {
+ parent::__construct($appName, $request);
+ }
+
+ #[NoAdminRequired]
+ #[NoCSRFRequired]
+ #[PublicPage]
+ #[FrontpageRoute(verb: 'POST', url: '/core/asyncProcessFork')]
+ public function processFork(string $token): DataResponse {
+ $metadata = ['executionTime' => ProcessExecutionTime::NOW];
+
+ // remote might only need a confirmation that this is a valid loopback endpoint
+ if ($token === '__ping__') {
+ return new DataResponse(['ping' => $this->appConfig->getValueString('core', CoreConfigLexicon::ASYNC_LOOPBACK_PING)]);
+ }
+
+ $this->async($token);
+
+ try {
+ $this->forkManager->runSession($token, $metadata);
+ } catch (BlockAlreadyRunningException) {
+ // TODO: debug() ?
+ }
+
+ exit();
+ }
+
+ private function async(string $result = ''): void {
+ if (ob_get_contents() !== false) {
+ ob_end_clean();
+ }
+
+ header('Connection: close');
+ header('Content-Encoding: none');
+ ignore_user_abort();
+ $timeLimit = 100;
+ set_time_limit(max($timeLimit, 0));
+ ob_start();
+
+ echo($result);
+
+ $size = ob_get_length();
+ header('Content-Length: ' . $size);
+ ob_end_flush();
+ flush();
+ }
+
+}
diff --git a/core/Migrations/Version32000Date20250307222601.php b/core/Migrations/Version32000Date20250307222601.php
new file mode 100644
index 00000000000..179645e0d2c
--- /dev/null
+++ b/core/Migrations/Version32000Date20250307222601.php
@@ -0,0 +1,64 @@
+<?php
+
+declare(strict_types=1);
+/**
+ * SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+namespace OC\Core\Migrations;
+
+use Closure;
+use OCP\DB\ISchemaWrapper;
+use OCP\DB\Types;
+use OCP\Migration\Attributes\AddColumn;
+use OCP\Migration\Attributes\AddIndex;
+use OCP\Migration\Attributes\ColumnType;
+use OCP\Migration\Attributes\CreateTable;
+use OCP\Migration\Attributes\DropIndex;
+use OCP\Migration\Attributes\IndexType;
+use OCP\Migration\IOutput;
+use OCP\Migration\SimpleMigrationStep;
+
+/**
+ * Create new column and index for lazy loading in preferences for the new IUserPreferences API.
+ */
+#[CreateTable(table: 'async_processes', columns: ['id', 'token', 'type', 'code', 'params', 'orig', 'result', 'status'], description: 'async task and status')]
+class Version32000Date20250307222601 extends SimpleMigrationStep {
+ public function changeSchema(IOutput $output, Closure $schemaClosure, array $options): ?ISchemaWrapper {
+ /** @var ISchemaWrapper $schema */
+ $schema = $schemaClosure();
+
+ if ($schema->hasTable('async_process')) {
+ return null;
+ }
+
+ $table = $schema->createTable('async_process');
+ $table->addColumn('id', Types::BIGINT, [ 'notnull' => true, 'length' => 64, 'autoincrement' => true, 'unsigned' => true]);
+ $table->addColumn('token', Types::STRING, ['notnull' => true, 'default' => '', 'length' => 15]);
+ $table->addColumn('session_token', Types::STRING, ['notnull' => true, 'default' => '', 'length' => 15]);
+ $table->addColumn('type', Types::SMALLINT, ['notnull' => true, 'default' => 0, 'unsigned' => true]);
+ $table->addColumn('code', Types::TEXT, ['notnull' => true, 'default' => '']);
+ $table->addColumn('params', Types::TEXT, ['notnull' => true, 'default' => '[]']);
+ $table->addColumn('dataset', Types::TEXT, ['notnull' => true, 'default' => '[]']);
+ $table->addColumn('metadata', Types::TEXT, ['notnull' => true, 'default' => '[]']);
+ $table->addColumn('links', Types::TEXT, ['notnull' => true, 'default' => '[]']);
+ $table->addColumn('orig', Types::TEXT, ['notnull' => true, 'default' => '[]']);
+ $table->addColumn('result', Types::TEXT, ['notnull' => true, 'default' => '']);
+ $table->addColumn('status', Types::SMALLINT, ['notnull' => true, 'default' => 0, 'unsigned' => true]);
+ $table->addColumn('execution_time', Types::SMALLINT, ['notnull' => true, 'default' => 0, 'unsigned' => true]);
+ $table->addColumn('lock_token', Types::STRING, ['notnull' => true, 'default' => '', 'length' => 7]);
+ $table->addColumn('creation', Types::INTEGER, ['notnull' => true, 'default' => 0, 'unsigned' => true]);
+ $table->addColumn('last_run', Types::INTEGER, ['notnull' => true, 'default' => 0, 'unsigned' => true]);
+ $table->addColumn('next_run', Types::INTEGER, ['notnull' => true, 'default' => 0, 'unsigned' => true]);
+
+ $table->setPrimaryKey(['id'], 'asy_prc_id');
+
+ $table->addIndex(['token'], 'asy_prc_tkn');
+ $table->addIndex(['status'], 'asy_prc_sts');
+ $table->addIndex(['next_run'], 'asy_prc_nxt');
+ $table->addIndex(['status', 'id', 'next_run'], 'asy_prc_sin');
+
+ return $schema;
+ }
+}
diff --git a/core/register_command.php b/core/register_command.php
index 62305d75a30..e0c201920a9 100644
--- a/core/register_command.php
+++ b/core/register_command.php
@@ -152,6 +152,10 @@ if ($config->getSystemValueBool('installed', false)) {
$application->add(Server::get(Command\TaskProcessing\Statistics::class));
$application->add(Server::get(Command\Memcache\RedisCommand::class));
+
+ $application->add(Server::get(Command\Async\Live::class));
+ $application->add(Server::get(Command\Async\Manage::class));
+ $application->add(Server::get(Command\Async\Setup::class));
} else {
$application->add(Server::get(Command\Maintenance\Install::class));
}
diff --git a/lib/base.php b/lib/base.php
index aa463e206a3..7f880db6df8 100644
--- a/lib/base.php
+++ b/lib/base.php
@@ -582,6 +582,7 @@ class OC {
self::setRequiredIniValues();
self::handleAuthHeaders();
+
// prevent any XML processing from loading external entities
libxml_set_external_entity_loader(static function () {
return null;
diff --git a/lib/private/Async/ABlockWrapper.php b/lib/private/Async/ABlockWrapper.php
new file mode 100644
index 00000000000..a5dd1c3b629
--- /dev/null
+++ b/lib/private/Async/ABlockWrapper.php
@@ -0,0 +1,40 @@
+<?php
+
+declare(strict_types=1);
+
+/**
+ * SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+namespace OC\Async;
+
+use OC\Async\Model\Block;
+use OC\Async\Model\SessionInterface;
+use OCP\Async\Enum\BlockActivity;
+
+abstract class ABlockWrapper {
+ abstract public function session(array $metadata): void;
+ abstract public function init(): void;
+ abstract public function activity(BlockActivity $activity, string $line = ''): void;
+ abstract public function end(string $line = ''): void;
+
+ protected Block $block;
+ private SessionInterface $sessionInterface;
+
+ public function setBlock(Block $block): void {
+ $this->block = $block;
+ }
+
+ public function getSessionInterface(): SessionInterface {
+ return $this->sessionInterface;
+ }
+
+ public function setSessionInterface(SessionInterface $iface): void {
+ $this->sessionInterface = $iface;
+ }
+
+ public function getReplayCount(): int {
+ return $this->block->getReplayCount();
+ }
+}
diff --git a/lib/private/Async/AsyncManager.php b/lib/private/Async/AsyncManager.php
new file mode 100644
index 00000000000..955037f356d
--- /dev/null
+++ b/lib/private/Async/AsyncManager.php
@@ -0,0 +1,222 @@
+<?php
+
+namespace OC\Async;
+
+use OC\Async\Db\BlockMapper;
+use OC\Async\Enum\BlockType;
+use OC\Async\Model\Block;
+use OC\Async\Model\BlockInterface;
+use OC\Config\Lexicon\CoreConfigLexicon;
+use OCP\Async\Enum\ProcessExecutionTime;
+use OCP\Async\Enum\BlockStatus;
+use OCP\IAppConfig;
+use ReflectionFunctionAbstract;
+
+class AsyncManager {
+ private ?string $sessionToken = null;
+ /** @var BlockInterface[] */
+ private array $interfaces = [];
+ /** @var array<string, ReflectionFunctionAbstract> } */
+ private array $blocksReflexion = [];
+
+ public function __construct(
+ private IAppConfig $appConfig,
+ private BlockMapper $blockMapper,
+ private ForkManager $forkManager,
+ ) {
+ }
+
+ /**
+ * @param BlockType $type
+ * @param string $serializedBlock
+ * @param ReflectionFunctionAbstract $reflection
+ * @param array $params
+ * @param array $allowedClasses
+ *
+ * @return IBlockInterface
+ * @see \OCP\Async\IAsyncProcess
+ * @internal prefer using public interface {@see \OCP\Async\IAsyncProcess}
+ */
+ public function asyncBlock(
+ BlockType $type,
+ string $serializedBlock, // serialize() on the code - className if type is ::CLASS
+ \ReflectionFunctionAbstract $reflection, // reflection about the method/function called as unserialization
+ array $params, // parameters to use at unserialization
+ array $allowedClasses = [] // list of class included in the serialization of the 'block'
+ ): IBlockInterface {
+ $this->sessionToken ??= $this->generateToken();
+
+ $data = $this->serializeReflectionParams($reflection, $params);
+ $data['blockClasses'] = $allowedClasses;
+ if ($type === BlockType::CLASSNAME) {
+ $data['className'] = $serializedBlock;
+ }
+
+ $token = $this->generateToken();
+ $block = new Block();
+ $block->setToken($token);
+ $block->setSessionToken($this->sessionToken);
+ $block->setBlockType($type);
+ $block->setBlockStatus(BlockStatus::PREP);
+ $block->setCode($serializedBlock);
+ $block->setParams($data);
+ $block->setOrig([]);
+ $block->setCreation(time());
+
+
+ // needed !?
+// $process->addMetadataEntry('_links', $this->interfaces);
+
+// $this->processMapper->insert($process);
+
+ $iface = new BlockInterface($this, $block);
+ $this->interfaces[] = $iface;
+ $this->blocksReflexion[$token] = $reflection;
+
+ return $iface;
+ }
+
+// public function updateDataset(ProcessInfo $processInfo) {
+// $token = $processInfo->getToken();
+// $dataset = [];
+// foreach($processInfo->getDataset() as $params) {
+// $dataset[] = $this->serializeReflectionParams($this->processesReflexion[$token], $params);
+// }
+// $this->processMapper->updateDataset($token, $dataset);
+// }
+
+ private function endSession(ProcessExecutionTime $time): ?string {
+ if ($this->sessionToken === null) {
+ return null;
+ }
+
+ $current = $this->sessionToken;
+ foreach ($this->interfaces as $iface) {
+ $process = $iface->getBlock();
+ $process->addMetadataEntry('_iface', $iface->jsonSerialize());
+ $process->setProcessExecutionTime($time);
+
+ $dataset = [];
+ foreach ($iface->getDataset() as $params) {
+ $dataset[] = $this->serializeReflectionParams(
+ $this->blocksReflexion[$process->getToken()], $params
+ );
+ }
+ $process->setDataset($dataset);
+
+ $this->blockMapper->insert($process);
+ }
+
+ $this->blockMapper->updateSessionStatus(
+ $this->sessionToken, BlockStatus::STANDBY, BlockStatus::PREP
+ );
+ $this->processes = $this->interfaces = [];
+ $this->sessionToken = null;
+
+ return $current;
+ }
+
+ /**
+ * @internal
+ */
+ public function async(ProcessExecutionTime $time): string {
+ $current = $this->endSession($time);
+ if ($current === null) {
+ return '';
+ }
+
+ if ($time === ProcessExecutionTime::NOW) {
+ $this->forkManager->forkSession($current);
+ }
+
+ return $current;
+ }
+
+ private function serializeReflectionParams(ReflectionFunctionAbstract $reflection, array $params): array {
+ $i = 0;
+ $processWrapper = false;
+ $filteredParams = [];
+
+ foreach ($reflection->getParameters() as $arg) {
+ $argType = $arg->getType();
+ $param = $params[$i];
+ if ($argType !== null) {
+ if ($i === 0 && $argType->getName() === ABlockWrapper::class) {
+ $processWrapper = true;
+ continue; // we ignore IProcessWrapper as first argument, as it will be filled at execution time
+ }
+
+ // TODO: compare $argType with $param
+// foreach($argType->getTypes() as $t) {
+// echo '> ' . $t . "\n";
+// }
+// $arg->allowsNull()
+// $arg->isOptional()
+
+ }
+
+ // TODO: we might want to filter some params ?
+ $filteredParams[] = $param;
+
+// echo '..1. ' . json_encode($param) . "\n";
+// echo '..2. ' . serialize($param) . "\n";
+// echo '? ' . gettype($param) . "\n";
+
+ $i++;
+ }
+
+ try {
+ $serializedParams = serialize($params);
+ } catch (\Exception $e) {
+ throw $e;
+ }
+
+ return [
+ 'params' => $serializedParams,
+ 'paramsClasses' => $this->extractClassFromArray($filteredParams),
+ 'processWrapper' => $processWrapper,
+ ];
+ }
+
+//
+// public function unserializeParams(array $data): array {
+// return $data;
+// }
+//
+
+ private function extractClassFromArray(array $arr, array &$classes = []): array {
+ foreach ($arr as $entry) {
+ if (is_array($entry)) {
+ $this->extractClassFromArray($entry, $classes);
+ }
+
+ if (is_object($entry)) {
+ $class = get_class($entry);
+ if (!in_array($class, $classes, true)) {
+ $classes[] = $class;
+ }
+ }
+ }
+
+ return $classes;
+ }
+
+ private function generateToken(int $length = 15): string {
+ $result = '';
+ for ($i = 0; $i < $length; $i++) {
+ $result .= 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890'[random_int(0, 61)];
+ }
+
+ return $result;
+ }
+
+ public function dropAllBlocks(): void {
+ $this->blockMapper->deleteAll();
+ }
+
+ public function resetConfig(): void {
+ $this->appConfig->deleteKey('core', CoreConfigLexicon::ASYNC_LOOPBACK_ADDRESS);
+ $this->appConfig->deleteKey('core', CoreConfigLexicon::ASYNC_LOOPBACK_PING);
+ $this->appConfig->deleteKey('core', CoreConfigLexicon::ASYNC_LOOPBACK_TEST);
+ }
+}
diff --git a/lib/private/Async/AsyncProcess.php b/lib/private/Async/AsyncProcess.php
new file mode 100644
index 00000000000..44d990749d7
--- /dev/null
+++ b/lib/private/Async/AsyncProcess.php
@@ -0,0 +1,64 @@
+<?php
+
+namespace OC\Async;
+
+use Laravel\SerializableClosure\SerializableClosure;
+use Laravel\SerializableClosure\Serializers\Native;
+use OC\Async\Enum\BlockType;
+use OCP\Async\Enum\ProcessExecutionTime;
+use OCP\Async\IAsyncProcess;
+use ReflectionFunction;
+use ReflectionMethod;
+
+class AsyncProcess implements IAsyncProcess {
+ public function __construct(
+ private AsyncManager $asyncManager,
+ private readonly ForkManager $forkManager,
+ ) {
+ }
+
+ public function exec(\Closure $closure, ...$params): IBlockInterface {
+ return $this->asyncManager->asyncBlock(
+ BlockType::CLOSURE,
+ serialize(new SerializableClosure($closure)),
+ new ReflectionFunction($closure),
+ $params,
+ [SerializableClosure::class, Native::class]
+ );
+ }
+
+ public function invoke(callable $obj, ...$params): IBlockInterface {
+ return $this->asyncManager->asyncBlock(
+ BlockType::INVOKABLE,
+ serialize($obj),
+ new ReflectionMethod($obj, '__invoke'),
+ $params,
+ [$obj::class]
+ );
+ }
+
+ public function call(string $class, ...$params): IBlockInterface {
+ // abstract ?
+ if (!method_exists($class, 'async')) {
+ throw new \Exception('class ' . $class . ' is missing async() method');
+ }
+
+ return $this->asyncManager->asyncBlock(
+ BlockType::CLASSNAME,
+ $class,
+ new ReflectionMethod($class, 'async'),
+ $params,
+ );
+ }
+
+ /**
+ * close the creation of the session and start async as soon as possible
+ *
+ * @param ProcessExecutionTime $time preferred urgency to start the async process
+ *
+ * @return string session token, empty if no opened session
+ */
+ public function async(ProcessExecutionTime $time = ProcessExecutionTime::NOW): string {
+ return $this->asyncManager->async($time);
+ }
+}
diff --git a/lib/private/Async/Db/BlockMapper.php b/lib/private/Async/Db/BlockMapper.php
new file mode 100644
index 00000000000..6d6409327a4
--- /dev/null
+++ b/lib/private/Async/Db/BlockMapper.php
@@ -0,0 +1,260 @@
+<?php
+
+declare(strict_types=1);
+/**
+ * SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+namespace OC\Async\Db;
+
+use OC\Async\Exceptions\BlockNotFoundException;
+use OC\Async\Model\Block;
+use OCP\AppFramework\Db\DoesNotExistException;
+use OCP\AppFramework\Db\QBMapper;
+use OCP\Async\Enum\BlockStatus;
+use OCP\DB\QueryBuilder\IQueryBuilder;
+use OCP\IDBConnection;
+use Psr\Log\LoggerInterface;
+
+/**
+ * @template-extends QBMapper<Block>
+ */
+class BlockMapper extends QBMapper {
+ public const TABLE = 'async_process';
+
+ public function __construct(
+ IDBConnection $db,
+ private LoggerInterface $logger,
+ ) {
+ parent::__construct($db, self::TABLE, Block::class);
+ }
+
+ /**
+ * @return Block[]
+ */
+ public function getBySession(string $sessionToken): array {
+ $qb = $this->db->getQueryBuilder();
+ $qb->select('*')
+ ->from($this->getTableName())
+ ->where($qb->expr()->eq('session_token', $qb->createNamedParameter($sessionToken)))
+ ->orderBy('id', 'asc');
+
+ return $this->findEntities($qb);
+ }
+
+ /**
+ * @return string[]
+ */
+ public function getSessions(): array {
+ $qb = $this->db->getQueryBuilder();
+ $qb->selectDistinct('session_token')
+ ->from($this->getTableName());
+
+ $result = $qb->executeQuery();
+
+ $sessions = [];
+ while ($row = $result->fetch()) {
+ $sessions[] = $row['session_token'];
+ }
+ $result->closeCursor();
+
+ return $sessions;
+ }
+
+
+ /**
+ * returns list of sessionId that contains process with:
+ * - at least one is in STANDBY with an older timestamp in next_run
+ * - none as RUNNING or BLOCKER
+ *
+ * @return string[]
+ */
+ public function getSessionOnStandBy(): array {
+ $qb = $this->db->getQueryBuilder();
+ $qb->selectDistinct('t1.session_token')
+ ->from($this->getTableName(), 't1')
+ ->leftJoin(
+ 't1', $this->getTableName(), 't2',
+ $qb->expr()->andX(
+ $qb->expr()->eq('t1.session_token', 't2.session_token'),
+ $qb->expr()->in('t2.status', $qb->createNamedParameter([BlockStatus::PREP->value, BlockStatus::RUNNING->value, BlockStatus::BLOCKER->value], IQueryBuilder::PARAM_INT_ARRAY))
+ )
+ )
+ ->where($qb->expr()->eq('t1.status', $qb->createNamedParameter(BlockStatus::STANDBY->value, IQueryBuilder::PARAM_INT)))
+ ->andWhere($qb->expr()->lt('t1.next_run', $qb->createNamedParameter(time(), IQueryBuilder::PARAM_INT)))
+ ->andWhere($qb->expr()->isNull('t2.status'));
+
+ $result = $qb->executeQuery();
+
+ $sessions = [];
+ while ($row = $result->fetch()) {
+ $sessions[] = $row['session_token'];
+ }
+ $result->closeCursor();
+
+ return $sessions;
+ }
+
+ /**
+ * reset to STANDBY all process:
+ * - marked as ERROR or BLOCKER,
+ * - next_run in an older timestamp
+ * - next_run not at zero
+ */
+ public function resetFailedBlock(): int {
+ $qb = $this->db->getQueryBuilder();
+ $qb->update($this->getTableName())
+ ->set('status', $qb->createNamedParameter(BlockStatus::STANDBY->value, IQueryBuilder::PARAM_INT))
+ ->where($qb->expr()->in('status', $qb->createNamedParameter([BlockStatus::ERROR->value, BlockStatus::BLOCKER->value], IQueryBuilder::PARAM_INT_ARRAY)))
+ ->andWhere($qb->expr()->lt('next_run', $qb->createNamedParameter(time(), IQueryBuilder::PARAM_INT)))
+ ->andWhere($qb->expr()->neq('next_run', $qb->createNamedParameter(0, IQueryBuilder::PARAM_INT)));
+ return $qb->executeStatement();
+ }
+
+ /**
+ * delete sessions that contain only process with status at ::SUCCESS
+ */
+ public function removeSuccessfulBlock(): int {
+ $qb = $this->db->getQueryBuilder();
+ $qb->selectDistinct('t1.session_token')
+ ->from($this->getTableName(), 't1')
+ ->leftJoin(
+ 't1', $this->getTableName(), 't2',
+ $qb->expr()->andX(
+ $qb->expr()->eq('t1.session_token', 't2.session_token'),
+ $qb->expr()->neq('t2.status', $qb->createNamedParameter(BlockStatus::SUCCESS->value, IQueryBuilder::PARAM_INT))
+ )
+ )
+ ->where($qb->expr()->eq('t1.status', $qb->createNamedParameter(BlockStatus::SUCCESS->value, IQueryBuilder::PARAM_INT)))
+ ->andWhere($qb->expr()->lt('t1.next_run', $qb->createNamedParameter(time(), IQueryBuilder::PARAM_INT)))
+ ->andWhere($qb->expr()->isNull('t2.status'));
+
+ $result = $qb->executeQuery();
+ $sessions = [];
+ while ($row = $result->fetch()) {
+ $sessions[] = $row['session_token'];
+ }
+ $result->closeCursor();
+
+ $count = 0;
+ $chunks = array_chunk($sessions, 30);
+ foreach($chunks as $chunk) {
+ $delete = $this->db->getQueryBuilder();
+ $delete->delete($this->getTableName())
+ ->where($qb->expr()->in('session_token', $delete->createNamedParameter($chunk, IQueryBuilder::PARAM_INT_ARRAY)));
+ $count += $delete->executeStatement();
+ unset($delete);
+ }
+
+ return $count;
+ }
+
+ /**
+ * return a Block from its token
+ *
+ * @throws BlockNotFoundException
+ */
+ public function getByToken(string $token): Block {
+ $qb = $this->db->getQueryBuilder();
+ $qb->select('*')
+ ->from($this->getTableName())
+ ->where($qb->expr()->eq('token', $qb->createNamedParameter($token)));
+
+ try {
+ return $this->findEntity($qb);
+ } catch (DoesNotExistException) {
+ throw new BlockNotFoundException('no process found');
+ }
+ }
+
+ public function updateStatus(Block $block, ?BlockStatus $prevStatus = null, string $lockToken = ''): bool {
+ $qb = $this->db->getQueryBuilder();
+ $qb->update($this->getTableName())
+ ->set('status', $qb->createNamedParameter($block->getStatus()))
+ ->where($qb->expr()->andX(
+ $qb->expr()->eq('session_token', $qb->createNamedParameter($block->getSessionToken())),
+ $qb->expr()->eq('token', $qb->createNamedParameter($block->getToken()))
+ )
+ );
+
+ // if process contain a lockToken, we conflict with the stored one
+ if ($block->getLockToken() !== '') {
+ $qb->andWhere($qb->expr()->eq('lock_token', $qb->createNamedParameter($block->getLockToken())));
+ }
+
+ // if status is switching from standby to running, we set last_run to detect timeout
+ if ($block->getBlockStatus() === BlockStatus::RUNNING && $prevStatus === BlockStatus::STANDBY) {
+ $qb->set('last_run', $qb->createNamedParameter(time(), IQueryBuilder::PARAM_INT));
+ }
+
+ // if status is updated to success, error or blocker, we store result (or exception)
+ if (in_array($block->getBlockStatus(), [BlockStatus::SUCCESS, BlockStatus::ERROR, BlockStatus::BLOCKER], true)) {
+ try {
+ $qb->set('result', $qb->createNamedParameter(json_encode($block->getResult(), JSON_THROW_ON_ERROR)));
+ $qb->set('metadata', $qb->createNamedParameter(json_encode($block->getMetadata(), JSON_THROW_ON_ERROR)));
+ $qb->set('next_run', $qb->createNamedParameter($block->getNextRun(), IQueryBuilder::PARAM_INT));
+ } catch (\JsonException $e) {
+ $this->logger->warning('could not json_encode process result', ['exception' => $e]);
+ }
+ }
+
+ if ($lockToken !== '') {
+ $qb->set('lock_token', $qb->createNamedParameter($lockToken));
+ }
+
+ if ($prevStatus !== null) {
+ $qb->andWhere($qb->expr()->eq('status', $qb->createNamedParameter($prevStatus->value, IQueryBuilder::PARAM_INT)));
+ }
+
+ return ($qb->executeStatement() === 1);
+ }
+
+ public function updateSessionStatus(string $sessionToken, BlockStatus $status, ?BlockStatus $prevStatus = null): int {
+ $qb = $this->db->getQueryBuilder();
+ $qb->update($this->getTableName())
+ ->set('status', $qb->createNamedParameter($status->value, IQueryBuilder::PARAM_INT))
+ ->where($qb->expr()->eq('session_token', $qb->createNamedParameter($sessionToken)));
+
+ if ($prevStatus !== null) {
+ $qb->andWhere($qb->expr()->eq('status', $qb->createNamedParameter($prevStatus->value, IQueryBuilder::PARAM_INT)));
+ }
+
+ return $qb->executeStatement();
+ }
+
+
+
+ /**
+ * set next_run to current time if next_run>0 in relation to session_token.
+ * Force process queued for replay to be run as soon as possible.
+ */
+ public function resetSessionNextRun(string $sessionToken): void {
+ $qb = $this->db->getQueryBuilder();
+ $qb->update($this->getTableName())
+ ->set('next_run', $qb->createNamedParameter(time(), IQueryBuilder::PARAM_INT))
+ ->where(
+ $qb->expr()->eq('session_token', $qb->createNamedParameter($sessionToken)),
+ $qb->expr()->gt('next_run', $qb->createNamedParameter(0, IQueryBuilder::PARAM_INT)),
+ );
+
+ $qb->executeStatement();
+ }
+
+
+// public function updateDataset(string $token, array $dataset): void {
+// $qb = $this->db->getQueryBuilder();
+// $qb->update($this->getTableName())
+// ->set('dataset', $qb->createNamedParameter(json_encode($dataset)))
+// ->where($qb->expr()->eq('token', $qb->createNamedParameter($token)));
+//
+// $qb->executeStatement();
+// }
+
+ public function deleteAll() {
+ $qb = $this->db->getQueryBuilder();
+ $qb->delete($this->getTableName());
+ $qb->executeStatement();
+ }
+
+}
diff --git a/lib/private/Async/Enum/BlockType.php b/lib/private/Async/Enum/BlockType.php
new file mode 100644
index 00000000000..839d0d24f05
--- /dev/null
+++ b/lib/private/Async/Enum/BlockType.php
@@ -0,0 +1,16 @@
+<?php
+
+declare(strict_types=1);
+
+/**
+ * SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+namespace OC\Async\Enum;
+
+enum BlockType: int {
+ case CLOSURE = 1;
+ case INVOKABLE = 2;
+ case CLASSNAME = 3;
+}
diff --git a/lib/private/Async/Exceptions/AsyncProcessException.php b/lib/private/Async/Exceptions/AsyncProcessException.php
new file mode 100644
index 00000000000..b394a95df7c
--- /dev/null
+++ b/lib/private/Async/Exceptions/AsyncProcessException.php
@@ -0,0 +1,14 @@
+<?php
+
+declare(strict_types=1);
+/**
+ * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+namespace OC\Async\Exceptions;
+
+use Exception;
+
+class AsyncProcessException extends Exception {
+}
diff --git a/lib/private/Async/Exceptions/BlockAlreadyRunningException.php b/lib/private/Async/Exceptions/BlockAlreadyRunningException.php
new file mode 100644
index 00000000000..4c671d91d55
--- /dev/null
+++ b/lib/private/Async/Exceptions/BlockAlreadyRunningException.php
@@ -0,0 +1,12 @@
+<?php
+
+declare(strict_types=1);
+/**
+ * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+namespace OC\Async\Exceptions;
+
+class BlockAlreadyRunningException extends AsyncProcessException {
+}
diff --git a/lib/private/Async/Exceptions/BlockNotFoundException.php b/lib/private/Async/Exceptions/BlockNotFoundException.php
new file mode 100644
index 00000000000..6e25415f7e3
--- /dev/null
+++ b/lib/private/Async/Exceptions/BlockNotFoundException.php
@@ -0,0 +1,12 @@
+<?php
+
+declare(strict_types=1);
+/**
+ * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+namespace OC\Async\Exceptions;
+
+class BlockNotFoundException extends AsyncProcessException {
+}
diff --git a/lib/private/Async/Exceptions/LoopbackEndpointException.php b/lib/private/Async/Exceptions/LoopbackEndpointException.php
new file mode 100644
index 00000000000..2a7bea78239
--- /dev/null
+++ b/lib/private/Async/Exceptions/LoopbackEndpointException.php
@@ -0,0 +1,14 @@
+<?php
+
+declare(strict_types=1);
+/**
+ * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+namespace OC\Async\Exceptions;
+
+use Exception;
+
+class LoopbackEndpointException extends AsyncProcessException {
+}
diff --git a/lib/private/Async/Exceptions/SessionBlockedException.php b/lib/private/Async/Exceptions/SessionBlockedException.php
new file mode 100644
index 00000000000..9c6d85dc4fc
--- /dev/null
+++ b/lib/private/Async/Exceptions/SessionBlockedException.php
@@ -0,0 +1,14 @@
+<?php
+
+declare(strict_types=1);
+/**
+ * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+namespace OC\Async\Exceptions;
+
+use Exception;
+
+class SessionBlockedException extends AsyncProcessException {
+}
diff --git a/lib/private/Async/ForkManager.php b/lib/private/Async/ForkManager.php
new file mode 100644
index 00000000000..6978a601bd4
--- /dev/null
+++ b/lib/private/Async/ForkManager.php
@@ -0,0 +1,485 @@
+<?php
+
+namespace OC\Async;
+
+use OC\Async\Db\BlockMapper;
+use OC\Async\Enum\BlockType;
+use OC\Async\Exceptions\AsyncProcessException;
+use OC\Async\Exceptions\LoopbackEndpointException;
+use OC\Async\Exceptions\BlockAlreadyRunningException;
+use OC\Async\Exceptions\SessionBlockedException;
+use OC\Async\Model\Block;
+use OC\Async\Model\BlockInterface;
+use OC\Async\Model\SessionInterface;
+use OC\Async\Wrappers\DummyBlockWrapper;
+use OC\Async\Wrappers\LoggerBlockWrapper;
+use OC\Config\Lexicon\CoreConfigLexicon;
+use OC\DB\Connection;
+use OCP\Async\Enum\BlockActivity;
+use OCP\Async\Enum\ProcessExecutionTime;
+use OCP\Async\Enum\BlockStatus;
+use OCP\Async\IAsyncProcess;
+use OCP\Http\Client\IClientService;
+use OCP\IAppConfig;
+use OCP\IConfig;
+use OCP\IURLGenerator;
+use Psr\Log\LoggerInterface;
+use Symfony\Component\Console\Output\OutputInterface;
+
+class ForkManager {
+ private ?ABlockWrapper $wrapper;
+
+ /** @var int[] */
+ private array $forks = [];
+ private const FORK_LIMIT = 3; // maximum number of child process
+ private const FORK_SLEEP = 500000; // wait for msec when too many fork have been created
+
+ public function __construct(
+ private BlockMapper $blockMapper,
+ private Connection $conn,
+ private IAppConfig $appConfig,
+ private IConfig $config,
+ private IClientService $clientService,
+ private IURLGenerator $urlGenerator,
+ LoggerBlockWrapper $loggerProcessWrapper,
+ private LoggerInterface $logger,
+ ) {
+ $this->wrapper = $loggerProcessWrapper;
+ }
+
+ public function setWrapper(?ABlockWrapper $wrapper): void {
+ $this->wrapper = $wrapper;
+ }
+
+
+ /**
+ * @throws BlockAlreadyRunningException
+ * @throws AsyncProcessException
+ */
+ public function runSession(string $token, array $metadata = []): void {
+ $sessionBlocks = $this->blockMapper->getBySession($token);
+ $metadata['sessionToken'] = $token;
+
+ if ($this->wrapper !== null) {
+ $wrapper = clone $this->wrapper;
+ } else {
+ $wrapper = null;
+ }
+
+ // might be need to avoid some conflict/race condition
+ // usleep(10000);
+ $sessionIface = new SessionInterface(BlockInterface::asBlockInterfaces($sessionBlocks));
+
+ if ($sessionIface->getGlobalStatus() !== BlockStatus::STANDBY) {
+ throw new AsyncProcessException();
+ }
+
+ $wrapper?->setSessionInterface($sessionIface);
+ $wrapper?->session($metadata);
+
+ try {
+ foreach ($sessionBlocks as $block) {
+ if (!$this->confirmBlockRequirement($wrapper, $sessionIface, $block)) {
+ $block->replay();
+ $this->blockMapper->update($block);
+ continue;
+ }
+
+ $block->addMetadata($metadata);
+ $this->runBlock($block, $wrapper);
+
+ if ($block->getBlockStatus() === BlockStatus::BLOCKER) {
+ $wrapper?->end('Fail process ' . $block->getToken() . ' block the rest of the session');
+ throw new SessionBlockedException();
+ }
+ }
+ $wrapper?->end();
+ } catch (BlockAlreadyRunningException) {
+ $wrapper?->end('already running');
+ throw new BlockAlreadyRunningException();
+ }
+ }
+
+ private function confirmBlockRequirement(
+ ?ABlockWrapper $wrapper,
+ SessionInterface $sessionIface,
+ Block $block
+ ): bool {
+ $procIface = new BlockInterface(null, $block);
+ foreach ($procIface->getRequire() as $requiredProcessId) {
+ $requiredBlock = $sessionIface->byId($requiredProcessId);
+ if ($requiredBlock === null) {
+ $wrapper?->activity(BlockActivity::NOTICE, 'could not initiated block as it requires block ' . $requiredProcessId . ' which is not defined');
+ return false;
+ }
+ if ($requiredBlock?->getStatus() !== BlockStatus::SUCCESS) {
+ $wrapper?->activity(BlockActivity::NOTICE, 'could not initiated block as it requires block ' . $requiredProcessId . ' to be executed and successful');
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * @throws BlockAlreadyRunningException
+ */
+ private function runBlock(Block $block, ?ABlockWrapper $wrapper = null): void {
+ if ($block->getBlockStatus() !== BlockStatus::STANDBY) {
+ return;
+ }
+
+ $this->lockBlock($block);
+
+ $data = $block->getParams();
+ $serialized = $block->getCode();
+ $params = unserialize($data['params'], ['allowed_classes' => $data['paramsClasses']]);
+ $obj = unserialize($serialized, ['allowed_classes' => $data['blockClasses'] ?? []]);
+
+ if ($data['processWrapper'] ?? false) {
+ array_unshift($params, ($wrapper ?? new DummyBlockWrapper()));
+ }
+
+ $wrapper?->setBlock($block);
+ $wrapper?->init();
+ $wrapper?->activity(BlockActivity::STARTING);
+ $result = [
+ 'executionTime' => ProcessExecutionTime::NOW->value,
+ 'startTime' => time(),
+ ];
+ $iface = new BlockInterface(null, $block);
+ try {
+ $returnedData = null;
+ switch ($block->getBlockType()) {
+ case BlockType::CLOSURE:
+ $c = $obj->getClosure();
+ $returnedData = $c(...$params);
+ break;
+
+ case BlockType::INVOKABLE:
+ $returnedData = $obj(...$params);
+ break;
+
+ case BlockType::CLASSNAME:
+ $obj = new $data['className']();
+ $returnedData = $obj->async(...$params);
+ break;
+ }
+ if (is_array($returnedData)) {
+ $result['result'] = $returnedData;
+ }
+ $block->setBlockStatus(BlockStatus::SUCCESS);
+ if ($block->getReplayCount() > 0) {
+ // in case of success after multiple tentative, we reset next run to right now
+ // on all block waiting for replay. Easiest solution to find block dependant of
+ // this current successful run
+ $this->blockMapper->resetSessionNextRun($block->getSessionToken());
+ }
+ } catch (\Exception $e) {
+ $wrapper?->activity(BlockActivity::ERROR, $e->getMessage());
+ $result['error'] = [
+ 'exception' => get_class($e),
+ 'message' => $e->getMessage(),
+ 'trace' => $e->getTrace(),
+ 'code' => $e->getCode()
+ ];
+
+ if ($iface->isReplayable()) {
+ $block->replay(); // we mark the block as able to be back to STANDBY status
+ } else {
+ $block->setNextRun(0);
+ }
+ if ($iface->isBlocker()) {
+ $block->setBlockStatus(BlockStatus::BLOCKER);
+ } else {
+ $block->setBlockStatus(BlockStatus::ERROR);
+ }
+ } finally {
+ $result['endTime'] = time();
+ }
+
+ $block->setResult($result);
+ $wrapper?->activity(BlockActivity::ENDING);
+ $this->blockMapper->updateStatus($block, BlockStatus::RUNNING);
+ }
+
+ /**
+ * @throws BlockAlreadyRunningException
+ */
+ private function lockBlock(Block $block): void {
+ if ($block->getBlockStatus() !== BlockStatus::STANDBY) {
+ throw new BlockAlreadyRunningException('block not in standby');
+ }
+ $lockToken = $this->generateToken(7);
+ $block->setBlockStatus(BlockStatus::RUNNING);
+ if (!$this->blockMapper->updateStatus($block, BlockStatus::STANDBY, $lockToken)) {
+ throw new BlockAlreadyRunningException('block is locked');
+ }
+ $block->setLockToken($lockToken);
+ }
+
+ public function forkSession(string $session, array $metadata = []): void {
+ if (\OC::$CLI) {
+ $useWebIfNeeded = $metadata['useWebIfNeeded'] ?? false;
+ if ($useWebIfNeeded && !extension_loaded('posix')) {
+ try {
+ $this->forkSessionLoopback($session);
+ return;
+ } catch (\Exception) {
+ }
+ }
+
+ $this->forkSessionCli($session, $metadata);
+ return;
+ }
+
+ try {
+ $this->forkSessionLoopback($session);
+ } catch (LoopbackEndpointException) {
+ // session will be processed later
+ }
+ }
+
+
+ private function forkSessionCli(string $session, array $metadata = []): void {
+ if (!extension_loaded('posix')) {
+ // log/notice that posix is not loaded
+ return;
+ }
+ $slot = $this->getFirstAvailableSlot();
+ $metadata += [
+ '_cli' => [
+ 'slot' => $slot,
+ 'forkCount' => count($this->forks),
+ 'forkLimit' => self::FORK_LIMIT,
+ ]
+ ];
+
+ $pid = pcntl_fork();
+
+ // work around as the parent database connection is inherited by the child.
+ // when child process is over, parent process database connection will drop.
+ // The drop can happen anytime, even in the middle of a running request.
+ // work around is to close the connection as soon as possible after forking.
+ $this->conn->close();
+
+ if ($pid === -1) {
+ // TODO: manage issue while forking
+ } else if ($pid === 0) {
+ // forked process
+ try {
+ $this->runSession($session, $metadata);
+ } catch (AsyncProcessException) {
+ // failure to run session can be part of the process
+ }
+ exit();
+ } else {
+ // store slot+pid
+ $this->forks[$slot] = $pid;
+
+ // when fork limit is reach, cycle until space freed
+ while (true) {
+ $exitedPid = pcntl_waitpid(0, $status, WNOHANG);
+ if ($exitedPid > 0) {
+ $slot = array_search($exitedPid, $this->forks, true);
+ if ($slot) {
+ unset($this->forks[$slot]);
+ }
+ }
+ if (count($this->forks) < self::FORK_LIMIT) {
+ return;
+ }
+ usleep(self::FORK_SLEEP);
+ }
+ }
+ }
+
+ /**
+ * Request local loopback endpoint.
+ * We expect the request to be closed remotely.
+ *
+ * Ignored if:
+ * - the endpoint is not fully configured and tested,
+ * - the server is on heavy load (timeout at 1 second)
+ *
+ * @return string result from the loopback endpoint
+ * @throws LoopbackEndpointException if not configured
+ */
+ private function forkSessionLoopback(string $session, ?string $loopbackEndpoint = null): string {
+ $client = $this->clientService->newClient();
+ try {
+ $response = $client->post(
+ $loopbackEndpoint ?? $this->linkToLoopbackEndpoint(),
+ [
+ 'headers' => [],
+ 'verify' => false,
+ 'connect_timeout' => 1.0,
+ 'timeout' => 1.0,
+ 'http_errors' => true,
+ 'body' => ['token' => $session],
+ 'nextcloud' => [
+ 'allow_local_address' => true,
+ 'allow_redirects' => true,
+ ]
+ ]
+ );
+
+ return (string)$response->getBody();
+ } catch (LoopbackEndpointException $e) {
+ $this->logger->debug('loopback endpoint not configured', ['exception' => $e]);
+ throw $e;
+ } catch (\Exception $e) {
+ $this->logger->warning('could not reach loopback endpoint to initiate fork', ['exception' => $e]);
+ throw new LoopbackEndpointException('loopback endpoint cannot be reach', previous: $e);
+ }
+ }
+
+
+ /**
+ * return full (absolute) link to the web-loopback endpoint
+ *
+ * @param string|null $instance if null, stored loopback address will be used.
+ *
+ * @throws LoopbackEndpointException if $instance is null and no stored configuration
+ */
+ public function linkToLoopbackEndpoint(?string $instance = null): string {
+ return rtrim($instance ?? $this->getLoopbackInstance(), '/') . $this->urlGenerator->linkToRoute('core.AsyncProcess.processFork');
+ }
+
+ /**
+ * return loopback address stored in configuration
+ *
+ * @return string
+ * @throws LoopbackEndpointException if config is not set or empty
+ */
+ public function getLoopbackInstance(): string {
+ if (!$this->appConfig->hasKey('core', CoreConfigLexicon::ASYNC_LOOPBACK_ADDRESS, true)) {
+ throw new LoopbackEndpointException('loopback not configured');
+ }
+
+ $instance = $this->appConfig->getValueString('core', CoreConfigLexicon::ASYNC_LOOPBACK_ADDRESS);
+ if ($instance === '') {
+ throw new LoopbackEndpointException('empty config');
+ }
+
+ return $instance;
+ }
+
+
+ public function discoverLoopbackEndpoint(?OutputInterface $output = null): ?string {
+ $cliUrl = $this->config->getSystemValueString('overwrite.cli.url', '');
+ $output?->write('- testing value from \'overwrite.cli.url\' (<comment>' . $cliUrl . '</comment>)... ');
+
+ $reason = '';
+ if ($this->testLoopbackInstance($cliUrl, $reason)) {
+ $output?->writeln('<info>ok</info>');
+ return $cliUrl;
+ }
+
+ $output?->writeln('<error>' . $reason . '</error>');
+
+ foreach($this->config->getSystemValue('trusted_domains', []) as $url) {
+ $url = 'https://' . $url;
+ $output?->write('- testing entry from \'trusted_domains\' (<comment>' . $url . '</comment>)... ');
+ if ($this->testLoopbackInstance($url, $reason)) {
+ $output?->writeln('<info>ok</info>');
+ return $url;
+ }
+ $output?->writeln('<error>' . $reason . '</error>');
+ }
+
+ return null;
+ }
+
+
+ public function testLoopbackInstance(string $url, string &$reason = ''): bool {
+ $url = rtrim($url, '/');
+ if (!$this->pingLoopbackInstance($url)) {
+ $reason = 'failed ping';
+ return false;
+ }
+
+ $token = $this->generateToken();
+ $asyncProcess = \OCP\Server::get(IAsyncProcess::class);
+ $asyncProcess->exec(function(string $token) {
+ sleep(1); // enforce a delay to confirm asynchronicity
+ $appConfig = \OCP\Server::get(IAppConfig::class);
+ $appConfig->setValueString('core', CoreConfigLexicon::ASYNC_LOOPBACK_TEST, $token);
+ }, $token)->name('test loopback instance')->async();
+
+ $this->appConfig->clearCache(true);
+ if ($token === $this->appConfig->getValueString('core', CoreConfigLexicon::ASYNC_LOOPBACK_TEST)) {
+ $reason = 'async process already executed';
+ return false;
+ }
+
+ sleep(3);
+ $this->appConfig->clearCache(true);
+ $result = ($token === $this->appConfig->getValueString('core', CoreConfigLexicon::ASYNC_LOOPBACK_TEST));
+ $this->appConfig->deleteKey('core', CoreConfigLexicon::ASYNC_LOOPBACK_TEST);
+
+ return $result;
+ }
+
+ private function pingLoopbackInstance(string $url): bool {
+ $pingLoopback = $this->generateToken();
+ $this->appConfig->setValueString('core', CoreConfigLexicon::ASYNC_LOOPBACK_PING, $pingLoopback);
+ try {
+ $result = $this->forkSessionLoopback('__ping__', $this->linkToLoopbackEndpoint($url));
+ $result = json_decode($result, true, flags: JSON_THROW_ON_ERROR);
+ } catch (\JsonException|LoopbackEndpointException $e) {
+ $this->logger->debug('could not ping loopback endpoint', ['exception' => $e]);
+ }
+
+ $this->appConfig->deleteKey('core', CoreConfigLexicon::ASYNC_LOOPBACK_PING);
+ return (($result['ping'] ?? '') === $pingLoopback);
+ }
+
+
+ /**
+ * note that the fact we fork process to run a session of processes before doing
+ * a check on the fact that maybe one of the process of the session is already
+ * running can create a small glitch when choosing the first available slot as
+ * a previous fork running said check is already made and will exit shortly.
+ *
+ * @return int
+ */
+ private function getFirstAvailableSlot(): int {
+ $slot = -1;
+ for ($i = 0; $i < self::FORK_LIMIT; $i++) {
+ if (!array_key_exists($i, $this->forks)) {
+ return $i;
+ }
+
+ // we confirm child process still exists
+ if (pcntl_waitpid($this->forks[$i], $status, WNOHANG) > 0) {
+ return $i;
+ }
+ }
+
+ if ($slot === -1) {
+ // TODO: should not happens: log warning
+ }
+
+ return -1;
+ }
+
+ private function generateToken(int $length = 15): string {
+ $result = '';
+ for ($i = 0; $i < $length; $i++) {
+ $result .= 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890'[random_int(0, 61)];
+ }
+
+ return $result;
+ }
+
+ /**
+ * we wait until all child process are done
+ *
+ * @noinspection PhpStatementHasEmptyBodyInspection
+ */
+ public function waitChildProcess(): void {
+ while (pcntl_waitpid(0, $status) != -1) {
+ }
+ }
+
+}
diff --git a/lib/private/Async/IBlockInterface.php b/lib/private/Async/IBlockInterface.php
new file mode 100644
index 00000000000..33b046b42ff
--- /dev/null
+++ b/lib/private/Async/IBlockInterface.php
@@ -0,0 +1,36 @@
+<?php
+
+declare(strict_types=1);
+
+/**
+ * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+namespace OC\Async;
+
+use OCP\Async\Enum\ProcessExecutionTime;
+use OCP\Async\Enum\BlockStatus;
+
+interface IBlockInterface {
+ public function getToken(): string;
+ public function id(string $id): self;
+ public function getId(): string;
+ public function name(string $name): self;
+ public function getName(): string;
+ public function require(string $id): self;
+ public function getRequire(): array;
+ public function delay(int $delay): self;
+ public function getDelay(): int;
+ public function getExecutionTime(): ?ProcessExecutionTime;
+ public function blocker(bool $blocker = true): self;
+ public function isBlocker(): bool;
+ public function replayable(bool $replayable = true): self;
+ public function isReplayable(): bool;
+ public function getStatus(): BlockStatus;
+ public function getResult(): ?array;
+ public function getError(): ?array;
+ public function dataset(array $dataset): self;
+ public function getDataset(): array;
+ public function async(ProcessExecutionTime $time): self;
+}
diff --git a/lib/private/Async/ISessionInterface.php b/lib/private/Async/ISessionInterface.php
new file mode 100644
index 00000000000..19307f915bb
--- /dev/null
+++ b/lib/private/Async/ISessionInterface.php
@@ -0,0 +1,21 @@
+<?php
+
+declare(strict_types=1);
+
+/**
+ * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+namespace OC\Async;
+
+use OC\Async\Model\BlockInterface;
+use OCP\Async\Enum\BlockStatus;
+
+interface ISessionInterface {
+ public function getAll(): array;
+ public function byToken(string $token): ?BlockInterface;
+ public function byId(string $id): ?BlockInterface;
+ public function getGlobalStatus(): BlockStatus;
+
+}
diff --git a/lib/private/Async/Model/Block.php b/lib/private/Async/Model/Block.php
new file mode 100644
index 00000000000..9c66ec13ad8
--- /dev/null
+++ b/lib/private/Async/Model/Block.php
@@ -0,0 +1,142 @@
+<?php
+
+declare(strict_types=1);
+
+/**
+ * SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+namespace OC\Async\Model;
+
+use OC\Async\Enum\BlockType;
+use OCP\AppFramework\Db\Entity;
+use OCP\Async\Enum\ProcessExecutionTime;
+use OCP\Async\Enum\BlockStatus;
+
+/**
+ * @method void setToken(string $token)
+ * @method string getToken()
+ * @method void setSessionToken(string $sessionToken)
+ * @method string getSessionToken()
+ * @method void setType(int $type)
+ * @method int getType()
+ * @method void setCode(string $code)
+ * @method string getCode()
+ * @method void setParams(array $params)
+ * @method ?array getParams()
+ * @method void setDataset(array $dataset)
+ * @method ?array getDataset()
+ * @method void setMetadata(array $metadata)
+ * @method ?array getMetadata()
+ * @method void setLinks(array $params)
+ * @method ?array getLinks()
+ * @method void setOrig(array $orig)
+ * @method ?array getOrig()
+ * @method void setResult(array $result)
+ * @method ?array getResult()
+ * @method void setStatus(int $status)
+ * @method int getStatus()
+ * @method void setExecutionTime(int $status)
+ * @method int getExecutionTime()
+ * @method void setLockToken(string $lockToken)
+ * @method string getLockToken()
+ * @method void setCreation(int $creation)
+ * @method int getCreation()
+ * @method void setLastRun(int $lastRun)
+ * @method int getLastRun()
+ * @method void setNextRun(int $nextRun)
+ * @method int getNextRun()
+ * @psalm-suppress PropertyNotSetInConstructor
+ */
+class Block extends Entity {
+ protected string $token = '';
+ protected string $sessionToken = '';
+ protected int $type = 0;
+ protected string $code = '';
+ protected ?array $params = [];
+ protected ?array $dataset = [];
+ protected ?array $metadata = [];
+ protected ?array $links = [];
+ protected ?array $orig = [];
+ protected ?array $result = [];
+ protected int $status = 0;
+ protected int $executionTime = 0;
+ protected string $lockToken = '';
+ protected int $creation = 0;
+ protected int $lastRun = 0;
+ protected int $nextRun = 0;
+
+ public function __construct() {
+ $this->addType('token', 'string');
+ $this->addType('sessionToken', 'string');
+ $this->addType('type', 'integer');
+ $this->addType('code', 'string');
+ $this->addType('params', 'json');
+ $this->addType('dataset', 'json');
+ $this->addType('metadata', 'json');
+ $this->addType('links', 'json');
+ $this->addType('orig', 'json');
+ $this->addType('result', 'json');
+ $this->addType('status', 'integer');
+ $this->addType('executionTime', 'integer');
+ $this->addType('lockToken', 'string');
+ $this->addType('creation', 'integer');
+ $this->addType('lastRun', 'integer');
+ $this->addType('nextRun', 'integer');
+ }
+
+ public function setBlockType(BlockType $type): void {
+ $this->setType($type->value);
+ }
+
+ public function getBlockType(): BlockType {
+ return BlockType::from($this->getType());
+ }
+
+ public function setBlockStatus(BlockStatus $status): void {
+ $this->setStatus($status->value);
+ }
+
+ public function getBlockStatus(): BlockStatus {
+ return BlockStatus::from($this->getStatus());
+ }
+
+ public function setProcessExecutionTime(ProcessExecutionTime $time): void {
+ $this->setExecutionTime($time->value);
+ }
+
+ public function getProcessExecutionTime(): ProcessExecutionTime {
+ return ProcessExecutionTime::from($this->getExecutionTime());
+ }
+
+ public function addMetadata(array $metadata): self {
+ $metadata = array_merge($this->metadata, $metadata);
+ $this->setMetadata($metadata);
+ return $this;
+ }
+
+ public function addMetadataEntry(string $key, string|int|array|bool $value): self {
+ $metadata = $this->metadata;
+ $metadata[$key] = $value;
+ $this->setMetadata($metadata);
+ return $this;
+ }
+
+ public function replay(bool $now = false): void {
+ $count = $this->getMetadata()['_replay'] ?? 0;
+ $next = time() + floor(6 ** ($count + 1)); // calculate delay based on count of retry
+ $count = min(++$count, 5); // limit to 6^6 seconds (13h)
+
+ if ($now) {
+ $next = time();
+ }
+
+ $this->addMetadataEntry('_replay', $count);
+ $this->setNextRun($next);
+ }
+
+ public function getReplayCount(): int {
+ return $this->getMetadata()['_replay'] ?? 0;
+ }
+}
diff --git a/lib/private/Async/Model/BlockInterface.php b/lib/private/Async/Model/BlockInterface.php
new file mode 100644
index 00000000000..36ba58194f7
--- /dev/null
+++ b/lib/private/Async/Model/BlockInterface.php
@@ -0,0 +1,183 @@
+<?php
+
+declare(strict_types=1);
+
+/**
+ * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+namespace OC\Async\Model;
+
+use OC\Async\AsyncManager;
+use OC\Async\IBlockInterface;
+use OCP\Async\Enum\ProcessExecutionTime;
+use OCP\Async\Enum\BlockStatus;
+
+class BlockInterface implements IBlockInterface, \JsonSerializable {
+ private string $id = '';
+ private string $name = '';
+ private bool $blocker = false;
+ private bool $replayable = false;
+
+ /** @var string[] */
+ private array $require = [];
+ private int $delay = 0;
+ private ?ProcessExecutionTime $executionTime = null;
+ private array $dataset = [];
+
+ public function __construct(
+ private readonly ?AsyncManager $asyncManager,
+ private readonly Block $block,
+ ) {
+ $this->import($block->getMetadata()['_iface'] ?? []);
+ }
+
+ public function getBlock(): Block {
+ return $this->block;
+ }
+
+ public function getToken(): string {
+ return $this->block->getToken();
+ }
+
+ public function getResult(): ?array {
+ if ($this->block->getBlockStatus() !== BlockStatus::SUCCESS) {
+ return null;
+ }
+
+ return $this->block->getResult()['result'];
+ }
+
+ public function getError(): ?array {
+ if ($this->block->getBlockStatus() !== BlockStatus::ERROR) {
+ return null;
+ }
+
+ return $this->block->getResult()['error'];
+ }
+
+ public function getStatus(): BlockStatus {
+ return $this->block->getBlockStatus();
+ }
+
+ public function id(string $id): self {
+ $this->id = strtoupper($id);
+ return $this;
+ }
+
+ public function getId(): string {
+ return $this->id;
+ }
+ public function name(string $name): self {
+ $this->name = $name;
+ return $this;
+ }
+
+ public function getName(): string {
+ return $this->name;
+ }
+
+ public function blocker(bool $blocker = true): self {
+ $this->blocker = $blocker;
+ return $this;
+ }
+
+ public function isBlocker(): bool {
+ return $this->blocker;
+ }
+
+ public function replayable(bool $replayable = true): self {
+ $this->replayable = $replayable;
+ return $this;
+ }
+
+ public function isReplayable(): bool {
+ return $this->replayable;
+ }
+
+ public function require(string $id): self {
+ $this->require[] = strtoupper($id);
+ return $this;
+ }
+
+ public function getRequire(): array {
+ return $this->require;
+ }
+
+ public function delay(int $delay): self {
+ $this->delay = $delay;
+ return $this;
+ }
+
+ public function getDelay(): int {
+ return $this->delay;
+ }
+
+ public function getExecutionTime(): ?ProcessExecutionTime {
+ return $this->executionTime;
+ }
+
+ /**
+ * @param array<array> $dataset
+ *
+ * @return $this
+ */
+ public function dataset(array $dataset): self {
+ $this->dataset = $dataset;
+ return $this;
+ }
+
+ public function getDataset(): array {
+ return $this->dataset;
+ }
+
+// public function getReplayCount(): int {
+// return $this->get
+// }
+ /**
+ * only available during the creation of the session
+ *
+ * @return $this
+ */
+ public function async(ProcessExecutionTime $time = ProcessExecutionTime::NOW): self {
+ $this->asyncManager?->async($time);
+ return $this;
+ }
+
+ public function import(array $data): void {
+ $this->token = $data['token'] ?? '';
+ $this->id($data['id'] ?? '');
+ $this->name($data['name'] ?? '');
+ $this->delay($data['delay'] ?? 0);
+ $this->require = $data['require'] ?? [];
+ $this->blocker($data['blocker'] ?? false);
+ $this->replayable($data['replayable'] ?? false);
+ }
+
+ public function jsonSerialize(): array {
+ return [
+ 'token' => $this->getToken(),
+ 'id' => $this->getId(),
+ 'name' => $this->getName(),
+ 'require' => $this->getRequire(),
+ 'delay' => $this->getDelay(),
+ 'blocker' => $this->isBlocker(),
+ 'replayable' => $this->isReplayable(),
+ ];
+ }
+
+ /**
+ * @param Block[] $processes
+ *
+ * @return BlockInterface[]
+ */
+ public static function asBlockInterfaces(array $processes): array {
+ $interfaces = [];
+ foreach ($processes as $process) {
+ $interfaces[] = new BlockInterface(null, $process);
+ }
+
+ return $interfaces;
+ }
+}
diff --git a/lib/private/Async/Model/SessionInterface.php b/lib/private/Async/Model/SessionInterface.php
new file mode 100644
index 00000000000..c481fc545f6
--- /dev/null
+++ b/lib/private/Async/Model/SessionInterface.php
@@ -0,0 +1,83 @@
+<?php
+
+declare(strict_types=1);
+
+/**
+ * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+namespace OC\Async\Model;
+
+use OC\Async\ISessionInterface;
+use OCP\Async\Enum\BlockStatus;
+
+class SessionInterface implements ISessionInterface {
+ public function __construct(
+ /** @var BlockInterface[] */
+ private array $interfaces,
+ ) {
+ }
+
+ public function getAll(): array {
+ return $this->interfaces;
+ }
+
+ public function byToken(string $token): ?BlockInterface {
+ foreach ($this->interfaces as $iface) {
+ if ($iface->getToken() === $token) {
+ return $iface;
+ }
+ }
+
+ return null;
+ }
+
+ public function byId(string $id): ?BlockInterface {
+ foreach($this->interfaces as $iface) {
+ if ($iface->getId() === $id) {
+ return $iface;
+ }
+ }
+
+ return null;
+ }
+
+ /**
+ * return a global status of the session based on the status of each process
+ * - if one entry is still at ::PREP stage, we return ::PREP
+ * - if one ::BLOCKER, returns ::BLOCKER.
+ * - if all ::SUCCESS, returns ::SUCCESS, else ignored.
+ * - if all ::ERROR+::SUCCESS, returns ::ERROR, else ignored.
+ * - session status is the biggest value between all left process status (::STANDBY, ::RUNNING)
+ */
+ public function getGlobalStatus(): BlockStatus {
+ $current = -1;
+ $groupedStatus = [];
+ foreach($this->interfaces as $iface) {
+ // returns ::PREP if one process is still ::PREP
+ if ($iface->getStatus() === BlockStatus::PREP) {
+ return BlockStatus::PREP;
+ }
+ // returns ::BLOCKER if one process is marked ::BLOCKER
+ if ($iface->getStatus() === BlockStatus::BLOCKER) {
+ return BlockStatus::BLOCKER;
+ }
+ // we keep trace if process is marked as ::ERROR or ::SUCCESS
+ // if not, we keep the highest value
+ if (in_array($iface->getStatus(), [BlockStatus::ERROR, BlockStatus::SUCCESS], true)) {
+ $groupedStatus[$iface->getStatus()->value] = true;
+ } else {
+ $current = max($current, $iface->getStatus()->value);
+ }
+ }
+
+ // in case the all interface were ::ERROR or ::SUCCESS, we check
+ // if there was at least one ::ERROR. if none we return ::SUCCESS
+ if ($current === -1) {
+ return (array_key_exists(BlockStatus::ERROR->value, $groupedStatus)) ? BlockStatus::ERROR : BlockStatus::SUCCESS;
+ }
+
+ return BlockStatus::from($current);
+ }
+}
diff --git a/lib/private/Async/README.md b/lib/private/Async/README.md
new file mode 100644
index 00000000000..185047f52db
--- /dev/null
+++ b/lib/private/Async/README.md
@@ -0,0 +1,379 @@
+
+
+# AsyncProcess
+
+Using `IAsyncProcess` allow the execution of code on a separated process in order to improve the quality of the user experience.
+
+
+## Concept
+
+To shorten the hanging time on heavy process that reflect on the user experience and to avoid delay between
+instruction and execution, this API allows to prepare instructions to be executed on a parallel thread.
+
+This is obtained by creating a loopback HTTP request, initiating a fresh PHP process that will execute the
+prepared instruction after emulating a connection termination, freeing the main process.
+
+#### Technology
+
+The logic is to:
+
+- store a serialized version of the code to be executed on another process into database.
+- start a new process as soon as possible that will retrieve the code from database and execute it.
+
+#### Setup
+
+The feature require setting a loopback url.
+This is done automatically by a background job. The automatic process uses `'overwrite.cli.url'` and `'trusted_domains'` from _config/config.php_ to find a list of domain name to test.
+It can be initiated via `occ`:
+
+> ./occ async:setup --discover
+
+Or manually set with:
+
+> ./occ async:setup --loopback https://cloud.example.net/
+
+
+
+## Blocks & Sessions
+
+- We will define as _Block_ complete part of unsplittable code.
+- A list of _Blocks_ can be grouped in _Sessions_.
+- While _Sessions_ are independent of each other, interactions can be set between _Blocks_ of the same _Session_.
+
+**Interactions**
+
+- _Blocks_ are executed in the order they have been created.
+- It is possible for a _Block_ to get results from a previous process from the session.
+- A _Block_ defined as blocker will stop further process from that session on failure.
+- A _Block_ can require a previous process to be successful before being executed.
+
+**Replayability**
+
+- A block can be set as _replayable_, meaning that in case of failure it can be run multiple time until it end properly.
+
+**Quick example**
+
+```php
+
+// define all part of the code that can be async
+$this->asyncProcess->invoke($myInvoke1)->id('block1')->replayable(); // block1 can be replayed until successful
+$this->asyncProcess->invoke($myInvoke2)->id('block2')->require('block1'); // block2 will not be executed until block1 has not been successful
+$this->asyncProcess->invoke($myInvoke3)->id('block3')->blocker(); // block3 will run whatever happened to block1 and block2 and its suc1cess is mandatory to continue with the session
+$this->asyncProcess->invoke($myInvoke4)->id('block4');
+
+$this->asyncProcess->async(); // close the session and initiate async execution
+```
+
+
+## ProcessExecutionTime
+
+Code is to be executed as soon as defined, with alternative fallback solutions in that order:
+
+- `::NOW` - main process will fork and execute the code in parallel (instantly)
+- `::ASAP` - process will be executed by an optional live service (a second later)
+- `::LATER` - process will be executed at the next execution of the cron tasks (within the next 10 minutes)
+- `::ON_REQUEST` - process needs to be executed manually
+
+
+## IAsyncProcess
+
+`IAsyncProcess` is the public interface that contains few methods to prepare the code to be processed on a parallel process.
+
+
+#### Closure
+
+The code can be directly written in a closure by calling `exec()`:
+
+```php
+$this->asyncProcess->exec(function (int $value, string $line, MyObject $obj): void {
+ // long process
+},
+ random_int(10000, 99999),
+ 'this is a line',
+ $myObj
+)->async();
+```
+
+
+#### Invokable
+
+Within the magic method `__invoke()` in a configured objects
+
+```php
+class MyInvoke {
+ public function __construct(
+ private array $data = []
+ ) {
+ }
+
+ public function __invoke(int $n): void {
+ // do long process
+ }
+}
+
+
+$myInvoke = new MyInvoke(['123']);
+$this->asyncProcess->invoke($myInvoke, random_int(10000, 99999))->async();
+```
+
+
+#### PHP Class
+
+Via the method `async()` from a class
+
+```php
+<?php
+
+namespace OCA\MyApp;
+
+class MyObj {
+ public function __construct(
+ ) {
+ }
+
+ public function async(int $n): void {
+ // run heavy stuff
+ }
+}
+```
+
+```php
+$this->asyncProcess->call(\OCA\MyApp\MyObj::class, random_int(10000, 99999))->async();
+```
+
+
+
+## IBlockInterface
+
+When storing a new _Block_ via `IAsyncProcess::call()`,`IAsyncProcess::invoke()` or `IAsyncProcess::async()`, will be returned a `IBlockInterface` to provided details about the _Block_.
+
+
+### name(string)
+
+Identification and/or description of the _Block_ for better understanding when debugging
+
+```php
+$this->asyncProcess->call(\OCA\MyApp\MyObj::class)->name('my process');
+```
+
+### id(string)
+
+Identification of the _Block_ for future interaction between _Blocks_ within the same _Session_
+
+```php
+$this->asyncProcess->call(\OCA\MyApp\MyObj::class)->id('my_process');
+```
+
+As an example, `id` are to be used to obtain `IBlockInterface` from a specific _Block_:
+```php
+ISessionInterface::byId('my_process'); // returns IBlockInterface
+```
+
+
+### blocker()
+
+Set current _Block_ as _Blocker_, meaning that further _Blocks_ of the _Session_ are lock until this process does not run successfully
+
+```php
+$this->asyncProcess->call(\OCA\MyApp\MyObj::class)->blocker();
+```
+
+
+### require(string)
+
+Define that the _Block_ can only be executed if set _Block_, identified by its `id`, ran successfully.
+Multiple _Blocks_ can be set a required.
+
+```php
+$this->asyncProcess->call(\OCA\MyApp\MyObj::class)->require('other_block_1')->require('other_block_2');
+```
+
+### replayable()
+
+The _Block_ is configured as replayable, meaning that it will be restarted until it runs correctly
+
+```php
+$this->asyncProcess->call(\OCA\MyApp\MyObj::class)->replayable();
+```
+
+The delay is calculated using 6 (six) exponent current retry, capped at 6:
+
+- 1st retry after few seconds,
+- 2nd retry after 30 seconds,
+- 3rd retry after 3 minutes,
+- 4th retry after 20 minutes,
+- 5th retry after 2 hours,
+- 6th retry after 12 hours,
+- other retries every 12 hours.
+
+### delay(int)
+
+Only try to initiate the process n seconds after current time.
+
+### dataset(array)
+
+It is possible to set a list of arguments to be applied to the same _Block_.
+The _Block_ will be executed for each defined set of data
+
+```php
+$this->asyncProcess->call(\OCA\MyApp\MyObj::class)->dataset(
+ [
+ ['this is a string', 1],
+ ['this is another string', 12],
+ ['and another value as first parameter', 42],
+ ]
+);
+```
+
+
+### post-execution
+
+Post execution of a _Block_, its `IBlockInterface` can be used to get details about it:
+
+- **getExecutionTime()** returns the `ProcessExecutionTime` that initiated the process,
+- **getResult()** returns the array returned by the process in case of success,
+- **getError()** returns the error in case of failure
+
+
+
+## ISessionInterface
+
+`ISessionInterface` is available to your code via `ABlockWrapper` and helps interaction between all the _Blocks_ of the same _Session_
+
+### getAll()
+
+returns all `IBlockInterface` from the _Session_
+
+### byToken(string)
+
+returns a `IBockInterface` using its token
+
+### byId(string)
+
+returns a `IBockInterface` using its `id`
+
+### getGlobalStatus()
+
+return a `BlockStatus` (Enum) based on every status of all _Blocks_ of the _Session_:
+
+- returns `::PREP` if one block is still at prep stage,
+- return `::BLOCKER` if at least one block is set as `blocker` and is failing,
+- returns `::SUCCESS` if all blocks are successful,
+- returns `::ERROR` if all process have failed,
+- returns `::STANDBY` or `::RUNNING` if none of the previous condition are met. `::RUNNING` if at least one `Block` is currently processed.
+
+
+
+
+## ABlockWrapper
+
+This abstract class helps to interface with other _Blocks_ from the same _Session_.
+It will be generated and passed as argument to the defined block if the first parameter is an `AprocessWrapper` is expected as first parameter:
+
+
+As a _Closure_
+```php
+$this->asyncProcess->exec(function (ABlockWrapper $wrapper, array $data): array {
+ $resultFromProc1 = $wrapper->getSessionInterface()->byId('block1')?->getResult(); // can be null if 'block1' is not success, unless using require()
+ $wrapper->activity(BlockActivity::NOTICE, 'result from previous process: ' . json_encode($resultFromProc1))
+},
+ ['mydata' => true]
+ )->id('block2');
+```
+
+When using `invoke()`
+```php
+class MyInvoke {
+ public function __construct(
+ ) {
+ }
+
+ public function __invoke(ABlockWrapper $wrapper, int $n): void {
+ $data = $wrapper->getSessionInterface()->byId('block1')?->getResult(); // can be null if 'block1' is not success
+ }
+}
+
+$myInvoke = new MyInvoke();
+$this->asyncProcess->invoke($myInvoke, random_int(10000, 99999))->requipe('block1'); // require ensure block1 has run successfully before this one
+```
+
+Syntax is the same with `call()`, when defining the `async()` method
+
+```php
+class MyObj {
+ public function __construct(
+ ) {
+ }
+
+ public function async(ABlockWrapper $wrapper): void {
+ }
+}
+```
+
+#### Abstract methods
+
+`ABlockWrapper` is an abstract class with a list of interfaced methods that have different behavior on the _BlockWrapper_ sent by the framework.
+
+- **DummyBlockWrapper** will do nothing,
+- **CliBlockWrapper** will generate/manage console output,
+- **LoggerBlockWrapper** will only create new nextcloud logs entry.
+
+List of usefull methods:
+
+- **activity(BlockActivity $activity, string $line = '');** can be used to update details about current part of your code during its process
+- **getSessionInterface()** return the `ISessionInterface` for the session
+- **getReplayCount()** returns the number of retry
+
+
+## Other tools
+
+#### Live Service
+
+This will cycle every few seconds to check for any session in stand-by mode and will execute its blocks
+> ./occ async:live
+
+
+#### Manage sessions and blocks
+
+Get resume about current session still in database.
+
+> `./occ async:manage`
+
+Get details about a session.
+
+> `./occ async:manage --session <sessionId>
+
+Get details about a block
+
+> `./occ async:manage --details <blockId>
+
+Get excessive details about a block
+
+> `./occ async:manage --details <blockId> --full-details
+
+Replay a not successful block
+
+> `./occ async:manage --replay <blockId>
+
+#### Mocking process
+
+`./occ async:setup` allow an admin to generate fake processes to emulate the feature:
+
+- **--mock-session _int_** create _n_ sessions
+- **--mock-block _int_** create _n_ blocks
+- **--fail-process _string_** create failing process
+
+
+# Work in Progress
+
+missing element of the feature:
+
+- [ ] discussing the need of signing the PHP code stored in database to ensure its authenticity before execution,
+- [ ] ability to overwrite the code or arguments of a process to fix a failing process,
+- [ ] Check and confirm value type compatibility between parameters and arguments when storing new block
+- [ ] implementing dataset(),
+- [ ] implementing delay(),
+- [ ] full documentation,
+- [ ] tests, tests, tests.
+
+
diff --git a/lib/private/Async/Wrappers/CliBlockWrapper.php b/lib/private/Async/Wrappers/CliBlockWrapper.php
new file mode 100644
index 00000000000..4c38c7fc995
--- /dev/null
+++ b/lib/private/Async/Wrappers/CliBlockWrapper.php
@@ -0,0 +1,103 @@
+<?php
+
+declare(strict_types=1);
+
+/**
+ * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+namespace OC\Async\Wrappers;
+
+use OC\Async\ABlockWrapper;
+use OC\Async\Model\Block;
+use OCP\Async\Enum\BlockActivity;
+use Symfony\Component\Console\Formatter\OutputFormatterStyle;
+use Symfony\Component\Console\Output\OutputInterface;
+
+class CliBlockWrapper extends ABlockWrapper {
+ private const TAB_SIZE = 12;
+ private const HEXA = '258ad';
+ private int $hexaLength = 0;
+ private string $prefix = '';
+ private int $slot = 0;
+ public function __construct(
+ private ?OutputInterface $output = null,
+ ) {
+ $this->hexaLength = strlen(self::HEXA);
+ }
+
+ public function session(array $metadata): void {
+ $this->slot = $metadata['_cli']['slot'] ?? 0;
+ $this->output->writeln(
+ str_repeat(' ', $this->slot * self::TAB_SIZE) . ' ' .
+ '<open>++</>' . ' initiating session ' .
+ ($metadata['sessionToken'] ?? '')
+ );
+ }
+
+ public function init(): void {
+ $this->prefix = $this->generatePrefix($this->block->getToken());
+ }
+
+ public function activity(BlockActivity $activity, string $line = ''): void {
+ $act = match ($activity) {
+ BlockActivity::STARTING => '<open>>></>',
+ BlockActivity::ENDING => '<close><<</>',
+ BlockActivity::DEBUG, BlockActivity::NOTICE => ' ',
+ BlockActivity::WARNING => '<warn>!!</>',
+ BlockActivity::ERROR => '<error>!!</>',
+ };
+
+ $this->output->writeln(
+ str_repeat(' ', $this->slot * self::TAB_SIZE) . ' ' .
+ $act . ' ' .
+ $this->prefix . ' ' .
+ $line
+ );
+ }
+
+ public function end(string $line = ''): void {
+ if ($line === '') {
+ $this->output->writeln('');
+ return;
+ }
+
+ $this->output->writeln(
+ str_repeat(' ', $this->slot * self::TAB_SIZE) . ' ' .
+ '<warn>--</>' . ' ' . $line
+ );
+ }
+
+ private function generatePrefix(string $token): string {
+ $color = 's' . random_int(1, $this->hexaLength - 1) .
+ random_int(1, $this->hexaLength - 1) .
+ random_int(1, $this->hexaLength - 1);
+
+ return '<' . $color . '>' . $token . '</>';
+ }
+
+ public static function initStyle(OutputInterface $output): void {
+ $output->getFormatter()->setStyle('open', new OutputFormatterStyle('#0f0', '', ['bold']));
+ $output->getFormatter()->setStyle('close', new OutputFormatterStyle('#f00', '', ['bold']));
+ $output->getFormatter()->setStyle('warn', new OutputFormatterStyle('#f00', '', []));
+ $output->getFormatter()->setStyle('error', new OutputFormatterStyle('#f00', '', ['bold']));
+
+ $hexaLength = strlen(self::HEXA);
+ for ($i = 0; $i < $hexaLength; $i++) {
+ for ($j = 0; $j < $hexaLength; $j++) {
+ for ($k = 0; $k < $hexaLength; $k++) {
+ $output->getFormatter()->setStyle(
+ 's' . $i . $j . $k,
+ new OutputFormatterStyle(
+ '#' . self::HEXA[$i] . self::HEXA[$j] . self::HEXA[$k],
+ '',
+ ['bold']
+ )
+ );
+ }
+ }
+ }
+ }
+
+}
diff --git a/lib/private/Async/Wrappers/DummyBlockWrapper.php b/lib/private/Async/Wrappers/DummyBlockWrapper.php
new file mode 100644
index 00000000000..e719e4f83e5
--- /dev/null
+++ b/lib/private/Async/Wrappers/DummyBlockWrapper.php
@@ -0,0 +1,27 @@
+<?php
+
+declare(strict_types=1);
+
+/**
+ * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+namespace OC\Async\Wrappers;
+
+use OC\Async\ABlockWrapper;
+use OC\Async\Model\Block;
+use OCP\Async\Enum\BlockActivity;
+
+class DummyBlockWrapper extends ABlockWrapper {
+ public function session(array $metadata): void {
+ }
+ public function init(): void {
+ }
+
+ public function activity(BlockActivity $activity, string $line = ''): void {
+ }
+
+ public function end(string $line = ''): void {
+ }
+}
diff --git a/lib/private/Async/Wrappers/LoggerBlockWrapper.php b/lib/private/Async/Wrappers/LoggerBlockWrapper.php
new file mode 100644
index 00000000000..66d9c80007c
--- /dev/null
+++ b/lib/private/Async/Wrappers/LoggerBlockWrapper.php
@@ -0,0 +1,39 @@
+<?php
+
+declare(strict_types=1);
+
+/**
+ * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+namespace OC\Async\Wrappers;
+
+use OC\Async\ABlockWrapper;
+use OC\Async\Model\Block;
+use OCP\Async\Enum\BlockActivity;
+use Psr\Log\LoggerInterface;
+
+class LoggerBlockWrapper extends ABlockWrapper {
+ private array $metadata = [];
+ public function __construct(
+ private LoggerInterface $logger,
+ ) {
+ }
+
+ public function session(array $metadata): void {
+ $this->metadata = $metadata;
+ $this->logger->debug('session: ' . json_encode($metadata));
+ }
+
+ public function init(): void {
+ $this->logger->debug('process: ' . $this->metadata['sessionToken'] . ' ' . $this->block->getToken());
+ }
+
+ public function activity(BlockActivity $activity, string $line = ''): void {
+ $this->logger->debug('activity (' . $activity->value . ') ' . $line);
+ }
+
+ public function end(string $line = ''): void {
+ $this->logger->debug('end session - ' . $line);
+ }
+}
diff --git a/lib/private/Config/Lexicon/CoreConfigLexicon.php b/lib/private/Config/Lexicon/CoreConfigLexicon.php
index 34a0b883c54..05de9378932 100644
--- a/lib/private/Config/Lexicon/CoreConfigLexicon.php
+++ b/lib/private/Config/Lexicon/CoreConfigLexicon.php
@@ -17,6 +17,10 @@ use NCU\Config\ValueType;
* ConfigLexicon for 'core' app/user configs
*/
class CoreConfigLexicon implements IConfigLexicon {
+ public const ASYNC_LOOPBACK_ADDRESS = 'loopback_address';
+ public const ASYNC_LOOPBACK_PING = 'async_loopback_ping';
+ public const ASYNC_LOOPBACK_TEST = 'async_loopback_test';
+
public function getStrictness(): ConfigLexiconStrictness {
return ConfigLexiconStrictness::IGNORE;
}
@@ -28,6 +32,9 @@ class CoreConfigLexicon implements IConfigLexicon {
public function getAppConfigs(): array {
return [
new ConfigLexiconEntry('lastcron', ValueType::INT, 0, 'timestamp of last cron execution'),
+ new ConfigLexiconEntry(self::ASYNC_LOOPBACK_ADDRESS, ValueType::STRING, '', 'local address of the instance to initiate async process via web request', true),
+ new ConfigLexiconEntry(self::ASYNC_LOOPBACK_PING, ValueType::STRING, '', 'temporary random string used to confirm web-async loopback endpoint is valid', true),
+ new ConfigLexiconEntry(self::ASYNC_LOOPBACK_TEST, ValueType::STRING, '', 'temporary random string used to confirm web-async is fully functional', true),
];
}
diff --git a/lib/private/Repair.php b/lib/private/Repair.php
index c5069bff48e..74525164a79 100644
--- a/lib/private/Repair.php
+++ b/lib/private/Repair.php
@@ -9,6 +9,7 @@ namespace OC;
use OC\DB\ConnectionAdapter;
use OC\Repair\AddAppConfigLazyMigration;
+use OC\Repair\AddAsyncProcessJob;
use OC\Repair\AddBruteForceCleanupJob;
use OC\Repair\AddCleanupDeletedUsersBackgroundJob;
use OC\Repair\AddCleanupUpdaterBackupsJob;
@@ -191,6 +192,7 @@ class Repair implements IOutput {
\OCP\Server::get(AddRemoveOldTasksBackgroundJob::class),
\OCP\Server::get(AddMetadataGenerationJob::class),
\OCP\Server::get(AddAppConfigLazyMigration::class),
+ \OCP\Server::get(AddAsyncProcessJob::class),
\OCP\Server::get(RepairLogoDimension::class),
\OCP\Server::get(RemoveLegacyDatadirFile::class),
\OCP\Server::get(AddCleanupDeletedUsersBackgroundJob::class),
diff --git a/lib/private/Repair/AddAsyncProcessJob.php b/lib/private/Repair/AddAsyncProcessJob.php
new file mode 100644
index 00000000000..1a471fb993a
--- /dev/null
+++ b/lib/private/Repair/AddAsyncProcessJob.php
@@ -0,0 +1,30 @@
+<?php
+
+/**
+ * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+namespace OC\Repair;
+
+use OC\Async\ForkManager;
+use OC\Config\Lexicon\CoreConfigLexicon;
+use OC\Core\BackgroundJobs\AsyncProcessJob;
+use OCP\BackgroundJob\IJobList;
+use OCP\IAppConfig;
+use OCP\Migration\IOutput;
+use OCP\Migration\IRepairStep;
+
+class AddAsyncProcessJob implements IRepairStep {
+ public function __construct(
+ private IJobList $jobList,
+ ) {
+ }
+
+ public function getName() {
+ return 'Setup AsyncProcess and queue job to periodically manage the feature';
+ }
+
+ public function run(IOutput $output) {
+ $this->jobList->add(AsyncProcessJob::class);
+ }
+}
diff --git a/lib/private/Server.php b/lib/private/Server.php
index 545ceacbe81..ef237178eb7 100644
--- a/lib/private/Server.php
+++ b/lib/private/Server.php
@@ -575,6 +575,7 @@ class Server extends ServerContainer implements IServerContainer {
$this->registerAlias(IAppConfig::class, \OC\AppConfig::class);
$this->registerAlias(IUserConfig::class, \OC\Config\UserConfig::class);
+ $this->registerAlias(\OCP\Async\IAsyncProcess::class, \OC\Async\AsyncProcess::class);
$this->registerService(IFactory::class, function (Server $c) {
return new \OC\L10N\Factory(
diff --git a/lib/private/Setup.php b/lib/private/Setup.php
index 6d3021aff71..efad8caa7b6 100644
--- a/lib/private/Setup.php
+++ b/lib/private/Setup.php
@@ -14,6 +14,7 @@ use Exception;
use InvalidArgumentException;
use OC\Authentication\Token\PublicKeyTokenProvider;
use OC\Authentication\Token\TokenCleanupJob;
+use OC\Core\BackgroundJobs\AsyncProcessJob;
use OC\Log\Rotate;
use OC\Preview\BackgroundCleanupJob;
use OC\TextProcessing\RemoveOldTasksBackgroundJob;
@@ -495,6 +496,7 @@ class Setup {
$jobList->add(BackgroundCleanupJob::class);
$jobList->add(RemoveOldTasksBackgroundJob::class);
$jobList->add(CleanupDeletedUsers::class);
+ $jobList->add(AsyncProcessJob::class);
}
/**
diff --git a/lib/public/Async/Enum/BlockActivity.php b/lib/public/Async/Enum/BlockActivity.php
new file mode 100644
index 00000000000..ee8cf12d825
--- /dev/null
+++ b/lib/public/Async/Enum/BlockActivity.php
@@ -0,0 +1,18 @@
+<?php
+
+declare(strict_types=1);
+/**
+ * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+namespace OCP\Async\Enum;
+
+enum BlockActivity: int {
+ case STARTING = 0;
+ case DEBUG = 1;
+ case NOTICE = 2;
+ case WARNING = 3;
+ case ERROR = 4;
+ case ENDING = 9;
+}
diff --git a/lib/public/Async/Enum/BlockStatus.php b/lib/public/Async/Enum/BlockStatus.php
new file mode 100644
index 00000000000..83485880a8e
--- /dev/null
+++ b/lib/public/Async/Enum/BlockStatus.php
@@ -0,0 +1,20 @@
+<?php
+
+declare(strict_types=1);
+
+/**
+ * SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+namespace OCP\Async\Enum;
+
+enum BlockStatus: int {
+ case PREP = 0;
+ case STANDBY = 1;
+ case RUNNING = 4;
+
+ case BLOCKER = 7;
+ case ERROR = 8;
+ case SUCCESS = 9;
+}
diff --git a/lib/public/Async/Enum/ProcessExecutionTime.php b/lib/public/Async/Enum/ProcessExecutionTime.php
new file mode 100644
index 00000000000..600bdce3c91
--- /dev/null
+++ b/lib/public/Async/Enum/ProcessExecutionTime.php
@@ -0,0 +1,17 @@
+<?php
+
+declare(strict_types=1);
+
+/**
+ * SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+namespace OCP\Async\Enum;
+
+enum ProcessExecutionTime: int {
+ case NOW = 0;
+ case ASAP = 1;
+ case LATER = 2;
+ case ON_REQUEST = 9;
+}
diff --git a/lib/public/Async/IAsyncProcess.php b/lib/public/Async/IAsyncProcess.php
new file mode 100644
index 00000000000..9e32178a1cf
--- /dev/null
+++ b/lib/public/Async/IAsyncProcess.php
@@ -0,0 +1,20 @@
+<?php
+
+/**
+ * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+namespace OCP\Async;
+
+use OC\Async\IBlockInterface;
+use OCP\Async\Enum\ProcessExecutionTime;
+
+interface IAsyncProcess {
+ public function exec(\Closure $closure, ...$params): IBlockInterface;
+
+ public function invoke(callable $obj, ...$params): IBlockInterface;
+
+ public function call(string $class, ...$params): IBlockInterface;
+
+ public function async(ProcessExecutionTime $time = ProcessExecutionTime::NOW): string;
+}
diff --git a/version.php b/version.php
index 0b7a87cc96b..bb36c6f5831 100644
--- a/version.php
+++ b/version.php
@@ -9,7 +9,7 @@
// between betas, final and RCs. This is _not_ the public version number. Reset minor/patch level
// when updating major/minor version number.
-$OC_Version = [32, 0, 0, 0];
+$OC_Version = [32, 0, 0, 1];
// The human-readable string
$OC_VersionString = '32.0.0 dev';