How to Build a Dynamic Zero-Trust Network Simulation with Graph-Based Micro-Segmentation, Adaptive Policy Engine, and Insider Threat Detection
Editors Pick
Security
Software Engineering
Staff
Tutorials
In this tutorial, we build a realistic Zero-Trust network simulation by modeling a micro-segmented environment as a directed graph and forcing every request to earn access through continuous verification. We implement a dynamic policy engine that blends ABAC-style permissions with device posture, MFA, path reachability, zone sensitivity, and live risk signals such as anomaly and data-volume indicators. We then operationalize the model through a Flask API and run mixed traffic, including insider lateral-movement and data-exfiltration attempts, to show how trust scoring, adaptive controls, and automated quarantines block malicious flows in real time.
Copy Code
Copied
Use a different Browser
!pip -q install networkx flask
import math
import json
import time
import random
import hashlib
from dataclasses import dataclass, field
from typing import Dict, Any, List, Tuple, Optional
import networkx as nx
from flask import Flask, request, jsonify
import matplotlib.pyplot as plt
def _sigmoid(x: float) -> float:
return 1.0 / (1.0 + math.exp(-x))
def _clamp(x: float, lo: float = 0.0, hi: float = 1.0) -> float:
return max(lo, min(hi, x))
def _now_ts() -> float:
return time.time()
def _stable_hash(s: str) -> int:
h = hashlib.sha256(s.encode("utf-8")).hexdigest()
return int(h[:10], 16)
def _rand_choice_weighted(items: List[Any], weights: List[float]) -> Any:
return random.choices(items, weights=weights, k=1)[0]
def _pretty(obj: Any) -> str:
return json.dumps(obj, indent=2, sort_keys=False)
We set up the environment by installing the required libraries and importing all dependencies needed for graph modeling, risk scoring, and API handling. We define utility functions for trust normalization, hashing, timestamping, and weighted sampling to support deterministic simulations. We prepare helper functions that simplify logging and structured output formatting throughout the tutorial.
Copy Code
Copied
Use a different Browser
# Network zones, in the order used when building the graph.
ZONES = [
    "public",
    "dmz",
    "app",
    "data",
    "admin",
]

# Baseline sensitivity assigned to each zone.
SENSITIVITY = {
    "public": 0.15,
    "dmz": 0.35,
    "app": 0.6,
    "data": 0.85,
    "admin": 0.95,
}

# Asset names hosted in each zone.
ASSETS = {
    "public": ["cdn", "landing", "status"],
    "dmz": ["api_gateway", "waf", "vpn"],
    "app": ["orders_svc", "billing_svc", "ml_inference", "inventory_svc"],
    "data": ["customer_db", "ledger_db", "feature_store"],
    "admin": ["iam", "siem", "backup_vault"],
}

# Vocabulary of request attributes used by the simulation.
ACTIONS = ["read", "write", "deploy", "admin", "exfiltrate"]
ROLES = ["customer", "employee", "analyst", "engineer", "admin", "secops"]
DEVICE_TYPES = ["managed_laptop", "managed_server", "byod_phone", "unknown_iot"]
NETWORK_CONTEXT = ["corp_lan", "corp_vpn", "public_wifi", "tor_exit"]
@dataclass
class RequestContext:
    """Attributes describing a single access request submitted for evaluation."""

    # Who is asking.
    user: str
    role: str

    # Which device the request comes from.
    device_id: str
    device_type: str
    device_posture: float  # presumably a 0..1 health score; verify against callers
    mfa: bool

    # Where the request originates and what it targets.
    source: str  # presumably one of NETWORK_CONTEXT; verify against callers
    src_node: str
    dst_node: str
    action: str

    # Contextual risk signals.
    time_bucket: str
    geo_risk: float
    behavior_anomaly: float
    data_volume: float

    # Optional free-text annotation.
    reason: str = ""
@dataclass
class Decision:
    """Outcome of evaluating one request."""

    allowed: bool
    trust_score: float
    # Mutable defaults use factories so each instance gets its own container.
    rule_hits: List[str] = field(default_factory=list)
    controls: Dict[str, Any] = field(default_factory=dict)
    explanation: str = ""
    # Stamped at construction time via _now_ts().
    ts: float = field(default_factory=_now_ts)
@dataclass
class PrincipalState:
    """Mutable per-user record kept by the policy engine."""

    user: str
    role: str
    base_risk: float
    last_seen_ts: float

    # Counters and flags start in a clean state.
    rolling_denies: int = 0
    rolling_allows: int = 0
    quarantined: bool = False
    compromise_score: float = 0.0
@dataclass
class DeviceState:
    """Per-device record kept by the policy engine."""

    device_id: str
    device_type: str
    owner: str
    posture: float
    attested: bool

    # Devices are not quarantined unless explicitly flagged.
    quarantined: bool = False
