summary history files

backend/services/http_service.go
package services

import (
	"bufio"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"kd/backend/config"
	"kd/backend/logwrap"
	"log"
	"net/http"
	"regexp"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/gorilla/websocket"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/tools/remotecommand"
)

type httpsrvService struct {
	ctx    context.Context
	conf   config.Config
	logger *logwrap.LogWrap
	srv    *http.Server
}

var httpsrv *httpsrvService
var onceHttpsrv sync.Once

func Httpsrv() *httpsrvService {
	if httpsrv == nil {
		onceHttpsrv.Do(func() {
			httpsrv = &httpsrvService{}
		})
	}
	return httpsrv
}

func (h *httpsrvService) Start(ctx context.Context, conf config.Config, logger *logwrap.LogWrap) {
	h.ctx = ctx
	h.conf = conf
	h.logger = logger
	h.startHTTPServer(8081)
}

func (h *httpsrvService) startHTTPServer(port int) {
	mux := http.NewServeMux()
	mux.HandleFunc("/api/pods/exec", h.HandlePodExecWebSocket) // Updated handler name
	mux.HandleFunc("/api/pods/logs", h.HandlePodLogs)
	mux.HandleFunc("/api/deployments/logs", h.HandleDeploymentLogs)
	h.srv = &http.Server{
		Addr:    fmt.Sprintf("127.0.0.1:%d", port),
		Handler: mux,
	}

	go func() {
		if err := h.srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			h.logger.Error(fmt.Sprintf("HTTP server error: %v\n", err))
		}
	}()
}

// HandlePodLogs handles the WebSocket connection for streaming pod logs
func (h *httpsrvService) HandlePodLogs(w http.ResponseWriter, r *http.Request) {
	// Get query parameters
	contextName := r.URL.Query().Get("contextName")
	namespace := r.URL.Query().Get("namespace")
	pod := r.URL.Query().Get("name")
	container := r.URL.Query().Get("container")
	tailLinesStr := r.URL.Query().Get("tailLines")
	stripColors := r.URL.Query().Get("stripColors") != "false" // Default to true

	// Validate parameters
	if contextName == "" || namespace == "" || pod == "" {
		http.Error(w, "Missing required parameters: cluster, namespace, and name are required", http.StatusBadRequest)
		return
	}

	// Parse tailLines parameter
	var tailLines *int64
	if tailLinesStr != "" {
		tl, err := strconv.ParseInt(tailLinesStr, 10, 64)
		if err != nil {
			http.Error(w, fmt.Sprintf("Invalid tailLines parameter: %v", err), http.StatusBadRequest)
			return
		}
		tailLines = &tl
	} else {
		// Default to 100 lines if not specified
		defaultTailLines := int64(100)
		tailLines = &defaultTailLines
	}

	// Upgrade connection to WebSocket
	upgrader := websocket.Upgrader{
		CheckOrigin: func(r *http.Request) bool {
			return true
		},
	}

	// Upgrade connection to WebSocket
	ws, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		http.Error(w, fmt.Sprintf("Failed to upgrade connection: %v", err), http.StatusInternalServerError)
		return
	}
	defer ws.Close()

	// Create pod logs request
	podLogOpts := &corev1.PodLogOptions{
		Follow:     true,
		TailLines:  tailLines,
		Timestamps: true,
	}

	// Only set container if it's provided
	if container != "" {
		podLogOpts.Container = container
	}

	cs, err := h.conf.GetClientSet(contextName)
	if err != nil {
		http.Error(w, fmt.Sprintf("Failed to get context: %v", err), http.StatusInternalServerError)
		return
	}

	req := cs.CoreV1().Pods(namespace).GetLogs(pod, podLogOpts)

	// Get the log stream
	stream, err := req.Stream(context.Background())
	if err != nil {
		ws.WriteMessage(websocket.TextMessage, []byte(fmt.Sprintf("Error getting log stream: %v", err)))
		return
	}
	defer stream.Close()

	// Compile regex for ANSI color codes once
	ansiRegex := regexp.MustCompile(`\x1b\[[0-9;]*[a-zA-Z]`)

	// Read the logs
	reader := bufio.NewReader(stream)
	for {
		line, err := reader.ReadBytes('\n')
		if err != nil {
			if err != io.EOF {
				ws.WriteMessage(websocket.TextMessage, []byte(fmt.Sprintf("Error reading logs: %v", err)))
			}
			return
		}

		// Strip ANSI color codes if requested
		if stripColors {
			line = ansiRegex.ReplaceAll(line, []byte{})
		}

		// Send the line without adding extra newlines
		err = ws.WriteMessage(websocket.TextMessage, line)
		if err != nil {
			log.Printf("Error sending log line via WebSocket: %v", err)
			return
		}
	}
}

