Ver código fonte

Merge pull request #128 from csmith/master

Merge OutputQueue and QueueHandlers.
pull/129/head
Shane Mc Cormack 7 anos atrás
pai
commit
298788d21b

+ 4
- 5
irc/src/com/dmdirc/parser/irc/IRCParser.java Ver arquivo

@@ -48,6 +48,7 @@ import com.dmdirc.parser.irc.IRCReader.ReadLine;
48 48
 import com.dmdirc.parser.irc.events.IRCDataInEvent;
49 49
 import com.dmdirc.parser.irc.events.IRCDataOutEvent;
50 50
 import com.dmdirc.parser.irc.outputqueue.OutputQueue;
51
+import com.dmdirc.parser.irc.outputqueue.PriorityOutputQueue;
51 52
 
52 53
 import java.io.IOException;
53 54
 import java.net.InetAddress;
@@ -275,7 +276,7 @@ public class IRCParser extends BaseSocketAwareParser implements SecureParser, En
275 276
         myProcessingManager = graph.get(ProcessingManager.class);
276 277
         myself = new IRCClientInfo(this, userModes, "myself").setFake(true);
277 278
 
278
-        out = new OutputQueue();
279
+        out = new PriorityOutputQueue();
279 280
         if (myDetails != null) {
280 281
             this.me = myDetails;
281 282
         }
@@ -662,9 +663,7 @@ public class IRCParser extends BaseSocketAwareParser implements SecureParser, En
662 663
             userModes.clear();
663 664
             chanPrefix = DEFAULT_CHAN_PREFIX;
664 665
             // Clear output queue.
665
-            if (out != null) {
666
-                out.clearQueue();
667
-            }
666
+            out.clearQueue();
668 667
             setServerName("");
669 668
             networkName = "";
670 669
             lastLine = null;
@@ -1026,7 +1025,7 @@ public class IRCParser extends BaseSocketAwareParser implements SecureParser, En
1026 1025
      * @return True if line was sent, else false.
1027 1026
      */
