misskey.go: improve efficiency of timeline streaming

This commit is contained in:
Iris Lightshard 2024-06-25 21:33:57 -06:00
parent 75e62608b7
commit a10d292efa
Signed by: nilix
GPG Key ID: 688407174966CAF3
1 changed files with 95 additions and 55 deletions

View File

@ -5,7 +5,7 @@ import (
. "forge.lightcrystal.systems/lightcrystal/underbbs/models"
"github.com/yitsushi/go-misskey"
mkm "github.com/yitsushi/go-misskey/models"
notes "github.com/yitsushi/go-misskey/services/notes"
n "github.com/yitsushi/go-misskey/services/notes"
tl "github.com/yitsushi/go-misskey/services/notes/timeline"
_ "strings"
"time"
@ -25,10 +25,6 @@ type MisskeyAdapter struct {
cache map[string]time.Time
stop chan bool
notes chan mkm.Note
users chan mkm.User
follows chan mkm.FollowStatus
}
func (self *MisskeyAdapter) Init(settings Settings, data chan SocketData) error {
@ -76,60 +72,104 @@ func (self *MisskeyAdapter) Subscribe(filter string) []error {
// if they are newer than the cache, convert them to UnderBBS objects
// and send them on the data channel
go func() {
notesService := self.mk.Notes()
timelineService := notesService.Timeline()
for {
select {
case _, ok := <-self.stop:
if !ok {
return
}
default:
// TODO: we have to actually decode and pass our filter criteria
tlnotes, tlerr := timelineService.Get(tl.GetRequest{
Limit: 50,
})
mentions, merr := notesService.Mentions(notes.MentionsRequest{
Limit: 50,
})
if tlerr != nil {
fmt.Println(tlerr.Error())
}
if merr != nil {
fmt.Println(merr.Error())
}
// check the cache for everything we just collected
// if anything is newer or as of yet not in the cache, add it
// and convert it to a SocketData implementation before sending on data channel
for _, n := range tlnotes {
msg := self.cacheAndConvert(n)
if msg != nil {
self.data <- msg
}
}
for _, n := range mentions {
msg := self.cacheAndConvert(n)
if msg != nil {
self.data <- msg
}
}
time.Sleep(5*time.Second)
}
}
}()
go self.poll()
return nil
}
func (self *MisskeyAdapter) poll() {
var latest *time.Time
notesService := self.mk.Notes()
timelineService := notesService.Timeline()
for {
select {
case _, ok := <-self.stop:
if !ok {
return
}
default:
// TODO: we have to actually decode and pass our filter criteria
// probe for new notes
probenote, err := timelineService.Get(tl.GetRequest{
Limit: 1,
})
if err == nil && len(probenote) > 0 && self.isNew(probenote[0]) {
if latest == nil {
latest = &probenote[0].CreatedAt
// this is the first fetch of notes, we can just grab them
notes, err := timelineService.Get(tl.GetRequest{
Limit: 100,
})
// if latest is nil also get mentions history
mentions, merr := notesService.Mentions(n.MentionsRequest{
Limit: 100,
})
if err != nil {
fmt.Println(err.Error())
}
if merr != nil {
fmt.Println(merr.Error())
}
// check the cache for everything we just collected
// if anything is newer or as of yet not in the cache, add it
// and convert it to a SocketData implementation before sending on data channel
for _, n := range notes {
msg := self.cacheAndConvert(n)
if msg != nil {
self.data <- msg
}
}
for _, n := range mentions {
msg := self.cacheAndConvert(n)
if msg != nil {
self.data <- msg
}
}
} else {
for {
// get notes since latest, until the probe
notes, err := timelineService.Get(tl.GetRequest{
SinceDate: uint64(latest.Unix()),
UntilDate: uint64(probenote[0].CreatedAt.Unix()),
Limit: 100,
})
if err != nil {
fmt.Println(err.Error())
}
for _, n := range notes {
msg := self.cacheAndConvert(n)
if msg == nil {
latest = &probenote[0].CreatedAt
break
}
}
if *latest == probenote[0].CreatedAt {
break
}
}
}
}
time.Sleep(5 * time.Second)
}
}
}
func (self *MisskeyAdapter) isNew(n mkm.Note) bool {
timestamp, exists := self.cache[n.ID]
return !exists || timestamp.Before(n.CreatedAt)
}
func (self *MisskeyAdapter) cacheAndConvert(n mkm.Note) *Message {
timestamp, exists := self.cache[n.ID]
if !exists || timestamp.Before(n.CreatedAt) {