From 7613df52959b6e2ac1094d2263be310fb3e2723b Mon Sep 17 00:00:00 2001 From: David Ostrovsky Date: Mon, 17 Feb 2014 21:56:36 +0100 Subject: SSHD: Add support for generic commands Change-Id: I5a60710323ca674d70e34f7451422ec167105429 --- src/main/java/com/gitblit/utils/WorkQueue.java | 340 +++++++++++++++++++++++++ 1 file changed, 340 insertions(+) create mode 100644 src/main/java/com/gitblit/utils/WorkQueue.java (limited to 'src/main/java/com/gitblit/utils/WorkQueue.java') diff --git a/src/main/java/com/gitblit/utils/WorkQueue.java b/src/main/java/com/gitblit/utils/WorkQueue.java new file mode 100644 index 00000000..778e754c --- /dev/null +++ b/src/main/java/com/gitblit/utils/WorkQueue.java @@ -0,0 +1,340 @@ +// Copyright (C) 2009 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.gitblit.utils; + +import com.google.common.collect.Lists; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.Thread.UncaughtExceptionHandler; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Date; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Delayed; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.RunnableScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.inject.Inject; + +/** Delayed execution of tasks using a background thread pool. */ +public class WorkQueue { + private static final Logger log = LoggerFactory.getLogger(WorkQueue.class); + private static final UncaughtExceptionHandler LOG_UNCAUGHT_EXCEPTION = + new UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + log.error("WorkQueue thread " + t.getName() + " threw exception", e); + } + }; + + private Executor defaultQueue; + private final IdGenerator idGenerator; + private final CopyOnWriteArrayList queues; + + @Inject + public WorkQueue(final IdGenerator idGenerator) { + this.idGenerator = idGenerator; + this.queues = new CopyOnWriteArrayList(); + } + + /** Get the default work queue, for miscellaneous tasks. */ + public synchronized Executor getDefaultQueue() { + if (defaultQueue == null) { + defaultQueue = createQueue(1, "WorkQueue"); + } + return defaultQueue; + } + + /** Create a new executor queue with one thread. */ + public Executor createQueue(final int poolsize, final String prefix) { + final Executor r = new Executor(poolsize, prefix); + r.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); + r.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); + queues.add(r); + return r; + } + + /** Get all of the tasks currently scheduled in any work queue. */ + public List> getTasks() { + final List> r = new ArrayList>(); + for (final Executor e : queues) { + e.addAllTo(r); + } + return r; + } + + public List getTaskInfos(TaskInfoFactory factory) { + List taskInfos = Lists.newArrayList(); + for (Executor exe : queues) { + for (Task task : exe.getTasks()) { + taskInfos.add(factory.getTaskInfo(task)); + } + } + return taskInfos; + } + + /** Locate a task by its unique id, null if no task matches. */ + public Task getTask(final int id) { + Task result = null; + for (final Executor e : queues) { + final Task t = e.getTask(id); + if (t != null) { + if (result != null) { + // Don't return the task if we have a duplicate. Lie instead. + return null; + } else { + result = t; + } + } + } + return result; + } + + public void stop() { + for (final Executor p : queues) { + p.shutdown(); + boolean isTerminated; + do { + try { + isTerminated = p.awaitTermination(10, TimeUnit.SECONDS); + } catch (InterruptedException ie) { + isTerminated = false; + } + } while (!isTerminated); + } + queues.clear(); + } + + /** An isolated queue. */ + public class Executor extends ScheduledThreadPoolExecutor { + private final ConcurrentHashMap> all; + + Executor(final int corePoolSize, final String prefix) { + super(corePoolSize, new ThreadFactory() { + private final ThreadFactory parent = Executors.defaultThreadFactory(); + private final AtomicInteger tid = new AtomicInteger(1); + + @Override + public Thread newThread(final Runnable task) { + final Thread t = parent.newThread(task); + t.setName(prefix + "-" + tid.getAndIncrement()); + t.setUncaughtExceptionHandler(LOG_UNCAUGHT_EXCEPTION); + return t; + } + }); + + all = new ConcurrentHashMap>( // + corePoolSize << 1, // table size + 0.75f, // load factor + corePoolSize + 4 // concurrency level + ); + } + + public void unregisterWorkQueue() { + queues.remove(this); + } + + @Override + protected RunnableScheduledFuture decorateTask( + final Runnable runnable, RunnableScheduledFuture r) { + r = super.decorateTask(runnable, r); + for (;;) { + final int id = idGenerator.next(); + + Task task; + task = new Task(runnable, r, this, id); + + if (all.putIfAbsent(task.getTaskId(), task) == null) { + return task; + } + } + } + + @Override + protected RunnableScheduledFuture decorateTask( + final Callable callable, final RunnableScheduledFuture task) { + throw new UnsupportedOperationException("Callable not implemented"); + } + + void remove(final Task task) { + all.remove(task.getTaskId(), task); + } + + Task getTask(final int id) { + return all.get(id); + } + + void addAllTo(final List> list) { + list.addAll(all.values()); // iterator is thread safe + } + + Collection> getTasks() { + return all.values(); + } + } + + /** Runnable needing to know it was canceled. */ + public interface CancelableRunnable extends Runnable { + /** Notifies the runnable it was canceled. */ + public void cancel(); + } + + /** A wrapper around a scheduled Runnable, as maintained in the queue. */ + public static class Task implements RunnableScheduledFuture { + /** + * Summarized status of a single task. + *

