# Provenance (hosting-page header captured with the file): author S-Dreamer;
# commit "Create osint_core/validators.py" (e848759, verified).
"""
osint_core.validators
=====================
Input validation and normalization for the Passive OSINT Control Panel.
Design goals:
- Treat all input as hostile.
- Normalize before hashing, enrichment, audit, or reporting.
- Return structured results so downstream modules do not guess intent.
- Reject ambiguous or dangerous inputs early.
- Avoid network calls. This module is pure validation/normalization.
Supported indicator types:
- domain
- username
- email
- ip
- url
"""
from __future__ import annotations
import html
import ipaddress
import re
from dataclasses import dataclass
from enum import Enum
from typing import Literal
from urllib.parse import urlparse, urlunparse
# Closed set of labels a validation result may carry. "unknown" is the label
# reported by validate_indicator when validation fails.
IndicatorType = Literal[
    "domain",
    "username",
    "email",
    "ip",
    "url",
    "unknown",
]
class ValidationErrorCode(str, Enum):
    """Machine-readable reason attached to a failed validation.

    Members subclass ``str``, so each one compares equal to its plain string
    value (for example ``ValidationErrorCode.TOO_LONG == "too_long"``).
    """

    # Problems with the raw input as a whole.
    EMPTY_INPUT = "empty_input"
    TOO_LONG = "too_long"
    CONTROL_CHARACTERS = "control_characters"

    # The requested (forced) indicator type is not recognised.
    INVALID_TYPE = "invalid_type"

    # The input does not satisfy the rules of a specific indicator type.
    INVALID_DOMAIN = "invalid_domain"
    INVALID_USERNAME = "invalid_username"
    INVALID_EMAIL = "invalid_email"
    INVALID_IP = "invalid_ip"
    INVALID_URL = "invalid_url"

    # Auto-classification could not assign any supported type.
    UNSUPPORTED_INDICATOR = "unsupported_indicator"

    # Policy rejections.
    BLOCKED_LOCAL_TARGET = "blocked_local_target"
    BLOCKED_DANGEROUS_PATTERN = "blocked_dangerous_pattern"
@dataclass(frozen=True)
class ValidationResult:
    """Immutable outcome of validating one indicator.

    Successful results carry the detected type and the normalized value.
    Failed results produced by ``validate_indicator`` use ``"unknown"`` as the
    type, an empty ``normalized`` string, and fill ``error``/``error_code``.
    """

    # True when the input was accepted.
    ok: bool
    # Detected (or forced) indicator type.
    indicator_type: IndicatorType
    # Canonical form of the indicator.
    normalized: str
    # Length of the raw input as received, before any cleaning.
    original_length: int
    # Non-fatal notes collected during validation.
    warnings: list[str]
    # Human-readable failure message; None on success.
    error: str | None = None
    # Machine-readable failure code; None on success.
    error_code: ValidationErrorCode | None = None
# --- Length limits -----------------------------------------------------------
# NOTE(review): sanitize_raw_input enforces MAX_INPUT_LENGTH before any
# type-specific validator runs, so through validate_indicator the larger
# e-mail and URL limits below are never the binding constraint. They only
# matter when the per-type validators are called directly.
MAX_INPUT_LENGTH = 256
MAX_USERNAME_LENGTH = 64
MAX_EMAIL_LOCAL_LENGTH = 64
MAX_EMAIL_LENGTH = 320
MAX_DOMAIN_LENGTH = 253
MAX_URL_LENGTH = 2048

# --- Syntax patterns ---------------------------------------------------------
# ASCII C0 control characters plus DEL.
CONTROL_CHARS_RE = re.compile(r"[\x00-\x1f\x7f]")

# One or more dot-terminated labels (letters, digits, hyphens; 1-63 chars)
# followed by an alphabetic TLD of 2-63 chars; at most 253 chars overall and
# no leading hyphen.
DOMAIN_RE = re.compile(
    r"^(?=.{1,253}$)(?!-)(?:[a-zA-Z0-9-]{1,63}\.)+[a-zA-Z]{2,63}$"
)

