Real-Time Data Warehouse

Architecture diagram

The DataStream API job below reads JSON events from Kafka, counts how many times each name appears in 10-second tumbling processing-time windows, and writes the counts back to Kafka as JSON.

package org.example.demo;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlinkWindowCountDemo {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkWindowCountDemo.class);

    // Kafka configuration
    private static final String KAFKA_BROKERS = "localhost:9092";
    private static final String SOURCE_TOPIC = "source-topic";
    private static final String SINK_TOPIC = "sink-topic";
    private static final String CONSUMER_GROUP = "flink-consumer-group";

    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Enable checkpointing (every 60 seconds)
        env.enableCheckpointing(60000);

        // 3. Configure the Kafka source
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(KAFKA_BROKERS)
                .setTopics(SOURCE_TOPIC)
                .setGroupId(CONSUMER_GROUP)
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // 4. Configure the Kafka sink
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers(KAFKA_BROKERS)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(SINK_TOPIC)
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .build();

        // 5. Create the stream: parse JSON and extract the name field
        DataStream<Tuple2<String, Integer>> nameStream = env
                .fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
                .map(new JSONParser())
                .name("JSON Parser")
                .uid("parser-1");

        // 6. Window aggregation: count per name in 10-second tumbling windows
        DataStream<String> windowCounts = nameStream
                .keyBy(tuple -> tuple.f0) // key by name
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .aggregate(new CountAggregator())
                .map(result -> JSON.toJSONString(result)) // serialize to a JSON string
                .name("Window Counter")
                .uid("counter-1");

        // 7. Write the results to Kafka
        windowCounts.sinkTo(sink)
                .name("Kafka Sink")
                .uid("sink-1");

        // 8. Execute the job
        env.execute("Name Window Count Demo");
    }

    /**
     * JSON parser: extracts the name field and pairs it with an initial count of 1.
     */
    public static class JSONParser implements MapFunction<String, Tuple2<String, Integer>> {
        @Override
        public Tuple2<String, Integer> map(String value) throws Exception {
            try {
                JSONObject json = JSON.parseObject(value);
                String name = json.getString("name");
                return new Tuple2<>(name, 1);
            } catch (Exception e) {
                // Rethrowing fails the job on malformed input; drop or side-output
                // bad records instead if the stream may contain dirty data.
                LOG.error("Error parsing JSON: " + value, e);
                throw e;
            }
        }
    }

    /**
     * Counting aggregator.
     */
    public static class CountAggregator implements AggregateFunction<
            Tuple2<String, Integer>, // input type
            Tuple2<String, Long>,    // accumulator type
            JSONObject> {            // output type

        @Override
        public Tuple2<String, Long> createAccumulator() {
            return new Tuple2<>("", 0L);
        }

        @Override
        public Tuple2<String, Long> add(Tuple2<String, Integer> value, Tuple2<String, Long> accumulator) {
            // Add the incoming count rather than a hard-coded 1, so add() and merge() agree
            return new Tuple2<>(value.f0, accumulator.f1 + value.f1);
        }

        @Override
        public JSONObject getResult(Tuple2<String, Long> accumulator) {
            JSONObject result = new JSONObject();
            // Approximation: the wall-clock time when the window fires, not the exact
            // window boundary (an AggregateFunction has no access to window metadata).
            result.put("window_end_time", System.currentTimeMillis());
            result.put("name", accumulator.f0);
            result.put("count", accumulator.f1);
            return result;
        }

        @Override
        public Tuple2<String, Long> merge(Tuple2<String, Long> a, Tuple2<String, Long> b) {
            return new Tuple2<>(a.f0, a.f1 + b.f1);
        }
    }
}
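
Because an AggregateFunction cannot see window metadata, the window_end_time above is only the wall-clock time at which the window fires. A minimal sketch of one way to attach the exact window boundary instead, combining the same CountAggregator with a ProcessWindowFunction (the class name WindowEndEnricher is my own, not from the original code):

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * Attaches the real window boundary to the pre-aggregated result.
 * Usage: .window(...).aggregate(new CountAggregator(), new WindowEndEnricher())
 */
public class WindowEndEnricher
        extends ProcessWindowFunction<JSONObject, JSONObject, String, TimeWindow> {
    @Override
    public void process(String key, Context ctx,
                        Iterable<JSONObject> elements, Collector<JSONObject> out) {
        JSONObject result = elements.iterator().next(); // exactly one pre-aggregated value
        result.put("window_end_time", ctx.window().getEnd()); // exact window end, epoch millis
        out.collect(result);
    }
}

The same job can also be expressed with the Table/SQL API:
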
package org.example.demo;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkSQLWindowCountDemo {
    // Kafka configuration
    private static final String KAFKA_BROKERS = "localhost:9092";
    private static final String SOURCE_TOPIC = "source-topic";
    private static final String SINK_TOPIC = "sink-topic";
    private static final String CONSUMER_GROUP = "flink-consumer-group";

    public static void main(String[] args) throws Exception {
        // 1. Create the execution environments
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 2. Enable checkpointing (every 60 seconds)
        env.enableCheckpointing(60000);

        // 3. Create the Kafka source table
        tableEnv.executeSql(
                "CREATE TABLE source_table (" +
                "  name STRING," +
                "  age INT," +
                "  proc_time AS PROCTIME()" + // processing-time attribute
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = '" + SOURCE_TOPIC + "'," +
                "  'properties.bootstrap.servers' = '" + KAFKA_BROKERS + "'," +
                "  'properties.group.id' = '" + CONSUMER_GROUP + "'," +
                "  'format' = 'json'," +
                "  'scan.startup.mode' = 'latest-offset'" +
                ")"
        );

        // 4. Create the Kafka sink table
        //    (`count` must be backquoted: COUNT is a reserved keyword in Flink SQL)
        tableEnv.executeSql(
                "CREATE TABLE sink_table (" +
                "  window_end_time BIGINT," +
                "  name STRING," +
                "  `count` BIGINT" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = '" + SINK_TOPIC + "'," +
                "  'properties.bootstrap.servers' = '" + KAFKA_BROKERS + "'," +
                "  'format' = 'json'," +
                "  'sink.partitioner' = 'round-robin'" +
                ")"
        );

        // 5. Windowed count: TUMBLE_END yields the exact window boundary,
        //    converted here to epoch milliseconds for the BIGINT sink column
        String windowSql =
                "INSERT INTO sink_table " +
                "SELECT " +
                "  UNIX_TIMESTAMP(DATE_FORMAT(TUMBLE_END(proc_time, INTERVAL '10' SECOND), 'yyyy-MM-dd HH:mm:ss')) * 1000 AS window_end_time, " +
                "  name, " +
                "  COUNT(*) AS `count` " +
                "FROM source_table " +
                "GROUP BY " +
                "  name, " +
                "  TUMBLE(proc_time, INTERVAL '10' SECOND)";

        // 6. Submit the INSERT job; await() keeps the client attached until it finishes
        tableEnv.executeSql(windowSql).await();
    }
}
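
For local debugging it can be convenient to print the windowed counts to stdout instead of writing to Kafka. A minimal sketch, assuming the same tableEnv and source_table DDL as above have already been set up (print() blocks and streams results to the console):

// Reuses tableEnv and source_table from the demo above.
tableEnv.sqlQuery(
        "SELECT name, COUNT(*) AS cnt, " +
        "       TUMBLE_END(proc_time, INTERVAL '10' SECOND) AS w_end " +
        "FROM source_table " +
        "GROUP BY name, TUMBLE(proc_time, INTERVAL '10' SECOND)")
        .execute()
        .print();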

Execution

Requirements: one 10-second tumbling window, counting how many times each name appears.

# Input: Kafka
{"name":"xiaoxi"}
{"name":"xiaohong"}
....

# Output: Kafka
{
  "window_end_time": 1635724800000,
  "name": "xiaoxi",
  "count": 42
}
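
To generate sample input, a minimal sketch using the plain kafka-clients producer (broker address and topic name match the constants above; the class name TestDataProducer is my own):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class TestDataProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            String[] names = {"xiaoxi", "xiaohong"};
            for (int i = 0; i < 100; i++) {
                String record = "{\"name\":\"" + names[i % names.length] + "\"}";
                producer.send(new ProducerRecord<>("source-topic", record));
            }
            producer.flush(); // ensure all records reach the broker before exiting
        }
    }
}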



# YARN application mode
./bin/flink run-application \
-t yarn-application \
-Djobmanager.memory.process.size=1024m \
-Dtaskmanager.memory.process.size=2048m \
-c org.example.demo.FlinkWindowCountDemo \
path/to/your-job.jar


# Run parameters
-c,--class <className>              # main class
-d,--detached                       # run in detached mode
-p,--parallelism <parallelism>      # job parallelism
-yjm,--yarnjobManagerMemory <arg>   # JobManager memory
-ytm,--yarntaskManagerMemory <arg>  # TaskManager memory
-ys,--yarnslots <arg>               # number of slots per TaskManager

Note: the -y* shortcuts belong to the legacy YARN per-job mode (flink run -m yarn-cluster); in application mode the equivalent settings are passed as -D options, as in the command above.

Execution flow architecture diagram