import com.gitblit.utils.IdGenerator;
import com.gitblit.utils.StringUtils;
import com.gitblit.utils.TimeUtils;
+import com.gitblit.utils.WorkQueue;
/**
* Services manager manages long-running services/processes that either have no
private final IGitblit gitblit;
+ private final IdGenerator idGenerator;
+
+ private final WorkQueue workQueue;
+
private FanoutService fanoutService;
private GitDaemon gitDaemon;
public ServicesManager(IGitblit gitblit) {
this.settings = gitblit.getSettings();
this.gitblit = gitblit;
+ this.idGenerator = new IdGenerator();
+ this.workQueue = new WorkQueue(idGenerator, 1);
}
@Override
if (sshDaemon != null) {
sshDaemon.stop();
}
+ workQueue.stop();
return this;
}
String bindInterface = settings.getString(Keys.git.sshBindInterface, "localhost");
if (port > 0) {
try {
- sshDaemon = new SshDaemon(gitblit, new IdGenerator());
+ sshDaemon = new SshDaemon(gitblit, workQueue);
sshDaemon.start();
} catch (IOException e) {
sshDaemon = null;
import com.gitblit.Keys;
import com.gitblit.manager.IGitblit;
import com.gitblit.transport.ssh.commands.SshCommandFactory;
-import com.gitblit.utils.IdGenerator;
import com.gitblit.utils.JnaUtils;
import com.gitblit.utils.StringUtils;
+import com.gitblit.utils.WorkQueue;
import com.google.common.io.Files;
/**
* Construct the Gitblit SSH daemon.
*
* @param gitblit
+ * @param workQueue
*/
- public SshDaemon(IGitblit gitblit, IdGenerator idGenerator) {
+ public SshDaemon(IGitblit gitblit, WorkQueue workQueue) {
this.gitblit = gitblit;
IStoredSettings settings = gitblit.getSettings();
sshd.setSessionFactory(new SshServerSessionFactory());
sshd.setFileSystemFactory(new DisabledFilesystemFactory());
sshd.setTcpipForwardingFilter(new NonForwardingFilter());
- sshd.setCommandFactory(new SshCommandFactory(gitblit, idGenerator));
+ sshd.setCommandFactory(new SshCommandFactory(gitblit, workQueue));
sshd.setShellFactory(new WelcomeShell(settings));
// Set the server id. This can be queried with:
import org.apache.sshd.server.ExitCallback;
import org.apache.sshd.server.SessionAware;
import org.apache.sshd.server.session.ServerSession;
+import org.kohsuke.args4j.Argument;
import org.kohsuke.args4j.CmdLineException;
+import org.kohsuke.args4j.Option;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.gitblit.Keys;
-import com.gitblit.utils.IdGenerator;
import com.gitblit.utils.StringUtils;
import com.gitblit.utils.WorkQueue;
import com.gitblit.utils.WorkQueue.CancelableRunnable;
/** The task, as scheduled on a worker thread. */
private final AtomicReference<Future<?>> task;
- private final WorkQueue.Executor executor;
+ private WorkQueue workQueue;
public BaseCommand() {
task = Atomics.newReference();
- IdGenerator gen = new IdGenerator();
- WorkQueue w = new WorkQueue(gen);
- this.executor = w.getDefaultQueue();
}
@Override
@Override
public void destroy() {
log.debug("destroying " + getClass().getName());
+ Future<?> future = task.getAndSet(null);
+ if (future != null && !future.isDone()) {
+ future.cancel(true);
+ }
session = null;
ctx = null;
}
protected void provideStateTo(final BaseCommand cmd) {
cmd.setContext(ctx);
+ cmd.setWorkQueue(workQueue);
cmd.setInputStream(in);
cmd.setOutputStream(out);
cmd.setErrorStream(err);
cmd.setExitCallback(exit);
}
+ public WorkQueue getWorkQueue() {
+ return workQueue;
+ }
+
+ public void setWorkQueue(WorkQueue workQueue) {
+ this.workQueue = workQueue;
+ }
+
public void setContext(SshCommandContext ctx) {
this.ctx = ctx;
}
*/
protected void startThread(final CommandRunnable thunk) {
final TaskThunk tt = new TaskThunk(thunk);
- task.set(executor.submit(tt));
+ task.set(workQueue.getDefaultQueue().submit(tt));
}
/** Thrown from {@link CommandRunnable#run()} with client message and code. */
try {
dispatcher.setContext(getContext());
+ dispatcher.setWorkQueue(getWorkQueue());
dispatcher.setup();
if (dispatcher.commands.isEmpty() && dispatcher.dispatchers.isEmpty()) {
log.debug(MessageFormat.format("excluding empty dispatcher {0} for {1}",
import com.gitblit.transport.ssh.SshDaemonClient;
import com.gitblit.transport.ssh.git.GitDispatcher;
import com.gitblit.transport.ssh.keys.KeysDispatcher;
+import com.gitblit.utils.WorkQueue;
/**
* The root dispatcher is the dispatch command that handles registering all
private Logger log = LoggerFactory.getLogger(getClass());
- public RootDispatcher(IGitblit gitblit, SshDaemonClient client, String cmdLine) {
+ public RootDispatcher(IGitblit gitblit, SshDaemonClient client, String cmdLine, WorkQueue workQueue) {
super();
setContext(new SshCommandContext(gitblit, client, cmdLine));
+ setWorkQueue(workQueue);
register(VersionCommand.class);
register(GitDispatcher.class);
import com.gitblit.Keys;
import com.gitblit.manager.IGitblit;
import com.gitblit.transport.ssh.SshDaemonClient;
-import com.gitblit.utils.IdGenerator;
import com.gitblit.utils.WorkQueue;
import com.google.common.util.concurrent.Atomics;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class SshCommandFactory implements CommandFactory {
private static final Logger logger = LoggerFactory.getLogger(SshCommandFactory.class);
+ private final WorkQueue workQueue;
private final IGitblit gitblit;
private final ScheduledExecutorService startExecutor;
private final ExecutorService destroyExecutor;
- public SshCommandFactory(IGitblit gitblit, IdGenerator idGenerator) {
+ public SshCommandFactory(IGitblit gitblit, WorkQueue workQueue) {
this.gitblit = gitblit;
+ this.workQueue = workQueue;
int threads = gitblit.getSettings().getInteger(Keys.git.sshCommandStartThreads, 2);
- WorkQueue workQueue = new WorkQueue(idGenerator);
startExecutor = workQueue.createQueue(threads, "SshCommandStart");
destroyExecutor = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder()
}
public RootDispatcher createRootDispatcher(SshDaemonClient client, String commandLine) {
- return new RootDispatcher(gitblit, client, commandLine);
+ return new RootDispatcher(gitblit, client, commandLine, workQueue);
}
@Override
private Executor defaultQueue;
private final IdGenerator idGenerator;
+ private final int defaultQueueSize;
private final CopyOnWriteArrayList<Executor> queues;
- public WorkQueue(final IdGenerator idGenerator) {
+ public WorkQueue(final IdGenerator idGenerator, final int defaultQueueSize) {
this.idGenerator = idGenerator;
+ this.defaultQueueSize = defaultQueueSize;
this.queues = new CopyOnWriteArrayList<Executor>();
}
/** Get the default work queue, for miscellaneous tasks. */
public synchronized Executor getDefaultQueue() {
if (defaultQueue == null) {
- defaultQueue = createQueue(1, "WorkQueue");
+ defaultQueue = createQueue(defaultQueueSize, "WorkQueue");
}
return defaultQueue;
}