Edit File: 002_move_resident_tables_into_separate_db.py
""" Move resident related tables to a separate database (imunify360-resident.db), it should be attached while running im360 migrations. """ import time from peewee import ( BooleanField, BlobField, CharField, Check, CompositeKey, FloatField, IntegerField, Model, TextField, ForeignKeyField, ) RESIDENT_TABLES = ( "iplist", "iplistrecord", "iplistpurpose", "incident", "ignore_list", "last_synclist", "messages_to_send", "blocked_port", "ignored_by_port_proto", ) TCP, UDP, ALL = "tcp", "udp", "all" class _SafeCharField(CharField): def adapt(self, value): return super().adapt(value.encode("utf-8", errors="ignore")) class Incident(Model): id = IntegerField(primary_key=True, null=True) plugin = CharField(null=True) rule = CharField(null=True) timestamp = FloatField(null=True) retries = IntegerField(null=True) severity = IntegerField(null=True) name = CharField(null=True) description = _SafeCharField(null=True) abuser = CharField(null=True) country = CharField(null=True, column_name="country_id") domain = TextField(null=True, default=None) class Meta: db_table = "incident" indexes = ((("timestamp",), False),) schema = "resident" class IPList(Model): WHITE = "WHITE" BLACK = "BLACK" GRAY = "GRAY" GRAY_SPLASHSCREEN = "GRAY_SPLASHSCREEN" IP_LISTS = (WHITE, BLACK, GRAY, GRAY_SPLASHSCREEN) SCOPE_LOCAL = "local" SCOPE_GROUP = "group" ip = CharField(null=False) listname = CharField( null=False, constraints=[Check("listname in ('{}')".format("','".join(IP_LISTS)))], ) expiration = IntegerField(default=0, null=True) imported_from = CharField(null=True) ctime = IntegerField(null=True, default=lambda: int(time.time())) deep = IntegerField(null=True) comment = CharField(null=True) country = CharField(null=True, column_name="country_id") captcha_passed = BooleanField(null=False, default=False) manual = BooleanField(null=False, default=True) full_access = BooleanField(null=True) auto_whitelisted = BooleanField(null=True, default=False) network_address = IntegerField(null=False) netmask = IntegerField(null=False) version = IntegerField(null=False) scope = CharField( null=True, constraints=[ Check("scope in ('%s','%s')" % (SCOPE_LOCAL, SCOPE_GROUP)) ], ) class Meta: db_table = "iplist" primary_key = CompositeKey( "network_address", "netmask", "version", "listname" ) schema = "resident" class IPListRecord(Model): network_address = IntegerField(null=False) netmask = IntegerField(null=False) version = IntegerField(null=False) iplist_id = IntegerField(null=False) class Meta: db_table = "iplistrecord" primary_key = CompositeKey( "network_address", "netmask", "version", "iplist_id" ) schema = "resident" class IPListPurpose(Model): purpose = CharField(null=False) iplist_id = IntegerField(null=False) class Meta: db_table = "iplistpurpose" primary_key = CompositeKey("purpose", "iplist_id") schema = "resident" class IgnoreList(Model): ip = CharField(null=False) network_address = IntegerField(null=False) netmask = IntegerField(null=False) version = IntegerField(null=False) class Meta: db_table = "ignore_list" primary_key = CompositeKey("network_address", "netmask", "version") schema = "resident" class LastSynclist(Model): timestamp = FloatField(null=True, default=0) name = CharField(null=False, primary_key=True) class Meta: db_table = "last_synclist" schema = "resident" class MessageToSend(Model): timestamp = FloatField(null=False) message = BlobField(null=False) class Meta: db_table = "messages_to_send" schema = "resident" class BlockedPort(Model): port = IntegerField(null=False) proto = CharField( null=False, constraints=[Check(f"proto in ('{TCP}', '{UDP}', '{ALL}')")], ) comment = CharField(null=True) class Meta: db_table = "blocked_port" schema = "resident" indexes = ((("port", "proto"), True),) class IgnoredByPort(Model): """Ignored IPs for ports blocked via :class:`BlockedPort`.""" port_proto = ForeignKeyField( BlockedPort, null=False, on_delete="CASCADE", related_name="ips" ) ip = CharField(null=False) comment = CharField(null=True) network_address = IntegerField(null=False) netmask = IntegerField(null=False) version = IntegerField(null=False) country = CharField(null=True, column_name="country_id") class Meta: db_table = "ignored_by_port_proto" schema = "resident" indexes = ( # create an unique on port/ip (("port_proto_id", "ip"), True), ) # We hit a bug in peewee, which generates wrong sql on table creation, # which is why create_table overridden # Namely: # REFERENCES "resident"."blocked_port" # ^^^^^^^ this part should be ommited @classmethod def create_table(cls, safe=True, **options): cls._meta.database.execute_sql( """ CREATE TABLE IF NOT EXISTS resident.ignored_by_port_proto ( "id" INTEGER NOT NULL PRIMARY KEY, "port_proto_id" INTEGER NOT NULL, "ip" VARCHAR(255) NOT NULL, "comment" VARCHAR(255), "network_address" INTEGER NOT NULL, "netmask" INTEGER NOT NULL, "version" INTEGER NOT NULL, "country_id" VARCHAR(255), FOREIGN KEY ("port_proto_id") REFERENCES "blocked_port" ("id") ON DELETE CASCADE )""" ) cls._schema.create_indexes(safe=safe) def migrate(migrator, database, fake=False, **kwargs): # create new tables migrator.create_model(Incident) migrator.create_model(IPList) migrator.create_model(IPListRecord) migrator.create_model(IPListPurpose) migrator.create_model(IgnoreList) migrator.create_model(LastSynclist) migrator.create_model(MessageToSend) migrator.create_model(BlockedPort) migrator.create_model(IgnoredByPort) to_drop = [] for table in RESIDENT_TABLES: not_nulls_fields = [ f"({field.column_name} is not null)" for field in migrator.orm[table]._meta.sorted_fields if not field.null ] if not_nulls_fields: not_nulls_cond = f"WHERE ({' AND '.join(not_nulls_fields)})" else: not_nulls_cond = "" columns = ",".join( f.column_name for f in migrator.orm[table]._meta.sorted_fields ) # copy the data to a new table migrator.sql( f"INSERT INTO resident.{table} ({columns}) " f"SELECT {columns} FROM {table} {not_nulls_cond}" ) # We can't remove old tables right away, # because any foreginkey references on_delete="CASCADE" # (e.g. IgnoredByPort.port_proto) will be dropped to_drop.append(f"DROP TABLE {table}") for table in to_drop: migrator.sql(table) # re-create indexes # old indexes should be deleted automatically after deleting old tables # https://sqlite.org/lang_droptable.html for table, field in [ ("ignored_by_port_proto", "country_id"), ("incident", "country_id"), ("iplist", "country_id"), ("iplist", "listname"), ("iplist", "expiration"), ("iplist", "ip"), ]: migrator.sql( f'CREATE INDEX IF NOT EXISTS resident."{table}_{field}" ON' f" {table} ({field})" ) def rollback(migrator, database, fake=False, **kwargs): pass