]> source.dussan.org Git - nextcloud-server.git/commitdiff
feat(taskprocessing): add IManager::runTask method to run task synchronously
authorJulien Veyssier <julien-nc@posteo.net>
Tue, 27 Aug 2024 11:11:51 +0000 (13:11 +0200)
committerJulien Veyssier <julien-nc@posteo.net>
Tue, 27 Aug 2024 11:11:51 +0000 (13:11 +0200)
Signed-off-by: Julien Veyssier <julien-nc@posteo.net>
lib/private/TaskProcessing/Manager.php
lib/private/TaskProcessing/SynchronousBackgroundJob.php
lib/public/TaskProcessing/IManager.php

index 51dcc7c94da2fab6809bbaf0aeab27b29f8a7689..e9db978034ec2eb475100505780a8c214a7ae444 100644 (file)
@@ -716,55 +716,69 @@ class Manager implements IManager {
                if (!$this->canHandleTask($task)) {
                        throw new \OCP\TaskProcessing\Exception\PreConditionNotMetException('No task processing provider is installed that can handle this task type: ' . $task->getTaskTypeId());
                }
-               $taskTypes = $this->getAvailableTaskTypes();
-               $inputShape = $taskTypes[$task->getTaskTypeId()]['inputShape'];
-               $inputShapeDefaults = $taskTypes[$task->getTaskTypeId()]['inputShapeDefaults'];
-               $inputShapeEnumValues = $taskTypes[$task->getTaskTypeId()]['inputShapeEnumValues'];
-               $optionalInputShape = $taskTypes[$task->getTaskTypeId()]['optionalInputShape'];
-               $optionalInputShapeEnumValues = $taskTypes[$task->getTaskTypeId()]['optionalInputShapeEnumValues'];
-               $optionalInputShapeDefaults = $taskTypes[$task->getTaskTypeId()]['optionalInputShapeDefaults'];
-               // validate input
-               $this->validateInput($inputShape, $inputShapeDefaults, $inputShapeEnumValues, $task->getInput());
-               $this->validateInput($optionalInputShape, $optionalInputShapeDefaults, $optionalInputShapeEnumValues, $task->getInput(), true);
-               // authenticate access to mentioned files
-               $ids = [];
-               foreach ($inputShape + $optionalInputShape as $key => $descriptor) {
-                       if (in_array(EShapeType::getScalarType($descriptor->getShapeType()), [EShapeType::File, EShapeType::Image, EShapeType::Audio, EShapeType::Video], true)) {
-                               /** @var list<int>|int $inputSlot */
-                               $inputSlot = $task->getInput()[$key];
-                               if (is_array($inputSlot)) {
-                                       $ids += $inputSlot;
-                               } else {
-                                       $ids[] = $inputSlot;
-                               }
-                       }
-               }
-               foreach ($ids as $fileId) {
-                       $this->validateFileId($fileId);
-                       $this->validateUserAccessToFile($fileId, $task->getUserId());
-               }
-               // remove superfluous keys and set input
-               $input = $this->removeSuperfluousArrayKeys($task->getInput(), $inputShape, $optionalInputShape);
-               $inputWithDefaults = $this->fillInputDefaults($input, $inputShapeDefaults, $optionalInputShapeDefaults);
-               $task->setInput($inputWithDefaults);
+               $this->prepareTask($task);
                $task->setStatus(Task::STATUS_SCHEDULED);
-               $task->setScheduledAt(time());
-               $provider = $this->getPreferredProvider($task->getTaskTypeId());
-               // calculate expected completion time
-               $completionExpectedAt = new \DateTime('now');
-               $completionExpectedAt->add(new \DateInterval('PT'.$provider->getExpectedRuntime().'S'));
-               $task->setCompletionExpectedAt($completionExpectedAt);
-               // create a db entity and insert into db table
-               $taskEntity = \OC\TaskProcessing\Db\Task::fromPublicTask($task);
-               $this->taskMapper->insert($taskEntity);
-               // make sure the scheduler knows the id
-               $task->setId($taskEntity->getId());
+               $this->storeTask($task);
                // schedule synchronous job if the provider is synchronous
+               $provider = $this->getPreferredProvider($task->getTaskTypeId());
                if ($provider instanceof ISynchronousProvider) {
                        $this->jobList->add(SynchronousBackgroundJob::class, null);
                }
        }
 
