|
@@ -34,7 +34,8 @@ type Socket struct {
|
34
|
34
|
// this is a trylock enforcing that only one goroutine can write to `conn` at a time
|
35
|
35
|
writerSemaphore Semaphore
|
36
|
36
|
|
37
|
|
- buffer []byte
|
|
37
|
+ buffers [][]byte
|
|
38
|
+ totalLength int
|
38
|
39
|
closed bool
|
39
|
40
|
sendQExceeded bool
|
40
|
41
|
finalData string // what to send when we die
|
|
@@ -121,15 +122,23 @@ func (socket *Socket) Read() (string, error) {
|
121
|
122
|
// 2. MUST NOT reorder messages
|
122
|
123
|
// 3. MUST provide mutual exclusion for socket.conn.Write
|
123
|
124
|
// 4. SHOULD NOT tie up additional goroutines, beyond the one blocked on socket.conn.Write
|
124
|
|
-func (socket *Socket) Write(data string) (err error) {
|
|
125
|
+func (socket *Socket) Write(data []byte) (err error) {
|
|
126
|
+ if len(data) == 0 {
|
|
127
|
+ return
|
|
128
|
+ }
|
|
129
|
+
|
125
|
130
|
socket.Lock()
|
126
|
131
|
if socket.closed {
|
127
|
132
|
err = io.EOF
|
128
|
|
- } else if len(data)+len(socket.buffer) > socket.maxSendQBytes {
|
129
|
|
- socket.sendQExceeded = true
|
130
|
|
- err = errSendQExceeded
|
131
|
133
|
} else {
|
132
|
|
- socket.buffer = append(socket.buffer, data...)
|
|
134
|
+ prospectiveLen := socket.totalLength + len(data)
|
|
135
|
+ if prospectiveLen > socket.maxSendQBytes {
|
|
136
|
+ socket.sendQExceeded = true
|
|
137
|
+ err = errSendQExceeded
|
|
138
|
+ } else {
|
|
139
|
+ socket.buffers = append(socket.buffers, data)
|
|
140
|
+ socket.totalLength = prospectiveLen
|
|
141
|
+ }
|
133
|
142
|
}
|
134
|
143
|
socket.Unlock()
|
135
|
144
|
|
|
@@ -165,7 +174,7 @@ func (socket *Socket) readyToWrite() bool {
|
165
|
174
|
socket.Lock()
|
166
|
175
|
defer socket.Unlock()
|
167
|
176
|
// on the first time observing socket.closed, we still have to write socket.finalData
|
168
|
|
- return !socket.finalized && (len(socket.buffer) > 0 || socket.closed || socket.sendQExceeded)
|
|
177
|
+ return !socket.finalized && (socket.totalLength > 0 || socket.closed || socket.sendQExceeded)
|
169
|
178
|
}
|
170
|
179
|
|
171
|
180
|
// send actually writes messages to socket.Conn; it may block
|
|
@@ -193,11 +202,13 @@ func (socket *Socket) send() {
|
193
|
202
|
func (socket *Socket) performWrite() {
|
194
|
203
|
// retrieve the buffered data, clear the buffer
|
195
|
204
|
socket.Lock()
|
196
|
|
- buffer := socket.buffer
|
197
|
|
- socket.buffer = nil
|
|
205
|
+ buffers := socket.buffers
|
|
206
|
+ socket.buffers = nil
|
|
207
|
+ socket.totalLength = 0
|
198
|
208
|
socket.Unlock()
|
199
|
209
|
|
200
|
|
- _, err := socket.conn.Write(buffer)
|
|
210
|
+ // on Linux, the runtime will optimize this into a single writev(2) call:
|
|
211
|
+ _, err := (*net.Buffers)(&buffers).WriteTo(socket.conn)
|
201
|
212
|
|
202
|
213
|
socket.Lock()
|
203
|
214
|
shouldClose := (err != nil) || socket.closed || socket.sendQExceeded
|