|
@@ -134,6 +134,7 @@ func (socket *Socket) Write(data []byte) (err error) {
|
134
|
134
|
prospectiveLen := socket.totalLength + len(data)
|
135
|
135
|
if prospectiveLen > socket.maxSendQBytes {
|
136
|
136
|
socket.sendQExceeded = true
|
|
137
|
+ socket.closed = true
|
137
|
138
|
err = errSendQExceeded
|
138
|
139
|
} else {
|
139
|
140
|
socket.buffers = append(socket.buffers, data)
|
|
@@ -161,6 +162,13 @@ func (socket *Socket) BlockingWrite(data []byte) (err error) {
|
161
|
162
|
return
|
162
|
163
|
}
|
163
|
164
|
|
|
165
|
+ // after releasing the semaphore, we must check for fresh data, same as `send`
|
|
166
|
+ defer func() {
|
|
167
|
+ if socket.readyToWrite() {
|
|
168
|
+ socket.wakeWriter()
|
|
169
|
+ }
|
|
170
|
+ }()
|
|
171
|
+
|
164
|
172
|
// blocking acquire of the trylock
|
165
|
173
|
socket.writerSemaphore.Acquire()
|
166
|
174
|
defer socket.writerSemaphore.Release()
|
|
@@ -206,7 +214,7 @@ func (socket *Socket) readyToWrite() bool {
|
206
|
214
|
socket.Lock()
|
207
|
215
|
defer socket.Unlock()
|
208
|
216
|
// on the first time observing socket.closed, we still have to write socket.finalData
|
209
|
|
- return !socket.finalized && (socket.totalLength > 0 || socket.closed || socket.sendQExceeded)
|
|
217
|
+ return !socket.finalized && (socket.totalLength > 0 || socket.closed)
|
210
|
218
|
}
|
211
|
219
|
|
212
|
220
|
// send actually writes messages to socket.Conn; it may block
|
|
@@ -238,19 +246,20 @@ func (socket *Socket) performWrite() (closed bool) {
|
238
|
246
|
buffers := socket.buffers
|
239
|
247
|
socket.buffers = nil
|
240
|
248
|
socket.totalLength = 0
|
|
249
|
+ closed = socket.closed
|
241
|
250
|
socket.Unlock()
|
242
|
251
|
|
243
|
|
- // on Linux, the runtime will optimize this into a single writev(2) call:
|
244
|
|
- _, err := (*net.Buffers)(&buffers).WriteTo(socket.conn)
|
245
|
|
-
|
246
|
|
- socket.Lock()
|
247
|
|
- shouldClose := (err != nil) || socket.closed || socket.sendQExceeded
|
248
|
|
- socket.Unlock()
|
|
252
|
+ var err error
|
|
253
|
+ if !closed && len(buffers) > 0 {
|
|
254
|
+ // on Linux, the runtime will optimize this into a single writev(2) call:
|
|
255
|
+ _, err = (*net.Buffers)(&buffers).WriteTo(socket.conn)
|
|
256
|
+ }
|
249
|
257
|
|
250
|
|
- if shouldClose {
|
|
258
|
+ closed = closed || err != nil
|
|
259
|
+ if closed {
|
251
|
260
|
socket.finalize()
|
252
|
261
|
}
|
253
|
|
- return shouldClose
|
|
262
|
+ return
|
254
|
263
|
}
|
255
|
264
|
|
256
|
265
|
// mark closed and send final data. you must be holding the semaphore to call this:
|
|
@@ -258,12 +267,18 @@ func (socket *Socket) finalize() {
|
258
|
267
|
// mark the socket closed (if someone hasn't already), then write error lines
|
259
|
268
|
socket.Lock()
|
260
|
269
|
socket.closed = true
|
|
270
|
+ finalized := socket.finalized
|
261
|
271
|
socket.finalized = true
|
262
|
272
|
finalData := socket.finalData
|
263
|
273
|
if socket.sendQExceeded {
|
264
|
274
|
finalData = "\r\nERROR :SendQ Exceeded\r\n"
|
265
|
275
|
}
|
266
|
276
|
socket.Unlock()
|
|
277
|
+
|
|
278
|
+ if finalized {
|
|
279
|
+ return
|
|
280
|
+ }
|
|
281
|
+
|
267
|
282
|
if finalData != "" {
|
268
|
283
|
socket.conn.Write([]byte(finalData))
|
269
|
284
|
}
|