KAOS/code/internal/server/events.go
djuka ddc54e739a T16: SSE auto-refresh dashboarda sa polling i hash detekcijom
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-20 13:09:12 +00:00

154 lines
3.2 KiB
Go

package server
import (
"crypto/sha256"
"fmt"
"sort"
"strings"
"sync"
"time"
"github.com/gin-gonic/gin"
"github.com/dal/kaos/internal/supervisor"
)
// eventBroker manages SSE connections and state change detection.
// It keeps the set of subscribed client channels and remembers the hash of
// the task state seen on the previous check so a change can be detected.
type eventBroker struct {
// mu guards clients and lastHash.
mu sync.RWMutex
// clients is the set of subscribed channels; broadcast sends to each of them.
clients map[chan string]bool
// lastHash is the task-state hash recorded by the previous checkAndBroadcast call.
lastHash string
// tasksDir is the directory handed to supervisor.ScanTasks when hashing.
tasksDir string
// stopCh ends the polling goroutine started by startPolling once a receive
// on it succeeds.
stopCh chan struct{}
}
// newEventBroker builds an SSE event broker for the given tasks directory.
// The returned broker has an empty client set, no recorded hash, and a fresh
// stop channel; polling does not begin until startPolling is called.
func newEventBroker(tasksDir string) *eventBroker {
	broker := new(eventBroker)
	broker.tasksDir = tasksDir
	broker.clients = make(map[chan string]bool)
	broker.stopCh = make(chan struct{})
	return broker
}
// subscribe registers a new client and returns the channel on which it will
// receive broadcast payloads. The channel is buffered with room for 10
// pending messages.
func (eb *eventBroker) subscribe() chan string {
	client := make(chan string, 10)

	eb.mu.Lock()
	defer eb.mu.Unlock()
	eb.clients[client] = true

	return client
}
// unsubscribe removes a client from the broker and closes its channel.
//
// The call is idempotent: a channel that is not (or no longer) registered,
// including a nil channel, is ignored. Closing it unconditionally would panic
// with "close of closed channel" (or "close of nil channel") on a repeated
// call.
func (eb *eventBroker) unsubscribe(ch chan string) {
	eb.mu.Lock()
	defer eb.mu.Unlock()

	// Only channels still present in the client set are owned by the broker
	// and safe to close; anything else has already been released.
	if _, registered := eb.clients[ch]; !registered {
		return
	}
	delete(eb.clients, ch)
	close(ch)
}
// broadcast offers data to every subscribed client without blocking.
// A client whose buffer is already full simply misses this message.
func (eb *eventBroker) broadcast(data string) {
	// trySend delivers data if the client has buffer space and gives up
	// immediately otherwise.
	trySend := func(client chan<- string) {
		select {
		case client <- data:
		default:
			// Skip slow clients
		}
	}

	eb.mu.RLock()
	defer eb.mu.RUnlock()
	for client := range eb.clients {
		trySend(client)
	}
}
// hasClients reports whether at least one client is currently subscribed.
func (eb *eventBroker) hasClients() bool {
	eb.mu.RLock()
	count := len(eb.clients)
	eb.mu.RUnlock()
	return count != 0
}
// startPolling launches a background goroutine that wakes every 2 seconds
// and, when at least one client is subscribed, calls checkAndBroadcast with
// renderFn. The goroutine exits once a receive on eb.stopCh succeeds.
func (eb *eventBroker) startPolling(renderFn func() string) {
	poll := func() {
		tick := time.NewTicker(2 * time.Second)
		defer tick.Stop()

		for {
			select {
			case <-tick.C:
				// Skip the check entirely while nobody is listening.
				if eb.hasClients() {
					eb.checkAndBroadcast(renderFn)
				}
			case <-eb.stopCh:
				return
			}
		}
	}
	go poll()
}
// checkAndBroadcast recomputes the task-state hash, records it as the latest
// one, and, if it differs from the previously recorded hash, broadcasts the
// output of renderFn to all clients. renderFn is only invoked on a change.
func (eb *eventBroker) checkAndBroadcast(renderFn func() string) {
	current := eb.computeHash()

	eb.mu.Lock()
	previous := eb.lastHash
	eb.lastHash = current
	eb.mu.Unlock()

	if current == previous {
		return
	}
	eb.broadcast(renderFn())
}
// computeHash scans eb.tasksDir with supervisor.ScanTasks and returns the
// hash of the resulting task list. If the scan fails, the empty string is
// returned instead of a hash.
func (eb *eventBroker) computeHash() string {
	scanned, scanErr := supervisor.ScanTasks(eb.tasksDir)
	if scanErr != nil {
		// A failed scan is reported as the empty hash rather than an error.
		return ""
	}
	return hashTaskState(scanned)
}
// hashTaskState returns a deterministic fingerprint of the given tasks.
// Each task contributes an "ID:Status" entry; the entries are sorted so the
// result does not depend on input order, joined with "|", hashed with
// SHA-256, and the first 8 bytes of the digest are returned as lowercase hex.
func hashTaskState(tasks []supervisor.Task) string {
	entries := make([]string, 0, len(tasks))
	for _, task := range tasks {
		entries = append(entries, task.ID+":"+task.Status)
	}
	sort.Strings(entries)

	digest := sha256.Sum256([]byte(strings.Join(entries, "|")))
	prefix := digest[:8]
	return fmt.Sprintf("%x", prefix)
}
// handleEvents serves the SSE event stream.
//
// It subscribes the request to the event broker, writes an initial keepalive
// comment, and then forwards every broadcast payload as a "taskUpdate" event
// until the request context is cancelled, the subscription channel is closed,
// or a write to the client fails. The subscription is released on return.
func (s *Server) handleEvents(c *gin.Context) {
	c.Header("Content-Type", "text/event-stream")
	c.Header("Cache-Control", "no-cache")
	c.Header("Connection", "keep-alive")

	ch := s.events.subscribe()
	defer s.events.unsubscribe(ch)

	// An SSE data field ends at the first line break, so a multi-line payload
	// must repeat the "data: " prefix on every line; otherwise the client
	// silently drops everything after the first line. All three line-break
	// forms recognised by the SSE format are handled. Single-line payloads
	// pass through unchanged.
	dataLines := strings.NewReplacer(
		"\r\n", "\ndata: ",
		"\r", "\ndata: ",
		"\n", "\ndata: ",
	)

	// Send initial keepalive
	if _, err := fmt.Fprint(c.Writer, ": keepalive\n\n"); err != nil {
		return
	}
	c.Writer.Flush()

	done := c.Request.Context().Done()
	for {
		select {
		case <-done:
			return
		case data, ok := <-ch:
			if !ok {
				return
			}
			// Stop streaming as soon as the connection can no longer be
			// written to instead of looping on a dead client.
			if _, err := fmt.Fprintf(c.Writer, "event: taskUpdate\ndata: %s\n\n", dataLines.Replace(data)); err != nil {
				return
			}
			c.Writer.Flush()
		}
	}
}