Browse Source

Merge OutputQueue and QueueHandlers.

OutputQueue is now abstract, and the different queue methods
are subclasses.

This allows the same flexibility as before (subclasses can change
the type of items they add, implement their own throttling logic,
and say how the queue is ordered).

Organising it this way makes the package a lot easier to understand
as there's no longer a messy separation of concerns between
OutputQueue and QueueHandler (e.g. the output stream is no longer
in two places with two send methods, all of the definitions
relating to how the queue works are in one place, etc).
pull/128/head
Chris Smith 7 years ago
parent
commit
c340e7a9d1

+ 4
- 5
irc/src/com/dmdirc/parser/irc/IRCParser.java View File

@@ -48,6 +48,7 @@ import com.dmdirc.parser.irc.IRCReader.ReadLine;
48 48
 import com.dmdirc.parser.irc.events.IRCDataInEvent;
49 49
 import com.dmdirc.parser.irc.events.IRCDataOutEvent;
50 50
 import com.dmdirc.parser.irc.outputqueue.OutputQueue;
51
+import com.dmdirc.parser.irc.outputqueue.PriorityOutputQueue;
51 52
 
52 53
 import java.io.IOException;
53 54
 import java.net.InetAddress;
@@ -275,7 +276,7 @@ public class IRCParser extends BaseSocketAwareParser implements SecureParser, En
275 276
         myProcessingManager = graph.get(ProcessingManager.class);
276 277
         myself = new IRCClientInfo(this, userModes, "myself").setFake(true);
277 278
 
278
-        out = new OutputQueue();
279
+        out = new PriorityOutputQueue();
279 280
         if (myDetails != null) {
280 281
             this.me = myDetails;
281 282
         }
@@ -662,9 +663,7 @@ public class IRCParser extends BaseSocketAwareParser implements SecureParser, En
662 663
             userModes.clear();
663 664
             chanPrefix = DEFAULT_CHAN_PREFIX;
664 665
             // Clear output queue.
665
-            if (out != null) {
666
-                out.clearQueue();
667
-            }
666
+            out.clearQueue();
668 667
             setServerName("");
669 668
             networkName = "";
670 669
             lastLine = null;
@@ -1026,7 +1025,7 @@ public class IRCParser extends BaseSocketAwareParser implements SecureParser, En
1026 1025
      * @return True if line was sent, else false.
1027 1026
      */
