From a10d292efa2e454196049a8ff754b6e8e434301b Mon Sep 17 00:00:00 2001 From: Iris Lightshard Date: Tue, 25 Jun 2024 21:33:57 -0600 Subject: [PATCH] misskey.go: improve efficiency of timeline streaming --- adapter/misskey.go | 150 ++++++++++++++++++++++++++++----------------- 1 file changed, 95 insertions(+), 55 deletions(-) diff --git a/adapter/misskey.go b/adapter/misskey.go index 5492e28..9861d2a 100644 --- a/adapter/misskey.go +++ b/adapter/misskey.go @@ -5,7 +5,7 @@ import ( . "forge.lightcrystal.systems/lightcrystal/underbbs/models" "github.com/yitsushi/go-misskey" mkm "github.com/yitsushi/go-misskey/models" - notes "github.com/yitsushi/go-misskey/services/notes" + n "github.com/yitsushi/go-misskey/services/notes" tl "github.com/yitsushi/go-misskey/services/notes/timeline" _ "strings" "time" @@ -25,10 +25,6 @@ type MisskeyAdapter struct { cache map[string]time.Time stop chan bool - - notes chan mkm.Note - users chan mkm.User - follows chan mkm.FollowStatus } func (self *MisskeyAdapter) Init(settings Settings, data chan SocketData) error { @@ -76,60 +72,104 @@ func (self *MisskeyAdapter) Subscribe(filter string) []error { // if they are newer than the cache, convert them to UnderBBS objects // and send them on the data channel - go func() { - notesService := self.mk.Notes() - timelineService := notesService.Timeline() - - for { - select { - case _, ok := <-self.stop: - if !ok { - return - } - default: - - // TODO: we have to actually decode and pass our filter criteria - tlnotes, tlerr := timelineService.Get(tl.GetRequest{ - Limit: 50, - - }) - mentions, merr := notesService.Mentions(notes.MentionsRequest{ - Limit: 50, - }) - - if tlerr != nil { - fmt.Println(tlerr.Error()) - } - if merr != nil { - fmt.Println(merr.Error()) - } - - // check the cache for everything we just collected - // if anything is newer or as of yet not in the cache, add it - // and convert it to a SocketData implementation before sending on data channel - for _, n := range tlnotes { - msg := self.cacheAndConvert(n) - if msg != nil { - self.data <- msg - } - } - for _, n := range mentions { - msg := self.cacheAndConvert(n) - if msg != nil { - self.data <- msg - } - } - - time.Sleep(5*time.Second) - } - - } - - }() + go self.poll() return nil } +func (self *MisskeyAdapter) poll() { + + var latest *time.Time + + notesService := self.mk.Notes() + timelineService := notesService.Timeline() + + for { + select { + case _, ok := <-self.stop: + if !ok { + return + } + default: + + // TODO: we have to actually decode and pass our filter criteria + // probe for new notes + + probenote, err := timelineService.Get(tl.GetRequest{ + Limit: 1, + }) + if err == nil && len(probenote) > 0 && self.isNew(probenote[0]) { + if latest == nil { + latest = &probenote[0].CreatedAt + // this is the first fetch of notes, we can just grab them + notes, err := timelineService.Get(tl.GetRequest{ + Limit: 100, + }) + // if latest is nil also get mentions history + mentions, merr := notesService.Mentions(n.MentionsRequest{ + Limit: 100, + }) + if err != nil { + fmt.Println(err.Error()) + } + if merr != nil { + fmt.Println(merr.Error()) + } + + // check the cache for everything we just collected + // if anything is newer or as of yet not in the cache, add it + // and convert it to a SocketData implementation before sending on data channel + for _, n := range notes { + msg := self.cacheAndConvert(n) + if msg != nil { + self.data <- msg + } + } + for _, n := range mentions { + msg := self.cacheAndConvert(n) + if msg != nil { + self.data <- msg + } + } + + } else { + for { + // get notes since latest, until the probe + notes, err := timelineService.Get(tl.GetRequest{ + SinceDate: uint64(latest.Unix()), + UntilDate: uint64(probenote[0].CreatedAt.Unix()), + Limit: 100, + }) + if err != nil { + fmt.Println(err.Error()) + } + for _, n := range notes { + msg := self.cacheAndConvert(n) + if msg == nil { + + latest = &probenote[0].CreatedAt + break + } + } + if *latest == probenote[0].CreatedAt { + break + } + } + } + } + + time.Sleep(5 * time.Second) + } + + } + +} + +func (self *MisskeyAdapter) isNew(n mkm.Note) bool { + timestamp, exists := self.cache[n.ID] + return !exists || timestamp.Before(n.CreatedAt) +} + func (self *MisskeyAdapter) cacheAndConvert(n mkm.Note) *Message { timestamp, exists := self.cache[n.ID] if !exists || timestamp.Before(n.CreatedAt) {