From 557b87b700e1791b6a0282d81eab183decf59968 Mon Sep 17 00:00:00 2001 From: Iris Lightshard Date: Sat, 3 Aug 2024 10:52:33 -0600 Subject: [PATCH] implement batched fetch and fix adding adapter --- adapter/adapter.go | 2 +- adapter/mastodon.go | 4 +- adapter/misskey.go | 148 ++++++++++++++++---------------- adapter/nostr.go | 2 +- frontend/ts/adapter-element.ts | 25 +++--- frontend/ts/adapter.ts | 2 + frontend/ts/batch-timer.ts | 24 ++++++ frontend/ts/settings-element.ts | 2 +- frontend/ts/util.ts | 1 + models/msg.go | 14 +-- server/api.go | 2 +- 11 files changed, 130 insertions(+), 96 deletions(-) create mode 100644 frontend/ts/batch-timer.ts diff --git a/adapter/adapter.go b/adapter/adapter.go index 4b93231..e50313f 100644 --- a/adapter/adapter.go +++ b/adapter/adapter.go @@ -8,7 +8,7 @@ type Adapter interface { Init(Settings, chan SocketData) error Name() string Subscribe(string) []error - Fetch(string, string) error + Fetch(string, []string) error Do(string) error DefaultSubscriptionFilter() string } diff --git a/adapter/mastodon.go b/adapter/mastodon.go index 636ed5d..1e84663 100644 --- a/adapter/mastodon.go +++ b/adapter/mastodon.go @@ -90,7 +90,7 @@ func (self *MastoAdapter) Subscribe(filter string) []error { return nil } -func (self *MastoAdapter) Fetch(etype string, id string) error { +func (self *MastoAdapter) Fetch(etype string, ids []string) error { return nil } @@ -110,7 +110,7 @@ func (self *MastoAdapter) mastoUpdateToMessage(status madon.Status) *Message { Id: fmt.Sprintf("%d", status.ID), Uri: status.URI, Type: "message", - Created: status.CreatedAt.UnixMilli(), + Created: status.CreatedAt.UnixMilli(), }, Content: status.Content, Author: status.Account.Acct, diff --git a/adapter/misskey.go b/adapter/misskey.go index 49d38f4..a8ccf98 100644 --- a/adapter/misskey.go +++ b/adapter/misskey.go @@ -205,7 +205,7 @@ func (self *MisskeyAdapter) toMessage(n mkm.Note, bustCache bool) *Message { Protocol: "misskey", Adapter: self.nickname, Type: "message", - Created: n.CreatedAt.UnixMilli(), + Created: n.CreatedAt.UnixMilli(), }, Author: authorId, @@ -219,11 +219,11 @@ func (self *MisskeyAdapter) toMessage(n mkm.Note, bustCache bool) *Message { for _, f := range n.Files { msg.Attachments = append(msg.Attachments, Attachment{ - Src: f.URL, - ThumbSrc: f.ThumbnailURL, - Size: f.Size, - Desc: f.Comment, - Created: f.CreatedAt.UnixMilli(), + Src: f.URL, + ThumbSrc: f.ThumbnailURL, + Size: f.Size, + Desc: f.Comment, + Created: f.CreatedAt.UnixMilli(), }) } return &msg @@ -241,11 +241,11 @@ func (self *MisskeyAdapter) toAuthor(usr mkm.User) *Author { authorId = fmt.Sprintf("@%s", usr.Username) } - var updated *int64 = nil - if usr.UpdatedAt != nil { - updatedTmp := usr.UpdatedAt.UnixMilli() - updated = &updatedTmp - } + var updated *int64 = nil + if usr.UpdatedAt != nil { + updatedTmp := usr.UpdatedAt.UnixMilli() + updated = &updatedTmp + } author := Author{ Datagram: Datagram{ @@ -265,76 +265,78 @@ func (self *MisskeyAdapter) toAuthor(usr mkm.User) *Author { return &author } -func (self *MisskeyAdapter) Fetch(etype, id string) error { - switch etype { - case "message": - data, err := self.mk.Notes().Show(id) - if err != nil { - return err - } else { - msg := self.toMessage(data, true) - if msg != nil { - self.data <- msg - } - } - case "children": - data, err := self.mk.Notes().Children(n.ChildrenRequest{ - NoteID: id, - Limit: 100, - }) - if err != nil { - return err - } else { - for _, n := range data { - msg := self.toMessage(n, true) +func (self *MisskeyAdapter) Fetch(etype, ids []string) error { + for _, id := range ids { + switch etype { + case "message": + data, err := self.mk.Notes().Show(id) + if err != nil { + return err + } else { + msg := self.toMessage(data, true) if msg != nil { self.data <- msg } } - } - case "convoy": - data, err := self.mk.Notes().Conversation(n.ConversationRequest{ - NoteID: id, - Limit: 100, - }) - if err != nil { - return err - } else { - for _, n := range data { - msg := self.toMessage(n, true) - if msg != nil { - self.data <- msg + case "children": + data, err := self.mk.Notes().Children(n.ChildrenRequest{ + NoteID: id, + Limit: 100, + }) + if err != nil { + return err + } else { + for _, n := range data { + msg := self.toMessage(n, true) + if msg != nil { + self.data <- msg + } } } - } - case "author": - user := "" - host := "" - idParts := strings.Split(id, "@") - user = idParts[1] - if len(idParts) == 3 { - host = idParts[2] - } - - var hostPtr *string = nil - if len(host) > 0 { - hostPtr = &host - } - - // fmt.Printf("attempting user resolution: @%s@%s\n", user, host) - data, err := self.mk.Users().Show(users.ShowRequest{ - Username: &user, - Host: hostPtr, - }) - if err != nil { - return err - } else { - a := self.toAuthor(data) - if a != nil { - self.data <- a + case "convoy": + data, err := self.mk.Notes().Conversation(n.ConversationRequest{ + NoteID: id, + Limit: 100, + }) + if err != nil { + return err + } else { + for _, n := range data { + msg := self.toMessage(n, true) + if msg != nil { + self.data <- msg + } + } + } + case "author": + user := "" + host := "" + idParts := strings.Split(id, "@") + user = idParts[1] + if len(idParts) == 3 { + host = idParts[2] } - } + var hostPtr *string = nil + if len(host) > 0 { + hostPtr = &host + } + + // fmt.Printf("attempting user resolution: @%s@%s\n", user, host) + data, err := self.mk.Users().Show(users.ShowRequest{ + Username: &user, + Host: hostPtr, + }) + if err != nil { + return err + } else { + a := self.toAuthor(data) + if a != nil { + self.data <- a + } + } + + } } return nil } diff --git a/adapter/nostr.go b/adapter/nostr.go index 66157a6..7f13fdf 100644 --- a/adapter/nostr.go +++ b/adapter/nostr.go @@ -79,7 +79,7 @@ func (self *NostrAdapter) Subscribe(filter string) []error { return nil } -func (self *NostrAdapter) Fetch(etype, id string) error { +func (self *NostrAdapter) Fetch(etype, ids []string) error { return nil } diff --git a/frontend/ts/adapter-element.ts b/frontend/ts/adapter-element.ts index 93e8bff..dceef3e 100644 --- a/frontend/ts/adapter-element.ts +++ b/frontend/ts/adapter-element.ts @@ -3,6 +3,7 @@ import util from "./util" import { Message, Author } from "./message" import { MessageThread } from "./thread" import { AdapterState } from "./adapter" +import { BatchTimer } from "./batch-timer" export class AdapterElement extends HTMLElement { static observedAttributes = [ "data-latest", "data-view", "data-viewing" ] @@ -12,6 +13,14 @@ export class AdapterElement extends HTMLElement { private _name: string = "" private _viewing: string = ""; + private _convoyBatchTimer = new BatchTimer((ids: string[])=>{ + let url = `/api/adapters/${this._name}/fetch?entity_type=convoy`; + for (let id of ids) { + url += `&entity_id=${id}`; + } + util.authorizedFetch("GET", url, null) + }); + // TODO: use visibility of the thread to organize into DMs and public threads private _threads: MessageThread[] = []; private _orphans: Message[] = []; @@ -29,7 +38,6 @@ export class AdapterElement extends HTMLElement { } attributeChangedCallback() { - console.log(`${this._name}.attributeChangedCallback: start`); // set the viewing subject if it's changed const viewing = this.getAttribute("data-viewing"); if (this._viewing != viewing && viewing != null) { @@ -68,13 +76,11 @@ export class AdapterElement extends HTMLElement { // if latest changed, check if it's a message const latest = this.getAttribute("data-latest"); - console.log(`${this._name}.attributeChangedCallback: checking latest(${latest}) vs _latest${this._latest}`); if (latest ?? "" != this._latest) { - console.log("latest changed") this._latest = latest ?? ""; let datastore = AdapterState._instance.data.get(this._name); if (!datastore) { - util.errMsg(this._name + " has no datastore!"); + //util.errMsg(this._name + " has no datastore!"); return; } const latestMsg = datastore.messages.get(this._latest); @@ -157,8 +163,11 @@ export class AdapterElement extends HTMLElement { } updateIdxView(latest: string, rootId: string) { - const existingThread = this.querySelector(`underbbs-thread-summary[data-msg="${rootId}"]`); + const existingThread = document.querySelector(`underbbs-thread-summary[data-msg='${rootId}']`); const thread = this._threads.find(t=>t.root.data.id == rootId); + console.log(`looking for thread ${rootId}`) + console.log(`- DOM object: ${existingThread}`); + console.log(`- in memory: ${thread}`); if (existingThread && thread) { console.log(`updating thread: ${thread.root.data.id} // ${thread.messageCount} NEW`) existingThread.setAttribute("data-latest", `${thread.latest}`); @@ -273,11 +282,7 @@ export class AdapterElement extends HTMLElement { if (this._orphans.filter(o=>o.id == msg.id).length == 0) { this._orphans.push(msg); if (msg.replyTo) { - // request the parent's data, which will try to adopt this orphan when it comes in - util.authorizedFetch( - "GET", - `/api/adapters/${this._name}/fetch?entity_type=message&entity_id=${msg.replyTo}`, - null); + this._convoyBatchTimer.queue(k, 2000); } } diff --git a/frontend/ts/adapter.ts b/frontend/ts/adapter.ts index 85dd63d..73c72d0 100644 --- a/frontend/ts/adapter.ts +++ b/frontend/ts/adapter.ts @@ -4,12 +4,14 @@ export class AdapterData { public directMessages: Map; public messages: Map; public profileCache: Map; + public convoyCache: Map; constructor(protocol: string) { this.protocol = protocol; this.messages = new Map(); this.directMessages = new Map(); this.profileCache = new Map(); + this.convoyCache = new Map(); } } diff --git a/frontend/ts/batch-timer.ts b/frontend/ts/batch-timer.ts new file mode 100644 index 0000000..c15e2ac --- /dev/null +++ b/frontend/ts/batch-timer.ts @@ -0,0 +1,24 @@ +export class BatchTimer { + private _batch: string[]; + private _timer: number; + private _reqFn: (id: string[])=>void; + + constructor(reqFn: (id: string[])=>void) { + this._batch = []; + this._timer = new Date().getTime(); + this._reqFn = reqFn; + } + + public queue(id: string, timeout: number){ + this._timer = new Date().getTime() + timeout; + this._batch.push(id); + setTimeout(this.checkBatch, timeout); + } + + private checkBatch() { + if ((new Date()).getTime() >= this._timer) { + this._reqFn(this._batch); + this._batch = []; + } + } +} \ No newline at end of file diff --git a/frontend/ts/settings-element.ts b/frontend/ts/settings-element.ts index e59340b..bfd4f9d 100644 --- a/frontend/ts/settings-element.ts +++ b/frontend/ts/settings-element.ts @@ -125,7 +125,7 @@ export class SettingsElement extends HTMLElement { self._adapters.push(adapterdata.nickname); localStorage.setItem("settings", JSON.stringify(settings)); - self.showSettings(self); + self.showSettings(self)(); } } } diff --git a/frontend/ts/util.ts b/frontend/ts/util.ts index 365a9d5..a0f417e 100644 --- a/frontend/ts/util.ts +++ b/frontend/ts/util.ts @@ -1,4 +1,5 @@ import { DatagramSocket } from './websocket' +import { BatchTimer } from './batch-timer' function _(key: string, value: any | null | undefined = undefined): any | null { const x = window; diff --git a/models/msg.go b/models/msg.go index bfa2267..7c7c666 100644 --- a/models/msg.go +++ b/models/msg.go @@ -11,8 +11,8 @@ type Datagram struct { Adapter string `json:"adapter"` Type string `json:"type"` Target *string `json:"target,omitempty"` - Created int64 `json:"created"` - Updated *int64 `json:"updated,omitempty"` + Created int64 `json:"created"` + Updated *int64 `json:"updated,omitempty"` } type Message struct { @@ -36,11 +36,11 @@ type Author struct { } type Attachment struct { - Src string `json:"src"` - ThumbSrc string `json:"thumbSrc"` - Desc string `json:"desc"` - Created int64 `json:"created"` - Size uint64 `json:"size"` + Src string `json:"src"` + ThumbSrc string `json:"thumbSrc"` + Desc string `json:"desc"` + Created int64 `json:"created"` + Size uint64 `json:"size"` } type SocketData interface { diff --git a/server/api.go b/server/api.go index b0d5771..a13f484 100644 --- a/server/api.go +++ b/server/api.go @@ -167,7 +167,7 @@ func apiAdapterFetch(next http.Handler, subscribers map[*Subscriber][]adapter.Ad queryParams := req.URL.Query() for _, a := range subscribers[s] { if a.Name() == apiParams["adapter_id"] { - err := a.Fetch(queryParams["entity_type"][0], queryParams["entity_id"][0]) + err := a.Fetch(queryParams["entity_type"][0], queryParams["entity_id"]) if err != nil { fmt.Println(err.Error()) w.WriteHeader(http.StatusInternalServerError)