While integrating Apache Kafka at TCS for the NSDL/CDSL depository holding feeds, I learned a set of patterns and best practices for building robust real-time systems. This guide walks through those Kafka integration patterns.

Architecture: NSDL/CDSL Depository Feed Processing

1. Data Sources

NSDL Depository Feed

CDSL Depository Feed

Exchange Data (NSE/BSE)

2. Kafka Producers

GoLang Services

Schema Registry

Idempotent Producers

3. Kafka Topics

holding-updates

order-events

audit-logs

dead-letter

4. Kafka Consumers

Consumer Groups

Exactly-Once Processing

Parallel Consumers

5. Sinks

Oracle Database

Redis Cache

Monitoring System

Analytics Pipeline

1. Producer Patterns

1.1 Idempotent Producer Implementation

// GoLang Kafka Producer with idempotence
package main

import (
    "github.com/segmentio/kafka-go"
    "github.com/segmentio/kafka-go/snappy"
    "context"
    "time"
)

// IdempotentProducer publishes messages through a single kafka-go Writer.
// Its Produce method is the entry point; retryProduce is the fallback it
// calls when the first write fails.
type IdempotentProducer struct {
    writer *kafka.Writer // writer used for every WriteMessages call
}

// NewIdempotentProducer builds a producer that writes synchronously to topic
// on the given brokers and waits for acknowledgement from all in-sync
// replicas before WriteMessages returns.
//
// kafka-go does not implement Kafka's idempotent/transactional producer
// protocol, and kafka.Transport has no Idempotent or TransactionalID fields,
// so the previous Transport override (which could not compile) is gone and
// the writer uses the library's default transport. Delivery is therefore
// at-least-once: duplicates created by retries have to be filtered by
// consumers, e.g. via the "message-id" header that Produce attaches.
//
// NOTE(review): LeastBytes ignores the message key, so records with the same
// key can land on different partitions. If per-key ordering matters, a
// key-hashing balancer is needed instead — confirm the intended guarantee.
func NewIdempotentProducer(brokers []string, topic string) *IdempotentProducer {
    return &IdempotentProducer{
        writer: &kafka.Writer{
            Addr:         kafka.TCP(brokers...),
            Topic:        topic,
            Balancer:     &kafka.LeastBytes{},
            RequiredAcks: kafka.RequireAll, // wait for all in-sync replicas
            Async:        false,            // WriteMessages blocks until acked
            // NOTE(review): confirm snappy.NewCompression exists in the pinned
            // kafka-go version; recent releases expose kafka.Snappy instead.
            Compression:  snappy.NewCompression(),
            BatchTimeout: 50 * time.Millisecond,
            BatchSize:    100,
        },
    }
}

// Produce publishes one key/value record, stamped with the current time and
// two headers: a fixed "producer-id" and a freshly generated "message-id".
// If the first write fails, the record and the error are handed to
// retryProduce, whose result is returned.
func (p *IdempotentProducer) Produce(ctx context.Context, key, value []byte) error {
    var record kafka.Message
    record.Key = key
    record.Value = value
    record.Time = time.Now()
    record.Headers = []kafka.Header{
        {Key: "producer-id", Value: []byte("holding-service")},
        {Key: "message-id", Value: []byte(generateUUID())},
    }

    // First attempt; only fall through to the retry path on failure.
    firstErr := p.writer.WriteMessages(ctx, record)
    if firstErr == nil {
        return nil
    }

    return p.retryProduce(ctx, record, firstErr)
}

// retryProduce re-sends msg up to five times with exponential backoff
// (100ms, 200ms, 400ms, ...). err is the failure from the initial attempt.
//
// It returns nil as soon as a write succeeds, ctx.Err() if the context is
// cancelled while waiting between attempts, and otherwise the error from the
// most recent attempt (not the stale error from the first one).
func (p *IdempotentProducer) retryProduce(ctx context.Context, msg kafka.Message, err error) error {
    const maxRetries = 5

    backoff := 100 * time.Millisecond
    lastErr := err

    for i := 0; i < maxRetries; i++ {
        // Wait out the backoff, but give up promptly on cancellation
        // instead of sleeping through it.
        timer := time.NewTimer(backoff)
        select {
        case <-ctx.Done():
            timer.Stop()
            return ctx.Err()
        case <-timer.C:
        }

        lastErr = p.writer.WriteMessages(ctx, msg)
        if lastErr == nil {
            return nil
        }
        backoff *= 2
    }

    return lastErr
}

