backend/services/http_service.go
package services
import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"
"kd/backend/config"
"kd/backend/logwrap"
"log"
"net/http"
"regexp"
"strconv"
"strings"
"sync"
"time"
"github.com/gorilla/websocket"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/remotecommand"
)
// httpsrvService is the local HTTP/WebSocket server that exposes pod exec and
// log streaming endpoints.
type httpsrvService struct {
	// ctx is the context handed to Start; the exec handler derives its
	// session timeout from it.
	ctx context.Context

	// conf is used by the handlers to obtain clientsets and REST configs for
	// a named context.
	conf config.Config

	// logger receives debug and error output from the service.
	logger *logwrap.LogWrap

	// srv is the HTTP server created by startHTTPServer.
	srv *http.Server
}
var (
	// httpsrv is the process-wide service instance returned by Httpsrv.
	httpsrv *httpsrvService

	// onceHttpsrv guards the one-time creation of httpsrv.
	onceHttpsrv sync.Once
)
// Httpsrv returns the process-wide httpsrvService, creating it on first use.
//
// Initialisation goes through onceHttpsrv unconditionally: sync.Once already
// makes later calls cheap, and it is what orders the write to httpsrv before
// every read. A nil check outside the Once would be an unsynchronised read of
// the shared variable.
func Httpsrv() *httpsrvService {
	onceHttpsrv.Do(func() {
		httpsrv = &httpsrvService{}
	})
	return httpsrv
}
// Start records the context, configuration and logger on the service and then
// launches the HTTP server on port 8081.
func (h *httpsrvService) Start(ctx context.Context, conf config.Config, logger *logwrap.LogWrap) {
	h.ctx, h.conf, h.logger = ctx, conf, logger

	h.startHTTPServer(8081)
}
// startHTTPServer registers the WebSocket endpoints and begins serving them on
// 127.0.0.1:<port> from a background goroutine. It returns immediately; a
// serve error other than http.ErrServerClosed is reported through h.logger.
func (h *httpsrvService) startHTTPServer(port int) {
	mux := http.NewServeMux()
	mux.HandleFunc("/api/pods/exec", h.HandlePodExecWebSocket)
	mux.HandleFunc("/api/pods/logs", h.HandlePodLogs)
	mux.HandleFunc("/api/deployments/logs", h.HandleDeploymentLogs)

	srv := &http.Server{
		Addr:    fmt.Sprintf("127.0.0.1:%d", port),
		Handler: mux,

		// Only the header phase is bounded, so a peer that never finishes its
		// request headers cannot hold a connection open forever. No
		// whole-request read/write deadline is set because the handlers keep
		// long-lived WebSocket streams open.
		ReadHeaderTimeout: 10 * time.Second,
	}
	h.srv = srv

	// The goroutine uses the local srv so it does not re-read the mutable
	// h.srv field from another goroutine.
	go func() {
		if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			h.logger.Error(fmt.Sprintf("HTTP server error: %v\n", err))
		}
	}()
}
// HandlePodLogs upgrades the request to a WebSocket and streams the logs of one
// pod to the client as text messages, one message per log line.
//
// Query parameters:
//   - contextName: context handed to conf.GetClientSet (required)
//   - namespace: pod namespace (required)
//   - name: pod name (required)
//   - container: container to read from (optional)
//   - tailLines: number of existing lines to start from (optional, default 100)
//   - stripColors: ANSI colour sequences are removed unless this is "false"
//
// Parameter errors are answered with HTTP 400 before the upgrade. Once the
// connection has been upgraded, failures are reported as a text message on
// the WebSocket. The log stream is followed until it ends or the client
// disconnects.
func (h *httpsrvService) HandlePodLogs(w http.ResponseWriter, r *http.Request) {
	query := r.URL.Query()
	contextName := query.Get("contextName")
	namespace := query.Get("namespace")
	pod := query.Get("name")
	container := query.Get("container")
	stripColors := query.Get("stripColors") != "false" // default: strip

	if contextName == "" || namespace == "" || pod == "" {
		http.Error(w, "Missing required parameters: contextName, namespace, and name are required", http.StatusBadRequest)
		return
	}

	tailLines := int64(100)
	if raw := query.Get("tailLines"); raw != "" {
		parsed, err := strconv.ParseInt(raw, 10, 64)
		if err != nil {
			http.Error(w, fmt.Sprintf("Invalid tailLines parameter: %v", err), http.StatusBadRequest)
			return
		}
		// A negative line count is never meaningful; answer with a 400 here
		// rather than forwarding it to the cluster.
		if parsed < 0 {
			http.Error(w, "Invalid tailLines parameter: must not be negative", http.StatusBadRequest)
			return
		}
		tailLines = parsed
	}

	// NOTE(review): CheckOrigin accepts every origin, so any web page open in
	// a local browser can connect to this endpoint. Confirm which origins the
	// frontend really uses and restrict this.
	upgrader := websocket.Upgrader{
		CheckOrigin: func(r *http.Request) bool {
			return true
		},
	}
	ws, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		// Upgrade has already written an HTTP error response; writing a
		// second one would only produce a superfluous-WriteHeader warning.
		log.Printf("Failed to upgrade connection: %v", err)
		return
	}
	defer ws.Close()

	// From here on the connection is hijacked: http.Error can no longer reach
	// the client, so every failure is reported over the WebSocket instead.
	cs, err := h.conf.GetClientSet(contextName)
	if err != nil {
		ws.WriteMessage(websocket.TextMessage, []byte(fmt.Sprintf("Failed to get context: %v", err)))
		return
	}

	// Cancel the log stream as soon as the client goes away. Without a reader
	// on the socket a disconnect is only noticed on the next write, which
	// never happens while the pod is quiet. The goroutine ends when the
	// deferred ws.Close makes ReadMessage fail.
	ctx, cancel := context.WithCancel(r.Context())
	defer cancel()
	go func() {
		for {
			if _, _, err := ws.ReadMessage(); err != nil {
				cancel()
				return
			}
		}
	}()

	podLogOpts := &corev1.PodLogOptions{
		Container:  container, // empty means "not specified"
		Follow:     true,
		TailLines:  &tailLines,
		Timestamps: true,
	}
	stream, err := cs.CoreV1().Pods(namespace).GetLogs(pod, podLogOpts).Stream(ctx)
	if err != nil {
		ws.WriteMessage(websocket.TextMessage, []byte(fmt.Sprintf("Error getting log stream: %v", err)))
		return
	}
	defer stream.Close()

	// Compiled once per connection, outside the read loop.
	ansiRegex := regexp.MustCompile(`\x1b\[[0-9;]*[a-zA-Z]`)

	reader := bufio.NewReader(stream)
	for {
		line, readErr := reader.ReadBytes('\n')

		// ReadBytes can return data together with an error (a final line
		// without a trailing newline), so forward the data before looking at
		// the error.
		if len(line) > 0 {
			if stripColors {
				line = ansiRegex.ReplaceAll(line, nil)
			}
			// The line is sent as read; no extra newline is appended.
			if err := ws.WriteMessage(websocket.TextMessage, line); err != nil {
				log.Printf("Error sending log line via WebSocket: %v", err)
				return
			}
		}

		if readErr != nil {
			// EOF is the normal end of the stream, and an error caused by our
			// own cancellation has no one left to be reported to.
			if readErr != io.EOF && ctx.Err() == nil {
				ws.WriteMessage(websocket.TextMessage, []byte(fmt.Sprintf("Error reading logs: %v", readErr)))
			}
			return
		}
	}
}
// int64Ptr returns a pointer to a freshly allocated int64 holding i, for
// filling optional *int64 API fields from a literal.
func int64Ptr(i int64) *int64 {
	p := new(int64)
	*p = i
	return p
}
// Compile-time checks that TerminalSession can be used as the stdin/stdout
// stream and as the terminal size queue of a remotecommand stream.
var _ io.ReadWriter = (*TerminalSession)(nil)
var _ remotecommand.TerminalSizeQueue = (*TerminalSession)(nil)