# 2-64 characters drawn from letters, digits, underscore, dot and hyphen.
USERNAME_RE = re.compile(r"^[a-zA-Z0-9_.-]{2,64}$")

# Local part of 1-64 permitted characters, "@", then a dotted host ending in
# an alphabetic TLD.
EMAIL_RE = re.compile(
    r"^[A-Za-z0-9.!#$%&'*+/=?^_`{|}~-]{1,64}@[A-Za-z0-9.-]{1,255}\.[A-Za-z]{2,63}$"
)

# --- Blocked substrings ------------------------------------------------------
# Compiled case-insensitively; check_dangerous_patterns uses search(), so each
# pattern matches anywhere in the input.
DANGEROUS_PATTERNS = [
    re.compile(source, re.IGNORECASE)
    for source in (
        # Path traversal, literal and percent-encoded.
        r"\.\./",
        r"%2e%2e",
        # Script tag opener.
        r"<\s*script",
        # Non-http scheme prefixes.
        r"javascript:",
        r"data:",
        r"file:",
        # Shell metacharacters and command substitution.
        r";",
        r"\|",
        r"&&",
        r"\$\(",
        r"`",
        # Any brace-wrapped content.
        r"\{.*\}",
    )
]

# --- Local / private targets -------------------------------------------------
LOCAL_HOSTNAMES = {
    "localhost",
    "ip6-localhost",
    "ip6-loopback",
}

PRIVATE_NETS = [
    ipaddress.ip_network(cidr)
    for cidr in (
        "10.0.0.0/8",
        "172.16.0.0/12",
        "192.168.0.0/16",
        "127.0.0.0/8",
        "169.254.0.0/16",
        "::1/128",
        "fc00::/7",
        "fe80::/10",
    )
]
def validate_indicator(raw_value: str, forced_type: str = "Auto", allow_private_targets: bool = False) -> ValidationResult:
    """
    Validate and normalize a user-supplied OSINT indicator.

    The input is sanitized, screened against the blocked patterns, and then
    either validated as the forced type or auto-classified. Any
    ``ValidationException`` raised along the way is converted into a failed
    ``ValidationResult`` instead of propagating.

    Parameters
    ----------
    raw_value:
        User input. ``None`` and non-string values are tolerated; the latter
        are measured and sanitized through their ``str()`` form.
    forced_type:
        One of: Auto, Domain, Username, Email, IP, URL.
    allow_private_targets:
        Whether private/local network targets should be accepted.
        This should remain False for public Spaces.

    Returns
    -------
    ValidationResult
        Structured validation result. On failure ``indicator_type`` is
        ``"unknown"``, ``normalized`` is empty, and ``error``/``error_code``
        describe the rejection.
    """
    # Measure the text form instead of calling len() on the raw object:
    # sanitize_raw_input coerces with str(), so a non-string input (an int,
    # say) must not blow up with TypeError before validation even starts.
    original_length = 0 if raw_value is None else len(str(raw_value))
    warnings: list[str] = []

    try:
        cleaned = sanitize_raw_input(raw_value)
        check_dangerous_patterns(cleaned)
        forced = normalize_forced_type(forced_type)
        if forced != "auto":
            indicator_type, normalized = validate_as_type(cleaned, forced, allow_private_targets)
        else:
            indicator_type, normalized = classify_auto(cleaned, allow_private_targets)
    except ValidationException as exc:
        return ValidationResult(
            ok=False,
            indicator_type="unknown",
            normalized="",
            original_length=original_length,
            warnings=warnings,
            error=str(exc),
            error_code=exc.code,
        )

    # Tell the caller when the canonical form differs from the cleaned input
    # (case folding, trailing-dot removal, URL component stripping, ...).
    if normalized != cleaned:
        warnings.append("Input was normalized before processing.")

    return ValidationResult(
        ok=True,
        indicator_type=indicator_type,
        normalized=normalized,
        original_length=original_length,
        warnings=warnings,
    )
