KAOS/code/internal/server/pty_session.go
djuka 510b75c0bf Konzola: dinamičke task sesije sa PTY per task
- Zamena fiksnih 2 sesija sa taskSessionManager (map po task ID)
- "Pusti" pokreće interaktivni claude u PTY, šalje task prompt
- "Proveri" pokreće review claude sesiju za task u review/
- WS se konektuje na postojeću PTY sesiju po task ID-u
- Konzola stranica dinamički prikazuje terminale za aktivne sesije
- Replay buffer za reconnect na postojeće sesije
- Novi testovi za session manager, prompt buildere, review endpoint

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-21 04:32:34 +00:00

254 lines
5.7 KiB
Go

package server
import (
"fmt"
"log"
"os"
"os/exec"
"strings"
"sync"
"time"
"github.com/creack/pty"
)
// outputBufferSize is the capacity, in bytes, of the ring buffer that each PTY
// session keeps for replaying recent output (1 MiB).
const outputBufferSize = 1 << 20
// RingBuffer is a fixed-capacity circular byte buffer for terminal output.
// Once more than its capacity has been written, only the most recent bytes
// are retained. Every method takes the internal mutex, so a RingBuffer may be
// shared between goroutines; it must not be copied after first use.
type RingBuffer struct {
	data []byte     // backing storage; len(data) == size
	size int        // capacity in bytes
	pos  int        // index in data where the next byte will be stored
	full bool       // set once the write position has wrapped around
	mu   sync.Mutex // guards every field above
}

// NewRingBuffer creates a ring buffer that retains at most size bytes.
// A size of zero or less yields a buffer that discards everything written.
func NewRingBuffer(size int) *RingBuffer {
	if size < 0 {
		size = 0
	}
	return &RingBuffer{data: make([]byte, size), size: size}
}

// Write appends p to the buffer, overwriting the oldest bytes when the
// capacity is exceeded. Writing to a zero-capacity buffer is a no-op.
func (rb *RingBuffer) Write(p []byte) {
	rb.mu.Lock()
	defer rb.mu.Unlock()

	total := len(p)
	if rb.size == 0 || total == 0 {
		return
	}

	// The final cursor position depends only on how many bytes were offered,
	// not on how many of them survive.
	next := (rb.pos + total) % rb.size
	if rb.pos+total >= rb.size {
		rb.full = true
	}

	start := rb.pos
	if total > rb.size {
		// Only the last rb.size bytes can survive. Laying them out starting at
		// the final cursor position makes them wrap around exactly to it.
		p = p[total-rb.size:]
		start = next
	}

	// At most two bulk copies: up to the end of the storage, then the
	// remainder (if any) from the beginning.
	n := copy(rb.data[start:], p)
	copy(rb.data, p[n:])
	rb.pos = next
}

// Bytes returns a copy of the buffered bytes ordered from oldest to newest.
func (rb *RingBuffer) Bytes() []byte {
	rb.mu.Lock()
	defer rb.mu.Unlock()

	if !rb.full {
		// Nothing has been overwritten yet: the content is data[:pos].
		out := make([]byte, rb.pos)
		copy(out, rb.data[:rb.pos])
		return out
	}

	// After wrapping, the oldest byte sits at pos and the newest just before it.
	out := make([]byte, rb.size)
	n := copy(out, rb.data[rb.pos:])
	copy(out[n:], rb.data[:rb.pos])
	return out
}

