felt/gametable/server.go

331 lines
8.4 KiB
Go

package gametable
import (
"bytes"
"context"
"crypto/rand"
"encoding/json"
"errors"
"fmt"
"forge.lightcrystal.systems/nilix/felt/admin"
"forge.lightcrystal.systems/nilix/felt/models"
"forge.lightcrystal.systems/nilix/felt/mongodb"
"forge.lightcrystal.systems/nilix/felt/register"
"forge.lightcrystal.systems/nilix/quartzgun/auth"
"forge.lightcrystal.systems/nilix/quartzgun/renderer"
"golang.org/x/time/rate"
"io/ioutil"
"log"
"net/http"
"nhooyr.io/websocket"
"strings"
"sync"
"time"
)
// Subscriber represents one connected websocket client that receives
// broadcasts for a single table.
type Subscriber struct {
// msgs queues outgoing messages for this client; it is created with a
// buffer so that publishers can hand off a message without blocking.
msgs chan []byte
// closeSlow is called (in its own goroutine) when msgs is full, i.e. the
// client is not draining messages fast enough to keep up.
closeSlow func()
}
// GameTableServer is the HTTP handler for the whole application: it routes
// requests through serveMux and fans published table messages out to the
// websocket subscribers of the matching table.
type GameTableServer struct {
// subscribeMessageBuffer is the capacity of each Subscriber's msgs channel.
subscribeMessageBuffer int
// publishLimiter is waited on before every broadcast to throttle publishing.
publishLimiter *rate.Limiter
// logf is the logging function used by the handlers (New sets it to log.Printf).
logf func(f string, v ...interface{})
// serveMux holds the routes registered by New.
serveMux http.ServeMux
// subscribersLock guards subscribers.
subscribersLock sync.Mutex
// subscribers maps every connected subscriber to the key of the table it
// is listening to.
subscribers map[*Subscriber]models.TableKey
// dbAdapter is the persistence layer for tables, tokens, dice rolls, etc.
dbAdapter mongodb.DbAdapter
// udb validates the auth tokens that accompany admin-only messages.
udb auth.UserStore
}
// New builds a GameTableServer backed by adapter and udb and registers all of
// its routes:
//
//	/            redirects to /table/
//	/table/      the static frontend served from ./static
//	/uploads/    the uploads file tree rooted at uploads
//	/admin/      the admin interface
//	/register/   the admin registration interface
//	/subscribe   websocket endpoint for table updates
//	/publish     HTTP endpoint for posting table messages
//
// uploadMaxMB is handed to the admin interface and crypto to the registration
// interface.
func New(adapter mongodb.DbAdapter, udb auth.UserStore, uploads string, uploadMaxMB int, crypto register.SymmetricCrypto) *GameTableServer {
	srvr := &GameTableServer{
		subscribeMessageBuffer: 16,
		publishLimiter:         rate.NewLimiter(rate.Every(100*time.Millisecond), 8),
		logf:                   log.Printf,
		subscribers:            map[*Subscriber]models.TableKey{},
		dbAdapter:              adapter,
		udb:                    udb,
	}
	mux := &srvr.serveMux

	// anything that no other route claims is sent to the frontend
	mux.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) {
		http.Redirect(w, req, "/table/", http.StatusSeeOther)
	})

	// frontend
	frontend := renderer.Subtree("./static")
	mux.Handle("/table/", http.StripPrefix("/table/", frontend))

	// uploaded files
	uploadTree := renderer.Subtree(uploads)
	mux.Handle("/uploads/", http.StripPrefix("/uploads/", uploadTree))

	// admin controller and admin registration controller
	adminUI := admin.CreateAdminInterface(udb, adapter, uploads, uploadMaxMB)
	mux.Handle("/admin/", http.StripPrefix("/admin", adminUI))
	registrationUI := register.CreateRegistrationInterface(udb, crypto)
	mux.Handle("/register/", http.StripPrefix("/register", registrationUI))

	// websocket subscription and message publishing
	mux.HandleFunc("/subscribe", srvr.subscribeHandler)
	mux.HandleFunc("/publish", srvr.publishHandler)

	return srvr
}
// ServeHTTP makes GameTableServer an http.Handler by delegating every request
// to the internal mux configured in New.
func (self *GameTableServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
self.serveMux.ServeHTTP(w, r)
}
// subscribeHandler upgrades the request to a websocket connection and keeps
// the client subscribed until the connection ends.
//
// The subprotocols offered during the handshake are the ones returned by the
// database adapter. If they cannot be loaded the request is answered with
// 500 Internal Server Error instead of being silently dropped (which would
// otherwise reach the client as an empty 200 OK).
func (self *GameTableServer) subscribeHandler(w http.ResponseWriter, r *http.Request) {
	protocols, err := self.dbAdapter.GetProtocols()
	if err != nil {
		self.logf("%v", err)
		http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
		return
	}

	c, err := websocket.Accept(w, r, &websocket.AcceptOptions{
		Subprotocols: protocols,
	})
	if err != nil {
		self.logf("%v", err)
		return
	}
	// fallback close for any path that did not already close the connection
	defer c.Close(websocket.StatusInternalError, "")

	err = self.subscribe(r, c)
	switch {
	case err == nil:
		// nothing to report
	case errors.Is(err, context.Canceled):
		// the request context ended; this is an ordinary disconnect
	case websocket.CloseStatus(err) == websocket.StatusNormalClosure,
		websocket.CloseStatus(err) == websocket.StatusGoingAway:
		// the client closed the connection cleanly
	default:
		self.logf("%v", err)
	}
}
// subscribe registers the connection c as a subscriber of the table named by
// its negotiated subprotocol ("<name>.<passcode>"), sends it the current table
// state, and then forwards every queued message to it until the connection's
// context ends or a write fails.
//
// It returns an error when the subprotocol cannot be split into exactly two
// parts, when no matching table exists, when a write fails, or when the
// context is done.
func (self *GameTableServer) subscribe(r *http.Request, c *websocket.Conn) error {
	// clients are not expected to send anything on this socket; the context
	// returned by CloseRead is what the forwarding loop watches
	ctx := c.CloseRead(r.Context())

	sub := &Subscriber{
		msgs: make(chan []byte, self.subscribeMessageBuffer),
		closeSlow: func() {
			c.Close(websocket.StatusPolicyViolation, "connection too slow to keep up with messages")
		},
	}

	parts := strings.Split(c.Subprotocol(), ".")
	if len(parts) != 2 {
		return errors.New("Couldn't decode subprotocol")
	}
	tableKey := models.TableKey{
		Name:     parts[0],
		Passcode: parts[1],
	}

	// subprotocol is guaranteed to be an existing table, but maybe just leave this here
	if !self.dbAdapter.CheckTable(tableKey) {
		return errors.New("Table with matching key was not found on this server")
	}

	self.addSubscriber(sub, tableKey)
	defer self.deleteSubscriber(sub)

	// the first message every subscriber gets is a snapshot of the table
	snapshot := self.getCurrentState(tableKey)
	select {
	case sub.msgs <- snapshot:
	default:
		go sub.closeSlow()
	}

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case outgoing := <-sub.msgs:
			if err := writeTimeout(ctx, 5*time.Second, c, outgoing); err != nil {
				return err
			}
		}
	}
}
// publishHandler receives a table message over HTTP POST and hands it to
// publish.
//
// Non-POST requests get 405. The body is limited to 8192 bytes; if reading it
// fails the response is 413. Otherwise the response is 202 Accepted, which is
// sent regardless of what publish ends up doing with the message.
func (self *GameTableServer) publishHandler(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		const notAllowed = http.StatusMethodNotAllowed
		http.Error(w, http.StatusText(notAllowed), notAllowed)
		return
	}

	payload, readErr := ioutil.ReadAll(http.MaxBytesReader(w, r.Body, 8192))
	if readErr != nil {
		const tooLarge = http.StatusRequestEntityTooLarge
		http.Error(w, http.StatusText(tooLarge), tooLarge)
		return
	}

	self.publish(payload)
	w.WriteHeader(http.StatusAccepted)
}
// publish decodes msg as a table message, persists its changes, and
// rebroadcasts it (with the auth token removed) to every subscriber of the
// table the message is addressed to.
//
// The message is dropped, with the reason printed, when it cannot be decoded,
// when it names no table, when the database write fails, or when it cannot be
// re-encoded. The subscribers lock is held for the whole call.
func (self *GameTableServer) publish(msg []byte) {
	self.subscribersLock.Lock()
	defer self.subscribersLock.Unlock()

	// decode message and store in DB
	tableMsg := models.TableMessage{}
	if err := json.NewDecoder(bytes.NewReader(msg)).Decode(&tableMsg); err != nil {
		fmt.Println(err.Error())
		return
	}
	// the key is a pointer and is absent from malformed client messages;
	// without it there is nothing to store and nobody to notify
	if tableMsg.Key == nil {
		fmt.Println("publish: message has no table key")
		return
	}
	key := *tableMsg.Key

	// do not tell subscribers about a change that was never persisted
	if err := self.writeToDB(&tableMsg); err != nil {
		fmt.Println(err.Error())
		return
	}

	// sanitize auth for rebroadcast
	tableMsg.Auth = nil
	clean, err := json.Marshal(tableMsg)
	if err != nil {
		fmt.Println(err.Error())
		return
	}

	// a limiter failure only means we could not throttle; still deliver
	if err := self.publishLimiter.Wait(context.Background()); err != nil {
		fmt.Println(err.Error())
	}

	for s, k := range self.subscribers {
		if k != key {
			continue
		}
		select {
		case s.msgs <- clean:
		default:
			// the subscriber's queue is full: disconnect it rather than block
			go s.closeSlow()
		}
	}
}
// getCurrentState returns the JSON snapshot of the table identified by
// tableKey: its dice roll log, map image URL, tokens and aux message.
//
// If the table does not exist, or the snapshot cannot be encoded, the result
// is a small JSON object with an "error" field instead. Failures of the
// individual lookups are ignored and leave the corresponding field at
// whatever value the adapter returned.
func (self *GameTableServer) getCurrentState(tableKey models.TableKey) []byte {
	if !self.dbAdapter.CheckTable(tableKey) {
		return []byte("{\"error\": \"table not found\"}")
	}

	// get diceroll log, map, and token state
	mapUrl, _ := self.dbAdapter.GetMapImageUrl(tableKey)
	auxMessage, _ := self.dbAdapter.GetAuxMessage(tableKey)
	tokens, _ := self.dbAdapter.GetTokens(tableKey, false)
	diceRolls, _ := self.dbAdapter.GetDiceRolls(tableKey)

	snapshot, encodeErr := json.Marshal(models.Table{
		Name:        tableKey.Name,
		Passcode:    tableKey.Passcode,
		DiceRolls:   diceRolls,
		MapImageUrl: mapUrl,
		Tokens:      tokens,
		AuxMessage:  auxMessage,
	})
	if encodeErr != nil {
		return []byte("{\"error\": \"" + encodeErr.Error() + "\"}")
	}
	return snapshot
}
// rollDice returns count independent rolls of a die with the given number of
// faces, each in the range [1, faces], drawn from crypto/rand. It fails when
// faces is zero or the random source cannot be read.
func rollDice(count int, faces uint8) ([]uint8, error) {
	if faces == 0 {
		return nil, errors.New("dice roll must have at least one face")
	}
	// Only bytes below the largest multiple of faces that fits in a byte are
	// accepted; taking a plain modulo of every byte would make the low faces
	// more likely whenever 256 is not a multiple of faces.
	limit := 256 - 256%int(faces)

	rolls := make([]uint8, count)
	scratch := make([]uint8, count)
	for filled := 0; filled < count; {
		pending := scratch[:count-filled]
		if _, err := rand.Read(pending); err != nil {
			return nil, err
		}
		for _, b := range pending {
			if int(b) < limit {
				rolls[filled] = b%faces + 1
				filled++
			}
		}
	}
	return rolls, nil
}