class ValidationException(ValueError):
    """Validation failure carrying a machine-readable code.

    ``str(exc)`` yields the human-readable message and ``exc.code`` holds the
    ``ValidationErrorCode`` describing why the input was rejected.
    """

    def __init__(self, message: str, code: ValidationErrorCode) -> None:
        self.code = code
        super().__init__(message)
def sanitize_raw_input(raw_value: str) -> str:
    """Return the trimmed text form of *raw_value*, or reject it.

    Checks run in this order: missing input, empty after trimming, control
    characters, overall length.

    Raises
    ------
    ValidationException
        ``EMPTY_INPUT`` for ``None`` or whitespace-only input,
        ``CONTROL_CHARACTERS`` when a control character remains after
        trimming, ``TOO_LONG`` when the trimmed text exceeds
        ``MAX_INPUT_LENGTH``.
    """
    if raw_value is None:
        raise ValidationException("Input is required.", ValidationErrorCode.EMPTY_INPUT)

    text = str(raw_value).strip()
    if not text:
        raise ValidationException("Input is empty.", ValidationErrorCode.EMPTY_INPUT)

    if CONTROL_CHARS_RE.search(text) is not None:
        raise ValidationException(
            "Input contains control characters.",
            ValidationErrorCode.CONTROL_CHARACTERS,
        )

    if len(text) > MAX_INPUT_LENGTH:
        raise ValidationException(
            f"Input exceeds {MAX_INPUT_LENGTH} characters.",
            ValidationErrorCode.TOO_LONG,
        )

    # Escape-then-unescape round trip, kept so downstream validators never
    # receive an escaped value.
    # NOTE(review): as far as I can tell this returns `text` unchanged:
    # html.unescape only reverses the escaping html.escape just applied, and
    # entities already present in the input (e.g. "&lt;") come back verbatim
    # rather than decoded. Confirm whether real entity decoding was intended.
    round_tripped = html.unescape(html.escape(text, quote=True))
    return round_tripped.strip()
def check_dangerous_patterns(value: str) -> None:
    """Reject *value* if any entry of ``DANGEROUS_PATTERNS`` matches anywhere in it.

    Raises
    ------
    ValidationException
        With code ``BLOCKED_DANGEROUS_PATTERN`` on a match. The message is
        deliberately generic and does not say which pattern matched.
    """
    if any(blocked.search(value) for blocked in DANGEROUS_PATTERNS):
        raise ValidationException(
            "Input contains a blocked pattern.",
            ValidationErrorCode.BLOCKED_DANGEROUS_PATTERN,
        )
def normalize_forced_type(forced_type: str) -> str:
    """Map a user-facing type label to its canonical lower-case name.

    Matching ignores case and surrounding whitespace. A falsy *forced_type*
    (``None`` or ``""``) is treated as ``"Auto"``.

    Returns
    -------
    str
        One of ``"auto"``, ``"domain"``, ``"username"``, ``"email"``,
        ``"ip"``, ``"url"``.

    Raises
    ------
    ValidationException
        With code ``INVALID_TYPE`` when the label is not a known alias.
    """
    # Canonical name -> every spelling accepted for it.
    spellings = {
        "auto": ("auto",),
        "domain": ("domain",),
        "username": ("username", "user"),
        "email": ("email", "mail"),
        "ip": ("ip", "ip address"),
        "url": ("url", "uri"),
    }
    canonical_by_alias = {
        alias: canonical
        for canonical, aliases in spellings.items()
        for alias in aliases
    }

    requested = forced_type if forced_type else "Auto"
    canonical = canonical_by_alias.get(requested.strip().lower())
    if canonical is None:
        raise ValidationException(
            f"Unsupported forced type: {forced_type}",
            ValidationErrorCode.INVALID_TYPE,
        )
    return canonical
