Edit File: plesk_notifications.py
""" This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see <https://www.gnu.org/licenses/>. Copyright © 2019 Cloud Linux Software Inc. This software is also available under ImunifyAV commercial license, see <https://www.imunify360.com/legal/eula> """ import logging from functools import lru_cache from pathlib import Path from defence360agent.subsys.panels import hosting_panel, plesk from defence360agent.contracts.plugins import MessageSink from defence360agent.contracts.hooks import HooksConfig from defence360agent.subsys import notifier logger = logging.getLogger(__name__) SCRIPT_PATH = "/usr/local/psa/admin/plib/modules/imunify360/scripts/send-notifications.php" HOOK_PATH = "/opt/imunify360/venv/share/imunify360/scripts/send-notifications" EVENTS = [ "CUSTOM_SCAN_MALWARE_FOUND", "USER_SCAN_MALWARE_FOUND", "REALTIME_MALWARE_FOUND", ] class PleskNotificationsHooks(MessageSink): async def create_sink(self, loop): """MessageSink method""" if hosting_panel.HostingPanel().NAME != plesk.Plesk.NAME: return if self.is_supported(): if not self.is_applied(): await self.add_hooks() else: await self.remove_hooks() @lru_cache(maxsize=1) def is_supported(self) -> bool: return Path(SCRIPT_PATH).exists() and Path(HOOK_PATH).exists() def is_applied(self) -> bool: config = HooksConfig().get() config_rules = config.get("rules", {}) if not all(event in config_rules for event in EVENTS): return False return all( HOOK_PATH in config_rules[event].get("SCRIPT", {}).get("scripts", []) for event in EVENTS ) async def add_hooks(self): config = HooksConfig().get() data = { "rules": { event: rule for event, rule in config.get("rules", {}).items() if event in EVENTS } } for event in EVENTS: if event not in data["rules"]: data["rules"][event] = {} updated = False for event in EVENTS: rule = data["rules"][event] if HOOK_PATH not in rule.get("SCRIPT", {}).get("scripts", []): if "SCRIPT" not in rule: rule["SCRIPT"] = {} rule["SCRIPT"]["scripts"] = [] elif "scripts" not in rule["SCRIPT"]: rule["SCRIPT"]["scripts"] = [] rule["SCRIPT"]["enabled"] = True rule["SCRIPT"]["scripts"].append(HOOK_PATH) updated = True if updated: HooksConfig().update(data) try: await notifier.config_updated() except ConnectionRefusedError: logger.warning( "Notifier is not running, cannot send CONFIG_UPDATED event" ) else: logger.info("Hooks added and configuration updated") async def remove_hooks(self): config = HooksConfig().get() data = { "rules": { event: rule for event, rule in config.get("rules", {}).items() if event in EVENTS } } updated = False for event, rule in data["rules"].items(): if HOOK_PATH in rule.get("SCRIPT", {}).get("scripts", []): rule["SCRIPT"]["scripts"].remove(HOOK_PATH) rule["SCRIPT"]["enabled"] = len(rule["SCRIPT"]["scripts"]) != 0 updated = True if updated: HooksConfig().update(data) try: await notifier.config_updated() except ConnectionRefusedError: logger.warning( "Notifier is not running, cannot send CONFIG_UPDATED event" ) else: logger.info("Hooks removed and configuration updated")