You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

history.go 11KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463
  1. // Copyright (c) 2018 Shivaram Lingamneni <slingamn@cs.stanford.edu>
  2. // released under the MIT license
  3. package history
  4. import (
  5. "github.com/oragono/oragono/irc/utils"
  6. "sync"
  7. "time"
  8. )
// ItemType identifies the kind of event that a history Item records.
type ItemType uint

const (
	// uninitializedItem is the zero value of ItemType, so a zero-valued
	// Item (for example a slot cleared by Buffer.Delete) carries this type.
	uninitializedItem ItemType = iota
	// The remaining values name the kinds of events that can be stored;
	// they appear to mirror the corresponding IRC commands/events.
	Privmsg
	Notice
	Join
	Part
	Kick
	Quit
	Mode
	Tagmsg
	Nick
	Topic
)
const (
	// initialAutoSize is the number of slots a buffer starts with when
	// autoresize is enabled (window != 0); see Buffer.initialSize, which
	// caps it at the requested maximum size.
	initialAutoSize = 32
)
// transientTags lists the tag names that are considered transient.
// A Tagmsg that consists entirely of transient tags is not stored
// (see Item.IsStorable).
var transientTags = map[string]bool{
	"+draft/typing": true,
	"+typing":       true, // future-proofing
}
// Item represents an event (e.g., a PRIVMSG or a JOIN) and its associated data
type Item struct {
	// Type is the kind of event; the zero value is uninitializedItem.
	Type ItemType
	// Nick is presumably the nickname of the client that originated the
	// event -- TODO confirm against the callers that build Items.
	Nick string
	// this is the uncasefolded account name, if there's no account it should be set to "*"
	AccountName string
	// for non-privmsg items, we may stuff some other data in here
	//
	// Within this file, Message.Time is the timestamp used for ordering,
	// range queries and eviction bookkeeping, and Message.Msgid is the
	// identifier compared by HasMsgid.
	Message utils.SplitMessage
	// Tags maps tag names to values; for a Tagmsg the names are checked
	// against transientTags to decide whether the item is worth storing.
	Tags map[string]string
	// Params holds a single extra parameter; its meaning is not visible
	// in this file and presumably depends on Type.
	Params [1]string
	// for a DM, this is the casefolded nickname of the other party (whether this is
	// an incoming or outgoing message). this lets us emulate the "query buffer" functionality
	// required by CHATHISTORY:
	CfCorrespondent string
}
  46. // HasMsgid tests whether a message has the message id `msgid`.
  47. func (item *Item) HasMsgid(msgid string) bool {
  48. return item.Message.Msgid == msgid
  49. }
  50. func (item *Item) IsStorable() bool {
  51. switch item.Type {
  52. case Tagmsg:
  53. for name := range item.Tags {
  54. if !transientTags[name] {
  55. return true
  56. }
  57. }
  58. return false // all tags were blacklisted
  59. case Privmsg, Notice:
  60. // don't store CTCP other than ACTION
  61. return !item.Message.IsRestrictedCTCPMessage()
  62. default:
  63. return true
  64. }
  65. }
// Predicate is a filter over history items: it returns true when `item`
// matches. It is used to select items for retrieval and for deletion.
type Predicate func(item *Item) (matches bool)
  67. func Reverse(results []Item) {
  68. for i, j := 0, len(results)-1; i < j; i, j = i+1, j-1 {
  69. results[i], results[j] = results[j], results[i]
  70. }
  71. }
