While integrating Apache Kafka at TCS for the NSDL/CDSL depository holding feeds, I learned a set of patterns and best practices for building robust real-time systems. Here's a comprehensive guide to those Kafka integration patterns.
Architecture: NSDL/CDSL Depository Feed Processing
The pipeline runs through five stages:
1. Data Sources: NSDL depository feed, CDSL depository feed, exchange data (NSE/BSE)
→ 2. Kafka Producers: GoLang services, Schema Registry, idempotent producers
→ 3. Kafka Topics: holding-updates, order-events, audit-logs, dead-letter
→ 4. Kafka Consumers: consumer groups, exactly-once processing, parallel consumers
→ 5. Sinks: Oracle database, Redis cache, monitoring system, analytics pipeline
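Before wiring producers, the topics in stage 3 need to exist with sane partition counts. Here's a minimal sketch using kafka-go's connection-level admin API; the broker address, partition counts, and replication factors are illustrative assumptions, not production values.
// Sketch: create the pipeline topics (illustrative settings)
package main

import (
    "log"
    "net"
    "strconv"

    "github.com/segmentio/kafka-go"
)

func createTopics() {
    conn, err := kafka.Dial("tcp", "localhost:9092")
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()
    // Topic creation must be sent to the controller broker
    controller, err := conn.Controller()
    if err != nil {
        log.Fatal(err)
    }
    ctrlConn, err := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
    if err != nil {
        log.Fatal(err)
    }
    defer ctrlConn.Close()
    err = ctrlConn.CreateTopics(
        kafka.TopicConfig{Topic: "holding-updates", NumPartitions: 12, ReplicationFactor: 3},
        kafka.TopicConfig{Topic: "order-events", NumPartitions: 12, ReplicationFactor: 3},
        kafka.TopicConfig{Topic: "audit-logs", NumPartitions: 6, ReplicationFactor: 3},
        kafka.TopicConfig{Topic: "dead-letter", NumPartitions: 3, ReplicationFactor: 3},
    )
    if err != nil {
        log.Fatal(err)
    }
}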
1. Producer Patterns
1.1 Idempotent Producer Implementation
// GoLang Kafka producer tuned for safe delivery. Note: kafka-go does not
// expose the broker's enable.idempotence or transactional-producer settings,
// so end-to-end idempotence here comes from acks=all plus the message-id
// header that consumers deduplicate on.
package main

import (
    "context"
    "time"

    "github.com/segmentio/kafka-go"
)

type IdempotentProducer struct {
    writer *kafka.Writer
}

func NewIdempotentProducer(brokers []string, topic string) *IdempotentProducer {
    return &IdempotentProducer{
        writer: &kafka.Writer{
            Addr:         kafka.TCP(brokers...),
            Topic:        topic,
            Balancer:     &kafka.Hash{},    // hash on key: preserves per-key ordering
            RequiredAcks: kafka.RequireAll, // wait for all in-sync replicas
            Async:        false,            // synchronous production
            Compression:  kafka.Snappy,
            BatchTimeout: 50 * time.Millisecond,
            BatchSize:    100,
        },
    }
}
func (p *IdempotentProducer) Produce(ctx context.Context, key, value []byte) error {
    message := kafka.Message{
        Key:   key,
        Value: value,
        Time:  time.Now(),
        Headers: []kafka.Header{
            {Key: "producer-id", Value: []byte("holding-service")},
            {Key: "message-id", Value: []byte(generateUUID())}, // generateUUID: unique-ID helper, elided
        },
    }
    // Synchronous write; on failure, fall back to retries with backoff
    err := p.writer.WriteMessages(ctx, message)
    if err != nil {
        return p.retryProduce(ctx, message, err)
    }
    return nil
}
func (p *IdempotentProducer) retryProduce(ctx context.Context, msg kafka.Message, lastErr error) error {
    backoff := 100 * time.Millisecond
    maxRetries := 5
    for i := 0; i < maxRetries; i++ {
        time.Sleep(backoff)
        if err := p.writer.WriteMessages(ctx, msg); err == nil {
            return nil
        } else {
            lastErr = err
        }
        backoff *= 2 // exponential backoff: 100ms, 200ms, 400ms, ...
    }
    return lastErr
}
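Usage looks like this. Keying by clientId:isin means every update for a given holding hashes to the same partition, which is what gives us per-holding ordering. The key and payload below are made-up examples (assumes "log" is imported):
// Usage sketch (illustrative values)
producer := NewIdempotentProducer([]string{"localhost:9092"}, "holding-updates")
key := []byte("CL123:INE002A01018") // clientId:isin
value := []byte(`{"quantity": 100, "updateType": "BUY"}`)
if err := producer.Produce(context.Background(), key, value); err != nil {
    log.Printf("produce failed after retries: %v", err)
}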
1.2 Schema Evolution with Avro
// Avro schema management for holding updates
package schemas
// HoldingUpdate Avro schema
const HoldingUpdateSchema = `{
"type": "record",
"name": "HoldingUpdate",
"namespace": "com.tcs.depository",
"fields": [
{"name": "clientId", "type": "string"},
{"name": "isin", "type": "string"},
{"name": "quantity", "type": "long"},
{"name": "averagePrice", "type": "double"},
{"name": "updateType", "type": {"type": "enum", "name": "UpdateType", "symbols": ["BUY", "SELL", "SPLIT", "BONUS"]}},
{"name": "timestamp", "type": "long", "logicalType": "timestamp-millis"},
{"name": "version", "type": "int", "default": 1}
]
}`
// Schema evolution: V2 adds an optional field with a default, which is
// backward compatible. The new MERGER enum symbol is the risky part: old
// readers will reject it unless the enum declares a default, so upgrade
// readers before producers start emitting it.
const HoldingUpdateSchemaV2 = `{
"type": "record",
"name": "HoldingUpdate",
"namespace": "com.tcs.depository",
"fields": [
{"name": "clientId", "type": "string"},
{"name": "isin", "type": "string"},
{"name": "quantity", "type": "long"},
{"name": "averagePrice", "type": "double"},
{"name": "updateType", "type": {"type": "enum", "name": "UpdateType", "symbols": ["BUY", "SELL", "SPLIT", "BONUS", "MERGER"]}},
{"name": "timestamp", "type": "long", "logicalType": "timestamp-millis"},
{"name": "version", "type": "int", "default": 2},
{"name": "exchange", "type": ["null", "string"], "default": null} // New optional field
]
}`
// Schema registry client (requires "net/http", "fmt", "encoding/json")
type SchemaRegistryClient struct {
    registryURL string
    client      *http.Client
}
func (src *SchemaRegistryClient) GetSchema(schemaID int) (string, error) {
resp, err := src.client.Get(fmt.Sprintf("%s/schemas/ids/%d", src.registryURL, schemaID))
if err != nil {
return "", err
}
defer resp.Body.Close()
var schemaResp struct {
Schema string `json:"schema"`
}
if err := json.NewDecoder(resp.Body).Decode(&schemaResp); err != nil {
return "", err
}
return schemaResp.Schema, nil
}
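Before rolling out V2, ask the registry whether it is compatible with the latest registered version. A sketch against the Confluent Schema Registry compatibility endpoint; the subject name assumes the default topic-value naming strategy, and "bytes" is needed alongside the imports above.
// Compatibility check before deploying a new schema version
func (src *SchemaRegistryClient) CheckCompatibility(subject, schema string) (bool, error) {
    body, err := json.Marshal(map[string]string{"schema": schema})
    if err != nil {
        return false, err
    }
    url := fmt.Sprintf("%s/compatibility/subjects/%s/versions/latest", src.registryURL, subject)
    resp, err := src.client.Post(url, "application/vnd.schemaregistry.v1+json", bytes.NewReader(body))
    if err != nil {
        return false, err
    }
    defer resp.Body.Close()
    var result struct {
        IsCompatible bool `json:"is_compatible"`
    }
    if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
        return false, err
    }
    return result.IsCompatible, nil
}
// Usage: ok, err := src.CheckCompatibility("holding-updates-value", HoldingUpdateSchemaV2)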
2. Consumer Patterns
2.1 Exactly-Once Processing
// Exactly-once-style consumer: read-committed isolation plus idempotent
// processing and deduplication on the message-id header
package main

import (
    "context"
    "database/sql"
    "sync"
    "time"

    "github.com/segmentio/kafka-go"
)

type ExactlyOnceConsumer struct {
    reader    *kafka.Reader
    db        *sql.DB
    processed map[string]bool // in-memory dedup; use a bounded LRU in production
    mu        sync.RWMutex
}

func NewExactlyOnceConsumer(brokers []string, topic, groupID string) *ExactlyOnceConsumer {
    return &ExactlyOnceConsumer{
        reader: kafka.NewReader(kafka.ReaderConfig{
            Brokers:     brokers,
            Topic:       topic,
            GroupID:     groupID,
            MinBytes:    10e3, // 10KB
            MaxBytes:    10e6, // 10MB
            StartOffset: kafka.FirstOffset,
            // CommitInterval is left at its zero value so the explicit
            // CommitMessages calls below commit synchronously
            // Only read committed transactional messages
            IsolationLevel: kafka.ReadCommitted,
        }),
        processed: make(map[string]bool),
    }
}
// headerValue returns the value of the named header, or "" if absent
// (kafka.Message.Headers is a plain slice; there is no Get method)
func headerValue(headers []kafka.Header, key string) string {
    for _, h := range headers {
        if h.Key == key {
            return string(h.Value)
        }
    }
    return ""
}

func (c *ExactlyOnceConsumer) ProcessMessages(ctx context.Context) error {
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
            msg, err := c.reader.FetchMessage(ctx)
            if err != nil {
                return err
            }
            // Skip messages we have already processed
            messageID := headerValue(msg.Headers, "message-id")
            c.mu.RLock()
            done := c.processed[messageID]
            c.mu.RUnlock()
            if done {
                c.reader.CommitMessages(ctx, msg)
                continue
            }
            // Process with an idempotent operation
            if err := c.processMessage(msg); err != nil {
                // Park the message on the dead-letter topic, then commit
                // the offset so it is not refetched
                c.sendToDLQ(msg, err)
                c.reader.CommitMessages(ctx, msg)
                continue
            }
            // Mark as processed
            c.mu.Lock()
            c.processed[messageID] = true
            c.mu.Unlock()
            // Commit the offset
            if err := c.reader.CommitMessages(ctx, msg); err != nil {
                return err
            }
        }
    }
}
func (c *ExactlyOnceConsumer) processMessage(msg kafka.Message) error {
    // Idempotent database operation
    tx, err := c.db.Begin()
    if err != nil {
        return err
    }
    defer tx.Rollback()
    // Decode the payload (decoding helper elided; see the Avro schema in 1.2)
    update, err := decodeHoldingUpdate(msg.Value)
    if err != nil {
        return err
    }
    // An upsert keyed on message_id makes redelivery a no-op
    // (PostgreSQL syntax shown; the Oracle sink in 3.3 uses MERGE)
    _, err = tx.Exec(`
        INSERT INTO holding_updates
            (message_id, client_id, isin, quantity, processed_at)
        VALUES ($1, $2, $3, $4, $5)
        ON CONFLICT (message_id) DO NOTHING
    `, headerValue(msg.Headers, "message-id"),
        update.ClientID, update.ISIN, update.Quantity, time.Now())
    if err != nil {
        return err
    }
    return tx.Commit()
}
2.2 Consumer Group Rebalancing Strategies
// Custom rebalancing strategy: a skeleton that satisfies kafka-go's
// GroupBalancer interface; the sticky logic itself is elided to a helper
type StickyAssignor struct {
    previous kafka.GroupMemberAssignments // last assignment, to keep partitions on their current owners
}

func (sa *StickyAssignor) ProtocolName() string { return "sticky" }

func (sa *StickyAssignor) UserData() ([]byte, error) { return nil, nil }

func (sa *StickyAssignor) AssignGroups(members []kafka.GroupMember, partitions []kafka.Partition) kafka.GroupMemberAssignments {
    assignments := make(kafka.GroupMemberAssignments)
    // Sticky assignment goals:
    // 1. Preserve existing assignments when possible
    // 2. Minimize partition movement during rebalance
    // 3. Balance load across consumers
    for _, member := range members {
        // Per-member assignment based on previous ownership, consumer
        // capacity, and partition affinity (helper elided)
        assignments[member.ID] = sa.stickyAssignment(member, partitions)
    }
    sa.previous = assignments
    return assignments
}
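Whichever strategy wins, it has to be handed to the reader; kafka-go negotiates the first balancer that every member of the group supports. A wiring sketch using the library's built-in balancers (topic and group names are illustrative):
// Wiring balancers into the consumer
reader := kafka.NewReader(kafka.ReaderConfig{
    Brokers: brokers,
    Topic:   "holding-updates",
    GroupID: "holding-processor",
    GroupBalancers: []kafka.GroupBalancer{
        kafka.RangeGroupBalancer{},      // preferred protocol
        kafka.RoundRobinGroupBalancer{}, // fallback
    },
})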
// Graceful shutdown with cooperative rebalancing
func (c *ExactlyOnceConsumer) Shutdown(ctx context.Context) error {
    // 1. Stop the processing loop from fetching new messages
    //    (cancel the context passed to ProcessMessages)
    // 2. Let in-flight messages finish, bounded by a timeout
    done := make(chan struct{})
    go func() {
        c.waitForCompletion() // blocks until the current batch drains (helper elided)
        close(done)
    }()
    select {
    case <-done:
    case <-ctx.Done():
        return ctx.Err()
    case <-time.After(30 * time.Second):
    }
    // 3. Close the reader: this commits final offsets and leaves the group,
    //    letting the coordinator rebalance the revoked partitions
    return c.reader.Close()
}
3. Advanced Patterns
3.1 Dead Letter Queue Pattern
// DLQ handler for failed messages
// (requires "context", "errors", "math", "strconv", "time")
var errRetry = errors.New("retryable failure, reprocess message")

type DLQHandler struct {
    dlqProducer *kafka.Writer
    maxRetries  int
    retryDelay  time.Duration
}

func (dh *DLQHandler) HandleFailure(msg kafka.Message, err error, retryCount int) error {
    if retryCount < dh.maxRetries {
        // Retry with exponential backoff
        delay := dh.retryDelay * time.Duration(math.Pow(2, float64(retryCount)))
        time.Sleep(delay)
        return errRetry // signals the caller to reprocess the message
    }
    // Retries exhausted: park on the DLQ with diagnostic headers
    dlqMsg := kafka.Message{
        Key:   msg.Key,
        Value: msg.Value,
        Headers: append(msg.Headers,
            kafka.Header{Key: "original-topic", Value: []byte(msg.Topic)},
            kafka.Header{Key: "failure-reason", Value: []byte(err.Error())},
            kafka.Header{Key: "retry-count", Value: []byte(strconv.Itoa(retryCount))},
            kafka.Header{Key: "failed-at", Value: []byte(time.Now().Format(time.RFC3339))},
        ),
    }
    return dh.dlqProducer.WriteMessages(context.Background(), dlqMsg)
}
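The DLQ is only useful if someone drains it. A minimal triage consumer sketch; topic and group names are assumptions, and the replay-or-archive decision is left open:
// DLQ triage consumer (sketch)
func drainDLQ(ctx context.Context, brokers []string) error {
    r := kafka.NewReader(kafka.ReaderConfig{
        Brokers: brokers,
        Topic:   "dead-letter",
        GroupID: "dlq-triage",
    })
    defer r.Close()
    for {
        // ReadMessage commits offsets automatically when GroupID is set
        msg, err := r.ReadMessage(ctx)
        if err != nil {
            return err
        }
        // The headers attached by HandleFailure say where the message
        // came from and why it failed
        for _, h := range msg.Headers {
            log.Printf("dlq %s=%s", h.Key, string(h.Value))
        }
        // ...decide: fix and republish to original-topic, or archive...
    }
}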
3.2 Change Data Capture (CDC) Pattern
// CDC from Oracle to Kafka via LogMiner
type OracleCDC struct {
    db       *sql.DB
    producer *kafka.Writer
    lastSCN  string // last Oracle system change number we shipped
}
func (cdc *OracleCDC) CaptureChanges(ctx context.Context) error {
    // Query LogMiner output (the session must have started LogMiner first;
    // column names are simplified for the sketch)
    rows, err := cdc.db.QueryContext(ctx, `
        SELECT scn, operation, table_name, row_data
        FROM v$logmnr_contents
        WHERE scn > :1
        ORDER BY scn
    `, cdc.lastSCN)
if err != nil {
return err
}
defer rows.Close()
for rows.Next() {
var scn, operation, tableName string
var rowData []byte
if err := rows.Scan(&scn, &operation, &tableName, &rowData); err != nil {
return err
}
// Convert to Kafka message
message := kafka.Message{
Key: []byte(fmt.Sprintf("%s:%s", tableName, scn)),
Value: rowData,
Headers: []kafka.Header{
{Key: "operation", Value: []byte(operation)},
{Key: "table", Value: []byte(tableName)},
{Key: "scn", Value: []byte(scn)},
},
}
if err := cdc.producer.WriteMessages(ctx, message); err != nil {
return err
}
cdc.lastSCN = scn
}
return rows.Err()
}
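One operational detail: v$logmnr_contents is only populated for a session that has started LogMiner. A sketch of the session setup; the options shown are a simplifying assumption, and choosing the SCN window and dictionary source takes care in production:
// Start a LogMiner session before querying v$logmnr_contents (sketch)
func (cdc *OracleCDC) startLogMiner(ctx context.Context) error {
    _, err := cdc.db.ExecContext(ctx, `
        BEGIN
            DBMS_LOGMNR.START_LOGMNR(
                STARTSCN => :1,
                OPTIONS  => DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG
            );
        END;`, cdc.lastSCN)
    return err
}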
3.3 Exactly-Once Sink to Database
// Idempotent sink to Oracle
// (dedup cache from github.com/hashicorp/golang-lru; Avro decoding via
// github.com/hamba/avro/v2)
type OracleSink struct {
    db        *sql.DB
    processed *lru.Cache // LRU cache for deduplication
    batchSize int
}
func (sink *OracleSink) WriteBatch(messages []kafka.Message) error {
tx, err := sink.db.Begin()
if err != nil {
return err
}
defer tx.Rollback()
    stmt, err := tx.Prepare(`
        MERGE INTO holdings h
        USING (SELECT :1 AS client_id, :2 AS isin, :3 AS quantity FROM dual) src
        ON (h.client_id = src.client_id AND h.isin = src.isin)
        WHEN MATCHED THEN
            UPDATE SET h.quantity = h.quantity + src.quantity,
                       h.updated_at = SYSTIMESTAMP
        WHEN NOT MATCHED THEN
            INSERT (client_id, isin, quantity, created_at)
            VALUES (src.client_id, src.isin, src.quantity, SYSTIMESTAMP)
    `)
if err != nil {
return err
}
defer stmt.Close()
    for _, msg := range messages {
        // Decode the Avro payload (schema parsed from HoldingUpdateSchema in 1.2)
        var update HoldingUpdate
        if err := avro.Unmarshal(schema, msg.Value, &update); err != nil {
            continue // undecodable: hand to the DLQ instead
        }
        // Deduplicate on the message-id header (helper from 2.1)
        msgID := headerValue(msg.Headers, "message-id")
        if sink.processed.Contains(msgID) {
            continue
        }
        // Execute the upsert
        if _, err := stmt.Exec(update.ClientID, update.ISIN, update.Quantity); err != nil {
            return err
        }
        sink.processed.Add(msgID, true)
    }
return tx.Commit()
}
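The sink earns its exactly-once behavior from ordering: offsets are committed only after the database transaction commits, so a crash between the two causes redelivery, which the MERGE and the dedup cache absorb, rather than loss. A driver-loop sketch:
// Drive the sink: fetch a batch, write it, then commit offsets
func runSink(ctx context.Context, r *kafka.Reader, sink *OracleSink) error {
    batch := make([]kafka.Message, 0, sink.batchSize)
    for {
        msg, err := r.FetchMessage(ctx)
        if err != nil {
            return err
        }
        batch = append(batch, msg)
        if len(batch) < sink.batchSize {
            continue
        }
        if err := sink.WriteBatch(batch); err != nil {
            return err
        }
        if err := r.CommitMessages(ctx, batch...); err != nil {
            return err
        }
        batch = batch[:0]
    }
}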
4. Monitoring & Operations
4.1 Kafka Metrics Collection
// Prometheus metrics for Kafka (uses the prometheus and promauto packages
// from github.com/prometheus/client_golang)
var (
messagesConsumed = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "kafka_messages_consumed_total",
Help: "Total number of messages consumed",
}, []string{"topic", "consumer_group"})
consumerLag = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "kafka_consumer_lag",
Help: "Consumer lag in messages",
}, []string{"topic", "partition", "consumer_group"})
processingDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "kafka_message_processing_duration_seconds",
Help: "Time taken to process a message",
Buckets: prometheus.DefBuckets,
}, []string{"topic", "status"})
)
// Collect reader stats on an interval. kafka-go's ReaderStats reports the
// partition as a string (it can be "multiple") and does not carry the group
// ID, so the caller passes it in.
func collectConsumerMetrics(reader *kafka.Reader, groupID string) {
    go func() {
        for {
            stats := reader.Stats()
            consumerLag.WithLabelValues(
                stats.Topic,
                stats.Partition,
                groupID,
            ).Set(float64(stats.Lag))
            time.Sleep(10 * time.Second)
        }
    }()
}
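The histogram above gets its samples from a thin wrapper around the handler; the consumer-group label here is an assumed constant:
// Observe per-message processing latency and outcome
func instrumentedProcess(msg kafka.Message, handle func(kafka.Message) error) error {
    start := time.Now()
    err := handle(msg)
    status := "success"
    if err != nil {
        status = "error"
    }
    processingDuration.WithLabelValues(msg.Topic, status).Observe(time.Since(start).Seconds())
    messagesConsumed.WithLabelValues(msg.Topic, "holding-processor").Inc()
    return err
}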
4.2 Alerting Rules
# Prometheus alerting rules for Kafka
groups:
- name: kafka_alerts
rules:
- alert: HighConsumerLag
expr: kafka_consumer_lag > 10000
for: 5m
annotations:
summary: "High consumer lag detected"
description: "Consumer group {{ $labels.consumer_group }} has lag of {{ $value }} messages on topic {{ $labels.topic }}"
- alert: ConsumerGroupNotProcessing
expr: rate(kafka_messages_consumed_total[5m]) == 0
for: 10m
annotations:
summary: "Consumer group not processing messages"
description: "Consumer group {{ $labels.consumer_group }} has not processed any messages in 10 minutes"
- alert: HighErrorRate
expr: rate(kafka_message_processing_duration_seconds_count{status="error"}[5m]) / rate(kafka_message_processing_duration_seconds_count[5m]) > 0.1
for: 5m
annotations:
summary: "High error rate in message processing"
description: "Error rate is {{ $value }}% for topic {{ $labels.topic }}"
5. Performance Tuning
| Parameter | Default | Optimized | Impact |
|---|---|---|---|
| batch.size | 16KB | 1MB | Higher throughput, more memory |
| linger.ms | 0 | 20 | Better batching, higher latency |
| compression.type | none | snappy | Reduced network usage |
| fetch.min.bytes | 1 | 10KB | Reduced broker load |
| max.poll.records | 500 | 1000 | Better throughput |
| session.timeout.ms | 10000 | 30000 | More tolerant to GC pauses |
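The names above are Java-client configs; kafka-go exposes the same levers as Writer and ReaderConfig fields. A sketch mirroring the Optimized column (treat the values as starting points, not prescriptions):
// Approximate kafka-go equivalents of the tuned settings
writer := &kafka.Writer{
    Addr:         kafka.TCP(brokers...),
    BatchBytes:   1 << 20,               // ~ batch.size = 1MB
    BatchTimeout: 20 * time.Millisecond, // ~ linger.ms = 20
    Compression:  kafka.Snappy,          // ~ compression.type = snappy
}
reader := kafka.NewReader(kafka.ReaderConfig{
    Brokers:  brokers,
    Topic:    "holding-updates",
    GroupID:  "holding-processor",
    MinBytes: 10e3, // ~ fetch.min.bytes = 10KB
    MaxBytes: 10e6,
    // max.poll.records has no direct kafka-go analogue; QueueCapacity and
    // SessionTimeout are the closest knobs for poll depth and GC tolerance
})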
6. Lessons from Production (TCS Experience)
📈 Scaling Patterns
- Start with a single consumer; scale horizontally as needed
- Use partition keys for ordering guarantees
- Implement backpressure by bounding in-flight work (see the sketch after this list)
- Monitor consumer group rebalancing time
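kafka-go has no Java-style pause()/resume(), so in practice backpressure came from a bounded worker pool: once the pool is full, the fetch loop blocks and consumption slows naturally. A sketch (the pool size is an illustrative assumption; per-message commits from goroutines can land out of order, which is tolerable here only because processing is idempotent):
// Backpressure via a bounded worker pool (sketch)
func consumeWithBackpressure(ctx context.Context, r *kafka.Reader, handle func(kafka.Message) error) error {
    sem := make(chan struct{}, 32) // at most 32 messages in flight
    for {
        msg, err := r.FetchMessage(ctx)
        if err != nil {
            return err
        }
        sem <- struct{}{} // blocks while the pool is saturated
        go func(m kafka.Message) {
            defer func() { <-sem }()
            if err := handle(m); err == nil {
                r.CommitMessages(ctx, m)
            }
        }(msg)
    }
}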
🔧 Operational Excellence
- Always use idempotent producers/consumers
- Implement comprehensive monitoring
- Have a dead letter queue strategy
- Test failure scenarios regularly
⚡ Performance Tips
- Batch database writes
- Use connection pooling
- Compress messages when possible
- Tune OS network buffers
🚀 Production Checklist
- ✅ Idempotent producers enabled
- ✅ Exactly-once processing implemented
- ✅ Schema registry configured
- ✅ Dead letter queue setup
- ✅ Comprehensive monitoring
- ✅ Alerting configured
- ✅ Consumer lag monitoring
- ✅ Graceful shutdown handling
- ✅ Disaster recovery plan