aboutsummaryrefslogtreecommitdiffstats
path: root/lib/private/Async/ForkManager.php
diff options
context:
space:
mode:
Diffstat (limited to 'lib/private/Async/ForkManager.php')
-rw-r--r--lib/private/Async/ForkManager.php485
1 files changed, 485 insertions, 0 deletions
diff --git a/lib/private/Async/ForkManager.php b/lib/private/Async/ForkManager.php
new file mode 100644
index 00000000000..6978a601bd4
--- /dev/null
+++ b/lib/private/Async/ForkManager.php
@@ -0,0 +1,485 @@
+<?php
+
+namespace OC\Async;
+
+use OC\Async\Db\BlockMapper;
+use OC\Async\Enum\BlockType;
+use OC\Async\Exceptions\AsyncProcessException;
+use OC\Async\Exceptions\LoopbackEndpointException;
+use OC\Async\Exceptions\BlockAlreadyRunningException;
+use OC\Async\Exceptions\SessionBlockedException;
+use OC\Async\Model\Block;
+use OC\Async\Model\BlockInterface;
+use OC\Async\Model\SessionInterface;
+use OC\Async\Wrappers\DummyBlockWrapper;
+use OC\Async\Wrappers\LoggerBlockWrapper;
+use OC\Config\Lexicon\CoreConfigLexicon;
+use OC\DB\Connection;
+use OCP\Async\Enum\BlockActivity;
+use OCP\Async\Enum\ProcessExecutionTime;
+use OCP\Async\Enum\BlockStatus;
+use OCP\Async\IAsyncProcess;
+use OCP\Http\Client\IClientService;
+use OCP\IAppConfig;
+use OCP\IConfig;
+use OCP\IURLGenerator;
+use Psr\Log\LoggerInterface;
+use Symfony\Component\Console\Output\OutputInterface;
+
+class ForkManager {
+ private ?ABlockWrapper $wrapper;
+
+ /** @var int[] */
+ private array $forks = [];
+ private const FORK_LIMIT = 3; // maximum number of child process
+ private const FORK_SLEEP = 500000; // wait for msec when too many fork have been created
+
+ public function __construct(
+ private BlockMapper $blockMapper,
+ private Connection $conn,
+ private IAppConfig $appConfig,
+ private IConfig $config,
+ private IClientService $clientService,
+ private IURLGenerator $urlGenerator,
+ LoggerBlockWrapper $loggerProcessWrapper,
+ private LoggerInterface $logger,
+ ) {
+ $this->wrapper = $loggerProcessWrapper;
+ }
+
+ public function setWrapper(?ABlockWrapper $wrapper): void {
+ $this->wrapper = $wrapper;
+ }
+
+
+ /**
+ * @throws BlockAlreadyRunningException
+ * @throws AsyncProcessException
+ */
+ public function runSession(string $token, array $metadata = []): void {
+ $sessionBlocks = $this->blockMapper->getBySession($token);
+ $metadata['sessionToken'] = $token;
+
+ if ($this->wrapper !== null) {
+ $wrapper = clone $this->wrapper;
+ } else {
+ $wrapper = null;
+ }
+
+ // might be need to avoid some conflict/race condition
+ // usleep(10000);
+ $sessionIface = new SessionInterface(BlockInterface::asBlockInterfaces($sessionBlocks));
+
+ if ($sessionIface->getGlobalStatus() !== BlockStatus::STANDBY) {
+ throw new AsyncProcessException();
+ }
+
+ $wrapper?->setSessionInterface($sessionIface);
+ $wrapper?->session($metadata);
+
+ try {
+ foreach ($sessionBlocks as $block) {
+ if (!$this->confirmBlockRequirement($wrapper, $sessionIface, $block)) {
+ $block->replay();
+ $this->blockMapper->update($block);
+ continue;
+ }
+
+ $block->addMetadata($metadata);
+ $this->runBlock($block, $wrapper);
+
+ if ($block->getBlockStatus() === BlockStatus::BLOCKER) {
+ $wrapper?->end('Fail process ' . $block->getToken() . ' block the rest of the session');
+ throw new SessionBlockedException();
+ }
+ }
+ $wrapper?->end();
+ } catch (BlockAlreadyRunningException) {
+ $wrapper?->end('already running');
+ throw new BlockAlreadyRunningException();
+ }
+ }
+
+ private function confirmBlockRequirement(
+ ?ABlockWrapper $wrapper,
+ SessionInterface $sessionIface,
+ Block $block
+ ): bool {
+ $procIface = new BlockInterface(null, $block);
+ foreach ($procIface->getRequire() as $requiredProcessId) {
+ $requiredBlock = $sessionIface->byId($requiredProcessId);
+ if ($requiredBlock === null) {
+ $wrapper?->activity(BlockActivity::NOTICE, 'could not initiated block as it requires block ' . $requiredProcessId . ' which is not defined');
+ return false;
+ }
+ if ($requiredBlock?->getStatus() !== BlockStatus::SUCCESS) {
+ $wrapper?->activity(BlockActivity::NOTICE, 'could not initiated block as it requires block ' . $requiredProcessId . ' to be executed and successful');
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * @throws BlockAlreadyRunningException
+ */
+ private function runBlock(Block $block, ?ABlockWrapper $wrapper = null): void {
+ if ($block->getBlockStatus() !== BlockStatus::STANDBY) {
+ return;
+ }
+
+ $this->lockBlock($block);
+
+ $data = $block->getParams();
+ $serialized = $block->getCode();
+ $params = unserialize($data['params'], ['allowed_classes' => $data['paramsClasses']]);
+ $obj = unserialize($serialized, ['allowed_classes' => $data['blockClasses'] ?? []]);
+
+ if ($data['processWrapper'] ?? false) {
+ array_unshift($params, ($wrapper ?? new DummyBlockWrapper()));
+ }
+
+ $wrapper?->setBlock($block);
+ $wrapper?->init();
+ $wrapper?->activity(BlockActivity::STARTING);
+ $result = [
+ 'executionTime' => ProcessExecutionTime::NOW->value,
+ 'startTime' => time(),
+ ];
+ $iface = new BlockInterface(null, $block);
+ try {
+ $returnedData = null;
+ switch ($block->getBlockType()) {
+ case BlockType::CLOSURE:
+ $c = $obj->getClosure();
+ $returnedData = $c(...$params);
+ break;
+
+ case BlockType::INVOKABLE:
+ $returnedData = $obj(...$params);
+ break;
+
+ case BlockType::CLASSNAME:
+ $obj = new $data['className']();
+ $returnedData = $obj->async(...$params);
+ break;
+ }
+ if (is_array($returnedData)) {
+ $result['result'] = $returnedData;
+ }
+ $block->setBlockStatus(BlockStatus::SUCCESS);
+ if ($block->getReplayCount() > 0) {
+ // in case of success after multiple tentative, we reset next run to right now
+ // on all block waiting for replay. Easiest solution to find block dependant of
+ // this current successful run
+ $this->blockMapper->resetSessionNextRun($block->getSessionToken());
+ }
+ } catch (\Exception $e) {
+ $wrapper?->activity(BlockActivity::ERROR, $e->getMessage());
+ $result['error'] = [
+ 'exception' => get_class($e),
+ 'message' => $e->getMessage(),
+ 'trace' => $e->getTrace(),
+ 'code' => $e->getCode()
+ ];
+
+ if ($iface->isReplayable()) {
+ $block->replay(); // we mark the block as able to be back to STANDBY status
+ } else {
+ $block->setNextRun(0);
+ }
+ if ($iface->isBlocker()) {
+ $block->setBlockStatus(BlockStatus::BLOCKER);
+ } else {
+ $block->setBlockStatus(BlockStatus::ERROR);
+ }
+ } finally {
+ $result['endTime'] = time();
+ }
+
+ $block->setResult($result);
+ $wrapper?->activity(BlockActivity::ENDING);
+ $this->blockMapper->updateStatus($block, BlockStatus::RUNNING);
+ }
+
+ /**
+ * @throws BlockAlreadyRunningException
+ */
+ private function lockBlock(Block $block): void {
+ if ($block->getBlockStatus() !== BlockStatus::STANDBY) {
+ throw new BlockAlreadyRunningException('block not in standby');
+ }
+ $lockToken = $this->generateToken(7);
+ $block->setBlockStatus(BlockStatus::RUNNING);
+ if (!$this->blockMapper->updateStatus($block, BlockStatus::STANDBY, $lockToken)) {
+ throw new BlockAlreadyRunningException('block is locked');
+ }
+ $block->setLockToken($lockToken);
+ }
+
+ public function forkSession(string $session, array $metadata = []): void {
+ if (\OC::$CLI) {
+ $useWebIfNeeded = $metadata['useWebIfNeeded'] ?? false;
+ if ($useWebIfNeeded && !extension_loaded('posix')) {
+ try {
+ $this->forkSessionLoopback($session);
+ return;
+ } catch (\Exception) {
+ }
+ }
+
+ $this->forkSessionCli($session, $metadata);
+ return;
+ }
+
+ try {
+ $this->forkSessionLoopback($session);
+ } catch (LoopbackEndpointException) {
+ // session will be processed later
+ }
+ }
+
+
+ private function forkSessionCli(string $session, array $metadata = []): void {
+ if (!extension_loaded('posix')) {
+ // log/notice that posix is not loaded
+ return;
+ }
+ $slot = $this->getFirstAvailableSlot();
+ $metadata += [
+ '_cli' => [
+ 'slot' => $slot,
+ 'forkCount' => count($this->forks),
+ 'forkLimit' => self::FORK_LIMIT,
+ ]
+ ];
+
+ $pid = pcntl_fork();
+
+ // work around as the parent database connection is inherited by the child.
+ // when child process is over, parent process database connection will drop.
+ // The drop can happen anytime, even in the middle of a running request.
+ // work around is to close the connection as soon as possible after forking.
+ $this->conn->close();
+
+ if ($pid === -1) {
+ // TODO: manage issue while forking
+ } else if ($pid === 0) {
+ // forked process
+ try {
+ $this->runSession($session, $metadata);
+ } catch (AsyncProcessException) {
+ // failure to run session can be part of the process
+ }
+ exit();
+ } else {
+ // store slot+pid
+ $this->forks[$slot] = $pid;
+
+ // when fork limit is reach, cycle until space freed
+ while (true) {
+ $exitedPid = pcntl_waitpid(0, $status, WNOHANG);
+ if ($exitedPid > 0) {
+ $slot = array_search($exitedPid, $this->forks, true);
+ if ($slot) {
+ unset($this->forks[$slot]);
+ }
+ }
+ if (count($this->forks) < self::FORK_LIMIT) {
+ return;
+ }
+ usleep(self::FORK_SLEEP);
+ }
+ }
+ }
+
+ /**
+ * Request local loopback endpoint.
+ * We expect the request to be closed remotely.
+ *
+ * Ignored if:
+ * - the endpoint is not fully configured and tested,
+ * - the server is on heavy load (timeout at 1 second)
+ *
+ * @return string result from the loopback endpoint
+ * @throws LoopbackEndpointException if not configured
+ */
+ private function forkSessionLoopback(string $session, ?string $loopbackEndpoint = null): string {
+ $client = $this->clientService->newClient();
+ try {
+ $response = $client->post(
+ $loopbackEndpoint ?? $this->linkToLoopbackEndpoint(),
+ [
+ 'headers' => [],
+ 'verify' => false,
+ 'connect_timeout' => 1.0,
+ 'timeout' => 1.0,
+ 'http_errors' => true,
+ 'body' => ['token' => $session],
+ 'nextcloud' => [
+ 'allow_local_address' => true,
+ 'allow_redirects' => true,
+ ]
+ ]
+ );
+
+ return (string)$response->getBody();
+ } catch (LoopbackEndpointException $e) {
+ $this->logger->debug('loopback endpoint not configured', ['exception' => $e]);
+ throw $e;
+ } catch (\Exception $e) {
+ $this->logger->warning('could not reach loopback endpoint to initiate fork', ['exception' => $e]);
+ throw new LoopbackEndpointException('loopback endpoint cannot be reach', previous: $e);
+ }
+ }
+
+
+ /**
+ * return full (absolute) link to the web-loopback endpoint
+ *
+ * @param string|null $instance if null, stored loopback address will be used.
+ *
+ * @throws LoopbackEndpointException if $instance is null and no stored configuration
+ */
+ public function linkToLoopbackEndpoint(?string $instance = null): string {
+ return rtrim($instance ?? $this->getLoopbackInstance(), '/') . $this->urlGenerator->linkToRoute('core.AsyncProcess.processFork');
+ }
+
+ /**
+ * return loopback address stored in configuration
+ *
+ * @return string
+ * @throws LoopbackEndpointException if config is not set or empty
+ */
+ public function getLoopbackInstance(): string {
+ if (!$this->appConfig->hasKey('core', CoreConfigLexicon::ASYNC_LOOPBACK_ADDRESS, true)) {
+ throw new LoopbackEndpointException('loopback not configured');
+ }
+
+ $instance = $this->appConfig->getValueString('core', CoreConfigLexicon::ASYNC_LOOPBACK_ADDRESS);
+ if ($instance === '') {
+ throw new LoopbackEndpointException('empty config');
+ }
+
+ return $instance;
+ }
+
+
+ public function discoverLoopbackEndpoint(?OutputInterface $output = null): ?string {
+ $cliUrl = $this->config->getSystemValueString('overwrite.cli.url', '');
+ $output?->write('- testing value from \'overwrite.cli.url\' (<comment>' . $cliUrl . '</comment>)... ');
+
+ $reason = '';
+ if ($this->testLoopbackInstance($cliUrl, $reason)) {
+ $output?->writeln('<info>ok</info>');
+ return $cliUrl;
+ }
+
+ $output?->writeln('<error>' . $reason . '</error>');
+
+ foreach($this->config->getSystemValue('trusted_domains', []) as $url) {
+ $url = 'https://' . $url;
+ $output?->write('- testing entry from \'trusted_domains\' (<comment>' . $url . '</comment>)... ');
+ if ($this->testLoopbackInstance($url, $reason)) {
+ $output?->writeln('<info>ok</info>');
+ return $url;
+ }
+ $output?->writeln('<error>' . $reason . '</error>');
+ }
+
+ return null;
+ }
+
+
+ public function testLoopbackInstance(string $url, string &$reason = ''): bool {
+ $url = rtrim($url, '/');
+ if (!$this->pingLoopbackInstance($url)) {
+ $reason = 'failed ping';
+ return false;
+ }
+
+ $token = $this->generateToken();
+ $asyncProcess = \OCP\Server::get(IAsyncProcess::class);
+ $asyncProcess->exec(function(string $token) {
+ sleep(1); // enforce a delay to confirm asynchronicity
+ $appConfig = \OCP\Server::get(IAppConfig::class);
+ $appConfig->setValueString('core', CoreConfigLexicon::ASYNC_LOOPBACK_TEST, $token);
+ }, $token)->name('test loopback instance')->async();
+
+ $this->appConfig->clearCache(true);
+ if ($token === $this->appConfig->getValueString('core', CoreConfigLexicon::ASYNC_LOOPBACK_TEST)) {
+ $reason = 'async process already executed';
+ return false;
+ }
+
+ sleep(3);
+ $this->appConfig->clearCache(true);
+ $result = ($token === $this->appConfig->getValueString('core', CoreConfigLexicon::ASYNC_LOOPBACK_TEST));
+ $this->appConfig->deleteKey('core', CoreConfigLexicon::ASYNC_LOOPBACK_TEST);
+
+ return $result;
+ }
+
+ private function pingLoopbackInstance(string $url): bool {
+ $pingLoopback = $this->generateToken();
+ $this->appConfig->setValueString('core', CoreConfigLexicon::ASYNC_LOOPBACK_PING, $pingLoopback);
+ try {
+ $result = $this->forkSessionLoopback('__ping__', $this->linkToLoopbackEndpoint($url));
+ $result = json_decode($result, true, flags: JSON_THROW_ON_ERROR);
+ } catch (\JsonException|LoopbackEndpointException $e) {
+ $this->logger->debug('could not ping loopback endpoint', ['exception' => $e]);
+ }
+
+ $this->appConfig->deleteKey('core', CoreConfigLexicon::ASYNC_LOOPBACK_PING);
+ return (($result['ping'] ?? '') === $pingLoopback);
+ }
+
+
+ /**
+ * note that the fact we fork process to run a session of processes before doing
+ * a check on the fact that maybe one of the process of the session is already
+ * running can create a small glitch when choosing the first available slot as
+ * a previous fork running said check is already made and will exit shortly.
+ *
+ * @return int
+ */
+ private function getFirstAvailableSlot(): int {
+ $slot = -1;
+ for ($i = 0; $i < self::FORK_LIMIT; $i++) {
+ if (!array_key_exists($i, $this->forks)) {
+ return $i;
+ }
+
+ // we confirm child process still exists
+ if (pcntl_waitpid($this->forks[$i], $status, WNOHANG) > 0) {
+ return $i;
+ }
+ }
+
+ if ($slot === -1) {
+ // TODO: should not happens: log warning
+ }
+
+ return -1;
+ }
+
+ private function generateToken(int $length = 15): string {
+ $result = '';
+ for ($i = 0; $i < $length; $i++) {
+ $result .= 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890'[random_int(0, 61)];
+ }
+
+ return $result;
+ }
+
+ /**
+ * we wait until all child process are done
+ *
+ * @noinspection PhpStatementHasEmptyBodyInspection
+ */
+ public function waitChildProcess(): void {
+ while (pcntl_waitpid(0, $status) != -1) {
+ }
+ }
+
+}