# Edit File: plugin.py
"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation, either version 3 of the License, or (at your option) any later
version.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with
this program. If not, see <https://www.gnu.org/licenses/>.

Copyright © 2019 Cloud Linux Software Inc.

This software is also available under ImunifyAV commercial license,
see <https://www.imunify360.com/legal/eula>
"""
import asyncio
import json
import logging
import os
import pwd
import sentry_sdk
import shutil
import time
from collections import defaultdict
from distutils.version import LooseVersion
from pathlib import Path
from typing import Optional

from defence360agent.api import inactivity
from defence360agent.contracts.config import (
    MalwareScanSchedule,
    MalwareScanScheduleInterval as Interval,
)
from defence360agent.utils import atomic_rewrite, check_run
from imav.model.wordpress import WPSite
from imav.wordpress import cli, telemetry, PLUGIN_VERSION_FILE
from imav.wordpress.utils import (
    build_command_for_user,
    calculate_next_scan_timestamp,
    clear_get_cagefs_enabled_users_cache,
    get_last_scan,
    get_malware_history,
    prepare_scan_data,
    get_data_dir,
)
from imav.wordpress.site_repository import (
    delete_site,
    get_outdated_sites,
    get_sites_for_user,
    get_sites_to_install,
    get_sites_to_mark_as_manually_deleted,
    get_sites_to_uninstall,
    insert_installed_sites,
    mark_site_as_manually_deleted,
    update_site_version,
)
from imav.wordpress.proxy_auth import setup_site_authentication

logger = logging.getLogger(__name__)

COMPONENTS_DB_PATH = Path(
    "/var/lib/cloudlinux-app-version-detector/components_versions.sqlite3"
)


def site_search(items: dict, user_info: pwd.struct_passwd, matcher) -> dict:
    """Distribute *items* between the WordPress sites of a user.

    Every site path returned by ``get_sites_for_user`` becomes a key of the
    result. An item is appended to the list of the longest site path for
    which ``matcher(item, path)`` is truthy; items matching no site are
    dropped.
    """
    # Get all WordPress sites for the user (the main site is always last)
    user_sites = get_sites_for_user(user_info)

    result = {path: [] for path in user_sites}

    for item in items:
        # Find all matching sites for this item
        matching_sites = [path for path in user_sites if matcher(item, path)]

        if matching_sites:
            # Find the most specific (longest) matching path
            most_specific_site = max(matching_sites, key=len)
            result[most_specific_site].append(item)

    return result


async def _get_scan_data_for_user(sink, user_info: pwd.struct_passwd):
    """Collect the scan information shared by all sites of one user.

    Returns a tuple ``(last_scan_time, next_scan_time, malware_by_site)``.
    ``next_scan_time`` stays ``None`` when the scan schedule interval is
    ``Interval.NONE``.
    """
    # Get the last scan data
    last_scan = await get_last_scan(sink, user_info.pw_name)

    # Extract the last scan date
    last_scan_time = last_scan.get("scan_date", None)
    next_scan_time = None
    if MalwareScanSchedule.INTERVAL != Interval.NONE:
        next_scan_time = calculate_next_scan_timestamp()

    # Get the malware history for the user
    malware_history = get_malware_history(user_info.pw_name)

    # Split malware history by site. This part relies on the main site being
    # the last one in the list. Without this all malware could be attributed
    # to the main site.
    malware_by_site = site_search(
        malware_history,
        user_info,
        lambda item, path: item["resource_type"] == "file"
        and item["file"].startswith(path),
    )

    return last_scan_time, next_scan_time, malware_by_site


async def _send_telemetry_task(coro, semaphore: asyncio.Semaphore):
    """Await *coro* while holding *semaphore*; failures are only logged."""
    async with semaphore:
        try:
            await coro
        except Exception as e:
            logger.error(f"Telemetry task failed: {e}")


async def process_telemetry_tasks(coroutines: list, concurrency=10):
    """
    Process a list of telemetry coroutines with a concurrency limit.
    """
    if not coroutines:
        return

    semaphore = asyncio.Semaphore(concurrency)
    tasks = [
        asyncio.create_task(_send_telemetry_task(coro, semaphore))
        for coro in coroutines
    ]

    try:
        await asyncio.gather(*tasks)
    except Exception as e:
        logger.error(f"Some telemetry tasks failed: {e}")


async def install_everywhere(sink):
    """Install the imunify-security plugin for all sites where it is not installed."""
    logger.info("Installing imunify-security wp plugin")

    # Keep track of the installed sites
    installed = set()
    authenticated = set()
    failed = set()
    telemetry_coros = []

    with inactivity.track.task("wp-plugin-installation"):
        try:
            clear_get_cagefs_enabled_users_cache()
            to_install = get_sites_to_install()

            if not to_install:
                return

            # Group sites by user id
            sites_by_user = defaultdict(list)
            for site in to_install:
                sites_by_user[site.uid].append(site)

            # Now iterate over the grouped sites
            for uid, sites in sites_by_user.items():
                try:
                    user_info = pwd.getpwuid(uid)
                    username = user_info.pw_name
                except Exception as error:
                    sentry_sdk.capture_message(
                        "Skipping installation of WordPress plugin on"
                        " {count} site(s) because they belong to user"
                        " {user} and it is not possible to retrieve"
                        " username for this user. Reason: {reason}".format(
                            count=len(sites),
                            user=uid,
                            reason=error,
                        ),
                        level="warning",
                    )
                    continue

                (
                    last_scan_time,
                    next_scan_time,
                    malware_by_site,
                ) = await _get_scan_data_for_user(sink, user_info)

                for site in sites:
                    if await remove_site_if_missing(sink, site):
                        continue

                    try:
                        # Check if site is correctly installed and accessible
                        # using WP CLI
                        is_wordpress_installed = (
                            await cli.is_wordpress_installed(site)
                        )
                        if not is_wordpress_installed:
                            sentry_sdk.capture_message(
                                "WordPress site is not accessible using WP"
                                " CLI. site={site}".format(site=site),
                                level="warning",
                            )
                            continue

                        # Prepare scan data
                        scan_data = prepare_scan_data(
                            last_scan_time,
                            next_scan_time,
                            username,
                            site,
                            malware_by_site,
                        )

                        # Create the scan data file
                        await update_scan_data_file(site, scan_data)

                        await update_site_auth(
                            site, user_info, authenticated, failed
                        )

                        # Install the plugin
                        await cli.plugin_install(site)

                        # Get the version of the plugin
                        version = await cli.get_plugin_version(site)
                        if not version:
                            installed.add(site)
                        else:
                            installed.add(
                                WPSite.build_with_version(site, version)
                            )

                        # Prepare telemetry
                        telemetry_coros.append(
                            telemetry.send_event(
                                sink=sink,
                                event="installed_by_imunify",
                                site=site,
                                version=version,
                            )
                        )
                    except Exception as error:
                        logger.error(
                            "Failed to install plugin to site=%s error=%r",
                            site,
                            error,
                        )

            logger.info(
                "Installed imunify-security wp plugin on %d sites",
                len(installed),
            )
            if failed:
                logger.warning(
                    "Failed to authenticate %d sites",
                    len(failed),
                )
        except asyncio.CancelledError:
            logger.info(
                "Installation imunify-security wp plugin was cancelled. Plugin"
                " was installed for %d sites",
                len(installed),
            )
        except Exception as error:
            logger.error(
                "Error occurred during plugin installation. error=%r", error
            )
            raise
        finally:
            # Insert the installed sites into the database
            insert_installed_sites(installed)

            # Send telemetry
            await process_telemetry_tasks(telemetry_coros)


def get_latest_plugin_version() -> Optional[str]:
    """Get the latest version of the imunify-security plugin from the version file.

    Returns ``None`` (after logging an error) when the version file does not
    exist or cannot be read.
    """
    try:
        if not PLUGIN_VERSION_FILE.exists():
            logger.error(
                "Plugin version file does not exist: %s", PLUGIN_VERSION_FILE
            )
            return None

        return PLUGIN_VERSION_FILE.read_text().strip()
    except Exception as e:
        logger.error("Failed to read plugin version file: %s", e)
        return None


def _is_downgrade(new_version: str, old_version: Optional[str]) -> bool:
    """Return True when *new_version* is strictly lower than *old_version*.

    A missing previous version, or a pair of versions that ``LooseVersion``
    cannot order, is reported as "not a downgrade" instead of raising, so a
    failed comparison does not discard the telemetry of a successful update.
    """
    if not old_version:
        return False
    try:
        return LooseVersion(new_version) < LooseVersion(old_version)
    except (AttributeError, TypeError):
        return False


async def update_everywhere(sink):
    """Update the imunify-security plugin on all sites where it is installed."""
    latest_version = get_latest_plugin_version()
    if not latest_version:
        logger.error("Could not determine latest plugin version")
        return

    logger.info(
        "Updating imunify-security wp plugin to the latest version %s",
        latest_version,
    )

    updated = set()
    telemetry_coros = []

    with inactivity.track.task("wp-plugin-update"):
        try:
            # Get sites with outdated versions
            outdated_sites = get_outdated_sites(latest_version)
            logger.info(f"Found {len(outdated_sites)} outdated sites")

            if not outdated_sites:
                return

            # Group sites by user id
            sites_by_user = defaultdict(list)
            for site in outdated_sites:
                sites_by_user[site.uid].append(site)

            # Process each user's sites
            for uid, sites in sites_by_user.items():
                try:
                    user_info = pwd.getpwuid(uid)
                    username = user_info.pw_name
                except Exception as error:
                    logger.error(
                        "Failed to get username for uid=%d. error=%s",
                        uid,
                        error,
                    )
                    continue

                # Get scan data once for all sites of this user
                (
                    last_scan_time,
                    next_scan_time,
                    malware_by_site,
                ) = await _get_scan_data_for_user(sink, user_info)

                for site in sites:
                    if await remove_site_if_missing(sink, site):
                        continue

                    try:
                        # Check if site still exists
                        if not await cli.is_wordpress_installed(site):
                            logger.info(
                                "WordPress site no longer exists: %s", site
                            )
                            continue

                        # Prepare scan data
                        scan_data = prepare_scan_data(
                            last_scan_time,
                            next_scan_time,
                            username,
                            site,
                            malware_by_site,
                        )

                        # Update the scan data file
                        await update_scan_data_file(site, scan_data)

                        # Now update the plugin
                        await cli.plugin_update(site)

                        updated.add(site)

                        # Get the version after update
                        version = await cli.get_plugin_version(site)
                        if version:
                            # Store original version for comparison
                            original_version = site.version

                            # Update the database with the new version
                            update_site_version(site, version)

                            # Create a new WPSite with updated version
                            site = site.build_with_version(version)

                            # Determine if this is a downgrade
                            is_downgrade = _is_downgrade(
                                version, original_version
                            )

                            # Prepare telemetry
                            telemetry_coros.append(
                                telemetry.send_event(
                                    sink=sink,
                                    event="downgraded_by_imunify"
                                    if is_downgrade
                                    else "updated_by_imunify",
                                    site=site,
                                    version=version,
                                )
                            )
                    except Exception as error:
                        logger.error(
                            "Failed to update plugin on site=%s error=%s",
                            site,
                            error,
                        )

            logger.info(
                "Updated imunify-security wp plugin on %d sites",
                len(updated),
            )
        except asyncio.CancelledError:
            logger.info(
                "Update of imunify-security wp plugin was cancelled. Plugin"
                " was updated on %d sites",
                len(updated),
            )
        except Exception as error:
            logger.error(
                "Error occurred during plugin update. error=%s", error
            )
            raise
        finally:
            # Send telemetry
            await process_telemetry_tasks(telemetry_coros)


async def delete_plugin_files(site: WPSite):
    """Remove the plugin data directory of *site* if it exists."""
    data_dir = get_data_dir(site)
    if data_dir.exists():
        # rmtree is blocking, so run it in a worker thread
        await asyncio.to_thread(shutil.rmtree, data_dir)


async def remove_from_single_site(site: WPSite, sink, telemetry_coros) -> int:
    """
    Remove the imunify-security plugin from a single site, including all
    cleanup and telemetry.

    Returns the number of affected sites (should be 1 if deletion was
    successful). Returns 0 when the plugin was already removed by the user
    or when any step fails (the error is logged, not raised).

    This function is intended to be protected with asyncio.shield to ensure
    it completes even if the parent task is cancelled.
    """
    try:
        # Check if site is still installed and accessible using WP CLI
        is_installed = await cli.is_plugin_installed(site)
        if not is_installed:
            # Plugin is no longer installed. It was removed manually by the
            # user.
            mark_site_as_manually_deleted(site, time.time())

            # Send telemetry for manual removal
            telemetry_coros.append(
                telemetry.send_event(
                    sink=sink,
                    event="removed_by_user",
                    site=site,
                    version=site.version,
                )
            )
            return 0

        # Get the version of the plugin (for telemetry data)
        version = await cli.get_plugin_version(site)

        # Uninstall the plugin from WordPress site.
        await cli.plugin_uninstall(site)

        # Delete the data files from the site.
        await delete_plugin_files(site)

        # Delete the site from database.
        affected = delete_site(site)

        # Send telemetry for successful uninstall
        telemetry_coros.append(
            telemetry.send_event(
                sink=sink,
                event="uninstalled_by_imunify",
                site=site,
                version=version,
            )
        )
        return affected
    except Exception as error:
        # Log any error that occurs during the removal process
        logger.error("Failed to remove plugin from %s %s", site, error)
        return 0


async def remove_all_installed(sink):
    """Remove the imunify-security plugin from all sites where it is installed."""
    logger.info("Deleting imunify-security wp plugin")
    telemetry_coros = []
    affected = 0

    with inactivity.track.task("wp-plugin-removal"):
        try:
            to_remove = get_sites_to_uninstall()
            for site in to_remove:
                try:
                    affected += await asyncio.shield(
                        remove_from_single_site(site, sink, telemetry_coros)
                    )
                except asyncio.CancelledError:
                    # NOTE(review): this handler neither breaks nor
                    # re-raises, so the loop moves on to the next site after
                    # a cancellation - confirm that this is intended.
                    logger.info(
                        "Deleting imunify-security wp plugin was cancelled."
                        " Plugin was deleted from %d sites (out of %d)",
                        affected,
                        len(to_remove),
                    )
        except Exception as error:
            logger.error("Error occurred during plugin deleting. %s", error)
            raise
        finally:
            logger.info(
                "Removed imunify-security wp plugin from %s sites",
                affected,
            )
            # send telemetry
            await process_telemetry_tasks(telemetry_coros)


async def tidy_up_manually_deleted(sink):
    """Mark sites reported by ``get_sites_to_mark_as_manually_deleted`` as
    manually deleted and send a ``removed_by_user`` event for each of them.

    Errors are logged and not propagated; telemetry collected before an
    error is still sent.
    """
    telemetry_coros = []
    try:
        to_mark_as_manually_removed = get_sites_to_mark_as_manually_deleted()
        if to_mark_as_manually_removed:
            now = time.time()
            for site in to_mark_as_manually_removed:
                mark_site_as_manually_deleted(site, now)

                # Prepare telemetry
                telemetry_coros.append(
                    telemetry.send_event(
                        sink=sink,
                        event="removed_by_user",
                        site=site,
                        version=site.version,
                    )
                )
    except Exception as error:
        logger.error("Error occurred during site tidy up. %s", error)
    finally:
        if telemetry_coros:
            await process_telemetry_tasks(telemetry_coros)


async def update_data_on_sites(sink, sites: list[WPSite]):
    """Rewrite the scan data file on every site in *sites*.

    Sites are grouped by owner so the scan data is fetched once per user.
    Users whose uid cannot be resolved are skipped, and a failure on one
    site is logged without interrupting the others.
    """
    if not sites:
        return

    # Group sites by user id
    sites_by_user = defaultdict(list)
    for site in sites:
        sites_by_user[site.uid].append(site)

    # Now iterate over the grouped sites
    for uid, user_sites in sites_by_user.items():
        try:
            user_info = pwd.getpwuid(uid)
            username = user_info.pw_name
        except Exception as error:
            logger.error(
                "Failed to get username for uid=%d. error=%s",
                uid,
                error,
            )
            continue

        (
            last_scan_time,
            next_scan_time,
            malware_by_site,
        ) = await _get_scan_data_for_user(sink, user_info)

        for site in user_sites:
            if await remove_site_if_missing(sink, site):
                continue

            try:
                # Prepare scan data
                scan_data = prepare_scan_data(
                    last_scan_time,
                    next_scan_time,
                    username,
                    site,
                    malware_by_site,
                )

                # Update the scan data file
                await update_scan_data_file(site, scan_data)
            except Exception as error:
                logger.error(
                    "Failed to update scan data on site=%s error=%s",
                    site,
                    error,
                )


def _render_scan_data_php(scan_data: dict) -> str:
    """Render *scan_data* as a PHP file returning the decoded JSON payload.

    The JSON text is embedded in a PHP single-quoted string. There a
    backslash must be doubled and a single quote must be backslash-escaped;
    backslashes are escaped first so that JSON escapes such as ``\\\\`` or
    ``\\"`` reach ``json_decode`` unchanged.
    """
    payload = json.dumps(scan_data).replace("\\", "\\\\").replace("'", "\\'")
    return (
        "<?php\n"
        "if ( ! defined( 'WPINC' ) ) {\n"
        "\texit;\n"
        "}\n"
        "return json_decode( '" + payload + "', true );"
    )


async def update_scan_data_file(site: WPSite, scan_data: dict):
    """Write ``scan_data.php`` into the data directory of *site*.

    The data directory is created on behalf of the site owner when missing.

    Raises:
        Exception: if the data directory is a symlink or cannot be created.
    """
    # Get the gid for the given user
    user_info = pwd.getpwuid(site.uid)
    gid = user_info.pw_gid

    # Create data directory
    data_dir = get_data_dir(site)
    if os.path.islink(data_dir):
        # If the data directory is a symlink, interrupt the process.
        raise Exception(
            "Data directory %s is a symlink, skipping." % str(data_dir)
        )

    if not data_dir.exists():
        command = build_command_for_user(
            user_info.pw_name,
            [
                "mkdir",
                "-p",
                str(data_dir),
            ],
        )
        await check_run(command)

        if not data_dir.exists():
            # Directory creation failed. Interrupt the process.
            raise Exception(
                "Failed to create directory %s for user %s"
                % (str(data_dir), user_info.pw_name)
            )

        # we can safely change the permissions of the directory because we
        # just created it
        data_dir.chmod(0o750)

    scan_data_path = data_dir / "scan_data.php"

    # Format the PHP file content
    php_content = _render_scan_data_php(scan_data)

    # Check if the file exists, create an empty file if it doesn't
    if not scan_data_path.exists():
        scan_data_path.touch()

    # Write the formatted PHP file
    atomic_rewrite(
        scan_data_path,
        php_content,
        backup=False,
        uid=site.uid,
        gid=gid,
        permissions=0o400,
    )


async def update_auth_everywhere():
    """Update auth.php files for all existing WordPress sites."""
    logger.info("Updating auth.php files for existing WordPress sites")
    updated = set()
    failed = set()

    with inactivity.track.task("wp-auth-update"):
        try:
            clear_get_cagefs_enabled_users_cache()
            # Get all installed sites instead of sites to install
            # get_sites_to_uninstall returns all active sites from db
            installed_sites = get_sites_to_uninstall()

            if not installed_sites:
                logger.info("No installed WordPress sites found")
                return

            sites_by_user = defaultdict(list)
            for site in installed_sites:
                sites_by_user[site.uid].append(site)

            # Process users concurrently
            tasks = []
            for uid, sites in sites_by_user.items():
                try:
                    user_info = pwd.getpwuid(uid)

                    # Create tasks for all sites of this user
                    for site in sites:
                        task = update_site_auth(
                            site, user_info, updated, failed
                        )
                        tasks.append(task)
                except Exception as error:
                    sentry_sdk.capture_message(
                        "Skipping auth update for WordPress sites on"
                        " {count} site(s) because they belong to user"
                        " {user} and it is not possible to retrieve"
                        " username for this user. Reason: {reason}".format(
                            count=len(sites),
                            user=uid,
                            reason=error,
                        ),
                        level="warning",
                    )
                    continue

            # Run all site updates concurrently with a reasonable limit
            # Adjust max_concurrent based on your system's I/O capacity
            max_concurrent = 10
            for i in range(0, len(tasks), max_concurrent):
                batch = tasks[i : i + max_concurrent]
                await asyncio.gather(*batch, return_exceptions=True)

            logger.info(
                "Updated auth.php files for %d WordPress sites, %d failed",
                len(updated),
                len(failed),
            )
        except asyncio.CancelledError:
            logger.info(
                "Auth update for WordPress sites was cancelled. Auth was"
                " updated for %d sites",
                len(updated),
            )
        except Exception as error:
            logger.error("Error occurred during auth update. error=%s", error)
            raise


async def update_site_auth(site, user_info, updated, failed):
    """Process authentication setup for a single site.

    On success *site* is added to *updated*; on any exception it is added to
    *failed* and the error is logged instead of being raised.
    """
    try:
        await setup_site_authentication(site, user_info)
        updated.add(site)
    except Exception as error:
        failed.add(site)
        logger.error(
            "Failed to update auth for site=%s error=%s",
            site,
            error,
        )


async def remove_site_if_missing(sink, site: WPSite) -> bool:
    """
    Checks if the site directory exists. If not, sends a 'site_removed'
    telemetry event and removes the site from the local database.

    Returns True if the site was removed (directory missing), False otherwise.

    Parameters:
        sink: The telemetry/event sink.
        site: The WPSite object to check and potentially remove.

    Side effect:
        If the site is missing, it will be removed from the database and a
        telemetry event will be sent.
    """
    if os.path.isdir(site.docroot):
        return False

    await telemetry.send_event(
        sink=sink, event="site_removed", site=site, version=site.version
    )
    delete_site(site)
    return True