{"id":861,"date":"2020-09-11T15:07:17","date_gmt":"2020-09-11T07:07:17","guid":{"rendered":"http:\/\/prayerlaputa.com\/?p=861"},"modified":"2020-09-11T15:08:33","modified_gmt":"2020-09-11T07:08:33","slug":"861","status":"publish","type":"post","link":"http:\/\/prayerlaputa.com\/?p=861","title":{"rendered":"\u6e90\u7801\u5206\u6790\u4e4bKafka Consumer\u6d88\u8d39\u6d88\u606f\u7684\u8fc7\u7a0b"},"content":{"rendered":"<h2 class=\"md-end-block md-heading\"><span class=\"md-plain\">\u8bf4\u660e<\/span><\/h2>\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">\u672c\u6587\u57fa\u4e8eApache Kafka 2.5.1\uff082020.09.10\u62c9\u53d6\u6700\u65b0\u4ee3\u7801\uff09<\/span><\/p>\n<h2 class=\"md-end-block md-heading\"><span class=\"md-plain\">Consumer\u5982\u4f55\u4f7f\u7528\uff1f<\/span><\/h2>\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">\u9605\u8bfb\u6e90\u7801\u524d\u7684\u9996\u5148\u8981\u505a\u5230\u719f\u6089\u76f8\u5173\u7ec4\u4ef6\u7684\u6982\u5ff5\u3001\u57fa\u672c\u4f7f\u7528\u3002\u800c\u6700\u9760\u8c31\u7684\u8d44\u6599\u5c31\u662f\u5b98\u65b9\u6587\u6863\u3002<\/span><\/p>\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">\u5efa\u8bae\u9605\u8bfb\u5b98\u65b9\u6587\u6863\uff08<\/span><span class=\"md-link md-pair-s\" spellcheck=\"false\"><a href=\"https:\/\/kafka.apache.org\/documentation\/\">https:\/\/kafka.apache.org\/documentation\/<\/a><\/span><span class=\"md-plain\">\uff09\u540e\uff0c\u81ea\u5df1\u7ec3\u4e60\u3001\u4f7f\u7528kafka\u4e4b\u540e\u518d\u5f00\u59cb\u9605\u8bfb\u6e90\u7801\u3002<\/span><!--more--><\/p>\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">KafkaConsumer\u7684JavaDoc\uff08\u53c2\u89c1<\/span><span class=\"md-link md-pair-s\" spellcheck=\"false\"><a href=\"https:\/\/kafka.apache.org\/10\/javadoc\/?org\/apache\/kafka\/clients\/consumer\/KafkaConsumer.html\">https:\/\/kafka.apache.org\/10\/javadoc\/?org\/apache\/kafka\/clients\/consumer\/KafkaConsumer.html<\/a><\/span><span class=\"md-plain\">\uff09\u672c\u8eab\u5c31\u7ed9\u51fa\u4e86\u4e0d\u5c11\u6709\u7528\u4fe1\u606f\uff0c\u4e0b\u9762\u4ec5\u5217\u51fa\u4e00\u4e9b\u5173\u952e\u70b9\uff1a<\/span><\/p>\n<ul class=\"ul-list\" data-mark=\"-\">\n<li class=\"md-list-item\">\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">Cross-Version Compatibility<\/span> <span class=\"md-plain\">\u5ba2\u6237\u7aef\u652f\u63010.10.0\u4ee5\u53ca\u4ee5\u4e0a\u7248\u672c\uff0c\u5982\u679c\u8c03\u7528\u4e0d\u652f\u6301\u7684\u7279\u6027\uff0c\u4f1a\u629b\u51faUnsupportedVersionException<\/span><\/p>\n<\/li>\n<li class=\"md-list-item\">\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">Offsets and Consumer Position<\/span> <span class=\"md-plain\">position: \u6709\u5f85\u8bfb\u53d6\u7684\u4e0b\u4e00\u6761\u8bb0\u5f55\u7684\u504f\u79fb\u91cf<\/span> <span class=\"md-plain\">commited position: \u5df2\u88ab\u4fdd\u5b58\u3001\u5f52\u6863\u7684\u6700\u540e\u4e00\u6761\u8bb0\u5f55\u7684\u504f\u79fb\u91cf\uff0c\u53ef\u4ee5\u7528\u4e8e\u6062\u590d\u6570\u636e\u3002<\/span><\/p>\n<\/li>\n<li class=\"md-list-item\">\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">Consumer Groups and Topic Subscriptions<\/span><\/p>\n<ul class=\"ul-list\" data-mark=\"-\">\n<li class=\"md-list-item\">\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">\u4e00\u4e2apartition\u5185\u7684\u6d88\u606f\u53ea\u4f1a\u6295\u9012\u7ed9group\u4e2d\u7684\u4e00\u4e2aconsumer<\/span><\/p>\n<\/li>\n<li class=\"md-list-item\">\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">\u4ee5\u4e0b\u573a\u666f\u5c06\u89e6\u53d1group rebalance<\/span><\/p>\n<ul class=\"ul-list\" data-mark=\"-\">\n<li class=\"md-list-item\">\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">\u4e00\u4e2aconsumer\u6302\u6389<\/span><\/p>\n<\/li>\n<li class=\"md-list-item\">\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">\u65b0\u52a0\u5165\u4e00\u4e2aconsumer<\/span><\/p>\n<\/li>\n<li class=\"md-list-item\">\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">\u65b0\u7684partition\u52a0\u5165\u4e00\u4e2atopic<\/span><\/p>\n<\/li>\n<li class=\"md-list-item\">\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">\u4e00\u4e2a\u65b0\u7684topic\u5339\u914d\u5df2\u6709\u7684\u8ba2\u9605\u6b63\u5219(subscribed regx)<\/span><\/p>\n<\/li>\n<\/ul>\n<\/li>\n<li class=\"md-list-item\">\n<p class=\"md-end-block md-p\"><span class=\"md-meta-i-c md-link\"><a spellcheck=\"false\" href=\"https:\/\/kafka.apache.org\/10\/javadoc\/org\/apache\/kafka\/clients\/consumer\/ConsumerRebalanceListener.html\"><span class=\"md-pair-s\" spellcheck=\"false\"><code>ConsumerRebalanceListener<\/code><\/span><\/a><\/span><span class=\"md-plain\"> \u53ef\u4ee5\u76d1\u542crebalance<\/span><\/p>\n<\/li>\n<\/ul>\n<\/li>\n<li class=\"md-list-item\">\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">Detecting Consumer Faillures<\/span><\/p>\n<ul class=\"ul-list\" data-mark=\"-\">\n<li class=\"md-list-item\">\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">\u8c03\u7528poll(long)\u65b9\u6cd5\u65f6consumer\u4f1a\u81ea\u52a8\u52a0\u5165\u5230group\u4e2d\uff0cconsumer\u4f1a\u53d1\u5fc3\u8df3\u7ed9\u670d\u52a1\u5668\u7aef\uff0c\u8d85\u65f6\u65f6\u95f4<\/span><span class=\"md-pair-s\" spellcheck=\"false\"><code>session.timeout.ms<\/code><\/span><\/p>\n<\/li>\n<li class=\"md-list-item\">\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">consumer\u53ef\u80fd\u9047\u5230\u6d3b\u9501\uff1a\u9501\u68c0\u6d4b\u673a\u5236 <\/span><span class=\"md-pair-s\" spellcheck=\"false\"><code>session.timeout.ms<\/code><\/span><\/p>\n<\/li>\n<li class=\"md-list-item\">\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">poll\u65b9\u6cd5\u76f8\u5173\u914d\u7f6e\uff1a<\/span><\/p>\n<ul class=\"ul-list\" data-mark=\"-\">\n<li class=\"md-list-item\">\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">max.poll.interval.ms<\/span><\/p>\n<\/li>\n<li class=\"md-list-item\">\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">max.poll.records<\/span><\/p>\n<\/li>\n<\/ul>\n<\/li>\n<\/ul>\n<\/li>\n<\/ul>\n<h3 class=\"md-end-block md-heading\"><span class=\"md-plain\">\u4ee3\u7801\u793a\u4f8b<\/span><\/h3>\n<pre class=\"md-fences md-end-block ty-contain-cm modeLoaded\" lang=\"java\" spellcheck=\"false\"><span role=\"presentation\"><span class=\"cm-comment\">\/\/\u4ee3\u78011\uff1a<\/span><\/span>\r\n<span role=\"presentation\"><span class=\"cm-variable\">Properties<\/span> <span class=\"cm-variable\">props<\/span> <span class=\"cm-operator\">=<\/span> <span class=\"cm-keyword\">new<\/span> <span class=\"cm-variable\">Properties<\/span>();<\/span>\r\n<span role=\"presentation\"><span class=\"cm-variable\">props<\/span>.<span class=\"cm-variable\">put<\/span>(<span class=\"cm-string\">\"bootstrap.servers\"<\/span>, <span class=\"cm-string\">\"localhost:9092\"<\/span>);<\/span>\r\n<span role=\"presentation\"><span class=\"cm-variable\">props<\/span>.<span class=\"cm-variable\">put<\/span>(<span class=\"cm-string\">\"group.id\"<\/span>, <span class=\"cm-string\">\"test\"<\/span>);<\/span>\r\n<span role=\"presentation\"><span class=\"cm-variable\">props<\/span>.<span class=\"cm-variable\">put<\/span>(<span class=\"cm-string\">\"enable.auto.commit\"<\/span>, <span class=\"cm-string\">\"true\"<\/span>);<\/span>\r\n<span role=\"presentation\"><span class=\"cm-variable\">props<\/span>.<span class=\"cm-variable\">put<\/span>(<span class=\"cm-string\">\"auto.commit.interval.ms\"<\/span>, <span class=\"cm-string\">\"1000\"<\/span>);<\/span>\r\n<span role=\"presentation\"><span class=\"cm-variable\">props<\/span>.<span class=\"cm-variable\">put<\/span>(<span class=\"cm-string\">\"key.deserializer\"<\/span>, <span class=\"cm-string\">\"org.apache.kafka.common.serialization.StringDeserializer\"<\/span>);<\/span>\r\n<span role=\"presentation\"><span class=\"cm-variable\">props<\/span>.<span class=\"cm-variable\">put<\/span>(<span class=\"cm-string\">\"value.deserializer\"<\/span>, <span class=\"cm-string\">\"org.apache.kafka.common.serialization.StringDeserializer\"<\/span>);<\/span>\r\n<span role=\"presentation\"><span class=\"cm-variable\">KafkaConsumer<\/span><span class=\"cm-operator\">&lt;<\/span><span class=\"cm-variable-3\">String<\/span>, <span class=\"cm-variable-3\">String<\/span><span class=\"cm-operator\">&gt;<\/span> <span class=\"cm-variable\">consumer<\/span> <span class=\"cm-operator\">=<\/span> <span class=\"cm-keyword\">new<\/span> <span class=\"cm-variable\">KafkaConsumer<\/span><span class=\"cm-operator\">&lt;&gt;<\/span>(<span class=\"cm-variable\">props<\/span>);<\/span>\r\n<span role=\"presentation\"><span class=\"cm-variable\">consumer<\/span>.<span class=\"cm-variable\">subscribe<\/span>(<span class=\"cm-variable\">Arrays<\/span>.<span class=\"cm-variable\">asList<\/span>(<span class=\"cm-string\">\"foo\"<\/span>, <span class=\"cm-string\">\"bar\"<\/span>));<\/span>\r\n<span role=\"presentation\"><span class=\"cm-keyword\">while<\/span> (<span class=\"cm-atom\">true<\/span>) {<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0<span class=\"cm-variable\">ConsumerRecords<\/span><span class=\"cm-operator\">&lt;<\/span><span class=\"cm-variable-3\">String<\/span>, <span class=\"cm-variable-3\">String<\/span><span class=\"cm-operator\">&gt;<\/span> <span class=\"cm-variable\">records<\/span> <span class=\"cm-operator\">=<\/span> <span class=\"cm-variable\">consumer<\/span>.<span class=\"cm-variable\">poll<\/span>(<span class=\"cm-number\">100<\/span>);<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0<span class=\"cm-keyword\">for<\/span> (<span class=\"cm-variable\">ConsumerRecord<\/span><span class=\"cm-operator\">&lt;<\/span><span class=\"cm-variable-3\">String<\/span>, <span class=\"cm-variable-3\">String<\/span><span class=\"cm-operator\">&gt;<\/span> <span class=\"cm-variable\">record<\/span> : <span class=\"cm-variable\">records<\/span>)<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-variable\">System<\/span>.<span class=\"cm-variable\">out<\/span>.<span class=\"cm-variable\">printf<\/span>(<span class=\"cm-string\">\"offset = %d, key = %s, value = %s%n\"<\/span>, <span class=\"cm-variable\">record<\/span>.<span class=\"cm-variable\">offset<\/span>(), <span class=\"cm-variable\">record<\/span>.<span class=\"cm-variable\">key<\/span>(), <span class=\"cm-variable\">record<\/span>.<span class=\"cm-variable\">value<\/span>());<\/span>\r\n<span role=\"presentation\">}<\/span><\/pre>\n<h4 class=\"md-end-block md-heading\"><span class=\"md-plain\">\u4e3b\u8981\u6d41\u7a0b<\/span><\/h4>\n<ol class=\"ol-list\" start=\"\">\n<li class=\"md-list-item\">\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">\u8bbe\u7f6e\u914d\u7f6e\u4fe1\u606f\uff0c\u5305\u62ecbroker\u5730\u5740\uff0cconsumer group id, \u81ea\u52a8\u63d0\u4ea4\u6d88\u8d39\u7684\u4f4d\u7f6e\uff0c\u5e8f\u5217\u5316\u914d\u7f6e<\/span><\/p>\n<\/li>\n<li class=\"md-list-item\">\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">\u521b\u5efaKafkaConsumer\u5bf9\u8c61<\/span><\/p>\n<\/li>\n<li class=\"md-list-item\">\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">\u8ba2\u96052\u4e2atopic: foo, bar<\/span><\/p>\n<\/li>\n<li class=\"md-list-item\">\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">\u5faa\u73af\u62c9\u53d6\u5e76\u6253\u5370<\/span><\/p>\n<\/li>\n<\/ol>\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">\u4ee5\u4e0b\u5c06\u91cd\u70b9\u89e3\u8bfbKafkaConsumer\u6d88\u8d39\u6d41\u7a0b\u4e2d\u76843\u4e2a\u95ee\u9898\uff1a<\/span><\/p>\n<ol class=\"ol-list\" start=\"\">\n<li class=\"md-list-item\">\n<p class=\"md-end-block md-p\"><span class=\"md-pair-s \"><strong><span class=\"md-plain\">\u8ba2\u9605\u4e3b\u9898\u7684\u8fc7\u7a0b\u662f\u5982\u4f55\u5b9e\u73b0\u7684\uff1f<\/span><\/strong><\/span><\/p>\n<\/li>\n<li class=\"md-list-item\">\n<p class=\"md-end-block md-p\"><span class=\"md-pair-s \"><strong><span class=\"md-plain\">consumer\u5982\u4f55\u4e0ecoordinator\u534f\u5546\uff0c\u786e\u5b9a\u6d88\u8d39\u54ea\u4e9bpartition?<\/span><\/strong><\/span><\/p>\n<\/li>\n<li class=\"md-list-item\">\n<p class=\"md-end-block md-p\"><span class=\"md-pair-s \"><strong><span class=\"md-plain\">\u62c9\u53d6\u6d88\u606f\u7684\u8fc7\u7a0b\u662f\u5982\u4f55\u5b9e\u73b0\u7684\uff1f<\/span><\/strong><\/span><\/p>\n<\/li>\n<\/ol>\n<h2 class=\"md-end-block md-heading\"><span class=\"md-plain\">\u8ba2\u9605\u4e3b\u9898\u7684\u8fc7\u7a0b\u662f\u5982\u4f55\u5b9e\u73b0\u7684\uff1f<\/span><\/h2>\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">\u4ecd\u4ee5\u4e0a\u9762\u4ee3\u7801\u793a\u4f8b\u4e3a\u4f8b\uff0c\u8ddf\u8e2a\u6e90\u7801\u4e2d\u7684subscribe\u65b9\u6cd5\uff0c\u6700\u7ec8\u4f1a\u770b\u5230KafkaConsumer\u4e2d\u7684\u5982\u4e0b\u4ee3\u7801\uff1a<\/span><\/p>\n<pre class=\"md-fences md-end-block ty-contain-cm modeLoaded\" lang=\"java\" spellcheck=\"false\"><span role=\"presentation\"><span class=\"cm-comment\">\/\/\u4ee3\u78012\uff1a<\/span><\/span>\r\n<span role=\"presentation\"><span class=\"cm-meta\">@Override<\/span><\/span>\r\n<span role=\"presentation\"><span class=\"cm-keyword\">public<\/span> <span class=\"cm-variable-3\">void<\/span> <span class=\"cm-def\">subscribe<\/span>(<span class=\"cm-variable\">Collection<\/span><span class=\"cm-operator\">&lt;<\/span><span class=\"cm-variable-3\">String<\/span><span class=\"cm-operator\">&gt;<\/span> <span class=\"cm-variable\">topics<\/span>, <span class=\"cm-variable\">ConsumerRebalanceListener<\/span> <span class=\"cm-variable\">listener<\/span>) {<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0<span class=\"cm-variable\">acquireAndEnsureOpen<\/span>();<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0<span class=\"cm-keyword\">try<\/span> {<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-variable\">maybeThrowInvalidGroupIdException<\/span>();<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-keyword\">if<\/span> (<span class=\"cm-variable\">topics<\/span> <span class=\"cm-operator\">==<\/span> <span class=\"cm-atom\">null<\/span>)<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-keyword\">throw<\/span> <span class=\"cm-keyword\">new<\/span> <span class=\"cm-variable\">IllegalArgumentException<\/span>(<span class=\"cm-string\">\"Topic collection to subscribe to cannot be null\"<\/span>);<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-keyword\">if<\/span> (<span class=\"cm-variable\">topics<\/span>.<span class=\"cm-variable\">isEmpty<\/span>()) {<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-comment\">\/\/ treat subscribing to empty topic list as the same as unsubscribing<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-keyword\">this<\/span>.<span class=\"cm-variable\">unsubscribe<\/span>();<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0  } <span class=\"cm-keyword\">else<\/span> {<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-keyword\">for<\/span> (<span class=\"cm-variable-3\">String<\/span> <span class=\"cm-variable\">topic<\/span> : <span class=\"cm-variable\">topics<\/span>) {<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-keyword\">if<\/span> (<span class=\"cm-variable\">topic<\/span> <span class=\"cm-operator\">==<\/span> <span class=\"cm-atom\">null<\/span> <span class=\"cm-operator\">||<\/span> <span class=\"cm-variable\">topic<\/span>.<span class=\"cm-variable\">trim<\/span>().<span class=\"cm-variable\">isEmpty<\/span>())<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-keyword\">throw<\/span> <span class=\"cm-keyword\">new<\/span> <span class=\"cm-variable\">IllegalArgumentException<\/span>(<span class=\"cm-string\">\"Topic collection to subscribe to cannot contain null or empty topic\"<\/span>);<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0  }<\/span>\r\n<span role=\"presentation\">\u200b<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-variable\">throwIfNoAssignorsConfigured<\/span>();<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-variable\">fetcher<\/span>.<span class=\"cm-variable\">clearBufferedDataForUnassignedTopics<\/span>(<span class=\"cm-variable\">topics<\/span>);<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-variable\">log<\/span>.<span class=\"cm-variable\">info<\/span>(<span class=\"cm-string\">\"Subscribed to topic(s): {}\"<\/span>, <span class=\"cm-variable\">Utils<\/span>.<span class=\"cm-variable\">join<\/span>(<span class=\"cm-variable\">topics<\/span>, <span class=\"cm-string\">\", \"<\/span>));<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-keyword\">if<\/span> (<span class=\"cm-keyword\">this<\/span>.<span class=\"cm-variable\">subscriptions<\/span>.<span class=\"cm-variable\">subscribe<\/span>(<span class=\"cm-keyword\">new<\/span> <span class=\"cm-variable\">HashSet<\/span><span class=\"cm-operator\">&lt;&gt;<\/span>(<span class=\"cm-variable\">topics<\/span>), <span class=\"cm-variable\">listener<\/span>))<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-variable\">metadata<\/span>.<span class=\"cm-variable\">requestUpdateForNewTopics<\/span>();<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0  }<\/span>\r\n<span role=\"presentation\"> \u00a0  } <span class=\"cm-keyword\">finally<\/span> {<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-variable\">release<\/span>();<\/span>\r\n<span role=\"presentation\"> \u00a0  }<\/span>\r\n<span role=\"presentation\">}<\/span><\/pre>\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">\u57fa\u672c\u6d41\u7a0b\u5982\u4e0b\uff1a<\/span><\/p>\n<ol class=\"ol-list\" start=\"\">\n<li class=\"md-list-item\">\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">\u52a0\u8f7b\u91cf\u7ea7\u9501\uff08\u901a\u8fc7CAS\u65b9\u5f0f\u52a0\u9501\uff09<\/span><\/p>\n<\/li>\n<li class=\"md-list-item\">\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">\u53c2\u6570\u6821\u9a8c<\/span><\/p>\n<ol class=\"ol-list\" start=\"\">\n<li class=\"md-list-item\">\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">\u6ce8\u610f\uff1a\u5982\u679c\u4f20\u5165topic\u96c6\u5408\u4e3a\u7a7a\uff0c\u5219\u76f4\u63a5\u8d70unsubscribe\u7684\u903b\u8f91<\/span><\/p>\n<\/li>\n<\/ol>\n<\/li>\n<li class=\"md-list-item\">\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">\u91cd\u7f6e\u8ba2\u9605\u72b6\u6001subscriptions\uff0c\u66f4\u65b0\u5143\u6570\u636emetadata\u4e2d\u7684topic\u4fe1\u606f<\/span><\/p>\n<ol class=\"ol-list\" start=\"\">\n<li class=\"md-list-item\">\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">\u8ba2\u9605\u72b6\u6001\u7ef4\u62a4\uff1a\u8ba2\u9605\u7684 topic \u548c patition \u7684\u6d88\u8d39\u4f4d\u7f6e\u7b49\u72b6\u6001\u4fe1\u606f<\/span><\/p>\n<\/li>\n<li class=\"md-list-item\">\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">\u5143\u6570\u636emetada\u7ef4\u62a4\uff1aKafka \u96c6\u7fa4\u5143\u6570\u636e\u7684\u4e00\u4e2a\u5b50\u96c6\uff0c\u5305\u62ec\u96c6\u7fa4\u7684 Broker \u8282\u70b9\u3001Topic \u548c Partition \u5728\u8282\u70b9\u4e0a\u5206\u5e03\uff0cCoordinator \u7ed9 Consumer \u5206\u914d\u7684 Partition \u4fe1\u606f\u3002<\/span><\/p>\n<\/li>\n<\/ol>\n<\/li>\n<\/ol>\n<h3 class=\"md-end-block md-heading\"><span class=\"md-plain\">\u7ecf\u5178\u601d\u8def\uff1a\u4e3b\u52a8\u68c0\u6d4b\u4e0d\u652f\u6301\u7684\u60c5\u51b5\u5e76\u629b\u51fa\u5f02\u5e38\uff0c\u907f\u514d\u7cfb\u7edf\u4ea7\u751f\u4e0d\u53ef\u9884\u671f\u7684\u884c\u4e3a<\/span><\/h3>\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">KafkaConsumer\u7684<\/span><span class=\"md-meta-i-c md-link\"><a spellcheck=\"false\" href=\"https:\/\/kafka.apache.org\/10\/javadoc\/?org\/apache\/kafka\/clients\/consumer\/KafkaConsumer.html\"><span class=\"md-plain\">Javadoc<\/span><\/a><\/span><span class=\"md-plain\">\u660e\u786e\u58f0\u660e\u4e86\uff0cconsumer\u4e0d\u662f\u7ebf\u7a0b\u5b89\u5168\u7684\uff0c\u88ab\u5e76\u53d1\u8c03\u7528\u65f6\u4f1a\u51fa\u73b0\u4e0d\u53ef\u9884\u671f\u7684\u7ed3\u679c\u3002\u4e3a\u4e86\u907f\u514d\u8fd9\u79cd\u60c5\u51b5\u53d1\u751f\uff0cKafka \u505a\u4e86\u4e3b\u52a8\u7684\u68c0\u6d4b\u5e76\u629b\u51fa\u5f02\u5e38\uff0c\u800c\u4e0d\u662f\u653e\u4efb\u7cfb\u7edf\u4ea7\u751f\u4e0d\u53ef\u9884\u671f\u7684\u60c5\u51b5\u3002<\/span><\/p>\n<blockquote>\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">Kafka\u201c\u4e3b\u52a8\u68c0\u6d4b\u4e0d\u652f\u6301\u7684\u60c5\u51b5\u5e76\u629b\u51fa\u5f02\u5e38\uff0c\u907f\u514d\u7cfb\u7edf\u4ea7\u751f\u4e0d\u53ef\u9884\u671f\u7684\u884c\u4e3a\u201d\u8fd9\u79cd\u6a21\u5f0f\uff0c\u5bf9\u4e8e\u589e\u5f3a\u7684\u7cfb\u7edf\u7684\u5065\u58ee\u6027\u662f\u4e00\u79cd\u975e\u5e38\u6709\u6548\u7684\u505a\u6cd5\u3002\u5982\u679c\u4f60\u7684\u7cfb\u7edf\u4e0d\u652f\u6301\u7528\u6237\u7684\u67d0\u79cd\u64cd\u4f5c\uff0c\u6b63\u786e\u7684\u505a\u6cd5\u662f\uff0c\u68c0\u6d4b\u4e0d\u652f\u6301\u7684\u64cd\u4f5c\uff0c\u76f4\u63a5\u62d2\u7edd\u7528\u6237\u64cd\u4f5c\uff0c\u5e76\u7ed9\u51fa\u660e\u786e\u7684\u9519\u8bef\u63d0\u793a\uff0c\u800c\u4e0d\u5e94\u8be5\u53ea\u662f\u5728\u6587\u6863\u4e2d\u5199\u4e0a\u201c\u4e0d\u8981\u8fd9\u6837\u505a\u201d\uff0c\u5374\u653e\u4efb\u7528\u6237\u9519\u8bef\u7684\u64cd\u4f5c\uff0c\u4ea7\u751f\u4e00\u4e9b\u4e0d\u53ef\u9884\u671f\u7684\u3001\u5947\u602a\u7684\u9519\u8bef\u7ed3\u679c\u3002<\/span><\/p>\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">\u5f15\u81ea\uff1a\u6781\u5ba2\u65f6\u95f4\uff1a\u6d88\u606f\u961f\u5217\u9ad8\u624b\u8bfe<\/span><\/p>\n<\/blockquote>\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">\u5177\u4f53\u4ee3\u7801\u5c31\u662f\u4e0a\u9762\u7684\u4ee3\u78012\u4e2d\u7684<\/span><span class=\"md-pair-s\" spellcheck=\"false\"><code>acquireAndEnsureOpen()<\/code><\/span><span class=\"md-plain\">\uff0c\u5177\u4f53\u5b9e\u73b0\u5982\u4e0b\uff1a<\/span><\/p>\n<pre class=\"md-fences md-end-block ty-contain-cm modeLoaded\" lang=\"java\" spellcheck=\"false\"><span role=\"presentation\"><span class=\"cm-comment\">\/**<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 <span class=\"cm-comment\">* Acquire the light lock and ensure that the consumer hasn't been closed.<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 <span class=\"cm-comment\">* @throws IllegalStateException If the consumer has been closed<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 <span class=\"cm-comment\">*\/<\/span><\/span>\r\n<span role=\"presentation\"><span class=\"cm-keyword\">private<\/span> <span class=\"cm-variable-3\">void<\/span> <span class=\"cm-def\">acquireAndEnsureOpen<\/span>() {<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0<span class=\"cm-variable\">acquire<\/span>();<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0<span class=\"cm-comment\">\/\/KafkaConsumer\u6210\u5458\u53d8\u91cf\uff0c\u521d\u59cb\u503c\u4e3afalse\uff0c\u8c03\u7528close(Duration)\u65b9\u6cd5\u540e\u624d\u4f1a\u7f6e\u4e3atrue<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0<span class=\"cm-keyword\">if<\/span> (<span class=\"cm-keyword\">this<\/span>.<span class=\"cm-variable\">closed<\/span>) {<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-variable\">release<\/span>();<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-keyword\">throw<\/span> <span class=\"cm-keyword\">new<\/span> <span class=\"cm-variable\">IllegalStateException<\/span>(<span class=\"cm-string\">\"This consumer has already been closed.\"<\/span>);<\/span>\r\n<span role=\"presentation\"> \u00a0  }<\/span>\r\n<span role=\"presentation\">}<\/span>\r\n<span role=\"presentation\">\u200b<\/span>\r\n<span role=\"presentation\"><span class=\"cm-comment\">\/\/\u53d8\u91cf\u58f0\u660e<\/span><\/span>\r\n<span role=\"presentation\"><span class=\"cm-keyword\">private<\/span> <span class=\"cm-keyword\">static<\/span> <span class=\"cm-keyword\">final<\/span> <span class=\"cm-variable-3\">long<\/span> <span class=\"cm-variable\">NO_CURRENT_THREAD<\/span> <span class=\"cm-operator\">=<\/span> <span class=\"cm-operator\">-<\/span><span class=\"cm-number\">1L<\/span>;<\/span>\r\n<span role=\"presentation\">\u200b<\/span>\r\n<span role=\"presentation\"><span class=\"cm-keyword\">private<\/span> <span class=\"cm-variable-3\">void<\/span> <span class=\"cm-def\">acquire<\/span>() {<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0<span class=\"cm-comment\">\/\/\u62ff\u5230\u5f53\u524d\u7ebf\u7a0b\u7684\u7ebf\u7a0bid<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0<span class=\"cm-variable-3\">long<\/span> <span class=\"cm-variable\">threadId<\/span> <span class=\"cm-operator\">=<\/span> <span class=\"cm-variable\">Thread<\/span>.<span class=\"cm-variable\">currentThread<\/span>().<span class=\"cm-variable\">getId<\/span>();<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0<span class=\"cm-comment\">\/*if threadId\u4e0e\u5f53\u524d\u6b63\u6267\u884c\u7684\u7ebf\u7a0b\u7684id\u4e0d\u4e00\u81f4\uff08\u5e76\u53d1\uff0c\u591a\u7ebf\u7a0b\u8bbf\u95ee\uff09&amp;&amp; threadId\u5bf9\u5e94\u7684\u7ebf\u7a0b\u6ca1\u6709\u4e89\u62a2\u5230\u9501<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0       <span class=\"cm-comment\">then \u629b\u51fa\u5f02\u5e38<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0  <span class=\"cm-comment\">\u4e3e\u4f8b\uff1a<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0      <span class=\"cm-comment\">\u73b0\u5728\u6709\u4e24\u4e2aKafkaConsumer\u7ebf\u7a0b\uff0c\u7ebf\u7a0bid\u5206\u522b\u662fthread1, thread2\uff0c\u8981\u6267\u884cacquire()\u65b9\u6cd5\u3002<\/span><\/span>\r\n<span role=\"presentation\">        <span class=\"cm-comment\">thread1\u5148\u542f\u52a8\uff0c\u6267\u884c\u5b8c\u4e0a\u9762\u8fd9\u6761\u8bed\u53e5\u3001\u8d4b\u503cthreadId\u540e\uff0c thread1\u6808\u5e27\u4e2dthreadId=thread1\uff0c\u6b64\u65f6CPU\u7ebf\u7a0b\u8c03\u5ea6\u3001\u6267\u884cthread2\uff0c<\/span><\/span>\r\n<span role=\"presentation\">        <span class=\"cm-comment\">thread2\u4e5f\u8d70\u5230if\u8bed\u53e5\u65f6\uff0c\u5728thread2\u7684\u6808\u5e27\u4e2d\uff0cthreadId\u5df2\u7ecf\u8d4b\u503c\u4e3athread2\uff0c\u8d70\u5230\u8fd9\u91cc\uff0ccurrentThread\u4f5c\u4e3a\u6210\u5458\u53d8\u91cf\uff0c\u521d\u59cb\u503c\u4e3aNO_CURRENT_THREAD\uff08-1\uff09\uff0c\u56e0\u6b64\u5fc5\u7136\u4e0d\u76f8\u7b49\uff0c\u7ee7\u7eed\u8d70\u7b2c\u4e8c\u5224\u65ad\u6761\u4ef6\uff0c\u5373\u5229\u7528AtomicInteger\u7684CAS\u64cd\u4f5c\uff0c\u5c06\u5f53\u524d\u7ebf\u7a0bid threadId(thread2)\u8d4b\u503c\u7ed9currentThread\u8fd9\u4e2aAtomicInteger\uff0c\u5fc5\u7136\u8fd4\u56detrue\uff0c\u56e0\u6b64\u4f1a\u7ee7\u7eed\u6267\u884c\uff0c\u4f7f\u5f97refcount\u52a01\uff1b<\/span><\/span>\r\n<span role=\"presentation\">        <span class=\"cm-comment\">\u63a5\u7740\uff0c\u6b64\u65f6\u6267\u884cthread1\uff0c\u90a3\u4e48\u518d\u7ee7\u7eed\u6267\u884cif\uff0cthreadId(thread1) != currentThread.get() (thread2)\u80fd\u6ee1\u8db3\uff0c\u4f46\u662fcurrentThread\u7684CAS\u8d4b\u503c\u5c06\u4f1a\u5931\u8d25\uff0c\u56e0\u6b64\u6b64\u65f6currentThread\u7684\u503c\u5e76\u4e0d\u662fNO_CURRENT_THREAD\u3002<\/span><\/span>\r\n        \r\n<span role=\"presentation\">        <span class=\"cm-comment\">refcount\u7528\u4e8e\u8bb0\u5f55\u91cd\u5165\u9501\u7684\u60c5\u51b5\uff0c\u53c2\u89c1release()\u65b9\u6cd5\uff0c\u5f53refcount=0\u65f6\uff0ccurrentThread\u5c06\u91cd\u65b0\u8d4b\u503c\u4e3aNO_CURRENT_THREAD\uff0c\u4fdd\u8bc1\u5f7b\u5e95\u89e3\u9501\u3002<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0<span class=\"cm-comment\">*\/<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0<span class=\"cm-keyword\">if<\/span> (<span class=\"cm-variable\">threadId<\/span> <span class=\"cm-operator\">!=<\/span> <span class=\"cm-variable\">currentThread<\/span>.<span class=\"cm-variable\">get<\/span>() <span class=\"cm-operator\">&amp;&amp;<\/span> <span class=\"cm-operator\">!<\/span><span class=\"cm-variable\">currentThread<\/span>.<span class=\"cm-variable\">compareAndSet<\/span>(<span class=\"cm-variable\">NO_CURRENT_THREAD<\/span>, <span class=\"cm-variable\">threadId<\/span>))<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-keyword\">throw<\/span> <span class=\"cm-keyword\">new<\/span> <span class=\"cm-variable\">ConcurrentModificationException<\/span>(<span class=\"cm-string\">\"KafkaConsumer is not safe for multi-threaded access\"<\/span>);<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0<span class=\"cm-variable\">refcount<\/span>.<span class=\"cm-variable\">incrementAndGet<\/span>();<\/span>\r\n<span role=\"presentation\">}<\/span>\r\n<span role=\"presentation\">\u200b<\/span>\r\n<span role=\"presentation\"><span class=\"cm-comment\">\/**<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 <span class=\"cm-comment\">* Release the light lock protecting the consumer from multi-threaded access.<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 <span class=\"cm-comment\">*\/<\/span><\/span>\r\n<span role=\"presentation\"><span class=\"cm-keyword\">private<\/span> <span class=\"cm-variable-3\">void<\/span> <span class=\"cm-def\">release<\/span>() {<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0<span class=\"cm-keyword\">if<\/span> (<span class=\"cm-variable\">refcount<\/span>.<span class=\"cm-variable\">decrementAndGet<\/span>() <span class=\"cm-operator\">==<\/span> <span class=\"cm-number\">0<\/span>)<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-variable\">currentThread<\/span>.<span class=\"cm-variable\">set<\/span>(<span class=\"cm-variable\">NO_CURRENT_THREAD<\/span>);<\/span>\r\n<span role=\"presentation\">}<\/span><\/pre>\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">\u6b64\u5904\u901a\u8fc7AtomicLong currentThread, AtomicInteger refcount \u8fd9\u4e24\u4e2a\u53d8\u91cf\u6765\u5b9e\u73b0\u8f7b\u91cf\u7ea7\u9501\u7684\u673a\u5236\u975e\u5e38\u7ecf\u5178\uff0c\u5efa\u8bae\u5b66\u4e60\u3001\u7406\u89e3\u3001\u8fd0\u7528\u3002\u5177\u4f53\u5206\u6790\u6211\u5df2\u7ecf\u5199\u5728\u4e86\u4e0a\u8ff0\u6e90\u7801\u4e2d\u4f9b\u5927\u5bb6\u53c2\u8003\u3002<\/span><\/p>\n<h3 class=\"md-end-block md-heading\"><span class=\"md-plain\">\u6709\u5173\u5143\u6570\u636e\u66f4\u65b0<\/span><\/h3>\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">\u5143\u6570\u636e\u66f4\u65b0\u8c03\u7528\u7684\u662f<\/span><span class=\"md-pair-s\" spellcheck=\"false\"><code>metadata.requestUpdateForNewTopics();<\/code><\/span><span class=\"md-plain\">\uff0c\u91cc\u9762\u5185\u5bb9\u662f\uff1a<\/span><\/p>\n<pre class=\"md-fences md-end-block ty-contain-cm modeLoaded\" lang=\"java\" spellcheck=\"false\"><span role=\"presentation\"><span class=\"cm-keyword\">public<\/span> <span class=\"cm-keyword\">synchronized<\/span> <span class=\"cm-variable-3\">int<\/span> <span class=\"cm-def\">requestUpdateForNewTopics<\/span>() {<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0<span class=\"cm-comment\">\/\/ Override the timestamp of last refresh to let immediate update.<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0<span class=\"cm-keyword\">this<\/span>.<span class=\"cm-variable\">lastRefreshMs<\/span> <span class=\"cm-operator\">=<\/span> <span class=\"cm-number\">0<\/span>;<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0<span class=\"cm-keyword\">this<\/span>.<span class=\"cm-variable\">needPartialUpdate<\/span> <span class=\"cm-operator\">=<\/span> <span class=\"cm-atom\">true<\/span>;<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0<span class=\"cm-keyword\">this<\/span>.<span class=\"cm-variable\">requestVersion<\/span><span class=\"cm-operator\">++<\/span>;<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0<span class=\"cm-keyword\">return<\/span> <span class=\"cm-keyword\">this<\/span>.<span class=\"cm-variable\">updateVersion<\/span>;<\/span>\r\n<span role=\"presentation\">}<\/span><\/pre>\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">\u53ef\u4ee5\u770b\u5230\uff0c\u5185\u90e8\u5e76\u6ca1\u6709\u66f4\u65b0\u5143\u6570\u636e\u7684\u64cd\u4f5c\uff0c\u53ea\u662f\u8bbe\u7f6e\u4e86\u51e0\u4e2a\u6807\u5fd7\u4f4d\u3002\u4f46\u6211\u4eec\u77e5\u9053\uff0c\u5728\u6d88\u606f\u6d88\u606f\u524d\uff0c\u6211\u4eec\u5fc5\u987b\u83b7\u53d6\u5143\u6570\u636e\uff0c\u6bd4\u5982broker\u8282\u70b9\u4fe1\u606f\u3001topic\u548cpartition\u7684\u5206\u5e03\uff0c\u5426\u5219\u6839\u672c\u4e0d\u77e5\u9053\u4ece\u54ea\u91cc\u83b7\u53d6\u6570\u636e\u3002\u90a3\u4e48\u6b64\u65f6\uff0c\u6211\u4eec\u5c31\u53ef\u4ee5\u6839\u636e\u8fd9\u51e0\u4e2a\u6807\u5fd7\u4f4d\u6765\u53bb\u627e\u76f8\u5e94\u4ee3\u7801\u3002\u4e0d\u8fc7\u4eceKafkaConsumer\u7684JavaDoc\u63d0\u4f9b\u7684\u4fe1\u606f\u4e5f\u80fd\u83b7\u77e5\uff0c\u662f\u5728\u62c9\u53d6\u6d88\u606f\u65f6\u5c06\u4f1a\u6839\u636e\u8fd9\u4e9b\u6807\u5fd7\u4f4d\u6765\u66f4\u65b0\u5143\u6570\u636e\u3002\u5177\u4f53\u903b\u8f91\u8bf7\u770b\u63a5\u4e0b\u6765\u7684\u5206\u6790\u3002<\/span><\/p>\n<h2 class=\"md-end-block md-heading\"><span class=\"md-plain\">\u62c9\u53d6\u6d88\u606f\u7684\u8fc7\u7a0b\u662f\u5982\u4f55\u5b9e\u73b0\u7684\uff1f<\/span><\/h2>\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">\u6211\u4eec\u5df2\u7ecf\u77e5\u9053\uff0c\u62c9\u53d6\u6d88\u606f\u662f\u8c03\u7528KafkaConsumer\u7684poll(Duration)\u65b9\u6cd5\uff0c\u4e3b\u8981\u6d41\u7a0b\u7684\u5e8f\u5217\u56fe\u53c2\u8003\u4e0b\u56fe<\/span><\/p>\n<p class=\"md-end-block md-p\"><span class=\"md-image md-img-loaded\" data-src=\"images\/kafka-consumer-poll-process-sequence-diagram.png\"><img loading=\"lazy\" class=\" wp-image-863 aligncenter\" src=\"http:\/\/prayerlaputa.com\/wp-content\/uploads\/2020\/09\/kafka-consumer-poll-process-sequence-diagram-300x142.png\" alt=\"\" width=\"499\" height=\"236\" srcset=\"http:\/\/prayerlaputa.com\/wp-content\/uploads\/2020\/09\/kafka-consumer-poll-process-sequence-diagram-300x142.png 300w, http:\/\/prayerlaputa.com\/wp-content\/uploads\/2020\/09\/kafka-consumer-poll-process-sequence-diagram-768x365.png 768w, http:\/\/prayerlaputa.com\/wp-content\/uploads\/2020\/09\/kafka-consumer-poll-process-sequence-diagram.png 996w\" sizes=\"(max-width: 499px) 100vw, 499px\" \/><\/span><\/p>\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">\u6ce8\uff1a\u5f15\u81ea \u6781\u5ba2\u65f6\u95f4 \u6d88\u606f\u961f\u5217\u9ad8\u624b\u8bfe<\/span><\/p>\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">poll\u6e90\u7801\u5982\u4e0b\uff0c\u4e3b\u8981\u6d41\u7a0b\u5728\uff1a<\/span><\/p>\n<ol class=\"ol-list\" start=\"\">\n<li class=\"md-list-item\">\n<p class=\"md-end-block md-p\"><span class=\"md-pair-s\"><strong><span class=\"md-plain\">updateAssignmentMetadataIfNeeded(): \u66f4\u65b0\u5143\u6570\u636e<\/span><\/strong><\/span><\/p>\n<\/li>\n<li class=\"md-list-item\">\n<p class=\"md-end-block md-p\"><span class=\"md-pair-s \"><strong><span class=\"md-plain\">pollForFetches()\uff1a\u62c9\u53d6\u6d88\u606f<\/span><\/strong><\/span><\/p>\n<\/li>\n<\/ol>\n<pre class=\"md-fences md-end-block ty-contain-cm modeLoaded\" lang=\"java\" spellcheck=\"false\"><span role=\"presentation\"><span class=\"cm-meta\">@Override<\/span><\/span>\r\n<span role=\"presentation\"><span class=\"cm-keyword\">public<\/span> <span class=\"cm-variable\">ConsumerRecords<\/span><span class=\"cm-operator\">&lt;<\/span><span class=\"cm-variable\">K<\/span>, <span class=\"cm-variable\">V<\/span><span class=\"cm-operator\">&gt;<\/span> <span class=\"cm-variable\">poll<\/span>(<span class=\"cm-keyword\">final<\/span> <span class=\"cm-variable\">Duration<\/span> <span class=\"cm-variable\">timeout<\/span>) {<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0<span class=\"cm-keyword\">return<\/span> <span class=\"cm-variable\">poll<\/span>(<span class=\"cm-variable\">time<\/span>.<span class=\"cm-variable\">timer<\/span>(<span class=\"cm-variable\">timeout<\/span>), <span class=\"cm-atom\">true<\/span>);<\/span>\r\n<span role=\"presentation\">}<\/span>\r\n<span role=\"presentation\">\u200b<\/span>\r\n<span role=\"presentation\"><span class=\"cm-comment\">\/**<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 <span class=\"cm-comment\">* @throws KafkaException if the rebalance callback throws exception<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 <span class=\"cm-comment\">*\/<\/span><\/span>\r\n<span role=\"presentation\"><span class=\"cm-keyword\">private<\/span> <span class=\"cm-variable\">ConsumerRecords<\/span><span class=\"cm-operator\">&lt;<\/span><span class=\"cm-variable\">K<\/span>, <span class=\"cm-variable\">V<\/span><span class=\"cm-operator\">&gt;<\/span> <span class=\"cm-variable\">poll<\/span>(<span class=\"cm-keyword\">final<\/span> <span class=\"cm-variable\">Timer<\/span> <span class=\"cm-variable\">timer<\/span>, <span class=\"cm-keyword\">final<\/span> <span class=\"cm-variable-3\">boolean<\/span> <span class=\"cm-variable\">includeMetadataInTimeout<\/span>) {<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0<span class=\"cm-comment\">\/\/\u52a0\u9501\uff0c\u4fdd\u8bc1\u53ea\u67091\u4e2a\u7ebf\u7a0b\u8bfb\u53d6\u6570\u636e<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0<span class=\"cm-variable\">acquireAndEnsureOpen<\/span>();<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0<span class=\"cm-keyword\">try<\/span> {<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-comment\">\/\/\u8bb0\u5f55\u5f00\u59cb\u65f6\u95f4<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-keyword\">this<\/span>.<span class=\"cm-variable\">kafkaConsumerMetrics<\/span>.<span class=\"cm-variable\">recordPollStart<\/span>(<span class=\"cm-variable\">timer<\/span>.<span class=\"cm-variable\">currentTimeMs<\/span>());<\/span>\r\n<span role=\"presentation\">\u200b<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-keyword\">if<\/span> (<span class=\"cm-keyword\">this<\/span>.<span class=\"cm-variable\">subscriptions<\/span>.<span class=\"cm-variable\">hasNoSubscriptionOrUserAssignment<\/span>()) {<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-comment\">\/\/\u6ca1\u6709\u8ba2\u9605\u4fe1\u606f\uff0c\u629b\u5f02\u5e38<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-keyword\">throw<\/span> <span class=\"cm-keyword\">new<\/span> <span class=\"cm-variable\">IllegalStateException<\/span>(<span class=\"cm-string\">\"Consumer is not subscribed to any topics or assigned any partitions\"<\/span>);<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0  }<\/span>\r\n<span role=\"presentation\">\u200b<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-keyword\">do<\/span> {<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-comment\">\/\/\u8fd9\u4e2a\u5730\u65b9\u6ca1\u592a\u770b\u61c2<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-variable\">client<\/span>.<span class=\"cm-variable\">maybeTriggerWakeup<\/span>();<\/span>\r\n<span role=\"presentation\">\u200b<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-keyword\">if<\/span> (<span class=\"cm-variable\">includeMetadataInTimeout<\/span>) {<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-comment\">\/\/ try to update assignment metadata BUT do not need to block on the timer for join group<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-variable\">updateAssignmentMetadataIfNeeded<\/span>(<span class=\"cm-variable\">timer<\/span>, <span class=\"cm-atom\">false<\/span>);<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0  } <span class=\"cm-keyword\">else<\/span> {<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-comment\">\/*<\/span><\/span>\r\n<span role=\"presentation\">                <span class=\"cm-comment\">updateAssignmentMetadataIfNeeded\u65b9\u6cd5\u67093\u4e2a\u4f5c\u7528\uff1a<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-comment\">- discovery coordinator if necessary<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-comment\">- join group if necessary<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-comment\">- refresh metadata and fetch position if necessary<\/span><\/span>\r\n<span role=\"presentation\">                <span class=\"cm-comment\">*\/<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-keyword\">while<\/span> (<span class=\"cm-operator\">!<\/span><span class=\"cm-variable\">updateAssignmentMetadataIfNeeded<\/span>(<span class=\"cm-variable\">time<\/span>.<span class=\"cm-variable\">timer<\/span>(<span class=\"cm-variable-3\">Long<\/span>.<span class=\"cm-variable\">MAX_VALUE<\/span>), <span class=\"cm-atom\">true<\/span>)) {<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-variable\">log<\/span>.<span class=\"cm-variable\">warn<\/span>(<span class=\"cm-string\">\"Still waiting for metadata\"<\/span>);<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0  }<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0  }<\/span>\r\n<span role=\"presentation\">\u200b<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-keyword\">final<\/span> <span class=\"cm-variable\">Map<\/span><span class=\"cm-operator\">&lt;<\/span><span class=\"cm-variable\">TopicPartition<\/span>, <span class=\"cm-variable\">List<\/span><span class=\"cm-operator\">&lt;<\/span><span class=\"cm-variable\">ConsumerRecord<\/span><span class=\"cm-operator\">&lt;<\/span><span class=\"cm-variable\">K<\/span>, <span class=\"cm-variable\">V<\/span><span class=\"cm-operator\">&gt;&gt;&gt;<\/span> <span class=\"cm-variable\">records<\/span> <span class=\"cm-operator\">=<\/span> <span class=\"cm-variable\">pollForFetches<\/span>(<span class=\"cm-variable\">timer<\/span>);<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-keyword\">if<\/span> (<span class=\"cm-operator\">!<\/span><span class=\"cm-variable\">records<\/span>.<span class=\"cm-variable\">isEmpty<\/span>()) {<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-comment\">\/\/ before returning the fetched records, we can send off the next round of fetches<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-comment\">\/\/ and avoid block waiting for their responses to enable pipelining while the user<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-comment\">\/\/ is handling the fetched records.<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-comment\">\/\/<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-comment\">\/\/ NOTE: since the consumed position has already been updated, we must not allow<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-comment\">\/\/ wakeups or any other errors to be triggered prior to returning the fetched records.<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-keyword\">if<\/span> (<span class=\"cm-variable\">fetcher<\/span>.<span class=\"cm-variable\">sendFetches<\/span>() <span class=\"cm-operator\">&gt;<\/span> <span class=\"cm-number\">0<\/span> <span class=\"cm-operator\">||<\/span> <span class=\"cm-variable\">client<\/span>.<span class=\"cm-variable\">hasPendingRequests<\/span>()) {<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-variable\">client<\/span>.<span class=\"cm-variable\">transmitSends<\/span>();<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0  }<\/span>\r\n<span role=\"presentation\">\u200b<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-keyword\">return<\/span> <span class=\"cm-keyword\">this<\/span>.<span class=\"cm-variable\">interceptors<\/span>.<span class=\"cm-variable\">onConsume<\/span>(<span class=\"cm-keyword\">new<\/span> <span class=\"cm-variable\">ConsumerRecords<\/span><span class=\"cm-operator\">&lt;&gt;<\/span>(<span class=\"cm-variable\">records<\/span>));<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0  }<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0  } <span class=\"cm-keyword\">while<\/span> (<span class=\"cm-variable\">timer<\/span>.<span class=\"cm-variable\">notExpired<\/span>());<\/span>\r\n<span role=\"presentation\">\u200b<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-keyword\">return<\/span> <span class=\"cm-variable\">ConsumerRecords<\/span>.<span class=\"cm-variable\">empty<\/span>();<\/span>\r\n<span role=\"presentation\"> \u00a0  } <span class=\"cm-keyword\">finally<\/span> {<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-variable\">release<\/span>();<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-keyword\">this<\/span>.<span class=\"cm-variable\">kafkaConsumerMetrics<\/span>.<span class=\"cm-variable\">recordPollEnd<\/span>(<span class=\"cm-variable\">timer<\/span>.<span class=\"cm-variable\">currentTimeMs<\/span>());<\/span>\r\n<span role=\"presentation\"> \u00a0  }<\/span>\r\n<span role=\"presentation\">}<\/span>\r\n<span role=\"presentation\">\u200b<\/span>\r\n<span role=\"presentation\"><span class=\"cm-variable-3\">boolean<\/span> <span class=\"cm-def\">updateAssignmentMetadataIfNeeded<\/span>(<span class=\"cm-keyword\">final<\/span> <span class=\"cm-variable\">Timer<\/span> <span class=\"cm-variable\">timer<\/span>, <span class=\"cm-keyword\">final<\/span> <span class=\"cm-variable-3\">boolean<\/span> <span class=\"cm-variable\">waitForJoinGroup<\/span>) {<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0<span class=\"cm-keyword\">if<\/span> (<span class=\"cm-variable\">coordinator<\/span> <span class=\"cm-operator\">!=<\/span> <span class=\"cm-atom\">null<\/span> <span class=\"cm-operator\">&amp;&amp;<\/span> <span class=\"cm-operator\">!<\/span><span class=\"cm-variable\">coordinator<\/span>.<span class=\"cm-variable\">poll<\/span>(<span class=\"cm-variable\">timer<\/span>, <span class=\"cm-variable\">waitForJoinGroup<\/span>)) {<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-keyword\">return<\/span> <span class=\"cm-atom\">false<\/span>;<\/span>\r\n<span role=\"presentation\"> \u00a0  }<\/span>\r\n<span role=\"presentation\">    <span class=\"cm-comment\">\/*<\/span><\/span>\r\n<span role=\"presentation\">    <span class=\"cm-comment\">Set the fetch position to the committed position (if there is one) or reset it using the offset reset policy the user has configured.<\/span><\/span>\r\n<span role=\"presentation\">    <span class=\"cm-comment\">\u66f4\u65b0position\u6216\u662f\u6839\u636e\u7528\u6237\u914d\u7f6e\u91cd\u7f6eposition<\/span><\/span>\r\n<span role=\"presentation\">    <span class=\"cm-comment\">*\/<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0<span class=\"cm-keyword\">return<\/span> <span class=\"cm-variable\">updateFetchPositions<\/span>(<span class=\"cm-variable\">timer<\/span>);<\/span>\r\n<span role=\"presentation\">}<\/span>\r\n<span role=\"presentation\">\u200b<\/span>\r\n<span role=\"presentation\">\u200b<\/span><\/pre>\n<h3 class=\"md-end-block md-heading\"><span class=\"md-plain\">updateAssignmentMetadataIfNeeded() \u66f4\u65b0\u5143\u6570\u636e<\/span><\/h3>\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">updateAssignmentMetadataIfNeeded\u65b9\u6cd5\u67093\u4e2a\u4f5c\u7528\uff08\u6b64\u5904\u4ee3\u7801\u6700\u8fd1\u6709\u66f4\u65b0\uff0c\u53ef\u4ee5\u770b\u770bgithub\u4e2d\u4ee3\u7801\u66f4\u65b0\u8bb0\u5f55\uff09\uff1a<\/span><\/p>\n<ul class=\"ul-list\" data-mark=\"-\">\n<li class=\"md-list-item\">\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">discovery coordinator if necessary<\/span><\/p>\n<\/li>\n<li class=\"md-list-item\">\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">join group if necessary<\/span><\/p>\n<\/li>\n<li class=\"md-list-item\">\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">refresh metadata and fetch position if necessary<\/span><\/p>\n<\/li>\n<\/ul>\n<h4 class=\"md-end-block md-heading\"><span class=\"md-plain\">Coordinator#poll() \u7ef4\u6301\u5fc3\u8df3\uff0c\u66f4\u65b0\u5143\u6570\u636e<\/span><\/h4>\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">updateAssignmentMetadataIfNeeded\u8c03\u7528\u4e86coordinator.poll() \u65b9\u6cd5\uff0c\u5373ConsumerCoordinator#poll(Timer timer, boolean waitForJoinGroup)\uff0c\u6e90\u7801\u5982\u4e0b<\/span><\/p>\n<pre class=\"md-fences md-end-block ty-contain-cm modeLoaded\" lang=\"java\" spellcheck=\"false\"><span role=\"presentation\"><span class=\"cm-keyword\">public<\/span> <span class=\"cm-variable-3\">boolean<\/span> <span class=\"cm-def\">poll<\/span>(<span class=\"cm-variable\">Timer<\/span> <span class=\"cm-variable\">timer<\/span>, <span class=\"cm-variable-3\">boolean<\/span> <span class=\"cm-variable\">waitForJoinGroup<\/span>) {<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0<span class=\"cm-variable\">maybeUpdateSubscriptionMetadata<\/span>();<\/span>\r\n<span role=\"presentation\">\u200b<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0<span class=\"cm-variable\">invokeCompletedOffsetCommitCallbacks<\/span>();<\/span>\r\n<span role=\"presentation\">    <span class=\"cm-comment\">\/\/\u5148\u5ffd\u7565\u7ec6\u8282\u903b\u8f91\uff0c\u7b2c\u4e00\u6b21\u542f\u52a8\u65f6\uff0c\u5c06\u6267\u884c\u6b64\u5904<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0<span class=\"cm-keyword\">if<\/span> (<span class=\"cm-variable\">subscriptions<\/span>.<span class=\"cm-variable\">hasAutoAssignedPartitions<\/span>()) {<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-keyword\">if<\/span> (<span class=\"cm-variable\">protocol<\/span> <span class=\"cm-operator\">==<\/span> <span class=\"cm-atom\">null<\/span>) {<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-keyword\">throw<\/span> <span class=\"cm-keyword\">new<\/span> <span class=\"cm-variable\">IllegalStateException<\/span>(<span class=\"cm-string\">\"User configured \"<\/span> <span class=\"cm-operator\">+<\/span> <span class=\"cm-variable\">ConsumerConfig<\/span>.<span class=\"cm-variable\">PARTITION_ASSIGNMENT_STRATEGY_CONFIG<\/span> <span class=\"cm-operator\">+<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-string\">\" to empty while trying to subscribe for group protocol to auto assign partitions\"<\/span>);<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0  }<\/span>\r\n<span role=\"presentation\">        <span class=\"cm-comment\">\/*<\/span><\/span>\r\n<span role=\"presentation\">        <span class=\"cm-comment\">\u6b64\u5904\u662f\u53d1\u9001\u5fc3\u8df3\u903b\u8f91<\/span><\/span>\r\n<span role=\"presentation\">        <span class=\"cm-comment\">*\/<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-variable\">pollHeartbeat<\/span>(<span class=\"cm-variable\">timer<\/span>.<span class=\"cm-variable\">currentTimeMs<\/span>());<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-keyword\">if<\/span> (<span class=\"cm-variable\">coordinatorUnknown<\/span>() <span class=\"cm-operator\">&amp;&amp;<\/span> <span class=\"cm-operator\">!<\/span><span class=\"cm-variable\">ensureCoordinatorReady<\/span>(<span class=\"cm-variable\">timer<\/span>)) {<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-keyword\">return<\/span> <span class=\"cm-atom\">false<\/span>;<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0  }<\/span>\r\n<span role=\"presentation\">\u200b<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-keyword\">if<\/span> (<span class=\"cm-variable\">rejoinNeededOrPending<\/span>()) {<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-keyword\">if<\/span> (<span class=\"cm-variable\">subscriptions<\/span>.<span class=\"cm-variable\">hasPatternSubscription<\/span>()) {<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-keyword\">if<\/span> (<span class=\"cm-keyword\">this<\/span>.<span class=\"cm-variable\">metadata<\/span>.<span class=\"cm-variable\">timeToAllowUpdate<\/span>(<span class=\"cm-variable\">timer<\/span>.<span class=\"cm-variable\">currentTimeMs<\/span>()) <span class=\"cm-operator\">==<\/span> <span class=\"cm-number\">0<\/span>) {<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-keyword\">this<\/span>.<span class=\"cm-variable\">metadata<\/span>.<span class=\"cm-variable\">requestUpdate<\/span>();<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0  }<\/span>\r\n<span role=\"presentation\">\u200b<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-comment\">\/*<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-comment\">\u5728\u6b64\u5904\u4f1a\u6839\u636e\u5f53\u524dneedFullUpdate needPartialUpdate\u7b49\u72b6\u6001\uff0c<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-comment\">\u6267\u884c\u5143\u6570\u636e\u66f4\u65b0\u903b\u8f91\uff0c\u5177\u4f53\u7684\u903b\u8f91\u5219\u662f\u7531ConsumerNetworkClient#poll\u65b9\u6cd5\u6267\u884c\u3002<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-comment\">*\/<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-keyword\">if<\/span> (<span class=\"cm-operator\">!<\/span><span class=\"cm-variable\">client<\/span>.<span class=\"cm-variable\">ensureFreshMetadata<\/span>(<span class=\"cm-variable\">timer<\/span>)) {<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-keyword\">return<\/span> <span class=\"cm-atom\">false<\/span>;<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0  }<\/span>\r\n<span role=\"presentation\">\u200b<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-variable\">maybeUpdateSubscriptionMetadata<\/span>();<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0  }<\/span>\r\n<span role=\"presentation\">\u200b<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-keyword\">if<\/span> (<span class=\"cm-operator\">!<\/span><span class=\"cm-variable\">ensureActiveGroup<\/span>(<span class=\"cm-variable\">waitForJoinGroup<\/span> <span class=\"cm-operator\">?<\/span> <span class=\"cm-variable\">timer<\/span> : <span class=\"cm-variable\">time<\/span>.<span class=\"cm-variable\">timer<\/span>(<span class=\"cm-number\">0L<\/span>))) {<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-keyword\">return<\/span> <span class=\"cm-atom\">false<\/span>;<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0  }<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0  }<\/span>\r\n<span role=\"presentation\"> \u00a0  } <span class=\"cm-keyword\">else<\/span> {<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-keyword\">if<\/span> (<span class=\"cm-variable\">metadata<\/span>.<span class=\"cm-variable\">updateRequested<\/span>() <span class=\"cm-operator\">&amp;&amp;<\/span> <span class=\"cm-operator\">!<\/span><span class=\"cm-variable\">client<\/span>.<span class=\"cm-variable\">hasReadyNodes<\/span>(<span class=\"cm-variable\">timer<\/span>.<span class=\"cm-variable\">currentTimeMs<\/span>())) {<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-variable\">client<\/span>.<span class=\"cm-variable\">awaitMetadataUpdate<\/span>(<span class=\"cm-variable\">timer<\/span>);<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0  }<\/span>\r\n<span role=\"presentation\"> \u00a0  }<\/span>\r\n<span role=\"presentation\">\u200b<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0<span class=\"cm-variable\">maybeAutoCommitOffsetsAsync<\/span>(<span class=\"cm-variable\">timer<\/span>.<span class=\"cm-variable\">currentTimeMs<\/span>());<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0<span class=\"cm-keyword\">return<\/span> <span class=\"cm-atom\">true<\/span>;<\/span>\r\n<span role=\"presentation\">}<\/span><\/pre>\n<h4 class=\"md-end-block md-heading\"><span class=\"md-plain\">ConsumerNetworkClient#poll() \u5c01\u88c5\u6240\u6709\u7f51\u7edc\u901a\u4fe1<\/span><\/h4>\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">ConsumerNetworkClient\u5c01\u88c5\u4e86 consumer\u548c kafka cluster \u4e4b\u95f4\u6240\u6709\u7684\u7f51\u7edc\u901a\u4fe1\u7684\u5b9e\u73b0\uff0c\u5b8c\u5168\u5f02\u6b65\u5b9e\u73b0\u3001\u6ca1\u6709\u81ea\u5df1\u7ef4\u62a4\u7ebf\u7a0b\u3002\u5176\u4e2d\uff1a<\/span><\/p>\n<ul class=\"ul-list\" data-mark=\"-\">\n<li class=\"md-list-item\">\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">\u6240\u6709\u5f85\u53d1\u9001\u8bf7\u6c42Request\u90fd\u4f1a\u88ab\u5b58\u653e\u5230ConsumerNetworkClient\u7684\u6210\u5458\u53d8\u91cf<\/span><span class=\"md-pair-s\" spellcheck=\"false\"><code>UnsentRequests unsent<\/code><\/span><span class=\"md-plain\">\u4e2d\u3002<\/span><\/p>\n<ul class=\"ul-list\" data-mark=\"-\">\n<li class=\"md-list-item\">\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">\u6ce8\uff1aUnsentRequests \u662f\u7528<\/span><span class=\"md-pair-s\" spellcheck=\"false\"><code>ConcurrentMap&lt;Node, ConcurrentLinkedQueue&lt;ClientRequest&gt;&gt; unsent<\/code><\/span><span class=\"md-plain\">\u5b9a\u4e49\u4e00\u4e2aMap\u6765\u4fdd\u5b58\u6240\u6709\u5f85\u53d1\u9001\u8bf7\u6c42\uff0c\u4f5c\u4e3a\u5173\u952e\u5b57\u7684Node\u4e2d\u5305\u542b\u4e86kafka\u8282\u70b9\u7684id\u3001host\u3001port\u3001rack\u7b49\u4fe1\u606f\u3002<\/span><\/p>\n<\/li>\n<li class=\"md-list-item\">\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">\u8fd4\u56de\u7684Response\u5c06\u5b58\u653e\u5728ConsumerNetworkClient\u7684\u6210\u5458\u53d8\u91cfpendingCompletion \u4e2d\u3002<\/span><\/p>\n<\/li>\n<\/ul>\n<\/li>\n<\/ul>\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">\u6bcf\u6b21\u8c03\u7528ConsumerNetworkClient# poll() \u65b9\u6cd5\u7684\u65f6\u5019\uff0c\u5728\u5f53\u524d\u7ebf\u7a0b\uff08\u800c\u4e0d\u662fConsumerNetworkClient\u81ea\u5df1\u7ef4\u62a4\u7ebf\u7a0b\uff09\u4e2d\u53d1\u9001\u6240\u6709\u5f85\u53d1\u9001\u7684 Request\uff0c\u5904\u7406\u6240\u6709\u6536\u5230\u7684 Response\u3002<\/span><\/p>\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">\u8fd9\u79cd\u5f02\u6b65\u8bbe\u8ba1\u7684\u4f18\u52a3\uff1a<\/span><\/p>\n<ul class=\"ul-list\" data-mark=\"-\">\n<li class=\"md-list-item\">\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">\u4f18\u70b9\uff1a\u5360\u7528\u7ebf\u7a0b\u5c11\uff0c\u541e\u5410\u91cf\u9ad8<\/span><\/p>\n<\/li>\n<li class=\"md-list-item\">\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">\u4e0d\u8db3\uff1a\u589e\u52a0\u4ee3\u7801\u590d\u6742\u5ea6<\/span><\/p>\n<\/li>\n<\/ul>\n<pre class=\"md-fences md-end-block ty-contain-cm modeLoaded\" lang=\"java\" spellcheck=\"false\"><span role=\"presentation\"><span class=\"cm-keyword\">public<\/span> <span class=\"cm-variable-3\">void<\/span> <span class=\"cm-def\">poll<\/span>(<span class=\"cm-variable\">Timer<\/span> <span class=\"cm-variable\">timer<\/span>, <span class=\"cm-variable\">PollCondition<\/span> <span class=\"cm-variable\">pollCondition<\/span>, <span class=\"cm-variable-3\">boolean<\/span> <span class=\"cm-variable\">disableWakeup<\/span>) {<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-comment\">\/\/ \u8c03\u7528handler<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-variable\">firePendingCompletedRequests<\/span>();<\/span>\r\n<span role=\"presentation\">\u200b<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-variable\">lock<\/span>.<span class=\"cm-variable\">lock<\/span>();<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-keyword\">try<\/span> {<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-comment\">\/\/ Handle async disconnects prior to attempting any sends<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-variable\">handlePendingDisconnects<\/span>();<\/span>\r\n<span role=\"presentation\">\u200b<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-comment\">\/*<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-comment\">\u904d\u5386unsent\u4e2d\u5df2\u4fdd\u5b58\u7684\u6240\u6709\u8bf7\u6c42\uff0c\u5e76\u8c03\u7528KafkaClient#send\uff0c\u53d1\u9001\u8bf7\u6c42<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-comment\">*\/<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-variable-3\">long<\/span> <span class=\"cm-variable\">pollDelayMs<\/span> <span class=\"cm-operator\">=<\/span> <span class=\"cm-variable\">trySend<\/span>(<span class=\"cm-variable\">timer<\/span>.<span class=\"cm-variable\">currentTimeMs<\/span>());<\/span>\r\n<span role=\"presentation\">\u200b<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-keyword\">if<\/span> (<span class=\"cm-variable\">pendingCompletion<\/span>.<span class=\"cm-variable\">isEmpty<\/span>() <span class=\"cm-operator\">&amp;&amp;<\/span> (<span class=\"cm-variable\">pollCondition<\/span> <span class=\"cm-operator\">==<\/span> <span class=\"cm-atom\">null<\/span> <span class=\"cm-operator\">||<\/span> <span class=\"cm-variable\">pollCondition<\/span>.<span class=\"cm-variable\">shouldBlock<\/span>())) {<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-comment\">\/\/ if there are no requests in flight, do not block longer than the retry backoff<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-variable-3\">long<\/span> <span class=\"cm-variable\">pollTimeout<\/span> <span class=\"cm-operator\">=<\/span> <span class=\"cm-variable\">Math<\/span>.<span class=\"cm-variable\">min<\/span>(<span class=\"cm-variable\">timer<\/span>.<span class=\"cm-variable\">remainingMs<\/span>(), <span class=\"cm-variable\">pollDelayMs<\/span>);<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-comment\">\/*<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-comment\">KafkaClient\u4e2d\u7ef4\u62a4\u7740\u4e00\u4e2a\u5728\u9014\u8bf7\u6c42\u6620\u5c04\u8868\uff0c\u53c2\u89c1InFlightRequests\u7c7b\u3002NetworkClient#send\u65b9\u6cd5<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-comment\">\u5728\u53d1\u9001\u7f51\u7edc\u8bf7\u6c42\u65f6\uff0c\u5c06\u4f1a\u5728\u5728\u9014\u8bf7\u6c42\u6620\u5c04\u8868\u52a0\u5165\u4e00\u6761\u8bb0\u5f55\u3002\u6ce8\u610f\u662f\u4e3a\u4e86\u907f\u514d\u8bf7\u6c42\u8d85\u65f6\u5bfc\u81f4\u8bf7\u6c42\u4e00\u76f4\u7b49\u5f85\u3001<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-comment\">\u5360\u7528\u5185\u5b58\u3001\u8fdb\u800c\u5bfc\u81f4\u5185\u5b58\u4e0d\u8db3<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-comment\">*\/<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-keyword\">if<\/span> (<span class=\"cm-variable\">client<\/span>.<span class=\"cm-variable\">inFlightRequestCount<\/span>() <span class=\"cm-operator\">==<\/span> <span class=\"cm-number\">0<\/span>)<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-variable\">pollTimeout<\/span> <span class=\"cm-operator\">=<\/span> <span class=\"cm-variable\">Math<\/span>.<span class=\"cm-variable\">min<\/span>(<span class=\"cm-variable\">pollTimeout<\/span>, <span class=\"cm-variable\">retryBackoffMs<\/span>);<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-variable\">client<\/span>.<span class=\"cm-variable\">poll<\/span>(<span class=\"cm-variable\">pollTimeout<\/span>, <span class=\"cm-variable\">timer<\/span>.<span class=\"cm-variable\">currentTimeMs<\/span>());<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0  } <span class=\"cm-keyword\">else<\/span> {<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-variable\">client<\/span>.<span class=\"cm-variable\">poll<\/span>(<span class=\"cm-number\">0<\/span>, <span class=\"cm-variable\">timer<\/span>.<span class=\"cm-variable\">currentTimeMs<\/span>());<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0  }<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-variable\">timer<\/span>.<span class=\"cm-variable\">update<\/span>();<\/span>\r\n<span role=\"presentation\">\u200b<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-variable\">checkDisconnects<\/span>(<span class=\"cm-variable\">timer<\/span>.<span class=\"cm-variable\">currentTimeMs<\/span>());<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-keyword\">if<\/span> (<span class=\"cm-operator\">!<\/span><span class=\"cm-variable\">disableWakeup<\/span>) {<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-variable\">maybeTriggerWakeup<\/span>();<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0  }<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-variable\">maybeThrowInterruptException<\/span>();<\/span>\r\n<span role=\"presentation\">\u200b<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-variable\">trySend<\/span>(<span class=\"cm-variable\">timer<\/span>.<span class=\"cm-variable\">currentTimeMs<\/span>());<\/span>\r\n<span role=\"presentation\">\u200b<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-variable\">failExpiredRequests<\/span>(<span class=\"cm-variable\">timer<\/span>.<span class=\"cm-variable\">currentTimeMs<\/span>());<\/span>\r\n<span role=\"presentation\">            <span class=\"cm-comment\">\/\/\u6e05\u7406unsent<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-variable\">unsent<\/span>.<span class=\"cm-variable\">clean<\/span>();<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0  } <span class=\"cm-keyword\">finally<\/span> {<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-variable\">lock<\/span>.<span class=\"cm-variable\">unlock<\/span>();<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0  }<\/span>\r\n<span role=\"presentation\">\u200b<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-comment\">\/\/ called without the lock to avoid deadlock potential if handlers need to acquire locks<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-variable\">firePendingCompletedRequests<\/span>();<\/span>\r\n<span role=\"presentation\">\u200b<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-variable\">metadata<\/span>.<span class=\"cm-variable\">maybeThrowAnyException<\/span>();<\/span>\r\n<span role=\"presentation\"> \u00a0  }<\/span><\/pre>\n<h4 class=\"md-end-block md-heading\"><span class=\"md-plain\">KafkaConsumer#updateFetchPositions() \u66f4\u65b0\u6d88\u8d39\u4f4d\u7f6e<\/span><\/h4>\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">Kafka Consumer \u5728\u6d88\u8d39\u8fc7\u7a0b\u4e2d\u662f\u9700\u8981\u7ef4\u62a4\u6d88\u8d39\u4f4d\u7f6e\u7684\uff0cConsumer \u6bcf\u6b21\u4ece\u5f53\u524d\u6d88\u8d39\u4f4d\u7f6e\u62c9\u53d6\u4e00\u6279\u6d88\u606f\uff0c\u8fd9\u4e9b\u6d88\u606f\u90fd\u88ab\u6b63\u5e38\u6d88\u8d39\u540e\uff0cConsumer \u4f1a\u7ed9 Coordinator \u53d1\u4e00\u4e2a\u63d0\u4ea4\u4f4d\u7f6e\u7684\u8bf7\u6c42\uff0c\u7136\u540e\u6d88\u8d39\u4f4d\u7f6e\u4f1a\u5411\u540e\u79fb\u52a8\uff0c\u5b8c\u6210\u4e00\u6279\u6d88\u8d39\u8fc7\u7a0b\u3002<\/span><\/p>\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">\u800cconsumer\u53d1\u8d77\u63d0\u4ea4\u4f4d\u7f6e\u7684\u8bf7\u6c42\uff0c\u8fd8\u662f\u5728<\/span><span class=\"md-pair-s\" spellcheck=\"false\"><code>updateAssignmentMetadataIfNeeded()<\/code><\/span><span class=\"md-plain\">\u65b9\u6cd5\u4e2d\uff0c\u6bd5\u7adf\u4f4d\u7f6e\u4fe1\u606f\u4e5f\u662f\u670d\u52a1\u7aefbroker\u7684\u4e00\u4e2a\u5143\u6570\u636e\u3002\u5728<\/span><span class=\"md-pair-s\" spellcheck=\"false\"><code>`updateAssignmentMetadataIfNeeded<\/code><\/span><span class=\"md-plain\">\u65b9\u6cd5\u5b9e\u73b0\u4e2d\uff0c\u6700\u540e\u4e00\u53e5\u8c03\u7528<\/span><span class=\"md-pair-s\" spellcheck=\"false\"><code>KafkaConsumer#updateFetchPositions(timer);<\/code><\/span><span class=\"md-plain\">\uff0c\u800c\u8fd9\u4e2a\u65b9\u6cd5\u53c8\u4f1a\u8c03\u7528coordinator.refreshCommittedOffsetsIfNeeded()\u65b9\u6cd5\u3002\u8c03\u7528\u94fe\u8def\u5982\u4e0b\uff1a<\/span><\/p>\n<div class=\"md-diagram-panel\">\n<div class=\"md-diagram-panel-error\"><img loading=\"lazy\" class=\"wp-image-864 aligncenter\" src=\"http:\/\/prayerlaputa.com\/wp-content\/uploads\/2020\/09\/kafka-consumer-refresh-position-sequence-diagram-300x74.png\" alt=\"\" width=\"568\" height=\"140\" srcset=\"http:\/\/prayerlaputa.com\/wp-content\/uploads\/2020\/09\/kafka-consumer-refresh-position-sequence-diagram-300x74.png 300w, http:\/\/prayerlaputa.com\/wp-content\/uploads\/2020\/09\/kafka-consumer-refresh-position-sequence-diagram-1024x254.png 1024w, http:\/\/prayerlaputa.com\/wp-content\/uploads\/2020\/09\/kafka-consumer-refresh-position-sequence-diagram-768x190.png 768w, http:\/\/prayerlaputa.com\/wp-content\/uploads\/2020\/09\/kafka-consumer-refresh-position-sequence-diagram.png 1501w\" sizes=\"(max-width: 568px) 100vw, 568px\" \/><\/div>\n<\/div>\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">\u6ce8\uff1a\u540c\u4e00\u4e2a\u7c7b\u5185\u90e8\u7684\u65b9\u6cd5\u8c03\u7528\uff0c\u7528\u7c7b\u540d\u5934\u90e8\u52a0\u4e86&#8217;\u2019&#8217;\u533a\u5206\u4e86\u4e0b\uff0c\u4e3b\u8981\u662f\u4e3a\u4e86\u5728Typora\u91cc\u753b\u56fe\u65b9\u4fbf\u3002\u4e0d\u652f\u6301\u53ef\u53c2\u8003images\/kafka-consumer-refresh-position-sequence-diagram.png \u56fe\u7247\u3002<\/span><\/p>\n<h3 class=\"md-end-block md-heading\"><span class=\"md-plain\">pollForFetchs() \u62c9\u53d6\u6d88\u606f<\/span><\/h3>\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">\u6e90\u7801\u5982\u4e0b<\/span><\/p>\n<pre class=\"md-fences md-end-block ty-contain-cm modeLoaded\" lang=\"java\" spellcheck=\"false\"><span role=\"presentation\"><span class=\"cm-keyword\">private<\/span> <span class=\"cm-variable\">Map<\/span><span class=\"cm-operator\">&lt;<\/span><span class=\"cm-variable\">TopicPartition<\/span>, <span class=\"cm-variable\">List<\/span><span class=\"cm-operator\">&lt;<\/span><span class=\"cm-variable\">ConsumerRecord<\/span><span class=\"cm-operator\">&lt;<\/span><span class=\"cm-variable\">K<\/span>, <span class=\"cm-variable\">V<\/span><span class=\"cm-operator\">&gt;&gt;&gt;<\/span> <span class=\"cm-def\">pollForFetches<\/span>(<span class=\"cm-variable\">Timer<\/span> <span class=\"cm-variable\">timer<\/span>) {<\/span>\r\n<span role=\"presentation\">    <span class=\"cm-comment\">\/\/ \u7701\u7565\u90e8\u5206\u4ee3\u7801 \u00a0 \u00a0 \u00a0 \u00a0<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0<span class=\"cm-comment\">\/\/ \u5982\u679c\u7f13\u5b58\u91cc\u9762\u6709\u672a\u8bfb\u53d6\u7684\u6d88\u606f\uff0c\u76f4\u63a5\u8fd4\u56de\u8fd9\u4e9b\u6d88\u606f<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0<span class=\"cm-keyword\">final<\/span> <span class=\"cm-variable\">Map<\/span><span class=\"cm-operator\">&lt;<\/span><span class=\"cm-variable\">TopicPartition<\/span>, <span class=\"cm-variable\">List<\/span><span class=\"cm-operator\">&lt;<\/span><span class=\"cm-variable\">ConsumerRecord<\/span><span class=\"cm-operator\">&lt;<\/span><span class=\"cm-variable\">K<\/span>, <span class=\"cm-variable\">V<\/span><span class=\"cm-operator\">&gt;&gt;&gt;<\/span> <span class=\"cm-variable\">records<\/span> <span class=\"cm-operator\">=<\/span> <span class=\"cm-variable\">fetcher<\/span>.<span class=\"cm-variable\">fetchedRecords<\/span>();<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0<span class=\"cm-keyword\">if<\/span> (<span class=\"cm-operator\">!<\/span><span class=\"cm-variable\">records<\/span>.<span class=\"cm-variable\">isEmpty<\/span>()) {<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-keyword\">return<\/span> <span class=\"cm-variable\">records<\/span>;<\/span>\r\n<span role=\"presentation\"> \u00a0  }<\/span>\r\n<span role=\"presentation\">\u200b<\/span>\r\n<span role=\"presentation\">    <span class=\"cm-comment\">\/* <\/span><\/span>\r\n<span role=\"presentation\">    <span class=\"cm-comment\">\u6784\u9020\u62c9\u53d6\u6d88\u606f\u8bf7\u6c42\uff0c\u5e76\u53d1\u9001\u3002<\/span><\/span>\r\n<span role=\"presentation\">    <span class=\"cm-comment\">\u6ce8\u610f\uff0c\u8be5\u65b9\u6cd5\u5185\u90e8\uff0c\u5c06\u8c03\u7528ConsumerNetworkClient#send\u65b9\u6cd5\uff0c\u800c\u8fd9\u4e2asend\u65b9\u6cd5\u5c06\u4f1a\u628a <\/span><\/span>\r\n<span role=\"presentation\">    <span class=\"cm-comment\">kafka\u8282\u70b9\u4fe1\u606f --&gt; \u8bf7\u6c42  \u5b58\u653e\u5230ConsumerNetworkClient#unsent\u6210\u5458\u53d8\u91cf\u4e2d<\/span><\/span>\r\n<span role=\"presentation\">    <span class=\"cm-comment\">*\/<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0<span class=\"cm-variable\">fetcher<\/span>.<span class=\"cm-variable\">sendFetches<\/span>();<\/span>\r\n<span role=\"presentation\">\u200b<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0<span class=\"cm-comment\">\/\/ \u7701\u7565\u90e8\u5206\u4ee3\u7801<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0<span class=\"cm-comment\">\/\/ \u53d1\u9001\u7f51\u7edc\u8bf7\u6c42\u62c9\u53d6\u6d88\u606f\uff0c\u7b49\u5f85\u76f4\u5230\u6709\u6d88\u606f\u8fd4\u56de\u6216\u8005\u8d85\u65f6<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0<span class=\"cm-variable\">client<\/span>.<span class=\"cm-variable\">poll<\/span>(<span class=\"cm-variable\">pollTimer<\/span>, () <span class=\"cm-operator\">-&gt;<\/span> {<\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-comment\">\/\/ since a fetch might be completed by the background thread, we need this poll condition<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-comment\">\/\/ to ensure that we do not block unnecessarily in poll()<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0 \u00a0 \u00a0<span class=\"cm-keyword\">return<\/span> <span class=\"cm-operator\">!<\/span><span class=\"cm-variable\">fetcher<\/span>.<span class=\"cm-variable\">hasAvailableFetches<\/span>();<\/span>\r\n<span role=\"presentation\"> \u00a0  });<\/span>\r\n<span role=\"presentation\">    <span class=\"cm-comment\">\/\/\u7701\u7565\u90e8\u5206\u4ee3\u7801<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0<span class=\"cm-comment\">\/\/\u8fd4\u56de\u62c9\u53d6\u5230\u7684\u6d88\u606f<\/span><\/span>\r\n<span role=\"presentation\"> \u00a0 \u00a0<span class=\"cm-keyword\">return<\/span> <span class=\"cm-variable\">fetcher<\/span>.<span class=\"cm-variable\">fetchedRecords<\/span>();<\/span>\r\n<span role=\"presentation\">}<\/span><\/pre>\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">\u4e3b\u8981\u903b\u8f91\u5982\u4e0b\uff1a<\/span><\/p>\n<ol class=\"ol-list\">\n<li class=\"md-list-item\">\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">\u5982\u679c\u7f13\u5b58\u91cc\u9762\u6709\u672a\u8bfb\u53d6\u7684\u6d88\u606f\uff0c\u76f4\u63a5\u8fd4\u56de\u8fd9\u4e9b\u6d88\u606f\uff1b<\/span><\/p>\n<\/li>\n<li class=\"md-list-item\">\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">\u6784\u9020\u62c9\u53d6\u6d88\u606f\u8bf7\u6c42\uff0c\u5e76\u53d1\u9001\uff1b<\/span><\/p>\n<\/li>\n<li class=\"md-list-item\">\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">\u53d1\u9001\u7f51\u7edc\u8bf7\u6c42\u5e76\u62c9\u53d6\u6d88\u606f\uff0c\u7b49\u5f85\u76f4\u5230\u6709\u6d88\u606f\u8fd4\u56de\u6216\u8005\u8d85\u65f6\uff1b<\/span><\/p>\n<\/li>\n<li class=\"md-list-item\">\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">\u8fd4\u56de\u62c9\u5230\u7684\u6d88\u606f\u3002<\/span><\/p>\n<ol class=\"ol-list\">\n<li class=\"md-list-item\">\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">fetcher.fetchedRecords\u4f1a\u5c06\u8fd4\u56de\u7684Response\u53cd\u5e8f\u5217\u5316\uff0c\u8fd4\u56de\u7ed9\u8c03\u7528\u8005\u3002<\/span><\/p>\n<\/li>\n<\/ol>\n<p class=\"md-end-block md-p\">\n<\/li>\n<\/ol>\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">\u5177\u4f53\u53d1\u9001\u64cd\u4f5c\u7531fetcher.sendFetches();\u5b9e\u73b0\uff0c\u4e3b\u8981\u5305\u62ec\u5982\u4e0b\u6b65\u9aa4\uff1a<\/span><\/p>\n<ol class=\"ol-list\">\n<li class=\"md-list-item\">\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">\u6784\u9020\u62c9\u53d6\u6d88\u606f\u8bf7\u6c42Request\u5bf9\u8c61<\/span><\/p>\n<\/li>\n<li class=\"md-list-item\">\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">\u8c03\u7528ConsumerNetworkClient#send\u65b9\u6cd5<\/span><span class=\"md-pair-s \"><strong><span class=\"md-plain\">\u5f02\u6b65\u53d1\u9001Request<\/span><\/strong><\/span><\/p>\n<ol class=\"ol-list\">\n<li class=\"md-list-item\">\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">\u8fd9\u4e2asend\u65b9\u6cd5\u5c06\u4f1a\u628akafka\u8282\u70b9\u4fe1\u606f &#8211;&gt; \u8bf7\u6c42 \u5b58\u653e\u5230ConsumerNetworkClient#unsent\u6210\u5458\u53d8\u91cf\u4e2d<\/span><\/p>\n<\/li>\n<\/ol>\n<\/li>\n<li class=\"md-list-item\">\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">\u6ce8\u518c\u4e00\u4e2a\u56de\u8c03\u7c7b\u6765\u5904\u7406\u8fd4\u56de\u7684Response<\/span><\/p>\n<ol class=\"ol-list\">\n<li class=\"md-list-item\">\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">Response\u6682\u65f6\u88ab\u5b58\u653e\u5728Fetcher#completedFetches\u6210\u5458\u53d8\u91cf\u4e2d<\/span><\/p>\n<\/li>\n<\/ol>\n<\/li>\n<\/ol>\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">\u5f53\u8c03\u7528ConsumerNetworkClient#poll\u65b9\u6cd5\u65f6\uff0c\u624d\u4f1a\u904d\u5386unsent\uff0c\u5c06\u5176\u4e2d\u7684\u8bf7\u6c42\u53d1\u51fa\u53bb\uff0c\u5e76\u5904\u7406\u6536\u5230\u7684Response\u3002<\/span><\/p>\n<h2 class=\"md-end-block md-heading md-focus\"><span class=\"md-plain md-expand\">KafkaConsumer\u76f8\u5173\u7c7b\u56fe<\/span><\/h2>\n<p class=\"md-end-block md-p\"><span class=\"md-image md-img-loaded\" data-src=\"images\/kafka-consumer-class-diagram.png\"><img loading=\"lazy\" class=\" wp-image-862 aligncenter\" src=\"http:\/\/prayerlaputa.com\/wp-content\/uploads\/2020\/09\/kafka-consumer-class-diagram-300x108.png\" alt=\"\" width=\"431\" height=\"155\" srcset=\"http:\/\/prayerlaputa.com\/wp-content\/uploads\/2020\/09\/kafka-consumer-class-diagram-300x108.png 300w, http:\/\/prayerlaputa.com\/wp-content\/uploads\/2020\/09\/kafka-consumer-class-diagram-1024x370.png 1024w, http:\/\/prayerlaputa.com\/wp-content\/uploads\/2020\/09\/kafka-consumer-class-diagram-768x277.png 768w, http:\/\/prayerlaputa.com\/wp-content\/uploads\/2020\/09\/kafka-consumer-class-diagram.png 1215w\" sizes=\"(max-width: 431px) 100vw, 431px\" \/><\/span><\/p>\n<p class=\"md-end-block md-p\"><span class=\"md-plain\">\u4e0a\u56fe\u5f15\u81ea\uff1a\u6781\u5ba2\u65f6\u95f4 \u6d88\u606f\u961f\u5217\u9ad8\u624b\u8bfe<\/span><\/p>\n","protected":false},"excerpt":{"rendered":"<p>\u8bf4\u660e \u672c\u6587\u57fa\u4e8eApache Kafka 2.5.1\uff082020.09.10\u62c9\u53d6\u6700\u65b0\u4ee3\u7801\uff09 Consumer\u5982\u4f55\u4f7f <a href='http:\/\/prayerlaputa.com\/?p=861' class='excerpt-more'>[&#8230;]<\/a><\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"_monsterinsights_skip_tracking":false,"_monsterinsights_sitenote_active":false,"_monsterinsights_sitenote_note":"","_monsterinsights_sitenote_category":0},"categories":[104,79,18],"tags":[105,82,50],"_links":{"self":[{"href":"http:\/\/prayerlaputa.com\/index.php?rest_route=\/wp\/v2\/posts\/861"}],"collection":[{"href":"http:\/\/prayerlaputa.com\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"http:\/\/prayerlaputa.com\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"http:\/\/prayerlaputa.com\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"http:\/\/prayerlaputa.com\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=861"}],"version-history":[{"count":3,"href":"http:\/\/prayerlaputa.com\/index.php?rest_route=\/wp\/v2\/posts\/861\/revisions"}],"predecessor-version":[{"id":867,"href":"http:\/\/prayerlaputa.com\/index.php?rest_route=\/wp\/v2\/posts\/861\/revisions\/867"}],"wp:attachment":[{"href":"http:\/\/prayerlaputa.com\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=861"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"http:\/\/prayerlaputa.com\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=861"},{"taxonomy":"post_tag","embeddable":true,"href":"http:\/\/prayerlaputa.com\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=861"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}