Ver código fonte

refactor irc.Socket

tags/v0.11.0-beta
Shivaram Lingamneni 6 anos atrás
pai
commit
fa5d4be718
1 arquivos alterados com 69 adições e 110 exclusões
  1. 69
    110
      irc/socket.go

+ 69
- 110
irc/socket.go Ver arquivo

@@ -9,6 +9,7 @@ import (
9 9
 	"crypto/sha256"
10 10
 	"crypto/tls"
11 11
 	"encoding/hex"
12
+	"errors"
12 13
 	"io"
13 14
 	"net"
14 15
 	"strings"
@@ -18,24 +19,25 @@ import (
18 19
 
19 20
 var (
20 21
 	handshakeTimeout, _ = time.ParseDuration("5s")
22
+	errSendQExceeded    = errors.New("SendQ exceeded")
21 23
 )
22 24
 
23 25
 // Socket represents an IRC socket.
24 26
 type Socket struct {
27
+	sync.Mutex
28
+
25 29
 	conn   net.Conn
26 30
 	reader *bufio.Reader
27 31
 
28 32
 	MaxSendQBytes uint64
29 33
 
30
-	closed      bool
31
-	closedMutex sync.Mutex
32
-
33
-	finalData      string // what to send when we die
34
-	finalDataMutex sync.Mutex
35
-
34
+	// coordination system for asynchronous writes
35
+	buffer           []byte
36 36
 	lineToSendExists chan bool
37
-	linesToSend      []string
38
-	linesToSendMutex sync.Mutex
37
+
38
+	closed        bool
39
+	sendQExceeded bool
40
+	finalData     string // what to send when we die
39 41
 }
40 42
 
41 43
 // NewSocket returns a new Socket.
@@ -44,21 +46,24 @@ func NewSocket(conn net.Conn, maxSendQBytes uint64) Socket {
44 46
 		conn:             conn,
45 47
 		reader:           bufio.NewReader(conn),
46 48
 		MaxSendQBytes:    maxSendQBytes,
47
-		lineToSendExists: make(chan bool),
49
+		lineToSendExists: make(chan bool, 1),
48 50
 	}
49 51
 }
50 52
 
51 53
 // Close stops a Socket from being able to send/receive any more data.
52 54
 func (socket *Socket) Close() {
53
-	socket.closedMutex.Lock()
54
-	defer socket.closedMutex.Unlock()
55
-	if socket.closed {
56
-		return
57
-	}
58
-	socket.closed = true
55
+	alreadyClosed := func() bool {
56
+		socket.Lock()
57
+		defer socket.Unlock()
58
+		result := socket.closed
59
+		socket.closed = true
60
+		return result
61
+	}()
59 62
 
60
-	// force close loop to happen if it hasn't already
61
-	go socket.timedFillLineToSendExists(200 * time.Millisecond)
63
+	if !alreadyClosed {
64
+		// force close loop to happen if it hasn't already
65
+		socket.Write("")
66
+	}
62 67
 }
63 68
 
64 69
 // CertFP returns the fingerprint of the certificate provided by the client.
@@ -114,124 +119,78 @@ func (socket *Socket) Read() (string, error) {
114 119
 }
115 120
 
116 121
 // Write sends the given string out of Socket.
117
-func (socket *Socket) Write(data string) error {
118
-	if socket.IsClosed() {
119
-		return io.EOF
120
-	}
121
-
122
-	socket.linesToSendMutex.Lock()
123
-	socket.linesToSend = append(socket.linesToSend, data)
124
-	socket.linesToSendMutex.Unlock()
125
-
126
-	go socket.timedFillLineToSendExists(15 * time.Second)
122
+func (socket *Socket) Write(data string) (err error) {
123
+	socket.Lock()
124
+	defer socket.Unlock()
127 125
 
128
-	return nil
129
-}
126
+	if socket.closed {
127
+		err = io.EOF
128
+	} else if uint64(len(data)+len(socket.buffer)) > socket.MaxSendQBytes {
129
+		socket.sendQExceeded = true
130
+		err = errSendQExceeded
131
+	} else {
132
+		socket.buffer = append(socket.buffer, data...)
133
+	}
130 134
 
131
-// timedFillLineToSendExists either sends the note or times out.
132
-func (socket *Socket) timedFillLineToSendExists(duration time.Duration) {
133
-	lineToSendTimeout := time.NewTimer(duration)
134
-	defer lineToSendTimeout.Stop()
135
-	select {
136
-	case socket.lineToSendExists <- true:
137
-		// passed data successfully
138
-	case <-lineToSendTimeout.C:
139
-		// timed out send
135
+	// this can generate a spurious wakeup, since we are racing against the channel read,
136
+	// but since we are holding the mutex, we are not racing against the other writes
137
+	// and therefore we cannot miss a wakeup or block
138
+	if len(socket.lineToSendExists) == 0 {
139
+		socket.lineToSendExists <- true
140 140
 	}
141
+
142
+	return
141 143
 }
142 144
 
143 145
 // SetFinalData sets the final data to send when the SocketWriter closes.
144 146
 func (socket *Socket) SetFinalData(data string) {
145
-	socket.finalDataMutex.Lock()
147
+	socket.Lock()
148
+	defer socket.Unlock()
146 149
 	socket.finalData = data
147
-	socket.finalDataMutex.Unlock()
148 150
 }
149 151
 
150 152
 // IsClosed returns whether the socket is closed.
151 153
 func (socket *Socket) IsClosed() bool {
152
-	socket.closedMutex.Lock()
153
-	defer socket.closedMutex.Unlock()
154
+	socket.Lock()
155
+	defer socket.Unlock()
154 156
 	return socket.closed
155 157
 }
156 158
 
157 159
 // RunSocketWriter starts writing messages to the outgoing socket.
158 160
 func (socket *Socket) RunSocketWriter() {
159
-	for {
161
+	localBuffer := make([]byte, 0)
162
+	shouldStop := false
163
+	for !shouldStop {
160 164
 		// wait for new lines
161 165
 		select {
162 166
 		case <-socket.lineToSendExists:
163
-			socket.linesToSendMutex.Lock()
164
-
165
-			// check if we're closed
166
-			if socket.IsClosed() {
167
-				socket.linesToSendMutex.Unlock()
168
-				break
169
-			}
170
-
171
-			// check whether new lines actually exist or not
172
-			if len(socket.linesToSend) < 1 {
173
-				socket.linesToSendMutex.Unlock()
174
-				continue
175
-			}
176
-
177
-			// check sendq
178
-			var sendQBytes uint64
179
-			for _, line := range socket.linesToSend {
180
-				sendQBytes += uint64(len(line))
181
-				if socket.MaxSendQBytes < sendQBytes {
182
-					// don't unlock mutex because this break is just to escape this for loop
183
-					break
184
-				}
185
-			}
186
-			if socket.MaxSendQBytes < sendQBytes {
187
-				socket.SetFinalData("\r\nERROR :SendQ Exceeded\r\n")
188
-				socket.linesToSendMutex.Unlock()
189
-				break
190
-			}
191
-
192
-			// get all existing data
193
-			data := strings.Join(socket.linesToSend, "")
194
-			socket.linesToSend = []string{}
195
-
196
-			socket.linesToSendMutex.Unlock()
197
-
198
-			// write data
199
-			if 0 < len(data) {
200
-				_, err := socket.conn.Write([]byte(data))
201
-				if err != nil {
202
-					break
203
-				}
204
-			}
167
+			// retrieve the buffered data, clear the buffer
168
+			socket.Lock()
169
+			localBuffer = append(localBuffer, socket.buffer...)
170
+			socket.buffer = socket.buffer[:0]
171
+			socket.Unlock()
172
+
173
+			_, err := socket.conn.Write(localBuffer)
174
+			localBuffer = localBuffer[:0]
175
+
176
+			socket.Lock()
177
+			shouldStop = (err != nil) || socket.closed || socket.sendQExceeded
178
+			socket.Unlock()
205 179
 		}
206
-		if socket.IsClosed() {
207
-			// error out or we've been closed
208
-			break
209
-		}
210
-	}
211
-	// force closure of socket
212
-	socket.closedMutex.Lock()
213
-	if !socket.closed {
214
-		socket.closed = true
215 180
 	}
216
-	socket.closedMutex.Unlock()
217 181
 
218
-	// write error lines
219
-	socket.finalDataMutex.Lock()
220
-	if 0 < len(socket.finalData) {
221
-		socket.conn.Write([]byte(socket.finalData))
182
+	// mark the socket closed (if someone hasn't already), then write error lines
183
+	socket.Lock()
184
+	socket.closed = true
185
+	finalData := socket.finalData
186
+	if socket.sendQExceeded {
187
+		finalData = "\r\nERROR :SendQ Exceeded\r\n"
188
+	}
189
+	socket.Unlock()
190
+	if finalData != "" {
191
+		socket.conn.Write([]byte(finalData))
222 192
 	}
223
-	socket.finalDataMutex.Unlock()
224 193
 
225 194
 	// close the connection
226 195
 	socket.conn.Close()
227
-
228
-	// empty the lineToSendExists channel
229
-	for 0 < len(socket.lineToSendExists) {
230
-		<-socket.lineToSendExists
231
-	}
232
-}
233
-
234
-// WriteLine writes the given line out of Socket.
235
-func (socket *Socket) WriteLine(line string) error {
236
-	return socket.Write(line + "\r\n")
237 196
 }

Carregando…
Cancelar
Salvar