]> source.dussan.org Git - gitblit.git/commitdiff
Fix thread exhaustion in SSH daemon 89/89/1
authorJames Moger <james.moger@gitblit.com>
Thu, 29 May 2014 15:48:37 +0000 (11:48 -0400)
committerJames Moger <james.moger@gitblit.com>
Thu, 29 May 2014 15:48:37 +0000 (11:48 -0400)
src/main/java/com/gitblit/manager/ServicesManager.java
src/main/java/com/gitblit/transport/ssh/SshDaemon.java
src/main/java/com/gitblit/transport/ssh/commands/BaseCommand.java
src/main/java/com/gitblit/transport/ssh/commands/DispatchCommand.java
src/main/java/com/gitblit/transport/ssh/commands/RootDispatcher.java
src/main/java/com/gitblit/transport/ssh/commands/SshCommandFactory.java
src/main/java/com/gitblit/utils/WorkQueue.java

index e0fc8bbdb2834868b03be7e4ab51124c256b911e..b1c97ba4b1c5b391b8ad7066e66e3c6bd0ab150f 100644 (file)
@@ -47,6 +47,7 @@ import com.gitblit.transport.ssh.SshDaemon;
 import com.gitblit.utils.IdGenerator;
 import com.gitblit.utils.StringUtils;
 import com.gitblit.utils.TimeUtils;
+import com.gitblit.utils.WorkQueue;
 
 /**
  * Services manager manages long-running services/processes that either have no
@@ -66,6 +67,10 @@ public class ServicesManager implements IManager {
 
        private final IGitblit gitblit;
 
+       private final IdGenerator idGenerator;
+
+       private final WorkQueue workQueue;
+
        private FanoutService fanoutService;
 
        private GitDaemon gitDaemon;
@@ -75,6 +80,8 @@ public class ServicesManager implements IManager {
        public ServicesManager(IGitblit gitblit) {
                this.settings = gitblit.getSettings();
                this.gitblit = gitblit;
+               this.idGenerator = new IdGenerator();
+               this.workQueue = new WorkQueue(idGenerator, 1);
        }
 
        @Override
@@ -99,6 +106,7 @@ public class ServicesManager implements IManager {
                if (sshDaemon != null) {
                        sshDaemon.stop();
                }
+               workQueue.stop();
                return this;
        }
 
@@ -158,7 +166,7 @@ public class ServicesManager implements IManager {
                String bindInterface = settings.getString(Keys.git.sshBindInterface, "localhost");
                if (port > 0) {
                        try {
-                               sshDaemon = new SshDaemon(gitblit, new IdGenerator());
+                               sshDaemon = new SshDaemon(gitblit, workQueue);
                                sshDaemon.start();
                        } catch (IOException e) {
                                sshDaemon = null;
index 4d64cfb775fdc7ff4f36aa5841606d7bca7346e4..7c5129073a7974fec7632512eeda299460b4ea85 100644 (file)
@@ -41,9 +41,9 @@ import com.gitblit.IStoredSettings;
 import com.gitblit.Keys;
 import com.gitblit.manager.IGitblit;
 import com.gitblit.transport.ssh.commands.SshCommandFactory;
-import com.gitblit.utils.IdGenerator;
 import com.gitblit.utils.JnaUtils;
 import com.gitblit.utils.StringUtils;
+import com.gitblit.utils.WorkQueue;
 import com.google.common.io.Files;
 
 /**
@@ -76,8 +76,9 @@ public class SshDaemon {
         * Construct the Gitblit SSH daemon.
         *
         * @param gitblit
+        * @param workQueue
         */
