Coverage for mlos_bench/mlos_bench/storage/sql/alembic/env.py: 49%
61 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-14 00:55 +0000
1#
2# Copyright (c) Microsoft Corporation.
3# Licensed under the MIT License.
4#
5"""Alembic environment script."""
6# pylint: disable=no-member
8import logging
9import sys
10from logging.config import fileConfig
12from alembic import context
13from alembic.migration import MigrationContext
14from sqlalchemy import create_engine, engine_from_config, pool
15from sqlalchemy.dialects import mysql
16from sqlalchemy.schema import Column as SchemaColumn
17from sqlalchemy.sql.schema import Column as MetadataColumn
18from sqlalchemy.types import TypeEngine
20from mlos_bench.storage.sql.schema import DbSchema
# The Alembic Config object, which provides access to the values within the
# .ini file in use.
config = context.config

# Interpret the config file for Python logging (i.e., set up the loggers).
# Only do so when the invoking program's name contains "alembic", so that the
# mlos_bench and pytest loggers are not overridden.
if config.config_file_name is not None and "alembic" in sys.argv[0]:
    fileConfig(config.config_file_name)
alembic_logger = logging.getLogger("alembic")

# The model's MetaData object, used for 'autogenerate' support.
# NOTE: We override the alembic.ini file programmatically in storage/sql/schema.py
# However, the alembic.ini file value is used during alembic CLI operations
# (e.g., dev ops extending the schema).
sqlalchemy_url = config.get_main_option("sqlalchemy.url")
if not sqlalchemy_url:
    raise ValueError("Missing sqlalchemy.url: schema changes may not be accurate.")
engine = create_engine(sqlalchemy_url)
# Explicitly mask the password when rendering the URL so that database
# credentials never end up in the logs, regardless of how str(url) behaves in
# the installed SQLAlchemy version.
alembic_logger.info("engine.url %s", engine.url.render_as_string(hide_password=True))
target_metadata = DbSchema(engine=engine).meta

# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def fq_class_name(t: object) -> str:
    """
    Return the fully qualified class name of an object's type.

    Parameters
    ----------
    t : object
        Any object (instance) whose class should be described.

    Returns
    -------
    name : str
        The defining module of the object's class and the class name, joined
        by a dot (e.g., ``builtins.int``).
    """
    # Read both parts from the class itself: looking up ``__module__`` on an
    # instance fails with AttributeError for instances of builtin types.
    cls = type(t)
    return cls.__module__ + "." + cls.__name__
