
数据质量是任何数据科学项目的基石。数据质量差会导致错误的模型、误导性的见解以及代价高昂的业务决策。在本指南中,我们将探索如何使用 Python 构建强大而简洁的数据清理和验证流程。
数据清理和验证流程是一种自动化工作流程,它系统地处理原始数据,以确保其质量在进行分析之前符合公认的标准。可以将其视为数据的质量控制系统:
管道本质上充当守门人的角色,确保只有干净且经过验证的数据才能流入您的分析和机器学习工作流。

Source:
自动化清理管道的一些主要优势包括:

Source:
在开始构建流水线之前,请确保我们已准备好所有工具。我们的流水线将利用 Python 强大的库:
import pandas as pd import numpy as np from datetime import datetime import logging from typing import Dict, List, Any, Optional
代码中将使用以下库,并介绍它们提供的实用功能:

Source:
验证模式本质上是一份蓝图,它定义了数据对其所基于的结构及其遵循的约束的期望。我们的模式定义如下:
VALIDATION_SCHEMA = {
'user_id': {'type': int, 'required': True, 'min_value': 1},
'email': {'type': str, 'required': True, 'pattern': r'^[^@]+@[^@]+\.[^@]+$'},
'age': {'type': int, 'required': False, 'min_value': 0, 'max_value': 120},
'signup_date': {'type': 'datetime', 'required': True},
'score': {'type': float, 'required': False, 'min_value': 0.0, 'max_value': 100.0}
}
该架构指定了一些验证规则:
我们的管道类将充当协调器,协调所有清理和验证操作:
class DataCleaningPipeline: def __init__(self, schema: Dict[str, Any]): self.schema = schema self.errors = [] self.cleaned_rows = 0 self.total_rows = 0 # Setup logging logging.basicConfig(level=logging.INFO) self.logger = logging.getlogger(__name__) def clean_and_validate(self, df: pd.DataFrame) -> pd.DataFrame: """Main pipeline orchestrator""" self.total_rows = len(df) self.logger.info(f"Starting pipeline with {self.total_rows} rows") # Pipeline stages df = self._handle_missing_values(df) df = self._validate_data_types(df) df = self._apply_constraints(df) df = self._remove_outliers(df) self.cleaned_rows = len(df) self._generate_report() return df
该流程遵循系统化 *** :

