package main import ( "bytes" "encoding/json" "fmt" "log" "net/http" "strings" "sync" "time" "github.com/gorilla/websocket" "github.com/yuin/goldmark" ) var upgrader = websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 4096, CheckOrigin: func(r *http.Request) bool { return true }, } type wsMessage struct { Message string `json:"message"` } // WSHandler handles WebSocket connections for chat. type WSHandler struct { sessionMgr *ChatSessionManager sessionsMu sync.Mutex } func NewWSHandler(sessionMgr *ChatSessionManager) *WSHandler { return &WSHandler{ sessionMgr: sessionMgr, } } func (wh *WSHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { project := r.URL.Query().Get("project") projectDir := r.URL.Query().Get("project_dir") if project == "" || projectDir == "" { http.Error(w, "missing project params", http.StatusBadRequest) return } conn, err := upgrader.Upgrade(w, r, nil) if err != nil { log.Printf("WebSocket upgrade error: %v", err) return } defer conn.Close() sessionID := fmt.Sprintf("%s-%s", project, r.RemoteAddr) subID := fmt.Sprintf("ws-%d", time.Now().UnixNano()) sess, isNew, err := wh.sessionMgr.GetOrCreate(sessionID, projectDir) if err != nil { log.Printf("Session create error: %v", err) writeWSText(conn, FragmentSystemMessage(fmt.Sprintf("Greška pri pokretanju Claude-a: %v", err))) return } // Send status writeWSText(conn, FragmentStatus(true)) if isNew { writeWSText(conn, FragmentSystemMessage("Claude sesija pokrenuta. Možeš da pišeš.")) } else { // Replay buffer buffer := sess.GetBuffer() for _, msg := range buffer { writeWSText(conn, msg.Content) } writeWSText(conn, FragmentSystemMessage("Ponovo povezan. Istorija učitana.")) } // Subscribe to session broadcasts sub := sess.Subscribe(subID) defer sess.Unsubscribe(subID) // Start event listener if new session if isNew { go wh.listenEvents(sess) } // Write pump: forward broadcast messages to this WebSocket wsDone := make(chan struct{}) go func() { defer close(wsDone) for fragment := range sub.Ch { if err := conn.WriteMessage(websocket.TextMessage, []byte(fragment)); err != nil { return } } }() // Read pump: messages from browser for { _, raw, err := conn.ReadMessage() if err != nil { if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) { log.Printf("WebSocket read error: %v", err) } break } var msg wsMessage if err := json.Unmarshal(raw, &msg); err != nil { log.Printf("Invalid WS message: %v", err) continue } text := strings.TrimSpace(msg.Message) if text == "" { continue } // Add user message to buffer (broadcasts to all subscribers including us) userFragment := FragmentUserMessage(text) sess.AddMessage(ChatMessage{ Role: "user", Content: userFragment, Timestamp: time.Now(), }) // Clear input and show typing — send directly to this connection only writeWSText(conn, FragmentCombine(FragmentClearInput(), FragmentTypingIndicator(true))) // Send to claude CLI if err := sess.Process.Send(text); err != nil { log.Printf("Send to CLI error: %v", err) writeWSText(conn, FragmentSystemMessage("Greška pri slanju poruke")) } } // Don't close session — it stays alive for reconnect } // listenEvents reads events from the CLI process and broadcasts via AddMessage. func (wh *WSHandler) listenEvents(sess *ChatSession) { var currentMsgID string var currentText strings.Builder msgCounter := 0 for { select { case event, ok := <-sess.Process.Events: if !ok { sess.AddMessage(ChatMessage{ Role: "system", Content: FragmentSystemMessage("Claude sesija završena."), Timestamp: time.Now(), }) return } switch event.Type { case "assistant": if event.Message != nil { for _, c := range event.Message.Content { if c.Type == "tool_use" { inputStr := "" if c.Input != nil { if b, err := json.Marshal(c.Input); err == nil { inputStr = string(b) } } fragment := FragmentToolCall(c.Name, inputStr) sess.AddMessage(ChatMessage{ Role: "tool", Content: fragment, Timestamp: time.Now(), }) } } } case "content_block_start": msgCounter++ currentMsgID = fmt.Sprintf("msg-%d-%d", time.Now().UnixMilli(), msgCounter) currentText.Reset() fragment := FragmentAssistantStart(currentMsgID) sess.AddMessage(ChatMessage{ Role: "assistant", Content: fragment, Timestamp: time.Now(), }) case "content_block_delta": if event.Delta != nil && event.Delta.Text != "" { currentText.WriteString(event.Delta.Text) fragment := FragmentAssistantChunk(currentMsgID, event.Delta.Text) sess.AddMessage(ChatMessage{ Role: "assistant", Content: fragment, Timestamp: time.Now(), }) } case "content_block_stop": if currentText.Len() > 0 && currentMsgID != "" { rendered := renderMarkdown(currentText.String()) fragment := FragmentAssistantComplete(currentMsgID, rendered) sess.AddMessage(ChatMessage{ Role: "assistant", Content: fragment, Timestamp: time.Now(), }) } case "result": fragment := FragmentTypingIndicator(false) sess.AddMessage(ChatMessage{ Role: "system", Content: fragment, Timestamp: time.Now(), }) if event.Result != nil && event.Result.CostUSD > 0 { costMsg := FragmentSystemMessage(fmt.Sprintf("Gotovo (%.4f USD)", event.Result.CostUSD)) sess.AddMessage(ChatMessage{ Role: "system", Content: costMsg, Timestamp: time.Now(), }) } } case err, ok := <-sess.Process.Errors: if !ok { return } log.Printf("CLI error [%s]: %v", sess.ID, err) } } } func renderMarkdown(text string) string { var buf bytes.Buffer if err := goldmark.Convert([]byte(text), &buf); err != nil { return text } return buf.String() } func writeWSText(conn *websocket.Conn, text string) { conn.WriteMessage(websocket.TextMessage, []byte(text)) }