Skip to content

数据处理与标注最佳实践

本文档整理 AI 系统数据处理和标注的最佳实践。

数据处理流程

txt
┌─────────────────────────────────────────────────────┐
│                   数据处理流程                       │
├─────────────────────────────────────────────────────┤
│                                                     │
│  数据采集 ──→ 数据清洗 ──→ 数据标注 ──→ 数据验证    │
│     │            │            │            │        │
│     ↓            ↓            ↓            ↓        │
│  格式统一     去重脱敏    标注规范    质量检查        │
│  来源记录     异常处理    标注指南    一致性验证      │
│  版本控制     数据增强    多人标注    偏差检测        │
│                                                     │
│              ↓ 数据版本管理 ↓                       │
│                                                     │
│  训练集 ──────→ 验证集 ──────→ 测试集              │
│   70%            15%            15%                 │
│                                                     │
└─────────────────────────────────────────────────────┘

数据采集

数据来源管理

python
from dataclasses import dataclass
from typing import List, Dict, Any, Optional
from datetime import datetime
import hashlib
import json

@dataclass
class DataSource:
    """A registered origin of raw data (API, file, database, web, ...)."""

    id: str  # identifier; DataCollector.register_source derives it from name and type
    name: str  # human-readable source name
    type: str  # "api", "file", "database", "web"
    url: Optional[str]  # endpoint / location, when the source has one
    description: str  # free-text description
    created_at: datetime  # registration time
    updated_at: datetime  # last-modified time (register_source sets it at registration)
    metadata: Dict[str, Any]  # arbitrary extra attributes

@dataclass
class DataRecord:
    """One collected data item together with its provenance and optional label."""

    id: str  # record identifier
    source_id: str  # id of the DataSource the record came from
    content: str  # text payload (collect_from_file stores the JSON-serialized item)
    label: Optional[str]  # annotation label, or None when unlabeled
    metadata: Dict[str, Any]  # extra attributes
    created_at: datetime  # creation time of this record object
    version: int  # record version number (collect_from_file starts at 1)

class DataCollector:
    """Collect raw records from registered data sources.

    Keeps an in-memory registry of sources (``self.sources``) and every
    record collected so far (``self.records``).
    """

    # Input formats understood by collect_from_file.
    SUPPORTED_FILE_FORMATS = ("json", "jsonl")

    def __init__(self):
        self.sources: Dict[str, DataSource] = {}
        self.records: List[DataRecord] = []

    def register_source(
        self,
        name: str,
        type: str,
        url: Optional[str] = None,
        description: str = "",
        metadata: Optional[Dict[str, Any]] = None
    ) -> str:
        """Register a data source and return its id.

        The id is derived deterministically from ``name`` and ``type``, so
        registering the same pair again replaces the earlier entry.
        """
        source_id = self._generate_id(f"{name}_{type}")

        # Take a single timestamp so a fresh source has created_at == updated_at.
        now = datetime.now()
        self.sources[source_id] = DataSource(
            id=source_id,
            name=name,
            type=type,
            url=url,
            description=description,
            created_at=now,
            updated_at=now,
            metadata=metadata or {}
        )
        return source_id

    def collect_from_api(
        self,
        source_id: str,
        endpoint: str,
        params: Optional[Dict[str, Any]] = None
    ) -> List[DataRecord]:
        """Collect records from an API endpoint (placeholder).

        Raises:
            NotImplementedError: always; plug an HTTP client in here.
        """
        raise NotImplementedError("collect_from_api is not implemented yet")

    def collect_from_file(
        self,
        source_id: str,
        file_path: str,
        format: str = "json"
    ) -> List[DataRecord]:
        """Load records from a UTF-8 file.

        Args:
            source_id: id of the source the records belong to.
            file_path: path of the file to read.
            format: ``"json"`` (a top-level array of items) or ``"jsonl"``
                (one JSON value per non-blank line).

        Returns:
            The newly created records; they are also appended to ``self.records``.

        Raises:
            ValueError: if ``format`` is unsupported, or a ``"json"`` file's
                top-level value is not an array.
        """
        # Validate before touching the file so a typo is not reported as an I/O error.
        if format not in self.SUPPORTED_FILE_FORMATS:
            raise ValueError(
                f"Unsupported format {format!r}; "
                f"expected one of {self.SUPPORTED_FILE_FORMATS}"
            )

        with open(file_path, 'r', encoding='utf-8') as f:
            if format == "json":
                items = json.load(f)
                if not isinstance(items, list):
                    raise ValueError(
                        f"{file_path}: expected a top-level JSON array, "
                        f"got {type(items).__name__}"
                    )
            else:
                items = [json.loads(line) for line in f if line.strip()]

        records = [self._make_record(source_id, item) for item in items]
        self.records.extend(records)
        return records

    def collect_from_database(
        self,
        source_id: str,
        query: str,
        connection_string: str
    ) -> List[DataRecord]:
        """Collect records with a database query (placeholder).

        Raises:
            NotImplementedError: always; plug a database driver in here.
        """
        raise NotImplementedError("collect_from_database is not implemented yet")

    def _make_record(self, source_id: str, item: Any) -> DataRecord:
        """Wrap one parsed JSON value in a version-1 DataRecord."""
        is_mapping = isinstance(item, dict)
        return DataRecord(
            id=self._generate_id(str(item)),
            source_id=source_id,
            # ensure_ascii=False keeps CJK text as real characters instead of
            # \uXXXX escapes, so cleaning and length checks see the true text.
            content=json.dumps(item, ensure_ascii=False),
            # Only JSON objects can carry a 'label' key / metadata fields.
            label=item.get('label') if is_mapping else None,
            metadata=item if is_mapping else {},
            created_at=datetime.now(),
            version=1
        )

    def _generate_id(self, content: str) -> str:
        """Return a deterministic 16-hex-character id derived from ``content``."""
        return hashlib.md5(content.encode()).hexdigest()[:16]

