Edit File: __init__.py
import logging import os import time from typing import Set from restore_infected.backup_backends_lib import ( BackupBase, backend_auth_required, ) from restore_infected.helpers import DateTime, from_env from . import api, config from .api import ClusterlogicsConnector from .exceptions import RestoreError, TokenFileNotFoundError CLUSTERLOGICS_DIR = '/usr/local/clusterlogics' auth_required = backend_auth_required( config.TOKEN_FILE, "Initialize ClusterlogicsClient first!") config.check_existing_tokens() def is_suitable(): return os.path.exists(CLUSTERLOGICS_DIR) class ClusterlogicsClient: """ ClusterlogicsClient API wrapper """ RESTORE_TIMEOUT = 300 RESTORE_RETRY_TIMEOUT = 10 DEFAULT_URL = 'https://manage.clusterlogics.com/' _instance = None def __new__(cls, **kwargs): if cls._instance is None: cls._instance = ClusterlogicsConnector.__new__(cls) return cls._instance def __init__(self, **kwargs): self.logger = logging.getLogger(self.__class__.__qualname__) self.logger.info('Initializing Clusterlogics backend') self.token_file = config.TokenFile(config.TOKEN_FILE) self.token_file.token = kwargs or self.token_file.token self.api = ClusterlogicsConnector(**self.token_file.token) self.logger.info('Clusterlogics initialized successfully') @classmethod def get_instance(cls): if not cls._instance: ClusterlogicsClient() return cls._instance @classmethod def get_api(cls): return cls.get_instance().api @property def token(self): return self.token_file.token def raise_on_invalid_creds(self): self.api.get_supported_systems() def get_backup_jobs(self, until, path): self.logger.info('Retrieving backup jobs for path %s', path) jobs = self.api.jobs(until, path) self.logger.info('Backup jobs for this path: %d', len(jobs)) return [ ClusterlogicsBackupJob( path, job['starttime_sql'], job['jobid'] ) for job in jobs ] class Job: _TERMINATED = ['A', 'E', 'e', 'f', 'T', 'W'] def __init__(self, jobid): self.id = jobid self.api = ClusterlogicsClient.get_api() self.logger = logging.getLogger(self.__class__.__name__) def __str__(self): return '{} {}'.format(self.__class__.__name__, self.id) @property def status(self): status = self.api.status(self.id)['jobstatus'] self.logger.info('Job status: %s', status) return status @property def terminated(self): done = self.status in self._TERMINATED self.logger.info('Job %s is terminated: %s', self.id, done) return done class FileData: def __init__(self, filename, size, mtime, backup): self.filename = filename self.size = size self.mtime = mtime self.backup = backup def __repr__(self): return '{} [{} bytes] {}'.format(self.mtime, self.size, self.filename) class ClusterlogicsBackupJob(BackupBase, Job): def __init__(self, path, created, jobid): BackupBase.__init__(self, path, created) Job.__init__(self, jobid) def __str__(self): return Job.__str__(self) def close(self): pass def file_data(self, path): stat = self.api.fileinfo(self.id, path) if not stat or 'error' in stat: raise FileNotFoundError() size = stat['size'] mtime = DateTime.fromtimestamp(stat['mtime']) return FileData(path, size, mtime, backup=self) def restore( self, items: Set[FileData], destination_folder="/usr/local/clusterlogics/tmp/" ): """ Schedule a file restore job and wait for it to complete """ result = {} for item in items: path = item.filename self.logger.info( 'Requested to restore %s from job #%s', path, self.id ) # FIXME: Use one API call to restore all items r = self.api.restore(self.id, path, destination_folder) jobid = r['job'] job = Job(jobid) started = time.time() while time.time() - started < ClusterlogicsClient.RESTORE_TIMEOUT: time.sleep(ClusterlogicsClient.RESTORE_RETRY_TIMEOUT) if job.terminated: self.logger.info('Restore finished for %s', path) result[ os.path.join( destination_folder, os.path.basename(path) ) ] = path break else: raise RestoreError( 'Restore job for {} took too long!'.format(path) ) return result @from_env(username="ACCOUNT_NAME", apikey="API_KEY") def init( username, apikey, client=None, url=ClusterlogicsClient.DEFAULT_URL, ): client = ClusterlogicsClient( username=username, apikey=apikey, client=client, url=url ) client.raise_on_invalid_creds() @auth_required def backups(until=None, path=None): return ClusterlogicsClient.get_instance().get_backup_jobs(until, path) @auth_required def info(): return ClusterlogicsClient.get_instance().token