// TerminalSession adapts a WebSocket connection to the reader, writer and
// terminal-size-queue roles needed by a remotecommand stream.
//
// Read is expected to be called from a single goroutine; it keeps unconsumed
// input in the session between calls.
type TerminalSession struct {
	// ws is the client connection carrying terminal input and output.
	ws *websocket.Conn

	// sizeChan hands resize requests seen by Read over to Next.
	sizeChan chan remotecommand.TerminalSize

	// doneChan is closed by Close to release Next and any pending resize.
	doneChan chan struct{}

	// pending holds the part of the last data message that did not fit into
	// the buffer passed to Read.
	pending []byte

	// closeOnce makes Close safe to call more than once.
	closeOnce sync.Once
}

// Read implements io.Reader on top of WebSocket messages.
//
// Resize messages are not returned as data: they are forwarded to the size
// queue and Read continues with the next message. A data message larger than
// p is delivered across several calls, so Read never reports more bytes than
// it copied. Read returns io.EOF if the session is closed while a resize is
// waiting to be delivered, and otherwise any error from the WebSocket.
func (t *TerminalSession) Read(p []byte) (int, error) {
	if len(p) == 0 {
		return 0, nil
	}

	for len(t.pending) == 0 {
		_, message, err := t.ws.ReadMessage()
		if err != nil {
			return 0, err
		}

		if size, ok := parseTerminalResize(message); ok {
			select {
			case t.sizeChan <- size:
			case <-t.doneChan:
				// Nobody will call Next again; do not block forever.
				return 0, io.EOF
			}
			continue
		}

		// Regular data; an empty message simply keeps the loop going.
		t.pending = message
	}

	n := copy(p, t.pending)
	t.pending = t.pending[n:]
	return n, nil
}