数据清洗

python
from typing import List, Callable, Any
from dataclasses import dataclass
import re

@dataclass
class CleaningRule:
    """A named text transformation applied by DataCleaner.clean."""

    name: str  # identifier used to select the rule via DataCleaner.clean(rules=...)
    description: str  # human-readable summary
    function: Callable[[str], str]  # receives the current text and returns the transformed text

class DataCleaner:
    """Apply an ordered list of named text-cleaning rules.

    Four default rules are registered on construction (whitespace, Unicode
    NFKC, HTML tags, punctuation); add_rule appends custom rules after them.
    """

    # Full-width CJK punctuation and curly quotes -> ASCII equivalents.
    # The keys are written as \u escapes on purpose: typed literally, curly
    # quotes are easily flattened into plain ASCII quotes by editors or
    # exporters, which yields duplicate dict keys and an unterminated '''
    # string literal (a SyntaxError).
    _PUNCTUATION_TABLE = str.maketrans({
        '\uff0c': ',',  # FULLWIDTH COMMA
        '\u3002': '.',  # IDEOGRAPHIC FULL STOP
        '\uff01': '!',  # FULLWIDTH EXCLAMATION MARK
        '\uff1f': '?',  # FULLWIDTH QUESTION MARK
        '\uff1b': ';',  # FULLWIDTH SEMICOLON
        '\uff1a': ':',  # FULLWIDTH COLON
        '\u201c': '"',  # LEFT DOUBLE QUOTATION MARK
        '\u201d': '"',  # RIGHT DOUBLE QUOTATION MARK
        '\u2018': "'",  # LEFT SINGLE QUOTATION MARK
        '\u2019': "'",  # RIGHT SINGLE QUOTATION MARK
    })

    def __init__(self):
        self.rules: List[CleaningRule] = []
        self._init_default_rules()

    def _init_default_rules(self):
        """Install the built-in rules; their order is the order they run in."""
        self.rules = [
            CleaningRule(
                name="remove_extra_whitespace",
                description="移除多余空白",
                function=self._remove_extra_whitespace
            ),
            CleaningRule(
                name="normalize_unicode",
                description="统一 Unicode 字符",
                function=self._normalize_unicode
            ),
            CleaningRule(
                name="remove_html_tags",
                description="移除 HTML 标签",
                function=self._remove_html_tags
            ),
            CleaningRule(
                name="normalize_punctuation",
                description="统一标点符号",
                function=self._normalize_punctuation
            ),
        ]

    def clean(self, text: str, rules: List[str] = None) -> str:
        """Run the cleaning rules over ``text`` and return the result.

        Args:
            text: input text.
            rules: names of the rules to apply; ``None`` applies all of them.
                Rules always run in registration order, not in the order of
                this list; unknown names are ignored.
        """
        selected = None if rules is None else set(rules)
        for rule in self.rules:
            if selected is None or rule.name in selected:
                text = rule.function(text)
        return text

    def _remove_extra_whitespace(self, text: str) -> str:
        """Collapse runs of spaces, newlines and tabs, then strip both ends."""
        text = re.sub(r' +', ' ', text)
        text = re.sub(r'\n+', '\n', text)
        text = re.sub(r'\t+', '\t', text)
        return text.strip()

    def _normalize_unicode(self, text: str) -> str:
        """Apply NFKC normalization (e.g. full-width letters/digits -> ASCII)."""
        import unicodedata
        return unicodedata.normalize('NFKC', text)

    def _remove_html_tags(self, text: str) -> str:
        """Strip anything shaped like an HTML tag (``<...>``)."""
        return re.sub(r'<[^>]+>', '', text)

    def _normalize_punctuation(self, text: str) -> str:
        """Map full-width / curly punctuation to ASCII in a single pass."""
        return text.translate(self._PUNCTUATION_TABLE)

    def add_rule(self, rule: CleaningRule):
        """Append a custom rule; it runs after all previously registered rules."""
        self.rules.append(rule)

    def remove_duplicates(self, records: List[DataRecord]) -> List[DataRecord]:
        """Drop records whose ``content`` repeats an earlier record's (first wins)."""
        seen = set()
        unique = []
        for record in records:
            # Compare the text itself rather than an MD5 digest: exact, and
            # no hashlib import is needed in this snippet.
            if record.content not in seen:
                seen.add(record.content)
                unique.append(record)
        return unique

    def detect_anomalies(
        self,
        records: List[DataRecord],
        threshold: float = 3.0
    ) -> List[DataRecord]:
        """Return records whose content length is a z-score outlier.

        A record is anomalous when ``|len - mean| / std > threshold``, using
        the population standard deviation of all content lengths. When every
        length is equal the z-score is taken as 0. Empty input returns ``[]``.
        """
        if not records:
            return []

        lengths = [len(r.content) for r in records]
        mean = sum(lengths) / len(lengths)
        std = (sum((n - mean) ** 2 for n in lengths) / len(lengths)) ** 0.5

        anomalies = []
        for record, length in zip(records, lengths):
            z_score = abs(length - mean) / std if std > 0 else 0
            if z_score > threshold:
                anomalies.append(record)
        return anomalies

