2024-05-11 02:54:51 +00:00
|
|
|
package server
|
|
|
|
|
|
|
|
import (
|
2024-05-19 20:42:28 +00:00
|
|
|
"context"
|
2025-01-05 03:45:22 +00:00
|
|
|
"crypto"
|
|
|
|
"crypto/x509"
|
|
|
|
"encoding/pem"
|
2024-05-19 20:42:28 +00:00
|
|
|
"errors"
|
2024-11-28 23:51:12 +00:00
|
|
|
"forge.lightcrystal.systems/nilix/quartzgun/cookie"
|
|
|
|
"forge.lightcrystal.systems/nilix/quartzgun/renderer"
|
2024-12-07 02:30:41 +00:00
|
|
|
"forge.lightcrystal.systems/nilix/underbbs/adapter"
|
|
|
|
"forge.lightcrystal.systems/nilix/underbbs/models"
|
2025-01-05 03:45:22 +00:00
|
|
|
_ "github.com/rs/cors"
|
2024-05-19 20:42:28 +00:00
|
|
|
"golang.org/x/time/rate"
|
|
|
|
"io/ioutil"
|
|
|
|
"log"
|
2024-05-11 02:54:51 +00:00
|
|
|
"net/http"
|
2024-05-19 20:42:28 +00:00
|
|
|
"nhooyr.io/websocket"
|
|
|
|
"sync"
|
|
|
|
"time"
|
2024-05-11 02:54:51 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
type Subscriber struct {
|
2024-05-26 18:50:19 +00:00
|
|
|
key string
|
2024-05-11 02:54:51 +00:00
|
|
|
msgs chan []byte
|
2024-05-26 23:24:49 +00:00
|
|
|
data chan models.SocketData
|
2024-05-11 02:54:51 +00:00
|
|
|
closeSlow func()
|
|
|
|
}
|
|
|
|
|
|
|
|
type BBSServer struct {
|
|
|
|
subscribeMessageBuffer int
|
|
|
|
publishLimiter *rate.Limiter
|
|
|
|
logf func(f string, v ...interface{})
|
|
|
|
serveMux http.ServeMux
|
|
|
|
subscribersLock sync.Mutex
|
2024-05-19 20:42:28 +00:00
|
|
|
subscribers map[*Subscriber][]adapter.Adapter
|
2025-01-05 03:45:22 +00:00
|
|
|
apKey *crypto.PrivateKey
|
|
|
|
apDomain *string
|
2024-05-11 02:54:51 +00:00
|
|
|
}
|
|
|
|
|
2025-01-05 03:45:22 +00:00
|
|
|
func CORS(next http.Handler) http.Handler {
|
|
|
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
|
|
w.Header().Add("Access-Control-Allow-Origin", "*")
|
|
|
|
w.Header().Add("Access-Control-Allow-Credentials", "true")
|
|
|
|
w.Header().Add("Access-Control-Allow-Headers", "Content-Type, Content-Length, Accept-Encoding, X-CSRF-Token, Authorization, accept, origin, Cache-Control, X-Requested-With, X-Underbbs-Subscriber")
|
|
|
|
w.Header().Add("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE")
|
|
|
|
|
|
|
|
if r.Method == "OPTIONS" {
|
|
|
|
http.Error(w, "No Content", http.StatusNoContent)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
next.ServeHTTP(w, r)
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
func New(config models.GlobalSettings) *BBSServer {
|
|
|
|
var apKey crypto.PrivateKey
|
|
|
|
|
|
|
|
useApKey := false
|
|
|
|
|
|
|
|
if config.ApKey != nil && config.ApDomain != nil {
|
|
|
|
keybytes, err := ioutil.ReadFile(*config.ApKey)
|
|
|
|
if err != nil {
|
|
|
|
panic(err.Error())
|
|
|
|
}
|
|
|
|
keypem, _ := pem.Decode(keybytes)
|
|
|
|
if keypem == nil {
|
|
|
|
panic("couldn't decode pem block from AP key")
|
|
|
|
}
|
|
|
|
apKey, err = x509.ParsePKCS8PrivateKey(keypem.Bytes)
|
|
|
|
if err != nil {
|
|
|
|
panic(err.Error())
|
|
|
|
}
|
|
|
|
useApKey = true
|
|
|
|
}
|
|
|
|
|
|
|
|
var srvr *BBSServer
|
|
|
|
|
|
|
|
if useApKey {
|
|
|
|
srvr = &BBSServer{
|
|
|
|
subscribeMessageBuffer: 16,
|
|
|
|
logf: log.Printf,
|
|
|
|
subscribers: make(map[*Subscriber][]adapter.Adapter),
|
|
|
|
apKey: &apKey,
|
|
|
|
apDomain: config.ApDomain,
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
srvr = &BBSServer{
|
|
|
|
subscribeMessageBuffer: 16,
|
|
|
|
logf: log.Printf,
|
|
|
|
subscribers: make(map[*Subscriber][]adapter.Adapter),
|
|
|
|
}
|
2024-05-19 20:42:28 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// frontend is here
|
2024-07-07 17:07:18 +00:00
|
|
|
srvr.serveMux.Handle("/app/", http.StripPrefix("/app/", renderer.Subtree("./frontend/dist")))
|
2024-05-19 20:42:28 +00:00
|
|
|
|
2024-05-26 18:50:19 +00:00
|
|
|
// api
|
2025-01-05 03:45:22 +00:00
|
|
|
srvr.serveMux.Handle("/api/", http.StripPrefix("/api", CORS(srvr.apiMux())))
|
2024-05-26 18:50:19 +00:00
|
|
|
|
2024-05-11 02:54:51 +00:00
|
|
|
// websocket
|
|
|
|
srvr.serveMux.HandleFunc("/subscribe", srvr.subscribeHandler)
|
2024-07-07 03:13:18 +00:00
|
|
|
// publish is unused currently, we just use the API and send data back on the websocket
|
|
|
|
// srvr.serveMux.HandleFunc("/publish", srvr.publishHandler)
|
2024-05-19 20:42:28 +00:00
|
|
|
|
2024-05-11 02:54:51 +00:00
|
|
|
return srvr
|
|
|
|
}
|
|
|
|
|
2024-05-19 20:42:28 +00:00
|
|
|
func (self *BBSServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
2024-05-11 02:54:51 +00:00
|
|
|
self.serveMux.ServeHTTP(w, r)
|
|
|
|
}
|
|
|
|
|
2024-05-19 20:42:28 +00:00
|
|
|
func (self *BBSServer) subscribeHandler(w http.ResponseWriter, r *http.Request) {
|
|
|
|
|
2024-05-11 02:54:51 +00:00
|
|
|
c, err := websocket.Accept(w, r, &websocket.AcceptOptions{
|
2024-12-08 01:04:04 +00:00
|
|
|
Subprotocols: []string{},
|
|
|
|
OriginPatterns: []string{"*"},
|
2024-05-11 02:54:51 +00:00
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
self.logf("%v", err)
|
|
|
|
return
|
|
|
|
}
|
2024-05-19 20:42:28 +00:00
|
|
|
|
2024-05-11 02:54:51 +00:00
|
|
|
ctx := r.Context()
|
|
|
|
ctx = c.CloseRead(ctx)
|
|
|
|
|
|
|
|
s := &Subscriber{
|
2024-05-26 18:50:19 +00:00
|
|
|
key: cookie.GenToken(64),
|
2024-05-11 02:54:51 +00:00
|
|
|
msgs: make(chan []byte, self.subscribeMessageBuffer),
|
2024-05-26 23:24:49 +00:00
|
|
|
data: make(chan models.SocketData),
|
2024-05-11 02:54:51 +00:00
|
|
|
closeSlow: func() {
|
|
|
|
c.Close(websocket.StatusPolicyViolation, "connection too slow to keep up with messages")
|
|
|
|
},
|
|
|
|
}
|
|
|
|
|
2024-05-26 23:24:49 +00:00
|
|
|
// start with an empty set of adapters
|
|
|
|
// we'll configure them separately
|
|
|
|
adapters := make([]adapter.Adapter, 0, 4)
|
2024-05-19 20:42:28 +00:00
|
|
|
self.addSubscriber(s, adapters)
|
2024-05-11 02:54:51 +00:00
|
|
|
|
2024-05-26 23:24:49 +00:00
|
|
|
// defer cleanup and write messages in the background
|
2024-05-26 18:50:19 +00:00
|
|
|
defer self.deleteSubscriber(s)
|
2024-05-19 20:42:28 +00:00
|
|
|
defer c.Close(websocket.StatusInternalError, "")
|
|
|
|
|
2024-05-26 18:50:19 +00:00
|
|
|
go func() {
|
2024-09-28 17:39:03 +00:00
|
|
|
self.logf("waiting for data on the subscriber's channel")
|
2024-05-26 18:50:19 +00:00
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case msg := <-s.msgs:
|
|
|
|
writeTimeout(ctx, time.Second*5, c, msg)
|
|
|
|
|
|
|
|
case <-ctx.Done():
|
2024-09-28 17:39:03 +00:00
|
|
|
self.logf("subscriber has disconnected")
|
2024-12-07 02:25:58 +00:00
|
|
|
for _, a := range self.subscribers[s] {
|
|
|
|
// keeps any adapter's subscription from trying to send on the data channel after we close it
|
|
|
|
a.Stop()
|
|
|
|
}
|
2024-08-23 01:49:17 +00:00
|
|
|
close(s.data)
|
2024-05-26 18:50:19 +00:00
|
|
|
return //ctx.Err()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
2024-05-26 23:24:49 +00:00
|
|
|
// give user their key
|
2024-06-02 16:17:12 +00:00
|
|
|
s.msgs <- []byte("{ \"key\":\"" + s.key + "\" }")
|
2024-05-19 20:42:28 +00:00
|
|
|
|
2024-05-26 23:24:49 +00:00
|
|
|
// block on the data channel, serializing and passing the data to the subscriber
|
|
|
|
listen([]chan models.SocketData{s.data}, s.msgs)
|
2024-05-19 20:42:28 +00:00
|
|
|
|
2024-09-28 17:39:03 +00:00
|
|
|
self.logf("data listener is done!")
|
2024-08-23 01:49:17 +00:00
|
|
|
|
2024-05-19 20:42:28 +00:00
|
|
|
if errors.Is(err, context.Canceled) {
|
|
|
|
return
|
2024-05-11 02:54:51 +00:00
|
|
|
}
|
2024-05-19 20:42:28 +00:00
|
|
|
if websocket.CloseStatus(err) == websocket.StatusNormalClosure ||
|
|
|
|
websocket.CloseStatus(err) == websocket.StatusGoingAway {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
if err != nil {
|
|
|
|
self.logf("%v", err)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
2024-05-11 02:54:51 +00:00
|
|
|
|
2024-05-19 20:42:28 +00:00
|
|
|
func listen(channels []chan models.SocketData, out chan []byte) {
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
for _, ch := range channels {
|
|
|
|
wg.Add(1)
|
|
|
|
go func(ch <-chan models.SocketData) {
|
|
|
|
defer wg.Done()
|
|
|
|
for data := range ch {
|
|
|
|
out <- data.ToDatagram()
|
2024-05-11 02:54:51 +00:00
|
|
|
}
|
2024-05-19 20:42:28 +00:00
|
|
|
}(ch)
|
|
|
|
|
2024-05-11 02:54:51 +00:00
|
|
|
}
|
2024-05-19 20:42:28 +00:00
|
|
|
wg.Wait()
|
|
|
|
close(out)
|
2024-05-11 02:54:51 +00:00
|
|
|
}
|
|
|
|
|
2024-05-19 20:42:28 +00:00
|
|
|
func (self *BBSServer) publishHandler(w http.ResponseWriter, r *http.Request) {
|
2024-05-11 02:54:51 +00:00
|
|
|
if r.Method != "POST" {
|
|
|
|
http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
body := http.MaxBytesReader(w, r.Body, 8192)
|
|
|
|
msg, err := ioutil.ReadAll(body)
|
|
|
|
if err != nil {
|
|
|
|
http.Error(w, http.StatusText(http.StatusRequestEntityTooLarge), http.StatusRequestEntityTooLarge)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
self.publish(msg)
|
|
|
|
|
|
|
|
w.WriteHeader(http.StatusAccepted)
|
|
|
|
}
|
|
|
|
|
2024-05-19 20:42:28 +00:00
|
|
|
func (self *BBSServer) publish(msg []byte) {
|
2024-05-11 02:54:51 +00:00
|
|
|
self.subscribersLock.Lock()
|
|
|
|
defer self.subscribersLock.Unlock()
|
|
|
|
|
2024-05-19 20:42:28 +00:00
|
|
|
// send messages to our adapter(s)
|
|
|
|
|
2024-05-11 02:54:51 +00:00
|
|
|
self.publishLimiter.Wait(context.Background())
|
|
|
|
|
2024-05-19 20:42:28 +00:00
|
|
|
// send any response from the adapter(s) back to the client
|
|
|
|
|
|
|
|
/*for s, k := range self.subscribers {
|
2024-05-11 02:54:51 +00:00
|
|
|
// whatever logic to select which subscriber to send back to
|
2024-05-19 20:42:28 +00:00
|
|
|
}*/
|
2024-05-11 02:54:51 +00:00
|
|
|
|
|
|
|
}
|
|
|
|
|
2024-05-19 20:42:28 +00:00
|
|
|
func (self *BBSServer) addSubscriber(s *Subscriber, k []adapter.Adapter) {
|
2024-05-11 02:54:51 +00:00
|
|
|
self.subscribersLock.Lock()
|
|
|
|
self.subscribers[s] = k
|
|
|
|
self.subscribersLock.Unlock()
|
|
|
|
}
|
|
|
|
|
2024-05-19 20:42:28 +00:00
|
|
|
func (self *BBSServer) deleteSubscriber(s *Subscriber) {
|
2024-05-11 02:54:51 +00:00
|
|
|
self.subscribersLock.Lock()
|
|
|
|
delete(self.subscribers, s)
|
|
|
|
self.subscribersLock.Unlock()
|
|
|
|
}
|
|
|
|
|
|
|
|
func writeTimeout(ctx context.Context, timeout time.Duration, c *websocket.Conn, msg []byte) error {
|
|
|
|
ctx, cancel := context.WithTimeout(ctx, timeout)
|
|
|
|
defer cancel()
|
|
|
|
|
|
|
|
return c.Write(ctx, websocket.MessageText, msg)
|
2024-05-19 20:42:28 +00:00
|
|
|
}
|