Edit File: export_wblist.py
import asyncio import logging import os from pathlib import Path from defence360agent.utils import ensure_line_in_file_bytes from im360 import files from im360.contracts.plugins import IDSAwareMessageSink from im360.internals import strategy from im360.simple_rpc.resident_socket import send_to_socket from im360.utils.lazy_init import RULES_CHECK_IN_PROGRESS from im360.subsys import csf logger = logging.getLogger(__name__) IPSET_RESTORE_SCRIPT = ( b"/opt/imunify360/venv/bin/python3 " b"/opt/imunify360/venv/share/imunify360/scripts/rules_checker.py " b"ipsets-consistent" ) class ExportWBList(IDSAwareMessageSink): STRATEGY = strategy.Strategy.CSF_COOP_STRATEGY AVAILABLE_ON_FREEMIUM = False async def create_sink(self, loop): self._loop = loop def _determine_csf_post_hook_script(self): """ Determine which CSF post-hook script to use based on priority rules: - If /usr/local/csf/bin/csfpost.sh exists, use it (higher priority) - Otherwise use /etc/csf/csfpost.sh - Preserve any custom content from either script Returns path to the script that should be used. """ if Path(csf.CSF_POST_HOOK_SCRIPT_USR_LOCAL).exists(): logger.info( "Using /usr/local/csf/bin/csfpost.sh as post-hook script" ) return csf.CSF_POST_HOOK_SCRIPT_USR_LOCAL else: logger.info("Using /etc/csf/csfpost.sh as post-hook script") return csf.CSF_POST_HOOK_SCRIPT_ETC def _preserve_custom_content(self, script_path): """ Preserve custom content in the CSF post-hook script by ensuring our IPSET_RESTORE_SCRIPT is added without removing other content """ script = Path(script_path) script.parent.mkdir(parents=True, exist_ok=True) if not script.exists(): script.write_text( f"#!/bin/sh\n{IPSET_RESTORE_SCRIPT.decode('utf8')}\n" ) script.chmod(0o750) return True ipset_line_added = ensure_line_in_file_bytes( script_path, IPSET_RESTORE_SCRIPT, ) if ipset_line_added: logger.info(f"Added command to restore ipsets to {script_path}") return ipset_line_added async def activate(self): """ When switching to CSF mode, some critical addresses added to csf allow list and the post-hook script is configured. """ prefix = files.Index.files_path(files.WHITELISTS) ALLOW_LIST = os.path.join(prefix, "imunify360.txt") try: # NOTE: it assumes ascii-based locale encoding/fs (very likely) include_added = ensure_line_in_file_bytes( csf.CSF_ALLOW_FILE, b"Include " + os.fsencode(ALLOW_LIST) ) if include_added: logger.info("Need to restart CSF to include imunify360.txt") script_path = self._determine_csf_post_hook_script() script_updated = self._preserve_custom_content(script_path) # Restart CSF if necessary if include_added or script_updated: logger.info("CSF config was changed, restarting CSF") while RULES_CHECK_IN_PROGRESS.exists(): await asyncio.sleep(1) await csf.restart_all() # on CSF restart we need to recheck rules immediately await send_to_socket( msg={ "method": "RECREATE_RULES", }, wait_for_response=False, ) self._mark_as_active() except asyncio.CancelledError: pass except Exception: logger.exception("Failed to activate %r plugin", self)