選択できるのは25トピックまでです。 トピックは、先頭が英数字で、英数字とダッシュ('-')を使用した35文字以内のものにしてください。

history.go 9.6KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407
  1. // Copyright (c) 2018 Shivaram Lingamneni <slingamn@cs.stanford.edu>
  2. // released under the MIT license
  3. package history
  import (
	"math/bits"
	"sync"
	"sync/atomic"
	"time"

	"github.com/oragono/oragono/irc/utils"
)
  // ItemType identifies the kind of event recorded in an Item.
type ItemType uint

// uninitializedItem is the zero ItemType and is not one of the recorded event
// kinds; the exported constants enumerate the kinds that are recorded.
const (
	uninitializedItem ItemType = iota
	Privmsg
	Notice
	Join
	Part
	Kick
	Quit
	Mode
	Tagmsg
	Nick
)

// initialAutoSize is the starting capacity of an autoresizing buffer (one with
// a nonzero window); it is capped by the buffer's maximum size.
const initialAutoSize = 32

// transientTags lists tags that are not worth persisting on their own: a
// Tagmsg whose tags all appear here is not stored.
var transientTags = map[string]bool{
	"+draft/typing": true,
	"+typing":       true, // future-proofing
}
  31. // Item represents an event (e.g., a PRIVMSG or a JOIN) and its associated data
  32. type Item struct {
  33. Type ItemType
  34. Nick string
  35. // this is the uncasefolded account name, if there's no account it should be set to "*"
  36. AccountName string
  37. // for non-privmsg items, we may stuff some other data in here
  38. Message utils.SplitMessage
  39. Tags map[string]string
  40. Params [1]string
  41. }
  42. // HasMsgid tests whether a message has the message id `msgid`.
  43. func (item *Item) HasMsgid(msgid string) bool {
  44. return item.Message.Msgid == msgid
  45. }
  46. func (item *Item) isStorable() bool {
  47. if item.Type == Tagmsg {
  48. for name := range item.Tags {
  49. if !transientTags[name] {
  50. return true
  51. }
  52. }
  53. return false // all tags were blacklisted
  54. } else {
  55. return true
  56. }
  57. }
  58. type Predicate func(item Item) (matches bool)
  59. // Buffer is a ring buffer holding message/event history for a channel or user
  60. type Buffer struct {
  61. sync.RWMutex
  62. // ring buffer, see irc/whowas.go for conventions
  63. buffer []Item
  64. start int
  65. end int
  66. maximumSize int
  67. window time.Duration
  68. lastDiscarded time.Time
  69. enabled uint32
  70. nowFunc func() time.Time
  71. }
  72. func NewHistoryBuffer(size int, window time.Duration) (result *Buffer) {
  73. result = new(Buffer)
  74. result.Initialize(size, window)
  75. return
  76. }
  77. func (hist *Buffer) Initialize(size int, window time.Duration) {
  78. hist.buffer = make([]Item, hist.initialSize(size, window))
  79. hist.start = -1
  80. hist.end = -1
  81. hist.window = window
  82. hist.maximumSize = size
  83. hist.nowFunc = time.Now
  84. hist.setEnabled(size)
  85. }
  86. // compute the initial size for the buffer, taking into account autoresize
  87. func (hist *Buffer) initialSize(size int, window time.Duration) (result int) {
  88. result = size
  89. if window != 0 {
  90. result = initialAutoSize
  91. if size < result {
  92. result = size // min(initialAutoSize, size)
  93. }
  94. }
  95. return
  96. }
  97. func (hist *Buffer) setEnabled(size int) {
  98. var enabled uint32
  99. if size != 0 {
  100. enabled = 1
  101. }
  102. atomic.StoreUint32(&hist.enabled, enabled)
  103. }
  104. // Enabled returns whether the buffer is currently storing messages
  105. // (a disabled buffer blackholes everything it sees)
  106. func (list *Buffer) Enabled() bool {
  107. return atomic.LoadUint32(&list.enabled) != 0
  108. }
  109. // Add adds a history item to the buffer
  110. func (list *Buffer) Add(item Item) {
  111. // fast path without a lock acquisition for when we are not storing history
  112. if !list.Enabled() {
  113. return
  114. }
  115. if !item.isStorable() {
  116. return
  117. }
  118. if item.Message.Time.IsZero() {
  119. item.Message.Time = time.Now().UTC()
  120. }
  121. list.Lock()
  122. defer list.Unlock()
  123. list.maybeExpand()
  124. var pos int
  125. if list.start == -1 { // empty
  126. pos = 0
  127. list.start = 0
  128. list.end = 1 % len(list.buffer)
  129. } else if list.start != list.end { // partially full
  130. pos = list.end
  131. list.end = (list.end + 1) % len(list.buffer)
  132. } else if list.start == list.end { // full
  133. pos = list.end
  134. list.end = (list.end + 1) % len(list.buffer)
  135. list.start = list.end // advance start as well, overwriting first entry
  136. // record the timestamp of the overwritten item
  137. if list.lastDiscarded.Before(list.buffer[pos].Message.Time) {
  138. list.lastDiscarded = list.buffer[pos].Message.Time
  139. }
  140. }
  141. list.buffer[pos] = item
  142. }
  143. // Reverse reverses an []Item, in-place.
  144. func Reverse(results []Item) {
  145. for i, j := 0, len(results)-1; i < j; i, j = i+1, j-1 {
  146. results[i], results[j] = results[j], results[i]
  147. }
  148. }
  149. // Between returns all history items with a time `after` <= time <= `before`,
  150. // with an indication of whether the results are complete or are missing items
  151. // because some of that period was discarded. A zero value of `before` is considered
  152. // higher than all other times.
  153. func (list *Buffer) Between(after, before time.Time, ascending bool, limit int) (results []Item, complete bool) {
  154. if !list.Enabled() {
  155. return
  156. }
  157. list.RLock()
  158. defer list.RUnlock()
  159. complete = after.Equal(list.lastDiscarded) || after.After(list.lastDiscarded)
  160. satisfies := func(item Item) bool {
  161. return (after.IsZero() || item.Message.Time.After(after)) && (before.IsZero() || item.Message.Time.Before(before))
  162. }
  163. return list.matchInternal(satisfies, ascending, limit), complete
  164. }
  165. // Match returns all history items such that `predicate` returns true for them.
  166. // Items are considered in reverse insertion order if `ascending` is false, or
  167. // in insertion order if `ascending` is true, up to a total of `limit` matches
  168. // if `limit` > 0 (unlimited otherwise).
  169. // `predicate` MAY be a closure that maintains its own state across invocations;
  170. // it MUST NOT acquire any locks or otherwise do anything weird.
  171. // Results are always returned in insertion order.
  172. func (list *Buffer) Match(predicate Predicate, ascending bool, limit int) (results []Item) {
  173. if !list.Enabled() {
  174. return
  175. }
  176. list.RLock()
  177. defer list.RUnlock()
  178. return list.matchInternal(predicate, ascending, limit)
  179. }
  180. // you must be holding the read lock to call this
  181. func (list *Buffer) matchInternal(predicate Predicate, ascending bool, limit int) (results []Item) {
  182. if list.start == -1 {
  183. return
  184. }
  185. var pos, stop int
  186. if ascending {
  187. pos = list.start
  188. stop = list.prev(list.end)
  189. } else {
  190. pos = list.prev(list.end)
  191. stop = list.start
  192. }
  193. for {
  194. if predicate(list.buffer[pos]) {
  195. results = append(results, list.buffer[pos])
  196. }
  197. if pos == stop || (limit != 0 && len(results) == limit) {
  198. break
  199. }
  200. if ascending {
  201. pos = list.next(pos)
  202. } else {
  203. pos = list.prev(pos)
  204. }
  205. }
  206. // TODO sort by time instead?
  207. if !ascending {
  208. Reverse(results)
  209. }
  210. return
  211. }
  212. // Latest returns the items most recently added, up to `limit`. If `limit` is 0,
  213. // it returns all items.
  214. func (list *Buffer) Latest(limit int) (results []Item) {
  215. matchAll := func(item Item) bool { return true }
  216. return list.Match(matchAll, false, limit)
  217. }
  218. // LastDiscarded returns the latest time of any entry that was evicted
  219. // from the ring buffer.
  220. func (list *Buffer) LastDiscarded() time.Time {
  221. list.RLock()
  222. defer list.RUnlock()
  223. return list.lastDiscarded
  224. }
  225. func (list *Buffer) prev(index int) int {
  226. switch index {
  227. case 0:
  228. return len(list.buffer) - 1
  229. default:
  230. return index - 1
  231. }
  232. }
  233. func (list *Buffer) next(index int) int {
  234. switch index {
  235. case len(list.buffer) - 1:
  236. return 0
  237. default:
  238. return index + 1
  239. }
  240. }
  // roundUpToPowerOfTwo returns the smallest n such that v <= n and