1028 1027
     protected boolean sendString(final String line, final QueuePriority priority, final boolean fromParser) {
1029
-        if (out == null || getSocketState() != SocketState.OPEN) {
1028
+        if (getSocketState() != SocketState.OPEN) {
1030 1029
             return false;
1031 1030
         }
1032 1031
         callDataOut(line, fromParser);

+ 59
- 69
irc/src/com/dmdirc/parser/irc/outputqueue/OutputQueue.java Ver arquivo

@@ -26,43 +26,34 @@ import com.dmdirc.parser.common.QueuePriority;
26 26
 
27 27
 import java.io.OutputStream;
28 28
 import java.io.PrintWriter;
29
+import java.util.Comparator;
29 30
 import java.util.concurrent.BlockingQueue;
30 31
 import java.util.concurrent.PriorityBlockingQueue;
31 32
 
32 33
 /**
33 34
  * This class handles the Parser output Queue.
34 35
  */
35
-public class OutputQueue {
36
+public abstract class OutputQueue {
36 37
 
37 38
     /** PrintWriter for sending output. */
38 39
     private PrintWriter out;
39 40
     /** Is queueing enabled? */
40 41
     private boolean queueEnabled = true;
41 42
     /** Are we discarding all futher input? */
42
-    private boolean discarding = false;
43
+    private boolean discarding;
43 44
     /** The output queue! */
44
-    private BlockingQueue<QueueItem> queue = new PriorityBlockingQueue<>();
45
-    /** Object for synchronising access to the {@link #queueHandler}. */
46
-    private final Object queueHandlerLock = new Object();
47
-    /** Thread for the sending queue. */
48
-    private QueueHandler queueHandler;
49
-    /** The QueueHandlerFactory for this OutputQueue. */
50
-    private final QueueHandlerFactory queueHandlerFactory;
45
+    private final BlockingQueue<QueueItem> queue;
46
+    /** The thread on which we will send items. */
47
+    private Thread sendingThread;
51 48
 
52 49
     /**
53
-     * Creates a new output queue using the default handler.
54
-     */
55
-    public OutputQueue() {
56
-        this(PriorityQueueHandler.getFactory());
57
-    }
58
-
59
-    /**
60
-     * Creates a new output queue using a handler from the given factory.
50
+     * Creates a new instance of {@link OutputQueue} that will sort items using the given
51
+     * comparator.
61 52
      *
62
-     * @param queueHandlerFactory The factory to use to create {@link QueueHandler}s.
53
+     * @param itemComparator The comparator to use to sort queued items.
63 54
      */
64
-    public OutputQueue(final QueueHandlerFactory queueHandlerFactory) {
65
-        this.queueHandlerFactory = queueHandlerFactory;
55
+    protected OutputQueue(final Comparator<QueueItem> itemComparator) {
56
+        queue = new PriorityBlockingQueue<>(10, itemComparator);
66 57
     }
67 58
 
68 59
     /**
@@ -83,15 +74,6 @@ public class OutputQueue {
83 74
         return queueEnabled;
84 75
     }
85 76
 
86
-    /**
87
-     * Get the QueueHandler.
88
-     *
89
-     * @return The current QueueHandler if there is one, else null.
90
-     */
91
-    public QueueHandler getQueueHandler() {
92
-        return queueHandler;
93
-    }
94
-
95 77
     /**
96 78
      * Set if queueing is enabled.
97 79
      * if this is changed from enabled to disabled, all currently queued items
@@ -110,11 +92,9 @@ public class OutputQueue {
110 92
         // If the new value is not the same as the old one, and we used to be enabled
111 93
         // then flush the queue.
112 94
         if (old != queueEnabled && old) {
113
-            synchronized (queueHandlerLock) {
114
-                if (queueHandler != null) {
115
-                    queueHandler.interrupt();
116
-                    queueHandler = null;
117
-                }
95
+            if (sendingThread != null) {
96
+                sendingThread.interrupt();
97
+                sendingThread = null;
118 98
             }
119 99
 
120 100
             while (!queue.isEmpty()) {
@@ -155,18 +135,14 @@ public class OutputQueue {
155 135
     }
156 136
 
157 137
     /**
158
-     * Clear the queue and stop the thread that is sending stuff.
138
+     * Clears any pending items.
159 139
      */
160 140
     public void clearQueue() {
161
-        this.queueEnabled = false;
162
-
163
-        synchronized (queueHandlerLock) {
164
-            if (queueHandler != null) {
165
-                queueHandler.interrupt();
166
-                queueHandler = null;
167
-            }
141
+        queueEnabled = false;
142
+        if (sendingThread != null) {
143
+            sendingThread.interrupt();
144
+            sendingThread = null;
168 145
         }
169
-
170 146
         queue.clear();
171 147
     }
172 148
 
@@ -181,8 +157,8 @@ public class OutputQueue {
181 157
 
182 158
     /**
183 159
      * Send the given line.
184
-     * If queueing is enabled, this will queue it, else it will send it
185
-     * immediately.
160
+     *
161
+     * <p>If queueing is enabled, this will queue it, else it will send it immediately.
186 162
      *
187 163
      * @param line Line to send
188 164
      */
@@ -192,44 +168,58 @@ public class OutputQueue {
192 168
 
193 169
     /**
194 170
      * Send the given line.
195
-     * If queueing is enabled, this will queue it, else it will send it
196
-     * immediately.
171
+     *
172
+     * <p>If queueing is enabled, this will queue it, else it will send it immediately.
197 173
      *
198 174
      * @param line Line to send
199 175
      * @param priority Priority of item (ignored if queue is disabled)
200 176
      */
201 177
     public void sendLine(final String line, final QueuePriority priority) {
202
-        if (discarding) { return; }
203
-        if (out == null) {
204
-            throw new NullPointerException("No output stream has been set.");
178
+        if (discarding) {
179
+            return;
205 180
         }
206 181
 
207
-        if (queueEnabled && priority != QueuePriority.IMMEDIATE) {
208
-            synchronized (queueHandlerLock) {
209
-                if (queueHandler == null || !queueHandler.isAlive()) {
210
-                    setQueueHandler(queueHandlerFactory.getQueueHandler(this, out));
211
-                    queueHandler.start();
212
-                }
213
-
214
-                queue.add(queueHandler.getQueueItem(line, priority));
215
-            }
182
+        if (queueEnabled && priority == QueuePriority.IMMEDIATE) {
183
+            send(line);
216 184
         } else {
217
-            out.printf("%s\r\n", line);
185
+            if (sendingThread == null || !sendingThread.isAlive()) {
186
+                sendingThread = new Thread(this::handleQueuedItems, "IRC Parser queue handler");
187
+                sendingThread.start();
188
+            }
189
+
190
+            enqueue(line, priority);
218 191
         }
219 192
     }
220 193
 
221 194
     /**
222
-     * Sets the hanlder that this queue will use. This will cause the existing {@link #queue}
223
-     * to be replaced with a new version with an updated comparator.
195
+     * Sends queued items to the output channel, blocking or waiting as necessary.
224 196
      *
225
-     * @param queueHandler The new queue handler to use.
197
+     * <p>This is a long running method that will be executed in a separate thread.
226 198
      */
227
-    private void setQueueHandler(final QueueHandler queueHandler) {
228
-        this.queueHandler = queueHandler;
229
-        final BlockingQueue<QueueItem> newQueue = new PriorityBlockingQueue<>(
230
-                10, queueHandler.getQueueItemComparator());
231
-        newQueue.addAll(queue);
232
-        queue = newQueue;
199
+    protected abstract void handleQueuedItems();
200
+
201
+    /**
202
+     * Enqueues a new line to be sent.
203
+     *
204
+     * @param line The raw line to be sent to the server.
205
+     * @param priority The priority at which the line should be sent.
206
+     */
207
+    protected void enqueue(final String line, final QueuePriority priority) {
208
+        queue.add(new QueueItem(line, priority));
233 209
     }
234 210
 
211
+    /**
212
+     * Sends a line immediately to the server.
213
+     *
214
+     * @param line The line to be sent.
215
+     */
216
+    protected void send(final String line) {
217
+        if (out == null) {
218
+            throw new IllegalStateException("No output stream has been set.");
219
+        }
220
+
221
+        out.printf("%s\r\n", line);
222
+    }
223
+
224
+
235 225
 }

irc/src/com/dmdirc/parser/irc/outputqueue/QueueHandlerFactory.java → irc/src/com/dmdirc/parser/irc/outputqueue/PriorityOutputQueue.java Ver arquivo

@@ -22,20 +22,29 @@
22 22
 
23 23
 package com.dmdirc.parser.irc.outputqueue;
24 24
 
25
-import java.io.PrintWriter;
25
+import java.time.Duration;
26 26
 
27 27
 /**
28
- * A QueueHandlerFactory produces QueueHandlers for OutputQueue.
28
+ * This does no rate limiting, and just sends based on priority.
29 29
  */
30
-public interface QueueHandlerFactory {
30
+public class PriorityOutputQueue extends OutputQueue {
31 31
 
32 32
     /**
33
-     * Get a new QueueHandler instance as needed.
34
-     *
35
-     * @param outputQueue the OutputQueue that will own this QueueHandler
36
-     * @param out Where to send crap.
37
-     * @return the new queue handler object.
33
+     * Create a new PriorityOutputQueue.
38 34
      */
39
-    QueueHandler getQueueHandler(final OutputQueue outputQueue, final PrintWriter out);
35
+    public PriorityOutputQueue() {
36
+        super(QueueComparators.byPriorityThenNumber(Duration.ofSeconds(10)));
37
+    }
38
+
39
+    @Override
40
+    protected void handleQueuedItems() {
41
+        try {
42
+            while (isQueueEnabled()) {
43
+                send(getQueue().take().getLine());
44
+            }
45
+        } catch (InterruptedException ex) {
46
+            // Do nothing
47
+        }
48
+    }
40 49
 
41 50
 }

+ 0
- 69
irc/src/com/dmdirc/parser/irc/outputqueue/PriorityQueueHandler.java Ver arquivo

@@ -1,69 +0,0 @@
1
-/*
2
- * Copyright (c) 2006-2015 DMDirc Developers
3
- *
4
- * Permission is hereby granted, free of charge, to any person obtaining a copy
5
- * of this software and associated documentation files (the "Software"), to deal
6
- * in the Software without restriction, including without limitation the rights
7
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8
- * copies of the Software, and to permit persons to whom the Software is
9
- * furnished to do so, subject to the following conditions:
10
- *
11
- * The above copyright notice and this permission notice shall be included in
12
- * all copies or substantial portions of the Software.
13
- *
14
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20
- * SOFTWARE.
21
- */
22
-
23
-package com.dmdirc.parser.irc.outputqueue;
24
-
25
-import java.io.PrintWriter;
26
-import java.time.Duration;
27
-
28
-/**
29
- * This does no rate limiting, and just sends based on priority.
30
- */
31
-public class PriorityQueueHandler extends QueueHandler {
32
-
33
-    /**
34
-     * Create a new PriorityQueueHandler.
35
-     *
36
-     * @param outputQueue Owner of this Queue Handler
37
-     * @param out Output Stream to use
38
-     */
39
-    public PriorityQueueHandler(
40
-            final OutputQueue outputQueue,
41
-            final PrintWriter out) {
42
-        super(outputQueue,
43
-                QueueComparators.byPriorityThenNumber(Duration.ofSeconds(10)),
44
-                out);
45
-    }
46
-
47
-    /**
48
-     * Get a QueueHandlerFactory that produces PriorityQueueHandlers.
49
-     *
50
-     * @return a QueueHandlerFactory that produces PrirortyQueueHandlers.
51
-     */
52
-    public static QueueHandlerFactory getFactory() {
53
-        return PriorityQueueHandler::new;
54
-    }
55
-
56
-    @Override
57
-    public void run() {
58
-        try {
59
-            while (outputQueue.isQueueEnabled()) {
60
-                final QueueItem item = outputQueue.getQueue().take();
61
-
62
-                sendLine(item.getLine());
63
-            }
64
-        } catch (InterruptedException ex) {
65
-            // Do nothing
66
-        }
67
-    }
68
-
69
-}

+ 0
- 103
irc/src/com/dmdirc/parser/irc/outputqueue/QueueHandler.java Ver arquivo

@@ -1,103 +0,0 @@
1
-/*
2
- * Copyright (c) 2006-2015 DMDirc Developers
3
- *
4
- * Permission is hereby granted, free of charge, to any person obtaining a copy
5
- * of this software and associated documentation files (the "Software"), to deal
6
- * in the Software without restriction, including without limitation the rights
7
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8
- * copies of the Software, and to permit persons to whom the Software is
9
- * furnished to do so, subject to the following conditions:
10
- *
11
- * The above copyright notice and this permission notice shall be included in
12
- * all copies or substantial portions of the Software.
13
- *
14
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20
- * SOFTWARE.
21
- */
22
-
23
-package com.dmdirc.parser.irc.outputqueue;
24
-
25
-import com.dmdirc.parser.common.QueuePriority;
26
-
27
-import java.io.PrintWriter;
28
-import java.util.Comparator;
29
-
30
-/**
31
- * Sending queue.
32
- */
33
-public abstract class QueueHandler extends Thread {
34
-
35
-    /** The output queue that owns us. */
36
-    protected OutputQueue outputQueue;
37
-    /** Comparator to use to sort queue items. */
38
-    private final Comparator<QueueItem> comparator;
39
-    /** Where to send the output. */
40
-    private final PrintWriter out;
41
-
42
-    /**
43
-     * Create a new Queue Thread.
44
-     *
45
-     * @param outputQueue the OutputQueue that owns us.
46
-     * @param comparator Comparator to use to sort items in the queue.
47
-     * @param out Writer to send to.
48
-     */
49
-    public QueueHandler(
50
-            final OutputQueue outputQueue,
51
-            final Comparator<QueueItem> comparator,
52
-            final PrintWriter out) {
53
-        super("IRC Parser queue handler");
54
-
55
-        this.comparator = comparator;
56
-        this.out = out;
57
-        this.outputQueue = outputQueue;
58
-    }
59
-
60
-    /**
61
-     * Send the given item.
62
-     *
63
-     * @param line Line to send.
64
-     */
65
-    public void sendLine(final String line) {
66
-        out.printf("%s\r\n", line);
67
-    }
68
-
69
-    /**
70
-     * Get a new QueueItem for the given line and priority.
71
-     * By default this will just create a new QueueItem with the given
72
-     * parameters, but QueueHandlers are free to override it if they need to
73
-     * instead produce subclasses of QueueItem or do anything else with the
74
-     * data given.
75
-     *
76
-     * @param line Line to send
77
-     * @param priority Priority of the line.
78
-     * @return A QueueItem for teh given parameters
79
-     */
80
-    public QueueItem getQueueItem(final String line, final QueuePriority priority) {
81
-        return new QueueItem(line, priority);
82
-    }
83
-
84
-    /**
85
-     * Gets the comparator to use to sort queue items.
86
-     *
87
-     * @return The comparator to use to sort queue items.
88
-     */
89
-    public Comparator<QueueItem> getQueueItemComparator() {
90
-        return comparator;
91
-    }
92
-
93
-    /**
94
-     * This is the main even loop of the queue.
95
-     * It needs to handle pulling items out of the queue and calling
96
-     * sendLine.
97
-     *
98
-     * It also needs to handle any delays in sending that it deems needed.
99
-     */
100
-    @Override
101
-    public abstract void run();
102
-
103
-}

irc/src/com/dmdirc/parser/irc/outputqueue/SimpleRateLimitedQueueHandler.java → irc/src/com/dmdirc/parser/irc/outputqueue/SimpleRateLimitedOutputQueue.java Ver arquivo

@@ -24,15 +24,13 @@ package com.dmdirc.parser.irc.outputqueue;
24 24
 
25 25
 import com.dmdirc.parser.common.QueuePriority;
26 26
 
27
-import java.io.PrintWriter;
28
-
29 27
 /**
30 28
  * This is a simple rate limiting queue.
31 29
  * If more than 4 items are added in 4 seconds it will start limiting.
32 30
  * The first 4 items will be sent un-limited and then limiting will commence at
33 31
  * a rate of 1 per second.
34 32
  */
35
-public class SimpleRateLimitedQueueHandler extends QueueHandler {
33
+public class SimpleRateLimitedOutputQueue extends OutputQueue {
36 34
 
37 35
     /** Current count. */
38 36
     private int count;
@@ -50,24 +48,10 @@ public class SimpleRateLimitedQueueHandler extends QueueHandler {
50 48
     private boolean alwaysUpdateTime = true;
51 49
 
52 50
     /**
53
-     * Create a new SimpleRateLimitedQueueHandler.
54
-     *
55
-     * @param outputQueue Owner of this Queue Handler
56
-     * @param out Output Stream to use
57
-     */
58
-    public SimpleRateLimitedQueueHandler(
59
-            final OutputQueue outputQueue,
60
-            final PrintWriter out) {
61
-        super(outputQueue, QueueComparators.byPriorityThenNumber(), out);
62
-    }
63
-
64
-    /**
65
-     * Get a QueueHandlerFactory that produces PriorityQueueHandlers.
66
-     *
67
-     * @return a QueueHandlerFactory that produces PrirortyQueueHandlers.
51
+     * Create a new SimpleRateLimitedOutputQueue.
68 52
      */
69
-    public static QueueHandlerFactory getFactory() {
70
-        return SimpleRateLimitedQueueHandler::new;
53
+    public SimpleRateLimitedOutputQueue() {
54
+        super(QueueComparators.byPriorityThenNumber());
71 55
     }
72 56
 
73 57
     /**
@@ -164,7 +148,7 @@ public class SimpleRateLimitedQueueHandler extends QueueHandler {
164 148
     }
165 149
 
166 150
     @Override
167
-    public QueueItem getQueueItem(final String line, final QueuePriority priority) {
151
+    protected void enqueue(final String line, final QueuePriority priority) {
168 152
         // Was the last line added less than limitTime ago?
169 153
         synchronized (this) {
170 154
             final boolean overTime = lastItemTime + limitTime > System.currentTimeMillis();
@@ -179,7 +163,7 @@ public class SimpleRateLimitedQueueHandler extends QueueHandler {
179 163
                 // It has been longer than limitTime and we are still shown as
180 164
                 // limiting, check to see if the queue is empty or not, if it is
181 165
                 // disable limiting.
182
-                if (outputQueue.getQueue().isEmpty()) {
166
+                if (getQueue().isEmpty()) {
183 167
                     isLimiting = false;
184 168
                 }
185 169
             } else {
@@ -193,21 +177,19 @@ public class SimpleRateLimitedQueueHandler extends QueueHandler {
193 177
             }
194 178
         }
195 179
 
196
-        return super.getQueueItem(line, priority);
180
+        super.enqueue(line, priority);
197 181
     }
198 182
 
199 183
     @Override
200
-    public void run() {
184
+    protected void handleQueuedItems() {
201 185
         try {
202
-            while (outputQueue.isQueueEnabled()) {
203
-                final QueueItem item = outputQueue.getQueue().take();
204
-
205
-                sendLine(item.getLine());
186
+            while (isQueueEnabled()) {
187
+                sendLine(getQueue().take().getLine());
206 188
 
207 189
                 final boolean doSleep;
208 190
                 synchronized (this) {
209 191
                     doSleep = isLimiting;
210
-                    if (isLimiting && outputQueue.getQueue().isEmpty()) {
192
+                    if (isLimiting && getQueue().isEmpty()) {
211 193
                         isLimiting = false;
212 194
                     }
213 195
                 }
@@ -215,11 +197,14 @@ public class SimpleRateLimitedQueueHandler extends QueueHandler {
215 197
                 if (doSleep) {
216 198
                     try {
217 199
                         Thread.sleep(waitTime);
218
-                    } catch (InterruptedException ex) { /* Do Nothing. */ }
200
+                    } catch (InterruptedException ex) {
201
+                        /* Do Nothing. */
202
+                    }
219 203
                 }
220 204
             }
221 205
         } catch (InterruptedException ex) {
222 206
             // Do nothing
223 207
         }
224 208
     }
209
+
225 210
 }

Carregando…
Cancelar
Salvar