# Usage example
cleaner = DataCleaner()


def _mask_emails(raw: str) -> str:
    """Replace every whitespace-delimited token containing '@' with a placeholder."""
    return re.sub(r'\S+@\S+', '[EMAIL]', raw)


# Register a custom rule; it runs after the built-in ones.
email_rule = CleaningRule(
    name="remove_emails",
    description="移除邮箱地址",
    function=_mask_emails,
)
cleaner.add_rule(email_rule)

# Clean a sample string and show the result.
text = "这是  一个  测试文本。"
cleaned = cleaner.clean(text)
print(cleaned)

数据标注

标注规范设计

python
from dataclasses import dataclass
from typing import List, Dict, Any, Optional
from enum import Enum

class AnnotationType(Enum):
    """Kinds of annotation task; the value is the string stored and shown in guides."""

    CLASSIFICATION = "classification"  # text classification
    NER = "ner"  # named entity recognition
    SENTIMENT = "sentiment"  # sentiment polarity
    QA = "qa"  # question answering
    SUMMARIZATION = "summarization"  # summarization
    TRANSLATION = "translation"  # translation

@dataclass
class LabelSchema:
    """Labeling specification that an annotation guide is generated from."""

    name: str  # schema name; AnnotationManager stores schemas under this key
    type: AnnotationType  # task type the schema applies to
    labels: List[str]  # allowed label values
    descriptions: Dict[str, str]  # label -> explanation shown in the guide
    examples: List[Dict[str, Any]]  # worked examples; the guide reads 'content', 'labels' and optional 'notes'

@dataclass
class AnnotationTask:
    """One unit of annotation work for a single record."""

    id: str  # task id
    record_id: str  # id of the record being annotated
    task_type: AnnotationType  # kind of annotation requested
    content: str  # text shown to annotators
    instructions: str  # annotation guide text
    labels: Optional[List[str]] = None  # allowed labels, if the task is label-based
    metadata: Optional[Dict[str, Any]] = None  # extra attributes; None when unset

@dataclass
class Annotation:
    """One annotator's answer for one task."""

    task_id: str  # task being answered
    annotator_id: str  # who produced the annotation
    labels: List[str]  # labels chosen by the annotator
    confidence: float  # annotator's confidence (AnnotationManager.submit_annotation defaults it to 1.0)
    notes: Optional[str] = None  # free-text remarks
    created_at: Optional[datetime] = None  # submission time; stays None unless the caller sets it

class AnnotationManager:
    """Manage label schemas, annotation tasks and submitted annotations (in memory)."""

    def __init__(self):
        self.schemas: Dict[str, LabelSchema] = {}
        self.tasks: Dict[str, AnnotationTask] = {}
        self.annotations: List[Annotation] = []

    def create_schema(
        self,
        name: str,
        task_type: AnnotationType,
        labels: List[str],
        descriptions: Optional[Dict[str, str]] = None,
        examples: Optional[List[Dict[str, Any]]] = None
    ) -> LabelSchema:
        """Create a schema and register it under ``name`` (replacing any existing one)."""
        schema = LabelSchema(
            name=name,
            type=task_type,
            labels=labels,
            descriptions=descriptions or {},
            examples=examples or []
        )
        self.schemas[name] = schema
        return schema

    def generate_annotation_guide(self, schema: LabelSchema) -> str:
        """Render a Markdown annotation guide for ``schema``.

        The guide lists each label with its description (or a placeholder),
        the schema's examples if any, and a fixed list of general notes.
        """
        parts = [
            f"# 标注指南:{schema.name}\n\n",
            f"## 任务类型:{schema.type.value}\n\n",
            "## 标签说明\n\n",
        ]
        for label in schema.labels:
            desc = schema.descriptions.get(label, "无描述")
            parts.append(f"### {label}\n{desc}\n\n")

        if schema.examples:
            parts.append("## 标注示例\n\n")
            for i, example in enumerate(schema.examples, 1):
                parts.append(f"### 示例 {i}\n")
                parts.append(f"**内容**:{example.get('content', '')}\n")
                parts.append(f"**标签**:{example.get('labels', [])}\n")
                if 'notes' in example:
                    parts.append(f"**说明**:{example['notes']}\n")
                parts.append("\n")

        parts.append("## 注意事项\n\n")
        parts.append("1. 如果不确定,请标记为 `uncertain`\n")
        parts.append("2. 如果内容不完整或有歧义,请在备注中说明\n")
        parts.append("3. 保持标注一致性,参考已有标注\n")

        return "".join(parts)

    def create_task(
        self,
        record_id: str,
        content: str,
        task_type: AnnotationType,
        schema_name: str
    ) -> AnnotationTask:
        """Create and register an annotation task for one record.

        The task id is derived from ``record_id`` and ``task_type``, so a
        second task of the same type for the same record replaces the first.

        Raises:
            ValueError: if no schema named ``schema_name`` exists.
        """
        schema = self.schemas.get(schema_name)
        if schema is None:
            raise ValueError(f"Schema {schema_name} not found")

        task_id = self._generate_id(f"{record_id}_{task_type.value}")
        task = AnnotationTask(
            id=task_id,
            record_id=record_id,
            task_type=task_type,
            content=content,
            instructions=self.generate_annotation_guide(schema),
            # Copy so editing a task's label list cannot silently alter the schema.
            labels=list(schema.labels)
        )

        self.tasks[task_id] = task
        return task

    def submit_annotation(
        self,
        task_id: str,
        annotator_id: str,
        labels: List[str],
        confidence: float = 1.0,
        notes: Optional[str] = None
    ) -> Annotation:
        """Record one annotator's answer for a task and return it.

        Labels are not checked against the schema: the generated guide tells
        annotators to use ``uncertain``, which schemas normally do not list.

        Raises:
            ValueError: if ``task_id`` was not created by this manager, or
                ``confidence`` is outside [0, 1] (NaN included).
        """
        if task_id not in self.tasks:
            raise ValueError(f"Task {task_id} not found")
        if not 0.0 <= confidence <= 1.0:
            raise ValueError(f"confidence must be within [0, 1], got {confidence}")

        annotation = Annotation(
            task_id=task_id,
            annotator_id=annotator_id,
            labels=labels,
            confidence=confidence,
            notes=notes,
            created_at=datetime.now()
        )
        self.annotations.append(annotation)
        return annotation

    def calculate_agreement(self, task_id: str) -> float:
        """Return the Jaccard overlap of all annotators' label sets for a task.

        This is |intersection| / |union| across every annotation of the task.
        Returns 1.0 with fewer than two annotations or when nobody chose a label.
        """
        label_sets = [set(a.labels) for a in self.annotations if a.task_id == task_id]
        if len(label_sets) < 2:
            return 1.0

        union = set.union(*label_sets)
        if not union:
            return 1.0
        return len(set.intersection(*label_sets)) / len(union)

    def _generate_id(self, content: str) -> str:
        """Return a deterministic 16-hex-character id derived from ``content``."""
        import hashlib
        return hashlib.md5(content.encode()).hexdigest()[:16]

