Source code for oagdedupe.block.blocking

import logging
from dataclasses import dataclass

from oagdedupe._typing import ENGINE, StatsDict
from oagdedupe.base import BaseBlocking
from oagdedupe.block import base as block
from oagdedupe.block.forward import Forward
from oagdedupe.block.learner import Conjunctions
from oagdedupe.block.pairs import Pairs
from oagdedupe.db.base import BaseRepositoryBlocking


[docs]@dataclass class Blocking(BaseBlocking): """ General interface for blocking: - forward: constructs forward indices - conjunctions: learns best conjunctions - pairs: generates pairs from inverted indices """ repo: BaseRepositoryBlocking conj: block.BaseConjunctions = Conjunctions forward: block.BaseForward = Forward pairs: block.BasePairs = Pairs optimizer: block.BaseConjunctions = None
[docs] def __post_init__(self): self.settings = self.repo.settings self.forward = self.forward(repo=self.repo, settings=self.settings) self.pairs = self.pairs(repo=self.repo, settings=self.settings) self.conj = self.conj( settings=self.settings, optimizer=self.optimizer(repo=self.repo, settings=self.settings), )
[docs] def _check_rr(self, stats: StatsDict) -> bool: """ check if new block scheme is below minium reduction ratio """ return stats.rr < self.forward.repo.min_rr
[docs] def save_comparisons(self, table: str, n_covered: int) -> None: """ Iterates through best conjunction from best to worst. For each conjunction, append comparisons to "comparisons" or "full_comparisons" (if using full data). Stop if (a) subsequent conjunction yields a reduction ratio below the minimum rr setting or (b) the number of comparison pairs gathered exceeds n_covered. Parameters ---------- table: str table used to get pairs (either blocks_train for sample or blocks_df for full df) n_covered: int number of records that the conjunctions should cover """ stepsize = n_covered // 10 step = 0 for i, stats in enumerate(self.conj.conjunctions_list): if self._check_rr(stats): logging.warning( f""" next conjunction exceeds reduction ratio limit; stopping pair generation with scheme {stats.conjunction} """ ) return if table == "blocks_df": self.forward.build_forward_indices( full=True, conjunction=stats.conjunction ) self.pairs.add_new_comparisons(stats=stats, table=table) n_pairs = self.repo.get_n_pairs(table=table) if n_pairs // stepsize > step: logging.info(f"""{n_pairs} comparison pairs gathered""") step = n_pairs // stepsize if n_pairs > n_covered: return
[docs] def save(self, full: bool = False): """save comparison pairs, using conjunctions list; if using sample, build all forward indices first, otherwise builds forward index as needed """ if full: self.save_comparisons( table="blocks_df", n_covered=self.settings.model.n_covered ) else: self.forward.build_forward_indices(full=False) self.save_comparisons(table="blocks_train", n_covered=500)