// Reset discards the buffered content. The backing storage is kept and is not
// zeroed; old bytes simply become unreachable through Bytes.
func (rb *RingBuffer) Reset() {
	rb.mu.Lock()
	defer rb.mu.Unlock()

	rb.pos = 0
	rb.full = false
}
// consolePTYSession manages a single claude CLI running in a pseudo-terminal.
// Output read from the PTY is stored in a replay buffer and forwarded to the
// registered subscribers; input is written straight to the PTY.
type consolePTYSession struct {
// ID identifies the session; the spawn functions set it to a "task-" or
// "shell-" prefix followed by a UnixNano timestamp. It appears in log lines.
ID string
// Ptmx is the file returned by pty.StartWithSize; readLoop reads from it and
// WriteInput writes to it.
Ptmx *os.File
// Cmd is the command that was started inside the PTY.
Cmd *exec.Cmd
// buffer receives every output chunk so that GetBuffer can replay it.
buffer *RingBuffer
// subscribers maps a subscriber id to the channel on which it receives
// output chunks. Guarded by mu.
subscribers map[string]chan []byte
// mu guards subscribers and lastActive.
mu sync.Mutex
// done is closed by waitExit once Cmd.Wait has returned.
done chan struct{}
// lastActive is refreshed whenever output is read or input is written.
// Guarded by mu.
lastActive time.Time
}
// spawnTaskPTY starts an interactive claude CLI with auto-permissions in a PTY.
// Used for task work and review sessions. The prompt is sent after startup.
func spawnTaskPTY(projectDir string) (*consolePTYSession, error) {
cmd := exec.Command("claude", "--permission-mode", "dontAsk")
cmd.Dir = projectDir
cmd.Env = cleanEnvForPTY()
ptmx, err := pty.StartWithSize(cmd, &pty.Winsize{Rows: 50, Cols: 180})
if err != nil {
return nil, fmt.Errorf("start pty: %w", err)
}
sess := &consolePTYSession{
ID: fmt.Sprintf("task-%d", time.Now().UnixNano()),
Ptmx: ptmx,
Cmd: cmd,
buffer: NewRingBuffer(outputBufferSize),
subscribers: make(map[string]chan []byte),
done: make(chan struct{}),
lastActive: time.Now(),
}
go sess.readLoop()
go sess.waitExit()
return sess, nil
}
// spawnShellPTY starts an interactive claude CLI in a PTY for the console.
func spawnShellPTY(projectDir string) (*consolePTYSession, error) {
cmd := exec.Command("claude")
cmd.Dir = projectDir
cmd.Env = cleanEnvForPTY()
ptmx, err := pty.StartWithSize(cmd, &pty.Winsize{Rows: 24, Cols: 120})
if err != nil {
return nil, fmt.Errorf("start pty: %w", err)
}
sess := &consolePTYSession{
ID: fmt.Sprintf("shell-%d", time.Now().UnixNano()),
Ptmx: ptmx,
Cmd: cmd,
buffer: NewRingBuffer(outputBufferSize),
subscribers: make(map[string]chan []byte),
done: make(chan struct{}),
lastActive: time.Now(),
}
go sess.readLoop()
go sess.waitExit()
return sess, nil
}
// readLoop pumps PTY output until a read fails. Every non-empty chunk is
// appended to the replay buffer and offered to each subscriber without
// blocking; a subscriber whose channel is full misses that chunk. The first
// chunk and the terminating read error are logged.
func (s *consolePTYSession) readLoop() {
	var (
		scratch = make([]byte, 4096)
		total   int
	)

	for {
		n, err := s.Ptmx.Read(scratch)
		if err != nil {
			log.Printf("PTY[%s]: readLoop ended (read %d bytes total, err: %v)", s.ID, total, err)
			return
		}
		if n == 0 {
			continue
		}

		isFirst := total == 0
		total += n

		// scratch is reused by the next Read, so every chunk handed out gets
		// its own backing array.
		chunk := append([]byte(nil), scratch[:n]...)
		s.buffer.Write(chunk)

		s.mu.Lock()
		s.lastActive = time.Now()
		listeners := len(s.subscribers)
		for _, sub := range s.subscribers {
			select {
			case sub <- chunk:
			default:
				// Channel full: drop the chunk instead of stalling the PTY reader.
			}
		}
		s.mu.Unlock()

		if isFirst {
			log.Printf("PTY[%s]: first output (%d bytes, %d subscribers)", s.ID, n, listeners)
		}
	}
}
// waitExit blocks until the CLI process has been waited for, then closes the
// done channel. If the command has no process, done is closed right away.
func (s *consolePTYSession) waitExit() {
	defer close(s.done)

	if s.Cmd.Process == nil {
		return
	}
	// The result of Wait is deliberately discarded; only the fact that the
	// process is gone is signalled.
	_ = s.Cmd.Wait()
}
// Subscribe registers a subscriber under id and returns the channel on which
// it will receive PTY output chunks. The channel buffers up to 256 chunks.
// If id is already registered, the earlier channel is replaced in the map
// without being closed.
func (s *consolePTYSession) Subscribe(id string) chan []byte {
	out := make(chan []byte, 256)

	s.mu.Lock()
	defer s.mu.Unlock()

	s.subscribers[id] = out
	return out
}
// Unsubscribe removes the subscriber registered under id and closes its
// channel. Unknown ids are ignored.
func (s *consolePTYSession) Unsubscribe(id string) {
	s.mu.Lock()
	defer s.mu.Unlock()

	ch, registered := s.subscribers[id]
	if !registered {
		return
	}
	delete(s.subscribers, id)
	close(ch)
}
// Resize sets the PTY window size to rows x cols and returns the error from
// pty.Setsize, if any.
func (s *consolePTYSession) Resize(rows, cols uint16) error {
	size := pty.Winsize{Rows: rows, Cols: cols}
	return pty.Setsize(s.Ptmx, &size)
}
// WriteInput records the session as active and writes data (keyboard input)
// to the PTY. It returns the byte count and error reported by the PTY write.
func (s *consolePTYSession) WriteInput(data []byte) (int, error) {
	s.mu.Lock()
	s.lastActive = time.Now()
	s.mu.Unlock()

	// The write itself happens outside the lock.
	written, err := s.Ptmx.Write(data)
	return written, err
}
// GetBuffer returns the current contents of the session's ring buffer, for
// replaying earlier output.
func (s *consolePTYSession) GetBuffer() []byte {
	replay := s.buffer.Bytes()
	return replay
}
// Done exposes, as receive-only, the channel that waitExit closes once the
// process has exited.
func (s *consolePTYSession) Done() <-chan struct{} {
	var exited <-chan struct{} = s.done
	return exited
}
// Close terminates the PTY session: it closes and removes every subscriber
// channel, closes the PTY file, and kills the process if one was started.
// Errors from closing the PTY and from Kill are ignored. Close does not wait
// for the process to exit.
func (s *consolePTYSession) Close() {
	func() {
		s.mu.Lock()
		defer s.mu.Unlock()
		for id, ch := range s.subscribers {
			close(ch)
			delete(s.subscribers, id)
		}
	}()

	_ = s.Ptmx.Close()

	if proc := s.Cmd.Process; proc != nil {
		_ = proc.Kill()
	}
}
// cleanEnvForPTY builds the environment for a process started in a PTY.
// It copies the current process environment, leaving out any CLAUDECODE,
// CLAUDE_CODE_ENTRYPOINT, TERM and COLORTERM entries, and then appends
// TERM=xterm-256color and COLORTERM=truecolor as the last two entries.
func cleanEnvForPTY() []string {
	// NOTE(review): the CLAUDE* variables are presumably dropped so the child
	// CLI does not see itself as nested inside another session — confirm.
	dropped := [...]string{
		"CLAUDECODE=",
		"CLAUDE_CODE_ENTRYPOINT=",
		"TERM=",
		"COLORTERM=",
	}

	var kept []string
inherit:
	for _, kv := range os.Environ() {
		for _, prefix := range dropped {
			if strings.HasPrefix(kv, prefix) {
				continue inherit
			}
		}
		kept = append(kept, kv)
	}
	return append(kept, "TERM=xterm-256color", "COLORTERM=truecolor")
}