-       public SshDaemon(IGitblit gitblit, IdGenerator idGenerator) {
+       public SshDaemon(IGitblit gitblit, WorkQueue workQueue) {
                this.gitblit = gitblit;
 
                IStoredSettings settings = gitblit.getSettings();
@@ -126,7 +127,7 @@ public class SshDaemon {
                sshd.setSessionFactory(new SshServerSessionFactory());
                sshd.setFileSystemFactory(new DisabledFilesystemFactory());
                sshd.setTcpipForwardingFilter(new NonForwardingFilter());
-               sshd.setCommandFactory(new SshCommandFactory(gitblit, idGenerator));
+               sshd.setCommandFactory(new SshCommandFactory(gitblit, workQueue));
                sshd.setShellFactory(new WelcomeShell(settings));
 
                // Set the server id.  This can be queried with:
index d996ea9a9356c2e1ce81573ae87a72824d6acd37..ab2756d02b767752a20319e74e90ee26d7def8fc 100644 (file)
@@ -33,12 +33,13 @@ import org.apache.sshd.server.Environment;
 import org.apache.sshd.server.ExitCallback;
 import org.apache.sshd.server.SessionAware;
 import org.apache.sshd.server.session.ServerSession;
+import org.kohsuke.args4j.Argument;
 import org.kohsuke.args4j.CmdLineException;
+import org.kohsuke.args4j.Option;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.gitblit.Keys;
-import com.gitblit.utils.IdGenerator;
 import com.gitblit.utils.StringUtils;
 import com.gitblit.utils.WorkQueue;
 import com.gitblit.utils.WorkQueue.CancelableRunnable;
@@ -80,13 +81,10 @@ public abstract class BaseCommand implements Command, SessionAware {
        /** The task, as scheduled on a worker thread. */
        private final AtomicReference<Future<?>> task;
 
-       private final WorkQueue.Executor executor;
+       private WorkQueue workQueue;
 
        public BaseCommand() {
                task = Atomics.newReference();
-               IdGenerator gen = new IdGenerator();
-               WorkQueue w = new WorkQueue(gen);
-               this.executor = w.getDefaultQueue();
        }
 
        @Override
@@ -97,6 +95,10 @@ public abstract class BaseCommand implements Command, SessionAware {
        @Override
        public void destroy() {
                log.debug("destroying " + getClass().getName());
+               Future<?> future = task.getAndSet(null);
+               if (future != null && !future.isDone()) {
+                       future.cancel(true);
+               }
                session = null;
                ctx = null;
        }
@@ -110,12 +112,21 @@ public abstract class BaseCommand implements Command, SessionAware {
 
        protected void provideStateTo(final BaseCommand cmd) {
                cmd.setContext(ctx);
+               cmd.setWorkQueue(workQueue);
                cmd.setInputStream(in);
                cmd.setOutputStream(out);
                cmd.setErrorStream(err);
                cmd.setExitCallback(exit);
        }
 
+       public WorkQueue getWorkQueue() {
+               return workQueue;
+       }
+
+       public void setWorkQueue(WorkQueue workQueue) {
+               this.workQueue = workQueue;
+       }
+
        public void setContext(SshCommandContext ctx) {
                this.ctx = ctx;
        }
@@ -467,7 +478,7 @@ public abstract class BaseCommand implements Command, SessionAware {
         */
        protected void startThread(final CommandRunnable thunk) {
                final TaskThunk tt = new TaskThunk(thunk);
-               task.set(executor.submit(tt));
+               task.set(workQueue.getDefaultQueue().submit(tt));
        }
 
        /** Thrown from {@link CommandRunnable#run()} with client message and code. */
index 86b3369da086d115d79c43f000da882993b2afa4..d17a4ebf1dc0bbe6a201edfe65d3bb54bab80577 100644 (file)
@@ -154,6 +154,7 @@ public abstract class DispatchCommand extends BaseCommand implements ExtensionPo
 
                try {
                        dispatcher.setContext(getContext());
+                       dispatcher.setWorkQueue(getWorkQueue());
                        dispatcher.setup();
                        if (dispatcher.commands.isEmpty() && dispatcher.dispatchers.isEmpty()) {
                                log.debug(MessageFormat.format("excluding empty dispatcher {0} for {1}",
index 0bf6d510e5c6f9477779656f91c321cd89dedff6..e41ee19fbdd08352d3eee083d340d185883995b9 100644 (file)
@@ -26,6 +26,7 @@ import com.gitblit.manager.IGitblit;
 import com.gitblit.transport.ssh.SshDaemonClient;
 import com.gitblit.transport.ssh.git.GitDispatcher;
 import com.gitblit.transport.ssh.keys.KeysDispatcher;
+import com.gitblit.utils.WorkQueue;
 
 /**
  * The root dispatcher is the dispatch command that handles registering all
@@ -37,9 +38,10 @@ class RootDispatcher extends DispatchCommand {
 
        private Logger log = LoggerFactory.getLogger(getClass());
 
-       public RootDispatcher(IGitblit gitblit, SshDaemonClient client, String cmdLine) {
+       public RootDispatcher(IGitblit gitblit, SshDaemonClient client, String cmdLine, WorkQueue workQueue) {
                super();
                setContext(new SshCommandContext(gitblit, client, cmdLine));
+               setWorkQueue(workQueue);
 
                register(VersionCommand.class);
                register(GitDispatcher.class);
index 599d94b3838cc75e3908448316403aa87f3d45cd..fa4b91673e54127424f6dba0a20f9516869ce72f 100644 (file)
@@ -40,7 +40,6 @@ import org.slf4j.LoggerFactory;
 import com.gitblit.Keys;
 import com.gitblit.manager.IGitblit;
 import com.gitblit.transport.ssh.SshDaemonClient;
-import com.gitblit.utils.IdGenerator;
 import com.gitblit.utils.WorkQueue;
 import com.google.common.util.concurrent.Atomics;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -48,15 +47,16 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 public class SshCommandFactory implements CommandFactory {
        private static final Logger logger = LoggerFactory.getLogger(SshCommandFactory.class);
 
+       private final WorkQueue workQueue;
        private final IGitblit gitblit;
        private final ScheduledExecutorService startExecutor;
        private final ExecutorService destroyExecutor;
 
-       public SshCommandFactory(IGitblit gitblit, IdGenerator idGenerator) {
+       public SshCommandFactory(IGitblit gitblit, WorkQueue workQueue) {
                this.gitblit = gitblit;
+               this.workQueue = workQueue;
 
                int threads = gitblit.getSettings().getInteger(Keys.git.sshCommandStartThreads, 2);
-               WorkQueue workQueue = new WorkQueue(idGenerator);
                startExecutor = workQueue.createQueue(threads, "SshCommandStart");
                destroyExecutor = Executors.newSingleThreadExecutor(
                                new ThreadFactoryBuilder()
@@ -70,7 +70,7 @@ public class SshCommandFactory implements CommandFactory {
        }
 
        public RootDispatcher createRootDispatcher(SshDaemonClient client, String commandLine) {
-               return new RootDispatcher(gitblit, client, commandLine);
+               return new RootDispatcher(gitblit, client, commandLine, workQueue);
        }
 
        @Override
index ba49a4c535122f863067966317c6540dd56d80ab..ce89d6902a32e42ca295216748bd3e25486f7c9b 100644 (file)
@@ -51,17 +51,19 @@ public class WorkQueue {
 
   private Executor defaultQueue;
   private final IdGenerator idGenerator;
+  private final int defaultQueueSize;
   private final CopyOnWriteArrayList<Executor> queues;
 
-  public WorkQueue(final IdGenerator idGenerator) {
+  public WorkQueue(final IdGenerator idGenerator, final int defaultQueueSize) {
     this.idGenerator = idGenerator;
+    this.defaultQueueSize = defaultQueueSize;
     this.queues = new CopyOnWriteArrayList<Executor>();
   }
 
   /** Get the default work queue, for miscellaneous tasks. */
   public synchronized Executor getDefaultQueue() {
     if (defaultQueue == null) {
-      defaultQueue = createQueue(1, "WorkQueue");
+      defaultQueue = createQueue(defaultQueueSize, "WorkQueue");
     }
     return defaultQueue;
   }