misskey adapter can fetch notes; it needs a lock for its cache

This commit is contained in:
Iris Lightshard 2024-07-06 21:13:18 -06:00
parent e9e286ba3c
commit 9dc27a4b9a
Signed by: nilix
GPG key ID: 688407174966CAF3
9 changed files with 193 additions and 19 deletions

View file

@ -6,8 +6,9 @@ import (
type Adapter interface { type Adapter interface {
Init(Settings, chan SocketData) error Init(Settings, chan SocketData) error
Name() string
Subscribe(string) []error Subscribe(string) []error
Fetch(string) error Fetch(string, string) error
Do(string) error Do(string) error
DefaultSubscriptionFilter() string DefaultSubscriptionFilter() string
} }

View file

@ -21,6 +21,11 @@ type MastoAdapter struct {
var scopes = []string{"read", "write", "follow"} var scopes = []string{"read", "write", "follow"}
func (self *MastoAdapter) Name() string {
return self.nickname
}
func (self *MastoAdapter) Init(settings Settings, data chan SocketData) error { func (self *MastoAdapter) Init(settings Settings, data chan SocketData) error {
self.nickname = settings.Nickname self.nickname = settings.Nickname
self.server = *settings.Server self.server = *settings.Server
@ -86,7 +91,7 @@ func (self *MastoAdapter) Subscribe(filter string) []error {
return nil return nil
} }
func (self *MastoAdapter) Fetch(query string) error { func (self *MastoAdapter) Fetch(etype string, id string) error {
return nil return nil
} }

View file

@ -7,8 +7,9 @@ import (
mkcore "github.com/yitsushi/go-misskey/core" mkcore "github.com/yitsushi/go-misskey/core"
mkm "github.com/yitsushi/go-misskey/models" mkm "github.com/yitsushi/go-misskey/models"
n "github.com/yitsushi/go-misskey/services/notes" n "github.com/yitsushi/go-misskey/services/notes"
users "github.com/yitsushi/go-misskey/services/users"
tl "github.com/yitsushi/go-misskey/services/notes/timeline" tl "github.com/yitsushi/go-misskey/services/notes/timeline"
_ "strings" "strings"
"time" "time"
) )
@ -28,6 +29,10 @@ type MisskeyAdapter struct {
stop chan bool stop chan bool
} }
func (self *MisskeyAdapter) Name() string {
return self.nickname
}
func (self *MisskeyAdapter) Init(settings Settings, data chan SocketData) error { func (self *MisskeyAdapter) Init(settings Settings, data chan SocketData) error {
fmt.Println("initializing misskey adapter") fmt.Println("initializing misskey adapter")
@ -121,13 +126,13 @@ func (self *MisskeyAdapter) poll() {
// if anything is newer or as of yet not in the cache, add it // if anything is newer or as of yet not in the cache, add it
// and convert it to a SocketData implementation before sending on data channel // and convert it to a SocketData implementation before sending on data channel
for _, n := range notes { for _, n := range notes {
msg := self.cacheAndConvert(n) msg := self.toMessageIfNew(n)
if msg != nil { if msg != nil {
self.data <- msg self.data <- msg
} }
} }
for _, n := range mentions { for _, n := range mentions {
msg := self.cacheAndConvert(n) msg := self.toMessageIfNew(n)
if msg != nil { if msg != nil {
self.data <- msg self.data <- msg
} }
@ -145,7 +150,7 @@ func (self *MisskeyAdapter) poll() {
fmt.Println(err.Error()) fmt.Println(err.Error())
} }
for _, n := range notes { for _, n := range notes {
msg := self.cacheAndConvert(n) msg := self.toMessageIfNew(n)
if msg == nil { if msg == nil {
latest = &probenote[0].CreatedAt latest = &probenote[0].CreatedAt
break break
@ -171,9 +176,21 @@ func (self *MisskeyAdapter) isNew(n mkm.Note) bool {
return !exists || timestamp.Before(n.CreatedAt) return !exists || timestamp.Before(n.CreatedAt)
} }
func (self *MisskeyAdapter) cacheAndConvert(n mkm.Note) *Message { func (self *MisskeyAdapter) toMessageIfNew(n mkm.Note) *Message {
return self.toMessage(n, false);
}
func (self *MisskeyAdapter) toMessage(n mkm.Note, bustCache bool) *Message {
timestamp, exists := self.cache[n.ID] timestamp, exists := self.cache[n.ID]
if !exists || timestamp.Before(n.CreatedAt) { if bustCache || !exists || timestamp.Before(n.CreatedAt) {
host := mkcore.StringValue(n.User.Host)
authorId := ""
if host != "" {
authorId = fmt.Sprintf("@%s@%s", n.User.Username, host)
} else {
authorId = fmt.Sprintf("@%s", n.User.Username)
}
self.cache[n.ID] = n.CreatedAt self.cache[n.ID] = n.CreatedAt
msg := Message{ msg := Message{
Datagram: Datagram{ Datagram: Datagram{
@ -186,7 +203,7 @@ func (self *MisskeyAdapter) cacheAndConvert(n mkm.Note) *Message {
Created: n.CreatedAt, Created: n.CreatedAt,
Author: fmt.Sprintf("%s@%s", n.User.Username, mkcore.StringValue(n.User.Host)), Author: authorId,
Content: n.Text, Content: n.Text,
Attachments: []Attachment{}, Attachments: []Attachment{},
Visibility: n.Visibility, Visibility: n.Visibility,
@ -209,7 +226,103 @@ func (self *MisskeyAdapter) cacheAndConvert(n mkm.Note) *Message {
return nil return nil
} }
func (self *MisskeyAdapter) Fetch(query string) error { func (self *MisskeyAdapter) toAuthor(usr mkm.User) *Author {
fmt.Println("converting author: " + usr.ID)
host := mkcore.StringValue(usr.Host)
authorId := ""
if host != "" {
authorId = fmt.Sprintf("@%s@%s", usr.Username, host)
} else {
authorId = fmt.Sprintf("@%s", usr.Username)
}
author := Author{
Datagram: Datagram {
Id: authorId,
Uri: mkcore.StringValue(usr.URL),
Protocol: "misskey",
Adapter: self.nickname,
Type: "author",
},
Name: usr.Name,
ProfilePic: usr.AvatarURL,
ProfileData: usr.Description,
}
return &author
}
func (self *MisskeyAdapter) Fetch(etype, id string) error {
switch (etype) {
case "message":
data, err := self.mk.Notes().Show(id)
if err != nil {
return err
} else {
msg := self.toMessage(data, true)
if msg != nil {
self.data <- msg
}
}
case "children":
data, err := self.mk.Notes().Children(n.ChildrenRequest{
NoteID: id,
Limit: 100,
})
if err != nil {
return err
} else {
for _, n := range data {
msg := self.toMessage(n, true)
if msg != nil {
self.data <- msg
}
}
}
case "convoy":
data, err := self.mk.Notes().Conversation(n.ConversationRequest{
NoteID: id,
Limit: 100,
})
if err != nil {
return err
} else {
for _, n := range data {
msg := self.toMessage(n, true)
if msg != nil{
self.data <- msg
}
}
}
case "author":
user := ""
host := ""
idParts := strings.Split(id, "@")
user = idParts[0]
if len(idParts) == 2 {
host = idParts[1]
}
var hostPtr *string = nil
if len(host) > 0 {
hostPtr = &host
}
// fmt.Printf("attempting user resolution: @%s@%s\n", user, host)
data, err := self.mk.Users().Show(users.ShowRequest{
Username: &user,
Host: hostPtr,
})
if err != nil {
return err
} else {
a := self.toAuthor(data)
if a != nil {
self.data <- a
}
}
}
return nil return nil
} }

View file

@ -17,6 +17,10 @@ type NostrAdapter struct {
relays []*nostr.Relay relays []*nostr.Relay
} }
func (self *NostrAdapter) Name() string {
return self.nickname
}
func (self *NostrAdapter) Init(settings Settings, data chan SocketData) error { func (self *NostrAdapter) Init(settings Settings, data chan SocketData) error {
self.nickname = settings.Nickname self.nickname = settings.Nickname
self.privkey = *settings.PrivKey self.privkey = *settings.PrivKey
@ -75,7 +79,7 @@ func (self *NostrAdapter) Subscribe(filter string) []error {
return nil return nil
} }
func (self *NostrAdapter) Fetch(query string) error { func (self *NostrAdapter) Fetch(etype, id string) error {
return nil return nil
} }

View file

@ -55,3 +55,11 @@ func (self Message) ToDatagram() []byte {
} }
return data return data
} }
func (self Author) ToDatagram() []byte {
data, err := json.Marshal(self)
if err != nil {
panic(err.Error())
}
return data
}

View file

@ -147,17 +147,44 @@ func ProtectWithSubscriberKey(next http.Handler, subscribers map[*Subscriber][]a
authHeader := req.Header.Get("Authorization") authHeader := req.Header.Get("Authorization")
if strings.HasPrefix(authHeader, "Bearer ") { if strings.HasPrefix(authHeader, "Bearer ") {
subscriberKey := strings.Split(authHeader, "Bearer ")[1] subscriberKey := strings.Split(authHeader, "Bearer ")[1]
for s, _ := range subscribers { if getSubscriberByKey(subscriberKey, subscribers) != nil {
if s.key == subscriberKey { next.ServeHTTP(w, req);
next.ServeHTTP(w, req) return
return
}
} }
} }
w.WriteHeader(http.StatusUnauthorized) w.WriteHeader(http.StatusUnauthorized)
}) })
} }
func apiAdapterFetch(next http.Handler, subscribers map[*Subscriber][]adapter.Adapter) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
authHeader := req.Header.Get("Authorization")
if strings.HasPrefix(authHeader, "Bearer ") {
subscriberKey := strings.Split(authHeader, "Bearer ")[1]
s := getSubscriberByKey(subscriberKey, subscribers)
if s != nil {
apiParams := req.Context().Value("params").(map[string]string)
queryParams := req.URL.Query()
for _, a := range subscribers[s] {
if a.Name() == apiParams["adapter_id"] {
err := a.Fetch(queryParams["entity_type"][0], queryParams["entity_id"][0])
if err != nil {
fmt.Println(err.Error())
w.WriteHeader(http.StatusInternalServerError)
} else {
w.WriteHeader(http.StatusAccepted)
}
next.ServeHTTP(w, req)
return
}
}
}
}
w.WriteHeader(http.StatusUnauthorized)
})
}
func (self *BBSServer) apiMux() http.Handler { func (self *BBSServer) apiMux() http.Handler {
errTemplate, err := template.New("err").Parse("{{ $params := (.Context).Value \"params\" }}<html><body><h1>ERROR {{ $params.ErrorCode }}</h1><p class='error'>{{ $params.ErrorMessage }}</p></body></html>") errTemplate, err := template.New("err").Parse("{{ $params := (.Context).Value \"params\" }}<html><body><h1>ERROR {{ $params.ErrorCode }}</h1><p class='error'>{{ $params.ErrorMessage }}</p></body></html>")
if err != nil { if err != nil {
@ -183,5 +210,10 @@ func (self *BBSServer) apiMux() http.Handler {
self.subscribers, self.subscribers,
)) ))
rtr.Get(`/adapters/(?P<adapter_id>\S+)/fetch`, ProtectWithSubscriberKey(
apiAdapterFetch(renderer.JSON("data"), self.subscribers),
self.subscribers,
))
return http.HandlerFunc(rtr.ServeHTTP) return http.HandlerFunc(rtr.ServeHTTP)
} }

View file

@ -48,7 +48,8 @@ func New() *BBSServer {
// websocket // websocket
srvr.serveMux.HandleFunc("/subscribe", srvr.subscribeHandler) srvr.serveMux.HandleFunc("/subscribe", srvr.subscribeHandler)
srvr.serveMux.HandleFunc("/publish", srvr.publishHandler) // publish is unused currently, we just use the API and send data back on the websocket
// srvr.serveMux.HandleFunc("/publish", srvr.publishHandler)
return srvr return srvr
} }

View file

@ -213,7 +213,14 @@ export class AdapterElement extends HTMLElement {
// otherwise we can orphan it and try to fill it in later // otherwise we can orphan it and try to fill it in later
if (this._orphans.filter(o=>o.id == msg.id).length == 0) { if (this._orphans.filter(o=>o.id == msg.id).length == 0) {
this._orphans.push(msg); this._orphans.push(msg);
// TODO: request the parent's data if (msg.replyTo) {
// request the parent's data, which will try to adopt this orphan when it comes in
util.authorizedFetch(
"GET",
`/api/adapters/${this._name}/fetch?entity_type=message&entity_id=${msg.replyTo}`,
null);
}
} }
return null; return null;
} }

View file

@ -41,7 +41,10 @@ export class ThreadSummaryElement extends HTMLElement {
} }
let author = datastore.profileCache.get(this._msg.author); let author = datastore.profileCache.get(this._msg.author);
if (!author) { if (!author) {
// request it! util.authorizedFetch(
"GET",
`/api/adapters/${this._adapter}/fetch?entity_type=author&entity_id=${this._msg.author}`,
null);
} }
this._author = author || <Author>{ id: this._msg.author }; this._author = author || <Author>{ id: this._msg.author };
const threadAuthor = this.querySelector(".thread_author"); const threadAuthor = this.querySelector(".thread_author");