Browse Source

add Semaphore and ServerSemaphores

tags/v0.12.0
Shivaram Lingamneni 6 years ago
parent
commit
ebfef1e848
4 changed files with 98 additions and 16 deletions
  1. 5
    0
      irc/client.go
  2. 81
    0
      irc/semaphores.go
  3. 2
    0
      irc/server.go
  4. 10
    16
      irc/socket.go

+ 5
- 0
irc/client.go View File

@@ -675,6 +675,11 @@ func (client *Client) destroy(beingResumed bool) {
675 675
 		return
676 676
 	}
677 677
 
678
+	// see #235: deduplicating the list of PART recipients uses (comparatively speaking)
679
+	// a lot of RAM, so limit concurrency to avoid thrashing
680
+	client.server.semaphores.ClientDestroy.Acquire()
681
+	defer client.server.semaphores.ClientDestroy.Release()
682
+
678 683
 	if beingResumed {
679 684
 		client.server.logger.Debug("quit", fmt.Sprintf("%s is being resumed", client.nick))
680 685
 	} else {

+ 81
- 0
irc/semaphores.go View File

@@ -0,0 +1,81 @@
1
+// Copyright (c) 2018 Shivaram Lingamneni
2
+
3
+package irc
4
+
5
+import (
6
+	"log"
7
+	"runtime"
8
+	"runtime/debug"
9
+)
10
+
11
+// See #237 for context. Operations that might allocate large amounts of temporary
12
+// garbage, or temporarily tie up some other resource, may cause thrashing unless
13
+// their concurrency is artificially restricted. We use `chan bool` as a
14
+// (regrettably, unary-encoded) counting semaphore to enforce these restrictions.
15
+
16
+const (
17
+	// this is a tradeoff between exploiting CPU-level parallelism (higher values better)
18
+	// and not thrashing the allocator (lower values better). really this is all just
19
+	// guesswork. oragono *can* make use of cores beyond this limit --- just not for
20
+	// the protected operations.
21
+	MaxServerSemaphoreCapacity = 32
22
+)
23
+
24
+// Semaphore is a counting semaphore. Note that a capacity of n requires O(n) storage.
25
+type Semaphore (chan bool)
26
+
27
+// ServerSemaphores includes a named Semaphore corresponding to each concurrency-limited
28
+// sever operation.
29
+type ServerSemaphores struct {
30
+	// each distinct operation MUST have its own semaphore;
31
+	// methods that acquire a semaphore MUST NOT call methods that acquire another
32
+	ClientDestroy Semaphore
33
+}
34
+
35
+// NewServerSemaphores creates a new ServerSemaphores.
36
+func NewServerSemaphores() (result *ServerSemaphores) {
37
+	capacity := runtime.NumCPU()
38
+	if capacity > MaxServerSemaphoreCapacity {
39
+		capacity = MaxServerSemaphoreCapacity
40
+	}
41
+	result = new(ServerSemaphores)
42
+	result.ClientDestroy.Initialize(capacity)
43
+	return
44
+}
45
+
46
+// Initialize initializes a semaphore to a given capacity.
47
+func (semaphore *Semaphore) Initialize(capacity int) {
48
+	*semaphore = make(chan bool, capacity)
49
+	for i := 0; i < capacity; i++ {
50
+		(*semaphore) <- true
51
+	}
52
+}
53
+
54
+// Acquire acquires a semaphore, blocking if necessary.
55
+func (semaphore *Semaphore) Acquire() {
56
+	<-(*semaphore)
57
+}
58
+
59
+// TryAcquire tries to acquire a semaphore, returning whether the acquire was
60
+// successful. It never blocks.
61
+func (semaphore *Semaphore) TryAcquire() (acquired bool) {
62
+	select {
63
+	case <-(*semaphore):
64
+		return true
65
+	default:
66
+		return false
67
+	}
68
+}
69
+
70
+// Release releases a semaphore. It never blocks. (This is not a license
71
+// to program spurious releases.)
72
+func (semaphore *Semaphore) Release() {
73
+	select {
74
+	case (*semaphore) <- true:
75
+		// good
76
+	default:
77
+		// spurious release
78
+		log.Printf("spurious semaphore release (full to capacity %d)", cap(*semaphore))
79
+		debug.PrintStack()
80
+	}
81
+}

+ 2
- 0
irc/server.go View File

@@ -131,6 +131,7 @@ type Server struct {
131 131
 	webirc                     []webircConfig
132 132
 	whoWas                     *WhoWasList
133 133
 	stats                      *Stats
134
+	semaphores                 *ServerSemaphores
134 135
 }
135 136
 
136 137
 var (
@@ -165,6 +166,7 @@ func NewServer(config *Config, logger *logger.Manager) (*Server, error) {
165 166
 		snomasks:            NewSnoManager(),
166 167
 		whoWas:              NewWhoWasList(config.Limits.WhowasEntries),
167 168
 		stats:               NewStats(),
169
+		semaphores:          NewServerSemaphores(),
168 170
 	}
169 171
 
170 172
 	if err := server.applyConfig(config, true); err != nil {

+ 10
- 16
irc/socket.go View File

@@ -32,7 +32,7 @@ type Socket struct {
32 32
 	maxSendQBytes int
33 33
 
34 34
 	// this is a trylock enforcing that only one goroutine can write to `conn` at a time
35
-	writerSlotOpen chan bool
35
+	writerSemaphore Semaphore
36 36
 
37 37
 	buffer        []byte
38 38
 	closed        bool
@@ -44,12 +44,11 @@ type Socket struct {
44 44
 // NewSocket returns a new Socket.
45 45
 func NewSocket(conn net.Conn, maxReadQBytes int, maxSendQBytes int) *Socket {
46 46
 	result := Socket{
47
-		conn:           conn,
48
-		reader:         bufio.NewReaderSize(conn, maxReadQBytes),
49
-		maxSendQBytes:  maxSendQBytes,
50
-		writerSlotOpen: make(chan bool, 1),
47
+		conn:          conn,
48
+		reader:        bufio.NewReaderSize(conn, maxReadQBytes),
49
+		maxSendQBytes: maxSendQBytes,
51 50
 	}
52
-	result.writerSlotOpen <- true
51
+	result.writerSemaphore.Initialize(1)
53 52
 	return &result
54 53
 }
55 54
 
@@ -140,14 +139,11 @@ func (socket *Socket) Write(data string) (err error) {
140 139
 
141 140
 // wakeWriter starts the goroutine that actually performs the write, without blocking
142 141
 func (socket *Socket) wakeWriter() {
143
-	// attempt to acquire the trylock
144
-	select {
145
-	case <-socket.writerSlotOpen:
142
+	if socket.writerSemaphore.TryAcquire() {
146 143
 		// acquired the trylock; send() will release it
147 144
 		go socket.send()
148
-	default:
149
-		// failed to acquire; the holder will check for more data after releasing it
150 145
 	}
146
+	// else: do nothing, the holder will check for more data after releasing it
151 147
 }
152 148
 
153 149
 // SetFinalData sets the final data to send when the SocketWriter closes.
@@ -179,19 +175,17 @@ func (socket *Socket) send() {
179 175
 		socket.performWrite()
180 176
 		// surrender the trylock, avoiding a race where a write comes in after we've
181 177
 		// checked readyToWrite() and it returned false, but while we still hold the trylock:
182
-		socket.writerSlotOpen <- true
178
+		socket.writerSemaphore.Release()
183 179
 		// check if more data came in while we held the trylock:
184 180
 		if !socket.readyToWrite() {
185 181
 			return
186 182
 		}
187
-		select {
188
-		case <-socket.writerSlotOpen:
189
-			// got the trylock, loop back around and write
190
-		default:
183
+		if !socket.writerSemaphore.TryAcquire() {
191 184
 			// failed to acquire; exit and wait for the holder to observe readyToWrite()
192 185
 			// after releasing it
193 186
 			return
194 187
 		}
188
+		// got the lock again, loop back around and write
195 189
 	}
196 190
 }
197 191
 

Loading…
Cancel
Save