KAOS/code/internal/server/ws.go
djuka 510b75c0bf Konzola: dinamičke task sesije sa PTY per task
- Zamena fiksnih 2 sesija sa taskSessionManager (map po task ID)
- "Pusti" pokreće interaktivni claude u PTY, šalje task prompt
- "Proveri" pokreće review claude sesiju za task u review/
- WS se konektuje na postojeću PTY sesiju po task ID-u
- Konzola stranica dinamički prikazuje terminale za aktivne sesije
- Replay buffer za reconnect na postojeće sesije
- Novi testovi za session manager, prompt buildere, review endpoint

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-21 04:32:34 +00:00

129 lines
3.0 KiB
Go

package server
import (
"encoding/json"
"log"
"net/http"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
)
var wsUpgrader = websocket.Upgrader{
ReadBufferSize: 4096,
WriteBufferSize: 4096,
CheckOrigin: func(r *http.Request) bool { return true },
}
// wsResizeMsg is sent from the browser when the terminal size changes.
type wsResizeMsg struct {
Type string `json:"type"`
Cols uint16 `json:"cols"`
Rows uint16 `json:"rows"`
}
// handleConsoleWS handles WebSocket connections for task console terminals.
// Each connection attaches to an existing PTY session by task key.
func (s *Server) handleConsoleWS(c *gin.Context) {
key := c.Param("key") // e.g., "T08" or "T08-review"
sess := s.console.getSessionByKey(key)
if sess == nil {
c.JSON(http.StatusNotFound, gin.H{"error": "sesija nije pronađena"})
return
}
conn, err := wsUpgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
log.Printf("WS[%s]: upgrade error: %v", key, err)
return
}
defer conn.Close()
log.Printf("WS[%s]: connected", key)
ptySess := sess.PTY
// Send replay buffer so the user sees existing output
replay := ptySess.GetBuffer()
if len(replay) > 0 {
if err := conn.WriteMessage(websocket.BinaryMessage, replay); err != nil {
log.Printf("WS[%s]: replay write error: %v", key, err)
return
}
log.Printf("WS[%s]: replayed %d bytes", key, len(replay))
}
// Subscribe to new PTY output
subID := key + "-ws"
outputCh := ptySess.Subscribe(subID)
defer ptySess.Unsubscribe(subID)
// Serialized write channel
writeCh := make(chan []byte, 256)
writeDone := make(chan struct{})
go func() {
defer close(writeDone)
for data := range writeCh {
if err := conn.WriteMessage(websocket.BinaryMessage, data); err != nil {
return
}
}
}()
// PTY output → WebSocket
go func() {
for data := range outputCh {
select {
case writeCh <- data:
default:
}
}
}()
// Signal to stop goroutines when read pump exits
stopCh := make(chan struct{})
// Watch for process exit
go func() {
select {
case <-ptySess.Done():
log.Printf("WS[%s]: process exited", key)
conn.WriteMessage(websocket.CloseMessage,
websocket.FormatCloseMessage(websocket.CloseNormalClosure, "done"))
case <-stopCh:
}
}()
// WebSocket → PTY (read pump)
for {
_, msg, err := conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) {
log.Printf("WS[%s]: read error: %v", key, err)
}
break
}
// Check for resize message
var resize wsResizeMsg
if json.Unmarshal(msg, &resize) == nil && resize.Type == "resize" && resize.Cols > 0 && resize.Rows > 0 {
if err := ptySess.Resize(resize.Rows, resize.Cols); err != nil {
log.Printf("WS[%s]: resize error: %v", key, err)
}
continue
}
// Regular keyboard input → PTY
if _, err := ptySess.WriteInput(msg); err != nil {
log.Printf("WS[%s]: write error: %v", key, err)
break
}
}
close(stopCh)
close(writeCh)
<-writeDone
log.Printf("WS[%s]: disconnected", key)
}