Offline Data Warehouse

Architecture Diagram

Spark

Spark DataFrame API Job

package org.example.demo;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

import java.util.Properties;

public class SparkDemo {
    public static void main(String[] args) {
        // 1. Create the SparkSession
        SparkSession spark = SparkSession.builder()
                .appName("HiveToMysql")
                .enableHiveSupport() // enable Hive support
                .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
                .config("hive.metastore.uris", "thrift://localhost:9083")
                .getOrCreate();

        try {
            // 2. Read data from Hive
            Dataset<Row> hiveDF = spark.sql("SELECT name, date, amount FROM expenses");

            // 3. Transform: group by name and date, sum the spending
            Dataset<Row> resultDF = hiveDF.groupBy("name", "date")
                    .sum("amount")
                    .withColumnRenamed("sum(amount)", "total_amount");

            // 4. Configure the MySQL connection
            Properties connectionProperties = new Properties();
            connectionProperties.put("user", "your_username");
            connectionProperties.put("password", "your_password");
            connectionProperties.put("driver", "com.mysql.cj.jdbc.Driver");

            // 5. Write to MySQL
            resultDF.write()
                    .mode(SaveMode.Append)
                    .jdbc("jdbc:mysql://localhost:3306/your_database",
                            "daily_expenses",
                            connectionProperties);

        } finally {
            // 6. Stop the SparkSession
            spark.stop();
        }
    }
}
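
The write above opens one JDBC connection per partition of resultDF, and each row becomes part of a batched INSERT. A minimal tuning sketch in Scala using Spark's option-based JDBC writer; "batchsize" is a standard Spark JDBC writer option, and the connection values are the same placeholders as above:

import org.apache.spark.sql.{SaveMode, SparkSession}

object JdbcWriteTuning {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("JdbcWriteTuning")
      .enableHiveSupport()
      .getOrCreate()

    // Same aggregation as the Java job above.
    val resultDF = spark.sql(
      "SELECT name, date, SUM(amount) AS total_amount FROM expenses GROUP BY name, date")

    resultDF
      .coalesce(4)                 // cap the number of concurrent JDBC connections
      .write
      .format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/your_database")
      .option("dbtable", "daily_expenses")
      .option("user", "your_username")
      .option("password", "your_password")
      .option("driver", "com.mysql.cj.jdbc.Driver")
      .option("batchsize", "1000") // rows per batched INSERT
      .mode(SaveMode.Append)
      .save()

    spark.stop()
  }
}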

Spark SQL Job

package org.example.demo

import org.apache.spark.sql.SparkSession

object SparkSQLDemo {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkSession
    val spark = SparkSession.builder()
      .appName("HiveToMysqlSQL")
      .enableHiveSupport()
      .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
      .config("hive.metastore.uris", "thrift://localhost:9083")
      .getOrCreate()

    try {
      // 2. Switch to the source database
      spark.sql("USE your_database")

      // 3. Run the aggregation and register it as a temporary view
      spark.sql("""
        CREATE OR REPLACE TEMPORARY VIEW daily_expenses_view AS
        SELECT
          name,
          date,
          SUM(amount) AS total_amount
        FROM expenses
        GROUP BY name, date
      """)

      // 4. Write the result into MySQL
      //    (assumes daily_expenses is visible to Spark SQL as a
      //     JDBC-backed table; see the registration sketch below)
      spark.sql("""
        INSERT INTO TABLE daily_expenses
        SELECT name, date, total_amount
        FROM daily_expenses_view
      """)

    } finally {
      spark.stop()
    }
  }
}
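
A plain INSERT INTO can only reach MySQL if the target is visible to Spark SQL as a JDBC-backed table. A minimal sketch of that registration using Spark's built-in JDBC data source; the connection values are the same placeholders as above:

import org.apache.spark.sql.SparkSession

object RegisterJdbcTable {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("RegisterJdbcTable")
      .enableHiveSupport()
      .getOrCreate()

    // Expose the MySQL table to Spark SQL as a JDBC temporary view.
    spark.sql("""
      CREATE TEMPORARY VIEW daily_expenses
      USING org.apache.spark.sql.jdbc
      OPTIONS (
        url "jdbc:mysql://localhost:3306/your_database",
        dbtable "daily_expenses",
        user "your_username",
        password "your_password",
        driver "com.mysql.cj.jdbc.Driver"
      )
    """)

    // An INSERT against the view appends rows to the underlying MySQL table.
    spark.sql("""
      INSERT INTO TABLE daily_expenses
      SELECT name, date, SUM(amount) AS total_amount
      FROM expenses
      GROUP BY name, date
    """)

    spark.stop()
  }
}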

Running with spark-submit

Task requirement: compute each student's total spending per day.

# Input: Hive
CREATE TABLE expenses (
  name STRING,
  `date` DATE,
  amount DECIMAL(10,2)
)
STORED AS ORC;

# Output: MySQL
CREATE TABLE daily_expenses (
  name VARCHAR(255),
  date DATE,
  total_amount DECIMAL(10,2),
  PRIMARY KEY (name, date)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
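
Note that daily_expenses declares PRIMARY KEY (name, date), so re-running the Append-mode job above fails with duplicate-key errors. One option, sketched below under the assumption that rewriting the whole table is acceptable, is Overwrite mode combined with Spark's "truncate" option, which empties the table rather than dropping it and therefore preserves the schema and the key:

import org.apache.spark.sql.{SaveMode, SparkSession}

object IdempotentDailyLoad {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("IdempotentDailyLoad")
      .enableHiveSupport()
      .getOrCreate()

    val resultDF = spark.sql(
      "SELECT name, date, SUM(amount) AS total_amount FROM expenses GROUP BY name, date")

    // With truncate=true, overwrite issues TRUNCATE TABLE instead of
    // DROP TABLE, so the primary key survives and re-runs stay idempotent.
    resultDF.write
      .format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/your_database")
      .option("dbtable", "daily_expenses")
      .option("user", "your_username")
      .option("password", "your_password")
      .option("driver", "com.mysql.cj.jdbc.Driver")
      .option("truncate", "true")
      .mode(SaveMode.Overwrite)
      .save()

    spark.stop()
  }
}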



# Submit the job
spark-submit \
--class org.example.demo.SparkSQLDemo \
--master yarn \
--deploy-mode cluster \
--driver-memory 4g \
--executor-memory 4g \
--executor-cores 2 \
--num-executors 3 \
--queue production \
--conf "spark.driver.extraJavaOptions=-XX:+UseG1GC" \
--conf "spark.executor.extraJavaOptions=-XX:+UseG1GC" \
/path/to/your-application.jar


# Key configuration parameters
--class            # entry class inside the jar
--master           # cluster manager address
--deploy-mode      # deploy mode (client/cluster)
--driver-memory    # memory for the driver
--executor-memory  # memory per executor
--executor-cores   # cores per executor
--num-executors    # number of executors
--queue            # target YARN queue
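
The submit command above passes no application arguments, but in practice the processing date is usually threaded through this way: everything after the jar path lands in the job's args. A minimal sketch, assuming a dt-partitioned expenses table as in the Hive section below; the argument handling is illustrative, not part of the original job:

package org.example.demo

import org.apache.spark.sql.SparkSession

object SparkSQLDemoWithArgs {
  def main(args: Array[String]): Unit = {
    // Invoked as: spark-submit ... /path/to/your-application.jar 2024-01-01
    val dt = args(0)

    val spark = SparkSession.builder()
      .appName("HiveToMysqlSQL")
      .enableHiveSupport()
      .getOrCreate()

    try {
      // Restrict the aggregation to a single dt partition.
      spark.sql(s"""
        SELECT name, date, SUM(amount) AS total_amount
        FROM expenses
        WHERE dt = '$dt'
        GROUP BY name, date
      """).show()
    } finally {
      spark.stop()
    }
  }
}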

Execution Flow Architecture Diagram

Hive

SQL Job

  • app.sql
-- Dynamic partition settings
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;

-- Parallelism
SET mapreduce.job.reduces=10;
SET hive.exec.parallel=true;
SET hive.exec.parallel.thread.number=8;

-- Output compression
SET hive.exec.compress.output=true;
SET mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;

-- Optimizer settings
SET hive.optimize.skewjoin=true;
SET hive.optimize.bucketmapjoin=true;
SET hive.optimize.index.filter=true;


-- Target table (daily_expenses)
CREATE TABLE IF NOT EXISTS daily_expenses (
  name STRING COMMENT 'name',
  `date` DATE COMMENT 'calendar date',
  total_amount DECIMAL(10,2) COMMENT 'total spending'
)
PARTITIONED BY (dt STRING)
CLUSTERED BY (name) INTO 8 BUCKETS
STORED AS ORC
TBLPROPERTIES ('orc.compress'='SNAPPY');

-- Load data (daily incremental)
INSERT OVERWRITE TABLE daily_expenses
PARTITION (dt = '${hiveconf:dt}')
SELECT
  name,
  `date`,
  SUM(amount) AS total_amount
FROM expenses
WHERE dt = '${hiveconf:dt}'
GROUP BY name, `date`;

Running with Beeline (Recommended)

Benefits of using Beeline
  • Kerberos security authentication
  • Fine-grained access control
  • SSL-encrypted transport
  • Performance: connection pooling and resource control
  • Multi-user support: concurrency control and resource isolation
  • Manageability: monitoring configuration and log management
Task requirement: compute each student's total spending per day.

# Input: Hive
CREATE TABLE IF NOT EXISTS expenses (
  name STRING COMMENT 'name',
  `date` DATE COMMENT 'calendar date',
  amount DECIMAL(10,2) COMMENT 'spending amount'
)
PARTITIONED BY (dt STRING)
STORED AS ORC
TBLPROPERTIES ('orc.compress'='SNAPPY');

# Output: Hive (the daily_expenses table defined in app.sql above)


# Execute the SQL file directly (passing the dt that app.sql references)
beeline -u "jdbc:hive2://localhost:10000/default" \
  -n username \
  -p password \
  --hiveconf dt=2024-01-01 \
  -f app.sql

# Interactive connection
beeline -u "jdbc:hive2://localhost:10000/default" \
  -n username \
  -p password

Running with the Hive CLI (Not Recommended)

Limitations of the Hive CLI
  1. Security

    • Accesses the MetaStore directly, with no access control

    • No transport encryption

    • Requires direct HDFS access

  2. Concurrency

    • No multi-user concurrency

    • Resource isolation is difficult

    • Weak connection management

  3. Maintainability

    • Hard to monitor and manage

    • Troubleshooting is difficult

    • Upgrades and maintenance are complex

# Option 1: execute a SQL file directly
hive -f /path/to/script.sql

# Option 2: run a one-off statement
hive -e "SELECT * FROM orders LIMIT 10;"

Execution Flow Architecture Diagram