// n == 2**i for some i, for 1 <= v <= the largest power of two an int can
// hold; v <= 0 yields 0.
//
// The classic bit-smearing trick this replaces only smeared 32 bits, so on
// 64-bit platforms inputs above 2**32 did not round to a power of two.
// bits.Len works across the full width of int.
func roundUpToPowerOfTwo(v int) int {
	// For v <= 0, uint(v-1) has all high bits set, bits.Len returns the word
	// size, and a shift by the full width yields 0.
	return 1 << bits.Len(uint(v-1))
}
  252. func (list *Buffer) maybeExpand() {
  253. if list.window == 0 {
  254. return // autoresize is disabled
  255. }
  256. length := list.length()
  257. if length < len(list.buffer) {
  258. return // we have spare capacity already
  259. }
  260. if len(list.buffer) == list.maximumSize {
  261. return // cannot expand any further
  262. }
  263. wouldDiscard := list.buffer[list.start].Message.Time
  264. if list.window < list.nowFunc().Sub(wouldDiscard) {
  265. return // oldest element is old enough to overwrite
  266. }
  267. newSize := roundUpToPowerOfTwo(length + 1)
  268. if list.maximumSize < newSize {
  269. newSize = list.maximumSize
  270. }
  271. list.resize(newSize)
  272. }
  273. // Resize shrinks or expands the buffer
  274. func (list *Buffer) Resize(maximumSize int, window time.Duration) {
  275. list.Lock()
  276. defer list.Unlock()
  277. if list.maximumSize == maximumSize && list.window == window {
  278. return // no-op
  279. }
  280. list.maximumSize = maximumSize
  281. list.window = window
  282. // three cases where we need to preemptively resize:
  283. // (1) we are not autoresizing
  284. // (2) the buffer is currently larger than maximumSize and needs to be shrunk
  285. // (3) the buffer is currently smaller than the recommended initial size
  286. // (including the case where it is currently disabled and needs to be enabled)
  287. // TODO make it possible to shrink the buffer so that it only contains `window`
  288. if window == 0 || maximumSize < len(list.buffer) {
  289. list.resize(maximumSize)
  290. } else {
  291. initialSize := list.initialSize(maximumSize, window)
  292. if len(list.buffer) < initialSize {
  293. list.resize(initialSize)
  294. }
  295. }
  296. }
  297. func (list *Buffer) resize(size int) {
  298. newbuffer := make([]Item, size)
  299. list.setEnabled(size)
  300. if list.start == -1 {
  301. // indices are already correct and nothing needs to be copied
  302. } else if size == 0 {
  303. // this is now the empty list
  304. list.start = -1
  305. list.end = -1
  306. } else {
  307. currentLength := list.length()
  308. start := list.start
  309. end := list.end
  310. // if we're truncating, keep the latest entries, not the earliest
  311. if size < currentLength {
  312. start = list.end - size
  313. if start < 0 {
  314. start += len(list.buffer)
  315. }
  316. // update lastDiscarded for discarded entries
  317. for i := list.start; i != start; i = (i + 1) % len(list.buffer) {
  318. if list.lastDiscarded.Before(list.buffer[i].Message.Time) {
  319. list.lastDiscarded = list.buffer[i].Message.Time
  320. }
  321. }
  322. }
  323. if start < end {
  324. copied := copy(newbuffer, list.buffer[start:end])
  325. list.start = 0
  326. list.end = copied % size
  327. } else {
  328. lenInitial := len(list.buffer) - start
  329. copied := copy(newbuffer, list.buffer[start:])
  330. copied += copy(newbuffer[lenInitial:], list.buffer[:end])
  331. list.start = 0
  332. list.end = copied % size
  333. }
  334. }
  335. list.buffer = newbuffer
  336. }
  337. func (hist *Buffer) length() int {
  338. if hist.start == -1 {
  339. return 0
  340. } else if hist.start < hist.end {
  341. return hist.end - hist.start
  342. } else {
  343. return len(hist.buffer) - (hist.start - hist.end)
  344. }
  345. }