package server

import (
	"crypto/sha256"
	"fmt"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/gin-gonic/gin"

	"github.com/dal/kaos/internal/supervisor"
)

// eventBroker manages SSE connections and state change detection.
//
// mu guards clients and lastHash. tasksDir and stopCh are assigned once in
// newEventBroker.
type eventBroker struct {
	mu       sync.RWMutex
	clients  map[chan string]bool // set of subscribed client channels
	lastHash string               // hash observed by the most recent poll
	tasksDir string               // directory handed to supervisor.ScanTasks
	stopCh   chan struct{}        // the polling goroutine returns once this is readable
}

// newEventBroker creates a new SSE event broker for the given tasks
// directory. The broker starts with no clients and an empty lastHash.
func newEventBroker(tasksDir string) *eventBroker {
	return &eventBroker{
		clients:  make(map[chan string]bool),
		tasksDir: tasksDir,
		stopCh:   make(chan struct{}),
	}
}

// subscribe registers a new client and returns the channel on which it will
// receive events. The channel is buffered (10 entries) so that broadcast can
// deliver without blocking on a briefly busy client.
func (eb *eventBroker) subscribe() chan string {
	ch := make(chan string, 10)
	eb.mu.Lock()
	eb.clients[ch] = true
	eb.mu.Unlock()
	return ch
}

// unsubscribe removes a client and closes its channel.
//
// The channel is closed only if it is still registered, so calling
// unsubscribe twice for the same channel (or with a channel that was never
// subscribed) is a no-op instead of a "close of closed channel" panic.
func (eb *eventBroker) unsubscribe(ch chan string) {
	eb.mu.Lock()
	defer eb.mu.Unlock()

	if _, ok := eb.clients[ch]; !ok {
		return
	}
	delete(eb.clients, ch)
	// Closing while holding the write lock means broadcast, which sends under
	// the read lock, can never send on a closed channel.
	close(ch)
}

// broadcast sends an event to all connected clients. Sends are non-blocking:
// a client whose buffer is full does not receive this event.
func (eb *eventBroker) broadcast(data string) {
	eb.mu.RLock()
	defer eb.mu.RUnlock()

	for ch := range eb.clients {
		select {
		case ch <- data:
		default:
			// Skip slow clients
		}
	}
}

// hasClients returns true if there are active listeners.
func (eb *eventBroker) hasClients() bool {
	eb.mu.RLock()
	defer eb.mu.RUnlock()
	return len(eb.clients) > 0
}

// startPolling begins the 2-second polling loop for state changes.
//
// The loop runs in its own goroutine and returns when stopCh becomes
// readable. Ticks that arrive while nobody is subscribed are skipped, so
// neither the task scan nor renderFn runs without an audience.
func (eb *eventBroker) startPolling(renderFn func() string) {
	go func() {
		ticker := time.NewTicker(2 * time.Second)
		defer ticker.Stop()

		for {
			select {
			case <-eb.stopCh:
				return
			case <-ticker.C:
				if !eb.hasClients() {
					continue
				}
				eb.checkAndBroadcast(renderFn)
			}
		}
	}()
}

// checkAndBroadcast checks for state changes and broadcasts if changed.
//
// The freshly computed hash always replaces lastHash. renderFn is called,
// and its result broadcast, only when the hash differs from the previous one.
func (eb *eventBroker) checkAndBroadcast(renderFn func() string) {
	hash := eb.computeHash()

	eb.mu.Lock()
	changed := hash != eb.lastHash
	eb.lastHash = hash
	eb.mu.Unlock()

	// Render and send outside the write lock; broadcast takes the read lock
	// itself.
	if changed {
		html := renderFn()
		eb.broadcast(html)
	}
}

// computeHash creates a hash of the current task state.
func (eb *eventBroker) computeHash() string { tasks, err := supervisor.ScanTasks(eb.tasksDir) if err != nil { return "" } return hashTaskState(tasks) } // hashTaskState creates a deterministic hash of task IDs and their statuses. func hashTaskState(tasks []supervisor.Task) string { pairs := make([]string, len(tasks)) for i, t := range tasks { pairs[i] = t.ID + ":" + t.Status } sort.Strings(pairs) combined := strings.Join(pairs, "|") h := sha256.Sum256([]byte(combined)) return fmt.Sprintf("%x", h[:8]) } // handleEvents serves the SSE event stream. func (s *Server) handleEvents(c *gin.Context) { c.Header("Content-Type", "text/event-stream") c.Header("Cache-Control", "no-cache") c.Header("Connection", "keep-alive") ch := s.events.subscribe() defer s.events.unsubscribe(ch) notify := c.Request.Context().Done() // Send initial keepalive fmt.Fprintf(c.Writer, ": keepalive\n\n") c.Writer.Flush() for { select { case <-notify: return case data, ok := <-ch: if !ok { return } fmt.Fprintf(c.Writer, "event: taskUpdate\ndata: %s\n\n", data) c.Writer.Flush() } } }