aboutsummaryrefslogtreecommitdiffstats
path: root/core/Command/Async
diff options
context:
space:
mode:
Diffstat (limited to 'core/Command/Async')
-rw-r--r--core/Command/Async/Live.php54
-rw-r--r--core/Command/Async/Manage.php232
-rw-r--r--core/Command/Async/Setup.php435
3 files changed, 721 insertions, 0 deletions
diff --git a/core/Command/Async/Live.php b/core/Command/Async/Live.php
new file mode 100644
index 00000000000..89a16a3165d
--- /dev/null
+++ b/core/Command/Async/Live.php
@@ -0,0 +1,54 @@
+<?php
+
+declare(strict_types=1);
+
+/**
+ * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+namespace OC\Core\Command\Async;
+
+use OC\Async\AsyncManager;
+use OC\Async\AsyncProcess;
+use OC\Async\Db\BlockMapper;
+use OC\Async\ForkManager;
+use OC\Async\Wrappers\CliBlockWrapper;
+use OCP\Async\Enum\ProcessExecutionTime;
+use Symfony\Component\Console\Command\Command;
+use Symfony\Component\Console\Input\InputInterface;
+use Symfony\Component\Console\Output\OutputInterface;
+
+class Live extends Command {
+ public function __construct(
+ private readonly BlockMapper $blockMapper,
+ private readonly ForkManager $forkManager,
+ ) {
+ parent::__construct();
+ }
+
+ protected function configure() {
+ parent::configure();
+ $this->setName('async:live')
+ ->setDescription('test');
+ }
+
+ protected function execute(InputInterface $input, OutputInterface $output): int {
+ CliBlockWrapper::initStyle($output);
+ $this->forkManager->setWrapper(new CliBlockWrapper($output));
+
+ $metadata = ['_processExecutionTime' => ProcessExecutionTime::ASAP];
+ while(true) {
+ $this->blockMapper->resetFailedBlock();
+
+ foreach ($this->blockMapper->getSessionOnStandBy() as $session) {
+ $this->forkManager->forkSession($session, $metadata);
+ }
+
+ sleep(3);
+ }
+
+ $this->forkManager->waitChildProcess();
+
+ return 0;
+ }
+}
diff --git a/core/Command/Async/Manage.php b/core/Command/Async/Manage.php
new file mode 100644
index 00000000000..9be46b335c8
--- /dev/null
+++ b/core/Command/Async/Manage.php
@@ -0,0 +1,232 @@
+<?php
+
+declare(strict_types=1);
+
+/**
+ * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+namespace OC\Core\Command\Async;
+
+use OC\Async\AsyncManager;
+use OC\Async\Db\BlockMapper;
+use OC\Async\ForkManager;
+use OC\Async\Model\Block;
+use OC\Async\Model\BlockInterface;
+use OC\Async\Model\SessionInterface;
+use OCP\Async\Enum\BlockExecutionTime;
+use OCP\Async\Enum\BlockStatus;
+use Symfony\Component\Console\Command\Command;
+use Symfony\Component\Console\Formatter\OutputFormatterStyle;
+use Symfony\Component\Console\Helper\QuestionHelper;
+use Symfony\Component\Console\Input\InputInterface;
+use Symfony\Component\Console\Input\InputOption;
+use Symfony\Component\Console\Output\OutputInterface;
+use Symfony\Component\Console\Question\ConfirmationQuestion;
+
+class Manage extends Command {
+ private bool $noCrop = false;
+
+ public function __construct(
+ private AsyncManager $asyncManager,
+ private ForkManager $forkManager,
+ private BlockMapper $blockMapper,
+ ) {
+ parent::__construct();
+ }
+
+ protected function configure() {
+ parent::configure();
+ $this->setName('async:manage')
+ ->addOption('clean', '', InputOption::VALUE_NONE, 'remove successful session')
+ ->addOption('session', '', InputOption::VALUE_REQUIRED, 'list all blocks from a session', '')
+ ->addOption( 'details', '', InputOption::VALUE_REQUIRED, 'get details about a specific block', '')
+ ->addOption( 'full-details', '', InputOption::VALUE_NONE, 'get full details')
+ ->addOption( 'replay', '', InputOption::VALUE_REQUIRED, 'replay a specific block', '')
+ ->setDescription('manage');
+ }
+
+ protected function execute(InputInterface $input, OutputInterface $output): int {
+ if ($input->getOption('clean')) {
+ $count = $this->blockMapper->removeSuccessfulBlock();
+ $output->writeln('deleted ' . $count . ' blocks');
+ return 0;
+ }
+
+ $replay = $input->getOption('replay');
+ if ($replay !== '') {
+ $this->replayBlock($input, $output, $replay);
+ return 0;
+ }
+
+ $this->noCrop = $input->getOption('full-details');
+ $output->getFormatter()->setStyle('data', new OutputFormatterStyle('#666', '', []));
+ $output->getFormatter()->setStyle('prep', new OutputFormatterStyle('#ccc', '', []));
+ $output->getFormatter()->setStyle('standby', new OutputFormatterStyle('#aaa', '', ['bold']));
+ $output->getFormatter()->setStyle('running', new OutputFormatterStyle('#0a0', '', []));
+ $output->getFormatter()->setStyle('blocker', new OutputFormatterStyle('#c00', '', ['bold']));
+ $output->getFormatter()->setStyle('error', new OutputFormatterStyle('#d00', '', []));
+ $output->getFormatter()->setStyle('success', new OutputFormatterStyle('#0c0', '', ['bold']));
+
+ $details = $input->getOption('details');
+ if ($details !== '') {
+ $this->displayBlock($output, $details);
+ return 0;
+ }
+
+ $session = $input->getOption('session');
+ if ($session !== '') {
+ $this->displaySession($output, $session);
+ return 0;
+ }
+
+ $this->summary($output);
+ return 0;
+ }
+
+ private function summary(OutputInterface $output): void {
+ $statuses = [];
+ foreach ($this->blockMapper->getSessions() as $token) {
+ $sessionBlockes = $this->blockMapper->getBySession($token);
+ $sessionIface = new SessionInterface(BlockInterface::asBlockInterfaces($sessionBlockes));
+
+ $status = $sessionIface->getGlobalStatus()->value;
+ if (!array_key_exists($status, $statuses)) {
+ $statuses[$status] = [];
+ }
+ $statuses[$status][] = $token;
+ }
+
+ if (!empty($success = $statuses[(string)BlockStatus::SUCCESS->value] ?? [])) {
+ $output->writeln('<comment>Successful</comment> session to be removed next cron (' . count($success) . '): <info>' . implode('</info>,<info> ', $success) . '</info>');
+ }
+
+ if (!empty($prep = $statuses[(string)BlockStatus::PREP->value] ?? [])) {
+ $output->writeln('Session with <comment>PREP</comment> status (' . count($prep) . '): <info>' . implode('</info>,<info> ', $prep) . '</info>');
+ }
+
+ if (!empty($stand = $statuses[BlockStatus::STANDBY->value] ?? [])) {
+ $output->writeln('Session in <comment>stand-by</comment> (' . count($stand) . '): <info>' . implode('</info>,<info> ', $stand) . '</info>');
+ }
+
+ if (!empty($running = $statuses[BlockStatus::RUNNING->value] ?? [])) {
+ $output->writeln('Currently <comment>running</comment> session (' . count($running) . '): <info>' . implode('</info>,<info> ', $running) . '</info>');
+ }
+
+ if (!empty($err = $statuses[BlockStatus::ERROR->value] ?? [])) {
+ $output->writeln('<comment>Erroneous</comment> session (' . count($err) . '): <info>' . implode('</info>,<info> ', $err) . '</info>');
+ }
+
+ if (!empty($blocker = $statuses[BlockStatus::BLOCKER->value] ?? [])) {
+ $output->writeln('<comment>Blocked</comment> session (' . count($blocker) . '): <info>' . implode('</info>,<info> ', $blocker) . '</info>');
+ }
+ }
+
+
+ private function displaySession(OutputInterface $output, string $token): void {
+ foreach ($this->blockMapper->getBySession($token) as $block) {
+ $output->writeln('BlockToken: <data>' . $block->getToken() . '</data>');
+ $output->writeln('BlockType: <data>' . $block->getBlockType()->name . '</data>');
+ $output->writeln('Interface: <data>' . $this->displayInterface($block) . '</data>');
+ $output->writeln('BlockStatus: ' . $this->displayStatus($block));
+ $output->writeln('Replay: <data>' . $block->getReplayCount() . '</data>');
+
+ $output->writeln('');
+ }
+
+ }
+
+
+ private function displayBlock(OutputInterface $output, string $token): void {
+ $block = $this->blockMapper->getByToken($token);
+
+ $output->writeln('SessionToken: <data>' . $block->getSessionToken() . '</data>');
+ $output->writeln('BlockToken: <data>' . $block->getToken() . '</data>');
+ $output->writeln('BlockType: <data>' . $block->getBlockType()->name . '</data>');
+ $output->writeln('Interface: <data>' . $this->displayInterface($block) . '</data>');
+
+ $output->writeln('Code: <data>' . $this->cropContent($block->getCode(), 3000, 15) . '</data>');
+ $output->writeln('Params: <data>' . $this->cropContent(json_encode($block->getParams()), 3000, 15) . '</data>');
+ $output->writeln('Metadata: <data>' . $this->cropContent(json_encode($block->getMetadata()), 3000, 15) . '</data>');
+ $output->writeln('Result: <data>' . $this->cropContent(json_encode($block->getResult()), 3000, 15) . '</data>');
+
+ $output->writeln('Dataset: <data>' . $this->cropContent(json_encode($block->getDataset()), 3000, 15) . '</data>');
+ $output->writeln('Links: <data>' . $this->cropContent(json_encode($block->getLinks()), 3000, 15) . '</data>');
+ $output->writeln('Orig: <data>' . $this->cropContent(json_encode($block->getOrig()), 3000, 15) . '</data>');
+
+ $output->writeln('ExecutionTime: <data>' . BlockExecutionTime::tryFrom($block->getExecutionTime())?->name . '</data>');
+ $output->writeln('Creation: <data>' . $block->getCreation() . '</data>');
+ $output->writeln('LastRun: <data>' . $block->getLastRun() . '</data>');
+ $output->writeln('NextRun: <data>' . $block->getNextRun() . '</data>');
+ $output->writeln('BlockStatus: ' . $this->displayStatus($block));
+ $output->writeln('Replay: <data>' . $block->getReplayCount() . '</data>');
+ }
+
+
+ private function displayStatus(Block $block): string {
+ $name = $block->getBlockStatus()->name;
+ $color = strtolower($name);
+ return '<' . $color . '>' . $name . '</' . $color . '>';
+ }
+
+ private function displayInterface(Block $block): string {
+ $iface = new BlockInterface(null, $block);
+
+ $data = [];
+ $data[] = ($iface->getId() === '') ? '' : 'id=' . $iface->getId();
+ $data[] = ($iface->getName() === '') ? '' : 'name=' . $iface->getName();
+ $data[] = (!$iface->isReplayable()) ? '' : 'replayable=true';
+ $data[] = (!$iface->isBlocker()) ? '' : 'blocker=true';
+ $data[] = (empty($iface->getRequire())) ? '' : 'require=' . implode('.', $iface->getRequire());
+
+ return implode(', ', array_filter($data));
+ }
+
+
+
+
+ private function replayBlock(InputInterface $input, OutputInterface $output, string $token): void {
+ $block = $this->blockMapper->getByToken($token);
+ if (!in_array($block->getBlockStatus(), [BlockStatus::ERROR, BlockStatus::BLOCKER], true)) {
+ $output->writeln('only Block set as ERROR or BLOCKER can be replayed');
+ return;
+ }
+
+ $iface = new BlockInterface(null, $block);
+ if (!$iface->isReplayable()) {
+ $output->writeln('');
+ $output->writeln('Block is not set as <comment>replayable</comment>.');
+ $output->writeln('Replaying this Block can create issues.');
+ $output->writeln('');
+ $question = new ConfirmationQuestion(
+ '<comment>Do you really want to replay the Block ' . $token . ' ?</comment> (y/N) ',
+ false,
+ '/^(y|Y)/i'
+ );
+
+ /** @var QuestionHelper $helper */
+ $helper = $this->getHelper('question');
+ if (!$helper->ask($input, $output, $question)) {
+ $output->writeln('aborted.');
+ return;
+ }
+ }
+
+ $block->replay(true);
+ $this->blockMapper->update($block);
+ }
+
+
+ /**
+ * crop content after n lines or n chars
+ */
+ private function cropContent(string $content, int $maxChars, int $maxLines): string {
+ if ($this->noCrop) {
+ return $content;
+ }
+ preg_match_all("/\\n/", utf8_decode($content), $matches, PREG_OFFSET_CAPTURE);
+ return substr($content, 0, min($matches[0][$maxLines-1][1] ?? 99999, $maxChars));
+ }
+}
+
diff --git a/core/Command/Async/Setup.php b/core/Command/Async/Setup.php
new file mode 100644
index 00000000000..d5b94a9b732
--- /dev/null
+++ b/core/Command/Async/Setup.php
@@ -0,0 +1,435 @@
+<?php
+
+declare(strict_types=1);
+
+/**
+ * SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+namespace OC\Core\Command\Async;
+
+use OC\Async\ABlockWrapper;
+use OC\Async\AsyncManager;
+use OC\Async\Exceptions\LoopbackEndpointException;
+use OC\Async\ForkManager;
+use OC\Config\Lexicon\CoreConfigLexicon;
+use OCP\Async\Enum\BlockActivity;
+use OCP\Async\Enum\ProcessExecutionTime;
+use OCP\Async\IAsyncProcess;
+use OCP\IAppConfig;
+use Symfony\Component\Console\Command\Command;
+use Symfony\Component\Console\Helper\QuestionHelper;
+use Symfony\Component\Console\Input\InputInterface;
+use Symfony\Component\Console\Input\InputOption;
+use Symfony\Component\Console\Output\OutputInterface;
+use Symfony\Component\Console\Question\ConfirmationQuestion;
+
+class Setup extends Command {
+ public function __construct(
+ private readonly IAppConfig $appConfig,
+ private readonly IAsyncProcess $asyncProcess,
+ private readonly AsyncManager $asyncManager,
+ private readonly ForkManager $forkManager,
+ ) {
+ parent::__construct();
+ }
+
+ protected function configure() {
+ parent::configure();
+ $this->setName('async:setup')
+ ->addOption('reset', '', InputOption::VALUE_NONE, 'reset all data related to the AsyncProcess feature')
+ ->addOption('drop', '', InputOption::VALUE_NONE, 'drop processes data')
+ ->addOption('loopback', '', InputOption::VALUE_REQUIRED, 'set loopback address')
+ ->addOption('discover', '', InputOption::VALUE_NONE, 'initiate the search for a possible loopback address')
+ ->addOption('yes', '', InputOption::VALUE_NONE, 'answer yes to any confirmation box')
+ ->addOption('exception-if-no-config', '', InputOption::VALUE_NONE, 'return an exception if existing configuration cannot be used')
+ ->addOption('mock-session', '', InputOption::VALUE_OPTIONAL, 'create n sessions', '0')
+ ->addOption('mock-block', '', InputOption::VALUE_OPTIONAL, 'create n blocks', '0')
+ ->addOption('fail-process', '', InputOption::VALUE_REQUIRED, 'create fail process', '')
+ ->setDescription('setup');
+ }
+
+ /**
+ * @throws LoopbackEndpointException
+ */
+ protected function execute(InputInterface $input, OutputInterface $output): int {
+ $reset = $input->getOption('reset');
+ $drop = $input->getOption('drop');
+ if ($reset || $drop) {
+ if (!$input->getOption('yes')) {
+ $question = new ConfirmationQuestion(
+ ($reset) ? '<comment>Do you want to reset all data and configuration related to AsyncProcess ?</comment> (y/N) '
+ : '<comment>Do you want to drop all data related to AsyncProcess ?</comment> (y/N) ',
+ false,
+ '/^(y|Y)/i'
+ );
+
+ /** @var QuestionHelper $helper */
+ $helper = $this->getHelper('question');
+ if (!$helper->ask($input, $output, $question)) {
+ $output->writeln('aborted.');
+
+ return 0;
+ }
+ }
+
+ $this->asyncManager->dropAllBlocks();
+ if ($reset) {
+ $this->asyncManager->resetConfig();
+ }
+
+ $output->writeln('done.');
+
+ return 0;
+ }
+
+ $failureType = $input->getOption('fail-process');
+ if ($failureType !== '') {
+ try {
+ match ($failureType) {
+ 'static-required' => $this->createFaultyProcessStaticRequired(),
+ 'static-blocker' => $this->createFaultyProcessStaticBlocker(),
+ 'dynamic-required' => $this->createFaultyProcessDynamicRequired(),
+ 'dynamic-blocker' => $this->createFaultyProcessDynamicBlocker(),
+ 'auto-required' => $this->createFaultyProcessAutoRequired(),
+ 'auto-blocker' => $this->createFaultyProcessAutoBlocker(),
+ };
+ } catch (\UnhandledMatchError) {
+ $output->writeln(
+ 'list of fail: static-required, static-blocker, dynamic-blocker, dynamic-required, auto-blocker, auto-required'
+ );
+ }
+
+ return 0;
+ }
+
+ $session = (int)($input->getOption('mock-session') ?? 0);
+ $proc = (int)($input->getOption('mock-block') ?? 0);
+ if ($session > 0 || $proc > 0) {
+ $this->createRandomProcess($output, $session, $proc);
+
+ $this->forkManager->waitChildProcess();
+
+ return 0;
+ }
+
+ if ($input->getOption('discover')) {
+ $output->writeln('<info>Searching for loopback address</info>');
+ $found = $this->forkManager->discoverLoopbackEndpoint($output);
+ $output->writeln('found a working loopback address: <info>' . $found . '</info>');
+ $this->confirmSave($input, $output, $found);
+
+ return 0;
+ }
+
+ $inputLoopback = $input->getOption('loopback');
+ if ($inputLoopback) {
+ $this->parseAddress($inputLoopback);
+ $output->write('- testing <comment>' . $inputLoopback . '</comment>... ');
+
+ $reason = '';
+ if (!$this->forkManager->testLoopbackInstance($inputLoopback, $reason)) {
+ $output->writeln('<error>' . $reason . '</error>');
+
+ return 0;
+ }
+
+ $output->writeln('<info>ok</info>');
+ $this->confirmSave($input, $output, $inputLoopback);
+
+ return 0;
+ }
+
+ try {
+ $currentLoopback = $this->forkManager->getLoopbackInstance();
+ $output->writeln('Current loopback instance: <info>' . $currentLoopback . '</info>');
+ $output->write('Testing async process using loopback endpoint... ');
+ $reason = '';
+ if (!$this->forkManager->testLoopbackInstance($currentLoopback, $reason)) {
+ $output->writeln('<error>' . $reason . '</error>');
+ } else {
+ $output->writeln('<info>ok</info>');
+ }
+
+ return 0;
+ } catch (LoopbackEndpointException $e) {
+ if ($input->getOption('exception-if-no-config')) {
+ throw $e;
+ }
+ }
+
+ $output->writeln('<info>Notes:</info>');
+ $output->writeln('no loopback instance currently set.');
+ $output->writeln('use --loopback <address> to manually configure a loopback instance');
+ $output->writeln('or use --discover for an automated process');
+
+ return 0;
+ }
+
+ private function confirmSave(InputInterface $input, OutputInterface $output, string $instance): void {
+ if (!$input->getOption('yes')) {
+ $output->writeln('');
+ $question = new ConfirmationQuestion(
+ '<comment>Do you want to save loopback address \'' . $instance . '\' ?</comment> (y/N) ',
+ false,
+ '/^(y|Y)/i'
+ );
+
+ /** @var QuestionHelper $helper */
+ $helper = $this->getHelper('question');
+ if (!$helper->ask($input, $output, $question)) {
+ $output->writeln('aborted.');
+
+ return;
+ }
+ }
+
+ $this->appConfig->setValueString('core', CoreConfigLexicon::ASYNC_LOOPBACK_ADDRESS, $instance);
+ }
+
+ /**
+ * confirm format of typed address
+ */
+ private function parseAddress(string $test): void {
+ $scheme = parse_url($test, PHP_URL_SCHEME);
+ $cloudId = parse_url($test, PHP_URL_HOST);
+ if (is_bool($scheme) || is_bool($cloudId) || is_null($scheme) || is_null($cloudId)) {
+ throw new LoopbackEndpointException('format must be http[s]://domain.name[:post][/path]');
+ }
+ }
+
+ /**
+ * create a list of fake/mockup process
+ *
+ * @param int $session number of session to generate
+ * @param int $processes number of process per session to generate
+ */
+ private function createRandomProcess(OutputInterface $output, int $session, int $processes): void {
+ $session = ($session > 0) ? $session : 1;
+ $processes = ($processes > 0) ? $processes : 1;
+
+ for ($i = 0; $i < $session; $i++) {
+ for ($j = 0; $j < $processes; $j++) {
+ $process = $this->asyncProcess->exec(
+ function (ABlockWrapper $wrapper, int $data, int $j): array {
+ if ($j > 0) {
+ $wrapper->activity(
+ BlockActivity::NOTICE, 'result from first process of the session: '
+ . $wrapper->getSessionInterface()
+ ->byId('mock_process_0')
+ ?->getResult()['result']
+ );
+ }
+ sleep(random_int(2, 8));
+ $wrapper->activity(
+ BlockActivity::NOTICE, 'mocked process is now over with data=' . $data
+ );
+
+ return ['result' => $data];
+ },
+ random_int(1, 5000),
+ $j
+ )->id('mock_process_' . $j)->blocker();
+ $output->writeln(' > creating process <info>' . $process->getToken() . '</info>');
+ }
+
+ $token = $this->asyncProcess->async(ProcessExecutionTime::LATER);
+
+ $output->writeln('- session <info>' . $token . '</info>');
+ $output->writeln('');
+ }
+ }
+
+ private function createFaultyProcessStaticRequired(): void {
+ $this->asyncProcess->exec(
+ function (ABlockWrapper $wrapper, int $n): array {
+ $wrapper->activity(
+ BlockActivity::NOTICE, '(1) this process will crash in ' . $n . ' seconds'
+ );
+ sleep($n);
+ throw new \Exception('crash');
+ },
+ random_int(2, 8)
+ )->id('mock_process_1');
+
+ $this->asyncProcess->exec(
+ function (ABlockWrapper $wrapper): void {
+ $wrapper->activity(BlockActivity::ERROR, '(2) this process should NOT run');
+ },
+ )->id('mock_process_2')->require('mock_process_1');
+
+ $this->asyncProcess->exec(
+ function (ABlockWrapper $wrapper): array {
+ $wrapper->activity(BlockActivity::NOTICE, '(3) this process should run!');
+
+ return ['ok'];
+ },
+ )->id('mock_process_3');
+
+ $this->asyncProcess->async(ProcessExecutionTime::LATER);
+ }
+
+ private function createFaultyProcessStaticBlocker(): void {
+ $this->asyncProcess->exec(
+ function (ABlockWrapper $wrapper, int $n): array {
+ $wrapper->activity(
+ BlockActivity::NOTICE, '(1) this process will crash in ' . $n . ' seconds'
+ );
+ sleep($n);
+ throw new \Exception('crash');
+ },
+ random_int(2, 8)
+ )->id('mock_process_1')->blocker();
+
+ $this->asyncProcess->exec(
+ function (ABlockWrapper $wrapper): void {
+ $wrapper->activity(BlockActivity::ERROR, '(2) this process should NOT run');
+ },
+ )->id('mock_process_2');
+
+ $this->asyncProcess->exec(
+ function (ABlockWrapper $wrapper): array {
+ $wrapper->activity(BlockActivity::ERROR, '(3) this process should NOT run');
+
+ return ['ok'];
+ },
+ )->id('mock_process_3');
+
+ $this->asyncProcess->async(ProcessExecutionTime::LATER);
+ }
+
+
+ private function createFaultyProcessDynamicRequired(): void {
+ $this->asyncProcess->exec(
+ function (ABlockWrapper $wrapper, int $n): array {
+ if ($wrapper->getReplayCount() > 1) {
+ $wrapper->activity(BlockActivity::NOTICE, '(1) this process will not crash anymore');
+ return ['dynamic-required'];
+ }
+
+ $wrapper->activity(BlockActivity::NOTICE, '(1) this process will crash in ' . $n . ' seconds');
+ sleep($n);
+ throw new \Exception('crash');
+ },
+ random_int(2, 8)
+ )->id('mock_process_1');
+
+ $this->asyncProcess->exec(
+ function (ABlockWrapper $wrapper): void {
+ $wrapper->activity(BlockActivity::ERROR, '(2) this process should only run after few replay from MOCK_PROCESS_1');
+ },
+ )->id('mock_process_2')->require('mock_process_1');
+
+ $this->asyncProcess->exec(
+ function (ABlockWrapper $wrapper): array {
+ $wrapper->activity(BlockActivity::NOTICE, '(3) this process should run!');
+
+ return ['ok'];
+ },
+ )->id('mock_process_3');
+
+ $this->asyncProcess->async(ProcessExecutionTime::LATER);
+ }
+
+
+ private function createFaultyProcessDynamicBlocker(): void {
+ $this->asyncProcess->exec(
+ function (ABlockWrapper $wrapper, int $n): array {
+ if ($wrapper->getReplayCount() > 1) {
+ $wrapper->activity(BlockActivity::NOTICE, '(1) this process will not crash anymore');
+ return ['dynamic-blocker'];
+ }
+
+ $wrapper->activity(BlockActivity::NOTICE, '(1) this process will crash in ' . $n . ' seconds');
+ sleep($n);
+ throw new \Exception('crash');
+ },
+ random_int(2, 8)
+ )->id('mock_process_1')->blocker();
+
+ $this->asyncProcess->exec(
+ function (ABlockWrapper $wrapper): void {
+ $wrapper->activity(BlockActivity::ERROR, '(2) this process should only run after few replay from MOCK_PROCESS_1');
+ },
+ )->id('mock_process_2');
+
+ $this->asyncProcess->exec(
+ function (ABlockWrapper $wrapper): array {
+ $wrapper->activity(BlockActivity::ERROR, '(3) this process should only run after few replay from MOCK_PROCESS_1');
+
+ return ['ok'];
+ },
+ )->id('mock_process_3');
+
+ $this->asyncProcess->async(ProcessExecutionTime::LATER);
+ }
+
+
+
+ private function createFaultyProcessAutoRequired(): void {
+ $this->asyncProcess->exec(
+ function (ABlockWrapper $wrapper, int $n): array {
+ if ($wrapper->getReplayCount() > 1) {
+ $wrapper->activity(BlockActivity::NOTICE, '(1) this process will not crash anymore');
+ return ['dynamic-required'];
+ }
+
+ $wrapper->activity(BlockActivity::NOTICE, '(1) this process will crash in ' . $n . ' seconds');
+ sleep($n);
+ throw new \Exception('crash');
+ },
+ random_int(2, 8)
+ )->id('mock_process_1')->replayable();
+
+ $this->asyncProcess->exec(
+ function (ABlockWrapper $wrapper): void {
+ $wrapper->activity(BlockActivity::ERROR, '(2) this process should only run after few replay from MOCK_PROCESS_1');
+ },
+ )->id('mock_process_2')->require('mock_process_1');
+
+ $this->asyncProcess->exec(
+ function (ABlockWrapper $wrapper): array {
+ $wrapper->activity(BlockActivity::NOTICE, '(3) this process should run!');
+
+ return ['ok'];
+ },
+ )->id('mock_process_3');
+
+ $this->asyncProcess->async(ProcessExecutionTime::LATER);
+ }
+
+
+ private function createFaultyProcessAutoBlocker(): void {
+ $this->asyncProcess->exec(
+ function (ABlockWrapper $wrapper, int $n): array {
+ if ($wrapper->getReplayCount() > 1) {
+ $wrapper->activity(BlockActivity::NOTICE, '(1) this process will not crash anymore');
+ return ['dynamic-blocker'];
+ }
+
+ $wrapper->activity(BlockActivity::NOTICE, '(1) this process will crash in ' . $n . ' seconds');
+ sleep($n);
+ throw new \Exception('crash');
+ },
+ random_int(2, 8)
+ )->id('mock_process_1')->blocker()->replayable();
+
+ $this->asyncProcess->exec(
+ function (ABlockWrapper $wrapper): void {
+ $wrapper->activity(BlockActivity::ERROR, '(2) this process should only run after few replay from MOCK_PROCESS_1');
+ },
+ )->id('mock_process_2');
+
+ $this->asyncProcess->exec(
+ function (ABlockWrapper $wrapper): array {
+ $wrapper->activity(BlockActivity::ERROR, '(3) this process should only run after few replay from MOCK_PROCESS_1');
+
+ return ['ok'];
+ },
+ )->id('mock_process_3');
+
+ $this->asyncProcess->async(ProcessExecutionTime::LATER);
+ }
+
+}