summary history files

vendor/github.com/olekukonko/ll/lh/buffered.go
package lh

import (
	"fmt"
	"io"
	"os"
	"runtime"
	"sync"
	"time"

	"github.com/olekukonko/ll/lx"
)

// Buffering holds configuration for the Buffered handler.
type Buffering struct {
	BatchSize     int           // Flush when this many entries are buffered (default: 100)
	FlushInterval time.Duration // Maximum time between flushes (default: 10s)
	MaxBuffer     int           // Maximum buffer size before applying backpressure (default: 1000)
	OnOverflow    func(int)     // Called when buffer reaches MaxBuffer (default: logs warning)
}

// BufferingOpt configures Buffered handler.
type BufferingOpt func(*Buffering)

// WithBatchSize sets the batch size for flushing.
// It specifies the number of log entries to buffer before flushing to the underlying handler.
// Example:
//
//	handler := NewBuffered(textHandler, WithBatchSize(50)) // Flush every 50 entries
func WithBatchSize(size int) BufferingOpt {
	return func(c *Buffering) {
		c.BatchSize = size
	}
}

// WithFlushInterval sets the maximum time between flushes.
// It defines the interval at which buffered entries are flushed, even if the batch size is not reached.
// Example:
//
//	handler := NewBuffered(textHandler, WithFlushInterval(5*time.Second)) // Flush every 5 seconds
func WithFlushInterval(d time.Duration) BufferingOpt {
	return func(c *Buffering) {
		c.FlushInterval = d
	}
}

// WithMaxBuffer sets the maximum buffer size before backpressure.
// It limits the number of entries that can be queued in the channel, triggering overflow handling if exceeded.
// Example:
//
//	handler := NewBuffered(textHandler, WithMaxBuffer(500)) // Allow up to 500 buffered entries
func WithMaxBuffer(size int) BufferingOpt {
	return func(c *Buffering) {
		c.MaxBuffer = size
	}
}

// WithOverflowHandler sets the overflow callback.
// It specifies a function to call when the buffer reaches MaxBuffer, typically for logging or metrics.
// Example:
//
//	handler := NewBuffered(textHandler, WithOverflowHandler(func(n int) { fmt.Printf("Overflow: %d entries\n", n) }))
func WithOverflowHandler(fn func(int)) BufferingOpt {
	return func(c *Buffering) {
		c.OnOverflow = fn
	}
}

// Buffered wraps any Handler to provide buffering capabilities.
// It buffers log entries in a channel and flushes them based on batch size, time interval, or explicit flush.
// The generic type H ensures compatibility with any lx.Handler implementation.
// Thread-safe via channels and sync primitives.
type Buffered[H lx.Handler] struct {
	handler      H              // Underlying handler to process log entries
	config       *Buffering     // Configuration for batching and flushing
	entries      chan *lx.Entry // Channel for buffering log entries
	flushSignal  chan struct{}  // Channel to trigger explicit flushes
	shutdown     chan struct{}  // Channel to signal worker shutdown
	shutdownOnce sync.Once      // Ensures Close is called only once
	wg           sync.WaitGroup // Waits for worker goroutine to finish
}

// NewBuffered creates a new buffered handler that wraps another handler.
// It initializes the handler with default or provided configuration options and starts a worker goroutine.
// Thread-safe via channel operations and finalizer for cleanup.
// Example:
//
//	textHandler := lh.NewTextHandler(os.Stdout)
//	buffered := NewBuffered(textHandler, WithBatchSize(50))
func NewBuffered[H lx.Handler](handler H, opts ...BufferingOpt) *Buffered[H] {
	// Initialize default configuration
	config := &Buffering{
		BatchSize:     100,              // Default: flush every 100 entries
		FlushInterval: 10 * time.Second, // Default: flush every 10 seconds
		MaxBuffer:     1000,             // Default: max 1000 entries in buffer
		OnOverflow: func(count int) { // Default: log overflow to io.Discard
			fmt.Fprintf(io.Discard, "log buffer overflow: %d entries\n", count)
		},
	}

	// Apply provided options
	for _, opt := range opts {
		opt(config)
	}

	// Ensure sane configuration values
	if config.BatchSize < 1 {
		config.BatchSize = 1 // Minimum batch size is 1
	}
	if config.MaxBuffer < config.BatchSize {
		config.MaxBuffer = config.BatchSize * 10 // Ensure buffer is at least 10x batch size
	}
	if config.FlushInterval <= 0 {
		config.FlushInterval = 10 * time.Second // Minimum flush interval is 10s
	}

	// Initialize Buffered handler
	b := &Buffered[H]{
		handler:     handler,                                // Set underlying handler
		config:      config,                                 // Set configuration
		entries:     make(chan *lx.Entry, config.MaxBuffer), // Create buffered channel
		flushSignal: make(chan struct{}, 1),                 // Create single-slot flush signal channel
		shutdown:    make(chan struct{}),                    // Create shutdown signal channel
	}

	// Start worker goroutine
	b.wg.Add(1)
	go b.worker()

	// Set finalizer for cleanup during garbage collection
	runtime.SetFinalizer(b, (*Buffered[H]).Final)
	return b
}

