You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

OutputQueue.java 6.5KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225
  1. /*
  2. * Copyright (c) 2006-2015 DMDirc Developers
  3. *
  4. * Permission is hereby granted, free of charge, to any person obtaining a copy
  5. * of this software and associated documentation files (the "Software"), to deal
  6. * in the Software without restriction, including without limitation the rights
  7. * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  8. * copies of the Software, and to permit persons to whom the Software is
  9. * furnished to do so, subject to the following conditions:
  10. *
  11. * The above copyright notice and this permission notice shall be included in
  12. * all copies or substantial portions of the Software.
  13. *
  14. * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  15. * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  16. * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  17. * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  18. * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  19. * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  20. * SOFTWARE.
  21. */
  22. package com.dmdirc.parser.irc.outputqueue;
  23. import com.dmdirc.parser.common.QueuePriority;
  24. import java.io.OutputStream;
  25. import java.io.PrintWriter;
  26. import java.util.Comparator;
  27. import java.util.concurrent.BlockingQueue;
  28. import java.util.concurrent.PriorityBlockingQueue;
  29. /**
  30. * This class handles the Parser output Queue.
  31. */
  32. public abstract class OutputQueue {
  33. /** PrintWriter for sending output. */
  34. private PrintWriter out;
  35. /** Is queueing enabled? */
  36. private boolean queueEnabled = true;
  37. /** Are we discarding all futher input? */
  38. private boolean discarding;
  39. /** The output queue! */
  40. private final BlockingQueue<QueueItem> queue;
  41. /** The thread on which we will send items. */
  42. private Thread sendingThread;
  43. /**
  44. * Creates a new instance of {@link OutputQueue} that will sort items using the given
  45. * comparator.
  46. *
  47. * @param itemComparator The comparator to use to sort queued items.
  48. */
  49. protected OutputQueue(final Comparator<QueueItem> itemComparator) {
  50. queue = new PriorityBlockingQueue<>(10, itemComparator);
  51. }
  52. /**
  53. * Set the output stream for this queue.
  54. *
  55. * @param outputStream Output Stream to use.
  56. */
  57. public void setOutputStream(final OutputStream outputStream) {
  58. out = new PrintWriter(outputStream, true);
  59. }
  60. /**
  61. * Is output queueing enabled?
  62. *
  63. * @return true if output queueing is enabled.
  64. */
  65. public boolean isQueueEnabled() {
  66. return queueEnabled;
  67. }
  68. /**
  69. * Set if queueing is enabled.
  70. * if this is changed from enabled to disabled, all currently queued items
  71. * will be sent immediately!
  72. *
  73. * @param queueEnabled new value for queueEnabled
  74. */
  75. public void setQueueEnabled(final boolean queueEnabled) {
  76. if (out == null) {
  77. throw new NullPointerException("No output stream has been set.");
  78. }
  79. final boolean old = this.queueEnabled;
  80. this.queueEnabled = queueEnabled;
  81. // If the new value is not the same as the old one, and we used to be enabled
  82. // then flush the queue.
  83. if (old != queueEnabled && old) {
  84. if (sendingThread != null) {
  85. sendingThread.interrupt();
  86. sendingThread = null;
  87. }
  88. while (!queue.isEmpty()) {
  89. try {
  90. out.printf("%s\r\n", queue.take().getLine());
  91. } catch (InterruptedException ex) {
  92. // Do nothing, we'll try again.
  93. }
  94. }
  95. }
  96. }
  97. /**
  98. * Direct access to the queue of items waiting to be sent.
  99. *
  100. * @return This queue's backing queue.
  101. */
  102. public BlockingQueue<QueueItem> getQueue() {
  103. return queue;
  104. }
  105. /**
  106. * Should we be discarding?
  107. *
  108. * @param newValue true to enable discarding.
  109. */
  110. public void setDiscarding(final boolean newValue) {
  111. discarding = newValue;
  112. }
  113. /**
  114. * Are we discarding?
  115. *
  116. * @return true if discarding
  117. */
  118. public boolean isDiscarding() {
  119. return discarding;
  120. }
  121. /**
  122. * Clears any pending items.
  123. */
  124. public void clearQueue() {
  125. queueEnabled = false;
  126. if (sendingThread != null) {
  127. sendingThread.interrupt();
  128. sendingThread = null;
  129. }
  130. queue.clear();
  131. }
  132. /**
  133. * Get the number of items currently in the queue.
  134. *
  135. * @return Number of items in the queue.
  136. */
  137. public int queueCount() {
  138. return queue.size();
  139. }
  140. /**
  141. * Send the given line.
  142. *
  143. * <p>If queueing is enabled, this will queue it, else it will send it immediately.
  144. *
  145. * @param line Line to send
  146. */
  147. public void sendLine(final String line) {
  148. sendLine(line, QueuePriority.NORMAL);
  149. }
  150. /**
  151. * Send the given line.
  152. *
  153. * <p>If queueing is enabled, this will queue it, else it will send it immediately.
  154. *
  155. * @param line Line to send
  156. * @param priority Priority of item (ignored if queue is disabled)
  157. */
  158. public void sendLine(final String line, final QueuePriority priority) {
  159. if (discarding) {
  160. return;
  161. }
  162. if (queueEnabled && priority == QueuePriority.IMMEDIATE) {
  163. send(line);
  164. } else {
  165. if (sendingThread == null || !sendingThread.isAlive()) {
  166. sendingThread = new Thread(this::handleQueuedItems, "IRC Parser queue handler");
  167. sendingThread.start();
  168. }
  169. enqueue(line, priority);
  170. }
  171. }
  172. /**
  173. * Sends queued items to the output channel, blocking or waiting as necessary.
  174. *
  175. * <p>This is a long running method that will be executed in a separate thread.
  176. */
  177. protected abstract void handleQueuedItems();
  178. /**
  179. * Enqueues a new line to be sent.
  180. *
  181. * @param line The raw line to be sent to the server.
  182. * @param priority The priority at which the line should be sent.
  183. */
  184. protected void enqueue(final String line, final QueuePriority priority) {
  185. queue.add(QueueItem.create(line, priority));
  186. }
  187. /**
  188. * Sends a line immediately to the server.
  189. *
  190. * @param line The line to be sent.
  191. */
  192. protected void send(final String line) {
  193. if (out == null) {
  194. throw new IllegalStateException("No output stream has been set.");
  195. }
  196. out.printf("%s\r\n", line);
  197. }
  198. }