完成本实验后,你应能够:
你正在一家电商平台的数据治理组实习。团队准备整合两份外部商品目录,建设统一的商品目录底表,用于搜索、推荐、库存整合和价格治理。
困难在于:同一商品在不同来源中的写法并不一致。标题里可能混有标点、版本号、括号附注和厂商名差异;如果直接对两张表做两两比较,计算代价又会很高。
本次实验中,你将沿着一条完整的商品匹配流水线完成工作:先清洗文本,再生成候选对,再做相似度验证,最后输出一份可用于汇报的对比结果。
评分规则: 共 100 分(4 个任务)。
作答要求: 请在包含 # YOUR CODE HERE 的代码单元中完成实现,并删除 raise NotImplementedError();每完成一个任务后,运行紧随其后的测试单元。
本实验提供两套商品数据:
sample:规模较小,适合调试与快速验证;full:规模更大,适合观察两种连接方法在真实规模下的过滤效果。你可以在下面代码块中修改 PRODUCT_DATASET = "sample" 或 "full" 来切换数据集。切换后请重新执行整本 notebook。
下面的代码会完成:
import re
import warnings
from pathlib import Path
import numpy as np
import pandas as pd
from IPython.display import display
warnings.filterwarnings("ignore")
PRODUCT_DATASET = "sample" # 可选: "sample", "full"
PRODUCT_CONFIGS = {
"sample": {
"folder": "Amazon-Google-Sample",
"amazon_file": "Amazon_sample.csv",
"google_file": "Google_sample.csv",
"truth_file": "Amazon_Google_perfectMapping_sample.csv",
"label": "Amazon-Google-Sample",
},
"full": {
"folder": "Amazon-Google",
"amazon_file": "Amazon.csv",
"google_file": "Google.csv",
"truth_file": "Amazon_Google_perfectMapping.csv",
"label": "Amazon-Google",
},
}
if PRODUCT_DATASET not in PRODUCT_CONFIGS:
raise ValueError(f"PRODUCT_DATASET 必须是 {list(PRODUCT_CONFIGS)} 之一")
product_cfg = PRODUCT_CONFIGS[PRODUCT_DATASET]
base_dir = Path("Lab-data")
product_dir = base_dir / product_cfg["folder"]
amazon = pd.read_csv(product_dir / product_cfg["amazon_file"])
google = pd.read_csv(product_dir / product_cfg["google_file"])
product_truth = pd.read_csv(product_dir / product_cfg["truth_file"])
product_truth_pairs = {
tuple(x) for x in product_truth[["idAmazon", "idGoogle"]].astype(str).values.tolist()
}
MINHASH_PRIME = 10007
MINHASH_PARAMS = [
(1825, 409), (4507, 4012), (3658, 2286), (1680, 8935), (1425, 9674),
(6913, 520), (489, 1535), (3583, 3811), (8280, 9863), (435, 9195),
(3258, 8928), (6874, 3611), (7360, 9654), (4558, 106), (2616, 6924),
(5575, 4552), (2548, 3527), (5515, 1674), (1520, 6224), (1585, 5881),
(5636, 9891), (4334, 711), (7528, 8785), (2046, 6201), (1292, 9044),
(4804, 5925), (9460, 3150), (1140, 750), (3734, 4741), (1308, 3814),
(1655, 6227), (4555, 7428), (5978, 2664), (6066, 5820), (3433, 4374),
(1170, 9980), (2804, 8751), (4011, 2677), (7574, 6216), (4423, 9125),
(3599, 5313), (917, 3752), (526, 5168), (6573, 4386), (1085, 3456),
(9293, 5155), (3484, 8179), (6483, 7517), (2341, 4339), (2288, 4040),
(9198, 8830), (4305, 9577), (7020, 9560), (6544, 5930), (3594, 2266),
(8349, 8085), (1490, 771), (1797, 2504), (2622, 6916), (9772, 1040)
]
MINHASH_NUM_BANDS = 30
MINHASH_ROWS_PER_BAND = 2
print("商品数据集:", product_cfg["label"], f"({PRODUCT_DATASET})")
print("Amazon:", amazon.shape)
print("Google:", google.shape)
print("商品真值对数量:", len(product_truth_pairs))
print("MinHash 配置:", len(MINHASH_PARAMS), "个哈希函数,", MINHASH_NUM_BANDS, "个 bands, 每个 band", MINHASH_ROWS_PER_BAND, "行")
两张商品表中的同一商品,往往会因为标点、大小写、版本号和括号附注的差异而写成不同形式。正式匹配前,必须先把原始文本清洗成可比较的表示。
normalize_text(text):r"[^a-z0-9]+" 将连续的非字母数字字符替换为一个空格;extract_version_token(text):2.5、4.1、10.4.8 这类至少包含一个小数点的版本号;v 或 V,返回结果时不要保留该前缀;None。build_product_df(df, cols):build_product_df(amazon, ['title', 'manufacturer']);build_product_df(google, ['name', 'manufacturer']);cols 中各列,再调用 normalize_text();cols[0] 对应的原始标题列提取 version_token,并判断 has_bracket_note。请生成 amazon_prep 和 google_prep。它们至少包含以下字段:
id:记录 id;norm_text:清洗后的文本;joinKey:用于后续连接的 token 列表;token_count:token 数量;version_token:提取出的版本号;has_bracket_note:原始标题中是否出现 ( 或 [。def normalize_text(text):
# YOUR CODE HERE
raise NotImplementedError()
def extract_version_token(text):
# YOUR CODE HERE
raise NotImplementedError()
def build_product_df(df, cols):
# YOUR CODE HERE
raise NotImplementedError()
amazon_prep = build_product_df(amazon, ["title", "manufacturer"])
google_prep = build_product_df(google, ["name", "manufacturer"])
assert normalize_text("A.B-C 12") == "a b c 12"
assert extract_version_token("Version V10.4.8") == "10.4.8"
assert extract_version_token("punch software 20100") is None
required_cols = {"id", "norm_text", "joinKey", "token_count", "version_token", "has_bracket_note"}
for raw_df, prep_df in [(amazon, amazon_prep), (google, google_prep)]:
assert set(prep_df.columns) >= required_cols
assert len(prep_df) == len(raw_df)
assert prep_df["joinKey"].map(lambda x: isinstance(x, list) and len(x) > 0).all()
assert amazon_prep.loc[amazon_prep["id"] == "b0002mg5uk", "version_token"].iloc[0] == "2.5"
assert bool(google_prep.loc[google_prep["id"] == "http://www.google.com/base/feeds/snippets/7249154325906371083", "has_bracket_note"].iloc[0])
完成清洗后,我们已经拿到了可比较的 token 表示。接下来要实现一个规则式连接基线:先基于共享 token 生成候选对,再用 Jaccard 相似度做精确验证。
请在 JaccardJoin 类中实现以下 3 个方法:
jaccard_similarity(self, tokens1, tokens2):返回两个 token 序列的 Jaccard 相似度。generate_candidates(self, df1, df2):cand_pairs 至少包含 id1, joinKey1, id2, joinKey2, shared_token_count。verify_candidates(self, cand_df, threshold):返回带 jaccard 列的 scored_pairs,以及满足阈值条件的 verified_pairs。请在 Amazon 与 Google 商品数据上运行 JaccardJoin,并生成:
cand_pairs:候选对;reduction_ratio:候选过滤率;scored_pairs:带精确 Jaccard 分数的候选对;verified_pairs:满足阈值 0.5 的最终结果。class JaccardJoin:
def jaccard_similarity(self, tokens1, tokens2):
# YOUR CODE HERE
raise NotImplementedError()
def generate_candidates(self, df1, df2):
# YOUR CODE HERE
raise NotImplementedError()
def verify_candidates(self, cand_df, threshold):
# YOUR CODE HERE
raise NotImplementedError()
jaccard_join = JaccardJoin()
cand_pairs = jaccard_join.generate_candidates(amazon_prep, google_prep)
reduction_ratio = 1 - len(cand_pairs) / (len(amazon_prep) * len(google_prep))
scored_pairs, verified_pairs = jaccard_join.verify_candidates(cand_pairs, threshold=0.5)
assert np.isclose(jaccard_join.jaccard_similarity([1, 2, 3], [2, 3, 4]), 0.5)
assert isinstance(cand_pairs, pd.DataFrame) and len(cand_pairs) > 0
assert set(cand_pairs.columns) >= {"id1", "joinKey1", "id2", "joinKey2", "shared_token_count"}
assert not cand_pairs.duplicated(["id1", "id2"]).any()
assert 0 < reduction_ratio < 1
assert isinstance(scored_pairs, pd.DataFrame) and len(scored_pairs) == len(cand_pairs)
assert scored_pairs["jaccard"].between(0, 1).all()
assert isinstance(verified_pairs, pd.DataFrame) and (verified_pairs["jaccard"] >= 0.5).all()
当数据规模变大时,JaccardJoin 往往还不够高效。为此,团队希望引入 MinHash + LSH,在尽量保留潜在匹配对的同时,进一步减少需要精确比较的记录对数量。
请在 MinHashJoin 类中实现以下 5 个方法:
token_to_id_func(self, left_join_keys, right_join_keys):joinKey 中出现的全部唯一 token;1 开始编号;token_to_id。minhash_signature(self, token_id_set):返回一条记录的 MinHash 签名。sig_similarity(self, sig1, sig2):用两个签名中对应位置相等的比例估计 Jaccard 相似度。generate_candidates(self, df1, df2):minhash_cand_pairs,至少包含 id1, id2, estimated_jaccard。verify_candidates(self, cand_df, threshold):返回 scored_pairs 和 verified_pairs,其中 verified_pairs 只保留 estimated_jaccard >= threshold 的记录。请在 Amazon 与 Google 商品数据上运行 MinHashJoin,并生成:
minhash_cand_pairs:MinHash 候选对;minhash_reduction_ratio:MinHash 候选过滤率;minhash_scored_pairs:带估计相似度的候选对;minhash_verified_pairs:满足阈值 0.5 的最终结果。class MinHashJoin:
def __init__(self, hash_params, prime=MINHASH_PRIME, bands=MINHASH_NUM_BANDS, rows_per_band=MINHASH_ROWS_PER_BAND):
self.hash_params = hash_params
self.prime = prime
self.bands = bands
self.rows_per_band = rows_per_band
self.token_to_id = {}
self.left_token_sets = {}
self.right_token_sets = {}
self.left_signatures = {}
self.right_signatures = {}
def token_to_id_func(self, left_join_keys, right_join_keys):
# YOUR CODE HERE
raise NotImplementedError()
def minhash_signature(self, token_id_set):
# YOUR CODE HERE
raise NotImplementedError()
def sig_similarity(self, sig1, sig2):
# YOUR CODE HERE
raise NotImplementedError()
def generate_candidates(self, df1, df2):
# YOUR CODE HERE
raise NotImplementedError()
def verify_candidates(self, cand_df, threshold):
# YOUR CODE HERE
raise NotImplementedError()
minhash_join = MinHashJoin(MINHASH_PARAMS, MINHASH_PRIME, MINHASH_NUM_BANDS, MINHASH_ROWS_PER_BAND)
minhash_cand_pairs = minhash_join.generate_candidates(amazon_prep, google_prep)
minhash_reduction_ratio = 1 - len(minhash_cand_pairs) / (len(amazon_prep) * len(google_prep))
minhash_scored_pairs, minhash_verified_pairs = minhash_join.verify_candidates(minhash_cand_pairs, threshold=0.5)
assert np.isclose(minhash_join.sig_similarity([1, 2, 3], [1, 4, 3]), 2 / 3)
assert isinstance(minhash_join.token_to_id, dict) and minhash_join.token_to_id and min(minhash_join.token_to_id.values()) == 1
for signature_map, prep_df in [(minhash_join.left_signatures, amazon_prep), (minhash_join.right_signatures, google_prep)]:
assert isinstance(signature_map, dict) and len(signature_map) == len(prep_df)
assert all(len(sig) == len(MINHASH_PARAMS) for sig in signature_map.values())
assert isinstance(minhash_cand_pairs, pd.DataFrame) and len(minhash_cand_pairs) > 0
assert list(minhash_cand_pairs.columns) == ["id1", "id2", "estimated_jaccard"]
assert not minhash_cand_pairs.duplicated(["id1", "id2"]).any()
assert 0 < minhash_reduction_ratio < 1
assert isinstance(minhash_scored_pairs, pd.DataFrame) and len(minhash_scored_pairs) == len(minhash_cand_pairs)
assert minhash_scored_pairs["estimated_jaccard"].between(0, 1).all()
assert isinstance(minhash_verified_pairs, pd.DataFrame) and (minhash_verified_pairs["estimated_jaccard"] >= 0.5).all()
现在你已经得到了两套匹配结果。接下来,你需要把它们整理成一份可以直接向团队汇报的效果报告,回答两个核心问题:
compute_prf(pred_pairs, true_pairs),返回 (precision, recall, f1)。jaccard_candidate_metrics, jaccard_metricsminhash_candidate_metrics, minhash_metricscomparison 的 DataFrame。comparison 表格要求¶['jaccard_join', 'minhash_join']。candidate_count:候选对数量;reduction_ratio:过滤率,定义为 1 - candidate_count / (len(amazon_prep) * len(google_prep));covered_true_pairs:候选集中覆盖的真值对数量;candidate_recall:候选召回率,定义为 covered_true_pairs / len(product_truth_pairs);precision:最终预测结果的查准率;recall:最终预测结果的查全率;f1:最终预测结果的 F1 值。sample 数据示例¶当 PRODUCT_DATASET = "sample" 且使用当前默认 MinHash 参数时,comparison 的结果示例如下:
| index | candidate_count | reduction_ratio | covered_true_pairs | candidate_recall | precision | recall | f1 |
|---|---|---|---|---|---|---|---|
| jaccard_join | 84 | 0.671875 | 16 | 1.000000 | 1.000000 | 0.375000 | 0.545455 |
| minhash_join | 23 | 0.910156 | 15 | 0.937500 | 1.000000 | 0.500000 | 0.666667 |
# 请在本单元完成以下内容:
# 1. 实现 compute_prf(pred_pairs, true_pairs)
# 2. 计算 jaccard_candidate_metrics 与 jaccard_metrics
# 3. 计算 minhash_candidate_metrics 与 minhash_metrics
# 4. 生成 comparison
assert np.allclose(compute_prf({("a", "b"), ("c", "d")}, {("a", "b"), ("x", "y")}), (0.5, 0.5, 0.5))
assert set(jaccard_candidate_metrics) == {"candidate_count", "reduction_ratio", "covered_true_pairs", "candidate_recall"}
assert set(minhash_candidate_metrics) == {"candidate_count", "reduction_ratio", "covered_true_pairs", "candidate_recall"}
assert set(jaccard_metrics) == {"precision", "recall", "f1"}
assert set(minhash_metrics) == {"precision", "recall", "f1"}
assert isinstance(comparison, pd.DataFrame)
assert list(comparison.index) == ["jaccard_join", "minhash_join"]
assert list(comparison.columns) == ["candidate_count", "reduction_ratio", "covered_true_pairs", "candidate_recall", "precision", "recall", "f1"]
all_pairs = len(amazon_prep) * len(google_prep)
expected_candidate_pairs = {
"jaccard_join": {tuple(x) for x in cand_pairs[["id1", "id2"]].astype(str).values.tolist()},
"minhash_join": {tuple(x) for x in minhash_cand_pairs[["id1", "id2"]].astype(str).values.tolist()},
}
expected_metrics = {
"jaccard_join": jaccard_metrics,
"minhash_join": minhash_metrics,
}
for method, pairs in expected_candidate_pairs.items():
assert comparison.loc[method, "candidate_count"] == len(pairs)
assert np.isclose(comparison.loc[method, "reduction_ratio"], 1 - len(pairs) / all_pairs)
assert np.isclose(comparison.loc[method, "candidate_recall"], len(pairs & product_truth_pairs) / len(product_truth_pairs))
assert np.allclose(
comparison.loc[method, ["precision", "recall", "f1"]].to_numpy(dtype=float),
[expected_metrics[method]["precision"], expected_metrics[method]["recall"], expected_metrics[method]["f1"]],
)
恭喜你完成了电商平台商品目录整合的全部实战任务!你已经亲手走通了一条完整的商品实体解析流程:从文本清洗、候选生成,到相似度验证与结果评估。这个过程正是现实数据治理与数据集成工作中的核心环节。
提交前最终检查:
Kernel -> Restart & Run All,确保代码环境从头到尾运行顺畅无报错。comparison 可以正确生成。.ipynb 文件。