package server import ( "context" "encoding/base64" "encoding/json" "errors" "forge.lightcrystal.systems/lightcrystal/underbbs/adapter" "forge.lightcrystal.systems/lightcrystal/underbbs/models" "golang.org/x/time/rate" "hacklab.nilfm.cc/quartzgun/renderer" "io/ioutil" "log" "net/http" "nhooyr.io/websocket" "sync" "time" ) type Subscriber struct { msgs chan []byte closeSlow func() } type BBSServer struct { subscribeMessageBuffer int publishLimiter *rate.Limiter logf func(f string, v ...interface{}) serveMux http.ServeMux subscribersLock sync.Mutex subscribers map[*Subscriber][]adapter.Adapter } func New() *BBSServer { srvr := &BBSServer{ subscribeMessageBuffer: 16, logf: log.Printf, subscribers: make(map[*Subscriber][]adapter.Adapter), } // frontend is here srvr.serveMux.Handle("/app/", http.StripPrefix("/app/", renderer.Subtree("./dist"))) // websocket srvr.serveMux.HandleFunc("/subscribe", srvr.subscribeHandler) srvr.serveMux.HandleFunc("/publish", srvr.publishHandler) return srvr } func (self *BBSServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { self.serveMux.ServeHTTP(w, r) } func (self *BBSServer) subscribeHandler(w http.ResponseWriter, r *http.Request) { c, err := websocket.Accept(w, r, &websocket.AcceptOptions{ Subprotocols: []string{}, }) if err != nil { self.logf("%v", err) return } // decode subprotocol data into settings objects data := c.Subprotocol() // base64 decode decoded, err := base64.StdEncoding.DecodeString(data) if err != nil { c.Close(3000, err.Error()) return } settings := []models.Settings{} // unmarshal the json into the settings array err = json.Unmarshal([]byte(decoded), &settings) if err != nil { c.Close(3001, err.Error()) return } msgc := make(chan models.SocketData) // attempt to initialize adapters adapters := make([]adapter.Adapter, 0, 4) for _, s := range settings { var a adapter.Adapter switch s.Protocol { case "nostr": a = &adapter.NostrAdapter{} break case "masto": break default: break } err := a.Init(s, msgc) if err != nil { c.Close(3002, err.Error()) return } adapters = append(adapters, a) } // keep reference to the adapters so we can execute commands on them later ctx := r.Context() ctx = c.CloseRead(ctx) s := &Subscriber{ msgs: make(chan []byte, self.subscribeMessageBuffer), closeSlow: func() { c.Close(websocket.StatusPolicyViolation, "connection too slow to keep up with messages") }, } self.addSubscriber(s, adapters) defer self.deleteSubscriber(s) defer c.Close(websocket.StatusInternalError, "") // collect data from the channels injected into the adapters, // and send it back to the client // we block until we stop receiving data on all the input channels listen([]chan models.SocketData{msgc}, s.msgs) if errors.Is(err, context.Canceled) { return } if websocket.CloseStatus(err) == websocket.StatusNormalClosure || websocket.CloseStatus(err) == websocket.StatusGoingAway { return } if err != nil { self.logf("%v", err) return } } func listen(channels []chan models.SocketData, out chan []byte) { var wg sync.WaitGroup for _, ch := range channels { wg.Add(1) go func(ch <-chan models.SocketData) { defer wg.Done() for data := range ch { out <- data.ToDatagram() } }(ch) } wg.Wait() close(out) } func (self *BBSServer) publishHandler(w http.ResponseWriter, r *http.Request) { if r.Method != "POST" { http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed) return } body := http.MaxBytesReader(w, r.Body, 8192) msg, err := ioutil.ReadAll(body) if err != nil { http.Error(w, http.StatusText(http.StatusRequestEntityTooLarge), http.StatusRequestEntityTooLarge) return } self.publish(msg) w.WriteHeader(http.StatusAccepted) } func (self *BBSServer) publish(msg []byte) { self.subscribersLock.Lock() defer self.subscribersLock.Unlock() // send messages to our adapter(s) self.publishLimiter.Wait(context.Background()) // send any response from the adapter(s) back to the client /*for s, k := range self.subscribers { // whatever logic to select which subscriber to send back to }*/ } func (self *BBSServer) addSubscriber(s *Subscriber, k []adapter.Adapter) { self.subscribersLock.Lock() self.subscribers[s] = k self.subscribersLock.Unlock() } func (self *BBSServer) deleteSubscriber(s *Subscriber) { self.subscribersLock.Lock() delete(self.subscribers, s) self.subscribersLock.Unlock() } func writeTimeout(ctx context.Context, timeout time.Duration, c *websocket.Conn, msg []byte) error { ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() return c.Write(ctx, websocket.MessageText, msg) }