首页 > 代码库 > SpringKafka消费端配置类ConsumerConfig.java源码

SpringKafka消费端配置类ConsumerConfig.java源码

 

  1  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
  2 package org.apache.kafka.clients.consumer;
  3 
  4 import org.apache.kafka.clients.CommonClientConfigs;
  5 import org.apache.kafka.common.config.AbstractConfig;
  6 import org.apache.kafka.common.config.ConfigDef;
  7 import org.apache.kafka.common.config.ConfigDef.Importance;
  8 import org.apache.kafka.common.config.ConfigDef.Type;
  9 import org.apache.kafka.common.serialization.Deserializer;
 10 
 11 import java.util.HashMap;
 12 import java.util.Map;
 13 import java.util.Properties;
 14 
 15 import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
 16 import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
 17 
 18 /**
 19  * The consumer configuration keys
 20  */
 21 public class ConsumerConfig extends AbstractConfig {
 22     private static final ConfigDef CONFIG;
 23 
 24     /*
 25      * NOTE: DO NOT CHANGE EITHER CONFIG STRINGS OR THEIR JAVA VARIABLE NAMES AS
 26      * THESE ARE PART OF THE PUBLIC API AND CHANGE WILL BREAK USER CODE.
 27      */
 28 
 29     /**
 30      * <code>group.id</code>
 31      */
 32     public static final String GROUP_ID_CONFIG = "group.id";
 33     private static final String GROUP_ID_DOC = "A unique string that identifies the consumer group this consumer belongs to. This property is required if the consumer uses either the group management functionality by using <code>subscribe(topic)</code> or the Kafka-based offset management strategy.";
 34 
 35     /**
 36      * <code>session.timeout.ms</code>
 37      */
 38     public static final String SESSION_TIMEOUT_MS_CONFIG = "session.timeout.ms";
 39     private static final String SESSION_TIMEOUT_MS_DOC = "The timeout used to detect failures when using Kafka‘s group management facilities.";
 40 
 41     /**
 42      * <code>heartbeat.interval.ms</code>
 43      */
 44     public static final String HEARTBEAT_INTERVAL_MS_CONFIG = "heartbeat.interval.ms";
 45     private static final String HEARTBEAT_INTERVAL_MS_DOC = "The expected time between heartbeats to the consumer coordinator when using Kafka‘s group management facilities. Heartbeats are used to ensure that the consumer‘s session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than <code>session.timeout.ms</code>, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.";
 46 
 47     /**
 48      * <code>bootstrap.servers</code>
 49      */
 50     public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
 51 
 52     /**
 53      * <code>enable.auto.commit</code>
 54      */
 55     public static final String ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit";
 56     private static final String ENABLE_AUTO_COMMIT_DOC = "If true the consumer‘s offset will be periodically committed in the background.";
 57 
 58     /**
 59      * <code>auto.commit.interval.ms</code>
 60      */
 61     public static final String AUTO_COMMIT_INTERVAL_MS_CONFIG = "auto.commit.interval.ms";
 62     private static final String AUTO_COMMIT_INTERVAL_MS_DOC = "The frequency in milliseconds that the consumer offsets are auto-committed to Kafka if <code>enable.auto.commit</code> is set to <code>true</code>.";
 63 
 64     /**
 65      * <code>partition.assignment.strategy</code>
 66      */
 67     public static final String PARTITION_ASSIGNMENT_STRATEGY_CONFIG = "partition.assignment.strategy";
 68     private static final String PARTITION_ASSIGNMENT_STRATEGY_DOC = "The class name of the partition assignment strategy that the client will use to distribute partition ownership amongst consumer instances when group management is used";
 69 
 70     /**
 71      * <code>auto.offset.reset</code>
 72      */
 73     public static final String AUTO_OFFSET_RESET_CONFIG = "auto.offset.reset";
 74     private static final String AUTO_OFFSET_RESET_DOC = "What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted): <ul><li>earliest: automatically reset the offset to the earliest offset<li>latest: automatically reset the offset to the latest offset</li><li>none: throw exception to the consumer if no previous offset is found for the consumer‘s group</li><li>anything else: throw exception to the consumer.</li></ul>";
 75 
 76     /**
 77      * <code>fetch.min.bytes</code>
 78      */
 79     public static final String FETCH_MIN_BYTES_CONFIG = "fetch.min.bytes";
 80     private static final String FETCH_MIN_BYTES_DOC = "The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will wait for that much data to accumulate before answering the request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive. Setting this to something greater than 1 will cause the server to wait for larger amounts of data to accumulate which can improve server throughput a bit at the cost of some additional latency.";
 81 
 82     /**
 83      * <code>fetch.max.wait.ms</code>
 84      */
 85     public static final String FETCH_MAX_WAIT_MS_CONFIG = "fetch.max.wait.ms";
 86     private static final String FETCH_MAX_WAIT_MS_DOC = "The maximum amount of time the server will block before answering the fetch request if there isn‘t sufficient data to immediately satisfy the requirement given by fetch.min.bytes.";
 87 
 88     /** <code>metadata.max.age.ms</code> */
 89     public static final String METADATA_MAX_AGE_CONFIG = CommonClientConfigs.METADATA_MAX_AGE_CONFIG;
 90 
 91     /**
 92      * <code>max.partition.fetch.bytes</code>
 93      */
 94     public static final String MAX_PARTITION_FETCH_BYTES_CONFIG = "max.partition.fetch.bytes";
 95     private static final String MAX_PARTITION_FETCH_BYTES_DOC = "The maximum amount of data per-partition the server will return. The maximum total memory used for a request will be <code>#partitions * max.partition.fetch.bytes</code>. This size must be at least as large as the maximum message size the server allows or else it is possible for the producer to send messages larger than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large message on a certain partition.";
 96 
 97     /** <code>send.buffer.bytes</code> */
 98     public static final String SEND_BUFFER_CONFIG = CommonClientConfigs.SEND_BUFFER_CONFIG;
 99 
100     /** <code>receive.buffer.bytes</code> */
101     public static final String RECEIVE_BUFFER_CONFIG = CommonClientConfigs.RECEIVE_BUFFER_CONFIG;
102 
103     /**
104      * <code>client.id</code>
105      */
106     public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG;
107 
108     /**
109      * <code>reconnect.backoff.ms</code>
110      */
111     public static final String RECONNECT_BACKOFF_MS_CONFIG = CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG;
112 
113     /**
114      * <code>retry.backoff.ms</code>
115      */
116     public static final String RETRY_BACKOFF_MS_CONFIG = CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG;
117 
118     /**
119      * <code>metrics.sample.window.ms</code>
120      */
121     public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;
122 
123     /**
124      * <code>metrics.num.samples</code>
125      */
126     public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG;
127 
128     /**
129      * <code>metric.reporters</code>
130      */
131     public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
132 
133     /**
134      * <code>check.crcs</code>
135      */
136     public static final String CHECK_CRCS_CONFIG = "check.crcs";
137     private static final String CHECK_CRCS_DOC = "Automatically check the CRC32 of the records consumed. This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance.";
138 
139     /** <code>key.deserializer</code> */
140     public static final String KEY_DESERIALIZER_CLASS_CONFIG = "key.deserializer";
141     public static final String KEY_DESERIALIZER_CLASS_DOC = "Deserializer class for key that implements the <code>Deserializer</code> interface.";
142 
143     /** <code>value.deserializer</code> */
144     public static final String VALUE_DESERIALIZER_CLASS_CONFIG = "value.deserializer";
145     public static final String VALUE_DESERIALIZER_CLASS_DOC = "Deserializer class for value that implements the <code>Deserializer</code> interface.";
146 
147     /** <code>connections.max.idle.ms</code> */
148     public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG;
149 
150     /** <code>request.timeout.ms</code> */
151     public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
152     private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC;
153 
154 
155     static {
156         CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
157                                         Type.LIST,
158                                         Importance.HIGH,
159                                         CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
160                                 .define(GROUP_ID_CONFIG, Type.STRING, "", Importance.HIGH, GROUP_ID_DOC)
161                                 .define(SESSION_TIMEOUT_MS_CONFIG,
162                                         Type.INT,
163                                         30000,
164                                         Importance.HIGH,
165                                         SESSION_TIMEOUT_MS_DOC)
166                                 .define(HEARTBEAT_INTERVAL_MS_CONFIG,
167                                         Type.INT,
168                                         3000,
169                                         Importance.HIGH,
170                                         HEARTBEAT_INTERVAL_MS_DOC)
171                                 .define(PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
172                                         Type.LIST,
173                                         RangeAssignor.class.getName(),
174                                         Importance.MEDIUM,
175                                         PARTITION_ASSIGNMENT_STRATEGY_DOC)
176                                 .define(METADATA_MAX_AGE_CONFIG,
177                                         Type.LONG,
178                                         5 * 60 * 1000,
179                                         atLeast(0),
180                                         Importance.LOW,
181                                         CommonClientConfigs.METADATA_MAX_AGE_DOC)
182                                 .define(ENABLE_AUTO_COMMIT_CONFIG,
183                                         Type.BOOLEAN,
184                                         true,
185                                         Importance.MEDIUM,
186                                         ENABLE_AUTO_COMMIT_DOC)
187                                 .define(AUTO_COMMIT_INTERVAL_MS_CONFIG,
188                                         Type.LONG,
189                                         5000,
190                                         atLeast(0),
191                                         Importance.LOW,
192                                         AUTO_COMMIT_INTERVAL_MS_DOC)
193                                 .define(CLIENT_ID_CONFIG,
194                                         Type.STRING,
195                                         "",
196                                         Importance.LOW,
197                                         CommonClientConfigs.CLIENT_ID_DOC)
198                                 .define(MAX_PARTITION_FETCH_BYTES_CONFIG,
199                                         Type.INT,
200                                         1 * 1024 * 1024,
201                                         atLeast(0),
202                                         Importance.HIGH,
203                                         MAX_PARTITION_FETCH_BYTES_DOC)
204                                 .define(SEND_BUFFER_CONFIG,
205                                         Type.INT,
206                                         128 * 1024,
207                                         atLeast(0),
208                                         Importance.MEDIUM,
209                                         CommonClientConfigs.SEND_BUFFER_DOC)
210                                 .define(RECEIVE_BUFFER_CONFIG,
211                                         Type.INT,
212                                         32 * 1024,
213                                         atLeast(0),
214                                         Importance.MEDIUM,
215                                         CommonClientConfigs.RECEIVE_BUFFER_DOC)
216                                 .define(FETCH_MIN_BYTES_CONFIG,
217                                         Type.INT,
218                                         1,
219                                         atLeast(0),
220                                         Importance.HIGH,
221                                         FETCH_MIN_BYTES_DOC)
222                                 .define(FETCH_MAX_WAIT_MS_CONFIG,
223                                         Type.INT,
224                                         500,
225                                         atLeast(0),
226                                         Importance.LOW,
227                                         FETCH_MAX_WAIT_MS_DOC)
228                                 .define(RECONNECT_BACKOFF_MS_CONFIG,
229                                         Type.LONG,
230                                         50L,
231                                         atLeast(0L),
232                                         Importance.LOW,
233                                         CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC)
234                                 .define(RETRY_BACKOFF_MS_CONFIG,
235                                         Type.LONG,
236                                         100L,
237                                         atLeast(0L),
238                                         Importance.LOW,
239                                         CommonClientConfigs.RETRY_BACKOFF_MS_DOC)
240                                 .define(AUTO_OFFSET_RESET_CONFIG,
241                                         Type.STRING,
242                                         "latest",
243                                         in("latest", "earliest", "none"),
244                                         Importance.MEDIUM,
245                                         AUTO_OFFSET_RESET_DOC)
246                                 .define(CHECK_CRCS_CONFIG,
247                                         Type.BOOLEAN,
248                                         true,
249                                         Importance.LOW,
250                                         CHECK_CRCS_DOC)
251                                 .define(METRICS_SAMPLE_WINDOW_MS_CONFIG,
252                                         Type.LONG,
253                                         30000,
254                                         atLeast(0),
255                                         Importance.LOW,
256                                         CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC)
257                                 .define(METRICS_NUM_SAMPLES_CONFIG,
258                                         Type.INT,
259                                         2,
260                                         atLeast(1),
261                                         Importance.LOW,
262                                         CommonClientConfigs.METRICS_NUM_SAMPLES_DOC)
263                                 .define(METRIC_REPORTER_CLASSES_CONFIG,
264                                         Type.LIST,
265                                         "",
266                                         Importance.LOW,
267                                         CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
268                                 .define(KEY_DESERIALIZER_CLASS_CONFIG,
269                                         Type.CLASS,
270                                         Importance.HIGH,
271                                         KEY_DESERIALIZER_CLASS_DOC)
272                                 .define(VALUE_DESERIALIZER_CLASS_CONFIG,
273                                         Type.CLASS,
274                                         Importance.HIGH,
275                                         VALUE_DESERIALIZER_CLASS_DOC)
276                                 .define(REQUEST_TIMEOUT_MS_CONFIG,
277                                         Type.INT,
278                                         40 * 1000,
279                                         atLeast(0),
280                                         Importance.MEDIUM,
281                                         REQUEST_TIMEOUT_MS_DOC)
282                                 /* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */
283                                 .define(CONNECTIONS_MAX_IDLE_MS_CONFIG,
284                                         Type.LONG,
285                                         9 * 60 * 1000,
286                                         Importance.MEDIUM,
287                                         CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)
288 
289                                 // security support
290                                 .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
291                                         Type.STRING,
292                                         CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
293                                         Importance.MEDIUM,
294                                         CommonClientConfigs.SECURITY_PROTOCOL_DOC)
295                                 .withClientSslSupport()
296                                 .withClientSaslSupport();
297 
298     }
299 
300     public static Map<String, Object> addDeserializerToConfig(Map<String, Object> configs,
301                                                               Deserializer<?> keyDeserializer,
302                                                               Deserializer<?> valueDeserializer) {
303         Map<String, Object> newConfigs = new HashMap<String, Object>();
304         newConfigs.putAll(configs);
305         if (keyDeserializer != null)
306             newConfigs.put(KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass());
307         if (valueDeserializer != null)
308             newConfigs.put(VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass());
309         return newConfigs;
310     }
311 
312     public static Properties addDeserializerToConfig(Properties properties,
313                                                      Deserializer<?> keyDeserializer,
314                                                      Deserializer<?> valueDeserializer) {
315         Properties newProperties = new Properties();
316         newProperties.putAll(properties);
317         if (keyDeserializer != null)
318             newProperties.put(KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
319         if (valueDeserializer != null)
320             newProperties.put(VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName());
321         return newProperties;
322     }
323 
324     ConsumerConfig(Map<?, ?> props) {
325         super(CONFIG, props);
326     }
327 
328     public static void main(String[] args) {
329         System.out.println(CONFIG.toHtmlTable());
330     }
331 
332 }

 

SpringKafka消费端配置类ConsumerConfig.java源码