1.2 Schema Evolution with Avro

// Avro schema management for holding updates
package schemas

// HoldingUpdateSchema is version 1 of the Avro record schema for holding
// updates.
//
// The timestamp field declares its logical type inside the type object:
// Avro only honours "logicalType" as an attribute of a type schema, and
// silently ignores it when it is written as a sibling of the field's "type".
const HoldingUpdateSchema = `{
  "type": "record",
  "name": "HoldingUpdate",
  "namespace": "com.tcs.depository",
  "fields": [
    {"name": "clientId", "type": "string"},
    {"name": "isin", "type": "string"},
    {"name": "quantity", "type": "long"},
    {"name": "averagePrice", "type": "double"},
    {"name": "updateType", "type": {"type": "enum", "name": "UpdateType", "symbols": ["BUY", "SELL", "SPLIT", "BONUS"]}},
    {"name": "timestamp", "type": {"type": "long", "logicalType": "timestamp-millis"}},
    {"name": "version", "type": "int", "default": 1}
  ]
}`

// HoldingUpdateSchemaV2 evolves the holding-update record.
//
// Changes relative to version 1:
//   - adds the optional "exchange" field (union with null, default null), so
//     readers on this schema can still decode records written without it;
//   - adds the "MERGER" enum symbol. A reader still on the older symbol list
//     fails on MERGER records unless its enum declares a default, so upgrade
//     consumers before producers emit the new symbol.
//
// The schema text must be strict JSON: comments are not allowed inside it.
// The timestamp logical type is declared inside the type object, which is the
// only place Avro honours it.
const HoldingUpdateSchemaV2 = `{
  "type": "record",
  "name": "HoldingUpdate",
  "namespace": "com.tcs.depository",
  "fields": [
    {"name": "clientId", "type": "string"},
    {"name": "isin", "type": "string"},
    {"name": "quantity", "type": "long"},
    {"name": "averagePrice", "type": "double"},
    {"name": "updateType", "type": {"type": "enum", "name": "UpdateType", "symbols": ["BUY", "SELL", "SPLIT", "BONUS", "MERGER"]}},
    {"name": "timestamp", "type": {"type": "long", "logicalType": "timestamp-millis"}},
    {"name": "version", "type": "int", "default": 2},
    {"name": "exchange", "type": ["null", "string"], "default": null}
  ]
}`

// SchemaRegistryClient fetches schema definitions from a schema registry
// over HTTP.
type SchemaRegistryClient struct {
    registryURL string       // base URL; "/schemas/ids/<id>" is appended to it
    client      *http.Client // HTTP client to use; nil falls back to http.DefaultClient
}

// GetSchema returns the schema text registered under schemaID.
//
// It issues GET <registryURL>/schemas/ids/<schemaID> and decodes the
// "schema" member of the JSON response. Any transport failure, non-200
// status, or undecodable body is returned as an error; the string result is
// empty whenever the error is non-nil.
func (src *SchemaRegistryClient) GetSchema(schemaID int) (string, error) {
    httpClient := src.client
    if httpClient == nil {
        httpClient = http.DefaultClient
    }

    resp, err := httpClient.Get(fmt.Sprintf("%s/schemas/ids/%d", src.registryURL, schemaID))
    if err != nil {
        return "", fmt.Errorf("fetching schema %d: %w", schemaID, err)
    }
    defer resp.Body.Close()

    // An error response is still valid JSON, just without a "schema" member;
    // without this check it would decode to an empty schema and a nil error.
    if resp.StatusCode != http.StatusOK {
        return "", fmt.Errorf("fetching schema %d: unexpected status %s", schemaID, resp.Status)
    }

    var schemaResp struct {
        Schema string `json:"schema"`
    }
    if err := json.NewDecoder(resp.Body).Decode(&schemaResp); err != nil {
        return "", fmt.Errorf("decoding schema %d: %w", schemaID, err)
    }

    return schemaResp.Schema, nil
}

