diff options
author | Marcel Klehr <mklehr@gmx.net> | 2024-04-29 16:21:07 +0200 |
---|---|---|
committer | Marcel Klehr <mklehr@gmx.net> | 2024-05-14 11:38:39 +0200 |
commit | 00894e24208e4e6ae78609b8828ba32544e88ee8 (patch) | |
tree | e3cb1c6026c9a8a7688b2df5220bfee9f64e7b13 | |
parent | e3f341fecb8a63b88f897d22af185cb2886ebe56 (diff) | |
download | nextcloud-server-00894e24208e4e6ae78609b8828ba32544e88ee8.tar.gz nextcloud-server-00894e24208e4e6ae78609b8828ba32544e88ee8.zip |
feat: first pass at TaskProcessing API
Signed-off-by: Marcel Klehr <mklehr@gmx.net>
29 files changed, 3307 insertions, 1 deletions
diff --git a/core/Migrations/Version30000Date20240429122720.php b/core/Migrations/Version30000Date20240429122720.php new file mode 100644 index 00000000000..1f53aacd66b --- /dev/null +++ b/core/Migrations/Version30000Date20240429122720.php @@ -0,0 +1,114 @@ +<?php + +declare(strict_types=1); + +/** + * @copyright Copyright (c) 2024 Marcel Klehr <mklehr@gmx.net> + * + * @author Marcel Klehr <mklehr@gmx.net> + * + * @license AGPL-3.0-or-later + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ * + */ + +namespace OC\Core\Migrations; + +use Closure; +use OCP\DB\ISchemaWrapper; +use OCP\DB\Types; +use OCP\Migration\IOutput; +use OCP\Migration\SimpleMigrationStep; + +/** + * + */ +class Version30000Date20240429122720 extends SimpleMigrationStep { + + /** + * @param IOutput $output + * @param Closure $schemaClosure The `\Closure` returns a `ISchemaWrapper` + * @param array $options + * @return null|ISchemaWrapper + */ + public function changeSchema(IOutput $output, Closure $schemaClosure, array $options): ?ISchemaWrapper { + /** @var ISchemaWrapper $schema */ + $schema = $schemaClosure(); + + if (!$schema->hasTable('taskprocessing_tasks')) { + $table = $schema->createTable('taskprocessing_tasks'); + + $table->addColumn('id', Types::BIGINT, [ + 'notnull' => true, + 'length' => 64, + 'autoincrement' => true, + ]); + $table->addColumn('type', Types::STRING, [ + 'notnull' => true, + 'length' => 255, + ]); + $table->addColumn('input', Types::TEXT, [ + 'notnull' => true, + ]); + $table->addColumn('output', Types::TEXT, [ + 'notnull' => false, + ]); + $table->addColumn('status', Types::INTEGER, [ + 'notnull' => false, + 'length' => 6, + 'default' => 0, + ]); + $table->addColumn('user_id', Types::STRING, [ + 'notnull' => false, + 'length' => 64, + ]); + $table->addColumn('app_id', Types::STRING, [ + 'notnull' => true, + 'length' => 32, + 'default' => '', + ]); + $table->addColumn('identifier', Types::STRING, [ + 'notnull' => true, + 'length' => 255, + 'default' => '', + ]); + $table->addColumn('last_updated', Types::INTEGER, [ + 'notnull' => false, + 'length' => 4, + 'default' => 0, + 'unsigned' => true, + ]); + $table->addColumn('completion_expected_at', Types::DATETIME, [ + 'notnull' => false, + ]); + $table->addColumn('progress', Types::FLOAT, [ + 'notnull' => false, + ]); + $table->addColumn('error_message', Types::STRING, [ + 'notnull' => false, + 'length' => 255, + ]); + + $table->setPrimaryKey(['id'], 'tasks_id_index'); + $table->addIndex(['status', 
'type'], 'tasks_status_type'); + $table->addIndex(['last_updated'], 'tasks_updated'); + $table->addIndex(['user_id', 'app_id', 'identifier'], 'tasks_uid_appid_ident'); + + return $schema; + } + + return null; + } +} diff --git a/lib/private/AppFramework/Bootstrap/RegistrationContext.php b/lib/private/AppFramework/Bootstrap/RegistrationContext.php index b1b2c57da55..31f3dd7e4d2 100644 --- a/lib/private/AppFramework/Bootstrap/RegistrationContext.php +++ b/lib/private/AppFramework/Bootstrap/RegistrationContext.php @@ -163,6 +163,12 @@ class RegistrationContext { /** @var ServiceRegistration<ITeamResourceProvider>[] */ private array $teamResourceProviders = []; + /** @var ServiceRegistration<\OCP\TaskProcessing\IProvider>[] */ + private array $taskProcessingProviders = []; + + /** @var ServiceRegistration<\OCP\TaskProcessing\ITaskType>[] */ + private array $taskProcessingTaskTypes = []; + public function __construct(LoggerInterface $logger) { $this->logger = $logger; } @@ -411,6 +417,20 @@ class RegistrationContext { $declarativeSettingsClass ); } + + public function registerTaskProcessingProvider(string $taskProcessingProviderClass): void { + $this->context->registerTaskProcessingProvider( + $this->appId, + $taskProcessingProviderClass + ); + } + + public function registerTaskProcessingTaskType(string $taskProcessingTaskTypeClass): void { + $this->context->registerTaskProcessingTaskType( + $this->appId, + $taskProcessingTaskTypeClass + ); + } }; } @@ -591,6 +611,20 @@ class RegistrationContext { } /** + * @psalm-param class-string<\OCP\TaskProcessing\IProvider> $taskProcessingProviderClass + */ + public function registerTaskProcessingProvider(string $appId, string $taskProcessingProviderClass): void { + $this->taskProcessingProviders[] = new ServiceRegistration($appId, $taskProcessingProviderClass); + } + + /** + * @psalm-param class-string<\OCP\TaskProcessing\ITaskType> $taskProcessingTaskTypeClass + */ + public function registerTaskProcessingTaskType(string $appId, string 
$taskProcessingTaskTypeClass) { + $this->taskProcessingTaskTypes[] = new ServiceRegistration($appId, $taskProcessingTaskTypeClass); + } + + /** * @param App[] $apps */ public function delegateCapabilityRegistrations(array $apps): void { @@ -920,4 +954,18 @@ class RegistrationContext { public function getDeclarativeSettings(): array { return $this->declarativeSettings; } + + /** + * @return ServiceRegistration<\OCP\TaskProcessing\IProvider>[] + */ + public function getTaskProcessingProviders(): array { + return $this->taskProcessingProviders; + } + + /** + * @return ServiceRegistration<\OCP\TaskProcessing\ITaskType>[] + */ + public function getTaskProcessingTaskTypes(): array { + return $this->taskProcessingTaskTypes; + } } diff --git a/lib/private/Files/Node/Folder.php b/lib/private/Files/Node/Folder.php index 52e7b55676a..5dc9b41c10b 100644 --- a/lib/private/Files/Node/Folder.php +++ b/lib/private/Files/Node/Folder.php @@ -317,7 +317,7 @@ class Folder extends Node implements \OCP\Files\Folder { return current($this->getById($id)) ?: null; } - protected function getAppDataDirectoryName(): string { + public function getAppDataDirectoryName(): string { $instanceId = \OC::$server->getConfig()->getSystemValueString('instanceid'); return 'appdata_' . $instanceId; } diff --git a/lib/private/TaskProcessing/Db/Task.php b/lib/private/TaskProcessing/Db/Task.php new file mode 100644 index 00000000000..3712d0ac422 --- /dev/null +++ b/lib/private/TaskProcessing/Db/Task.php @@ -0,0 +1,134 @@ +<?php + +declare(strict_types=1); + +/** + * @copyright Copyright (c) 2023 Marcel Klehr <mklehr@gmx.net> + * + * @author Marcel Klehr <mklehr@gmx.net> + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +namespace OC\TaskProcessing\Db; + +use OCP\AppFramework\Db\Entity; +use OCP\TaskProcessing\Task as OCPTask; + +/** + * @method setType(string $type) + * @method string getType() + * @method setLastUpdated(int $lastUpdated) + * @method int getLastUpdated() + * @method setStatus(int $status) + * @method int getStatus() + * @method setOutput(string $output) + * @method string getOutput() + * @method setInput(string $input) + * @method string getInput() + * @method setUserId(?string $userId) + * @method string|null getUserId() + * @method setAppId(string $type) + * @method string getAppId() + * @method setIdentifier(string $identifier) + * @method string getIdentifier() + * @method setCompletionExpectedAt(null|\DateTime $completionExpectedAt) + * @method null|\DateTime getCompletionExpectedAt() + * @method setErrorMessage(null|string $error) + * @method null|string getErrorMessage() + * @method setProgress(null|float $progress) + * @method null|float getProgress() + */ +class Task extends Entity { + protected $lastUpdated; + protected $type; + protected $input; + protected $output; + protected $status; + protected $userId; + protected $appId; + protected $identifier; + protected $completionExpectedAt; + protected $errorMessage; + protected $progress; + + /** + * @var string[] + */ + public static array $columns = ['id', 'last_updated', 'type', 'input', 'output', 'status', 'user_id', 'app_id', 'identifier', 'completion_expected_at', 'error_message', 'progress']; + + /** + * @var string[] + */ + public static array $fields = ['id', 'lastUpdated', 'type', 
'input', 'output', 'status', 'userId', 'appId', 'identifier', 'completionExpectedAt', 'errorMessage', 'progress']; + + + public function __construct() { + // add types in constructor + $this->addType('id', 'integer'); + $this->addType('lastUpdated', 'integer'); + $this->addType('type', 'string'); + $this->addType('input', 'string'); + $this->addType('output', 'string'); + $this->addType('status', 'integer'); + $this->addType('userId', 'string'); + $this->addType('appId', 'string'); + $this->addType('identifier', 'string'); + $this->addType('completionExpectedAt', 'datetime'); + $this->addType('errorMessage', 'string'); + $this->addType('progress', 'float'); + } + + public function toRow(): array { + return array_combine(self::$columns, array_map(function ($field) { + return $this->{'get'.ucfirst($field)}(); + }, self::$fields)); + } + + public static function fromPublicTask(OCPTask $task): Task { + /** @var Task $taskEntity */ + $taskEntity = Task::fromParams([ + 'id' => $task->getId(), + 'type' => $task->getTaskType(), + 'lastUpdated' => time(), + 'status' => $task->getStatus(), + 'input' => json_encode($task->getInput(), JSON_THROW_ON_ERROR), + 'output' => json_encode($task->getOutput(), JSON_THROW_ON_ERROR), + 'errorMessage' => $task->getErrorMessage(), + 'userId' => $task->getUserId(), + 'appId' => $task->getAppId(), + 'identifier' => $task->getIdentifier(), + 'completionExpectedAt' => $task->getCompletionExpectedAt(), + 'progress' => $task->getProgress(), + ]); + return $taskEntity; + } + + /** + * @return OCPTask + * @throws \JsonException + */ + public function toPublicTask(): OCPTask { + $task = new OCPTask($this->getType(), json_decode($this->getInput(), true, 512, JSON_THROW_ON_ERROR), $this->getAppId(), $this->getuserId(), $this->getIdentifier()); + $task->setId($this->getId()); + $task->setStatus($this->getStatus()); + $task->setOutput(json_decode($this->getOutput(), true, 512, JSON_THROW_ON_ERROR)); + 
$task->setCompletionExpectedAt($this->getCompletionExpectedAt()); + $task->setErrorMessage($this->getErrorMessage()); + $task->setProgress($this->getProgress()); + return $task; + } +} diff --git a/lib/private/TaskProcessing/Db/TaskMapper.php b/lib/private/TaskProcessing/Db/TaskMapper.php new file mode 100644 index 00000000000..a1cc3d1409a --- /dev/null +++ b/lib/private/TaskProcessing/Db/TaskMapper.php @@ -0,0 +1,138 @@ +<?php + +declare(strict_types=1); + +/** + * @copyright Copyright (c) 2023 Marcel Klehr <mklehr@gmx.net> + * + * @author Marcel Klehr <mklehr@gmx.net> + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ */ + +namespace OC\TaskProcessing\Db; + +use OCP\AppFramework\Db\DoesNotExistException; +use OCP\AppFramework\Db\Entity; +use OCP\AppFramework\Db\MultipleObjectsReturnedException; +use OCP\AppFramework\Db\QBMapper; +use OCP\AppFramework\Utility\ITimeFactory; +use OCP\DB\Exception; +use OCP\DB\QueryBuilder\IQueryBuilder; +use OCP\IDBConnection; + +/** + * @extends QBMapper<Task> + */ +class TaskMapper extends QBMapper { + public function __construct( + IDBConnection $db, + private ITimeFactory $timeFactory, + ) { + parent::__construct($db, 'taskprocessing_tasks', Task::class); + } + + /** + * @param int $id + * @return Task + * @throws Exception + * @throws DoesNotExistException + * @throws MultipleObjectsReturnedException + */ + public function find(int $id): Task { + $qb = $this->db->getQueryBuilder(); + $qb->select(Task::$columns) + ->from($this->tableName) + ->where($qb->expr()->eq('id', $qb->createPositionalParameter($id))); + return $this->findEntity($qb); + } + + /** + * @param string|null $taskType + * @return Task + * @throws DoesNotExistException + * @throws Exception + */ + public function findOldestScheduledByType(?string $taskType): Task { + $qb = $this->db->getQueryBuilder(); + $qb->select(Task::$columns) + ->from($this->tableName) + ->where($qb->expr()->eq('status', $qb->createPositionalParameter(\OCP\TaskProcessing\Task::STATUS_SCHEDULED, IQueryBuilder::PARAM_INT))) + ->setMaxResults(1) + ->orderBy('last_updated', 'ASC'); + if ($taskType !== null) { + $qb->andWhere($qb->expr()->eq('type', $qb->createPositionalParameter($taskType))); + } + return $this->findEntity($qb); + } + + /** + * @param int $id + * @param string|null $userId + * @return Task + * @throws DoesNotExistException + * @throws Exception + * @throws MultipleObjectsReturnedException + */ + public function findByIdAndUser(int $id, ?string $userId): Task { + $qb = $this->db->getQueryBuilder(); + $qb->select(Task::$columns) + ->from($this->tableName) + ->where($qb->expr()->eq('id', 
$qb->createPositionalParameter($id))); + if ($userId === null) { + $qb->andWhere($qb->expr()->isNull('user_id')); + } else { + $qb->andWhere($qb->expr()->eq('user_id', $qb->createPositionalParameter($userId))); + } + return $this->findEntity($qb); + } + + /** + * @param string $userId + * @param string $appId + * @param string|null $identifier only tasks with this identifier are returned when given + * @return Task[] + * @throws Exception + */ + public function findUserTasksByApp(string $userId, string $appId, ?string $identifier = null): array { + $qb = $this->db->getQueryBuilder(); + $qb->select(Task::$columns) + ->from($this->tableName) + ->where($qb->expr()->eq('user_id', $qb->createPositionalParameter($userId))) + ->andWhere($qb->expr()->eq('app_id', $qb->createPositionalParameter($appId))); + if ($identifier !== null) { + $qb->andWhere($qb->expr()->eq('identifier', $qb->createPositionalParameter($identifier))); + } + return $this->findEntities($qb); + } + + /** + * @param int $timeout maximum age in seconds; rows whose last_updated is older than now minus $timeout are deleted + * @return int the number of deleted tasks + * @throws Exception + */ + public function deleteOlderThan(int $timeout): int { + $qb = $this->db->getQueryBuilder(); + $qb->delete($this->tableName) + ->where($qb->expr()->lt('last_updated', $qb->createPositionalParameter($this->timeFactory->now()->getTimestamp() - $timeout))); + return $qb->executeStatement(); + } + + public function update(Entity $entity): Entity { + $entity->setLastUpdated($this->timeFactory->now()->getTimestamp()); + return parent::update($entity); + } +} diff --git a/lib/private/TaskProcessing/Manager.php b/lib/private/TaskProcessing/Manager.php new file mode 100644 index 00000000000..08cf9679087 --- /dev/null +++ b/lib/private/TaskProcessing/Manager.php @@ -0,0 +1,890 @@ +<?php + +declare(strict_types=1); + +/** + * @copyright Copyright (c) 2024 Marcel Klehr <mklehr@gmx.net> + * + * @author Marcel Klehr <mklehr@gmx.net> + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General 
Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +namespace OC\TaskProcessing; + +use OC\AppFramework\Bootstrap\Coordinator; +use OC\TaskProcessing\Db\TaskMapper; +use OCP\AppFramework\Db\DoesNotExistException; +use OCP\AppFramework\Db\MultipleObjectsReturnedException; +use OCP\BackgroundJob\IJobList; +use OCP\EventDispatcher\IEventDispatcher; +use OCP\Files\AppData\IAppDataFactory; +use OCP\Files\File; +use OCP\Files\Folder; +use OCP\Files\GenericFileException; +use OCP\Files\IAppData; +use OCP\Files\IRootFolder; +use OCP\Files\NotPermittedException; +use OCP\IServerContainer; +use OCP\Lock\LockedException; +use OCP\PreConditionNotMetException; +use OCP\SpeechToText\ISpeechToTextProvider; +use OCP\SpeechToText\ISpeechToTextProviderWithId; +use OCP\SpeechToText\ISpeechToTextProviderWithUserId; +use OCP\TaskProcessing\EShapeType; +use OCP\TaskProcessing\Events\TaskFailedEvent; +use OCP\TaskProcessing\Events\TaskSuccessfulEvent; +use OCP\TaskProcessing\Exception\Exception; +use OCP\TaskProcessing\Exception\NotFoundException; +use OCP\TaskProcessing\Exception\ProcessingException; +use OCP\TaskProcessing\Exception\ValidationException; +use OCP\TaskProcessing\IManager; +use OCP\TaskProcessing\IProvider; +use OCP\TaskProcessing\ISynchronousProvider; +use OCP\TaskProcessing\ITaskType; +use OCP\TaskProcessing\ShapeDescriptor; +use OCP\TaskProcessing\Task; +use OCP\TaskProcessing\TaskTypes\AudioToText; +use OCP\TaskProcessing\TaskTypes\TextToImage; +use 
OCP\TaskProcessing\TaskTypes\TextToText; +use OCP\TaskProcessing\TaskTypes\TextToTextHeadline; +use OCP\TaskProcessing\TaskTypes\TextToTextSummary; +use OCP\TaskProcessing\TaskTypes\TextToTextTopics; +use Psr\Log\LoggerInterface; + +class Manager implements IManager { + + public const LEGACY_PREFIX_TEXTPROCESSING = 'legacy:TextProcessing:'; + public const LEGACY_PREFIX_TEXTTOIMAGE = 'legacy:TextToImage:'; + public const LEGACY_PREFIX_SPEECHTOTEXT = 'legacy:SpeechToText:'; + + /** @var |null */ + private ?array $providers = null; + + /** @var array<string,array{name: string, description: string, inputShape: array<string, ShapeDescriptor>, optionalInputShape: array<string, ShapeDescriptor>, outputShape: array<string, ShapeDescriptor>, optionalOutputShape: array<string, ShapeDescriptor>}>|null */ + private ?array $availableTaskTypes = null; + + private IAppData $appData; + + public function __construct( + private Coordinator $coordinator, + private IServerContainer $serverContainer, + private LoggerInterface $logger, + private TaskMapper $taskMapper, + private IJobList $jobList, + private IEventDispatcher $dispatcher, + IAppDataFactory $appDataFactory, + private IRootFolder $rootFolder, + private \OCP\TextProcessing\IManager $textProcessingManager, + private \OCP\TextToImage\IManager $textToImageManager, + private \OCP\SpeechToText\ISpeechToTextManager $speechToTextManager, + ) { + $this->appData = $appDataFactory->get('core'); + } + + /** + * @return IProvider[] + */ + private function _getTextProcessingProviders(): array { + $oldProviders = $this->textProcessingManager->getProviders(); + $newProviders = []; + foreach ($oldProviders as $oldProvider) { + $provider = new class($oldProvider) implements IProvider, ISynchronousProvider { + private \OCP\TextProcessing\IProvider $provider; + + public function __construct(\OCP\TextProcessing\IProvider $provider) { + $this->provider = $provider; + } + + public function getId(): string { + if ($this->provider instanceof 
\OCP\TextProcessing\IProviderWithId) { + return $this->provider->getId(); + } + return Manager::LEGACY_PREFIX_TEXTPROCESSING . $this->provider::class; + } + + public function getName(): string { + return $this->provider->getName(); + } + + public function getTaskType(): string { + return match ($this->provider->getTaskType()) { + \OCP\TextProcessing\FreePromptTaskType::class => TextToText::ID, + \OCP\TextProcessing\HeadlineTaskType::class => TextToTextHeadline::ID, + \OCP\TextProcessing\TopicsTaskType::class => TextToTextTopics::ID, + \OCP\TextProcessing\SummaryTaskType::class => TextToTextSummary::ID, + default => Manager::LEGACY_PREFIX_TEXTPROCESSING . $this->provider->getTaskType(), + }; + } + + public function getExpectedRuntime(): int { + if ($this->provider instanceof \OCP\TextProcessing\IProviderWithExpectedRuntime) { + return $this->provider->getExpectedRuntime(); + } + return 60; + } + + public function getOptionalInputShape(): array { + return []; + } + + public function getOptionalOutputShape(): array { + return []; + } + + public function process(?string $userId, array $input): array { + if ($this->provider instanceof \OCP\TextProcessing\IProviderWithUserId) { + $this->provider->setUserId($userId); + } + try { + return ['output' => $this->provider->process($input['input'])]; + } catch(\RuntimeException $e) { + throw new ProcessingException($e->getMessage(), 0, $e); + } + } + }; + $newProviders[$provider->getId()] = $provider; + } + + return $newProviders; + } + + /** + * Wraps the task types of legacy TextProcessing providers that have no built-in TaskProcessing equivalent. + * + * @return ITaskType[] keyed by task type ID + */ + private function _getTextProcessingTaskTypes(): array { + $oldProviders = $this->textProcessingManager->getProviders(); + $newTaskTypes = []; + foreach ($oldProviders as $oldProvider) { + // These are already implemented in the TaskProcessing realm + if (in_array($oldProvider->getTaskType(), [ + \OCP\TextProcessing\FreePromptTaskType::class, + \OCP\TextProcessing\HeadlineTaskType::class, + \OCP\TextProcessing\TopicsTaskType::class, + 
\OCP\TextProcessing\SummaryTaskType::class + ], true)) { + continue; + } + $taskType = new class($oldProvider->getTaskType()) implements ITaskType { + private string $oldTaskTypeClass; + private \OCP\TextProcessing\ITaskType $oldTaskType; + + public function __construct(string $oldTaskTypeClass) { + $this->oldTaskTypeClass = $oldTaskTypeClass; + $this->oldTaskType = \OCP\Server::get($oldTaskTypeClass); + } + + public function getId(): string { + return Manager::LEGACY_PREFIX_TEXTPROCESSING . $this->oldTaskTypeClass; + } + + public function getName(): string { + return $this->oldTaskType->getName(); + } + + public function getDescription(): string { + return $this->oldTaskType->getDescription(); + } + + public function getInputShape(): array { + return ['input' => EShapeType::Text]; + } + + public function getOutputShape(): array { + return ['output' => EShapeType::Text]; + } + }; + $newTaskTypes[$taskType->getId()] = $taskType; + } + + return $newTaskTypes; + } + + /** + * @return IProvider[] + */ + private function _getTextToImageProviders(): array { + $oldProviders = $this->textToImageManager->getProviders(); + $newProviders = []; + foreach ($oldProviders as $oldProvider) { + $newProvider = new class($oldProvider, $this->appData) implements IProvider, ISynchronousProvider { + private \OCP\TextToImage\IProvider $provider; + private IAppData $appData; + + public function __construct(\OCP\TextToImage\IProvider $provider, IAppData $appData) { + $this->provider = $provider; + $this->appData = $appData; + } + + public function getId(): string { + return Manager::LEGACY_PREFIX_TEXTTOIMAGE . 
$this->provider->getId(); + } + + public function getName(): string { + return $this->provider->getName(); + } + + public function getTaskType(): string { + return TextToImage::ID; + } + + public function getExpectedRuntime(): int { + return $this->provider->getExpectedRuntime(); + } + + public function getOptionalInputShape(): array { + return []; + } + + public function getOptionalOutputShape(): array { + return []; + } + + public function process(?string $userId, array $input): array { + try { + $folder = $this->appData->getFolder('text2image'); + } catch(\OCP\Files\NotFoundException) { + $folder = $this->appData->newFolder('text2image'); + } + try { + $folder = $folder->getFolder((string) rand(1, 100000)); + } catch(\OCP\Files\NotFoundException) { + $folder = $folder->newFolder((string) rand(1, 100000)); + } + $resources = []; + $files = []; + for ($i = 0; $i < $input['numberOfImages']; $i++) { + $file = $folder->newFile((string) $i); + $files[] = $file; + $resource = $file->write(); + if ($resource !== false && $resource !== true && is_resource($resource)) { + $resources[] = $resource; + } else { + throw new ProcessingException('Text2Image generation using provider "' . $this->getName() . 
'" failed: Couldn\'t open file to write.'); + } + } + if ($this->provider instanceof \OCP\TextToImage\IProviderWithUserId) { + $this->provider->setUserId($userId); + } + try { + $this->provider->generate($input['input'], $resources); + }catch (\RuntimeException $e) { + throw new ProcessingException($e->getMessage(), 0, $e); + } + // Files created through IAppData are ISimpleFile objects, not OCP\Files\File nodes + return ['images' => array_map(fn(\OCP\Files\SimpleFS\ISimpleFile $file) => base64_encode($file->getContent()), $files)]; + } + }; + $newProviders[$newProvider->getId()] = $newProvider; + } + + return $newProviders; + } + + + /** + * @return IProvider[] + */ + private function _getSpeechToTextProviders(): array { + $oldProviders = $this->speechToTextManager->getProviders(); + $newProviders = []; + foreach ($oldProviders as $oldProvider) { + $newProvider = new class($oldProvider, $this->rootFolder, $this->appData) implements IProvider, ISynchronousProvider { + private ISpeechToTextProvider $provider; + private IRootFolder $rootFolder; + private IAppData $appData; + + public function __construct(ISpeechToTextProvider $provider, IRootFolder $rootFolder, IAppData $appData) { + $this->provider = $provider; + $this->rootFolder = $rootFolder; + $this->appData = $appData; + } + + public function getId(): string { + if ($this->provider instanceof ISpeechToTextProviderWithId) { + return Manager::LEGACY_PREFIX_SPEECHTOTEXT . $this->provider->getId(); + } + return Manager::LEGACY_PREFIX_SPEECHTOTEXT . 
$this->provider::class; + } + + public function getName(): string { + return $this->provider->getName(); + } + + public function getTaskType(): string { + return AudioToText::ID; + } + + public function getExpectedRuntime(): int { + return 60; + } + + public function getOptionalInputShape(): array { + return []; + } + + public function getOptionalOutputShape(): array { + return []; + } + + public function process(?string $userId, array $input): array { + try { + $folder = $this->appData->getFolder('audio2text'); + } catch(\OCP\Files\NotFoundException) { + $folder = $this->appData->newFolder('audio2text'); + } + try { + $folder = $folder->getFolder((string) rand(1, 100000)); + } catch(\OCP\Files\NotFoundException) { + $folder = $folder->newFolder((string) rand(1, 100000)); + } + $simpleFile = $folder->newFile((string) rand(0, 100000), base64_decode($input['input'])); + $id = $simpleFile->getId(); + /** @var File $file */ + $file = current($this->rootFolder->getById($id)); + if ($this->provider instanceof ISpeechToTextProviderWithUserId) { + $this->provider->setUserId($userId); + } + try { + $result = $this->provider->transcribeFile($file); + }catch (\RuntimeException $e) { + throw new ProcessingException($e->getMessage(), 0, $e); + } + return ['output' => $result]; + } + }; + $newProviders[$newProvider->getId()] = $newProvider; + } + + return $newProviders; + } + + /** + * @return IProvider[] + */ + private function _getProviders(): array { + $context = $this->coordinator->getRegistrationContext(); + + if ($context === null) { + return []; + } + + $providers = []; + + foreach ($context->getTaskProcessingProviders() as $providerServiceRegistration) { + $class = $providerServiceRegistration->getService(); + try { + /** @var IProvider $provider */ + $provider = $this->serverContainer->get($class); + $providers[$provider->getId()] = $provider; + } catch (\Throwable $e) { + $this->logger->error('Failed to load task processing provider ' . 
$class, [ + 'exception' => $e, + ]); + } + } + + $providers += $this->_getTextProcessingProviders() + $this->_getTextToImageProviders() + $this->_getSpeechToTextProviders(); + + return $providers; + } + + /** + * @return ITaskType[] + */ + private function _getTaskTypes(): array { + $context = $this->coordinator->getRegistrationContext(); + + if ($context === null) { + return []; + } + + // Default task types + $taskTypes = [ + \OCP\TaskProcessing\TaskTypes\TextToText::ID => \OCP\Server::get(\OCP\TaskProcessing\TaskTypes\TextToText::class), + \OCP\TaskProcessing\TaskTypes\TextToTextTopics::ID => \OCP\Server::get(\OCP\TaskProcessing\TaskTypes\TextToTextTopics::class), + \OCP\TaskProcessing\TaskTypes\TextToTextHeadline::ID => \OCP\Server::get(\OCP\TaskProcessing\TaskTypes\TextToTextHeadline::class), + \OCP\TaskProcessing\TaskTypes\TextToTextSummary::ID => \OCP\Server::get(\OCP\TaskProcessing\TaskTypes\TextToTextSummary::class), + \OCP\TaskProcessing\TaskTypes\TextToImage::ID => \OCP\Server::get(\OCP\TaskProcessing\TaskTypes\TextToImage::class), + \OCP\TaskProcessing\TaskTypes\AudioToText::ID => \OCP\Server::get(\OCP\TaskProcessing\TaskTypes\AudioToText::class), + ]; + + foreach ($context->getTaskProcessingTaskTypes() as $providerServiceRegistration) { + $class = $providerServiceRegistration->getService(); + try { + /** @var ITaskType $provider */ + $taskType = $this->serverContainer->get($class); + $taskTypes[$taskType->getId()] = $taskType; + } catch (\Throwable $e) { + $this->logger->error('Failed to load task processing task type ' . 
$class, [ + 'exception' => $e, + ]); + } + } + + $taskTypes += $this->_getTextProcessingTaskTypes(); + + return $taskTypes; + } + + /** + * @param string $taskType + * @return IProvider + * @throws \OCP\TaskProcessing\Exception\Exception + */ + private function _getPreferredProvider(string $taskType){ + $providers = $this->getProviders(); + foreach ($providers as $provider) { + if ($provider->getTaskType() === $taskType) { + return $provider; + } + } + throw new \OCP\TaskProcessing\Exception\Exception('No matching provider found'); + } + + /** + * @param array<string, ShapeDescriptor> $spec + * @param array<string, mixed> $io + * @return void + * @throws ValidationException + */ + private function validateInput(array $spec, array $io, bool $optional = false) { + foreach ($spec as $key => $descriptor) { + $type = $descriptor->getShapeType(); + if (!isset($io[$key])) { + if ($optional) { + continue; + } + throw new \OCP\TaskProcessing\Exception\ValidationException('Missing key: "' . $key . '"'); + } + if ($type === EShapeType::Text && !is_string($io[$key])) { + throw new \OCP\TaskProcessing\Exception\ValidationException('Non-text item provided for Text key: "' . $key . '"'); + } + if ($type === EShapeType::ListOfTexts && (!is_array($io[$key]) || count(array_filter($io[$key], fn($item) => !is_string($item))) > 0)) { + throw new \OCP\TaskProcessing\Exception\ValidationException('None-text list item provided for ListOfTexts key: "' . $key . '"'); + } + if ($type === EShapeType::Number && !is_numeric($io[$key])) { + throw new \OCP\TaskProcessing\Exception\ValidationException('None-numeric item provided for Number key: "' . $key . '"'); + } + if ($type === EShapeType::ListOfNumbers && (!is_array($io[$key]) || count(array_filter($io[$key], fn($item) => !is_numeric($item))) > 0)) { + throw new \OCP\TaskProcessing\Exception\ValidationException('None-numeric list item provided for ListOfNumbers key: "' . $key . 
'"'); + } + if ($type === EShapeType::Image && !is_numeric($io[$key])) { + throw new \OCP\TaskProcessing\Exception\ValidationException('None-image item provided for Image key: "' . $key . '"'); + } + if ($type === EShapeType::ListOfImages && (!is_array($io[$key]) || count(array_filter($io[$key], fn($item) => !is_numeric($item))) > 0)) { + throw new \OCP\TaskProcessing\Exception\ValidationException('None-image list item provided for ListOfImages key: "' . $key . '"'); + } + if ($type === EShapeType::Audio && !is_numeric($io[$key])) { + throw new \OCP\TaskProcessing\Exception\ValidationException('None-audio item provided for Audio key: "' . $key . '"'); + } + if ($type === EShapeType::ListOfAudio && (!is_array($io[$key]) || count(array_filter($io[$key], fn($item) => !is_numeric($item))) > 0)) { + throw new \OCP\TaskProcessing\Exception\ValidationException('None-audio list item provided for ListOfAudio key: "' . $key . '"'); + } + if ($type === EShapeType::Video && !is_numeric($io[$key])) { + throw new \OCP\TaskProcessing\Exception\ValidationException('None-video item provided for Video key: "' . $key . '"'); + } + if ($type === EShapeType::ListOfVideo && (!is_array($io[$key]) || count(array_filter($io[$key], fn($item) => !is_numeric($item))) > 0)) { + throw new \OCP\TaskProcessing\Exception\ValidationException('None-video list item provided for ListOfTexts key: "' . $key . '"'); + } + if ($type === EShapeType::File && !is_numeric($io[$key])) { + throw new \OCP\TaskProcessing\Exception\ValidationException('None-file item provided for File key: "' . $key . '"'); + } + if ($type === EShapeType::ListOfFiles && (!is_array($io[$key]) || count(array_filter($io[$key], fn($item) => !is_numeric($item))) > 0)) { + throw new \OCP\TaskProcessing\Exception\ValidationException('None-audio list item provided for ListOfFiles key: "' . $key . 
'"'); + } + } + } + + /** + * @param array<string, ShapeDescriptor> $spec + * @param array $io + * @return void + * @throws ValidationException + */ + private function validateOutput(array $spec, array $io, bool $optional = false) { + foreach ($spec as $key => $descriptor) { + $type = $descriptor->getShapeType(); + if (!isset($io[$key])) { + if ($optional) { + continue; + } + throw new \OCP\TaskProcessing\Exception\ValidationException('Missing key: "' . $key . '"'); + } + if ($type === EShapeType::Text && !is_string($io[$key])) { + throw new \OCP\TaskProcessing\Exception\ValidationException('Non-text item provided for Text key: "' . $key . '"'); + } + if ($type === EShapeType::ListOfTexts && (!is_array($io[$key]) || count(array_filter($io[$key], fn($item) => !is_string($item))) > 0)) { + throw new \OCP\TaskProcessing\Exception\ValidationException('None-text list item provided for ListOfTexts key: "' . $key . '"'); + } + if ($type === EShapeType::Number && !is_numeric($io[$key])) { + throw new \OCP\TaskProcessing\Exception\ValidationException('None-numeric item provided for Number key: "' . $key . '"'); + } + if ($type === EShapeType::ListOfNumbers && (!is_array($io[$key]) || count(array_filter($io[$key], fn($item) => !is_numeric($item))) > 0)) { + throw new \OCP\TaskProcessing\Exception\ValidationException('None-numeric list item provided for ListOfNumbers key: "' . $key . '"'); + } + if ($type === EShapeType::Image && !is_string($io[$key])) { + throw new \OCP\TaskProcessing\Exception\ValidationException('None-image item provided for Image key: "' . $key . '". Expecting base64 encoded image data.'); + } + if ($type === EShapeType::ListOfImages && (!is_array($io[$key]) || count(array_filter($io[$key], fn($item) => !is_string($item))) > 0)) { + throw new \OCP\TaskProcessing\Exception\ValidationException('None-image list item provided for ListOfImages key: "' . $key . '". 
Expecting base64 encoded image data.'); + } + if ($type === EShapeType::Audio && !is_string($io[$key])) { + throw new \OCP\TaskProcessing\Exception\ValidationException('None-audio item provided for Audio key: "' . $key . '". Expecting base64 encoded audio data.'); + } + if ($type === EShapeType::ListOfAudio && (!is_array($io[$key]) || count(array_filter($io[$key], fn($item) => !is_string($item))) > 0)) { + throw new \OCP\TaskProcessing\Exception\ValidationException('None-audio list item provided for ListOfAudio key: "' . $key . '". Expecting base64 encoded audio data.'); + } + if ($type === EShapeType::Video && !is_string($io[$key])) { + throw new \OCP\TaskProcessing\Exception\ValidationException('None-video item provided for Video key: "' . $key . '". Expecting base64 encoded video data.'); + } + if ($type === EShapeType::ListOfVideo && (!is_array($io[$key]) || count(array_filter($io[$key], fn($item) => !is_string($item))) > 0)) { + throw new \OCP\TaskProcessing\Exception\ValidationException('None-video list item provided for ListOfTexts key: "' . $key . '". Expecting base64 encoded video data.'); + } + if ($type === EShapeType::File && !is_string($io[$key])) { + throw new \OCP\TaskProcessing\Exception\ValidationException('None-file item provided for File key: "' . $key . '". Expecting base64 encoded file data.'); + } + if ($type === EShapeType::ListOfFiles && (!is_array($io[$key]) || count(array_filter($io[$key], fn($item) => !is_string($item))) > 0)) { + throw new \OCP\TaskProcessing\Exception\ValidationException('None-audio list item provided for ListOfFiles key: "' . $key . '". 
Expecting base64 encoded image data.'); + } + } + } + + /** + * @param array<string,mixed> $array The array to filter + * @param array<string, mixed> ...$specs the specs that define which keys to keep + * @return array<string, mixed> + */ + private function removeSuperfluousArrayKeys(array $array, ...$specs): array { + $keys = array_unique(array_reduce($specs, fn($carry, $spec) => $carry + array_keys($spec), [])); + $values = array_map(fn(string $key) => $array[$key], $keys); + return array_combine($keys, $values); + } + + public function hasProviders(): bool { + return count($this->getProviders()) !== 0; + } + + public function getProviders(): array { + if ($this->providers === null) { + $this->providers = $this->_getProviders(); + } + + return $this->providers; + } + + public function getAvailableTaskTypes(): array { + if ($this->availableTaskTypes === null) { + $taskTypes = $this->_getTaskTypes(); + $providers = $this->getProviders(); + + $availableTaskTypes = []; + foreach ($providers as $provider) { + if (!isset($taskTypes[$provider->getTaskType()])) { + continue; + } + $taskType = $taskTypes[$provider->getTaskType()]; + $availableTaskTypes[$provider->getTaskType()] = [ + 'name' => $taskType->getName(), + 'description' => $taskType->getDescription(), + 'inputShape' => $taskType->getInputShape(), + 'optionalInputShape' => $provider->getOptionalInputShape(), + 'outputShape' => $taskType->getOutputShape(), + 'optionalOutputShape' => $provider->getOptionalOutputShape(), + ]; + } + + $this->availableTaskTypes = $availableTaskTypes; + } + + return $this->availableTaskTypes; + } + + public function canHandleTask(Task $task): bool { + return isset($this->getAvailableTaskTypes()[$task->getTaskType()]); + } + + public function scheduleTask(Task $task): void { + if (!$this->canHandleTask($task)) { + throw new PreConditionNotMetException('No task processing provider is installed that can handle this task type: ' . 
$task->getTaskType()); + } + $taskTypes = $this->getAvailableTaskTypes(); + $inputShape = $taskTypes[$task->getTaskType()]['inputShape']; + $optionalInputShape = $taskTypes[$task->getTaskType()]['optionalInputShape']; + // validate input + $this->validateInput($inputShape, $task->getInput()); + $this->validateInput($optionalInputShape, $task->getInput(), true); + // remove superfluous keys and set input + $task->setInput($this->removeSuperfluousArrayKeys($task->getInput(), $inputShape, $optionalInputShape)); + $task->setStatus(Task::STATUS_SCHEDULED); + $provider = $this->_getPreferredProvider($task->getTaskType()); + // calculate expected completion time + $completionExpectedAt = new \DateTime('now'); + $completionExpectedAt->add(new \DateInterval('PT'.$provider->getExpectedRuntime().'S')); + $task->setCompletionExpectedAt($completionExpectedAt); + // create a db entity and insert into db table + $taskEntity = \OC\TaskProcessing\Db\Task::fromPublicTask($task); + $this->taskMapper->insert($taskEntity); + // make sure the scheduler knows the id + $task->setId($taskEntity->getId()); + // schedule synchronous job if the provider is synchronous + if ($provider instanceof ISynchronousProvider) { + $this->jobList->add(SynchronousBackgroundJob::class, null); + } + } + + public function deleteTask(Task $task): void { + $taskEntity = \OC\TaskProcessing\Db\Task::fromPublicTask($task); + $this->taskMapper->delete($taskEntity); + } + + public function getTask(int $id): Task { + try { + $taskEntity = $this->taskMapper->find($id); + return $taskEntity->toPublicTask(); + } catch (DoesNotExistException $e) { + throw new NotFoundException('Couldn\'t find task with id ' . 
$id, 0, $e); + } catch (MultipleObjectsReturnedException|\OCP\DB\Exception $e) { + throw new \OCP\TaskProcessing\Exception\Exception('There was a problem finding the task', 0, $e); + } catch (\JsonException $e) { + throw new \OCP\TaskProcessing\Exception\Exception('There was a problem parsing JSON after finding the task', 0, $e); + } + } + + public function cancelTask(int $id): void { + $task = $this->getTask($id); + $task->setStatus(Task::STATUS_CANCELLED); + $taskEntity = \OC\TaskProcessing\Db\Task::fromPublicTask($task); + try { + $this->taskMapper->update($taskEntity); + } catch (\OCP\DB\Exception $e) { + throw new \OCP\TaskProcessing\Exception\Exception('There was a problem finding the task', 0, $e); + } + } + + public function setTaskProgress(int $id, float $progress): bool { + // TODO: Not sure if we should rather catch the exceptions of getTask here and fail silently + $task = $this->getTask($id); + if ($task->getStatus() === Task::STATUS_CANCELLED) { + return false; + } + $task->setStatus(Task::STATUS_RUNNING); + $task->setProgress($progress); + $taskEntity = \OC\TaskProcessing\Db\Task::fromPublicTask($task); + try { + $this->taskMapper->update($taskEntity); + } catch (\OCP\DB\Exception $e) { + throw new \OCP\TaskProcessing\Exception\Exception('There was a problem finding the task', 0, $e); + } + return true; + } + + public function setTaskResult(int $id, ?string $error, ?array $result): void { + // TODO: Not sure if we should rather catch the exceptions of getTask here and fail silently + $task = $this->getTask($id); + if ($task->getStatus() === Task::STATUS_CANCELLED) { + $this->logger->info('A TaskProcessing ' . $task->getTaskType() . ' task with id ' . $id . ' finished but was cancelled in the mean time. Moving on without storing result.'); + return; + } + if ($error !== null) { + $task->setStatus(Task::STATUS_FAILED); + $task->setErrorMessage($error); + $this->logger->warning('A TaskProcessing ' . $task->getTaskType() . ' task with id ' . $id . 
' failed with the following message: ' . $error); + } else if ($result !== null) { + $taskTypes = $this->getAvailableTaskTypes(); + $outputShape = $taskTypes[$task->getTaskType()]['outputShape']; + $optionalOutputShape = $taskTypes[$task->getTaskType()]['optionalOutputShape']; + try { + // validate output + $this->validateOutput($outputShape, $result); + $this->validateOutput($optionalOutputShape, $result, true); + $output = $this->removeSuperfluousArrayKeys($result, $outputShape, $optionalOutputShape); + // extract base64 data and put it in files, replace it with file ids + $output = $this->encapsulateInputOutputFileData($output, $outputShape, $optionalOutputShape); + $task->setOutput($output); + $task->setProgress(1); + $task->setStatus(Task::STATUS_SUCCESSFUL); + } catch (ValidationException $e) { + $task->setProgress(1); + $task->setStatus(Task::STATUS_FAILED); + $error = 'The task was processed successfully but the provider\'s output doesn\'t pass validation against the task type\'s outputShape spec and/or the provider\'s own optionalOutputShape spec'; + $task->setErrorMessage($error); + $this->logger->error($error, ['exception' => $e]); + } catch (NotPermittedException $e) { + $task->setProgress(1); + $task->setStatus(Task::STATUS_FAILED); + $error = 'The task was processed successfully but storing the output in a file failed'; + $task->setErrorMessage($error); + $this->logger->error($error, ['exception' => $e]); + + } + } + $taskEntity = \OC\TaskProcessing\Db\Task::fromPublicTask($task); + try { + $this->taskMapper->update($taskEntity); + } catch (\OCP\DB\Exception $e) { + throw new \OCP\TaskProcessing\Exception\Exception('There was a problem finding the task', 0, $e); + } + if ($task->getStatus() === Task::STATUS_SUCCESSFUL) { + $event = new TaskSuccessfulEvent($task); + }else{ + $event = new TaskFailedEvent($task, $error); + } + $this->dispatcher->dispatchTyped($event); + } + + public function getNextScheduledTask(?string $taskTypeId = null): Task { + try 
{ + $taskEntity = $this->taskMapper->findOldestScheduledByType($taskTypeId); + return $taskEntity->toPublicTask(); + } catch (DoesNotExistException $e) { + throw new \OCP\TaskProcessing\Exception\NotFoundException('Could not find the task', 0, $e); + } catch (\OCP\DB\Exception $e) { + throw new \OCP\TaskProcessing\Exception\Exception('There was a problem finding the task', 0, $e); + } catch (\JsonException $e) { + throw new \OCP\TaskProcessing\Exception\Exception('There was a problem parsing JSON after finding the task', 0, $e); + } + } + + public function getUserTask(int $id, ?string $userId): Task { + try { + $taskEntity = $this->taskMapper->findByIdAndUser($id, $userId); + return $taskEntity->toPublicTask(); + } catch (DoesNotExistException $e) { + throw new \OCP\TaskProcessing\Exception\NotFoundException('Could not find the task', 0, $e); + } catch (MultipleObjectsReturnedException|\OCP\DB\Exception $e) { + throw new \OCP\TaskProcessing\Exception\Exception('There was a problem finding the task', 0, $e); + } catch (\JsonException $e) { + throw new \OCP\TaskProcessing\Exception\Exception('There was a problem parsing JSON after finding the task', 0, $e); + } + } + + public function getUserTasksByApp(?string $userId, string $appId, ?string $identifier = null): array { + try { + $taskEntities = $this->taskMapper->findUserTasksByApp($userId, $appId, $identifier); + return array_map(fn($taskEntity) => $taskEntity->toPublicTask(), $taskEntities); + } catch (\OCP\DB\Exception $e) { + throw new \OCP\TaskProcessing\Exception\Exception('There was a problem finding a task', 0, $e); + } catch (\JsonException $e) { + throw new \OCP\TaskProcessing\Exception\Exception('There was a problem parsing JSON after finding a task', 0, $e); + } + } + + /** + * Takes task input or output data and replaces fileIds with base64 data + * + * @param array<string, ShapeDescriptor> ...$specs the specs + * @param array $inputOutput + * @return array<string, mixed> + * @throws 
GenericFileException + * @throws LockedException + * @throws NotPermittedException + * @throws ValidationException + */ + public function fillInputOutputFileData(array $inputOutput, ...$specs): array { + $newInputOutput = []; + $spec = array_reduce($specs, fn($carry, $spec) => $carry + $spec, []); + foreach($spec as $key => $descriptor) { + $type = $descriptor->getShapeType(); + if (!isset($inputOutput[$key])) { + continue; + } + if (!in_array(EShapeType::from($type->value % 10), [EShapeType::Image, EShapeType::Audio, EShapeType::Video, EShapeType::File], true)) { + $newInputOutput[$key] = $inputOutput[$key]; + continue; + } + if ($type->value < 10) { + $node = $this->rootFolder->getFirstNodeById((int)$inputOutput[$key]); + if ($node === null) { + $node = $this->rootFolder->getFirstNodeByIdInPath((int)$inputOutput[$key], '/' . $this->rootFolder->getAppDataDirectoryName() . '/'); + if (!$node instanceof File) { + throw new ValidationException('File id given for key "' . $key . '" is not a file'); + } + } else if (!$node instanceof File) { + throw new ValidationException('File id given for key "' . $key . '" is not a file'); + } + // TODO: Validate if userId has access to this file + $newInputOutput[$key] = base64_encode($node->getContent()); + } else { + $newInputOutput[$key] = []; + foreach ($inputOutput[$key] as $item) { + $node = $this->rootFolder->getFirstNodeById((int)$inputOutput[$key]); + if ($node === null) { + $node = $this->rootFolder->getFirstNodeByIdInPath((int)$inputOutput[$key], '/' . $this->rootFolder->getAppDataDirectoryName() . '/'); + if (!$node instanceof File) { + throw new ValidationException('File id given for key "' . $key . '" is not a file'); + } + } else if (!$node instanceof File) { + throw new ValidationException('File id given for key "' . $key . 
'" is not a file'); + } + // TODO: Validate if userId has access to this file + $newInputOutput[$key][] = base64_encode($node->getContent()); + } + } + } + return $newInputOutput; + } + + /** + *Takes task input or output and replaces base64 data with file ids + * + * @param array<string, mixed> $inputOutput + * @param array<string, ShapeDescriptor> ...$specs the specs that define which keys to keep + * @return array<string, mixed> + * @throws NotPermittedException + */ + public function encapsulateInputOutputFileData(array $inputOutput, ...$specs): array { + $newInputOutput = []; + try { + $folder = $this->appData->getFolder('TaskProcessing'); + } catch (\OCP\Files\NotFoundException) { + $folder = $this->appData->newFolder('TaskProcessing'); + } + $spec = array_reduce($specs, fn($carry, $spec) => $carry + $spec, []); + foreach($spec as $key => $descriptor) { + $type = $descriptor->getShapeType(); + if (!isset($inputOutput[$key])) { + continue; + } + if (!in_array(EShapeType::from($type->value % 10), [EShapeType::Image, EShapeType::Audio, EShapeType::Video, EShapeType::File], true)) { + $newInputOutput[$key] = $inputOutput[$key]; + continue; + } + if ($type->value < 10) { + $file = $folder->newFile((string) rand(0, 10000000), base64_decode($inputOutput[$key])); + $newInputOutput[$key] = $file->getId(); + } else { + $newInputOutput = []; + foreach ($inputOutput[$key] as $item) { + $file = $folder->newFile((string) rand(0, 10000000), base64_decode($item)); + $newInputOutput[$key][] = $file->getId(); + } + } + } + return $newInputOutput; + } + + public function prepareInputData(Task $task): array { + $taskTypes = $this->getAvailableTaskTypes(); + $inputShape = $taskTypes[$task->getTaskType()]['inputShape']; + $optionalInputShape = $taskTypes[$task->getTaskType()]['optionalInputShape']; + $input = $task->getInput(); + // validate input, again for good measure (should have been validated in scheduleTask) + $this->validateInput($inputShape, $input); + 
/**
 * Queued job that, for every synchronous provider, fetches the next scheduled
 * task of the provider's task type, runs it through the provider and reports
 * the result to the task processing manager. The job adds itself to the job
 * list again at the end of every run.
 */
class SynchronousBackgroundJob extends QueuedJob {
	public function __construct(
		ITimeFactory $timeFactory,
		private readonly IManager $taskProcessingManager,
		private readonly IJobList $jobList,
		private readonly LoggerInterface $logger,
	) {
		parent::__construct($timeFactory);
	}


	/**
	 * @inheritDoc
	 */
	protected function run($argument) {
		foreach ($this->taskProcessingManager->getProviders() as $provider) {
			if (!($provider instanceof ISynchronousProvider)) {
				continue;
			}
			$taskType = $provider->getTaskType();

			// Nothing scheduled for this provider, or the lookup failed: try the next provider
			try {
				$task = $this->taskProcessingManager->getNextScheduledTask($taskType);
			} catch (NotFoundException $e) {
				continue;
			} catch (Exception $e) {
				$this->logger->error('Unknown error while retrieving scheduled TaskProcessing tasks', ['exception' => $e]);
				continue;
			}

			try {
				try {
					$input = $this->taskProcessingManager->prepareInputData($task);
				} catch (GenericFileException|NotPermittedException|LockedException|ValidationException $e) {
					$this->logger->warning('Failed to prepare input data for a TaskProcessing task with synchronous provider ' . $provider->getId(), ['exception' => $e]);
					$this->taskProcessingManager->setTaskResult($task->getId(), $e->getMessage(), null);
					// Schedule again
					$this->jobList->add(self::class, $argument);
					return;
				}

				try {
					$output = $provider->process($task->getUserId(), $input);
				} catch (\Throwable $e) {
					// Failures announced by the provider are warnings, anything else is an error
					if ($e instanceof ProcessingException) {
						$this->logger->warning('Failed to process a TaskProcessing task with synchronous provider ' . $provider->getId(), ['exception' => $e]);
					} else {
						$this->logger->error('Unknown error while processing TaskProcessing task', ['exception' => $e]);
					}
					$this->taskProcessingManager->setTaskResult($task->getId(), $e->getMessage(), null);
					// Schedule again
					$this->jobList->add(self::class, $argument);
					return;
				}

				$this->taskProcessingManager->setTaskResult($task->getId(), null, $output);
			} catch (NotFoundException $e) {
				$this->logger->info('Could not find task anymore after execution. Moving on.', ['exception' => $e]);
			} catch (Exception $e) {
				$this->logger->error('Failed to report result of TaskProcessing task', ['exception' => $e]);
			}
		}

		// Schedule again
		$this->jobList->add(self::class, $argument);
	}
}
/**
 * The kinds of values a slot in a task's input or output shape can hold.
 *
 * Each ListOf* case is backed by the value of its scalar case plus 10.
 *
 * @since 30.0.0
 */
enum EShapeType: int {
	// Scalar shapes
	case Number = 0;
	case Text = 1;
	case Image = 2;
	case Audio = 3;
	case Video = 4;
	case File = 5;
	// List shapes: value of the scalar case + 10
	case ListOfNumbers = 10;
	case ListOfTexts = 11;
	case ListOfImages = 12;
	case ListOfAudio = 13;
	case ListOfVideo = 14;
	case ListOfFiles = 15;
}
/**
 * Base class for events that carry the task they are about.
 *
 * @since 30.0.0
 */
abstract class AbstractTextProcessingEvent extends Event {
	/**
	 * @param Task $task the task this event is about
	 * @since 30.0.0
	 */
	public function __construct(
		private readonly Task $task
	) {
		parent::__construct();
	}

	/**
	 * @return Task the task passed to the constructor
	 * @since 30.0.0
	 */
	public function getTask(): Task {
		return $this->task;
	}
}

/**
 * Event for a task that did not succeed; carries an error message in addition to the task.
 *
 * @since 30.0.0
 */
class TaskFailedEvent extends AbstractTextProcessingEvent {
	/**
	 * @param Task $task
	 * @param string $errorMessage
	 * @since 30.0.0
	 */
	public function __construct(
		Task $task,
		private readonly string $errorMessage,
	) {
		parent::__construct($task);
	}

	/**
	 * @return string the error message passed to the constructor
	 * @since 30.0.0
	 */
	public function getErrorMessage(): string {
		return $this->errorMessage;
	}
}

/**
 * Event for a task that succeeded; adds nothing to the base event.
 *
 * @since 30.0.0
 */
class TaskSuccessfulEvent extends AbstractTextProcessingEvent {
}
/**
 * TaskProcessing Exception
 * @since 30.0.0
 */
class Exception extends \Exception {
}

/**
 * TaskProcessing exception for a task that could not be found
 */
class NotFoundException extends Exception {

}

/**
 * Exception thrown during processing of a task
 * by a synchronous provider
 *
 * Note: extends \RuntimeException, not the TaskProcessing Exception class,
 * so a catch block for the latter does not catch it.
 * @since 30.0.0
 */
class ProcessingException extends \RuntimeException {
}

/**
 * TaskProcessing exception for input or output that fails validation
 */
class ValidationException extends Exception {

}
See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + + +namespace OCP\TaskProcessing; + +use OCP\Files\GenericFileException; +use OCP\Files\NotPermittedException; +use OCP\Lock\LockedException; +use OCP\PreConditionNotMetException; +use OCP\TaskProcessing\Exception\Exception; +use OCP\TaskProcessing\Exception\NotFoundException; +use OCP\TaskProcessing\Exception\ValidationException; + +/** + * API surface for apps interacting with and making use of task processing providers + * without knowing which providers are installed + * @since 30.0.0 + */ +interface IManager { + /** + * @since 30.0.0 + */ + public function hasProviders(): bool; + + /** + * @return IProvider[] + * @since 30.0.0 + */ + public function getProviders(): array; + + /** + * @return array<string,array{name: string, description: string, inputShape: array<string, ShapeDescriptor>, optionalInputShape: array<string, ShapeDescriptor>, outputShape: array<string, ShapeDescriptor>, optionalOutputShape: array<string, ShapeDescriptor>}> + * @since 30.0.0 + */ + public function getAvailableTaskTypes(): array; + + /** + * @param Task $task The task to run + * @throws PreConditionNotMetException If no or not the requested provider was registered but this method was still called + * @throws ValidationException the given task input didn't pass validation against the task type's input shape and/or the provider's optional input shape specs + * @throws Exception storing the task in the database failed + * @since 30.0.0 + */ + public function scheduleTask(Task $task): void; + + /** + * Delete a task that has been scheduled before + * + * @param Task $task The task to delete + * @throws Exception if deleting the task in the database failed + * @since 30.0.0 + */ + public function deleteTask(Task $task): void; + + /** + * @param int $id The id of the task + 
* @return Task + * @throws Exception If the query failed + * @throws NotFoundException If the task could not be found + * @since 30.0.0 + */ + public function getTask(int $id): Task; + + /** + * @param int $id The id of the task + * @throws Exception If the query failed + * @throws NotFoundException If the task could not be found + * @since 30.0.0 + */ + public function cancelTask(int $id): void; + + /** + * @param int $id The id of the task + * @param string|null $error + * @param array|null $result + * @throws Exception If the query failed + * @throws NotFoundException If the task could not be found + * @since 30.0.0 + */ + public function setTaskResult(int $id, ?string $error, ?array $result): void; + + /** + * @param int $id The id of the task + * @param float $progress + * @return bool `true` if the task should still be running; `false` if the task has been cancelled in the meantime + * @throws ValidationException + * @throws Exception + * @throws NotFoundException + */ + public function setTaskProgress(int $id, float $progress): bool; + + /** + * @param string|null $taskTypeId + * @return Task + * @throws Exception If the query failed + * @throws NotFoundException If no task could be found + * @since 30.0.0 + */ + public function getNextScheduledTask(?string $taskTypeId = null): Task; + + /** + * @param int $id The id of the task + * @param string|null $userId The user id that scheduled the task + * @return Task + * @throws Exception If the query failed + * @throws NotFoundException If the task could not be found + * @since 30.0.0 + */ + public function getUserTask(int $id, ?string $userId): Task; + + /** + * @param string|null $userId + * @param string $appId + * @param string|null $identifier + * @return list<Task> + * @throws Exception If the query failed + * @throws NotFoundException If the task could not be found + * @since 30.0.0 + */ + public function getUserTasksByApp(?string $userId, string $appId, ?string $identifier = null): array; + + /** + * Prepare the task's 
input data, so it can be processed by the provider + * i.e. this replaces file ids with base64 data + * + * @param Task $task + * @return array<string, mixed> + * @throws NotPermittedException + * @throws GenericFileException + * @throws LockedException + * @throws ValidationException + */ + public function prepareInputData(Task $task): array; +} diff --git a/lib/public/TaskProcessing/IProvider.php b/lib/public/TaskProcessing/IProvider.php new file mode 100644 index 00000000000..be6aa33d125 --- /dev/null +++ b/lib/public/TaskProcessing/IProvider.php @@ -0,0 +1,80 @@ +<?php + +declare(strict_types=1); + +/** + * @copyright Copyright (c) 2024 Marcel Klehr <mklehr@gmx.net> + * + * @author Marcel Klehr <mklehr@gmx.net> + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ */ + + +namespace OCP\TaskProcessing; + +use OCP\TextProcessing\ITaskType; +use RuntimeException; + +/** + * This is the interface that is implemented by apps that + * implement a task processing provider + * @since 30.0.0 + */ +interface IProvider { + /** + * The unique id of this provider + * @since 30.0.0 + */ + public function getId(): string; + + /** + * The localized name of this provider + * @since 30.0.0 + */ + public function getName(): string; + + /** + * Returns the task type id of the task type, that this + * provider handles + * + * @since 30.0.0 + * @return string + */ + public function getTaskType(): string; + + /** + * @return int The expected average runtime of a task in seconds + * @since 30.0.0 + */ + public function getExpectedRuntime(): int; + + /** + * Returns the shape of optional input parameters + * + * @since 30.0.0 + * @psalm-return array<string, ShapeDescriptor> + */ + public function getOptionalInputShape(): array; + + /** + * Returns the shape of optional output parameters + * + * @since 30.0.0 + * @psalm-return array<string, ShapeDescriptor> + */ + public function getOptionalOutputShape(): array; +} diff --git a/lib/public/TaskProcessing/ISynchronousProvider.php b/lib/public/TaskProcessing/ISynchronousProvider.php new file mode 100644 index 00000000000..e4fc0b1ea7f --- /dev/null +++ b/lib/public/TaskProcessing/ISynchronousProvider.php @@ -0,0 +1,48 @@ +<?php + +declare(strict_types=1); + +/** + * @copyright Copyright (c) 2024 Marcel Klehr <mklehr@gmx.net> + * + * @author Marcel Klehr <mklehr@gmx.net> + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + + +namespace OCP\TaskProcessing; + +use OCP\TaskProcessing\Exception\ProcessingException; + +/** + * This is the interface that is implemented by apps that + * implement a synchronous task processing provider + * @since 30.0.0 + */ +interface ISynchronousProvider extends IProvider { + + /** + * Processes the given task input and returns the task output + * + * @since 30.0.0 + * @param null|string $userId The user that created the current task + * @param array<string, string> $input The task input + * @psalm-return array<string, string> + * @throws ProcessingException + */ + public function process(?string $userId, array $input): array; +} diff --git a/lib/public/TaskProcessing/ITaskType.php b/lib/public/TaskProcessing/ITaskType.php new file mode 100644 index 00000000000..bdac1ec397e --- /dev/null +++ b/lib/public/TaskProcessing/ITaskType.php @@ -0,0 +1,73 @@ +<?php + +declare(strict_types=1); + +/** + * @copyright Copyright (c) 2024 Marcel Klehr <mklehr@gmx.net> + * + * @author Marcel Klehr <mklehr@gmx.net> + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +namespace OCP\TaskProcessing; + +/** + * This is a task type interface that is implemented by task processing + * task types + * @since 30.0.0 + */ +interface ITaskType { + /** + * Returns the unique id of this task type + * + * @since 30.0.0 + * @return string + */ + public function getId(): string; + + /** + * Returns the localized name of this task type + * + * @since 30.0.0 + * @return string + */ + public function getName(): string; + + /** + * Returns the localized description of this task type + * + * @since 30.0.0 + * @return string + */ + public function getDescription(): string; + + /** + * Returns the shape of the input array + * + * @since 30.0.0 + * @psalm-return array<string, ShapeDescriptor> + */ + public function getInputShape(): array; + + /** + * Returns the shape of the output array + * + * @since 30.0.0 + * @psalm-return array<string, ShapeDescriptor> + */ + public function getOutputShape(): array; +} diff --git a/lib/public/TaskProcessing/ShapeDescriptor.php b/lib/public/TaskProcessing/ShapeDescriptor.php new file mode 100644 index 00000000000..0c770b7d07e --- /dev/null +++ b/lib/public/TaskProcessing/ShapeDescriptor.php @@ -0,0 +1,24 @@ +<?php + +namespace OCP\TaskProcessing; + +class ShapeDescriptor { + public function __construct( + private string $name, + private string $description, + private EShapeType $shapeType, + ) { + } + + public function getName(): string { + return $this->name; + } + + public function getDescription(): string { + return $this->description; + } + + public function getShapeType(): EShapeType { + return $this->shapeType; + } +} diff --git a/lib/public/TaskProcessing/Task.php b/lib/public/TaskProcessing/Task.php new file mode 100644 index 00000000000..a467c0d57d0 --- /dev/null +++ 
b/lib/public/TaskProcessing/Task.php @@ -0,0 +1,263 @@ +<?php + +declare(strict_types=1); + +/** + * @copyright Copyright (c) 2024 Marcel Klehr <mklehr@gmx.net> + * + * @author Marcel Klehr <mklehr@gmx.net> + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +namespace OCP\TaskProcessing; + +use DateTime; +use OCP\Files\AppData\IAppDataFactory; +use OCP\Files\NotFoundException; +use OCP\Files\NotPermittedException; +use OCP\IImage; +use OCP\Image; +use OCP\TaskProcessing\Exception\ValidationException; + +/** + * This is a task processing task + * + * @since 30.0.0 + */ +final class Task implements \JsonSerializable { + protected ?int $id = null; + + protected ?DateTime $completionExpectedAt = null; + + protected ?array $output = null; + + protected ?string $errorMessage = null; + + protected ?float $progress = null; + + /** + * @since 30.0.0 + */ + public const STATUS_CANCELLED = 5; + /** + * @since 30.0.0 + */ + public const STATUS_FAILED = 4; + /** + * @since 30.0.0 + */ + public const STATUS_SUCCESSFUL = 3; + /** + * @since 30.0.0 + */ + public const STATUS_RUNNING = 2; + /** + * @since 30.0.0 + */ + public const STATUS_SCHEDULED = 1; + /** + * @since 30.0.0 + */ + public const STATUS_UNKNOWN = 0; + + /** + * @psalm-var self::STATUS_* + */ + protected int $status = 
self::STATUS_UNKNOWN; + + /** + * @param array<string,mixed> $input + * @param string $appId + * @param string|null $userId + * @param null|string $identifier An arbitrary identifier for this task. max length: 255 chars + * @since 30.0.0 + */ + final public function __construct( + protected readonly string $taskType, + protected array $input, + protected readonly string $appId, + protected readonly ?string $userId, + protected readonly ?string $identifier = '', + ) { + } + + /** + * @since 30.0.0 + */ + final public function getTaskType(): string { + return $this->taskType; + } + + /** + * @psalm-return self::STATUS_* + * @since 30.0.0 + */ + final public function getStatus(): int { + return $this->status; + } + + /** + * @psalm-param self::STATUS_* $status + * @since 30.0.0 + */ + final public function setStatus(int $status): void { + $this->status = $status; + } + + /** + * @param ?DateTime $at + * @since 30.0.0 + */ + final public function setCompletionExpectedAt(?DateTime $at): void { + $this->completionExpectedAt = $at; + } + + /** + * @return ?DateTime + * @since 30.0.0 + */ + final public function getCompletionExpectedAt(): ?DateTime { + return $this->completionExpectedAt; + } + + /** + * @return int|null + * @since 30.0.0 + */ + final public function getId(): ?int { + return $this->id; + } + + /** + * @param int|null $id + * @since 30.0.0 + */ + final public function setId(?int $id): void { + $this->id = $id; + } + + /** + * @since 30.0.0 + */ + final public function setOutput(?array $output): void { + $this->output = $output; + } + + /** + * @since 30.0.0 + */ + final public function getOutput(): ?array { + return $this->output; + } + + /** + * @return array<string,mixed> + * @since 30.0.0 + */ + final public function getInput(): array { + return $this->input; + } + + /** + * @return string + * @since 30.0.0 + */ + final public function getAppId(): string { + return $this->appId; + } + + /** + * @return null|string + * @since 30.0.0 + */ + final public function 
getIdentifier(): ?string { + return $this->identifier; + } + + /** + * @return string|null + * @since 30.0.0 + */ + final public function getUserId(): ?string { + return $this->userId; + } + + /** + * @psalm-return array{id: ?int, status: self::STATUS_*, userId: ?string, appId: string, input: ?array, output: ?array, identifier: ?string, completionExpectedAt: ?int, progress: ?float} + * @since 30.0.0 + */ + public function jsonSerialize(): array { + return [ + 'id' => $this->getId(), + 'status' => $this->getStatus(), + 'userId' => $this->getUserId(), + 'appId' => $this->getAppId(), + 'input' => $this->getInput(), + 'output' => $this->getOutput(), + 'identifier' => $this->getIdentifier(), + 'completionExpectedAt' => $this->getCompletionExpectedAt()?->getTimestamp(), + 'progress' => $this->getProgress(), + ]; + } + + /** + * @param string|null $error + * @return void + * @since 30.0.0 + */ + public function setErrorMessage(?string $error): void { + $this->errorMessage = $error; + } + + /** + * @return string|null + * @since 30.0.0 + */ + public function getErrorMessage(): ?string { + return $this->errorMessage; + } + + /** + * @param array $input + * @return void + * @since 30.0.0 + */ + public function setInput(array $input): void { + $this->input = $input; + } + + /** + * @param float|null $progress + * @return void + * @throws ValidationException If the progress is not null and outside of the range 0.0 to 1.0 + * @since 30.0.0 + */ + public function setProgress(?float $progress): void { + if ($progress !== null && ($progress < 0 || $progress > 1.0)) { + throw new ValidationException('Progress must be between 0.0 and 1.0 inclusively; ' . $progress . 
' given'); + } + $this->progress = $progress; + } + + /** + * @return float|null + * @since 30.0.0 + */ + public function getProgress(): ?float { + return $this->progress; + } +} diff --git a/lib/public/TaskProcessing/TaskTypes/AudioToText.php b/lib/public/TaskProcessing/TaskTypes/AudioToText.php new file mode 100644 index 00000000000..c074c154341 --- /dev/null +++ b/lib/public/TaskProcessing/TaskTypes/AudioToText.php @@ -0,0 +1,93 @@ +<?php + +declare(strict_types=1); + +/** + * @copyright Copyright (c) 2024 Marcel Klehr <mklehr@gmx.net> + * + * @author Marcel Klehr <mklehr@gmx.net> + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ */ + +namespace OCP\TaskProcessing\TaskTypes; + +use OCP\IL10N; +use OCP\L10N\IFactory; +use OCP\TaskProcessing\EShapeType; +use OCP\TaskProcessing\ITaskType; +use OCP\TaskProcessing\ShapeDescriptor; + +/** + * This is the task processing task type for generic transcription + * @since 30.0.0 + */ +class AudioToText implements ITaskType { + public const ID = 'core:audio2text'; + + private IL10N $l; + + /** + * @param IFactory $l10nFactory + * @since 30.0.0 + */ + public function __construct( + IFactory $l10nFactory, + ) { + $this->l = $l10nFactory->get('core'); + } + + + /** + * @inheritDoc + * @since 30.0.0 + */ + public function getName(): string { + return $this->l->t('Transcribe audio'); + } + + /** + * @inheritDoc + * @since 30.0.0 + */ + public function getDescription(): string { + return $this->l->t('Transcribe the things said in an audio'); + } + + public function getId(): string { + return self::ID; + } + + public function getInputShape(): array { + return [ + 'input' => new ShapeDescriptor( + $this->l->t('Audio input'), + $this->l->t('The audio to transcribe'), + EShapeType::Audio + ), + ]; + } + + public function getOutputShape(): array { + return [ + 'output' => new ShapeDescriptor( + $this->l->t('Transcription'), + $this->l->t('The transcribed text'), + EShapeType::Text + ), + ]; + } +} diff --git a/lib/public/TaskProcessing/TaskTypes/TextToImage.php b/lib/public/TaskProcessing/TaskTypes/TextToImage.php new file mode 100644 index 00000000000..264238afee5 --- /dev/null +++ b/lib/public/TaskProcessing/TaskTypes/TextToImage.php @@ -0,0 +1,98 @@ +<?php + +declare(strict_types=1); + +/** + * @copyright Copyright (c) 2024 Marcel Klehr <mklehr@gmx.net> + * + * @author Marcel Klehr <mklehr@gmx.net> + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the 
+ * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +namespace OCP\TaskProcessing\TaskTypes; + +use OCP\IL10N; +use OCP\L10N\IFactory; +use OCP\TaskProcessing\EShapeType; +use OCP\TaskProcessing\ITaskType; +use OCP\TaskProcessing\ShapeDescriptor; + +/** + * This is the task processing task type for image generation + * @since 30.0.0 + */ +class TextToImage implements ITaskType { + public const ID = 'core:text2image'; + + private IL10N $l; + + /** + * @param IFactory $l10nFactory + * @since 30.0.0 + */ + public function __construct( + IFactory $l10nFactory, + ) { + $this->l = $l10nFactory->get('core'); + } + + + /** + * @inheritDoc + * @since 30.0.0 + */ + public function getName(): string { + return $this->l->t('Generate image'); + } + + /** + * @inheritDoc + * @since 30.0.0 + */ + public function getDescription(): string { + return $this->l->t('Generate an image from a text prompt'); + } + + public function getId(): string { + return self::ID; + } + + public function getInputShape(): array { + return [ + 'input' => new ShapeDescriptor( + $this->l->t('Prompt'), + $this->l->t('Describe the image you want to generate'), + EShapeType::Text + ), + 'numberOfImages' => new ShapeDescriptor( + $this->l->t('Number of images'), + $this->l->t('How many images to generate'), + EShapeType::Number + ), + ]; + } + + public function getOutputShape(): array { + return [ + 'images' => new ShapeDescriptor( + $this->l->t('Output images'), + $this->l->t('The generated images'), + EShapeType::ListOfImages + ), + ]; + } +} diff --git 
a/lib/public/TaskProcessing/TaskTypes/TextToText.php b/lib/public/TaskProcessing/TaskTypes/TextToText.php new file mode 100644 index 00000000000..436c47aa8ee --- /dev/null +++ b/lib/public/TaskProcessing/TaskTypes/TextToText.php @@ -0,0 +1,93 @@ +<?php + +declare(strict_types=1); + +/** + * @copyright Copyright (c) 2024 Marcel Klehr <mklehr@gmx.net> + * + * @author Marcel Klehr <mklehr@gmx.net> + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ */ + +namespace OCP\TaskProcessing\TaskTypes; + +use OCP\IL10N; +use OCP\L10N\IFactory; +use OCP\TaskProcessing\EShapeType; +use OCP\TaskProcessing\ITaskType; +use OCP\TaskProcessing\ShapeDescriptor; + +/** + * This is the task processing task type for generic text processing + * @since 30.0.0 + */ +class TextToText implements ITaskType { + public const ID = 'core:text2text'; + + private IL10N $l; + + /** + * @param IFactory $l10nFactory + * @since 30.0.0 + */ + public function __construct( + IFactory $l10nFactory, + ) { + $this->l = $l10nFactory->get('core'); + } + + + /** + * @inheritDoc + * @since 30.0.0 + */ + public function getName(): string { + return $this->l->t('Free text to text prompt'); + } + + /** + * @inheritDoc + * @since 30.0.0 + */ + public function getDescription(): string { + return $this->l->t('Runs an arbitrary prompt through a language model that returns a reply'); + } + + public function getId(): string { + return self::ID; + } + + public function getInputShape(): array { + return [ + 'input' => new ShapeDescriptor( + $this->l->t('Prompt'), + $this->l->t('Describe a task that you want the assistant to do or ask a question'), + EShapeType::Text + ), + ]; + } + + public function getOutputShape(): array { + return [ + 'output' => new ShapeDescriptor( + $this->l->t('Generated reply'), + $this->l->t('The generated text from the assistant'), + EShapeType::Text + ), + ]; + } +} diff --git a/lib/public/TaskProcessing/TaskTypes/TextToTextHeadline.php b/lib/public/TaskProcessing/TaskTypes/TextToTextHeadline.php new file mode 100644 index 00000000000..e524c83fe55 --- /dev/null +++ b/lib/public/TaskProcessing/TaskTypes/TextToTextHeadline.php @@ -0,0 +1,93 @@ +<?php + +declare(strict_types=1); + +/** + * @copyright Copyright (c) 2024 Marcel Klehr <mklehr@gmx.net> + * + * @author Marcel Klehr <mklehr@gmx.net> + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the 
terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +namespace OCP\TaskProcessing\TaskTypes; + +use OCP\IL10N; +use OCP\L10N\IFactory; +use OCP\TaskProcessing\EShapeType; +use OCP\TaskProcessing\ITaskType; +use OCP\TaskProcessing\ShapeDescriptor; + +/** + * This is the task processing task type for creating headlines + * @since 30.0.0 + */ +class TextToTextHeadline implements ITaskType { + public const ID = 'core:text2text:headline'; + + private IL10N $l; + + /** + * @param IFactory $l10nFactory + * @since 30.0.0 + */ + public function __construct( + IFactory $l10nFactory, + ) { + $this->l = $l10nFactory->get('core'); + } + + + /** + * @inheritDoc + * @since 30.0.0 + */ + public function getName(): string { + return $this->l->t('Generate a headline'); + } + + /** + * @inheritDoc + * @since 30.0.0 + */ + public function getDescription(): string { + return $this->l->t('Generates a possible headline for a text.'); + } + + public function getId(): string { + return self::ID; + } + + public function getInputShape(): array { + return [ + 'input' => new ShapeDescriptor( + $this->l->t('Original text'), + $this->l->t('The original text to generate a headline for'), + EShapeType::Text + ), + ]; + } + + public function getOutputShape(): array { + return [ + 'output' => new ShapeDescriptor( + $this->l->t('Headline'), + $this->l->t('The generated headline'), + EShapeType::Text + ), + ]; + } +} diff --git 
a/lib/public/TaskProcessing/TaskTypes/TextToTextSummary.php b/lib/public/TaskProcessing/TaskTypes/TextToTextSummary.php new file mode 100644 index 00000000000..4db13b24a24 --- /dev/null +++ b/lib/public/TaskProcessing/TaskTypes/TextToTextSummary.php @@ -0,0 +1,92 @@ +<?php + +declare(strict_types=1); + +/** + * @copyright Copyright (c) 2024 Marcel Klehr <mklehr@gmx.net> + * + * @author Marcel Klehr <mklehr@gmx.net> + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ */ + +namespace OCP\TaskProcessing\TaskTypes; + +use OCP\IL10N; +use OCP\L10N\IFactory; +use OCP\TaskProcessing\EShapeType; +use OCP\TaskProcessing\ITaskType; +use OCP\TaskProcessing\ShapeDescriptor; + +/** + * This is the task processing task type for summaries + * @since 30.0.0 + */ +class TextToTextSummary implements ITaskType { + public const ID = 'core:text2text:summary'; + private IL10N $l; + + /** + * @param IFactory $l10nFactory + * @since 30.0.0 + */ + public function __construct( + IFactory $l10nFactory, + ) { + $this->l = $l10nFactory->get('core'); + } + + + /** + * @inheritDoc + * @since 30.0.0 + */ + public function getName(): string { + return $this->l->t('Summarize'); + } + + /** + * @inheritDoc + * @since 30.0.0 + */ + public function getDescription(): string { + return $this->l->t('Summarizes a text'); + } + + public function getId(): string { + return self::ID; + } + + public function getInputShape(): array { + return [ + 'input' => new ShapeDescriptor( + $this->l->t('Original text'), + $this->l->t('The original text to summarize'), + EShapeType::Text + ), + ]; + } + + public function getOutputShape(): array { + return [ + 'output' => new ShapeDescriptor( + $this->l->t('Summary'), + $this->l->t('The generated summary'), + EShapeType::Text + ), + ]; + } +} diff --git a/lib/public/TaskProcessing/TaskTypes/TextToTextTopics.php b/lib/public/TaskProcessing/TaskTypes/TextToTextTopics.php new file mode 100644 index 00000000000..f2f0c5c1b7d --- /dev/null +++ b/lib/public/TaskProcessing/TaskTypes/TextToTextTopics.php @@ -0,0 +1,93 @@ +<?php + +declare(strict_types=1); + +/** + * @copyright Copyright (c) 2023 Marcel Klehr <mklehr@gmx.net> + * + * @author Marcel Klehr <mklehr@gmx.net> + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * 
License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +namespace OCP\TaskProcessing\TaskTypes; + +use OCP\IL10N; +use OCP\L10N\IFactory; +use OCP\TaskProcessing\EShapeType; +use OCP\TaskProcessing\ITaskType; +use OCP\TaskProcessing\ShapeDescriptor; + +/** + * This is the task processing task type for topics extraction + * @since 30.0.0 + */ +class TextToTextTopics implements ITaskType { + public const ID = 'core:text2text:topics'; + + private IL10N $l; + + /** + * @param IFactory $l10nFactory + * @since 30.0.0 + */ + public function __construct( + IFactory $l10nFactory, + ) { + $this->l = $l10nFactory->get('core'); + } + + + /** + * @inheritDoc + * @since 30.0.0 + */ + public function getName(): string { + return $this->l->t('Extract topics'); + } + + /** + * @inheritDoc + * @since 30.0.0 + */ + public function getDescription(): string { + return $this->l->t('Extracts topics from a text and outputs them separated by commas'); + } + + public function getId(): string { + return self::ID; + } + + public function getInputShape(): array { + return [ + 'input' => new ShapeDescriptor( + $this->l->t('Original text'), + $this->l->t('The original text to extract topics from'), + EShapeType::Text + ), + ]; + } + + public function getOutputShape(): array { + return [ + 'output' => new ShapeDescriptor( + $this->l->t('Topics'), + $this->l->t('The list of extracted topics'), + EShapeType::Text + ), + ]; + } +} diff --git a/tests/lib/TaskProcessing/TaskProcessingTest.php b/tests/lib/TaskProcessing/TaskProcessingTest.php new file mode 100644 index 00000000000..65ee5382883 --- 
/dev/null +++ b/tests/lib/TaskProcessing/TaskProcessingTest.php @@ -0,0 +1,467 @@
<?php
/**
 * Copyright (c) 2024 Marcel Klehr <mklehr@gmx.net>
 * This file is licensed under the Affero General Public License version 3 or
 * later.
 * See the COPYING-README file.
 */

namespace Test\TextProcessing;

use OC\AppFramework\Bootstrap\Coordinator;
use OC\AppFramework\Bootstrap\RegistrationContext;
use OC\AppFramework\Bootstrap\ServiceRegistration;
use OC\EventDispatcher\EventDispatcher;
use OC\TaskProcessing\Db\TaskMapper;
use OC\TaskProcessing\Db\Task as DbTask;
use OC\TaskProcessing\Manager;
use OCP\AppFramework\Db\DoesNotExistException;
use OCP\AppFramework\Utility\ITimeFactory;
use OCP\BackgroundJob\IJobList;
use OCP\EventDispatcher\IEventDispatcher;
use OCP\Files\AppData\IAppDataFactory;
use OCP\Files\IAppData;
use OCP\Files\IRootFolder;
use OCP\IConfig;
use OCP\IServerContainer;
use OCP\PreConditionNotMetException;
use OCP\SpeechToText\ISpeechToTextManager;
use OCP\TaskProcessing\EShapeType;
use OCP\TaskProcessing\Events\TaskFailedEvent;
use OCP\TaskProcessing\Events\TaskSuccessfulEvent;
use OCP\TaskProcessing\Exception\ProcessingException;
use OCP\TaskProcessing\Exception\ValidationException;
use OCP\TaskProcessing\IManager;
use OCP\TaskProcessing\IProvider;
use OCP\TaskProcessing\ISynchronousProvider;
use OCP\TaskProcessing\ITaskType;
use OCP\TaskProcessing\ShapeDescriptor;
use OCP\TaskProcessing\Task;
use OCP\TaskProcessing\TaskTypes\TextToText;
use PHPUnit\Framework\Constraint\IsInstanceOf;
use Psr\Log\LoggerInterface;
use Test\BackgroundJob\DummyJobList;

/**
 * Test-only task type: takes an "audio" input and yields a "spectrogram" image output.
 */
class AudioToImage implements ITaskType {
	public const ID = 'test:audiotoimage';

	public function getId(): string {
		return self::ID;
	}

	public function getName(): string {
		return self::class;
	}

	public function getDescription(): string {
		return self::class;
	}

	public function getInputShape(): array {
		return [
			'audio' => new ShapeDescriptor('Audio', 'The audio', EShapeType::Audio),
		];
	}

	public function getOutputShape(): array {
		return [
			'spectrogram' => new ShapeDescriptor('Spectrogram', 'The audio spectrogram', EShapeType::Image),
		];
	}
}

/**
 * Test-only provider for AudioToImage that implements IProvider but not
 * ISynchronousProvider; the test feeds its result in via IManager::setTaskResult().
 */
class AsyncProvider implements IProvider {
	public function getId(): string {
		// Distinct from SuccessfulSyncProvider's id: this provider is not synchronous
		return 'test:async:success';
	}

	public function getName(): string {
		return self::class;
	}

	public function getTaskType(): string {
		return AudioToImage::ID;
	}

	public function getExpectedRuntime(): int {
		return 10;
	}

	public function getOptionalInputShape(): array {
		return [
			'optionalKey' => new ShapeDescriptor('optional Key', 'AN optional key', EShapeType::Text),
		];
	}

	public function getOptionalOutputShape(): array {
		return [
			'optionalKey' => new ShapeDescriptor('optional Key', 'AN optional key', EShapeType::Text),
		];
	}
}

/**
 * Test-only synchronous TextToText provider that echoes its input back as output.
 */
class SuccessfulSyncProvider implements IProvider, ISynchronousProvider {
	public function getId(): string {
		return 'test:sync:success';
	}

	public function getName(): string {
		return self::class;
	}

	public function getTaskType(): string {
		return TextToText::ID;
	}

	public function getExpectedRuntime(): int {
		return 10;
	}

	public function getOptionalInputShape(): array {
		return [
			'optionalKey' => new ShapeDescriptor('optional Key', 'AN optional key', EShapeType::Text),
		];
	}

	public function getOptionalOutputShape(): array {
		return [
			'optionalKey' => new ShapeDescriptor('optional Key', 'AN optional key', EShapeType::Text),
		];
	}

	public function process(?string $userId, array $input): array {
		return ['output' => $input['input']];
	}
}

/**
 * Test-only synchronous TextToText provider whose process() always throws
 * a ProcessingException carrying ERROR_MESSAGE.
 */
class FailingSyncProvider implements IProvider, ISynchronousProvider {
	public const ERROR_MESSAGE = 'Failure';

	public function getId(): string {
		return 'test:sync:fail';
	}

	public function getName(): string {
		return self::class;
	}

	public function getTaskType(): string {
		return TextToText::ID;
	}

	public function getExpectedRuntime(): int {
		return 10;
	}

	public function getOptionalInputShape(): array {
		return [
			'optionalKey' => new ShapeDescriptor('optional Key', 'AN optional key', EShapeType::Text),
		];
	}

	public function getOptionalOutputShape(): array {
		return [
			'optionalKey' => new ShapeDescriptor('optional Key', 'AN optional key', EShapeType::Text),
		];
	}

	public function process(?string $userId, array $input): array {
		throw new ProcessingException(self::ERROR_MESSAGE);
	}
}

/**
 * Test-only synchronous TextToText provider whose process() returns an empty
 * array, i.e. without the "output" key the tests below expect from a valid result.
 */
class BrokenSyncProvider implements IProvider, ISynchronousProvider {
	public function getId(): string {
		return 'test:sync:broken-output';
	}

	public function getName(): string {
		return self::class;
	}

	public function getTaskType(): string {
		return TextToText::ID;
	}

	public function getExpectedRuntime(): int {
		return 10;
	}

	public function getOptionalInputShape(): array {
		return [
			'optionalKey' => new ShapeDescriptor('optional Key', 'AN optional key', EShapeType::Text),
		];
	}

	public function getOptionalOutputShape(): array {
		return [
			'optionalKey' => new ShapeDescriptor('optional Key', 'AN optional key', EShapeType::Text),
		];
	}

	public function process(?string $userId, array $input): array {
		return [];
	}
}

/**
 * @group DB
 */
class TaskProcessingTest extends \Test\TestCase {
	private IManager $manager;
	private Coordinator $coordinator;
	private array $providers;
	private IServerContainer $serverContainer;
	private IEventDispatcher $eventDispatcher;
	private RegistrationContext $registrationContext;
	private \DateTimeImmutable $currentTime;
	private TaskMapper $taskMapper;
	private array $tasksDb;
	private IJobList $jobList;
	private IAppData $appData;

	/**
	 * Builds a Manager wired to mocked registration/container/dispatcher/job-list
	 * collaborators and to the real TaskMapper and file services.
	 */
	protected function setUp(): void {
		parent::setUp();

		// Instances the mocked container hands out, keyed by class name
		$this->providers = [
			SuccessfulSyncProvider::class => new SuccessfulSyncProvider(),
			FailingSyncProvider::class => new FailingSyncProvider(),
			BrokenSyncProvider::class => new BrokenSyncProvider(),
			AsyncProvider::class => new AsyncProvider(),
			AudioToImage::class => new AudioToImage(),
		];

		$this->serverContainer = $this->createMock(IServerContainer::class);
		$this->serverContainer->expects($this->any())->method('get')->willReturnCallback(function ($class) {
			return $this->providers[$class];
		});

		$this->registrationContext = $this->createMock(RegistrationContext::class);
		$this->coordinator = $this->createMock(Coordinator::class);
		$this->coordinator->expects($this->any())->method('getRegistrationContext')->willReturn($this->registrationContext);

		$this->currentTime = new \DateTimeImmutable('now');

		$this->taskMapper = \OCP\Server::get(TaskMapper::class);

		// add() is stubbed out so scheduling a task does not enqueue a real job
		$this->jobList = $this->createPartialMock(DummyJobList::class, ['add']);
		$this->jobList->expects($this->any())->method('add')->willReturnCallback(function () {
		});

		// Mocked so each test can assert which event type gets dispatched
		$this->eventDispatcher = $this->createMock(IEventDispatcher::class);

		$this->manager = new Manager(
			$this->coordinator,
			$this->serverContainer,
			\OC::$server->get(LoggerInterface::class),
			$this->taskMapper,
			$this->jobList,
			$this->eventDispatcher,
			\OC::$server->get(IAppDataFactory::class),
			\OC::$server->get(IRootFolder::class),
			\OC::$server->get(\OCP\TextProcessing\IManager::class),
			\OC::$server->get(\OCP\TextToImage\IManager::class),
			\OC::$server->get(ISpeechToTextManager::class),
		);
	}

	/**
	 * Creates a file with the given content in the "test" folder of the core
	 * app data and returns it as a node resolved through the root folder.
	 *
	 * @param string $name File name inside the "test" app data folder
	 * @param string $content Content written to the new file
	 * @return \OCP\Files\File
	 * @throws \Exception if the created file cannot be resolved to a File node
	 */
	private function getFile(string $name, string $content): \OCP\Files\File {
		/** @var IRootFolder $rootFolder */
		$rootFolder = \OC::$server->get(IRootFolder::class);
		$this->appData = \OC::$server->get(IAppDataFactory::class)->get('core');
		try {
			$folder = $this->appData->getFolder('test');
		} catch (\OCP\Files\NotFoundException $e) {
			$folder = $this->appData->newFolder('test');
		}
		$file = $folder->newFile($name, $content);
		$inputFile = current($rootFolder->getByIdInPath($file->getId(), '/' . $rootFolder->getAppDataDirectoryName() . '/'));
		if (!$inputFile instanceof \OCP\Files\File) {
			throw new \Exception('PEBCAK');
		}
		return $inputFile;
	}

	public function testShouldNotHaveAnyProviders(): void {
		$this->registrationContext->expects($this->any())->method('getTaskProcessingProviders')->willReturn([]);
		self::assertCount(0, $this->manager->getAvailableTaskTypes());
		self::assertFalse($this->manager->hasProviders());
		$this->expectException(PreConditionNotMetException::class);
		$this->manager->scheduleTask(new Task(TextToText::ID, ['input' => 'Hello'], 'test', null));
	}

	public function testProviderShouldBeRegisteredAndTaskFailValidation(): void {
		$this->registrationContext->expects($this->any())->method('getTaskProcessingProviders')->willReturn([
			new ServiceRegistration('test', BrokenSyncProvider::class)
		]);
		self::assertCount(1, $this->manager->getAvailableTaskTypes());
		self::assertTrue($this->manager->hasProviders());
		$task = new Task(TextToText::ID, ['wrongInputKey' => 'Hello'], 'test', null);
		self::assertNull($task->getId());
		$this->expectException(ValidationException::class);
		$this->manager->scheduleTask($task);
	}

	public function testProviderShouldBeRegisteredAndFail(): void {
		$this->registrationContext->expects($this->any())->method('getTaskProcessingProviders')->willReturn([
			new ServiceRegistration('test', FailingSyncProvider::class)
		]);
		$this->assertCount(1, $this->manager->getAvailableTaskTypes());
		$this->assertTrue($this->manager->hasProviders());
		$task = new Task(TextToText::ID, ['input' => 'Hello'], 'test', null);
		self::assertNull($task->getId());
		self::assertEquals(Task::STATUS_UNKNOWN, $task->getStatus());
		$this->manager->scheduleTask($task);
		self::assertNotNull($task->getId());
		self::assertEquals(Task::STATUS_SCHEDULED, $task->getStatus());

		$this->eventDispatcher->expects($this->once())->method('dispatchTyped')->with(new IsInstanceOf(TaskFailedEvent::class));

		$backgroundJob = new \OC\TaskProcessing\SynchronousBackgroundJob(
			\OCP\Server::get(ITimeFactory::class),
			$this->manager,
			$this->jobList,
			\OCP\Server::get(LoggerInterface::class),
		);
		$backgroundJob->start($this->jobList);

		$task = $this->manager->getTask($task->getId());
		self::assertEquals(Task::STATUS_FAILED, $task->getStatus());
		self::assertEquals(FailingSyncProvider::ERROR_MESSAGE, $task->getErrorMessage());
	}

	public function testProviderShouldBeRegisteredAndFailOutputValidation(): void {
		$this->registrationContext->expects($this->any())->method('getTaskProcessingProviders')->willReturn([
			new ServiceRegistration('test', BrokenSyncProvider::class)
		]);
		$this->assertCount(1, $this->manager->getAvailableTaskTypes());
		$this->assertTrue($this->manager->hasProviders());
		$task = new Task(TextToText::ID, ['input' => 'Hello'], 'test', null);
		self::assertNull($task->getId());
		self::assertEquals(Task::STATUS_UNKNOWN, $task->getStatus());
		$this->manager->scheduleTask($task);
		self::assertNotNull($task->getId());
		self::assertEquals(Task::STATUS_SCHEDULED, $task->getStatus());

		$this->eventDispatcher->expects($this->once())->method('dispatchTyped')->with(new IsInstanceOf(TaskFailedEvent::class));

		$backgroundJob = new \OC\TaskProcessing\SynchronousBackgroundJob(
			\OCP\Server::get(ITimeFactory::class),
			$this->manager,
			$this->jobList,
			\OCP\Server::get(LoggerInterface::class),
		);
		$backgroundJob->start($this->jobList);

		$task = $this->manager->getTask($task->getId());
		self::assertEquals(Task::STATUS_FAILED, $task->getStatus());
		self::assertEquals('The task was processed successfully but the provider\'s output doesn\'t pass validation against the task type\'s outputShape spec and/or the provider\'s own optionalOutputShape spec', $task->getErrorMessage());
	}

	public function testProviderShouldBeRegisteredAndRun(): void {
		$this->registrationContext->expects($this->any())->method('getTaskProcessingProviders')->willReturn([
			new ServiceRegistration('test', SuccessfulSyncProvider::class)
		]);
		$this->assertCount(1, $this->manager->getAvailableTaskTypes());
		$taskTypeStruct = $this->manager->getAvailableTaskTypes()[array_keys($this->manager->getAvailableTaskTypes())[0]];
		$this->assertTrue(isset($taskTypeStruct['inputShape']['input']));
		$this->assertEquals(EShapeType::Text, $taskTypeStruct['inputShape']['input']->getShapeType());
		$this->assertTrue(isset($taskTypeStruct['optionalInputShape']['optionalKey']));
		$this->assertEquals(EShapeType::Text, $taskTypeStruct['optionalInputShape']['optionalKey']->getShapeType());
		$this->assertTrue(isset($taskTypeStruct['outputShape']['output']));
		$this->assertEquals(EShapeType::Text, $taskTypeStruct['outputShape']['output']->getShapeType());
		$this->assertTrue(isset($taskTypeStruct['optionalOutputShape']['optionalKey']));
		$this->assertEquals(EShapeType::Text, $taskTypeStruct['optionalOutputShape']['optionalKey']->getShapeType());

		$this->assertTrue($this->manager->hasProviders());
		$task = new Task(TextToText::ID, ['input' => 'Hello'], 'test', null);
		self::assertNull($task->getId());
		self::assertEquals(Task::STATUS_UNKNOWN, $task->getStatus());
		$this->manager->scheduleTask($task);
		self::assertNotNull($task->getId());
		self::assertEquals(Task::STATUS_SCHEDULED, $task->getStatus());

		// Task object retrieved from db is up-to-date
		$task2 = $this->manager->getTask($task->getId());
		self::assertEquals($task->getId(), $task2->getId());
		self::assertEquals(['input' => 'Hello'], $task2->getInput());
		self::assertNull($task2->getOutput());
		self::assertEquals(Task::STATUS_SCHEDULED, $task2->getStatus());

		$this->eventDispatcher->expects($this->once())->method('dispatchTyped')->with(new IsInstanceOf(TaskSuccessfulEvent::class));

		$backgroundJob = new \OC\TaskProcessing\SynchronousBackgroundJob(
			\OCP\Server::get(ITimeFactory::class),
			$this->manager,
			$this->jobList,
			\OCP\Server::get(LoggerInterface::class),
		);
		$backgroundJob->start($this->jobList);

		$task = $this->manager->getTask($task->getId());
		self::assertEquals(Task::STATUS_SUCCESSFUL, $task->getStatus(), 'Status is '. $task->getStatus() . ' with error message: ' . $task->getErrorMessage());
		self::assertEquals(['output' => 'Hello'], $task->getOutput());
		self::assertEquals(1, $task->getProgress());
	}

	public function testAsyncProviderWithFilesShouldBeRegisteredAndRun(): void {
		$this->registrationContext->expects($this->any())->method('getTaskProcessingTaskTypes')->willReturn([
			new ServiceRegistration('test', AudioToImage::class)
		]);
		$this->registrationContext->expects($this->any())->method('getTaskProcessingProviders')->willReturn([
			new ServiceRegistration('test', AsyncProvider::class)
		]);
		$this->assertCount(1, $this->manager->getAvailableTaskTypes());

		$this->assertTrue($this->manager->hasProviders());
		$audioId = $this->getFile('audioInput', 'Hello')->getId();
		$task = new Task(AudioToImage::ID, ['audio' => $audioId], 'test', null);
		self::assertNull($task->getId());
		self::assertEquals(Task::STATUS_UNKNOWN, $task->getStatus());
		$this->manager->scheduleTask($task);
		self::assertNotNull($task->getId());
		self::assertEquals(Task::STATUS_SCHEDULED, $task->getStatus());

		// Task object retrieved from db is up-to-date
		$task2 = $this->manager->getTask($task->getId());
		self::assertEquals($task->getId(), $task2->getId());
		self::assertEquals(['audio' => $audioId], $task2->getInput());
		self::assertNull($task2->getOutput());
		self::assertEquals(Task::STATUS_SCHEDULED, $task2->getStatus());

		$this->eventDispatcher->expects($this->once())->method('dispatchTyped')->with(new IsInstanceOf(TaskSuccessfulEvent::class));

		// No background job here: the test itself plays the role of the external worker
		$this->manager->setTaskProgress($task2->getId(), 0.1);
		$input = $this->manager->prepareInputData($task2);
		self::assertTrue(isset($input['audio']));
		self::assertEquals(base64_encode('Hello'), $input['audio']);

		$this->manager->setTaskResult($task2->getId(), null, ['spectrogram' => base64_encode('World')]);

		$task = $this->manager->getTask($task->getId());
		self::assertEquals(Task::STATUS_SUCCESSFUL, $task->getStatus());
		self::assertEquals(1, $task->getProgress());
		self::assertTrue(isset($task->getOutput()['spectrogram']));
		// The stored output value is used as a file id to look the result up in app data
		$root = \OCP\Server::get(IRootFolder::class);
		$node = $root->getFirstNodeByIdInPath($task->getOutput()['spectrogram'], '/' . $root->getAppDataDirectoryName() . '/');
		self::assertNotNull($node);
		self::assertInstanceOf(\OCP\Files\File::class, $node);
		self::assertEquals('World', $node->getContent());
	}

	public function testNonexistentTask(): void {
		$this->expectException(\OCP\TaskProcessing\Exception\NotFoundException::class);
		$this->manager->getTask(2147483646);
	}
}