// Handle implements the lx.Handler interface.
// It buffers log entries in the entries channel or triggers a flush on overflow.
// Returns an error if the buffer is full and flush cannot be triggered.
// Thread-safe via non-blocking channel operations.
// Example:
//
//	buffered.Handle(&lx.Entry{Message: "test"}) // Buffers entry or triggers flush
func (b *Buffered[H]) Handle(e *lx.Entry) error {
	select {
	case b.entries <- e: // Buffer entry if channel has space
		return nil
	default: // Handle buffer overflow
		if b.config.OnOverflow != nil {
			b.config.OnOverflow(len(b.entries)) // Call overflow handler
		}
		select {
		case b.flushSignal <- struct{}{}: // Trigger flush if possible
			return fmt.Errorf("log buffer overflow, triggering flush")
		default: // Flush already in progress
			return fmt.Errorf("log buffer overflow and flush already in progress")
		}
	}
}

// Flush triggers an immediate flush of buffered entries.
// It sends a signal to the worker to process all buffered entries.
// If a flush is already pending, it waits briefly and may exit without flushing.
// Thread-safe via non-blocking channel operations.
// Example:
//
//	buffered.Flush() // Flushes all buffered entries
func (b *Buffered[H]) Flush() {
	select {
	case b.flushSignal <- struct{}{}: // Signal worker to flush
	case <-time.After(100 * time.Millisecond): // Timeout if flush is pending
		// Flush already pending
	}
}

// Close flushes any remaining entries and stops the worker.
// It ensures shutdown is performed only once and waits for the worker to finish.
// Thread-safe via sync.Once and WaitGroup.
// Returns nil as it does not produce errors.
// Example:
//
//	buffered.Close() // Flushes entries and stops worker
func (b *Buffered[H]) Close() error {
	b.shutdownOnce.Do(func() {
		close(b.shutdown)            // Signal worker to shut down
		b.wg.Wait()                  // Wait for worker to finish
		runtime.SetFinalizer(b, nil) // Remove finalizer
	})
	return nil
}

// Final ensures remaining entries are flushed during garbage collection.
// It calls Close to flush entries and stop the worker.
// Used as a runtime finalizer to prevent log loss.
// Example (internal usage):
//
//	runtime.SetFinalizer(buffered, (*Buffered[H]).Final)
func (b *Buffered[H]) Final() {
	b.Close()
}

// Config returns the current configuration of the Buffered handler.
// It provides access to BatchSize, FlushInterval, MaxBuffer, and OnOverflow settings.
// Example:
//
//	config := buffered.Config() // Access configuration
func (b *Buffered[H]) Config() *Buffering {
	return b.config
}

// worker processes entries and handles flushing.
// It runs in a goroutine, buffering entries, flushing on batch size, timer, or explicit signal,
// and shutting down cleanly when signaled.
// Thread-safe via channel operations and WaitGroup.
func (b *Buffered[H]) worker() {
	defer b.wg.Done()                                 // Signal completion when worker exits
	batch := make([]*lx.Entry, 0, b.config.BatchSize) // Buffer for batching entries
	ticker := time.NewTicker(b.config.FlushInterval)  // Timer for periodic flushes
	defer ticker.Stop()                               // Clean up ticker
	for {
		select {
		case entry := <-b.entries: // Receive new entry
			batch = append(batch, entry)
			// Flush if batch size is reached
			if len(batch) >= b.config.BatchSize {
				b.flushBatch(batch)
				batch = batch[:0]
			}
		case <-ticker.C: // Periodic flush
			if len(batch) > 0 {
				b.flushBatch(batch)
				batch = batch[:0]
			}
		case <-b.flushSignal: // Explicit flush
			if len(batch) > 0 {
				b.flushBatch(batch)
				batch = batch[:0]
			}
			b.drainRemaining() // Drain all entries from the channel
		case <-b.shutdown: // Shutdown signal
			if len(batch) > 0 {
				b.flushBatch(batch)
			}
			b.drainRemaining() // Flush remaining entries
			return
		}
	}
}

// flushBatch processes a batch of entries through the wrapped handler.
// It writes each entry to the underlying handler, logging any errors to stderr.
// Example (internal usage):
//
//	b.flushBatch([]*lx.Entry{entry1, entry2})
func (b *Buffered[H]) flushBatch(batch []*lx.Entry) {
	for _, entry := range batch {
		// Process each entry through the handler
		if err := b.handler.Handle(entry); err != nil {
			fmt.Fprintf(os.Stderr, "log flush error: %v\n", err) // Log errors to stderr
		}
	}
}

// drainRemaining processes any remaining entries in the channel.
// It flushes all entries from the entries channel to the underlying handler,
// logging any errors to stderr. Used during flush or shutdown.
// Example (internal usage):
//
//	b.drainRemaining() // Flushes all pending entries
func (b *Buffered[H]) drainRemaining() {
	for {
		select {
		case entry := <-b.entries: // Process next entry
			if err := b.handler.Handle(entry); err != nil {
				fmt.Fprintf(os.Stderr, "log drain error: %v\n", err) // Log errors to stderr
			}
		default: // Exit when channel is empty
			return
		}
	}
}