2. Consumer Patterns

2.1 Exactly-Once Processing

// Exactly-once consumer with idempotent processing
package main

import (
    "github.com/segmentio/kafka-go"
    "context"
    "sync"
    "time"
)

// ExactlyOnceConsumer reads messages from a kafka-go Reader, writes them to
// a SQL database, and remembers which message IDs it has already handled.
type ExactlyOnceConsumer struct {
    // reader is the source of messages and the target of offset commits.
    reader      *kafka.Reader
    // db is used by processMessage. NewExactlyOnceConsumer does not set it,
    // so it must be assigned before ProcessMessages runs.
    db          *sql.DB
    // processed is keyed by the "message-id" header. Entries are never
    // removed in the code shown, and the map does not survive a restart.
    processed   map[string]bool // in-memory deduplication set
    // mu guards processed (RLock for lookups, Lock for inserts).
    mu          sync.RWMutex
}

// NewExactlyOnceConsumer creates a consumer-group reader for topic and pairs
// it with an empty in-memory deduplication set.
//
// The db field is left nil here; callers must assign it before processing.
//
// NOTE(review): with a non-zero CommitInterval kafka-go flushes commits
// periodically in the background, so CommitMessages may return before the
// offset reaches the broker — confirm this is acceptable for this consumer.
func NewExactlyOnceConsumer(brokers []string, topic, groupID string) *ExactlyOnceConsumer {
    cfg := kafka.ReaderConfig{
        Brokers:        brokers,
        Topic:          topic,
        GroupID:        groupID,
        MinBytes:       10e3, // 10KB
        MaxBytes:       10e6, // 10MB
        CommitInterval: time.Second,
        StartOffset:    kafka.FirstOffset,

        // Only deliver messages from committed transactions.
        IsolationLevel: kafka.ReadCommitted,
    }

    consumer := &ExactlyOnceConsumer{
        reader:    kafka.NewReader(cfg),
        processed: make(map[string]bool),
    }
    return consumer
}

// ProcessMessages fetches, deduplicates, processes and commits messages until
// ctx is cancelled or a fetch/commit fails, and returns that error.
//
// Per message:
//  1. Look up the "message-id" header. Messages without one are never treated
//     as duplicates of each other (previously they all shared the empty key,
//     so every header-less message after the first was silently skipped).
//  2. If the ID was already processed, commit the offset and move on; a
//     commit failure is returned rather than ignored.
//  3. Otherwise run processMessage. On failure the message is handed to
//     sendToDLQ and the loop continues without committing this offset; a
//     later successful commit on the same partition covers it.
//  4. On success, record the ID and commit the offset.
func (c *ExactlyOnceConsumer) ProcessMessages(ctx context.Context) error {
    for {
        if err := ctx.Err(); err != nil {
            return err
        }

        msg, err := c.reader.FetchMessage(ctx)
        if err != nil {
            return err
        }

        // kafka-go exposes headers as a plain slice, so search it directly.
        var messageID string
        for _, h := range msg.Headers {
            if h.Key == "message-id" {
                messageID = string(h.Value)
                break
            }
        }

        // Skip work already done, but only when there is an ID to compare.
        if messageID != "" {
            c.mu.RLock()
            alreadyDone := c.processed[messageID]
            c.mu.RUnlock()

            if alreadyDone {
                if err := c.reader.CommitMessages(ctx, msg); err != nil {
                    return err
                }
                continue
            }
        }

        if err := c.processMessage(msg); err != nil {
            // NOTE(review): the outcome of sendToDLQ is not checked here;
            // confirm it reports or logs its own failures.
            c.sendToDLQ(msg, err)
            continue
        }

        if messageID != "" {
            c.mu.Lock()
            c.processed[messageID] = true
            c.mu.Unlock()
        }

        if err := c.reader.CommitMessages(ctx, msg); err != nil {
            return err
        }
    }
}

