|
@@ -193,16 +193,25 @@ func (mysql *MySQL) createTables() (err error) {
|
193
|
193
|
}
|
194
|
194
|
|
195
|
195
|
_, err = mysql.db.Exec(fmt.Sprintf(`CREATE TABLE sequence (
|
196
|
|
- id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
|
|
196
|
+ history_id BIGINT UNSIGNED NOT NULL PRIMARY KEY,
|
197
|
197
|
target VARBINARY(%[1]d) NOT NULL,
|
198
|
198
|
nanotime BIGINT UNSIGNED NOT NULL,
|
199
|
|
- history_id BIGINT NOT NULL,
|
200
|
|
- KEY (target, nanotime),
|
201
|
|
- KEY (history_id)
|
|
199
|
+ KEY (target, nanotime)
|
202
|
200
|
) CHARSET=ascii COLLATE=ascii_bin;`, MaxTargetLength))
|
203
|
201
|
if err != nil {
|
204
|
202
|
return err
|
205
|
203
|
}
|
|
204
|
+ /* XXX: this table used to be:
|
|
205
|
+ CREATE TABLE sequence (
|
|
206
|
+ id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
|
|
207
|
+ target VARBINARY(%[1]d) NOT NULL,
|
|
208
|
+ nanotime BIGINT UNSIGNED NOT NULL,
|
|
209
|
+ history_id BIGINT NOT NULL,
|
|
210
|
+ KEY (target, nanotime),
|
|
211
|
+ KEY (history_id)
|
|
212
|
+ ) CHARSET=ascii COLLATE=ascii_bin;
|
|
213
|
+ Some users may still be using the old schema.
|
|
214
|
+ */
|
206
|
215
|
|
207
|
216
|
_, err = mysql.db.Exec(fmt.Sprintf(`CREATE TABLE conversations (
|
208
|
217
|
id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
|
|
@@ -352,31 +361,32 @@ func (mysql *MySQL) deleteHistoryIDs(ctx context.Context, ids []uint64) (err err
|
352
|
361
|
|
353
|
362
|
func (mysql *MySQL) selectCleanupIDs(ctx context.Context, age time.Duration) (ids []uint64, maxNanotime int64, err error) {
|
354
|
363
|
rows, err := mysql.db.QueryContext(ctx, `
|
355
|
|
- SELECT history.id, sequence.nanotime
|
|
364
|
+ SELECT history.id, sequence.nanotime, conversations.nanotime
|
356
|
365
|
FROM history
|
357
|
366
|
LEFT JOIN sequence ON history.id = sequence.history_id
|
|
367
|
+ LEFT JOIN conversations on history.id = conversations.history_id
|
358
|
368
|
ORDER BY history.id LIMIT ?;`, cleanupRowLimit)
|
359
|
369
|
if err != nil {
|
360
|
370
|
return
|
361
|
371
|
}
|
362
|
372
|
defer rows.Close()
|
363
|
373
|
|
364
|
|
- // a history ID may have 0-2 rows in sequence: 1 for a channel entry,
|
365
|
|
- // 2 for a DM, 0 if the data is inconsistent. therefore, deduplicate
|
366
|
|
- // and delete anything that doesn't have a sequence entry:
|
367
|
374
|
idset := make(map[uint64]struct{}, cleanupRowLimit)
|
368
|
375
|
threshold := time.Now().Add(-age).UnixNano()
|
369
|
376
|
for rows.Next() {
|
370
|
377
|
var id uint64
|
371
|
|
- var nanotime sql.NullInt64
|
372
|
|
- err = rows.Scan(&id, &nanotime)
|
|
378
|
+ var seqNano, convNano sql.NullInt64
|
|
379
|
+ err = rows.Scan(&id, &seqNano, &convNano)
|
373
|
380
|
if err != nil {
|
374
|
381
|
return
|
375
|
382
|
}
|
376
|
|
- if !nanotime.Valid || nanotime.Int64 < threshold {
|
|
383
|
+ nanotime := extractNanotime(seqNano, convNano)
|
|
384
|
+ // returns 0 if not found; in that case the data is inconsistent
|
|
385
|
+ // and we should delete the entry
|
|
386
|
+ if nanotime < threshold {
|
377
|
387
|
idset[id] = struct{}{}
|
378
|
|
- if nanotime.Valid && nanotime.Int64 > maxNanotime {
|
379
|
|
- maxNanotime = nanotime.Int64
|
|
388
|
+ if nanotime > maxNanotime {
|
|
389
|
+ maxNanotime = nanotime
|
380
|
390
|
}
|
381
|
391
|
}
|
382
|
392
|
}
|
|
@@ -675,10 +685,6 @@ func (mysql *MySQL) AddDirectMessage(sender, senderAccount, recipient, recipient
|
675
|
685
|
nanotime := item.Message.Time.UnixNano()
|
676
|
686
|
|
677
|
687
|
if senderAccount != "" {
|
678
|
|
- err = mysql.insertSequenceEntry(ctx, senderAccount, nanotime, id)
|
679
|
|
- if err != nil {
|
680
|
|
- return
|
681
|
|
- }
|
682
|
688
|
err = mysql.insertConversationEntry(ctx, senderAccount, recipient, nanotime, id)
|
683
|
689
|
if err != nil {
|
684
|
690
|
return
|
|
@@ -690,10 +696,6 @@ func (mysql *MySQL) AddDirectMessage(sender, senderAccount, recipient, recipient
|
690
|
696
|
}
|
691
|
697
|
|
692
|
698
|
if recipientAccount != "" && sender != recipient {
|
693
|
|
- err = mysql.insertSequenceEntry(ctx, recipientAccount, nanotime, id)
|
694
|
|
- if err != nil {
|
695
|
|
- return
|
696
|
|
- }
|
697
|
699
|
err = mysql.insertConversationEntry(ctx, recipientAccount, sender, nanotime, id)
|
698
|
700
|
if err != nil {
|
699
|
701
|
return
|
|
@@ -800,31 +802,24 @@ func (mysql *MySQL) Export(account string, writer io.Writer) {
|
800
|
802
|
}
|
801
|
803
|
|
802
|
804
|
func (mysql *MySQL) lookupMsgid(ctx context.Context, msgid string, includeData bool) (result time.Time, id uint64, data []byte, err error) {
|
803
|
|
- // in theory, we could optimize out a roundtrip to the database by using a subquery instead:
|
804
|
|
- // sequence.nanotime > (
|
805
|
|
- // SELECT sequence.nanotime FROM sequence, history
|
806
|
|
- // WHERE sequence.history_id = history.id AND history.msgid = ?
|
807
|
|
- // LIMIT 1)
|
808
|
|
- // however, this doesn't handle the BETWEEN case with one or two msgids, where we
|
809
|
|
- // don't initially know whether the interval is going forwards or backwards. to simplify
|
810
|
|
- // the logic, resolve msgids to timestamps "manually" in all cases, using a separate query.
|
811
|
805
|
decoded, err := decodeMsgid(msgid)
|
812
|
806
|
if err != nil {
|
813
|
807
|
return
|
814
|
808
|
}
|
815
|
|
- cols := `sequence.nanotime`
|
|
809
|
+ cols := `sequence.nanotime, conversations.nanotime`
|
816
|
810
|
if includeData {
|
817
|
|
- cols = `sequence.nanotime, sequence.history_id, history.data`
|
|
811
|
+ cols = `sequence.nanotime, conversations.nanotime, history.id, history.data`
|
818
|
812
|
}
|
819
|
813
|
row := mysql.db.QueryRowContext(ctx, fmt.Sprintf(`
|
820
|
|
- SELECT %s FROM sequence
|
821
|
|
- INNER JOIN history ON history.id = sequence.history_id
|
|
814
|
+ SELECT %s FROM history
|
|
815
|
+ LEFT JOIN sequence ON history.id = sequence.history_id
|
|
816
|
+ LEFT JOIN conversations ON history.id = conversations.history_id
|
822
|
817
|
WHERE history.msgid = ? LIMIT 1;`, cols), decoded)
|
823
|
|
- var nanotime int64
|
|
818
|
+ var nanoSeq, nanoConv sql.NullInt64
|
824
|
819
|
if !includeData {
|
825
|
|
- err = row.Scan(&nanotime)
|
|
820
|
+ err = row.Scan(&nanoSeq, &nanoConv)
|
826
|
821
|
} else {
|
827
|
|
- err = row.Scan(&nanotime, &id, &data)
|
|
822
|
+ err = row.Scan(&nanoSeq, &nanoConv, &id, &data)
|
828
|
823
|
}
|
829
|
824
|
if err != sql.ErrNoRows {
|
830
|
825
|
mysql.logError("could not resolve msgid to time", err)
|
|
@@ -832,11 +827,24 @@ func (mysql *MySQL) lookupMsgid(ctx context.Context, msgid string, includeData b
|
832
|
827
|
if err != nil {
|
833
|
828
|
return
|
834
|
829
|
}
|
835
|
|
-
|
|
830
|
+ nanotime := extractNanotime(nanoSeq, nanoConv)
|
|
831
|
+ if nanotime == 0 {
|
|
832
|
+ err = sql.ErrNoRows
|
|
833
|
+ return
|
|
834
|
+ }
|
836
|
835
|
result = time.Unix(0, nanotime).UTC()
|
837
|
836
|
return
|
838
|
837
|
}
|
839
|
838
|
|
|
839
|
+func extractNanotime(seq, conv sql.NullInt64) (result int64) {
|
|
840
|
+ if seq.Valid {
|
|
841
|
+ return seq.Int64
|
|
842
|
+ } else if conv.Valid {
|
|
843
|
+ return conv.Int64
|
|
844
|
+ }
|
|
845
|
+ return
|
|
846
|
+}
|
|
847
|
+
|
840
|
848
|
func (mysql *MySQL) selectItems(ctx context.Context, query string, args ...interface{}) (results []history.Item, err error) {
|
841
|
849
|
rows, err := mysql.db.QueryContext(ctx, query, args...)
|
842
|
850
|
if mysql.logError("could not select history items", err) {
|