diff options
Diffstat (limited to 'core')
-rw-r--r-- | core/Command/Background/JobBase.php | 93 | ||||
-rw-r--r-- | core/Command/Background/JobWorker.php | 173 | ||||
-rw-r--r-- | core/Command/Background/Worker.php | 206 | ||||
-rw-r--r-- | core/register_command.php | 2 |
4 files changed, 267 insertions, 207 deletions
diff --git a/core/Command/Background/JobBase.php b/core/Command/Background/JobBase.php new file mode 100644 index 00000000000..fe2880c0988 --- /dev/null +++ b/core/Command/Background/JobBase.php @@ -0,0 +1,93 @@ +<?php + +declare(strict_types=1); + +/** + * @copyright Copyright (c) 2022 Julius Härtl <jus@bitgrid.net> + * + * @author Julius Härtl <jus@bitgrid.net> + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + + +namespace OC\Core\Command\Background; + +use OCP\BackgroundJob\IJob; +use OCP\BackgroundJob\IJobList; +use Psr\Log\LoggerInterface; +use Symfony\Component\Console\Output\OutputInterface; + +abstract class JobBase extends \OC\Core\Command\Base { + protected IJobList $jobList; + protected LoggerInterface $logger; + + public function __construct(IJobList $jobList, + LoggerInterface $logger) { + parent::__construct(); + $this->jobList = $jobList; + $this->logger = $logger; + } + + protected function printJobInfo(int $jobId, IJob $job, OutputInterface $output): void { + $row = $this->jobList->getDetailsById($jobId); + + $lastRun = new \DateTime(); + $lastRun->setTimestamp((int) $row['last_run']); + $lastChecked = new \DateTime(); + $lastChecked->setTimestamp((int) $row['last_checked']); + $reservedAt = new \DateTime(); + $reservedAt->setTimestamp((int) $row['reserved_at']); + + $output->writeln('Job class: ' . get_class($job)); + $output->writeln('Arguments: ' . json_encode($job->getArgument())); + + $isTimedJob = $job instanceof \OC\BackgroundJob\TimedJob || $job instanceof \OCP\BackgroundJob\TimedJob; + if ($isTimedJob) { + $output->writeln('Type: timed'); + } elseif ($job instanceof \OC\BackgroundJob\QueuedJob || $job instanceof \OCP\BackgroundJob\QueuedJob) { + $output->writeln('Type: queued'); + } else { + $output->writeln('Type: job'); + } + + $output->writeln(''); + $output->writeln('Last checked: ' . $lastChecked->format(\DateTimeInterface::ATOM)); + if ((int) $row['reserved_at'] === 0) { + $output->writeln('Reserved at: -'); + } else { + $output->writeln('Reserved at: <comment>' . $reservedAt->format(\DateTimeInterface::ATOM) . '</comment>'); + } + $output->writeln('Last executed: ' . $lastRun->format(\DateTimeInterface::ATOM)); + $output->writeln('Last duration: ' . $row['execution_duration']); + + if ($isTimedJob) { + $reflection = new \ReflectionClass($job); + $intervalProperty = $reflection->getProperty('interval'); + $intervalProperty->setAccessible(true); + $interval = $intervalProperty->getValue($job); + + $nextRun = new \DateTime(); + $nextRun->setTimestamp($row['last_run'] + $interval); + + if ($nextRun > new \DateTime()) { + $output->writeln('Next execution: <comment>' . $nextRun->format(\DateTimeInterface::ATOM) . '</comment>'); + } else { + $output->writeln('Next execution: <info>' . $nextRun->format(\DateTimeInterface::ATOM) . '</info>'); + } + } + } +} diff --git a/core/Command/Background/JobWorker.php b/core/Command/Background/JobWorker.php new file mode 100644 index 00000000000..2ca4af73474 --- /dev/null +++ b/core/Command/Background/JobWorker.php @@ -0,0 +1,173 @@ +<?php + +declare(strict_types=1); +/** + * @copyright Copyright (c) 2021, Joas Schilling <coding@schilljs.com> + * + * @author Joas Schilling <coding@schilljs.com> + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + */ + +namespace OC\Core\Command\Background; + +use OC\Core\Command\InterruptedException; +use OCP\BackgroundJob\IJobList; +use Psr\Log\LoggerInterface; +use Symfony\Component\Console\Input\InputArgument; +use Symfony\Component\Console\Input\InputInterface; +use Symfony\Component\Console\Input\InputOption; +use Symfony\Component\Console\Output\OutputInterface; + +class JobWorker extends JobBase { + private array $executedJobs = []; + + public function __construct(IJobList $jobList, + LoggerInterface $logger) { + parent::__construct($jobList, $logger); + } + + protected function configure(): void { + parent::configure(); + + $this + ->setName('background-job:worker') + ->setDescription('Run a background job worker') + ->addArgument( + 'job-class', + InputArgument::OPTIONAL, + 'The class of the job in the database' + ) + ->addOption( + 'once', + null, + InputOption::VALUE_NONE, + 'Only execute the worker once (as a regular cron execution would do it)' + ) + ->addOption( + 'interval', + 'i', + InputOption::VALUE_OPTIONAL, + 'Interval in seconds in which the worker should repeat already processed jobs (set to 0 for no repeat)', + 5 + ) + ; + } + + protected function execute(InputInterface $input, OutputInterface $output): int { + $jobClass = $input->getArgument('job-class'); + + if ($jobClass && !class_exists($jobClass)) { + $output->writeln('<error>Invalid job class</error>'); + return 1; + } + + while (true) { + // Handle canceling of the process + try { + $this->abortIfInterrupted(); + } catch (InterruptedException $e) { + $output->writeln('<comment>Cleaning up before quitting. Press Ctrl-C again to kill, but this may have unexpected side effects.</comment>'); + $this->unlockExecuted(); + $output->writeln('<info>Background job worker stopped</info>'); + break; + } + + $this->printSummary($input, $output); + + $interval = (int)($input->getOption('interval') ?? 5); + + // Unlock jobs that should be executed again after the interval + // Alternative could be to set last_checked to interval in the future to avoid the extra locks + foreach ($this->executedJobs as $id => $time) { + if ($time <= time() - $interval) { + unset($this->executedJobs[$id]); + $job = $this->jobList->getById($id); + if ($job !== null) { + $this->jobList->unlockJob($job); + } + } + } + + usleep(50000); + $job = $this->jobList->getNext(false, $jobClass); + if (!$job) { + if ($input->getOption('once') === true || $interval === 0) { + break; + } + + $output->writeln("Waiting for new jobs to be queued", OutputInterface::VERBOSITY_VERBOSE); + // Re-check interval for new jobs + sleep(1); + continue; + } + + + if (isset($this->executedJobs[$job->getId()]) && ($this->executedJobs[$job->getId()] + $interval > time())) { + $output->writeln("<comment>Job already executed within timeframe " . get_class($job) . " " . $job->getId() . '</comment>', OutputInterface::VERBOSITY_VERBOSE); + continue; + } + + $output->writeln("Running job " . get_class($job) . " with ID " . $job->getId()); + + if ($output->isVerbose()) { + $this->printJobInfo($job->getId(), $job, $output); + } + + $job->execute($this->jobList, \OC::$server->getLogger()); + + // clean up after unclean jobs + \OC_Util::tearDownFS(); + \OC::$server->getTempManager()->clean(); + + $this->jobList->setLastJob($job); + $this->jobList->unlockJob($job); + $this->executedJobs[$job->getId()] = time(); + + if ($input->getOption('once') === true) { + break; + } + } + + $this->unlockExecuted(); + + return 0; + } + + private function printSummary(InputInterface $input, OutputInterface $output): void { + if (!$output->isVeryVerbose()) { + return; + } + $output->writeln("<comment>Summary</comment>"); + + $counts = []; + foreach ($this->jobList->countByClass() as $row) { + $counts[] = $row; + } + $this->writeTableInOutputFormat($input, $output, $counts); + } + + private function unlockExecuted() { + foreach ($this->executedJobs as $id => $time) { + unset($this->executedJobs[$id]); + $job = $this->jobList->getById($id); + if ($job !== null) { + $this->jobList->unlockJob($job); + } + } + } +} diff --git a/core/Command/Background/Worker.php b/core/Command/Background/Worker.php deleted file mode 100644 index 61f8fbc495e..00000000000 --- a/core/Command/Background/Worker.php +++ /dev/null @@ -1,206 +0,0 @@ -<?php - -declare(strict_types=1); -/** - * @copyright Copyright (c) 2021, Joas Schilling <coding@schilljs.com> - * - * @author Joas Schilling <coding@schilljs.com> - * - * @license GNU AGPL version 3 or any later version - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - */ - -namespace OC\Core\Command\Background; - -use OCP\BackgroundJob\IJob; -use OCP\BackgroundJob\IJobList; -use Psr\Log\LoggerInterface; -use Symfony\Component\Console\Command\Command; -use Symfony\Component\Console\Input\InputArgument; -use Symfony\Component\Console\Input\InputInterface; -use Symfony\Component\Console\Input\InputOption; -use Symfony\Component\Console\Output\OutputInterface; - -class Worker extends Command { - protected IJobList $jobList; - protected LoggerInterface $logger; - - const DEFAULT_INTERVAL = 5; - - public function __construct(IJobList $jobList, - LoggerInterface $logger) { - parent::__construct(); - $this->jobList = $jobList; - $this->logger = $logger; - } - - protected function configure(): void { - $this - ->setName('background-job:worker') - ->setDescription('Run a background job worker') - ->addArgument( - 'job-class', - InputArgument::OPTIONAL, - 'The class of the job in the database' - ) - ->addOption( - 'once', - null, - InputOption::VALUE_NONE, - 'Only execute the worker once (as a regular cron execution would do it)' - ) - ; - } - - protected function execute(InputInterface $input, OutputInterface $output): int { - $jobClass = $input->getArgument('job-class'); - - $executedJobs = []; - - $ended = false; - pcntl_signal(SIGINT, function () use (&$ended, $output, $executedJobs) { - $output->writeln('SIGINT'); - if ($ended) { - foreach ($executedJobs as $id => $time) { - unset($executedJobs[$id]); - $job = $this->jobList->getById($id); - $this->jobList->unlockJob($job); - } - $output->writeln('<error>Killed'); - exit(1); - } - $ended = true; - $output->writeln('<comment>Waiting for job to finish. Press Ctrl-C again to kill, but this may have unexpected side effects.</comment>'); - }); - - while (true) { - if ($ended) { - break; - } - $count = 0; - $total = 0; - foreach($this->jobList->countByClass() as $row) { - if ((int)$row['count'] === 1) { - $count++; - } else { - $output->writeln($row['class'] . " " . $row['count']); - } - $total += $row['count']; - } - $output->writeln("Other jobs " . $count); - $output->writeln("Total jobs " . $count); - - - - foreach ($executedJobs as $id => $time) { - if ($time < time() - self::DEFAULT_INTERVAL) { - unset($executedJobs[$id]); - $job = $this->jobList->getById($id); - $this->jobList->unlockJob($job); - } - } - - $job = $this->jobList->getNext(false, $jobClass); - if (!$job) { - $output->writeln("Waiting for new jobs to be queued"); - sleep(1); - continue; - } - - - if (isset($executedJobs[$job->getId()])) { - continue; - } - - $output->writeln("- Running job " . get_class($job) . " " . $job->getId()); - - if ($output->isVerbose()) { - $this->printJobInfo($job->getId(), $job, $output); - } - - $job->execute($this->jobList, \OC::$server->getLogger()); - - // clean up after unclean jobs - \OC_Util::tearDownFS(); - \OC::$server->getTempManager()->clean(); - - $this->jobList->setLastJob($job); - $executedJobs[$job->getId()] = time(); - unset($job); - - if ($input->getOption('once')) { - break; - } - } - - foreach ($executedJobs as $id => $time) { - unset($executedJobs[$id]); - $job = $this->jobList->getById($id); - $this->jobList->unlockJob($job); - } - - return 0; - } - - protected function printJobInfo(int $jobId, IJob $job, OutputInterface$output): void { - $row = $this->jobList->getDetailsById($jobId); - - $lastRun = new \DateTime(); - $lastRun->setTimestamp((int) $row['last_run']); - $lastChecked = new \DateTime(); - $lastChecked->setTimestamp((int) $row['last_checked']); - $reservedAt = new \DateTime(); - $reservedAt->setTimestamp((int) $row['reserved_at']); - - $output->writeln('Job class: ' . get_class($job)); - $output->writeln('Arguments: ' . json_encode($job->getArgument())); - - $isTimedJob = $job instanceof \OC\BackgroundJob\TimedJob || $job instanceof \OCP\BackgroundJob\TimedJob; - if ($isTimedJob) { - $output->writeln('Type: timed'); - } elseif ($job instanceof \OC\BackgroundJob\QueuedJob || $job instanceof \OCP\BackgroundJob\QueuedJob) { - $output->writeln('Type: queued'); - } else { - $output->writeln('Type: job'); - } - - $output->writeln(''); - $output->writeln('Last checked: ' . $lastChecked->format(\DateTimeInterface::ATOM)); - if ((int) $row['reserved_at'] === 0) { - $output->writeln('Reserved at: -'); - } else { - $output->writeln('Reserved at: <comment>' . $reservedAt->format(\DateTimeInterface::ATOM) . '</comment>'); - } - $output->writeln('Last executed: ' . $lastRun->format(\DateTimeInterface::ATOM)); - $output->writeln('Last duration: ' . $row['execution_duration']); - - if ($isTimedJob) { - $reflection = new \ReflectionClass($job); - $intervalProperty = $reflection->getProperty('interval'); - $intervalProperty->setAccessible(true); - $interval = $intervalProperty->getValue($job); - - $nextRun = new \DateTime(); - $nextRun->setTimestamp($row['last_run'] + $interval); - - if ($nextRun > new \DateTime()) { - $output->writeln('Next execution: <comment>' . $nextRun->format(\DateTimeInterface::ATOM) . '</comment>'); - } else { - $output->writeln('Next execution: <info>' . $nextRun->format(\DateTimeInterface::ATOM) . '</info>'); - } - } - } -} diff --git a/core/register_command.php b/core/register_command.php index 96a821b6f8c..f3ae8efa300 100644 --- a/core/register_command.php +++ b/core/register_command.php @@ -94,7 +94,7 @@ if ($config->getSystemValueBool('installed', false)) { $application->add(Server::get(Command\Background\Job::class)); $application->add(Server::get(Command\Background\ListCommand::class)); $application->add(Server::get(Command\Background\Delete::class)); - $application->add(Server::get(Command\Background\Worker::class)); + $application->add(Server::get(Command\Background\JobWorker::class)); $application->add(Server::get(Command\Broadcast\Test::class)); |