diff --git a/adapter/adapter.go b/adapter/adapter.go
index 0d1c832..e5e21f8 100644
--- a/adapter/adapter.go
+++ b/adapter/adapter.go
@@ -18,6 +18,7 @@ type Adapter interface {
Unfollow(Author) error
GetFollowers() error
UpdateMetadata(interface{}) error
+ DefaultSubscriptionFilter() string
}
type NostrAdapter struct {
@@ -96,6 +97,10 @@ func (self *NostrAdapter) UpdateMetadata(data interface{}) error {
return nil
}
+func (self *NostrAdapter) DefaultSubscriptionFilter() string {
+ return "[{'iDs':null,'kinds':[1],'authors':null,'tags':nil,'since':nil,'until':nil,'limit':0}]"
+}
+
func nostrEventToMsg(evt *nostr.Event) (Message, error) {
m := Message{
Protocol: "nostr",
diff --git a/server/api.go b/server/api.go
new file mode 100644
index 0000000..57284f5
--- /dev/null
+++ b/server/api.go
@@ -0,0 +1,54 @@
+package server
+
+import (
+ "hacklab.nilfm.cc/quartzgun/renderer"
+ "hacklab.nilfm.cc/quartzgun/router"
+ "html/template"
+ "net/http"
+)
+
+func apiConfigureAdapters(next http.Handler) http.Handler {
+ return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+ w.WriteHeader(201)
+ next.ServeHTTP(w, req)
+ })
+}
+
+func apiGetAdapters(next http.Handler) http.Handler {
+ return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+ w.WriteHeader(201)
+ next.ServeHTTP(w, req)
+ })
+}
+
+func apiAdapterSubscribe(next http.Handler) http.Handler {
+ return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+ w.WriteHeader(201)
+ next.ServeHTTP(w, req)
+ })
+}
+
+func ProtectWithSubscriberKey(next http.Handler) http.Handler {
+ return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+ w.WriteHeader(201)
+ next.ServeHTTP(w, req)
+ })
+}
+
+func apiMux() http.Handler {
+ errTemplate, err := template.New("err").Parse("{{ $params := (.Context).Value \"params\" }}
ERROR {{ $params.ErrorCode }}
{{ $params.ErrorMessage }}
")
+ if err != nil {
+ panic("error template was malformed")
+ }
+ rtr := &router.Router{
+ Fallback: *errTemplate,
+ }
+
+ // adapters (POST & GET)
+ rtr.Post("/adapters", ProtectWithSubscriberKey(apiConfigureAdapters(renderer.JSON("data"))))
+ rtr.Get("/adapters", ProtectWithSubscriberKey(apiGetAdapters(renderer.JSON("data"))))
+ // adapters/:name/subscribe
+ rtr.Post(`/adapters/(?P\S+)/subscribe`, ProtectWithSubscriberKey(apiAdapterSubscribe(renderer.JSON("data"))))
+
+ return http.HandlerFunc(rtr.ServeHTTP)
+}
diff --git a/server/server.go b/server/server.go
index 4c4b0d2..b0e2521 100644
--- a/server/server.go
+++ b/server/server.go
@@ -2,23 +2,23 @@ package server
import (
"context"
- "encoding/base64"
- "encoding/json"
"errors"
+ "fmt"
"forge.lightcrystal.systems/lightcrystal/underbbs/adapter"
"forge.lightcrystal.systems/lightcrystal/underbbs/models"
"golang.org/x/time/rate"
+ "hacklab.nilfm.cc/quartzgun/cookie"
"hacklab.nilfm.cc/quartzgun/renderer"
"io/ioutil"
"log"
"net/http"
"nhooyr.io/websocket"
- "strings"
"sync"
"time"
)
type Subscriber struct {
+ key string
msgs chan []byte
closeSlow func()
}
@@ -42,6 +42,9 @@ func New() *BBSServer {
// frontend is here
srvr.serveMux.Handle("/app/", http.StripPrefix("/app/", renderer.Subtree("./dist")))
+ // api
+ srvr.serveMux.Handle("/api/", http.StripPrefix("/api", apiMux()))
+
// websocket
srvr.serveMux.HandleFunc("/subscribe", srvr.subscribeHandler)
srvr.serveMux.HandleFunc("/publish", srvr.publishHandler)
@@ -63,54 +66,17 @@ func (self *BBSServer) subscribeHandler(w http.ResponseWriter, r *http.Request)
return
}
- // decode subprotocol data into settings objects
- data := c.Subprotocol()
- data = strings.ReplaceAll(data, ".", "=")
-
- // base64 decode
- decoded, err := base64.StdEncoding.DecodeString(data)
- if err != nil {
- c.Close(3000, err.Error())
- return
- }
-
- settings := []models.Settings{}
- // unmarshal the json into the settings array
- err = json.Unmarshal([]byte(decoded), &settings)
- if err != nil {
- c.Close(3001, err.Error())
- return
- }
-
msgc := make(chan models.SocketData)
// attempt to initialize adapters
adapters := make([]adapter.Adapter, 0, 4)
- for _, s := range settings {
- var a adapter.Adapter
- switch s.Protocol {
- case "nostr":
- a = &adapter.NostrAdapter{}
- break
- case "masto":
- break
- default:
- break
- }
- err := a.Init(s, msgc)
- if err != nil {
- c.Close(3002, err.Error())
- return
- }
-
- adapters = append(adapters, a)
- }
// keep reference to the adapters so we can execute commands on them later
ctx := r.Context()
ctx = c.CloseRead(ctx)
s := &Subscriber{
+ key: cookie.GenToken(64),
msgs: make(chan []byte, self.subscribeMessageBuffer),
closeSlow: func() {
c.Close(websocket.StatusPolicyViolation, "connection too slow to keep up with messages")
@@ -119,13 +85,26 @@ func (self *BBSServer) subscribeHandler(w http.ResponseWriter, r *http.Request)
self.addSubscriber(s, adapters)
- defer self.deleteSubscriber(s)
+ fmt.Println("subscriber added to map")
+ defer self.deleteSubscriber(s)
defer c.Close(websocket.StatusInternalError, "")
- // collect data from the channels injected into the adapters,
- // and send it back to the client
- // we block until we stop receiving data on all the input channels
+ go func() {
+ for {
+ select {
+ case msg := <-s.msgs:
+ writeTimeout(ctx, time.Second*5, c, msg)
+
+ case <-ctx.Done():
+ return //ctx.Err()
+ }
+ }
+ }()
+
+ fmt.Println("giving subscriber their key: " + s.key)
+ s.msgs <- []byte("{ 'key':'" + s.key + " }")
+ fmt.Println("subscriber key sent, listening on the data channel")
listen([]chan models.SocketData{msgc}, s.msgs)
diff --git a/ts/adapter.ts b/ts/adapter.ts
index 25f6f2b..2609381 100644
--- a/ts/adapter.ts
+++ b/ts/adapter.ts
@@ -1,16 +1,27 @@
import NDK, {NDKPrivateKeySigner} from "@nostr-dev-kit/ndk";
import * as nip19 from 'nostr-tools/nip19'
-import { createRestAPIClient } from "masto";
+import { createRestAPIClient, createStreamingAPIClient } from "masto";
import * as masto from "masto";
type MastodonClient = masto.mastodon.rest.Client;
+type MastodonStreamClient = masto.mastodon.streaming.Client;
+
+export class MastodonCompoundClient {
+ public rest: MastodonClient;
+ public stream: MastodonStreamClient;
+
+ public constructor(c: MastodonClient, s: MastodonStreamClient) {
+ this.rest = c;
+ this.stream = s;
+ }
+}
export class Adapter {
public nickname: string = "";
public protocol: string = "";
public identity: any | null;
- private _self: NDK | MastodonClient | null = null ;
+ private _self: NDK | MastodonCompoundClient | null = null ;
public init(): void {};
public getInbox(): void {};
@@ -67,21 +78,34 @@ export class Adapter {
adapter.nickname = settings.nickname;
adapter.init = () => {
- adapter._self = createRestAPIClient({
+ const rawServer: string = adapter.identity.server.split("://")[1];
+
+ adapter._self = new MastodonCompoundClient(createRestAPIClient({
url: adapter.identity.server,
accessToken: adapter.identity.apiKey
- });
+ }),
+ createStreamingAPIClient({
+ streamingApiUrl: `https://${rawServer}/v1/api/streaming`,
+ accessToken: adapter.identity.apiKey
+ }));
+
}
adapter.getInbox = async () => {
- let i = 0;
- for await (const statuses of (adapter._self as MastodonClient).v1.timelines.public.list()) {
- for (const status of statuses) {
- console.log(status);
- i++;
- }
- if (i >= 10) break;
- }
+ const rawServer: string = adapter.identity.server.split("://")[1];
+ let conn = new WebSocket(`wss://${rawServer}/streaming/?i=${adapter.identity.apiKey}`)
+ conn.addEventListener("open", async (e:any)=> {
+ console.log(e);
+ let filter = { type: "connect", body: { channel: "localTimeline", id: crypto.randomUUID() }};
+ let data = await JSON.stringify(filter);
+ console.log(data);
+ conn.send(data);
+ conn.addEventListener("message", (e:any)=>{console.log(e)});
+ });
+
+
+
+
}
return adapter;
diff --git a/ts/index.ts b/ts/index.ts
index a720c95..82f8ef3 100644
--- a/ts/index.ts
+++ b/ts/index.ts
@@ -201,31 +201,22 @@ function saveAdapter(): void {
let _conn: WebSocket | null = null;
function connect() {
- // import the data from the settings
- const settings = _("settings");
- if (settings) {
-
- // base64 encode the settings data
- let subprotocol: string = "[";
- for (let a of settings.adapters) {
- subprotocol += JSON.stringify(a) + ",";
- }
- subprotocol += "]";
- subprotocol = btoa(subprotocol).replace("=", ".");
-
+
// open the websocket connection with settings as subprotocol
const wsProto = location.protocol == "https:" ? "wss" : "ws";
- _conn = new WebSocket(`${wsProto}://${location.host}/subscribe`, subprotocol);
+ _conn = new WebSocket(`${wsProto}://${location.host}/subscribe`, "");
_conn.addEventListener("open", (e: any) => {
console.log("websocket connection opened");
console.log(JSON.stringify(e));
});
+ _conn.addEventListener("message", (e: any) => {
+ console.log(e)
+ });
_conn.addEventListener("error", (e: any) => {
console.log("websocket connection error");
console.log(JSON.stringify(e));
});
_("websocket", _conn);
- }
}
_("addAdapter", addAdapter);
diff --git a/ts/serviceWorker.ts b/ts/serviceWorker.ts
index 208bb94..a623d2f 100644
--- a/ts/serviceWorker.ts
+++ b/ts/serviceWorker.ts
@@ -5,5 +5,13 @@ export type {};
declare const self: ServiceWorkerGlobalScope;
self.addEventListener('sync', (e: any) => {
- console.log(e);
-});
\ No newline at end of file
+ console.log("I won't see this");
+});
+
+self.addEventListener("install", (e: any) => {
+ console.log("installation proceeding...")
+});
+
+self.addEventListener("activate", (e: any) => {
+ console.log("SW activated!");
+});
diff --git a/ts/websocket.ts b/ts/websocket.ts
new file mode 100644
index 0000000..e69de29