友情链接:
随着企业的数据量、数据源和数据类型的增加,在分析、数据科学和机器学习计划中利用这些数据以获得业务洞察力的重要性也在增加。优先考虑这些计划的需求给数据工程团队带来了越来越大的压力,因为将原始、杂乱的数据处理成干净、新鲜、可靠的数据是实施这 ETL 是抽取、转换和加载的缩写,是数据工程师用来从不同来源提取数据、将数据转换为可用和可信资源,并将数据加载到 终用户可以访问和使用的系统中以解决业务问题的流程。
1、全量抽取
特点:一次性抽取所有数据,适合数据量较小或首次抽取的场景。
实现方式:直接查询整个表或读取整个文件。
2、增量抽取
特点:仅抽取发生变化的数据,适合数据量较大且需要频繁更新的场景。
常用技术:
连接数据库,执行 SQL 查询,将查询结果保存到 DataFrame 或文件中。
import pandas as pd
from sqlalchemy import create_engine
# 数据库连接配置
db_config ={
'host':'localhost',
'user':'root',
'password':'password',
'database':'test_db'
}
# 创建数据库连接
engine = create_engine(f"mysql+pymysql://{db_config['user']}:{db_config['password']}@{db_config['host']}/{db_config['database']}")
# 执行 SQL 查询
query ="SELECT * FROM users"# 假设 users 表包含 id, name, age 字段
df_users = pd.read_sql(query, engine)
# 输出结果
print("从 MySQL 抽取的用户数据:")
print(df_users)
读取 CSV 文件,将数据加载到 DataFrame 中。
import pandas as pd
# 读取 CSV 文件
df_orders = pd.read_csv('orders.csv')# 假设 orders.csv 包含 order_id, user_id, amount 字段
# 输出结果
print("从 CSV 文件抽取的订单数据:")
print(df_orders)
发送 HTTP 请求到 API,解析返回的 JSON 数据,将数据保存到 DataFrame 中。
import requests
import pandas as pd
# API 配置
api_url ="https://api.weatherapi.com/v1/current.json"
api_key ="your_api_key"# 替换为你的 API Key
params={
'key': api_key,
'q':'Beijing'# 查询北京的天气
}
# 发送 HTTP 请求
response = requests.get(api_url,params=params)
data = response.json()# 解析 JSON 数据
# 将数据保存到 DataFrame
weather_data ={
'location': data['location']['name'],
'temperature': data['current']['temp_c'],
'condition': data['current']['condition']['text']
}
df_weather = pd.DataFrame([weather_data])
# 输出结果
print("从 API 抽取的天气数据:")
print(df_weather)
数据清洗是确保数据质量的关键步骤,主要包括处理缺失值、删除重复记录、修正错误数据、标准化数据格式和处理异常值等操作。
1、处理缺失值。填充默认值(如用 0 填充缺失的数值,用 Unknown 填充缺失的文本);删除包含缺失值的记录(如果缺失值比例较高或对分析影响较大)。
2、删除重复数据。识别并删除完全重复的记录;根据业务规则删除部分重复的记录(如保留最新的一条记录)。
3、修正错误数据。修正格式错误(如日期格式不一致、电话号码格式错误);修正逻辑错误(如年龄为负数、订单金额为 0)。
4、标准化数据格式。统一字段格式(如日期统一为 YYYY-MM-DD,金额统一为两位小数);统一编码(如性别字段统一为 Male 和 Female)。
5、处理异常值。识别并处理异常值(如年龄超过 150 岁,订单金额为负数);根据业务规则修正或剔除异常值。
6、数据拆分与合并。拆分字段(如将地址字段拆分为省、市、区);合并字段(如将姓和名字段合并为全名)。
1、数据质量评估。对原始数据进行初步分析,识别数据质量问题(如缺失值、重复值、异常值);使用统计方法(如描述性统计)或可视化工具(如直方图、箱线图)评估数据质量。
2、制定清洗规则。根据业务需求和数据质量问题,制定清洗规则(如缺失值填充规则、异常值处理规则)。
3、执行清洗操作。根据清洗规则,对数据进行清洗(如填充缺失值、删除重复记录、修正错误数据)。
4、验证清洗结果。检查清洗后的数据是否符合预期(如缺失值是否已填充,重复记录是否已删除);记录清洗过程中的错误和警告。
5、输出清洗后的数据。将清洗后的数据保存到目标系统(如数据库、文件)。
假设我们有一个包含用户信息的 CSV 文件 users.csv,需要进行以下清洗操作:
原始数据 (users.csv)
id | name | age | gender | join_date |
1 | Alice | 25 | F | 2025-01-01 |
2 | Bob | M | 2025-02-15 | |
3 | Charlie | 30 | Male | 2025-03-10 |
4 | David | 28 | M | 2025-04-20 |
5 | Eve | 120 | F | 2025-05-25 |
1 | Alice | 25 | F | 2025-01-01 |
清洗后的数据
id | name | age | gender | join_date |
1 | Alice | 25 | Female | 2025-01-01 |
2 | Bob | 0 | Male | 2025-02-15 |
3 | Charlie | 30 | Male | 2025-03-10 |
4 | David | 28 | Male | 2025-04-20 |
使用 Python 和 pandas 实现上述清洗任务的代码:
import pandas as pd
# 读取数据
df = pd.read_csv('users.csv')
# 1. 处理缺失值:将缺失的年龄字段填充为默认值 0
df['age'] = df['age'].fillna(0)
# 2. 删除重复记录:根据 id 字段删除完全重复的记录
df = df.drop_duplicates(subset=['id'])
# 3. 修正错误数据:将性别字段统一为 Male 和 Female
df['gender'] = df['gender'].replace({'M': 'Male', 'F': 'Female'})
# 4. 标准化数据格式:将日期字段统一为 YYYY-MM-DD 格式
df['join_date'] = pd.to_datetime(df['join_date']).dt.strftime('%Y-%m-%d')
# 5. 处理异常值:删除年龄超过 100 岁的记录
df = df[df['age'] <= 100]
# 输出清洗后的数据
print("清洗后的数据:")
print(df)
# 保存清洗后的数据到新文件
df.to_csv('cleaned_users.csv', index=False)
1、数据转换操作。数据映射:将源字段与目标字段匹配;数据拆分:将一个字段拆分为多个字段(如身份证号拆分为地区码、出生日期);数据聚合:对数据进行分组和计算(如按地区汇总销售额)。
2、数据验证。检查转换后的数据是否符合预期(如字段类型、数据范围);记录转换过程中的错误和警告。
3、数据输出。将转换后的数据保存到目标系统(如数据库、文件)。
假设我们有一个用户信息的原始数据集raw_users.csv,该数据集包含用户的ID、姓名、性别、出生日期和地址等字段。我们的目标是将这些数据转换为适合分析的形式,并最终加载到数据库中。具体转换需求如下:
原始数据 (raw_users.csv)
id | name | gender | birth_date | address |
1 | Alice | F | 1990-01-01 | 北京市朝阳区 |
2 | Bob | M | 1985-05-20 | 上海市浦东新区 |
3 | Carol | F | 1992-07-14 | 广东省广州市天河区 |
转换后的数据
id | name | gender | birth_date | province | city | district |
1 | Alice | Female | 1990-01-01 | 北京 | 朝阳 | |
2 | Bob | Male | 1985-05-20 | 上海 | 浦东 | |
3 | Carol | Female | 1992-07-14 | 广东 | 广州 | 天河 |
使用 Python 实现数据转换
import pandas as pd
# 读取原始数据
df = pd.read_csv('raw_users.csv')
# 1. 数据映射:性别字段标准化
gender_mapping = {'F': 'Female', 'M': 'Male'}
df['gender'] = df['gender'].map(gender_mapping)
# 2. 数据拆分:从地址字段提取省份、城市、区县
def split_address(address):
parts = address.split('市')
province = parts[0].replace('省', '')
city = parts[1].split('区')[0]
district = parts[1].split('区')[1] if len(parts[1].split('区')) > 1 else ''
return province, city, district
df[['province', 'city', 'district']] = df['address'].apply(lambda x: pd.Series(split_address(x)))
# 删除原始地址列
df.drop(columns=['address'], inplace=True)
# 3. 数据聚合:按省份统计用户数量并计算平均年龄
df['birth_date'] = pd.to_datetime(df['birth_date'])
current_year = pd.Timestamp.now().year
df['age'] = current_year - df['birth_date'].dt.year
summary = df.groupby('province').agg(
user_count=('id', 'count'),
avg_age=('age', 'mean') ).reset_index()
print("转换后的汇总数据:")
print(summary)
# 输出转换后的数据到新文件
df.to_csv('transformed_users.csv', index=False)
summary.to_csv('province_summary.csv', index=False)
1、选择加载策略。根据业务需求和数据量,选择合适的加载策略(如全量加载、增量加载)。
2、数据写入目标系统。将数据写入目标系统的表或文件中。
3、数据验证与日志记录。检查加载后的数据是否符合预期(如行数、字段数、数据类型等);记录加载过程中的关键信息(如加载时间、数据量、错误信息等)。
4、异常处理。处理加载过程中出现的错误(如数据格式不匹配、目标系统不可用)。
1、全量加载(Full Load)
2、增量加载(Incremental Load)
3、批量加载(Bulk Load)
4、实时加载(Real-time Load)
1.选择加载策略。根据业务需求和数据量,选择合适的加载策略(如全量加载、增量加载)。
2.数据写入目标系统。数据库:使用 SQL 语句(如 INSERT INTO、UPDATE)或数据库工具(如AgroDB内置工具Impexp);文件:将数据保存为文件(如 CSV、JSON、Parquet);数据湖/数据仓库:使用专用工具(如星环湖仓一体平台)。
3.数据验证与日志记录。检查加载后的数据是否符合预期(如行数、字段数、数据类型等);记录加载过程中的关键信息(如加载时间、数据量、错误信息等)。
4.异常处理。处理加载过程中出现的错误(如数据格式不匹配、目标系统不可用);根据错误类型,选择重试、跳过或报警。
假设我们有一个清洗后的用户数据表 cleaned_users.csv,需要将其加载到 MySQL 数据库中。以下是具体实现:
id | name | age | gender | join_date |
1 | Alice | 25 | Female | 2025-01-01 |
2 | Bob | 0 | Male | 2025-02-15 |
3 | Charlie | 30 | Male | 2025-03-10 |
4 | David | 28 | Male | 2025-04-20 |
字段名 | 类型 | 说明 |
id | INT | 用户 ID |
name | VARCHAR(50) | 用户姓名 |
age | INT | 用户年龄 |
gender | VARCHAR(10) | 用户性别 |
join_date | DATE | 加入日期 |
以下是使用 Python 和 pandas + SQLAlchemy 实现数据加载的代码:
import pandas as pd
from sqlalchemy import create_engine
# 读取清洗后的数据
df = pd.read_csv('cleaned_users.csv')
# 数据库连接配置
db_config = {
'host': 'localhost',
'user': 'root',
'password': 'password',
'database': 'test_db'
}
# 创建数据库连接
engine = create_engine(f"mysql+pymysql://{db_config['user']}:{db_config['password']}@{db_config['host']}/{db_config['database']}")
# 将数据加载到 MySQL 数据库
try:
df.to_sql('users', con=engine, if_exists='append', index=False) # if_exists='append' 表示增量加载
print("数据加载成功!")
except Exception as e:
print(f"数据加载失败:{e}")
数据加载后,MySQL 数据库中的 users 表内容如下:
id | name | age | gender | join_date |
1 | Alice | 25 | Female | 2025-01-01 |
2 | Bob | 0 | Male | 2025-02-15 |
3 | Charlie | 30 | Male | 2025-03-10 |
4 | David | 28 | Male | 2025-04-20 |
友情链接:
随着企业的数据量、数据源和数据类型的增加,在分析、数据科学和机器学习计划中利用这些数据以获得业务洞察力的重要性也在增加。优先考虑这些计划的需求给数据工程团队带来了越来越大的压力,因为将原始、杂乱的数据处理成干净、新鲜、可靠的数据是实施这 ETL 是抽取、转换和加载的缩写,是数据工程师用来从不同来源提取数据、将数据转换为可用和可信资源,并将数据加载到 终用户可以访问和使用的系统中以解决业务问题的流程。
1、全量抽取
特点:一次性抽取所有数据,适合数据量较小或首次抽取的场景。
实现方式:直接查询整个表或读取整个文件。
2、增量抽取
特点:仅抽取发生变化的数据,适合数据量较大且需要频繁更新的场景。
常用技术:
连接数据库,执行 SQL 查询,将查询结果保存到 DataFrame 或文件中。
import pandas as pd
from sqlalchemy import create_engine
# 数据库连接配置
db_config ={
'host':'localhost',
'user':'root',
'password':'password',
'database':'test_db'
}
# 创建数据库连接
engine = create_engine(f"mysql+pymysql://{db_config['user']}:{db_config['password']}@{db_config['host']}/{db_config['database']}")
# 执行 SQL 查询
query ="SELECT * FROM users"# 假设 users 表包含 id, name, age 字段
df_users = pd.read_sql(query, engine)
# 输出结果
print("从 MySQL 抽取的用户数据:")
print(df_users)
读取 CSV 文件,将数据加载到 DataFrame 中。
import pandas as pd
# 读取 CSV 文件
df_orders = pd.read_csv('orders.csv')# 假设 orders.csv 包含 order_id, user_id, amount 字段
# 输出结果
print("从 CSV 文件抽取的订单数据:")
print(df_orders)
发送 HTTP 请求到 API,解析返回的 JSON 数据,将数据保存到 DataFrame 中。
import requests
import pandas as pd
# API 配置
api_url ="https://api.weatherapi.com/v1/current.json"
api_key ="your_api_key"# 替换为你的 API Key
params={
'key': api_key,
'q':'Beijing'# 查询北京的天气
}
# 发送 HTTP 请求
response = requests.get(api_url,params=params)
data = response.json()# 解析 JSON 数据
# 将数据保存到 DataFrame
weather_data ={
'location': data['location']['name'],
'temperature': data['current']['temp_c'],
'condition': data['current']['condition']['text']
}
df_weather = pd.DataFrame([weather_data])
# 输出结果
print("从 API 抽取的天气数据:")
print(df_weather)
数据清洗是确保数据质量的关键步骤,主要包括处理缺失值、删除重复记录、修正错误数据、标准化数据格式和处理异常值等操作。
1、处理缺失值。填充默认值(如用 0 填充缺失的数值,用 Unknown 填充缺失的文本);删除包含缺失值的记录(如果缺失值比例较高或对分析影响较大)。
2、删除重复数据。识别并删除完全重复的记录;根据业务规则删除部分重复的记录(如保留最新的一条记录)。
3、修正错误数据。修正格式错误(如日期格式不一致、电话号码格式错误);修正逻辑错误(如年龄为负数、订单金额为 0)。
4、标准化数据格式。统一字段格式(如日期统一为 YYYY-MM-DD,金额统一为两位小数);统一编码(如性别字段统一为 Male 和 Female)。
5、处理异常值。识别并处理异常值(如年龄超过 150 岁,订单金额为负数);根据业务规则修正或剔除异常值。
6、数据拆分与合并。拆分字段(如将地址字段拆分为省、市、区);合并字段(如将姓和名字段合并为全名)。
1、数据质量评估。对原始数据进行初步分析,识别数据质量问题(如缺失值、重复值、异常值);使用统计方法(如描述性统计)或可视化工具(如直方图、箱线图)评估数据质量。
2、制定清洗规则。根据业务需求和数据质量问题,制定清洗规则(如缺失值填充规则、异常值处理规则)。
3、执行清洗操作。根据清洗规则,对数据进行清洗(如填充缺失值、删除重复记录、修正错误数据)。
4、验证清洗结果。检查清洗后的数据是否符合预期(如缺失值是否已填充,重复记录是否已删除);记录清洗过程中的错误和警告。
5、输出清洗后的数据。将清洗后的数据保存到目标系统(如数据库、文件)。
假设我们有一个包含用户信息的 CSV 文件 users.csv,需要进行以下清洗操作:
原始数据 (users.csv)
id | name | age | gender | join_date |
1 | Alice | 25 | F | 2025-01-01 |
2 | Bob | M | 2025-02-15 | |
3 | Charlie | 30 | Male | 2025-03-10 |
4 | David | 28 | M | 2025-04-20 |
5 | Eve | 120 | F | 2025-05-25 |
1 | Alice | 25 | F | 2025-01-01 |
清洗后的数据
id | name | age | gender | join_date |
1 | Alice | 25 | Female | 2025-01-01 |
2 | Bob | 0 | Male | 2025-02-15 |
3 | Charlie | 30 | Male | 2025-03-10 |
4 | David | 28 | Male | 2025-04-20 |
使用 Python 和 pandas 实现上述清洗任务的代码:
import pandas as pd
# 读取数据
df = pd.read_csv('users.csv')
# 1. 处理缺失值:将缺失的年龄字段填充为默认值 0
df['age'] = df['age'].fillna(0)
# 2. 删除重复记录:根据 id 字段删除完全重复的记录
df = df.drop_duplicates(subset=['id'])
# 3. 修正错误数据:将性别字段统一为 Male 和 Female
df['gender'] = df['gender'].replace({'M': 'Male', 'F': 'Female'})
# 4. 标准化数据格式:将日期字段统一为 YYYY-MM-DD 格式
df['join_date'] = pd.to_datetime(df['join_date']).dt.strftime('%Y-%m-%d')
# 5. 处理异常值:删除年龄超过 100 岁的记录
df = df[df['age'] <= 100]
# 输出清洗后的数据
print("清洗后的数据:")
print(df)
# 保存清洗后的数据到新文件
df.to_csv('cleaned_users.csv', index=False)
1、数据转换操作。数据映射:将源字段与目标字段匹配;数据拆分:将一个字段拆分为多个字段(如身份证号拆分为地区码、出生日期);数据聚合:对数据进行分组和计算(如按地区汇总销售额)。
2、数据验证。检查转换后的数据是否符合预期(如字段类型、数据范围);记录转换过程中的错误和警告。
3、数据输出。将转换后的数据保存到目标系统(如数据库、文件)。
假设我们有一个用户信息的原始数据集raw_users.csv,该数据集包含用户的ID、姓名、性别、出生日期和地址等字段。我们的目标是将这些数据转换为适合分析的形式,并最终加载到数据库中。具体转换需求如下:
原始数据 (raw_users.csv)
id | name | gender | birth_date | address |
1 | Alice | F | 1990-01-01 | 北京市朝阳区 |
2 | Bob | M | 1985-05-20 | 上海市浦东新区 |
3 | Carol | F | 1992-07-14 | 广东省广州市天河区 |
转换后的数据
id | name | gender | birth_date | province | city | district |
1 | Alice | Female | 1990-01-01 | 北京 | 朝阳 | |
2 | Bob | Male | 1985-05-20 | 上海 | 浦东 | |
3 | Carol | Female | 1992-07-14 | 广东 | 广州 | 天河 |
使用 Python 实现数据转换
import pandas as pd
# 读取原始数据
df = pd.read_csv('raw_users.csv')
# 1. 数据映射:性别字段标准化
gender_mapping = {'F': 'Female', 'M': 'Male'}
df['gender'] = df['gender'].map(gender_mapping)
# 2. 数据拆分:从地址字段提取省份、城市、区县
def split_address(address):
parts = address.split('市')
province = parts[0].replace('省', '')
city = parts[1].split('区')[0]
district = parts[1].split('区')[1] if len(parts[1].split('区')) > 1 else ''
return province, city, district
df[['province', 'city', 'district']] = df['address'].apply(lambda x: pd.Series(split_address(x)))
# 删除原始地址列
df.drop(columns=['address'], inplace=True)
# 3. 数据聚合:按省份统计用户数量并计算平均年龄
df['birth_date'] = pd.to_datetime(df['birth_date'])
current_year = pd.Timestamp.now().year
df['age'] = current_year - df['birth_date'].dt.year
summary = df.groupby('province').agg(
user_count=('id', 'count'),
avg_age=('age', 'mean') ).reset_index()
print("转换后的汇总数据:")
print(summary)
# 输出转换后的数据到新文件
df.to_csv('transformed_users.csv', index=False)
summary.to_csv('province_summary.csv', index=False)
1、选择加载策略。根据业务需求和数据量,选择合适的加载策略(如全量加载、增量加载)。
2、数据写入目标系统。将数据写入目标系统的表或文件中。
3、数据验证与日志记录。检查加载后的数据是否符合预期(如行数、字段数、数据类型等);记录加载过程中的关键信息(如加载时间、数据量、错误信息等)。
4、异常处理。处理加载过程中出现的错误(如数据格式不匹配、目标系统不可用)。
1、全量加载(Full Load)
2、增量加载(Incremental Load)
3、批量加载(Bulk Load)
4、实时加载(Real-time Load)
1.选择加载策略。根据业务需求和数据量,选择合适的加载策略(如全量加载、增量加载)。
2.数据写入目标系统。数据库:使用 SQL 语句(如 INSERT INTO、UPDATE)或数据库工具(如AgroDB内置工具Impexp);文件:将数据保存为文件(如 CSV、JSON、Parquet);数据湖/数据仓库:使用专用工具(如星环湖仓一体平台)。
3.数据验证与日志记录。检查加载后的数据是否符合预期(如行数、字段数、数据类型等);记录加载过程中的关键信息(如加载时间、数据量、错误信息等)。
4.异常处理。处理加载过程中出现的错误(如数据格式不匹配、目标系统不可用);根据错误类型,选择重试、跳过或报警。
假设我们有一个清洗后的用户数据表 cleaned_users.csv,需要将其加载到 MySQL 数据库中。以下是具体实现:
id | name | age | gender | join_date |
1 | Alice | 25 | Female | 2025-01-01 |
2 | Bob | 0 | Male | 2025-02-15 |
3 | Charlie | 30 | Male | 2025-03-10 |
4 | David | 28 | Male | 2025-04-20 |
字段名 | 类型 | 说明 |
id | INT | 用户 ID |
name | VARCHAR(50) | 用户姓名 |
age | INT | 用户年龄 |
gender | VARCHAR(10) | 用户性别 |
join_date | DATE | 加入日期 |
以下是使用 Python 和 pandas + SQLAlchemy 实现数据加载的代码:
import pandas as pd
from sqlalchemy import create_engine
# 读取清洗后的数据
df = pd.read_csv('cleaned_users.csv')
# 数据库连接配置
db_config = {
'host': 'localhost',
'user': 'root',
'password': 'password',
'database': 'test_db'
}
# 创建数据库连接
engine = create_engine(f"mysql+pymysql://{db_config['user']}:{db_config['password']}@{db_config['host']}/{db_config['database']}")
# 将数据加载到 MySQL 数据库
try:
df.to_sql('users', con=engine, if_exists='append', index=False) # if_exists='append' 表示增量加载
print("数据加载成功!")
except Exception as e:
print(f"数据加载失败:{e}")
数据加载后,MySQL 数据库中的 users 表内容如下:
id | name | age | gender | join_date |
1 | Alice | 25 | Female | 2025-01-01 |
2 | Bob | 0 | Male | 2025-02-15 |
3 | Charlie | 30 | Male | 2025-03-10 |
4 | David | 28 | Male | 2025-04-20 |