From 8f2b281d820c03a6ff656a04462af06c2cfef27a Mon Sep 17 00:00:00 2001 From: Iris Lightshard Date: Sun, 26 May 2024 12:50:19 -0600 Subject: [PATCH] open websocket and get subscriber key --- adapter/adapter.go | 5 ++++ server/api.go | 54 +++++++++++++++++++++++++++++++++++ server/server.go | 69 ++++++++++++++++----------------------------- ts/adapter.ts | 48 +++++++++++++++++++++++-------- ts/index.ts | 19 ++++--------- ts/serviceWorker.ts | 12 ++++++-- ts/websocket.ts | 0 7 files changed, 134 insertions(+), 73 deletions(-) create mode 100644 server/api.go create mode 100644 ts/websocket.ts diff --git a/adapter/adapter.go b/adapter/adapter.go index 0d1c832..e5e21f8 100644 --- a/adapter/adapter.go +++ b/adapter/adapter.go @@ -18,6 +18,7 @@ type Adapter interface { Unfollow(Author) error GetFollowers() error UpdateMetadata(interface{}) error + DefaultSubscriptionFilter() string } type NostrAdapter struct { @@ -96,6 +97,10 @@ func (self *NostrAdapter) UpdateMetadata(data interface{}) error { return nil } +func (self *NostrAdapter) DefaultSubscriptionFilter() string { + return "[{'iDs':null,'kinds':[1],'authors':null,'tags':nil,'since':nil,'until':nil,'limit':0}]" +} + func nostrEventToMsg(evt *nostr.Event) (Message, error) { m := Message{ Protocol: "nostr", diff --git a/server/api.go b/server/api.go new file mode 100644 index 0000000..57284f5 --- /dev/null +++ b/server/api.go @@ -0,0 +1,54 @@ +package server + +import ( + "hacklab.nilfm.cc/quartzgun/renderer" + "hacklab.nilfm.cc/quartzgun/router" + "html/template" + "net/http" +) + +func apiConfigureAdapters(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + w.WriteHeader(201) + next.ServeHTTP(w, req) + }) +} + +func apiGetAdapters(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + w.WriteHeader(201) + next.ServeHTTP(w, req) + }) +} + +func apiAdapterSubscribe(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + w.WriteHeader(201) + next.ServeHTTP(w, req) + }) +} + +func ProtectWithSubscriberKey(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + w.WriteHeader(201) + next.ServeHTTP(w, req) + }) +} + +func apiMux() http.Handler { + errTemplate, err := template.New("err").Parse("{{ $params := (.Context).Value \"params\" }}

ERROR {{ $params.ErrorCode }}

{{ $params.ErrorMessage }}

") + if err != nil { + panic("error template was malformed") + } + rtr := &router.Router{ + Fallback: *errTemplate, + } + + // adapters (POST & GET) + rtr.Post("/adapters", ProtectWithSubscriberKey(apiConfigureAdapters(renderer.JSON("data")))) + rtr.Get("/adapters", ProtectWithSubscriberKey(apiGetAdapters(renderer.JSON("data")))) + // adapters/:name/subscribe + rtr.Post(`/adapters/(?P\S+)/subscribe`, ProtectWithSubscriberKey(apiAdapterSubscribe(renderer.JSON("data")))) + + return http.HandlerFunc(rtr.ServeHTTP) +} diff --git a/server/server.go b/server/server.go index 4c4b0d2..b0e2521 100644 --- a/server/server.go +++ b/server/server.go @@ -2,23 +2,23 @@ package server import ( "context" - "encoding/base64" - "encoding/json" "errors" + "fmt" "forge.lightcrystal.systems/lightcrystal/underbbs/adapter" "forge.lightcrystal.systems/lightcrystal/underbbs/models" "golang.org/x/time/rate" + "hacklab.nilfm.cc/quartzgun/cookie" "hacklab.nilfm.cc/quartzgun/renderer" "io/ioutil" "log" "net/http" "nhooyr.io/websocket" - "strings" "sync" "time" ) type Subscriber struct { + key string msgs chan []byte closeSlow func() } @@ -42,6 +42,9 @@ func New() *BBSServer { // frontend is here srvr.serveMux.Handle("/app/", http.StripPrefix("/app/", renderer.Subtree("./dist"))) + // api + srvr.serveMux.Handle("/api/", http.StripPrefix("/api", apiMux())) + // websocket srvr.serveMux.HandleFunc("/subscribe", srvr.subscribeHandler) srvr.serveMux.HandleFunc("/publish", srvr.publishHandler) @@ -63,54 +66,17 @@ func (self *BBSServer) subscribeHandler(w http.ResponseWriter, r *http.Request) return } - // decode subprotocol data into settings objects - data := c.Subprotocol() - data = strings.ReplaceAll(data, ".", "=") - - // base64 decode - decoded, err := base64.StdEncoding.DecodeString(data) - if err != nil { - c.Close(3000, err.Error()) - return - } - - settings := []models.Settings{} - // unmarshal the json into the settings array - err = json.Unmarshal([]byte(decoded), &settings) - if err != nil { - c.Close(3001, err.Error()) - return - } - msgc := make(chan models.SocketData) // attempt to initialize adapters adapters := make([]adapter.Adapter, 0, 4) - for _, s := range settings { - var a adapter.Adapter - switch s.Protocol { - case "nostr": - a = &adapter.NostrAdapter{} - break - case "masto": - break - default: - break - } - err := a.Init(s, msgc) - if err != nil { - c.Close(3002, err.Error()) - return - } - - adapters = append(adapters, a) - } // keep reference to the adapters so we can execute commands on them later ctx := r.Context() ctx = c.CloseRead(ctx) s := &Subscriber{ + key: cookie.GenToken(64), msgs: make(chan []byte, self.subscribeMessageBuffer), closeSlow: func() { c.Close(websocket.StatusPolicyViolation, "connection too slow to keep up with messages") @@ -119,13 +85,26 @@ func (self *BBSServer) subscribeHandler(w http.ResponseWriter, r *http.Request) self.addSubscriber(s, adapters) - defer self.deleteSubscriber(s) + fmt.Println("subscriber added to map") + defer self.deleteSubscriber(s) defer c.Close(websocket.StatusInternalError, "") - // collect data from the channels injected into the adapters, - // and send it back to the client - // we block until we stop receiving data on all the input channels + go func() { + for { + select { + case msg := <-s.msgs: + writeTimeout(ctx, time.Second*5, c, msg) + + case <-ctx.Done(): + return //ctx.Err() + } + } + }() + + fmt.Println("giving subscriber their key: " + s.key) + s.msgs <- []byte("{ 'key':'" + s.key + " }") + fmt.Println("subscriber key sent, listening on the data channel") listen([]chan models.SocketData{msgc}, s.msgs) diff --git a/ts/adapter.ts b/ts/adapter.ts index 25f6f2b..2609381 100644 --- a/ts/adapter.ts +++ b/ts/adapter.ts @@ -1,16 +1,27 @@ import NDK, {NDKPrivateKeySigner} from "@nostr-dev-kit/ndk"; import * as nip19 from 'nostr-tools/nip19' -import { createRestAPIClient } from "masto"; +import { createRestAPIClient, createStreamingAPIClient } from "masto"; import * as masto from "masto"; type MastodonClient = masto.mastodon.rest.Client; +type MastodonStreamClient = masto.mastodon.streaming.Client; + +export class MastodonCompoundClient { + public rest: MastodonClient; + public stream: MastodonStreamClient; + + public constructor(c: MastodonClient, s: MastodonStreamClient) { + this.rest = c; + this.stream = s; + } +} export class Adapter { public nickname: string = ""; public protocol: string = ""; public identity: any | null; - private _self: NDK | MastodonClient | null = null ; + private _self: NDK | MastodonCompoundClient | null = null ; public init(): void {}; public getInbox(): void {}; @@ -67,21 +78,34 @@ export class Adapter { adapter.nickname = settings.nickname; adapter.init = () => { - adapter._self = createRestAPIClient({ + const rawServer: string = adapter.identity.server.split("://")[1]; + + adapter._self = new MastodonCompoundClient(createRestAPIClient({ url: adapter.identity.server, accessToken: adapter.identity.apiKey - }); + }), + createStreamingAPIClient({ + streamingApiUrl: `https://${rawServer}/v1/api/streaming`, + accessToken: adapter.identity.apiKey + })); + } adapter.getInbox = async () => { - let i = 0; - for await (const statuses of (adapter._self as MastodonClient).v1.timelines.public.list()) { - for (const status of statuses) { - console.log(status); - i++; - } - if (i >= 10) break; - } + const rawServer: string = adapter.identity.server.split("://")[1]; + let conn = new WebSocket(`wss://${rawServer}/streaming/?i=${adapter.identity.apiKey}`) + conn.addEventListener("open", async (e:any)=> { + console.log(e); + let filter = { type: "connect", body: { channel: "localTimeline", id: crypto.randomUUID() }}; + let data = await JSON.stringify(filter); + console.log(data); + conn.send(data); + conn.addEventListener("message", (e:any)=>{console.log(e)}); + }); + + + + } return adapter; diff --git a/ts/index.ts b/ts/index.ts index a720c95..82f8ef3 100644 --- a/ts/index.ts +++ b/ts/index.ts @@ -201,31 +201,22 @@ function saveAdapter(): void { let _conn: WebSocket | null = null; function connect() { - // import the data from the settings - const settings = _("settings"); - if (settings) { - - // base64 encode the settings data - let subprotocol: string = "["; - for (let a of settings.adapters) { - subprotocol += JSON.stringify(a) + ","; - } - subprotocol += "]"; - subprotocol = btoa(subprotocol).replace("=", "."); - + // open the websocket connection with settings as subprotocol const wsProto = location.protocol == "https:" ? "wss" : "ws"; - _conn = new WebSocket(`${wsProto}://${location.host}/subscribe`, subprotocol); + _conn = new WebSocket(`${wsProto}://${location.host}/subscribe`, ""); _conn.addEventListener("open", (e: any) => { console.log("websocket connection opened"); console.log(JSON.stringify(e)); }); + _conn.addEventListener("message", (e: any) => { + console.log(e) + }); _conn.addEventListener("error", (e: any) => { console.log("websocket connection error"); console.log(JSON.stringify(e)); }); _("websocket", _conn); - } } _("addAdapter", addAdapter); diff --git a/ts/serviceWorker.ts b/ts/serviceWorker.ts index 208bb94..a623d2f 100644 --- a/ts/serviceWorker.ts +++ b/ts/serviceWorker.ts @@ -5,5 +5,13 @@ export type {}; declare const self: ServiceWorkerGlobalScope; self.addEventListener('sync', (e: any) => { - console.log(e); -}); \ No newline at end of file + console.log("I won't see this"); +}); + +self.addEventListener("install", (e: any) => { + console.log("installation proceeding...") +}); + +self.addEventListener("activate", (e: any) => { + console.log("SW activated!"); +}); diff --git a/ts/websocket.ts b/ts/websocket.ts new file mode 100644 index 0000000..e69de29