# Usage example
manager = AnnotationManager()

# Label descriptions and worked examples for a three-way sentiment task.
sentiment_descriptions = {
    "positive": "正面情感:表达满意、喜欢、赞扬等",
    "negative": "负面情感:表达不满、厌恶、批评等",
    "neutral": "中性情感:客观陈述,无情感倾向",
}
sentiment_examples = [
    {"content": "这个产品非常好用,我很喜欢!", "labels": ["positive"], "notes": "明确的正面评价"},
    {"content": "服务态度太差了,再也不来了。", "labels": ["negative"], "notes": "明确的负面评价"},
    {"content": "今天天气晴朗,气温25度。", "labels": ["neutral"], "notes": "客观陈述,无情感倾向"},
]

# Register the sentiment schema
sentiment_schema = manager.create_schema(
    name="sentiment_analysis",
    task_type=AnnotationType.SENTIMENT,
    labels=["positive", "negative", "neutral"],
    descriptions=sentiment_descriptions,
    examples=sentiment_examples,
)

# Render and print the annotation guide
guide = manager.generate_annotation_guide(sentiment_schema)
print(guide)

多人标注与一致性

python
from typing import List, Dict, Tuple
from collections import Counter
import statistics

@dataclass
class AnnotationResult:
    """Outcome of aggregating several annotators' labels for one task."""

    task_id: str  # task the annotations belong to
    final_label: str  # most frequent label across annotators
    confidence: float  # mean annotator confidence multiplied by agreement
    annotators: List[str]  # ids of the contributing annotators, in input order
    agreement: float  # count of final_label divided by the number of annotations
    disputes: List[Dict[str, Any]]  # dissenting annotations; filled only when agreement is below the aggregator's threshold

