154 lines
3.2 KiB
Go
154 lines
3.2 KiB
Go
package server
|
|
|
|
import (
|
|
"crypto/sha256"
|
|
"fmt"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/gin-gonic/gin"
|
|
|
|
"github.com/dal/kaos/internal/supervisor"
|
|
)
|
|
|
|
// eventBroker fans task-update payloads out to SSE subscribers and remembers
// the most recently observed task-state hash so that only changes are sent.
type eventBroker struct {
	// tasksDir is the directory passed to supervisor.ScanTasks when hashing.
	tasksDir string
	// stopCh tells the polling goroutine to exit.
	stopCh chan struct{}

	// mu guards clients and lastHash.
	mu sync.RWMutex
	// clients is the set of subscriber channels; entries are stored as true.
	clients map[chan string]bool
	// lastHash is the task-state hash recorded by the latest check.
	lastHash string
}
|
|
|
|
// newEventBroker creates a new SSE event broker.
|
|
func newEventBroker(tasksDir string) *eventBroker {
|
|
return &eventBroker{
|
|
clients: make(map[chan string]bool),
|
|
tasksDir: tasksDir,
|
|
stopCh: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
// subscribe adds a client to receive events.
|
|
func (eb *eventBroker) subscribe() chan string {
|
|
ch := make(chan string, 10)
|
|
eb.mu.Lock()
|
|
eb.clients[ch] = true
|
|
eb.mu.Unlock()
|
|
return ch
|
|
}
|
|
|
|
// unsubscribe removes a client.
|
|
func (eb *eventBroker) unsubscribe(ch chan string) {
|
|
eb.mu.Lock()
|
|
delete(eb.clients, ch)
|
|
close(ch)
|
|
eb.mu.Unlock()
|
|
}
|
|
|
|
// broadcast sends an event to all connected clients.
|
|
func (eb *eventBroker) broadcast(data string) {
|
|
eb.mu.RLock()
|
|
defer eb.mu.RUnlock()
|
|
for ch := range eb.clients {
|
|
select {
|
|
case ch <- data:
|
|
default:
|
|
// Skip slow clients
|
|
}
|
|
}
|
|
}
|
|
|
|
// hasClients returns true if there are active listeners.
|
|
func (eb *eventBroker) hasClients() bool {
|
|
eb.mu.RLock()
|
|
defer eb.mu.RUnlock()
|
|
return len(eb.clients) > 0
|
|
}
|
|
|
|
// startPolling begins the 2-second polling loop for state changes.
|
|
func (eb *eventBroker) startPolling(renderFn func() string) {
|
|
go func() {
|
|
ticker := time.NewTicker(2 * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-eb.stopCh:
|
|
return
|
|
case <-ticker.C:
|
|
if !eb.hasClients() {
|
|
continue
|
|
}
|
|
eb.checkAndBroadcast(renderFn)
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
// checkAndBroadcast checks for state changes and broadcasts if changed.
|
|
func (eb *eventBroker) checkAndBroadcast(renderFn func() string) {
|
|
hash := eb.computeHash()
|
|
eb.mu.Lock()
|
|
changed := hash != eb.lastHash
|
|
eb.lastHash = hash
|
|
eb.mu.Unlock()
|
|
|
|
if changed {
|
|
html := renderFn()
|
|
eb.broadcast(html)
|
|
}
|
|
}
|
|
|
|
// computeHash creates a hash of the current task state.
|
|
func (eb *eventBroker) computeHash() string {
|
|
tasks, err := supervisor.ScanTasks(eb.tasksDir)
|
|
if err != nil {
|
|
return ""
|
|
}
|
|
return hashTaskState(tasks)
|
|
}
|
|
|
|
// hashTaskState creates a deterministic hash of task IDs and their statuses.
|
|
func hashTaskState(tasks []supervisor.Task) string {
|
|
pairs := make([]string, len(tasks))
|
|
for i, t := range tasks {
|
|
pairs[i] = t.ID + ":" + t.Status
|
|
}
|
|
sort.Strings(pairs)
|
|
combined := strings.Join(pairs, "|")
|
|
h := sha256.Sum256([]byte(combined))
|
|
return fmt.Sprintf("%x", h[:8])
|
|
}
|
|
|
|
// handleEvents serves the SSE event stream.
|
|
func (s *Server) handleEvents(c *gin.Context) {
|
|
c.Header("Content-Type", "text/event-stream")
|
|
c.Header("Cache-Control", "no-cache")
|
|
c.Header("Connection", "keep-alive")
|
|
|
|
ch := s.events.subscribe()
|
|
defer s.events.unsubscribe(ch)
|
|
|
|
notify := c.Request.Context().Done()
|
|
|
|
// Send initial keepalive
|
|
fmt.Fprintf(c.Writer, ": keepalive\n\n")
|
|
c.Writer.Flush()
|
|
|
|
for {
|
|
select {
|
|
case <-notify:
|
|
return
|
|
case data, ok := <-ch:
|
|
if !ok {
|
|
return
|
|
}
|
|
fmt.Fprintf(c.Writer, "event: taskUpdate\ndata: %s\n\n", data)
|
|
c.Writer.Flush()
|
|
}
|
|
}
|
|
}
|