You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

ThreadSafeProgressMonitor.java 3.9KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. /*
  2. * Copyright (C) 2010, Google Inc. and others
  3. *
  4. * This program and the accompanying materials are made available under the
  5. * terms of the Eclipse Distribution License v. 1.0 which is available at
  6. * https://www.eclipse.org/org/documents/edl-v10.php.
  7. *
  8. * SPDX-License-Identifier: BSD-3-Clause
  9. */
  10. package org.eclipse.jgit.lib;
  11. import java.util.concurrent.Semaphore;
  12. import java.util.concurrent.atomic.AtomicInteger;
  13. import java.util.concurrent.locks.ReentrantLock;
  14. /**
  15. * Wrapper around the general {@link org.eclipse.jgit.lib.ProgressMonitor} to
  16. * make it thread safe.
  17. *
  18. * Updates to the underlying ProgressMonitor are made only from the thread that
  19. * allocated this wrapper. Callers are responsible for ensuring the allocating
  20. * thread uses {@link #pollForUpdates()} or {@link #waitForCompletion()} to
  21. * update the underlying ProgressMonitor.
  22. *
  23. * Only {@link #update(int)}, {@link #isCancelled()}, and {@link #endWorker()}
  24. * may be invoked from a worker thread. All other methods of the ProgressMonitor
  25. * interface can only be called from the thread that allocates this wrapper.
  26. */
  27. public class ThreadSafeProgressMonitor implements ProgressMonitor {
  28. private final ProgressMonitor pm;
  29. private final ReentrantLock lock;
  30. private final Thread mainThread;
  31. private final AtomicInteger workers;
  32. private final AtomicInteger pendingUpdates;
  33. private final Semaphore process;
  34. /**
  35. * Wrap a ProgressMonitor to be thread safe.
  36. *
  37. * @param pm
  38. * the underlying monitor to receive events.
  39. */
  40. public ThreadSafeProgressMonitor(ProgressMonitor pm) {
  41. this.pm = pm;
  42. this.lock = new ReentrantLock();
  43. this.mainThread = Thread.currentThread();
  44. this.workers = new AtomicInteger(0);
  45. this.pendingUpdates = new AtomicInteger(0);
  46. this.process = new Semaphore(0);
  47. }
  48. /** {@inheritDoc} */
  49. @Override
  50. public void start(int totalTasks) {
  51. if (!isMainThread())
  52. throw new IllegalStateException();
  53. pm.start(totalTasks);
  54. }
  55. /** {@inheritDoc} */
  56. @Override
  57. public void beginTask(String title, int totalWork) {
  58. if (!isMainThread())
  59. throw new IllegalStateException();
  60. pm.beginTask(title, totalWork);
  61. }
  62. /**
  63. * Notify the monitor a worker is starting.
  64. */
  65. public void startWorker() {
  66. startWorkers(1);
  67. }
  68. /**
  69. * Notify the monitor of workers starting.
  70. *
  71. * @param count
  72. * the number of worker threads that are starting.
  73. */
  74. public void startWorkers(int count) {
  75. workers.addAndGet(count);
  76. }
  77. /**
  78. * Notify the monitor a worker is finished.
  79. */
  80. public void endWorker() {
  81. if (workers.decrementAndGet() == 0)
  82. process.release();
  83. }
  84. /**
  85. * Non-blocking poll for pending updates.
  86. *
  87. * This method can only be invoked by the same thread that allocated this
  88. * ThreadSafeProgressMonior.
  89. */
  90. public void pollForUpdates() {
  91. assert isMainThread();
  92. doUpdates();
  93. }
  94. /**
  95. * Process pending updates and wait for workers to finish.
  96. *
  97. * This method can only be invoked by the same thread that allocated this
  98. * ThreadSafeProgressMonior.
  99. *
  100. * @throws java.lang.InterruptedException
  101. * if the main thread is interrupted while waiting for
  102. * completion of workers.
  103. */
  104. public void waitForCompletion() throws InterruptedException {
  105. assert isMainThread();
  106. while (0 < workers.get()) {
  107. doUpdates();
  108. process.acquire();
  109. }
  110. doUpdates();
  111. }
  112. private void doUpdates() {
  113. int cnt = pendingUpdates.getAndSet(0);
  114. if (0 < cnt)
  115. pm.update(cnt);
  116. }
  117. /** {@inheritDoc} */
  118. @Override
  119. public void update(int completed) {
  120. if (0 == pendingUpdates.getAndAdd(completed))
  121. process.release();
  122. }
  123. /** {@inheritDoc} */
  124. @Override
  125. public boolean isCancelled() {
  126. lock.lock();
  127. try {
  128. return pm.isCancelled();
  129. } finally {
  130. lock.unlock();
  131. }
  132. }
  133. /** {@inheritDoc} */
  134. @Override
  135. public void endTask() {
  136. if (!isMainThread())
  137. throw new IllegalStateException();
  138. pm.endTask();
  139. }
  140. private boolean isMainThread() {
  141. return Thread.currentThread() == mainThread;
  142. }
  143. }