aboutsummaryrefslogtreecommitdiffstats
path: root/lib/private/TaskProcessing
diff options
context:
space:
mode:
Diffstat (limited to 'lib/private/TaskProcessing')
-rw-r--r--lib/private/TaskProcessing/Db/Task.php10
-rw-r--r--lib/private/TaskProcessing/Db/TaskMapper.php25
-rw-r--r--lib/private/TaskProcessing/Manager.php194
-rw-r--r--lib/private/TaskProcessing/RemoveOldTasksBackgroundJob.php44
-rw-r--r--lib/private/TaskProcessing/SynchronousBackgroundJob.php4
5 files changed, 233 insertions, 44 deletions
diff --git a/lib/private/TaskProcessing/Db/Task.php b/lib/private/TaskProcessing/Db/Task.php
index 4d919deaf94..05c0ae9ac74 100644
--- a/lib/private/TaskProcessing/Db/Task.php
+++ b/lib/private/TaskProcessing/Db/Task.php
@@ -45,6 +45,8 @@ use OCP\TaskProcessing\Task as OCPTask;
* @method int getStartedAt()
* @method setEndedAt(int $endedAt)
* @method int getEndedAt()
+ * @method setAllowCleanup(int $allowCleanup)
+ * @method int getAllowCleanup()
*/
class Task extends Entity {
protected $lastUpdated;
@@ -63,16 +65,17 @@ class Task extends Entity {
protected $scheduledAt;
protected $startedAt;
protected $endedAt;
+ protected $allowCleanup;
/**
* @var string[]
*/
- public static array $columns = ['id', 'last_updated', 'type', 'input', 'output', 'status', 'user_id', 'app_id', 'custom_id', 'completion_expected_at', 'error_message', 'progress', 'webhook_uri', 'webhook_method', 'scheduled_at', 'started_at', 'ended_at'];
+ public static array $columns = ['id', 'last_updated', 'type', 'input', 'output', 'status', 'user_id', 'app_id', 'custom_id', 'completion_expected_at', 'error_message', 'progress', 'webhook_uri', 'webhook_method', 'scheduled_at', 'started_at', 'ended_at', 'allow_cleanup'];
/**
* @var string[]
*/
- public static array $fields = ['id', 'lastUpdated', 'type', 'input', 'output', 'status', 'userId', 'appId', 'customId', 'completionExpectedAt', 'errorMessage', 'progress', 'webhookUri', 'webhookMethod', 'scheduledAt', 'startedAt', 'endedAt'];
+ public static array $fields = ['id', 'lastUpdated', 'type', 'input', 'output', 'status', 'userId', 'appId', 'customId', 'completionExpectedAt', 'errorMessage', 'progress', 'webhookUri', 'webhookMethod', 'scheduledAt', 'startedAt', 'endedAt', 'allowCleanup'];
public function __construct() {
@@ -94,6 +97,7 @@ class Task extends Entity {
$this->addType('scheduledAt', 'integer');
$this->addType('startedAt', 'integer');
$this->addType('endedAt', 'integer');
+ $this->addType('allowCleanup', 'integer');
}
public function toRow(): array {
@@ -122,6 +126,7 @@ class Task extends Entity {
'scheduledAt' => $task->getScheduledAt(),
'startedAt' => $task->getStartedAt(),
'endedAt' => $task->getEndedAt(),
+ 'allowCleanup' => $task->getAllowCleanup() ? 1 : 0,
]);
return $taskEntity;
}
@@ -144,6 +149,7 @@ class Task extends Entity {
$task->setScheduledAt($this->getScheduledAt());
$task->setStartedAt($this->getStartedAt());
$task->setEndedAt($this->getEndedAt());
+ $task->setAllowCleanup($this->getAllowCleanup() !== 0);
return $task;
}
}
diff --git a/lib/private/TaskProcessing/Db/TaskMapper.php b/lib/private/TaskProcessing/Db/TaskMapper.php
index 91fd68820ae..fee96534633 100644
--- a/lib/private/TaskProcessing/Db/TaskMapper.php
+++ b/lib/private/TaskProcessing/Db/TaskMapper.php
@@ -183,16 +183,39 @@ class TaskMapper extends QBMapper {
/**
* @param int $timeout
+ * @param bool $force If true, ignore the allow_cleanup flag
* @return int the number of deleted tasks
* @throws Exception
*/
- public function deleteOlderThan(int $timeout): int {
+ public function deleteOlderThan(int $timeout, bool $force = false): int {
$qb = $this->db->getQueryBuilder();
$qb->delete($this->tableName)
->where($qb->expr()->lt('last_updated', $qb->createPositionalParameter($this->timeFactory->getDateTime()->getTimestamp() - $timeout)));
+ if (!$force) {
+ $qb->andWhere($qb->expr()->eq('allow_cleanup', $qb->createPositionalParameter(1, IQueryBuilder::PARAM_INT)));
+ }
return $qb->executeStatement();
}
+ /**
+ * @param int $timeout
+ * @param bool $force If true, ignore the allow_cleanup flag
+ * @return \Generator<Task>
+ * @throws Exception
+ */
+ public function getTasksToCleanup(int $timeout, bool $force = false): \Generator {
+ $qb = $this->db->getQueryBuilder();
+ $qb->select(Task::$columns)
+ ->from($this->tableName)
+ ->where($qb->expr()->lt('last_updated', $qb->createPositionalParameter($this->timeFactory->getDateTime()->getTimestamp() - $timeout)));
+ if (!$force) {
+ $qb->andWhere($qb->expr()->eq('allow_cleanup', $qb->createPositionalParameter(1, IQueryBuilder::PARAM_INT)));
+ }
+ foreach ($this->yieldEntities($qb) as $entity) {
+ yield $entity;
+ };
+ }
+
public function update(Entity $entity): Entity {
$entity->setLastUpdated($this->timeFactory->now()->getTimestamp());
return parent::update($entity);
diff --git a/lib/private/TaskProcessing/Manager.php b/lib/private/TaskProcessing/Manager.php
index 0582d801e3d..e288f2981a8 100644
--- a/lib/private/TaskProcessing/Manager.php
+++ b/lib/private/TaskProcessing/Manager.php
@@ -30,17 +30,21 @@ use OCP\Files\IRootFolder;
use OCP\Files\Node;
use OCP\Files\NotPermittedException;
use OCP\Files\SimpleFS\ISimpleFile;
+use OCP\Files\SimpleFS\ISimpleFolder;
use OCP\Http\Client\IClientService;
+use OCP\IAppConfig;
use OCP\ICache;
use OCP\ICacheFactory;
-use OCP\IConfig;
use OCP\IL10N;
use OCP\IServerContainer;
+use OCP\IUserManager;
+use OCP\IUserSession;
use OCP\L10N\IFactory;
use OCP\Lock\LockedException;
use OCP\SpeechToText\ISpeechToTextProvider;
use OCP\SpeechToText\ISpeechToTextProviderWithId;
use OCP\TaskProcessing\EShapeType;
+use OCP\TaskProcessing\Events\GetTaskProcessingProvidersEvent;
use OCP\TaskProcessing\Events\TaskFailedEvent;
use OCP\TaskProcessing\Events\TaskSuccessfulEvent;
use OCP\TaskProcessing\Exception\NotFoundException;
@@ -70,6 +74,13 @@ class Manager implements IManager {
public const LEGACY_PREFIX_TEXTTOIMAGE = 'legacy:TextToImage:';
public const LEGACY_PREFIX_SPEECHTOTEXT = 'legacy:SpeechToText:';
+ public const LAZY_CONFIG_KEYS = [
+ 'ai.taskprocessing_type_preferences',
+ 'ai.taskprocessing_provider_preferences',
+ ];
+
+ public const MAX_TASK_AGE_SECONDS = 60 * 60 * 24 * 30 * 4; // 4 months
+
/** @var list<IProvider>|null */
private ?array $providers = null;
@@ -81,10 +92,15 @@ class Manager implements IManager {
private IAppData $appData;
private ?array $preferences = null;
private ?array $providersById = null;
+
+ /** @var ITaskType[]|null */
+ private ?array $taskTypes = null;
private ICache $distributedCache;
+ private ?GetTaskProcessingProvidersEvent $eventResult = null;
+
public function __construct(
- private IConfig $config,
+ private IAppConfig $appConfig,
private Coordinator $coordinator,
private IServerContainer $serverContainer,
private LoggerInterface $logger,
@@ -97,6 +113,8 @@ class Manager implements IManager {
private IUserMountCache $userMountCache,
private IClientService $clientService,
private IAppManager $appManager,
+ private IUserManager $userManager,
+ private IUserSession $userSession,
ICacheFactory $cacheFactory,
) {
$this->appData = $appDataFactory->get('core');
@@ -489,6 +507,20 @@ class Manager implements IManager {
}
/**
+ * Dispatches the event to collect external providers and task types.
+ * Caches the result within the request.
+ */
+ private function dispatchGetProvidersEvent(): GetTaskProcessingProvidersEvent {
+ if ($this->eventResult !== null) {
+ return $this->eventResult;
+ }
+
+ $this->eventResult = new GetTaskProcessingProvidersEvent();
+ $this->dispatcher->dispatchTyped($this->eventResult);
+ return $this->eventResult ;
+ }
+
+ /**
* @return IProvider[]
*/
private function _getProviders(): array {
@@ -516,6 +548,16 @@ class Manager implements IManager {
}
}
+ $event = $this->dispatchGetProvidersEvent();
+ $externalProviders = $event->getProviders();
+ foreach ($externalProviders as $provider) {
+ if (!isset($providers[$provider->getId()])) {
+ $providers[$provider->getId()] = $provider;
+ } else {
+ $this->logger->info('Skipping external task processing provider with ID ' . $provider->getId() . ' because a local provider with the same ID already exists.');
+ }
+ }
+
$providers += $this->_getTextProcessingProviders() + $this->_getTextToImageProviders() + $this->_getSpeechToTextProviders();
return $providers;
@@ -531,6 +573,10 @@ class Manager implements IManager {
return [];
}
+ if ($this->taskTypes !== null) {
+ return $this->taskTypes;
+ }
+
// Default task types
$taskTypes = [
\OCP\TaskProcessing\TaskTypes\TextToText::ID => \OCP\Server::get(\OCP\TaskProcessing\TaskTypes\TextToText::class),
@@ -550,6 +596,10 @@ class Manager implements IManager {
\OCP\TaskProcessing\TaskTypes\TextToTextChatWithTools::ID => \OCP\Server::get(\OCP\TaskProcessing\TaskTypes\TextToTextChatWithTools::class),
\OCP\TaskProcessing\TaskTypes\ContextAgentInteraction::ID => \OCP\Server::get(\OCP\TaskProcessing\TaskTypes\ContextAgentInteraction::class),
\OCP\TaskProcessing\TaskTypes\TextToTextProofread::ID => \OCP\Server::get(\OCP\TaskProcessing\TaskTypes\TextToTextProofread::class),
+ \OCP\TaskProcessing\TaskTypes\TextToSpeech::ID => \OCP\Server::get(\OCP\TaskProcessing\TaskTypes\TextToSpeech::class),
+ \OCP\TaskProcessing\TaskTypes\AudioToAudioChat::ID => \OCP\Server::get(\OCP\TaskProcessing\TaskTypes\AudioToAudioChat::class),
+ \OCP\TaskProcessing\TaskTypes\ContextAgentAudioInteraction::ID => \OCP\Server::get(\OCP\TaskProcessing\TaskTypes\ContextAgentAudioInteraction::class),
+ \OCP\TaskProcessing\TaskTypes\AnalyzeImages::ID => \OCP\Server::get(\OCP\TaskProcessing\TaskTypes\AnalyzeImages::class),
];
foreach ($context->getTaskProcessingTaskTypes() as $providerServiceRegistration) {
@@ -568,9 +618,19 @@ class Manager implements IManager {
}
}
+ $event = $this->dispatchGetProvidersEvent();
+ $externalTaskTypes = $event->getTaskTypes();
+ foreach ($externalTaskTypes as $taskType) {
+ if (isset($taskTypes[$taskType->getId()])) {
+ $this->logger->warning('External task processing task type is using ID ' . $taskType->getId() . ' which is already used by a locally registered task type (' . get_class($taskTypes[$taskType->getId()]) . ')');
+ }
+ $taskTypes[$taskType->getId()] = $taskType;
+ }
+
$taskTypes += $this->_getTextProcessingTaskTypes();
- return $taskTypes;
+ $this->taskTypes = $taskTypes;
+ return $this->taskTypes;
}
/**
@@ -578,7 +638,7 @@ class Manager implements IManager {
*/
private function _getTaskTypeSettings(): array {
try {
- $json = $this->config->getAppValue('core', 'ai.taskprocessing_type_preferences', '');
+ $json = $this->appConfig->getValueString('core', 'ai.taskprocessing_type_preferences', '', lazy: true);
if ($json === '') {
return [];
}
@@ -736,7 +796,11 @@ class Manager implements IManager {
if ($this->preferences === null) {
$this->preferences = $this->distributedCache->get('ai.taskprocessing_provider_preferences');
if ($this->preferences === null) {
- $this->preferences = json_decode($this->config->getAppValue('core', 'ai.taskprocessing_provider_preferences', 'null'), associative: true, flags: JSON_THROW_ON_ERROR);
+ $this->preferences = json_decode(
+ $this->appConfig->getValueString('core', 'ai.taskprocessing_provider_preferences', 'null', lazy: true),
+ associative: true,
+ flags: JSON_THROW_ON_ERROR,
+ );
$this->distributedCache->set('ai.taskprocessing_provider_preferences', $this->preferences, 60 * 3);
}
}
@@ -764,7 +828,11 @@ class Manager implements IManager {
throw new \OCP\TaskProcessing\Exception\Exception('No matching provider found');
}
- public function getAvailableTaskTypes(bool $showDisabled = false): array {
+ public function getAvailableTaskTypes(bool $showDisabled = false, ?string $userId = null): array {
+ // userId will be obtained from the session if left to null
+ if (!$this->checkGuestAccess($userId)) {
+ return [];
+ }
if ($this->availableTaskTypes === null) {
$cachedValue = $this->distributedCache->get('available_task_types_v2');
if ($cachedValue !== null) {
@@ -823,7 +891,27 @@ class Manager implements IManager {
return isset($this->getAvailableTaskTypes()[$task->getTaskTypeId()]);
}
+ private function checkGuestAccess(?string $userId = null): bool {
+ if ($userId === null && !$this->userSession->isLoggedIn()) {
+ return true;
+ }
+ if ($userId === null) {
+ $user = $this->userSession->getUser();
+ } else {
+ $user = $this->userManager->get($userId);
+ }
+
+ $guestsAllowed = $this->appConfig->getValueString('core', 'ai.taskprocessing_guests', 'false');
+ if ($guestsAllowed == 'true' || !class_exists(\OCA\Guests\UserBackend::class) || !($user->getBackend() instanceof \OCA\Guests\UserBackend)) {
+ return true;
+ }
+ return false;
+ }
+
public function scheduleTask(Task $task): void {
+ if (!$this->checkGuestAccess($task->getUserId())) {
+ throw new \OCP\TaskProcessing\Exception\PreConditionNotMetException('Access to this resource is forbidden for guests.');
+ }
if (!$this->canHandleTask($task)) {
throw new \OCP\TaskProcessing\Exception\PreConditionNotMetException('No task processing provider is installed that can handle this task type: ' . $task->getTaskTypeId());
}
@@ -838,6 +926,9 @@ class Manager implements IManager {
}
public function runTask(Task $task): Task {
+ if (!$this->checkGuestAccess($task->getUserId())) {
+ throw new \OCP\TaskProcessing\Exception\PreConditionNotMetException('Access to this resource is forbidden for guests.');
+ }
if (!$this->canHandleTask($task)) {
throw new \OCP\TaskProcessing\Exception\PreConditionNotMetException('No task processing provider is installed that can handle this task type: ' . $task->getTaskTypeId());
}
@@ -1361,6 +1452,97 @@ class Manager implements IManager {
}
/**
+ * @param Task $task
+ * @return list<int>
+ * @throws NotFoundException
+ */
+ public function extractFileIdsFromTask(Task $task): array {
+ $ids = [];
+ $taskTypes = $this->getAvailableTaskTypes();
+ if (!isset($taskTypes[$task->getTaskTypeId()])) {
+ throw new NotFoundException('Could not find task type');
+ }
+ $taskType = $taskTypes[$task->getTaskTypeId()];
+ foreach ($taskType['inputShape'] + $taskType['optionalInputShape'] as $key => $descriptor) {
+ if (in_array(EShapeType::getScalarType($descriptor->getShapeType()), [EShapeType::File, EShapeType::Image, EShapeType::Audio, EShapeType::Video], true)) {
+ /** @var int|list<int> $inputSlot */
+ $inputSlot = $task->getInput()[$key];
+ if (is_array($inputSlot)) {
+ $ids = array_merge($inputSlot, $ids);
+ } else {
+ $ids[] = $inputSlot;
+ }
+ }
+ }
+ if ($task->getOutput() !== null) {
+ foreach ($taskType['outputShape'] + $taskType['optionalOutputShape'] as $key => $descriptor) {
+ if (in_array(EShapeType::getScalarType($descriptor->getShapeType()), [EShapeType::File, EShapeType::Image, EShapeType::Audio, EShapeType::Video], true)) {
+ /** @var int|list<int> $outputSlot */
+ $outputSlot = $task->getOutput()[$key];
+ if (is_array($outputSlot)) {
+ $ids = array_merge($outputSlot, $ids);
+ } else {
+ $ids[] = $outputSlot;
+ }
+ }
+ }
+ }
+ return $ids;
+ }
+
+ /**
+ * @param ISimpleFolder $folder
+ * @param int $ageInSeconds
+ * @return \Generator
+ */
+ public function clearFilesOlderThan(ISimpleFolder $folder, int $ageInSeconds = self::MAX_TASK_AGE_SECONDS): \Generator {
+ foreach ($folder->getDirectoryListing() as $file) {
+ if ($file->getMTime() < time() - $ageInSeconds) {
+ try {
+ $fileName = $file->getName();
+ $file->delete();
+ yield $fileName;
+ } catch (NotPermittedException $e) {
+ $this->logger->warning('Failed to delete a stale task processing file', ['exception' => $e]);
+ }
+ }
+ }
+ }
+
+ /**
+ * @param int $ageInSeconds
+ * @return \Generator
+ * @throws Exception
+ * @throws InvalidPathException
+ * @throws NotFoundException
+ * @throws \JsonException
+ * @throws \OCP\Files\NotFoundException
+ */
+ public function cleanupTaskProcessingTaskFiles(int $ageInSeconds = self::MAX_TASK_AGE_SECONDS): \Generator {
+ $taskIdsToCleanup = [];
+ foreach ($this->taskMapper->getTasksToCleanup($ageInSeconds) as $task) {
+ $taskIdsToCleanup[] = $task->getId();
+ $ocpTask = $task->toPublicTask();
+ $fileIds = $this->extractFileIdsFromTask($ocpTask);
+ foreach ($fileIds as $fileId) {
+ // only look for output files stored in appData/TaskProcessing/
+ $file = $this->rootFolder->getFirstNodeByIdInPath($fileId, '/' . $this->rootFolder->getAppDataDirectoryName() . '/core/TaskProcessing/');
+ if ($file instanceof File) {
+ try {
+ $fileId = $file->getId();
+ $fileName = $file->getName();
+ $file->delete();
+ yield ['task_id' => $task->getId(), 'file_id' => $fileId, 'file_name' => $fileName];
+ } catch (NotPermittedException $e) {
+ $this->logger->warning('Failed to delete a stale task processing file', ['exception' => $e]);
+ }
+ }
+ }
+ }
+ return $taskIdsToCleanup;
+ }
+
+ /**
* Make a request to the task's webhookUri if necessary
*
* @param Task $task
diff --git a/lib/private/TaskProcessing/RemoveOldTasksBackgroundJob.php b/lib/private/TaskProcessing/RemoveOldTasksBackgroundJob.php
index c6f26e3aa8b..52fc204b752 100644
--- a/lib/private/TaskProcessing/RemoveOldTasksBackgroundJob.php
+++ b/lib/private/TaskProcessing/RemoveOldTasksBackgroundJob.php
@@ -10,17 +10,14 @@ use OC\TaskProcessing\Db\TaskMapper;
use OCP\AppFramework\Utility\ITimeFactory;
use OCP\BackgroundJob\TimedJob;
use OCP\Files\AppData\IAppDataFactory;
-use OCP\Files\NotFoundException;
-use OCP\Files\NotPermittedException;
-use OCP\Files\SimpleFS\ISimpleFolder;
use Psr\Log\LoggerInterface;
class RemoveOldTasksBackgroundJob extends TimedJob {
- public const MAX_TASK_AGE_SECONDS = 60 * 60 * 24 * 7 * 4; // 4 weeks
private \OCP\Files\IAppData $appData;
public function __construct(
ITimeFactory $timeFactory,
+ private Manager $taskProcessingManager,
private TaskMapper $taskMapper,
private LoggerInterface $logger,
IAppDataFactory $appDataFactory,
@@ -32,48 +29,29 @@ class RemoveOldTasksBackgroundJob extends TimedJob {
$this->appData = $appDataFactory->get('core');
}
-
/**
* @inheritDoc
*/
protected function run($argument): void {
try {
- $this->taskMapper->deleteOlderThan(self::MAX_TASK_AGE_SECONDS);
- } catch (\OCP\DB\Exception $e) {
- $this->logger->warning('Failed to delete stale task processing tasks', ['exception' => $e]);
+ iterator_to_array($this->taskProcessingManager->cleanupTaskProcessingTaskFiles());
+ } catch (\Exception $e) {
+ $this->logger->warning('Failed to delete stale task processing tasks files', ['exception' => $e]);
}
try {
- $this->clearFilesOlderThan($this->appData->getFolder('text2image'), self::MAX_TASK_AGE_SECONDS);
- } catch (NotFoundException $e) {
- // noop
+ $this->taskMapper->deleteOlderThan(Manager::MAX_TASK_AGE_SECONDS);
+ } catch (\OCP\DB\Exception $e) {
+ $this->logger->warning('Failed to delete stale task processing tasks', ['exception' => $e]);
}
try {
- $this->clearFilesOlderThan($this->appData->getFolder('audio2text'), self::MAX_TASK_AGE_SECONDS);
- } catch (NotFoundException $e) {
+ iterator_to_array($this->taskProcessingManager->clearFilesOlderThan($this->appData->getFolder('text2image')));
+ } catch (\OCP\Files\NotFoundException $e) {
// noop
}
try {
- $this->clearFilesOlderThan($this->appData->getFolder('TaskProcessing'), self::MAX_TASK_AGE_SECONDS);
- } catch (NotFoundException $e) {
+ iterator_to_array($this->taskProcessingManager->clearFilesOlderThan($this->appData->getFolder('audio2text')));
+ } catch (\OCP\Files\NotFoundException $e) {
// noop
}
}
-
- /**
- * @param ISimpleFolder $folder
- * @param int $ageInSeconds
- * @return void
- */
- private function clearFilesOlderThan(ISimpleFolder $folder, int $ageInSeconds): void {
- foreach ($folder->getDirectoryListing() as $file) {
- if ($file->getMTime() < time() - $ageInSeconds) {
- try {
- $file->delete();
- } catch (NotPermittedException $e) {
- $this->logger->warning('Failed to delete a stale task processing file', ['exception' => $e]);
- }
- }
- }
- }
-
}
diff --git a/lib/private/TaskProcessing/SynchronousBackgroundJob.php b/lib/private/TaskProcessing/SynchronousBackgroundJob.php
index de3b424176c..19c53d59932 100644
--- a/lib/private/TaskProcessing/SynchronousBackgroundJob.php
+++ b/lib/private/TaskProcessing/SynchronousBackgroundJob.php
@@ -59,8 +59,8 @@ class SynchronousBackgroundJob extends QueuedJob {
// check if this job needs to be scheduled again:
// if there is at least one preferred synchronous provider that has a scheduled task
- $synchronousProviders = array_filter($providers, fn ($provider) =>
- $provider instanceof ISynchronousProvider);
+ $synchronousProviders = array_filter($providers, fn ($provider)
+ => $provider instanceof ISynchronousProvider);
$synchronousPreferredProviders = array_filter($synchronousProviders, function ($provider) {
$taskTypeId = $provider->getTaskTypeId();
$preferredProvider = $this->taskProcessingManager->getPreferredProvider($taskTypeId);