open websocket and get subscriber key
This commit is contained in:
parent
89df01f089
commit
8f2b281d82
7 changed files with 134 additions and 73 deletions
|
@ -18,6 +18,7 @@ type Adapter interface {
|
||||||
Unfollow(Author) error
|
Unfollow(Author) error
|
||||||
GetFollowers() error
|
GetFollowers() error
|
||||||
UpdateMetadata(interface{}) error
|
UpdateMetadata(interface{}) error
|
||||||
|
DefaultSubscriptionFilter() string
|
||||||
}
|
}
|
||||||
|
|
||||||
type NostrAdapter struct {
|
type NostrAdapter struct {
|
||||||
|
@ -96,6 +97,10 @@ func (self *NostrAdapter) UpdateMetadata(data interface{}) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (self *NostrAdapter) DefaultSubscriptionFilter() string {
|
||||||
|
return "[{'iDs':null,'kinds':[1],'authors':null,'tags':nil,'since':nil,'until':nil,'limit':0}]"
|
||||||
|
}
|
||||||
|
|
||||||
func nostrEventToMsg(evt *nostr.Event) (Message, error) {
|
func nostrEventToMsg(evt *nostr.Event) (Message, error) {
|
||||||
m := Message{
|
m := Message{
|
||||||
Protocol: "nostr",
|
Protocol: "nostr",
|
||||||
|
|
54
server/api.go
Normal file
54
server/api.go
Normal file
|
@ -0,0 +1,54 @@
|
||||||
|
package server
|
||||||
|
|
||||||
|
import (
|
||||||
|
"hacklab.nilfm.cc/quartzgun/renderer"
|
||||||
|
"hacklab.nilfm.cc/quartzgun/router"
|
||||||
|
"html/template"
|
||||||
|
"net/http"
|
||||||
|
)
|
||||||
|
|
||||||
|
func apiConfigureAdapters(next http.Handler) http.Handler {
|
||||||
|
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||||
|
w.WriteHeader(201)
|
||||||
|
next.ServeHTTP(w, req)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func apiGetAdapters(next http.Handler) http.Handler {
|
||||||
|
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||||
|
w.WriteHeader(201)
|
||||||
|
next.ServeHTTP(w, req)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func apiAdapterSubscribe(next http.Handler) http.Handler {
|
||||||
|
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||||
|
w.WriteHeader(201)
|
||||||
|
next.ServeHTTP(w, req)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func ProtectWithSubscriberKey(next http.Handler) http.Handler {
|
||||||
|
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||||
|
w.WriteHeader(201)
|
||||||
|
next.ServeHTTP(w, req)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func apiMux() http.Handler {
|
||||||
|
errTemplate, err := template.New("err").Parse("{{ $params := (.Context).Value \"params\" }}<html><body><h1>ERROR {{ $params.ErrorCode }}</h1><p class='error'>{{ $params.ErrorMessage }}</p></body></html>")
|
||||||
|
if err != nil {
|
||||||
|
panic("error template was malformed")
|
||||||
|
}
|
||||||
|
rtr := &router.Router{
|
||||||
|
Fallback: *errTemplate,
|
||||||
|
}
|
||||||
|
|
||||||
|
// adapters (POST & GET)
|
||||||
|
rtr.Post("/adapters", ProtectWithSubscriberKey(apiConfigureAdapters(renderer.JSON("data"))))
|
||||||
|
rtr.Get("/adapters", ProtectWithSubscriberKey(apiGetAdapters(renderer.JSON("data"))))
|
||||||
|
// adapters/:name/subscribe
|
||||||
|
rtr.Post(`/adapters/(?P<id>\S+)/subscribe`, ProtectWithSubscriberKey(apiAdapterSubscribe(renderer.JSON("data"))))
|
||||||
|
|
||||||
|
return http.HandlerFunc(rtr.ServeHTTP)
|
||||||
|
}
|
|
@ -2,23 +2,23 @@ package server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/base64"
|
|
||||||
"encoding/json"
|
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"forge.lightcrystal.systems/lightcrystal/underbbs/adapter"
|
"forge.lightcrystal.systems/lightcrystal/underbbs/adapter"
|
||||||
"forge.lightcrystal.systems/lightcrystal/underbbs/models"
|
"forge.lightcrystal.systems/lightcrystal/underbbs/models"
|
||||||
"golang.org/x/time/rate"
|
"golang.org/x/time/rate"
|
||||||
|
"hacklab.nilfm.cc/quartzgun/cookie"
|
||||||
"hacklab.nilfm.cc/quartzgun/renderer"
|
"hacklab.nilfm.cc/quartzgun/renderer"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"log"
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
"nhooyr.io/websocket"
|
"nhooyr.io/websocket"
|
||||||
"strings"
|
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Subscriber struct {
|
type Subscriber struct {
|
||||||
|
key string
|
||||||
msgs chan []byte
|
msgs chan []byte
|
||||||
closeSlow func()
|
closeSlow func()
|
||||||
}
|
}
|
||||||
|
@ -42,6 +42,9 @@ func New() *BBSServer {
|
||||||
// frontend is here
|
// frontend is here
|
||||||
srvr.serveMux.Handle("/app/", http.StripPrefix("/app/", renderer.Subtree("./dist")))
|
srvr.serveMux.Handle("/app/", http.StripPrefix("/app/", renderer.Subtree("./dist")))
|
||||||
|
|
||||||
|
// api
|
||||||
|
srvr.serveMux.Handle("/api/", http.StripPrefix("/api", apiMux()))
|
||||||
|
|
||||||
// websocket
|
// websocket
|
||||||
srvr.serveMux.HandleFunc("/subscribe", srvr.subscribeHandler)
|
srvr.serveMux.HandleFunc("/subscribe", srvr.subscribeHandler)
|
||||||
srvr.serveMux.HandleFunc("/publish", srvr.publishHandler)
|
srvr.serveMux.HandleFunc("/publish", srvr.publishHandler)
|
||||||
|
@ -63,54 +66,17 @@ func (self *BBSServer) subscribeHandler(w http.ResponseWriter, r *http.Request)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// decode subprotocol data into settings objects
|
|
||||||
data := c.Subprotocol()
|
|
||||||
data = strings.ReplaceAll(data, ".", "=")
|
|
||||||
|
|
||||||
// base64 decode
|
|
||||||
decoded, err := base64.StdEncoding.DecodeString(data)
|
|
||||||
if err != nil {
|
|
||||||
c.Close(3000, err.Error())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
settings := []models.Settings{}
|
|
||||||
// unmarshal the json into the settings array
|
|
||||||
err = json.Unmarshal([]byte(decoded), &settings)
|
|
||||||
if err != nil {
|
|
||||||
c.Close(3001, err.Error())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
msgc := make(chan models.SocketData)
|
msgc := make(chan models.SocketData)
|
||||||
|
|
||||||
// attempt to initialize adapters
|
// attempt to initialize adapters
|
||||||
adapters := make([]adapter.Adapter, 0, 4)
|
adapters := make([]adapter.Adapter, 0, 4)
|
||||||
for _, s := range settings {
|
|
||||||
var a adapter.Adapter
|
|
||||||
switch s.Protocol {
|
|
||||||
case "nostr":
|
|
||||||
a = &adapter.NostrAdapter{}
|
|
||||||
break
|
|
||||||
case "masto":
|
|
||||||
break
|
|
||||||
default:
|
|
||||||
break
|
|
||||||
}
|
|
||||||
err := a.Init(s, msgc)
|
|
||||||
if err != nil {
|
|
||||||
c.Close(3002, err.Error())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
adapters = append(adapters, a)
|
|
||||||
}
|
|
||||||
|
|
||||||
// keep reference to the adapters so we can execute commands on them later
|
// keep reference to the adapters so we can execute commands on them later
|
||||||
ctx := r.Context()
|
ctx := r.Context()
|
||||||
ctx = c.CloseRead(ctx)
|
ctx = c.CloseRead(ctx)
|
||||||
|
|
||||||
s := &Subscriber{
|
s := &Subscriber{
|
||||||
|
key: cookie.GenToken(64),
|
||||||
msgs: make(chan []byte, self.subscribeMessageBuffer),
|
msgs: make(chan []byte, self.subscribeMessageBuffer),
|
||||||
closeSlow: func() {
|
closeSlow: func() {
|
||||||
c.Close(websocket.StatusPolicyViolation, "connection too slow to keep up with messages")
|
c.Close(websocket.StatusPolicyViolation, "connection too slow to keep up with messages")
|
||||||
|
@ -119,13 +85,26 @@ func (self *BBSServer) subscribeHandler(w http.ResponseWriter, r *http.Request)
|
||||||
|
|
||||||
self.addSubscriber(s, adapters)
|
self.addSubscriber(s, adapters)
|
||||||
|
|
||||||
defer self.deleteSubscriber(s)
|
fmt.Println("subscriber added to map")
|
||||||
|
|
||||||
|
defer self.deleteSubscriber(s)
|
||||||
defer c.Close(websocket.StatusInternalError, "")
|
defer c.Close(websocket.StatusInternalError, "")
|
||||||
|
|
||||||
// collect data from the channels injected into the adapters,
|
go func() {
|
||||||
// and send it back to the client
|
for {
|
||||||
// we block until we stop receiving data on all the input channels
|
select {
|
||||||
|
case msg := <-s.msgs:
|
||||||
|
writeTimeout(ctx, time.Second*5, c, msg)
|
||||||
|
|
||||||
|
case <-ctx.Done():
|
||||||
|
return //ctx.Err()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
fmt.Println("giving subscriber their key: " + s.key)
|
||||||
|
s.msgs <- []byte("{ 'key':'" + s.key + " }")
|
||||||
|
fmt.Println("subscriber key sent, listening on the data channel")
|
||||||
|
|
||||||
listen([]chan models.SocketData{msgc}, s.msgs)
|
listen([]chan models.SocketData{msgc}, s.msgs)
|
||||||
|
|
||||||
|
|
|
@ -1,16 +1,27 @@
|
||||||
import NDK, {NDKPrivateKeySigner} from "@nostr-dev-kit/ndk";
|
import NDK, {NDKPrivateKeySigner} from "@nostr-dev-kit/ndk";
|
||||||
import * as nip19 from 'nostr-tools/nip19'
|
import * as nip19 from 'nostr-tools/nip19'
|
||||||
import { createRestAPIClient } from "masto";
|
import { createRestAPIClient, createStreamingAPIClient } from "masto";
|
||||||
import * as masto from "masto";
|
import * as masto from "masto";
|
||||||
|
|
||||||
type MastodonClient = masto.mastodon.rest.Client;
|
type MastodonClient = masto.mastodon.rest.Client;
|
||||||
|
type MastodonStreamClient = masto.mastodon.streaming.Client;
|
||||||
|
|
||||||
|
export class MastodonCompoundClient {
|
||||||
|
public rest: MastodonClient;
|
||||||
|
public stream: MastodonStreamClient;
|
||||||
|
|
||||||
|
public constructor(c: MastodonClient, s: MastodonStreamClient) {
|
||||||
|
this.rest = c;
|
||||||
|
this.stream = s;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
export class Adapter {
|
export class Adapter {
|
||||||
public nickname: string = "";
|
public nickname: string = "";
|
||||||
public protocol: string = "";
|
public protocol: string = "";
|
||||||
public identity: any | null;
|
public identity: any | null;
|
||||||
|
|
||||||
private _self: NDK | MastodonClient | null = null ;
|
private _self: NDK | MastodonCompoundClient | null = null ;
|
||||||
|
|
||||||
public init(): void {};
|
public init(): void {};
|
||||||
public getInbox(): void {};
|
public getInbox(): void {};
|
||||||
|
@ -67,21 +78,34 @@ export class Adapter {
|
||||||
adapter.nickname = settings.nickname;
|
adapter.nickname = settings.nickname;
|
||||||
|
|
||||||
adapter.init = () => {
|
adapter.init = () => {
|
||||||
adapter._self = createRestAPIClient({
|
const rawServer: string = adapter.identity.server.split("://")[1];
|
||||||
|
|
||||||
|
adapter._self = new MastodonCompoundClient(createRestAPIClient({
|
||||||
url: adapter.identity.server,
|
url: adapter.identity.server,
|
||||||
accessToken: adapter.identity.apiKey
|
accessToken: adapter.identity.apiKey
|
||||||
});
|
}),
|
||||||
|
createStreamingAPIClient({
|
||||||
|
streamingApiUrl: `https://${rawServer}/v1/api/streaming`,
|
||||||
|
accessToken: adapter.identity.apiKey
|
||||||
|
}));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
adapter.getInbox = async () => {
|
adapter.getInbox = async () => {
|
||||||
let i = 0;
|
const rawServer: string = adapter.identity.server.split("://")[1];
|
||||||
for await (const statuses of (adapter._self as MastodonClient).v1.timelines.public.list()) {
|
let conn = new WebSocket(`wss://${rawServer}/streaming/?i=${adapter.identity.apiKey}`)
|
||||||
for (const status of statuses) {
|
conn.addEventListener("open", async (e:any)=> {
|
||||||
console.log(status);
|
console.log(e);
|
||||||
i++;
|
let filter = { type: "connect", body: { channel: "localTimeline", id: crypto.randomUUID() }};
|
||||||
}
|
let data = await JSON.stringify(filter);
|
||||||
if (i >= 10) break;
|
console.log(data);
|
||||||
}
|
conn.send(data);
|
||||||
|
conn.addEventListener("message", (e:any)=>{console.log(e)});
|
||||||
|
});
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return adapter;
|
return adapter;
|
||||||
|
|
17
ts/index.ts
17
ts/index.ts
|
@ -201,32 +201,23 @@ function saveAdapter(): void {
|
||||||
let _conn: WebSocket | null = null;
|
let _conn: WebSocket | null = null;
|
||||||
|
|
||||||
function connect() {
|
function connect() {
|
||||||
// import the data from the settings
|
|
||||||
const settings = _("settings");
|
|
||||||
if (settings) {
|
|
||||||
|
|
||||||
// base64 encode the settings data
|
|
||||||
let subprotocol: string = "[";
|
|
||||||
for (let a of settings.adapters) {
|
|
||||||
subprotocol += JSON.stringify(a) + ",";
|
|
||||||
}
|
|
||||||
subprotocol += "]";
|
|
||||||
subprotocol = btoa(subprotocol).replace("=", ".");
|
|
||||||
|
|
||||||
// open the websocket connection with settings as subprotocol
|
// open the websocket connection with settings as subprotocol
|
||||||
const wsProto = location.protocol == "https:" ? "wss" : "ws";
|
const wsProto = location.protocol == "https:" ? "wss" : "ws";
|
||||||
_conn = new WebSocket(`${wsProto}://${location.host}/subscribe`, subprotocol);
|
_conn = new WebSocket(`${wsProto}://${location.host}/subscribe`, "");
|
||||||
_conn.addEventListener("open", (e: any) => {
|
_conn.addEventListener("open", (e: any) => {
|
||||||
console.log("websocket connection opened");
|
console.log("websocket connection opened");
|
||||||
console.log(JSON.stringify(e));
|
console.log(JSON.stringify(e));
|
||||||
});
|
});
|
||||||
|
_conn.addEventListener("message", (e: any) => {
|
||||||
|
console.log(e)
|
||||||
|
});
|
||||||
_conn.addEventListener("error", (e: any) => {
|
_conn.addEventListener("error", (e: any) => {
|
||||||
console.log("websocket connection error");
|
console.log("websocket connection error");
|
||||||
console.log(JSON.stringify(e));
|
console.log(JSON.stringify(e));
|
||||||
});
|
});
|
||||||
_("websocket", _conn);
|
_("websocket", _conn);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
_("addAdapter", addAdapter);
|
_("addAdapter", addAdapter);
|
||||||
_("saveAdapter", saveAdapter);
|
_("saveAdapter", saveAdapter);
|
||||||
|
|
|
@ -5,5 +5,13 @@ export type {};
|
||||||
declare const self: ServiceWorkerGlobalScope;
|
declare const self: ServiceWorkerGlobalScope;
|
||||||
|
|
||||||
self.addEventListener('sync', (e: any) => {
|
self.addEventListener('sync', (e: any) => {
|
||||||
console.log(e);
|
console.log("I won't see this");
|
||||||
|
});
|
||||||
|
|
||||||
|
self.addEventListener("install", (e: any) => {
|
||||||
|
console.log("installation proceeding...")
|
||||||
|
});
|
||||||
|
|
||||||
|
self.addEventListener("activate", (e: any) => {
|
||||||
|
console.log("SW activated!");
|
||||||
});
|
});
|
0
ts/websocket.ts
Normal file
0
ts/websocket.ts
Normal file
Loading…
Reference in a new issue