Você não pode selecionar mais de 25 tópicos Os tópicos devem começar com uma letra ou um número, podem incluir traços ('-') e podem ter até 35 caracteres.

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252
  1. // Copyright (c) 2012-2014 Jeremy Latt
  2. // Copyright (c) 2016-2017 Daniel Oaks <daniel@danieloaks.net>
  3. // released under the MIT license
  4. package irc
  5. import (
  6. "errors"
  7. "io"
  8. "strings"
  9. "sync"
  10. "github.com/oragono/oragono/irc/utils"
  11. )
  12. var (
  13. errSendQExceeded = errors.New("SendQ exceeded")
  14. sendQExceededMessage = []byte("\r\nERROR :SendQ Exceeded\r\n")
  15. )
  16. // Socket represents an IRC socket.
  17. type Socket struct {
  18. sync.Mutex
  19. conn IRCConn
  20. maxSendQBytes int
  21. // this is a trylock enforcing that only one goroutine can write to `conn` at a time
  22. writerSemaphore utils.Semaphore
  23. buffers [][]byte
  24. totalLength int
  25. closed bool
  26. sendQExceeded bool
  27. finalData []byte // what to send when we die
  28. finalized bool
  29. }
  30. // NewSocket returns a new Socket.
  31. func NewSocket(conn IRCConn, maxSendQBytes int) *Socket {
  32. result := Socket{
  33. conn: conn,
  34. maxSendQBytes: maxSendQBytes,
  35. }
  36. result.writerSemaphore.Initialize(1)
  37. return &result
  38. }
  39. // Close stops a Socket from being able to send/receive any more data.
  40. func (socket *Socket) Close() {
  41. socket.Lock()
  42. socket.closed = true
  43. socket.Unlock()
  44. socket.wakeWriter()
  45. }
  46. // Read returns a single IRC line from a Socket.
  47. func (socket *Socket) Read() (string, error) {
  48. if socket.IsClosed() {
  49. return "", io.EOF
  50. }
  51. lineBytes, err := socket.conn.ReadLine()
  52. // convert bytes to string
  53. line := string(lineBytes)
  54. // read last message properly (such as ERROR/QUIT/etc), just fail next reads/writes
  55. if err == io.EOF {
  56. socket.Close()
  57. }
  58. if err == io.EOF && strings.TrimSpace(line) != "" {
  59. // don't do anything
  60. } else if err != nil {
  61. return "", err
  62. }
  63. return line, nil
  64. }
  65. // Write sends the given string out of Socket. Requirements:
  66. // 1. MUST NOT block for macroscopic amounts of time
  67. // 2. MUST NOT reorder messages
  68. // 3. MUST provide mutual exclusion for socket.conn.Write
  69. // 4. SHOULD NOT tie up additional goroutines, beyond the one blocked on socket.conn.Write
  70. func (socket *Socket) Write(data []byte) (err error) {
  71. if len(data) == 0 {
  72. return
  73. }
  74. socket.Lock()
  75. if socket.closed {
  76. err = io.EOF
  77. } else {
  78. prospectiveLen := socket.totalLength + len(data)
  79. if prospectiveLen > socket.maxSendQBytes {
  80. socket.sendQExceeded = true
  81. socket.closed = true
  82. err = errSendQExceeded
  83. } else {
  84. socket.buffers = append(socket.buffers, data)
  85. socket.totalLength = prospectiveLen
  86. }
  87. }
  88. socket.Unlock()
  89. socket.wakeWriter()
  90. return
  91. }
  92. // BlockingWrite sends the given string out of Socket. Requirements:
  93. // 1. MUST block until the message is sent
  94. // 2. MUST bypass sendq (calls to BlockingWrite cannot, on their own, cause a sendq overflow)
  95. // 3. MUST provide mutual exclusion for socket.conn.Write
  96. // 4. MUST respect the same ordering guarantees as Write (i.e., if a call to Write that sends
  97. // message m1 happens-before a call to BlockingWrite that sends message m2,
  98. // m1 must be sent on the wire before m2
  99. // Callers MUST be writing to the client's socket from the client's own goroutine;
  100. // other callers must use the nonblocking Write call instead. Otherwise, a client
  101. // with a slow/unreliable connection risks stalling the progress of the system as a whole.
  102. func (socket *Socket) BlockingWrite(data []byte) (err error) {
  103. if len(data) == 0 {
  104. return
  105. }
  106. // after releasing the semaphore, we must check for fresh data, same as `send`
  107. defer func() {
  108. if socket.readyToWrite() {
  109. socket.wakeWriter()
  110. }
  111. }()
  112. // blocking acquire of the trylock
  113. socket.writerSemaphore.Acquire()
  114. defer socket.writerSemaphore.Release()
  115. // first, flush any buffered data, to preserve the ordering guarantees
  116. closed := socket.performWrite()
  117. if closed {
  118. return io.EOF
  119. }
  120. err = socket.conn.WriteLine(data)
  121. if err != nil {
  122. socket.finalize()
  123. }
  124. return
  125. }
  126. // wakeWriter starts the goroutine that actually performs the write, without blocking
  127. func (socket *Socket) wakeWriter() {
  128. if socket.writerSemaphore.TryAcquire() {
  129. // acquired the trylock; send() will release it
  130. go socket.send()
  131. }
  132. // else: do nothing, the holder will check for more data after releasing it
  133. }
  134. // SetFinalData sets the final data to send when the SocketWriter closes.
  135. func (socket *Socket) SetFinalData(data []byte) {
  136. socket.Lock()
  137. defer socket.Unlock()
  138. socket.finalData = data
  139. }
  140. // IsClosed returns whether the socket is closed.
  141. func (socket *Socket) IsClosed() bool {
  142. socket.Lock()
  143. defer socket.Unlock()
  144. return socket.closed
  145. }
  146. // is there data to write?
  147. func (socket *Socket) readyToWrite() bool {
  148. socket.Lock()
  149. defer socket.Unlock()
  150. // on the first time observing socket.closed, we still have to write socket.finalData
  151. return !socket.finalized && (socket.totalLength > 0 || socket.closed)
  152. }
  153. // send actually writes messages to socket.Conn; it may block
  154. func (socket *Socket) send() {
  155. for {
  156. // we are holding the trylock: actually do the write
  157. socket.performWrite()
  158. // surrender the trylock, avoiding a race where a write comes in after we've
  159. // checked readyToWrite() and it returned false, but while we still hold the trylock:
  160. socket.writerSemaphore.Release()
  161. // check if more data came in while we held the trylock:
  162. if !socket.readyToWrite() {
  163. return
  164. }
  165. if !socket.writerSemaphore.TryAcquire() {
  166. // failed to acquire; exit and wait for the holder to observe readyToWrite()
  167. // after releasing it
  168. return
  169. }
  170. // got the lock again, loop back around and write
  171. }
  172. }
  173. // write the contents of the buffer, then see if we need to close
  174. // returns whether we closed
  175. func (socket *Socket) performWrite() (closed bool) {
  176. // retrieve the buffered data, clear the buffer
  177. socket.Lock()
  178. buffers := socket.buffers
  179. socket.buffers = nil
  180. socket.totalLength = 0
  181. closed = socket.closed
  182. socket.Unlock()
  183. var err error
  184. if 0 < len(buffers) {
  185. err = socket.conn.WriteLines(buffers)
  186. }
  187. closed = closed || err != nil
  188. if closed {
  189. socket.finalize()
  190. }
  191. return
  192. }
  193. // mark closed and send final data. you must be holding the semaphore to call this:
  194. func (socket *Socket) finalize() {
  195. // mark the socket closed (if someone hasn't already), then write error lines
  196. socket.Lock()
  197. socket.closed = true
  198. finalized := socket.finalized
  199. socket.finalized = true
  200. finalData := socket.finalData
  201. if socket.sendQExceeded {
  202. finalData = sendQExceededMessage
  203. }
  204. socket.Unlock()
  205. if finalized {
  206. return
  207. }
  208. if len(finalData) != 0 {
  209. socket.conn.WriteLine(finalData)
  210. }
  211. // close the connection
  212. socket.conn.Close()
  213. }