Kafka集成Spring 流式处理
2022-04-15 16:02
2706
0
Kafka集成Spring
一、简介
项目采用 JDK 1.8、Spring Boot 与 Kafka(含 Kafka Streams 流式处理)实现。
二、环境准备
1. 使用 IntelliJ IDEA 创建一个 Maven 项目(原文此处附有截图)。
2. 包管理器(Maven)配置:pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.yyhouc</groupId>
    <artifactId>kafkaDemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- Create a Spring Boot project by inheriting the Spring Boot parent POM. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.6</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <!-- NOTE(review): spring-boot-starter-parent manages spring-kafka / kafka-clients versions;
         the explicit versions below override that management. Confirm they are compatible
         with Spring Boot 2.6.6. -->
    <dependencies>
        <!-- Core Spring Boot starter. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <!-- Kafka client library. It is pinned here and excluded from the artifacts below,
             so only this kafka-clients version ends up on the classpath. -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.8.1</version>
        </dependency>

        <!-- Spring integration for Kafka. -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.3.12.RELEASE</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- Kafka Streams (stream processing). This comment was previously malformed,
             which made the whole POM unparsable. -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>2.8.1</version>
            <exclusions>
                <exclusion>
                    <artifactId>kafka-clients</artifactId>
                    <groupId>org.apache.kafka</groupId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
</project>
3. 配置 Kafka 信息(Spring Boot 的 YAML 配置文件)
server:
  port: 8700
spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    producer:
      retries: 0
      # Batch size in bytes (16 KB = 16384, 64 KB = 65536).
      # The producer sends a batch once it reaches batch-size or once linger.ms has elapsed,
      # whichever comes first.
      # With linger.ms = 0 the producer does not wait, so records are batched only when they
      # arrive faster than they can be sent (i.e. under load); batch-size still applies then.
      batch-size: 16384
      # Send delay: extra producer properties passed to the Kafka client (linger.ms = 500 ms).
      properties:
        linger:
          ms: 500
      # Producer buffer memory in bytes (33554432 = 32 MiB).
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      #value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
4. 创建 Kafka Streams 配置类
package com.yyhouc.config;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
/**
 * Enables Kafka Streams and defines a topology over the {@code goodsTopic} topic.
 *
 * <p>Each JSON record is enriched with a {@code countType} field derived from its
 * {@code goodsType}. The enriched record is then routed to the {@code discount} topic when
 * its {@code discount} field equals 1, and to the {@code goods} topic otherwise.
 *
 * <p>NOTE(review): this class relies on Lombok ({@code @RequiredArgsConstructor},
 * {@code @Slf4j}) and fastjson ({@code JSON}, {@code JSONObject}). Their imports and Maven
 * dependencies are not shown here; confirm they are present.
 */
@Configuration
@EnableKafkaStreams
@RequiredArgsConstructor
@Slf4j
public class KafkaBean {

    /** Value of the {@code discount} field that marks a discounted product. */
    private static final int DISCOUNTED = 1;

    /**
     * Spring Boot's bound {@code spring.kafka.*} properties. Injected through the
     * Lombok-generated constructor because the field is {@code final}. This field was
     * previously missing even though {@link #kStreamsConfigs()} reads it.
     */
    private final KafkaProperties kafkaProperties;

    /**
     * Builds the default Kafka Streams configuration picked up by {@code @EnableKafkaStreams}.
     *
     * @return a configuration with application id {@code streams-pipe}, the bootstrap servers
     *     from Spring Boot's Kafka properties, and String key/value serdes
     */
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // Choose the value serde to match how records on the input topic are serialized.
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return new KafkaStreamsConfiguration(props);
    }

    /**
     * Defines the goods-processing topology.
     *
     * <p>Collects the user's preferred product types (enrichment). It then classifies products
     * by discount, so that later streams can refine the user profile, e.g. from the ratio of
     * discounted products browsed or bought.
     *
     * @param streamsBuilder the builder supplied by Spring Kafka
     * @return the source stream read from {@code goodsTopic}
     */
    @Bean
    public KStream<String, String> kStream0(StreamsBuilder streamsBuilder) {
        // Values are read with the default String serde configured above.
        KStream<String, String> stream = streamsBuilder.stream("goodsTopic");

        // KStream is immutable: mapValues returns a NEW stream. The original code discarded
        // that result, so the enrichment never reached the output topics.
        KStream<String, String> enriched = stream.mapValues(this::withPreferenceWeight);

        // Route each enriched record exactly once: discounted products to "discount",
        // everything else to "goods". split() evaluates the predicate once per record instead
        // of parsing the JSON in two complementary filters.
        enriched.split()
                .branch((key, value) -> isDiscounted(value),
                        Branched.withConsumer(discounted -> discounted.to("discount")))
                .defaultBranch(Branched.withConsumer(regular -> regular.to("goods")));

        return stream;
    }

    /**
     * Adds a {@code countType} field computed from the record's {@code goodsType}.
     *
     * @param value a JSON object serialized as a String
     * @return the same JSON object, re-serialized with {@code countType} filled in
     */
    private String withPreferenceWeight(String value) {
        JSONObject json = JSON.parseObject(value);
        // getIntValue yields 0 when goodsType is absent. The original assigned the raw Object
        // from json.get(...) to an int, which does not compile.
        int type = json.getIntValue("goodsType");
        // NOTE(review): countW is not defined in this class; confirm where the weight
        // calculation lives. Per the original notes, its result may also be cached so that
        // products matching the user's preferences can be pushed immediately.
        json.put("countType", countW(type));
        return JSON.toJSONString(json);
    }

    /**
     * Returns whether a JSON record is flagged as discounted.
     *
     * <p>A missing {@code discount} field counts as "not discounted". Previously, unboxing a
     * null {@code Integer} here threw a NullPointerException inside the stream thread.
     *
     * @param value a JSON object serialized as a String
     * @return {@code true} if the record's {@code discount} field equals 1
     */
    private static boolean isDiscounted(String value) {
        Integer discount = JSON.parseObject(value).getInteger("discount");
        return discount != null && discount == DISCOUNTED;
    }
}
全部评论