Browse Source

eliminate dedicated RunSocketWriter goroutine

tags/v0.12.0
Shivaram Lingamneni 6 years ago
parent
commit
b2f798cf03
2 changed files with 58 additions and 40 deletions
  1. 0
    1
      irc/client.go
  2. 58
    39
      irc/socket.go

+ 0
- 1
irc/client.go View File

@@ -90,7 +90,6 @@ func NewClient(server *Server, conn net.Conn, isTLS bool) *Client {
90 90
 	limits := server.Limits()
91 91
 	fullLineLenLimit := limits.LineLen.Tags + limits.LineLen.Rest
92 92
 	socket := NewSocket(conn, fullLineLenLimit*2, server.MaxSendQBytes())
93
-	go socket.RunSocketWriter()
94 93
 	client := &Client{
95 94
 		atime:          now,
96 95
 		authorized:     server.Password() == nil,

+ 58
- 39
irc/socket.go View File

@@ -31,23 +31,26 @@ type Socket struct {
31 31
 
32 32
 	maxSendQBytes int
33 33
 
34
-	// coordination system for asynchronous writes
35
-	buffer           []byte
36
-	lineToSendExists chan bool
34
+	// this is a trylock enforcing that only one goroutine can write to `conn` at a time
35
+	writerSlotOpen chan bool
37 36
 
37
+	buffer        []byte
38 38
 	closed        bool
39 39
 	sendQExceeded bool
40 40
 	finalData     string // what to send when we die
41
+	finalized     bool
41 42
 }
42 43
 
43 44
 // NewSocket returns a new Socket.
44 45
 func NewSocket(conn net.Conn, maxReadQBytes int, maxSendQBytes int) Socket {
45
-	return Socket{
46
-		conn:             conn,
47
-		reader:           bufio.NewReaderSize(conn, maxReadQBytes),
48
-		maxSendQBytes:    maxSendQBytes,
49
-		lineToSendExists: make(chan bool, 1),
46
+	result := Socket{
47
+		conn:           conn,
48
+		reader:         bufio.NewReaderSize(conn, maxReadQBytes),
49
+		maxSendQBytes:  maxSendQBytes,
50
+		writerSlotOpen: make(chan bool, 1),
50 51
 	}
52
+	result.writerSlotOpen <- true
53
+	return result
51 54
 }
52 55
 
53 56
 // Close stops a Socket from being able to send/receive any more data.
@@ -56,7 +59,7 @@ func (socket *Socket) Close() {
56 59
 	socket.closed = true
57 60
 	socket.Unlock()
58 61
 
59
-	socket.wakeWriter()
62
+	go socket.send()
60 63
 }
61 64
 
62 65
 // CertFP returns the fingerprint of the certificate provided by the client.
@@ -114,7 +117,11 @@ func (socket *Socket) Read() (string, error) {
114 117
 	return line, nil
115 118
 }
116 119
 
117
-// Write sends the given string out of Socket.
120
+// Write sends the given string out of Socket. Requirements:
121
+// 1. MUST NOT block for macroscopic amounts of time
122
+// 2. MUST NOT reorder messages
123
+// 3. MUST provide mutual exclusion for socket.conn.Write
124
+// 4. SHOULD NOT tie up additional goroutines, beyond the one blocked on socket.conn.Write
118 125
 func (socket *Socket) Write(data string) (err error) {
119 126
 	socket.Lock()
120 127
 	if socket.closed {
@@ -127,19 +134,10 @@ func (socket *Socket) Write(data string) (err error) {
127 134
 	}
128 135
 	socket.Unlock()
129 136
 
130
-	socket.wakeWriter()
137
+	go socket.send()
131 138
 	return
132 139
 }
133 140
 
134
-// wakeWriter wakes up the goroutine that actually performs the write, without blocking
135
-func (socket *Socket) wakeWriter() {
136
-	// nonblocking send to the channel, no-op if it's full
137
-	select {
138
-	case socket.lineToSendExists <- true:
139
-	default:
140
-	}
141
-}
142
-
143 141
 // SetFinalData sets the final data to send when the SocketWriter closes.
144 142
 func (socket *Socket) SetFinalData(data string) {
145 143
 	socket.Lock()
@@ -154,32 +152,53 @@ func (socket *Socket) IsClosed() bool {
154 152
 	return socket.closed
155 153
 }
156 154
 
157
-// RunSocketWriter starts writing messages to the outgoing socket.
158
-func (socket *Socket) RunSocketWriter() {
159
-	localBuffer := make([]byte, 0)
160
-	shouldStop := false
161
-	for !shouldStop {
162
-		// wait for new lines
155
+// is there data to write?
156
+func (socket *Socket) readyToWrite() bool {
157
+	socket.Lock()
158
+	defer socket.Unlock()
159
+	// on the first time observing socket.closed, we still have to write socket.finalData
160
+	return !socket.finalized && (len(socket.buffer) > 0 || socket.closed || socket.sendQExceeded)
161
+}
162
+
163
+// send actually writes messages to socket.Conn; it may block
164
+func (socket *Socket) send() {
165
+	// one of these checks happens-after every call to Write(), so we can't miss writes
166
+	for socket.readyToWrite() {
163 167
 		select {
164
-		case <-socket.lineToSendExists:
165
-			// retrieve the buffered data, clear the buffer
166
-			socket.Lock()
167
-			localBuffer = append(localBuffer, socket.buffer...)
168
-			socket.buffer = socket.buffer[:0]
169
-			socket.Unlock()
170
-
171
-			_, err := socket.conn.Write(localBuffer)
172
-			localBuffer = localBuffer[:0]
173
-
174
-			socket.Lock()
175
-			shouldStop = (err != nil) || socket.closed || socket.sendQExceeded
176
-			socket.Unlock()
168
+		case <-socket.writerSlotOpen:
169
+			// got the trylock: actually do the write
170
+			socket.performWrite()
171
+			socket.writerSlotOpen <- true
172
+		default:
173
+			// another goroutine is in progress; exit and wait for them to loop back around
174
+			// and observe readyToWrite() again
175
+			return
177 176
 		}
178 177
 	}
178
+}
179
+
180
+// write the contents of the buffer, then see if we need to close
181
+func (socket *Socket) performWrite() {
182
+	// retrieve the buffered data, clear the buffer
183
+	socket.Lock()
184
+	buffer := socket.buffer
185
+	socket.buffer = nil
186
+	socket.Unlock()
187
+
188
+	_, err := socket.conn.Write(buffer)
189
+
190
+	socket.Lock()
191
+	shouldClose := (err != nil) || socket.closed || socket.sendQExceeded
192
+	socket.Unlock()
193
+
194
+	if !shouldClose {
195
+		return
196
+	}
179 197
 
180 198
 	// mark the socket closed (if someone hasn't already), then write error lines
181 199
 	socket.Lock()
182 200
 	socket.closed = true
201
+	socket.finalized = true
183 202
 	finalData := socket.finalData
184 203
 	if socket.sendQExceeded {
185 204
 		finalData = "\r\nERROR :SendQ Exceeded\r\n"

Loading…
Cancel
Save