Parcourir la source

Fix Order of QueuePriorities

OutputHandlers are now incharge of creating QueueItems and comparing them.
Parser now allows for replacing the OutputQueue entirely
Default OutputQueue allows replacing of the OutputHandler (by giving it a new QueueFactory that outputs the alternative QueueHandler)
Sending PING is now High Priority
QueueItems now sort by a unique itemNumber not time, as priority queue doesn't guarentee order for itesm of teh same priority created at the same time
Add a rate-limited OutputQueue
tags/0.6.3
Shane Mc Cormack il y a 14 ans
Parent
révision
9ccb19a44a

+ 4
- 4
src/com/dmdirc/parser/common/QueuePriority.java Voir le fichier

@@ -28,10 +28,10 @@ package com.dmdirc.parser.common;
28 28
  * @author shane
29 29
  */
30 30
 public enum QueuePriority {
31
-    /** Low priority. */
32
-    LOW,
31
+    /** High priority. */
32
+    HIGH,
33 33
     /** Normal priority. */
34 34
     NORMAL,
35
-    /** High Priority. */
36
-    HIGH;
35
+    /** Low Priority. */
36
+    LOW;
37 37
 }

+ 26
- 3
src/com/dmdirc/parser/irc/IRCParser.java Voir le fichier

@@ -294,11 +294,34 @@ public class IRCParser implements SecureParser, Runnable {
294 294
      * @param myDetails Client information.
295 295
      */
296 296
     public IRCParser(final MyInfo myDetails, final ServerInfo serverDetails) {
297
+        out = new OutputQueue();
297 298
         if (myDetails != null) { this.me = myDetails; }
298 299
         if (serverDetails != null) { this.server = serverDetails; }
299 300
         resetState();
300 301
     }
301 302
 
303
+    /**
304
+     * Get the current OutputQueue
305
+     *
306
+     * @return the current OutputQueue
307
+     */
308
+    public OutputQueue getOut() {
309
+        return out;
310
+    }
311
+
312
+    /**
313
+     * Set the OutputQueue
314
+     *
315
+     * @param out the new current OutputQueue
316
+     */
317
+    public void setOut(final OutputQueue out) throws IRCParserException {
318
+        if (currentSocketState == SocketState.CLOSED) {
319
+            this.out = out;
320
+        } else {
321
+            throw new IRCParserException("OutputQueue can only be changed when disconnected.");
322
+        }
323
+    }
324
+
302 325
     /**
303 326
      * Get the current Value of bindIP.
304 327
      *
@@ -710,7 +733,7 @@ public class IRCParser implements SecureParser, Runnable {
710 733
         }
711 734
 
712 735
         callDebugInfo(DEBUG_SOCKET, "\t-> Opening socket output stream PrintWriter");
713
-        out = new OutputQueue(socket.getOutputStream());
736
+        out.setOutputStream(socket.getOutputStream());
714 737
         out.setQueueEnabled(true);
715 738
         callDebugInfo(DEBUG_SOCKET, "\t-> Opening socket input stream BufferedReader");
716 739
         in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
@@ -725,7 +748,7 @@ public class IRCParser implements SecureParser, Runnable {
725 748
             sendString("PASS " + server.getPassword());
726 749
         }
727 750
         sendString("NICK " + me.getNickname());
728
-    thinkNickname = me.getNickname();
751
+        thinkNickname = me.getNickname();
729 752
         String localhost;
730 753
         try {
731 754
             localhost = InetAddress.getLocalHost().getHostAddress();
@@ -1853,7 +1876,7 @@ public class IRCParser implements SecureParser, Runnable {
1853 1876
                 setPingNeeded(true);
1854 1877
                 pingCountDown = pingCountDownLength;
1855 1878
                 lastPingValue = String.valueOf(System.currentTimeMillis());
1856
-                if (sendString("PING " + lastPingValue)) {
1879
+                if (sendString("PING " + lastPingValue, QueuePriority.HIGH)) {
1857 1880
                     callPingSent();
1858 1881
                 }
1859 1882
             }

+ 27
- 0
src/com/dmdirc/parser/irc/IRCParserException.java Voir le fichier

@@ -0,0 +1,27 @@
1
+/*
2
+ * To change this template, choose Tools | Templates
3
+ * and open the template in the editor.
4
+ */
5
+
6
+package com.dmdirc.parser.irc;
7
+
8
+/**
9
+ * IRC Parser Exception!
10
+ *
11
+ * @author shane
12
+ */
13
+class IRCParserException extends Exception {
14
+
15
+    /** Version of this class. */
16
+    private static final long serialVersionUID = 1L;
17
+
18
+    /**
19
+     * Create a new IRCParserException
20
+     *
21
+     * @param message Reason for exception
22
+     */
23
+    public IRCParserException(final String message) {
24
+        super(message);
25
+    }
26
+
27
+}

+ 60
- 26
src/com/dmdirc/parser/irc/outputqueue/OutputQueue.java Voir le fichier

@@ -35,7 +35,7 @@ import java.util.concurrent.PriorityBlockingQueue;
35 35
  */
36 36
 public class OutputQueue {
37 37
     /** PrintWriter for sending output. */
38
-    private PrintWriter out;
38
+    private PrintWriter out = null;
39 39
 
40 40
     /** Is queueing enabled? */
41 41
     private boolean queueEnabled = true;
@@ -44,14 +44,23 @@ public class OutputQueue {
44 44
     private BlockingQueue<QueueItem> queue = new PriorityBlockingQueue<QueueItem>();
45 45
 
46 46
     /** Thread for the sending queue. */
47
-    private Thread queueThread;
47
+    private QueueHandler queueHandler;
48
+
49
+    /** The QueueFactory for this OutputQueue. */
50
+    private QueueFactory queueFactory = PriorityQueueHandler.getFactory();
51
+    // private QueueFactory queueFactory = SimpleRateLimitedQueueHandler.getFactory();
48 52
 
49 53
     /**
50 54
      * Create a new OutputQueue
51
-     * 
52
-     * @param outputStream PrintWriter to use to actually send stuff.
53 55
      */
54
-    public OutputQueue (final OutputStream outputStream) {
56
+    public OutputQueue() { }
57
+
58
+    /**
59
+     * Set the output stream for this queue.
60
+     *
61
+     * @param outputStream Output Stream to use.
62
+     */
63
+    public void setOutputStream(final OutputStream outputStream) {
55 64
         this.out = new PrintWriter(outputStream, true);
56 65
     }
57 66
 
@@ -64,6 +73,27 @@ public class OutputQueue {
64 73
         return queueEnabled;
65 74
     }
66 75
 
76
+    /**
77
+     * Set the QueueFactory.
78
+     * Changing this will not change an existing QueueHandler unless queueing is
79
+     * disabled and reenabled.
80
+     * If this is called before the first lien of output is queued then there is
81
+     * no need to disable and reenable the queue.
82
+     *
83
+     * @param manager New QueueFactory to use.
84
+     */
85
+    public void setQueueManager(final QueueFactory manager) {
86
+        queueFactory = manager;
87
+    }
88
+
89
+    /**
90
+     * Get the QueueFactory.
91
+     * @return The current QueueFactory.
92
+     */
93
+    public QueueFactory getQueueManager() {
94
+        return queueFactory;
95
+    }
96
+
67 97
     /**
68 98
      * Set if queueing is enabled.
69 99
      * if this is changed from enabled to disabled, all currently queued items
@@ -72,12 +102,15 @@ public class OutputQueue {
72 102
      * @param queueEnabled new value for queueEnabled
73 103
      */
74 104
     public void setQueueEnabled(final boolean queueEnabled) {
105
+        if (out == null) {
106
+            throw new NullPointerException("No output stream has been set.");
107
+        }
75 108
         final boolean old = this.queueEnabled;
76 109
         this.queueEnabled = queueEnabled;
77 110
 
78 111
         if (old != queueEnabled && old) {
79
-            queueThread.interrupt();
80
-            queueThread = null;
112
+            queueHandler.interrupt();
113
+            queueHandler = null;
81 114
 
82 115
             while (!queue.isEmpty()) {
83 116
                 try {
@@ -94,14 +127,23 @@ public class OutputQueue {
94 127
      */
95 128
     public void clearQueue() {
96 129
         this.queueEnabled = false;
97
-        if (queueThread != null) {
98
-            queueThread.interrupt();
99
-            queueThread = null;
130
+        if (queueHandler != null) {
131
+            queueHandler.interrupt();
132
+            queueHandler = null;
100 133
         }
101 134
 
102 135
         queue.clear();
103 136
     }
104 137
 
138
+    /**
139
+     * Get the number of items currently in the queue.
140
+     *
141
+     * @return Number of items in the queue.
142
+     */
143
+    public int queueCount() {
144
+        return queue.size();
145
+    }
146
+
105 147
     /**
106 148
      * Send the given line.
107 149
      * If queueing is enabled, this will queue it, else it will send it
@@ -122,26 +164,18 @@ public class OutputQueue {
122 164
      * @param priority Priority of item (ignored if queue is disabled)
123 165
      */
124 166
     public void sendLine(final String line, final QueuePriority priority) {
167
+        if (out == null) {
168
+            throw new NullPointerException("No output stream has been set.");
169
+        }
125 170
         if (queueEnabled) {
126
-            queue.add(new QueueItem(line, priority));
127
-
128
-            if (queueThread == null || !queueThread.isAlive()) {
129
-                queueThread = getQueueHandler(queue, out);
130
-                queueThread.start();
171
+            if (queueHandler == null || !queueHandler.isAlive()) {
172
+                queueHandler = queueFactory.getQueueHandler(this, queue, out);
173
+                queueHandler.start();
131 174
             }
175
+
176
+            queue.add(queueHandler.getQueueItem(line, priority));
132 177
         } else {
133 178
             out.printf("%s\r\n", line);
134 179
         }
135 180
     }
136
-
137
-    /**
138
-     * Get a new QueueHandler instance as needed.
139
-     * 
140
-     * @param queue The queue to handle.
141
-     * @param out Where to send crap.
142
-     * @return the new queue handler object.
143
-     */
144
-    private QueueHandler getQueueHandler(final BlockingQueue<QueueItem> queue, final PrintWriter out) {
145
-        return new PriorityQueueHandler(this, queue, out);
146
-    }
147 181
 }

+ 21
- 1
src/com/dmdirc/parser/irc/outputqueue/PriorityQueueHandler.java Voir le fichier

@@ -31,8 +31,28 @@ import java.util.concurrent.BlockingQueue;
31 31
  * @author shane
32 32
  */
33 33
 public class PriorityQueueHandler extends QueueHandler {
34
+    /**
35
+     * Get a QueueFactory that produces PriorityQueueHandlers
36
+     *
37
+     * @return a QueueFactory that produces PrirortyQueueHandlers.
38
+     */
39
+    public static QueueFactory getFactory() {
40
+        return new QueueFactory(){
41
+            /** {@inheritDoc} */
42
+            @Override
43
+            public QueueHandler getQueueHandler(final OutputQueue outputQueue, final BlockingQueue<QueueItem> queue, final PrintWriter out) {
44
+                return new PriorityQueueHandler(outputQueue, queue, out);
45
+            }
46
+        };
47
+    }
34 48
 
35
-
49
+    /**
50
+     * Create a new PriorityQueueHandler
51
+     *
52
+     * @param outputQueue Owner of this Queue Handler
53
+     * @param queue Queue to use
54
+     * @param out Output Stream to use
55
+     */
36 56
     public PriorityQueueHandler(final OutputQueue outputQueue, final BlockingQueue<QueueItem> queue, final PrintWriter out) {
37 57
         super(outputQueue, queue, out);
38 58
     }

+ 28
- 0
src/com/dmdirc/parser/irc/outputqueue/QueueFactory.java Voir le fichier

@@ -0,0 +1,28 @@
1
+/*
2
+ * To change this template, choose Tools | Templates
3
+ * and open the template in the editor.
4
+ */
5
+
6
+package com.dmdirc.parser.irc.outputqueue;
7
+
8
+import java.io.PrintWriter;
9
+import java.util.concurrent.BlockingQueue;
10
+
11
+/**
12
+ * A QueueFactory produces QueueHandlers for OutputQueue.
13
+ *
14
+ * @author shane
15
+ */
16
+public interface QueueFactory {
17
+
18
+    /**
19
+     * Get a new QueueHandler instance as needed.
20
+     *
21
+     * @param outputQueue the OutputQueue that will own this QueueHandler
22
+     * @param queue The queue to handle.
23
+     * @param out Where to send crap.
24
+     * @return the new queue handler object.
25
+     */
26
+    public QueueHandler getQueueHandler(final OutputQueue outputQueue, final BlockingQueue<QueueItem> queue, final PrintWriter out);
27
+
28
+}

+ 56
- 1
src/com/dmdirc/parser/irc/outputqueue/QueueHandler.java Voir le fichier

@@ -22,6 +22,7 @@
22 22
 
23 23
 package com.dmdirc.parser.irc.outputqueue;
24 24
 
25
+import com.dmdirc.parser.common.QueuePriority;
25 26
 import java.io.PrintWriter;
26 27
 import java.util.concurrent.BlockingQueue;
27 28
 
@@ -62,7 +63,61 @@ public abstract class QueueHandler extends Thread {
62 63
         out.printf("%s\r\n", line);
63 64
     }
64 65
 
65
-    /** {@inheritDoc} */
66
+    /**
67
+     * Get a new QueueItem for the given line and priority.
68
+     * By default this will just create a new QueueItem with the given
69
+     * parameters, but QueueHandlers are free to override it if they need to
70
+     * instead produce subclasses of QueueItem or do anything else with the
71
+     * data given.
72
+     *
73
+     * @param line Line to send
74
+     * @param priority Priority of the line.
75
+     * @return A QueueItem for teh given parameters
76
+     */
77
+    public QueueItem getQueueItem(final String line, final QueuePriority priority) {
78
+        return new QueueItem(this, line, priority);
79
+    }
80
+
81
+    /**
82
+     * Compare two QueueItems for sorting purposes.
83
+     * This is called by the default QueueItem in its compareTo method. The
84
+     * calling object will be the first parameter, the object to compare it to
85
+     * will be second.
86
+     * This allows QueueHandlers to sort items differently if needed.
87
+     *
88
+     * The default implementation works as follows:
89
+     * Compare based on priorty firstly, if the priorities are the same,
90
+     * compare based on the order the items were added to the queue.
91
+     *
92
+     * If an item has been in the queue longer than 10 seconds, it will not
93
+     * check its priority and soley position itself based on adding order.
94
+     *
95
+     * @param mainObject Main object we are comparing against.
96
+     * @param otherObject Object we are comparing to.
97
+     * @return A QueueItem for teh given parameters
98
+     */
99
+    public int compareQueueItem(final QueueItem mainObject, final QueueItem otherObject) {
100
+        if (mainObject.getTime() < 10 * 1000 && mainObject.getPriority().compareTo(otherObject.getPriority()) != 0) {
101
+            return mainObject.getPriority().compareTo(otherObject.getPriority());
102
+        }
103
+
104
+        if (mainObject.getItemNumber() > otherObject.getItemNumber()) {
105
+            return 1;
106
+        } else if (mainObject.getItemNumber() < otherObject.getItemNumber()) {
107
+            return -1;
108
+        } else {
109
+            // This can't happen.
110
+            return 0;
111
+        }
112
+    }
113
+
114
+    /**
115
+     * This is the main even loop of the queue.
116
+     * It needs to handle pulling items out of the queue and calling
117
+     * sendLine.
118
+     * 
119
+     * It also needs to handle any delays in sending that it deems needed.
120
+     */
66 121
     @Override
67 122
     public abstract void run();
68 123
 }

+ 25
- 18
src/com/dmdirc/parser/irc/outputqueue/QueueItem.java Voir le fichier

@@ -30,15 +30,24 @@ import com.dmdirc.parser.common.QueuePriority;
30 30
  * @author shane
31 31
  */
32 32
 public class QueueItem implements Comparable<QueueItem> {
33
+    /** Global Item Number*/
34
+    private static long number = 0L;
35
+
33 36
     /** Line to send */
34 37
     private final String line;
35 38
 
36 39
     /** Time this line was added. */
37 40
     private final long time;
38 41
 
42
+    /** Item Number */
43
+    private final long itemNumber;
44
+
39 45
     /** What is the priority of this line? */
40 46
     private final QueuePriority priority;
41 47
 
48
+    /** Our handler. */
49
+    private final QueueHandler handler;
50
+
42 51
     /**
43 52
      * Get the value of line
44 53
      *
@@ -57,6 +66,15 @@ public class QueueItem implements Comparable<QueueItem> {
57 66
         return time;
58 67
     }
59 68
 
69
+    /**
70
+     * Get the number of this item
71
+     *
72
+     * @return the value of itemNumber
73
+     */
74
+    public long getItemNumber() {
75
+        return itemNumber;
76
+    }
77
+
60 78
     /**
61 79
      * Get the value of priority
62 80
      *
@@ -67,42 +85,31 @@ public class QueueItem implements Comparable<QueueItem> {
67 85
     }
68 86
 
69 87
     /**
70
-     * Create a new QueuePriority
88
+     * Create a new QueueItem
71 89
      *
90
+     * @param handler Handler for this QueueItem
72 91
      * @param line Line to send
73 92
      * @param priority
74 93
      */
75
-    public QueueItem(final String line, final QueuePriority priority) {
94
+    public QueueItem(final QueueHandler handler, final String line, final QueuePriority priority) {
95
+        this.handler = handler;
76 96
         this.line = line;
77 97
         this.priority = priority;
78 98
 
79 99
         this.time = System.currentTimeMillis();
100
+        this.itemNumber = number++;
80 101
     }
81 102
 
82 103
     /**
83 104
      * Compare objects.
84
-     * Compare based on priorty firstly, if the priorities are the same,
85
-     * compare based on time added.
105
+     * This will use the compareQueueItem method of the current QueueHandler.
86 106
      *
87
-     * If an item has been in the queue longer than 10 seconds, it will not
88
-     * check its priority and soley position itself based on time.
89
-     * 
90 107
      * @param o Object to compare to
91 108
      * @return Position of this item in reference to the given item.
92 109
      */
93 110
     @Override
94 111
     public int compareTo(final QueueItem o) {
95
-        if (this.getTime() < 10 * 1000 && this.getPriority().compareTo(o.getPriority()) > 0) {
96
-            return 1;
97
-        }
98
-
99
-        if (this.getTime() < o.getTime()) {
100
-            return 1;
101
-        } else if (this.getTime() > o.getTime()) {
102
-            return -1;
103
-        } else {
104
-            return 0;
105
-        }
112
+        return handler.compareQueueItem(this, o);
106 113
     }
107 114
 
108 115
     /** {@inheritDoc} */

+ 258
- 0
src/com/dmdirc/parser/irc/outputqueue/SimpleRateLimitedQueueHandler.java Voir le fichier

@@ -0,0 +1,258 @@
1
+/*
2
+ *  Copyright (c) 2006-2009 Chris Smith, Shane Mc Cormack, Gregory Holmes
3
+ * 
4
+ *  Permission is hereby granted, free of charge, to any person obtaining a copy
5
+ *  of this software and associated documentation files (the "Software"), to deal
6
+ *  in the Software without restriction, including without limitation the rights
7
+ *  to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8
+ *  copies of the Software, and to permit persons to whom the Software is
9
+ *  furnished to do so, subject to the following conditions:
10
+ * 
11
+ *  The above copyright notice and this permission notice shall be included in
12
+ *  all copies or substantial portions of the Software.
13
+ * 
14
+ *  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15
+ *  IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16
+ *  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17
+ *  AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18
+ *  LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19
+ *  OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20
+ *  SOFTWARE.
21
+ */
22
+
23
+package com.dmdirc.parser.irc.outputqueue;
24
+
25
+import com.dmdirc.parser.common.QueuePriority;
26
+import java.io.PrintWriter;
27
+import java.util.concurrent.BlockingQueue;
28
+
29
+/**
30
+ * This is a simple rate limiting queue.
31
+ * If more than 4 items are added in 4 seconds it will start limiting.
32
+ * The first 4 items will be sent un-limited and then limiting will commence at
33
+ * a rate of 1 per second.
34
+ *
35
+ * @author shane
36
+ */
37
+public class SimpleRateLimitedQueueHandler extends QueueHandler {
38
+    /**
39
+     * Get a QueueFactory that produces PriorityQueueHandlers
40
+     *
41
+     * @return a QueueFactory that produces PrirortyQueueHandlers.
42
+     */
43
+    public static QueueFactory getFactory() {
44
+        return new QueueFactory(){
45
+            /** {@inheritDoc} */
46
+            @Override
47
+            public QueueHandler getQueueHandler(final OutputQueue outputQueue, final BlockingQueue<QueueItem> queue, final PrintWriter out) {
48
+                return new SimpleRateLimitedQueueHandler(outputQueue, queue, out);
49
+            }
50
+        };
51
+    }
52
+
53
+    /** Current count. */
54
+    private int count = 0;
55
+
56
+    /** Time last item was added. */
57
+    private long lastItemTime = 0L;
58
+
59
+    /** Are we limiting? */
60
+    private boolean isLimiting = false;
61
+
62
+    /** How many items are allowed before limiting? */
63
+    private int items = 4;
64
+
65
+    /** How many microseconds do we care about when checking for items? */
66
+    private int limitTime = 4000;
67
+
68
+    /** How long to wait in between each item when limiting? */
69
+    private int waitTime = 3000;
70
+
71
+    /** Always update the lastItemTime or only if its been > limitTime? */
72
+    private boolean alwaysUpdateTime = true;
73
+
74
+    /**
75
+     * Create a new SimpleRateLimitedQueueHandler
76
+     *
77
+     * @param outputQueue Owner of this Queue Handler
78
+     * @param queue Queue to use
79
+     * @param out Output Stream to use
80
+     */
81
+    public SimpleRateLimitedQueueHandler(final OutputQueue outputQueue, final BlockingQueue<QueueItem> queue, final PrintWriter out) {
82
+        super(outputQueue, queue, out);
83
+    }
84
+
85
+    /**
86
+     * Get the number of items needed to activate rate limiting.
87
+     * 
88
+     * @return Number of items needed to activate rate limiting.
89
+     */
90
+    public int getItems() {
91
+        return items;
92
+    }
93
+
94
+    /**
95
+     * Set the number of items needed to activate rate limiting.
96
+     *
97
+     * @param items Number of items needed to activate rate limiting.
98
+     */
99
+    public void setItems(final int items) {
100
+        this.items = items;
101
+    }
102
+
103
+    /**
104
+     * Get the length of time that is used when checking for rate limiting. (If
105
+     * more than getItems() number of lines are added less that this time apart
106
+     * from each other then rate limiting is activated.)
107
+     *
108
+     * @return Number of items needed to activate rate limiting.
109
+     */
110
+    public int getLimitTime() {
111
+        return limitTime;
112
+    }
113
+
114
+    /**
115
+     * Set the length of time that is used when checking for rate limiting. (If
116
+     * more than getItems() number of lines are added less that this time apart
117
+     * from each other then rate limiting is activated.)
118
+     *
119
+     * @param limitTime Number of items needed to activate rate limiting.
120
+     */
121
+    public void setLimitTime(final int limitTime) {
122
+        this.limitTime = limitTime;
123
+    }
124
+
125
+    /**
126
+     * Get the length of time that we wait inbetween lines when limiting.
127
+     *
128
+     * @return length of time that we wait inbetween lines when limiting.
129
+     */
130
+    public int getWaitTime() {
131
+        return waitTime;
132
+    }
133
+
134
+    /**
135
+     * Set the length of time that we wait inbetween lines when limiting.
136
+     *
137
+     * @param waitTime length of time that we wait inbetween lines when limiting.
138
+     */
139
+    public void setWaitTime(final int waitTime) {
140
+        this.waitTime = waitTime;
141
+    }
142
+
143
+    /**
144
+     * Will the internal "lastItemTime" be updated every time an item is added,
145
+     * or only after limitTime has passed?
146
+     *
147
+     * If true, assuming the default settings) items sent at 0, 3, 6, 9 will
148
+     * activate rate limiting, if false it would need to be 0, 1, 2, 3.
149
+     *
150
+     * @return is LastItemTime always updated?
151
+     */
152
+    public boolean getAlwaysUpdateTime() {
153
+        return alwaysUpdateTime;
154
+    }
155
+
156
+    /**
157
+     * Set if the internal "lastItemTime" should be updated every time an item
158
+     * is added, or only after limitTime has passed?
159
+     *
160
+     * If true, assuming the default settings) items sent at 0, 3, 6, 9 will
161
+     * activate rate limiting, if false it would need to be 0, 1, 2, 3.
162
+     * 
163
+     * @param alwaysUpdateTime Should LastItemTime always updated?
164
+     */
165
+    public void setAlwaysUpdateTime(final boolean alwaysUpdateTime) {
166
+        this.alwaysUpdateTime = alwaysUpdateTime;
167
+    }
168
+
169
+    /**
170
+     * Are we currently limiting?
171
+     * 
172
+     * @return True if limiting is active.
173
+     */
174
+    public boolean isLimiting() {
175
+        return isLimiting;
176
+    }
177
+
178
+    /**
179
+     * Compare queue items, if priorities differ, then  higher priority items
180
+     * will always be put further ahead in the queue (This queue ignores the
181
+     * 10-second rule of the normal queue) otherwise the normal comparison is
182
+     * used.
183
+     */
184
+    @Override
185
+    public int compareQueueItem(final QueueItem mainObject, final QueueItem otherObject) {
186
+        if (mainObject.getPriority().compareTo(otherObject.getPriority()) != 0) {
187
+            return mainObject.getPriority().compareTo(otherObject.getPriority());
188
+        } else {
189
+            return super.compareQueueItem(mainObject, otherObject);
190
+        }
191
+    }
192
+
193
+    /** {@inheritDoc} */
194
+    @Override
195
+    public QueueItem getQueueItem(String line, QueuePriority priority) {
196
+        // Was the last line added less than limitTime ago?
197
+        synchronized (this) {
198
+            final boolean overTime = (lastItemTime + limitTime > System.currentTimeMillis());
199
+            if (overTime) {
200
+                // If we are not currently limiting, and this is the items-th item
201
+                // added in the last limitTime, start limiting.
202
+                if (!isLimiting) {
203
+                    if (++count > (items - 1)) {
204
+                        System.out.println("++ Begin Limiting");
205
+                        isLimiting = true;
206
+                        count = 0;
207
+                    }
208
+                }
209
+            } else if (!isLimiting) {
210
+                // If it has been more than limitTime seconds since the last line
211
+                // and we are not currently limiting, reset the count.
212
+                count = 0;
213
+            } else {
214
+                // It has been longer than limitTime and we are still shown as
215
+                // limiting, check to see if the queue is empty or not, if it is
216
+                // disable limiting.
217
+                if (queue.size() == 0) {
218
+                    isLimiting = false;
219
+                }
220
+            }
221
+            if (alwaysUpdateTime || overTime) {
222
+                lastItemTime = System.currentTimeMillis();
223
+            }
224
+        }
225
+        
226
+        return super.getQueueItem(line, priority);
227
+    }
228
+
229
+    /** {@inheritDoc} */
230
+    @Override
231
+    public void run() {
232
+        try {
233
+            while (outputQueue.isQueueEnabled()) {
234
+                final QueueItem item = queue.take();
235
+                
236
+                sendLine(item.getLine());
237
+
238
+                final boolean doSleep;
239
+                synchronized (this) {
240
+                    doSleep = isLimiting;
241
+                    if (isLimiting) {
242
+                        if (queue.size() == 0) {
243
+                            isLimiting = false;
244
+                        }
245
+                    }
246
+                }
247
+
248
+                if (doSleep) {
249
+                    try {
250
+                        Thread.sleep(waitTime);
251
+                    } catch (InterruptedException ex) { /* Do Nothing. */ }
252
+                }
253
+            }
254
+        } catch (InterruptedException ex) {
255
+            // Do nothing
256
+        }
257
+    }
258
+}

Chargement…
Annuler
Enregistrer