main.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. package main
  2. import (
  3. "compress/gzip"
  4. "context"
  5. "crypto/tls"
  6. "fmt"
  7. "html/template"
  8. "io"
  9. "io/ioutil"
  10. "log"
  11. "net/http"
  12. "path/filepath"
  13. "sort"
  14. "strings"
  15. "github.com/gorilla/mux"
  16. "github.com/nats-io/nats.go"
  17. "nhooyr.io/websocket"
  18. )
  19. var pubSub *nats.Conn
  20. var js nats.JetStreamContext
  21. func Initialize() error {
  22. var err error
  23. pubSub, err = nats.Connect(nats.DefaultURL)
  24. if err != nil {
  25. return err
  26. }
  27. return nil
  28. }
  29. func wsLoop(ctx context.Context, cancelFunc context.CancelFunc, ws *websocket.Conn, topic string, userID string) {
  30. log.Printf("Starting wsLoop for %s...", userID)
  31. defer closeWS(ws)
  32. for {
  33. if _, message, err := ws.Read(ctx); err != nil {
  34. log.Printf("Error reading message %s", err)
  35. break
  36. } else {
  37. log.Printf("Received message to websocket: ")
  38. msg := &nats.Msg{Subject: topic, Data: message, Reply: userID}
  39. if err = pubSub.PublishMsg(msg); err != nil {
  40. log.Printf("Could not publish message: %s", err)
  41. return
  42. }
  43. }
  44. }
  45. cancelFunc()
  46. log.Printf("Shutting down wsLoop for %s...", userID)
  47. }
  48. func pubSubLoop(cctx, ctx context.Context, ws *websocket.Conn, topic string, userID string) {
  49. log.Printf("Starting pubSubLoop for %s...", userID)
  50. _, err := pubSub.Subscribe(topic, func(m *nats.Msg) {
  51. m.Ack()
  52. if m.Reply == userID {
  53. log.Println("skipping message from self")
  54. return
  55. }
  56. if err := ws.Write(ctx, websocket.MessageText, m.Data); err != nil {
  57. log.Printf("Error writing message to %s: %s", userID, err)
  58. return
  59. }
  60. })
  61. if err != nil {
  62. panic(err)
  63. }
  64. }
  65. func closeWS(ws *websocket.Conn) {
  66. // can check if already closed here
  67. if err := ws.Close(websocket.StatusNormalClosure, ""); err != nil {
  68. log.Printf("Error closing: %s", err)
  69. }
  70. }
  71. func VideoConnections(w http.ResponseWriter, r *http.Request) {
  72. ws, err := websocket.Accept(w, r, nil)
  73. if err != nil {
  74. log.Fatal(err)
  75. }
  76. userID := strings.ToLower(r.URL.Query().Get("userID"))
  77. peerID := strings.ToLower(r.URL.Query().Get("peerID"))
  78. peers := []string{userID, peerID}
  79. sort.Strings(peers)
  80. topicName := fmt.Sprintf("video-%s-%s", peers[0], peers[1])
  81. ctx := context.Background()
  82. cctx, cancelFunc := context.WithCancel(ctx)
  83. go wsLoop(ctx, cancelFunc, ws, topicName, userID)
  84. pubSubLoop(cctx, ctx, ws, topicName, userID)
  85. }
  86. const TEMPLATE = "layouts/layout.html"
  87. const STATIC_DIR = "/static/"
  88. type API struct {
  89. }
  90. type PageData struct {
  91. Title string
  92. Content string
  93. CanonicalURL string
  94. OGTitle string
  95. OGDescription string
  96. OGType string
  97. OGImage string
  98. }
  99. type gzipResponseWriter struct {
  100. io.Writer
  101. http.ResponseWriter
  102. }
  103. func (w gzipResponseWriter) Write(b []byte) (int, error) {
  104. return w.Writer.Write(b)
  105. }
  106. func makeGzipHandler(h http.Handler) http.Handler {
  107. return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  108. if !strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") {
  109. h.ServeHTTP(w, r)
  110. return
  111. }
  112. w.Header().Set("Content-Encoding", "gzip")
  113. gz := gzip.NewWriter(w)
  114. defer gz.Close()
  115. gzr := gzipResponseWriter{Writer: gz, ResponseWriter: w}
  116. h.ServeHTTP(gzr, r)
  117. })
  118. }
  119. func notFound(w http.ResponseWriter, r *http.Request) {
  120. t, _ := template.ParseFiles(TEMPLATE, "content/custom_404.html")
  121. w.WriteHeader(http.StatusNotFound)
  122. t.ExecuteTemplate(w, "layout", &PageData{
  123. Title: "DUMMY", Content: ""})
  124. }
  125. func fileHandler(w http.ResponseWriter, r *http.Request) {
  126. http.ServeFile(w, r, "."+STATIC_DIR+r.URL.Path[1:])
  127. }
  128. func maxAgeHandler(seconds int, h http.Handler) http.Handler {
  129. return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  130. w.Header().Add("Cache-Control", fmt.Sprintf("max-age=%d, public, immutable", seconds))
  131. h.ServeHTTP(w, r)
  132. })
  133. }
  134. func compileTemplates(ua string, filenames ...string) (*template.Template, error) {
  135. var tmpl *template.Template
  136. for _, filename := range filenames {
  137. name := filepath.Base(filename)
  138. if tmpl == nil {
  139. tmpl = template.New(name).Funcs(template.FuncMap{})
  140. } else {
  141. tmpl = tmpl.New(name).Funcs(template.FuncMap{})
  142. }
  143. b, err := ioutil.ReadFile(filename)
  144. if err != nil {
  145. return nil, err
  146. }
  147. tmpl.Parse(string(b))
  148. }
  149. return tmpl, nil
  150. }
  151. type HomePageData struct {
  152. *PageData
  153. }
  154. func (api *API) index(w http.ResponseWriter, r *http.Request) {
  155. ua := r.Header.Get("User-Agent")
  156. t, err := compileTemplates(ua, TEMPLATE, "content/index.html")
  157. if err != nil {
  158. fmt.Println(err.Error())
  159. panic(err)
  160. }
  161. err = t.ExecuteTemplate(w, "layout", &HomePageData{
  162. PageData: &PageData{
  163. CanonicalURL: "",
  164. Title: "",
  165. Content: "",
  166. OGTitle: "",
  167. OGDescription: "",
  168. OGImage: "",
  169. OGType: "",
  170. }})
  171. if err != nil {
  172. panic(err)
  173. }
  174. }
  175. func main() {
  176. api := &API{}
  177. err := Initialize()
  178. if err != nil {
  179. panic(err)
  180. }
  181. router := mux.NewRouter().StrictSlash(true)
  182. router.
  183. PathPrefix("/static/").
  184. Handler(http.StripPrefix(STATIC_DIR, makeGzipHandler(maxAgeHandler(2629746, http.FileServer(http.Dir("."+STATIC_DIR))))))
  185. router.HandleFunc("/robots.txt", fileHandler).Methods("GET")
  186. router.HandleFunc("/", api.index).Methods("GET")
  187. router.NotFoundHandler = http.HandlerFunc(notFound)
  188. router.HandleFunc("/video/connections", VideoConnections).Methods(http.MethodGet)
  189. tls.LoadX509KeyPair("localhost.crt", "localhost.key")
  190. log.Fatal(http.ListenAndServe(":8000", router))
  191. }