// processMessage stores one message in holding_updates inside its own
// transaction. The insert is keyed on message_id and does nothing when that
// ID already exists, so replaying a message leaves the table unchanged.
//
// NOTE(review): the statement declares five placeholders but only message_id
// is bound; the remaining values still have to be parsed out of msg.Value.
// NOTE(review): confirm msg.Headers offers a Get method in the kafka package
// in use, and that the target database accepts $n / ON CONFLICT syntax.
func (c *ExactlyOnceConsumer) processMessage(msg kafka.Message) error {
    txn, beginErr := c.db.Begin()
    if beginErr != nil {
        return beginErr
    }
    // Safety net for early returns; harmless once Commit has succeeded.
    defer txn.Rollback()

    const insertSQL = `
        INSERT INTO holding_updates 
        (message_id, client_id, isin, quantity, processed_at)
        VALUES ($1, $2, $3, $4, $5)
        ON CONFLICT (message_id) DO NOTHING
    `

    messageID := string(msg.Headers.Get("message-id"))
    if _, execErr := txn.Exec(insertSQL, messageID); execErr != nil {
        return execErr
    }

    return txn.Commit()
}

2.2 Consumer Group Rebalancing Strategies

// StickyAssignor is a custom partition-assignment strategy. It keeps the
// result of the most recent Assign call so it is available afterwards.
type StickyAssignor struct {
    memberID      string             // not read by the code shown; presumably this consumer's member ID — verify
    generationID  int32              // not read by the code shown; presumably the group generation — verify
    assignments   map[string][]int32 // member ID -> value returned by stickyAssignment; replaced on every Assign
}

// Assign computes an assignment for every group member by delegating to
// stickyAssignment, stores the resulting plan on the assignor, and returns
// it. The error result is always nil.
//
// Members are visited in map-iteration order, which Go does not define.
func (sa *StickyAssignor) Assign(members map[string]kafka.GroupMember, topics map[string][]int32) (map[string][]int32, error) {
    plan := make(map[string][]int32)

    // The per-member policy (keeping prior assignments, limiting partition
    // movement, balancing load) lives in stickyAssignment.
    for id := range members {
        plan[id] = sa.stickyAssignment(members[id], topics)
    }

    sa.assignments = plan
    return plan, nil
}

// Shutdown stops the consumer gracefully: it waits for in-flight work via
// waitForCompletion and then closes the reader, which releases this member's
// partitions so the rest of the group can rebalance.
//
// Two steps from the earlier version were removed:
//   - reader.SetOffset(kafka.LastOffset) does not stop fetching, is rejected
//     by kafka-go for readers that use a consumer group, and its error was
//     being discarded;
//   - the fixed 30-second wait after Close delayed every shutdown and turned
//     a clean close into ctx.Err() whenever the caller's deadline was shorter
//     than 30 seconds.
//
// ctx is kept for interface compatibility; if it is already cancelled after a
// successful close, the close result (nil) is still what callers receive.
func (c *ExactlyOnceConsumer) Shutdown(ctx context.Context) error {
    // 1. Let the current batch finish.
    c.waitForCompletion()

    // 2. Close the reader; this is what hands the partitions back.
    if err := c.reader.Close(); err != nil {
        return err
    }

    return nil
}

3. Advanced Patterns

3.1 Dead Letter Queue Pattern

// DLQHandler decides what happens to a message whose processing failed:
// either ask the caller to retry after a delay, or forward the message to a
// dead-letter topic.
type DLQHandler struct {
    dlqProducer *kafka.Writer // writer used to publish dead-lettered messages
    maxRetries  int           // retry counts below this value trigger another retry
    retryDelay  time.Duration // base delay; doubled for each prior retry
}

