implement batched fetch and fix adding adapter

This commit is contained in:
Iris Lightshard 2024-08-03 10:52:33 -06:00
parent 0106b445b5
commit 557b87b700
Signed by: Iris Lightshard
GPG key ID: 688407174966CAF3
11 changed files with 130 additions and 96 deletions

View file

@ -8,7 +8,7 @@ type Adapter interface {
Init(Settings, chan SocketData) error Init(Settings, chan SocketData) error
Name() string Name() string
Subscribe(string) []error Subscribe(string) []error
Fetch(string, string) error Fetch(string, []string) error
Do(string) error Do(string) error
DefaultSubscriptionFilter() string DefaultSubscriptionFilter() string
} }

View file

@ -90,7 +90,7 @@ func (self *MastoAdapter) Subscribe(filter string) []error {
return nil return nil
} }
func (self *MastoAdapter) Fetch(etype string, id string) error { func (self *MastoAdapter) Fetch(etype string, ids []string) error {
return nil return nil
} }
@ -110,7 +110,7 @@ func (self *MastoAdapter) mastoUpdateToMessage(status madon.Status) *Message {
Id: fmt.Sprintf("%d", status.ID), Id: fmt.Sprintf("%d", status.ID),
Uri: status.URI, Uri: status.URI,
Type: "message", Type: "message",
Created: status.CreatedAt.UnixMilli(), Created: status.CreatedAt.UnixMilli(),
}, },
Content: status.Content, Content: status.Content,
Author: status.Account.Acct, Author: status.Account.Acct,

View file

