main.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215
  1. package main
  2. import (
  3. "compress/gzip"
  4. "context"
  5. "crypto/tls"
  6. "fmt"
  7. "html/template"
  8. "io"
  9. "io/ioutil"
  10. "log"
  11. "net/http"
  12. "path/filepath"
  13. "sort"
  14. "strings"
  15. "github.com/gorilla/mux"
  16. "github.com/nats-io/nats.go"
  17. "nhooyr.io/websocket"
  18. )
  19. var natsConn *nats.Conn
  20. func Initialize() error {
  21. var err error
  22. natsConn, err = nats.Connect(nats.DefaultURL)
  23. if err != nil {
  24. return err
  25. }
  26. return nil
  27. }
  28. func wsLoop(ctx context.Context, cancelFunc context.CancelFunc, ws *websocket.Conn, topic string, userID string) {
  29. log.Printf("Starting wsLoop for %s...", userID)
  30. defer closeWS(ws)
  31. for {
  32. if _, message, err := ws.Read(ctx); err != nil {
  33. log.Printf("Error reading message %s", err)
  34. break
  35. } else {
  36. log.Printf("Received message to websocket: ")
  37. msg := &nats.Msg{Subject: topic, Data: message, Reply: userID}
  38. if err = natsConn.PublishMsg(msg); err != nil {
  39. log.Printf("Could not publish message: %s", err)
  40. return
  41. }
  42. }
  43. }
  44. cancelFunc()
  45. log.Printf("Shutting down wsLoop for %s...", userID)
  46. }
  47. func natsConnLoop(cctx, ctx context.Context, ws *websocket.Conn, topic string, userID string) {
  48. log.Printf("Starting natsConnLoop for %s...", userID)
  49. _, err := natsConn.Subscribe(topic, func(m *nats.Msg) {
  50. m.Ack()
  51. if m.Reply == userID {
  52. log.Println("skipping message from self")
  53. return
  54. }
  55. if err := ws.Write(ctx, websocket.MessageText, m.Data); err != nil {
  56. log.Printf("Error writing message to %s: %s", userID, err)
  57. return
  58. }
  59. })
  60. if err != nil {
  61. panic(err)
  62. }
  63. }
  64. func closeWS(ws *websocket.Conn) {
  65. // can check if already closed here
  66. if err := ws.Close(websocket.StatusNormalClosure, ""); err != nil {
  67. log.Printf("Error closing: %s", err)
  68. }
  69. }
  70. func VideoConnections(w http.ResponseWriter, r *http.Request) {
  71. ws, err := websocket.Accept(w, r, nil)
  72. if err != nil {
  73. log.Fatal(err)
  74. }
  75. userID := strings.ToLower(r.URL.Query().Get("userID"))
  76. peerID := strings.ToLower(r.URL.Query().Get("peerID"))
  77. peers := []string{userID, peerID}
  78. sort.Strings(peers)
  79. topicName := fmt.Sprintf("video-%s-%s", peers[0], peers[1])
  80. ctx := context.Background()
  81. cctx, cancelFunc := context.WithCancel(ctx)
  82. go wsLoop(ctx, cancelFunc, ws, topicName, userID)
  83. natsConnLoop(cctx, ctx, ws, topicName, userID)
  84. }
  85. const TEMPLATE = "layouts/layout.html"
  86. const STATIC_DIR = "/static/"
  87. type API struct {
  88. }
  89. type PageData struct {
  90. Title string
  91. Content string
  92. CanonicalURL string
  93. OGTitle string
  94. OGDescription string
  95. OGType string
  96. OGImage string
  97. }
  98. type gzipResponseWriter struct {
  99. io.Writer
  100. http.ResponseWriter
  101. }
  102. func (w gzipResponseWriter) Write(b []byte) (int, error) {
  103. return w.Writer.Write(b)
  104. }
  105. func makeGzipHandler(h http.Handler) http.Handler {
  106. return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  107. if !strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") {
  108. h.ServeHTTP(w, r)
  109. return
  110. }
  111. w.Header().Set("Content-Encoding", "gzip")
  112. gz := gzip.NewWriter(w)
  113. defer gz.Close()
  114. gzr := gzipResponseWriter{Writer: gz, ResponseWriter: w}
  115. h.ServeHTTP(gzr, r)
  116. })
  117. }
  118. func notFound(w http.ResponseWriter, r *http.Request) {
  119. t, _ := template.ParseFiles(TEMPLATE, "content/custom_404.html")
  120. w.WriteHeader(http.StatusNotFound)
  121. t.ExecuteTemplate(w, "layout", &PageData{
  122. Title: "DUMMY", Content: ""})
  123. }
  124. func fileHandler(w http.ResponseWriter, r *http.Request) {
  125. http.ServeFile(w, r, "."+STATIC_DIR+r.URL.Path[1:])
  126. }
  127. func maxAgeHandler(seconds int, h http.Handler) http.Handler {
  128. return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  129. w.Header().Add("Cache-Control", fmt.Sprintf("max-age=%d, public, immutable", seconds))
  130. h.ServeHTTP(w, r)
  131. })
  132. }
  133. func compileTemplates(ua string, filenames ...string) (*template.Template, error) {
  134. var tmpl *template.Template
  135. for _, filename := range filenames {
  136. name := filepath.Base(filename)
  137. if tmpl == nil {
  138. tmpl = template.New(name).Funcs(template.FuncMap{})
  139. } else {
  140. tmpl = tmpl.New(name).Funcs(template.FuncMap{})
  141. }
  142. b, err := ioutil.ReadFile(filename)
  143. if err != nil {
  144. return nil, err
  145. }
  146. tmpl.Parse(string(b))
  147. }
  148. return tmpl, nil
  149. }
  150. type HomePageData struct {
  151. *PageData
  152. }
  153. func (api *API) index(w http.ResponseWriter, r *http.Request) {
  154. ua := r.Header.Get("User-Agent")
  155. t, err := compileTemplates(ua, TEMPLATE, "content/index.html")
  156. if err != nil {
  157. fmt.Println(err.Error())
  158. panic(err)
  159. }
  160. err = t.ExecuteTemplate(w, "layout", &HomePageData{
  161. PageData: &PageData{
  162. CanonicalURL: "",
  163. Title: "",
  164. Content: "",
  165. OGTitle: "",
  166. OGDescription: "",
  167. OGImage: "",
  168. OGType: "",
  169. }})
  170. if err != nil {
  171. panic(err)
  172. }
  173. }
  174. func main() {
  175. api := &API{}
  176. err := Initialize()
  177. if err != nil {
  178. panic(err)
  179. }
  180. router := mux.NewRouter().StrictSlash(true)
  181. router.
  182. PathPrefix("/static/").
  183. Handler(http.StripPrefix(STATIC_DIR, makeGzipHandler(maxAgeHandler(2629746, http.FileServer(http.Dir("."+STATIC_DIR))))))
  184. router.HandleFunc("/robots.txt", fileHandler).Methods("GET")
  185. router.HandleFunc("/", api.index).Methods("GET")
  186. router.NotFoundHandler = http.HandlerFunc(notFound)
  187. router.HandleFunc("/video/connections", VideoConnections).Methods(http.MethodGet)
  188. tls.LoadX509KeyPair("localhost.crt", "localhost.key")
  189. log.Fatal(http.ListenAndServe(":8000", router))
  190. }