// parseTerminalResize reports whether message is a resize request and, if so,
// the requested size. Two encodings are recognised:
//
//   - binary: at least three bytes, the first being 1, followed by rows and
//     columns as one byte each;
//   - JSON (kept for backward compatibility): an object whose "type" is
//     "resize", with integer "cols" and "rows".
//
// Anything else, including JSON that fails to decode or has another type, is
// treated as terminal data.
func parseTerminalResize(message []byte) (remotecommand.TerminalSize, bool) {
	if len(message) >= 3 && message[0] == 1 {
		rows := uint16(message[1])
		cols := uint16(message[2])
		log.Printf("Terminal resize: %dx%d", cols, rows)
		return remotecommand.TerminalSize{Width: cols, Height: rows}, true
	}

	if len(message) > 0 && message[0] == '{' {
		var resizeMsg struct {
			Type string `json:"type"`
			Cols int    `json:"cols"`
			Rows int    `json:"rows"`
		}
		if err := json.Unmarshal(message, &resizeMsg); err == nil && resizeMsg.Type == "resize" {
			log.Printf("Terminal resize (JSON): %dx%d", resizeMsg.Cols, resizeMsg.Rows)

			// Clamp instead of letting an out-of-range value wrap around in
			// the conversion to uint16.
			clamp := func(v int) uint16 {
				const maxUint16 = 1<<16 - 1
				switch {
				case v < 0:
					return 0
				case v > maxUint16:
					return maxUint16
				default:
					return uint16(v)
				}
			}
			return remotecommand.TerminalSize{
				Width:  clamp(resizeMsg.Cols),
				Height: clamp(resizeMsg.Rows),
			}, true
		}
	}

	return remotecommand.TerminalSize{}, false
}

