You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

history.go 7.5KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327
  1. // Copyright (c) 2018 Shivaram Lingamneni <slingamn@cs.stanford.edu>
  2. // released under the MIT license
  3. package history
  4. import (
  5. "github.com/oragono/oragono/irc/utils"
  6. "sync"
  7. "sync/atomic"
  8. "time"
  9. )
  // ItemType identifies the kind of event that a history Item records.
  type ItemType uint

  // The known item types. The zero value is named uninitializedItem, so every
  // exported type below is nonzero.
  const (
  	uninitializedItem ItemType = iota
  	Privmsg
  	Notice
  	Join
  	Part
  	Kick
  	Quit
  	Mode
  	Tagmsg
  	Nick
  )
  // transientTags is the set of tag names considered transient: a Tagmsg whose
  // tags are all members of this set is not stored in history.
  var transientTags = map[string]bool{
  	"+draft/typing": true,
  	"+typing":       true, // future-proofing
  }
  28. // Item represents an event (e.g., a PRIVMSG or a JOIN) and its associated data
  29. type Item struct {
  30. Type ItemType
  31. Nick string
  32. // this is the uncasefolded account name, if there's no account it should be set to "*"
  33. AccountName string
  34. // for non-privmsg items, we may stuff some other data in here
  35. Message utils.SplitMessage
  36. Tags map[string]string
  37. Params [1]string
  38. }
  39. // HasMsgid tests whether a message has the message id `msgid`.
  40. func (item *Item) HasMsgid(msgid string) bool {
  41. if item.Message.Msgid == msgid {
  42. return true
  43. }
  44. for _, pair := range item.Message.Wrapped {
  45. if pair.Msgid == msgid {
  46. return true
  47. }
  48. }
  49. return false
  50. }
  51. func (item *Item) isStorable() bool {
  52. if item.Type == Tagmsg {
  53. for name := range item.Tags {
  54. if !transientTags[name] {
  55. return true
  56. }
  57. }
  58. return false // all tags were blacklisted
  59. } else {
  60. return true
  61. }
  62. }
  63. type Predicate func(item Item) (matches bool)
  64. // Buffer is a ring buffer holding message/event history for a channel or user
  65. type Buffer struct {
  66. sync.RWMutex
  67. // ring buffer, see irc/whowas.go for conventions
  68. buffer []Item
  69. start int
  70. end int
  71. lastDiscarded time.Time
  72. enabled uint32
  73. }
  74. func NewHistoryBuffer(size int) (result *Buffer) {
  75. result = new(Buffer)
  76. result.Initialize(size)
  77. return
  78. }
  79. func (hist *Buffer) Initialize(size int) {
  80. hist.buffer = make([]Item, size)
  81. hist.start = -1
  82. hist.end = -1
  83. hist.setEnabled(size)
  84. }
  85. func (hist *Buffer) setEnabled(size int) {
  86. var enabled uint32
  87. if size != 0 {
  88. enabled = 1
  89. }
  90. atomic.StoreUint32(&hist.enabled, enabled)
  91. }
  92. // Enabled returns whether the buffer is currently storing messages
  93. // (a disabled buffer blackholes everything it sees)
  94. func (list *Buffer) Enabled() bool {
  95. return atomic.LoadUint32(&list.enabled) != 0
  96. }
  97. // Add adds a history item to the buffer
  98. func (list *Buffer) Add(item Item) {
  99. // fast path without a lock acquisition for when we are not storing history
  100. if !list.Enabled() {
  101. return
  102. }
  103. if !item.isStorable() {
  104. return
  105. }
  106. if item.Message.Time.IsZero() {
  107. item.Message.Time = time.Now().UTC()
  108. }
  109. list.Lock()
  110. defer list.Unlock()
  111. var pos int
  112. if list.start == -1 { // empty
  113. pos = 0
  114. list.start = 0
  115. list.end = 1 % len(list.buffer)
  116. } else if list.start != list.end { // partially full
  117. pos = list.end
  118. list.end = (list.end + 1) % len(list.buffer)
  119. } else if list.start == list.end { // full
  120. pos = list.end
  121. list.end = (list.end + 1) % len(list.buffer)
  122. list.start = list.end // advance start as well, overwriting first entry
  123. // record the timestamp of the overwritten item
  124. if list.lastDiscarded.Before(list.buffer[pos].Message.Time) {
  125. list.lastDiscarded = list.buffer[pos].Message.Time
  126. }
  127. }
  128. list.buffer[pos] = item
  129. }
  130. // Reverse reverses an []Item, in-place.
  131. func Reverse(results []Item) {
  132. for i, j := 0, len(results)-1; i < j; i, j = i+1, j-1 {
  133. results[i], results[j] = results[j], results[i]
  134. }
  135. }
  136. // Between returns all history items with a time `after` <= time <= `before`,
  137. // with an indication of whether the results are complete or are missing items
  138. // because some of that period was discarded. A zero value of `before` is considered
  139. // higher than all other times.
  140. func (list *Buffer) Between(after, before time.Time, ascending bool, limit int) (results []Item, complete bool) {
  141. if !list.Enabled() {
  142. return
  143. }
  144. list.RLock()
  145. defer list.RUnlock()
  146. complete = after.Equal(list.lastDiscarded) || after.After(list.lastDiscarded)
  147. satisfies := func(item Item) bool {
  148. return (after.IsZero() || item.Message.Time.After(after)) && (before.IsZero() || item.Message.Time.Before(before))
  149. }
  150. return list.matchInternal(satisfies, ascending, limit), complete
  151. }
  152. // Match returns all history items such that `predicate` returns true for them.
  153. // Items are considered in reverse insertion order if `ascending` is false, or
  154. // in insertion order if `ascending` is true, up to a total of `limit` matches
  155. // if `limit` > 0 (unlimited otherwise).
  156. // `predicate` MAY be a closure that maintains its own state across invocations;
  157. // it MUST NOT acquire any locks or otherwise do anything weird.
  158. // Results are always returned in insertion order.
  159. func (list *Buffer) Match(predicate Predicate, ascending bool, limit int) (results []Item) {
  160. if !list.Enabled() {
  161. return
  162. }
  163. list.RLock()
  164. defer list.RUnlock()
  165. return list.matchInternal(predicate, ascending, limit)
  166. }
  167. // you must be holding the read lock to call this
  168. func (list *Buffer) matchInternal(predicate Predicate, ascending bool, limit int) (results []Item) {
  169. if list.start == -1 {
  170. return
  171. }
  172. var pos, stop int
  173. if ascending {
  174. pos = list.start
  175. stop = list.prev(list.end)
  176. } else {
  177. pos = list.prev(list.end)
  178. stop = list.start
  179. }
  180. for {
  181. if predicate(list.buffer[pos]) {
  182. results = append(results, list.buffer[pos])
  183. }
  184. if pos == stop || (limit != 0 && len(results) == limit) {
  185. break
  186. }
  187. if ascending {
  188. pos = list.next(pos)
  189. } else {
  190. pos = list.prev(pos)
  191. }
  192. }
  193. // TODO sort by time instead?
  194. if !ascending {
  195. Reverse(results)
  196. }
  197. return
  198. }
  199. // Latest returns the items most recently added, up to `limit`. If `limit` is 0,
  200. // it returns all items.
  201. func (list *Buffer) Latest(limit int) (results []Item) {
  202. matchAll := func(item Item) bool { return true }
  203. return list.Match(matchAll, false, limit)
  204. }
  205. // LastDiscarded returns the latest time of any entry that was evicted
  206. // from the ring buffer.
  207. func (list *Buffer) LastDiscarded() time.Time {
  208. list.RLock()
  209. defer list.RUnlock()
  210. return list.lastDiscarded
  211. }
  212. func (list *Buffer) prev(index int) int {
  213. switch index {
  214. case 0:
  215. return len(list.buffer) - 1
  216. default:
  217. return index - 1
  218. }
  219. }
  220. func (list *Buffer) next(index int) int {
  221. switch index {
  222. case len(list.buffer) - 1:
  223. return 0
  224. default:
  225. return index + 1
  226. }
  227. }
  228. // Resize shrinks or expands the buffer
  229. func (list *Buffer) Resize(size int) {
  230. newbuffer := make([]Item, size)
  231. list.Lock()
  232. defer list.Unlock()
  233. list.setEnabled(size)
  234. if list.start == -1 {
  235. // indices are already correct and nothing needs to be copied
  236. } else if size == 0 {
  237. // this is now the empty list
  238. list.start = -1
  239. list.end = -1
  240. } else {
  241. currentLength := list.length()
  242. start := list.start
  243. end := list.end
  244. // if we're truncating, keep the latest entries, not the earliest
  245. if size < currentLength {
  246. start = list.end - size
  247. if start < 0 {
  248. start += len(list.buffer)
  249. }
  250. // update lastDiscarded for discarded entries
  251. for i := list.start; i != start; i = (i + 1) % len(list.buffer) {
  252. if list.lastDiscarded.Before(list.buffer[i].Message.Time) {
  253. list.lastDiscarded = list.buffer[i].Message.Time
  254. }
  255. }
  256. }
  257. if start < end {
  258. copied := copy(newbuffer, list.buffer[start:end])
  259. list.start = 0
  260. list.end = copied % size
  261. } else {
  262. lenInitial := len(list.buffer) - start
  263. copied := copy(newbuffer, list.buffer[start:])
  264. copied += copy(newbuffer[lenInitial:], list.buffer[:end])
  265. list.start = 0
  266. list.end = copied % size
  267. }
  268. }
  269. list.buffer = newbuffer
  270. }
  271. func (hist *Buffer) length() int {
  272. if hist.start == -1 {
  273. return 0
  274. } else if hist.start < hist.end {
  275. return hist.end - hist.start
  276. } else {
  277. return len(hist.buffer) - (hist.start - hist.end)
  278. }
  279. }