def classify_auto(value: str, allow_private_targets: bool) -> tuple[IndicatorType, str]:
    """Detect the indicator type of *value* and validate it as that type.

    Detection order: URL (``http://``/``https://`` prefix), IP address,
    e-mail (contains ``@``), domain (contains ``.``), username.

    Returns
    -------
    tuple[IndicatorType, str]
        The detected type and the normalized value.

    Raises
    ------
    ValidationException
        Whatever the selected validator raises, or ``UNSUPPORTED_INDICATOR``
        when no type fits.
    """
    # URL first, because URLs can contain domains/IPs.
    if looks_like_url(value):
        return validate_url(value, allow_private_targets)

    # IP before domain.
    try:
        return validate_ip(value, allow_private_targets)
    except ValidationException as exc:
        # Only "this is not an IP" means we should keep classifying. A policy
        # rejection (BLOCKED_LOCAL_TARGET) must reach the caller as-is;
        # swallowing it would make a private IP fall through to the domain
        # check and come back mislabelled as "Invalid domain."
        if exc.code != ValidationErrorCode.INVALID_IP:
            raise

    if "@" in value:
        return validate_email(value, allow_private_targets)
    if "." in value:
        return validate_domain(value, allow_private_targets)
    if USERNAME_RE.fullmatch(value):
        return validate_username(value, allow_private_targets)

    raise ValidationException(
        "Unsupported or malformed indicator.",
        ValidationErrorCode.UNSUPPORTED_INDICATOR,
    )
def validate_as_type(value: str, forced: str, allow_private_targets: bool) -> tuple[IndicatorType, str]:
    """Validate *value* with the validator for the canonical type *forced*.

    *forced* is expected to be a canonical name as produced by
    ``normalize_forced_type`` (``"domain"``, ``"username"``, ``"email"``,
    ``"ip"`` or ``"url"``).

    Raises
    ------
    ValidationException
        With code ``INVALID_TYPE`` for any other *forced* value, or whatever
        the selected validator raises.
    """
    validators = {
        "domain": validate_domain,
        "username": validate_username,
        "email": validate_email,
        "ip": validate_ip,
        "url": validate_url,
    }
    validator = validators.get(forced)
    if validator is None:
        raise ValidationException("Unsupported indicator type.", ValidationErrorCode.INVALID_TYPE)
    return validator(value, allow_private_targets)
def validate_domain(value: str, allow_private_targets: bool = False) -> tuple[IndicatorType, str]:
    """Validate a domain name and return it lower-cased without trailing dots.

    Parameters
    ----------
    value:
        Candidate domain.
    allow_private_targets:
        When True, the names in ``LOCAL_HOSTNAMES`` are accepted.

    Returns
    -------
    tuple[IndicatorType, str]
        ``("domain", normalized_domain)``.

    Raises
    ------
    ValidationException
        ``BLOCKED_LOCAL_TARGET`` for a local hostname when private targets
        are not allowed; ``INVALID_DOMAIN`` for anything that fails the
        length, syntax, or per-label hyphen rules.
    """
    domain = value.strip().lower().rstrip(".")

    # Handle local hostnames before the syntax check. They are single-label
    # names, which DOMAIN_RE rejects, so checking them afterwards would be
    # dead code: they would always surface as INVALID_DOMAIN and
    # allow_private_targets could never admit them.
    if domain in LOCAL_HOSTNAMES:
        if not allow_private_targets:
            raise ValidationException(
                "Local/private targets are blocked by policy.",
                ValidationErrorCode.BLOCKED_LOCAL_TARGET,
            )
        return "domain", domain

    if len(domain) > MAX_DOMAIN_LENGTH or not DOMAIN_RE.fullmatch(domain):
        raise ValidationException("Invalid domain.", ValidationErrorCode.INVALID_DOMAIN)

    # DOMAIN_RE only guards the very first character against a hyphen; the
    # per-label rule (no leading or trailing hyphen) is enforced here.
    if any(label.startswith("-") or label.endswith("-") for label in domain.split(".")):
        raise ValidationException("Invalid domain label.", ValidationErrorCode.INVALID_DOMAIN)

    return "domain", domain
