# oppo-node / tests /test_apps_script_toolbox.py
# DJ-Goanna-Coding's picture
# Deploy from GitHub Actions
# c87f72b verified
"""
Comprehensive tests for apps_script_toolbox.py
Tests cover:
- AppsScriptToolbox initialization
- Worker initialization
- Connection verification
- Identity strike report
- Full audit functionality
- Worker status dashboard updates
- Integration workflow with mocked workers
"""
import pytest
import json
from pathlib import Path
from unittest.mock import Mock, patch, MagicMock
import sys
# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent))
from workers.apps_script_toolbox import AppsScriptToolbox
class TestAppsScriptToolboxInit:
    """Checks on the state of a freshly constructed AppsScriptToolbox."""

    def test_init_default_values(self):
        """A new toolbox has no workers attached and a non-None repo root."""
        fresh = AppsScriptToolbox()
        # Neither worker is attached straight after construction.
        assert fresh.reporter is None
        assert fresh.archivist is None
        # The repository root must already be set.
        assert fresh.repo_root is not None

    def test_init_repo_root_path(self):
        """repo_root is exposed as a pathlib.Path instance."""
        root = AppsScriptToolbox().repo_root
        assert isinstance(root, Path)
class TestAppsScriptToolboxWorkerInit:
    """Smoke test for AppsScriptToolbox.initialize_workers()."""

    def test_initialize_workers_without_modules(self):
        """initialize_workers() completes without raising.

        No assertion is made on the worker attributes afterwards: whether
        they stay None depends on whether the worker modules can be
        imported in the environment running the tests.
        """
        instance = AppsScriptToolbox()
        # Passing means only that this call did not raise.
        instance.initialize_workers()
class TestAppsScriptToolboxVerifyConnections:
    """Tests for AppsScriptToolbox.verify_connections()."""

    def test_verify_connections_all_present(self, mock_env_vars):
        """verify_connections() returns a bool when mock_env_vars is active.

        mock_env_vars is a fixture defined outside this file (presumably in
        conftest.py -- verify). Only the return type is pinned here, not
        its value.
        """
        toolbox = AppsScriptToolbox()
        result = toolbox.verify_connections()
        assert isinstance(result, bool)

    def test_verify_connections_missing_env_vars(self, monkeypatch):
        """verify_connections() returns False with all three env vars unset."""
        # raising=False: the variables may already be absent.
        for var in ("GOOGLE_SHEETS_CREDENTIALS", "RCLONE_CONFIG_DATA", "HF_TOKEN"):
            monkeypatch.delenv(var, raising=False)
        toolbox = AppsScriptToolbox()
        # Identity check: a falsy non-bool (0, None, "") must not pass.
        assert toolbox.verify_connections() is False

    def test_verify_connections_partial_env_vars(self, monkeypatch):
        """verify_connections() returns False when only HF_TOKEN is set."""
        monkeypatch.setenv("HF_TOKEN", "test_token")
        monkeypatch.delenv("GOOGLE_SHEETS_CREDENTIALS", raising=False)
        monkeypatch.delenv("RCLONE_CONFIG_DATA", raising=False)
        toolbox = AppsScriptToolbox()
        # A partial configuration must not be reported as connected.
        assert toolbox.verify_connections() is False
class TestAppsScriptToolboxIdentityStrike:
    """Tests for AppsScriptToolbox.run_identity_strike()."""

    def test_run_identity_strike_no_reporter(self):
        """run_identity_strike() returns False when no reporter is attached."""
        toolbox = AppsScriptToolbox()
        # Set explicitly so the test does not depend on constructor defaults.
        toolbox.reporter = None
        assert toolbox.run_identity_strike() is False

    def test_run_identity_strike_with_reporter(self):
        """run_identity_strike() returns True and calls the reporter once."""
        toolbox = AppsScriptToolbox()
        mock_reporter = Mock()
        mock_reporter.create_identity_strike_report = Mock(return_value="success")
        toolbox.reporter = mock_reporter
        # Identity check: a truthy non-bool must not pass.
        assert toolbox.run_identity_strike() is True
        mock_reporter.create_identity_strike_report.assert_called_once()

    def test_run_identity_strike_reporter_error(self):
        """run_identity_strike() returns False when the reporter raises."""
        toolbox = AppsScriptToolbox()
        mock_reporter = Mock()
        # The failure must be reported as False, not propagated.
        mock_reporter.create_identity_strike_report = Mock(
            side_effect=Exception("Error")
        )
        toolbox.reporter = mock_reporter
        assert toolbox.run_identity_strike() is False