+       public function runTask(Task $task): Task {
+               if (!$this->canHandleTask($task)) {
+                       throw new \OCP\TaskProcessing\Exception\PreConditionNotMetException('No task processing provider is installed that can handle this task type: ' . $task->getTaskTypeId());
+               }
+
+               $provider = $this->getPreferredProvider($task->getTaskTypeId());
+               if ($provider instanceof ISynchronousProvider) {
+                       $this->prepareTask($task);
+                       $task->setStatus(Task::STATUS_SCHEDULED);
+                       $this->storeTask($task);
+                       $this->processTask($task, $provider);
+                       $task = $this->getTask($task->getId());
+               } else {
+                       $this->scheduleTask($task);
+                       // poll task
+                       while ($task->getStatus() === Task::STATUS_SCHEDULED || $task->getStatus() === Task::STATUS_RUNNING) {
+                               sleep(1);
+                               $task = $this->getTask($task->getId());
+                       }
+               }
+               return $task;
+       }
+
+       public function processTask(Task $task, ISynchronousProvider $provider): bool {
+               try {
+                       try {
+                               $input = $this->prepareInputData($task);
+                       } catch (GenericFileException|NotPermittedException|LockedException|ValidationException|UnauthorizedException $e) {
+                               $this->logger->warning('Failed to prepare input data for a TaskProcessing task with synchronous provider ' . $provider->getId(), ['exception' => $e]);
+                               $this->setTaskResult($task->getId(), $e->getMessage(), null);
+                               return false;
+                       }
+                       try {
+                               $this->setTaskStatus($task, Task::STATUS_RUNNING);
+                               $output = $provider->process($task->getUserId(), $input, fn (float $progress) => $this->setTaskProgress($task->getId(), $progress));
+                       } catch (ProcessingException $e) {
+                               $this->logger->warning('Failed to process a TaskProcessing task with synchronous provider ' . $provider->getId(), ['exception' => $e]);
+                               $this->setTaskResult($task->getId(), $e->getMessage(), null);
+                               return false;
+                       } catch (\Throwable $e) {
+                               $this->logger->error('Unknown error while processing TaskProcessing task', ['exception' => $e]);
+                               $this->setTaskResult($task->getId(), $e->getMessage(), null);
+                               return false;
+                       }
+                       $this->setTaskResult($task->getId(), null, $output);
+               } catch (NotFoundException $e) {
+                       $this->logger->info('Could not find task anymore after execution. Moving on.', ['exception' => $e]);
+               } catch (Exception $e) {
+                       $this->logger->error('Failed to report result of TaskProcessing task', ['exception' => $e]);
+               }
+               return true;
+       }
+
        public function deleteTask(Task $task): void {
                $taskEntity = \OC\TaskProcessing\Db\Task::fromPublicTask($task);
                $this->taskMapper->delete($taskEntity);
@@ -1095,6 +1109,72 @@ class Manager implements IManager {
                $this->taskMapper->update($taskEntity);
        }
 
+       /**
+        * Validate input, fill input default values, set completionExpectedAt, set scheduledAt
+        *
+        * @param Task $task
+        * @return void
+        * @throws UnauthorizedException
+        * @throws ValidationException
+        * @throws \OCP\TaskProcessing\Exception\Exception
+        */
+       private function prepareTask(Task $task): void {
+               $taskTypes = $this->getAvailableTaskTypes();
+               $taskType = $taskTypes[$task->getTaskTypeId()];
+               $inputShape = $taskType['inputShape'];
+               $inputShapeDefaults = $taskType['inputShapeDefaults'];
+               $inputShapeEnumValues = $taskType['inputShapeEnumValues'];
+               $optionalInputShape = $taskType['optionalInputShape'];
+               $optionalInputShapeEnumValues = $taskType['optionalInputShapeEnumValues'];
+               $optionalInputShapeDefaults = $taskType['optionalInputShapeDefaults'];
+               // validate input
+               $this->validateInput($inputShape, $inputShapeDefaults, $inputShapeEnumValues, $task->getInput());
+               $this->validateInput($optionalInputShape, $optionalInputShapeDefaults, $optionalInputShapeEnumValues, $task->getInput(), true);
+               // authenticate access to mentioned files
+               $ids = [];
+               foreach ($inputShape + $optionalInputShape as $key => $descriptor) {
+                       if (in_array(EShapeType::getScalarType($descriptor->getShapeType()), [EShapeType::File, EShapeType::Image, EShapeType::Audio, EShapeType::Video], true)) {
+                               /** @var list<int>|int $inputSlot */
+                               $inputSlot = $task->getInput()[$key];
+                               if (is_array($inputSlot)) {
+                                       $ids += $inputSlot;
+                               } else {
+                                       $ids[] = $inputSlot;
+                               }
+                       }
+               }
+               foreach ($ids as $fileId) {
+                       $this->validateFileId($fileId);
+                       $this->validateUserAccessToFile($fileId, $task->getUserId());
+               }
+               // remove superfluous keys and set input
+               $input = $this->removeSuperfluousArrayKeys($task->getInput(), $inputShape, $optionalInputShape);
+               $inputWithDefaults = $this->fillInputDefaults($input, $inputShapeDefaults, $optionalInputShapeDefaults);
+               $task->setInput($inputWithDefaults);
+               $task->setScheduledAt(time());
+               $provider = $this->getPreferredProvider($task->getTaskTypeId());
+               // calculate expected completion time
+               $completionExpectedAt = new \DateTime('now');
+               $completionExpectedAt->add(new \DateInterval('PT'.$provider->getExpectedRuntime().'S'));
+               $task->setCompletionExpectedAt($completionExpectedAt);
+       }
+
+       /**
+        * Store the task in the DB and set its ID in the \OCP\TaskProcessing\Task input param
+        *
+        * @param Task $task
+        * @return void
+        * @throws Exception
+        * @throws \JsonException
+        */
+       private function storeTask(Task $task): void {
+               // create a db entity and insert into db table
+               $taskEntity = \OC\TaskProcessing\Db\Task::fromPublicTask($task);
+               $this->taskMapper->insert($taskEntity);
+               // make sure the scheduler knows the id
+               $task->setId($taskEntity->getId());
+       }
+
        /**
         * @param array $output
         * @param ShapeDescriptor[] ...$specs the specs that define which keys to keep
index 85a8fbc21f6f68f1eba9251ee8b5349c52ac414f..3d85625da8f4c1ba3f992274b836befb4be8197a 100644 (file)
@@ -57,37 +57,9 @@ class SynchronousBackgroundJob extends QueuedJob {
                                $this->logger->error('Unknown error while retrieving scheduled TaskProcessing tasks', ['exception' => $e]);
                                continue;
                        }
-                       try {
-                               try {
-                                       $input = $this->taskProcessingManager->prepareInputData($task);
-                               } catch (GenericFileException|NotPermittedException|LockedException|ValidationException|UnauthorizedException $e) {
-                                       $this->logger->warning('Failed to prepare input data for a TaskProcessing task with synchronous provider ' . $provider->getId(), ['exception' => $e]);
-                                       $this->taskProcessingManager->setTaskResult($task->getId(), $e->getMessage(), null);
-                                       // Schedule again
-                                       $this->jobList->add(self::class, $argument);
-                                       return;
-                               }
-                               try {
-                                       $this->taskProcessingManager->setTaskStatus($task, Task::STATUS_RUNNING);
-                                       $output = $provider->process($task->getUserId(), $input, fn (float $progress) => $this->taskProcessingManager->setTaskProgress($task->getId(), $progress));
-                               } catch (ProcessingException $e) {
-                                       $this->logger->warning('Failed to process a TaskProcessing task with synchronous provider ' . $provider->getId(), ['exception' => $e]);
-                                       $this->taskProcessingManager->setTaskResult($task->getId(), $e->getMessage(), null);
-                                       // Schedule again
-                                       $this->jobList->add(self::class, $argument);
-                                       return;
-                               } catch (\Throwable $e) {
-                                       $this->logger->error('Unknown error while processing TaskProcessing task', ['exception' => $e]);
-                                       $this->taskProcessingManager->setTaskResult($task->getId(), $e->getMessage(), null);
-                                       // Schedule again
-                                       $this->jobList->add(self::class, $argument);
-                                       return;
-                               }
-                               $this->taskProcessingManager->setTaskResult($task->getId(), null, $output);
-                       } catch (NotFoundException $e) {
-                               $this->logger->info('Could not find task anymore after execution. Moving on.', ['exception' => $e]);
-                       } catch (Exception $e) {
-                               $this->logger->error('Failed to report result of TaskProcessing task', ['exception' => $e]);
+                       if (!$this->taskProcessingManager->processTask($task, $provider)) {
+                               // Schedule again
+                               $this->jobList->add(self::class, $argument);
                        }
                }
 
index 86788449aaf557ee2bb78864b4a87b6aca258f7b..c26a5d67339ce056e9bd8af9646fe4cdee238a67 100644 (file)
@@ -61,6 +61,33 @@ interface IManager {
         */
        public function scheduleTask(Task $task): void;
 
+       /**
+        * Run the task and return the finished task
+        *
+        * @param Task $task The task to run
+        * @return Task The result task
+        * @throws PreConditionNotMetException If no or not the requested provider was registered but this method was still called
+        * @throws ValidationException the given task input didn't pass validation against the task type's input shape and/or the providers optional input shape specs
+        * @throws Exception storing the task in the database failed
+        * @throws UnauthorizedException the user scheduling the task does not have access to the files used in the input
+        * @since 30.0.0
+        */
+       public function runTask(Task $task): Task;
+
+       /**
+        * Process task with a synchronous provider
+        *
+        * Prepare task input data and run the process method of the provider
+        * This should only be used by OC\TaskProcessing\SynchronousBackgroundJob::run() and OCP\TaskProcessing\IManager::runTask()
+        *
+        * @param Task $task
+        * @param ISynchronousProvider $provider
+        * @return bool True if the task has run successfully
+        * @throws Exception
+        * @since 30.0.0
+        */
+       public function processTask(Task $task, ISynchronousProvider $provider): bool;
+
        /**
         * Delete a task that has been scheduled before
         *