Coverage for mlos_bench/mlos_bench/storage/sql/alembic/env.py: 49%

61 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-14 00:55 +0000

1# 

2# Copyright (c) Microsoft Corporation. 

3# Licensed under the MIT License. 

4# 

5"""Alembic environment script.""" 

6# pylint: disable=no-member 

7 

8import logging 

9import sys 

10from logging.config import fileConfig 

11 

12from alembic import context 

13from alembic.migration import MigrationContext 

14from sqlalchemy import create_engine, engine_from_config, pool 

15from sqlalchemy.dialects import mysql 

16from sqlalchemy.schema import Column as SchemaColumn 

17from sqlalchemy.sql.schema import Column as MetadataColumn 

18from sqlalchemy.types import TypeEngine 

19 

20from mlos_bench.storage.sql.schema import DbSchema 

21 

22# this is the Alembic Config object, which provides 

23# access to the values within the .ini file in use. 

24config = context.config 

25 

26# Interpret the config file for Python logging. 

27# This line sets up loggers basically. 

28# Don't override the mlos_bench or pytest loggers though. 

29if config.config_file_name is not None and "alembic" in sys.argv[0]: 

30 fileConfig(config.config_file_name) 

31alembic_logger = logging.getLogger("alembic") 

32 

33# add your model's MetaData object here 

34# for 'autogenerate' support 

35# NOTE: We override the alembic.ini file programmatically in storage/sql/schema.py 

36# However, the alembic.ini file value is used during alembic CLI operations 

37# (e.g., dev ops extending the schema). 

38sqlalchemy_url = config.get_main_option("sqlalchemy.url") 

39if not sqlalchemy_url: 

40 raise ValueError("Missing sqlalchemy.url: schema changes may not be accurate.") 

41engine = create_engine(sqlalchemy_url) 

42alembic_logger.info("engine.url %s", str(engine.url)) 

43target_metadata = DbSchema(engine=engine).meta 

44 

45# other values from the config, defined by the needs of env.py, 

46# can be acquired: 

47# my_important_option = config.get_main_option("my_important_option") 

48# ... etc. 

49 

50 

51def fq_class_name(t: object) -> str: 

52 """Return the fully qualified class name of a type.""" 

53 return t.__module__ + "." + t.__class__.__name__ 

54 

55 

56def custom_compare_types( 

57 migration_context: MigrationContext, 

58 inspected_column: SchemaColumn | None, 

59 metadata_column: MetadataColumn, 

60 inspected_type: TypeEngine, 

61 metadata_type: TypeEngine, 

62) -> bool | None: 

63 """ 

64 Custom column type comparator. 

65 

66 See `Comparing Types 

67 <https://alembic.sqlalchemy.org/en/latest/autogenerate.html#comparing-types>`_ 

68 documentation for more details. 

69 

70 Notes 

71 ----- 

72 In the case of a MySQL DateTime variant, it makes sure that the floating 

73 point accuracy is met. 

74 

75 Returns 

76 ------- 

77 result : bool | None 

78 Returns True if the column specifications don't match the column (i.e., 

79 a change is needed). 

80 Returns False when the column specification and column match. 

81 Returns None to fallback to the default comparator logic. 

82 """ 

83 metadata_dialect_type = metadata_type.dialect_impl(migration_context.dialect) 

84 if alembic_logger.isEnabledFor(logging.DEBUG): 

85 alembic_logger.debug( 

86 ( 

87 "Comparing columns: " 

88 "inspected_column: [%s] %s and " 

89 "metadata_column: [%s (%s)] %s " 

90 "inspected_column.__dict__: %s\n" 

91 "inspected_column.dialect_options: %s\n" 

92 "inspected_column.dialect_kwargs: %s\n" 

93 "inspected_type.__dict__: %s\n" 

94 "metadata_column.__dict__: %s\n" 

95 "metadata_type.__dict__: %s\n" 

96 "metadata_dialect_type.__dict__: %s\n" 

97 ), 

98 fq_class_name(inspected_type), 

99 inspected_column, 

100 fq_class_name(metadata_type), 

101 fq_class_name(metadata_dialect_type), 

102 metadata_column, 

103 inspected_column.__dict__, 

104 dict(inspected_column.dialect_options) if inspected_column is not None else None, 

105 dict(inspected_column.dialect_kwargs) if inspected_column is not None else None, 

106 inspected_type.__dict__, 

107 metadata_column.__dict__, 

108 metadata_type.__dict__, 

109 metadata_dialect_type.__dict__, 

110 ) 

