Edit File: waf_rules_configurator.py
"""
Plugin for periodically launching AppVersionDetector and, optionally,
limiting ModSecurity rulesets for sites that run various CMS.

Per the original module description, this is achieved by setting up and
maintaining a cron job.  The class defined below tracks the
``APP_SPECIFIC_RULESET`` setting across agent restarts and config updates,
and empties the app-specific WAF config when the feature gets disabled.
"""
import os
from logging import getLogger

from defence360agent.contracts.config import SystemConfig
from defence360agent.contracts.messages import MessageType
from defence360agent.contracts.plugins import MessageSink, expect
from im360.contracts.config import Modsec as Config
from defence360agent.subsys import web_server
from defence360agent.subsys.persistent_state import load_state, save_state
from im360.subsys.panels.hosting_panel import HostingPanel
from im360.subsys.waf_rules_configurator import (
    try_restore_config_from_backup,
)

logger = getLogger(__name__)

# Location of the components-versions update script.
UPDATE_COMPONENTS_SCRIPT = (
    "/opt/imunify360/venv/share/imunify360/scripts/"
    "update_components_versions.py"
)
# Location of the cron file for the WAF configurator.
WAF_CONFIGURATOR_CRON_PATH = "/etc/cron.d/waf_configurator"


class WAFRuleSetConfigurator(MessageSink):
    """Message sink that follows changes of ``Config.APP_SPECIFIC_RULESET``."""

    async def create_sink(self, loop):
        """
        Load the last persisted setting and reconcile it with the config.

        When nothing was persisted, or the persisted value differs from the
        current ``Config.APP_SPECIFIC_RULESET``, adopt the current value and
        try to restore the config from backup.
        """
        persisted = load_state("WAFRuleSetConfigurator")
        self._app_specific_ruleset = persisted.get("app_specific_ruleset")

        known = self._app_specific_ruleset
        if known is not None and known == Config.APP_SPECIFIC_RULESET:
            # Persisted value is still current: nothing to reconcile.
            return

        self._app_specific_ruleset = Config.APP_SPECIFIC_RULESET
        await try_restore_config_from_backup()

    async def shutdown(self):
        """Persist the tracked setting so the next start can compare it."""
        snapshot = {"app_specific_ruleset": self._app_specific_ruleset}
        save_state("WAFRuleSetConfigurator", snapshot)

    async def _truncate_conf(self):
        """
        Empty the app-specific httpd config if it exists and has content,
        then gracefully restart the web server.
        """
        try:
            conf_file = HostingPanel().get_app_specific_waf_config()
            conf_size = os.stat(conf_file).st_size
        except (FileNotFoundError, NotImplementedError):
            # No such file, or the lookup is not implemented: nothing to do.
            return

        if not conf_size:
            # Already empty, so no rewrite and no restart.
            return

        # Opening for writing truncates the file; nothing is written to it.
        with open(conf_file, "w"):
            pass
        await web_server.graceful_restart()
        logger.info("App specific ruleset config truncated")

    @expect(MessageType.ConfigUpdate)
    async def truncate_conf(self, message):
        """
        React to a config update: when the setting changed and is now
        disabled, truncate the app-specific config.
        """
        if not isinstance(message["conf"], SystemConfig):
            return

        enabled = Config.APP_SPECIFIC_RULESET
        if enabled == self._app_specific_ruleset:
            # Setting unchanged since we last looked.
            return

        self._app_specific_ruleset = enabled
        if not enabled:
            await self._truncate_conf()