@ -205,7 +205,7 @@ func (self *MisskeyAdapter) toMessage(n mkm.Note, bustCache bool) *Message {
Protocol: "misskey", Protocol: "misskey",
Adapter: self.nickname, Adapter: self.nickname,
Type: "message", Type: "message",
Created: n.CreatedAt.UnixMilli(), Created: n.CreatedAt.UnixMilli(),
}, },
Author: authorId, Author: authorId,
@ -219,11 +219,11 @@ func (self *MisskeyAdapter) toMessage(n mkm.Note, bustCache bool) *Message {
for _, f := range n.Files { for _, f := range n.Files {
msg.Attachments = append(msg.Attachments, Attachment{ msg.Attachments = append(msg.Attachments, Attachment{
Src: f.URL, Src: f.URL,
ThumbSrc: f.ThumbnailURL, ThumbSrc: f.ThumbnailURL,
Size: f.Size, Size: f.Size,
Desc: f.Comment, Desc: f.Comment,
Created: f.CreatedAt.UnixMilli(), Created: f.CreatedAt.UnixMilli(),
}) })
} }
return &msg return &msg
@ -241,11 +241,11 @@ func (self *MisskeyAdapter) toAuthor(usr mkm.User) *Author {
authorId = fmt.Sprintf("@%s", usr.Username) authorId = fmt.Sprintf("@%s", usr.Username)
} }
var updated *int64 = nil var updated *int64 = nil
if usr.UpdatedAt != nil { if usr.UpdatedAt != nil {
updatedTmp := usr.UpdatedAt.UnixMilli() updatedTmp := usr.UpdatedAt.UnixMilli()
updated = &updatedTmp updated = &updatedTmp
} }
author := Author{ author := Author{
Datagram: Datagram{ Datagram: Datagram{
@ -265,76 +265,78 @@ func (self *MisskeyAdapter) toAuthor(usr mkm.User) *Author {
return &author return &author
} }
func (self *MisskeyAdapter) Fetch(etype, id string) error { func (self *MisskeyAdapter) Fetch(etype, ids []string) error {
switch etype { for _, id := range ids {
case "message": switch etype {
data, err := self.mk.Notes().Show(id) case "message":
if err != nil { data, err := self.mk.Notes().Show(id)
return err if err != nil {
} else { return err
msg := self.toMessage(data, true) } else {
if msg != nil { msg := self.toMessage(data, true)
self.data <- msg
}
}
case "children":
data, err := self.mk.Notes().Children(n.ChildrenRequest{
NoteID: id,
Limit: 100,
})
if err != nil {
return err
} else {
for _, n := range data {
msg := self.toMessage(n, true)
if msg != nil { if msg != nil {
self.data <- msg self.data <- msg
} }
} }
} case "children":
case "convoy": data, err := self.mk.Notes().Children(n.ChildrenRequest{
data, err := self.mk.Notes().Conversation(n.ConversationRequest{ NoteID: id,
NoteID: id, Limit: 100,
Limit: 100, })
}) if err != nil {
if err != nil { return err
return err } else {
} else { for _, n := range data {
for _, n := range data { msg := self.toMessage(n, true)
msg := self.toMessage(n, true) if msg != nil {
if msg != nil { self.data <- msg
self.data <- msg }
} }
} }
} case "convoy":
case "author": data, err := self.mk.Notes().Conversation(n.ConversationRequest{
user := "" NoteID: id,
host := "" Limit: 100,
idParts := strings.Split(id, "@") })
user = idParts[1] if err != nil {
if len(idParts) == 3 { return err
host = idParts[2] } else {
} for _, n := range data {
msg := self.toMessage(n, true)
var hostPtr *string = nil if msg != nil {
if len(host) > 0 { self.data <- msg
hostPtr = &host }
} }
}
// fmt.Printf("attempting user resolution: @%s@%s\n", user, host) case "author":
data, err := self.mk.Users().Show(users.ShowRequest{ user := ""
Username: &user, host := ""
Host: hostPtr, idParts := strings.Split(id, "@")
}) user = idParts[1]
if err != nil { if len(idParts) == 3 {
return err host = idParts[2]
} else {
a := self.toAuthor(data)
if a != nil {
self.data <- a
} }
}
var hostPtr *string = nil
if len(host) > 0 {
hostPtr = &host
}
// fmt.Printf("attempting user resolution: @%s@%s\n", user, host)
data, err := self.mk.Users().Show(users.ShowRequest{
Username: &user,
Host: hostPtr,
})
if err != nil {
return err
} else {
a := self.toAuthor(data)
if a != nil {
self.data <- a
}
}
}
} }
return nil return nil
} }

View file

@ -79,7 +79,7 @@ func (self *NostrAdapter) Subscribe(filter string) []error {
return nil return nil
} }
func (self *NostrAdapter) Fetch(etype, id string) error { func (self *NostrAdapter) Fetch(etype, ids []string) error {
return nil return nil
} }

View file

@ -3,6 +3,7 @@ import util from "./util"
import { Message, Author } from "./message" import { Message, Author } from "./message"
import { MessageThread } from "./thread" import { MessageThread } from "./thread"
import { AdapterState } from "./adapter" import { AdapterState } from "./adapter"
import { BatchTimer } from "./batch-timer"
export class AdapterElement extends HTMLElement { export class AdapterElement extends HTMLElement {
static observedAttributes = [ "data-latest", "data-view", "data-viewing" ] static observedAttributes = [ "data-latest", "data-view", "data-viewing" ]
@ -12,6 +13,14 @@ export class AdapterElement extends HTMLElement {
private _name: string = "" private _name: string = ""
private _viewing: string = ""; private _viewing: string = "";
private _convoyBatchTimer = new BatchTimer((ids: string[])=>{
let url = `/api/adapters/${this._name}/fetch?entity_type=convoy`;
for (let id of ids) {
url += `&entity_id=${id}`;
}
util.authorizedFetch("GET", url, null)
});
// TODO: use visibility of the thread to organize into DMs and public threads // TODO: use visibility of the thread to organize into DMs and public threads
private _threads: MessageThread[] = []; private _threads: MessageThread[] = [];
private _orphans: Message[] = []; private _orphans: Message[] = [];
@ -29,7 +38,6 @@ export class AdapterElement extends HTMLElement {
} }
attributeChangedCallback() { attributeChangedCallback() {
console.log(`${this._name}.attributeChangedCallback: start`);
// set the viewing subject if it's changed // set the viewing subject if it's changed
const viewing = this.getAttribute("data-viewing"); const viewing = this.getAttribute("data-viewing");
if (this._viewing != viewing && viewing != null) { if (this._viewing != viewing && viewing != null) {
@ -68,13 +76,11 @@ export class AdapterElement extends HTMLElement {
// if latest changed, check if it's a message // if latest changed, check if it's a message
const latest = this.getAttribute("data-latest"); const latest = this.getAttribute("data-latest");
console.log(`${this._name}.attributeChangedCallback: checking latest(${latest}) vs _latest${this._latest}`);
if (latest ?? "" != this._latest) { if (latest ?? "" != this._latest) {
console.log("latest changed")
this._latest = latest ?? ""; this._latest = latest ?? "";
let datastore = AdapterState._instance.data.get(this._name); let datastore = AdapterState._instance.data.get(this._name);
if (!datastore) { if (!datastore) {
util.errMsg(this._name + " has no datastore!"); //util.errMsg(this._name + " has no datastore!");
return; return;
} }
const latestMsg = datastore.messages.get(this._latest); const latestMsg = datastore.messages.get(this._latest);
@ -157,8 +163,11 @@ export class AdapterElement extends HTMLElement {
} }
updateIdxView(latest: string, rootId: string) { updateIdxView(latest: string, rootId: string) {
const existingThread = this.querySelector(`underbbs-thread-summary[data-msg="${rootId}"]`); const existingThread = document.querySelector(`underbbs-thread-summary[data-msg='${rootId}']`);
const thread = this._threads.find(t=>t.root.data.id == rootId); const thread = this._threads.find(t=>t.root.data.id == rootId);
console.log(`looking for thread ${rootId}`)
console.log(`- DOM object: ${existingThread}`);
console.log(`- in memory: ${thread}`);
if (existingThread && thread) { if (existingThread && thread) {
console.log(`updating thread: ${thread.root.data.id} // ${thread.messageCount} NEW`) console.log(`updating thread: ${thread.root.data.id} // ${thread.messageCount} NEW`)
existingThread.setAttribute("data-latest", `${thread.latest}`); existingThread.setAttribute("data-latest", `${thread.latest}`);
@ -273,11 +282,7 @@ export class AdapterElement extends HTMLElement {
if (this._orphans.filter(o=>o.id == msg.id).length == 0) { if (this._orphans.filter(o=>o.id == msg.id).length == 0) {
this._orphans.push(msg); this._orphans.push(msg);
if (msg.replyTo) { if (msg.replyTo) {
// request the parent's data, which will try to adopt this orphan when it comes in this._convoyBatchTimer.queue(k, 2000);
util.authorizedFetch(
"GET",
`/api/adapters/${this._name}/fetch?entity_type=message&entity_id=${msg.replyTo}`,
null);
} }
} }

View file

@ -4,12 +4,14 @@ export class AdapterData {
public directMessages: Map<string, Message>; public directMessages: Map<string, Message>;
public messages: Map<string, Message>; public messages: Map<string, Message>;
public profileCache: Map<string, Author>; public profileCache: Map<string, Author>;
public convoyCache: Map<string, string>;
constructor(protocol: string) { constructor(protocol: string) {
this.protocol = protocol; this.protocol = protocol;
this.messages = new Map<string, Message>(); this.messages = new Map<string, Message>();
this.directMessages = new Map<string, Message>(); this.directMessages = new Map<string, Message>();
this.profileCache = new Map<string, Author>(); this.profileCache = new Map<string, Author>();
this.convoyCache = new Map<string, string>();
} }
} }

View file

@ -0,0 +1,24 @@
export class BatchTimer {
private _batch: string[];
private _timer: number;
private _reqFn: (id: string[])=>void;
constructor(reqFn: (id: string[])=>void) {
this._batch = [];
this._timer = new Date().getTime();
this._reqFn = reqFn;
}
public queue(id: string, timeout: number){
this._timer = new Date().getTime() + timeout;
this._batch.push(id);
setTimeout(this.checkBatch, timeout);
}
private checkBatch() {
if ((new Date()).getTime() >= this._timer) {
this._reqFn(this._batch);
this._batch = [];
}
}
}

View file

@ -125,7 +125,7 @@ export class SettingsElement extends HTMLElement {
self._adapters.push(adapterdata.nickname); self._adapters.push(adapterdata.nickname);
localStorage.setItem("settings", JSON.stringify(settings)); localStorage.setItem("settings", JSON.stringify(settings));
self.showSettings(self); self.showSettings(self)();
} }
} }
} }

View file

@ -1,4 +1,5 @@
import { DatagramSocket } from './websocket' import { DatagramSocket } from './websocket'
import { BatchTimer } from './batch-timer'
function _(key: string, value: any | null | undefined = undefined): any | null { function _(key: string, value: any | null | undefined = undefined): any | null {
const x = <any>window; const x = <any>window;

View file

@ -11,8 +11,8 @@ type Datagram struct {
Adapter string `json:"adapter"` Adapter string `json:"adapter"`
Type string `json:"type"` Type string `json:"type"`
Target *string `json:"target,omitempty"` Target *string `json:"target,omitempty"`
Created int64 `json:"created"` Created int64 `json:"created"`
Updated *int64 `json:"updated,omitempty"` Updated *int64 `json:"updated,omitempty"`
} }
type Message struct { type Message struct {
@ -36,11 +36,11 @@ type Author struct {
} }
type Attachment struct { type Attachment struct {
Src string `json:"src"` Src string `json:"src"`
ThumbSrc string `json:"thumbSrc"` ThumbSrc string `json:"thumbSrc"`
Desc string `json:"desc"` Desc string `json:"desc"`
Created int64 `json:"created"` Created int64 `json:"created"`
Size uint64 `json:"size"` Size uint64 `json:"size"`
} }
type SocketData interface { type SocketData interface {

View file

@ -167,7 +167,7 @@ func apiAdapterFetch(next http.Handler, subscribers map[*Subscriber][]adapter.Ad
queryParams := req.URL.Query() queryParams := req.URL.Query()
for _, a := range subscribers[s] { for _, a := range subscribers[s] {
if a.Name() == apiParams["adapter_id"] { if a.Name() == apiParams["adapter_id"] {
err := a.Fetch(queryParams["entity_type"][0], queryParams["entity_id"][0]) err := a.Fetch(queryParams["entity_type"][0], queryParams["entity_id"])
if err != nil { if err != nil {
fmt.Println(err.Error()) fmt.Println(err.Error())
w.WriteHeader(http.StatusInternalServerError) w.WriteHeader(http.StatusInternalServerError)