Ви не можете вибрати більше 25 тем Теми мають розпочинатися з літери або цифри, можуть містити дефіси (-) і не повинні перевищувати 35 символів.

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. /*
  2. * Copyright (c) 2006-2015 DMDirc Developers
  3. *
  4. * Permission is hereby granted, free of charge, to any person obtaining a copy
  5. * of this software and associated documentation files (the "Software"), to deal
  6. * in the Software without restriction, including without limitation the rights
  7. * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  8. * copies of the Software, and to permit persons to whom the Software is
  9. * furnished to do so, subject to the following conditions:
  10. *
  11. * The above copyright notice and this permission notice shall be included in
  12. * all copies or substantial portions of the Software.
  13. *
  14. * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  15. * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  16. * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  17. * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  18. * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  19. * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  20. * SOFTWARE.
  21. */
  22. package com.dmdirc.parser.irc.outputqueue;
  23. import com.dmdirc.parser.common.QueuePriority;
  24. import java.io.OutputStream;
  25. import java.io.PrintWriter;
  26. import java.util.concurrent.BlockingQueue;
  27. import java.util.concurrent.PriorityBlockingQueue;
  28. /**
  29. * This class handles the Parser output Queue.
  30. */
  31. public class OutputQueue {
  32. /** PrintWriter for sending output. */
  33. private PrintWriter out;
  34. /** Is queueing enabled? */
  35. private boolean queueEnabled = true;
  36. /** Are we discarding all futher input? */
  37. private boolean discarding = false;
  38. /** The output queue! */
  39. private final BlockingQueue<QueueItem> queue = new PriorityBlockingQueue<>();
  40. /** Object for synchronising access to the {@link #queueHandler}. */
  41. private final Object queueHandlerLock = new Object();
  42. /** Thread for the sending queue. */
  43. private QueueHandler queueHandler;
  44. /** The QueueHandlerFactory for this OutputQueue. */
  45. private QueueHandlerFactory queueHandlerFactory = PriorityQueueHandler.getFactory();
  46. /**
  47. * Set the output stream for this queue.
  48. *
  49. * @param outputStream Output Stream to use.
  50. */
  51. public void setOutputStream(final OutputStream outputStream) {
  52. this.out = new PrintWriter(outputStream, true);
  53. }
  54. /**
  55. * Is output queueing enabled?
  56. *
  57. * @return true if output queueing is enabled.
  58. */
  59. public boolean isQueueEnabled() {
  60. return queueEnabled;
  61. }
  62. /**
  63. * Set the QueueHandlerFactory.
  64. * Changing this will not change an existing QueueHandler unless queueing is
  65. * disabled and reenabled.
  66. * If this is called before the first lien of output is queued then there is
  67. * no need to disable and reenable the queue.
  68. *
  69. * @param factory New QueueHandlerFactory to use.
  70. */
  71. public void setQueueHandlerFactory(final QueueHandlerFactory factory) {
  72. queueHandlerFactory = factory;
  73. }
  74. /**
  75. * Get the QueueHandlerFactory.
  76. *
  77. * @return The current QueueHandlerFactory.
  78. */
  79. public QueueHandlerFactory getQueueHandlerFactory() {
  80. return queueHandlerFactory;
  81. }
  82. /**
  83. * Get the QueueHandler.
  84. *
  85. * @return The current QueueHandler if there is one, else null.
  86. */
  87. public QueueHandler getQueueHandler() {
  88. return queueHandler;
  89. }
  90. /**
  91. * Set if queueing is enabled.
  92. * if this is changed from enabled to disabled, all currently queued items
  93. * will be sent immediately!
  94. *
  95. * @param queueEnabled new value for queueEnabled
  96. */
  97. public void setQueueEnabled(final boolean queueEnabled) {
  98. if (out == null) {
  99. throw new NullPointerException("No output stream has been set.");
  100. }
  101. final boolean old = this.queueEnabled;
  102. this.queueEnabled = queueEnabled;
  103. // If the new value is not the same as the old one, and we used to be enabled
  104. // then flush the queue.
  105. if (old != queueEnabled && old) {
  106. synchronized (queueHandlerLock) {
  107. if (queueHandler != null) {
  108. queueHandler.interrupt();
  109. queueHandler = null;
  110. }
  111. }
  112. while (!queue.isEmpty()) {
  113. try {
  114. out.printf("%s\r\n", queue.take().getLine());
  115. } catch (InterruptedException ex) {
  116. // Do nothing, we'll try again.
  117. }
  118. }
  119. }
  120. }
  121. /**
  122. * Direct access to the queue of items waiting to be sent.
  123. *
  124. * @return This queue's backing queue.
  125. */
  126. public BlockingQueue<QueueItem> getQueue() {
  127. return queue;
  128. }
  129. /**
  130. * Should we be discarding?
  131. *
  132. * @param newValue true to enable discarding.
  133. */
  134. public void setDiscarding(final boolean newValue) {
  135. discarding = newValue;
  136. }
  137. /**
  138. * Are we discarding?
  139. *
  140. * @return true if discarding
  141. */
  142. public boolean isDiscarding() {
  143. return discarding;
  144. }
  145. /**
  146. * Clear the queue and stop the thread that is sending stuff.
  147. */
  148. public void clearQueue() {
  149. this.queueEnabled = false;
  150. synchronized (queueHandlerLock) {
  151. if (queueHandler != null) {
  152. queueHandler.interrupt();
  153. queueHandler = null;
  154. }
  155. }
  156. queue.clear();
  157. }
  158. /**
  159. * Get the number of items currently in the queue.
  160. *
  161. * @return Number of items in the queue.
  162. */
  163. public int queueCount() {
  164. return queue.size();
  165. }
  166. /**
  167. * Send the given line.
  168. * If queueing is enabled, this will queue it, else it will send it
  169. * immediately.
  170. *
  171. * @param line Line to send
  172. */
  173. public void sendLine(final String line) {
  174. sendLine(line, QueuePriority.NORMAL);
  175. }
  176. /**
  177. * Send the given line.
  178. * If queueing is enabled, this will queue it, else it will send it
  179. * immediately.
  180. *
  181. * @param line Line to send
  182. * @param priority Priority of item (ignored if queue is disabled)
  183. */
  184. public void sendLine(final String line, final QueuePriority priority) {
  185. if (discarding) { return; }
  186. if (out == null) {
  187. throw new NullPointerException("No output stream has been set.");
  188. }
  189. if (queueEnabled && priority != QueuePriority.IMMEDIATE) {
  190. synchronized (queueHandlerLock) {
  191. if (queueHandler == null || !queueHandler.isAlive()) {
  192. queueHandler = queueHandlerFactory.getQueueHandler(this, out);
  193. queueHandler.start();
  194. }
  195. queue.add(queueHandler.getQueueItem(line, priority));
  196. }
  197. } else {
  198. out.printf("%s\r\n", line);
  199. }
  200. }
  201. }