underbbs/server/server.go

191 lines
4.5 KiB
Go
Raw Normal View History

package server
import (
2024-05-19 20:42:28 +00:00
"context"
"errors"
2024-06-02 16:17:12 +00:00
"fmt"
2024-05-19 20:42:28 +00:00
"forge.lightcrystal.systems/lightcrystal/underbbs/adapter"
"forge.lightcrystal.systems/lightcrystal/underbbs/models"
"golang.org/x/time/rate"
2024-05-26 18:50:19 +00:00
"hacklab.nilfm.cc/quartzgun/cookie"
"hacklab.nilfm.cc/quartzgun/renderer"
2024-05-19 20:42:28 +00:00
"io/ioutil"
"log"
"net/http"
2024-05-19 20:42:28 +00:00
"nhooyr.io/websocket"
"sync"
"time"
)
type Subscriber struct {
2024-05-26 18:50:19 +00:00
key string
msgs chan []byte
2024-05-26 23:24:49 +00:00
data chan models.SocketData
closeSlow func()
}
type BBSServer struct {
subscribeMessageBuffer int
publishLimiter *rate.Limiter
logf func(f string, v ...interface{})
serveMux http.ServeMux
subscribersLock sync.Mutex
2024-05-19 20:42:28 +00:00
subscribers map[*Subscriber][]adapter.Adapter
}
func New() *BBSServer {
2024-05-19 20:42:28 +00:00
srvr := &BBSServer{
subscribeMessageBuffer: 16,
logf: log.Printf,
subscribers: make(map[*Subscriber][]adapter.Adapter),
}
// frontend is here
srvr.serveMux.Handle("/app/", http.StripPrefix("/app/", renderer.Subtree("./dist")))
2024-05-19 20:42:28 +00:00
2024-05-26 18:50:19 +00:00
// api
2024-05-26 23:24:49 +00:00
srvr.serveMux.Handle("/api/", http.StripPrefix("/api", srvr.apiMux()))
2024-05-26 18:50:19 +00:00
// websocket
srvr.serveMux.HandleFunc("/subscribe", srvr.subscribeHandler)
srvr.serveMux.HandleFunc("/publish", srvr.publishHandler)
2024-05-19 20:42:28 +00:00
return srvr
}
2024-05-19 20:42:28 +00:00
func (self *BBSServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
self.serveMux.ServeHTTP(w, r)
}
2024-05-19 20:42:28 +00:00
func (self *BBSServer) subscribeHandler(w http.ResponseWriter, r *http.Request) {
c, err := websocket.Accept(w, r, &websocket.AcceptOptions{
2024-05-19 20:42:28 +00:00
Subprotocols: []string{},
})
if err != nil {
self.logf("%v", err)
return
}
2024-05-19 20:42:28 +00:00
ctx := r.Context()
ctx = c.CloseRead(ctx)
s := &Subscriber{
2024-05-26 18:50:19 +00:00
key: cookie.GenToken(64),
msgs: make(chan []byte, self.subscribeMessageBuffer),
2024-05-26 23:24:49 +00:00
data: make(chan models.SocketData),
closeSlow: func() {
c.Close(websocket.StatusPolicyViolation, "connection too slow to keep up with messages")
},
}
2024-05-26 23:24:49 +00:00
// start with an empty set of adapters
// we'll configure them separately
adapters := make([]adapter.Adapter, 0, 4)
2024-05-19 20:42:28 +00:00
self.addSubscriber(s, adapters)
2024-05-26 23:24:49 +00:00
// defer cleanup and write messages in the background
2024-05-26 18:50:19 +00:00
defer self.deleteSubscriber(s)
2024-05-19 20:42:28 +00:00
defer c.Close(websocket.StatusInternalError, "")
2024-05-26 18:50:19 +00:00
go func() {
2024-06-02 16:17:12 +00:00
fmt.Println("waiting for data on the subscriber's channel")
2024-05-26 18:50:19 +00:00
for {
select {
case msg := <-s.msgs:
writeTimeout(ctx, time.Second*5, c, msg)
case <-ctx.Done():
2024-06-02 16:17:12 +00:00
fmt.Println("subscriber has disconnected")
2024-05-26 18:50:19 +00:00
return //ctx.Err()
}
}
}()
2024-05-26 23:24:49 +00:00
// give user their key
2024-06-02 16:17:12 +00:00
s.msgs <- []byte("{ \"key\":\"" + s.key + "\" }")
2024-05-19 20:42:28 +00:00
2024-05-26 23:24:49 +00:00
// block on the data channel, serializing and passing the data to the subscriber
listen([]chan models.SocketData{s.data}, s.msgs)
2024-05-19 20:42:28 +00:00
if errors.Is(err, context.Canceled) {
return
}
2024-05-19 20:42:28 +00:00
if websocket.CloseStatus(err) == websocket.StatusNormalClosure ||
websocket.CloseStatus(err) == websocket.StatusGoingAway {
return
}
if err != nil {
self.logf("%v", err)
return
}
}
2024-05-19 20:42:28 +00:00
func listen(channels []chan models.SocketData, out chan []byte) {
var wg sync.WaitGroup
for _, ch := range channels {
wg.Add(1)
go func(ch <-chan models.SocketData) {
defer wg.Done()
for data := range ch {
out <- data.ToDatagram()
}
2024-05-19 20:42:28 +00:00
}(ch)
}
2024-05-19 20:42:28 +00:00
wg.Wait()
close(out)
}
2024-05-19 20:42:28 +00:00
func (self *BBSServer) publishHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed)
return
}
body := http.MaxBytesReader(w, r.Body, 8192)
msg, err := ioutil.ReadAll(body)
if err != nil {
http.Error(w, http.StatusText(http.StatusRequestEntityTooLarge), http.StatusRequestEntityTooLarge)
return
}
self.publish(msg)
w.WriteHeader(http.StatusAccepted)
}
2024-05-19 20:42:28 +00:00
func (self *BBSServer) publish(msg []byte) {
self.subscribersLock.Lock()
defer self.subscribersLock.Unlock()
2024-05-19 20:42:28 +00:00
// send messages to our adapter(s)
self.publishLimiter.Wait(context.Background())
2024-05-19 20:42:28 +00:00
// send any response from the adapter(s) back to the client
/*for s, k := range self.subscribers {
// whatever logic to select which subscriber to send back to
2024-05-19 20:42:28 +00:00
}*/
}
2024-05-19 20:42:28 +00:00
func (self *BBSServer) addSubscriber(s *Subscriber, k []adapter.Adapter) {
self.subscribersLock.Lock()
self.subscribers[s] = k
self.subscribersLock.Unlock()
}
2024-05-19 20:42:28 +00:00
func (self *BBSServer) deleteSubscriber(s *Subscriber) {
self.subscribersLock.Lock()
delete(self.subscribers, s)
self.subscribersLock.Unlock()
}
func writeTimeout(ctx context.Context, timeout time.Duration, c *websocket.Conn, msg []byte) error {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
return c.Write(ctx, websocket.MessageText, msg)
2024-05-19 20:42:28 +00:00
}