diff options
-rw-r--r-- | lib/private/TaskProcessing/Db/TaskMapper.php | 2 | ||||
-rw-r--r-- | lib/private/TaskProcessing/Manager.php | 2 | ||||
-rw-r--r-- | lib/private/TaskProcessing/RemoveOldTasksBackgroundJob.php | 46 | ||||
-rw-r--r-- | tests/lib/TaskProcessing/TaskProcessingTest.php | 91 |
4 files changed, 116 insertions, 25 deletions
diff --git a/lib/private/TaskProcessing/Db/TaskMapper.php b/lib/private/TaskProcessing/Db/TaskMapper.php index a1cc3d1409a..f8a1adc695c 100644 --- a/lib/private/TaskProcessing/Db/TaskMapper.php +++ b/lib/private/TaskProcessing/Db/TaskMapper.php @@ -127,7 +127,7 @@ class TaskMapper extends QBMapper { public function deleteOlderThan(int $timeout): int { $qb = $this->db->getQueryBuilder(); $qb->delete($this->tableName) - ->where($qb->expr()->lt('last_updated', $qb->createPositionalParameter(time() - $timeout))); + ->where($qb->expr()->lt('last_updated', $qb->createPositionalParameter($this->timeFactory->getDateTime()->getTimestamp() - $timeout))); return $qb->executeStatement(); } diff --git a/lib/private/TaskProcessing/Manager.php b/lib/private/TaskProcessing/Manager.php index 9ea92691f2a..4cc2119f299 100644 --- a/lib/private/TaskProcessing/Manager.php +++ b/lib/private/TaskProcessing/Manager.php @@ -711,7 +711,7 @@ class Manager implements IManager { $this->validateOutput($optionalOutputShape, $result, true); $output = $this->removeSuperfluousArrayKeys($result, $outputShape, $optionalOutputShape); // extract base64 data and put it in files, replace it with file ids - $output = $this->encapsulateInputOutputFileData($output, $outputShape, $optionalOutputShape); + $output = $this->encapsulateOutputFileData($output, $outputShape, $optionalOutputShape); $task->setOutput($output); $task->setProgress(1); $task->setStatus(Task::STATUS_SUCCESSFUL); diff --git a/lib/private/TaskProcessing/RemoveOldTasksBackgroundJob.php b/lib/private/TaskProcessing/RemoveOldTasksBackgroundJob.php new file mode 100644 index 00000000000..76786412059 --- /dev/null +++ b/lib/private/TaskProcessing/RemoveOldTasksBackgroundJob.php @@ -0,0 +1,46 @@ +<?php + +namespace OC\TaskProcessing; + +use OC\TaskProcessing\Db\TaskMapper; +use OCP\AppFramework\Utility\ITimeFactory; +use OCP\BackgroundJob\IJobList; +use OCP\BackgroundJob\QueuedJob; +use OCP\BackgroundJob\TimedJob; +use OCP\Files\GenericFileException; +use OCP\Files\NotPermittedException; +use OCP\Lock\LockedException; +use OCP\TaskProcessing\Exception\Exception; +use OCP\TaskProcessing\Exception\NotFoundException; +use OCP\TaskProcessing\Exception\ProcessingException; +use OCP\TaskProcessing\Exception\ValidationException; +use OCP\TaskProcessing\IManager; +use OCP\TaskProcessing\ISynchronousProvider; +use Psr\Log\LoggerInterface; + +class RemoveOldTasksBackgroundJob extends TimedJob { + public const MAX_TASK_AGE_SECONDS = 60 * 50 * 24 * 7 * 4; // 4 weeks + + public function __construct( + ITimeFactory $timeFactory, + private TaskMapper $taskMapper, + private LoggerInterface $logger, + ) { + parent::__construct($timeFactory); + $this->setInterval(60 * 60 * 24); + // can be deferred to maintenance window + $this->setTimeSensitivity(TimedJob::TIME_INSENSITIVE); + } + + + /** + * @inheritDoc + */ + protected function run($argument) { + try { + $this->taskMapper->deleteOlderThan(self::MAX_TASK_AGE_SECONDS); + } catch (\OCP\DB\Exception $e) { + $this->logger->warning('Failed to delete stale language model tasks', ['exception' => $e]); + } + } +} diff --git a/tests/lib/TaskProcessing/TaskProcessingTest.php b/tests/lib/TaskProcessing/TaskProcessingTest.php index 01bb0253853..e1ddaf82500 100644 --- a/tests/lib/TaskProcessing/TaskProcessingTest.php +++ b/tests/lib/TaskProcessing/TaskProcessingTest.php @@ -14,6 +14,7 @@ use OC\AppFramework\Bootstrap\ServiceRegistration; use OC\EventDispatcher\EventDispatcher; use OC\TaskProcessing\Db\TaskMapper; use OC\TaskProcessing\Manager; +use OC\TaskProcessing\RemoveOldTasksBackgroundJob; use OCP\AppFramework\Utility\ITimeFactory; use OCP\BackgroundJob\IJobList; use OCP\EventDispatcher\IEventDispatcher; @@ -21,12 +22,14 @@ use OCP\Files\AppData\IAppDataFactory; use OCP\Files\IAppData; use OCP\Files\IRootFolder; use OCP\IConfig; +use OCP\IDBConnection; use OCP\IServerContainer; use OCP\PreConditionNotMetException; use OCP\SpeechToText\ISpeechToTextManager; use OCP\TaskProcessing\EShapeType; use OCP\TaskProcessing\Events\TaskFailedEvent; use OCP\TaskProcessing\Events\TaskSuccessfulEvent; +use OCP\TaskProcessing\Exception\NotFoundException; use OCP\TaskProcessing\Exception\ProcessingException; use OCP\TaskProcessing\Exception\ValidationException; use OCP\TaskProcessing\IManager; @@ -211,9 +214,7 @@ class TaskProcessingTest extends \Test\TestCase { private IServerContainer $serverContainer; private IEventDispatcher $eventDispatcher; private RegistrationContext $registrationContext; - private \DateTimeImmutable $currentTime; private TaskMapper $taskMapper; - private array $tasksDb; private IJobList $jobList; private IAppData $appData; @@ -243,8 +244,6 @@ class TaskProcessingTest extends \Test\TestCase { $this->coordinator = $this->createMock(Coordinator::class); $this->coordinator->expects($this->any())->method('getRegistrationContext')->willReturn($this->registrationContext); - $this->currentTime = new \DateTimeImmutable('now'); - $this->taskMapper = \OCP\Server::get(TaskMapper::class); $this->jobList = $this->createPartialMock(DummyJobList::class, ['add']); @@ -314,8 +313,8 @@ class TaskProcessingTest extends \Test\TestCase { $this->registrationContext->expects($this->any())->method('getTaskProcessingProviders')->willReturn([ new ServiceRegistration('test', FailingSyncProvider::class) ]); - $this->assertCount(1, $this->manager->getAvailableTaskTypes()); - $this->assertTrue($this->manager->hasProviders()); + self::assertCount(1, $this->manager->getAvailableTaskTypes()); + self::assertTrue($this->manager->hasProviders()); $task = new Task(TextToText::ID, ['input' => 'Hello'], 'test', null); self::assertNull($task->getId()); self::assertEquals(Task::STATUS_UNKNOWN, $task->getStatus()); @@ -342,8 +341,8 @@ class TaskProcessingTest extends \Test\TestCase { $this->registrationContext->expects($this->any())->method('getTaskProcessingProviders')->willReturn([ new ServiceRegistration('test', BrokenSyncProvider::class) ]); - $this->assertCount(1, $this->manager->getAvailableTaskTypes()); - $this->assertTrue($this->manager->hasProviders()); + self::assertCount(1, $this->manager->getAvailableTaskTypes()); + self::assertTrue($this->manager->hasProviders()); $task = new Task(TextToText::ID, ['input' => 'Hello'], 'test', null); self::assertNull($task->getId()); self::assertEquals(Task::STATUS_UNKNOWN, $task->getStatus()); @@ -370,18 +369,18 @@ class TaskProcessingTest extends \Test\TestCase { $this->registrationContext->expects($this->any())->method('getTaskProcessingProviders')->willReturn([ new ServiceRegistration('test', SuccessfulSyncProvider::class) ]); - $this->assertCount(1, $this->manager->getAvailableTaskTypes()); + self::assertCount(1, $this->manager->getAvailableTaskTypes()); $taskTypeStruct = $this->manager->getAvailableTaskTypes()[array_keys($this->manager->getAvailableTaskTypes())[0]]; - $this->assertTrue(isset($taskTypeStruct['inputShape']['input'])); - $this->assertEquals(EShapeType::Text, $taskTypeStruct['inputShape']['input']->getShapeType()); - $this->assertTrue(isset($taskTypeStruct['optionalInputShape']['optionalKey'])); - $this->assertEquals(EShapeType::Text, $taskTypeStruct['optionalInputShape']['optionalKey']->getShapeType()); - $this->assertTrue(isset($taskTypeStruct['outputShape']['output'])); - $this->assertEquals(EShapeType::Text, $taskTypeStruct['outputShape']['output']->getShapeType()); - $this->assertTrue(isset($taskTypeStruct['optionalOutputShape']['optionalKey'])); - $this->assertEquals(EShapeType::Text, $taskTypeStruct['optionalOutputShape']['optionalKey']->getShapeType()); - - $this->assertTrue($this->manager->hasProviders()); + self::assertTrue(isset($taskTypeStruct['inputShape']['input'])); + self::assertEquals(EShapeType::Text, $taskTypeStruct['inputShape']['input']->getShapeType()); + self::assertTrue(isset($taskTypeStruct['optionalInputShape']['optionalKey'])); + self::assertEquals(EShapeType::Text, $taskTypeStruct['optionalInputShape']['optionalKey']->getShapeType()); + self::assertTrue(isset($taskTypeStruct['outputShape']['output'])); + self::assertEquals(EShapeType::Text, $taskTypeStruct['outputShape']['output']->getShapeType()); + self::assertTrue(isset($taskTypeStruct['optionalOutputShape']['optionalKey'])); + self::assertEquals(EShapeType::Text, $taskTypeStruct['optionalOutputShape']['optionalKey']->getShapeType()); + + self::assertTrue($this->manager->hasProviders()); $task = new Task(TextToText::ID, ['input' => 'Hello'], 'test', null); self::assertNull($task->getId()); self::assertEquals(Task::STATUS_UNKNOWN, $task->getStatus()); @@ -419,9 +418,9 @@ class TaskProcessingTest extends \Test\TestCase { $this->registrationContext->expects($this->any())->method('getTaskProcessingProviders')->willReturn([ new ServiceRegistration('test', AsyncProvider::class) ]); - $this->assertCount(1, $this->manager->getAvailableTaskTypes()); + self::assertCount(1, $this->manager->getAvailableTaskTypes()); - $this->assertTrue($this->manager->hasProviders()); + self::assertTrue($this->manager->hasProviders()); $audioId = $this->getFile('audioInput', 'Hello')->getId(); $task = new Task(AudioToImage::ID, ['audio' => $audioId], 'test', null); self::assertNull($task->getId()); @@ -442,9 +441,10 @@ class TaskProcessingTest extends \Test\TestCase { $this->manager->setTaskProgress($task2->getId(), 0.1); $input = $this->manager->prepareInputData($task2); self::assertTrue(isset($input['audio'])); - self::assertEquals(base64_encode('Hello'), $input['audio']); + self::assertInstanceOf(\OCP\Files\File::class, $input['audio']); + self::assertEquals($audioId, $input['audio']->getId()); - $this->manager->setTaskResult($task2->getId(), null, ['spectrogram' => base64_encode('World')]); + $this->manager->setTaskResult($task2->getId(), null, ['spectrogram' => 'World']); $task = $this->manager->getTask($task->getId()); self::assertEquals(Task::STATUS_SUCCESSFUL, $task->getStatus()); @@ -462,4 +462,49 @@ class TaskProcessingTest extends \Test\TestCase { $this->expectException(\OCP\TaskProcessing\Exception\NotFoundException::class); $this->manager->getTask(2147483646); } + + public function testOldTasksShouldBeCleanedUp() { + $currentTime = new \DateTime('now'); + $timeFactory = $this->createMock(ITimeFactory::class); + $timeFactory->expects($this->any())->method('getDateTime')->willReturnCallback(fn() => $currentTime); + $timeFactory->expects($this->any())->method('getTime')->willReturnCallback(fn() => $currentTime->getTimestamp()); + + $this->taskMapper = new TaskMapper( + \OCP\Server::get(IDBConnection::class), + $timeFactory, + ); + + $this->registrationContext->expects($this->any())->method('getTaskProcessingProviders')->willReturn([ + new ServiceRegistration('test', SuccessfulSyncProvider::class) + ]); + self::assertCount(1, $this->manager->getAvailableTaskTypes()); + self::assertTrue($this->manager->hasProviders()); + $task = new Task(TextToText::ID, ['input' => 'Hello'], 'test', null); + $this->manager->scheduleTask($task); + + $this->eventDispatcher->expects($this->once())->method('dispatchTyped')->with(new IsInstanceOf(TaskSuccessfulEvent::class)); + + $backgroundJob = new \OC\TaskProcessing\SynchronousBackgroundJob( + \OCP\Server::get(ITimeFactory::class), + $this->manager, + $this->jobList, + \OCP\Server::get(LoggerInterface::class), + ); + $backgroundJob->start($this->jobList); + + $task = $this->manager->getTask($task->getId()); + + $currentTime = $currentTime->add(new \DateInterval('P1Y')); + // run background job + $bgJob = new RemoveOldTasksBackgroundJob( + $timeFactory, + $this->taskMapper, + \OC::$server->get(LoggerInterface::class), + ); + $bgJob->setArgument([]); + $bgJob->start($this->jobList); + + $this->expectException(NotFoundException::class); + $this->manager->getTask($task->getId()); + } } |