案例来源: https://github.com/apache/flink-training/blob/release-1.14/README_zh.md

案例背景

出租车车程(taxi ride)事件结构
1.每次车程都由两个事件表示:行程开始(trip start)和行程结束(trip end)。
2.每个事件都由十一个字段组成:

rideId         : Long      // 每次车程的唯一id
taxiId         : Long      // 每一辆出租车的唯一id
driverId       : Long      // 每一位司机的唯一id
isStart        : Boolean   // 行程开始事件为 TRUE, 行程结束事件为 FALSE
eventTime      : Long      // 事件的时间戳
startLon       : Float     // 车程开始位置的经度
startLat       : Float     // 车程开始位置的维度
endLon         : Float     // 车程结束位置的经度
endLat         : Float     // 车程结束位置的维度
passengerCnt   : Short     // 乘车人数

出租车车费(taxi fare)事件结构
rideId         : Long      // 每次车程的唯一id
taxiId         : Long      // 每一辆出租车的唯一id
driverId       : Long      // 每一位司机的唯一id
startTime      : Long   // 车程开始时间
paymentType    : String    // 现金(CASH)或刷卡(CARD)
tip            : Float     // 小费
tolls          : Float     // 过路费
totalFare      : Float     // 总计车费

案例目标

1.将每次车程的 TaxiRide 和 TaxiFare 记录连接在一起

2.对于每个不同的 rideId,恰好有三个事件:

TaxiRide START 事件
TaxiRide END 事件
一个 TaxiFare 事件(其时间戳恰好与开始时间匹配)

最终的结果应该是 DataStream<RideAndFare>,每个不同的 rideId 都产生一个 RideAndFare 记录。 每个 RideAndFare 都应该将某个 rideId 的 TaxiRide START 事件与其匹配的 TaxiFare 配对。

案例流程

核心代码

  • connect 可以将两个流连接成一个ConnectedStreams, 而且不要求两个流的数据类型一致
       // 从车程事件中过滤中车程开始时间,并按车程标识 rideId 分组
        KeyedStream<TaxiRide, Long> rideStream = env.fromSource(rideSource, WatermarkStrategy.noWatermarks(), "ride source")
                .filter(ride -> ride.getStart()).keyBy(TaxiRide::getRideId);

        // 付车费事件按行程标识 rideId 分组
        KeyedStream<TaxiFare, Long> fareStream = env.fromSource(fareSource, WatermarkStrategy.noWatermarks(), "fare source")
                .keyBy(TaxiFare::getRideId);

        rideStream.connect(fareStream).flatMap(new EnrichmentFunction())
                .uid("enrichment") // uid for this operator's state
                .name("enrichment") // name for this operator in the web UI
                .addSink(new PrintSinkFunction<>());
  • 使用ValueState保存事件状态
public class EnrichmentFunction extends RichCoFlatMapFunction<TaxiRide, TaxiFare, RideAndFare> {

    private ValueState<TaxiRide> taxiRideValueState;
    private ValueState<TaxiFare> taxiFareValueState;


    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<TaxiRide> taxiRideDescriptor = new ValueStateDescriptor<TaxiRide>("save-ride", TaxiRide.class);
        ValueStateDescriptor<TaxiFare> taxiFareDescriptor = new ValueStateDescriptor<TaxiFare>("save-fare", TaxiFare.class);

        taxiRideValueState = getRuntimeContext().getState(taxiRideDescriptor);
        taxiFareValueState = getRuntimeContext().getState(taxiFareDescriptor);

    }


    /**
     * 当车程事件到来,检查车费的taxiFareValueState是否保存有对应行程付费记录
     * 如果有,则匹配输出,清空状态
     * 如果没有,则将车程事件保存起来
     */
    @Override
    public void flatMap1(TaxiRide taxiRide, Collector<RideAndFare> collector) throws Exception {
        TaxiFare taxiFare = taxiFareValueState.value();
        if (Objects.isNull(taxiFare)) {
            taxiRideValueState.update(taxiRide);
        } else {
            taxiFareValueState.clear();

            RideAndFare rideAndFare = new RideAndFare();
            rideAndFare.setRide(taxiRide);
            rideAndFare.setFare(taxiFare);

            collector.collect(rideAndFare);
        }
    }


    /**
     * 当付费事件到来,检查车程的taxiRideValueState是否保存有对应行程车程记录
     * 如果有,则匹配输出,清空状态
     * 如果没有,则将付费事件保存起来
     */
    @Override
    public void flatMap2(TaxiFare taxiFare, Collector<RideAndFare> collector) throws Exception {
        TaxiRide taxiRide = taxiRideValueState.value();
        if (Objects.isNull(taxiRide)) {
            taxiFareValueState.update(taxiFare);
        } else {
            taxiRideValueState.clear();

            RideAndFare rideAndFare = new RideAndFare();
            rideAndFare.setRide(taxiRide);
            rideAndFare.setFare(taxiFare);

            collector.collect(rideAndFare);

        }
    }
}

  • 车程事件流和付费事件流来自Kafka
       // 定义出租车-车程数据源
        KafkaSource<TaxiRide> rideSource = KafkaSource.<TaxiRide>builder()
                .setBootstrapServers("192.168.0.192:9092")
                .setTopics("TOPIC_RIDE")
                .setGroupId("TEST_GROUP")
                .setClientIdPrefix("ride") // 避免kafka clientId重复
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new TaxiRideDeserialization())
                .build();

        // 定义出租车-车费数据源
        KafkaSource<TaxiFare> fareSource = KafkaSource.<TaxiFare>builder()
                .setBootstrapServers("192.168.0.192:9092")
                .setTopics("TOPIC_FARE")
                .setGroupId("TEST_GROUP")
                .setClientIdPrefix("fare") // 避免kafka clientId重复
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new TaxiFareDeserialization())
                .build();

事件格式:

1.车程事件: {"rideId":10086, "taxiId":1, "driverId":2, "isStart":true, "eventTime":1656571391726, "startLon":113.273031, "startLat":23.147103, "endLon":113.268245, "endLat":23.14445, "passengerCnt":1}


2.付费事件: {"rideId":10086, "taxiId":1, "driverId":2, "startTime":1656571391726, "paymentType":"CASH", "tip":0.00, "tolls":10.00, "totalFare":110.00}

完整代码

https://github.com/Mr-LuXiaoHua/study-flink

程序入口: com.example.datastream.rideandfare.RideAndFareJob
内容来源于网络如有侵权请私信删除

文章来源: 博客园

原文链接: https://www.cnblogs.com/luxh/p/16427196.html

你还没有登录,请先登录注册
  • 还没有人评论,欢迎说说您的想法!