// Buffer is a ring buffer holding message/event history for a channel or user
type Buffer struct {
	// guards every field below; readers take the read lock, mutators the
	// write lock
	sync.RWMutex

	// ring buffer, see irc/whowas.go for conventions
	//
	// As used in this file: start == -1 means the buffer holds no items;
	// otherwise start is the index of the oldest item and end is the index
	// the next item will be written to. start == end (with start != -1)
	// means the buffer is full.
	buffer []Item
	start  int
	end    int
	// maximumSize is the largest number of slots the buffer may grow to.
	maximumSize int
	// window controls autoresize: zero disables it; otherwise the buffer is
	// grown (up to maximumSize) rather than overwriting an item that is
	// younger than window.
	window time.Duration

	// lastDiscarded is the latest timestamp of any item that was evicted
	// (overwritten or truncated away).
	lastDiscarded time.Time

	// nowFunc supplies the current time for autoresize decisions; it is set
	// to time.Now by Initialize.
	nowFunc func() time.Time
}
  84. func NewHistoryBuffer(size int, window time.Duration) (result *Buffer) {
  85. result = new(Buffer)
  86. result.Initialize(size, window)
  87. return
  88. }
  89. func (hist *Buffer) Initialize(size int, window time.Duration) {
  90. hist.buffer = make([]Item, hist.initialSize(size, window))
  91. hist.start = -1
  92. hist.end = -1
  93. hist.window = window
  94. hist.maximumSize = size
  95. hist.nowFunc = time.Now
  96. }
  97. // compute the initial size for the buffer, taking into account autoresize
  98. func (hist *Buffer) initialSize(size int, window time.Duration) (result int) {
  99. result = size
  100. if window != 0 {
  101. result = initialAutoSize
  102. if size < result {
  103. result = size // min(initialAutoSize, size)
  104. }
  105. }
  106. return
  107. }
  108. // Add adds a history item to the buffer
  109. func (list *Buffer) Add(item Item) {
  110. if item.Message.Time.IsZero() {
  111. item.Message.Time = time.Now().UTC()
  112. }
  113. list.Lock()
  114. defer list.Unlock()
  115. if len(list.buffer) == 0 {
  116. return
  117. }
  118. list.maybeExpand()
  119. var pos int
  120. if list.start == -1 { // empty
  121. pos = 0
  122. list.start = 0
  123. list.end = 1 % len(list.buffer)
  124. } else if list.start != list.end { // partially full
  125. pos = list.end
  126. list.end = (list.end + 1) % len(list.buffer)
  127. } else if list.start == list.end { // full
  128. pos = list.end
  129. list.end = (list.end + 1) % len(list.buffer)
  130. list.start = list.end // advance start as well, overwriting first entry
  131. // record the timestamp of the overwritten item
  132. if list.lastDiscarded.Before(list.buffer[pos].Message.Time) {
  133. list.lastDiscarded = list.buffer[pos].Message.Time
  134. }
  135. }
  136. list.buffer[pos] = item
  137. }
  138. func (list *Buffer) lookup(msgid string) (result Item, found bool) {
  139. predicate := func(item *Item) bool {
  140. return item.HasMsgid(msgid)
  141. }
  142. results := list.matchInternal(predicate, false, 1)
  143. if len(results) != 0 {
  144. return results[0], true
  145. }
  146. return
  147. }
  148. // Between returns all history items with a time `after` <= time <= `before`,
  149. // with an indication of whether the results are complete or are missing items
  150. // because some of that period was discarded. A zero value of `before` is considered
  151. // higher than all other times.
  152. func (list *Buffer) betweenHelper(start, end Selector, cutoff time.Time, pred Predicate, limit int) (results []Item, complete bool, err error) {
  153. var ascending bool
  154. defer func() {
  155. if !ascending {
  156. Reverse(results)
  157. }
  158. }()
  159. list.RLock()
  160. defer list.RUnlock()
  161. if len(list.buffer) == 0 {
  162. return
  163. }
  164. after := start.Time
  165. if start.Msgid != "" {
  166. item, found := list.lookup(start.Msgid)
  167. if !found {
  168. return
  169. }
  170. after = item.Message.Time
  171. }
  172. before := end.Time
  173. if end.Msgid != "" {
  174. item, found := list.lookup(end.Msgid)
  175. if !found {
  176. return
  177. }
  178. before = item.Message.Time
  179. }
  180. after, before, ascending = MinMaxAsc(after, before, cutoff)
  181. complete = after.Equal(list.lastDiscarded) || after.After(list.lastDiscarded)
  182. satisfies := func(item *Item) bool {
  183. return (after.IsZero() || item.Message.Time.After(after)) &&
  184. (before.IsZero() || item.Message.Time.Before(before)) &&
  185. (pred == nil || pred(item))
  186. }
  187. return list.matchInternal(satisfies, ascending, limit), complete, nil
  188. }
