Intersections¶
Intersections are the glue layer that connects collections, enabling cross-collection queries via .join().
Overview¶
An intersection declares a relationship between fields in two different collections:
# Declare: employees.employee_id links to expertise.subject
hb.intersect("employees.employee_id", "expertise.subject")
# Now you can join them
results = hb.query("employees").search("engineering").join("expertise")
for r in results:
    if r.is_matched:
        print(f"{r.source['name']} knows {r.target['skill']}")
Without intersections, collections are isolated islands. With intersections, they become a connected graph you can traverse.
Declaring Intersections¶
hb.intersect()¶
Register a relationship between two collection fields:
# Basic intersection (auto-detects relation type)
hb.intersect("employees.employee_id", "expertise.subject")
# Explicit identity matching (exact equality)
hb.intersect(
    "projects.project_id",
    "budgets.project_id",
    relation="identity"
)
# Semantic matching (embedding similarity)
hb.intersect(
    "documents.content",
    "topics.description",
    relation="semantic",
    threshold=0.7,    # Minimum similarity score
    min_margin=0.05,  # Gap between best and second-best match
)
Relation Types¶
| Relation | Matching | Use When |
|---|---|---|
"identity" |
Exact equality | IDs, foreign keys, categorical values |
"semantic" |
Embedding similarity | Text content, descriptions, fuzzy matching |
"auto" |
Inferred from field encodings | Default - uses EXACT→identity, SEMANTIC→semantic |
"link" |
Explicit value mappings | Cross-encoding intersections (requires flexible mode) |
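The "auto" rule amounts to a small lookup on the two fields' encodings. A hedged sketch of that inference (plain Python, not hybi internals; encoding values shown as strings for illustration):

```python
def resolve_auto(source_encoding, target_encoding):
    """Illustrative "auto" resolution: same-encoding pairs map to a concrete relation."""
    if source_encoding == target_encoding == "EXACT":
        return "identity"   # exact IDs on both sides: equality join
    if source_encoding == target_encoding == "SEMANTIC":
        return "semantic"   # embeddings on both sides: similarity join
    return "auto"           # mixed or unknown encodings: cannot resolve (use a link)

print(resolve_auto("EXACT", "EXACT"))      # identity
print(resolve_auto("SEMANTIC", "SEMANTIC"))  # semantic
```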
IntersectionMode¶
Controls whether intersections allow cross-encoding type pairs.
from hybi.compose.intersections import IntersectionMode
# Default: only same-encoding pairs allowed
mode = IntersectionMode.STRICT
# Cross-encoding pairs via explicit links
mode = IntersectionMode.FLEXIBLE
| Mode | Allowed Pairs | Relation Types |
|---|---|---|
| STRICT | Same encoding only (EXACT↔EXACT, SEMANTIC↔SEMANTIC) | identity, semantic, auto |
| FLEXIBLE | Any encoding pair | link only |
Intersection¶
The Intersection class represents a declared relationship.
from hybi.compose.intersections import Intersection
intersection = Intersection(
    source="employees.employee_id",
    target="expertise.subject",
    relation="identity",
    threshold=0.7,    # For semantic relations
    min_margin=0.05,  # Confidence margin
)
hybi.compose.intersections.Intersection
dataclass
¶
Declares a relationship between two collection fields.
An intersection defines how data from one collection relates to data in another collection, enabling cross-collection queries via .join().
Attributes:
| Name | Type | Description |
|---|---|---|
| source | str | Source collection and field in "collection.field" format |
| target | str | Target collection and field in "collection.field" format |
| relation | Literal['identity', 'semantic', 'auto', 'link'] | Type of matching - "identity" (exact), "semantic" (similarity), "auto" (inferred from field encodings), or "link" (explicit bindings) |
| threshold | float | Minimum similarity score for semantic matching (0.0 to 1.0) |
| min_margin | float | Minimum gap between best and second-best match for confidence |
| mode | IntersectionMode | STRICT (default, same-type only) or FLEXIBLE (cross-type via links) |
Example
Strict mode (default) - same encoding types¶
>>> intersection = Intersection(
...     source="employees.employee_id",
...     target="projects.owner_id",
...     relation="identity",
... )
Flexible mode - cross encoding types via links¶
>>> intersection = Intersection(
...     source="employees.employee_id",
...     target="expertise.topic",
...     relation="link",
...     mode=IntersectionMode.FLEXIBLE,
... )
source (instance-attribute)¶
target (instance-attribute)¶
relation = 'auto' (class-attribute, instance-attribute)¶
threshold = 0.7 (class-attribute, instance-attribute)¶
min_margin = 0.05 (class-attribute, instance-attribute)¶
mode = IntersectionMode.STRICT (class-attribute, instance-attribute)¶
link_collection = field(init=False, default=None, repr=False) (class-attribute, instance-attribute)¶
__init__(source, target, relation='auto', threshold=0.7, min_margin=0.05, mode=IntersectionMode.STRICT)¶
reversed()
¶
Create a reversed intersection (target → source).
Returns:
| Type | Description |
|---|---|
| Intersection | New Intersection with source and target swapped. |
resolved_relation()
¶
Get the resolved relation type.
If relation is "auto" and encodings are known, resolves to "identity" or "semantic" based on encoding types.
Returns:
| Type | Description |
|---|---|
| str | Resolved relation string, or "auto" if it cannot resolve. |
has_links()
¶
Check if this intersection has a link collection.
Returns:
| Type | Description |
|---|---|
| bool | True if this is a flexible intersection with a link collection. |
Joining Collections¶
Once intersections are declared, use .join() to traverse them:
# Single join
results = hb.query("employees").search("engineering").join("expertise")
# Chained joins (multi-hop)
results = (
    hb.query("employees")
    .search("senior engineer")
    .join("expertise")
    .join("projects")
    .join("budgets")
)
# Access joined data
for r in results:
    print(f"Employee: {r.source['name']}")
    if r.target:
        print(f"  Expertise: {r.target['skill']}")
Join Options¶
results = query.join(
    "expertise",
    on=("employee_id", "subject"),  # Override intersection fields
    where={"level": "Expert"},      # Filter target collection
    top_k=5,                        # Limit matches per source row
)
JoinedResult¶
A single result from a cross-collection join.
for result in results:
    # Check match status
    if result.is_matched:
        # Access source and target data
        name = result.source["name"]
        skill = result.target["skill"]
    elif result.is_null:
        # Ambiguous match (multiple close candidates)
        print(f"Unclear match for {result.source['name']}")
    elif result.is_no_match:
        # No match found
        print(f"No expertise for {result.source['name']}")
Output Formats¶
# Direct access (recommended)
result.source["name"] # From source collection
result.target["skill"] # From target collection
# Flat dictionary (prefixed keys)
result.to_flat()
# {'employees.name': 'Alice', 'expertise.skill': 'Python', '_score': 0.95}
# Nested dictionary
result.to_nested()
# {'employees': {'name': 'Alice'}, 'expertise': {'skill': 'Python'}}
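Both conversions are simple key transforms over the per-collection row data. A self-contained sketch of the prefixing scheme (illustrative only; standalone functions standing in for the JoinedResult methods):

```python
def to_flat(source_name, source_data, target_name, target_data, score):
    """Illustrative flat conversion: prefix each key with its collection name."""
    flat = {f"{source_name}.{k}": v for k, v in source_data.items()}
    if target_data:
        flat.update({f"{target_name}.{k}": v for k, v in target_data.items()})
    flat["_score"] = score
    return flat

def to_nested(source_name, source_data, target_name, target_data):
    """Illustrative nested conversion: one sub-dict per collection."""
    nested = {source_name: dict(source_data)}
    if target_data:
        nested[target_name] = dict(target_data)
    return nested

print(to_flat("employees", {"name": "Alice"}, "expertise", {"skill": "Python"}, 0.95))
# {'employees.name': 'Alice', 'expertise.skill': 'Python', '_score': 0.95}
```

Prefixed keys make flat rows safe to feed into a DataFrame even when both collections share field names.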
hybi.compose.intersections.JoinedResult
dataclass
¶
Result of joining data from two collections.
Represents a single row from a cross-collection query, combining source and target data with matching metadata.
Attributes:
| Name | Type | Description |
|---|---|---|
| source_data | Dict[str, Any] | Data from the source collection. |
| target_data | Optional[Dict[str, Any]] | Data from the target collection (None if no match). |
| intersection | Intersection | The intersection used for this join. |
| score | float | Combined similarity/match score. |
| status | JoinStatus | Match status (matched, null, no_match). |
| margin | float | Gap between best and second-best match (for semantic). |
Example
>>> result = JoinedResult(
...     source_data={"name": "Alice", "id": "EMP001"},
...     target_data={"subject": "EMP001", "object": "Python"},
...     intersection=intersection,
...     score=1.0,
...     status=JoinStatus.MATCHED,
... )
>>> result.to_flat()
{'employees.name': 'Alice', 'employees.id': 'EMP001', 'expertise.subject': 'EMP001', 'expertise.object': 'Python'}
source_data (instance-attribute)¶
target_data (instance-attribute)¶
score = 0.0 (class-attribute, instance-attribute)¶
status = JoinStatus.MATCHED (class-attribute, instance-attribute)¶
margin = 0.0 (class-attribute, instance-attribute)¶
source
property
¶
Access source collection data directly.
Returns:
| Type | Description |
|---|---|
| Dict[str, Any] | Dictionary of source fields. |
Example
>>> result.source["name"]
'Alice'
>>> result.source["employee_id"]
'EMP001'
target
property
¶
Access target collection data directly.
Returns:
| Type | Description |
|---|---|
| Optional[Dict[str, Any]] | Dictionary of target fields, or None if no match. |
Example
>>> if result.target:
...     print(result.target["skill"])
Python
is_matched
property
¶
Return True if this result has a confident match.
is_null
property
¶
Return True if this result is ambiguous (NULL).
is_no_match
property
¶
Return True if no match was found.
to_flat()
¶
Convert to a flat dictionary with prefixed keys.
Returns:
| Type | Description |
|---|---|
| Dict[str, Any] | Dictionary with keys like "collection.field". |
to_nested()
¶
Convert to a nested dictionary by collection.
Returns:
| Type | Description |
|---|---|
| Dict[str, Any] | Dictionary with collections as top-level keys. |
JoinedResultSet¶
Collection of joined results with filtering utilities.
# Filter to confident matches only
matched = results.filter_matched()
# Statistics
print(f"Matched: {results.matched_count}")
print(f"Ambiguous: {results.null_count}")
print(f"No match: {results.no_match_count}")
print(f"Expansion: {results.expansion_ratio}x") # Fan-out factor
# Chained joins
deeper = results.join("projects").join("budgets")
# Deduplication (handle diamond patterns)
deduped = results.dedupe(key_field="doc_id", strategy="best_score")
# Convert to lists
flat_dicts = results.to_flat_dicts()
nested_dicts = results.to_nested_dicts()
hybi.compose.intersections.JoinedResultSet
dataclass
¶
Collection of joined results from a cross-collection query.
Provides iteration, filtering, and conversion utilities for working with joined data. Supports chained joins via .join().
Attributes:
| Name | Type | Description |
|---|---|---|
| results | List[JoinedResult] | List of JoinedResult objects. |
| intersection | Optional[Intersection] | The intersection used for the join. |
| source_count | int | Number of source rows before joining. |
| target_count | int | Number of target rows before joining. |
| depth | int | Current depth in the join chain (0 = source). |
| was_truncated | bool | Whether results were truncated due to limits. |
| truncation_reason | Optional[str] | Why truncation occurred ("max_depth", "max_results", "cycle"). |
| collections_visited | List[str] | Ordered list of collections in the join chain. |
| timing_ms | float | Execution time in milliseconds. |
Example
Chained joins¶
>>> results = hb.query("employees").search("...") \
...     .join("expertise") \
...     .join("projects")
results = field(default_factory=list) (class-attribute, instance-attribute)¶
intersection = None (class-attribute, instance-attribute)¶
source_count = 0 (class-attribute, instance-attribute)¶
target_count = 0 (class-attribute, instance-attribute)¶
depth = 0 (class-attribute, instance-attribute)¶
was_truncated = False (class-attribute, instance-attribute)¶
matched_count
property
¶
Return the number of matched results.
null_count
property
¶
Return the number of ambiguous (NULL) results.
no_match_count
property
¶
Return the number of unmatched results.
expansion_ratio
property
¶
Return the ratio of output rows to input rows.
A ratio > 1.0 indicates one-to-many expansion.
filter_matched()
¶
Return only confidently matched results.
filter_by_status(status)
¶
Return results with a specific status.
dedupe(key_field=None, strategy='best_score')
¶
Remove duplicate results based on target entity.
Handles diamond patterns where multiple paths lead to the same target entity by keeping only one result per unique target.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key_field | Optional[str] | Field to use as unique key. Defaults to target's primary key or first field. | None |
| strategy | str | How to choose which duplicate to keep: "best_score" keeps the highest-scoring match (default); "first" keeps the first encountered. | 'best_score' |
Returns:
| Type | Description |
|---|---|
| JoinedResultSet | New JoinedResultSet with duplicates removed. |
Example
Multiple paths to same document¶
>>> results = query.join("skills").join("documents")
>>> results = results.extend(query.join("projects").join("documents"))
>>> deduped = results.dedupe(key_field="doc_id", strategy="best_score")
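Diamond-pattern deduplication amounts to grouping results by a key from the target row and keeping one per key. A self-contained sketch of the "best_score" strategy (illustrative only; plain dicts stand in for JoinedResult objects):

```python
def dedupe_best_score(results, key_field):
    """Keep the highest-scoring result per unique target key."""
    best = {}
    for r in results:
        key = r["target"][key_field]
        # First sighting wins by default; a later, higher score replaces it
        if key not in best or r["score"] > best[key]["score"]:
            best[key] = r
    return list(best.values())

rows = [
    {"target": {"doc_id": "D1"}, "score": 0.80},  # reached via skills path
    {"target": {"doc_id": "D1"}, "score": 0.95},  # reached via projects path: wins
    {"target": {"doc_id": "D2"}, "score": 0.70},
]
deduped = dedupe_best_score(rows, "doc_id")
print(len(deduped))  # 2
```

The "first" strategy would simply skip the replacement branch, keeping whichever path was traversed first.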
limit(n)
¶
Limit results to first n entries.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| n | int | Maximum number of results to keep. | required |
Returns:
| Type | Description |
|---|---|
| JoinedResultSet | New JoinedResultSet with at most n results. |
extend(other)
¶
Combine results from another JoinedResultSet.
Useful for merging results from multiple join paths (diamond pattern).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| other | JoinedResultSet | Another JoinedResultSet to merge. | required |
Returns:
| Type | Description |
|---|---|
| JoinedResultSet | New JoinedResultSet with combined results. |
join(target_collection, *, on=None, where=None, top_k=None)
¶
Continue joining to another collection (chained joins).
Enables patterns like:
hb.query("A").search("...").join("B").join("C").join("D")
All join strategies (identity, semantic, link) are supported at every hop depth via shared dispatch in _join_dispatch.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| target_collection | str | The collection to join with. | required |
| on | Optional[tuple] | Optional tuple of (source_field, target_field) to override the declared intersection fields. | None |
| where | Optional[Dict[str, Any]] | Optional filter to apply to target results during join. | None |
| top_k | Optional[int] | Optional limit on number of matches per source row. | None |
Returns:
| Type | Description |
|---|---|
| JoinedResultSet | JoinedResultSet containing combined data from all collections. |
Raises:
| Type | Description |
|---|---|
| QueryStateError | If this result set wasn't created with chaining support. |
| CircularJoinError | If joining would create a cycle (when cycle_behavior="error"). |
| MaxJoinDepthError | If the join chain exceeds max_join_depth. |
| NoIntersectionError | If no intersection exists between the collections. |
Example
>>> results = hb.query("employees").search("engineering") \
...     .join("expertise") \
...     .join("projects") \
...     .join("budgets")
to_flat_dicts()
¶
Convert all results to flat dictionaries.
to_nested_dicts()
¶
Convert all results to nested dictionaries.
JoinStatus¶
Match status for each joined result.
| Status | Meaning |
|---|---|
| MATCHED | Confident match found |
| NULL | Ambiguous match (margin too small between candidates) |
| NO_MATCH | No match found above threshold |
from hybi.compose.intersections import JoinStatus
# Filter by status
matched = results.filter_by_status(JoinStatus.MATCHED)
ambiguous = results.filter_by_status(JoinStatus.NULL)
JoinConfig¶
Configure join behavior for production safety.
from hybi.compose.intersections import JoinConfig
config = JoinConfig(
    max_join_depth=5,             # Maximum chained joins
    max_results_per_join=10000,   # Limit fan-out per hop
    dedupe_strategy="best_score", # Handle diamond patterns
    cycle_behavior="stop",        # What to do on cycles
)
hb = HyperBinder(join_config=config)
Options¶
| Option | Default | Description |
|---|---|---|
| max_join_depth | 5 | Maximum number of chained .join() calls |
| max_results_per_join | 10000 | Maximum results at each join step |
| dedupe_strategy | "best_score" | "best_score", "first", or "none" |
| cycle_behavior | "stop" | "stop", "error", or "allow" |
hybi.compose.intersections.JoinConfig
dataclass
¶
Configuration for join operations across collections.
Controls limits, cycle handling, and deduplication behavior for production-safe join chains.
Attributes:
| Name | Type | Description |
|---|---|---|
| max_join_depth | int | Maximum number of join hops allowed in a chain. Prevents infinite cycles. Default is 5. |
| max_results_per_join | int | Maximum results to keep at each join step. Prevents fan-out explosion. Default is 10000. |
| dedupe_strategy | Literal['best_score', 'first', 'none'] | How to handle diamond patterns where multiple paths lead to the same entity: "best_score" keeps the highest-scoring match (default); "first" keeps the first encountered; "none" preserves all paths. |
| cycle_behavior | Literal['stop', 'error', 'allow'] | What to do when a cycle is detected: "stop" stops traversal and returns results so far (default); "error" raises CircularJoinError; "allow" continues (dangerous, relies on max_depth). |
Example
>>> config = JoinConfig(max_join_depth=3, dedupe_strategy="best_score")
>>> hb = HyperBinder(join_config=config)
The Bridge Pattern¶
Intersections enable a powerful pattern for connecting heterogeneous data:
flowchart LR
D["Documents<br/>(Fuzzy)"] <-->|semantic| K["Knowledge Graph<br/>(Entity Hub)"] <-->|identity| T["Tables<br/>(Exact)"]
Example: Find budget information for projects mentioned in emails:
# Declare intersections
hb.intersect("emails.content", "projects.description", relation="semantic")
hb.intersect("projects.project_id", "budgets.project_id", relation="identity")
# Query across all three
results = (
    hb.query("emails")
    .search("Q2 budget allocation")
    .join("projects")  # semantic: email content → project
    .join("budgets")   # identity: project ID → budget record
)
for r in results:
    flat = r.to_flat()
    print(f"Email: {flat['emails.subject']}")
    print(f"Project: {flat['projects.name']}")
    print(f"Budget: ${flat['budgets.allocated']:,}")
The Knowledge Graph acts as a semantic index bridging fuzzy text mentions → canonical entities → exact structured lookups.
Error Handling¶
from hybi.compose.intersections import (
    IntersectionError,
    NoIntersectionError,
    AmbiguousIntersectionError,
    CircularJoinError,
    MaxJoinDepthError,
)
try:
    results = query.join("unknown_collection")
except NoIntersectionError as e:
    print(f"No intersection defined: {e}")
try:
    results = query.join("A").join("B").join("A")  # Cycle
except CircularJoinError as e:
    print(f"Cycle detected: {e.collection} in {e.path}")
try:
    # Too many hops
    results = query.join("A").join("B").join("C").join("D").join("E").join("F")
except MaxJoinDepthError as e:
    print(f"Exceeded depth {e.max_depth}")
hybi.compose.intersections.IntersectionError
¶
Bases: Exception
Base exception for intersection operations.
hybi.compose.intersections.NoIntersectionError
¶
hybi.compose.intersections.AmbiguousIntersectionError
¶
hybi.compose.intersections.CircularJoinError
¶
hybi.compose.intersections.MaxJoinDepthError
¶
Flexible Intersections (Cross-Encoding)¶
Flexible mode enables intersections between fields with different encoding types (e.g., EXACT↔SEMANTIC) using explicit link bindings.
hb.intersect_flexible()¶
Declare a flexible intersection allowing cross-encoding types:
# Declare flexible intersection
ix = hb.intersect_flexible(
    "employees.employee_id",  # EXACT encoding
    "expertise.topic",        # SEMANTIC encoding
    threshold=0.7,            # Optional: match threshold
    bidirectional=True,       # Default: register both directions
)
# The intersection is created with mode=FLEXIBLE and relation="link"
print(ix.mode)             # IntersectionMode.FLEXIBLE
print(ix.link_collection)  # "__links_employees_employee_id__expertise_topic"
hb.populate_links()¶
Populate the flexible intersection with link data:
import pandas as pd
# Create link data mapping employee IDs to topics
links_df = pd.DataFrame({
    "emp_id": ["EMP001", "EMP002", "EMP003"],
    "topic": ["machine learning", "databases", "cloud computing"],
})
# Populate the intersection with links
result = hb.populate_links(
    ix,                  # The intersection from intersect_flexible()
    links_df,            # DataFrame with link pairs
    "emp_id",            # Source column name
    "topic",             # Target column name
    weight_column=None,  # Optional: column for link weights
)
print(result)  # {'status': 'ok', 'links_stored': 3}
Each call to populate_links() replaces all existing links for that intersection.
Link¶
A single link binding between source and target values.
from hybi.compose.intersections import Link
link = Link(
    source_value="EMP001",
    target_value="machine learning",
    weight=1.0,                              # Optional weight (default 1.0)
    metadata={"note": "primary expertise"},  # Optional metadata
)
# Convert to dictionary for serialization
link.to_dict()
# {'source_value': 'EMP001', 'target_value': 'machine learning', 'weight': 1.0}
hybi.compose.intersections.Link
dataclass
¶
A single link binding between source and target values.
Links enable cross-encoding intersections by explicitly declaring which source values correspond to which target values. The binding is bidirectional - you can query in either direction.
Attributes:
| Name | Type | Description |
|---|---|---|
| source_value | Any | The source field value (will be converted to string) |
| target_value | Any | The target field value (will be converted to string) |
| weight | float | Optional weight for this link (default 1.0) |
| metadata | Optional[Dict[str, Any]] | Optional additional metadata for this link |
LinkSet¶
Collection of links for a flexible intersection. Provides convenient creation from DataFrames or lists of pairs.
from hybi.compose.intersections import LinkSet
import pandas as pd
# From DataFrame (recommended)
df = pd.DataFrame({
    "emp_id": ["EMP001", "EMP002", "EMP003"],
    "topic": ["ML", "DB", "Cloud"],
    "confidence": [0.9, 0.8, 0.95],
})
link_set = LinkSet.from_dataframe(
    df,
    source_column="emp_id",
    target_column="topic",
    weight_column="confidence",  # Optional
)
# From list of pairs
link_set = LinkSet.from_pairs([
    ("EMP001", "ML"),
    ("EMP002", "DB", 0.8),  # With weight
])
# Convert to mappings
forward = link_set.to_forward_mapping()  # {"EMP001": ["ML"], ...}
reverse = link_set.to_reverse_mapping()  # {"ML": ["EMP001"], ...}
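The forward and reverse mappings are inverses built from the same pairs, which is what makes link bindings bidirectional. A minimal sketch of how they could be derived (plain Python, illustrative; not the LinkSet implementation):

```python
from collections import defaultdict

def build_mappings(pairs):
    """Build forward (source -> targets) and reverse (target -> sources) maps."""
    forward, reverse = defaultdict(list), defaultdict(list)
    for source, target in pairs:
        forward[source].append(target)  # one source may link to many targets
        reverse[target].append(source)  # and vice versa
    return dict(forward), dict(reverse)

forward, reverse = build_mappings([("EMP001", "ML"), ("EMP001", "DB"), ("EMP002", "DB")])
print(forward)  # {'EMP001': ['ML', 'DB'], 'EMP002': ['DB']}
print(reverse)  # {'ML': ['EMP001'], 'DB': ['EMP001', 'EMP002']}
```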
hybi.compose.intersections.LinkSet
dataclass
¶
Collection of links for a flexible intersection.
LinkSet provides a convenient way to create and manage multiple links, typically from a DataFrame with two columns mapping source to target values.
Attributes:
| Name | Type | Description |
|---|---|---|
| links | List[Link] | List of Link objects |
| source_field | Optional[str] | Name of the source field (for documentation) |
| target_field | Optional[str] | Name of the target field (for documentation) |
links = field(default_factory=list) (class-attribute, instance-attribute)¶
source_field = None (class-attribute, instance-attribute)¶
target_field = None (class-attribute, instance-attribute)¶
from_dataframe(df, source_column, target_column, weight_column=None)
classmethod
¶
Create LinkSet from a DataFrame with source and target columns.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| df | DataFrame | DataFrame containing link pairs | required |
| source_column | str | Column name for source values | required |
| target_column | str | Column name for target values | required |
| weight_column | Optional[str] | Optional column name for link weights | None |
Returns:
| Type | Description |
|---|---|
| LinkSet | LinkSet ready for ingestion via populate_links() |
Raises:
| Type | Description |
|---|---|
| ValueError | If required columns are missing |
Example
>>> df = pd.DataFrame({
...     "employee_id": ["EMP001", "EMP002", "EMP003"],
...     "topic": ["machine learning", "databases", "cloud computing"]
... })
>>> links = LinkSet.from_dataframe(df, "employee_id", "topic")
>>> len(links)
3
from_pairs(pairs, source_field=None, target_field=None)
classmethod
¶
Create LinkSet from a list of (source, target) pairs.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| pairs | List[tuple] | List of (source_value, target_value) tuples, or (source_value, target_value, weight) tuples | required |
| source_field | Optional[str] | Optional source field name | None |
| target_field | Optional[str] | Optional target field name | None |
|
Returns:
| Type | Description |
|---|---|
LinkSet
|
LinkSet ready for ingestion |
Example
>>> links = LinkSet.from_pairs([
...     ("EMP001", "machine learning"),
...     ("EMP002", "databases", 0.8),  # with weight
... ])
to_list()
¶
Convert to list of dictionaries for serialization.
to_forward_mapping()
¶
Convert to forward mapping: source_value -> [target_values].
Returns:
| Type | Description |
|---|---|
| Dict[str, List[str]] | Dictionary mapping each source value to its target values. |
to_reverse_mapping()
¶
Convert to reverse mapping: target_value -> [source_values].
Returns:
| Type | Description |
|---|---|
| Dict[str, List[str]] | Dictionary mapping each target value to its source values. |
get_source_values()
¶
Get all unique source values.
get_target_values()
¶
Get all unique target values.
Complete Example¶
from hybi import HyperBinder
from hybi.compose import Triple, Field, Encoding
from hybi.compose.intersections import JoinConfig
# Initialize with join configuration
hb = HyperBinder(
    join_config=JoinConfig(max_join_depth=4, dedupe_strategy="best_score")
)
# Define schemas
employee_schema = Triple(
    subject=Field("employee_id", encoding=Encoding.EXACT),
    predicate=Field("role"),
    object=Field("department"),
)
expertise_schema = Triple(
    subject=Field("employee_id", encoding=Encoding.EXACT),
    predicate=Field("skill"),
    object=Field("level"),
)
# Ingest data
hb.ingest(employees_df, collection="employees", schema=employee_schema)
hb.ingest(expertise_df, collection="expertise", schema=expertise_schema)
# Declare intersection
hb.intersect("employees.employee_id", "expertise.employee_id")
# Query with join
results = (
    hb.query("employees", schema=employee_schema)
    .find(department="Engineering")
    .join("expertise")
)
# Process results
for r in results.filter_matched():
    print(f"{r.source['employee_id']}: {r.target['skill']} ({r.target['level']})")
Complete Flexible Intersection Example¶
Cross-encoding joins between EXACT IDs and SEMANTIC topics:
from hybi import HyperBinder
from hybi.compose import Triple, Field, Encoding
import pandas as pd
hb = HyperBinder()
# Schema 1: Employees with EXACT IDs
employee_schema = Triple(
    subject=Field("employee_id", encoding=Encoding.EXACT),
    predicate=Field("role"),
    object=Field("department"),
)
# Schema 2: Expertise with SEMANTIC topics
expertise_schema = Triple(
    subject=Field("topic", encoding=Encoding.SEMANTIC),  # Different encoding!
    predicate=Field("skill"),
    object=Field("level"),
)
# Ingest data
employees_df = pd.DataFrame({
    "employee_id": ["EMP001", "EMP002"],
    "role": ["Engineer", "Analyst"],
    "department": ["ML", "Data"],
})
expertise_df = pd.DataFrame({
    "topic": ["machine learning", "data engineering"],
    "skill": ["Python", "SQL"],
    "level": ["Expert", "Senior"],
})
hb.ingest(employees_df, collection="employees", schema=employee_schema)
hb.ingest(expertise_df, collection="expertise", schema=expertise_schema)
# Declare flexible intersection (cross-encoding!)
ix = hb.intersect_flexible("employees.employee_id", "expertise.topic")
# Provide the explicit link mappings
links_df = pd.DataFrame({
    "emp_id": ["EMP001", "EMP002"],
    "topic": ["machine learning", "data engineering"],
})
hb.populate_links(ix, links_df, "emp_id", "topic")
# Now cross-type joins work
results = (
    hb.query("employees", schema=employee_schema)
    .find(department="ML")
    .join("expertise")
)
for r in results.filter_matched():
    print(f"{r.source['employee_id']} knows {r.target['topic']}: {r.target['skill']}")
# EMP001 knows machine learning: Python