1028 1027
     protected boolean sendString(final String line, final QueuePriority priority, final boolean fromParser) {
1029
-        if (out == null || getSocketState() != SocketState.OPEN) {
1028
+        if (getSocketState() != SocketState.OPEN) {
1030 1029
             return false;
1031 1030
         }
1032 1031
         callDataOut(line, fromParser);

+ 59
- 69
irc/src/com/dmdirc/parser/irc/outputqueue/OutputQueue.java View File

@@ -26,43 +26,34 @@ import com.dmdirc.parser.common.QueuePriority;
26 26
 
27 27
 import java.io.OutputStream;
28 28
 import java.io.PrintWriter;
29
+import java.util.Comparator;
29 30
 import java.util.concurrent.BlockingQueue;
30 31
 import java.util.concurrent.PriorityBlockingQueue;
31 32
 
32 33
 /**
33 34
  * This class handles the Parser output Queue.
34 35
  */
35
-public class OutputQueue {
36
+public abstract class OutputQueue {
36 37
 
37 38
     /** PrintWriter for sending output. */
38 39
     private PrintWriter out;
39 40
     /** Is queueing enabled? */
40 41
     private boolean queueEnabled = true;
41 42
     /** Are we discarding all futher input? */
42
-    private boolean discarding = false;
43
+    private boolean discarding;
43 44
     /** The output queue! */
44
-    private BlockingQueue<QueueItem> queue = new PriorityBlockingQueue<>();
45
-    /** Object for synchronising access to the {@link #queueHandler}. */
46
-    private final Object queueHandlerLock = new Object();
47
-    /** Thread for the sending queue. */
48
-    private QueueHandler queueHandler;
49
-    /** The QueueHandlerFactory for this OutputQueue. */
50
-    private final QueueHandlerFactory queueHandlerFactory;
45
+    private final BlockingQueue<QueueItem> queue;
46
+    /** The thread on which we will send items. */
47
+    private Thread sendingThread;
51 48
 
52 49
     /**
53
-     * Creates a new output queue using the default handler.
54
-     */
55
-    public OutputQueue() {
56
-        this(PriorityQueueHandler.getFactory());
57
-    }
58
-
59
-    /**
60
-     * Creates a new output queue using a handler from the given factory.
50
+     * Creates a new instance of {@link OutputQueue} that will sort items using the given
51
+     * comparator.
61 52
      *
62
-     * @param queueHandlerFactory The factory to use to create {@link QueueHandler}s.
53
+     * @param itemComparator The comparator to use to sort queued items.
63 54
      */
64
-    public OutputQueue(final QueueHandlerFactory queueHandlerFactory) {
65
-        this.queueHandlerFactory = queueHandlerFactory;
55
+    protected OutputQueue(final Comparator<QueueItem> itemComparator) {
56
+        queue = new PriorityBlockingQueue<>(10, itemComparator);
66 57
     }
67 58
 
68 59
     /**
@@ -83,15 +74,6 @@ public class OutputQueue {
83 74
         return queueEnabled;
84 75
     }
85 76
 
86
-    /**
87
-     * Get the QueueHandler.
88
-     *
89
-     * @return The current QueueHandler if there is one, else null.
90
-     */
91
-    public QueueHandler getQueueHandler() {
92
-        return queueHandler;
93
-    }
94
-
95 77
     /**
96 78
      * Set if queueing is enabled.
97 79
      * if this is changed from enabled to disabled, all currently queued items
@@ -110,11 +92,9 @@ public class OutputQueue {
110 92
         // If the new value is not the same as the old one, and we used to be enabled
111 93
         // then flush the queue.
112 94
         if (old != queueEnabled && old) {
113
-            synchronized (queueHandlerLock) {
114
-                if (queueHandler != null) {
115
-                    queueHandler.interrupt();
116
-                    queueHandler = null;
117
-                }
95
+            if (sendingThread != null) {
96
+                sendingThread.interrupt();
97
+                sendingThread = null;
118 98
             }
119 99
 
120 100
             while (!queue.isEmpty()) {
@@ -155,18 +135,14 @@ public class OutputQueue {
155 135
     }
156 136
 
157 137
     /**
158
-     * Clear the queue and stop the thread that is sending stuff.
138
+     * Clears any pending items.
159 139
      */
160 140
     public void clearQueue() {
161
-        this.queueEnabled = false;
162
-
163
-        synchronized (queueHandlerLock) {
164
-            if (queueHandler != null) {
165
-                queueHandler.interrupt();
166
-                queueHandler = null;
167
-            }
141
+        queueEnabled = false;
142
+        if (sendingThread != null) {
143
+            sendingThread.interrupt();
144
+            sendingThread = null;
168 145
         }
169
-
170 146
         queue.clear();
171 147
     }
172 148
 
@@ -181,8 +157,8 @@ public class OutputQueue {
181 157
 
182 158
     /**
183 159
      * Send the given line.
184
-     * If queueing is enabled, this will queue it, else it will send it
185
-     * immediately.
160
+     *
161
+     * <p>If queueing is enabled, this will queue it, else it will send it immediately.
186 162
      *
187 163
      * @param line Line to send
188 164
      */
@@ -192,44 +168,58 @@ public class OutputQueue {
192 168
 
193 169
     /**
194 170
      * Send the given line.
195
-     * If queueing is enabled, this will queue it, else it will send it
196
-     * immediately.
171
+     *
172
+     * <p>If queueing is enabled, this will queue it, else it will send it immediately.
197 173
      *
198 174
      * @param line Line to send
199 175
      * @param priority Priority of item (ignored if queue is disabled)
200 176
      */
201 177
     public void sendLine(final String line, final QueuePriority priority) {
202
-        if (discarding) { return; }
203
-        if (out == null) {
204
-            throw new NullPointerException("No output stream has been set.");
178
+        if (discarding) {
179
+            return;
205 180
         }
206 181
 
207
-        if (queueEnabled && priority != QueuePriority.IMMEDIATE) {
208
-            synchronized (queueHandlerLock) {
209
-                if (queueHandler == null || !queueHandler.isAlive()) {
210
-                    setQueueHandler(queueHandlerFactory.getQueueHandler(this, out));
211
-                    queueHandler.start();
212
-                }
213
-
214
-                queue.add(queueHandler.getQueueItem(line, priority));
215
-            }
182
+        if (queueEnabled && priority == QueuePriority.IMMEDIATE) {
183
+            send(line);
216 184
         } else {
217
-            out.printf("%s\r\n", line);
185
+            if (sendingThread == null || !sendingThread.isAlive()) {
186
+                sendingThread = new Thread(this::handleQueuedItems, "IRC Parser queue handler");
187
+                sendingThread.start();
188
+            }
189
+
190
+            enqueue(line, priority);
218 191
         }
219 192
     }
220 193
 
221 194
     /**
222
-     * Sets the hanlder that this queue will use. This will cause the existing {@link #queue}
223
-     * to be replaced with a new version with an updated comparator.
195
+     * Sends queued items to the output channel, blocking or waiting as necessary.
224 196
      *
225
-     * @param queueHandler The new queue handler to use.
197
+     * <p>This is a long running method that will be executed in a separate thread.
226 198
      */
227
-    private void setQueueHandler(final QueueHandler queueHandler) {
228
-        this.queueHandler = queueHandler;
229
-        final BlockingQueue<QueueItem> newQueue = new PriorityBlockingQueue<>(
230
-                10, queueHandler.getQueueItemComparator());
231
-        newQueue.addAll(queue);
232
-        queue = newQueue;
199
+    protected abstract void handleQueuedItems();
200
+
201
+    /**
202
+     * Enqueues a new line to be sent.
203
+     *
204
+     * @param line The raw line to be sent to the server.
205
+     * @param priority The priority at which the line should be sent.
206
+     */
207
+    protected void enqueue(final String line, final QueuePriority priority) {
208
+        queue.add(new QueueItem(line, priority));
233 209
     }
234 210
 
211
+    /**
212
+     * Sends a line immediately to the server.
213
+     *
214
+     * @param line The line to be sent.
215
+     */
216
+    protected void send(final String line) {
217
+        if (out == null) {
218
+            throw new IllegalStateException("No output stream has been set.");
219
+        }
220
+
221
+        out.printf("%s\r\n", line);
222
+    }
223
+
224
+
235 225
 }

irc/src/com/dmdirc/parser/irc/outputqueue/QueueHandlerFactory.java → irc/src/com/dmdirc/parser/irc/outputqueue/PriorityOutputQueue.java View File

@@ -22,20 +22,29 @@
22 22
 
23 23
 package com.dmdirc.parser.irc.outputqueue;
24 24
 
25
-import java.io.PrintWriter;
25
+import java.time.Duration;
26 26
 
27 27
 /**
28
- * A QueueHandlerFactory produces QueueHandlers for OutputQueue.
28
+ * This does no rate limiting, and just sends based on priority.
29 29
  */
30
-public interface QueueHandlerFactory {
30
+public class PriorityOutputQueue extends OutputQueue {
31 31
 
32 32
     /**
33
-     * Get a new QueueHandler instance as needed.
34
-     *
35
-     * @param outputQueue the OutputQueue that will own this QueueHandler
36
-     * @param out Where to send crap.
37
-     * @return the new queue handler object.
33
+     * Create a new PriorityOutputQueue.
38 34
      */
39
-    QueueHandler getQueueHandler(final OutputQueue outputQueue, final PrintWriter out);
35
+    public PriorityOutputQueue() {
36
+        super(QueueComparators.byPriorityThenNumber(Duration.ofSeconds(10)));
37
+    }
38
+
39
+    @Override
40
+    protected void handleQueuedItems() {
41
+        try {
42
+            while (isQueueEnabled()) {
43
+                send(getQueue().take().getLine());
44
+            }
45
+        } catch (InterruptedException ex) {
46
+            // Do nothing
47
+        }
48
+    }
40 49
 
41 50
 }

+ 0
- 69
irc/src/com/dmdirc/parser/irc/outputqueue/PriorityQueueHandler.java View File

@@ -1,69 +0,0 @@
1
-/*
2
- * Copyright (c) 2006-2015 DMDirc Developers
3
- *
4
- * Permission is hereby granted, free of charge, to any person obtaining a copy
5
- * of this software and associated documentation files (the "Software"), to deal
6
- * in the Software without restriction, including without limitation the rights
7
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8
- * copies of the Software, and to permit persons to whom the Software is
9
- * furnished to do so, subject to the following conditions:
10
- *
11
- * The above copyright notice and this permission notice shall be included in
12
- * all copies or substantial portions of the Software.
13
- *
14
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20
- * SOFTWARE.
21
- */
22
-
23
-package com.dmdirc.parser.irc.outputqueue;
24
-
25
-import java.io.PrintWriter;
26
-import java.time.Duration;
27
-
28
-/**
29
- * This does no rate limiting, and just sends based on priority.
30
- */
31
-public class PriorityQueueHandler extends QueueHandler {
32
-
33
-    /**
34
-     * Create a new PriorityQueueHandler.
35
-     *
36
-     * @param outputQueue Owner of this Queue Handler
37
-     * @param out Output Stream to use
38
-     */
39
-    public PriorityQueueHandler(
40
-            final OutputQueue outputQueue,
41
-            final PrintWriter out) {
42
-        super(outputQueue,
43
-                QueueComparators.byPriorityThenNumber(Duration.ofSeconds(10)),
44
-                out);
45
-    }
46
-
47
-    /**
48
-     * Get a QueueHandlerFactory that produces PriorityQueueHandlers.
49
-     *
50
-     * @return a QueueHandlerFactory that produces PrirortyQueueHandlers.
51
-     */
52
-    public static QueueHandlerFactory getFactory() {
53
-        return PriorityQueueHandler::new;
54
-    }
55
-
56
-    @Override
57
-    public void run() {
58
-        try {
59
-            while (outputQueue.isQueueEnabled()) {
60
-                final QueueItem item = outputQueue.getQueue().take();
61
-
62
-                sendLine(item.getLine());
63
-            }
64
-        } catch (InterruptedException ex) {
65
-            // Do nothing
66
-        }
67
-    }
68
-
69
-}

+ 0
- 103
irc/src/com/dmdirc/parser/irc/outputqueue/QueueHandler.java View File

@@ -1,103 +0,0 @@
1
-/*
2
- * Copyright (c) 2006-2015 DMDirc Developers
3
- *
4
- * Permission is hereby granted, free of charge, to any person obtaining a copy
5
- * of this software and associated documentation files (the "Software"), to deal
6
- * in the Software without restriction, including without limitation the rights
7
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8
- * copies of the Software, and to permit persons to whom the Software is
9
- * furnished to do so, subject to the following conditions:
10
- *
11
- * The above copyright notice and this permission notice shall be included in
12
- * all copies or substantial portions of the Software.
13
- *
14
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20
- * SOFTWARE.
21
- */
22
-
23
-package com.dmdirc.parser.irc.outputqueue;
24
-
25
-import com.dmdirc.parser.common.QueuePriority;
26
-
27
-import java.io.PrintWriter;
28
-import java.util.Comparator;
29
-
30
-/**
31
- * Sending queue.
32
- */
33
-public abstract class QueueHandler extends Thread {
34
-
35
-    /** The output queue that owns us. */
36
-    protected OutputQueue outputQueue;
37
-    /** Comparator to use to sort queue items. */
38
-    private final Comparator<QueueItem> comparator;
39
-    /** Where to send the output. */
40
-    private final PrintWriter out;
41
-
42
-    /**
43
-     * Create a new Queue Thread.
44
-     *
45
-     * @param outputQueue the OutputQueue that owns us.
46
-     * @param comparator Comparator to use to sort items in the queue.
47
-     * @param out Writer to send to.
48
-     */
49
-    public QueueHandler(
50
-            final OutputQueue outputQueue,
51
-            final Comparator<QueueItem> comparator,
52
-            final PrintWriter out) {
53
-        super("IRC Parser queue handler");
54
-
55
-        this.comparator = comparator;
56
-        this.out = out;
57
-        this.outputQueue = outputQueue;
58
-    }
59
-
60
-    /**
61
-     * Send the given item.
62
-     *
63
-     * @param line Line to send.
64
-     */
65
-    public void sendLine(final String line) {
66
-        out.printf("%s\r\n", line);
67
-    }
68
-
69
-    /**
70
-     * Get a new QueueItem for the given line and priority.
71
-     * By default this will just create a new QueueItem with the given
72
-     * parameters, but QueueHandlers are free to override it if they need to
73
-     * instead produce subclasses of QueueItem or do anything else with the
74
-     * data given.
75
-     *
76
-     * @param line Line to send
77
-     * @param priority Priority of the line.
78
-     * @return A QueueItem for teh given parameters
79
-     */
80
-    public QueueItem getQueueItem(final String line, final QueuePriority priority) {
81
-        return new QueueItem(line, priority);
82
-    }
83
-
84
-    /**
85
-     * Gets the comparator to use to sort queue items.
86
-     *
87
-     * @return The comparator to use to sort queue items.
88
-     */
89
-    public Comparator<QueueItem> getQueueItemComparator() {
90
-        return comparator;
91
-    }
92
-
93
-    /**
94
-     * This is the main even loop of the queue.
95
-     * It needs to handle pulling items out of the queue and calling
96
-     * sendLine.
97
-     *
98
-     * It also needs to handle any delays in sending that it deems needed.
99
-     */
100
-    @Override
101
-    public abstract void run();
102
-
103
-}

irc/src/com/dmdirc/parser/irc/outputqueue/SimpleRateLimitedQueueHandler.java → irc/src/com/dmdirc/parser/irc/outputqueue/SimpleRateLimitedOutputQueue.java View File

@@ -24,15 +24,13 @@ package com.dmdirc.parser.irc.outputqueue;
24 24
 
25 25
 import com.dmdirc.parser.common.QueuePriority;
26 26
 
27
-import java.io.PrintWriter;
28
-
29 27
 /**
30 28
  * This is a simple rate limiting queue.
31 29
  * If more than 4 items are added in 4 seconds it will start limiting.
32 30
  * The first 4 items will be sent un-limited and then limiting will commence at
33 31
  * a rate of 1 per second.
34 32
  */
35
-public class SimpleRateLimitedQueueHandler extends QueueHandler {
33
+public class SimpleRateLimitedOutputQueue extends OutputQueue {
36 34
 
37 35
     /** Current count. */
38 36
     private int count;
@@ -50,24 +48,10 @@ public class SimpleRateLimitedQueueHandler extends QueueHandler {
50 48
     private boolean alwaysUpdateTime = true;
51 49
 
52 50
     /**
53
-     * Create a new SimpleRateLimitedQueueHandler.
54
-     *
55
-     * @param outputQueue Owner of this Queue Handler
56
-     * @param out Output Stream to use
57
-     */
58
-    public SimpleRateLimitedQueueHandler(
59
-            final OutputQueue outputQueue,
60
-            final PrintWriter out) {
61
-        super(outputQueue, QueueComparators.byPriorityThenNumber(), out);
62
-    }
63
-
64
-    /**
65
-     * Get a QueueHandlerFactory that produces PriorityQueueHandlers.
66
-     *
67
-     * @return a QueueHandlerFactory that produces PrirortyQueueHandlers.
51
+     * Create a new SimpleRateLimitedOutputQueue.
68 52
      */
69
-    public static QueueHandlerFactory getFactory() {
70
-        return SimpleRateLimitedQueueHandler::new;
53
+    public SimpleRateLimitedOutputQueue() {
54
+        super(QueueComparators.byPriorityThenNumber());
71 55
     }
72 56
 
73 57
     /**
@@ -164,7 +148,7 @@ public class SimpleRateLimitedQueueHandler extends QueueHandler {
164 148
     }
165 149
 
166 150
     @Override
167
-    public QueueItem getQueueItem(final String line, final QueuePriority priority) {
151
+    protected void enqueue(final String line, final QueuePriority priority) {
168 152
         // Was the last line added less than limitTime ago?
169 153
         synchronized (this) {
170 154
             final boolean overTime = lastItemTime + limitTime > System.currentTimeMillis();
@@ -179,7 +163,7 @@ public class SimpleRateLimitedQueueHandler extends QueueHandler {
179 163
                 // It has been longer than limitTime and we are still shown as
180 164
                 // limiting, check to see if the queue is empty or not, if it is
181 165
                 // disable limiting.
182
-                if (outputQueue.getQueue().isEmpty()) {
166
+                if (getQueue().isEmpty()) {
183 167
                     isLimiting = false;
184 168
                 }
185 169
             } else {
@@ -193,21 +177,19 @@ public class SimpleRateLimitedQueueHandler extends QueueHandler {
193 177
             }
194 178
         }
195 179
 
196
-        return super.getQueueItem(line, priority);
180
+        super.enqueue(line, priority);
197 181
     }
198 182
 
199 183
     @Override
200
-    public void run() {
184
+    protected void handleQueuedItems() {
201 185
         try {
202
-            while (outputQueue.isQueueEnabled()) {
203
-                final QueueItem item = outputQueue.getQueue().take();
204
-
205
-                sendLine(item.getLine());
186
+            while (isQueueEnabled()) {
187
+                sendLine(getQueue().take().getLine());
206 188
 
207 189
                 final boolean doSleep;
208 190
                 synchronized (this) {
209 191
                     doSleep = isLimiting;
210
-                    if (isLimiting && outputQueue.getQueue().isEmpty()) {
192
+                    if (isLimiting && getQueue().isEmpty()) {
211 193
                         isLimiting = false;
212 194
                     }
213 195
                 }
@@ -215,11 +197,14 @@ public class SimpleRateLimitedQueueHandler extends QueueHandler {
215 197
                 if (doSleep) {
216 198
                     try {
217 199
                         Thread.sleep(waitTime);
218
-                    } catch (InterruptedException ex) { /* Do Nothing. */ }
200
+                    } catch (InterruptedException ex) {
201
+                        /* Do Nothing. */
202
+                    }
219 203
                 }
220 204
             }
221 205
         } catch (InterruptedException ex) {
222 206
             // Do nothing
223 207
         }
224 208
     }
209
+
225 210
 }

Loading…
Cancel
Save