package main import ( "encoding/json" "fmt" "log" "net/http" "strings" "sync" "time" "github.com/gorilla/websocket" ) var upgrader = websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 4096, CheckOrigin: func(r *http.Request) bool { return true }, } type wsMessage struct { Message string `json:"message"` } // WSHandler handles WebSocket connections for chat. type WSHandler struct { sessionMgr *ChatSessionManager sessionsMu sync.Mutex } func NewWSHandler(sessionMgr *ChatSessionManager) *WSHandler { return &WSHandler{ sessionMgr: sessionMgr, } } func (wh *WSHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { project := r.URL.Query().Get("project") projectDir := r.URL.Query().Get("project_dir") if project == "" || projectDir == "" { http.Error(w, "missing project params", http.StatusBadRequest) return } conn, err := upgrader.Upgrade(w, r, nil) if err != nil { log.Printf("WebSocket upgrade error: %v", err) return } defer conn.Close() // Single write channel — all writes go through here to avoid concurrent writes writeCh := make(chan string, 100) writeDone := make(chan struct{}) go func() { defer close(writeDone) for msg := range writeCh { if err := conn.WriteMessage(websocket.TextMessage, []byte(msg)); err != nil { log.Printf("WebSocket write error: %v", err) return } } }() // Helper to send via write channel send := func(text string) { select { case writeCh <- text: default: log.Printf("Write channel full, dropping message") } } sessionID := fmt.Sprintf("%s-%s", project, r.RemoteAddr) subID := fmt.Sprintf("ws-%d", time.Now().UnixNano()) sess, isNew, err := wh.sessionMgr.GetOrCreate(sessionID, projectDir) if err != nil { log.Printf("Session create error: %v", err) send(FragmentSystemMessage(fmt.Sprintf("Greška pri pokretanju Claude-a: %v", err))) close(writeCh) <-writeDone return } // Send status send(FragmentStatus(true)) if isNew { send(FragmentSystemMessage("Claude sesija pokrenuta. Možeš da pišeš.")) } else { // Replay buffer buffer := sess.GetBuffer() for _, msg := range buffer { send(msg.Content) } send(FragmentSystemMessage("Ponovo povezan. Istorija učitana.")) } // Subscribe to session broadcasts sub := sess.Subscribe(subID) defer sess.Unsubscribe(subID) // Start event listener if new session if isNew { go wh.listenEvents(sess) } // Forward broadcast messages to the write channel go func() { for fragment := range sub.Ch { send(fragment) } }() // Read pump: messages from browser for { _, raw, err := conn.ReadMessage() if err != nil { if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) { log.Printf("WebSocket read error: %v", err) } break } var msg wsMessage if err := json.Unmarshal(raw, &msg); err != nil { log.Printf("Invalid WS message: %v", err) continue } text := strings.TrimSpace(msg.Message) if text == "" { continue } // Add user message to buffer (broadcasts to all subscribers including us) userFragment := FragmentUserMessage(text) sess.AddMessage(ChatMessage{ Role: "user", Content: userFragment, Timestamp: time.Now(), }) // Clear input and show typing send(FragmentCombine(FragmentClearInput(), FragmentTypingIndicator(true))) // Send to claude CLI if err := sess.Process.Send(text); err != nil { log.Printf("Send to CLI error: %v", err) send(FragmentSystemMessage("Greška pri slanju poruke")) } } // Cleanup close(writeCh) <-writeDone } // listenEvents reads events from the CLI process and broadcasts via AddMessage. func (wh *WSHandler) listenEvents(sess *ChatSession) { var currentMsgID string var currentText strings.Builder msgCounter := 0 for { select { case event, ok := <-sess.Process.Events: if !ok { sess.AddMessage(ChatMessage{ Role: "system", Content: FragmentSystemMessage("Claude sesija završena."), Timestamp: time.Now(), }) return } switch event.Type { case "system": if event.Subtype == "init" { log.Printf("CLI session started: %s", event.SessionID) } case "stream_event": if event.Event == nil { continue } wh.handleStreamEvent(sess, event.Event, ¤tMsgID, ¤tText, &msgCounter) case "assistant": if event.Message != nil { for _, c := range event.Message.Content { switch c.Type { case "tool_use": inputStr := "" if c.Input != nil { if b, err := json.Marshal(c.Input); err == nil { inputStr = string(b) } } fragment := FragmentToolCall(c.Name, inputStr) sess.AddMessage(ChatMessage{ Role: "tool", Content: fragment, Timestamp: time.Now(), }) case "text": if currentMsgID != "" && currentText.Len() > 0 { rendered := renderMarkdown(currentText.String()) fragment := FragmentAssistantComplete(currentMsgID, rendered) sess.AddMessage(ChatMessage{ Role: "assistant", Content: fragment, Timestamp: time.Now(), }) currentText.Reset() } } } } case "result": fragment := FragmentTypingIndicator(false) sess.AddMessage(ChatMessage{ Role: "system", Content: fragment, Timestamp: time.Now(), }) if event.Result != nil { parts := []string{} if event.Result.Duration > 0 { secs := event.Result.Duration / 1000 if secs >= 60 { parts = append(parts, fmt.Sprintf("%.0fm %.0fs", secs/60, float64(int(secs)%60))) } else { parts = append(parts, fmt.Sprintf("%.1fs", secs)) } } if event.Result.CostUSD > 0 { parts = append(parts, fmt.Sprintf("$%.4f", event.Result.CostUSD)) } if event.Result.NumTurns > 0 { parts = append(parts, fmt.Sprintf("%d turn(s)", event.Result.NumTurns)) } if len(parts) > 0 { costMsg := FragmentSystemMessage(strings.Join(parts, " · ")) sess.AddMessage(ChatMessage{ Role: "system", Content: costMsg, Timestamp: time.Now(), }) } } } case err, ok := <-sess.Process.Errors: if !ok { return } log.Printf("CLI error [%s]: %v", sess.ID, err) } } } func (wh *WSHandler) handleStreamEvent(sess *ChatSession, se *StreamEvent, currentMsgID *string, currentText *strings.Builder, msgCounter *int) { switch se.Type { case "content_block_start": *msgCounter++ *currentMsgID = fmt.Sprintf("msg-%d-%d", time.Now().UnixMilli(), *msgCounter) currentText.Reset() fragment := FragmentAssistantStart(*currentMsgID) sess.AddMessage(ChatMessage{ Role: "assistant", Content: fragment, Timestamp: time.Now(), }) case "content_block_delta": if se.Delta != nil && se.Delta.Text != "" { currentText.WriteString(se.Delta.Text) fragment := FragmentAssistantChunk(*currentMsgID, se.Delta.Text) sess.AddMessage(ChatMessage{ Role: "assistant", Content: fragment, Timestamp: time.Now(), }) } } } func renderMarkdown(text string) string { return RenderMD(text) }