Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.
前言 Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。
Kafka架构:
基本概念:
1 )Producer :消息生产者,就是向 kafka broker 发消息的客户端; 2 )Consumer :消息消费者,向 kafka broker 取消息的客户端; 3 )Consumer Group (CG ):消费者组,由多个 consumer 组成。 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个 组内 消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即 消费者组是逻辑上的一个订阅者; 4 )Broker :一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker可以容纳多个 topic; 5 )Topic :可以理解为一个队列, 生产者和消费者面向的都是一个 topic; 6 )Partition :为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列; 7 )Replica: :副本,为保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失,且 kafka 仍然能够继续工作,kafka 提供了副本机制,一个 topic 的每个分区都有若干个副本,一个 leader 和若干个 follower; 8 )Leader :每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 leader; 9 )Follower :每个分区多个副本中的“从”,实时从 leader 中同步数据,保持和 leader 数据的同步。leader 发生故障时,某个 follower 会成为新的 follower; 10)Offset:kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。当然the first offset就是00000000000.kafka。
SpringBoot 整合Kafka 配置文件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 spring.kafka.bootstrap-servers =127.0.0.1:9092 spring.kafka.producer.retries =0 spring.kafka.producer.batch-size =16384 spring.kafka.producer.buffer-memory =33554432 spring.kafka.producer.acks =1 spring.kafka.producer.key-serializer =org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer =com.arsenal.kafka.domain.UserSerializable spring.kafka.consumer.group-id =testGroup spring.kafka.consumer.auto-offset-reset =earliest spring.kafka.consumer.enable-auto-commit =true spring.kafka.consumer.auto-commit-interval =100 spring.kafka.consumer.key-deserializer =org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer =com.arsenal.kafka.domain.UserDeserializer
代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 @Data public class User implements Serializable { private Long id; private String name; private Integer age; private transient String desc; } public class UserSerializable implements Serializer <User > { @Override public void configure (Map<String, ?> map, boolean b) { } @Override public byte [] serialize(String topic, User user) { System.out.println("topic : " + topic + ", user : " + user); byte [] dataArray = null ; ByteArrayOutputStream outputStream = null ; ObjectOutputStream objectOutputStream = null ; try { outputStream = new ByteArrayOutputStream(); objectOutputStream = new ObjectOutputStream(outputStream); objectOutputStream.writeObject(user); dataArray = outputStream.toByteArray(); } catch (Exception e) { throw new RuntimeException(e); } finally { if (outputStream != null ) { try { outputStream.close(); } catch (IOException e) { e.printStackTrace(); } } if (objectOutputStream != null ) { try { objectOutputStream.close(); } catch (IOException e) { e.printStackTrace(); } } } return dataArray; } @Override public void close () { } } public class UserDeserializer implements Deserializer <User > { @Override public void configure (Map<String, ?> map, boolean b) { } @Override public User deserialize (String topic, byte [] bytes) { User user = null ; ByteArrayInputStream inputStream = null ; ObjectInputStream objectInputStream = null ; try { inputStream = new ByteArrayInputStream(bytes); objectInputStream = new ObjectInputStream(inputStream); user = (User)objectInputStream.readObject(); } catch (Exception e) { throw new RuntimeException(e); }finally { if (inputStream != null ){ try { inputStream.close(); } catch (IOException e) { e.printStackTrace(); } } if (objectInputStream != null ){ try { objectInputStream.close(); } catch (IOException e) { e.printStackTrace(); } } } return user; } @Override public void close () { } } @Component public class ConsumerListener { @KafkaListener (topics = "testTopic" ) public void onMessage (String message) { System.out.println(message); } @KafkaListener (topics = "userTopic" ) public void onMessage (User user) { System.out.println(user); } } @SpringBootApplication public class KafkaApplication { public static void main (String[] args) { SpringApplication.run(KafkaApplication.class ,args ) ; } }
延伸 Kafka百度百科 APACHE KAFKA 尚硅谷Kafka教程 kafka工作原理介绍 Kafka史上最详细原理总结 Kafka 中文文档 - ApacheCN
<
Spring IOC
Category Tree
>