// HandleFailure handles a message that failed processing with err after
// retryCount previous retries.
//
// While retryCount is below maxRetries it sleeps for retryDelay * 2^retryCount
// and returns kafka.ErrRetry so the caller tries again. Otherwise it publishes
// a copy of the message to the dead-letter writer, with headers describing the
// failure appended, and returns the result of that write.
//
// The dead-letter headers are built in a fresh slice so the caller's
// msg.Headers backing array is never written to, and a nil err is recorded as
// "unknown" instead of causing a panic.
//
// NOTE(review): confirm kafka.ErrRetry is defined by the kafka package in use.
func (dh *DLQHandler) HandleFailure(msg kafka.Message, err error, retryCount int) error {
    if retryCount < dh.maxRetries {
        // Retry with exponential backoff.
        delay := dh.retryDelay * time.Duration(math.Pow(2, float64(retryCount)))
        time.Sleep(delay)
        return kafka.ErrRetry
    }

    reason := "unknown"
    if err != nil {
        reason = err.Error()
    }

    // Copy first, then extend: appending to msg.Headers directly could
    // overwrite spare capacity that the caller still relies on.
    headers := make([]kafka.Header, 0, len(msg.Headers)+4)
    headers = append(headers, msg.Headers...)
    headers = append(headers,
        kafka.Header{Key: "original-topic", Value: []byte(msg.Topic)},
        kafka.Header{Key: "failure-reason", Value: []byte(reason)},
        kafka.Header{Key: "retry-count", Value: []byte(strconv.Itoa(retryCount))},
        kafka.Header{Key: "failed-at", Value: []byte(time.Now().Format(time.RFC3339))},
    )

    dlqMsg := kafka.Message{
        Key:     msg.Key,
        Value:   msg.Value,
        Headers: headers,
    }

    return dh.dlqProducer.WriteMessages(context.Background(), dlqMsg)
}

3.2 Change Data Capture (CDC) Pattern

// OracleCDC copies change records from an Oracle database to Kafka and
// tracks how far it has read.
type OracleCDC struct {
    db        *sql.DB       // database queried for change records
    producer  *kafka.Writer // writer that receives one message per change row
    lastLSN   string        // scn of the last row published; bound into the next query's WHERE clause
}

// CaptureChanges runs one polling pass: it selects every LogMiner row whose
// scn is greater than lastLSN, in scn order, and publishes each row to Kafka
// as a separate message. lastLSN advances after each successful write, so a
// failure part-way through resumes from the last published row.
//
// It returns the first query, scan, write, or row-iteration error.
//
// NOTE(review): scn is scanned and bound as a string; confirm the comparison
// in the WHERE clause is numeric on the database side.
func (cdc *OracleCDC) CaptureChanges(ctx context.Context) error {
    const changeQuery = `
        SELECT scn, operation, table_name, row_data
        FROM v$logmnr_contents
        WHERE scn > :1
        ORDER BY scn
    `

    rows, queryErr := cdc.db.QueryContext(ctx, changeQuery, cdc.lastLSN)
    if queryErr != nil {
        return queryErr
    }
    defer rows.Close()

    for rows.Next() {
        var (
            scn, operation, tableName string
            rowData                   []byte
        )
        scanErr := rows.Scan(&scn, &operation, &tableName, &rowData)
        if scanErr != nil {
            return scanErr
        }

        // The key is "<table>:<scn>"; the same facts are repeated as headers.
        recordKey := fmt.Sprintf("%s:%s", tableName, scn)
        metadata := []kafka.Header{
            {Key: "operation", Value: []byte(operation)},
            {Key: "table", Value: []byte(tableName)},
            {Key: "scn", Value: []byte(scn)},
        }
        change := kafka.Message{
            Key:     []byte(recordKey),
            Value:   rowData,
            Headers: metadata,
        }

        writeErr := cdc.producer.WriteMessages(ctx, change)
        if writeErr != nil {
            return writeErr
        }

        // Advance the checkpoint only once the row is safely published.
        cdc.lastLSN = scn
    }

    return rows.Err()
}