def validate_username(value: str, allow_private_targets: bool = False) -> tuple[IndicatorType, str]:
    """Validate a username and return it trimmed (case is preserved).

    Raises
    ------
    ValidationException
        With code ``INVALID_USERNAME`` when the trimmed value is too long,
        does not match ``USERNAME_RE``, or is exactly ``"."`` or ``".."``.
    """
    # Accepted only so every validator shares the same call signature.
    del allow_private_targets

    candidate = value.strip()
    well_formed = (
        len(candidate) <= MAX_USERNAME_LENGTH
        and USERNAME_RE.fullmatch(candidate) is not None
    )
    if not well_formed or candidate in (".", ".."):
        raise ValidationException("Invalid username.", ValidationErrorCode.INVALID_USERNAME)
    return "username", candidate
def validate_email(value: str, allow_private_targets: bool = False) -> tuple[IndicatorType, str]:
    """Validate an e-mail address and return it in normalized form.

    The whole address, local part included, is trimmed and lower-cased; the
    host part is additionally normalized by ``validate_domain``.

    Raises
    ------
    ValidationException
        With code ``INVALID_EMAIL`` when the address is too long, does not
        match ``EMAIL_RE``, or has an over-long local part; or whatever
        ``validate_domain`` raises for the host part.
    """
    address = value.strip().lower()
    if len(address) > MAX_EMAIL_LENGTH or EMAIL_RE.fullmatch(address) is None:
        raise ValidationException("Invalid email address.", ValidationErrorCode.INVALID_EMAIL)

    # Split on the last "@": everything before it is the local part.
    mailbox, _, host = address.rpartition("@")
    if len(mailbox) > MAX_EMAIL_LOCAL_LENGTH:
        raise ValidationException("Invalid email local part.", ValidationErrorCode.INVALID_EMAIL)

    normalized_host = validate_domain(host, allow_private_targets)[1]
    return "email", f"{mailbox}@{normalized_host}"
def validate_ip(value: str, allow_private_targets: bool = False) -> tuple[IndicatorType, str]:
    """Validate an IPv4/IPv6 address and return its canonical string form.

    Raises
    ------
    ValidationException
        ``INVALID_IP`` when the trimmed value does not parse as an address;
        ``BLOCKED_LOCAL_TARGET`` when it parses but
        ``is_private_or_local_ip`` flags it and private targets are not
        allowed.
    """
    candidate = value.strip()
    try:
        address = ipaddress.ip_address(candidate)
    except ValueError as exc:
        raise ValidationException("Invalid IP address.", ValidationErrorCode.INVALID_IP) from exc

    # The policy check is skipped entirely when private targets are allowed.
    if allow_private_targets:
        return "ip", str(address)

    if is_private_or_local_ip(address):
        raise ValidationException(
            "Local/private targets are blocked by policy.",
            ValidationErrorCode.BLOCKED_LOCAL_TARGET,
        )
    return "ip", str(address)
