From 27c9e9bc59eb4423d7a19e9157b2c288ccbe8568 Mon Sep 17 00:00:00 2001 From: Iris Lightshard Date: Sun, 2 Jun 2024 10:17:12 -0600 Subject: [PATCH] nostr subscriptions working --- adapter/adapter.go | 24 ++++++++++++++---------- models/settings.go | 8 ++++---- server/api.go | 24 ++++++++++++++++++++++++ server/server.go | 5 ++++- ts/index.ts | 18 +++++++++++++++++- 5 files changed, 63 insertions(+), 16 deletions(-) diff --git a/adapter/adapter.go b/adapter/adapter.go index e5e21f8..1309fbe 100644 --- a/adapter/adapter.go +++ b/adapter/adapter.go @@ -22,22 +22,20 @@ type Adapter interface { } type NostrAdapter struct { - msgChan chan SocketData + data chan SocketData nickname string privkey string relays []*nostr.Relay } -func (self *NostrAdapter) Init(settings Settings, msgChan chan SocketData) error { +func (self *NostrAdapter) Init(settings Settings, data chan SocketData) error { self.nickname = settings.Nickname self.privkey = *settings.PrivKey - self.msgChan = msgChan + self.data = data ctx := context.Background() - relays := strings.Split(*settings.Relays, ",") - - for _, r := range relays { + for _, r := range settings.Relays { pr, _ := nostr.RelayConnect(ctx, strings.Trim(r, " ")) if pr == nil { return errors.New("Relay connection could not be completed") @@ -54,31 +52,37 @@ func (self *NostrAdapter) Subscribe(filter string) []error { return []error{err} } - errs := make([]error, 1) + errs := make([]error, 0) + + fmt.Print("unmarshalled filter from json; iterating through relays to subscribe..") for _, r := range self.relays { - + fmt.Print(".") sub, err := r.Subscribe(context.Background(), filters) if err != nil { errs = append(errs, err) } else { go func() { for ev := range sub.Events { + fmt.Print("!") // try sequentially to encode into an underbbs object // and send it to the appropriate channel m, err := nostrEventToMsg(ev) if err == nil { - self.msgChan <- m + self.data <- m } } }() } + fmt.Println() } if len(errs) > 0 { + fmt.Println("subscription operation completed with errors") return errs } + fmt.Println("subscription operation completed without errors") return nil } func (self *NostrAdapter) SendMessage(msg Message) error { @@ -98,7 +102,7 @@ func (self *NostrAdapter) UpdateMetadata(data interface{}) error { } func (self *NostrAdapter) DefaultSubscriptionFilter() string { - return "[{'iDs':null,'kinds':[1],'authors':null,'tags':nil,'since':nil,'until':nil,'limit':0}]" + return "[{\"kinds\":[1]}]" } func nostrEventToMsg(evt *nostr.Event) (Message, error) { diff --git a/models/settings.go b/models/settings.go index d8640eb..79a7412 100644 --- a/models/settings.go +++ b/models/settings.go @@ -3,8 +3,8 @@ package models type Settings struct { Nickname string Protocol string - PrivKey *string `json:"privkey",omitempty` - Relays *string `json:"relays",omitempty` - Server *string `json:"server",omitempty` - ApiKey *string `json:"apiKey",omitempty` + PrivKey *string `json:"privkey",omitempty` + Relays []string `json:"relays",omitempty` + Server *string `json:"server",omitempty` + ApiKey *string `json:"apiKey",omitempty` } diff --git a/server/api.go b/server/api.go index fb51793..055021e 100644 --- a/server/api.go +++ b/server/api.go @@ -3,6 +3,7 @@ package server import ( "encoding/json" "errors" + "fmt" "forge.lightcrystal.systems/lightcrystal/underbbs/adapter" "forge.lightcrystal.systems/lightcrystal/underbbs/models" "hacklab.nilfm.cc/quartzgun/renderer" @@ -32,12 +33,16 @@ func getSubscriberByKey(key string, subscribers map[*Subscriber][]adapter.Adapte func setAdaptersForSubscriber(key string, adapters []adapter.Adapter, subscribers map[*Subscriber][]adapter.Adapter) error { var ptr *Subscriber = nil + fmt.Print("looking for subscriber in map..") for s, _ := range subscribers { + fmt.Print(".") if s.key == key { ptr = s } } + fmt.Println() if ptr != nil { + fmt.Println("setting adaters for the found subscriber: " + ptr.key) subscribers[ptr] = adapters return nil } @@ -82,9 +87,28 @@ func apiConfigureAdapters(next http.Handler, subscribers map[*Subscriber][]adapt util.AddContextValue(req, "data", err.Error()) w.WriteHeader(500) next.ServeHTTP(w, req) + return } + fmt.Println("adapter initialized - subscribing with default filter") + + errs := a.Subscribe(a.DefaultSubscriptionFilter()) + if errs != nil { + errMsg := "" + for _, e := range errs { + fmt.Println("processing an error") + errMsg += fmt.Sprintf("- %s\n", e.Error()) + } + util.AddContextValue(req, "data", errMsg) + w.WriteHeader(500) + next.ServeHTTP(w, req) + return + } + + fmt.Println("adapter ready for use; adding to array") + adapters = append(adapters, a) + fmt.Println("adapter added to array") } // TODO: cancel subscriptions on any existing adapters diff --git a/server/server.go b/server/server.go index ebc3621..299c699 100644 --- a/server/server.go +++ b/server/server.go @@ -3,6 +3,7 @@ package server import ( "context" "errors" + "fmt" "forge.lightcrystal.systems/lightcrystal/underbbs/adapter" "forge.lightcrystal.systems/lightcrystal/underbbs/models" "golang.org/x/time/rate" @@ -88,19 +89,21 @@ func (self *BBSServer) subscribeHandler(w http.ResponseWriter, r *http.Request) defer c.Close(websocket.StatusInternalError, "") go func() { + fmt.Println("waiting for data on the subscriber's channel") for { select { case msg := <-s.msgs: writeTimeout(ctx, time.Second*5, c, msg) case <-ctx.Done(): + fmt.Println("subscriber has disconnected") return //ctx.Err() } } }() // give user their key - s.msgs <- []byte("{ 'key':'" + s.key + "' }") + s.msgs <- []byte("{ \"key\":\"" + s.key + "\" }") // block on the data channel, serializing and passing the data to the subscriber listen([]chan models.SocketData{s.data}, s.msgs) diff --git a/ts/index.ts b/ts/index.ts index 82f8ef3..5d34efa 100644 --- a/ts/index.ts +++ b/ts/index.ts @@ -200,17 +200,33 @@ function saveAdapter(): void { let _conn: WebSocket | null = null; +async function authorizedFetch(method: string, uri: string, body: any): Promise { + const headers = new Headers() + headers.set('Authorization', 'Bearer ' + _("skey")) + return await fetch(uri, { + method: method, + headers: headers, + body: body, + }) +} + function connect() { // open the websocket connection with settings as subprotocol const wsProto = location.protocol == "https:" ? "wss" : "ws"; - _conn = new WebSocket(`${wsProto}://${location.host}/subscribe`, ""); + _conn = new WebSocket(`${wsProto}://${location.host}/subscribe`, "underbbs"); _conn.addEventListener("open", (e: any) => { console.log("websocket connection opened"); console.log(JSON.stringify(e)); }); _conn.addEventListener("message", (e: any) => { + console.log(e) + const data = JSON.parse(e.data); + if (data.key) { + _("skey", data.key) + authorizedFetch("POST", "/api/adapters", JSON.stringify(_("settings").adapters)) + } }); _conn.addEventListener("error", (e: any) => { console.log("websocket connection error");