+ * Tasks have the following state flow: + *

    + *
  1. {@link #SLEEPING}: if scheduled with a non-zero delay.
  2. + *
  3. {@link #READY}: waiting for an available worker thread.
  4. + *
  5. {@link #RUNNING}: actively executing on a worker thread.
  6. + *
  7. {@link #DONE}: finished executing, if not periodic.
  8. + *
+ */ + public static enum State { + // Ordered like this so ordinal matches the order we would + // prefer to see tasks sorted in: done before running, + // running before ready, ready before sleeping. + // + DONE, CANCELLED, RUNNING, READY, SLEEPING, OTHER + } + + private final Runnable runnable; + private final RunnableScheduledFuture task; + private final Executor executor; + private final int taskId; + private final AtomicBoolean running; + private final Date startTime; + + Task(Runnable runnable, RunnableScheduledFuture task, Executor executor, + int taskId) { + this.runnable = runnable; + this.task = task; + this.executor = executor; + this.taskId = taskId; + this.running = new AtomicBoolean(); + this.startTime = new Date(); + } + + public int getTaskId() { + return taskId; + } + + public State getState() { + if (isCancelled()) { + return State.CANCELLED; + } else if (isDone() && !isPeriodic()) { + return State.DONE; + } else if (running.get()) { + return State.RUNNING; + } + + final long delay = getDelay(TimeUnit.MILLISECONDS); + if (delay <= 0) { + return State.READY; + } else if (0 < delay) { + return State.SLEEPING; + } + + return State.OTHER; + } + + public Date getStartTime() { + return startTime; + } + + public boolean cancel(boolean mayInterruptIfRunning) { + if (task.cancel(mayInterruptIfRunning)) { + // Tiny abuse of running: if the task needs to know it was + // canceled (to clean up resources) and it hasn't started + // yet the task's run method won't execute. So we tag it + // as running and allow it to clean up. This ensures we do + // not invoke cancel twice. + // + if (runnable instanceof CancelableRunnable + && running.compareAndSet(false, true)) { + ((CancelableRunnable) runnable).cancel(); + } + executor.remove(this); + executor.purge(); + return true; + + } else { + return false; + } + } + + public int compareTo(Delayed o) { + return task.compareTo(o); + } + + public V get() throws InterruptedException, ExecutionException { + return task.get(); + } + + public V get(long timeout, TimeUnit unit) throws InterruptedException, + ExecutionException, TimeoutException { + return task.get(timeout, unit); + } + + public long getDelay(TimeUnit unit) { + return task.getDelay(unit); + } + + public boolean isCancelled() { + return task.isCancelled(); + } + + public boolean isDone() { + return task.isDone(); + } + + public boolean isPeriodic() { + return task.isPeriodic(); + } + + public void run() { + if (running.compareAndSet(false, true)) { + try { + task.run(); + } finally { + if (isPeriodic()) { + running.set(false); + } else { + executor.remove(this); + } + } + } + } + + @Override + public String toString() { + return runnable.toString(); + } + } +} -- cgit v1.2.3