Kafka

Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.

前言

    Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。


Kafka架构:


基本概念:

1 )Producer :消息生产者,就是向 kafka broker 发消息的客户端;
2 )Consumer :消息消费者,向 kafka broker 取消息的客户端;
3 )Consumer Group (CG ):消费者组,由多个 consumer 组成。 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个 组内 消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即 消费者组是逻辑上的一个订阅者;
4 )Broker :一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker可以容纳多个 topic;
5 )Topic :可以理解为一个队列, 生产者和消费者面向的都是一个 topic;
6 )Partition :为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列;
7 )Replica: :副本,为保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失,且 kafka 仍然能够继续工作,kafka 提供了副本机制,一个 topic 的每个分区都有若干个副本,一个 leader 和若干个 follower;
8 )Leader :每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 leader;
9 )Follower :每个分区多个副本中的“从”,实时从 leader 中同步数据,保持和 leader 数据的同步。leader 发生故障时,某个 follower 会成为新的 follower;
10)Offset:kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。当然the first offset就是00000000000.kafka。

SpringBoot 整合Kafka

配置文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
#============== kafka ===================
# 指定kafka server的地址,集群配多个,中间,逗号隔开
spring.kafka.bootstrap-servers=127.0.0.1:9092

#=============== provider =======================
# 写入失败时,重试次数。当leader节点失效,一个repli节点会替代成为leader节点,此时可能出现写入失败,
# 当retris为0时,produce不会重复。retirs重发,此时repli节点完全成为leader节点,不会产生消息丢失。
spring.kafka.producer.retries=0
# 每次批量发送消息的数量,produce积累到一定数据,一次发送
spring.kafka.producer.batch-size=16384
# produce积累数据一次发送,缓存大小达到buffer.memory就发送数据
spring.kafka.producer.buffer-memory=33554432

#procedure要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下:
#acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。
#acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。
#acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。
#可以设置的值为:all, -1, 0, 1
spring.kafka.producer.acks=1

# 指定消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
#spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=com.arsenal.kafka.domain.UserSerializable

#=============== consumer =======================
# 指定默认消费者group id --> 由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名
spring.kafka.consumer.group-id=testGroup
# smallest和largest才有效,如果smallest重新0开始读取,如果是largest从logfile的offset读取。一般情况下我们都是设置smallest
spring.kafka.consumer.auto-offset-reset=earliest
# enable.auto.commit:true --> 设置自动提交offset
spring.kafka.consumer.enable-auto-commit=true
#如果'enable.auto.commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。
spring.kafka.consumer.auto-commit-interval=100

# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=com.arsenal.kafka.domain.UserDeserializer

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
/**
* @Author: Arsenal
* @Date: 2020-08-19 0:01
* @Description:
*/
@Data
public class User implements Serializable {

private Long id;

private String name;

private Integer age;

/**
* transient 关键字修饰的字段不会被序列化
*/
private transient String desc;
}

/**
* @Author: Arsenal
* @Date: 2020-08-19 0:01
* @Description: User序列化器
*/
public class UserSerializable implements Serializer<User> {
@Override
public void configure(Map<String, ?> map, boolean b) {

}

@Override
public byte[] serialize(String topic, User user) {
System.out.println("topic : " + topic + ", user : " + user);
byte[] dataArray = null;
ByteArrayOutputStream outputStream = null;
ObjectOutputStream objectOutputStream = null;
try {
outputStream = new ByteArrayOutputStream();
objectOutputStream = new ObjectOutputStream(outputStream);
objectOutputStream.writeObject(user);
dataArray = outputStream.toByteArray();
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
if (outputStream != null) {
try {
outputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (objectOutputStream != null) {
try {
objectOutputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return dataArray;
}

@Override
public void close() {

}
}

/**
* @Author: Arsenal
* @Date: 2020-08-19 0:01
* @Description: User反序列化器
*/
public class UserDeserializer implements Deserializer<User> {
@Override
public void configure(Map<String, ?> map, boolean b) {

}

@Override
public User deserialize(String topic, byte[] bytes) {
User user = null;
ByteArrayInputStream inputStream = null;
ObjectInputStream objectInputStream = null;
try {
inputStream = new ByteArrayInputStream(bytes);
objectInputStream = new ObjectInputStream(inputStream);
user = (User)objectInputStream.readObject();
} catch (Exception e) {
throw new RuntimeException(e);
}finally {
if(inputStream != null){
try {
inputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if(objectInputStream != null){
try {
objectInputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return user;
}

@Override
public void close() {

}
}

/**
* @Author: Arsenal
* @Date: 2020-08-19 0:01
* @Description: 消费者监听器
*/
@Component
public class ConsumerListener {

@KafkaListener(topics = "testTopic")
public void onMessage(String message){
//insertIntoDb(buffer);//这里为插入数据库代码
System.out.println(message);
}

@KafkaListener(topics = "userTopic")
public void onMessage(User user){
//insertIntoDb(buffer);//这里为插入数据库代码
System.out.println(user);
}

}

/**
* @Author: Arsenal
* @Date: 2020-08-17 0:01
* @Description: kafka启动类
*/
@SpringBootApplication
public class KafkaApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaApplication.class,args);
}
}

延伸

    Kafka百度百科
    APACHE KAFKA
    尚硅谷Kafka教程
    kafka工作原理介绍
    Kafka史上最详细原理总结
    Kafka 中文文档 - ApacheCN

Content
  1. 1. 前言
  2. 2. SpringBoot 整合Kafka
  3. 3. 延伸