if (!$this->canHandleTask($task)) {
throw new \OCP\TaskProcessing\Exception\PreConditionNotMetException('No task processing provider is installed that can handle this task type: ' . $task->getTaskTypeId());
}
- $taskTypes = $this->getAvailableTaskTypes();
- $inputShape = $taskTypes[$task->getTaskTypeId()]['inputShape'];
- $inputShapeDefaults = $taskTypes[$task->getTaskTypeId()]['inputShapeDefaults'];
- $inputShapeEnumValues = $taskTypes[$task->getTaskTypeId()]['inputShapeEnumValues'];
- $optionalInputShape = $taskTypes[$task->getTaskTypeId()]['optionalInputShape'];
- $optionalInputShapeEnumValues = $taskTypes[$task->getTaskTypeId()]['optionalInputShapeEnumValues'];
- $optionalInputShapeDefaults = $taskTypes[$task->getTaskTypeId()]['optionalInputShapeDefaults'];
- // validate input
- $this->validateInput($inputShape, $inputShapeDefaults, $inputShapeEnumValues, $task->getInput());
- $this->validateInput($optionalInputShape, $optionalInputShapeDefaults, $optionalInputShapeEnumValues, $task->getInput(), true);
- // authenticate access to mentioned files
- $ids = [];
- foreach ($inputShape + $optionalInputShape as $key => $descriptor) {
- if (in_array(EShapeType::getScalarType($descriptor->getShapeType()), [EShapeType::File, EShapeType::Image, EShapeType::Audio, EShapeType::Video], true)) {
- /** @var list<int>|int $inputSlot */
- $inputSlot = $task->getInput()[$key];
- if (is_array($inputSlot)) {
- $ids += $inputSlot;
- } else {
- $ids[] = $inputSlot;
- }
- }
- }
- foreach ($ids as $fileId) {
- $this->validateFileId($fileId);
- $this->validateUserAccessToFile($fileId, $task->getUserId());
- }
- // remove superfluous keys and set input
- $input = $this->removeSuperfluousArrayKeys($task->getInput(), $inputShape, $optionalInputShape);
- $inputWithDefaults = $this->fillInputDefaults($input, $inputShapeDefaults, $optionalInputShapeDefaults);
- $task->setInput($inputWithDefaults);
+ $this->prepareTask($task);
$task->setStatus(Task::STATUS_SCHEDULED);
- $task->setScheduledAt(time());
- $provider = $this->getPreferredProvider($task->getTaskTypeId());
- // calculate expected completion time
- $completionExpectedAt = new \DateTime('now');
- $completionExpectedAt->add(new \DateInterval('PT'.$provider->getExpectedRuntime().'S'));
- $task->setCompletionExpectedAt($completionExpectedAt);
- // create a db entity and insert into db table
- $taskEntity = \OC\TaskProcessing\Db\Task::fromPublicTask($task);
- $this->taskMapper->insert($taskEntity);
- // make sure the scheduler knows the id
- $task->setId($taskEntity->getId());
+ $this->storeTask($task);
// schedule synchronous job if the provider is synchronous
+ $provider = $this->getPreferredProvider($task->getTaskTypeId());
if ($provider instanceof ISynchronousProvider) {
$this->jobList->add(SynchronousBackgroundJob::class, null);
}
}
+ public function runTask(Task $task): Task {
+ if (!$this->canHandleTask($task)) {
+ throw new \OCP\TaskProcessing\Exception\PreConditionNotMetException('No task processing provider is installed that can handle this task type: ' . $task->getTaskTypeId());
+ }
+
+ $provider = $this->getPreferredProvider($task->getTaskTypeId());
+ if ($provider instanceof ISynchronousProvider) {
+ $this->prepareTask($task);
+ $task->setStatus(Task::STATUS_SCHEDULED);
+ $this->storeTask($task);
+ $this->processTask($task, $provider);
+ $task = $this->getTask($task->getId());
+ } else {
+ $this->scheduleTask($task);
+ // poll task
+ while ($task->getStatus() === Task::STATUS_SCHEDULED || $task->getStatus() === Task::STATUS_RUNNING) {
+ sleep(1);
+ $task = $this->getTask($task->getId());
+ }
+ }
+ return $task;
+ }
+
+ public function processTask(Task $task, ISynchronousProvider $provider): bool {
+ try {
+ try {
+ $input = $this->prepareInputData($task);
+ } catch (GenericFileException|NotPermittedException|LockedException|ValidationException|UnauthorizedException $e) {
+ $this->logger->warning('Failed to prepare input data for a TaskProcessing task with synchronous provider ' . $provider->getId(), ['exception' => $e]);
+ $this->setTaskResult($task->getId(), $e->getMessage(), null);
+ return false;
+ }
+ try {
+ $this->setTaskStatus($task, Task::STATUS_RUNNING);
+ $output = $provider->process($task->getUserId(), $input, fn (float $progress) => $this->setTaskProgress($task->getId(), $progress));
+ } catch (ProcessingException $e) {
+ $this->logger->warning('Failed to process a TaskProcessing task with synchronous provider ' . $provider->getId(), ['exception' => $e]);
+ $this->setTaskResult($task->getId(), $e->getMessage(), null);
+ return false;
+ } catch (\Throwable $e) {
+ $this->logger->error('Unknown error while processing TaskProcessing task', ['exception' => $e]);
+ $this->setTaskResult($task->getId(), $e->getMessage(), null);
+ return false;
+ }
+ $this->setTaskResult($task->getId(), null, $output);
+ } catch (NotFoundException $e) {
+ $this->logger->info('Could not find task anymore after execution. Moving on.', ['exception' => $e]);
+ } catch (Exception $e) {
+ $this->logger->error('Failed to report result of TaskProcessing task', ['exception' => $e]);
+ }
+ return true;
+ }
+
public function deleteTask(Task $task): void {
$taskEntity = \OC\TaskProcessing\Db\Task::fromPublicTask($task);
$this->taskMapper->delete($taskEntity);
$this->taskMapper->update($taskEntity);
}
+ /**
+ * Validate input, fill input default values, set completionExpectedAt, set scheduledAt
+ *
+ * @param Task $task
+ * @return void
+ * @throws UnauthorizedException
+ * @throws ValidationException
+ * @throws \OCP\TaskProcessing\Exception\Exception
+ */
+ private function prepareTask(Task $task): void {
+ $taskTypes = $this->getAvailableTaskTypes();
+ $taskType = $taskTypes[$task->getTaskTypeId()];
+ $inputShape = $taskType['inputShape'];
+ $inputShapeDefaults = $taskType['inputShapeDefaults'];
+ $inputShapeEnumValues = $taskType['inputShapeEnumValues'];
+ $optionalInputShape = $taskType['optionalInputShape'];
+ $optionalInputShapeEnumValues = $taskType['optionalInputShapeEnumValues'];
+ $optionalInputShapeDefaults = $taskType['optionalInputShapeDefaults'];
+ // validate input
+ $this->validateInput($inputShape, $inputShapeDefaults, $inputShapeEnumValues, $task->getInput());
+ $this->validateInput($optionalInputShape, $optionalInputShapeDefaults, $optionalInputShapeEnumValues, $task->getInput(), true);
+ // authenticate access to mentioned files
+ $ids = [];
+ foreach ($inputShape + $optionalInputShape as $key => $descriptor) {
+ if (in_array(EShapeType::getScalarType($descriptor->getShapeType()), [EShapeType::File, EShapeType::Image, EShapeType::Audio, EShapeType::Video], true)) {
+ /** @var list<int>|int $inputSlot */
+ $inputSlot = $task->getInput()[$key];
+ if (is_array($inputSlot)) {
+ $ids += $inputSlot;
+ } else {
+ $ids[] = $inputSlot;
+ }
+ }
+ }
+ foreach ($ids as $fileId) {
+ $this->validateFileId($fileId);
+ $this->validateUserAccessToFile($fileId, $task->getUserId());
+ }
+ // remove superfluous keys and set input
+ $input = $this->removeSuperfluousArrayKeys($task->getInput(), $inputShape, $optionalInputShape);
+ $inputWithDefaults = $this->fillInputDefaults($input, $inputShapeDefaults, $optionalInputShapeDefaults);
+ $task->setInput($inputWithDefaults);
+ $task->setScheduledAt(time());
+ $provider = $this->getPreferredProvider($task->getTaskTypeId());
+ // calculate expected completion time
+ $completionExpectedAt = new \DateTime('now');
+ $completionExpectedAt->add(new \DateInterval('PT'.$provider->getExpectedRuntime().'S'));
+ $task->setCompletionExpectedAt($completionExpectedAt);
+ }
+
+ /**
+ * Store the task in the DB and set its ID in the \OCP\TaskProcessing\Task input param
+ *
+ * @param Task $task
+ * @return void
+ * @throws Exception
+ * @throws \JsonException
+ */
+ private function storeTask(Task $task): void {
+ // create a db entity and insert into db table
+ $taskEntity = \OC\TaskProcessing\Db\Task::fromPublicTask($task);
+ $this->taskMapper->insert($taskEntity);
+ // make sure the scheduler knows the id
+ $task->setId($taskEntity->getId());
+ }
+
/**
* @param array $output
* @param ShapeDescriptor[] ...$specs the specs that define which keys to keep
$this->logger->error('Unknown error while retrieving scheduled TaskProcessing tasks', ['exception' => $e]);
continue;
}
- try {
- try {
- $input = $this->taskProcessingManager->prepareInputData($task);
- } catch (GenericFileException|NotPermittedException|LockedException|ValidationException|UnauthorizedException $e) {
- $this->logger->warning('Failed to prepare input data for a TaskProcessing task with synchronous provider ' . $provider->getId(), ['exception' => $e]);
- $this->taskProcessingManager->setTaskResult($task->getId(), $e->getMessage(), null);
- // Schedule again
- $this->jobList->add(self::class, $argument);
- return;
- }
- try {
- $this->taskProcessingManager->setTaskStatus($task, Task::STATUS_RUNNING);
- $output = $provider->process($task->getUserId(), $input, fn (float $progress) => $this->taskProcessingManager->setTaskProgress($task->getId(), $progress));
- } catch (ProcessingException $e) {
- $this->logger->warning('Failed to process a TaskProcessing task with synchronous provider ' . $provider->getId(), ['exception' => $e]);
- $this->taskProcessingManager->setTaskResult($task->getId(), $e->getMessage(), null);
- // Schedule again
- $this->jobList->add(self::class, $argument);
- return;
- } catch (\Throwable $e) {
- $this->logger->error('Unknown error while processing TaskProcessing task', ['exception' => $e]);
- $this->taskProcessingManager->setTaskResult($task->getId(), $e->getMessage(), null);
- // Schedule again
- $this->jobList->add(self::class, $argument);
- return;
- }
- $this->taskProcessingManager->setTaskResult($task->getId(), null, $output);
- } catch (NotFoundException $e) {
- $this->logger->info('Could not find task anymore after execution. Moving on.', ['exception' => $e]);
- } catch (Exception $e) {
- $this->logger->error('Failed to report result of TaskProcessing task', ['exception' => $e]);
+ if (!$this->taskProcessingManager->processTask($task, $provider)) {
+ // Schedule again
+ $this->jobList->add(self::class, $argument);
}
}