// Helper function to create pointer to int64
func int64Ptr(i int64) *int64 {
	return &i
}

// TerminalSession represents a terminal session with size queue support
type TerminalSession struct {
	ws       *websocket.Conn
	sizeChan chan remotecommand.TerminalSize
	doneChan chan struct{}
}

// Read reads data from the WebSocket
func (t *TerminalSession) Read(p []byte) (int, error) {
	_, message, err := t.ws.ReadMessage()
	if err != nil {
		return 0, err
	}

	// Check if this is a terminal resize message
	if len(message) >= 3 && message[0] == 1 {
		// This is a resize message
		rows := uint16(message[1])
		cols := uint16(message[2])

		// Log the resize event
		log.Printf("Terminal resize: %dx%d", cols, rows)

		// Send the new size to the terminal
		t.sizeChan <- remotecommand.TerminalSize{Width: cols, Height: rows}
		return 0, nil
	}

	// Check if this is a JSON resize message (for backward compatibility)
	if len(message) > 0 && message[0] == '{' {
		var resizeMsg struct {
			Type string `json:"type"`
			Cols int    `json:"cols"`
			Rows int    `json:"rows"`
		}

		if err := json.Unmarshal(message, &resizeMsg); err == nil && resizeMsg.Type == "resize" {
			// This is a resize message
			log.Printf("Terminal resize (JSON): %dx%d", resizeMsg.Cols, resizeMsg.Rows)
			t.sizeChan <- remotecommand.TerminalSize{
				Width:  uint16(resizeMsg.Cols),
				Height: uint16(resizeMsg.Rows),
			}
			return 0, nil
		}
	}

	// Regular data message
	copy(p, message)
	return len(message), nil
}

// Write writes data to the WebSocket
func (t *TerminalSession) Write(p []byte) (int, error) {
	err := t.ws.WriteMessage(websocket.BinaryMessage, p)
	if err != nil {
		return 0, err
	}
	return len(p), nil
}

// Next returns the next terminal size
func (t *TerminalSession) Next() *remotecommand.TerminalSize {
	select {
	case size := <-t.sizeChan:
		return &size
	case <-t.doneChan:
		return nil
	}
}

// Close closes the terminal session
func (t *TerminalSession) Close() {
	close(t.doneChan)
}

// HandlePodExecWebSocket handles WebSocket connections for pod exec
func (h *httpsrvService) HandlePodExecWebSocket(w http.ResponseWriter, r *http.Request) {
	// Extract query parameters
	contextName := r.URL.Query().Get("contextName")
	namespace := r.URL.Query().Get("namespace")
	podName := r.URL.Query().Get("pod")
	containerName := r.URL.Query().Get("container")

	// Log the request
	h.logger.Debug(fmt.Sprintf("Terminal request: context=%s, namespace=%s, pod=%s, container=%s",
		contextName, namespace, podName, containerName))

	// Validate parameters
	if contextName == "" || namespace == "" || podName == "" {
		http.Error(w, "Missing required parameters", http.StatusBadRequest)
		return
	}

	// Get the client for the current context
	clientset, err := h.conf.GetClientSet(contextName)
	if err != nil {
		errMsg := fmt.Sprintf("Failed to get client for context %s: %v", contextName, err)
		h.logger.Error(errMsg)
		http.Error(w, errMsg, http.StatusInternalServerError)
		return
	}

	// Get the rest config
	restConfig, err := h.conf.GetRestConfig(contextName)
	if err != nil {
		errMsg := fmt.Sprintf("Failed to get REST config for context %s: %v", contextName, err)
		h.logger.Error(errMsg)
		http.Error(w, errMsg, http.StatusInternalServerError)
		return
	}

	// Upgrade HTTP connection to WebSocket
	upgrader := websocket.Upgrader{
		ReadBufferSize:  1024,
		WriteBufferSize: 1024,
		CheckOrigin: func(r *http.Request) bool {
			return true // Allow all origins in development
		},
	}

	ws, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		errMsg := fmt.Sprintf("Failed to upgrade connection: %v", err)
		h.logger.Error(errMsg)
		http.Error(w, errMsg, http.StatusInternalServerError)
		return
	}
	defer ws.Close()

	// Create terminal session
	session := &TerminalSession{
		ws:       ws,
		sizeChan: make(chan remotecommand.TerminalSize),
		doneChan: make(chan struct{}),
	}
	defer session.Close()

	// Set up exec request
	req := clientset.CoreV1().RESTClient().Post().
		Resource("pods").
		Name(podName).
		Namespace(namespace).
		SubResource("exec")

	// Set query parameters
	req.VersionedParams(&corev1.PodExecOptions{
		Container: containerName,
		Command:   []string{"/bin/sh", "-c", "[ -x /bin/bash ] && exec /bin/bash || exec /bin/sh"},
		Stdin:     true,
		Stdout:    true,
		Stderr:    true,
		TTY:       true,
	}, scheme.ParameterCodec)

	// Create a context with timeout
	ctx, cancel := context.WithTimeout(h.ctx, 10*60*time.Second) // 10 minutes
	defer cancel()

	// Create SPDY executor
	exec, err := remotecommand.NewSPDYExecutor(restConfig, "POST", req.URL())
	if err != nil {
		errMsg := fmt.Sprintf("Failed to create executor: %v", err)
		h.logger.Error(errMsg)
		ws.WriteMessage(websocket.TextMessage, []byte(errMsg))
		return
	}

	// Stream terminal session
	err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{
		Stdin:             session,
		Stdout:            session,
		Stderr:            session,
		Tty:               true,
		TerminalSizeQueue: session,
	})

	if err != nil {
		errMsg := fmt.Sprintf("Error executing command: %v", err)
		h.logger.Error(errMsg)
		ws.WriteMessage(websocket.TextMessage, []byte(errMsg))
	}
}

