kafka源码 有java的源码吗

什么时间使用高级应用?
针对一个消息读取多次
在一个process中,仅仅处理一个topic中的一组partitions
使用事务,确保每个消息只被处理一次
使用高级应用(调用较底层函数)的缺点?
SimpleConsumer需要做很多额外的工作(在以groups方式进行消息处理时不需要
什么时间使用高级应用?
针对一个消息读取多次
在一个process中,仅仅处理一个topic中的一组partitions
使用事务,确保每个消息只被处理一次
使用高级应用(调用较底层函数)的缺点?
SimpleConsumer需要做很多额外的工作(在以groups方式进行消息处理时不需要)
在应用程序中跟踪上次消息处理的offset
确定一个topic partition的lead broker
手工处理broker leander的改变
使用底层函数(SimpleConsumer)开发的步骤
通过active broker,确定topic partition的lead broker
确定topic partition的replicat brokers
根据需要,创建数据请求
识别lead brokder改变并进行恢复
import java.nio.ByteB
import java.util.ArrayL
import java.util.C
import java.util.HashMap;
import java.util.L
import java.util.M
import kafka.api.FetchR
import kafka.api.FetchRequestB
import kafka.api.PartitionOffsetRequestI
import kafka.cluster.B
import mon.ErrorM
import mon.TopicAndP
import kafka.javaapi.FetchR
import kafka.javaapi.OffsetR
import kafka.javaapi.PartitionM
import kafka.javaapi.TopicM
import kafka.javaapi.TopicMetadataR
import kafka.javaapi.TopicMetadataR
import kafka.javaapi.consumer.SimpleC
import kafka.message.MessageAndO
public class ConsumerSimpleExample {
public static void main(String arg[]) {
String[] args={"20","page_visits","2","172.168.63.233","9092"};
ConsumerSimpleExample example = new ConsumerSimpleExample();
long maxReads = Long.parseLong(args[0]);
String topic = args[1];
int partition = Integer.parseInt(args[2]);
List&String& seeds = new ArrayList&String&();
seeds.add(args[3]);
int port = Integer.parseInt(args[4]);
example.run(maxReads, topic, partition, seeds, port);
} catch (Exception e) {
System.out.println("Oops:" + e);
e.printStackTrace();
private List&String& m_replicaBrokers = new ArrayList&String&();
public ConsumerSimpleExample() {
m_replicaBrokers = new ArrayList&String&();
public void run(long a_maxReads, String a_topic, int a_partition,
List&String& a_seedBrokers, int a_port) throws Exception {
PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic,
a_partition);
if (metadata == null) {
System.out
.println("Can't find metadata for Topic and Partition. Exiting");
if (metadata.leader() == null) {
System.out
.println("Can't find Leader for Topic and Partition. Exiting");
String leadBroker = metadata.leader().host();
String clientName = "Client_" + a_topic + "_" + a_
SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port,
100000, 64 * 1024, clientName);
long readOffset = getLastOffset(consumer, a_topic, a_partition,
kafka.api.OffsetRequest.LatestTime(), clientName);
int numErrors = 0;
while (a_maxReads & 0) {
if (consumer == null) {
consumer = new SimpleConsumer(leadBroker, a_port, 100000,
64 * 1024, clientName);
FetchRequest req = new FetchRequestBuilder().clientId(clientName)
.addFetch(a_topic, a_partition, readOffset, 100000).build();
FetchResponse fetchResponse = consumer.fetch(req);
if (fetchResponse.hasError()) {
numErrors++;
short code = fetchResponse.errorCode(a_topic, a_partition);
System.out.println("Error fetching data from the Broker:"
+ leadBroker + " Reason: " + code);
if (numErrors & 5)
if (code == ErrorMapping.OffsetOutOfRangeCode()) {
readOffset = getLastOffset(consumer, a_topic, a_partition,
kafka.api.OffsetRequest.LatestTime(), clientName);
consumer.close();
consumer = null;
leadBroker = findNewLeader(leadBroker, a_topic, a_partition,
numErrors = 0;
long numRead = 0;
for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(
a_topic, a_partition)) {
long currentOffset = messageAndOffset.offset();
if (currentOffset & readOffset) {
System.out.println("Found an old offset: " + currentOffset
+ " Expecting: " + readOffset);
readOffset = messageAndOffset.nextOffset();
ByteBuffer payload = messageAndOffset.message().payload();
byte[] bytes = new byte[payload.limit()];
payload.get(bytes);
System.out.println(String.valueOf(messageAndOffset.offset())
+ ": " + new String(bytes, "UTF-8"));
numRead++;
a_maxReads--;
if (numRead == 0) {
Thread.sleep(1000);
} catch (InterruptedException ie) {
if (consumer != null)
consumer.close();
public static long getLastOffset(SimpleConsumer consumer, String topic,
int partition, long whichTime, String clientName) {
TopicAndPartition topicAndPartition = new TopicAndPartition(topic,
partition);
Map&TopicAndPartition, PartitionOffsetRequestInfo& requestInfo = new HashMap&TopicAndPartition, PartitionOffsetRequestInfo&();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(
whichTime, 1));
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
requestInfo, kafka.api.OffsetRequest.CurrentVersion(),
clientName);
OffsetResponse response = consumer.getOffsetsBefore(request);
if (response.hasError()) {
System.out
.println("Error fetching data Offset Data the Broker. Reason: "
+ response.errorCode(topic, partition));
long[] offsets = response.offsets(topic, partition);
return offsets[0];
private String findNewLeader(String a_oldLeader, String a_topic,
int a_partition, int a_port) throws Exception {
for (int i = 0; i & 3; i++) {
boolean goToSleep = false;
PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port,
a_topic, a_partition);
if (metadata == null) {
goToSleep = true;
} else if (metadata.leader() == null) {
goToSleep = true;
} else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host())
&& i == 0) {
goToSleep = true;
return metadata.leader().host();
if (goToSleep) {
Thread.sleep(1000);
} catch (InterruptedException ie) {
System.out
.println("Unable to find new leader after Broker failure. Exiting");
throw new Exception(
"Unable to find new leader after Broker failure. Exiting");
private PartitionMetadata findLeader(List&String& a_seedBrokers,
int a_port, String a_topic, int a_partition) {
PartitionMetadata returnMetaData = null;
loop: for (String seed : a_seedBrokers) {
SimpleConsumer consumer = null;
consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024,
"leaderLookup");
List&String& topics = Collections.singletonList(a_topic);
TopicMetadataRequest req = new TopicMetadataRequest(topics);
TopicMetadataResponse resp = consumer.send(req);
List&TopicMetadata& metaData = resp.topicsMetadata();
for (TopicMetadata item : metaData) {
for (PartitionMetadata part : item.partitionsMetadata()) {
if (part.partitionId() == a_partition) {
returnMetaData =
break loop;
} catch (Exception e) {
System.out.println("Error communicating with Broker [" + seed
+ "] to find Leader for [" + a_topic + ", "
+ a_partition + "] Reason: " + e);
} finally {
if (consumer != null)
consumer.close();
if (returnMetaData != null) {
m_replicaBrokers.clear();
for (Broker replica : returnMetaData.replicas()) {
m_replicaBrokers.add(replica.host());
return returnMetaD
用云栖社区APP,舒服~
【云栖快讯】首届阿里巴巴中间件技术峰会,揭秘阿里10年分布式技术沉淀!阿里高可用体系核心缔造者、全链路压测创始人,DRDS与TDDL负责人等大咖出场,干货分享,不可错过!&&
通过在客户自己的数据中心内交付完整的阿里云软件堆栈,阿里云专有云帮助政企客户向混合云架构平滑演进,实现从IT时代...
为金融行业提供量身定制的云计算服务,具备低成本、高弹性、高可用、安全合规的特性。帮助金融客户实现从传统IT向云计...
充分利用阿里云现有资源管理和服务体系,引入中间件成熟的整套分布式计算框架,以应用为中心,帮助企业级客户轻松构建并...
为您提供简单高效、处理能力可弹性伸缩的计算服务,帮助您快速构建更稳定、安全的应用,提升运维效率,降低 IT 成本...
2017杭州云栖大会火热抢票
Loading...查看: 138420|回复: 0
Apache kafka客户端开发-java
主题帖子积分
金牌会员, 积分 6962, 距离下一级还需 3038 积分
金牌会员, 积分 6962, 距离下一级还需 3038 积分
1、如何对producer进行程序开发?
2、kafka消费者api分为哪些?
3、怎样发送消息到指定的partitions?
1.依赖包
& && &&&&dependency&
& && && && &&groupId&org.apache.kafka&/groupId&
& && && && &&artifactId&kafka_2.10&/artifactId&
& && && && &&version&0.8.1&/version&
& && &&&&/dependency&复制代码
2.producer程序开发例子
2.1 producer参数说明
#指定kafka节点列表,用于获取metadata,不必全部指定
metadata.broker.list=192.168.2.105:.2.106:9092
# 指定分区处理类。默认kafka.producer.DefaultPartitioner,表通过key哈希到对应分区
#partitioner.class=com.meituan.mafka.client.producer.CustomizePartitioner
# 是否压缩,默认0表示不压缩,1表示用gzip压缩,2表示用snappy压缩。压缩后消息中会有头来指明消息压缩类型,故在消费者端消息解压是透明的无需指定。
compression.codec=none
&&
# 指定序列化处理类(mafka client API调用说明--&3.序列化约定wiki),默认为kafka.serializer.DefaultEncoder,即byte[]
serializer.class=com.meituan.mafka.client.codec.MafkaMessageEncoder
# serializer.class=kafka.serializer.DefaultEncoder
# serializer.class=kafka.serializer.StringEncoder
# 如果要压缩消息,这里指定哪些topic要压缩消息,默认empty,表示不压缩。
#compressed.topics=
########### request ack ###############
# producer接收消息ack的时机.默认为0.
# 0: producer不会等待broker发送ack
# 1: 当leader接收到消息之后发送ack
# 2: 当所有的follower都同步消息成功后发送ack.
request.required.acks=0
# 在向producer发送ack之前,broker允许等待的最大时间
# 如果超时,broker将会向producer发送一个error ACK.意味着上一次消息因为某种
# 原因未能成功(比如follower未能同步成功)
request.timeout.ms=10000
########## end #####################
# 同步还是异步发送消息,默认“sync”表同步,&async&表异步。异步可以提高发送吞吐量,
# 也意味着消息将会在本地buffer中,并适时批量发送,但是也可能导致丢失未发送过去的消息
producer.type=sync
############## 异步发送 (以下四个异步参数可选) ####################
# 在async模式下,当message被缓存的时间超过此值后,将会批量发送给broker,默认为5000ms
# 此值和batch.num.messages协同工作.
queue.buffering.max.ms = 5000
# 在async模式下,producer端允许buffer的最大消息量
# 无论如何,producer都无法尽快的将消息发送给broker,从而导致消息在producer端大量沉积
# 此时,如果消息的条数达到阀值,将会导致producer端阻塞或者消息被抛弃,默认为10000
queue.buffering.max.messages=20000
# 如果是异步,指定每次批量发送数据量,默认为200
batch.num.messages=500
# 当消息在producer端沉积的条数达到&queue.buffering.max.meesages&后
# 阻塞一定时间后,队列仍然没有enqueue(producer仍然没有发送出任何消息)
# 此时producer可以继续阻塞或者将消息抛弃,此timeout值用于控制&阻塞&的时间
# -1: 无阻塞超时限制,消息不会被抛弃
# 0:立即清空队列,消息被抛弃
queue.enqueue.timeout.ms=-1
################ end ###############
# 当producer接收到error ACK,或者没有接收到ACK时,允许消息重发的次数
# 因为broker并没有完整的机制来避免消息重复,所以当网络异常时(比如ACK丢失)
# 有可能导致broker接收到重复的消息,默认值为3.
message.send.max.retries=3
# producer刷新topic metada的时间间隔,producer需要知道partition leader的位置,以及当前topic的情况
# 因此producer需要一个机制来获取最新的metadata,当producer遇到特定错误时,将会立即刷新
# (比如topic失效,partition丢失,leader失效等),此外也可以通过此参数来配置额外的刷新机制,默认值600000
topic.metadata.refresh.interval.ms=60000复制代码
import java.util.*;&&
& &
import kafka.javaapi.producer.P&&
import kafka.producer.KeyedM&&
import kafka.producer.ProducerC&&
& &
public class TestProducer {&&
& & public static void main(String[] args) {&&
& && &&&long events = Long.parseLong(args[0]);&&
& && &&&Random rnd = new Random();&&
& &
& && &&&Properties props = new Properties();&&
& && &&&props.put(&metadata.broker.list&, &192.168.2.105:9092&);&&
& && &&&props.put(&serializer.class&, &kafka.serializer.StringEncoder&); //默认字符串编码消息&&
& && &&&props.put(&partitioner.class&, &example.producer.SimplePartitioner&);&&
& && &&&props.put(&request.required.acks&, &1&);&&
& &
& && &&&ProducerConfig config = new ProducerConfig(props);&&
& &
& && &&&Producer&String, String& producer = new Producer&String, String&(config);&&
& &
& && &&&for (long nEvents = 0; nEvents & nEvents++) {& &
& && && && && &long runtime = new Date().getTime();& &
& && && && && &String ip = “192.168.2.” + rnd.nextInt(255);& &
& && && && && &String msg = runtime + “,,” +& &
& && && && && &KeyedMessage&String, String& data = new KeyedMessage&String, String&(&page_visits&, ip, msg);&&
& && && && && &producer.send(data);&&
& && &&&}&&
& && &&&producer.close();&&
& & }&&
}&&
复制代码
2.2 指定关键字key,发送消息到指定partitions
说明:如果需要实现自定义partitions消息发送,需要实现Partitioner接口
public class CustomizePartitioner implements Partitioner {&&
& & public CustomizePartitioner(VerifiableProperties props) {&&
& &
& & }&&
& & /**
& &&&* 返回分区索引编号
& &&&* @param key sendMessage时,输出的partKey
& &&&* @param numPartitions topic中的分区总数
& &&&* @return
& &&&*/&&
& & @Override&&
& & public int partition(Object key, int numPartitions) {&&
& && &&&System.out.println(&key:& + key + &&&numPartitions:& + numPartitions);&&
& && &&&String partKey = (String)&&
& && &&&if (&part2&.equals(partKey))&&
& && && && &return 2;&&
//& && &&&System.out.println(&partKey:& + key);&&
& &
& && &&&........&&
& && &&&........&&
& && &&&return 0;&&
& & }&&
}&&复制代码
3.consumer程序开发例子
3.1 consumer参数说明
# zookeeper连接服务器地址,此处为线下测试环境配置(kafka消息服务--&kafka broker集群线上部署环境wiki)
# 配置例子:&127.0.0.1:.0.1:.0.1:3002&
zookeeper.connect=192.168.2.225:.2.225:.2.225:2183/config/mobile/mq/mafka
# zookeeper的session过期时间,默认5000ms,用于检测消费者是否挂掉,当消费者挂掉,其他消费者要等该指定时间才能检查到并且触发重新负载均衡
zookeeper.session.timeout.ms=5000
zookeeper.connection.timeout.ms=10000
# 指定多久消费者更新offset到zookeeper中。注意offset更新时基于time而不是每次获得的消息。一旦在更新zookeeper发生异常并重启,将可能拿到已拿到过的消息
zookeeper.sync.time.ms=2000
#指定消费组
group.id=xxx
# 当consumer消费一定量的消息之后,将会自动向zookeeper提交offset信息
# 注意offset信息并不是每消费一次消息就向zk提交一次,而是现在本地保存(内存),并定期提交,默认为true
<mit.enable=true
# 自动更新时间。默认60 * 1000
<mit.interval.ms=1000
# 当前consumer的标识,可以设定,也可以有系统生成,主要用来跟踪消息消费情况,便于观察
conusmer.id=xxx
# 消费者客户端编号,用于区分不同客户端,默认客户端程序自动产生
client.id=xxxx
# 最大取多少块缓存到消费者(默认10)
queued.max.message.chunks=50
# 当有新的consumer加入到group时,将会reblance,此后将会有partitions的消费端迁移到新
# 的consumer上,如果一个consumer获得了某个partition的消费权限,那么它将会向zk注册
# &Partition Owner registry&节点信息,但是有可能此时旧的consumer尚没有释放此节点,
# 此值用于控制,注册节点的重试次数.
rebalance.max.retries=5
# 获取消息的最大尺寸,broker不会像consumer输出大于此值的消息chunk
# 每次feth将得到多条消息,此值为总大小,提升此值,将会消耗更多的consumer端内存
fetch.min.bytes=6553600
# 当消息的尺寸不足时,server阻塞的时间,如果超时,消息将立即发送给consumer
fetch.wait.max.ms=5000
socket.receive.buffer.bytes=655360
# 如果zookeeper没有offset值或offset值超出范围。那么就给个初始的offset。有smallest、largest、
# anything可选,分别表示给当前最小的offset、当前最大的offset、抛异常。默认largest
auto.offset.reset=smallest
# 指定序列化处理类(mafka client API调用说明--&3.序列化约定wiki),默认为kafka.serializer.DefaultDecoder,即byte[]
derializer.class=com.meituan.mafka.client.codec.MafkaMessageDecoder复制代码
3.2 多线程并行消费topic
ConsumerTest类
import kafka.consumer.ConsumerI&&
import kafka.consumer.KafkaS&&
& &
public class ConsumerTest implements Runnable {&&
& & private KafkaStream m_&&
& & private int m_threadN&&
& &
& & public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {&&
& && &&&m_threadNumber = a_threadN&&
& && &&&m_stream = a_&&
& & }&&
& &
& & public void run() {&&
& && &&&ConsumerIterator&byte[], byte[]& it = m_stream.iterator();&&
& && &&&while (it.hasNext())&&
& && && && &System.out.println(&Thread & + m_threadNumber + &: & + new String(it.next().message()));&&
& && &&&System.out.println(&Shutting down Thread: & + m_threadNumber);&&
& & }&&
}&&复制代码
ConsumerGroupExample类
import kafka.consumer.ConsumerC&&
import kafka.consumer.KafkaS&&
import kafka.javaapi.consumer.ConsumerC&&
& &
import java.util.HashM&&
import java.util.L&&
import java.util.M&&
import java.util.P&&
import java.util.concurrent.ExecutorS&&
import java.util.concurrent.E&&
& &
public class ConsumerGroupExample {&&
& & private final ConsumerC&&
& & private final S&&
& & private&&ExecutorS&&
& &
& & public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {&&
& && &&&consumer = kafka.consumer.Consumer.createJavaConsumerConnector(&&
& && && && && & createConsumerConfig(a_zookeeper, a_groupId));&&
& && &&&this.topic = a_&&
& & }&&
& &
& & public void shutdown() {&&
& && &&&if (consumer != null) consumer.shutdown();&&
& && &&&if (executor != null) executor.shutdown();&&
& & }&&
& &
& & public void run(int a_numThreads) {&&
& && &&&Map&String, Integer& topicCountMap = new HashMap&String, Integer&();&&
& && &&&topicCountMap.put(topic, new Integer(a_numThreads));&&
& && &&&Map&String, List&KafkaStream&byte[], byte[]&&& consumerMap = consumer.createMessageStreams(topicCountMap);&&
& && &&&List&KafkaStream&byte[], byte[]&& streams = consumerMap.get(topic);&&
& &
& && &&&// 启动所有线程&&
& && &&&executor = Executors.newFixedThreadPool(a_numThreads);&&
& &
& && &&&// 开始消费消息&&
& && &&&int threadNumber = 0;&&
& && &&&for (final KafkaStream stream : streams) {&&
& && && && &executor.submit(new ConsumerTest(stream, threadNumber));&&
& && && && &threadNumber++;&&
& && &&&}&&
& & }&&
& &
& & private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {&&
& && &&&Properties props = new Properties();&&
& && &&&props.put(&zookeeper.connect&, &192.168.2.225:2183/config/mobile/mq/mafka&);&&
& && &&&props.put(&group.id&, &push-token&);&&
& && &&&props.put(&zookeeper.session.timeout.ms&, &60000&);&&
& && &&&props.put(&zookeeper.sync.time.ms&, &2000&);&&
& && &&&props.put(&mit.interval.ms&, &1000&);&&
& &
& && &&&return new ConsumerConfig(props);&&
& & }&&
& &
& & public static void main(String[] args) {&&
& && &&&String zooKeeper = args[0];&&
& && &&&String groupId = args[1];&&
& && &&&String topic = args[2];&&
& && &&&int threads = Integer.parseInt(args[3]);&&
& &
& && &&&ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);&&
& && &&&example.run(threads);&&
& &
& && &&&try {&&
& && && && &Thread.sleep(10000);&&
& && &&&} catch (InterruptedException ie) {&&
& &
& && &&&}&&
& && &&&example.shutdown();&&
& & }&&
}&&复制代码
kafka消费者api分为high api和low api,目前上述demo是都是使用kafka high api,高级api不用关心维护消费状态信息和负载均衡,系统会根据配置参数,
定期flush offset到zk上,如果有多个consumer且每个consumer创建了多个线程,高级api会根据zk上注册consumer信息,进行自动负载均衡操作。
注意事项:
1.高级api将会内部实现持久化每个分区最后读到的消息的offset,数据保存在zookeeper中的消费组名中(如/consumers/push-token-group/offsets/push-token/2。
其中push-token-group是消费组,push-token是topic,最后一个2表示第3个分区),每间隔一个(默认1000ms)时间更新一次offset,
那么可能在重启消费者时拿到重复的消息。此外,当分区leader发生变更时也可能拿到重复的消息。因此在关闭消费者时最好等待一定时间(10s)然后再shutdown()
2.消费组名是一个全局的信息,要注意在新的消费者启动之前旧的消费者要关闭。如果新的进程启动并且消费组名相同,kafka会添加这个进程到可用消费线程组中用来消费
topic和触发重新分配负载均衡,那么同一个分区的消息就有可能发送到不同的进程中。
3.如果消费者组中所有consumer的总线程数量大于分区数,一部分线程或某些consumer可能无法读取消息或处于空闲状态。
4.如果分区数多于线程数(如果消费组中运行者多个消费者,则线程数为消费者组内所有消费者线程总和),一部分线程会读取到多个分区的消息
5.如果一个线程消费多个分区消息,那么接收到的消息是不能保证顺序的。
备注:可用zookeeper web ui工具管理查看zk目录树数据: xxx/consumers/push-token-group/owners/push-token/2其中
push-token-group为消费组,push-token为topic,2为分区3.查看里面的内容如:
push-token-group-mobile-platform03-3-7ab14bd1-0表示该分区被该标示的线程所执行。
producer性能优化:异步化,消息批量发送,具体浏览上述参数说明。consumer性能优化:如果是高吞吐量数据,设置每次拿取消息(fetch.min.bytes)大些,
拿取消息频繁(fetch.wait.max.ms)些(或时间间隔短些),如果是低延时要求,则设置时间时间间隔小,每次从kafka broker拿取消息尽量小些。
欢迎加入about云群 、 ,云计算爱好者群,关注
积极上进,爱好学习
经常参与各类话题的讨论,发帖内容较有主见
长期对论坛的繁荣而不断努力,或多次提出建设性意见
为论坛做出突出贡献的会员
站长推荐 /4
云计算hadoop视频大全(新增 yarn、flume|storm、hadoop一套视频
等待验证会员请验证邮箱
新手获取积分方法
技术类问答,解决学习openstack,hadoop生态系统中遇到的问题
Powered by

我要回帖

更多关于 kafka源码 的文章

 

随机推荐