|
@@ -19,13 +19,11 @@ import (
|
|
|
"nhooyr.io/websocket"
|
|
"nhooyr.io/websocket"
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
-var pubSub *nats.Conn
|
|
|
|
|
-
|
|
|
|
|
-var js nats.JetStreamContext
|
|
|
|
|
|
|
+var natsConn *nats.Conn
|
|
|
|
|
|
|
|
func Initialize() error {
|
|
func Initialize() error {
|
|
|
var err error
|
|
var err error
|
|
|
- pubSub, err = nats.Connect(nats.DefaultURL)
|
|
|
|
|
|
|
+ natsConn, err = nats.Connect(nats.DefaultURL)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
return err
|
|
return err
|
|
|
}
|
|
}
|
|
@@ -42,7 +40,7 @@ func wsLoop(ctx context.Context, cancelFunc context.CancelFunc, ws *websocket.Co
|
|
|
} else {
|
|
} else {
|
|
|
log.Printf("Received message to websocket: ")
|
|
log.Printf("Received message to websocket: ")
|
|
|
msg := &nats.Msg{Subject: topic, Data: message, Reply: userID}
|
|
msg := &nats.Msg{Subject: topic, Data: message, Reply: userID}
|
|
|
- if err = pubSub.PublishMsg(msg); err != nil {
|
|
|
|
|
|
|
+ if err = natsConn.PublishMsg(msg); err != nil {
|
|
|
log.Printf("Could not publish message: %s", err)
|
|
log.Printf("Could not publish message: %s", err)
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
@@ -53,9 +51,9 @@ func wsLoop(ctx context.Context, cancelFunc context.CancelFunc, ws *websocket.Co
|
|
|
log.Printf("Shutting down wsLoop for %s...", userID)
|
|
log.Printf("Shutting down wsLoop for %s...", userID)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func pubSubLoop(cctx, ctx context.Context, ws *websocket.Conn, topic string, userID string) {
|
|
|
|
|
- log.Printf("Starting pubSubLoop for %s...", userID)
|
|
|
|
|
- _, err := pubSub.Subscribe(topic, func(m *nats.Msg) {
|
|
|
|
|
|
|
+func natsConnLoop(cctx, ctx context.Context, ws *websocket.Conn, topic string, userID string) {
|
|
|
|
|
+ log.Printf("Starting natsConnLoop for %s...", userID)
|
|
|
|
|
+ _, err := natsConn.Subscribe(topic, func(m *nats.Msg) {
|
|
|
m.Ack()
|
|
m.Ack()
|
|
|
if m.Reply == userID {
|
|
if m.Reply == userID {
|
|
|
log.Println("skipping message from self")
|
|
log.Println("skipping message from self")
|
|
@@ -91,7 +89,7 @@ func VideoConnections(w http.ResponseWriter, r *http.Request) {
|
|
|
ctx := context.Background()
|
|
ctx := context.Background()
|
|
|
cctx, cancelFunc := context.WithCancel(ctx)
|
|
cctx, cancelFunc := context.WithCancel(ctx)
|
|
|
go wsLoop(ctx, cancelFunc, ws, topicName, userID)
|
|
go wsLoop(ctx, cancelFunc, ws, topicName, userID)
|
|
|
- pubSubLoop(cctx, ctx, ws, topicName, userID)
|
|
|
|
|
|
|
+ natsConnLoop(cctx, ctx, ws, topicName, userID)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
const TEMPLATE = "layouts/layout.html"
|
|
const TEMPLATE = "layouts/layout.html"
|