3.3 Exactly-Once Sink to Database

// OracleSink writes batches of holding updates into Oracle and skips
// messages whose ID it has already seen.
type OracleSink struct {
    db         *sql.DB    // database that receives the MERGE statements
    processed  *lru.Cache // message IDs already written; consulted before each upsert
    batchSize  int        // not read by the code shown; presumably the intended batch size — verify
}

// WriteBatch upserts every message in the batch into the holdings table
// inside a single transaction. An empty batch is a no-op.
//
// Messages that cannot be decoded are skipped, as are messages whose ID is
// already in the deduplication cache or appeared earlier in this batch.
//
// IDs are added to the cache only after the transaction commits. Recording
// them earlier meant that a failed Exec or Commit rolled the rows back while
// the cache still claimed they were written, so a retry of the same batch
// silently dropped them.
//
// It returns the first error from Begin, Prepare, Exec, or Commit.
//
// NOTE(review): the MATCHED branch adds to quantity, so the write itself is
// not idempotent; correctness relies entirely on the deduplication check.
func (sink *OracleSink) WriteBatch(messages []kafka.Message) error {
    if len(messages) == 0 {
        return nil
    }

    tx, err := sink.db.Begin()
    if err != nil {
        return err
    }
    defer tx.Rollback()

    stmt, err := tx.Prepare(`
        MERGE INTO holdings h
        USING (SELECT :1 AS client_id, :2 AS isin FROM dual) src
        ON (h.client_id = src.client_id AND h.isin = src.isin)
        WHEN MATCHED THEN
            UPDATE SET h.quantity = h.quantity + :3,
                      h.updated_at = SYSTIMESTAMP
        WHEN NOT MATCHED THEN
            INSERT (client_id, isin, quantity, created_at)
            VALUES (:1, :2, :3, SYSTIMESTAMP)
    `)
    if err != nil {
        return err
    }
    defer stmt.Close()

    // IDs written in this transaction; published to the cache after commit.
    written := make([]interface{}, 0, len(messages))
    // Catches duplicates within the batch, which the cache cannot see yet.
    inBatch := make(map[interface{}]struct{}, len(messages))

    for _, msg := range messages {
        var update HoldingUpdate
        if err := avro.Unmarshal(schema, msg.Value, &update); err != nil {
            continue // Send to DLQ
        }

        if _, dup := inBatch[update.MessageID]; dup || sink.processed.Contains(update.MessageID) {
            continue
        }

        if _, err := stmt.Exec(update.ClientID, update.ISIN, update.Quantity); err != nil {
            return err
        }

        inBatch[update.MessageID] = struct{}{}
        written = append(written, update.MessageID)
    }

    if err := tx.Commit(); err != nil {
        return err
    }

    for _, id := range written {
        sink.processed.Add(id, true)
    }
    return nil
}

4. Monitoring & Operations

4.1 Kafka Metrics Collection

// Prometheus collectors for the Kafka consumers, registered through promauto
// at package initialisation.
var (
    // messagesConsumed counts consumed messages per topic and consumer group.
    messagesConsumed = promauto.NewCounterVec(
        prometheus.CounterOpts{
            Name: "kafka_messages_consumed_total",
            Help: "Total number of messages consumed",
        },
        []string{"topic", "consumer_group"},
    )

    // consumerLag holds the latest observed lag per topic, partition and
    // consumer group.
    consumerLag = promauto.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: "kafka_consumer_lag",
            Help: "Consumer lag in messages",
        },
        []string{"topic", "partition", "consumer_group"},
    )

    // processingDuration records per-message handling time in seconds,
    // split by topic and outcome, using the client's default buckets.
    processingDuration = promauto.NewHistogramVec(
        prometheus.HistogramOpts{
            Name:    "kafka_message_processing_duration_seconds",
            Help:    "Time taken to process a message",
            Buckets: prometheus.DefBuckets,
        },
        []string{"topic", "status"},
    )
)

