From 295df2eeec10a8528ef9166c28b1bcb75bbe74dd Mon Sep 17 00:00:00 2001 From: Iris Lightshard Date: Fri, 10 May 2024 20:54:51 -0600 Subject: [PATCH] copypaste and modify boilerplate from felt --- server/server.go | 158 +++++++++++++++++++++++++++++++++++++++++++++++ underbbs.go | 56 ++++++++++++++++- 2 files changed, 213 insertions(+), 1 deletion(-) create mode 100644 server/server.go diff --git a/server/server.go b/server/server.go new file mode 100644 index 0000000..0481fbc --- /dev/null +++ b/server/server.go @@ -0,0 +1,158 @@ +package server + +import ( + "forge.lightcrystal.systems/lightcrystal/underbbs/models" + "hacklab.nilfm.cc/quartzgun/auth" + "hacklab.nilfm.cc/quartzgun/renderer" + "net/http" + "nhooyr.io/websocket" +) + + +type Subscriber struct { + msgs chan []byte + closeSlow func() +} + + +type BBSServer struct { + subscribeMessageBuffer int + publishLimiter *rate.Limiter + logf func(f string, v ...interface{}) + serveMux http.ServeMux + subscribersLock sync.Mutex + subscribers map[*Subscriber][]models.Adapter +} + +func New() *BBSServer { + srvr := &BBSServer{ + subscribeMessageBuffer: 16, + logf: log.Printf, + subscribers: make(map[*Subscriber][]model.Adapter), + } + + // frontend is here + srvr.serveMux.Handle("/app/", http.StripPrefix("/app/", renderer.Subtree("./dist"))) + + // websocket + srvr.serveMux.HandleFunc("/subscribe", srvr.subscribeHandler) + srvr.serveMux.HandleFunc("/publish", srvr.publishHandler) + + return srvr +} + +func (self *GameTableServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { + self.serveMux.ServeHTTP(w, r) +} + +func (self *GameTableServer) subscribeHandler(w http.ResponseWriter, r *http.Request) { + + c, err := websocket.Accept(w, r, &websocket.AcceptOptions{ + Subprotocols: [], + }) + if err != nil { + self.logf("%v", err) + return + } + defer c.Close(websocket.StatusInternalError, "") + + err = self.subscribe(r, c) + if errors.Is(err, context.Canceled) { + return + } + if websocket.CloseStatus(err) == websocket.StatusNormalClosure || + websocket.CloseStatus(err) == websocket.StatusGoingAway { + return + } + if err != nil { + self.logf("%v", err) + return + } +} + +func (self *GameTableServer) subscribe(r *http.Request, c *websocket.Conn) error { + ctx := r.Context() + ctx = c.CloseRead(ctx) + + s := &Subscriber{ + msgs: make(chan []byte, self.subscribeMessageBuffer), + closeSlow: func() { + c.Close(websocket.StatusPolicyViolation, "connection too slow to keep up with messages") + }, + } + + // validate the subprotocol + + self.addSubscriber(s, tableKey) + + defer self.deleteSubscriber(s) + select { + case s.msgs <- self.getCurrentState(tableKey): + default: + go s.closeSlow() + } + + for { + select { + case msg := <-s.msgs: + err := writeTimeout(ctx, time.Second*5, c, msg) + if err != nil { + return err + } + case <-ctx.Done(): + return ctx.Err() + } + } +} + +func (self *GameTableServer) publishHandler(w http.ResponseWriter, r *http.Request) { + if r.Method != "POST" { + http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed) + return + } + body := http.MaxBytesReader(w, r.Body, 8192) + msg, err := ioutil.ReadAll(body) + if err != nil { + http.Error(w, http.StatusText(http.StatusRequestEntityTooLarge), http.StatusRequestEntityTooLarge) + return + } + + self.publish(msg) + + w.WriteHeader(http.StatusAccepted) +} + +func (self *GameTableServer) publish(msg []byte) { + self.subscribersLock.Lock() + defer self.subscribersLock.Unlock() + + // send messages to our adapter(s) + + self.publishLimiter.Wait(context.Background()) + + // send any response from the adapter(s) back to the client + + for s, k := range self.subscribers { + // whatever logic to select which subscriber to send back to + } + +} + +func (self *GameTableServer) addSubscriber(s *Subscriber, k models.TableKey) { + self.subscribersLock.Lock() + self.subscribers[s] = k + self.subscribersLock.Unlock() +} + +func (self *GameTableServer) deleteSubscriber(s *Subscriber) { + self.subscribersLock.Lock() + delete(self.subscribers, s) + self.subscribersLock.Unlock() +} + +func writeTimeout(ctx context.Context, timeout time.Duration, c *websocket.Conn, msg []byte) error { + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + return c.Write(ctx, websocket.MessageText, msg) +} \ No newline at end of file diff --git a/underbbs.go b/underbbs.go index e0b87a2..f9233bf 100644 --- a/underbbs.go +++ b/underbbs.go @@ -1,5 +1,59 @@ package main +import ( + "context" + "hacklab.nilfm.cc/felt/cmd" + "hacklab.nilfm.cc/felt/config" + "hacklab.nilfm.cc/felt/gametable" + "hacklab.nilfm.cc/felt/mongodb" + "hacklab.nilfm.cc/felt/register" + "hacklab.nilfm.cc/quartzgun/indentalUserDB" + "log" + "net" + "net/http" + "os" + "os/signal" + "path/filepath" + "strconv" + "time" +) + func main() { - return + err := run() + if err != nil { + log.Fatal(err) + } +} + +func run() error { + l, err := net.Listen("tcp", ":"+strconv.FormatInt(int64(cfg.Port), 10)) + if err != nil { + return err + } + + serviceHandler := // something + s := &http.Server{ + Handler: serviceHandler, + ReadTimeout: time.Second * 10, + WriteTimeout: time.Second * 10, + } + + errc := make(chan error, 1) + go func() { + errc <- s.Serve(l) + }() + + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, os.Interrupt) + select { + case err := <-errc: + log.Printf("failed to serve: %v", err) + case sig := <-sigs: + log.Printf("terminating: %v", sig) + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + return s.Shutdown(ctx) }