// writeToDB persists the changes described by tableMsg and adjusts tableMsg in
// place so that it reflects what was actually stored (server-side dice values,
// the ID of a newly created token, tokens that turned out not to exist).
//
// Anyone may roll dice and move or (de)activate existing tokens. Changing the
// map image or aux message and creating or destroying tokens only happens
// when tableMsg.Auth holds a token that the user store accepts.
//
// It returns an error when the message has no table key, when the dice roll
// is invalid, or when a database operation fails.
func (self *GameTableServer) writeToDB(tableMsg *models.TableMessage) error {
	if tableMsg.Key == nil {
		return errors.New("table message has no table key")
	}
	key := *tableMsg.Key

	if tableMsg.DiceRoll != nil {
		// don't trust the clientside diceroll: keep only how many dice and
		// how many faces were requested and roll them again here
		rolls, err := rollDice(len(tableMsg.DiceRoll.Roll), tableMsg.DiceRoll.Faces)
		if err != nil {
			// remove the roll from the message so that a caller which
			// ignores the error cannot pass on the client-supplied values
			tableMsg.DiceRoll = nil
			return err
		}
		tableMsg.DiceRoll.Roll = rolls
		if err := self.dbAdapter.InsertDiceRoll(key, *tableMsg.DiceRoll); err != nil {
			return err
		}
	}

	if tableMsg.Token != nil && tableMsg.Token.Id != nil {
		t := *tableMsg.Token
		exists, active := self.dbAdapter.CheckToken(key, *t.Id)
		switch {
		case !exists:
			// respond to nonextant IDs as if they were destroyed
			tableMsg.Token.X = nil
			tableMsg.Token.Y = nil
			tableMsg.Token.Active = false
		case active && !t.Active:
			// an active token is being taken off the table
			if err := self.dbAdapter.ActivateToken(key, *t.Id, false); err != nil {
				return err
			}
		case active && t.X != nil && t.Y != nil:
			// an active token that stays active and carries coordinates moves
			if err := self.dbAdapter.MoveToken(key, t); err != nil {
				return err
			}
		case !active && t.Active:
			// an inactive token is being put on the table
			if err := self.dbAdapter.ActivateToken(key, *t.Id, true); err != nil {
				return err
			}
		}
	}

	// map image change, aux message, and token creation/deletion require admin authorization
	if tableMsg.Auth == nil {
		return nil
	}
	// The validation error is deliberately not propagated: a token that fails
	// validation only means the admin-only part of the message is skipped.
	authorized, _ := self.udb.ValidateToken(*tableMsg.Auth)
	if !authorized {
		return nil
	}

	if tableMsg.MapImg != nil {
		if err := self.dbAdapter.SetMapImageUrl(key, *tableMsg.MapImg); err != nil {
			return err
		}
	}
	if tableMsg.AuxMsg != nil {
		if err := self.dbAdapter.SetAuxMessage(key, *tableMsg.AuxMsg); err != nil {
			return err
		}
	}
	if t := tableMsg.Token; t != nil {
		if t.Id == nil {
			// a token without an ID is a creation request; the ID handed
			// back by the adapter is attached to the message
			id, err := self.dbAdapter.CreateToken(key, *t)
			t.Id = &id
			return err
		}
		if t.X == nil && t.Y == nil && !t.Active {
			// no coordinates and inactive means the token is to be destroyed
			return self.dbAdapter.DestroyToken(key, *t.Id)
		}
	}
	return nil
}
// addSubscriber records s as a listener of the table identified by k.
func (self *GameTableServer) addSubscriber(s *Subscriber, k models.TableKey) {
	self.subscribersLock.Lock()
	defer self.subscribersLock.Unlock()

	self.subscribers[s] = k
}
// deleteSubscriber removes s from the set of listeners; it is a no-op when s
// is not registered.
func (self *GameTableServer) deleteSubscriber(s *Subscriber) {
	self.subscribersLock.Lock()
	defer self.subscribersLock.Unlock()

	delete(self.subscribers, s)
}
// writeTimeout sends msg to c as a text message, bounding the write by a
// context derived from ctx that expires after timeout.
func writeTimeout(ctx context.Context, timeout time.Duration, c *websocket.Conn, msg []byte) error {
	writeCtx, release := context.WithTimeout(ctx, timeout)
	defer release()

	return c.Write(writeCtx, websocket.MessageText, msg)
}