An OOM While Sending Data with Kafka

1. Problem Description

Today an application in our test environment ran into an OOM. The log is shown below; the heap was only 128 MB.

(screenshot: OutOfMemoryError in the application log)

The heap dump shows the following:

(screenshots: heap dump analysis showing which objects retain the memory)

From the screenshots, an instance of Kafka's org.apache.kafka.clients.producer.internals.RecordAccumulator retains roughly 100 MB of memory, and most of that is taken up by TSpan and TSpanEvent objects.
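As an aside, heap dumps like this can be produced automatically by starting the JVM with -XX:+HeapDumpOnOutOfMemoryError, or captured on demand through the HotSpot diagnostic MBean. Below is a minimal sketch of the on-demand variant; the class name and output file are made up for illustration and are not part of the application discussed here.

```java
import java.lang.management.ManagementFactory;

import com.sun.management.HotSpotDiagnosticMXBean;

public class HeapDumper {

    // Illustrative only: writes a heap dump of the current JVM to the given file,
    // which can then be opened with a tool such as Eclipse MAT.
    public static void dump(String file) throws Exception {
        HotSpotDiagnosticMXBean bean = ManagementFactory.newPlatformMXBeanProxy(
                ManagementFactory.getPlatformMBeanServer(),
                "com.sun.management:type=HotSpotDiagnostic",
                HotSpotDiagnosticMXBean.class);
        bean.dumpHeap(file, true); // true = dump only live (reachable) objects
    }

    public static void main(String[] args) throws Exception {
        dump("producer-oom.hprof");
    }
}
```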

2. Initial Investigation

Seeing the TSpan data tied to Kafka, my first assumption was that the Kafka producer's send buffer had grown too large: records were being put into the producer's buffer faster than the producer could deliver them to the broker. So I went back to the Kafka documentation again and again, hoping to find a setting that would cap the producer-side buffer. There is indeed a parameter that controls the size of the send buffer, but its default value is only 32 MB. So why was the Kafka producer still holding references to about 100 MB of memory?

```java
/** <code>buffer.memory</code> */
public static final String BUFFER_MEMORY_CONFIG = "buffer.memory";
private static final String BUFFER_MEMORY_DOC = "The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are "
        + "sent faster than they can be delivered to the server the producer will block for <code>" + MAX_BLOCK_MS_CONFIG + "</code> after which it will throw an exception."
        + "<p>"
        + "This setting should correspond roughly to the total memory the producer will use, but is not a hard bound since "
        + "not all memory the producer uses is used for buffering. Some additional memory will be used for compression (if "
        + "compression is enabled) as well as for maintaining in-flight requests.";
```

3. Reading the Source Code

Since the documentation offered no reasonable explanation, I decided to sit down and read the source code.

```java
/**
 * Implementation of asynchronously send a record to a topic.
 */
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        // first make sure the metadata for the topic is available
        ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
        long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
        Cluster cluster = clusterAndWaitTime.cluster;
        byte[] serializedKey;
        try {
            serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in key.serializer");
        }
        byte[] serializedValue;
        try {
            serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in value.serializer");
        }
        int partition = partition(record, serializedKey, serializedValue, cluster);
        tp = new TopicPartition(record.topic(), partition);

        setReadOnly(record.headers());
        Header[] headers = record.headers().toArray();

        int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
                compressionType, serializedKey, serializedValue, headers);
        ensureValidRecordSize(serializedSize);
        long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
        log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
        // producer callback will make sure to call both 'callback' and interceptor callback
        Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);

        if (transactionManager != null && transactionManager.isTransactional())
            transactionManager.maybeAddPartitionToTransaction(tp);

        // this is the call that actually hands the (already serialized) record to the RecordAccumulator
        RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                serializedValue, headers, interceptCallback, remainingWaitMs);
        if (result.batchIsFull || result.newBatchCreated) {
            log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            this.sender.wakeup();
        }
        return result.future;
        // handling exceptions and record the errors;
        // for API exceptions return them in the future,
        // for other exceptions throw directly
    } catch (ApiException e) {
        log.debug("Exception occurred during message send:", e);
        if (callback != null)
            callback.onCompletion(null, e);
        this.errors.record();
        if (this.interceptors != null)
            this.interceptors.onSendError(record, tp, e);
        return new FutureFailure(e);
    } catch (InterruptedException e) {
        this.errors.record();
        if (this.interceptors != null)
            this.interceptors.onSendError(record, tp, e);
        throw new InterruptException(e);
    } catch (BufferExhaustedException e) {
        this.errors.record();
        this.metrics.sensor("buffer-exhausted-records").record();
        if (this.interceptors != null)
            this.interceptors.onSendError(record, tp, e);
        throw e;
    } catch (KafkaException e) {
        this.errors.record();
        if (this.interceptors != null)
            this.interceptors.onSendError(record, tp, e);
        throw e;
    } catch (Exception e) {
        // we notify interceptor about all exceptions, since onSend is called before anything else in this method
        if (this.interceptors != null)
            this.interceptors.onSendError(record, tp, e);
        throw e;
    }
}
```

The code above shows that the producer buffers outgoing data as serialized byte arrays; it does not explicitly hold on to the objects being sent. A ProducerRecord instance should therefore become eligible for garbage collection as soon as the send call returns. Yet the dump showed that the ProducerRecord data was in fact retained through ProducerBatch$Thunk, the object Kafka uses to implement send callbacks. Going back to the business code confirmed it: the callback passed to the producer held a reference to the record being sent, purely so it could be logged on failure.

```java
public void send(ProducerRecord record) {
    try {
        producer.send(record, (metadata, exception) -> {
            // the lambda captures 'record', so the full payload stays reachable
            // (via the batch's callback thunk) until the batch is acknowledged
            if (null != exception) {
                logger.error("[Kafka send error] data: {}, exception", record.value(), exception);
            }
        });
    } catch (Throwable e) {
        logger.error("[Kafka send] failed to send message, data: {}, exception", record.value(), e);
    }
}
```
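One way to break this retention chain, sketched below, is to compute whatever the log message needs before calling send(), so that the callback captures a small string instead of the whole record. The sketch assumes the record key (or any other short summary) is enough to identify a failed message; producer and logger are the same fields as in the snippet above.

```java
public void send(ProducerRecord record) {
    // Build a short description up front; the callback then references only this
    // string, so the large payload can be collected once it has been serialized
    // into the RecordAccumulator. Using the key here is an assumption; a summary
    // built from record.value() would work the same way.
    final String recordDesc = String.valueOf(record.key());
    try {
        producer.send(record, (metadata, exception) -> {
            if (null != exception) {
                logger.error("[Kafka send error] record: {}, exception", recordDesc, exception);
            }
        });
    } catch (Throwable e) {
        logger.error("[Kafka send] failed to send message, record: {}, exception", recordDesc, e);
    }
}
```

Either way, the ProducerBatch thunk then pins only a small string, not a TSpan graph, until the batch completes.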

4. Reflections

Having read a fair amount of code lately, I feel more and more that writing software calls for humility. We should treat every line we write with respect and know exactly what it does.
