Fast and accurate hyper-relational knowledge graph embedding with dual-branch reasoning
Matches state-of-the-art accuracy at 1000× the speed
AlertStar is a novel hyper-relational knowledge graph embedding model designed for cybersecurity alert prediction. It addresses the fundamental challenge of leveraging rich contextual information (qualifiers) while maintaining computational efficiency.
AlertStar combines two complementary reasoning approaches:
Traditional knowledge graph models ignore qualifiers, treating these events identically:
IP_A --[Port_Scan]--> IP_B (at 3 AM, FlowCount: 1000, Protocol: TCP)
IP_A --[Port_Scan]--> IP_B (at 2 PM, FlowCount: 5, Protocol: UDP)
AlertStar understands that context changes everything in cybersecurity.
# Clone the repository
git clone https://github.com/alertstar.git
cd alertstar
# Install dependencies
pip install -r requirements.txt
# Or install with conda
conda env create -f environment.yml
conda activate alertstar
from alertstar import AlertStar, AlertStarTrainer, load_data

# Load cybersecurity dataset
train_data, valid_data, test_data = load_data('data/alert33/')

# Initialize AlertStar (vocabulary sizes come from preprocessing)
model = AlertStar(
    num_entities=len(entity_vocab),
    num_relations=len(relation_vocab),
    num_qual_keys=len(qualifier_keys),
    num_qual_values=len(qualifier_values),
    embedding_dim=200
)

# Train
trainer = AlertStarTrainer(model, train_data, valid_data)
trainer.train(epochs=50)

# Predict
query = {
    'head': 'IP_192.168.1.100',
    'relation': 'Port_Scan',
    'qualifiers': [
        ('DetectTime', '2019-01-15 03:00:00'),
        ('FlowCount', '1000'),
        ('Protocol', 'TCP'),
        ('Port', '22')
    ]
}
predictions = model.predict(query, top_k=10)
print(f"Most likely target: {predictions[0]}")
| Model | MRR ↑ | MR ↓ | Hits@1 ↑ | Hits@10 ↑ | Time/Epoch |
|---|---|---|---|---|---|
| TransE | 0.120 | 500 | 0.080 | 0.250 | 5 min |
| StarE | 0.280 | 180 | 0.210 | 0.480 | 8 min |
| NBFNet | 0.320 | 150 | 0.250 | 0.520 | 120 min ⚠️ |
| AlertStar | 0.315 | 155 | 0.245 | 0.515 | 10 min ✅ |
| MT-AlertStar | 0.335 | 140 | 0.265 | 0.535 | 15 min 🚀 |
Key Findings:
Complexity Analysis:
NBFNet: O(L × E × Q_max × d) ≈ 6 billion ops
AlertStar: O(n × d + d² + N × d) ≈ 1 million ops
Speedup: 1,000-10,000× on large graphs
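As a sanity check on these figures, here is a back-of-envelope computation. All the constants below (layer count, edge count, qualifier cap, entity count) are illustrative assumptions, not measurements from the paper:

```python
# Illustrative op counts for the complexity formulas above.
# All constants here are assumed for illustration, not measured.
L, E, Q_max, d = 6, 100_000, 50, 200   # NBFNet: layers, edges, max qualifiers, dim
nbfnet_ops = L * E * Q_max * d          # L × E × Q_max × d

n, N = 8, 5_000                         # AlertStar: qualifiers per fact, num entities
alertstar_ops = n * d + d**2 + N * d    # n × d + d² + N × d

print(nbfnet_ops)                       # 6_000_000_000  (~6 billion)
print(alertstar_ops)                    # 1_041_600      (~1 million)
print(nbfnet_ops // alertstar_ops)      # ~5760× speedup under these assumptions
```

Under these assumed sizes the ratio lands inside the 1,000-10,000× range quoted above.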
Input: (head, relation, qualifiers, tail)
        ┌──────────────┐
        │  EMBEDDINGS  │
        └──────┬───────┘
               │
               ▼
  ┌─────────────────────┐
  │ QUALIFIER ENRICHMENT│
  │  (Cross-Attention)  │
  └──────────┬──────────┘
             │
      ┌──────┴──────┐
      │             │
      ▼             ▼
┌────────────┐  ┌────────────┐
│ Attention  │  │    Path    │
│   Branch   │  │   Branch   │
└─────┬──────┘  └─────┬──────┘
      │               │
      └───────┬───────┘
              │
              ▼
      ┌──────────────┐
      │ Gated Fusion │
      │ α·A + (1-α)·B│
      └──────┬───────┘
             │
             ▼
       [Predictions]
MT-AlertStar extends AlertStar with a Transformer encoder and four simultaneous prediction tasks:
Models (`alertstar/models/`)

`alertstar.py` - AlertStar Model

Main Components:
import torch
import torch.nn as nn


class AlertStar(nn.Module):
    """
    AlertStar: Dual-branch hyper-relational KG embedding.

    Architecture:
        1. Qualifier enrichment via cross-attention
        2. Attention branch: h + enriched_r
        3. Path branch: FFN([h; attention branch])
        4. Gated fusion: α·attn + (1-α)·path
        5. Scoring: fused · entity_embeddings
    """

    def __init__(self, num_entities, num_relations,
                 num_qual_keys, num_qual_values,
                 embedding_dim=200, num_heads=4, dropout=0.1):
        super().__init__()

        # Embeddings
        self.entity_emb = nn.Embedding(num_entities, embedding_dim)
        self.relation_emb = nn.Embedding(num_relations, embedding_dim)
        self.qual_key_emb = nn.Embedding(num_qual_keys, embedding_dim)
        self.qual_value_emb = nn.Embedding(num_qual_values, embedding_dim)

        # Qualifier enrichment
        self.attention = nn.MultiheadAttention(
            embedding_dim, num_heads, batch_first=True
        )

        # Path branch
        self.path_net = nn.Sequential(
            nn.Linear(2 * embedding_dim, embedding_dim),
            nn.LayerNorm(embedding_dim),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(embedding_dim, embedding_dim)
        )

        # Layer norms
        self.ln1 = nn.LayerNorm(embedding_dim)
        self.ln2 = nn.LayerNorm(embedding_dim)

        # Learnable gate
        self.gate = nn.Parameter(torch.tensor(0.5))
        self.dropout = nn.Dropout(dropout)

    def forward(self, head, relation, qualifiers, tail=None):
        """
        Args:
            head: [batch] entity IDs
            relation: [batch] relation IDs
            qualifiers: List of [(key, val), ...] for each sample
            tail: [batch] entity IDs (optional, for training)

        Returns:
            scores: [batch] if tail given, else [batch, num_entities]
        """
        # Step 1: Get embeddings
        h = self.entity_emb(head)
        r = self.relation_emb(relation)

        # Step 2: Enrich relation with qualifiers
        r_enriched = self._enrich_relation(r, qualifiers)

        # Step 3: Attention branch
        attn_branch = self.ln1(h + r_enriched)

        # Step 4: Path branch
        concat = torch.cat([h, attn_branch], dim=-1)
        path_transform = self.path_net(concat)
        path_branch = self.ln2(h + path_transform)

        # Step 5: Gated fusion
        alpha = torch.sigmoid(self.gate)
        fused = self.dropout(alpha * attn_branch + (1 - alpha) * path_branch)

        # Step 6: Scoring
        if tail is not None:
            return (fused * self.entity_emb(tail)).sum(dim=-1)
        else:
            return fused @ self.entity_emb.weight.T
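The `_enrich_relation` method is referenced above but not shown. A minimal sketch of how the cross-attention enrichment could look, assuming qualifiers arrive as padded key/value embedding tensors with a boolean padding mask (the function name, tensor layout, and residual connection here are illustrative assumptions, not the project's actual API):

```python
import torch
import torch.nn as nn

def enrich_relation(attention, r, qual_key_emb, qual_val_emb, pad_mask):
    """Sketch of qualifier enrichment: the relation queries its qualifiers.

    attention: nn.MultiheadAttention with batch_first=True (as in AlertStar)
    r: [batch, d] relation embeddings
    qual_key_emb, qual_val_emb: [batch, max_quals, d] padded qualifier embeddings
    pad_mask: [batch, max_quals] bool, True where the slot is padding
    """
    quals = qual_key_emb + qual_val_emb              # fuse each (key, value) pair
    query = r.unsqueeze(1)                           # [batch, 1, d]
    out, _ = attention(query, quals, quals,
                       key_padding_mask=pad_mask)    # cross-attention over qualifiers
    return r + out.squeeze(1)                        # residual: enriched relation

# Shape check with random tensors (assumed toy sizes)
d, B, Q = 16, 3, 5
attn = nn.MultiheadAttention(d, num_heads=4, batch_first=True)
r = torch.randn(B, d)
k, v = torch.randn(B, Q, d), torch.randn(B, Q, d)
mask = torch.zeros(B, Q, dtype=torch.bool)
mask[:, 3:] = True                                   # last two slots are padding
out = enrich_relation(attn, r, k, v, mask)
print(out.shape)  # torch.Size([3, 16])
```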
Key Features:
`mt_alertstar.py` - Multi-Task AlertStar

Multi-Task Architecture:
import torch
import torch.nn as nn


class MTAlertStar(nn.Module):
    """
    Multi-Task AlertStar: Four simultaneous prediction tasks.

    Tasks:
        1. Tail prediction
        2. Relation prediction
        3. Qualifier key prediction
        4. Qualifier value prediction
    """

    def __init__(self, num_entities, num_relations,
                 num_qual_keys, num_qual_values,
                 embedding_dim=200, num_layers=3, num_heads=4):
        super().__init__()
        self.embedding_dim = embedding_dim

        # Shared embeddings
        self.entity_emb = nn.Embedding(num_entities, embedding_dim)
        self.relation_emb = nn.Embedding(num_relations, embedding_dim)
        self.qual_key_emb = nn.Embedding(num_qual_keys, embedding_dim)
        self.qual_value_emb = nn.Embedding(num_qual_values, embedding_dim)

        # Shared Transformer encoder
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=embedding_dim,
            nhead=num_heads,
            dim_feedforward=embedding_dim * 4,
            dropout=0.1,
            batch_first=True
        )
        self.encoder = nn.TransformerEncoder(encoder_layer, num_layers)

        # Task-specific heads
        self.tail_head = self._make_head(num_entities)
        self.relation_head = self._make_head(num_relations)
        self.qual_key_head = self._make_head(num_qual_keys)
        self.qual_value_head = self._make_head(num_qual_values)

    def _make_head(self, output_size):
        """Create task-specific prediction head."""
        return nn.Sequential(
            nn.Linear(self.embedding_dim, self.embedding_dim),
            nn.LayerNorm(self.embedding_dim),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(self.embedding_dim, output_size)
        )

    def forward(self, head, relation, tail, qualifiers,
                task='tail', target_qual_idx=None):
        """
        Task-specific forward pass with masking.

        Args:
            task: 'tail', 'relation', 'qual_key', or 'qual_value'
            target_qual_idx: index of the masked qualifier (for 'qual_value')
        """
        # Build masked sequence
        seq = self._build_sequence(
            head, relation, tail, qualifiers,
            task, target_qual_idx
        )

        # Encode
        encoded = self.encoder(seq)
        context = encoded[:, 1, :]  # Relation token

        # Task-specific prediction
        if task == 'tail':
            return self.tail_head(context)
        elif task == 'relation':
            return self.relation_head(context)
        elif task == 'qual_key':
            return torch.sigmoid(self.qual_key_head(context))
        elif task == 'qual_value':
            return self.qual_value_head(context)
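The `_build_sequence` helper is likewise not shown. One plausible sketch, assuming a token order of [head, relation, tail, qualifiers...] (so position 1 matches the relation-token readout in `forward`) and masking-by-zeroing; both the layout and the masking scheme are assumptions for illustration:

```python
import torch

def build_sequence(head_e, rel_e, tail_e, qual_e, task):
    """Sketch (assumed layout): stack token embeddings as
    [head, relation, tail, qual_0, qual_1, ...] and zero out the slot
    the task must predict. Position 1 is the relation token.
    """
    seq = torch.cat(
        [head_e.unsqueeze(1), rel_e.unsqueeze(1), tail_e.unsqueeze(1), qual_e],
        dim=1,
    )  # [batch, 3 + num_quals, d]
    slot = {'tail': 2, 'relation': 1}.get(task)
    if slot is not None:
        seq = seq.clone()
        seq[:, slot, :] = 0.0  # mask the prediction target
    return seq

# Shape check with random tensors (assumed toy sizes)
B, Q, d = 2, 4, 8
seq = build_sequence(torch.randn(B, d), torch.randn(B, d),
                     torch.randn(B, d), torch.randn(B, Q, d), task='tail')
print(seq.shape)  # torch.Size([2, 7, 8])
```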
Key Features:
Baselines (`alertstar/models/baselines/`)

StarE:

class StarE(nn.Module):
    """StarE: Qualifier attention baseline."""

    def forward(self, head, relation, qualifiers, tail=None):
        # Enrich relation with attention over qualifiers
        # Score: (h + r_enriched) · t
        ...

ShrinkE:

class ShrinkE(nn.Module):
    """ShrinkE: Shrinking transform for qualifiers."""

    def forward(self, head, relation, qualifiers, tail=None):
        # Shrink qualifiers into relation space
        # Score: (proj(h) + shrink(r, q)) · t
        ...

NBFNet:

class NBFNet(nn.Module):
    """Neural Bellman-Ford Network."""

    def forward(self, head, relation, graph, tail=None):
        # Bellman-Ford propagation over graph
        # Memory-safe chunked implementation
        ...
Data (`alertstar/data/`)

class DataPreprocessor:
    """
    Preprocesses raw cybersecurity alert data.

    Input format:
        head,relation,tail,qualifier_key:value | key:value | ...

    Output:
        - Vocabularies (entity2id, relation2id, etc.)
        - Train/valid/test splits
        - Formatted datasets for each model
    """

    def load(self, data_path):
        """
        Load and preprocess data.

        Returns:
            train_data, valid_data, test_data
        """
        # Auto-detect format (tab or comma separated)
        # Build vocabularies
        # Create train/valid/test splits (70/15/15)
        # Handle variable-length qualifiers
        ...
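As an illustration of the documented `head,relation,tail,key:value | key:value` format, a line could be parsed like this (this parser is a sketch of the format, not the project's actual implementation):

```python
def parse_line(line):
    """Parse one raw alert line of the form
    'head,relation,tail,key:value | key:value | ...' (qualifiers optional).
    Illustrative parser for the documented format, not the project's code.
    """
    parts = line.strip().split(',', 3)      # qualifiers may contain commas-free pairs
    head, relation, tail = parts[0], parts[1], parts[2]
    qualifiers = []
    if len(parts) == 4 and parts[3]:
        for pair in parts[3].split('|'):
            # partition on the FIRST colon, so values like timestamps survive
            key, _, value = pair.strip().partition(':')
            qualifiers.append((key, value))
    return {'head': head, 'relation': relation, 'tail': tail,
            'qualifiers': qualifiers}

line = "IP_A,Port_Scan,IP_B,Protocol:TCP | Port:22"
parsed = parse_line(line)
print(parsed)
```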
Features:
class HyperRelationalDataset(Dataset):
    """PyTorch dataset for hyper-relational triples."""

    def __getitem__(self, idx):
        triple = self.data[idx]
        return {
            'head': self.preprocessor.entity2id[triple['head']],
            'relation': self.preprocessor.relation2id[triple['relation']],
            'tail': self.preprocessor.entity2id[triple['tail']],
            'qualifiers': [
                (self.preprocessor.qualifier_key2id[k],
                 self.preprocessor.qualifier_value2id[v])
                for k, v in triple['qualifiers']
            ]
        }
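Because each sample carries a variable-length qualifier list, the default DataLoader collate cannot stack a batch. A padding collate along these lines could be used (`PAD_ID` and the returned key names are assumptions for illustration):

```python
import torch

PAD_ID = 0  # assumed reserved padding index for qualifier keys/values

def collate_hyper_relational(batch):
    """Pad variable-length qualifier lists so a batch can be stacked."""
    max_q = max(len(s['qualifiers']) for s in batch) or 1
    keys = torch.full((len(batch), max_q), PAD_ID, dtype=torch.long)
    vals = torch.full((len(batch), max_q), PAD_ID, dtype=torch.long)
    mask = torch.ones(len(batch), max_q, dtype=torch.bool)  # True = padding
    for i, s in enumerate(batch):
        for j, (k, v) in enumerate(s['qualifiers']):
            keys[i, j], vals[i, j], mask[i, j] = k, v, False
    return {
        'head': torch.tensor([s['head'] for s in batch]),
        'relation': torch.tensor([s['relation'] for s in batch]),
        'tail': torch.tensor([s['tail'] for s in batch]),
        'qual_keys': keys, 'qual_vals': vals, 'qual_pad_mask': mask,
    }

# Tiny example batch (IDs are arbitrary)
batch = [
    {'head': 1, 'relation': 2, 'tail': 3, 'qualifiers': [(4, 5), (6, 7)]},
    {'head': 8, 'relation': 9, 'tail': 1, 'qualifiers': [(2, 3)]},
]
out = collate_hyper_relational(batch)
print(out['qual_keys'].shape)          # torch.Size([2, 2])
print(out['qual_pad_mask'].tolist())   # [[False, False], [False, True]]
```

A mask like this pairs directly with `nn.MultiheadAttention`'s `key_padding_mask` argument.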
class MultiTaskDataset(Dataset):
    """
    Dataset for multi-task learning.
    Each triple generates 4 training samples (one per task).
    """

    def __init__(self, data, preprocessor):
        self.samples = []
        for triple in data:
            # Generate samples for each task
            self.samples.append({'task': 'tail', ...})
            self.samples.append({'task': 'relation', ...})
            self.samples.append({'task': 'qual_key', ...})
            for q in triple['qualifiers']:
                self.samples.append({'task': 'qual_value', ...})
Training (`alertstar/training/`)

`trainer.py` - Standard Trainer

class AlertStarTrainer:
    """
    Trainer for AlertStar and baseline models.

    Features:
    - Margin ranking loss
    - Gradient clipping
    - Learning rate scheduling
    - Early stopping
    - Checkpoint saving
    """

    def train(self, epochs=100):
        for epoch in range(epochs):
            # Training loop
            for batch in self.train_loader:
                # Positive samples
                pos_score = self.model(
                    batch['head'], batch['relation'],
                    batch['qualifiers'], batch['tail']
                )

                # Negative samples (random tail)
                neg_tail = torch.randint(0, self.num_entities, ...)
                neg_score = self.model(
                    batch['head'], batch['relation'],
                    batch['qualifiers'], neg_tail
                )

                # Margin ranking loss
                loss = F.margin_ranking_loss(
                    pos_score, neg_score,
                    torch.ones_like(pos_score),
                    margin=1.0
                )

                # Backprop
                self.optimizer.zero_grad()
                loss.backward()
                torch.nn.utils.clip_grad_norm_(
                    self.model.parameters(), 1.0
                )
                self.optimizer.step()

            # Validation
            if (epoch + 1) % self.eval_every == 0:
                metrics = self.evaluate()
                self.save_checkpoint_if_best(metrics)
class Evaluator:
    """
    Comprehensive evaluation with all metrics.

    Metrics:
    - MRR (Mean Reciprocal Rank)
    - MR (Mean Rank)
    - Hits@1, Hits@3, Hits@10
    - Filtered ranking (removes known positives)
    """

    def evaluate(self, model, dataset, device):
        model.eval()
        ranks = []
        with torch.no_grad():
            for sample in dataset:
                # Get scores for all entities
                scores = model(
                    sample['head'],
                    sample['relation'],
                    sample['qualifiers']
                )  # [num_entities]

                # Find rank of true tail
                rank = (scores > scores[sample['tail']]).sum() + 1
                ranks.append(rank.item())

        ranks = np.array(ranks)
        return {
            'mr': float(np.mean(ranks)),
            'mrr': float(np.mean(1.0 / ranks)),
            'hits@1': float(np.mean(ranks <= 1)),
            'hits@3': float(np.mean(ranks <= 3)),
            'hits@10': float(np.mean(ranks <= 10))
        }
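The docstring lists filtered ranking, while the `evaluate` loop above computes raw ranks. For a single query, the filtered variant could look like this sketch (the function name and arguments are illustrative; the project's own implementation may differ):

```python
import numpy as np

def filtered_rank(scores, true_tail, known_tails):
    """Filtered rank: competing known-positive tails are excluded before ranking.

    scores: [num_entities] model scores for one (head, relation, qualifiers) query
    true_tail: index of the gold tail
    known_tails: indices of all tails known to complete this query
                 anywhere in train/valid/test
    """
    scores = scores.copy()
    for t in known_tails:
        if t != true_tail:
            scores[t] = -np.inf   # filter out other true answers
    return int((scores > scores[true_tail]).sum()) + 1

scores = np.array([0.9, 0.8, 0.7, 0.95])
# Raw rank of entity 1 would be 3; filtering known positives 0 and 3 gives rank 1.
print(filtered_rank(scores, true_tail=1, known_tails=[0, 1, 3]))  # 1
```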
Scripts (`scripts/`)

# Train AlertStar
python scripts/train_alertstar.py \
--data_path data/processed/alert33 \
--embedding_dim 200 \
--num_heads 4 \
--learning_rate 0.0005 \
--batch_size 128 \
--epochs 100 \
--gpu 0 \
--output_dir experiments/alertstar
# Evaluate all models
python scripts/evaluate_models.py \
--models alertstar stare shrinke nbfnet mt_alertstar \
--checkpoint_dir experiments/checkpoints \
--data_path data/processed/alert33 \
--output_file results/comparison.json
# Run complete experimental suite
python scripts/run_experiments.py \
--config configs/experiments.yaml \
--num_seeds 5 \
--output_dir experiments/full_results
# Preprocess your cybersecurity data
python scripts/preprocess_data.py \
--input data/raw/cybersecurity_alerts.txt \
--output data/processed/alert33 \
--train_ratio 0.7 \
--valid_ratio 0.15 \
--test_ratio 0.15
# Train AlertStar
python scripts/train_alertstar.py \
--data_path data/processed/alert33 \
--config configs/alertstar.yaml
# Train MT-AlertStar
python scripts/train_mt_alertstar.py \
--data_path data/processed/alert33 \
--config configs/mt_alertstar.yaml
# Train all baselines
python scripts/train_baselines.py \
--data_path data/processed/alert33 \
--models stare shrinke nbfnet
# Comprehensive evaluation
python scripts/evaluate_models.py \
--checkpoint_dir experiments/checkpoints \
--data_path data/processed/alert33 \
--output_file results/comparison.json
# Generate visualizations
python scripts/visualize_results.py \
--results_file results/comparison.json \
--output_dir results/figures
If you use AlertStar in your research, please cite:
@article{alertstar2026,
  title={AlertStar: Path-Aware Alert Prediction on Hyper-Relational Knowledge Graphs},
  author={Name and Co-authors},
  journal={arXiv preprint arXiv:2026.xxxxx},
  year={2026}
}
This project is licensed under the MIT License - see the LICENSE file for details.
We welcome contributions! Please see our Contributing Guidelines.
Built with ❤️ for the cybersecurity and knowledge graph communities