数据处理与标注最佳实践
本文档整理 AI 系统数据处理和标注的最佳实践。
数据处理流程
txt
┌─────────────────────────────────────────────────────┐
│ 数据处理流程 │
├─────────────────────────────────────────────────────┤
│ │
│ 数据采集 ──→ 数据清洗 ──→ 数据标注 ──→ 数据验证 │
│ │ │ │ │ │
│ ↓ ↓ ↓ ↓ │
│ 格式统一 去重脱敏 标注规范 质量检查 │
│ 来源记录 异常处理 标注指南 一致性验证 │
│ 版本控制 数据增强 多人标注 偏差检测 │
│ │
│ ↓ 数据版本管理 ↓ │
│ │
│ 训练集 ──────→ 验证集 ──────→ 测试集 │
│ 70% 15% 15% │
│ │
└─────────────────────────────────────────────────────┘数据采集
数据来源管理
python
from dataclasses import dataclass
from typing import List, Dict, Any, Optional
from datetime import datetime
import hashlib
import json
@dataclass
class DataSource:
id: str
name: str
type: str # "api", "file", "database", "web"
url: Optional[str]
description: str
created_at: datetime
updated_at: datetime
metadata: Dict[str, Any]
@dataclass
class DataRecord:
id: str
source_id: str
content: str
label: Optional[str]
metadata: Dict[str, Any]
created_at: datetime
version: int
class DataCollector:
"""数据采集器"""
def __init__(self):
self.sources: Dict[str, DataSource] = {}
self.records: List[DataRecord] = []
def register_source(
self,
name: str,
type: str,
url: Optional[str] = None,
description: str = "",
metadata: Dict[str, Any] = None
) -> str:
"""注册数据源"""
source_id = self._generate_id(f"{name}_{type}")
source = DataSource(
id=source_id,
name=name,
type=type,
url=url,
description=description,
created_at=datetime.now(),
updated_at=datetime.now(),
metadata=metadata or {}
)
self.sources[source_id] = source
return source_id
def collect_from_api(
self,
source_id: str,
endpoint: str,
params: Dict[str, Any] = None
) -> List[DataRecord]:
"""从 API 采集数据"""
# 实现 API 采集
pass
def collect_from_file(
self,
source_id: str,
file_path: str,
format: str = "json"
) -> List[DataRecord]:
"""从文件采集数据"""
records = []
with open(file_path, 'r', encoding='utf-8') as f:
if format == "json":
data = json.load(f)
if isinstance(data, list):
for item in data:
record = DataRecord(
id=self._generate_id(str(item)),
source_id=source_id,
content=json.dumps(item),
label=item.get('label'),
metadata=item,
created_at=datetime.now(),
version=1
)
records.append(record)
self.records.extend(records)
return records
def collect_from_database(
self,
source_id: str,
query: str,
connection_string: str
) -> List[DataRecord]:
"""从数据库采集数据"""
# 实现数据库采集
pass
def _generate_id(self, content: str) -> str:
"""生成唯一 ID"""
return hashlib.md5(content.encode()).hexdigest()[:16]数据清洗
python
from typing import List, Callable, Any
from dataclasses import dataclass
import re
@dataclass
class CleaningRule:
name: str
description: str
function: Callable[[str], str]
class DataCleaner:
"""数据清洗器"""
def __init__(self):
self.rules: List[CleaningRule] = []
self._init_default_rules()
def _init_default_rules(self):
"""初始化默认清洗规则"""
self.rules = [
CleaningRule(
name="remove_extra_whitespace",
description="移除多余空白",
function=self._remove_extra_whitespace
),
CleaningRule(
name="normalize_unicode",
description="统一 Unicode 字符",
function=self._normalize_unicode
),
CleaningRule(
name="remove_html_tags",
description="移除 HTML 标签",
function=self._remove_html_tags
),
CleaningRule(
name="normalize_punctuation",
description="统一标点符号",
function=self._normalize_punctuation
),
]
def clean(self, text: str, rules: List[str] = None) -> str:
"""清洗文本"""
if rules is None:
rules = [r.name for r in self.rules]
for rule in self.rules:
if rule.name in rules:
text = rule.function(text)
return text
def _remove_extra_whitespace(self, text: str) -> str:
"""移除多余空白"""
text = re.sub(r' +', ' ', text)
text = re.sub(r'\n+', '\n', text)
text = re.sub(r'\t+', '\t', text)
return text.strip()
def _normalize_unicode(self, text: str) -> str:
"""统一 Unicode 字符"""
import unicodedata
return unicodedata.normalize('NFKC', text)
def _remove_html_tags(self, text: str) -> str:
"""移除 HTML 标签"""
return re.sub(r'<[^>]+>', '', text)
def _normalize_punctuation(self, text: str) -> str:
"""统一标点符号"""
replacements = {
',': ',',
'。': '.',
'!': '!',
'?': '?',
';': ';',
':': ':',
'"': '"',
'"': '"',
''': "'",
''': "'",
}
for old, new in replacements.items():
text = text.replace(old, new)
return text
def add_rule(self, rule: CleaningRule):
"""添加清洗规则"""
self.rules.append(rule)
def remove_duplicates(self, records: List[DataRecord]) -> List[DataRecord]:
"""移除重复记录"""
seen = set()
unique = []
for record in records:
content_hash = hashlib.md5(record.content.encode()).hexdigest()
if content_hash not in seen:
seen.add(content_hash)
unique.append(record)
return unique
def detect_anomalies(
self,
records: List[DataRecord],
threshold: float = 3.0
) -> List[DataRecord]:
"""检测异常记录"""
lengths = [len(r.content) for r in records]
mean = sum(lengths) / len(lengths)
std = (sum((l - mean) ** 2 for l in lengths) / len(lengths)) ** 0.5
anomalies = []
for record in records:
z_score = abs(len(record.content) - mean) / std if std > 0 else 0
if z_score > threshold:
anomalies.append(record)
return anomalies
# 使用示例
cleaner = DataCleaner()
# 添加自定义规则
cleaner.add_rule(CleaningRule(
name="remove_emails",
description="移除邮箱地址",
function=lambda text: re.sub(r'\S+@\S+', '[EMAIL]', text)
))
# 清洗数据
text = "这是 一个 测试文本。"
cleaned = cleaner.clean(text)
print(cleaned)数据标注
标注规范设计
python
from dataclasses import dataclass
from typing import List, Dict, Any, Optional
from enum import Enum
class AnnotationType(Enum):
CLASSIFICATION = "classification"
NER = "ner" # 命名实体识别
SENTIMENT = "sentiment"
QA = "qa"
SUMMARIZATION = "summarization"
TRANSLATION = "translation"
@dataclass
class LabelSchema:
name: str
type: AnnotationType
labels: List[str]
descriptions: Dict[str, str]
examples: List[Dict[str, Any]]
@dataclass
class AnnotationTask:
id: str
record_id: str
task_type: AnnotationType
content: str
instructions: str
labels: Optional[List[str]] = None
metadata: Dict[str, Any] = None
@dataclass
class Annotation:
task_id: str
annotator_id: str
labels: List[str]
confidence: float
notes: Optional[str] = None
created_at: datetime = None
class AnnotationManager:
"""标注管理器"""
def __init__(self):
self.schemas: Dict[str, LabelSchema] = {}
self.tasks: Dict[str, AnnotationTask] = {}
self.annotations: List[Annotation] = []
def create_schema(
self,
name: str,
task_type: AnnotationType,
labels: List[str],
descriptions: Dict[str, str] = None,
examples: List[Dict[str, Any]] = None
) -> LabelSchema:
"""创建标注规范"""
schema = LabelSchema(
name=name,
type=task_type,
labels=labels,
descriptions=descriptions or {},
examples=examples or []
)
self.schemas[name] = schema
return schema
def generate_annotation_guide(self, schema: LabelSchema) -> str:
"""生成标注指南"""
guide = f"# 标注指南:{schema.name}\n\n"
guide += f"## 任务类型:{schema.type.value}\n\n"
guide += "## 标签说明\n\n"
for label in schema.labels:
desc = schema.descriptions.get(label, "无描述")
guide += f"### {label}\n{desc}\n\n"
if schema.examples:
guide += "## 标注示例\n\n"
for i, example in enumerate(schema.examples, 1):
guide += f"### 示例 {i}\n"
guide += f"**内容**:{example.get('content', '')}\n"
guide += f"**标签**:{example.get('labels', [])}\n"
if 'notes' in example:
guide += f"**说明**:{example['notes']}\n"
guide += "\n"
guide += "## 注意事项\n\n"
guide += "1. 如果不确定,请标记为 `uncertain`\n"
guide += "2. 如果内容不完整或有歧义,请在备注中说明\n"
guide += "3. 保持标注一致性,参考已有标注\n"
return guide
def create_task(
self,
record_id: str,
content: str,
task_type: AnnotationType,
schema_name: str
) -> AnnotationTask:
"""创建标注任务"""
schema = self.schemas.get(schema_name)
if not schema:
raise ValueError(f"Schema {schema_name} not found")
task_id = self._generate_id(f"{record_id}_{task_type.value}")
instructions = self.generate_annotation_guide(schema)
task = AnnotationTask(
id=task_id,
record_id=record_id,
task_type=task_type,
content=content,
instructions=instructions,
labels=schema.labels
)
self.tasks[task_id] = task
return task
def submit_annotation(
self,
task_id: str,
annotator_id: str,
labels: List[str],
confidence: float = 1.0,
notes: str = None
) -> Annotation:
"""提交标注"""
annotation = Annotation(
task_id=task_id,
annotator_id=annotator_id,
labels=labels,
confidence=confidence,
notes=notes,
created_at=datetime.now()
)
self.annotations.append(annotation)
return annotation
def calculate_agreement(self, task_id: str) -> float:
"""计算标注一致性"""
task_annotations = [
a for a in self.annotations if a.task_id == task_id
]
if len(task_annotations) < 2:
return 1.0
# 简化版:计算标签交集比例
all_labels = [set(a.labels) for a in task_annotations]
intersection = set.intersection(*all_labels) if all_labels else set()
union = set.union(*all_labels) if all_labels else set()
if not union:
return 1.0
return len(intersection) / len(union)
def _generate_id(self, content: str) -> str:
"""生成唯一 ID"""
import hashlib
return hashlib.md5(content.encode()).hexdigest()[:16]
# 使用示例
manager = AnnotationManager()
# 创建情感分析规范
sentiment_schema = manager.create_schema(
name="sentiment_analysis",
task_type=AnnotationType.SENTIMENT,
labels=["positive", "negative", "neutral"],
descriptions={
"positive": "正面情感:表达满意、喜欢、赞扬等",
"negative": "负面情感:表达不满、厌恶、批评等",
"neutral": "中性情感:客观陈述,无情感倾向"
},
examples=[
{
"content": "这个产品非常好用,我很喜欢!",
"labels": ["positive"],
"notes": "明确的正面评价"
},
{
"content": "服务态度太差了,再也不来了。",
"labels": ["negative"],
"notes": "明确的负面评价"
},
{
"content": "今天天气晴朗,气温25度。",
"labels": ["neutral"],
"notes": "客观陈述,无情感倾向"
}
]
)
# 生成标注指南
guide = manager.generate_annotation_guide(sentiment_schema)
print(guide)多人标注与一致性
python
from typing import List, Dict, Tuple
from collections import Counter
import statistics
@dataclass
class AnnotationResult:
task_id: str
final_label: str
confidence: float
annotators: List[str]
agreement: float
disputes: List[Dict[str, Any]]
class AnnotationAggregator:
"""标注聚合器"""
def __init__(
self,
min_annotators: int = 3,
agreement_threshold: float = 0.7
):
self.min_annotators = min_annotators
self.agreement_threshold = agreement_threshold
def aggregate(
self,
annotations: List[Annotation]
) -> AnnotationResult:
"""聚合多个标注"""
if not annotations:
raise ValueError("No annotations to aggregate")
# 统计标签
label_counts = Counter()
for annotation in annotations:
for label in annotation.labels:
label_counts[label] += 1
# 计算一致性
total = len(annotations)
top_label, top_count = label_counts.most_common(1)[0]
agreement = top_count / total
# 计算置信度
confidence = statistics.mean(a.confidence for a in annotations) * agreement
# 找出争议
disputes = []
if agreement < self.agreement_threshold:
for annotation in annotations:
if annotation.labels != [top_label]:
disputes.append({
"annotator": annotation.annotator_id,
"labels": annotation.labels,
"notes": annotation.notes
})
return AnnotationResult(
task_id=annotations[0].task_id,
final_label=top_label,
confidence=confidence,
annotators=[a.annotator_id for a in annotations],
agreement=agreement,
disputes=disputes
)
def calculate_kappa(
self,
annotations1: List[Annotation],
annotations2: List[Annotation]
) -> float:
"""计算 Cohen's Kappa"""
# 构建混淆矩阵
all_labels = set()
for a in annotations1 + annotations2:
all_labels.update(a.labels)
label_list = sorted(all_labels)
n = len(label_list)
# 构建矩阵
matrix = [[0] * n for _ in range(n)]
for a1, a2 in zip(annotations1, annotations2):
for l1 in a1.labels:
for l2 in a2.labels:
i = label_list.index(l1)
j = label_list.index(l2)
matrix[i][j] += 1
# 计算 Kappa
total = sum(sum(row) for row in matrix)
if total == 0:
return 1.0
# 观察一致性
observed = sum(matrix[i][i] for i in range(n)) / total
# 期望一致性
row_sums = [sum(matrix[i]) for i in range(n)]
col_sums = [sum(matrix[i][j] for i in range(n)) for j in range(n)]
expected = sum(row_sums[i] * col_sums[i] for i in range(n)) / (total * total)
if expected == 1.0:
return 1.0
return (observed - expected) / (1 - expected)
# 使用示例
aggregator = AnnotationAggregator(min_annotators=3, agreement_threshold=0.7)
# 假设有多个标注
annotations = [
Annotation("task1", "annotator1", ["positive"], 0.9, "明确的正面情感"),
Annotation("task1", "annotator2", ["positive"], 0.8, "略有正面倾向"),
Annotation("task1", "annotator3", ["neutral"], 0.6, "不太确定"),
]
result = aggregator.aggregate(annotations)
print(f"最终标签: {result.final_label}")
print(f"一致性: {result.agreement:.2f}")
print(f"置信度: {result.confidence:.2f}")
print(f"争议: {result.disputes}")数据质量保证
数据验证
python
from typing import List, Dict, Any, Callable
from dataclasses import dataclass
@dataclass
class ValidationResult:
is_valid: bool
errors: List[str]
warnings: List[str]
statistics: Dict[str, Any]
class DataValidator:
"""数据验证器"""
def __init__(self):
self.validators: Dict[str, Callable] = {}
self._init_default_validators()
def _init_default_validators(self):
"""初始化默认验证器"""
self.validators = {
"length": self._validate_length,
"format": self._validate_format,
"completeness": self._validate_completeness,
"consistency": self._validate_consistency,
"distribution": self._validate_distribution,
}
def validate(
self,
records: List[DataRecord],
validators: List[str] = None,
config: Dict[str, Any] = None
) -> ValidationResult:
"""验证数据集"""
if validators is None:
validators = list(self.validators.keys())
config = config or {}
errors = []
warnings = []
statistics = {}
for validator_name in validators:
if validator_name in self.validators:
result = self.validators[validator_name](records, config)
errors.extend(result.get("errors", []))
warnings.extend(result.get("warnings", []))
statistics.update(result.get("statistics", {}))
return ValidationResult(
is_valid=len(errors) == 0,
errors=errors,
warnings=warnings,
statistics=statistics
)
def _validate_length(
self,
records: List[DataRecord],
config: Dict[str, Any]
) -> Dict[str, Any]:
"""验证长度"""
errors = []
warnings = []
statistics = {}
min_length = config.get("min_length", 10)
max_length = config.get("max_length", 10000)
lengths = [len(r.content) for r in records]
statistics["length_min"] = min(lengths)
statistics["length_max"] = max(lengths)
statistics["length_mean"] = sum(lengths) / len(lengths)
too_short = sum(1 for l in lengths if l < min_length)
too_long = sum(1 for l in lengths if l > max_length)
if too_short > 0:
errors.append(f"{too_short} 条记录长度小于 {min_length}")
if too_long > 0:
warnings.append(f"{too_long} 条记录长度大于 {max_length}")
return {"errors": errors, "warnings": warnings, "statistics": statistics}
def _validate_format(
self,
records: List[DataRecord],
config: Dict[str, Any]
) -> Dict[str, Any]:
"""验证格式"""
errors = []
warnings = []
statistics = {}
# 检查编码
invalid_encoding = 0
for record in records:
try:
record.content.encode('utf-8')
except UnicodeEncodeError:
invalid_encoding += 1
if invalid_encoding > 0:
errors.append(f"{invalid_encoding} 条记录编码无效")
statistics["invalid_encoding"] = invalid_encoding
return {"errors": errors, "warnings": warnings, "statistics": statistics}
def _validate_completeness(
self,
records: List[DataRecord],
config: Dict[str, Any]
) -> Dict[str, Any]:
"""验证完整性"""
errors = []
warnings = []
statistics = {}
# 检查空记录
empty_records = sum(1 for r in records if not r.content.strip())
# 检查缺失标签
missing_labels = sum(1 for r in records if r.label is None)
if empty_records > 0:
errors.append(f"{empty_records} 条记录为空")
if missing_labels > 0:
warnings.append(f"{missing_labels} 条记录缺少标签")
statistics["empty_records"] = empty_records
statistics["missing_labels"] = missing_labels
return {"errors": errors, "warnings": warnings, "statistics": statistics}
def _validate_consistency(
self,
records: List[DataRecord],
config: Dict[str, Any]
) -> Dict[str, Any]:
"""验证一致性"""
errors = []
warnings = []
statistics = {}
# 检查标签分布
label_counts = Counter(r.label for r in records if r.label)
if label_counts:
total = sum(label_counts.values())
for label, count in label_counts.items():
ratio = count / total
if ratio < 0.01:
warnings.append(f"标签 '{label}' 占比过低: {ratio:.2%}")
statistics["label_distribution"] = dict(label_counts)
return {"errors": errors, "warnings": warnings, "statistics": statistics}
def _validate_distribution(
self,
records: List[DataRecord],
config: Dict[str, Any]
) -> Dict[str, Any]:
"""验证分布"""
errors = []
warnings = []
statistics = {}
# 检查时间分布
timestamps = [r.created_at for r in records]
if timestamps:
timestamps.sort()
time_diffs = [(timestamps[i+1] - timestamps[i]).total_seconds()
for i in range(len(timestamps)-1)]
if time_diffs:
statistics["time_gap_min"] = min(time_diffs)
statistics["time_gap_max"] = max(time_diffs)
statistics["time_gap_mean"] = sum(time_diffs) / len(time_diffs)
# 检查来源分布
source_counts = Counter(r.source_id for r in records)
statistics["source_distribution"] = dict(source_counts)
return {"errors": errors, "warnings": warnings, "statistics": statistics}
# 使用示例
validator = DataValidator()
config = {
"min_length": 10,
"max_length": 5000
}
result = validator.validate(records, config=config)
print(f"验证通过: {result.is_valid}")
print(f"错误: {result.errors}")
print(f"警告: {result.warnings}")
print(f"统计: {result.statistics}")数据增强
python
from typing import List, Dict, Any, Callable
import random
class DataAugmenter:
"""数据增强器"""
def __init__(self):
self.augmentations: Dict[str, Callable] = {}
self._init_default_augmentations()
def _init_default_augmentations(self):
"""初始化默认增强方法"""
self.augmentations = {
"synonym_replacement": self._synonym_replacement,
"random_insertion": self._random_insertion,
"random_deletion": self._random_deletion,
"random_swap": self._random_swap,
"back_translation": self._back_translation,
}
def augment(
self,
text: str,
methods: List[str] = None,
n_augments: int = 1
) -> List[str]:
"""增强数据"""
if methods is None:
methods = list(self.augmentations.keys())
results = []
for _ in range(n_augments):
augmented = text
for method in methods:
if method in self.augmentations and random.random() < 0.5:
augmented = self.augmentations[method](augmented)
results.append(augmented)
return results
def _synonym_replacement(self, text: str, n: int = 1) -> str:
"""同义词替换"""
# 简化实现
synonyms = {
"好": ["优秀", "出色", "棒"],
"坏": ["差", "糟糕", "不好"],
"大": ["巨大", "庞大", "大型"],
"小": ["微小", "小型", "微小"],
}
words = list(text)
for i in range(min(n, len(words))):
if words[i] in synonyms:
words[i] = random.choice(synonyms[words[i]])
return ''.join(words)
def _random_insertion(self, text: str, n: int = 1) -> str:
"""随机插入"""
words = list(text)
for _ in range(n):
pos = random.randint(0, len(words))
words.insert(pos, random.choice(["很", "非常", "特别"]))
return ''.join(words)
def _random_deletion(self, text: str, p: float = 0.1) -> str:
"""随机删除"""
words = list(text)
if len(words) <= 1:
return text
new_words = [w for w in words if random.random() > p]
return ''.join(new_words) if new_words else text
def _random_swap(self, text: str, n: int = 1) -> str:
"""随机交换"""
words = list(text)
if len(words) < 2:
return text
for _ in range(n):
idx1 = random.randint(0, len(words) - 1)
idx2 = random.randint(0, len(words) - 1)
words[idx1], words[idx2] = words[idx2], words[idx1]
return ''.join(words)
def _back_translation(self, text: str) -> str:
"""回译(需要翻译 API)"""
# 简化实现,实际需要调用翻译 API
return text
def balance_dataset(
self,
records: List[DataRecord],
strategy: str = "oversample"
) -> List[DataRecord]:
"""平衡数据集"""
# 统计每个标签的数量
label_counts = Counter(r.label for r in records)
if strategy == "oversample":
# 过采样:复制少数类
max_count = max(label_counts.values())
balanced = list(records)
for label, count in label_counts.items():
if count < max_count:
label_records = [r for r in records if r.label == label]
to_add = max_count - count
balanced.extend(random.choices(label_records, k=to_add))
return balanced
elif strategy == "undersample":
# 欠采样:减少多数类
min_count = min(label_counts.values())
balanced = []
for label in label_counts:
label_records = [r for r in records if r.label == label]
balanced.extend(random.sample(label_records, min_count))
return balanced
return records
# 使用示例
augmenter = DataAugmenter()
# 数据增强
text = "这个产品很好用"
augmented = augmenter.augment(text, n_augments=3)
for i, aug in enumerate(augmented, 1):
print(f"增强 {i}: {aug}")
# 平衡数据集
balanced_records = augmenter.balance_dataset(records, strategy="oversample")数据版本管理
python
from dataclasses import dataclass
from typing import Dict, Any, Optional
import json
import hashlib
@dataclass
class DataVersion:
version: str
description: str
created_at: datetime
records_count: int
checksum: str
metadata: Dict[str, Any]
class DataVersionManager:
"""数据版本管理器"""
def __init__(self, storage_path: str):
self.storage_path = storage_path
self.versions: Dict[str, DataVersion] = {}
def create_version(
self,
records: List[DataRecord],
description: str = "",
metadata: Dict[str, Any] = None
) -> DataVersion:
"""创建新版本"""
# 生成版本号
version = self._generate_version()
# 计算校验和
checksum = self._calculate_checksum(records)
# 创建版本
data_version = DataVersion(
version=version,
description=description,
created_at=datetime.now(),
records_count=len(records),
checksum=checksum,
metadata=metadata or {}
)
# 保存数据
self._save_version_data(version, records)
# 记录版本
self.versions[version] = data_version
return data_version
def get_version(self, version: str) -> List[DataRecord]:
"""获取指定版本的数据"""
if version not in self.versions:
raise ValueError(f"Version {version} not found")
return self._load_version_data(version)
def list_versions(self) -> List[DataVersion]:
"""列出所有版本"""
return sorted(self.versions.values(), key=lambda v: v.created_at, reverse=True)
def compare_versions(
self,
version1: str,
version2: str
) -> Dict[str, Any]:
"""比较两个版本"""
v1 = self.versions.get(version1)
v2 = self.versions.get(version2)
if not v1 or not v2:
raise ValueError("Version not found")
return {
"version1": version1,
"version2": version2,
"records_diff": v2.records_count - v1.records_count,
"time_diff": (v2.created_at - v1.created_at).total_seconds(),
"checksum_match": v1.checksum == v2.checksum
}
def _generate_version(self) -> str:
"""生成版本号"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
count = len(self.versions) + 1
return f"v{timestamp}_{count:03d}"
def _calculate_checksum(self, records: List[DataRecord]) -> str:
"""计算校验和"""
content = json.dumps([r.content for r in records], sort_keys=True)
return hashlib.md5(content.encode()).hexdigest()
def _save_version_data(self, version: str, records: List[DataRecord]):
"""保存版本数据"""
# 实现保存逻辑
pass
def _load_version_data(self, version: str) -> List[DataRecord]:
"""加载版本数据"""
# 实现加载逻辑
pass
# 使用示例
version_manager = DataVersionManager("/data/versions")
# 创建新版本
version = version_manager.create_version(
records,
description="初始数据集",
metadata={"source": "web_crawl", "date": "2026-03-08"}
)
print(f"版本: {version.version}")
print(f"记录数: {version.records_count}")
print(f"校验和: {version.checksum}")最佳实践总结
数据处理检查清单
markdown
## 数据处理检查清单
### 数据采集
- [ ] 数据来源清晰记录
- [ ] 数据格式统一
- [ ] 数据量充足
- [ ] 数据质量初步评估
### 数据清洗
- [ ] 移除重复数据
- [ ] 处理缺失值
- [ ] 移除异常数据
- [ ] 统一格式编码
### 数据标注
- [ ] 标注规范清晰
- [ ] 标注指南完善
- [ ] 多人标注
- [ ] 一致性检验
### 数据验证
- [ ] 长度分布检查
- [ ] 格式正确性
- [ ] 完整性检查
- [ ] 分布均匀性
### 数据版本
- [ ] 版本号规范
- [ ] 变更记录
- [ ] 校验和验证
- [ ] 回滚机制