From c1ed256d50a2f365060e2827be8893145073e434 Mon Sep 17 00:00:00 2001 From: Julien Veyssier Date: Tue, 27 Aug 2024 13:11:51 +0200 Subject: [PATCH] feat(taskprocessing): add IManager::runTask method to run task synchronously Signed-off-by: Julien Veyssier --- lib/private/TaskProcessing/Manager.php | 164 +++++++++++++----- .../SynchronousBackgroundJob.php | 34 +--- lib/public/TaskProcessing/IManager.php | 27 +++ 3 files changed, 152 insertions(+), 73 deletions(-) diff --git a/lib/private/TaskProcessing/Manager.php b/lib/private/TaskProcessing/Manager.php index 51dcc7c94da..e9db978034e 100644 --- a/lib/private/TaskProcessing/Manager.php +++ b/lib/private/TaskProcessing/Manager.php @@ -716,55 +716,69 @@ class Manager implements IManager { if (!$this->canHandleTask($task)) { throw new \OCP\TaskProcessing\Exception\PreConditionNotMetException('No task processing provider is installed that can handle this task type: ' . $task->getTaskTypeId()); } - $taskTypes = $this->getAvailableTaskTypes(); - $inputShape = $taskTypes[$task->getTaskTypeId()]['inputShape']; - $inputShapeDefaults = $taskTypes[$task->getTaskTypeId()]['inputShapeDefaults']; - $inputShapeEnumValues = $taskTypes[$task->getTaskTypeId()]['inputShapeEnumValues']; - $optionalInputShape = $taskTypes[$task->getTaskTypeId()]['optionalInputShape']; - $optionalInputShapeEnumValues = $taskTypes[$task->getTaskTypeId()]['optionalInputShapeEnumValues']; - $optionalInputShapeDefaults = $taskTypes[$task->getTaskTypeId()]['optionalInputShapeDefaults']; - // validate input - $this->validateInput($inputShape, $inputShapeDefaults, $inputShapeEnumValues, $task->getInput()); - $this->validateInput($optionalInputShape, $optionalInputShapeDefaults, $optionalInputShapeEnumValues, $task->getInput(), true); - // authenticate access to mentioned files - $ids = []; - foreach ($inputShape + $optionalInputShape as $key => $descriptor) { - if (in_array(EShapeType::getScalarType($descriptor->getShapeType()), [EShapeType::File, EShapeType::Image, EShapeType::Audio, EShapeType::Video], true)) { - /** @var list|int $inputSlot */ - $inputSlot = $task->getInput()[$key]; - if (is_array($inputSlot)) { - $ids += $inputSlot; - } else { - $ids[] = $inputSlot; - } - } - } - foreach ($ids as $fileId) { - $this->validateFileId($fileId); - $this->validateUserAccessToFile($fileId, $task->getUserId()); - } - // remove superfluous keys and set input - $input = $this->removeSuperfluousArrayKeys($task->getInput(), $inputShape, $optionalInputShape); - $inputWithDefaults = $this->fillInputDefaults($input, $inputShapeDefaults, $optionalInputShapeDefaults); - $task->setInput($inputWithDefaults); + $this->prepareTask($task); $task->setStatus(Task::STATUS_SCHEDULED); - $task->setScheduledAt(time()); - $provider = $this->getPreferredProvider($task->getTaskTypeId()); - // calculate expected completion time - $completionExpectedAt = new \DateTime('now'); - $completionExpectedAt->add(new \DateInterval('PT'.$provider->getExpectedRuntime().'S')); - $task->setCompletionExpectedAt($completionExpectedAt); - // create a db entity and insert into db table - $taskEntity = \OC\TaskProcessing\Db\Task::fromPublicTask($task); - $this->taskMapper->insert($taskEntity); - // make sure the scheduler knows the id - $task->setId($taskEntity->getId()); + $this->storeTask($task); // schedule synchronous job if the provider is synchronous + $provider = $this->getPreferredProvider($task->getTaskTypeId()); if ($provider instanceof ISynchronousProvider) { $this->jobList->add(SynchronousBackgroundJob::class, null); } } + public function runTask(Task $task): Task { + if (!$this->canHandleTask($task)) { + throw new \OCP\TaskProcessing\Exception\PreConditionNotMetException('No task processing provider is installed that can handle this task type: ' . $task->getTaskTypeId()); + } + + $provider = $this->getPreferredProvider($task->getTaskTypeId()); + if ($provider instanceof ISynchronousProvider) { + $this->prepareTask($task); + $task->setStatus(Task::STATUS_SCHEDULED); + $this->storeTask($task); + $this->processTask($task, $provider); + $task = $this->getTask($task->getId()); + } else { + $this->scheduleTask($task); + // poll task + while ($task->getStatus() === Task::STATUS_SCHEDULED || $task->getStatus() === Task::STATUS_RUNNING) { + sleep(1); + $task = $this->getTask($task->getId()); + } + } + return $task; + } + + public function processTask(Task $task, ISynchronousProvider $provider): bool { + try { + try { + $input = $this->prepareInputData($task); + } catch (GenericFileException|NotPermittedException|LockedException|ValidationException|UnauthorizedException $e) { + $this->logger->warning('Failed to prepare input data for a TaskProcessing task with synchronous provider ' . $provider->getId(), ['exception' => $e]); + $this->setTaskResult($task->getId(), $e->getMessage(), null); + return false; + } + try { + $this->setTaskStatus($task, Task::STATUS_RUNNING); + $output = $provider->process($task->getUserId(), $input, fn (float $progress) => $this->setTaskProgress($task->getId(), $progress)); + } catch (ProcessingException $e) { + $this->logger->warning('Failed to process a TaskProcessing task with synchronous provider ' . $provider->getId(), ['exception' => $e]); + $this->setTaskResult($task->getId(), $e->getMessage(), null); + return false; + } catch (\Throwable $e) { + $this->logger->error('Unknown error while processing TaskProcessing task', ['exception' => $e]); + $this->setTaskResult($task->getId(), $e->getMessage(), null); + return false; + } + $this->setTaskResult($task->getId(), null, $output); + } catch (NotFoundException $e) { + $this->logger->info('Could not find task anymore after execution. Moving on.', ['exception' => $e]); + } catch (Exception $e) { + $this->logger->error('Failed to report result of TaskProcessing task', ['exception' => $e]); + } + return true; + } + public function deleteTask(Task $task): void { $taskEntity = \OC\TaskProcessing\Db\Task::fromPublicTask($task); $this->taskMapper->delete($taskEntity); @@ -1095,6 +1109,72 @@ class Manager implements IManager { $this->taskMapper->update($taskEntity); } + /** + * Validate input, fill input default values, set completionExpectedAt, set scheduledAt + * + * @param Task $task + * @return void + * @throws UnauthorizedException + * @throws ValidationException + * @throws \OCP\TaskProcessing\Exception\Exception + */ + private function prepareTask(Task $task): void { + $taskTypes = $this->getAvailableTaskTypes(); + $taskType = $taskTypes[$task->getTaskTypeId()]; + $inputShape = $taskType['inputShape']; + $inputShapeDefaults = $taskType['inputShapeDefaults']; + $inputShapeEnumValues = $taskType['inputShapeEnumValues']; + $optionalInputShape = $taskType['optionalInputShape']; + $optionalInputShapeEnumValues = $taskType['optionalInputShapeEnumValues']; + $optionalInputShapeDefaults = $taskType['optionalInputShapeDefaults']; + // validate input + $this->validateInput($inputShape, $inputShapeDefaults, $inputShapeEnumValues, $task->getInput()); + $this->validateInput($optionalInputShape, $optionalInputShapeDefaults, $optionalInputShapeEnumValues, $task->getInput(), true); + // authenticate access to mentioned files + $ids = []; + foreach ($inputShape + $optionalInputShape as $key => $descriptor) { + if (in_array(EShapeType::getScalarType($descriptor->getShapeType()), [EShapeType::File, EShapeType::Image, EShapeType::Audio, EShapeType::Video], true)) { + /** @var list|int $inputSlot */ + $inputSlot = $task->getInput()[$key]; + if (is_array($inputSlot)) { + $ids += $inputSlot; + } else { + $ids[] = $inputSlot; + } + } + } + foreach ($ids as $fileId) { + $this->validateFileId($fileId); + $this->validateUserAccessToFile($fileId, $task->getUserId()); + } + // remove superfluous keys and set input + $input = $this->removeSuperfluousArrayKeys($task->getInput(), $inputShape, $optionalInputShape); + $inputWithDefaults = $this->fillInputDefaults($input, $inputShapeDefaults, $optionalInputShapeDefaults); + $task->setInput($inputWithDefaults); + $task->setScheduledAt(time()); + $provider = $this->getPreferredProvider($task->getTaskTypeId()); + // calculate expected completion time + $completionExpectedAt = new \DateTime('now'); + $completionExpectedAt->add(new \DateInterval('PT'.$provider->getExpectedRuntime().'S')); + $task->setCompletionExpectedAt($completionExpectedAt); + } + + /** + * Store the task in the DB and set its ID in the \OCP\TaskProcessing\Task input param + * + * @param Task $task + * @return void + * @throws Exception + * @throws \JsonException + */ + private function storeTask(Task $task): void { + // create a db entity and insert into db table + $taskEntity = \OC\TaskProcessing\Db\Task::fromPublicTask($task); + $this->taskMapper->insert($taskEntity); + // make sure the scheduler knows the id + $task->setId($taskEntity->getId()); + } + /** * @param array $output * @param ShapeDescriptor[] ...$specs the specs that define which keys to keep diff --git a/lib/private/TaskProcessing/SynchronousBackgroundJob.php b/lib/private/TaskProcessing/SynchronousBackgroundJob.php index 85a8fbc21f6..3d85625da8f 100644 --- a/lib/private/TaskProcessing/SynchronousBackgroundJob.php +++ b/lib/private/TaskProcessing/SynchronousBackgroundJob.php @@ -57,37 +57,9 @@ class SynchronousBackgroundJob extends QueuedJob { $this->logger->error('Unknown error while retrieving scheduled TaskProcessing tasks', ['exception' => $e]); continue; } - try { - try { - $input = $this->taskProcessingManager->prepareInputData($task); - } catch (GenericFileException|NotPermittedException|LockedException|ValidationException|UnauthorizedException $e) { - $this->logger->warning('Failed to prepare input data for a TaskProcessing task with synchronous provider ' . $provider->getId(), ['exception' => $e]); - $this->taskProcessingManager->setTaskResult($task->getId(), $e->getMessage(), null); - // Schedule again - $this->jobList->add(self::class, $argument); - return; - } - try { - $this->taskProcessingManager->setTaskStatus($task, Task::STATUS_RUNNING); - $output = $provider->process($task->getUserId(), $input, fn (float $progress) => $this->taskProcessingManager->setTaskProgress($task->getId(), $progress)); - } catch (ProcessingException $e) { - $this->logger->warning('Failed to process a TaskProcessing task with synchronous provider ' . $provider->getId(), ['exception' => $e]); - $this->taskProcessingManager->setTaskResult($task->getId(), $e->getMessage(), null); - // Schedule again - $this->jobList->add(self::class, $argument); - return; - } catch (\Throwable $e) { - $this->logger->error('Unknown error while processing TaskProcessing task', ['exception' => $e]); - $this->taskProcessingManager->setTaskResult($task->getId(), $e->getMessage(), null); - // Schedule again - $this->jobList->add(self::class, $argument); - return; - } - $this->taskProcessingManager->setTaskResult($task->getId(), null, $output); - } catch (NotFoundException $e) { - $this->logger->info('Could not find task anymore after execution. Moving on.', ['exception' => $e]); - } catch (Exception $e) { - $this->logger->error('Failed to report result of TaskProcessing task', ['exception' => $e]); + if (!$this->taskProcessingManager->processTask($task, $provider)) { + // Schedule again + $this->jobList->add(self::class, $argument); } } diff --git a/lib/public/TaskProcessing/IManager.php b/lib/public/TaskProcessing/IManager.php index 86788449aaf..c26a5d67339 100644 --- a/lib/public/TaskProcessing/IManager.php +++ b/lib/public/TaskProcessing/IManager.php @@ -61,6 +61,33 @@ interface IManager { */ public function scheduleTask(Task $task): void; + /** + * Run the task and return the finished task + * + * @param Task $task The task to run + * @return Task The result task + * @throws PreConditionNotMetException If no or not the requested provider was registered but this method was still called + * @throws ValidationException the given task input didn't pass validation against the task type's input shape and/or the providers optional input shape specs + * @throws Exception storing the task in the database failed + * @throws UnauthorizedException the user scheduling the task does not have access to the files used in the input + * @since 30.0.0 + */ + public function runTask(Task $task): Task; + + /** + * Process task with a synchronous provider + * + * Prepare task input data and run the process method of the provider + * This should only be used by OC\TaskProcessing\SynchronousBackgroundJob::run() and OCP\TaskProcessing\IManager::runTask() + * + * @param Task $task + * @param ISynchronousProvider $provider + * @return bool True if the task has run successfully + * @throws Exception + * @since 30.0.0 + */ + public function processTask(Task $task, ISynchronousProvider $provider): bool; + /** * Delete a task that has been scheduled before * -- 2.39.5