def validate_url(value: str, allow_private_targets: bool = False) -> tuple[IndicatorType, str]:
    """Validate an http(s) URL and return a normalized form of it.

    The normalized URL is rebuilt from the lower-cased scheme, the normalized
    host, the port (if any), the path and the query. Userinfo, path
    parameters and the fragment are not carried over.

    Parameters
    ----------
    value:
        Candidate URL.
    allow_private_targets:
        When True, local hostnames and private/local IP hosts are accepted.

    Returns
    -------
    tuple[IndicatorType, str]
        ``("url", normalized_url)``.

    Raises
    ------
    ValidationException
        ``TOO_LONG`` when the URL exceeds ``MAX_URL_LENGTH``; ``INVALID_URL``
        when it cannot be parsed, has a bad port, a non-http(s) scheme, or no
        host; ``BLOCKED_LOCAL_TARGET`` for local/private hosts when not
        allowed; otherwise whatever ``validate_domain`` raises for the host.
    """
    if len(value) > MAX_URL_LENGTH:
        raise ValidationException("URL is too long.", ValidationErrorCode.TOO_LONG)

    # urlparse() raises ValueError for malformed bracketed hosts and .port
    # raises it for non-numeric or out-of-range ports. Convert both so that
    # callers only ever see ValidationException.
    try:
        parsed = urlparse(value.strip())
        port = parsed.port
    except ValueError as exc:
        raise ValidationException("Invalid URL.", ValidationErrorCode.INVALID_URL) from exc

    if parsed.scheme.lower() not in {"http", "https"} or not parsed.netloc:
        raise ValidationException(
            "Invalid URL. Only http:// and https:// URLs are supported.",
            ValidationErrorCode.INVALID_URL,
        )

    hostname = parsed.hostname
    if not hostname:
        raise ValidationException("Invalid URL hostname.", ValidationErrorCode.INVALID_URL)
    hostname = hostname.lower().rstrip(".")

    if hostname in LOCAL_HOSTNAMES:
        if not allow_private_targets:
            raise ValidationException(
                "Local/private targets are blocked by policy.",
                ValidationErrorCode.BLOCKED_LOCAL_TARGET,
            )
        # Explicitly allowed local name: accept it as-is. Domain syntax
        # validation would reject a single-label host.
        normalized_host = hostname
    else:
        # Validate hostname as IP or domain.
        try:
            _, normalized_host = validate_ip(hostname, allow_private_targets)
        except ValidationException as exc:
            # Fall back to domain validation only when the host is simply not
            # an IP. A policy block on a private IP must not be masked as an
            # "Invalid domain." error.
            if exc.code != ValidationErrorCode.INVALID_IP:
                raise
            _, normalized_host = validate_domain(hostname, allow_private_targets)
        else:
            # parsed.hostname strips the brackets from IPv6 literals; put
            # them back so the host's colons cannot be confused with the
            # port separator in the rebuilt URL.
            if ":" in normalized_host:
                normalized_host = f"[{normalized_host}]"

    netloc = normalized_host if port is None else f"{normalized_host}:{port}"

    # Strip fragments. Fragments are client-side and not useful for passive OSINT hashing.
    normalized = urlunparse(
        (
            parsed.scheme.lower(),
            netloc,
            parsed.path or "",
            "",
            parsed.query or "",
            "",
        )
    )
    return "url", normalized
def looks_like_url(value: str) -> bool:
    """Return True when *value* starts with ``http://`` or ``https://`` (any case)."""
    return value.lower().startswith(("http://", "https://"))
def is_private_or_local_ip(ip: ipaddress.IPv4Address | ipaddress.IPv6Address) -> bool:
    """Return True when *ip* should count as a private or local target.

    An address qualifies when any of its ``is_private``, ``is_loopback``,
    ``is_link_local``, ``is_multicast`` or ``is_reserved`` properties is set,
    or when it falls inside one of the networks in ``PRIVATE_NETS``.
    """
    property_names = (
        "is_private",
        "is_loopback",
        "is_link_local",
        "is_multicast",
        "is_reserved",
    )
    if any(getattr(ip, name) for name in property_names):
        return True
    # Explicit network list as a second line of defence.
    return any(ip in network for network in PRIVATE_NETS)
def assert_valid_or_raise(raw_value: str, forced_type: str = "Auto", allow_private_targets: bool = False) -> tuple[IndicatorType, str]:
    """
    Exception-raising wrapper around ``validate_indicator``.

    Returns ``(indicator_type, normalized)`` when validation succeeds.
    Otherwise raises ``ValidationException`` with the result's error message
    and code, falling back to ``"Validation failed."`` and
    ``UNSUPPORTED_INDICATOR`` when either is missing.
    """
    outcome = validate_indicator(raw_value, forced_type, allow_private_targets)
    if outcome.ok:
        return outcome.indicator_type, outcome.normalized

    message = outcome.error or "Validation failed."
    code = outcome.error_code or ValidationErrorCode.UNSUPPORTED_INDICATOR
    raise ValidationException(message, code)