class TestAppsScriptToolboxFullAudit:
    """Tests for AppsScriptToolbox.run_full_audit()."""

    def test_run_full_audit_no_archivist(self):
        """run_full_audit() returns False when no archivist is attached."""
        toolbox = AppsScriptToolbox()
        # Set explicitly so the test does not depend on constructor defaults.
        toolbox.archivist = None
        assert toolbox.run_full_audit() is False

    def test_run_full_audit_with_archivist(self, temp_dir):
        """run_full_audit() saves the archive index via the archivist.

        temp_dir is a fixture defined outside this file (presumably a
        temporary directory path -- verify); it replaces repo_root.
        """
        toolbox = AppsScriptToolbox()
        toolbox.repo_root = temp_dir
        mock_archivist = Mock()
        mock_archivist.process_cargo_bay = Mock()
        mock_archivist._save_archive_index = Mock()
        mock_archivist.files_processed = 10
        toolbox.archivist = mock_archivist
        # Return value deliberately unchecked; only the index save is pinned.
        toolbox.run_full_audit()
        mock_archivist._save_archive_index.assert_called()

    def test_run_full_audit_archivist_error(self, temp_dir):
        """run_full_audit() returns False when process_cargo_bay raises."""
        toolbox = AppsScriptToolbox()
        toolbox.repo_root = temp_dir
        mock_archivist = Mock()
        # The failure must be reported as False, not propagated.
        mock_archivist.process_cargo_bay = Mock(side_effect=Exception("Error"))
        toolbox.archivist = mock_archivist
        assert toolbox.run_full_audit() is False
class TestAppsScriptToolboxWorkerStatus:
    """Tests for AppsScriptToolbox.update_worker_status_dashboard().

    Each test points repo_root at the temp_dir fixture (defined outside
    this file; used here as a path-like object supporting "/").
    """

    def test_update_worker_status_no_file(self, temp_dir):
        """Returns False when temp_dir has no worker_status.json."""
        toolbox = AppsScriptToolbox()
        toolbox.repo_root = temp_dir
        # Identity check: a falsy non-bool must not pass.
        assert toolbox.update_worker_status_dashboard() is False

    def test_update_worker_status_with_file(self, temp_dir):
        """Returns True and calls the reporter once for a valid status file."""
        toolbox = AppsScriptToolbox()
        toolbox.repo_root = temp_dir
        status_file = temp_dir / "worker_status.json"
        status_data = {
            "last_updated": "2026-04-14",
            "sync_status": {
                "gdrive_last_sync": "2026-04-14",
            },
        }
        # Explicit encoding makes the fixture independent of platform locale.
        status_file.write_text(json.dumps(status_data), encoding="utf-8")
        mock_reporter = Mock()
        mock_reporter.update_worker_status_sheet = Mock()
        toolbox.reporter = mock_reporter
        assert toolbox.update_worker_status_dashboard() is True
        mock_reporter.update_worker_status_sheet.assert_called_once()

    def test_update_worker_status_invalid_json(self, temp_dir):
        """Returns False when worker_status.json holds malformed JSON."""
        toolbox = AppsScriptToolbox()
        toolbox.repo_root = temp_dir
        status_file = temp_dir / "worker_status.json"
        status_file.write_text("invalid json {]}", encoding="utf-8")
        # NOTE(review): no reporter is attached here, so False may come from
        # the missing reporter rather than the parse failure -- confirm
        # against the implementation.
        assert toolbox.update_worker_status_dashboard() is False
class TestAppsScriptToolboxIntegration:
"""Integration tests for AppsScriptToolbox"""
def test_full_workflow_mock(self, temp_dir, mock_env_vars):
"""Test complete workflow with mocked workers"""
toolbox = AppsScriptToolbox()
toolbox.repo_root = temp_dir
# Mock both workers
mock_reporter = Mock()
mock_reporter.create_identity_strike_report = Mock(return_value="success")
mock_reporter.update_worker_status_sheet = Mock()
mock_archivist = Mock()
mock_archivist.process_cargo_bay = Mock()
mock_archivist._save_archive_index = Mock()
mock_archivist.files_processed = 5
toolbox.reporter = mock_reporter
toolbox.archivist = mock_archivist
# Test all operations
assert toolbox.verify_connections() == True
assert toolbox.run_identity_strike() == True