Преглед изворни кода

Merge pull request #236 from slingamn/socket_again.1

eliminate dedicated RunSocketWriter goroutine
tags/v0.12.0
Daniel Oaks пре 6 година
родитељ
комит
8f22d5ffd8
No account linked to committer's email address
2 измењених фајлова са 70 додато и 34 уклоњено
  1. 1
    2
      irc/client.go
  2. 69
    32
      irc/socket.go

+ 1
- 2
irc/client.go Прегледај датотеку

@@ -90,7 +90,6 @@ func NewClient(server *Server, conn net.Conn, isTLS bool) *Client {
90 90
 	limits := server.Limits()
91 91
 	fullLineLenLimit := limits.LineLen.Tags + limits.LineLen.Rest
92 92
 	socket := NewSocket(conn, fullLineLenLimit*2, server.MaxSendQBytes())
93
-	go socket.RunSocketWriter()
94 93
 	client := &Client{
95 94
 		atime:          now,
96 95
 		authorized:     server.Password() == nil,
@@ -101,7 +100,7 @@ func NewClient(server *Server, conn net.Conn, isTLS bool) *Client {
101 100
 		ctime:          now,
102 101
 		flags:          make(map[modes.Mode]bool),
103 102
 		server:         server,
104
-		socket:         &socket,
103
+		socket:         socket,
105 104
 		nick:           "*", // * is used until actual nick is given
106 105
 		nickCasefolded: "*",
107 106
 		nickMaskString: "*", // * is used until actual nick is given

+ 69
- 32
irc/socket.go Прегледај датотеку

@@ -31,23 +31,26 @@ type Socket struct {
31 31
 
32 32
 	maxSendQBytes int
33 33
 
34
-	// coordination system for asynchronous writes
35
-	buffer           []byte
36
-	lineToSendExists chan bool
34
+	// this is a trylock enforcing that only one goroutine can write to `conn` at a time
35
+	writerSlotOpen chan bool
37 36
 
37
+	buffer        []byte
38 38
 	closed        bool
39 39
 	sendQExceeded bool
40 40
 	finalData     string // what to send when we die
41
+	finalized     bool
41 42
 }
42 43
 
43 44
 // NewSocket returns a new Socket.
44
-func NewSocket(conn net.Conn, maxReadQBytes int, maxSendQBytes int) Socket {
45
-	return Socket{
46
-		conn:             conn,
47
-		reader:           bufio.NewReaderSize(conn, maxReadQBytes),
48
-		maxSendQBytes:    maxSendQBytes,
49
-		lineToSendExists: make(chan bool, 1),
45
+func NewSocket(conn net.Conn, maxReadQBytes int, maxSendQBytes int) *Socket {
46
+	result := Socket{
47
+		conn:           conn,
48
+		reader:         bufio.NewReaderSize(conn, maxReadQBytes),
49
+		maxSendQBytes:  maxSendQBytes,
50
+		writerSlotOpen: make(chan bool, 1),
50 51
 	}
52
+	result.writerSlotOpen <- true
53
+	return &result
51 54
 }
52 55
 
53 56
 // Close stops a Socket from being able to send/receive any more data.
@@ -114,7 +117,11 @@ func (socket *Socket) Read() (string, error) {
114 117
 	return line, nil
115 118
 }
116 119
 
117
-// Write sends the given string out of Socket.
120
+// Write sends the given string out of Socket. Requirements:
121
+// 1. MUST NOT block for macroscopic amounts of time
122
+// 2. MUST NOT reorder messages
123
+// 3. MUST provide mutual exclusion for socket.conn.Write
124
+// 4. SHOULD NOT tie up additional goroutines, beyond the one blocked on socket.conn.Write
118 125
 func (socket *Socket) Write(data string) (err error) {
119 126
 	socket.Lock()
120 127
 	if socket.closed {
@@ -131,12 +138,15 @@ func (socket *Socket) Write(data string) (err error) {
131 138
 	return
132 139
 }
133 140
 
134
-// wakeWriter wakes up the goroutine that actually performs the write, without blocking
141
+// wakeWriter starts the goroutine that actually performs the write, without blocking
135 142
 func (socket *Socket) wakeWriter() {
136
-	// nonblocking send to the channel, no-op if it's full
143
+	// attempt to acquire the trylock
137 144
 	select {
138
-	case socket.lineToSendExists <- true:
145
+	case <-socket.writerSlotOpen:
146
+		// acquired the trylock; send() will release it
147
+		go socket.send()
139 148
 	default:
149
+		// failed to acquire; the holder will check for more data after releasing it
140 150
 	}
141 151
 }
142 152
 
@@ -154,32 +164,59 @@ func (socket *Socket) IsClosed() bool {
154 164
 	return socket.closed
155 165
 }
156 166
 
157
-// RunSocketWriter starts writing messages to the outgoing socket.
158
-func (socket *Socket) RunSocketWriter() {
159
-	localBuffer := make([]byte, 0)
160
-	shouldStop := false
161
-	for !shouldStop {
162
-		// wait for new lines
167
+// is there data to write?
168
+func (socket *Socket) readyToWrite() bool {
169
+	socket.Lock()
170
+	defer socket.Unlock()
171
+	// on the first time observing socket.closed, we still have to write socket.finalData
172
+	return !socket.finalized && (len(socket.buffer) > 0 || socket.closed || socket.sendQExceeded)
173
+}
174
+
175
+// send actually writes messages to socket.Conn; it may block
176
+func (socket *Socket) send() {
177
+	for {
178
+		// we are holding the trylock: actually do the write
179
+		socket.performWrite()
180
+		// surrender the trylock, avoiding a race where a write comes in after we've
181
+		// checked readyToWrite() and it returned false, but while we still hold the trylock:
182
+		socket.writerSlotOpen <- true
183
+		// check if more data came in while we held the trylock:
184
+		if !socket.readyToWrite() {
185
+			return
186
+		}
163 187
 		select {
164
-		case <-socket.lineToSendExists:
165
-			// retrieve the buffered data, clear the buffer
166
-			socket.Lock()
167
-			localBuffer = append(localBuffer, socket.buffer...)
168
-			socket.buffer = socket.buffer[:0]
169
-			socket.Unlock()
170
-
171
-			_, err := socket.conn.Write(localBuffer)
172
-			localBuffer = localBuffer[:0]
173
-
174
-			socket.Lock()
175
-			shouldStop = (err != nil) || socket.closed || socket.sendQExceeded
176
-			socket.Unlock()
188
+		case <-socket.writerSlotOpen:
189
+			// got the trylock, loop back around and write
190
+		default:
191
+			// failed to acquire; exit and wait for the holder to observe readyToWrite()
192
+			// after releasing it
193
+			return
177 194
 		}
178 195
 	}
196
+}
197
+
198
+// write the contents of the buffer, then see if we need to close
199
+func (socket *Socket) performWrite() {
200
+	// retrieve the buffered data, clear the buffer
201
+	socket.Lock()
202
+	buffer := socket.buffer
203
+	socket.buffer = nil
204
+	socket.Unlock()
205
+
206
+	_, err := socket.conn.Write(buffer)
207
+
208
+	socket.Lock()
209
+	shouldClose := (err != nil) || socket.closed || socket.sendQExceeded
210
+	socket.Unlock()
211
+
212
+	if !shouldClose {
213
+		return
214
+	}
179 215
 
180 216
 	// mark the socket closed (if someone hasn't already), then write error lines
181 217
 	socket.Lock()
182 218
 	socket.closed = true
219
+	socket.finalized = true
183 220
 	finalData := socket.finalData
184 221
 	if socket.sendQExceeded {
185 222
 		finalData = "\r\nERROR :SendQ Exceeded\r\n"

Loading…
Откажи
Сачувај