def custom_compare_types(
    migration_context: MigrationContext,
    inspected_column: SchemaColumn | None,
    metadata_column: MetadataColumn,
    inspected_type: TypeEngine,
    metadata_type: TypeEngine,
) -> bool | None:
    """
    Custom column type comparator.

    See `Comparing Types
    <https://alembic.sqlalchemy.org/en/latest/autogenerate.html#comparing-types>`_
    documentation for more details.

    Notes
    -----
    In the case of a MySQL DateTime variant, it makes sure that the floating
    point accuracy is met.

    Parameters
    ----------
    migration_context : MigrationContext
        The active migration context. Its dialect is used to resolve the
        dialect specific implementation of ``metadata_type``.
    inspected_column : SchemaColumn | None
        The column as reflected from the database. May be None.
    metadata_column : MetadataColumn
        The column as declared in the schema metadata.
    inspected_type : TypeEngine
        The type of the reflected column.
    metadata_type : TypeEngine
        The type of the column declared in the schema metadata.

    Returns
    -------
    result : bool | None
        Returns True if the column specifications don't match the column (i.e.,
        a change is needed).
        Returns False when the column specification and column match.
        Returns None to fallback to the default comparator logic.
    """
    metadata_dialect_type = metadata_type.dialect_impl(migration_context.dialect)
    if alembic_logger.isEnabledFor(logging.DEBUG):
        alembic_logger.debug(
            (
                "Comparing columns: "
                "inspected_column: [%s] %s and "
                "metadata_column: [%s (%s)] %s "
                "inspected_column.__dict__: %s\n"
                "inspected_column.dialect_options: %s\n"
                "inspected_column.dialect_kwargs: %s\n"
                "inspected_type.__dict__: %s\n"
                "metadata_column.__dict__: %s\n"
                "metadata_type.__dict__: %s\n"
                "metadata_dialect_type.__dict__: %s\n"
            ),
            fq_class_name(inspected_type),
            inspected_column,
            fq_class_name(metadata_type),
            fq_class_name(metadata_dialect_type),
            metadata_column,
            # inspected_column may be None, so every attribute access on it
            # needs to be guarded to keep debug logging from crashing.
            inspected_column.__dict__ if inspected_column is not None else None,
            dict(inspected_column.dialect_options) if inspected_column is not None else None,
            dict(inspected_column.dialect_kwargs) if inspected_column is not None else None,
            inspected_type.__dict__,
            metadata_column.__dict__,
            metadata_type.__dict__,
            metadata_dialect_type.__dict__,
        )

    # Implement a more detailed DATETIME precision comparison for MySQL.
    # Note: Currently also handles MariaDB.
    if migration_context.dialect.name == "mysql":
        if isinstance(metadata_dialect_type, (mysql.DATETIME, mysql.TIMESTAMP)):
            if not isinstance(inspected_type, type(metadata_dialect_type)):
                alembic_logger.info(
                    "inspected_type %s does not match metadata_dialect_type %s",
                    fq_class_name(inspected_type),
                    fq_class_name(metadata_dialect_type),
                )
                return True

            # Narrow the type for static checkers before reading the
            # DATETIME/TIMESTAMP specific attributes below.
            assert isinstance(
                inspected_type, (mysql.DATETIME, mysql.TIMESTAMP)
            ), "Expected inspected_type to be a MySQL DATETIME or TIMESTAMP type."

            # A difference in fractional seconds precision requires a change.
            if inspected_type.fsp != metadata_dialect_type.fsp:
                alembic_logger.info(
                    "inspected_type.fsp (%s) and metadata_dialect_type.fsp (%s) don't match",
                    inspected_type.fsp,
                    metadata_dialect_type.fsp,
                )
                return True

            # So does a difference in the timezone setting.
            if inspected_type.timezone != metadata_dialect_type.timezone:
                alembic_logger.info(
                    (
                        "inspected_type.timezone (%s) and "
                        "metadata_dialect_type.timezone (%s) don't match"
                    ),
                    inspected_type.timezone,
                    metadata_dialect_type.timezone,
                )
                return True

    if alembic_logger.isEnabledFor(logging.DEBUG):
        alembic_logger.debug(
            (
                "Using default compare_type behavior for "
                "inspected_column: [%s] %s and "
                "metadata_column: [%s (%s)] %s (see above for details).\n"
            ),
            fq_class_name(inspected_type),
            inspected_column,
            fq_class_name(metadata_type),
            fq_class_name(metadata_dialect_type),
            metadata_column,
        )
    return None  # fallback to default comparison behavior
def run_migrations_offline() -> None:
    """
    Run migrations in 'offline' mode.

    The context is configured from the ``sqlalchemy.url`` config option alone
    rather than from an Engine (though an Engine would be acceptable here as
    well). Since no Engine is created, a DBAPI does not even need to be
    available.

    Calls to context.execute() here emit the given string to the script output.
    """
    offline_options = {
        "url": config.get_main_option("sqlalchemy.url"),
        "target_metadata": target_metadata,
        "literal_binds": True,
        "dialect_opts": {"paramstyle": "named"},
        "compare_type": custom_compare_types,
    }
    context.configure(**offline_options)

    with context.begin_transaction():
        context.run_migrations()
def run_migrations_online() -> None:
    """
    Run migrations in 'online' mode.

    A connection supplied from the outside via ``config.attributes`` is used
    as-is. Otherwise an Engine is created from the config file section and a
    connection from it is associated with the context.
    """

    def _configure_and_migrate(connection) -> None:
        """Bind the migration context to the connection and run the migrations."""
        context.configure(
            connection=connection,
            target_metadata=target_metadata,
            compare_type=custom_compare_types,
        )

        with context.begin_transaction():
            context.run_migrations()

    external_connection = config.attributes.get("connection", None)
    if external_connection is not None:
        # A connection was handed to us: use it directly.
        _configure_and_migrate(external_connection)
        return

    # only create Engine if we don't have a Connection
    # from the outside
    section_options = config.get_section(config.config_ini_section) or {}
    local_engine = engine_from_config(
        section_options,
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )
    with local_engine.connect() as connection:
        _configure_and_migrate(connection)
# Script entry point: pick the migration runner that matches the mode Alembic
# was invoked in.
if not context.is_offline_mode():
    run_migrations_online()
else:
    run_migrations_offline()