From 80cf1d73ce2117295f5103684ba325e1e3925faf Mon Sep 17 00:00:00 2001 From: djuka Date: Fri, 20 Feb 2026 14:50:01 +0000 Subject: [PATCH] Fix: Operater mod koristi claude CLI umesto API poziva Co-Authored-By: Claude Opus 4.6 --- code/internal/server/server_test.go | 18 +- code/internal/server/submit.go | 349 ++++++++++++---------------- code/web/templates/submit.html | 10 +- 3 files changed, 159 insertions(+), 218 deletions(-) diff --git a/code/internal/server/server_test.go b/code/internal/server/server_test.go index 4b0616f..0fa1aef 100644 --- a/code/internal/server/server_test.go +++ b/code/internal/server/server_test.go @@ -1452,29 +1452,29 @@ func TestSimpleSubmit_DefaultPriority(t *testing.T) { } } -func TestChatSubmit_NoAPIKey(t *testing.T) { +func TestChatSubmit_ReturnsChatID(t *testing.T) { srv := setupTestServer(t) - // Ensure no API key is set - os.Unsetenv("ANTHROPIC_API_KEY") - body := `{"message":"test poruka"}` req := httptest.NewRequest(http.MethodPost, "/submit/chat", strings.NewReader(body)) req.Header.Set("Content-Type", "application/json") w := httptest.NewRecorder() srv.Router.ServeHTTP(w, req) - if w.Code != http.StatusServiceUnavailable { - t.Fatalf("expected 503 without API key, got %d: %s", w.Code, w.Body.String()) + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + + var resp map[string]interface{} + json.Unmarshal(w.Body.Bytes(), &resp) + if resp["chat_id"] == nil || resp["chat_id"] == "" { + t.Error("expected non-empty chat_id in response") } } func TestChatSubmit_EmptyMessage(t *testing.T) { srv := setupTestServer(t) - os.Setenv("ANTHROPIC_API_KEY", "test-key") - defer os.Unsetenv("ANTHROPIC_API_KEY") - body := `{"message":""}` req := httptest.NewRequest(http.MethodPost, "/submit/chat", strings.NewReader(body)) req.Header.Set("Content-Type", "application/json") diff --git a/code/internal/server/submit.go b/code/internal/server/submit.go index 0483255..bf73692 100644 --- a/code/internal/server/submit.go +++ b/code/internal/server/submit.go @@ -1,15 +1,14 @@ // Package server — submit.go handles task submission in two modes: -// client (simple form) and operator (chat with Claude API). +// client (simple form) and operator (chat via claude CLI). package server import ( "bufio" - "bytes" - "encoding/json" "fmt" "io" "net/http" "os" + "os/exec" "path/filepath" "regexp" "strconv" @@ -22,22 +21,16 @@ import ( "github.com/dal/kaos/internal/supervisor" ) -// chatState manages an operator chat session. +// chatState manages an operator chat session backed by a claude CLI process. type chatState struct { mu sync.Mutex id string - messages []chatMessage - response string + cmd *exec.Cmd + output []string done bool listeners map[chan string]bool } -// chatMessage represents a single message in the chat. -type chatMessage struct { - Role string `json:"role"` - Content string `json:"content"` -} - // nextTaskNumber finds the highest T{XX} number across all tasks and returns the next one. func nextTaskNumber(tasksDir string) (string, error) { tasks, err := supervisor.ScanTasks(tasksDir) @@ -119,11 +112,10 @@ func (s *Server) handleSimpleSubmit(c *gin.Context) { c.JSON(http.StatusOK, gin.H{"status": "ok", "task_id": taskID}) } -// handleChatSubmit handles an operator chat message by calling the Claude API. +// handleChatSubmit spawns a claude CLI process with the operator's message. func (s *Server) handleChatSubmit(c *gin.Context) { var req struct { Message string `json:"message"` - ChatID string `json:"chat_id,omitempty"` } if err := c.BindJSON(&req); err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": "neispravan zahtev"}) @@ -135,55 +127,122 @@ func (s *Server) handleChatSubmit(c *gin.Context) { return } - apiKey := os.Getenv("ANTHROPIC_API_KEY") - if apiKey == "" { - c.JSON(http.StatusServiceUnavailable, gin.H{"error": "ANTHROPIC_API_KEY nije podešen"}) - return - } - - // Get or create chat session - chatID := req.ChatID - var chat *chatState - - s.chatMu.Lock() - if chatID != "" { - chat = s.chats[chatID] - } - if chat == nil { - chatID = s.console.nextExecID() - chat = &chatState{ - id: chatID, - listeners: make(map[chan string]bool), - } - s.chats[chatID] = chat - } - s.chatMu.Unlock() - - // Add user message and reset response state - chat.mu.Lock() - chat.messages = append(chat.messages, chatMessage{Role: "user", Content: req.Message}) - chat.done = false - chat.response = "" - chat.mu.Unlock() - - // Build system prompt with task context + // Build prompt with context tasks, _ := supervisor.ScanTasks(s.Config.TasksDir) context := buildTaskContext(tasks) projectRoot := filepath.Dir(s.Config.TasksDir) claudeMD, _ := os.ReadFile(filepath.Join(projectRoot, "CLAUDE.md")) - systemPrompt := string(claudeMD) + "\n\n## Trenutno stanje taskova\n\n" + context + - "\n\n## Tvoja uloga\n\nTi si KAOS mastermind. Operater ti govori šta treba. " + - "Predloži task u markdown formatu ili odgovori na pitanje. " + - "Ako operater kaže 'kreiraj task', generiši task markdown u standardnom KAOS formatu." + prompt := fmt.Sprintf("Kontekst (CLAUDE.md):\n%s\n\nTrenutni taskovi:\n%s\n\nOperater kaže:\n%s", + string(claudeMD), context, req.Message) - go s.callClaudeAPI(chat, apiKey, systemPrompt) + // Create chat session + chatID := s.console.nextExecID() + chat := &chatState{ + id: chatID, + listeners: make(map[chan string]bool), + } + + s.chatMu.Lock() + s.chats[chatID] = chat + s.chatMu.Unlock() + + // Spawn claude CLI process in background + go s.runChatCommand(chat, prompt) c.JSON(http.StatusOK, gin.H{"chat_id": chatID}) } -// handleChatStream streams the Claude API response via SSE. +// runChatCommand executes claude CLI and streams output to chat listeners. +func (s *Server) runChatCommand(chat *chatState, prompt string) { + cmd := exec.Command("claude", "--dangerously-skip-permissions", "-p", prompt) + cmd.Dir = s.projectRoot() + + stdout, err := cmd.StdoutPipe() + if err != nil { + sendChatLine(chat, "[greška: "+err.Error()+"]") + finishChat(chat) + return + } + + stderr, err := cmd.StderrPipe() + if err != nil { + sendChatLine(chat, "[greška: "+err.Error()+"]") + finishChat(chat) + return + } + + chat.mu.Lock() + chat.cmd = cmd + chat.mu.Unlock() + + if err := cmd.Start(); err != nil { + sendChatLine(chat, "[greška pri pokretanju: "+err.Error()+"]") + finishChat(chat) + return + } + + // Read stdout and stderr concurrently + var wg sync.WaitGroup + wg.Add(2) + + go func() { + defer wg.Done() + chatStreamReader(chat, stdout) + }() + + go func() { + defer wg.Done() + chatStreamReader(chat, stderr) + }() + + wg.Wait() + cmd.Wait() + finishChat(chat) +} + +// chatStreamReader reads from a reader line by line and sends to chat listeners. +func chatStreamReader(chat *chatState, reader io.Reader) { + scanner := bufio.NewScanner(reader) + scanner.Buffer(make([]byte, 64*1024), 256*1024) + for scanner.Scan() { + sendChatLine(chat, scanner.Text()) + } +} + +// sendChatLine sends a line to all chat listeners and stores in output buffer. +func sendChatLine(chat *chatState, line string) { + chat.mu.Lock() + defer chat.mu.Unlock() + + chat.output = append(chat.output, line) + + for ch := range chat.listeners { + select { + case ch <- line: + default: + } + } +} + +// finishChat marks a chat as done and signals all listeners. +func finishChat(chat *chatState) { + chat.mu.Lock() + defer chat.mu.Unlock() + + chat.done = true + chat.cmd = nil + + for ch := range chat.listeners { + select { + case ch <- "[DONE]": + default: + } + } +} + +// handleChatStream serves an SSE stream for a chat session. func (s *Server) handleChatStream(c *gin.Context) { chatID := c.Param("id") @@ -200,168 +259,48 @@ func (s *Server) handleChatStream(c *gin.Context) { c.Header("Cache-Control", "no-cache") c.Header("Connection", "keep-alive") - ch := make(chan string, 64) + ch := make(chan string, 100) + chat.mu.Lock() + // Replay buffered output + for _, line := range chat.output { + fmt.Fprintf(c.Writer, "data: %s\n\n", line) + } + c.Writer.Flush() + + // If already done, send done event and return + if chat.done { + chat.mu.Unlock() + fmt.Fprintf(c.Writer, "event: done\ndata: finished\n\n") + c.Writer.Flush() + return + } + chat.listeners[ch] = true - // Replay existing response if available - if chat.response != "" { - ch <- chat.response - } - isDone := chat.done chat.mu.Unlock() - if isDone { - c.SSEvent("done", "complete") - return - } + defer func() { + chat.mu.Lock() + delete(chat.listeners, ch) + chat.mu.Unlock() + }() - c.Stream(func(w io.Writer) bool { + notify := c.Request.Context().Done() + + for { select { - case data, ok := <-ch: - if !ok { - c.SSEvent("done", "complete") - return false - } - if data == "__DONE__" { - c.SSEvent("done", "complete") - chat.mu.Lock() - delete(chat.listeners, ch) - chat.mu.Unlock() - return false - } - c.SSEvent("message", data) - return true - case <-c.Request.Context().Done(): - chat.mu.Lock() - delete(chat.listeners, ch) - chat.mu.Unlock() - return false - } - }) -} - -// callClaudeAPI calls the Anthropic Messages API with streaming and relays text to listeners. -func (s *Server) callClaudeAPI(chat *chatState, apiKey, systemPrompt string) { - chat.mu.Lock() - messages := make([]chatMessage, len(chat.messages)) - copy(messages, chat.messages) - chat.mu.Unlock() - - // Build API request body - apiMessages := make([]map[string]string, len(messages)) - for i, m := range messages { - apiMessages[i] = map[string]string{"role": m.Role, "content": m.Content} - } - - body := map[string]interface{}{ - "model": "claude-sonnet-4-6", - "max_tokens": 4096, - "system": systemPrompt, - "messages": apiMessages, - "stream": true, - } - - jsonBody, _ := json.Marshal(body) - - req, err := http.NewRequest("POST", "https://api.anthropic.com/v1/messages", bytes.NewReader(jsonBody)) - if err != nil { - broadcastChatError(chat, "HTTP greška: "+err.Error()) - return - } - req.Header.Set("Content-Type", "application/json") - req.Header.Set("x-api-key", apiKey) - req.Header.Set("anthropic-version", "2023-06-01") - - resp, err := http.DefaultClient.Do(req) - if err != nil { - broadcastChatError(chat, "API greška: "+err.Error()) - return - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - respBody, _ := io.ReadAll(resp.Body) - broadcastChatError(chat, fmt.Sprintf("API %d: %s", resp.StatusCode, string(respBody))) - return - } - - // Parse SSE stream from Anthropic API - scanner := bufio.NewScanner(resp.Body) - var fullResponse strings.Builder - - for scanner.Scan() { - line := scanner.Text() - if !strings.HasPrefix(line, "data: ") { - continue - } - - data := strings.TrimPrefix(line, "data: ") - if data == "[DONE]" { - break - } - - var event map[string]interface{} - if err := json.Unmarshal([]byte(data), &event); err != nil { - continue - } - - eventType, _ := event["type"].(string) - if eventType == "content_block_delta" { - delta, ok := event["delta"].(map[string]interface{}) - if !ok { - continue - } - text, _ := delta["text"].(string) - if text != "" { - fullResponse.WriteString(text) - broadcastChatText(chat, fullResponse.String()) + case <-notify: + return + case line := <-ch: + if line == "[DONE]" { + fmt.Fprintf(c.Writer, "event: done\ndata: finished\n\n") + c.Writer.Flush() + return } + fmt.Fprintf(c.Writer, "data: %s\n\n", line) + c.Writer.Flush() } } - - // Finalize: save response and signal done - chat.mu.Lock() - chat.response = fullResponse.String() - chat.messages = append(chat.messages, chatMessage{Role: "assistant", Content: chat.response}) - chat.done = true - for ch := range chat.listeners { - select { - case ch <- "__DONE__": - default: - } - } - chat.mu.Unlock() -} - -// broadcastChatText sends the current accumulated text to all listeners. -func broadcastChatText(chat *chatState, text string) { - chat.mu.Lock() - chat.response = text - for ch := range chat.listeners { - select { - case ch <- text: - default: - } - } - chat.mu.Unlock() -} - -// broadcastChatError sends an error message and signals done. -func broadcastChatError(chat *chatState, errMsg string) { - chat.mu.Lock() - chat.done = true - chat.response = "Greška: " + errMsg - for ch := range chat.listeners { - select { - case ch <- "Greška: " + errMsg: - default: - } - select { - case ch <- "__DONE__": - default: - } - } - chat.mu.Unlock() } // buildTaskContext creates a text summary of current tasks for the system prompt. diff --git a/code/web/templates/submit.html b/code/web/templates/submit.html index 31a3766..8d49bab 100644 --- a/code/web/templates/submit.html +++ b/code/web/templates/submit.html @@ -108,7 +108,7 @@ function sendChat() { fetch('/submit/chat', { method: 'POST', headers: {'Content-Type': 'application/json'}, - body: JSON.stringify({message: msg, chat_id: currentChatID}) + body: JSON.stringify({message: msg}) }) .then(function(r) { return r.json(); }) .then(function(data) { @@ -130,14 +130,16 @@ function streamChatResponse(chatID, botId) { var source = new EventSource('/submit/chat/stream/' + chatID); var el = document.getElementById(botId); var input = document.getElementById('chat-input'); + var lines = []; - source.addEventListener('message', function(e) { + source.onmessage = function(e) { if (el) { - el.querySelector('.chat-text').textContent = e.data; + lines.push(e.data); + el.querySelector('.chat-text').textContent = lines.join('\n'); var messages = document.getElementById('chat-messages'); messages.scrollTop = messages.scrollHeight; } - }); + }; source.addEventListener('done', function() { source.close();