diff options
author | Julien Veyssier <julien-nc@posteo.net> | 2024-08-27 17:53:20 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-08-27 17:53:20 +0200 |
commit | 558877c8422c1a7083d083b2fe0be553f3047ba0 (patch) | |
tree | 9319d8d2aa783f982c54762c74ab0c40c8933ba1 /lib/private | |
parent | 9754f4f723e107725c20e8cb63487b47f846f1ad (diff) | |
parent | 396b8f52f1e5ea8a9005911179762492bbbe5e88 (diff) | |
download | nextcloud-server-558877c8422c1a7083d083b2fe0be553f3047ba0.tar.gz nextcloud-server-558877c8422c1a7083d083b2fe0be553f3047ba0.zip |
Merge pull request #47522 from nextcloud/enh/noid/taskprocessing-runtask
[TaskProcessing] Add manager::runTask method
Diffstat (limited to 'lib/private')
-rw-r--r-- | lib/private/TaskProcessing/Manager.php | 164 | ||||
-rw-r--r-- | lib/private/TaskProcessing/SynchronousBackgroundJob.php | 57 |
2 files changed, 138 insertions, 83 deletions
diff --git a/lib/private/TaskProcessing/Manager.php b/lib/private/TaskProcessing/Manager.php index 51dcc7c94da..e9db978034e 100644 --- a/lib/private/TaskProcessing/Manager.php +++ b/lib/private/TaskProcessing/Manager.php @@ -716,55 +716,69 @@ class Manager implements IManager { if (!$this->canHandleTask($task)) { throw new \OCP\TaskProcessing\Exception\PreConditionNotMetException('No task processing provider is installed that can handle this task type: ' . $task->getTaskTypeId()); } - $taskTypes = $this->getAvailableTaskTypes(); - $inputShape = $taskTypes[$task->getTaskTypeId()]['inputShape']; - $inputShapeDefaults = $taskTypes[$task->getTaskTypeId()]['inputShapeDefaults']; - $inputShapeEnumValues = $taskTypes[$task->getTaskTypeId()]['inputShapeEnumValues']; - $optionalInputShape = $taskTypes[$task->getTaskTypeId()]['optionalInputShape']; - $optionalInputShapeEnumValues = $taskTypes[$task->getTaskTypeId()]['optionalInputShapeEnumValues']; - $optionalInputShapeDefaults = $taskTypes[$task->getTaskTypeId()]['optionalInputShapeDefaults']; - // validate input - $this->validateInput($inputShape, $inputShapeDefaults, $inputShapeEnumValues, $task->getInput()); - $this->validateInput($optionalInputShape, $optionalInputShapeDefaults, $optionalInputShapeEnumValues, $task->getInput(), true); - // authenticate access to mentioned files - $ids = []; - foreach ($inputShape + $optionalInputShape as $key => $descriptor) { - if (in_array(EShapeType::getScalarType($descriptor->getShapeType()), [EShapeType::File, EShapeType::Image, EShapeType::Audio, EShapeType::Video], true)) { - /** @var list<int>|int $inputSlot */ - $inputSlot = $task->getInput()[$key]; - if (is_array($inputSlot)) { - $ids += $inputSlot; - } else { - $ids[] = $inputSlot; - } - } - } - foreach ($ids as $fileId) { - $this->validateFileId($fileId); - $this->validateUserAccessToFile($fileId, $task->getUserId()); - } - // remove superfluous keys and set input - $input = $this->removeSuperfluousArrayKeys($task->getInput(), $inputShape, $optionalInputShape); - $inputWithDefaults = $this->fillInputDefaults($input, $inputShapeDefaults, $optionalInputShapeDefaults); - $task->setInput($inputWithDefaults); + $this->prepareTask($task); $task->setStatus(Task::STATUS_SCHEDULED); - $task->setScheduledAt(time()); - $provider = $this->getPreferredProvider($task->getTaskTypeId()); - // calculate expected completion time - $completionExpectedAt = new \DateTime('now'); - $completionExpectedAt->add(new \DateInterval('PT'.$provider->getExpectedRuntime().'S')); - $task->setCompletionExpectedAt($completionExpectedAt); - // create a db entity and insert into db table - $taskEntity = \OC\TaskProcessing\Db\Task::fromPublicTask($task); - $this->taskMapper->insert($taskEntity); - // make sure the scheduler knows the id - $task->setId($taskEntity->getId()); + $this->storeTask($task); // schedule synchronous job if the provider is synchronous + $provider = $this->getPreferredProvider($task->getTaskTypeId()); if ($provider instanceof ISynchronousProvider) { $this->jobList->add(SynchronousBackgroundJob::class, null); } } + public function runTask(Task $task): Task { + if (!$this->canHandleTask($task)) { + throw new \OCP\TaskProcessing\Exception\PreConditionNotMetException('No task processing provider is installed that can handle this task type: ' . $task->getTaskTypeId()); + } + + $provider = $this->getPreferredProvider($task->getTaskTypeId()); + if ($provider instanceof ISynchronousProvider) { + $this->prepareTask($task); + $task->setStatus(Task::STATUS_SCHEDULED); + $this->storeTask($task); + $this->processTask($task, $provider); + $task = $this->getTask($task->getId()); + } else { + $this->scheduleTask($task); + // poll task + while ($task->getStatus() === Task::STATUS_SCHEDULED || $task->getStatus() === Task::STATUS_RUNNING) { + sleep(1); + $task = $this->getTask($task->getId()); + } + } + return $task; + } + + public function processTask(Task $task, ISynchronousProvider $provider): bool { + try { + try { + $input = $this->prepareInputData($task); + } catch (GenericFileException|NotPermittedException|LockedException|ValidationException|UnauthorizedException $e) { + $this->logger->warning('Failed to prepare input data for a TaskProcessing task with synchronous provider ' . $provider->getId(), ['exception' => $e]); + $this->setTaskResult($task->getId(), $e->getMessage(), null); + return false; + } + try { + $this->setTaskStatus($task, Task::STATUS_RUNNING); + $output = $provider->process($task->getUserId(), $input, fn (float $progress) => $this->setTaskProgress($task->getId(), $progress)); + } catch (ProcessingException $e) { + $this->logger->warning('Failed to process a TaskProcessing task with synchronous provider ' . $provider->getId(), ['exception' => $e]); + $this->setTaskResult($task->getId(), $e->getMessage(), null); + return false; + } catch (\Throwable $e) { + $this->logger->error('Unknown error while processing TaskProcessing task', ['exception' => $e]); + $this->setTaskResult($task->getId(), $e->getMessage(), null); + return false; + } + $this->setTaskResult($task->getId(), null, $output); + } catch (NotFoundException $e) { + $this->logger->info('Could not find task anymore after execution. Moving on.', ['exception' => $e]); + } catch (Exception $e) { + $this->logger->error('Failed to report result of TaskProcessing task', ['exception' => $e]); + } + return true; + } + public function deleteTask(Task $task): void { $taskEntity = \OC\TaskProcessing\Db\Task::fromPublicTask($task); $this->taskMapper->delete($taskEntity); @@ -1096,6 +1110,72 @@ class Manager implements IManager { } /** + * Validate input, fill input default values, set completionExpectedAt, set scheduledAt + * + * @param Task $task + * @return void + * @throws UnauthorizedException + * @throws ValidationException + * @throws \OCP\TaskProcessing\Exception\Exception + */ + private function prepareTask(Task $task): void { + $taskTypes = $this->getAvailableTaskTypes(); + $taskType = $taskTypes[$task->getTaskTypeId()]; + $inputShape = $taskType['inputShape']; + $inputShapeDefaults = $taskType['inputShapeDefaults']; + $inputShapeEnumValues = $taskType['inputShapeEnumValues']; + $optionalInputShape = $taskType['optionalInputShape']; + $optionalInputShapeEnumValues = $taskType['optionalInputShapeEnumValues']; + $optionalInputShapeDefaults = $taskType['optionalInputShapeDefaults']; + // validate input + $this->validateInput($inputShape, $inputShapeDefaults, $inputShapeEnumValues, $task->getInput()); + $this->validateInput($optionalInputShape, $optionalInputShapeDefaults, $optionalInputShapeEnumValues, $task->getInput(), true); + // authenticate access to mentioned files + $ids = []; + foreach ($inputShape + $optionalInputShape as $key => $descriptor) { + if (in_array(EShapeType::getScalarType($descriptor->getShapeType()), [EShapeType::File, EShapeType::Image, EShapeType::Audio, EShapeType::Video], true)) { + /** @var list<int>|int $inputSlot */ + $inputSlot = $task->getInput()[$key]; + if (is_array($inputSlot)) { + $ids += $inputSlot; + } else { + $ids[] = $inputSlot; + } + } + } + foreach ($ids as $fileId) { + $this->validateFileId($fileId); + $this->validateUserAccessToFile($fileId, $task->getUserId()); + } + // remove superfluous keys and set input + $input = $this->removeSuperfluousArrayKeys($task->getInput(), $inputShape, $optionalInputShape); + $inputWithDefaults = $this->fillInputDefaults($input, $inputShapeDefaults, $optionalInputShapeDefaults); + $task->setInput($inputWithDefaults); + $task->setScheduledAt(time()); + $provider = $this->getPreferredProvider($task->getTaskTypeId()); + // calculate expected completion time + $completionExpectedAt = new \DateTime('now'); + $completionExpectedAt->add(new \DateInterval('PT'.$provider->getExpectedRuntime().'S')); + $task->setCompletionExpectedAt($completionExpectedAt); + } + + /** + * Store the task in the DB and set its ID in the \OCP\TaskProcessing\Task input param + * + * @param Task $task + * @return void + * @throws Exception + * @throws \JsonException + */ + private function storeTask(Task $task): void { + // create a db entity and insert into db table + $taskEntity = \OC\TaskProcessing\Db\Task::fromPublicTask($task); + $this->taskMapper->insert($taskEntity); + // make sure the scheduler knows the id + $task->setId($taskEntity->getId()); + } + + /** * @param array $output * @param ShapeDescriptor[] ...$specs the specs that define which keys to keep * @return array diff --git a/lib/private/TaskProcessing/SynchronousBackgroundJob.php b/lib/private/TaskProcessing/SynchronousBackgroundJob.php index 85a8fbc21f6..de3b424176c 100644 --- a/lib/private/TaskProcessing/SynchronousBackgroundJob.php +++ b/lib/private/TaskProcessing/SynchronousBackgroundJob.php @@ -9,14 +9,8 @@ namespace OC\TaskProcessing; use OCP\AppFramework\Utility\ITimeFactory; use OCP\BackgroundJob\IJobList; use OCP\BackgroundJob\QueuedJob; -use OCP\Files\GenericFileException; -use OCP\Files\NotPermittedException; -use OCP\Lock\LockedException; use OCP\TaskProcessing\Exception\Exception; use OCP\TaskProcessing\Exception\NotFoundException; -use OCP\TaskProcessing\Exception\ProcessingException; -use OCP\TaskProcessing\Exception\UnauthorizedException; -use OCP\TaskProcessing\Exception\ValidationException; use OCP\TaskProcessing\IManager; use OCP\TaskProcessing\ISynchronousProvider; use OCP\TaskProcessing\Task; @@ -57,46 +51,27 @@ class SynchronousBackgroundJob extends QueuedJob { $this->logger->error('Unknown error while retrieving scheduled TaskProcessing tasks', ['exception' => $e]); continue; } - try { - try { - $input = $this->taskProcessingManager->prepareInputData($task); - } catch (GenericFileException|NotPermittedException|LockedException|ValidationException|UnauthorizedException $e) { - $this->logger->warning('Failed to prepare input data for a TaskProcessing task with synchronous provider ' . $provider->getId(), ['exception' => $e]); - $this->taskProcessingManager->setTaskResult($task->getId(), $e->getMessage(), null); - // Schedule again - $this->jobList->add(self::class, $argument); - return; - } - try { - $this->taskProcessingManager->setTaskStatus($task, Task::STATUS_RUNNING); - $output = $provider->process($task->getUserId(), $input, fn (float $progress) => $this->taskProcessingManager->setTaskProgress($task->getId(), $progress)); - } catch (ProcessingException $e) { - $this->logger->warning('Failed to process a TaskProcessing task with synchronous provider ' . $provider->getId(), ['exception' => $e]); - $this->taskProcessingManager->setTaskResult($task->getId(), $e->getMessage(), null); - // Schedule again - $this->jobList->add(self::class, $argument); - return; - } catch (\Throwable $e) { - $this->logger->error('Unknown error while processing TaskProcessing task', ['exception' => $e]); - $this->taskProcessingManager->setTaskResult($task->getId(), $e->getMessage(), null); - // Schedule again - $this->jobList->add(self::class, $argument); - return; - } - $this->taskProcessingManager->setTaskResult($task->getId(), null, $output); - } catch (NotFoundException $e) { - $this->logger->info('Could not find task anymore after execution. Moving on.', ['exception' => $e]); - } catch (Exception $e) { - $this->logger->error('Failed to report result of TaskProcessing task', ['exception' => $e]); + if (!$this->taskProcessingManager->processTask($task, $provider)) { + // Schedule again + $this->jobList->add(self::class, $argument); } } + // check if this job needs to be scheduled again: + // if there is at least one preferred synchronous provider that has a scheduled task $synchronousProviders = array_filter($providers, fn ($provider) => $provider instanceof ISynchronousProvider); - $taskTypes = array_values(array_map(fn ($provider) => - $provider->getTaskTypeId(), - $synchronousProviders - )); + $synchronousPreferredProviders = array_filter($synchronousProviders, function ($provider) { + $taskTypeId = $provider->getTaskTypeId(); + $preferredProvider = $this->taskProcessingManager->getPreferredProvider($taskTypeId); + return $provider->getId() === $preferredProvider->getId(); + }); + $taskTypes = array_values( + array_map( + fn ($provider) => $provider->getTaskTypeId(), + $synchronousPreferredProviders + ) + ); $taskTypesWithTasks = array_filter($taskTypes, function ($taskType) { try { $this->taskProcessingManager->getNextScheduledTask([$taskType]); |