111 

112 # Implement a more detailed DATETIME precision comparison for MySQL. 

113 # Note: Currently also handles MariaDB. 

114 if migration_context.dialect.name == "mysql": 

115 if isinstance(metadata_dialect_type, (mysql.DATETIME, mysql.TIMESTAMP)): 

116 if not isinstance(inspected_type, type(metadata_dialect_type)): 

117 alembic_logger.info( 

118 "inspected_type %s does not match metadata_dialect_type %s", 

119 fq_class_name(inspected_type), 

120 fq_class_name(metadata_dialect_type), 

121 ) 

122 return True 

123 else: 

124 assert isinstance( 

125 inspected_type, (mysql.DATETIME, mysql.TIMESTAMP) 

126 ), "Expected inspected_type to be a MySQL DATETIME or TIMESTAMP type." 

127 if inspected_type.fsp != metadata_dialect_type.fsp: 

128 alembic_logger.info( 

129 "inspected_type.fsp (%s) and metadata_dialect_type.fsp (%s) don't match", 

130 inspected_type.fsp, 

131 metadata_dialect_type.fsp, 

132 ) 

133 return True 

134 

135 if inspected_type.timezone != metadata_dialect_type.timezone: 

136 alembic_logger.info( 

137 ( 

138 "inspected_type.timezone (%s) and " 

139 "metadata_dialect_type.timezone (%s) don't match" 

140 ), 

141 inspected_type.timezone, 

142 metadata_dialect_type.timezone, 

143 ) 

144 return True 

145 

146 if alembic_logger.isEnabledFor(logging.DEBUG): 

147 alembic_logger.debug( 

148 ( 

149 "Using default compare_type behavior for " 

150 "inspected_column: [%s] %s and " 

151 "metadata_column: [%s (%s)] %s (see above for details).\n" 

152 ), 

153 fq_class_name(inspected_type), 

154 inspected_column, 

155 fq_class_name(metadata_type), 

156 fq_class_name(metadata_dialect_type), 

157 metadata_column, 

158 ) 

159 return None # fallback to default comparison behavior 

160 

161 

162def run_migrations_offline() -> None: 

163 """ 

164 Run migrations in 'offline' mode. 

165 

166 This configures the context with just a URL and not an Engine, though an Engine is 

167 acceptable here as well. By skipping the Engine creation we don't even need a DBAPI 

168 to be available. 

169 

170 Calls to context.execute() here emit the given string to the script output. 

171 """ 

172 url = config.get_main_option("sqlalchemy.url") 

173 context.configure( 

174 url=url, 

175 target_metadata=target_metadata, 

176 literal_binds=True, 

177 dialect_opts={"paramstyle": "named"}, 

178 compare_type=custom_compare_types, 

179 ) 

180 

181 with context.begin_transaction(): 

182 context.run_migrations() 

183 

184 

185def run_migrations_online() -> None: 

186 """ 

187 Run migrations in 'online' mode. 

188 

189 In this scenario we need to create an Engine and associate a connection with the 

190 context. 

191 """ 

192 connectable = config.attributes.get("connection", None) 

193 

194 if connectable is None: 

195 # only create Engine if we don't have a Connection 

196 # from the outside 

197 connectable = engine_from_config( 

198 config.get_section(config.config_ini_section) or {}, 

199 prefix="sqlalchemy.", 

200 poolclass=pool.NullPool, 

201 ) 

202 

203 with connectable.connect() as connection: 

204 context.configure( 

205 connection=connection, 

206 target_metadata=target_metadata, 

207 compare_type=custom_compare_types, 

208 ) 

209 

210 with context.begin_transaction(): 

211 context.run_migrations() 

212 else: 

213 context.configure( 

214 connection=connectable, 

215 target_metadata=target_metadata, 

216 compare_type=custom_compare_types, 

217 ) 

218 

219 with context.begin_transaction(): 

220 context.run_migrations() 

221 

222 

223if context.is_offline_mode(): 

224 run_migrations_offline() 

225else: 

226 run_migrations_online()