@@ -47,6 +47,7 @@ import com.gitblit.transport.ssh.SshDaemon; | |||
import com.gitblit.utils.IdGenerator; | |||
import com.gitblit.utils.StringUtils; | |||
import com.gitblit.utils.TimeUtils; | |||
import com.gitblit.utils.WorkQueue; | |||
/** | |||
* Services manager manages long-running services/processes that either have no | |||
@@ -66,6 +67,10 @@ public class ServicesManager implements IManager { | |||
private final IGitblit gitblit; | |||
private final IdGenerator idGenerator; | |||
private final WorkQueue workQueue; | |||
private FanoutService fanoutService; | |||
private GitDaemon gitDaemon; | |||
@@ -75,6 +80,8 @@ public class ServicesManager implements IManager { | |||
public ServicesManager(IGitblit gitblit) { | |||
this.settings = gitblit.getSettings(); | |||
this.gitblit = gitblit; | |||
this.idGenerator = new IdGenerator(); | |||
this.workQueue = new WorkQueue(idGenerator, 1); | |||
} | |||
@Override | |||
@@ -99,6 +106,7 @@ public class ServicesManager implements IManager { | |||
if (sshDaemon != null) { | |||
sshDaemon.stop(); | |||
} | |||
workQueue.stop(); | |||
return this; | |||
} | |||
@@ -158,7 +166,7 @@ public class ServicesManager implements IManager { | |||
String bindInterface = settings.getString(Keys.git.sshBindInterface, "localhost"); | |||
if (port > 0) { | |||
try { | |||
sshDaemon = new SshDaemon(gitblit, new IdGenerator()); | |||
sshDaemon = new SshDaemon(gitblit, workQueue); | |||
sshDaemon.start(); | |||
} catch (IOException e) { | |||
sshDaemon = null; |
@@ -41,9 +41,9 @@ import com.gitblit.IStoredSettings; | |||
import com.gitblit.Keys; | |||
import com.gitblit.manager.IGitblit; | |||
import com.gitblit.transport.ssh.commands.SshCommandFactory; | |||
import com.gitblit.utils.IdGenerator; | |||
import com.gitblit.utils.JnaUtils; | |||
import com.gitblit.utils.StringUtils; | |||
import com.gitblit.utils.WorkQueue; | |||
import com.google.common.io.Files; | |||
/** | |||
@@ -76,8 +76,9 @@ public class SshDaemon { | |||
* Construct the Gitblit SSH daemon. | |||
* | |||
* @param gitblit | |||
* @param workQueue | |||
*/ | |||
public SshDaemon(IGitblit gitblit, IdGenerator idGenerator) { | |||
public SshDaemon(IGitblit gitblit, WorkQueue workQueue) { | |||
this.gitblit = gitblit; | |||
IStoredSettings settings = gitblit.getSettings(); | |||
@@ -126,7 +127,7 @@ public class SshDaemon { | |||
sshd.setSessionFactory(new SshServerSessionFactory()); | |||
sshd.setFileSystemFactory(new DisabledFilesystemFactory()); | |||
sshd.setTcpipForwardingFilter(new NonForwardingFilter()); | |||
sshd.setCommandFactory(new SshCommandFactory(gitblit, idGenerator)); | |||
sshd.setCommandFactory(new SshCommandFactory(gitblit, workQueue)); | |||
sshd.setShellFactory(new WelcomeShell(settings)); | |||
// Set the server id. This can be queried with: |
@@ -33,12 +33,13 @@ import org.apache.sshd.server.Environment; | |||
import org.apache.sshd.server.ExitCallback; | |||
import org.apache.sshd.server.SessionAware; | |||
import org.apache.sshd.server.session.ServerSession; | |||
import org.kohsuke.args4j.Argument; | |||
import org.kohsuke.args4j.CmdLineException; | |||
import org.kohsuke.args4j.Option; | |||
import org.slf4j.Logger; | |||
import org.slf4j.LoggerFactory; | |||
import com.gitblit.Keys; | |||
import com.gitblit.utils.IdGenerator; | |||
import com.gitblit.utils.StringUtils; | |||
import com.gitblit.utils.WorkQueue; | |||
import com.gitblit.utils.WorkQueue.CancelableRunnable; | |||
@@ -80,13 +81,10 @@ public abstract class BaseCommand implements Command, SessionAware { | |||
/** The task, as scheduled on a worker thread. */ | |||
private final AtomicReference<Future<?>> task; | |||
private final WorkQueue.Executor executor; | |||
private WorkQueue workQueue; | |||
public BaseCommand() { | |||
task = Atomics.newReference(); | |||
IdGenerator gen = new IdGenerator(); | |||
WorkQueue w = new WorkQueue(gen); | |||
this.executor = w.getDefaultQueue(); | |||
} | |||
@Override | |||
@@ -97,6 +95,10 @@ public abstract class BaseCommand implements Command, SessionAware { | |||
@Override | |||
public void destroy() { | |||
log.debug("destroying " + getClass().getName()); | |||
Future<?> future = task.getAndSet(null); | |||
if (future != null && !future.isDone()) { | |||
future.cancel(true); | |||
} | |||
session = null; | |||
ctx = null; | |||
} | |||
@@ -110,12 +112,21 @@ public abstract class BaseCommand implements Command, SessionAware { | |||
protected void provideStateTo(final BaseCommand cmd) { | |||
cmd.setContext(ctx); | |||
cmd.setWorkQueue(workQueue); | |||
cmd.setInputStream(in); | |||
cmd.setOutputStream(out); | |||
cmd.setErrorStream(err); | |||
cmd.setExitCallback(exit); | |||
} | |||
public WorkQueue getWorkQueue() { | |||
return workQueue; | |||
} | |||
public void setWorkQueue(WorkQueue workQueue) { | |||
this.workQueue = workQueue; | |||
} | |||
public void setContext(SshCommandContext ctx) { | |||
this.ctx = ctx; | |||
} | |||
@@ -467,7 +478,7 @@ public abstract class BaseCommand implements Command, SessionAware { | |||
*/ | |||
protected void startThread(final CommandRunnable thunk) { | |||
final TaskThunk tt = new TaskThunk(thunk); | |||
task.set(executor.submit(tt)); | |||
task.set(workQueue.getDefaultQueue().submit(tt)); | |||
} | |||
/** Thrown from {@link CommandRunnable#run()} with client message and code. */ |
@@ -154,6 +154,7 @@ public abstract class DispatchCommand extends BaseCommand implements ExtensionPo | |||
try { | |||
dispatcher.setContext(getContext()); | |||
dispatcher.setWorkQueue(getWorkQueue()); | |||
dispatcher.setup(); | |||
if (dispatcher.commands.isEmpty() && dispatcher.dispatchers.isEmpty()) { | |||
log.debug(MessageFormat.format("excluding empty dispatcher {0} for {1}", |
@@ -26,6 +26,7 @@ import com.gitblit.manager.IGitblit; | |||
import com.gitblit.transport.ssh.SshDaemonClient; | |||
import com.gitblit.transport.ssh.git.GitDispatcher; | |||
import com.gitblit.transport.ssh.keys.KeysDispatcher; | |||
import com.gitblit.utils.WorkQueue; | |||
/** | |||
* The root dispatcher is the dispatch command that handles registering all | |||
@@ -37,9 +38,10 @@ class RootDispatcher extends DispatchCommand { | |||
private Logger log = LoggerFactory.getLogger(getClass()); | |||
public RootDispatcher(IGitblit gitblit, SshDaemonClient client, String cmdLine) { | |||
public RootDispatcher(IGitblit gitblit, SshDaemonClient client, String cmdLine, WorkQueue workQueue) { | |||
super(); | |||
setContext(new SshCommandContext(gitblit, client, cmdLine)); | |||
setWorkQueue(workQueue); | |||
register(VersionCommand.class); | |||
register(GitDispatcher.class); |
@@ -40,7 +40,6 @@ import org.slf4j.LoggerFactory; | |||
import com.gitblit.Keys; | |||
import com.gitblit.manager.IGitblit; | |||
import com.gitblit.transport.ssh.SshDaemonClient; | |||
import com.gitblit.utils.IdGenerator; | |||
import com.gitblit.utils.WorkQueue; | |||
import com.google.common.util.concurrent.Atomics; | |||
import com.google.common.util.concurrent.ThreadFactoryBuilder; | |||
@@ -48,15 +47,16 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; | |||
public class SshCommandFactory implements CommandFactory { | |||
private static final Logger logger = LoggerFactory.getLogger(SshCommandFactory.class); | |||
private final WorkQueue workQueue; | |||
private final IGitblit gitblit; | |||
private final ScheduledExecutorService startExecutor; | |||
private final ExecutorService destroyExecutor; | |||
public SshCommandFactory(IGitblit gitblit, IdGenerator idGenerator) { | |||
public SshCommandFactory(IGitblit gitblit, WorkQueue workQueue) { | |||
this.gitblit = gitblit; | |||
this.workQueue = workQueue; | |||
int threads = gitblit.getSettings().getInteger(Keys.git.sshCommandStartThreads, 2); | |||
WorkQueue workQueue = new WorkQueue(idGenerator); | |||
startExecutor = workQueue.createQueue(threads, "SshCommandStart"); | |||
destroyExecutor = Executors.newSingleThreadExecutor( | |||
new ThreadFactoryBuilder() | |||
@@ -70,7 +70,7 @@ public class SshCommandFactory implements CommandFactory { | |||
} | |||
public RootDispatcher createRootDispatcher(SshDaemonClient client, String commandLine) { | |||
return new RootDispatcher(gitblit, client, commandLine); | |||
return new RootDispatcher(gitblit, client, commandLine, workQueue); | |||
} | |||
@Override |
@@ -51,17 +51,19 @@ public class WorkQueue { | |||
private Executor defaultQueue; | |||
private final IdGenerator idGenerator; | |||
private final int defaultQueueSize; | |||
private final CopyOnWriteArrayList<Executor> queues; | |||
public WorkQueue(final IdGenerator idGenerator) { | |||
public WorkQueue(final IdGenerator idGenerator, final int defaultQueueSize) { | |||
this.idGenerator = idGenerator; | |||
this.defaultQueueSize = defaultQueueSize; | |||
this.queues = new CopyOnWriteArrayList<Executor>(); | |||
} | |||
/** Get the default work queue, for miscellaneous tasks. */ | |||
public synchronized Executor getDefaultQueue() { | |||
if (defaultQueue == null) { | |||
defaultQueue = createQueue(1, "WorkQueue"); | |||
defaultQueue = createQueue(defaultQueueSize, "WorkQueue"); | |||
} | |||
return defaultQueue; | |||
} |