選択できるのは25トピックまでです。 トピックは、先頭が英数字で、英数字とダッシュ('-')を使用した35文字以内のものにしてください。

history.go 10KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440
  1. // Copyright (c) 2018 Shivaram Lingamneni <slingamn@cs.stanford.edu>
  2. // released under the MIT license
  3. package history
  4. import (
  5. "github.com/oragono/oragono/irc/utils"
  6. "sync"
  7. "time"
  8. )
  9. type ItemType uint
  10. const (
  11. uninitializedItem ItemType = iota
  12. Privmsg
  13. Notice
  14. Join
  15. Part
  16. Kick
  17. Quit
  18. Mode
  19. Tagmsg
  20. Nick
  21. Topic
  22. )
  23. const (
  24. initialAutoSize = 32
  25. )
  26. // Item represents an event (e.g., a PRIVMSG or a JOIN) and its associated data
  27. type Item struct {
  28. Type ItemType
  29. Nick string
  30. // this is the uncasefolded account name, if there's no account it should be set to "*"
  31. AccountName string
  32. // for non-privmsg items, we may stuff some other data in here
  33. Message utils.SplitMessage
  34. Tags map[string]string
  35. Params [1]string
  36. // for a DM, this is the casefolded nickname of the other party (whether this is
  37. // an incoming or outgoing message). this lets us emulate the "query buffer" functionality
  38. // required by CHATHISTORY:
  39. CfCorrespondent string
  40. }
  41. // HasMsgid tests whether a message has the message id `msgid`.
  42. func (item *Item) HasMsgid(msgid string) bool {
  43. return item.Message.Msgid == msgid
  44. }
  45. type Predicate func(item *Item) (matches bool)
  46. func Reverse(results []Item) {
  47. for i, j := 0, len(results)-1; i < j; i, j = i+1, j-1 {
  48. results[i], results[j] = results[j], results[i]
  49. }
  50. }
  51. // Buffer is a ring buffer holding message/event history for a channel or user
  52. type Buffer struct {
  53. sync.RWMutex
  54. // ring buffer, see irc/whowas.go for conventions
  55. buffer []Item
  56. start int
  57. end int
  58. maximumSize int
  59. window time.Duration
  60. lastDiscarded time.Time
  61. nowFunc func() time.Time
  62. }
  63. func NewHistoryBuffer(size int, window time.Duration) (result *Buffer) {
  64. result = new(Buffer)
  65. result.Initialize(size, window)
  66. return
  67. }
  68. func (hist *Buffer) Initialize(size int, window time.Duration) {
  69. hist.buffer = make([]Item, hist.initialSize(size, window))
  70. hist.start = -1
  71. hist.end = -1
  72. hist.window = window
  73. hist.maximumSize = size
  74. hist.nowFunc = time.Now
  75. }
  76. // compute the initial size for the buffer, taking into account autoresize
  77. func (hist *Buffer) initialSize(size int, window time.Duration) (result int) {
  78. result = size
  79. if window != 0 {
  80. result = initialAutoSize
  81. if size < result {
  82. result = size // min(initialAutoSize, size)
  83. }
  84. }
  85. return
  86. }
  87. // Add adds a history item to the buffer
  88. func (list *Buffer) Add(item Item) {
  89. if item.Message.Time.IsZero() {
  90. item.Message.Time = time.Now().UTC()
  91. }
  92. list.Lock()
  93. defer list.Unlock()
  94. if len(list.buffer) == 0 {
  95. return
  96. }
  97. list.maybeExpand()
  98. var pos int
  99. if list.start == -1 { // empty
  100. pos = 0
  101. list.start = 0
  102. list.end = 1 % len(list.buffer)
  103. } else if list.start != list.end { // partially full
  104. pos = list.end
  105. list.end = (list.end + 1) % len(list.buffer)
  106. } else if list.start == list.end { // full
  107. pos = list.end
  108. list.end = (list.end + 1) % len(list.buffer)
  109. list.start = list.end // advance start as well, overwriting first entry
  110. // record the timestamp of the overwritten item
  111. if list.lastDiscarded.Before(list.buffer[pos].Message.Time) {
  112. list.lastDiscarded = list.buffer[pos].Message.Time
  113. }
  114. }
  115. list.buffer[pos] = item
  116. }
  117. func (list *Buffer) lookup(msgid string) (result Item, found bool) {
  118. predicate := func(item *Item) bool {
  119. return item.HasMsgid(msgid)
  120. }
  121. results := list.matchInternal(predicate, false, 1)
  122. if len(results) != 0 {
  123. return results[0], true
  124. }
  125. return
  126. }
  127. // Between returns all history items with a time `after` <= time <= `before`,
  128. // with an indication of whether the results are complete or are missing items
  129. // because some of that period was discarded. A zero value of `before` is considered
  130. // higher than all other times.
  131. func (list *Buffer) betweenHelper(start, end Selector, cutoff time.Time, pred Predicate, limit int) (results []Item, complete bool, err error) {
  132. var ascending bool
  133. defer func() {
  134. if !ascending {
  135. Reverse(results)
  136. }
  137. }()
  138. list.RLock()
  139. defer list.RUnlock()
  140. if len(list.buffer) == 0 {
  141. return
  142. }
  143. after := start.Time
  144. if start.Msgid != "" {
  145. item, found := list.lookup(start.Msgid)
  146. if !found {
  147. return
  148. }
  149. after = item.Message.Time
  150. }
  151. before := end.Time
  152. if end.Msgid != "" {
  153. item, found := list.lookup(end.Msgid)
  154. if !found {
  155. return
  156. }
  157. before = item.Message.Time
  158. }
  159. after, before, ascending = MinMaxAsc(after, before, cutoff)
  160. complete = after.Equal(list.lastDiscarded) || after.After(list.lastDiscarded)
  161. satisfies := func(item *Item) bool {
  162. return (after.IsZero() || item.Message.Time.After(after)) &&
  163. (before.IsZero() || item.Message.Time.Before(before)) &&
  164. (pred == nil || pred(item))
  165. }
  166. return list.matchInternal(satisfies, ascending, limit), complete, nil
  167. }
  168. // implements history.Sequence, emulating a single history buffer (for a channel,
  169. // a single user's DMs, or a DM conversation)
  170. type bufferSequence struct {
  171. list *Buffer
  172. pred Predicate
  173. cutoff time.Time
  174. }
  175. func (list *Buffer) MakeSequence(correspondent string, cutoff time.Time) Sequence {
  176. var pred Predicate
  177. if correspondent != "" {
  178. pred = func(item *Item) bool {
  179. return item.CfCorrespondent == correspondent
  180. }
  181. }
  182. return &bufferSequence{
  183. list: list,
  184. pred: pred,
  185. cutoff: cutoff,
  186. }
  187. }
  188. func (seq *bufferSequence) Between(start, end Selector, limit int) (results []Item, complete bool, err error) {
  189. return seq.list.betweenHelper(start, end, seq.cutoff, seq.pred, limit)
  190. }
  191. func (seq *bufferSequence) Around(start Selector, limit int) (results []Item, err error) {
  192. return GenericAround(seq, start, limit)
  193. }
  194. // you must be holding the read lock to call this
  195. func (list *Buffer) matchInternal(predicate Predicate, ascending bool, limit int) (results []Item) {
  196. if list.start == -1 || len(list.buffer) == 0 {
  197. return
  198. }
  199. var pos, stop int
  200. if ascending {
  201. pos = list.start
  202. stop = list.prev(list.end)
  203. } else {
  204. pos = list.prev(list.end)
  205. stop = list.start
  206. }
  207. for {
  208. if predicate(&list.buffer[pos]) {
  209. results = append(results, list.buffer[pos])
  210. }
  211. if pos == stop || (limit != 0 && len(results) == limit) {
  212. break
  213. }
  214. if ascending {
  215. pos = list.next(pos)
  216. } else {
  217. pos = list.prev(pos)
  218. }
  219. }
  220. return
  221. }
  222. // Delete deletes messages matching some predicate.
  223. func (list *Buffer) Delete(predicate Predicate) (count int) {
  224. list.Lock()
  225. defer list.Unlock()
  226. if list.start == -1 || len(list.buffer) == 0 {
  227. return
  228. }
  229. pos := list.start
  230. stop := list.prev(list.end)
  231. for {
  232. if predicate(&list.buffer[pos]) {
  233. list.buffer[pos] = Item{}
  234. count++
  235. }
  236. if pos == stop {
  237. break
  238. }
  239. pos = list.next(pos)
  240. }
  241. return
  242. }
  243. // latest returns the items most recently added, up to `limit`. If `limit` is 0,
  244. // it returns all items.
  245. func (list *Buffer) latest(limit int) (results []Item) {
  246. results, _, _ = list.betweenHelper(Selector{}, Selector{}, time.Time{}, nil, limit)
  247. return
  248. }
  249. // LastDiscarded returns the latest time of any entry that was evicted
  250. // from the ring buffer.
  251. func (list *Buffer) LastDiscarded() time.Time {
  252. list.RLock()
  253. defer list.RUnlock()
  254. return list.lastDiscarded
  255. }
  256. func (list *Buffer) prev(index int) int {
  257. switch index {
  258. case 0:
  259. return len(list.buffer) - 1
  260. default:
  261. return index - 1
  262. }
  263. }
  264. func (list *Buffer) next(index int) int {
  265. switch index {
  266. case len(list.buffer) - 1:
  267. return 0
  268. default:
  269. return index + 1
  270. }
  271. }
  272. // return n such that v <= n and n == 2**i for some i
  273. func roundUpToPowerOfTwo(v int) int {
  274. // http://graphics.stanford.edu/~seander/bithacks.html
  275. v -= 1
  276. v |= v >> 1
  277. v |= v >> 2
  278. v |= v >> 4
  279. v |= v >> 8
  280. v |= v >> 16
  281. return v + 1
  282. }
  283. func (list *Buffer) maybeExpand() {
  284. if list.window == 0 {
  285. return // autoresize is disabled
  286. }
  287. length := list.length()
  288. if length < len(list.buffer) {
  289. return // we have spare capacity already
  290. }
  291. if len(list.buffer) == list.maximumSize {
  292. return // cannot expand any further
  293. }
  294. wouldDiscard := list.buffer[list.start].Message.Time
  295. if list.window < list.nowFunc().Sub(wouldDiscard) {
  296. return // oldest element is old enough to overwrite
  297. }
  298. newSize := roundUpToPowerOfTwo(length + 1)
  299. if list.maximumSize < newSize {
  300. newSize = list.maximumSize
  301. }
  302. list.resize(newSize)
  303. }
  304. // Resize shrinks or expands the buffer
  305. func (list *Buffer) Resize(maximumSize int, window time.Duration) {
  306. list.Lock()
  307. defer list.Unlock()
  308. if list.maximumSize == maximumSize && list.window == window {
  309. return // no-op
  310. }
  311. list.maximumSize = maximumSize
  312. list.window = window
  313. // three cases where we need to preemptively resize:
  314. // (1) we are not autoresizing
  315. // (2) the buffer is currently larger than maximumSize and needs to be shrunk
  316. // (3) the buffer is currently smaller than the recommended initial size
  317. // (including the case where it is currently disabled and needs to be enabled)
  318. // TODO make it possible to shrink the buffer so that it only contains `window`
  319. if window == 0 || maximumSize < len(list.buffer) {
  320. list.resize(maximumSize)
  321. } else {
  322. initialSize := list.initialSize(maximumSize, window)
  323. if len(list.buffer) < initialSize {
  324. list.resize(initialSize)
  325. }
  326. }
  327. }
  328. func (list *Buffer) resize(size int) {
  329. newbuffer := make([]Item, size)
  330. if list.start == -1 {
  331. // indices are already correct and nothing needs to be copied
  332. } else if size == 0 {
  333. // this is now the empty list
  334. list.start = -1
  335. list.end = -1
  336. } else {
  337. currentLength := list.length()
  338. start := list.start
  339. end := list.end
  340. // if we're truncating, keep the latest entries, not the earliest
  341. if size < currentLength {
  342. start = list.end - size
  343. if start < 0 {
  344. start += len(list.buffer)
  345. }
  346. // update lastDiscarded for discarded entries
  347. for i := list.start; i != start; i = (i + 1) % len(list.buffer) {
  348. if list.lastDiscarded.Before(list.buffer[i].Message.Time) {
  349. list.lastDiscarded = list.buffer[i].Message.Time
  350. }
  351. }
  352. }
  353. if start < end {
  354. copied := copy(newbuffer, list.buffer[start:end])
  355. list.start = 0
  356. list.end = copied % size
  357. } else {
  358. lenInitial := len(list.buffer) - start
  359. copied := copy(newbuffer, list.buffer[start:])
  360. copied += copy(newbuffer[lenInitial:], list.buffer[:end])
  361. list.start = 0
  362. list.end = copied % size
  363. }
  364. }
  365. list.buffer = newbuffer
  366. }
  367. func (hist *Buffer) length() int {
  368. if hist.start == -1 {
  369. return 0
  370. } else if hist.start < hist.end {
  371. return hist.end - hist.start
  372. } else {
  373. return len(hist.buffer) - (hist.start - hist.end)
  374. }
  375. }