class AnnotationAggregator:
    """Combine several annotators' labels by majority vote and measure agreement."""

    def __init__(
        self,
        min_annotators: int = 3,
        agreement_threshold: float = 0.7
    ):
        # NOTE(review): min_annotators is stored but aggregate() does not enforce
        # it yet; decide whether too few annotators should raise or be reported.
        self.min_annotators = min_annotators
        # Agreement below this value causes dissenting annotations to be listed.
        self.agreement_threshold = agreement_threshold

    def aggregate(
        self,
        annotations: List[Annotation]
    ) -> AnnotationResult:
        """Majority-vote the labels of several annotations of one task.

        Args:
            annotations: annotations to combine; they are assumed to share one
                task_id (the result takes the first annotation's).

        Returns:
            An AnnotationResult where ``final_label`` is the most frequent
            label (ties go to the label encountered first), ``agreement`` is
            that label's count divided by the number of annotations, and
            ``confidence`` is the mean annotator confidence times
            ``agreement``. When agreement is below ``agreement_threshold``,
            every annotation whose labels differ from ``[final_label]`` is
            listed in ``disputes``.

        Raises:
            ValueError: if ``annotations`` is empty or none of them has a label.
        """
        if not annotations:
            raise ValueError("No annotations to aggregate")

        # Count each label at most once per annotation, so an annotator who
        # repeats a label cannot push agreement above 1.0. dict.fromkeys keeps
        # first-seen order (a set would make tie-breaking hash-dependent).
        label_counts = Counter()
        for annotation in annotations:
            for label in dict.fromkeys(annotation.labels):
                label_counts[label] += 1

        if not label_counts:
            raise ValueError("Annotations contain no labels")

        total = len(annotations)
        top_label, top_count = label_counts.most_common(1)[0]
        agreement = top_count / total

        # Average self-reported confidence, discounted by disagreement.
        confidence = statistics.mean(a.confidence for a in annotations) * agreement

        disputes = []
        if agreement < self.agreement_threshold:
            disputes = [
                {
                    "annotator": annotation.annotator_id,
                    "labels": annotation.labels,
                    "notes": annotation.notes
                }
                for annotation in annotations
                if annotation.labels != [top_label]
            ]

        return AnnotationResult(
            task_id=annotations[0].task_id,
            final_label=top_label,
            confidence=confidence,
            annotators=[a.annotator_id for a in annotations],
            agreement=agreement,
            disputes=disputes
        )

    def calculate_kappa(
        self,
        annotations1: List[Annotation],
        annotations2: List[Annotation]
    ) -> float:
        """Compute Cohen's kappa between two annotators.

        ``annotations1[i]`` and ``annotations2[i]`` are paired positionally
        (via ``zip``), so both lists must be in the same item order; extra
        items in the longer list are ignored. For multi-label annotations
        every (label1, label2) combination of a pair is counted, which is a
        simplification of standard kappa.

        Returns 1.0 when no pair carries labels, or when chance agreement is
        already perfect (kappa is undefined there).
        """
        label_list = sorted({
            label for a in annotations1 + annotations2 for label in a.labels
        })
        # O(1) label -> matrix index lookup instead of list.index per pair.
        index = {label: i for i, label in enumerate(label_list)}
        n = len(label_list)

        # Confusion matrix: rows = annotator 1, columns = annotator 2.
        matrix = [[0] * n for _ in range(n)]
        for a1, a2 in zip(annotations1, annotations2):
            for l1 in a1.labels:
                for l2 in a2.labels:
                    matrix[index[l1]][index[l2]] += 1

        total = sum(sum(row) for row in matrix)
        if total == 0:
            return 1.0

        observed = sum(matrix[i][i] for i in range(n)) / total

        row_sums = [sum(row) for row in matrix]
        col_sums = [sum(column) for column in zip(*matrix)]
        expected = sum(r * c for r, c in zip(row_sums, col_sums)) / (total * total)

        if expected == 1.0:
            return 1.0

        return (observed - expected) / (1 - expected)

# Usage example
aggregator = AnnotationAggregator(min_annotators=3, agreement_threshold=0.7)

# Three annotations of the same task: two "positive", one "neutral".
annotations = [
    Annotation(task_id="task1", annotator_id="annotator1",
               labels=["positive"], confidence=0.9, notes="明确的正面情感"),
    Annotation(task_id="task1", annotator_id="annotator2",
               labels=["positive"], confidence=0.8, notes="略有正面倾向"),
    Annotation(task_id="task1", annotator_id="annotator3",
               labels=["neutral"], confidence=0.6, notes="不太确定"),
]

result = aggregator.aggregate(annotations)
print("最终标签:", result.final_label)
print("一致性:", format(result.agreement, ".2f"))
print("置信度:", format(result.confidence, ".2f"))
print("争议:", result.disputes)

数据质量保证

数据验证

python
from typing import List, Dict, Any, Callable
from dataclasses import dataclass

@dataclass
class ValidationResult:
    """Merged outcome of running DataValidator checks over a dataset."""

    is_valid: bool  # True when no check reported an error (warnings do not count)
    errors: List[str]  # blocking problems
    warnings: List[str]  # non-blocking problems
    statistics: Dict[str, Any]  # metrics from all checks merged into one dict