// implements history.Sequence, emulating a single history buffer (for a channel,
// a single user's DMs, or a DM conversation)
type bufferSequence struct {
	// list is the underlying ring buffer that queries are run against
	list *Buffer
	// pred is an optional extra filter applied to every item; nil means
	// no additional filtering
	pred Predicate
	// cutoff is passed through to betweenHelper (and from there to
	// MinMaxAsc) on every query
	cutoff time.Time
}
  196. func (list *Buffer) MakeSequence(correspondent string, cutoff time.Time) Sequence {
  197. var pred Predicate
  198. if correspondent != "" {
  199. pred = func(item *Item) bool {
  200. return item.CfCorrespondent == correspondent
  201. }
  202. }
  203. return &bufferSequence{
  204. list: list,
  205. pred: pred,
  206. cutoff: cutoff,
  207. }
  208. }
  209. func (seq *bufferSequence) Between(start, end Selector, limit int) (results []Item, complete bool, err error) {
  210. return seq.list.betweenHelper(start, end, seq.cutoff, seq.pred, limit)
  211. }
  212. func (seq *bufferSequence) Around(start Selector, limit int) (results []Item, err error) {
  213. return GenericAround(seq, start, limit)
  214. }
  215. // you must be holding the read lock to call this
  216. func (list *Buffer) matchInternal(predicate Predicate, ascending bool, limit int) (results []Item) {
  217. if list.start == -1 || len(list.buffer) == 0 {
  218. return
  219. }
  220. var pos, stop int
  221. if ascending {
  222. pos = list.start
  223. stop = list.prev(list.end)
  224. } else {
  225. pos = list.prev(list.end)
  226. stop = list.start
  227. }
  228. for {
  229. if predicate(&list.buffer[pos]) {
  230. results = append(results, list.buffer[pos])
  231. }
  232. if pos == stop || (limit != 0 && len(results) == limit) {
  233. break
  234. }
  235. if ascending {
  236. pos = list.next(pos)
  237. } else {
  238. pos = list.prev(pos)
  239. }
  240. }
  241. return
  242. }
  243. // Delete deletes messages matching some predicate.
  244. func (list *Buffer) Delete(predicate Predicate) (count int) {
  245. list.Lock()
  246. defer list.Unlock()
  247. if list.start == -1 || len(list.buffer) == 0 {
  248. return
  249. }
  250. pos := list.start
  251. stop := list.prev(list.end)
  252. for {
  253. if predicate(&list.buffer[pos]) {
  254. list.buffer[pos] = Item{}
  255. count++
  256. }
  257. if pos == stop {
  258. break
  259. }
  260. pos = list.next(pos)
  261. }
  262. return
  263. }
  264. // latest returns the items most recently added, up to `limit`. If `limit` is 0,
  265. // it returns all items.
  266. func (list *Buffer) latest(limit int) (results []Item) {
  267. results, _, _ = list.betweenHelper(Selector{}, Selector{}, time.Time{}, nil, limit)
  268. return
  269. }
  270. // LastDiscarded returns the latest time of any entry that was evicted
  271. // from the ring buffer.
  272. func (list *Buffer) LastDiscarded() time.Time {
  273. list.RLock()
  274. defer list.RUnlock()
  275. return list.lastDiscarded
  276. }
  277. func (list *Buffer) prev(index int) int {
  278. switch index {
  279. case 0:
  280. return len(list.buffer) - 1
  281. default:
  282. return index - 1
  283. }
  284. }
  285. func (list *Buffer) next(index int) int {
  286. switch index {
  287. case len(list.buffer) - 1:
  288. return 0
  289. default:
  290. return index + 1
  291. }
  292. }
  293. // return n such that v <= n and n == 2**i for some i
  294. func roundUpToPowerOfTwo(v int) int {
  295. // http://graphics.stanford.edu/~seander/bithacks.html
  296. v -= 1
  297. v |= v >> 1
  298. v |= v >> 2
  299. v |= v >> 4
  300. v |= v >> 8
  301. v |= v >> 16
  302. return v + 1
  303. }
  304. func (list *Buffer) maybeExpand() {
  305. if list.window == 0 {
  306. return // autoresize is disabled
  307. }
  308. length := list.length()
  309. if length < len(list.buffer) {
  310. return // we have spare capacity already
  311. }
  312. if len(list.buffer) == list.maximumSize {
  313. return // cannot expand any further
  314. }
  315. wouldDiscard := list.buffer[list.start].Message.Time
  316. if list.window < list.nowFunc().Sub(wouldDiscard) {
  317. return // oldest element is old enough to overwrite
  318. }
  319. newSize := roundUpToPowerOfTwo(length + 1)
  320. if list.maximumSize < newSize {
  321. newSize = list.maximumSize
  322. }
  323. list.resize(newSize)
  324. }
  325. // Resize shrinks or expands the buffer
  326. func (list *Buffer) Resize(maximumSize int, window time.Duration) {
  327. list.Lock()
  328. defer list.Unlock()
  329. if list.maximumSize == maximumSize && list.window == window {
  330. return // no-op
  331. }
  332. list.maximumSize = maximumSize
  333. list.window = window
  334. // three cases where we need to preemptively resize:
  335. // (1) we are not autoresizing
  336. // (2) the buffer is currently larger than maximumSize and needs to be shrunk
  337. // (3) the buffer is currently smaller than the recommended initial size
  338. // (including the case where it is currently disabled and needs to be enabled)
  339. // TODO make it possible to shrink the buffer so that it only contains `window`
  340. if window == 0 || maximumSize < len(list.buffer) {
  341. list.resize(maximumSize)
  342. } else {
  343. initialSize := list.initialSize(maximumSize, window)
  344. if len(list.buffer) < initialSize {
  345. list.resize(initialSize)
  346. }
  347. }
  348. }
