From d041ad8cf2814620bd8b8716492c7cb293d5c5ac Mon Sep 17 00:00:00 2001 From: Petr Krutov Date: Thu, 7 Nov 2024 18:50:55 +0400 Subject: [PATCH] feat(server): kafka tls authorisation --- pkg/output/kafka/client.go | 29 ++++++++++++++++++++++++--- pkg/output/kafka/config.go | 41 ++++++++++++++++++++++++++++---------- 2 files changed, 56 insertions(+), 14 deletions(-) diff --git a/pkg/output/kafka/client.go b/pkg/output/kafka/client.go index 76b1e737..5b28d463 100644 --- a/pkg/output/kafka/client.go +++ b/pkg/output/kafka/client.go @@ -1,6 +1,9 @@ package kafka import ( + "crypto/tls" + "crypto/x509" + "fmt" "strings" "github.com/IBM/sarama" @@ -32,12 +35,15 @@ var ( ) func NewSyncProducer(config *Config) (sarama.SyncProducer, error) { - producerConfig := Init(config) + producerConfig, err := Init(config) + if err != nil { + return nil, err + } brokersList := strings.Split(config.Brokers, ",") return sarama.NewSyncProducer(brokersList, producerConfig) } -func Init(config *Config) *sarama.Config { +func Init(config *Config) (*sarama.Config, error) { c := sarama.NewConfig() c.Producer.Flush.Bytes = config.FlushBytes c.Producer.Flush.Messages = config.FlushMessages @@ -45,6 +51,23 @@ func Init(config *Config) *sarama.Config { c.Producer.Retry.Max = config.MaxRetries c.Producer.Return.Successes = true + if config.TLSClientConfig.CertificatePath != "" { + clientCertificate, err := tls.LoadX509KeyPair(config.TLSClientConfig.CertificatePath, config.TLSClientConfig.KeyPath) + if err != nil { + return nil, fmt.Errorf("failed to read client certificate: %w", err) + } + tlsConfig := &tls.Config{ + Certificates: []tls.Certificate{clientCertificate}, + } + + certPool := x509.NewCertPool() + certPool.AppendCertsFromPEM([]byte(config.TLSClientConfig.CACertificate)) + tlsConfig.RootCAs = certPool + + c.Net.TLS.Enable = true + c.Net.TLS.Config = tlsConfig + } + switch config.RequiredAcks { case RequiredAcksNone: c.Producer.RequiredAcks = sarama.NoResponse @@ -74,5 +97,5 @@ func Init(config *Config) *sarama.Config { c.Producer.Partitioner = sarama.NewRandomPartitioner } - return c + return c, nil } diff --git a/pkg/output/kafka/config.go b/pkg/output/kafka/config.go index 47eed42e..e3c5da02 100644 --- a/pkg/output/kafka/config.go +++ b/pkg/output/kafka/config.go @@ -6,17 +6,24 @@ import ( ) type Config struct { - Brokers string `yaml:"brokers"` - Topic string `yaml:"topic"` - TLS bool `yaml:"tls" default:"false"` - MaxQueueSize int `yaml:"maxQueueSize" default:"51200"` - FlushFrequency time.Duration `yaml:"flushFrequency" default:"10s"` - FlushMessages int `yaml:"flushMessages" default:"500"` - FlushBytes int `yaml:"flushBytes" default:"1000000"` - MaxRetries int `yaml:"maxRetries" default:"3"` - Compression CompressionStrategy `yaml:"compression" default:"none"` - RequiredAcks RequiredAcks `yaml:"requiredAcks" default:"leader"` - Partitioning PartitionStrategy `yaml:"partitioning" default:"none"` + Brokers string `yaml:"brokers"` + Topic string `yaml:"topic"` + TLS bool `yaml:"tls" default:"false"` + TLSClientConfig TLSClientConfig `yaml:"tlsClientConfig"` + MaxQueueSize int `yaml:"maxQueueSize" default:"51200"` + FlushFrequency time.Duration `yaml:"flushFrequency" default:"10s"` + FlushMessages int `yaml:"flushMessages" default:"500"` + FlushBytes int `yaml:"flushBytes" default:"1000000"` + MaxRetries int `yaml:"maxRetries" default:"3"` + Compression CompressionStrategy `yaml:"compression" default:"none"` + RequiredAcks RequiredAcks `yaml:"requiredAcks" default:"leader"` + Partitioning PartitionStrategy `yaml:"partitioning" default:"none"` +} + +type TLSClientConfig struct { + CertificatePath string `yaml:"certificatePath"` + KeyPath string `yaml:"keyPath"` + CACertificate string `yaml:"caCertificate"` } func (c *Config) Validate() error { @@ -28,5 +35,17 @@ func (c *Config) Validate() error { return errors.New("topic is required") } + if err := c.TLSClientConfig.Validate(); err != nil { + return err + } + + return nil +} + +func (c *TLSClientConfig) Validate() error { + if c.CertificatePath != "" && c.KeyPath == "" { + return errors.New("client key is required") + } + return nil }