aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJulien Veyssier <julien-nc@posteo.net>2024-08-27 13:11:51 +0200
committerJulien Veyssier <julien-nc@posteo.net>2024-08-27 13:11:51 +0200
commitc1ed256d50a2f365060e2827be8893145073e434 (patch)
treef5f3028ea9a46621f4a455bb7d1bb8a2843a24d6
parent829642c9e504813bad55e39be7527abe70e488cc (diff)
downloadnextcloud-server-c1ed256d50a2f365060e2827be8893145073e434.tar.gz
nextcloud-server-c1ed256d50a2f365060e2827be8893145073e434.zip
feat(taskprocessing): add IManager::runTask method to run task synchronously
Signed-off-by: Julien Veyssier <julien-nc@posteo.net>
-rw-r--r--lib/private/TaskProcessing/Manager.php164
-rw-r--r--lib/private/TaskProcessing/SynchronousBackgroundJob.php34
-rw-r--r--lib/public/TaskProcessing/IManager.php27
3 files changed, 152 insertions, 73 deletions
diff --git a/lib/private/TaskProcessing/Manager.php b/lib/private/TaskProcessing/Manager.php
index 51dcc7c94da..e9db978034e 100644
--- a/lib/private/TaskProcessing/Manager.php
+++ b/lib/private/TaskProcessing/Manager.php
@@ -716,55 +716,69 @@ class Manager implements IManager {
if (!$this->canHandleTask($task)) {
throw new \OCP\TaskProcessing\Exception\PreConditionNotMetException('No task processing provider is installed that can handle this task type: ' . $task->getTaskTypeId());
}
- $taskTypes = $this->getAvailableTaskTypes();
- $inputShape = $taskTypes[$task->getTaskTypeId()]['inputShape'];
- $inputShapeDefaults = $taskTypes[$task->getTaskTypeId()]['inputShapeDefaults'];
- $inputShapeEnumValues = $taskTypes[$task->getTaskTypeId()]['inputShapeEnumValues'];
- $optionalInputShape = $taskTypes[$task->getTaskTypeId()]['optionalInputShape'];
- $optionalInputShapeEnumValues = $taskTypes[$task->getTaskTypeId()]['optionalInputShapeEnumValues'];
- $optionalInputShapeDefaults = $taskTypes[$task->getTaskTypeId()]['optionalInputShapeDefaults'];
- // validate input
- $this->validateInput($inputShape, $inputShapeDefaults, $inputShapeEnumValues, $task->getInput());
- $this->validateInput($optionalInputShape, $optionalInputShapeDefaults, $optionalInputShapeEnumValues, $task->getInput(), true);
- // authenticate access to mentioned files
- $ids = [];
- foreach ($inputShape + $optionalInputShape as $key => $descriptor) {
- if (in_array(EShapeType::getScalarType($descriptor->getShapeType()), [EShapeType::File, EShapeType::Image, EShapeType::Audio, EShapeType::Video], true)) {
- /** @var list<int>|int $inputSlot */
- $inputSlot = $task->getInput()[$key];
- if (is_array($inputSlot)) {
- $ids += $inputSlot;
- } else {
- $ids[] = $inputSlot;
- }
- }
- }
- foreach ($ids as $fileId) {
- $this->validateFileId($fileId);
- $this->validateUserAccessToFile($fileId, $task->getUserId());
- }
- // remove superfluous keys and set input
- $input = $this->removeSuperfluousArrayKeys($task->getInput(), $inputShape, $optionalInputShape);
- $inputWithDefaults = $this->fillInputDefaults($input, $inputShapeDefaults, $optionalInputShapeDefaults);
- $task->setInput($inputWithDefaults);
+ $this->prepareTask($task);
$task->setStatus(Task::STATUS_SCHEDULED);
- $task->setScheduledAt(time());
- $provider = $this->getPreferredProvider($task->getTaskTypeId());
- // calculate expected completion time
- $completionExpectedAt = new \DateTime('now');
- $completionExpectedAt->add(new \DateInterval('PT'.$provider->getExpectedRuntime().'S'));
- $task->setCompletionExpectedAt($completionExpectedAt);
- // create a db entity and insert into db table
- $taskEntity = \OC\TaskProcessing\Db\Task::fromPublicTask($task);
- $this->taskMapper->insert($taskEntity);
- // make sure the scheduler knows the id
- $task->setId($taskEntity->getId());
+ $this->storeTask($task);
// schedule synchronous job if the provider is synchronous
+ $provider = $this->getPreferredProvider($task->getTaskTypeId());
if ($provider instanceof ISynchronousProvider) {
$this->jobList->add(SynchronousBackgroundJob::class, null);
}
}
+ public function runTask(Task $task): Task {
+ if (!$this->canHandleTask($task)) {
+ throw new \OCP\TaskProcessing\Exception\PreConditionNotMetException('No task processing provider is installed that can handle this task type: ' . $task->getTaskTypeId());
+ }
+
+ $provider = $this->getPreferredProvider($task->getTaskTypeId());
+ if ($provider instanceof ISynchronousProvider) {
+ $this->prepareTask($task);
+ $task->setStatus(Task::STATUS_SCHEDULED);
+ $this->storeTask($task);
+ $this->processTask($task, $provider);
+ $task = $this->getTask($task->getId());
+ } else {
+ $this->scheduleTask($task);
+ // poll task
+ while ($task->getStatus() === Task::STATUS_SCHEDULED || $task->getStatus() === Task::STATUS_RUNNING) {
+ sleep(1);
+ $task = $this->getTask($task->getId());
+ }
+ }
+ return $task;
+ }
+
+ public function processTask(Task $task, ISynchronousProvider $provider): bool {
+ try {
+ try {
+ $input = $this->prepareInputData($task);
+ } catch (GenericFileException|NotPermittedException|LockedException|ValidationException|UnauthorizedException $e) {
+ $this->logger->warning('Failed to prepare input data for a TaskProcessing task with synchronous provider ' . $provider->getId(), ['exception' => $e]);
+ $this->setTaskResult($task->getId(), $e->getMessage(), null);
+ return false;
+ }
+ try {
+ $this->setTaskStatus($task, Task::STATUS_RUNNING);
+ $output = $provider->process($task->getUserId(), $input, fn (float $progress) => $this->setTaskProgress($task->getId(), $progress));
+ } catch (ProcessingException $e) {
+ $this->logger->warning('Failed to process a TaskProcessing task with synchronous provider ' . $provider->getId(), ['exception' => $e]);
+ $this->setTaskResult($task->getId(), $e->getMessage(), null);
+ return false;
+ } catch (\Throwable $e) {
+ $this->logger->error('Unknown error while processing TaskProcessing task', ['exception' => $e]);
+ $this->setTaskResult($task->getId(), $e->getMessage(), null);
+ return false;
+ }
+ $this->setTaskResult($task->getId(), null, $output);
+ } catch (NotFoundException $e) {
+ $this->logger->info('Could not find task anymore after execution. Moving on.', ['exception' => $e]);
+ } catch (Exception $e) {
+ $this->logger->error('Failed to report result of TaskProcessing task', ['exception' => $e]);
+ }
+ return true;
+ }
+
public function deleteTask(Task $task): void {
$taskEntity = \OC\TaskProcessing\Db\Task::fromPublicTask($task);
$this->taskMapper->delete($taskEntity);
@@ -1096,6 +1110,72 @@ class Manager implements IManager {
}
/**
+ * Validate input, fill input default values, set completionExpectedAt, set scheduledAt
+ *
+ * @param Task $task
+ * @return void
+ * @throws UnauthorizedException
+ * @throws ValidationException
+ * @throws \OCP\TaskProcessing\Exception\Exception
+ */
+ private function prepareTask(Task $task): void {
+ $taskTypes = $this->getAvailableTaskTypes();
+ $taskType = $taskTypes[$task->getTaskTypeId()];
+ $inputShape = $taskType['inputShape'];
+ $inputShapeDefaults = $taskType['inputShapeDefaults'];
+ $inputShapeEnumValues = $taskType['inputShapeEnumValues'];
+ $optionalInputShape = $taskType['optionalInputShape'];
+ $optionalInputShapeEnumValues = $taskType['optionalInputShapeEnumValues'];
+ $optionalInputShapeDefaults = $taskType['optionalInputShapeDefaults'];
+ // validate input
+ $this->validateInput($inputShape, $inputShapeDefaults, $inputShapeEnumValues, $task->getInput());
+ $this->validateInput($optionalInputShape, $optionalInputShapeDefaults, $optionalInputShapeEnumValues, $task->getInput(), true);
+ // authenticate access to mentioned files
+ $ids = [];
+ foreach ($inputShape + $optionalInputShape as $key => $descriptor) {
+ if (in_array(EShapeType::getScalarType($descriptor->getShapeType()), [EShapeType::File, EShapeType::Image, EShapeType::Audio, EShapeType::Video], true)) {
+ /** @var list<int>|int $inputSlot */
+ $inputSlot = $task->getInput()[$key];
+ if (is_array($inputSlot)) {
+ $ids += $inputSlot;
+ } else {
+ $ids[] = $inputSlot;
+ }
+ }
+ }
+ foreach ($ids as $fileId) {
+ $this->validateFileId($fileId);
+ $this->validateUserAccessToFile($fileId, $task->getUserId());
+ }
+ // remove superfluous keys and set input
+ $input = $this->removeSuperfluousArrayKeys($task->getInput(), $inputShape, $optionalInputShape);
+ $inputWithDefaults = $this->fillInputDefaults($input, $inputShapeDefaults, $optionalInputShapeDefaults);
+ $task->setInput($inputWithDefaults);
+ $task->setScheduledAt(time());
+ $provider = $this->getPreferredProvider($task->getTaskTypeId());
+ // calculate expected completion time
+ $completionExpectedAt = new \DateTime('now');
+ $completionExpectedAt->add(new \DateInterval('PT'.$provider->getExpectedRuntime().'S'));
+ $task->setCompletionExpectedAt($completionExpectedAt);
+ }
+
+ /**
+ * Store the task in the DB and set its ID in the \OCP\TaskProcessing\Task input param
+ *
+ * @param Task $task
+ * @return void
+ * @throws Exception
+ * @throws \JsonException
+ */
+ private function storeTask(Task $task): void {
+ // create a db entity and insert into db table
+ $taskEntity = \OC\TaskProcessing\Db\Task::fromPublicTask($task);
+ $this->taskMapper->insert($taskEntity);
+ // make sure the scheduler knows the id
+ $task->setId($taskEntity->getId());
+ }
+
+ /**
* @param array $output
* @param ShapeDescriptor[] ...$specs the specs that define which keys to keep
* @return array
diff --git a/lib/private/TaskProcessing/SynchronousBackgroundJob.php b/lib/private/TaskProcessing/SynchronousBackgroundJob.php
index 85a8fbc21f6..3d85625da8f 100644
--- a/lib/private/TaskProcessing/SynchronousBackgroundJob.php
+++ b/lib/private/TaskProcessing/SynchronousBackgroundJob.php
@@ -57,37 +57,9 @@ class SynchronousBackgroundJob extends QueuedJob {
$this->logger->error('Unknown error while retrieving scheduled TaskProcessing tasks', ['exception' => $e]);
continue;
}
- try {
- try {
- $input = $this->taskProcessingManager->prepareInputData($task);
- } catch (GenericFileException|NotPermittedException|LockedException|ValidationException|UnauthorizedException $e) {
- $this->logger->warning('Failed to prepare input data for a TaskProcessing task with synchronous provider ' . $provider->getId(), ['exception' => $e]);
- $this->taskProcessingManager->setTaskResult($task->getId(), $e->getMessage(), null);
- // Schedule again
- $this->jobList->add(self::class, $argument);
- return;
- }
- try {
- $this->taskProcessingManager->setTaskStatus($task, Task::STATUS_RUNNING);
- $output = $provider->process($task->getUserId(), $input, fn (float $progress) => $this->taskProcessingManager->setTaskProgress($task->getId(), $progress));
- } catch (ProcessingException $e) {
- $this->logger->warning('Failed to process a TaskProcessing task with synchronous provider ' . $provider->getId(), ['exception' => $e]);
- $this->taskProcessingManager->setTaskResult($task->getId(), $e->getMessage(), null);
- // Schedule again
- $this->jobList->add(self::class, $argument);
- return;
- } catch (\Throwable $e) {
- $this->logger->error('Unknown error while processing TaskProcessing task', ['exception' => $e]);
- $this->taskProcessingManager->setTaskResult($task->getId(), $e->getMessage(), null);
- // Schedule again
- $this->jobList->add(self::class, $argument);
- return;
- }
- $this->taskProcessingManager->setTaskResult($task->getId(), null, $output);
- } catch (NotFoundException $e) {
- $this->logger->info('Could not find task anymore after execution. Moving on.', ['exception' => $e]);
- } catch (Exception $e) {
- $this->logger->error('Failed to report result of TaskProcessing task', ['exception' => $e]);
+ if (!$this->taskProcessingManager->processTask($task, $provider)) {
+ // Schedule again
+ $this->jobList->add(self::class, $argument);
}
}
diff --git a/lib/public/TaskProcessing/IManager.php b/lib/public/TaskProcessing/IManager.php
index 86788449aaf..c26a5d67339 100644
--- a/lib/public/TaskProcessing/IManager.php
+++ b/lib/public/TaskProcessing/IManager.php
@@ -62,6 +62,33 @@ interface IManager {
public function scheduleTask(Task $task): void;
/**
+ * Run the task and return the finished task
+ *
+ * @param Task $task The task to run
+ * @return Task The result task
+ * @throws PreConditionNotMetException If no or not the requested provider was registered but this method was still called
+ * @throws ValidationException the given task input didn't pass validation against the task type's input shape and/or the providers optional input shape specs
+ * @throws Exception storing the task in the database failed
+ * @throws UnauthorizedException the user scheduling the task does not have access to the files used in the input
+ * @since 30.0.0
+ */
+ public function runTask(Task $task): Task;
+
+ /**
+ * Process task with a synchronous provider
+ *
+ * Prepare task input data and run the process method of the provider
+ * This should only be used by OC\TaskProcessing\SynchronousBackgroundJob::run() and OCP\TaskProcessing\IManager::runTask()
+ *
+ * @param Task $task
+ * @param ISynchronousProvider $provider
+ * @return bool True if the task has run successfully
+ * @throws Exception
+ * @since 30.0.0
+ */
+ public function processTask(Task $task, ISynchronousProvider $provider): bool;
+
+ /**
* Delete a task that has been scheduled before
*
* @param Task $task The task to delete