Kafka Producer Source Code Analysis: How Metadata Is Fetched
Table of Contents
- Overall flow diagram
- Source code analysis
- waitOnMetadata
- awaitUpdate
- The Sender thread's run method
- Step 1: maybeUpdate
- Step 2
- Step 3
- handleCompletedReceives
- maybeHandleCompletedReceive
- handleResponse
Overall flow diagram
Source code analysis
waitOnMetadata
```java
private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
    // add topic to metadata topic list if it is not there already and reset expiry
    // Register the current topic with the metadata object.
    metadata.add(topic);
    // Scenario-driven walkthrough: at this point the producer has just finished
    // initializing, so this cluster object holds no real metadata yet, only the
    // bootstrap addresses we configured in our code.
    Cluster cluster = metadata.fetch();
    // Look up the partition info for this topic in the cached cluster metadata.
    // On the first pass through this code, there is no partition info yet.
    Integer partitionsCount = cluster.partitionCountForTopic(topic);
    // Return cached metadata if we have it, and if the record's partition is either undefined
    // or within the known partition range
    // If the partition info was found in the cached metadata, return it directly.
    // On the first pass, this branch is skipped.
    if (partitionsCount != null && (partition == null || partition < partitionsCount))
        // Return the cached cluster metadata; the fetch took 0 ms.
        return new ClusterAndWaitTime(cluster, 0);

    // Reaching this point means we genuinely need to fetch metadata from the broker.
    // Record the start time.
    long begin = time.milliseconds();
    // Remaining wait time; starts out at the maximum allowed wait.
    long remainingWaitMs = maxWaitMs;
    // Time spent so far.
    long elapsed;
    // Issue metadata requests until we have metadata for the topic or maxWaitTimeMs is exceeded.
    // In case we already have cached metadata for the topic, but the requested partition is greater
    // than expected, issue an update request only once. This is necessary in case the metadata
    // is stale and the number of partitions for this topic has increased in the meantime.
    do {
        log.trace("Requesting metadata update for topic {}.", topic);
        // 1) Read the current metadata version. The producer versions its
        //    metadata: every successful update increments the version number.
        // 2) Set the needUpdate flag to true.
        int version = metadata.requestUpdate();
        /**
         * TODO This step matters.
         * Notice that the Sender thread is woken up here. The actual metadata
         * fetch is performed by the Sender thread, so once it is woken up it
         * starts doing the work; exactly how is analyzed below.
         *
         * This relies squarely on Java threading and concurrency fundamentals,
         * which are worth brushing up on if they are not fresh.
         */
        sender.wakeup();
        try {
            // TODO Wait for the metadata.
            // This is a synchronous wait: block until the Sender thread has
            // fetched the metadata.
            metadata.awaitUpdate(version, remainingWaitMs);
        } catch (TimeoutException ex) {
            // Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
            throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
        }
        // Try fetching the cluster metadata again.
        cluster = metadata.fetch();
        // Compute how long the metadata fetch has taken so far.
        elapsed = time.milliseconds() - begin;
        // If we have exceeded the maximum wait time, fail with a timeout.
        if (elapsed >= maxWaitMs)
            throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
        // If we did get metadata back but the topic is not authorized, fail fast.
        if (cluster.unauthorizedTopics().contains(topic))
            throw new TopicAuthorizationException(topic);
        // Recompute the remaining usable time.
        remainingWaitMs = maxWaitMs - elapsed;
        // Try looking up the partition info for the topic we are sending to.
        // A non-null value means the Sender thread has fetched the metadata.
        partitionsCount = cluster.partitionCountForTopic(topic);
        // Once the metadata is available, the loop exits.
    } while (partitionsCount == null);

    if (partition != null && partition >= partitionsCount) {
        throw new KafkaException(String.format("Invalid partition given with record: %d is not in the range [0...%d).", partition, partitionsCount));
    }

    // Reaching here, return a result object with two fields:
    // cluster: the cluster metadata
    // elapsed: how long the metadata fetch took
    return new ClusterAndWaitTime(cluster, elapsed);
}
```
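The maxWaitMs passed into waitOnMetadata comes from the producer's max.block.ms setting, so to an application this whole handshake surfaces as the first send() for a topic blocking, and eventually failing with a TimeoutException if metadata cannot be fetched in time. A minimal usage sketch, assuming a local broker at localhost:9092 and a hypothetical topic name (neither is from the source):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MaxBlockDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer");
        // Upper bound for waitOnMetadata: the first send() blocks at most this
        // long waiting for the Sender thread to fetch metadata.
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // The first send() for a topic triggers the metadata fetch traced above.
            producer.send(new ProducerRecord<>("demo-topic", "key", "value")); // hypothetical topic
        }
    }
}
```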
awaitUpdate
```java
public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
    if (maxWaitMs < 0) {
        throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds");
    }
    // Record the current time.
    long begin = System.currentTimeMillis();
    // Remaining usable time; starts out at the maximum wait time.
    long remainingWaitMs = maxWaitMs;
    // version is the metadata version number. While the current version is
    // still <= the version we started from, the metadata has not been updated:
    // when the Sender thread updates the metadata successfully, it increments
    // this version.
    while (this.version <= lastVersion) {
        // If there is still time left...
        if (remainingWaitMs != 0)
            // ...block the current thread. We have not read the Sender
            // thread's code yet, but we can expect that when the metadata
            // update succeeds, it will wake this thread up.
            wait(remainingWaitMs);
        // Reaching here means we were either woken up or the wait timed out.
        // Compute how much time has passed.
        long elapsed = System.currentTimeMillis() - begin;
        // If we have already timed out...
        if (elapsed >= maxWaitMs)
            // ...throw a timeout exception.
            throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
        // Recompute the remaining usable time.
        remainingWaitMs = maxWaitMs - elapsed;
    }
}
```
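To see the wait/notify handshake in isolation, here is a stripped-down sketch of the versioned-monitor pattern that awaitUpdate() and update() implement. This is my own illustration, not Kafka code; the class and names are hypothetical:

```java
// One thread waits for the version to move past the value it started from;
// another thread bumps the version and calls notifyAll() to wake it.
public class VersionedMonitor {
    private int version = 0;

    // The "producer" side: block until version > lastVersion or we time out.
    public synchronized void awaitUpdate(int lastVersion, long maxWaitMs)
            throws InterruptedException {
        long begin = System.currentTimeMillis();
        long remaining = maxWaitMs;
        while (version <= lastVersion) {
            if (remaining != 0)
                wait(remaining);
            long elapsed = System.currentTimeMillis() - begin;
            if (elapsed >= maxWaitMs)
                throw new RuntimeException("timed out after " + maxWaitMs + " ms");
            remaining = maxWaitMs - elapsed;
        }
    }

    // The "sender" side: bump the version and wake all waiters.
    public synchronized void update() {
        version += 1;
        notifyAll();
    }

    public static void main(String[] args) throws InterruptedException {
        VersionedMonitor monitor = new VersionedMonitor();
        // Simulate the Sender thread completing a metadata update after 500 ms.
        new Thread(() -> {
            try { Thread.sleep(500); } catch (InterruptedException ignored) { }
            monitor.update();
        }).start();
        monitor.awaitUpdate(0, 5000);
        System.out.println("woken up: metadata 'updated'");
    }
}
```

The design point worth noting is the version check inside the loop: being woken up alone is not enough. The waiter only proceeds once an updater has actually advanced the version, which guards against spurious wakeups.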
The Sender thread's run method
```java
public void run() {
    log.debug("Starting Kafka producer I/O thread.");
    // This is essentially an infinite loop: once the Sender thread starts,
    // it keeps running until the producer is closed.
    while (running) {
        try {
            // TODO
            run(time.milliseconds());
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }

    log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");

    // okay we stopped accepting requests but there may still be
    // requests in the accumulator or waiting for acknowledgment,
    // wait until these are completed.
    while (!forceClose && (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0)) {
        try {
            run(time.milliseconds());
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }
    if (forceClose) {
        // We need to fail all the incomplete batches and wake up the threads waiting on
        // the futures.
        this.accumulator.abortIncompleteBatches();
    }
    try {
        this.client.close();
    } catch (Exception e) {
        log.error("Failed to close network client", e);
    }

    log.debug("Shutdown of Kafka producer I/O thread has completed.");
}
```
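The structure above is a common I/O-thread pattern: loop while running, then keep looping on shutdown until outstanding work has drained. A minimal self-contained sketch of the same pattern (my own illustration, not Kafka code; all names are hypothetical):

```java
import java.util.concurrent.ConcurrentLinkedQueue;

public class DrainingLoopThread implements Runnable {
    private volatile boolean running = true;
    private final ConcurrentLinkedQueue<String> pending = new ConcurrentLinkedQueue<>();

    public void submit(String task) { pending.add(task); }

    public void shutdown() { running = false; }

    @Override
    public void run() {
        // Normal operation: one iteration of work per loop pass.
        while (running) {
            runOnce();
        }
        // Graceful shutdown: stop accepting new work but finish what is
        // queued, mirroring the "hasUnsent() || inFlightRequestCount() > 0"
        // drain loop in Sender.run().
        while (!pending.isEmpty()) {
            runOnce();
        }
    }

    private void runOnce() {
        String task = pending.poll();
        if (task != null) {
            System.out.println("processed " + task);
        } else {
            // Nothing to do; back off briefly instead of busy-spinning.
            try { Thread.sleep(10); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DrainingLoopThread loop = new DrainingLoopThread();
        Thread t = new Thread(loop);
        t.start();
        loop.submit("record-batch-1");
        loop.submit("record-batch-2");
        loop.shutdown(); // queued batches are still processed before exit
        t.join();
    }
}
```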
run() then delegates to the overloaded run(long now) method:
```java
void run(long now) {
    /**
     * Step 1: fetch the metadata.
     *
     * Scenario-driven walkthrough: on the first pass through this method no
     * metadata has been fetched yet, so this cluster object is empty. In that
     * case nothing between here and the last line of the method matters,
     * because it all depends on the metadata; skip straight to the final
     * line, this.client.poll(...), which is the call that actually fetches
     * the metadata. On the second pass the metadata is available, so the
     * cluster object is populated and the steps below come into play.
     */
    Cluster cluster = metadata.fetch();
    // get the list of partitions with data ready to send
    /**
     * Step 2: determine which partitions have messages ready to send and
     * resolve the brokers hosting those partitions' leaders, i.e. which
     * brokers we need to send messages to.
     */
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

    /**
     * Step 3: mark the topics whose metadata has not been fetched yet.
     */
    if (!result.unknownLeaderTopics.isEmpty()) {
        // The set of topics with unknown leader contains topics with leader election pending as well as
        // topics which may have expired. Add the topic again to metadata to ensure it is included
        // and request metadata update, since there are messages to send to the topic.
        for (String topic : result.unknownLeaderTopics)
            this.metadata.add(topic);
        this.metadata.requestUpdate();
    }

    // remove any nodes we aren't ready to send to
    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        /**
         * Step 4: check whether the network connection to each target broker
         * has been established.
         */
        if (!this.client.ready(node, now)) {
            // ready() returned false, so !false brings us in here: remove
            // this broker from the ready set. On the first pass no
            // connections exist yet, so every broker is removed.
            iter.remove();
            notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
        }
    }

    /**
     * Step 5: group the batches by broker.
     *
     * We may be sending to many partitions, and several of those partitions'
     * leaders can live on the same broker. For example, in a 3-broker cluster:
     *   p0 -> leader on broker 0
     *   p1 -> leader on broker 0
     *   p2 -> leader on broker 1
     *   p3 -> leader on broker 2
     * Whenever there are more partitions than brokers, some broker must host
     * multiple leader partitions. Partitions on the same broker are grouped
     * together:
     *   0: {p0, p1}
     *   1: {p2}
     *   2: {p3}
     * (see the sketch after this code block)
     */
    // Note: if no connections are established yet, readyNodes is empty and
    // this produces an empty map.
    Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
                                                                     result.readyNodes,
                                                                     this.maxRequestSize,
                                                                     now);
    if (guaranteeMessageOrder) {
        // Mute all the partitions drained
        // (if batches is empty, this loop does nothing)
        for (List<RecordBatch> batchList : batches.values()) {
            for (RecordBatch batch : batchList)
                this.accumulator.mutePartition(batch.topicPartition);
        }
    }

    /**
     * Step 6: deal with batches that have timed out.
     */
    List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
    // update sensors
    for (RecordBatch expiredBatch : expiredBatches)
        this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);

    sensors.updateProduceRequestMetrics(batches);

    /**
     * Step 7: create the produce requests.
     *
     * Several of the partitions we are sending to may sit on the same broker.
     * Sending one network request per partition would be wasteful, and
     * network resources inside the cluster are precious, so the batches
     * destined for partitions on the same broker are combined into a single
     * request and sent in one go, reducing the number of network requests.
     */
    // If no connections are established, batches is empty, so this produces
    // no requests either.
    List<ClientRequest> requests = createProduceRequests(batches, now);
    // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
    // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
    // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
    // with sendable data that aren't ready to send since they would cause busy looping.
    long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
    if (result.readyNodes.size() > 0) {
        log.trace("Nodes with data ready to send: {}", result.readyNodes);
        log.trace("Created {} produce requests: {}", requests.size(), requests);
        pollTimeout = 0;
    }
    // Send the requests.
    for (ClientRequest request : requests)
        client.send(request, now);

    // if some partitions are already ready to be sent, the select time would be 0;
    // otherwise if some partition already has some data accumulated but not ready yet,
    // the select time will be the time difference between now and its linger expiry time;
    // otherwise the select time will be the time difference between now and the metadata expiry time;
    // TODO This is the method to focus on: it is what fetches the metadata.
    /**
     * Step 8: the component that performs the real network I/O is this
     * NetworkClient, covering both sending requests and receiving (and
     * handling) responses. Presumably this is also where connections get
     * established.
     */
    this.client.poll(pollTimeout, now);
}
```
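Step 5's grouping is easy to picture with a small sketch. The following is my own illustration, not the real accumulator.drain(); the leaderFor lookup is a hypothetical stand-in that hard-codes the example leader assignment from the comment above:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GroupByLeaderDemo {
    // Hypothetical leader lookup: partition number -> leader broker id.
    static int leaderFor(int partition) {
        int[] leaders = {0, 0, 1, 2}; // p0,p1 -> broker 0; p2 -> broker 1; p3 -> broker 2
        return leaders[partition];
    }

    public static void main(String[] args) {
        // Key batches by the broker id of their partition's leader so that
        // one combined request per broker can be sent.
        Map<Integer, List<String>> batchesByBroker = new HashMap<>();
        for (int partition = 0; partition < 4; partition++) {
            batchesByBroker
                .computeIfAbsent(leaderFor(partition), broker -> new ArrayList<>())
                .add("batch-for-p" + partition);
        }
        // Prints {0=[batch-for-p0, batch-for-p1], 1=[batch-for-p2], 2=[batch-for-p3]}
        System.out.println(batchesByBroker);
    }
}
```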
The fetch happens inside the poll method:
```java
public List<ClientResponse> poll(long timeout, long now) {
    /**
     * This method touches Kafka's networking layer, which we have not covered
     * yet, so for now we do not need to analyze it in great detail; a rough
     * idea of how the metadata is obtained is enough. Once the networking
     * layer has been analyzed, revisiting this code will make it look quite
     * simple.
     */
    // Step 1: build a metadata fetch request (if one is due).
    long metadataTimeout = metadataUpdater.maybeUpdate(now);
    try {
        // Step 2: send the request, performing the actual (fairly involved)
        // network I/O. We have not studied Kafka's networking yet, so for now
        // it is enough to know that the network requests go out here.
        // TODO This performs the network I/O.
        this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
    } catch (IOException e) {
        log.error("Unexpected error during I/O", e);
    }

    // process completed actions
    long updatedNow = this.time.milliseconds();
    List<ClientResponse> responses = new ArrayList<>();
    handleCompletedSends(responses, updatedNow);
    // Step 3: handle the responses; one of them carries the metadata we need.
    /**
     * We arrived here while tracing how the producer obtains metadata.
     * Fetching metadata follows exactly the same path as sending messages:
     * fetch metadata -> check whether the connection is established ->
     * establish the connection -> send the request (a metadata request) ->
     * the broker sends back a response (carrying the cluster metadata).
     */
    handleCompletedReceives(responses, updatedNow);
    handleDisconnections(responses, updatedNow);
    handleConnections();
    // Handle requests that have timed out.
    handleTimedOutRequests(responses, updatedNow);

    // invoke callbacks
    for (ClientResponse response : responses) {
        if (response.request().hasCallback()) {
            try {
                // Invoke the callback that belongs to the request we sent out
                // earlier. We have not yet looked at how that request was
                // built, but we can safely guess that a callback was bound to
                // it at construction time; we will confirm this when we go
                // back to the request-creation code.
                response.request().callback().onComplete(response);
            } catch (Exception e) {
                log.error("Uncaught error in request completion:", e);
            }
        }
    }

    return responses;
}
```
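The callback invocation at the end of poll() relies on every request carrying its completion handler from the moment it is created. A tiny sketch of that request/callback pairing, using hypothetical stand-in classes rather than Kafka's own:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

public class CallbackDemo {
    static class Request {
        final String payload;
        final Consumer<String> callback; // invoked with the response body
        Request(String payload, Consumer<String> callback) {
            this.payload = payload;
            this.callback = callback;
        }
    }

    public static void main(String[] args) {
        Deque<Request> inFlight = new ArrayDeque<>();
        // "Send": enqueue the request with its callback bound up front.
        inFlight.add(new Request("METADATA_REQUEST",
                response -> System.out.println("callback got: " + response)));
        // "Receive": match the outstanding request and fire its callback,
        // as the for-loop at the end of poll() does.
        Request done = inFlight.poll();
        done.callback.accept("METADATA_RESPONSE{cluster=...}");
    }
}
```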
Step 1: maybeUpdate
```java
private void maybeUpdate(long now, Node node) {
    if (node == null) {
        log.debug("Give up sending metadata request since no node is available");
        // mark the timestamp for no node available to connect
        this.lastNoNodeAvailableMs = now;
        return;
    }
    String nodeConnectionId = node.idString();

    // Check whether the network connection is established. (We have not
    // covered Kafka's networking yet, so assume for now that it is.)
    if (canSendRequest(nodeConnectionId)) {
        this.metadataFetchInProgress = true;
        MetadataRequest metadataRequest;
        if (metadata.needMetadataForAllTopics())
            // Build a request for the metadata of all topics. Normally,
            // though, we only fetch metadata for the topics we are actually
            // sending messages to.
            metadataRequest = MetadataRequest.allTopics();
        else
            // The default path: request metadata only for the topics we are
            // sending messages to.
            metadataRequest = new MetadataRequest(new ArrayList<>(metadata.topics()));
        // Wrap it into a client request (for fetching metadata).
        ClientRequest clientRequest = request(now, nodeConnectionId, metadataRequest);
        log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
        // Send the request. How exactly it goes out belongs to the networking
        // layer, which we will cover later; for now it is enough to know that
        // the request to be sent is stored here.
        doSend(clientRequest, now);
    } else if (connectionStates.canConnect(nodeConnectionId, now)) {
        // we don't have a connection to this node right now, make one
        log.debug("Initialize connection to node {} for sending metadata request", node.id());
        initiateConnect(node, now);
        // If initiateConnect failed immediately, this node will be put into blackout and we
        // should allow immediately retrying in case there is another candidate node. If it
        // is still connecting, the worst case is that we end up setting a longer timeout
        // on the next round and then wait for the response.
    } else { // connected, but can't send more OR connecting
        // In either case, we just need to wait for a network event to let us know the selected
        // connection might be usable again.
        this.lastNoNodeAvailableMs = now;
    }
}
```
Step 2
To be filled in.

...
Step 3
handleCompletedReceives
```java
private void handleCompletedReceives(List<ClientResponse> responses, long now) {
    for (NetworkReceive receive : this.selector.completedReceives()) {
        // Get the id of the broker the response came from.
        String source = receive.source();
        /**
         * Kafka has a mechanism whereby each connection may tolerate up to 5
         * requests that have been sent but have not yet received a response.
         */
        // Remove the request that has now received its response from that
        // in-flight data structure; this also hands back the original request
        // object we stored when sending.
        ClientRequest req = inFlightRequests.completeNext(source);
        // Parse the broker's response (which carries the result data).
        Struct body = parseResponse(receive.payload(), req.request().header());
        // TODO If this is a metadata response, it is handled here...
        if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))
            // ...otherwise wrap it into a ClientResponse:
            // body holds the response content,
            // req is the request that was originally sent out.
            responses.add(new ClientResponse(req, now, false, body));
    }
}
```
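The "at most 5 in-flight requests per connection" rule mentioned in the comment corresponds to the max.in.flight.requests.per.connection setting, whose default is 5. A rough sketch of such per-connection bookkeeping, as my own illustration rather than Kafka's actual InFlightRequests class:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

public class InFlightLimitDemo {
    private final int maxInFlightPerConnection = 5; // default for max.in.flight.requests.per.connection
    private final Map<String, Deque<String>> inFlight = new HashMap<>();

    // May we send another request to this broker?
    boolean canSendMore(String brokerId) {
        Deque<String> queue = inFlight.get(brokerId);
        return queue == null || queue.size() < maxInFlightPerConnection;
    }

    // Record a request as sent but not yet answered.
    void send(String brokerId, String request) {
        inFlight.computeIfAbsent(brokerId, id -> new ArrayDeque<>()).addLast(request);
    }

    // A response arrived: remove (and return) the oldest outstanding request,
    // playing the role of inFlightRequests.completeNext(source) above.
    String completeNext(String brokerId) {
        return inFlight.get(brokerId).pollFirst();
    }

    public static void main(String[] args) {
        InFlightLimitDemo demo = new InFlightLimitDemo();
        for (int i = 0; i < 5; i++)
            demo.send("broker-0", "request-" + i);
        System.out.println(demo.canSendMore("broker-0")); // false: limit reached
        demo.completeNext("broker-0");                    // a response arrived
        System.out.println(demo.canSendMore("broker-0")); // true again
    }
}
```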
maybeHandleCompletedReceive
```java
public boolean maybeHandleCompletedReceive(ClientRequest req, long now, Struct body) {
    short apiKey = req.request().header().apiKey();
    if (apiKey == ApiKeys.METADATA.id && req.isInitiatedByNetworkClient()) {
        // TODO Handle the metadata response.
        handleResponse(req.request().header(), body, now);
        return true;
    }
    return false;
}
```
handleResponse
```java
private void handleResponse(RequestHeader header, Struct body, long now) {
    this.metadataFetchInProgress = false;
    // The broker sends back a binary payload, so the producer parses that
    // structure and wraps it into a MetadataResponse object.
    MetadataResponse response = new MetadataResponse(body);
    // The response carries the metadata: here we obtain the cluster metadata
    // fetched from the broker.
    Cluster cluster = response.cluster();
    // check if any topics metadata failed to get updated
    Map<String, Errors> errors = response.errors();
    if (!errors.isEmpty())
        log.warn("Error while fetching metadata with correlation id {} : {}", header.correlationId(), errors);

    // don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
    // created which means we will get errors and no nodes until it exists
    // If the metadata was obtained successfully...
    if (cluster.nodes().size() > 0) {
        // ...update the cached metadata.
        this.metadata.update(cluster, now);
    } else {
        log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId());
        this.metadata.failedUpdate(now);
    }
}
```
Finally, we see that this in turn calls the update method:
```java
public synchronized void update(Cluster cluster, long now) {
    Objects.requireNonNull(cluster, "cluster should not be null");

    this.needUpdate = false;
    this.lastRefreshMs = now;
    this.lastSuccessfulRefreshMs = now;
    /**
     * As we can see, the metadata version gets bumped here.
     */
    this.version += 1;

    // topicExpiryEnabled defaults to true.
    if (topicExpiryEnabled) {
        // Handle expiry of topics from the metadata refresh set.
        // During producer initialization topics is still empty, so this loop
        // does nothing on the first pass. By the second pass, when
        // producer.send() has triggered a metadata fetch via the Sender
        // thread and the code lands here again, topics has been populated,
        // so the loop does run.
        for (Iterator<Map.Entry<String, Long>> it = topics.entrySet().iterator(); it.hasNext(); ) {
            Map.Entry<String, Long> entry = it.next();
            long expireMs = entry.getValue();
            if (expireMs == TOPIC_EXPIRY_NEEDS_UPDATE)
                entry.setValue(now + TOPIC_EXPIRY_MS);
            else if (expireMs <= now) {
                it.remove();
                log.debug("Removing unused topic {} from the metadata list, expiryMs {} now {}", entry.getKey(), expireMs, now);
            }
        }
    }

    for (Listener listener: listeners)
        listener.onMetadataUpdate(cluster);

    String previousClusterId = cluster.clusterResource().clusterId();

    // needMetadataForAllTopics defaults to false, so this branch is skipped.
    if (this.needMetadataForAllTopics) {
        // the listener may change the interested topics, which could cause another metadata refresh.
        // If we have already fetched all topics, however, another fetch should be unnecessary.
        this.needUpdate = false;
        this.cluster = getClusterForCurrentTopics(cluster);
    } else {
        // This is the branch that actually runs: the cluster object passed in
        // is assigned directly to this.cluster, which represents the Kafka
        // cluster's metadata. (During initialization, update() is called with
        // just the bootstrap addresses; no broker round trip happens.)
        this.cluster = cluster;
    }

    // The bootstrap cluster is guaranteed not to have any useful information
    if (!cluster.isBootstrapConfigured()) {
        String clusterId = cluster.clusterResource().clusterId();
        if (clusterId == null ? previousClusterId != null : !clusterId.equals(previousClusterId))
            log.info("Cluster ID: {}", cluster.clusterResource().clusterId());
        clusterResourceListeners.onUpdate(cluster.clusterResource());
    }

    // Note the notifyAll() here: its most important job is to wake up the
    // thread we saw earlier, blocked in wait().
    notifyAll();
    log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
}
```
notifyAll() wakes up the thread blocked in awaitUpdate().