Просмотр исходного кода

Merge pull request #218 from slingamn/socketwriter.1

refactor irc.Socket
tags/v0.11.0-beta
Shivaram Lingamneni 6 лет назад
Родитель
Сommit
7cfa75a59e
Аккаунт пользователя с таким Email не найден
6 измененных файлов: 92 добавлений и 127 удалений
  1. 8
    2
      irc/client.go
  2. 3
    2
      irc/config.go
  3. 1
    0
      irc/errors.go
  4. 9
    0
      irc/getters.go
  5. 2
    11
      irc/server.go
  6. 69
    112
      irc/socket.go

+ 8
- 2
irc/client.go Просмотреть файл

@@ -87,7 +87,9 @@ type Client struct {
87 87
 // NewClient returns a client with all the appropriate info setup.
88 88
 func NewClient(server *Server, conn net.Conn, isTLS bool) *Client {
89 89
 	now := time.Now()
90
-	socket := NewSocket(conn, server.MaxSendQBytes)
90
+	limits := server.Limits()
91
+	fullLineLenLimit := limits.LineLen.Tags + limits.LineLen.Rest
92
+	socket := NewSocket(conn, fullLineLenLimit*2, server.MaxSendQBytes())
91 93
 	go socket.RunSocketWriter()
92 94
 	client := &Client{
93 95
 		atime:          now,
@@ -253,7 +255,11 @@ func (client *Client) run() {
253 255
 
254 256
 		line, err = client.socket.Read()
255 257
 		if err != nil {
256
-			client.Quit("connection closed")
258
+			quitMessage := "connection closed"
259
+			if err == errReadQ {
260
+				quitMessage = "readQ exceeded"
261
+			}
262
+			client.Quit(quitMessage)
257 263
 			break
258 264
 		}
259 265
 

+ 3
- 2
irc/config.go Просмотреть файл

@@ -216,7 +216,7 @@ type Config struct {
216 216
 		ProxyAllowedFrom    []string       `yaml:"proxy-allowed-from"`
217 217
 		WebIRC              []webircConfig `yaml:"webirc"`
218 218
 		MaxSendQString      string         `yaml:"max-sendq"`
219
-		MaxSendQBytes       uint64
219
+		MaxSendQBytes       int
220 220
 		ConnectionLimiter   connection_limits.LimiterConfig   `yaml:"connection-limits"`
221 221
 		ConnectionThrottler connection_limits.ThrottlerConfig `yaml:"connection-throttling"`
222 222
 	}
@@ -530,10 +530,11 @@ func LoadConfig(filename string) (config *Config, err error) {
530 530
 		}
531 531
 	}
532 532
 
533
-	config.Server.MaxSendQBytes, err = bytefmt.ToBytes(config.Server.MaxSendQString)
533
+	maxSendQBytes, err := bytefmt.ToBytes(config.Server.MaxSendQString)
534 534
 	if err != nil {
535 535
 		return nil, fmt.Errorf("Could not parse maximum SendQ size (make sure it only contains whole numbers): %s", err.Error())
536 536
 	}
537
+	config.Server.MaxSendQBytes = int(maxSendQBytes)
537 538
 
538 539
 	// get language files
539 540
 	config.Languages.Data = make(map[string]languages.LangData)

+ 1
- 0
irc/errors.go Просмотреть файл

@@ -40,6 +40,7 @@ var (
40 40
 var (
41 41
 	errNoPeerCerts = errors.New("Client did not provide a certificate")
42 42
 	errNotTLS      = errors.New("Not a TLS connection")
43
+	errReadQ       = errors.New("ReadQ Exceeded")
43 44
 )
44 45
 
45 46
 // String Errors

+ 9
- 0
irc/getters.go Просмотреть файл

@@ -6,8 +6,17 @@ package irc
6 6
 import (
7 7
 	"github.com/oragono/oragono/irc/isupport"
8 8
 	"github.com/oragono/oragono/irc/modes"
9
+	"sync/atomic"
9 10
 )
10 11
 
12
+func (server *Server) MaxSendQBytes() int {
13
+	return int(atomic.LoadUint32(&server.maxSendQBytes))
14
+}
15
+
16
+func (server *Server) SetMaxSendQBytes(m int) {
17
+	atomic.StoreUint32(&server.maxSendQBytes, uint32(m))
18
+}
19
+
11 20
 func (server *Server) ISupport() *isupport.List {
12 21
 	server.configurableStateMutex.RLock()
13 22
 	defer server.configurableStateMutex.RUnlock()

+ 2
- 11
irc/server.go Просмотреть файл

@@ -109,7 +109,7 @@ type Server struct {
109 109
 	limits                     Limits
110 110
 	listeners                  map[string]*ListenerWrapper
111 111
 	logger                     *logger.Manager
112
-	MaxSendQBytes              uint64
112
+	maxSendQBytes              uint32
113 113
 	monitorManager             *MonitorManager
114 114
 	motdLines                  []string
115 115
 	name                       string
@@ -928,16 +928,7 @@ func (server *Server) applyConfig(config *Config, initial bool) error {
928 928
 	server.configurableStateMutex.Unlock()
929 929
 
930 930
 	// set new sendqueue size
931
-	if config.Server.MaxSendQBytes != server.MaxSendQBytes {
932
-		server.configurableStateMutex.Lock()
933
-		server.MaxSendQBytes = config.Server.MaxSendQBytes
934
-		server.configurableStateMutex.Unlock()
935
-
936
-		// update on all clients
937
-		for _, sClient := range server.clients.AllClients() {
938
-			sClient.socket.MaxSendQBytes = config.Server.MaxSendQBytes
939
-		}
940
-	}
931
+	server.SetMaxSendQBytes(config.Server.MaxSendQBytes)
941 932
 
942 933
 	server.loadMOTD(config.Server.MOTD, config.Server.MOTDFormatting)
943 934
 

+ 69
- 112
irc/socket.go Просмотреть файл

@@ -9,6 +9,7 @@ import (
9 9
 	"crypto/sha256"
10 10
 	"crypto/tls"
11 11
 	"encoding/hex"
12
+	"errors"
12 13
 	"io"
13 14
 	"net"
14 15
 	"strings"
@@ -18,47 +19,44 @@ import (
18 19
 
19 20
 var (
20 21
 	handshakeTimeout, _ = time.ParseDuration("5s")
22
+	errSendQExceeded    = errors.New("SendQ exceeded")
21 23
 )
22 24
 
23 25
 // Socket represents an IRC socket.
24 26
 type Socket struct {
27
+	sync.Mutex
28
+
25 29
 	conn   net.Conn
26 30
 	reader *bufio.Reader
27 31
 
28
-	MaxSendQBytes uint64
29
-
30
-	closed      bool
31
-	closedMutex sync.Mutex
32
-
33
-	finalData      string // what to send when we die
34
-	finalDataMutex sync.Mutex
32
+	maxSendQBytes int
35 33
 
34
+	// coordination system for asynchronous writes
35
+	buffer           []byte
36 36
 	lineToSendExists chan bool
37
-	linesToSend      []string
38
-	linesToSendMutex sync.Mutex
37
+
38
+	closed        bool
39
+	sendQExceeded bool
40
+	finalData     string // what to send when we die
39 41
 }
40 42
 
41 43
 // NewSocket returns a new Socket.
42
-func NewSocket(conn net.Conn, maxSendQBytes uint64) Socket {
44
+func NewSocket(conn net.Conn, maxReadQBytes int, maxSendQBytes int) Socket {
43 45
 	return Socket{
44 46
 		conn:             conn,
45
-		reader:           bufio.NewReader(conn),
46
-		MaxSendQBytes:    maxSendQBytes,
47
-		lineToSendExists: make(chan bool),
47
+		reader:           bufio.NewReaderSize(conn, maxReadQBytes),
48
+		maxSendQBytes:    maxSendQBytes,
49
+		lineToSendExists: make(chan bool, 1),
48 50
 	}
49 51
 }
50 52
 
51 53
 // Close stops a Socket from being able to send/receive any more data.
52 54
 func (socket *Socket) Close() {
53
-	socket.closedMutex.Lock()
54
-	defer socket.closedMutex.Unlock()
55
-	if socket.closed {
56
-		return
57
-	}
55
+	socket.Lock()
58 56
 	socket.closed = true
57
+	socket.Unlock()
59 58
 
60
-	// force close loop to happen if it hasn't already
61
-	go socket.timedFillLineToSendExists(200 * time.Millisecond)
59
+	socket.wakeWriter()
62 60
 }
63 61
 
64 62
 // CertFP returns the fingerprint of the certificate provided by the client.
@@ -94,10 +92,13 @@ func (socket *Socket) Read() (string, error) {
94 92
 		return "", io.EOF
95 93
 	}
96 94
 
97
-	lineBytes, err := socket.reader.ReadBytes('\n')
95
+	lineBytes, isPrefix, err := socket.reader.ReadLine()
96
+	if isPrefix {
97
+		return "", errReadQ
98
+	}
98 99
 
99 100
 	// convert bytes to string
100
-	line := string(lineBytes[:])
101
+	line := string(lineBytes)
101 102
 
102 103
 	// read last message properly (such as ERROR/QUIT/etc), just fail next reads/writes
103 104
 	if err == io.EOF {
@@ -110,128 +111,84 @@ func (socket *Socket) Read() (string, error) {
110 111
 		return "", err
111 112
 	}
112 113
 
113
-	return strings.TrimRight(line, "\r\n"), nil
114
+	return line, nil
114 115
 }
115 116
 
116 117
 // Write sends the given string out of Socket.
117
-func (socket *Socket) Write(data string) error {
118
-	if socket.IsClosed() {
119
-		return io.EOF
118
+func (socket *Socket) Write(data string) (err error) {
119
+	socket.Lock()
120
+	if socket.closed {
121
+		err = io.EOF
122
+	} else if len(data)+len(socket.buffer) > socket.maxSendQBytes {
123
+		socket.sendQExceeded = true
124
+		err = errSendQExceeded
125
+	} else {
126
+		socket.buffer = append(socket.buffer, data...)
120 127
 	}
128
+	socket.Unlock()
121 129
 
122
-	socket.linesToSendMutex.Lock()
123
-	socket.linesToSend = append(socket.linesToSend, data)
124
-	socket.linesToSendMutex.Unlock()
125
-
126
-	go socket.timedFillLineToSendExists(15 * time.Second)
127
-
128
-	return nil
130
+	socket.wakeWriter()
131
+	return
129 132
 }
130 133
 
131
-// timedFillLineToSendExists either sends the note or times out.
132
-func (socket *Socket) timedFillLineToSendExists(duration time.Duration) {
133
-	lineToSendTimeout := time.NewTimer(duration)
134
-	defer lineToSendTimeout.Stop()
134
+// wakeWriter wakes up the goroutine that actually performs the write, without blocking
135
+func (socket *Socket) wakeWriter() {
136
+	// nonblocking send to the channel, no-op if it's full
135 137
 	select {
136 138
 	case socket.lineToSendExists <- true:
137
-		// passed data successfully
138
-	case <-lineToSendTimeout.C:
139
-		// timed out send
139
+	default:
140 140
 	}
141 141
 }
142 142
 
143 143
 // SetFinalData sets the final data to send when the SocketWriter closes.
144 144
 func (socket *Socket) SetFinalData(data string) {
145
-	socket.finalDataMutex.Lock()
145
+	socket.Lock()
146
+	defer socket.Unlock()
146 147
 	socket.finalData = data
147
-	socket.finalDataMutex.Unlock()
148 148
 }
149 149
 
150 150
 // IsClosed returns whether the socket is closed.
151 151
 func (socket *Socket) IsClosed() bool {
152
-	socket.closedMutex.Lock()
153
-	defer socket.closedMutex.Unlock()
152
+	socket.Lock()
153
+	defer socket.Unlock()
154 154
 	return socket.closed
155 155
 }
156 156
 
157 157
 // RunSocketWriter starts writing messages to the outgoing socket.
158 158
 func (socket *Socket) RunSocketWriter() {
159
-	for {
159
+	localBuffer := make([]byte, 0)
160
+	shouldStop := false
161
+	for !shouldStop {
160 162
 		// wait for new lines
161 163
 		select {
162 164
 		case <-socket.lineToSendExists:
163
-			socket.linesToSendMutex.Lock()
164
-
165
-			// check if we're closed
166
-			if socket.IsClosed() {
167
-				socket.linesToSendMutex.Unlock()
168
-				break
169
-			}
170
-
171
-			// check whether new lines actually exist or not
172
-			if len(socket.linesToSend) < 1 {
173
-				socket.linesToSendMutex.Unlock()
174
-				continue
175
-			}
176
-
177
-			// check sendq
178
-			var sendQBytes uint64
179
-			for _, line := range socket.linesToSend {
180
-				sendQBytes += uint64(len(line))
181
-				if socket.MaxSendQBytes < sendQBytes {
182
-					// don't unlock mutex because this break is just to escape this for loop
183
-					break
184
-				}
185
-			}
186
-			if socket.MaxSendQBytes < sendQBytes {
187
-				socket.SetFinalData("\r\nERROR :SendQ Exceeded\r\n")
188
-				socket.linesToSendMutex.Unlock()
189
-				break
190
-			}
191
-
192
-			// get all existing data
193
-			data := strings.Join(socket.linesToSend, "")
194
-			socket.linesToSend = []string{}
195
-
196
-			socket.linesToSendMutex.Unlock()
197
-
198
-			// write data
199
-			if 0 < len(data) {
200
-				_, err := socket.conn.Write([]byte(data))
201
-				if err != nil {
202
-					break
203
-				}
204
-			}
205
-		}
206
-		if socket.IsClosed() {
207
-			// error out or we've been closed
208
-			break
165
+			// retrieve the buffered data, clear the buffer
166
+			socket.Lock()
167
+			localBuffer = append(localBuffer, socket.buffer...)
168
+			socket.buffer = socket.buffer[:0]
169
+			socket.Unlock()
170
+
171
+			_, err := socket.conn.Write(localBuffer)
172
+			localBuffer = localBuffer[:0]
173
+
174
+			socket.Lock()
175
+			shouldStop = (err != nil) || socket.closed || socket.sendQExceeded
176
+			socket.Unlock()
209 177
 		}
210 178
 	}
211
-	// force closure of socket
212
-	socket.closedMutex.Lock()
213
-	if !socket.closed {
214
-		socket.closed = true
215
-	}
216
-	socket.closedMutex.Unlock()
217 179
 
218
-	// write error lines
219
-	socket.finalDataMutex.Lock()
220
-	if 0 < len(socket.finalData) {
221
-		socket.conn.Write([]byte(socket.finalData))
180
+	// mark the socket closed (if someone hasn't already), then write error lines
181
+	socket.Lock()
182
+	socket.closed = true
183
+	finalData := socket.finalData
184
+	if socket.sendQExceeded {
185
+		finalData = "\r\nERROR :SendQ Exceeded\r\n"
186
+	}
187
+	socket.Unlock()
188
+	if finalData != "" {
189
+		socket.conn.Write([]byte(finalData))
222 190
 	}
223
-	socket.finalDataMutex.Unlock()
224 191
 
225 192
 	// close the connection
226 193
 	socket.conn.Close()
227
-
228
-	// empty the lineToSendExists channel
229
-	for 0 < len(socket.lineToSendExists) {
230
-		<-socket.lineToSendExists
231
-	}
232
-}
233
-
234
-// WriteLine writes the given line out of Socket.
235
-func (socket *Socket) WriteLine(line string) error {
236
-	return socket.Write(line + "\r\n")
237 194
 }

Загрузка…
Отмена
Сохранить