nostr subscriptions working

This commit is contained in:
Iris Lightshard 2024-06-02 10:17:12 -06:00
parent 63b79409f4
commit 27c9e9bc59
Signed by: nilix
GPG Key ID: 688407174966CAF3
5 changed files with 63 additions and 16 deletions

View File

@ -22,22 +22,20 @@ type Adapter interface {
} }
type NostrAdapter struct { type NostrAdapter struct {
msgChan chan SocketData data chan SocketData
nickname string nickname string
privkey string privkey string
relays []*nostr.Relay relays []*nostr.Relay
} }
func (self *NostrAdapter) Init(settings Settings, msgChan chan SocketData) error { func (self *NostrAdapter) Init(settings Settings, data chan SocketData) error {
self.nickname = settings.Nickname self.nickname = settings.Nickname
self.privkey = *settings.PrivKey self.privkey = *settings.PrivKey
self.msgChan = msgChan self.data = data
ctx := context.Background() ctx := context.Background()
relays := strings.Split(*settings.Relays, ",") for _, r := range settings.Relays {
for _, r := range relays {
pr, _ := nostr.RelayConnect(ctx, strings.Trim(r, " ")) pr, _ := nostr.RelayConnect(ctx, strings.Trim(r, " "))
if pr == nil { if pr == nil {
return errors.New("Relay connection could not be completed") return errors.New("Relay connection could not be completed")
@ -54,31 +52,37 @@ func (self *NostrAdapter) Subscribe(filter string) []error {
return []error{err} return []error{err}
} }
errs := make([]error, 1) errs := make([]error, 0)
fmt.Print("unmarshalled filter from json; iterating through relays to subscribe..")
for _, r := range self.relays { for _, r := range self.relays {
fmt.Print(".")
sub, err := r.Subscribe(context.Background(), filters) sub, err := r.Subscribe(context.Background(), filters)
if err != nil { if err != nil {
errs = append(errs, err) errs = append(errs, err)
} else { } else {
go func() { go func() {
for ev := range sub.Events { for ev := range sub.Events {
fmt.Print("!")
// try sequentially to encode into an underbbs object // try sequentially to encode into an underbbs object
// and send it to the appropriate channel // and send it to the appropriate channel
m, err := nostrEventToMsg(ev) m, err := nostrEventToMsg(ev)
if err == nil { if err == nil {
self.msgChan <- m self.data <- m
} }
} }
}() }()
} }
fmt.Println()
} }
if len(errs) > 0 { if len(errs) > 0 {
fmt.Println("subscription operation completed with errors")
return errs return errs
} }
fmt.Println("subscription operation completed without errors")
return nil return nil
} }
func (self *NostrAdapter) SendMessage(msg Message) error { func (self *NostrAdapter) SendMessage(msg Message) error {
@ -98,7 +102,7 @@ func (self *NostrAdapter) UpdateMetadata(data interface{}) error {
} }
func (self *NostrAdapter) DefaultSubscriptionFilter() string { func (self *NostrAdapter) DefaultSubscriptionFilter() string {
return "[{'iDs':null,'kinds':[1],'authors':null,'tags':nil,'since':nil,'until':nil,'limit':0}]" return "[{\"kinds\":[1]}]"
} }
func nostrEventToMsg(evt *nostr.Event) (Message, error) { func nostrEventToMsg(evt *nostr.Event) (Message, error) {

View File

@ -3,8 +3,8 @@ package models
type Settings struct { type Settings struct {
Nickname string Nickname string
Protocol string Protocol string
PrivKey *string `json:"privkey",omitempty` PrivKey *string `json:"privkey",omitempty`
Relays *string `json:"relays",omitempty` Relays []string `json:"relays",omitempty`
Server *string `json:"server",omitempty` Server *string `json:"server",omitempty`
ApiKey *string `json:"apiKey",omitempty` ApiKey *string `json:"apiKey",omitempty`
} }

View File

@ -3,6 +3,7 @@ package server
import ( import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt"
"forge.lightcrystal.systems/lightcrystal/underbbs/adapter" "forge.lightcrystal.systems/lightcrystal/underbbs/adapter"
"forge.lightcrystal.systems/lightcrystal/underbbs/models" "forge.lightcrystal.systems/lightcrystal/underbbs/models"
"hacklab.nilfm.cc/quartzgun/renderer" "hacklab.nilfm.cc/quartzgun/renderer"
@ -32,12 +33,16 @@ func getSubscriberByKey(key string, subscribers map[*Subscriber][]adapter.Adapte
func setAdaptersForSubscriber(key string, adapters []adapter.Adapter, subscribers map[*Subscriber][]adapter.Adapter) error { func setAdaptersForSubscriber(key string, adapters []adapter.Adapter, subscribers map[*Subscriber][]adapter.Adapter) error {
var ptr *Subscriber = nil var ptr *Subscriber = nil
fmt.Print("looking for subscriber in map..")
for s, _ := range subscribers { for s, _ := range subscribers {
fmt.Print(".")
if s.key == key { if s.key == key {
ptr = s ptr = s
} }
} }
fmt.Println()
if ptr != nil { if ptr != nil {
fmt.Println("setting adaters for the found subscriber: " + ptr.key)
subscribers[ptr] = adapters subscribers[ptr] = adapters
return nil return nil
} }
@ -82,9 +87,28 @@ func apiConfigureAdapters(next http.Handler, subscribers map[*Subscriber][]adapt
util.AddContextValue(req, "data", err.Error()) util.AddContextValue(req, "data", err.Error())
w.WriteHeader(500) w.WriteHeader(500)
next.ServeHTTP(w, req) next.ServeHTTP(w, req)
return
} }
fmt.Println("adapter initialized - subscribing with default filter")
errs := a.Subscribe(a.DefaultSubscriptionFilter())
if errs != nil {
errMsg := ""
for _, e := range errs {
fmt.Println("processing an error")
errMsg += fmt.Sprintf("- %s\n", e.Error())
}
util.AddContextValue(req, "data", errMsg)
w.WriteHeader(500)
next.ServeHTTP(w, req)
return
}
fmt.Println("adapter ready for use; adding to array")
adapters = append(adapters, a) adapters = append(adapters, a)
fmt.Println("adapter added to array")
} }
// TODO: cancel subscriptions on any existing adapters // TODO: cancel subscriptions on any existing adapters

View File

@ -3,6 +3,7 @@ package server
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"forge.lightcrystal.systems/lightcrystal/underbbs/adapter" "forge.lightcrystal.systems/lightcrystal/underbbs/adapter"
"forge.lightcrystal.systems/lightcrystal/underbbs/models" "forge.lightcrystal.systems/lightcrystal/underbbs/models"
"golang.org/x/time/rate" "golang.org/x/time/rate"
@ -88,19 +89,21 @@ func (self *BBSServer) subscribeHandler(w http.ResponseWriter, r *http.Request)
defer c.Close(websocket.StatusInternalError, "") defer c.Close(websocket.StatusInternalError, "")
go func() { go func() {
fmt.Println("waiting for data on the subscriber's channel")
for { for {
select { select {
case msg := <-s.msgs: case msg := <-s.msgs:
writeTimeout(ctx, time.Second*5, c, msg) writeTimeout(ctx, time.Second*5, c, msg)
case <-ctx.Done(): case <-ctx.Done():
fmt.Println("subscriber has disconnected")
return //ctx.Err() return //ctx.Err()
} }
} }
}() }()
// give user their key // give user their key
s.msgs <- []byte("{ 'key':'" + s.key + "' }") s.msgs <- []byte("{ \"key\":\"" + s.key + "\" }")
// block on the data channel, serializing and passing the data to the subscriber // block on the data channel, serializing and passing the data to the subscriber
listen([]chan models.SocketData{s.data}, s.msgs) listen([]chan models.SocketData{s.data}, s.msgs)

View File

@ -200,17 +200,33 @@ function saveAdapter(): void {
let _conn: WebSocket | null = null; let _conn: WebSocket | null = null;
async function authorizedFetch(method: string, uri: string, body: any): Promise<Response> {
const headers = new Headers()
headers.set('Authorization', 'Bearer ' + _("skey"))
return await fetch(uri, {
method: method,
headers: headers,
body: body,
})
}
function connect() { function connect() {
// open the websocket connection with settings as subprotocol // open the websocket connection with settings as subprotocol
const wsProto = location.protocol == "https:" ? "wss" : "ws"; const wsProto = location.protocol == "https:" ? "wss" : "ws";
_conn = new WebSocket(`${wsProto}://${location.host}/subscribe`, ""); _conn = new WebSocket(`${wsProto}://${location.host}/subscribe`, "underbbs");
_conn.addEventListener("open", (e: any) => { _conn.addEventListener("open", (e: any) => {
console.log("websocket connection opened"); console.log("websocket connection opened");
console.log(JSON.stringify(e)); console.log(JSON.stringify(e));
}); });
_conn.addEventListener("message", (e: any) => { _conn.addEventListener("message", (e: any) => {
console.log(e) console.log(e)
const data = JSON.parse(e.data);
if (data.key) {
_("skey", data.key)
authorizedFetch("POST", "/api/adapters", JSON.stringify(_("settings").adapters))
}
}); });
_conn.addEventListener("error", (e: any) => { _conn.addEventListener("error", (e: any) => {
console.log("websocket connection error"); console.log("websocket connection error");