@dataclass
class FlowRecord:
    """One log entry pairing a request context with its decision, both as dicts."""

    ts: float
    ctx: Dict[str, Any]
    decision: Dict[str, Any]
We define the core domain schema including zones, assets, roles, device types, and contextual signals that shape our Zero-Trust environment. We formalize request, decision, principal, device, and flow record structures using dataclasses to maintain clarity and state integrity. We establish the foundational data model that enables continuous trust evaluation across identities, devices, and network paths.
Copy Code
Copied
Use a different Browser
def build_microsegmented_graph(seed: int = 7) -> nx.DiGraph:
    """Build the directed zone/asset graph used by the simulation.

    The module-level ``random`` generator is reseeded with ``seed``, so the
    same seed yields the same graph. Note that this also resets the global
    random state for any code that runs afterwards.

    Args:
        seed: Seed applied to the global ``random`` generator.

    Returns:
        A ``DiGraph`` holding one ``zone:<name>`` node per zone, one
        ``<zone>:<asset>`` node per asset, ``contains`` edges from each zone
        to its assets, ``zone_route`` edges between zones, randomly sampled
        ``service_call`` edges between assets of routed zones, and randomly
        sampled ``self`` loops on assets.
    """
    random.seed(seed)
    graph = nx.DiGraph()

    # One hub node per zone.
    for zone in ZONES:
        graph.add_node(f"zone:{zone}", kind="zone", zone=zone, sensitivity=SENSITIVITY[zone])

    # Asset nodes inherit the zone sensitivity with a small random jitter.
    for zone, names in ASSETS.items():
        hub = f"zone:{zone}"
        for name in names:
            asset_node = f"{zone}:{name}"
            jitter = random.uniform(-0.05, 0.05)
            graph.add_node(
                asset_node,
                kind="asset",
                zone=zone,
                sensitivity=SENSITIVITY[zone] + jitter,
            )
            graph.add_edge(hub, asset_node, kind="contains")

    # Zone pairs between which traffic is routable (source, destination).
    zone_routes = [
        ("public", "dmz"),
        ("dmz", "app"),
        ("app", "data"),
        ("admin", "app"),
        ("admin", "data"),
        ("admin", "dmz"),
        ("dmz", "admin"),
    ]

    for origin, target in zone_routes:
        graph.add_edge(f"zone:{origin}", f"zone:{target}", kind="zone_route", base_allowed=True)

    # Each asset pair across a routed zone pair is linked with probability 0.45.
    for origin, target in zone_routes:
        for caller in ASSETS[origin]:
            for callee in ASSETS[target]:
                if random.random() >= 0.45:
                    continue
                graph.add_edge(
                    f"{origin}:{caller}",
                    f"{target}:{callee}",
                    kind="service_call",
                    base_allowed=True,
                )

    # Each asset gets a self loop with probability 0.35.
    for zone in ZONES:
        for name in ASSETS[zone]:
            if random.random() >= 0.35:
                continue
            own = f"{zone}:{name}"
            graph.add_edge(own, own, kind="self", base_allowed=True)

    return graph
def draw_graph(G: nx.DiGraph, title: str = "Zero-Trust Microsegmented Network Graph") -> None:
    """Render ``G`` with a spring layout and show it with matplotlib.

    Zone nodes get a fixed color value of 0.85; every other node is colored
    by its ``sensitivity`` attribute (0.5 when the attribute is absent).

    Args:
        G: Graph to draw.
        title: Figure title.
    """
    plt.figure(figsize=(14, 9))
    layout = nx.spring_layout(G, seed=42, k=0.35)
    kind_of = nx.get_node_attributes(G, "kind")

    shades = [
        0.85 if kind_of.get(node) == "zone" else G.nodes[node].get("sensitivity", 0.5)
        for node in G.nodes()
    ]

    nx.draw_networkx_nodes(G, layout, node_size=350, node_color=shades)
    nx.draw_networkx_edges(G, layout, arrows=True, alpha=0.25)
    nx.draw_networkx_labels(G, layout, font_size=8)

    plt.title(title)
    plt.axis("off")
    plt.show()
