Kafka集成Spring 流式处理

2022-04-15 16:02
2706
0

Kafka集成Spring


一、简介

项目采用 jdk1.8、spring boot、kafka。

二、环境准备

1.IDE使用idea 创建maven 项目 如下图:

2.包管理器配置

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.yyhouc</groupId>
    <artifactId>kafkaDemo</artifactId>
    <version>1.0-SNAPSHOT</version>
<!--    创建Spring boot项目-->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.6</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <dependencies>
        <!--    导入spring 依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <!--    导入kafka依赖-->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.8.1</version>
        </dependency>
        <!--    导入Spring整合 kafka依赖-->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.3.12.RELEASE</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!--    导入流处理包-->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>2.8.1</version>
            <exclusions>
                <exclusion>
                    <artifactId>kafka-clients</artifactId>
                    <groupId>org.apache.kafka</groupId>
                </exclusion>
            </exclusions>
        </dependency>

    </dependencies>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

</project>

3.配置kafka信息

server:
  port: 8700
spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    producer:
      retries: 0
      # 批量大小 16K=16384 64K=65536
      # 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
      # linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了
      batch-size: 16384
      # 提交延时
      properties:
        linger:
          ms: 500
      # 生产端缓冲区大小
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      #value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

4.创建KafkaStream

package com.yyhouc.config;

import java.util.HashMap;
import java.util.Map;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Configuration
@EnableKafkaStreams
@RequiredArgsConstructor
@Slf4j
public class KafkaBean {

    /**
     * Spring Boot Kafka properties (bootstrap servers etc.) from application.yml,
     * injected through the Lombok-generated constructor (@RequiredArgsConstructor).
     * BUG FIX: the original listing referenced this field without declaring it.
     */
    private final KafkaProperties kafkaProperties;

    /**
     * Kafka Streams configuration bean consumed by @EnableKafkaStreams.
     *
     * @return the default streams configuration (application id, bootstrap
     *         servers, String serdes for keys and values)
     */
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // Default value (de)serialization; switch to a JSON serde if needed.
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return new KafkaStreamsConfiguration(props);
    }

    /**
     * Stream topology: enrich each goods record with a preference weight, then
     * route it to the "discount" or "goods" topic depending on its discount flag.
     *
     * @param streamsBuilder builder auto-configured by spring-kafka
     * @return the enriched stream (registered as a bean so the topology starts)
     */
    @Bean
    public KStream<String, String> kStream0(StreamsBuilder streamsBuilder) {
        // Values arrive as JSON strings per the String serde configured above.
        KStream<String, String> stream = streamsBuilder.stream("goodsTopic");
        // 对用户进行喜好商品类型的数据收集 — collect the user's preferred goods type.
        // BUG FIX: mapValues() returns a NEW stream; the original code discarded
        // that return value, so the enriched record never reached the sinks.
        KStream<String, String> enriched = stream.mapValues(v -> {
            JSONObject json = JSON.parseObject(v);
            // BUG FIX: JSONObject.get() returns Object and cannot be assigned to
            // int; getIntValue() performs the numeric conversion (0 if absent).
            int type = json.getIntValue("goodsType");
            // Write the computed weight back so downstream consumers can
            // immediately recommend goods matching the user's preference.
            json.put("countType", countW(type));
            return JSON.toJSONString(json);
        });
        // Split by discount flag; a later stage can refine the user profile from
        // the ratio of discounted goods browsed or purchased.
        // BUG FIX: filter on the enriched stream (not the raw one) and use
        // getIntValue() to avoid an unboxing NPE when "discount" is missing.
        enriched.filter((key, value) -> JSON.parseObject(value).getIntValue("discount") == 1)
                .to("discount");
        enriched.filter((key, value) -> JSON.parseObject(value).getIntValue("discount") != 1)
                .to("goods");
        return enriched;
    }

    /**
     * Weight computation for a goods type (placeholder — plug in the real
     * scoring model here). BUG FIX: the original listing called countW()
     * without ever defining it, so the class did not compile.
     *
     * @param type the goods type extracted from the record
     * @return the preference weight for that type
     */
    private int countW(int type) {
        return type;
    }
}

 

全部评论