You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

AsynchronousEventBus.java 3.2KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. package org.apache.archiva.event;
  2. /*
  3. * Licensed to the Apache Software Foundation (ASF) under one
  4. * or more contributor license agreements. See the NOTICE file
  5. * distributed with this work for additional information
  6. * regarding copyright ownership. The ASF licenses this file
  7. * to you under the Apache License, Version 2.0 (the
  8. * "License"); you may not use this file except in compliance
  9. * with the License. You may obtain a copy of the License at
  10. *
  11. * http://www.apache.org/licenses/LICENSE-2.0
  12. *
  13. * Unless required by applicable law or agreed to in writing,
  14. * software distributed under the License is distributed on an
  15. * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  16. * KIND, either express or implied. See the License for the
  17. * specific language governing permissions and limitations
  18. * under the License.
  19. */
  20. import java.util.Collections;
  21. import java.util.HashSet;
  22. import java.util.Set;
  23. import java.util.concurrent.BlockingQueue;
  24. import java.util.concurrent.ExecutorService;
  25. import java.util.concurrent.Executors;
  26. import java.util.concurrent.LinkedBlockingQueue;
  27. /**
  28. * Simple Async Event Bus implementation
  29. *
  30. * @author jdumay
  31. */
  32. public class AsynchronousEventBus implements EventBus
  33. {
  34. private final Set<EventObserver> observers = Collections.synchronizedSet(new HashSet());
  35. private final BlockingQueue<Event> events = new LinkedBlockingQueue<Event>();
  36. private final Thread workerThread;
  37. private final int threads;
  38. public AsynchronousEventBus(int threads)
  39. {
  40. this.threads = threads;
  41. workerThread = new Thread(new WorkerRunnable());
  42. workerThread.start();
  43. }
  44. public void emit(EventEmitter emitter, EventMessage message)
  45. {
  46. events.offer(new Event(emitter, message));
  47. }
  48. public void subscribe(EventObserver observer)
  49. {
  50. observers.add(observer);
  51. }
  52. public void unsubscribe(EventObserver observer)
  53. {
  54. observers.remove(observer);
  55. }
  56. public Set<EventObserver> getObservers() {
  57. return new HashSet<EventObserver>(observers);
  58. }
  59. class WorkerRunnable implements Runnable
  60. {
  61. private final ExecutorService service;
  62. public WorkerRunnable()
  63. {
  64. service = Executors.newFixedThreadPool(threads);
  65. }
  66. public void run()
  67. {
  68. while (true)
  69. {
  70. dequeueAndExecute();
  71. }
  72. }
  73. private void dequeueAndExecute()
  74. {
  75. try
  76. {
  77. final Event event = events.take();
  78. for (final EventObserver observer : observers)
  79. {
  80. service.execute(new Runnable()
  81. {
  82. public void run()
  83. {
  84. try
  85. {
  86. observer.observe(event);
  87. }
  88. finally
  89. {
  90. //log me
  91. }
  92. }
  93. });
  94. }
  95. }
  96. catch (InterruptedException e)
  97. {
  98. //Do nothing
  99. }
  100. }
  101. }
  102. }