diff options
Diffstat (limited to 'lib/private/TaskProcessing')
-rw-r--r-- | lib/private/TaskProcessing/Db/Task.php | 10 | ||||
-rw-r--r-- | lib/private/TaskProcessing/Db/TaskMapper.php | 25 | ||||
-rw-r--r-- | lib/private/TaskProcessing/Manager.php | 194 | ||||
-rw-r--r-- | lib/private/TaskProcessing/RemoveOldTasksBackgroundJob.php | 44 | ||||
-rw-r--r-- | lib/private/TaskProcessing/SynchronousBackgroundJob.php | 4 |
5 files changed, 233 insertions, 44 deletions
diff --git a/lib/private/TaskProcessing/Db/Task.php b/lib/private/TaskProcessing/Db/Task.php index 4d919deaf94..05c0ae9ac74 100644 --- a/lib/private/TaskProcessing/Db/Task.php +++ b/lib/private/TaskProcessing/Db/Task.php @@ -45,6 +45,8 @@ use OCP\TaskProcessing\Task as OCPTask; * @method int getStartedAt() * @method setEndedAt(int $endedAt) * @method int getEndedAt() + * @method setAllowCleanup(int $allowCleanup) + * @method int getAllowCleanup() */ class Task extends Entity { protected $lastUpdated; @@ -63,16 +65,17 @@ class Task extends Entity { protected $scheduledAt; protected $startedAt; protected $endedAt; + protected $allowCleanup; /** * @var string[] */ - public static array $columns = ['id', 'last_updated', 'type', 'input', 'output', 'status', 'user_id', 'app_id', 'custom_id', 'completion_expected_at', 'error_message', 'progress', 'webhook_uri', 'webhook_method', 'scheduled_at', 'started_at', 'ended_at']; + public static array $columns = ['id', 'last_updated', 'type', 'input', 'output', 'status', 'user_id', 'app_id', 'custom_id', 'completion_expected_at', 'error_message', 'progress', 'webhook_uri', 'webhook_method', 'scheduled_at', 'started_at', 'ended_at', 'allow_cleanup']; /** * @var string[] */ - public static array $fields = ['id', 'lastUpdated', 'type', 'input', 'output', 'status', 'userId', 'appId', 'customId', 'completionExpectedAt', 'errorMessage', 'progress', 'webhookUri', 'webhookMethod', 'scheduledAt', 'startedAt', 'endedAt']; + public static array $fields = ['id', 'lastUpdated', 'type', 'input', 'output', 'status', 'userId', 'appId', 'customId', 'completionExpectedAt', 'errorMessage', 'progress', 'webhookUri', 'webhookMethod', 'scheduledAt', 'startedAt', 'endedAt', 'allowCleanup']; public function __construct() { @@ -94,6 +97,7 @@ class Task extends Entity { $this->addType('scheduledAt', 'integer'); $this->addType('startedAt', 'integer'); $this->addType('endedAt', 'integer'); + $this->addType('allowCleanup', 'integer'); } public function toRow(): array { @@ -122,6 +126,7 @@ class Task extends Entity { 'scheduledAt' => $task->getScheduledAt(), 'startedAt' => $task->getStartedAt(), 'endedAt' => $task->getEndedAt(), + 'allowCleanup' => $task->getAllowCleanup() ? 1 : 0, ]); return $taskEntity; } @@ -144,6 +149,7 @@ class Task extends Entity { $task->setScheduledAt($this->getScheduledAt()); $task->setStartedAt($this->getStartedAt()); $task->setEndedAt($this->getEndedAt()); + $task->setAllowCleanup($this->getAllowCleanup() !== 0); return $task; } } diff --git a/lib/private/TaskProcessing/Db/TaskMapper.php b/lib/private/TaskProcessing/Db/TaskMapper.php index 91fd68820ae..fee96534633 100644 --- a/lib/private/TaskProcessing/Db/TaskMapper.php +++ b/lib/private/TaskProcessing/Db/TaskMapper.php @@ -183,16 +183,39 @@ class TaskMapper extends QBMapper { /** * @param int $timeout + * @param bool $force If true, ignore the allow_cleanup flag * @return int the number of deleted tasks * @throws Exception */ - public function deleteOlderThan(int $timeout): int { + public function deleteOlderThan(int $timeout, bool $force = false): int { $qb = $this->db->getQueryBuilder(); $qb->delete($this->tableName) ->where($qb->expr()->lt('last_updated', $qb->createPositionalParameter($this->timeFactory->getDateTime()->getTimestamp() - $timeout))); + if (!$force) { + $qb->andWhere($qb->expr()->eq('allow_cleanup', $qb->createPositionalParameter(1, IQueryBuilder::PARAM_INT))); + } return $qb->executeStatement(); } + /** + * @param int $timeout + * @param bool $force If true, ignore the allow_cleanup flag + * @return \Generator<Task> + * @throws Exception + */ + public function getTasksToCleanup(int $timeout, bool $force = false): \Generator { + $qb = $this->db->getQueryBuilder(); + $qb->select(Task::$columns) + ->from($this->tableName) + ->where($qb->expr()->lt('last_updated', $qb->createPositionalParameter($this->timeFactory->getDateTime()->getTimestamp() - $timeout))); + if (!$force) { + $qb->andWhere($qb->expr()->eq('allow_cleanup', $qb->createPositionalParameter(1, IQueryBuilder::PARAM_INT))); + } + foreach ($this->yieldEntities($qb) as $entity) { + yield $entity; + }; + } + public function update(Entity $entity): Entity { $entity->setLastUpdated($this->timeFactory->now()->getTimestamp()); return parent::update($entity); diff --git a/lib/private/TaskProcessing/Manager.php b/lib/private/TaskProcessing/Manager.php index 0582d801e3d..e288f2981a8 100644 --- a/lib/private/TaskProcessing/Manager.php +++ b/lib/private/TaskProcessing/Manager.php @@ -30,17 +30,21 @@ use OCP\Files\IRootFolder; use OCP\Files\Node; use OCP\Files\NotPermittedException; use OCP\Files\SimpleFS\ISimpleFile; +use OCP\Files\SimpleFS\ISimpleFolder; use OCP\Http\Client\IClientService; +use OCP\IAppConfig; use OCP\ICache; use OCP\ICacheFactory; -use OCP\IConfig; use OCP\IL10N; use OCP\IServerContainer; +use OCP\IUserManager; +use OCP\IUserSession; use OCP\L10N\IFactory; use OCP\Lock\LockedException; use OCP\SpeechToText\ISpeechToTextProvider; use OCP\SpeechToText\ISpeechToTextProviderWithId; use OCP\TaskProcessing\EShapeType; +use OCP\TaskProcessing\Events\GetTaskProcessingProvidersEvent; use OCP\TaskProcessing\Events\TaskFailedEvent; use OCP\TaskProcessing\Events\TaskSuccessfulEvent; use OCP\TaskProcessing\Exception\NotFoundException; @@ -70,6 +74,13 @@ class Manager implements IManager { public const LEGACY_PREFIX_TEXTTOIMAGE = 'legacy:TextToImage:'; public const LEGACY_PREFIX_SPEECHTOTEXT = 'legacy:SpeechToText:'; + public const LAZY_CONFIG_KEYS = [ + 'ai.taskprocessing_type_preferences', + 'ai.taskprocessing_provider_preferences', + ]; + + public const MAX_TASK_AGE_SECONDS = 60 * 60 * 24 * 30 * 4; // 4 months + /** @var list<IProvider>|null */ private ?array $providers = null; @@ -81,10 +92,15 @@ class Manager implements IManager { private IAppData $appData; private ?array $preferences = null; private ?array $providersById = null; + + /** @var ITaskType[]|null */ + private ?array $taskTypes = null; private ICache $distributedCache; + private ?GetTaskProcessingProvidersEvent $eventResult = null; + public function __construct( - private IConfig $config, + private IAppConfig $appConfig, private Coordinator $coordinator, private IServerContainer $serverContainer, private LoggerInterface $logger, @@ -97,6 +113,8 @@ class Manager implements IManager { private IUserMountCache $userMountCache, private IClientService $clientService, private IAppManager $appManager, + private IUserManager $userManager, + private IUserSession $userSession, ICacheFactory $cacheFactory, ) { $this->appData = $appDataFactory->get('core'); @@ -489,6 +507,20 @@ class Manager implements IManager { } /** + * Dispatches the event to collect external providers and task types. + * Caches the result within the request. + */ + private function dispatchGetProvidersEvent(): GetTaskProcessingProvidersEvent { + if ($this->eventResult !== null) { + return $this->eventResult; + } + + $this->eventResult = new GetTaskProcessingProvidersEvent(); + $this->dispatcher->dispatchTyped($this->eventResult); + return $this->eventResult ; + } + + /** * @return IProvider[] */ private function _getProviders(): array { @@ -516,6 +548,16 @@ class Manager implements IManager { } } + $event = $this->dispatchGetProvidersEvent(); + $externalProviders = $event->getProviders(); + foreach ($externalProviders as $provider) { + if (!isset($providers[$provider->getId()])) { + $providers[$provider->getId()] = $provider; + } else { + $this->logger->info('Skipping external task processing provider with ID ' . $provider->getId() . ' because a local provider with the same ID already exists.'); + } + } + $providers += $this->_getTextProcessingProviders() + $this->_getTextToImageProviders() + $this->_getSpeechToTextProviders(); return $providers; @@ -531,6 +573,10 @@ class Manager implements IManager { return []; } + if ($this->taskTypes !== null) { + return $this->taskTypes; + } + // Default task types $taskTypes = [ \OCP\TaskProcessing\TaskTypes\TextToText::ID => \OCP\Server::get(\OCP\TaskProcessing\TaskTypes\TextToText::class), @@ -550,6 +596,10 @@ class Manager implements IManager { \OCP\TaskProcessing\TaskTypes\TextToTextChatWithTools::ID => \OCP\Server::get(\OCP\TaskProcessing\TaskTypes\TextToTextChatWithTools::class), \OCP\TaskProcessing\TaskTypes\ContextAgentInteraction::ID => \OCP\Server::get(\OCP\TaskProcessing\TaskTypes\ContextAgentInteraction::class), \OCP\TaskProcessing\TaskTypes\TextToTextProofread::ID => \OCP\Server::get(\OCP\TaskProcessing\TaskTypes\TextToTextProofread::class), + \OCP\TaskProcessing\TaskTypes\TextToSpeech::ID => \OCP\Server::get(\OCP\TaskProcessing\TaskTypes\TextToSpeech::class), + \OCP\TaskProcessing\TaskTypes\AudioToAudioChat::ID => \OCP\Server::get(\OCP\TaskProcessing\TaskTypes\AudioToAudioChat::class), + \OCP\TaskProcessing\TaskTypes\ContextAgentAudioInteraction::ID => \OCP\Server::get(\OCP\TaskProcessing\TaskTypes\ContextAgentAudioInteraction::class), + \OCP\TaskProcessing\TaskTypes\AnalyzeImages::ID => \OCP\Server::get(\OCP\TaskProcessing\TaskTypes\AnalyzeImages::class), ]; foreach ($context->getTaskProcessingTaskTypes() as $providerServiceRegistration) { @@ -568,9 +618,19 @@ class Manager implements IManager { } } + $event = $this->dispatchGetProvidersEvent(); + $externalTaskTypes = $event->getTaskTypes(); + foreach ($externalTaskTypes as $taskType) { + if (isset($taskTypes[$taskType->getId()])) { + $this->logger->warning('External task processing task type is using ID ' . $taskType->getId() . ' which is already used by a locally registered task type (' . get_class($taskTypes[$taskType->getId()]) . ')'); + } + $taskTypes[$taskType->getId()] = $taskType; + } + $taskTypes += $this->_getTextProcessingTaskTypes(); - return $taskTypes; + $this->taskTypes = $taskTypes; + return $this->taskTypes; } /** @@ -578,7 +638,7 @@ class Manager implements IManager { */ private function _getTaskTypeSettings(): array { try { - $json = $this->config->getAppValue('core', 'ai.taskprocessing_type_preferences', ''); + $json = $this->appConfig->getValueString('core', 'ai.taskprocessing_type_preferences', '', lazy: true); if ($json === '') { return []; } @@ -736,7 +796,11 @@ class Manager implements IManager { if ($this->preferences === null) { $this->preferences = $this->distributedCache->get('ai.taskprocessing_provider_preferences'); if ($this->preferences === null) { - $this->preferences = json_decode($this->config->getAppValue('core', 'ai.taskprocessing_provider_preferences', 'null'), associative: true, flags: JSON_THROW_ON_ERROR); + $this->preferences = json_decode( + $this->appConfig->getValueString('core', 'ai.taskprocessing_provider_preferences', 'null', lazy: true), + associative: true, + flags: JSON_THROW_ON_ERROR, + ); $this->distributedCache->set('ai.taskprocessing_provider_preferences', $this->preferences, 60 * 3); } } @@ -764,7 +828,11 @@ class Manager implements IManager { throw new \OCP\TaskProcessing\Exception\Exception('No matching provider found'); } - public function getAvailableTaskTypes(bool $showDisabled = false): array { + public function getAvailableTaskTypes(bool $showDisabled = false, ?string $userId = null): array { + // userId will be obtained from the session if left to null + if (!$this->checkGuestAccess($userId)) { + return []; + } if ($this->availableTaskTypes === null) { $cachedValue = $this->distributedCache->get('available_task_types_v2'); if ($cachedValue !== null) { @@ -823,7 +891,27 @@ class Manager implements IManager { return isset($this->getAvailableTaskTypes()[$task->getTaskTypeId()]); } + private function checkGuestAccess(?string $userId = null): bool { + if ($userId === null && !$this->userSession->isLoggedIn()) { + return true; + } + if ($userId === null) { + $user = $this->userSession->getUser(); + } else { + $user = $this->userManager->get($userId); + } + + $guestsAllowed = $this->appConfig->getValueString('core', 'ai.taskprocessing_guests', 'false'); + if ($guestsAllowed == 'true' || !class_exists(\OCA\Guests\UserBackend::class) || !($user->getBackend() instanceof \OCA\Guests\UserBackend)) { + return true; + } + return false; + } + public function scheduleTask(Task $task): void { + if (!$this->checkGuestAccess($task->getUserId())) { + throw new \OCP\TaskProcessing\Exception\PreConditionNotMetException('Access to this resource is forbidden for guests.'); + } if (!$this->canHandleTask($task)) { throw new \OCP\TaskProcessing\Exception\PreConditionNotMetException('No task processing provider is installed that can handle this task type: ' . $task->getTaskTypeId()); } @@ -838,6 +926,9 @@ class Manager implements IManager { } public function runTask(Task $task): Task { + if (!$this->checkGuestAccess($task->getUserId())) { + throw new \OCP\TaskProcessing\Exception\PreConditionNotMetException('Access to this resource is forbidden for guests.'); + } if (!$this->canHandleTask($task)) { throw new \OCP\TaskProcessing\Exception\PreConditionNotMetException('No task processing provider is installed that can handle this task type: ' . $task->getTaskTypeId()); } @@ -1361,6 +1452,97 @@ class Manager implements IManager { } /** + * @param Task $task + * @return list<int> + * @throws NotFoundException + */ + public function extractFileIdsFromTask(Task $task): array { + $ids = []; + $taskTypes = $this->getAvailableTaskTypes(); + if (!isset($taskTypes[$task->getTaskTypeId()])) { + throw new NotFoundException('Could not find task type'); + } + $taskType = $taskTypes[$task->getTaskTypeId()]; + foreach ($taskType['inputShape'] + $taskType['optionalInputShape'] as $key => $descriptor) { + if (in_array(EShapeType::getScalarType($descriptor->getShapeType()), [EShapeType::File, EShapeType::Image, EShapeType::Audio, EShapeType::Video], true)) { + /** @var int|list<int> $inputSlot */ + $inputSlot = $task->getInput()[$key]; + if (is_array($inputSlot)) { + $ids = array_merge($inputSlot, $ids); + } else { + $ids[] = $inputSlot; + } + } + } + if ($task->getOutput() !== null) { + foreach ($taskType['outputShape'] + $taskType['optionalOutputShape'] as $key => $descriptor) { + if (in_array(EShapeType::getScalarType($descriptor->getShapeType()), [EShapeType::File, EShapeType::Image, EShapeType::Audio, EShapeType::Video], true)) { + /** @var int|list<int> $outputSlot */ + $outputSlot = $task->getOutput()[$key]; + if (is_array($outputSlot)) { + $ids = array_merge($outputSlot, $ids); + } else { + $ids[] = $outputSlot; + } + } + } + } + return $ids; + } + + /** + * @param ISimpleFolder $folder + * @param int $ageInSeconds + * @return \Generator + */ + public function clearFilesOlderThan(ISimpleFolder $folder, int $ageInSeconds = self::MAX_TASK_AGE_SECONDS): \Generator { + foreach ($folder->getDirectoryListing() as $file) { + if ($file->getMTime() < time() - $ageInSeconds) { + try { + $fileName = $file->getName(); + $file->delete(); + yield $fileName; + } catch (NotPermittedException $e) { + $this->logger->warning('Failed to delete a stale task processing file', ['exception' => $e]); + } + } + } + } + + /** + * @param int $ageInSeconds + * @return \Generator + * @throws Exception + * @throws InvalidPathException + * @throws NotFoundException + * @throws \JsonException + * @throws \OCP\Files\NotFoundException + */ + public function cleanupTaskProcessingTaskFiles(int $ageInSeconds = self::MAX_TASK_AGE_SECONDS): \Generator { + $taskIdsToCleanup = []; + foreach ($this->taskMapper->getTasksToCleanup($ageInSeconds) as $task) { + $taskIdsToCleanup[] = $task->getId(); + $ocpTask = $task->toPublicTask(); + $fileIds = $this->extractFileIdsFromTask($ocpTask); + foreach ($fileIds as $fileId) { + // only look for output files stored in appData/TaskProcessing/ + $file = $this->rootFolder->getFirstNodeByIdInPath($fileId, '/' . $this->rootFolder->getAppDataDirectoryName() . '/core/TaskProcessing/'); + if ($file instanceof File) { + try { + $fileId = $file->getId(); + $fileName = $file->getName(); + $file->delete(); + yield ['task_id' => $task->getId(), 'file_id' => $fileId, 'file_name' => $fileName]; + } catch (NotPermittedException $e) { + $this->logger->warning('Failed to delete a stale task processing file', ['exception' => $e]); + } + } + } + } + return $taskIdsToCleanup; + } + + /** * Make a request to the task's webhookUri if necessary * * @param Task $task diff --git a/lib/private/TaskProcessing/RemoveOldTasksBackgroundJob.php b/lib/private/TaskProcessing/RemoveOldTasksBackgroundJob.php index c6f26e3aa8b..52fc204b752 100644 --- a/lib/private/TaskProcessing/RemoveOldTasksBackgroundJob.php +++ b/lib/private/TaskProcessing/RemoveOldTasksBackgroundJob.php @@ -10,17 +10,14 @@ use OC\TaskProcessing\Db\TaskMapper; use OCP\AppFramework\Utility\ITimeFactory; use OCP\BackgroundJob\TimedJob; use OCP\Files\AppData\IAppDataFactory; -use OCP\Files\NotFoundException; -use OCP\Files\NotPermittedException; -use OCP\Files\SimpleFS\ISimpleFolder; use Psr\Log\LoggerInterface; class RemoveOldTasksBackgroundJob extends TimedJob { - public const MAX_TASK_AGE_SECONDS = 60 * 60 * 24 * 7 * 4; // 4 weeks private \OCP\Files\IAppData $appData; public function __construct( ITimeFactory $timeFactory, + private Manager $taskProcessingManager, private TaskMapper $taskMapper, private LoggerInterface $logger, IAppDataFactory $appDataFactory, @@ -32,48 +29,29 @@ class RemoveOldTasksBackgroundJob extends TimedJob { $this->appData = $appDataFactory->get('core'); } - /** * @inheritDoc */ protected function run($argument): void { try { - $this->taskMapper->deleteOlderThan(self::MAX_TASK_AGE_SECONDS); - } catch (\OCP\DB\Exception $e) { - $this->logger->warning('Failed to delete stale task processing tasks', ['exception' => $e]); + iterator_to_array($this->taskProcessingManager->cleanupTaskProcessingTaskFiles()); + } catch (\Exception $e) { + $this->logger->warning('Failed to delete stale task processing tasks files', ['exception' => $e]); } try { - $this->clearFilesOlderThan($this->appData->getFolder('text2image'), self::MAX_TASK_AGE_SECONDS); - } catch (NotFoundException $e) { - // noop + $this->taskMapper->deleteOlderThan(Manager::MAX_TASK_AGE_SECONDS); + } catch (\OCP\DB\Exception $e) { + $this->logger->warning('Failed to delete stale task processing tasks', ['exception' => $e]); } try { - $this->clearFilesOlderThan($this->appData->getFolder('audio2text'), self::MAX_TASK_AGE_SECONDS); - } catch (NotFoundException $e) { + iterator_to_array($this->taskProcessingManager->clearFilesOlderThan($this->appData->getFolder('text2image'))); + } catch (\OCP\Files\NotFoundException $e) { // noop } try { - $this->clearFilesOlderThan($this->appData->getFolder('TaskProcessing'), self::MAX_TASK_AGE_SECONDS); - } catch (NotFoundException $e) { + iterator_to_array($this->taskProcessingManager->clearFilesOlderThan($this->appData->getFolder('audio2text'))); + } catch (\OCP\Files\NotFoundException $e) { // noop } } - - /** - * @param ISimpleFolder $folder - * @param int $ageInSeconds - * @return void - */ - private function clearFilesOlderThan(ISimpleFolder $folder, int $ageInSeconds): void { - foreach ($folder->getDirectoryListing() as $file) { - if ($file->getMTime() < time() - $ageInSeconds) { - try { - $file->delete(); - } catch (NotPermittedException $e) { - $this->logger->warning('Failed to delete a stale task processing file', ['exception' => $e]); - } - } - } - } - } diff --git a/lib/private/TaskProcessing/SynchronousBackgroundJob.php b/lib/private/TaskProcessing/SynchronousBackgroundJob.php index de3b424176c..19c53d59932 100644 --- a/lib/private/TaskProcessing/SynchronousBackgroundJob.php +++ b/lib/private/TaskProcessing/SynchronousBackgroundJob.php @@ -59,8 +59,8 @@ class SynchronousBackgroundJob extends QueuedJob { // check if this job needs to be scheduled again: // if there is at least one preferred synchronous provider that has a scheduled task - $synchronousProviders = array_filter($providers, fn ($provider) => - $provider instanceof ISynchronousProvider); + $synchronousProviders = array_filter($providers, fn ($provider) + => $provider instanceof ISynchronousProvider); $synchronousPreferredProviders = array_filter($synchronousProviders, function ($provider) { $taskTypeId = $provider->getTaskTypeId(); $preferredProvider = $this->taskProcessingManager->getPreferredProvider($taskTypeId); |