// HandleDeploymentLogs streams logs from all pods and all containers in a deployment
func (h *httpsrvService) HandleDeploymentLogs(w http.ResponseWriter, r *http.Request) {
	// Get query parameters
	contextName := r.URL.Query().Get("contextName")
	namespace := r.URL.Query().Get("namespace")
	deploymentName := r.URL.Query().Get("name")

	// Validate parameters
	if contextName == "" || namespace == "" || deploymentName == "" {
		http.Error(w, "Missing required parameters", http.StatusBadRequest)
		return
	}

	// Upgrade connection to WebSocket
	upgrader := websocket.Upgrader{
		CheckOrigin: func(r *http.Request) bool {
			return true
		},
	}

	ws, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		http.Error(w, fmt.Sprintf("Failed to upgrade connection: %v", err), http.StatusInternalServerError)
		return
	}
	defer ws.Close()

	// Create a mutex to synchronize access to the WebSocket connection
	var wsMutex sync.Mutex

	// Safe write function that uses the mutex
	safeWrite := func(messageType int, data []byte) error {
		wsMutex.Lock()
		defer wsMutex.Unlock()
		return ws.WriteMessage(messageType, data)
	}

	// Send initial message
	safeWrite(websocket.TextMessage, []byte(fmt.Sprintf("Getting logs for deployment %s in namespace %s\n", deploymentName, namespace)))

	cs, err := h.conf.GetClientSet(contextName)
	if err != nil {
		http.Error(w, fmt.Sprintf("Failed to get client set: %v", err), http.StatusInternalServerError)
		return
	}

	labelSelector := fmt.Sprintf("app=%s", deploymentName)
	pods, err := cs.CoreV1().Pods(namespace).List(context.Background(), metav1.ListOptions{
		LabelSelector: labelSelector,
	})

	if err != nil {
		safeWrite(websocket.TextMessage, []byte(fmt.Sprintf("Error getting pods: %v\n", err)))
		return
	}

	if len(pods.Items) == 0 {
		// Try another common label pattern
		labelSelector = fmt.Sprintf("app.kubernetes.io/name=%s", deploymentName)
		pods, err = cs.CoreV1().Pods(namespace).List(context.Background(), metav1.ListOptions{
			LabelSelector: labelSelector,
		})

		if err != nil {
			safeWrite(websocket.TextMessage, []byte(fmt.Sprintf("Error getting pods with alternate label: %v\n", err)))
			return
		}
	}

	// If still no pods found, try matching by name prefix
	if len(pods.Items) == 0 {
		allPods, err := cs.CoreV1().Pods(namespace).List(context.Background(), metav1.ListOptions{})
		if err != nil {
			safeWrite(websocket.TextMessage, []byte(fmt.Sprintf("Error getting all pods: %v\n", err)))
			return
		}

		// Filter pods that have the deployment name as a prefix
		for _, pod := range allPods.Items {
			if strings.HasPrefix(pod.Name, deploymentName+"-") {
				pods.Items = append(pods.Items, pod)
			}
		}
	}

	if len(pods.Items) == 0 {
		safeWrite(websocket.TextMessage, []byte(fmt.Sprintf("No pods found for deployment %s\n", deploymentName)))
		return
	}

	// Send message about found pods
	safeWrite(websocket.TextMessage, []byte(fmt.Sprintf("Found %d pods for deployment %s\n", len(pods.Items), deploymentName)))

	// Create wait group to wait for all goroutines to finish
	var wg sync.WaitGroup

	// Create a context that can be canceled
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Create a channel to signal when the WebSocket is closed
	done := make(chan struct{})

	// Monitor WebSocket connection
	go func() {
		defer close(done)
		for {
			_, _, err := ws.ReadMessage()
			if err != nil {
				// WebSocket closed or error
				cancel() // Cancel all log streaming
				return
			}
		}
	}()

	// Stream logs from each pod and each container
	for _, pod := range pods.Items {
		// For each container in the pod
		for _, container := range pod.Spec.Containers {
			wg.Add(1)

			go func(pod corev1.Pod, containerName string) {
				defer wg.Done()

				podName := pod.Name

				// Send message about which pod/container we're getting logs from
				safeWrite(websocket.TextMessage, []byte(fmt.Sprintf("\n=== Logs from pod %s, container %s ===\n", podName, containerName)))

				// Create pod logs request
				podLogOpts := &corev1.PodLogOptions{
					Container:  containerName,
					Follow:     true,
					TailLines:  int64Ptr(100),
					Timestamps: true,
				}

				req := cs.CoreV1().Pods(namespace).GetLogs(podName, podLogOpts)
				stream, err := req.Stream(ctx)
				if err != nil {
					safeWrite(websocket.TextMessage, []byte(fmt.Sprintf("Error getting log stream for pod %s, container %s: %v\n", podName, containerName, err)))
					return
				}
				defer stream.Close()

				// Read the logs
				reader := bufio.NewReader(stream)
				for {
					select {
					case <-ctx.Done():
						// Context was canceled (WebSocket closed)
						return
					case <-done:
						// WebSocket closed
						return
					default:
						line, err := reader.ReadBytes('\n')
						if err != nil {
							if err != io.EOF {
								safeWrite(websocket.TextMessage, []byte(fmt.Sprintf("Error reading logs from pod %s, container %s: %v\n", podName, containerName, err)))
							}
							return
						}

						// Prefix the line with the pod and container name for clarity
						prefixedLine := fmt.Sprintf("[%s/%s] %s", podName, containerName, string(line))

						// Send the line
						err = safeWrite(websocket.TextMessage, []byte(prefixedLine))
						if err != nil {
							// WebSocket error, stop streaming
							return
						}
					}
				}
			}(pod, container.Name)
		}

		// Also check for init containers
		for _, container := range pod.Spec.InitContainers {
			wg.Add(1)

			go func(pod corev1.Pod, containerName string) {
				defer wg.Done()

				podName := pod.Name

				// Send message about which pod/container we're getting logs from
				safeWrite(websocket.TextMessage, []byte(fmt.Sprintf("\n=== Logs from pod %s, init container %s ===\n", podName, containerName)))

				// Create pod logs request
				podLogOpts := &corev1.PodLogOptions{
					Container:  containerName,
					Follow:     false, // Init containers don't need follow as they're completed
					TailLines:  int64Ptr(100),
					Timestamps: true,
				}

				req := cs.CoreV1().Pods(namespace).GetLogs(podName, podLogOpts)
				stream, err := req.Stream(ctx)
				if err != nil {
					safeWrite(websocket.TextMessage, []byte(fmt.Sprintf("Error getting log stream for pod %s, init container %s: %v\n", podName, containerName, err)))
					return
				}
				defer stream.Close()

				// Read the logs
				reader := bufio.NewReader(stream)
				for {
					select {
					case <-ctx.Done():
						// Context was canceled (WebSocket closed)
						return
					case <-done:
						// WebSocket closed
						return
					default:
						line, err := reader.ReadBytes('\n')
						if err != nil {
							if err != io.EOF {
								safeWrite(websocket.TextMessage, []byte(fmt.Sprintf("Error reading logs from pod %s, init container %s: %v\n", podName, containerName, err)))
							}
							return
						}

						// Prefix the line with the pod and container name for clarity
						prefixedLine := fmt.Sprintf("[%s/%s] %s", podName, containerName, string(line))

						// Send the line
						err = safeWrite(websocket.TextMessage, []byte(prefixedLine))
						if err != nil {
							// WebSocket error, stop streaming
							return
						}
					}
				}
			}(pod, container.Name)
		}
	}

	// Wait for all log streaming to finish
	wg.Wait()
}