// collectConsumerMetrics starts a background goroutine that publishes the
// reader's lag to the consumerLag gauge immediately and then every ten
// seconds.
//
// The "consumer_group" label carries the reader's configured GroupID (it
// previously carried the client ID), and the partition label uses
// stats.Partition directly because kafka-go already reports it as a string.
//
// NOTE(review): the goroutine has no stop signal and runs for the life of
// the process; stopping it with the reader needs a signature change.
func collectConsumerMetrics(reader *kafka.Reader) {
    groupID := reader.Config().GroupID

    go func() {
        for {
            stats := reader.Stats()

            consumerLag.WithLabelValues(
                stats.Topic,
                stats.Partition,
                groupID,
            ).Set(float64(stats.Lag))

            time.Sleep(10 * time.Second)
        }
    }()
}

4.2 Alerting Rules

# Prometheus alerting rules for the Kafka consumers.
groups:
  - name: kafka_alerts
    rules:
      # Fires when any topic/partition/group series stays above 10k messages
      # of lag for five minutes.
      - alert: HighConsumerLag
        expr: kafka_consumer_lag > 10000
        for: 5m
        annotations:
          summary: "High consumer lag detected"
          description: "Consumer group {{ $labels.consumer_group }} has lag of {{ $value }} messages on topic {{ $labels.topic }}"

      # Fires when a consumer group's consumed-message counter has not moved
      # for ten minutes.
      - alert: ConsumerGroupNotProcessing
        expr: rate(kafka_messages_consumed_total[5m]) == 0
        for: 10m
        annotations:
          summary: "Consumer group not processing messages"
          description: "Consumer group {{ $labels.consumer_group }} has not processed any messages in 10 minutes"

      # Fires when more than 10% of processed messages on a topic end in error.
      # Both sides are summed by topic: dividing the raw series only matches
      # series with identical labels, i.e. error against error, which always
      # yields 1. The value is a 0-1 ratio, so it is rendered with
      # humanizePercentage rather than a literal "%".
      - alert: HighErrorRate
        expr: |
          sum by (topic) (rate(kafka_message_processing_duration_seconds_count{status="error"}[5m]))
            /
          sum by (topic) (rate(kafka_message_processing_duration_seconds_count[5m]))
            > 0.1
        for: 5m
        annotations:
          summary: "High error rate in message processing"
          description: "Error rate is {{ $value | humanizePercentage }} for topic {{ $labels.topic }}"

5. Performance Tuning

| Parameter | Default | Optimized | Impact |
|---|---|---|---|
| batch.size | 16KB | 1MB | Higher throughput, more memory |
| linger.ms | 0 | 20 | Better batching, higher latency |
| compression.type | none | snappy | Reduced network usage |
| fetch.min.bytes | 1 | 10KB | Reduced broker load |
| max.poll.records | 500 | 1000 | Better throughput |
| session.timeout.ms | 10000 | 30000 | More tolerant to GC pauses |

6. Lessons from Production (TCS Experience)

📈 Scaling Patterns

  • Start with single consumer, scale horizontally as needed
  • Use partition keys for ordering guarantees
  • Implement backpressure with pause/resume
  • Monitor consumer group rebalancing time

🔧 Operational Excellence

  • Always use idempotent producers/consumers
  • Implement comprehensive monitoring
  • Have a dead letter queue strategy
  • Test failure scenarios regularly

⚡ Performance Tips

  • Batch database writes
  • Use connection pooling
  • Compress messages when possible
  • Tune OS network buffers

🚀 Production Checklist

  • ✅ Idempotent producers enabled
  • ✅ Exactly-once processing implemented
  • ✅ Schema registry configured
  • ✅ Dead letter queue setup
  • ✅ Comprehensive monitoring
  • ✅ Alerting configured
  • ✅ Consumer lag monitoring
  • ✅ Graceful shutdown handling
  • ✅ Disaster recovery plan

Recommended Resources