Você não pode selecionar mais de 25 tópicos Os tópicos devem começar com uma letra ou um número, podem incluir traços ('-') e podem ter até 35 caracteres.

history.go 6.1KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262
  1. // Copyright (c) 2018 Shivaram Lingamneni <slingamn@cs.stanford.edu>
  2. // released under the MIT license
  3. package history
  4. import (
  5. "github.com/oragono/oragono/irc/utils"
  6. "sync"
  7. "sync/atomic"
  8. "time"
  9. )
  10. type ItemType uint
  11. const (
  12. uninitializedItem ItemType = iota
  13. Privmsg
  14. Notice
  15. Join
  16. Part
  17. Kick
  18. Quit
  19. Mode
  20. )
// Item represents an event (e.g., a PRIVMSG or a JOIN) and its associated data
type Item struct {
	// Type is the kind of event this item records
	Type ItemType
	// Time is the timestamp of the event; Buffer.Add replaces a zero Time
	// with the current UTC time
	Time time.Time
	Nick string
	// this is the uncasefolded account name, if there's no account it should be set to "*"
	AccountName string
	Message     utils.SplitMessage
	// for non-privmsg items, we may stuff some other data in here
	Msgid string
}
// Predicate reports whether a history item should be included in the
// results of a query such as Buffer.Match.
type Predicate func(item Item) (matches bool)
  33. // Buffer is a ring buffer holding message/event history for a channel or user
  34. type Buffer struct {
  35. sync.RWMutex
  36. // ring buffer, see irc/whowas.go for conventions
  37. buffer []Item
  38. start int
  39. end int
  40. lastDiscarded time.Time
  41. enabled uint32
  42. }
  43. func NewHistoryBuffer(size int) (result *Buffer) {
  44. result = new(Buffer)
  45. result.Initialize(size)
  46. return
  47. }
  48. func (hist *Buffer) Initialize(size int) {
  49. hist.buffer = make([]Item, size)
  50. hist.start = -1
  51. hist.end = -1
  52. hist.setEnabled(size)
  53. }
  54. func (hist *Buffer) setEnabled(size int) {
  55. var enabled uint32
  56. if size != 0 {
  57. enabled = 1
  58. }
  59. atomic.StoreUint32(&hist.enabled, enabled)
  60. }
  61. // Enabled returns whether the buffer is currently storing messages
  62. // (a disabled buffer blackholes everything it sees)
  63. func (list *Buffer) Enabled() bool {
  64. return atomic.LoadUint32(&list.enabled) != 0
  65. }
  66. // Add adds a history item to the buffer
  67. func (list *Buffer) Add(item Item) {
  68. // fast path without a lock acquisition for when we are not storing history
  69. if !list.Enabled() {
  70. return
  71. }
  72. if item.Time.IsZero() {
  73. item.Time = time.Now().UTC()
  74. }
  75. list.Lock()
  76. defer list.Unlock()
  77. var pos int
  78. if list.start == -1 { // empty
  79. pos = 0
  80. list.start = 0
  81. list.end = 1 % len(list.buffer)
  82. } else if list.start != list.end { // partially full
  83. pos = list.end
  84. list.end = (list.end + 1) % len(list.buffer)
  85. } else if list.start == list.end { // full
  86. pos = list.end
  87. list.end = (list.end + 1) % len(list.buffer)
  88. list.start = list.end // advance start as well, overwriting first entry
  89. // record the timestamp of the overwritten item
  90. if list.lastDiscarded.Before(list.buffer[pos].Time) {
  91. list.lastDiscarded = list.buffer[pos].Time
  92. }
  93. }
  94. list.buffer[pos] = item
  95. }
  96. func reverse(results []Item) {
  97. for i, j := 0, len(results)-1; i < j; i, j = i+1, j-1 {
  98. results[i], results[j] = results[j], results[i]
  99. }
  100. }
  101. // Between returns all history items with a time `after` <= time <= `before`,
  102. // with an indication of whether the results are complete or are missing items
  103. // because some of that period was discarded. A zero value of `before` is considered
  104. // higher than all other times.
  105. func (list *Buffer) Between(after, before time.Time) (results []Item, complete bool) {
  106. if !list.Enabled() {
  107. return
  108. }
  109. list.RLock()
  110. defer list.RUnlock()
  111. complete = after.Equal(list.lastDiscarded) || after.After(list.lastDiscarded)
  112. satisfies := func(item Item) bool {
  113. return (after.IsZero() || item.Time.After(after)) && (before.IsZero() || item.Time.Before(before))
  114. }
  115. return list.matchInternal(satisfies, 0), complete
  116. }
  117. // Match returns all history items such that `predicate` returns true for them.
  118. // Items are considered in reverse insertion order, up to a total of `limit` matches.
  119. // `predicate` MAY be a closure that maintains its own state across invocations;
  120. // it MUST NOT acquire any locks or otherwise do anything weird.
  121. func (list *Buffer) Match(predicate Predicate, limit int) (results []Item) {
  122. if !list.Enabled() {
  123. return
  124. }
  125. list.RLock()
  126. defer list.RUnlock()
  127. return list.matchInternal(predicate, limit)
  128. }
  129. // you must be holding the read lock to call this
  130. func (list *Buffer) matchInternal(predicate Predicate, limit int) (results []Item) {
  131. if list.start == -1 {
  132. return
  133. }
  134. pos := list.prev(list.end)
  135. for {
  136. if predicate(list.buffer[pos]) {
  137. results = append(results, list.buffer[pos])
  138. }
  139. if pos == list.start || (limit != 0 && len(results) == limit) {
  140. break
  141. }
  142. pos = list.prev(pos)
  143. }
  144. // TODO sort by time instead?
  145. reverse(results)
  146. return
  147. }
  148. // Latest returns the items most recently added, up to `limit`. If `limit` is 0,
  149. // it returns all items.
  150. func (list *Buffer) Latest(limit int) (results []Item) {
  151. matchAll := func(item Item) bool { return true }
  152. return list.Match(matchAll, limit)
  153. }
  154. // LastDiscarded returns the latest time of any entry that was evicted
  155. // from the ring buffer.
  156. func (list *Buffer) LastDiscarded() time.Time {
  157. list.RLock()
  158. defer list.RUnlock()
  159. return list.lastDiscarded
  160. }
  161. func (list *Buffer) prev(index int) int {
  162. switch index {
  163. case 0:
  164. return len(list.buffer) - 1
  165. default:
  166. return index - 1
  167. }
  168. }
  169. // Resize shrinks or expands the buffer
  170. func (list *Buffer) Resize(size int) {
  171. newbuffer := make([]Item, size)
  172. list.Lock()
  173. defer list.Unlock()
  174. list.setEnabled(size)
  175. if list.start == -1 {
  176. // indices are already correct and nothing needs to be copied
  177. } else if size == 0 {
  178. // this is now the empty list
  179. list.start = -1
  180. list.end = -1
  181. } else {
  182. currentLength := list.length()
  183. start := list.start
  184. end := list.end
  185. // if we're truncating, keep the latest entries, not the earliest
  186. if size < currentLength {
  187. start = list.end - size
  188. if start < 0 {
  189. start += len(list.buffer)
  190. }
  191. // update lastDiscarded for discarded entries
  192. for i := list.start; i != start; i = (i + 1) % len(list.buffer) {
  193. if list.lastDiscarded.Before(list.buffer[i].Time) {
  194. list.lastDiscarded = list.buffer[i].Time
  195. }
  196. }
  197. }
  198. if start < end {
  199. copied := copy(newbuffer, list.buffer[start:end])
  200. list.start = 0
  201. list.end = copied % size
  202. } else {
  203. lenInitial := len(list.buffer) - start
  204. copied := copy(newbuffer, list.buffer[start:])
  205. copied += copy(newbuffer[lenInitial:], list.buffer[:end])
  206. list.start = 0
  207. list.end = copied % size
  208. }
  209. }
  210. list.buffer = newbuffer
  211. }
  212. func (hist *Buffer) length() int {
  213. if hist.start == -1 {
  214. return 0
  215. } else if hist.start < hist.end {
  216. return hist.end - hist.start
  217. } else {
  218. return len(hist.buffer) - (hist.start - hist.end)
  219. }
  220. }