// resize replaces the underlying storage with a fresh slice of `size`
// slots, copying the existing items into it starting at index 0 and
// rewriting start/end accordingly. When the new size is smaller than the
// number of stored items, the most recent `size` items are kept and
// lastDiscarded is advanced past the timestamps of the dropped ones.
// It does no locking itself; within this file it is only called from code
// paths that hold the write lock (Add via maybeExpand, and Resize).
func (list *Buffer) resize(size int) {
	newbuffer := make([]Item, size)

	if list.start == -1 {
		// indices are already correct and nothing needs to be copied
	} else if size == 0 {
		// this is now the empty list
		// (note: lastDiscarded is not updated on this path)
		list.start = -1
		list.end = -1
	} else {
		currentLength := list.length()
		start := list.start
		end := list.end
		// if we're truncating, keep the latest entries, not the earliest
		if size < currentLength {
			// the kept range is the `size` slots ending just before
			// `end`; wrap around if that reaches past index 0
			start = list.end - size
			if start < 0 {
				start += len(list.buffer)
			}
			// update lastDiscarded for discarded entries
			for i := list.start; i != start; i = (i + 1) % len(list.buffer) {
				if list.lastDiscarded.Before(list.buffer[i].Message.Time) {
					list.lastDiscarded = list.buffer[i].Message.Time
				}
			}
		}
		if start < end {
			// the kept items are contiguous in the old buffer
			copied := copy(newbuffer, list.buffer[start:end])
			list.start = 0
			// if the new buffer is exactly full, end wraps to 0 so that
			// start == end signals "full"
			list.end = copied % size
		} else {
			// the kept items wrap around (this also covers a full
			// buffer, where start == end): copy the tail of the old
			// buffer first, then the head
			lenInitial := len(list.buffer) - start
			copied := copy(newbuffer, list.buffer[start:])
			copied += copy(newbuffer[lenInitial:], list.buffer[:end])
			list.start = 0
			list.end = copied % size
		}
	}

	list.buffer = newbuffer
}
  388. func (hist *Buffer) length() int {
  389. if hist.start == -1 {
  390. return 0
  391. } else if hist.start < hist.end {
  392. return hist.end - hist.start
  393. } else {
  394. return len(hist.buffer) - (hist.start - hist.end)
  395. }
  396. }