Setting up Kafka with Docker

Set up Kafka with Docker and use it from Spring Boot.


Setting up Kafka

Create a ZooKeeper container

docker run -d --name zookeeper -p 2181:2181 zookeeper

Create a Kafka container

docker run -d --name kafka -p 9092:9092 --link zookeeper -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -e KAFKA_ADVERTISED_HOST_NAME=127.0.0.1 -e KAFKA_ADVERTISED_PORT=9092 -v /home/ogic/docker-container/kafka/localtime:/etc/localtime wurstmeister/kafka

Testing Kafka

Enter the container

docker exec -it kafka bash

Go to the Kafka bin directory

cd /opt/kafka/bin

Create a topic

kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 2 --topic testTopic

Start a producer

kafka-console-producer.sh --broker-list localhost:9092 --topic testTopic

Open two new terminals and start a consumer in each, both in the same consumer group:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testTopic --group testGroup

Send a few messages from the producer.

(screenshot: kafka-consumer-group-demo)

If you shut down one of the consumers, all subsequent messages go to the remaining one. Note that "shutting down" a consumer means pressing Ctrl+C inside its terminal, not closing the terminal window. Don't be silly like me and close the terminal outright, only to miss the screenshot I wanted.

Using Kafka in Spring Boot

Add the dependencies

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bus-kafka</artifactId>
</dependency>

or

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <scope>test</scope>
</dependency>

Edit application.yml

kafka:
  producer:
    bootstrapServers: localhost:9092
    retries: 3
    # 16K
    batchSize: 16384
    lingerMs: 1
    # 32M
    bufferMemory: 33554432
  consumer:
    bootstrapServers: localhost:9092
    groupId: order-service
    enableAutoCommit: false
    autoCommitIntervalMs: 1000
    sessionTimeoutMs: 30000
    maxPollRecords: 100
    # earliest, latest
    autoOffsetReset: earliest

Add KafkaConfig.java

Because the YAML above configures Kafka under a custom prefix (kafka) rather than spring.kafka, Spring Boot's auto-configuration won't pick it up, so we have to consume these properties ourselves.

We also define our own KafkaTemplate to act as the producer.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ContainerProperties;

@Configuration
public class KafkaConfig {

    @Value("${kafka.producer.bootstrapServers}")
    private String producerBootstrapServers;

    @Value("${kafka.producer.retries}")
    private String producerRetries;

    @Value("${kafka.producer.batchSize}")
    private String producerBatchSize;

    @Value("${kafka.producer.lingerMs}")
    private String producerLingerMs;

    @Value("${kafka.producer.bufferMemory}")
    private String producerBufferMemory;

    @Value("${kafka.consumer.bootstrapServers}")
    private String consumerBootstrapServers;

    @Value("${kafka.consumer.groupId}")
    private String consumerGroupId;

    @Value("${kafka.consumer.enableAutoCommit}")
    private String consumerEnableAutoCommit;

    @Value("${kafka.consumer.autoCommitIntervalMs}")
    private String consumerAutoCommitIntervalMs;

    @Value("${kafka.consumer.sessionTimeoutMs}")
    private String consumerSessionTimeoutMs;

    @Value("${kafka.consumer.maxPollRecords}")
    private String consumerMaxPollRecords;

    @Value("${kafka.consumer.autoOffsetReset}")
    private String consumerAutoOffsetReset;

    /**
     * ProducerFactory
     */
    @Bean
    public ProducerFactory<Object, Object> producerFactory() {
        Map<String, Object> configs = new HashMap<>(10);
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, producerBootstrapServers);
        configs.put(ProducerConfig.RETRIES_CONFIG, producerRetries);
        configs.put(ProducerConfig.BATCH_SIZE_CONFIG, producerBatchSize);
        configs.put(ProducerConfig.LINGER_MS_CONFIG, producerLingerMs);
        configs.put(ProducerConfig.BUFFER_MEMORY_CONFIG, producerBufferMemory);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ObjectSerializer.class);

        return new DefaultKafkaProducerFactory<>(configs);
    }

    /**
     * KafkaTemplate (autoFlush = true)
     */
    @Bean
    public KafkaTemplate<Object, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory(), true);
    }

    /**
     * ConsumerFactory
     */
    @Bean
    public ConsumerFactory<Object, Object> consumerFactory() {
        Map<String, Object> configs = new HashMap<>(10);
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, consumerBootstrapServers);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, consumerEnableAutoCommit);
        configs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, consumerAutoCommitIntervalMs);
        configs.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, consumerSessionTimeoutMs);
        configs.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, consumerMaxPollRecords);
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, consumerAutoOffsetReset);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ObjectDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(configs);
    }

    /**
     * KafkaListenerContainerFactory for batch consumption
     */
    @Bean
    public KafkaListenerContainerFactory<?> batchContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory = new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(consumerFactory());
        containerFactory.setConcurrency(4);
        containerFactory.setBatchListener(true);
        containerFactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

        return containerFactory;
    }
}
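With the template in place, producing a message is just a matter of injecting the bean. Below is a minimal sketch of a sender; the DemoSender class name is my own, while demoTopic matches the topic used by the listener later in this post. Note that the payload must implement java.io.Serializable, since ObjectSerializer (defined in the next section) relies on Java serialization.

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class DemoSender {

    private final KafkaTemplate<Object, Object> kafkaTemplate;

    public DemoSender(KafkaTemplate<Object, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void send(Object payload) {
        // The key goes through StringSerializer, the value through ObjectSerializer,
        // so the payload must implement java.io.Serializable.
        kafkaTemplate.send("demoTopic", "demo-key", payload);
    }
}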

Custom object serializer and deserializer

The Kafka configuration above plugs in these custom classes because Kafka's built-in (de)serializers only handle primitive and String types; to carry your own object types you have to supply your own. Note that Spring's SerializationUtils used below is standard Java serialization, not JSON, so payload classes must implement Serializable; a JSON-based alternative is sketched after the two classes.

import java.util.Map;

import org.apache.kafka.common.serialization.Serializer;
import org.springframework.util.SerializationUtils;

public class ObjectSerializer implements Serializer<Object> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public byte[] serialize(String topic, Object data) {
        // Java serialization: data must implement java.io.Serializable
        return SerializationUtils.serialize(data);
    }

    @Override
    public void close() {
    }
}
import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;
import org.springframework.util.SerializationUtils;

public class ObjectDeserializer implements Deserializer<Object> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public Object deserialize(String topic, byte[] data) {
        return SerializationUtils.deserialize(data);
    }

    @Override
    public void close() {
    }
}
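If you want actual JSON on the wire instead, spring-kafka already ships org.springframework.kafka.support.serializer.JsonSerializer and JsonDeserializer. For illustration, here is a minimal hand-rolled Jackson-based serializer following the same Serializer contract (the class name JsonObjectSerializer is my own, and Jackson is assumed to be on the classpath):

import java.util.Map;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class JsonObjectSerializer implements Serializer<Object> {

    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public byte[] serialize(String topic, Object data) {
        if (data == null) {
            return null;
        }
        try {
            // Jackson writes the object out as UTF-8 JSON bytes
            return objectMapper.writeValueAsBytes(data);
        } catch (JsonProcessingException e) {
            throw new SerializationException("Failed to JSON-serialize value for topic " + topic, e);
        }
    }

    @Override
    public void close() {
    }
}

The deserializing side needs to know the target type, which is exactly why spring-kafka's JsonDeserializer takes a target class (or reads type headers); that is one reason to prefer the ready-made classes over rolling your own.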

Add a listener

@KafkaListener(id = "demo-id", topics = {"demoTopic"})
public void listenTopicOrder(Object data) {
    /* do something */
    // ...
}