// Write sends p to the client as a single binary WebSocket message. It returns
// len(p) on success and 0 with the WebSocket error otherwise.
func (t *TerminalSession) Write(p []byte) (int, error) {
	if err := t.ws.WriteMessage(websocket.BinaryMessage, p); err != nil {
		return 0, err
	}
	return len(p), nil
}

// Next blocks until a resize request is available and returns it, or returns
// nil once the session has been closed.
func (t *TerminalSession) Next() *remotecommand.TerminalSize {
	select {
	case size := <-t.sizeChan:
		return &size
	case <-t.doneChan:
		return nil
	}
}

// Close marks the session as finished, releasing a blocked Next and any
// resize waiting inside Read. It does not close the WebSocket. Calling it
// more than once is safe.
func (t *TerminalSession) Close() {
	t.closeOnce.Do(func() {
		close(t.doneChan)
	})
}
// HandlePodExecWebSocket upgrades the request to a WebSocket and attaches it to
// an interactive shell in a pod.
//
// Query parameters:
//   - contextName: context used to obtain the clientset and REST config (required)
//   - namespace: pod namespace (required)
//   - pod: pod name (required)
//   - container: container to exec into (optional)
//
// The shell is /bin/bash when it exists and is executable, otherwise /bin/sh,
// and runs with a TTY. The session is bounded by a 10-minute context derived
// from h.ctx. Failures before the upgrade are answered with an HTTP status;
// failures afterwards are logged and sent to the client as a text message.
func (h *httpsrvService) HandlePodExecWebSocket(w http.ResponseWriter, r *http.Request) {
	query := r.URL.Query()
	contextName := query.Get("contextName")
	namespace := query.Get("namespace")
	podName := query.Get("pod")
	containerName := query.Get("container")

	h.logger.Debug(fmt.Sprintf("Terminal request: context=%s, namespace=%s, pod=%s, container=%s",
		contextName, namespace, podName, containerName))

	if contextName == "" || namespace == "" || podName == "" {
		http.Error(w, "Missing required parameters", http.StatusBadRequest)
		return
	}

	// Resolve everything that can fail with a plain HTTP error before the
	// connection is upgraded.
	clientset, err := h.conf.GetClientSet(contextName)
	if err != nil {
		errMsg := fmt.Sprintf("Failed to get client for context %s: %v", contextName, err)
		h.logger.Error(errMsg)
		http.Error(w, errMsg, http.StatusInternalServerError)
		return
	}

	restConfig, err := h.conf.GetRestConfig(contextName)
	if err != nil {
		errMsg := fmt.Sprintf("Failed to get REST config for context %s: %v", contextName, err)
		h.logger.Error(errMsg)
		http.Error(w, errMsg, http.StatusInternalServerError)
		return
	}

	// NOTE(review): CheckOrigin accepts every origin. For an endpoint that
	// opens a shell in a pod this lets any web page in a local browser
	// connect; confirm the frontend's real origin and restrict this.
	upgrader := websocket.Upgrader{
		ReadBufferSize:  1024,
		WriteBufferSize: 1024,
		CheckOrigin: func(r *http.Request) bool {
			return true // Allow all origins in development
		},
	}
	ws, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		// Upgrade has already written an HTTP error response, so the failure
		// is only logged here.
		h.logger.Error(fmt.Sprintf("Failed to upgrade connection: %v", err))
		return
	}
	defer ws.Close()

	session := &TerminalSession{
		ws:       ws,
		sizeChan: make(chan remotecommand.TerminalSize),
		doneChan: make(chan struct{}),
	}
	defer session.Close()

	// Build the exec request for the pod.
	req := clientset.CoreV1().RESTClient().Post().
		Resource("pods").
		Name(podName).
		Namespace(namespace).
		SubResource("exec")
	req.VersionedParams(&corev1.PodExecOptions{
		Container: containerName,
		Command:   []string{"/bin/sh", "-c", "[ -x /bin/bash ] && exec /bin/bash || exec /bin/sh"},
		Stdin:     true,
		Stdout:    true,
		Stderr:    true,
		TTY:       true,
	}, scheme.ParameterCodec)

	// Limit the whole session to 10 minutes.
	ctx, cancel := context.WithTimeout(h.ctx, 10*60*time.Second)
	defer cancel()

	executor, err := remotecommand.NewSPDYExecutor(restConfig, "POST", req.URL())
	if err != nil {
		errMsg := fmt.Sprintf("Failed to create executor: %v", err)
		h.logger.Error(errMsg)
		// Best effort: the client may already be gone.
		ws.WriteMessage(websocket.TextMessage, []byte(errMsg))
		return
	}

	// The session serves as stdin, stdout, stderr and resize queue at once.
	err = executor.StreamWithContext(ctx, remotecommand.StreamOptions{
		Stdin:             session,
		Stdout:            session,
		Stderr:            session,
		Tty:               true,
		TerminalSizeQueue: session,
	})
	if err != nil {
		errMsg := fmt.Sprintf("Error executing command: %v", err)
		h.logger.Error(errMsg)
		// Best effort: the client may already be gone.
		ws.WriteMessage(websocket.TextMessage, []byte(errMsg))
	}
}
// HandleDeploymentLogs upgrades the request to a WebSocket and streams the logs
// of every container and init container of every pod that appears to belong
// to a deployment. Each log line is sent as one text message prefixed with
// "[pod/container] ".
//
// Query parameters (all required): contextName, namespace, name (the
// deployment name).
//
// Pods are discovered with three attempts, stopping at the first that finds
// any: the label app=<name>, the label app.kubernetes.io/name=<name>, and
// finally every pod in the namespace whose name starts with "<name>-".
// NOTE(review): none of these consults the deployment's own selector, and the
// prefix fallback also matches pods of deployments that merely share the
// prefix. Consider resolving the selector from the Deployment object.
//
// Missing parameters are answered with HTTP 400. After the upgrade every
// status and error message is sent over the WebSocket. Streaming stops when
// all log streams end or the client disconnects.
func (h *httpsrvService) HandleDeploymentLogs(w http.ResponseWriter, r *http.Request) {
	query := r.URL.Query()
	contextName := query.Get("contextName")
	namespace := query.Get("namespace")
	deploymentName := query.Get("name")

	if contextName == "" || namespace == "" || deploymentName == "" {
		http.Error(w, "Missing required parameters", http.StatusBadRequest)
		return
	}

	// NOTE(review): CheckOrigin accepts every origin, so any web page open in
	// a local browser can connect to this endpoint. Confirm which origins the
	// frontend really uses and restrict this.
	upgrader := websocket.Upgrader{
		CheckOrigin: func(r *http.Request) bool {
			return true
		},
	}
	ws, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		// Upgrade has already written an HTTP error response; a second one
		// would only produce a superfluous-WriteHeader warning.
		log.Printf("Failed to upgrade connection: %v", err)
		return
	}
	defer ws.Close()

	// Many goroutines write to the connection, which supports only one
	// writer at a time, so every write goes through this mutex.
	var wsMutex sync.Mutex
	safeWrite := func(messageType int, data []byte) error {
		wsMutex.Lock()
		defer wsMutex.Unlock()
		return ws.WriteMessage(messageType, data)
	}
	// sendText formats a status or error message and sends it as text.
	// Callers that cannot act on a failed write ignore the result.
	sendText := func(format string, args ...interface{}) error {
		return safeWrite(websocket.TextMessage, []byte(fmt.Sprintf(format, args...)))
	}

	// ctx is cancelled when the client disconnects (or the handler returns),
	// which aborts pod discovery and every log stream. The goroutine is the
	// connection's only reader and ends when the deferred ws.Close makes
	// ReadMessage fail.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go func() {
		for {
			if _, _, err := ws.ReadMessage(); err != nil {
				cancel()
				return
			}
		}
	}()

	sendText("Getting logs for deployment %s in namespace %s\n", deploymentName, namespace)

	// The connection is hijacked at this point, so http.Error would never
	// reach the client; report the failure over the WebSocket.
	cs, err := h.conf.GetClientSet(contextName)
	if err != nil {
		sendText("Failed to get client set: %v\n", err)
		return
	}

	// Attempt 1: the "app" label.
	pods, err := cs.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
		LabelSelector: fmt.Sprintf("app=%s", deploymentName),
	})
	if err != nil {
		sendText("Error getting pods: %v\n", err)
		return
	}

	// Attempt 2: the recommended "app.kubernetes.io/name" label.
	if len(pods.Items) == 0 {
		pods, err = cs.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
			LabelSelector: fmt.Sprintf("app.kubernetes.io/name=%s", deploymentName),
		})
		if err != nil {
			sendText("Error getting pods with alternate label: %v\n", err)
			return
		}
	}

	// Attempt 3: match by pod name prefix.
	if len(pods.Items) == 0 {
		allPods, err := cs.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
		if err != nil {
			sendText("Error getting all pods: %v\n", err)
			return
		}
		for i := range allPods.Items {
			if strings.HasPrefix(allPods.Items[i].Name, deploymentName+"-") {
				pods.Items = append(pods.Items, allPods.Items[i])
			}
		}
	}

	if len(pods.Items) == 0 {
		sendText("No pods found for deployment %s\n", deploymentName)
		return
	}
	sendText("Found %d pods for deployment %s\n", len(pods.Items), deploymentName)

	var wg sync.WaitGroup

	// streamContainer forwards the logs of one container until the stream
	// ends, a write fails or ctx is cancelled. kind is "container" or
	// "init container" and only affects the wording of status messages.
	streamContainer := func(podName, containerName, kind string, follow bool) {
		defer wg.Done()

		sendText("\n=== Logs from pod %s, %s %s ===\n", podName, kind, containerName)

		podLogOpts := &corev1.PodLogOptions{
			Container:  containerName,
			Follow:     follow,
			TailLines:  int64Ptr(100),
			Timestamps: true,
		}
		stream, err := cs.CoreV1().Pods(namespace).GetLogs(podName, podLogOpts).Stream(ctx)
		if err != nil {
			sendText("Error getting log stream for pod %s, %s %s: %v\n", podName, kind, containerName, err)
			return
		}
		defer stream.Close()

		// Every line is prefixed so interleaved output stays attributable.
		prefix := fmt.Sprintf("[%s/%s] ", podName, containerName)
		reader := bufio.NewReader(stream)
		for ctx.Err() == nil {
			line, readErr := reader.ReadBytes('\n')

			// ReadBytes can return data together with an error (a final line
			// without a trailing newline); forward it before handling the
			// error.
			if len(line) > 0 {
				if err := safeWrite(websocket.TextMessage, []byte(prefix+string(line))); err != nil {
					// The WebSocket is unusable; stop streaming.
					return
				}
			}

			if readErr != nil {
				// EOF is the normal end of a stream, and an error caused by
				// our own cancellation has no one left to be reported to.
				if readErr != io.EOF && ctx.Err() == nil {
					sendText("Error reading logs from pod %s, %s %s: %v\n", podName, kind, containerName, readErr)
				}
				return
			}
		}
	}

	for i := range pods.Items {
		pod := &pods.Items[i]

		for _, c := range pod.Spec.Containers {
			wg.Add(1)
			go streamContainer(pod.Name, c.Name, "container", true)
		}

		// Init containers have already run to completion, so their logs are
		// read once rather than followed.
		for _, c := range pod.Spec.InitContainers {
			wg.Add(1)
			go streamContainer(pod.Name, c.Name, "init container", false)
		}
	}

	// Keep the handler, and with it the WebSocket, alive until every stream
	// has finished.
	wg.Wait()
}