Source:
让我们实现每个清理阶段,并实现强大的错误处理功能:
以下代码将删除缺少必填字段的行,并使用中位数(对于数字类型)或“未知”(对于非数字类型)填充缺少的可选字段。
def _handle_missing_values(self, df: pd.DataFrame) -> pd.DataFrame:
"""Handle missing values based on field requirements"""
for column, rules in self.schema.items():
if column in df.columns:
if rules.get('required', False):
# Remove rows with missing required fields
missing_count = df[column].isnull().sum()
if missing_count > 0:
self.errors.append(f"Removed {missing_count} rows with missing {column}")
df = df.dropna(subset=[column])
else:
# Fill optional missing values
if df[column].dtype in ['int64', 'float64']:
df[column].fillna(df[column].median(), inplace=True)
else:
df[column].fillna('Unknown', inplace=True)
return df
以下代码将列转换为指定类型,并删除转换失败的行。
def _validate_data_types(self, df: pd.DataFrame) -> pd.DataFrame:
"""Convert and validate data types"""
for column, rules in self.schema.items():
if column in df.columns:
expected_type = rules['type']
try:
if expected_type == 'datetime':
df[column] = pd.to_datetime(df[column], errors='coerce')
elif expected_type == int:
df[column] = pd.to_numeric(df[column], errors='coerce').astype('Int64')
elif expected_type == float:
df[column] = pd.to_numeric(df[column], errors='coerce')
# Remove rows with conversion failures
invalid_count = df[column].isnull().sum()
if invalid_count > 0:
self.errors.append(f"Removed {invalid_count} rows with invalid {column}")
df = df.dropna(subset=[column])
except Exception as e:
self.logger.error(f"Type conversion error for {column}: {e}")
return df
我们的约束验证系统可确保数据在限制范围内且格式可接受:
def _apply_constraints(self, df: pd.DataFrame) -> pd.DataFrame:
"""Apply field-specific constraints"""
for column, rules in self.schema.items():
if column in df.columns:
initial_count = len(df)
# Range validation
if 'min_value' in rules:
df = df[df[column] >= rules['min_value']]
if 'max_value' in rules:
df = df[df[column] <= rules['max_value']]
# Pattern validation for strings
if 'pattern' in rules and df[column].dtype == 'object':
import re
pattern = re.compile(rules['pattern'])
df = df[df[column].astype(str).str.match(pattern, na=False)]
removed_count = initial_count - len(df)
if removed_count > 0:
self.errors.append(f"Removed {removed_count} rows failing {column} constraints")
return df
当考虑多个字段之间的关系时,通常需要高级验证:
def _cross_field_validation(self, df: pd.DataFrame) -> pd.DataFrame:
"""Validate relationships between fields"""
initial_count = len(df)
# Example: Signup date should not be in the future
if 'signup_date' in df.columns:
future_signups = df['signup_date'] > datetime.now()
df = df[~future_signups]
removed = future_signups.sum()
if removed > 0:
self.errors.append(f"Removed {removed} rows with future signup dates")
# Example: Age consistency with signup date
if 'age' in df.columns and 'signup_date' in df.columns:
# Remove records where age seems inconsistent with signup timing
suspicious_age = (df['age'] < 13) & (df['signup_date'] < datetime(2010, 1, 1))
df = df[~suspicious_age]
removed = suspicious_age.sum()
if removed > 0:
self.errors.append(f"Removed {removed} rows with suspicious age/date combinations")
return df
异常值可能会对分析结果产生极大的影响。该流程提供了一种先进的 *** 来检测此类异常值:
def _remove_outliers(self, df: pd.DataFrame) -> pd.DataFrame:
"""Remove statistical outliers using IQR method"""
numeric_columns = df.select_dtypes(include=[np.number]).columns
for column in numeric_columns:
if column in self.schema:
Q1 = df[column].quantile(0.25)
Q3 = df[column].quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
outliers = (df[column] < lower_bound) | (df[column] > upper_bound)
outlier_count = outliers.sum()
if outlier_count > 0:
df = df[~outliers]
self.errors.append(f"Removed {outlier_count} outliers from {column}")
return df
以下是我们完整、紧凑的管道实现:
class DataCleaningPipeline:
def __init__(self, schema: Dict[str, Any]):
self.schema = schema
self.errors = []
self.cleaned_rows = 0
self.total_rows = 0
logging.basicConfig(level=logging.INFO)
self.logger = logging.getLogger(__name__)
def clean_and_validate(self, df: pd.DataFrame) -> pd.DataFrame:
self.total_rows = len(df)
self.logger.info(f"Starting pipeline with {self.total_rows} rows")
# Execute cleaning stages
df = self._handle_missing_values(df)
df = self._validate_data_types(df)
df = self._apply_constraints(df)
df = self._remove_outliers(df)
self.cleaned_rows = len(df)
self._generate_report()
return df
def _generate_report(self):
"""Generate cleaning summary report"""
self.logger.info(f"Pipeline completed: {self.cleaned_rows}/{self.total_rows} rows retained")
for error in self.errors:
self.logger.warning(error)
让我们看一下使用真实数据集的管道实际运行情况的演示:
# Create sample problematic data
sample_data = pd.DataFrame({
'user_id': [1, 2, None, 4, 5, 999999],
'email': ['user1@email.com', 'invalid-email', 'user3@domain.co', None, 'user5@test.org', 'user6@example.com'],
'age': [25, 150, 30, -5, 35, 28], # Contains invalid ages
'signup_date': ['2023-01-15', '2030-12-31', '2022-06-10', '2023-03-20', 'invalid-date', '2023-05-15'],
'score': [85.5, 105.0, 92.3, 78.1, -10.0, 88.7] # Contains out-of-range scores
})
# Initialize and run pipeline
pipeline = DataCleaningPipeline(VALIDATION_SCHEMA)
cleaned_data = pipeline.clean_and_validate(sample_data)
print("Cleaned Data:")
print(cleaned_data)
print(f"\nCleaning Summary: {pipeline.cleaned_rows}/{pipeline.total_rows} rows retained")
输出:

输出显示了最终清理后的 DataFrame,其中删除了缺少必填字段、数据类型无效、违反约束条件(例如超出范围的值或错误的电子邮件地址)以及包含异常值的行。摘要行报告了在总数中保留了多少行。这确保只有有效的、可分析的数据才能继续处理,从而提高质量、减少错误,并使您的管道可靠且可重复。
我们的管道已实现可扩展。以下是一些改进建议:

这种清理和验证的概念是检查数据中所有可能出错的元素:缺失值、无效的数据类型或约束、异常值,当然,还要尽可能详细地报告所有这些信息。之后,此管道将成为您在任何类型的数据分析或机器学习任务中进行数据质量保证的起点。这种 *** 的优势包括:自动 QA 检查(确保不会遗漏任何错误)、可重现的结果、全面的错误追踪,以及在特定领域约束下轻松安装多项检查。
通过在数据工作流中部署此类管道,您的数据驱动决策将更有可能保持正确性和准确性。数据清理是一个迭代过程,随着新的数据质量问题出现,您可以在您的领域中扩展此管道,并添加额外的验证规则和清理逻辑。这种模块化设计允许集成新功能,而不会与当前已实现的功能发生冲突。
问题 1:什么是数据清理和验证管道?
A. 它是一个自动化的工作流程,可以检测并修复缺失值、类型不匹配、约束违规和异常值,以确保只有干净的数据才能进入分析或建模。
问题 2:为什么使用管道而不是手动清理?
A. 管道比手动 *** 更快、更一致、可重复且更不容易出错,这在处理大型数据集时尤为重要。
问题 3:缺少或无效值的行会发生什么?
A. 缺少必填字段或验证失败的行将被删除。可选字段将获取默认值,例如中位数或“未知”。
不久前,PHP 8.0大张旗鼓地发布了。它带来了许多新特性、性能增强和变化——其中最令人兴奋的是新的JIT编译器。 技术世界总是在向前发展,PHP也是如此。 ,包含了几个令人兴奋的特性。它定于今年晚些时候于2021年11月25日发布。 在本文中,我们将详细介绍PHP 8.1将带来哪些新的东...
Linux面板环境安装,主要支持LNMP和LAMP、Tomcat、node.js。不过对于大部分站长来说,主要是LNMP和LAMP两个环境的安装。 LNMP和LAMP两个环境的最大区别是,前者采用Nginx作为Web服务器,后者则采用Apache作为Web服务器。(选择哪个作为您的Web服务器,可...
宝塔面板中的网站管理是非常重要的一部分,也是站长经常需要使用到的功能模块。网站管理,主要用于管理和创建WEB站点。如果您是宝塔面板的使用用户,应该对此模块有充分的了解,以便于您更高效地管理网站。 宝塔面板网站管理模块包括:添加新网站、修改默认页、设置默认站点、站点列表、站点的运行与停止、备份站点、...
每台连接到Internet的计算机都有一个Internet协议 (IP) 地址。但是,并非所有IP地址的外观或行为都相同。 如果您使用计算机网络或服务器,了解动态IP和静态IP之间的区别至关重要。通过详细了解每个协议,您可以选择最适合您需求的解决方案。 在本文中,我们将讨论静态和动态IP之间...
宝塔面板提供丰富的软件以一键安装,这让服务器环境搭建提供不少的便利性,站长可以根据实际需求快速编译安装以实现不同的功能需求。 软件管理,主要是宝塔提供的一些面板扩展插件。 Nginx Nginx是一个高性能的HTTP和反向代理服务器,具有轻量级、占用内存小,并发能力强等优势。 w...
想成为一名网络开发人员或好奇工作的哪些子类型的薪水最高?Web开发是一个竞争激烈、多样化的行业,随着新语言和框架的出现而不断发展。 询问Web开发人员的薪水是一个难以解决的问题(尽管我们尝试)。有太多的因素需要考虑。 无论您是自由开发者还是有兴趣从事更传统的工作、喜欢前端或后端工作,或者想知...