We construct a micro-segmented directed network graph where zones and assets are explicitly modeled with sensitivity attributes. We programmatically generate inter-zone and service-level communication paths to simulate realistic enterprise traffic patterns. We visualize the network topology to clearly observe segmentation boundaries and potential lateral movement routes.
Copy Code
Copied
Use a different Browser
class ZeroTrustPolicyEngine:
def __init__(self, G: nx.DiGraph):
    """Create an engine bound to graph ``G`` with empty state and default policy.

    Args:
        G: The network graph that requests are evaluated against.
    """
    self.G = G
    self.policy_version = "ztpe-v1.3"

    # Runtime state, all empty at start.
    self.principals: Dict[str, PrincipalState] = {}
    self.devices: Dict[str, DeviceState] = {}
    self.flow_log: List[FlowRecord] = []
    self.blocked_edges: set = set()

    # role -> zone -> set of actions the role may perform in that zone.
    self.role_perms = {
        "customer": {
            "public": {"read"},
            "dmz": {"read"},
        },
        "employee": {
            "public": {"read"},
            "dmz": {"read"},
            "app": {"read", "write"},
        },
        "analyst": {
            "public": {"read"},
            "dmz": {"read"},
            "app": {"read"},
            "data": {"read"},
        },
        "engineer": {
            "public": {"read"},
            "dmz": {"read"},
            "app": {"read", "write", "deploy"},
            "data": {"read"},
        },
        "admin": {
            "public": {"read"},
            "dmz": {"read", "write"},
            "app": {"read", "write", "deploy", "admin"},
            "data": {"read", "write", "admin"},
            "admin": {"read", "write", "admin"},
        },
        "secops": {
            "public": {"read"},
            "dmz": {"read", "write"},
            "app": {"read", "admin"},
            "data": {"read", "admin"},
            "admin": {"read", "admin"},
        },
    }

    # Per-signal weights.
    self.w = {
        "role_fit": 1.4,
        "device_posture": 1.8,
        "mfa": 1.0,
        "network_context": 1.2,
        "time": 0.6,
        "geo_risk": 1.2,
        "behavior_anomaly": 2.2,
        "data_volume": 1.4,
        "principal_base_risk": 1.3,
        "principal_compromise": 2.0,
        "asset_sensitivity": 1.6,
        "path_validity": 1.5,
        "quarantine": 4.0,
    }

    # Score cut-offs keyed by outcome name.
    self.thresholds = {
        "allow": 0.72,
        "step_up": 0.62,
        "rate_limit": 0.55,
        "deny": 0.0,
    }
def register_principal(self, user: str, role: str, base_risk: float) -> None:
    """Store a fresh ``PrincipalState`` for ``user``, replacing any existing one.

    Args:
        user: Key under which the principal is stored.
        role: Role recorded on the principal.
        base_risk: Baseline risk; clamped into [0, 1] before storing.
    """
    bounded_risk = _clamp(base_risk)
    seen_at = _now_ts()
    record = PrincipalState(user=user, role=role, base_risk=bounded_risk, last_seen_ts=seen_at)
    self.principals[user] = record
def register_device(self, device_id: str, device_type: str, owner: str, posture: float, attested: bool) -> None:
    """Store a fresh ``DeviceState`` for ``device_id``, replacing any existing one.

    Args:
        device_id: Key under which the device is stored.
        device_type: Device category recorded on the entry.
        owner: Owner recorded on the entry.
        posture: Posture score; clamped into [0, 1] before storing.
        attested: Attestation flag; coerced to ``bool``.
    """
    bounded_posture = _clamp(posture)
    is_attested = bool(attested)
    record = DeviceState(
        device_id=device_id,
        device_type=device_type,
        owner=owner,
        posture=bounded_posture,
        attested=is_attested,
    )
    self.devices[device_id] = record
def _asset_zone_and_sensitivity(self, node: str) -> Tuple[str, float]:
    """Return ``(zone, sensitivity)`` for a graph node identifier.

    ``zone:<name>`` identifiers are resolved from the ``SENSITIVITY`` table
    without touching the graph. Any other identifier is looked up in
    ``self.G.nodes``; the lookup raises ``KeyError`` for an unknown node.
    """
    if node.startswith("zone:"):
        # Everything after the first colon is the zone name.
        _, _, zone_name = node.partition(":")
        return zone_name, SENSITIVITY.get(zone_name, 0.5)

    attrs = self.G.nodes[node]
    zone_name = attrs.get("zone", "public")
    # Fall back to the zone baseline when the node has no explicit value.
    raw = attrs.get("sensitivity", SENSITIVITY.get(zone_name, 0.5))
    return zone_name, _clamp(float(raw))
def _base_abac_check(self, role: str, dst_zone: str, action: str) -> bool:
return action in self.role_perms.get(role, {}).get(dst_zone, set())
def _path_is_valid(self, src: str, dst: str) -> bool:
if (src, dst) in self.blocked_edges:
return False
try:
return nx.has_path(self.G, src, dst)
except nx.NetworkXError:
return False
def _network_context_risk(self, source: str) -> float:
table = {"corp_lan": 0.1, "corp_vpn": 0.25, "public_wifi": 0.65, "tor_exit": 0.9}
return table.get(source, 0.6)
def _time_risk(self, time_bucket: str) -> float:
return 0.15 if time_bucket == "business_hours" else 0.55
def _compute_trust_score(self, ctx: RequestContext) -> Tuple[float, List[str], Dict[str, Any]]:
rule_hits = []
controls: Dict[str, Any] = {}
principal = self.principals.get(ctx.user)
device = self.
← Zurück zu den Nachrichten