class DataValidator:
    """Run named quality checks over a list of DataRecord objects.

    Each check takes ``(records, config)`` and returns a dict with optional
    ``"errors"``, ``"warnings"`` and ``"statistics"`` entries; validate()
    merges them into a ValidationResult.
    """

    def __init__(self):
        self.validators: Dict[str, Callable] = {}
        self._init_default_validators()

    def _init_default_validators(self):
        """Register the built-in checks, keyed by name."""
        self.validators = {
            "length": self._validate_length,
            "format": self._validate_format,
            "completeness": self._validate_completeness,
            "consistency": self._validate_consistency,
            "distribution": self._validate_distribution,
        }

    def validate(
        self,
        records: List[DataRecord],
        validators: List[str] = None,
        config: Dict[str, Any] = None
    ) -> ValidationResult:
        """Run the selected checks and merge their findings.

        Args:
            records: records to check.
            validators: names of checks to run, in order; ``None`` runs all.
                Unknown names are reported as warnings instead of being ignored.
            config: options read by the checks: ``min_length`` (default 10),
                ``max_length`` (default 10000), ``min_label_ratio`` (default 0.01).
        """
        if validators is None:
            validators = list(self.validators.keys())

        config = config or {}
        errors = []
        warnings = []
        stats = {}

        for name in validators:
            check = self.validators.get(name)
            if check is None:
                # A misspelled check name would otherwise silently skip validation.
                warnings.append(f"未知验证器: {name}")
                continue
            outcome = check(records, config)
            errors.extend(outcome.get("errors", []))
            warnings.extend(outcome.get("warnings", []))
            stats.update(outcome.get("statistics", {}))

        return ValidationResult(
            is_valid=len(errors) == 0,
            errors=errors,
            warnings=warnings,
            statistics=stats
        )

    def _validate_length(
        self,
        records: List[DataRecord],
        config: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Check content lengths: too short is an error, too long is a warning."""
        errors = []
        warnings = []
        stats = {}

        min_length = config.get("min_length", 10)
        max_length = config.get("max_length", 10000)

        lengths = [len(r.content) for r in records]
        if not lengths:
            # min()/max()/mean are undefined for no records; an empty dataset
            # is reported by the completeness check instead.
            return {"errors": errors, "warnings": warnings, "statistics": stats}

        stats["length_min"] = min(lengths)
        stats["length_max"] = max(lengths)
        stats["length_mean"] = sum(lengths) / len(lengths)

        too_short = sum(1 for n in lengths if n < min_length)
        too_long = sum(1 for n in lengths if n > max_length)

        if too_short > 0:
            errors.append(f"{too_short} 条记录长度小于 {min_length}")

        if too_long > 0:
            warnings.append(f"{too_long} 条记录长度大于 {max_length}")

        return {"errors": errors, "warnings": warnings, "statistics": stats}

    def _validate_format(
        self,
        records: List[DataRecord],
        config: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Count records whose content cannot be encoded as UTF-8 (e.g. lone surrogates)."""
        errors = []
        warnings = []
        stats = {}

        invalid_encoding = 0
        for record in records:
            try:
                record.content.encode('utf-8')
            except UnicodeEncodeError:
                invalid_encoding += 1

        if invalid_encoding > 0:
            errors.append(f"{invalid_encoding} 条记录编码无效")

        stats["invalid_encoding"] = invalid_encoding

        return {"errors": errors, "warnings": warnings, "statistics": stats}

    def _validate_completeness(
        self,
        records: List[DataRecord],
        config: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Flag an empty dataset, blank records (errors) and unlabeled records (warning)."""
        errors = []
        warnings = []
        stats = {}

        if not records:
            errors.append("数据集为空")

        empty_records = sum(1 for r in records if not r.content.strip())
        missing_labels = sum(1 for r in records if r.label is None)

        if empty_records > 0:
            errors.append(f"{empty_records} 条记录为空")

        if missing_labels > 0:
            warnings.append(f"{missing_labels} 条记录缺少标签")

        stats["empty_records"] = empty_records
        stats["missing_labels"] = missing_labels

        return {"errors": errors, "warnings": warnings, "statistics": stats}

    def _validate_consistency(
        self,
        records: List[DataRecord],
        config: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Warn about labels whose share is below ``config["min_label_ratio"]`` (default 1%)."""
        from collections import Counter

        errors = []
        warnings = []
        stats = {}

        min_ratio = config.get("min_label_ratio", 0.01)
        # Falsy labels (None or "") are excluded from the distribution.
        label_counts = Counter(r.label for r in records if r.label)

        if label_counts:
            total = sum(label_counts.values())
            for label, count in label_counts.items():
                ratio = count / total
                if ratio < min_ratio:
                    warnings.append(f"标签 '{label}' 占比过低: {ratio:.2%}")

        stats["label_distribution"] = dict(label_counts)

        return {"errors": errors, "warnings": warnings, "statistics": stats}

    def _validate_distribution(
        self,
        records: List[DataRecord],
        config: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Report gaps between sorted creation times (seconds) and per-source counts."""
        from collections import Counter

        errors = []
        warnings = []
        stats = {}

        timestamps = sorted(r.created_at for r in records)
        gaps = [
            (later - earlier).total_seconds()
            for earlier, later in zip(timestamps, timestamps[1:])
        ]
        if gaps:
            stats["time_gap_min"] = min(gaps)
            stats["time_gap_max"] = max(gaps)
            stats["time_gap_mean"] = sum(gaps) / len(gaps)

        stats["source_distribution"] = dict(Counter(r.source_id for r in records))

        return {"errors": errors, "warnings": warnings, "statistics": stats}

# Usage example
from datetime import datetime

validator = DataValidator()

# Small sample dataset so the example runs on its own; substitute real records.
# "r3" is intentionally too short and unlabeled, so it shows an error and a warning.
_sample_rows = [
    ("r1", "这个产品非常好用,我很喜欢!", "positive"),
    ("r2", "服务态度太差了,再也不来了。", "negative"),
    ("r3", "太短", None),
]
records = [
    DataRecord(
        id=record_id,
        source_id="demo",
        content=content,
        label=label,
        metadata={},
        created_at=datetime.now(),
        version=1
    )
    for record_id, content, label in _sample_rows
]

config = {
    "min_length": 10,
    "max_length": 5000
}

result = validator.validate(records, config=config)

print(f"验证通过: {result.is_valid}")
print(f"错误: {result.errors}")
print(f"警告: {result.warnings}")
print(f"统计: {result.statistics}")

数据增强

python
from typing import List, Dict, Any, Callable
import random

class DataAugmenter:
    """Character-level text augmentation plus label rebalancing.

    The text operations treat each character as a token (``list(text)``),
    which suits unsegmented Chinese text.
    """

    # Tiny demo synonym table keyed by single characters.
    _SYNONYMS = {
        "好": ["优秀", "出色", "棒"],
        "坏": ["差", "糟糕", "不好"],
        "大": ["巨大", "庞大", "大型"],
        "小": ["微小", "小型", "细小"],
    }

    # Intensifiers inserted by _random_insertion.
    _INSERTIONS = ("很", "非常", "特别")

    def __init__(self):
        self.augmentations: Dict[str, Callable] = {}
        self._init_default_augmentations()

    def _init_default_augmentations(self):
        """Register the built-in augmentation methods, keyed by name."""
        self.augmentations = {
            "synonym_replacement": self._synonym_replacement,
            "random_insertion": self._random_insertion,
            "random_deletion": self._random_deletion,
            "random_swap": self._random_swap,
            "back_translation": self._back_translation,
        }

    def augment(
        self,
        text: str,
        methods: List[str] = None,
        n_augments: int = 1,
        apply_prob: float = 0.5
    ) -> List[str]:
        """Produce ``n_augments`` randomized variants of ``text``.

        For each variant, every selected method is applied independently with
        probability ``apply_prob``, in the order given, each one working on
        the previous method's output. A variant can therefore equal ``text``.
        Unknown method names are skipped.
        """
        if methods is None:
            methods = list(self.augmentations.keys())

        results = []
        for _ in range(n_augments):
            augmented = text
            for method in methods:
                if method in self.augmentations and random.random() < apply_prob:
                    augmented = self.augmentations[method](augmented)
            results.append(augmented)

        return results

    def _synonym_replacement(self, text: str, n: int = 1) -> str:
        """Replace up to ``n`` randomly chosen characters that have synonyms.

        Candidates are searched across the whole text, not just its first
        ``n`` characters.
        """
        chars = list(text)
        candidates = [i for i, ch in enumerate(chars) if ch in self._SYNONYMS]
        k = max(0, min(n, len(candidates)))
        for i in random.sample(candidates, k):
            chars[i] = random.choice(self._SYNONYMS[chars[i]])
        return ''.join(chars)

    def _random_insertion(self, text: str, n: int = 1) -> str:
        """Insert ``n`` intensifier words at random positions."""
        chars = list(text)
        for _ in range(n):
            pos = random.randint(0, len(chars))
            chars.insert(pos, random.choice(self._INSERTIONS))
        return ''.join(chars)

    def _random_deletion(self, text: str, p: float = 0.1) -> str:
        """Drop each character with probability ``p``; never returns an empty string."""
        chars = list(text)
        if len(chars) <= 1:
            return text

        kept = [ch for ch in chars if random.random() > p]
        return ''.join(kept) if kept else text

    def _random_swap(self, text: str, n: int = 1) -> str:
        """Swap ``n`` pairs of characters at two distinct random positions."""
        chars = list(text)
        if len(chars) < 2:
            return text

        for _ in range(n):
            # sample() guarantees distinct positions, so no swap is a no-op by construction.
            i, j = random.sample(range(len(chars)), 2)
            chars[i], chars[j] = chars[j], chars[i]

        return ''.join(chars)

    def _back_translation(self, text: str) -> str:
        """Back-translation placeholder: returns ``text`` unchanged until a translation API is wired in."""
        return text

    def balance_dataset(
        self,
        records: List[DataRecord],
        strategy: str = "oversample"
    ) -> List[DataRecord]:
        """Rebalance records so every label ends up with the same count.

        Strategies:
            ``"oversample"``: keep every record and append random duplicates
            (drawn with replacement) of each minority label up to the majority
            count.
            ``"undersample"``: keep a random subset (without replacement) of
            the minority count per label; the result is grouped by label.

        Records with ``label is None`` are treated as their own class.
        Empty input returns an empty list.

        Raises:
            ValueError: if ``strategy`` is not one of the two above.
        """
        if strategy not in ("oversample", "undersample"):
            raise ValueError(
                f"Unknown strategy {strategy!r}; expected 'oversample' or 'undersample'"
            )
        if not records:
            return []

        # Group once (first-seen label order) instead of rescanning per label.
        by_label: Dict[Any, List[DataRecord]] = {}
        for record in records:
            by_label.setdefault(record.label, []).append(record)

        if strategy == "oversample":
            max_count = max(len(group) for group in by_label.values())
            balanced = list(records)
            for group in by_label.values():
                if len(group) < max_count:
                    balanced.extend(random.choices(group, k=max_count - len(group)))
            return balanced

        min_count = min(len(group) for group in by_label.values())
        balanced = []
        for group in by_label.values():
            balanced.extend(random.sample(group, min_count))
        return balanced

# Usage example
from datetime import datetime

augmenter = DataAugmenter()

# Data augmentation
text = "这个产品很好用"
augmented = augmenter.augment(text, n_augments=3)
for i, aug in enumerate(augmented, 1):
    print(f"增强 {i}: {aug}")

# A deliberately skewed sample (2 positive : 1 negative) so the example runs
# on its own; substitute your real records here.
records = [
    DataRecord(
        id=record_id,
        source_id="demo",
        content=content,
        label=label,
        metadata={},
        created_at=datetime.now(),
        version=1
    )
    for record_id, content, label in [
        ("r1", "这个产品很好用", "positive"),
        ("r2", "质量不错,推荐购买", "positive"),
        ("r3", "服务态度太差了", "negative"),
    ]
]

# Balance the dataset
balanced_records = augmenter.balance_dataset(records, strategy="oversample")

数据版本管理

python
from dataclasses import dataclass
from typing import Dict, Any, Optional
import json
import hashlib

@dataclass
class DataVersion:
    """Metadata describing one snapshot (version) of a dataset."""

    version: str  # version id: "v" + creation timestamp + 3-digit sequence number
    description: str  # human-readable description of the snapshot
    created_at: datetime  # snapshot creation time
    records_count: int  # number of records in the snapshot
    checksum: str  # hex digest computed by DataVersionManager._calculate_checksum
    metadata: Dict[str, Any]  # arbitrary extra info (e.g. source, date)

class DataVersionManager:
    """Track dataset snapshots and their integrity checksums.

    Version metadata is kept in memory (``self.versions``). Persisting and
    loading the record payloads under ``storage_path`` is left to
    _save_version_data / _load_version_data, which are placeholders here.
    """

    def __init__(self, storage_path: str):
        self.storage_path = storage_path
        self.versions: Dict[str, DataVersion] = {}

    def create_version(
        self,
        records: List[DataRecord],
        description: str = "",
        metadata: Optional[Dict[str, Any]] = None
    ) -> DataVersion:
        """Snapshot ``records`` as a new version and return its metadata."""
        version = self._generate_version()

        data_version = DataVersion(
            version=version,
            description=description,
            created_at=datetime.now(),
            records_count=len(records),
            checksum=self._calculate_checksum(records),
            metadata=metadata or {}
        )

        self._save_version_data(version, records)
        self.versions[version] = data_version
        return data_version

    def get_version(self, version: str) -> List[DataRecord]:
        """Return the records stored for ``version``.

        Raises:
            ValueError: if the version is unknown.
            NotImplementedError: until _load_version_data is implemented.
        """
        if version not in self.versions:
            raise ValueError(f"Version {version} not found")

        return self._load_version_data(version)

    def list_versions(self) -> List[DataVersion]:
        """Return all versions, newest first."""
        return sorted(self.versions.values(), key=lambda v: v.created_at, reverse=True)

    def compare_versions(
        self,
        version1: str,
        version2: str
    ) -> Dict[str, Any]:
        """Summarize how ``version2`` differs from ``version1``.

        ``records_diff`` and ``time_diff`` (seconds) are ``version2 - version1``;
        ``checksum_match`` is True when both snapshots hash identically.

        Raises:
            ValueError: naming whichever version(s) are unknown.
        """
        missing = [v for v in (version1, version2) if v not in self.versions]
        if missing:
            raise ValueError(f"Version not found: {', '.join(missing)}")

        v1 = self.versions[version1]
        v2 = self.versions[version2]
        return {
            "version1": version1,
            "version2": version2,
            "records_diff": v2.records_count - v1.records_count,
            "time_diff": (v2.created_at - v1.created_at).total_seconds(),
            "checksum_match": v1.checksum == v2.checksum
        }

    def _generate_version(self) -> str:
        """Build an id like ``v20260308_120000_001`` (timestamp + sequence number)."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        count = len(self.versions) + 1
        return f"v{timestamp}_{count:03d}"

    def _calculate_checksum(self, records: List[DataRecord]) -> str:
        """Hash the ordered (content, label) pairs of ``records``.

        Labels are included so that a relabel-only change produces a new
        checksum; hashing content alone reported relabelled data as identical.
        """
        payload = json.dumps([[r.content, r.label] for r in records])
        return hashlib.md5(payload.encode()).hexdigest()

    def _save_version_data(self, version: str, records: List[DataRecord]):
        """Persist the records of ``version`` (placeholder: currently a no-op)."""
        # TODO: write the records under self.storage_path.
        pass

    def _load_version_data(self, version: str) -> List[DataRecord]:
        """Load the records of ``version`` (not implemented).

        Raises:
            NotImplementedError: always, rather than silently returning None.
        """
        raise NotImplementedError("_load_version_data is not implemented yet")

# Usage example
from datetime import datetime

version_manager = DataVersionManager("/data/versions")

# Sample records so the example is self-contained; use your real dataset here.
records = [
    DataRecord(
        id=record_id,
        source_id="web_crawl",
        content=content,
        label=label,
        metadata={},
        created_at=datetime.now(),
        version=1
    )
    for record_id, content, label in [
        ("r1", "这个产品非常好用,我很喜欢!", "positive"),
        ("r2", "服务态度太差了,再也不来了。", "negative"),
    ]
]

# Create a new version
version = version_manager.create_version(
    records,
    description="初始数据集",
    metadata={"source": "web_crawl", "date": "2026-03-08"}
)

print(f"版本: {version.version}")
print(f"记录数: {version.records_count}")
print(f"校验和: {version.checksum}")

最佳实践总结

数据处理检查清单

markdown
## 数据处理检查清单

### 数据采集

- [ ] 数据来源清晰记录
- [ ] 数据格式统一
- [ ] 数据量充足
- [ ] 数据质量初步评估

### 数据清洗

- [ ] 移除重复数据
- [ ] 处理缺失值
- [ ] 移除异常数据
- [ ] 统一格式编码

### 数据标注

- [ ] 标注规范清晰
- [ ] 标注指南完善
- [ ] 多人标注
- [ ] 一致性检验

### 数据验证

- [ ] 长度分布检查
- [ ] 格式正确性
- [ ] 完整性检查
- [ ] 分布均匀性

### 数据版本

- [ ] 版本号规范
- [ ] 变更记录
- [ ] 校验和验证
- [ ] 回滚机制