aboutsummaryrefslogtreecommitdiffstats
path: root/lib/private/Async
diff options
context:
space:
mode:
authorMaxence Lange <maxence@artificial-owl.com>2025-04-28 19:57:56 -0100
committerMaxence Lange <maxence@artificial-owl.com>2025-04-28 19:58:06 -0100
commit6d272ff894fa969e5a526caf6a91d60eda115c53 (patch)
treedf90f826a29f7169dd9b550f2e478a1b09d29e71 /lib/private/Async
parent10a01423ecfc7e028960eee8058bd177ad28d484 (diff)
downloadnextcloud-server-enh/noid/async-process-run.tar.gz
nextcloud-server-enh/noid/async-process-run.zip
feat(async): AsyncProcessenh/noid/async-process-run
Signed-off-by: Maxence Lange <maxence@artificial-owl.com>
Diffstat (limited to 'lib/private/Async')
-rw-r--r--lib/private/Async/ABlockWrapper.php40
-rw-r--r--lib/private/Async/AsyncManager.php222
-rw-r--r--lib/private/Async/AsyncProcess.php64
-rw-r--r--lib/private/Async/Db/BlockMapper.php260
-rw-r--r--lib/private/Async/Enum/BlockType.php16
-rw-r--r--lib/private/Async/Exceptions/AsyncProcessException.php14
-rw-r--r--lib/private/Async/Exceptions/BlockAlreadyRunningException.php12
-rw-r--r--lib/private/Async/Exceptions/BlockNotFoundException.php12
-rw-r--r--lib/private/Async/Exceptions/LoopbackEndpointException.php14
-rw-r--r--lib/private/Async/Exceptions/SessionBlockedException.php14
-rw-r--r--lib/private/Async/ForkManager.php485
-rw-r--r--lib/private/Async/IBlockInterface.php36
-rw-r--r--lib/private/Async/ISessionInterface.php21
-rw-r--r--lib/private/Async/Model/Block.php142
-rw-r--r--lib/private/Async/Model/BlockInterface.php183
-rw-r--r--lib/private/Async/Model/SessionInterface.php83
-rw-r--r--lib/private/Async/README.md379
-rw-r--r--lib/private/Async/Wrappers/CliBlockWrapper.php103
-rw-r--r--lib/private/Async/Wrappers/DummyBlockWrapper.php27
-rw-r--r--lib/private/Async/Wrappers/LoggerBlockWrapper.php39
20 files changed, 2166 insertions, 0 deletions
diff --git a/lib/private/Async/ABlockWrapper.php b/lib/private/Async/ABlockWrapper.php
new file mode 100644
index 00000000000..a5dd1c3b629
--- /dev/null
+++ b/lib/private/Async/ABlockWrapper.php
@@ -0,0 +1,40 @@
+<?php
+
+declare(strict_types=1);
+
+/**
+ * SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+namespace OC\Async;
+
+use OC\Async\Model\Block;
+use OC\Async\Model\SessionInterface;
+use OCP\Async\Enum\BlockActivity;
+
+abstract class ABlockWrapper {
+ abstract public function session(array $metadata): void;
+ abstract public function init(): void;
+ abstract public function activity(BlockActivity $activity, string $line = ''): void;
+ abstract public function end(string $line = ''): void;
+
+ protected Block $block;
+ private SessionInterface $sessionInterface;
+
+ public function setBlock(Block $block): void {
+ $this->block = $block;
+ }
+
+ public function getSessionInterface(): SessionInterface {
+ return $this->sessionInterface;
+ }
+
+ public function setSessionInterface(SessionInterface $iface): void {
+ $this->sessionInterface = $iface;
+ }
+
+ public function getReplayCount(): int {
+ return $this->block->getReplayCount();
+ }
+}
diff --git a/lib/private/Async/AsyncManager.php b/lib/private/Async/AsyncManager.php
new file mode 100644
index 00000000000..955037f356d
--- /dev/null
+++ b/lib/private/Async/AsyncManager.php
@@ -0,0 +1,222 @@
+<?php
+
+namespace OC\Async;
+
+use OC\Async\Db\BlockMapper;
+use OC\Async\Enum\BlockType;
+use OC\Async\Model\Block;
+use OC\Async\Model\BlockInterface;
+use OC\Config\Lexicon\CoreConfigLexicon;
+use OCP\Async\Enum\ProcessExecutionTime;
+use OCP\Async\Enum\BlockStatus;
+use OCP\IAppConfig;
+use ReflectionFunctionAbstract;
+
+class AsyncManager {
+ private ?string $sessionToken = null;
+ /** @var BlockInterface[] */
+ private array $interfaces = [];
+ /** @var array<string, ReflectionFunctionAbstract> } */
+ private array $blocksReflexion = [];
+
+ public function __construct(
+ private IAppConfig $appConfig,
+ private BlockMapper $blockMapper,
+ private ForkManager $forkManager,
+ ) {
+ }
+
+ /**
+ * @param BlockType $type
+ * @param string $serializedBlock
+ * @param ReflectionFunctionAbstract $reflection
+ * @param array $params
+ * @param array $allowedClasses
+ *
+ * @return IBlockInterface
+ * @see \OCP\Async\IAsyncProcess
+ * @internal prefer using public interface {@see \OCP\Async\IAsyncProcess}
+ */
+ public function asyncBlock(
+ BlockType $type,
+ string $serializedBlock, // serialize() on the code - className if type is ::CLASS
+ \ReflectionFunctionAbstract $reflection, // reflection about the method/function called as unserialization
+ array $params, // parameters to use at unserialization
+ array $allowedClasses = [] // list of class included in the serialization of the 'block'
+ ): IBlockInterface {
+ $this->sessionToken ??= $this->generateToken();
+
+ $data = $this->serializeReflectionParams($reflection, $params);
+ $data['blockClasses'] = $allowedClasses;
+ if ($type === BlockType::CLASSNAME) {
+ $data['className'] = $serializedBlock;
+ }
+
+ $token = $this->generateToken();
+ $block = new Block();
+ $block->setToken($token);
+ $block->setSessionToken($this->sessionToken);
+ $block->setBlockType($type);
+ $block->setBlockStatus(BlockStatus::PREP);
+ $block->setCode($serializedBlock);
+ $block->setParams($data);
+ $block->setOrig([]);
+ $block->setCreation(time());
+
+
+ // needed !?
+// $process->addMetadataEntry('_links', $this->interfaces);
+
+// $this->processMapper->insert($process);
+
+ $iface = new BlockInterface($this, $block);
+ $this->interfaces[] = $iface;
+ $this->blocksReflexion[$token] = $reflection;
+
+ return $iface;
+ }
+
+// public function updateDataset(ProcessInfo $processInfo) {
+// $token = $processInfo->getToken();
+// $dataset = [];
+// foreach($processInfo->getDataset() as $params) {
+// $dataset[] = $this->serializeReflectionParams($this->processesReflexion[$token], $params);
+// }
+// $this->processMapper->updateDataset($token, $dataset);
+// }
+
+ private function endSession(ProcessExecutionTime $time): ?string {
+ if ($this->sessionToken === null) {
+ return null;
+ }
+
+ $current = $this->sessionToken;
+ foreach ($this->interfaces as $iface) {
+ $process = $iface->getBlock();
+ $process->addMetadataEntry('_iface', $iface->jsonSerialize());
+ $process->setProcessExecutionTime($time);
+
+ $dataset = [];
+ foreach ($iface->getDataset() as $params) {
+ $dataset[] = $this->serializeReflectionParams(
+ $this->blocksReflexion[$process->getToken()], $params
+ );
+ }
+ $process->setDataset($dataset);
+
+ $this->blockMapper->insert($process);
+ }
+
+ $this->blockMapper->updateSessionStatus(
+ $this->sessionToken, BlockStatus::STANDBY, BlockStatus::PREP
+ );
+ $this->processes = $this->interfaces = [];
+ $this->sessionToken = null;
+
+ return $current;
+ }
+
+ /**
+ * @internal
+ */
+ public function async(ProcessExecutionTime $time): string {
+ $current = $this->endSession($time);
+ if ($current === null) {
+ return '';
+ }
+
+ if ($time === ProcessExecutionTime::NOW) {
+ $this->forkManager->forkSession($current);
+ }
+
+ return $current;
+ }
+
+ private function serializeReflectionParams(ReflectionFunctionAbstract $reflection, array $params): array {
+ $i = 0;
+ $processWrapper = false;
+ $filteredParams = [];
+
+ foreach ($reflection->getParameters() as $arg) {
+ $argType = $arg->getType();
+ $param = $params[$i];
+ if ($argType !== null) {
+ if ($i === 0 && $argType->getName() === ABlockWrapper::class) {
+ $processWrapper = true;
+ continue; // we ignore IProcessWrapper as first argument, as it will be filled at execution time
+ }
+
+ // TODO: compare $argType with $param
+// foreach($argType->getTypes() as $t) {
+// echo '> ' . $t . "\n";
+// }
+// $arg->allowsNull()
+// $arg->isOptional()
+
+ }
+
+ // TODO: we might want to filter some params ?
+ $filteredParams[] = $param;
+
+// echo '..1. ' . json_encode($param) . "\n";
+// echo '..2. ' . serialize($param) . "\n";
+// echo '? ' . gettype($param) . "\n";
+
+ $i++;
+ }
+
+ try {
+ $serializedParams = serialize($params);
+ } catch (\Exception $e) {
+ throw $e;
+ }
+
+ return [
+ 'params' => $serializedParams,
+ 'paramsClasses' => $this->extractClassFromArray($filteredParams),
+ 'processWrapper' => $processWrapper,
+ ];
+ }
+
+//
+// public function unserializeParams(array $data): array {
+// return $data;
+// }
+//
+
+ private function extractClassFromArray(array $arr, array &$classes = []): array {
+ foreach ($arr as $entry) {
+ if (is_array($entry)) {
+ $this->extractClassFromArray($entry, $classes);
+ }
+
+ if (is_object($entry)) {
+ $class = get_class($entry);
+ if (!in_array($class, $classes, true)) {
+ $classes[] = $class;
+ }
+ }
+ }
+
+ return $classes;
+ }
+
+ private function generateToken(int $length = 15): string {
+ $result = '';
+ for ($i = 0; $i < $length; $i++) {
+ $result .= 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890'[random_int(0, 61)];
+ }
+
+ return $result;
+ }
+
+ public function dropAllBlocks(): void {
+ $this->blockMapper->deleteAll();
+ }
+
+ public function resetConfig(): void {
+ $this->appConfig->deleteKey('core', CoreConfigLexicon::ASYNC_LOOPBACK_ADDRESS);
+ $this->appConfig->deleteKey('core', CoreConfigLexicon::ASYNC_LOOPBACK_PING);
+ $this->appConfig->deleteKey('core', CoreConfigLexicon::ASYNC_LOOPBACK_TEST);
+ }
+}
diff --git a/lib/private/Async/AsyncProcess.php b/lib/private/Async/AsyncProcess.php
new file mode 100644
index 00000000000..44d990749d7
--- /dev/null
+++ b/lib/private/Async/AsyncProcess.php
@@ -0,0 +1,64 @@
+<?php
+
+namespace OC\Async;
+
+use Laravel\SerializableClosure\SerializableClosure;
+use Laravel\SerializableClosure\Serializers\Native;
+use OC\Async\Enum\BlockType;
+use OCP\Async\Enum\ProcessExecutionTime;
+use OCP\Async\IAsyncProcess;
+use ReflectionFunction;
+use ReflectionMethod;
+
+class AsyncProcess implements IAsyncProcess {
+ public function __construct(
+ private AsyncManager $asyncManager,
+ private readonly ForkManager $forkManager,
+ ) {
+ }
+
+ public function exec(\Closure $closure, ...$params): IBlockInterface {
+ return $this->asyncManager->asyncBlock(
+ BlockType::CLOSURE,
+ serialize(new SerializableClosure($closure)),
+ new ReflectionFunction($closure),
+ $params,
+ [SerializableClosure::class, Native::class]
+ );
+ }
+
+ public function invoke(callable $obj, ...$params): IBlockInterface {
+ return $this->asyncManager->asyncBlock(
+ BlockType::INVOKABLE,
+ serialize($obj),
+ new ReflectionMethod($obj, '__invoke'),
+ $params,
+ [$obj::class]
+ );
+ }
+
+ public function call(string $class, ...$params): IBlockInterface {
+ // abstract ?
+ if (!method_exists($class, 'async')) {
+ throw new \Exception('class ' . $class . ' is missing async() method');
+ }
+
+ return $this->asyncManager->asyncBlock(
+ BlockType::CLASSNAME,
+ $class,
+ new ReflectionMethod($class, 'async'),
+ $params,
+ );
+ }
+
+ /**
+ * close the creation of the session and start async as soon as possible
+ *
+ * @param ProcessExecutionTime $time preferred urgency to start the async process
+ *
+ * @return string session token, empty if no opened session
+ */
+ public function async(ProcessExecutionTime $time = ProcessExecutionTime::NOW): string {
+ return $this->asyncManager->async($time);
+ }
+}
diff --git a/lib/private/Async/Db/BlockMapper.php b/lib/private/Async/Db/BlockMapper.php
new file mode 100644
index 00000000000..6d6409327a4
--- /dev/null
+++ b/lib/private/Async/Db/BlockMapper.php
@@ -0,0 +1,260 @@
+<?php
+
+declare(strict_types=1);
+/**
+ * SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+namespace OC\Async\Db;
+
+use OC\Async\Exceptions\BlockNotFoundException;
+use OC\Async\Model\Block;
+use OCP\AppFramework\Db\DoesNotExistException;
+use OCP\AppFramework\Db\QBMapper;
+use OCP\Async\Enum\BlockStatus;
+use OCP\DB\QueryBuilder\IQueryBuilder;
+use OCP\IDBConnection;
+use Psr\Log\LoggerInterface;
+
+/**
+ * @template-extends QBMapper<Block>
+ */
+class BlockMapper extends QBMapper {
+ public const TABLE = 'async_process';
+
+ public function __construct(
+ IDBConnection $db,
+ private LoggerInterface $logger,
+ ) {
+ parent::__construct($db, self::TABLE, Block::class);
+ }
+
+ /**
+ * @return Block[]
+ */
+ public function getBySession(string $sessionToken): array {
+ $qb = $this->db->getQueryBuilder();
+ $qb->select('*')
+ ->from($this->getTableName())
+ ->where($qb->expr()->eq('session_token', $qb->createNamedParameter($sessionToken)))
+ ->orderBy('id', 'asc');
+
+ return $this->findEntities($qb);
+ }
+
+ /**
+ * @return string[]
+ */
+ public function getSessions(): array {
+ $qb = $this->db->getQueryBuilder();
+ $qb->selectDistinct('session_token')
+ ->from($this->getTableName());
+
+ $result = $qb->executeQuery();
+
+ $sessions = [];
+ while ($row = $result->fetch()) {
+ $sessions[] = $row['session_token'];
+ }
+ $result->closeCursor();
+
+ return $sessions;
+ }
+
+
+ /**
+ * returns list of sessionId that contains process with:
+ * - at least one is in STANDBY with an older timestamp in next_run
+ * - none as RUNNING or BLOCKER
+ *
+ * @return string[]
+ */
+ public function getSessionOnStandBy(): array {
+ $qb = $this->db->getQueryBuilder();
+ $qb->selectDistinct('t1.session_token')
+ ->from($this->getTableName(), 't1')
+ ->leftJoin(
+ 't1', $this->getTableName(), 't2',
+ $qb->expr()->andX(
+ $qb->expr()->eq('t1.session_token', 't2.session_token'),
+ $qb->expr()->in('t2.status', $qb->createNamedParameter([BlockStatus::PREP->value, BlockStatus::RUNNING->value, BlockStatus::BLOCKER->value], IQueryBuilder::PARAM_INT_ARRAY))
+ )
+ )
+ ->where($qb->expr()->eq('t1.status', $qb->createNamedParameter(BlockStatus::STANDBY->value, IQueryBuilder::PARAM_INT)))
+ ->andWhere($qb->expr()->lt('t1.next_run', $qb->createNamedParameter(time(), IQueryBuilder::PARAM_INT)))
+ ->andWhere($qb->expr()->isNull('t2.status'));
+
+ $result = $qb->executeQuery();
+
+ $sessions = [];
+ while ($row = $result->fetch()) {
+ $sessions[] = $row['session_token'];
+ }
+ $result->closeCursor();
+
+ return $sessions;
+ }
+
+ /**
+ * reset to STANDBY all process:
+ * - marked as ERROR or BLOCKER,
+ * - next_run in an older timestamp
+ * - next_run not at zero
+ */
+ public function resetFailedBlock(): int {
+ $qb = $this->db->getQueryBuilder();
+ $qb->update($this->getTableName())
+ ->set('status', $qb->createNamedParameter(BlockStatus::STANDBY->value, IQueryBuilder::PARAM_INT))
+ ->where($qb->expr()->in('status', $qb->createNamedParameter([BlockStatus::ERROR->value, BlockStatus::BLOCKER->value], IQueryBuilder::PARAM_INT_ARRAY)))
+ ->andWhere($qb->expr()->lt('next_run', $qb->createNamedParameter(time(), IQueryBuilder::PARAM_INT)))
+ ->andWhere($qb->expr()->neq('next_run', $qb->createNamedParameter(0, IQueryBuilder::PARAM_INT)));
+ return $qb->executeStatement();
+ }
+
+ /**
+ * delete sessions that contain only process with status at ::SUCCESS
+ */
+ public function removeSuccessfulBlock(): int {
+ $qb = $this->db->getQueryBuilder();
+ $qb->selectDistinct('t1.session_token')
+ ->from($this->getTableName(), 't1')
+ ->leftJoin(
+ 't1', $this->getTableName(), 't2',
+ $qb->expr()->andX(
+ $qb->expr()->eq('t1.session_token', 't2.session_token'),
+ $qb->expr()->neq('t2.status', $qb->createNamedParameter(BlockStatus::SUCCESS->value, IQueryBuilder::PARAM_INT))
+ )
+ )
+ ->where($qb->expr()->eq('t1.status', $qb->createNamedParameter(BlockStatus::SUCCESS->value, IQueryBuilder::PARAM_INT)))
+ ->andWhere($qb->expr()->lt('t1.next_run', $qb->createNamedParameter(time(), IQueryBuilder::PARAM_INT)))
+ ->andWhere($qb->expr()->isNull('t2.status'));
+
+ $result = $qb->executeQuery();
+ $sessions = [];
+ while ($row = $result->fetch()) {
+ $sessions[] = $row['session_token'];
+ }
+ $result->closeCursor();
+
+ $count = 0;
+ $chunks = array_chunk($sessions, 30);
+ foreach($chunks as $chunk) {
+ $delete = $this->db->getQueryBuilder();
+ $delete->delete($this->getTableName())
+ ->where($qb->expr()->in('session_token', $delete->createNamedParameter($chunk, IQueryBuilder::PARAM_INT_ARRAY)));
+ $count += $delete->executeStatement();
+ unset($delete);
+ }
+
+ return $count;
+ }
+
+ /**
+ * return a Block from its token
+ *
+ * @throws BlockNotFoundException
+ */
+ public function getByToken(string $token): Block {
+ $qb = $this->db->getQueryBuilder();
+ $qb->select('*')
+ ->from($this->getTableName())
+ ->where($qb->expr()->eq('token', $qb->createNamedParameter($token)));
+
+ try {
+ return $this->findEntity($qb);
+ } catch (DoesNotExistException) {
+ throw new BlockNotFoundException('no process found');
+ }
+ }
+
+ public function updateStatus(Block $block, ?BlockStatus $prevStatus = null, string $lockToken = ''): bool {
+ $qb = $this->db->getQueryBuilder();
+ $qb->update($this->getTableName())
+ ->set('status', $qb->createNamedParameter($block->getStatus()))
+ ->where($qb->expr()->andX(
+ $qb->expr()->eq('session_token', $qb->createNamedParameter($block->getSessionToken())),
+ $qb->expr()->eq('token', $qb->createNamedParameter($block->getToken()))
+ )
+ );
+
+ // if process contain a lockToken, we conflict with the stored one
+ if ($block->getLockToken() !== '') {
+ $qb->andWhere($qb->expr()->eq('lock_token', $qb->createNamedParameter($block->getLockToken())));
+ }
+
+ // if status is switching from standby to running, we set last_run to detect timeout
+ if ($block->getBlockStatus() === BlockStatus::RUNNING && $prevStatus === BlockStatus::STANDBY) {
+ $qb->set('last_run', $qb->createNamedParameter(time(), IQueryBuilder::PARAM_INT));
+ }
+
+ // if status is updated to success, error or blocker, we store result (or exception)
+ if (in_array($block->getBlockStatus(), [BlockStatus::SUCCESS, BlockStatus::ERROR, BlockStatus::BLOCKER], true)) {
+ try {
+ $qb->set('result', $qb->createNamedParameter(json_encode($block->getResult(), JSON_THROW_ON_ERROR)));
+ $qb->set('metadata', $qb->createNamedParameter(json_encode($block->getMetadata(), JSON_THROW_ON_ERROR)));
+ $qb->set('next_run', $qb->createNamedParameter($block->getNextRun(), IQueryBuilder::PARAM_INT));
+ } catch (\JsonException $e) {
+ $this->logger->warning('could not json_encode process result', ['exception' => $e]);
+ }
+ }
+
+ if ($lockToken !== '') {
+ $qb->set('lock_token', $qb->createNamedParameter($lockToken));
+ }
+
+ if ($prevStatus !== null) {
+ $qb->andWhere($qb->expr()->eq('status', $qb->createNamedParameter($prevStatus->value, IQueryBuilder::PARAM_INT)));
+ }
+
+ return ($qb->executeStatement() === 1);
+ }
+
+ public function updateSessionStatus(string $sessionToken, BlockStatus $status, ?BlockStatus $prevStatus = null): int {
+ $qb = $this->db->getQueryBuilder();
+ $qb->update($this->getTableName())
+ ->set('status', $qb->createNamedParameter($status->value, IQueryBuilder::PARAM_INT))
+ ->where($qb->expr()->eq('session_token', $qb->createNamedParameter($sessionToken)));
+
+ if ($prevStatus !== null) {
+ $qb->andWhere($qb->expr()->eq('status', $qb->createNamedParameter($prevStatus->value, IQueryBuilder::PARAM_INT)));
+ }
+
+ return $qb->executeStatement();
+ }
+
+
+
+ /**
+ * set next_run to current time if next_run>0 in relation to session_token.
+ * Force process queued for replay to be run as soon as possible.
+ */
+ public function resetSessionNextRun(string $sessionToken): void {
+ $qb = $this->db->getQueryBuilder();
+ $qb->update($this->getTableName())
+ ->set('next_run', $qb->createNamedParameter(time(), IQueryBuilder::PARAM_INT))
+ ->where(
+ $qb->expr()->eq('session_token', $qb->createNamedParameter($sessionToken)),
+ $qb->expr()->gt('next_run', $qb->createNamedParameter(0, IQueryBuilder::PARAM_INT)),
+ );
+
+ $qb->executeStatement();
+ }
+
+
+// public function updateDataset(string $token, array $dataset): void {
+// $qb = $this->db->getQueryBuilder();
+// $qb->update($this->getTableName())
+// ->set('dataset', $qb->createNamedParameter(json_encode($dataset)))
+// ->where($qb->expr()->eq('token', $qb->createNamedParameter($token)));
+//
+// $qb->executeStatement();
+// }
+
+ public function deleteAll() {
+ $qb = $this->db->getQueryBuilder();
+ $qb->delete($this->getTableName());
+ $qb->executeStatement();
+ }
+
+}
diff --git a/lib/private/Async/Enum/BlockType.php b/lib/private/Async/Enum/BlockType.php
new file mode 100644
index 00000000000..839d0d24f05
--- /dev/null
+++ b/lib/private/Async/Enum/BlockType.php
@@ -0,0 +1,16 @@
+<?php
+
+declare(strict_types=1);
+
+/**
+ * SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+namespace OC\Async\Enum;
+
+enum BlockType: int {
+ case CLOSURE = 1;
+ case INVOKABLE = 2;
+ case CLASSNAME = 3;
+}
diff --git a/lib/private/Async/Exceptions/AsyncProcessException.php b/lib/private/Async/Exceptions/AsyncProcessException.php
new file mode 100644
index 00000000000..b394a95df7c
--- /dev/null
+++ b/lib/private/Async/Exceptions/AsyncProcessException.php
@@ -0,0 +1,14 @@
+<?php
+
+declare(strict_types=1);
+/**
+ * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+namespace OC\Async\Exceptions;
+
+use Exception;
+
+class AsyncProcessException extends Exception {
+}
diff --git a/lib/private/Async/Exceptions/BlockAlreadyRunningException.php b/lib/private/Async/Exceptions/BlockAlreadyRunningException.php
new file mode 100644
index 00000000000..4c671d91d55
--- /dev/null
+++ b/lib/private/Async/Exceptions/BlockAlreadyRunningException.php
@@ -0,0 +1,12 @@
+<?php
+
+declare(strict_types=1);
+/**
+ * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+namespace OC\Async\Exceptions;
+
+class BlockAlreadyRunningException extends AsyncProcessException {
+}
diff --git a/lib/private/Async/Exceptions/BlockNotFoundException.php b/lib/private/Async/Exceptions/BlockNotFoundException.php
new file mode 100644
index 00000000000..6e25415f7e3
--- /dev/null
+++ b/lib/private/Async/Exceptions/BlockNotFoundException.php
@@ -0,0 +1,12 @@
+<?php
+
+declare(strict_types=1);
+/**
+ * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+namespace OC\Async\Exceptions;
+
+class BlockNotFoundException extends AsyncProcessException {
+}
diff --git a/lib/private/Async/Exceptions/LoopbackEndpointException.php b/lib/private/Async/Exceptions/LoopbackEndpointException.php
new file mode 100644
index 00000000000..2a7bea78239
--- /dev/null
+++ b/lib/private/Async/Exceptions/LoopbackEndpointException.php
@@ -0,0 +1,14 @@
+<?php
+
+declare(strict_types=1);
+/**
+ * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+namespace OC\Async\Exceptions;
+
+use Exception;
+
+class LoopbackEndpointException extends AsyncProcessException {
+}
diff --git a/lib/private/Async/Exceptions/SessionBlockedException.php b/lib/private/Async/Exceptions/SessionBlockedException.php
new file mode 100644
index 00000000000..9c6d85dc4fc
--- /dev/null
+++ b/lib/private/Async/Exceptions/SessionBlockedException.php
@@ -0,0 +1,14 @@
+<?php
+
+declare(strict_types=1);
+/**
+ * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+namespace OC\Async\Exceptions;
+
+use Exception;
+
+class SessionBlockedException extends AsyncProcessException {
+}
diff --git a/lib/private/Async/ForkManager.php b/lib/private/Async/ForkManager.php
new file mode 100644
index 00000000000..6978a601bd4
--- /dev/null
+++ b/lib/private/Async/ForkManager.php
@@ -0,0 +1,485 @@
+<?php
+
+namespace OC\Async;
+
+use OC\Async\Db\BlockMapper;
+use OC\Async\Enum\BlockType;
+use OC\Async\Exceptions\AsyncProcessException;
+use OC\Async\Exceptions\LoopbackEndpointException;
+use OC\Async\Exceptions\BlockAlreadyRunningException;
+use OC\Async\Exceptions\SessionBlockedException;
+use OC\Async\Model\Block;
+use OC\Async\Model\BlockInterface;
+use OC\Async\Model\SessionInterface;
+use OC\Async\Wrappers\DummyBlockWrapper;
+use OC\Async\Wrappers\LoggerBlockWrapper;
+use OC\Config\Lexicon\CoreConfigLexicon;
+use OC\DB\Connection;
+use OCP\Async\Enum\BlockActivity;
+use OCP\Async\Enum\ProcessExecutionTime;
+use OCP\Async\Enum\BlockStatus;
+use OCP\Async\IAsyncProcess;
+use OCP\Http\Client\IClientService;
+use OCP\IAppConfig;
+use OCP\IConfig;
+use OCP\IURLGenerator;
+use Psr\Log\LoggerInterface;
+use Symfony\Component\Console\Output\OutputInterface;
+
+class ForkManager {
+ private ?ABlockWrapper $wrapper;
+
+ /** @var int[] */
+ private array $forks = [];
+ private const FORK_LIMIT = 3; // maximum number of child process
+ private const FORK_SLEEP = 500000; // wait for msec when too many fork have been created
+
+ public function __construct(
+ private BlockMapper $blockMapper,
+ private Connection $conn,
+ private IAppConfig $appConfig,
+ private IConfig $config,
+ private IClientService $clientService,
+ private IURLGenerator $urlGenerator,
+ LoggerBlockWrapper $loggerProcessWrapper,
+ private LoggerInterface $logger,
+ ) {
+ $this->wrapper = $loggerProcessWrapper;
+ }
+
+ public function setWrapper(?ABlockWrapper $wrapper): void {
+ $this->wrapper = $wrapper;
+ }
+
+
+ /**
+ * @throws BlockAlreadyRunningException
+ * @throws AsyncProcessException
+ */
+ public function runSession(string $token, array $metadata = []): void {
+ $sessionBlocks = $this->blockMapper->getBySession($token);
+ $metadata['sessionToken'] = $token;
+
+ if ($this->wrapper !== null) {
+ $wrapper = clone $this->wrapper;
+ } else {
+ $wrapper = null;
+ }
+
+ // might be need to avoid some conflict/race condition
+ // usleep(10000);
+ $sessionIface = new SessionInterface(BlockInterface::asBlockInterfaces($sessionBlocks));
+
+ if ($sessionIface->getGlobalStatus() !== BlockStatus::STANDBY) {
+ throw new AsyncProcessException();
+ }
+
+ $wrapper?->setSessionInterface($sessionIface);
+ $wrapper?->session($metadata);
+
+ try {
+ foreach ($sessionBlocks as $block) {
+ if (!$this->confirmBlockRequirement($wrapper, $sessionIface, $block)) {
+ $block->replay();
+ $this->blockMapper->update($block);
+ continue;
+ }
+
+ $block->addMetadata($metadata);
+ $this->runBlock($block, $wrapper);
+
+ if ($block->getBlockStatus() === BlockStatus::BLOCKER) {
+ $wrapper?->end('Fail process ' . $block->getToken() . ' block the rest of the session');
+ throw new SessionBlockedException();
+ }
+ }
+ $wrapper?->end();
+ } catch (BlockAlreadyRunningException) {
+ $wrapper?->end('already running');
+ throw new BlockAlreadyRunningException();
+ }
+ }
+
+ private function confirmBlockRequirement(
+ ?ABlockWrapper $wrapper,
+ SessionInterface $sessionIface,
+ Block $block
+ ): bool {
+ $procIface = new BlockInterface(null, $block);
+ foreach ($procIface->getRequire() as $requiredProcessId) {
+ $requiredBlock = $sessionIface->byId($requiredProcessId);
+ if ($requiredBlock === null) {
+ $wrapper?->activity(BlockActivity::NOTICE, 'could not initiated block as it requires block ' . $requiredProcessId . ' which is not defined');
+ return false;
+ }
+ if ($requiredBlock?->getStatus() !== BlockStatus::SUCCESS) {
+ $wrapper?->activity(BlockActivity::NOTICE, 'could not initiated block as it requires block ' . $requiredProcessId . ' to be executed and successful');
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * @throws BlockAlreadyRunningException
+ */
+ private function runBlock(Block $block, ?ABlockWrapper $wrapper = null): void {
+ if ($block->getBlockStatus() !== BlockStatus::STANDBY) {
+ return;
+ }
+
+ $this->lockBlock($block);
+
+ $data = $block->getParams();
+ $serialized = $block->getCode();
+ $params = unserialize($data['params'], ['allowed_classes' => $data['paramsClasses']]);
+ $obj = unserialize($serialized, ['allowed_classes' => $data['blockClasses'] ?? []]);
+
+ if ($data['processWrapper'] ?? false) {
+ array_unshift($params, ($wrapper ?? new DummyBlockWrapper()));
+ }
+
+ $wrapper?->setBlock($block);
+ $wrapper?->init();
+ $wrapper?->activity(BlockActivity::STARTING);
+ $result = [
+ 'executionTime' => ProcessExecutionTime::NOW->value,
+ 'startTime' => time(),
+ ];
+ $iface = new BlockInterface(null, $block);
+ try {
+ $returnedData = null;
+ switch ($block->getBlockType()) {
+ case BlockType::CLOSURE:
+ $c = $obj->getClosure();
+ $returnedData = $c(...$params);
+ break;
+
+ case BlockType::INVOKABLE:
+ $returnedData = $obj(...$params);
+ break;
+
+ case BlockType::CLASSNAME:
+ $obj = new $data['className']();
+ $returnedData = $obj->async(...$params);
+ break;
+ }
+ if (is_array($returnedData)) {
+ $result['result'] = $returnedData;
+ }
+ $block->setBlockStatus(BlockStatus::SUCCESS);
+ if ($block->getReplayCount() > 0) {
+ // in case of success after multiple tentative, we reset next run to right now
+ // on all block waiting for replay. Easiest solution to find block dependant of
+ // this current successful run
+ $this->blockMapper->resetSessionNextRun($block->getSessionToken());
+ }
+ } catch (\Exception $e) {
+ $wrapper?->activity(BlockActivity::ERROR, $e->getMessage());
+ $result['error'] = [
+ 'exception' => get_class($e),
+ 'message' => $e->getMessage(),
+ 'trace' => $e->getTrace(),
+ 'code' => $e->getCode()
+ ];
+
+ if ($iface->isReplayable()) {
+ $block->replay(); // we mark the block as able to be back to STANDBY status
+ } else {
+ $block->setNextRun(0);
+ }
+ if ($iface->isBlocker()) {
+ $block->setBlockStatus(BlockStatus::BLOCKER);
+ } else {
+ $block->setBlockStatus(BlockStatus::ERROR);
+ }
+ } finally {
+ $result['endTime'] = time();
+ }
+
+ $block->setResult($result);
+ $wrapper?->activity(BlockActivity::ENDING);
+ $this->blockMapper->updateStatus($block, BlockStatus::RUNNING);
+ }
+
+ /**
+ * @throws BlockAlreadyRunningException
+ */
+ private function lockBlock(Block $block): void {
+ if ($block->getBlockStatus() !== BlockStatus::STANDBY) {
+ throw new BlockAlreadyRunningException('block not in standby');
+ }
+ $lockToken = $this->generateToken(7);
+ $block->setBlockStatus(BlockStatus::RUNNING);
+ if (!$this->blockMapper->updateStatus($block, BlockStatus::STANDBY, $lockToken)) {
+ throw new BlockAlreadyRunningException('block is locked');
+ }
+ $block->setLockToken($lockToken);
+ }
+
+ public function forkSession(string $session, array $metadata = []): void {
+ if (\OC::$CLI) {
+ $useWebIfNeeded = $metadata['useWebIfNeeded'] ?? false;
+ if ($useWebIfNeeded && !extension_loaded('posix')) {
+ try {
+ $this->forkSessionLoopback($session);
+ return;
+ } catch (\Exception) {
+ }
+ }
+
+ $this->forkSessionCli($session, $metadata);
+ return;
+ }
+
+ try {
+ $this->forkSessionLoopback($session);
+ } catch (LoopbackEndpointException) {
+ // session will be processed later
+ }
+ }
+
+
+ private function forkSessionCli(string $session, array $metadata = []): void {
+ if (!extension_loaded('posix')) {
+ // log/notice that posix is not loaded
+ return;
+ }
+ $slot = $this->getFirstAvailableSlot();
+ $metadata += [
+ '_cli' => [
+ 'slot' => $slot,
+ 'forkCount' => count($this->forks),
+ 'forkLimit' => self::FORK_LIMIT,
+ ]
+ ];
+
+ $pid = pcntl_fork();
+
+ // work around as the parent database connection is inherited by the child.
+ // when child process is over, parent process database connection will drop.
+ // The drop can happen anytime, even in the middle of a running request.
+ // work around is to close the connection as soon as possible after forking.
+ $this->conn->close();
+
+ if ($pid === -1) {
+ // TODO: manage issue while forking
+ } else if ($pid === 0) {
+ // forked process
+ try {
+ $this->runSession($session, $metadata);
+ } catch (AsyncProcessException) {
+ // failure to run session can be part of the process
+ }
+ exit();
+ } else {
+ // store slot+pid
+ $this->forks[$slot] = $pid;
+
+ // when fork limit is reach, cycle until space freed
+ while (true) {
+ $exitedPid = pcntl_waitpid(0, $status, WNOHANG);
+ if ($exitedPid > 0) {
+ $slot = array_search($exitedPid, $this->forks, true);
+ if ($slot) {
+ unset($this->forks[$slot]);
+ }
+ }
+ if (count($this->forks) < self::FORK_LIMIT) {
+ return;
+ }
+ usleep(self::FORK_SLEEP);
+ }
+ }
+ }
+
+ /**
+ * Request local loopback endpoint.
+ * We expect the request to be closed remotely.
+ *
+ * Ignored if:
+ * - the endpoint is not fully configured and tested,
+ * - the server is on heavy load (timeout at 1 second)
+ *
+ * @return string result from the loopback endpoint
+ * @throws LoopbackEndpointException if not configured
+ */
+ private function forkSessionLoopback(string $session, ?string $loopbackEndpoint = null): string {
+ $client = $this->clientService->newClient();
+ try {
+ $response = $client->post(
+ $loopbackEndpoint ?? $this->linkToLoopbackEndpoint(),
+ [
+ 'headers' => [],
+ 'verify' => false,
+ 'connect_timeout' => 1.0,
+ 'timeout' => 1.0,
+ 'http_errors' => true,
+ 'body' => ['token' => $session],
+ 'nextcloud' => [
+ 'allow_local_address' => true,
+ 'allow_redirects' => true,
+ ]
+ ]
+ );
+
+ return (string)$response->getBody();
+ } catch (LoopbackEndpointException $e) {
+ $this->logger->debug('loopback endpoint not configured', ['exception' => $e]);
+ throw $e;
+ } catch (\Exception $e) {
+ $this->logger->warning('could not reach loopback endpoint to initiate fork', ['exception' => $e]);
+ throw new LoopbackEndpointException('loopback endpoint cannot be reach', previous: $e);
+ }
+ }
+
+
+ /**
+ * return full (absolute) link to the web-loopback endpoint
+ *
+ * @param string|null $instance if null, stored loopback address will be used.
+ *
+ * @throws LoopbackEndpointException if $instance is null and no stored configuration
+ */
+ public function linkToLoopbackEndpoint(?string $instance = null): string {
+ return rtrim($instance ?? $this->getLoopbackInstance(), '/') . $this->urlGenerator->linkToRoute('core.AsyncProcess.processFork');
+ }
+
+ /**
+ * return loopback address stored in configuration
+ *
+ * @return string
+ * @throws LoopbackEndpointException if config is not set or empty
+ */
+ public function getLoopbackInstance(): string {
+ if (!$this->appConfig->hasKey('core', CoreConfigLexicon::ASYNC_LOOPBACK_ADDRESS, true)) {
+ throw new LoopbackEndpointException('loopback not configured');
+ }
+
+ $instance = $this->appConfig->getValueString('core', CoreConfigLexicon::ASYNC_LOOPBACK_ADDRESS);
+ if ($instance === '') {
+ throw new LoopbackEndpointException('empty config');
+ }
+
+ return $instance;
+ }
+
+
+ public function discoverLoopbackEndpoint(?OutputInterface $output = null): ?string {
+ $cliUrl = $this->config->getSystemValueString('overwrite.cli.url', '');
+ $output?->write('- testing value from \'overwrite.cli.url\' (<comment>' . $cliUrl . '</comment>)... ');
+
+ $reason = '';
+ if ($this->testLoopbackInstance($cliUrl, $reason)) {
+ $output?->writeln('<info>ok</info>');
+ return $cliUrl;
+ }
+
+ $output?->writeln('<error>' . $reason . '</error>');
+
+ foreach($this->config->getSystemValue('trusted_domains', []) as $url) {
+ $url = 'https://' . $url;
+ $output?->write('- testing entry from \'trusted_domains\' (<comment>' . $url . '</comment>)... ');
+ if ($this->testLoopbackInstance($url, $reason)) {
+ $output?->writeln('<info>ok</info>');
+ return $url;
+ }
+ $output?->writeln('<error>' . $reason . '</error>');
+ }
+
+ return null;
+ }
+
+
+ public function testLoopbackInstance(string $url, string &$reason = ''): bool {
+ $url = rtrim($url, '/');
+ if (!$this->pingLoopbackInstance($url)) {
+ $reason = 'failed ping';
+ return false;
+ }
+
+ $token = $this->generateToken();
+ $asyncProcess = \OCP\Server::get(IAsyncProcess::class);
+ $asyncProcess->exec(function(string $token) {
+ sleep(1); // enforce a delay to confirm asynchronicity
+ $appConfig = \OCP\Server::get(IAppConfig::class);
+ $appConfig->setValueString('core', CoreConfigLexicon::ASYNC_LOOPBACK_TEST, $token);
+ }, $token)->name('test loopback instance')->async();
+
+ $this->appConfig->clearCache(true);
+ if ($token === $this->appConfig->getValueString('core', CoreConfigLexicon::ASYNC_LOOPBACK_TEST)) {
+ $reason = 'async process already executed';
+ return false;
+ }
+
+ sleep(3);
+ $this->appConfig->clearCache(true);
+ $result = ($token === $this->appConfig->getValueString('core', CoreConfigLexicon::ASYNC_LOOPBACK_TEST));
+ $this->appConfig->deleteKey('core', CoreConfigLexicon::ASYNC_LOOPBACK_TEST);
+
+ return $result;
+ }
+
+ private function pingLoopbackInstance(string $url): bool {
+ $pingLoopback = $this->generateToken();
+ $this->appConfig->setValueString('core', CoreConfigLexicon::ASYNC_LOOPBACK_PING, $pingLoopback);
+ try {
+ $result = $this->forkSessionLoopback('__ping__', $this->linkToLoopbackEndpoint($url));
+ $result = json_decode($result, true, flags: JSON_THROW_ON_ERROR);
+ } catch (\JsonException|LoopbackEndpointException $e) {
+ $this->logger->debug('could not ping loopback endpoint', ['exception' => $e]);
+ }
+
+ $this->appConfig->deleteKey('core', CoreConfigLexicon::ASYNC_LOOPBACK_PING);
+ return (($result['ping'] ?? '') === $pingLoopback);
+ }
+
+
+ /**
+ * note that the fact we fork process to run a session of processes before doing
+ * a check on the fact that maybe one of the process of the session is already
+ * running can create a small glitch when choosing the first available slot as
+ * a previous fork running said check is already made and will exit shortly.
+ *
+ * @return int
+ */
+ private function getFirstAvailableSlot(): int {
+ $slot = -1;
+ for ($i = 0; $i < self::FORK_LIMIT; $i++) {
+ if (!array_key_exists($i, $this->forks)) {
+ return $i;
+ }
+
+ // we confirm child process still exists
+ if (pcntl_waitpid($this->forks[$i], $status, WNOHANG) > 0) {
+ return $i;
+ }
+ }
+
+ if ($slot === -1) {
+ // TODO: should not happens: log warning
+ }
+
+ return -1;
+ }
+
+ private function generateToken(int $length = 15): string {
+ $result = '';
+ for ($i = 0; $i < $length; $i++) {
+ $result .= 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890'[random_int(0, 61)];
+ }
+
+ return $result;
+ }
+
+ /**
+ * we wait until all child process are done
+ *
+ * @noinspection PhpStatementHasEmptyBodyInspection
+ */
+ public function waitChildProcess(): void {
+ while (pcntl_waitpid(0, $status) != -1) {
+ }
+ }
+
+}
diff --git a/lib/private/Async/IBlockInterface.php b/lib/private/Async/IBlockInterface.php
new file mode 100644
index 00000000000..33b046b42ff
--- /dev/null
+++ b/lib/private/Async/IBlockInterface.php
@@ -0,0 +1,36 @@
+<?php
+
+declare(strict_types=1);
+
+/**
+ * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+namespace OC\Async;
+
+use OCP\Async\Enum\ProcessExecutionTime;
+use OCP\Async\Enum\BlockStatus;
+
+interface IBlockInterface {
+ public function getToken(): string;
+ public function id(string $id): self;
+ public function getId(): string;
+ public function name(string $name): self;
+ public function getName(): string;
+ public function require(string $id): self;
+ public function getRequire(): array;
+ public function delay(int $delay): self;
+ public function getDelay(): int;
+ public function getExecutionTime(): ?ProcessExecutionTime;
+ public function blocker(bool $blocker = true): self;
+ public function isBlocker(): bool;
+ public function replayable(bool $replayable = true): self;
+ public function isReplayable(): bool;
+ public function getStatus(): BlockStatus;
+ public function getResult(): ?array;
+ public function getError(): ?array;
+ public function dataset(array $dataset): self;
+ public function getDataset(): array;
+ public function async(ProcessExecutionTime $time): self;
+}
diff --git a/lib/private/Async/ISessionInterface.php b/lib/private/Async/ISessionInterface.php
new file mode 100644
index 00000000000..19307f915bb
--- /dev/null
+++ b/lib/private/Async/ISessionInterface.php
@@ -0,0 +1,21 @@
+<?php
+
+declare(strict_types=1);
+
+/**
+ * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+namespace OC\Async;
+
+use OC\Async\Model\BlockInterface;
+use OCP\Async\Enum\BlockStatus;
+
+interface ISessionInterface {
+ public function getAll(): array;
+ public function byToken(string $token): ?BlockInterface;
+ public function byId(string $id): ?BlockInterface;
+ public function getGlobalStatus(): BlockStatus;
+
+}
diff --git a/lib/private/Async/Model/Block.php b/lib/private/Async/Model/Block.php
new file mode 100644
index 00000000000..9c66ec13ad8
--- /dev/null
+++ b/lib/private/Async/Model/Block.php
@@ -0,0 +1,142 @@
+<?php
+
+declare(strict_types=1);
+
+/**
+ * SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+namespace OC\Async\Model;
+
+use OC\Async\Enum\BlockType;
+use OCP\AppFramework\Db\Entity;
+use OCP\Async\Enum\ProcessExecutionTime;
+use OCP\Async\Enum\BlockStatus;
+
+/**
+ * @method void setToken(string $token)
+ * @method string getToken()
+ * @method void setSessionToken(string $sessionToken)
+ * @method string getSessionToken()
+ * @method void setType(int $type)
+ * @method int getType()
+ * @method void setCode(string $code)
+ * @method string getCode()
+ * @method void setParams(array $params)
+ * @method ?array getParams()
+ * @method void setDataset(array $dataset)
+ * @method ?array getDataset()
+ * @method void setMetadata(array $metadata)
+ * @method ?array getMetadata()
+ * @method void setLinks(array $params)
+ * @method ?array getLinks()
+ * @method void setOrig(array $orig)
+ * @method ?array getOrig()
+ * @method void setResult(array $result)
+ * @method ?array getResult()
+ * @method void setStatus(int $status)
+ * @method int getStatus()
+ * @method void setExecutionTime(int $status)
+ * @method int getExecutionTime()
+ * @method void setLockToken(string $lockToken)
+ * @method string getLockToken()
+ * @method void setCreation(int $creation)
+ * @method int getCreation()
+ * @method void setLastRun(int $lastRun)
+ * @method int getLastRun()
+ * @method void setNextRun(int $nextRun)
+ * @method int getNextRun()
+ * @psalm-suppress PropertyNotSetInConstructor
+ */
+class Block extends Entity {
+ protected string $token = '';
+ protected string $sessionToken = '';
+ protected int $type = 0;
+ protected string $code = '';
+ protected ?array $params = [];
+ protected ?array $dataset = [];
+ protected ?array $metadata = [];
+ protected ?array $links = [];
+ protected ?array $orig = [];
+ protected ?array $result = [];
+ protected int $status = 0;
+ protected int $executionTime = 0;
+ protected string $lockToken = '';
+ protected int $creation = 0;
+ protected int $lastRun = 0;
+ protected int $nextRun = 0;
+
+ public function __construct() {
+ $this->addType('token', 'string');
+ $this->addType('sessionToken', 'string');
+ $this->addType('type', 'integer');
+ $this->addType('code', 'string');
+ $this->addType('params', 'json');
+ $this->addType('dataset', 'json');
+ $this->addType('metadata', 'json');
+ $this->addType('links', 'json');
+ $this->addType('orig', 'json');
+ $this->addType('result', 'json');
+ $this->addType('status', 'integer');
+ $this->addType('executionTime', 'integer');
+ $this->addType('lockToken', 'string');
+ $this->addType('creation', 'integer');
+ $this->addType('lastRun', 'integer');
+ $this->addType('nextRun', 'integer');
+ }
+
+ public function setBlockType(BlockType $type): void {
+ $this->setType($type->value);
+ }
+
+ public function getBlockType(): BlockType {
+ return BlockType::from($this->getType());
+ }
+
+ public function setBlockStatus(BlockStatus $status): void {
+ $this->setStatus($status->value);
+ }
+
+ public function getBlockStatus(): BlockStatus {
+ return BlockStatus::from($this->getStatus());
+ }
+
+ public function setProcessExecutionTime(ProcessExecutionTime $time): void {
+ $this->setExecutionTime($time->value);
+ }
+
+ public function getProcessExecutionTime(): ProcessExecutionTime {
+ return ProcessExecutionTime::from($this->getExecutionTime());
+ }
+
+ public function addMetadata(array $metadata): self {
+ $metadata = array_merge($this->metadata, $metadata);
+ $this->setMetadata($metadata);
+ return $this;
+ }
+
+ public function addMetadataEntry(string $key, string|int|array|bool $value): self {
+ $metadata = $this->metadata;
+ $metadata[$key] = $value;
+ $this->setMetadata($metadata);
+ return $this;
+ }
+
+ public function replay(bool $now = false): void {
+ $count = $this->getMetadata()['_replay'] ?? 0;
+ $next = time() + floor(6 ** ($count + 1)); // calculate delay based on count of retry
+ $count = min(++$count, 5); // limit to 6^6 seconds (13h)
+
+ if ($now) {
+ $next = time();
+ }
+
+ $this->addMetadataEntry('_replay', $count);
+ $this->setNextRun($next);
+ }
+
+ public function getReplayCount(): int {
+ return $this->getMetadata()['_replay'] ?? 0;
+ }
+}
diff --git a/lib/private/Async/Model/BlockInterface.php b/lib/private/Async/Model/BlockInterface.php
new file mode 100644
index 00000000000..36ba58194f7
--- /dev/null
+++ b/lib/private/Async/Model/BlockInterface.php
@@ -0,0 +1,183 @@
+<?php
+
+declare(strict_types=1);
+
+/**
+ * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+namespace OC\Async\Model;
+
+use OC\Async\AsyncManager;
+use OC\Async\IBlockInterface;
+use OCP\Async\Enum\ProcessExecutionTime;
+use OCP\Async\Enum\BlockStatus;
+
+class BlockInterface implements IBlockInterface, \JsonSerializable {
+ private string $id = '';
+ private string $name = '';
+ private bool $blocker = false;
+ private bool $replayable = false;
+
+ /** @var string[] */
+ private array $require = [];
+ private int $delay = 0;
+ private ?ProcessExecutionTime $executionTime = null;
+ private array $dataset = [];
+
+ public function __construct(
+ private readonly ?AsyncManager $asyncManager,
+ private readonly Block $block,
+ ) {
+ $this->import($block->getMetadata()['_iface'] ?? []);
+ }
+
+ public function getBlock(): Block {
+ return $this->block;
+ }
+
+ public function getToken(): string {
+ return $this->block->getToken();
+ }
+
+ public function getResult(): ?array {
+ if ($this->block->getBlockStatus() !== BlockStatus::SUCCESS) {
+ return null;
+ }
+
+ return $this->block->getResult()['result'];
+ }
+
+ public function getError(): ?array {
+ if ($this->block->getBlockStatus() !== BlockStatus::ERROR) {
+ return null;
+ }
+
+ return $this->block->getResult()['error'];
+ }
+
+ public function getStatus(): BlockStatus {
+ return $this->block->getBlockStatus();
+ }
+
+ public function id(string $id): self {
+ $this->id = strtoupper($id);
+ return $this;
+ }
+
+ public function getId(): string {
+ return $this->id;
+ }
+ public function name(string $name): self {
+ $this->name = $name;
+ return $this;
+ }
+
+ public function getName(): string {
+ return $this->name;
+ }
+
+ public function blocker(bool $blocker = true): self {
+ $this->blocker = $blocker;
+ return $this;
+ }
+
+ public function isBlocker(): bool {
+ return $this->blocker;
+ }
+
+ public function replayable(bool $replayable = true): self {
+ $this->replayable = $replayable;
+ return $this;
+ }
+
+ public function isReplayable(): bool {
+ return $this->replayable;
+ }
+
+ public function require(string $id): self {
+ $this->require[] = strtoupper($id);
+ return $this;
+ }
+
+ public function getRequire(): array {
+ return $this->require;
+ }
+
+ public function delay(int $delay): self {
+ $this->delay = $delay;
+ return $this;
+ }
+
+ public function getDelay(): int {
+ return $this->delay;
+ }
+
+ public function getExecutionTime(): ?ProcessExecutionTime {
+ return $this->executionTime;
+ }
+
+ /**
+ * @param array<array> $dataset
+ *
+ * @return $this
+ */
+ public function dataset(array $dataset): self {
+ $this->dataset = $dataset;
+ return $this;
+ }
+
+ public function getDataset(): array {
+ return $this->dataset;
+ }
+
+// public function getReplayCount(): int {
+// return $this->get
+// }
+ /**
+ * only available during the creation of the session
+ *
+ * @return $this
+ */
+ public function async(ProcessExecutionTime $time = ProcessExecutionTime::NOW): self {
+ $this->asyncManager?->async($time);
+ return $this;
+ }
+
+ public function import(array $data): void {
+ $this->token = $data['token'] ?? '';
+ $this->id($data['id'] ?? '');
+ $this->name($data['name'] ?? '');
+ $this->delay($data['delay'] ?? 0);
+ $this->require = $data['require'] ?? [];
+ $this->blocker($data['blocker'] ?? false);
+ $this->replayable($data['replayable'] ?? false);
+ }
+
+ public function jsonSerialize(): array {
+ return [
+ 'token' => $this->getToken(),
+ 'id' => $this->getId(),
+ 'name' => $this->getName(),
+ 'require' => $this->getRequire(),
+ 'delay' => $this->getDelay(),
+ 'blocker' => $this->isBlocker(),
+ 'replayable' => $this->isReplayable(),
+ ];
+ }
+
+ /**
+ * @param Block[] $processes
+ *
+ * @return BlockInterface[]
+ */
+ public static function asBlockInterfaces(array $processes): array {
+ $interfaces = [];
+ foreach ($processes as $process) {
+ $interfaces[] = new BlockInterface(null, $process);
+ }
+
+ return $interfaces;
+ }
+}
diff --git a/lib/private/Async/Model/SessionInterface.php b/lib/private/Async/Model/SessionInterface.php
new file mode 100644
index 00000000000..c481fc545f6
--- /dev/null
+++ b/lib/private/Async/Model/SessionInterface.php
@@ -0,0 +1,83 @@
+<?php
+
+declare(strict_types=1);
+
+/**
+ * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+namespace OC\Async\Model;
+
+use OC\Async\ISessionInterface;
+use OCP\Async\Enum\BlockStatus;
+
+class SessionInterface implements ISessionInterface {
+ public function __construct(
+ /** @var BlockInterface[] */
+ private array $interfaces,
+ ) {
+ }
+
+ public function getAll(): array {
+ return $this->interfaces;
+ }
+
+ public function byToken(string $token): ?BlockInterface {
+ foreach ($this->interfaces as $iface) {
+ if ($iface->getToken() === $token) {
+ return $iface;
+ }
+ }
+
+ return null;
+ }
+
+ public function byId(string $id): ?BlockInterface {
+ foreach($this->interfaces as $iface) {
+ if ($iface->getId() === $id) {
+ return $iface;
+ }
+ }
+
+ return null;
+ }
+
+ /**
+ * return a global status of the session based on the status of each process
+ * - if one entry is still at ::PREP stage, we return ::PREP
+ * - if one ::BLOCKER, returns ::BLOCKER.
+ * - if all ::SUCCESS, returns ::SUCCESS, else ignored.
+ * - if all ::ERROR+::SUCCESS, returns ::ERROR, else ignored.
+ * - session status is the biggest value between all left process status (::STANDBY, ::RUNNING)
+ */
+ public function getGlobalStatus(): BlockStatus {
+ $current = -1;
+ $groupedStatus = [];
+ foreach($this->interfaces as $iface) {
+ // returns ::PREP if one process is still ::PREP
+ if ($iface->getStatus() === BlockStatus::PREP) {
+ return BlockStatus::PREP;
+ }
+ // returns ::BLOCKER if one process is marked ::BLOCKER
+ if ($iface->getStatus() === BlockStatus::BLOCKER) {
+ return BlockStatus::BLOCKER;
+ }
+ // we keep trace if process is marked as ::ERROR or ::SUCCESS
+ // if not, we keep the highest value
+ if (in_array($iface->getStatus(), [BlockStatus::ERROR, BlockStatus::SUCCESS], true)) {
+ $groupedStatus[$iface->getStatus()->value] = true;
+ } else {
+ $current = max($current, $iface->getStatus()->value);
+ }
+ }
+
+ // in case the all interface were ::ERROR or ::SUCCESS, we check
+ // if there was at least one ::ERROR. if none we return ::SUCCESS
+ if ($current === -1) {
+ return (array_key_exists(BlockStatus::ERROR->value, $groupedStatus)) ? BlockStatus::ERROR : BlockStatus::SUCCESS;
+ }
+
+ return BlockStatus::from($current);
+ }
+}
diff --git a/lib/private/Async/README.md b/lib/private/Async/README.md
new file mode 100644
index 00000000000..185047f52db
--- /dev/null
+++ b/lib/private/Async/README.md
@@ -0,0 +1,379 @@
+
+
+# AsyncProcess
+
+Using `IAsyncProcess` allow the execution of code on a separated process in order to improve the quality of the user experience.
+
+
+## Concept
+
+To shorten the hanging time on heavy process that reflect on the user experience and to avoid delay between
+instruction and execution, this API allows to prepare instructions to be executed on a parallel thread.
+
+This is obtained by creating a loopback HTTP request, initiating a fresh PHP process that will execute the
+prepared instruction after emulating a connection termination, freeing the main process.
+
+#### Technology
+
+The logic is to:
+
+- store a serialized version of the code to be executed on another process into database.
+- start a new process as soon as possible that will retrieve the code from database and execute it.
+
+#### Setup
+
+The feature require setting a loopback url.
+This is done automatically by a background job. The automatic process uses `'overwrite.cli.url'` and `'trusted_domains'` from _config/config.php_ to find a list of domain name to test.
+It can be initiated via `occ`:
+
+> ./occ async:setup --discover
+
+Or manually set with:
+
+> ./occ async:setup --loopback https://cloud.example.net/
+
+
+
+## Blocks & Sessions
+
+- We will define as _Block_ complete part of unsplittable code.
+- A list of _Blocks_ can be grouped in _Sessions_.
+- While _Sessions_ are independent of each other, interactions can be set between _Blocks_ of the same _Session_.
+
+**Interactions**
+
+- _Blocks_ are executed in the order they have been created.
+- It is possible for a _Block_ to get results from a previous process from the session.
+- A _Block_ defined as blocker will stop further process from that session on failure.
+- A _Block_ can require a previous process to be successful before being executed.
+
+**Replayability**
+
+- A block can be set as _replayable_, meaning that in case of failure it can be run multiple time until it end properly.
+
+**Quick example**
+
+```php
+
+// define all part of the code that can be async
+$this->asyncProcess->invoke($myInvoke1)->id('block1')->replayable(); // block1 can be replayed until successful
+$this->asyncProcess->invoke($myInvoke2)->id('block2')->require('block1'); // block2 will not be executed until block1 has not been successful
+$this->asyncProcess->invoke($myInvoke3)->id('block3')->blocker(); // block3 will run whatever happened to block1 and block2 and its suc1cess is mandatory to continue with the session
+$this->asyncProcess->invoke($myInvoke4)->id('block4');
+
+$this->asyncProcess->async(); // close the session and initiate async execution
+```
+
+
+## ProcessExecutionTime
+
+Code is to be executed as soon as defined, with alternative fallback solutions in that order:
+
+- `::NOW` - main process will fork and execute the code in parallel (instantly)
+- `::ASAP` - process will be executed by an optional live service (a second later)
+- `::LATER` - process will be executed at the next execution of the cron tasks (within the next 10 minutes)
+- `::ON_REQUEST` - process needs to be executed manually
+
+
+## IAsyncProcess
+
+`IAsyncProcess` is the public interface that contains few methods to prepare the code to be processed on a parallel process.
+
+
+#### Closure
+
+The code can be directly written in a closure by calling `exec()`:
+
+```php
+$this->asyncProcess->exec(function (int $value, string $line, MyObject $obj): void {
+ // long process
+},
+ random_int(10000, 99999),
+ 'this is a line',
+ $myObj
+)->async();
+```
+
+
+#### Invokable
+
+Within the magic method `__invoke()` in a configured objects
+
+```php
+class MyInvoke {
+ public function __construct(
+ private array $data = []
+ ) {
+ }
+
+ public function __invoke(int $n): void {
+ // do long process
+ }
+}
+
+
+$myInvoke = new MyInvoke(['123']);
+$this->asyncProcess->invoke($myInvoke, random_int(10000, 99999))->async();
+```
+
+
+#### PHP Class
+
+Via the method `async()` from a class
+
+```php
+<?php
+
+namespace OCA\MyApp;
+
+class MyObj {
+ public function __construct(
+ ) {
+ }
+
+ public function async(int $n): void {
+ // run heavy stuff
+ }
+}
+```
+
+```php
+$this->asyncProcess->call(\OCA\MyApp\MyObj::class, random_int(10000, 99999))->async();
+```
+
+
+
+## IBlockInterface
+
+When storing a new _Block_ via `IAsyncProcess::call()`,`IAsyncProcess::invoke()` or `IAsyncProcess::async()`, will be returned a `IBlockInterface` to provided details about the _Block_.
+
+
+### name(string)
+
+Identification and/or description of the _Block_ for better understanding when debugging
+
+```php
+$this->asyncProcess->call(\OCA\MyApp\MyObj::class)->name('my process');
+```
+
+### id(string)
+
+Identification of the _Block_ for future interaction between _Blocks_ within the same _Session_
+
+```php
+$this->asyncProcess->call(\OCA\MyApp\MyObj::class)->id('my_process');
+```
+
+As an example, `id` are to be used to obtain `IBlockInterface` from a specific _Block_:
+```php
+ISessionInterface::byId('my_process'); // returns IBlockInterface
+```
+
+
+### blocker()
+
+Set current _Block_ as _Blocker_, meaning that further _Blocks_ of the _Session_ are lock until this process does not run successfully
+
+```php
+$this->asyncProcess->call(\OCA\MyApp\MyObj::class)->blocker();
+```
+
+
+### require(string)
+
+Define that the _Block_ can only be executed if set _Block_, identified by its `id`, ran successfully.
+Multiple _Blocks_ can be set a required.
+
+```php
+$this->asyncProcess->call(\OCA\MyApp\MyObj::class)->require('other_block_1')->require('other_block_2');
+```
+
+### replayable()
+
+The _Block_ is configured as replayable, meaning that it will be restarted until it runs correctly
+
+```php
+$this->asyncProcess->call(\OCA\MyApp\MyObj::class)->replayable();
+```
+
+The delay is calculated using 6 (six) exponent current retry, capped at 6:
+
+- 1st retry after few seconds,
+- 2nd retry after 30 seconds,
+- 3rd retry after 3 minutes,
+- 4th retry after 20 minutes,
+- 5th retry after 2 hours,
+- 6th retry after 12 hours,
+- other retries every 12 hours.
+
+### delay(int)
+
+Only try to initiate the process n seconds after current time.
+
+### dataset(array)
+
+It is possible to set a list of arguments to be applied to the same _Block_.
+The _Block_ will be executed for each defined set of data
+
+```php
+$this->asyncProcess->call(\OCA\MyApp\MyObj::class)->dataset(
+ [
+ ['this is a string', 1],
+ ['this is another string', 12],
+ ['and another value as first parameter', 42],
+ ]
+);
+```
+
+
+### post-execution
+
+Post execution of a _Block_, its `IBlockInterface` can be used to get details about it:
+
+- **getExecutionTime()** returns the `ProcessExecutionTime` that initiated the process,
+- **getResult()** returns the array returned by the process in case of success,
+- **getError()** returns the error in case of failure
+
+
+
+## ISessionInterface
+
+`ISessionInterface` is available to your code via `ABlockWrapper` and helps interaction between all the _Blocks_ of the same _Session_
+
+### getAll()
+
+returns all `IBlockInterface` from the _Session_
+
+### byToken(string)
+
+returns a `IBockInterface` using its token
+
+### byId(string)
+
+returns a `IBockInterface` using its `id`
+
+### getGlobalStatus()
+
+return a `BlockStatus` (Enum) based on every status of all _Blocks_ of the _Session_:
+
+- returns `::PREP` if one block is still at prep stage,
+- return `::BLOCKER` if at least one block is set as `blocker` and is failing,
+- returns `::SUCCESS` if all blocks are successful,
+- returns `::ERROR` if all process have failed,
+- returns `::STANDBY` or `::RUNNING` if none of the previous condition are met. `::RUNNING` if at least one `Block` is currently processed.
+
+
+
+
+## ABlockWrapper
+
+This abstract class helps to interface with other _Blocks_ from the same _Session_.
+It will be generated and passed as argument to the defined block if the first parameter is an `AprocessWrapper` is expected as first parameter:
+
+
+As a _Closure_
+```php
+$this->asyncProcess->exec(function (ABlockWrapper $wrapper, array $data): array {
+ $resultFromProc1 = $wrapper->getSessionInterface()->byId('block1')?->getResult(); // can be null if 'block1' is not success, unless using require()
+ $wrapper->activity(BlockActivity::NOTICE, 'result from previous process: ' . json_encode($resultFromProc1))
+},
+ ['mydata' => true]
+ )->id('block2');
+```
+
+When using `invoke()`
+```php
+class MyInvoke {
+ public function __construct(
+ ) {
+ }
+
+ public function __invoke(ABlockWrapper $wrapper, int $n): void {
+ $data = $wrapper->getSessionInterface()->byId('block1')?->getResult(); // can be null if 'block1' is not success
+ }
+}
+
+$myInvoke = new MyInvoke();
+$this->asyncProcess->invoke($myInvoke, random_int(10000, 99999))->requipe('block1'); // require ensure block1 has run successfully before this one
+```
+
+Syntax is the same with `call()`, when defining the `async()` method
+
+```php
+class MyObj {
+ public function __construct(
+ ) {
+ }
+
+ public function async(ABlockWrapper $wrapper): void {
+ }
+}
+```
+
+#### Abstract methods
+
+`ABlockWrapper` is an abstract class with a list of interfaced methods that have different behavior on the _BlockWrapper_ sent by the framework.
+
+- **DummyBlockWrapper** will do nothing,
+- **CliBlockWrapper** will generate/manage console output,
+- **LoggerBlockWrapper** will only create new nextcloud logs entry.
+
+List of usefull methods:
+
+- **activity(BlockActivity $activity, string $line = '');** can be used to update details about current part of your code during its process
+- **getSessionInterface()** return the `ISessionInterface` for the session
+- **getReplayCount()** returns the number of retry
+
+
+## Other tools
+
+#### Live Service
+
+This will cycle every few seconds to check for any session in stand-by mode and will execute its blocks
+> ./occ async:live
+
+
+#### Manage sessions and blocks
+
+Get resume about current session still in database.
+
+> `./occ async:manage`
+
+Get details about a session.
+
+> `./occ async:manage --session <sessionId>
+
+Get details about a block
+
+> `./occ async:manage --details <blockId>
+
+Get excessive details about a block
+
+> `./occ async:manage --details <blockId> --full-details
+
+Replay a not successful block
+
+> `./occ async:manage --replay <blockId>
+
+#### Mocking process
+
+`./occ async:setup` allow an admin to generate fake processes to emulate the feature:
+
+- **--mock-session _int_** create _n_ sessions
+- **--mock-block _int_** create _n_ blocks
+- **--fail-process _string_** create failing process
+
+
+# Work in Progress
+
+missing element of the feature:
+
+- [ ] discussing the need of signing the PHP code stored in database to ensure its authenticity before execution,
+- [ ] ability to overwrite the code or arguments of a process to fix a failing process,
+- [ ] Check and confirm value type compatibility between parameters and arguments when storing new block
+- [ ] implementing dataset(),
+- [ ] implementing delay(),
+- [ ] full documentation,
+- [ ] tests, tests, tests.
+
+
diff --git a/lib/private/Async/Wrappers/CliBlockWrapper.php b/lib/private/Async/Wrappers/CliBlockWrapper.php
new file mode 100644
index 00000000000..4c38c7fc995
--- /dev/null
+++ b/lib/private/Async/Wrappers/CliBlockWrapper.php
@@ -0,0 +1,103 @@
+<?php
+
+declare(strict_types=1);
+
+/**
+ * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+namespace OC\Async\Wrappers;
+
+use OC\Async\ABlockWrapper;
+use OC\Async\Model\Block;
+use OCP\Async\Enum\BlockActivity;
+use Symfony\Component\Console\Formatter\OutputFormatterStyle;
+use Symfony\Component\Console\Output\OutputInterface;
+
+class CliBlockWrapper extends ABlockWrapper {
+ private const TAB_SIZE = 12;
+ private const HEXA = '258ad';
+ private int $hexaLength = 0;
+ private string $prefix = '';
+ private int $slot = 0;
+ public function __construct(
+ private ?OutputInterface $output = null,
+ ) {
+ $this->hexaLength = strlen(self::HEXA);
+ }
+
+ public function session(array $metadata): void {
+ $this->slot = $metadata['_cli']['slot'] ?? 0;
+ $this->output->writeln(
+ str_repeat(' ', $this->slot * self::TAB_SIZE) . ' ' .
+ '<open>++</>' . ' initiating session ' .
+ ($metadata['sessionToken'] ?? '')
+ );
+ }
+
+ public function init(): void {
+ $this->prefix = $this->generatePrefix($this->block->getToken());
+ }
+
+ public function activity(BlockActivity $activity, string $line = ''): void {
+ $act = match ($activity) {
+ BlockActivity::STARTING => '<open>>></>',
+ BlockActivity::ENDING => '<close><<</>',
+ BlockActivity::DEBUG, BlockActivity::NOTICE => ' ',
+ BlockActivity::WARNING => '<warn>!!</>',
+ BlockActivity::ERROR => '<error>!!</>',
+ };
+
+ $this->output->writeln(
+ str_repeat(' ', $this->slot * self::TAB_SIZE) . ' ' .
+ $act . ' ' .
+ $this->prefix . ' ' .
+ $line
+ );
+ }
+
+ public function end(string $line = ''): void {
+ if ($line === '') {
+ $this->output->writeln('');
+ return;
+ }
+
+ $this->output->writeln(
+ str_repeat(' ', $this->slot * self::TAB_SIZE) . ' ' .
+ '<warn>--</>' . ' ' . $line
+ );
+ }
+
+ private function generatePrefix(string $token): string {
+ $color = 's' . random_int(1, $this->hexaLength - 1) .
+ random_int(1, $this->hexaLength - 1) .
+ random_int(1, $this->hexaLength - 1);
+
+ return '<' . $color . '>' . $token . '</>';
+ }
+
+ public static function initStyle(OutputInterface $output): void {
+ $output->getFormatter()->setStyle('open', new OutputFormatterStyle('#0f0', '', ['bold']));
+ $output->getFormatter()->setStyle('close', new OutputFormatterStyle('#f00', '', ['bold']));
+ $output->getFormatter()->setStyle('warn', new OutputFormatterStyle('#f00', '', []));
+ $output->getFormatter()->setStyle('error', new OutputFormatterStyle('#f00', '', ['bold']));
+
+ $hexaLength = strlen(self::HEXA);
+ for ($i = 0; $i < $hexaLength; $i++) {
+ for ($j = 0; $j < $hexaLength; $j++) {
+ for ($k = 0; $k < $hexaLength; $k++) {
+ $output->getFormatter()->setStyle(
+ 's' . $i . $j . $k,
+ new OutputFormatterStyle(
+ '#' . self::HEXA[$i] . self::HEXA[$j] . self::HEXA[$k],
+ '',
+ ['bold']
+ )
+ );
+ }
+ }
+ }
+ }
+
+}
diff --git a/lib/private/Async/Wrappers/DummyBlockWrapper.php b/lib/private/Async/Wrappers/DummyBlockWrapper.php
new file mode 100644
index 00000000000..e719e4f83e5
--- /dev/null
+++ b/lib/private/Async/Wrappers/DummyBlockWrapper.php
@@ -0,0 +1,27 @@
+<?php
+
+declare(strict_types=1);
+
+/**
+ * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+namespace OC\Async\Wrappers;
+
+use OC\Async\ABlockWrapper;
+use OC\Async\Model\Block;
+use OCP\Async\Enum\BlockActivity;
+
+class DummyBlockWrapper extends ABlockWrapper {
+ public function session(array $metadata): void {
+ }
+ public function init(): void {
+ }
+
+ public function activity(BlockActivity $activity, string $line = ''): void {
+ }
+
+ public function end(string $line = ''): void {
+ }
+}
diff --git a/lib/private/Async/Wrappers/LoggerBlockWrapper.php b/lib/private/Async/Wrappers/LoggerBlockWrapper.php
new file mode 100644
index 00000000000..66d9c80007c
--- /dev/null
+++ b/lib/private/Async/Wrappers/LoggerBlockWrapper.php
@@ -0,0 +1,39 @@
+<?php
+
+declare(strict_types=1);
+
+/**
+ * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+namespace OC\Async\Wrappers;
+
+use OC\Async\ABlockWrapper;
+use OC\Async\Model\Block;
+use OCP\Async\Enum\BlockActivity;
+use Psr\Log\LoggerInterface;
+
+class LoggerBlockWrapper extends ABlockWrapper {
+ private array $metadata = [];
+ public function __construct(
+ private LoggerInterface $logger,
+ ) {
+ }
+
+ public function session(array $metadata): void {
+ $this->metadata = $metadata;
+ $this->logger->debug('session: ' . json_encode($metadata));
+ }
+
+ public function init(): void {
+ $this->logger->debug('process: ' . $this->metadata['sessionToken'] . ' ' . $this->block->getToken());
+ }
+
+ public function activity(BlockActivity $activity, string $line = ''): void {
+ $this->logger->debug('activity (' . $activity->value . ') ' . $line);
+ }
+
+ public function end(string $line = ''): void {
+ $this->logger->debug('end session - ' . $line);
+ }
+}