aboutsummaryrefslogtreecommitdiffstats
path: root/lib
diff options
context:
space:
mode:
authorMorris Jobke <hey@morrisjobke.de>2017-09-04 14:22:06 +0200
committerGitHub <noreply@github.com>2017-09-04 14:22:06 +0200
commit4fd3240b5f0d2f8a42675fa499c06a7f35b726fe (patch)
tree9d640ebb8f88780acfdd449f121aff1f93c87252 /lib
parent30ca3b70edea01dea092e495db1acf85183bfb80 (diff)
parent1ebf91ec16c00ea0696defe70d29884e2819237c (diff)
downloadnextcloud-server-4fd3240b5f0d2f8a42675fa499c06a7f35b726fe.tar.gz
nextcloud-server-4fd3240b5f0d2f8a42675fa499c06a7f35b726fe.zip
Merge pull request #6254 from nextcloud/async-bus-split
Allow configuring different command bus backends
Diffstat (limited to 'lib')
-rw-r--r--lib/private/Command/AsyncBus.php55
-rw-r--r--lib/private/Command/CronBus.php75
-rw-r--r--lib/private/Server.php73
3 files changed, 126 insertions, 77 deletions
diff --git a/lib/private/Command/AsyncBus.php b/lib/private/Command/AsyncBus.php
index fb3cbee7240..2dffc9c784d 100644
--- a/lib/private/Command/AsyncBus.php
+++ b/lib/private/Command/AsyncBus.php
@@ -24,17 +24,11 @@ namespace OC\Command;
use OCP\Command\IBus;
use OCP\Command\ICommand;
-use SuperClosure\Serializer;
/**
* Asynchronous command bus that uses the background job system as backend
*/
-class AsyncBus implements IBus {
- /**
- * @var \OCP\BackgroundJob\IJobList
- */
- private $jobList;
-
+abstract class AsyncBus implements IBus {
/**
* List of traits for command which require sync execution
*
@@ -43,26 +37,26 @@ class AsyncBus implements IBus {
private $syncTraits = [];
/**
- * @param \OCP\BackgroundJob\IJobList $jobList
- */
- public function __construct($jobList) {
- $this->jobList = $jobList;
- }
-
- /**
* Schedule a command to be fired
*
* @param \OCP\Command\ICommand | callable $command
*/
public function push($command) {
if ($this->canRunAsync($command)) {
- $this->jobList->add($this->getJobClass($command), $this->serializeCommand($command));
+ $this->queueCommand($command);
} else {
$this->runCommand($command);
}
}
/**
+ * Queue a command in the bus
+ *
+ * @param \OCP\Command\ICommand | callable $command
+ */
+ abstract protected function queueCommand($command);
+
+ /**
* Require all commands using a trait to be run synchronous
*
* @param string $trait
@@ -84,37 +78,6 @@ class AsyncBus implements IBus {
/**
* @param \OCP\Command\ICommand | callable $command
- * @return string
- */
- private function getJobClass($command) {
- if ($command instanceof \Closure) {
- return 'OC\Command\ClosureJob';
- } else if (is_callable($command)) {
- return 'OC\Command\CallableJob';
- } else if ($command instanceof ICommand) {
- return 'OC\Command\CommandJob';
- } else {
- throw new \InvalidArgumentException('Invalid command');
- }
- }
-
- /**
- * @param \OCP\Command\ICommand | callable $command
- * @return string
- */
- private function serializeCommand($command) {
- if ($command instanceof \Closure) {
- $serializer = new Serializer();
- return $serializer->serialize($command);
- } else if (is_callable($command) or $command instanceof ICommand) {
- return serialize($command);
- } else {
- throw new \InvalidArgumentException('Invalid command');
- }
- }
-
- /**
- * @param \OCP\Command\ICommand | callable $command
* @return bool
*/
private function canRunAsync($command) {
diff --git a/lib/private/Command/CronBus.php b/lib/private/Command/CronBus.php
new file mode 100644
index 00000000000..9bde4d88242
--- /dev/null
+++ b/lib/private/Command/CronBus.php
@@ -0,0 +1,75 @@
+<?php
+/**
+ * @copyright Copyright (c) 2017 Robin Appelman <robin@icewind.nl>
+ *
+ * @license GNU AGPL version 3 or any later version
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+namespace OC\Command;
+
+use OCP\Command\ICommand;
+use SuperClosure\Serializer;
+
+class CronBus extends AsyncBus {
+ /**
+ * @var \OCP\BackgroundJob\IJobList
+ */
+ private $jobList;
+
+
+ /**
+ * @param \OCP\BackgroundJob\IJobList $jobList
+ */
+ public function __construct($jobList) {
+ $this->jobList = $jobList;
+ }
+
+ protected function queueCommand($command) {
+ $this->jobList->add($this->getJobClass($command), $this->serializeCommand($command));
+ }
+
+ /**
+ * @param \OCP\Command\ICommand | callable $command
+ * @return string
+ */
+ private function getJobClass($command) {
+ if ($command instanceof \Closure) {
+ return 'OC\Command\ClosureJob';
+ } else if (is_callable($command)) {
+ return 'OC\Command\CallableJob';
+ } else if ($command instanceof ICommand) {
+ return 'OC\Command\CommandJob';
+ } else {
+ throw new \InvalidArgumentException('Invalid command');
+ }
+ }
+
+ /**
+ * @param \OCP\Command\ICommand | callable $command
+ * @return string
+ */
+ private function serializeCommand($command) {
+ if ($command instanceof \Closure) {
+ $serializer = new Serializer();
+ return $serializer->serialize($command);
+ } else if (is_callable($command) or $command instanceof ICommand) {
+ return serialize($command);
+ } else {
+ throw new \InvalidArgumentException('Invalid command');
+ }
+ }
+}
diff --git a/lib/private/Server.php b/lib/private/Server.php
index 18f09eb30b7..3a4fac175e8 100644
--- a/lib/private/Server.php
+++ b/lib/private/Server.php
@@ -39,6 +39,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>
*
*/
+
namespace OC;
use bantu\IniGetWrapper\IniGetWrapper;
@@ -51,7 +52,7 @@ use OC\AppFramework\Http\Request;
use OC\AppFramework\Utility\SimpleContainer;
use OC\AppFramework\Utility\TimeFactory;
use OC\Authentication\LoginCredentials\Store;
-use OC\Command\AsyncBus;
+use OC\Command\CronBus;
use OC\Contacts\ContactsMenu\ActionFactory;
use OC\Diagnostics\EventLogger;
use OC\Diagnostics\NullEventLogger;
@@ -140,7 +141,7 @@ class Server extends ServerContainer implements IServerContainer {
parent::__construct();
$this->webRoot = $webRoot;
- $this->registerService(\OCP\IServerContainer::class, function(IServerContainer $c) {
+ $this->registerService(\OCP\IServerContainer::class, function (IServerContainer $c) {
return $c;
});
@@ -150,7 +151,6 @@ class Server extends ServerContainer implements IServerContainer {
$this->registerAlias(IActionFactory::class, ActionFactory::class);
-
$this->registerService(\OCP\IPreview::class, function (Server $c) {
return new PreviewManager(
$c->getConfig(),
@@ -257,8 +257,8 @@ class Server extends ServerContainer implements IServerContainer {
});
$this->registerAlias('SystemTagObjectMapper', \OCP\SystemTag\ISystemTagObjectMapper::class);
- $this->registerService(\OCP\Files\IRootFolder::class, function(Server $c) {
- return new LazyRoot(function() use ($c) {
+ $this->registerService(\OCP\Files\IRootFolder::class, function (Server $c) {
+ return new LazyRoot(function () use ($c) {
return $c->query('RootFolder');
});
});
@@ -296,7 +296,7 @@ class Server extends ServerContainer implements IServerContainer {
});
$this->registerAlias('GroupManager', \OCP\IGroupManager::class);
- $this->registerService(Store::class, function(Server $c) {
+ $this->registerService(Store::class, function (Server $c) {
$session = $c->getSession();
if (\OC::$server->getSystemConfig()->getValue('installed', false)) {
$tokenProvider = $c->query('OC\Authentication\Token\IProvider');
@@ -544,7 +544,7 @@ class Server extends ServerContainer implements IServerContainer {
});
$this->registerAlias('Search', \OCP\ISearch::class);
- $this->registerService(\OC\Security\RateLimiting\Limiter::class, function($c) {
+ $this->registerService(\OC\Security\RateLimiting\Limiter::class, function ($c) {
return new \OC\Security\RateLimiting\Limiter(
$this->getUserSession(),
$this->getRequest(),
@@ -552,7 +552,7 @@ class Server extends ServerContainer implements IServerContainer {
$c->query(\OC\Security\RateLimiting\Backend\IBackend::class)
);
});
- $this->registerService(\OC\Security\RateLimiting\Backend\IBackend::class, function($c) {
+ $this->registerService(\OC\Security\RateLimiting\Backend\IBackend::class, function ($c) {
return new \OC\Security\RateLimiting\Backend\MemoryCache(
$this->getMemCacheFactory(),
new \OC\AppFramework\Utility\TimeFactory()
@@ -686,7 +686,7 @@ class Server extends ServerContainer implements IServerContainer {
$this->registerService(\OCP\Files\Config\IMountProviderCollection::class, function (Server $c) {
$loader = \OC\Files\Filesystem::getLoader();
$mountCache = $c->query('UserMountCache');
- $manager = new \OC\Files\Config\MountProviderCollection($loader, $mountCache);
+ $manager = new \OC\Files\Config\MountProviderCollection($loader, $mountCache);
// builtin providers
@@ -703,13 +703,24 @@ class Server extends ServerContainer implements IServerContainer {
return new IniGetWrapper();
});
$this->registerService('AsyncCommandBus', function (Server $c) {
- $jobList = $c->getJobList();
- return new AsyncBus($jobList);
+ $busClass = $c->getConfig()->getSystemValue('commandbus');
+ if ($busClass) {
+ list($app, $class) = explode('::', $busClass, 2);
+ if ($c->getAppManager()->isInstalled($app)) {
+ \OC_App::loadApp($app);
+ return $c->query($class);
+ } else {
+ throw new ServiceUnavailableException("The app providing the command bus ($app) is not enabled");
+ }
+ } else {
+ $jobList = $c->getJobList();
+ return new CronBus($jobList);
+ }
});
$this->registerService('TrustedDomainHelper', function ($c) {
return new TrustedDomainHelper($this->getConfig());
});
- $this->registerService('Throttler', function(Server $c) {
+ $this->registerService('Throttler', function (Server $c) {
return new Throttler(
$c->getDatabaseConnection(),
new TimeFactory(),
@@ -720,7 +731,7 @@ class Server extends ServerContainer implements IServerContainer {
$this->registerService('IntegrityCodeChecker', function (Server $c) {
// IConfig and IAppManager requires a working database. This code
// might however be called when ownCloud is not yet setup.
- if(\OC::$server->getSystemConfig()->getValue('installed', false)) {
+ if (\OC::$server->getSystemConfig()->getValue('installed', false)) {
$config = $c->getConfig();
$appManager = $c->getAppManager();
} else {
@@ -729,13 +740,13 @@ class Server extends ServerContainer implements IServerContainer {
}
return new Checker(
- new EnvironmentHelper(),
- new FileAccessHelper(),
- new AppLocator(),
- $config,
- $c->getMemCacheFactory(),
- $appManager,
- $c->getTempManager()
+ new EnvironmentHelper(),
+ new FileAccessHelper(),
+ new AppLocator(),
+ $config,
+ $c->getMemCacheFactory(),
+ $appManager,
+ $c->getTempManager()
);
});
$this->registerService(\OCP\IRequest::class, function ($c) {
@@ -785,10 +796,10 @@ class Server extends ServerContainer implements IServerContainer {
});
$this->registerAlias('Mailer', \OCP\Mail\IMailer::class);
- $this->registerService('LDAPProvider', function(Server $c) {
+ $this->registerService('LDAPProvider', function (Server $c) {
$config = $c->getConfig();
$factoryClass = $config->getSystemValue('ldapProviderFactory', null);
- if(is_null($factoryClass)) {
+ if (is_null($factoryClass)) {
throw new \Exception('ldapProviderFactory not set');
}
/** @var \OCP\LDAP\ILDAPProviderFactory $factory */
@@ -854,7 +865,7 @@ class Server extends ServerContainer implements IServerContainer {
});
$this->registerAlias('CapabilitiesManager', \OC\CapabilitiesManager::class);
- $this->registerService(\OCP\Comments\ICommentsManager::class, function(Server $c) {
+ $this->registerService(\OCP\Comments\ICommentsManager::class, function (Server $c) {
$config = $c->getConfig();
$factoryClass = $config->getSystemValue('comments.managerFactory', '\OC\Comments\ManagerFactory');
/** @var \OCP\Comments\ICommentsManagerFactory $factory */
@@ -863,7 +874,7 @@ class Server extends ServerContainer implements IServerContainer {
});
$this->registerAlias('CommentsManager', \OCP\Comments\ICommentsManager::class);
- $this->registerService('ThemingDefaults', function(Server $c) {
+ $this->registerService('ThemingDefaults', function (Server $c) {
/*
* Dark magic for autoloader.
* If we do a class_exists it will try to load the class which will
@@ -889,7 +900,7 @@ class Server extends ServerContainer implements IServerContainer {
}
return new \OC_Defaults();
});
- $this->registerService(SCSSCacher::class, function(Server $c) {
+ $this->registerService(SCSSCacher::class, function (Server $c) {
/** @var Factory $cacheFactory */
$cacheFactory = $c->query(Factory::class);
return new SCSSCacher(
@@ -949,14 +960,14 @@ class Server extends ServerContainer implements IServerContainer {
});
$this->registerAlias('ContentSecurityPolicyManager', \OCP\Security\IContentSecurityPolicyManager::class);
- $this->registerService('ContentSecurityPolicyNonceManager', function(Server $c) {
+ $this->registerService('ContentSecurityPolicyNonceManager', function (Server $c) {
return new ContentSecurityPolicyNonceManager(
$c->getCsrfTokenManager(),
$c->getRequest()
);
});
- $this->registerService(\OCP\Share\IManager::class, function(Server $c) {
+ $this->registerService(\OCP\Share\IManager::class, function (Server $c) {
$config = $c->getConfig();
$factoryClass = $config->getSystemValue('sharing.managerFactory', '\OC\Share20\ProviderFactory');
/** @var \OCP\Share\IProviderFactory $factory */
@@ -983,7 +994,7 @@ class Server extends ServerContainer implements IServerContainer {
});
$this->registerAlias('ShareManager', \OCP\Share\IManager::class);
- $this->registerService('SettingsManager', function(Server $c) {
+ $this->registerService('SettingsManager', function (Server $c) {
$manager = new \OC\Settings\Manager(
$c->getLogger(),
$c->getDatabaseConnection(),
@@ -1011,7 +1022,7 @@ class Server extends ServerContainer implements IServerContainer {
});
$this->registerService('LockdownManager', function (Server $c) {
- return new LockdownManager(function() use ($c) {
+ return new LockdownManager(function () use ($c) {
return $c->getSession();
});
});
@@ -1047,11 +1058,11 @@ class Server extends ServerContainer implements IServerContainer {
});
$this->registerAlias('Defaults', \OCP\Defaults::class);
- $this->registerService(\OCP\ISession::class, function(SimpleContainer $c) {
+ $this->registerService(\OCP\ISession::class, function (SimpleContainer $c) {
return $c->query(\OCP\IUserSession::class)->getSession();
});
- $this->registerService(IShareHelper::class, function(Server $c) {
+ $this->registerService(IShareHelper::class, function (Server $c) {
return new ShareHelper(
$c->query(\OCP\Share\IManager::class)
);