Offline Data Warehouse
Architecture Diagram
Spark
Spark DataFrame API Job
package org.example.demo;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

import java.util.Properties;

public class SparkDemo {
    public static void main(String[] args) {
        // 1. Create the SparkSession
        SparkSession spark = SparkSession.builder()
                .appName("HiveToMysql")
                .enableHiveSupport() // enable Hive support
                .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
                .config("hive.metastore.uris", "thrift://localhost:9083")
                .getOrCreate();
        try {
            // 2. Read data from Hive
            Dataset<Row> hiveDF = spark.sql("SELECT name, date, amount FROM expenses");
            // 3. Transform: group by name and date, sum the spending
            Dataset<Row> resultDF = hiveDF.groupBy("name", "date")
                    .sum("amount")
                    .withColumnRenamed("sum(amount)", "total_amount");
            // 4. Configure the MySQL connection
            Properties connectionProperties = new Properties();
            connectionProperties.put("user", "your_username");
            connectionProperties.put("password", "your_password");
            connectionProperties.put("driver", "com.mysql.cj.jdbc.Driver");
            // 5. Write to MySQL
            resultDF.write()
                    .mode(SaveMode.Append)
                    .jdbc("jdbc:mysql://localhost:3306/your_database",
                            "daily_expenses",
                            connectionProperties);
        } finally {
            // 6. Stop the SparkSession
            spark.stop();
        }
    }
}
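Note that the MySQL JDBC driver (mysql-connector-j) must be on both the driver and executor classpath at runtime, for example via spark-submit --jars or by shading it into the application jar.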
Spark SQL Job
package org.example.demo

import org.apache.spark.sql.SparkSession

object SparkSQLDemo {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkSession
    val spark = SparkSession.builder()
      .appName("HiveToMysqlSQL")
      .enableHiveSupport()
      .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
      .config("hive.metastore.uris", "thrift://localhost:9083")
      .getOrCreate()
    try {
      // 2. Switch to the source database
      spark.sql("USE your_database")
      // 3. Aggregate into a temporary view
      spark.sql("""
        CREATE OR REPLACE TEMPORARY VIEW daily_expenses_view AS
        SELECT
          name,
          date,
          SUM(amount) AS total_amount
        FROM expenses
        GROUP BY name, date
      """)
      // 4. Register the MySQL target table as a JDBC-backed view,
      //    then write the aggregated result into it
      spark.sql("""
        CREATE OR REPLACE TEMPORARY VIEW mysql_daily_expenses
        USING org.apache.spark.sql.jdbc
        OPTIONS (
          url 'jdbc:mysql://localhost:3306/your_database',
          dbtable 'daily_expenses',
          user 'your_username',
          password 'your_password',
          driver 'com.mysql.cj.jdbc.Driver'
        )
      """)
      spark.sql("""
        INSERT INTO TABLE mysql_daily_expenses
        SELECT name, date, total_amount
        FROM daily_expenses_view
      """)
    } finally {
      spark.stop()
    }
  }
}
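Why the extra JDBC view: a plain Spark SQL INSERT INTO can only target tables the catalog knows about, so the MySQL table is first registered as a temporary view backed by the JDBC data source; INSERT INTO such a view appends rows to the underlying MySQL table. The same write could also be done with the DataFrame API, e.g. spark.table("daily_expenses_view").write.jdbc(...), as in the Java job above.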
Running with spark-submit
Execution requirement: compute each student's total spending per day.

# Input: Hive
CREATE TABLE expenses (
    name STRING,
    `date` DATE,
    amount DECIMAL(10,2)
)
STORED AS ORC;

# Output: MySQL
CREATE TABLE daily_expenses (
    name VARCHAR(255),
    date DATE,
    total_amount DECIMAL(10,2),
    PRIMARY KEY (name, date)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
# Submit
spark-submit \
--class org.example.demo.SparkSQLDemo \
--master yarn \
--deploy-mode cluster \
--driver-memory 4g \
--executor-memory 4g \
--executor-cores 2 \
--num-executors 3 \
--queue production \
--conf "spark.driver.extraJavaOptions=-XX:+UseG1GC" \
--conf "spark.executor.extraJavaOptions=-XX:+UseG1GC" \
/path/to/your-application.jar
# Key configuration parameters
--class           # entry class inside the application jar
--master          # cluster manager address
--deploy-mode     # deploy mode (client/cluster)
--driver-memory   # memory for the driver process
--executor-memory # memory per executor
--executor-cores  # CPU cores per executor
--num-executors   # number of executors
--queue           # target YARN queue
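Each of these flags maps to a Spark configuration key (--executor-memory to spark.executor.memory, --num-executors to spark.executor.instances, --queue to spark.yarn.queue, and so on), so the same resources can also be requested in code. A minimal sketch, assuming the job is still submitted to the same YARN cluster; driver memory is deliberately left out, since in client mode the driver JVM has already started by the time this configuration is read:

import org.apache.spark.sql.SparkSession;

public class SubmitConfigDemo {
    public static void main(String[] args) {
        // Each .config(...) call mirrors a spark-submit flag;
        // master and deploy mode still come from spark-submit.
        SparkSession spark = SparkSession.builder()
                .appName("HiveToMysqlSQL")
                .config("spark.executor.memory", "4g")    // --executor-memory 4g
                .config("spark.executor.cores", "2")      // --executor-cores 2
                .config("spark.executor.instances", "3")  // --num-executors 3
                .config("spark.yarn.queue", "production") // --queue production
                .getOrCreate();
        // Confirm what was actually applied.
        System.out.println(spark.conf().get("spark.executor.memory"));
        spark.stop();
    }
}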
Execution Flow Architecture Diagram
Hive
SQL Job
- app.sql
-- Dynamic partitioning
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
-- Parallelism
SET mapreduce.job.reduces=10;
SET hive.exec.parallel=true;
SET hive.exec.parallel.thread.number=8;
-- Compression
SET hive.exec.compress.output=true;
SET mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
-- Optimizer settings
SET hive.optimize.skewjoin=true;
SET hive.optimize.bucketmapjoin=true;
SET hive.optimize.index.filter=true;
-- Target table (daily_expenses)
CREATE TABLE IF NOT EXISTS daily_expenses (
    name STRING COMMENT 'student name',
    `date` DATE COMMENT 'calendar day',
    total_amount DECIMAL(10,2) COMMENT 'total spending'
)
PARTITIONED BY (dt STRING)
CLUSTERED BY (name) INTO 8 BUCKETS
STORED AS ORC
TBLPROPERTIES ('orc.compress'='SNAPPY');
-- Load data (daily incremental run)
INSERT OVERWRITE TABLE daily_expenses
PARTITION (dt = '${hiveconf:dt}')
SELECT
    name,
    `date`,
    SUM(amount) AS total_amount
FROM expenses
WHERE dt = '${hiveconf:dt}'
GROUP BY name, `date`;
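The ${hiveconf:dt} variable is not defined inside the script; it is supplied at invocation time with --hiveconf dt=..., as the beeline example below shows.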
Running with Beeline (recommended)
Benefits of Beeline
- Kerberos authentication and fine-grained access control
- SSL-encrypted transport
- Performance: connection pooling and resource control
- Multi-user support: concurrency control and resource isolation
- Easier operations: monitoring and log management
Execution requirement: compute each student's total spending per day.

# Input: Hive
CREATE TABLE IF NOT EXISTS expenses (
    name STRING COMMENT 'student name',
    `date` DATE COMMENT 'calendar day',
    amount DECIMAL(10,2) COMMENT 'spending amount'
)
PARTITIONED BY (dt STRING)
STORED AS ORC
TBLPROPERTIES ('orc.compress'='SNAPPY');

# Output: Hive (the daily_expenses table created in app.sql)
# Run the SQL file directly, passing the dt variable it references
beeline -u "jdbc:hive2://localhost:10000/default" \
  -n username \
  -p password \
  --hiveconf dt=2024-01-01 \
  -f app.sql
# Interactive session
beeline -u "jdbc:hive2://localhost:10000/default" \
  -n username \
  -p password
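Beeline is itself a JDBC client for HiveServer2, so application code can hit the same endpoint. A minimal sketch using the Hive JDBC driver, assuming hive-jdbc is on the classpath and the same host and credentials as above:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class HiveJdbcDemo {
    public static void main(String[] args) throws Exception {
        // Explicitly register the driver (newer versions self-register).
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        // Same HiveServer2 endpoint that beeline connects to.
        String url = "jdbc:hive2://localhost:10000/default";
        try (Connection conn = DriverManager.getConnection(url, "username", "password");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(
                     "SELECT name, total_amount FROM daily_expenses LIMIT 10")) {
            while (rs.next()) {
                System.out.println(rs.getString("name") + "\t"
                        + rs.getBigDecimal("total_amount"));
            }
        }
    }
}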
Running with the Hive CLI (not recommended)
Limitations of the Hive CLI
Security:
- connects to the metastore directly, with no access control
- no transport encryption
- requires direct HDFS access
Concurrency:
- no multi-user concurrency
- resource isolation is difficult
- weak connection management
Operations:
- hard to monitor and manage
- difficult troubleshooting
- complex upgrades and maintenance
# Option 1: run a SQL file directly
hive -f /path/to/script.sql
# Option 2: run a single statement
hive -e "SELECT * FROM orders LIMIT 10;"