import hashlib
import logging
import sys
import traceback
from pathlib import Path

import snowflake.connector
import structlog
from structlog import BoundLogger

from schemachange.CLIScriptExecutionError import CLIScriptExecutionError
from schemachange.config.get_merged_config import get_merged_config
from schemachange.config.RenderConfig import RenderConfig
from schemachange.deploy import deploy
from schemachange.JinjaTemplateProcessor import JinjaTemplateProcessor
from schemachange.redact_config_secrets import redact_config_secrets
from schemachange.ScriptExecutionError import ScriptExecutionError
from schemachange.session.SnowflakeSession import SnowflakeSession

# region Global Variables
# metadata
SCHEMACHANGE_VERSION = "4.3.3"
SNOWFLAKE_APPLICATION_NAME = "schemachange"
module_logger = structlog.getLogger(__name__)

# Session kwargs whose values must never be written to the log.
_SENSITIVE_SESSION_KEYS = ("password", "token", "private_key_passphrase", "private_key_file_pwd")

# Config attributes whose values must never be written to the log.
_SENSITIVE_CONFIG_ATTRS = (
    "snowflake_password",
    "snowflake_token",
    "snowflake_private_key_passphrase",
    "snowflake_private_key_file_pwd",
)

# (session kwarg, display label) pairs echoed verbatim by `verify`, in display order.
_DISPLAYED_SESSION_PARAMS = (
    ("account", "Account"),
    ("user", "User"),
    ("role", "Role"),
    ("warehouse", "Warehouse"),
    ("database", "Database"),
    ("schema", "Schema"),
    ("authenticator", "Authenticator"),
)


def render(config: RenderConfig, script_path: Path, logger: BoundLogger) -> None:
    """
    Renders the provided script and logs the result.

    The script is rendered through the Jinja template processor using
    ``config.config_vars``; the rendered text and its SHA-224 checksum are
    emitted in a single "Success" log event.

    Note: does not apply secrets filtering.

    :param config: render configuration (root folder, modules folder, variables)
    :param script_path: path of the script to render
    :param logger: logger that receives the "Success" event
    """
    # Always process with jinja engine
    jinja_processor = JinjaTemplateProcessor(project_root=config.root_folder, modules_folder=config.modules_folder)
    content = jinja_processor.render(jinja_processor.relpath(script_path), config.config_vars)

    checksum = hashlib.sha224(content.encode("utf-8")).hexdigest()
    logger.info("Success", checksum=checksum, content=content)


def _log_schemachange_config(config, logger: BoundLogger) -> None:
    """Log the schemachange settings, plus the deploy settings when the config defines them."""
    logger.info("")
    logger.info("Schemachange Configuration:")
    logger.info(f" Config File: {config.config_file_path}")
    logger.info(f" Root Folder: {config.root_folder}")
    if config.modules_folder:
        logger.info(f" Modules Folder: {config.modules_folder}")
    logger.info(f" Log Level: {logging.getLevelName(config.log_level)}")
    logger.info(f" Config Variables: {len(config.config_vars)} variable(s) defined")

    # Display deploy-specific configuration if available
    if hasattr(config, "change_history_table"):
        logger.info("")
        logger.info("Deploy Configuration:")
        logger.info(f" Change History Table: {config.change_history_table}")
        logger.info(f" Create Change History Table: {config.create_change_history_table}")
        logger.info(f" Autocommit: {config.autocommit}")
        logger.info(f" Dry Run: {config.dry_run}")
        if config.query_tag:
            logger.info(f" Query Tag: {config.query_tag}")


def _log_connection_diagnostics(config, session_kwargs: dict, logger: BoundLogger) -> None:
    """Log DEBUG-level detail about how the connection settings were resolved (secret values omitted)."""
    has_file_path = hasattr(config, "connections_file_path")
    has_conn_name = hasattr(config, "connection_name")

    logger.debug("")
    logger.debug("=== DIAGNOSTIC INFO ===")
    logger.debug(f"Config object has connections_file_path: {has_file_path}")
    if has_file_path:
        logger.debug(f" Value: {config.connections_file_path}")
        logger.debug(f" Type: {type(config.connections_file_path)}")
    logger.debug(f"Config object has connection_name: {has_conn_name}")
    if has_conn_name:
        logger.debug(f" Value: {config.connection_name}")

    # Only key names are printed here, never values.
    logger.debug(f"Session kwargs keys: {list(session_kwargs)}")
    masked_keys = ", ".join(key for key in session_kwargs if key not in _SENSITIVE_SESSION_KEYS)
    logger.debug(f"Session kwargs (masked): {masked_keys}")

    # Check what snowflake_ parameters are in the config
    config_attrs = {name: getattr(config, name) for name in dir(config) if name.startswith("snowflake_")}
    logger.debug(f"Config snowflake_* attributes: {list(config_attrs)}")
    for name, value in config_attrs.items():
        if name not in _SENSITIVE_CONFIG_ATTRS:
            logger.debug(f" {name}: {value}")
    logger.debug("======================")
    logger.debug("")


def _log_connection_config(config, session_kwargs: dict, logger: BoundLogger) -> None:
    """Log the Snowflake connection parameters that will be used, masking every secret."""
    logger.info("")
    logger.info("Snowflake Connection Configuration:")

    # Display connections.toml settings if used (read from config object, not session_kwargs)
    file_path_value = getattr(config, "connections_file_path", None)
    conn_name_value = getattr(config, "connection_name", None)
    if file_path_value:
        logger.info(f" Connections File: {file_path_value}")
    if conn_name_value:
        logger.info(f" Connection Name: {conn_name_value}")

    # Add diagnostic output when using connections.toml
    if config.log_level <= logging.DEBUG:
        _log_connection_diagnostics(config, session_kwargs, logger)

    # Display connection parameters (mask sensitive data)
    for key, label in _DISPLAYED_SESSION_PARAMS:
        if session_kwargs.get(key):
            logger.info(f" {label}: {session_kwargs[key]}")
    if session_kwargs.get("password"):
        logger.info(" Password: ****** (set)")
    if session_kwargs.get("token"):
        logger.info(" Token: ****** (set)")
    if session_kwargs.get("private_key_file"):
        logger.info(f" Private Key Path: {session_kwargs['private_key_file']}")
    if session_kwargs.get("private_key_file_pwd"):
        logger.info(" Private Key Passphrase: ****** (set)")


def _log_verify_failure(logger: BoundLogger, headline: str, error: Exception, tips: tuple = ()) -> None:
    """Log a verification failure banner, the error text and any troubleshooting tips."""
    logger.error("")
    logger.error(headline)
    logger.error(f" Error: {str(error)}")
    if tips:
        logger.error("")
        logger.error("Troubleshooting:")
        for tip in tips:
            logger.error(tip)
    logger.error("=" * 80)


def _test_connectivity(session_kwargs: dict, logger: BoundLogger) -> None:
    """Open a Snowflake session, run a trivial query, log the outcome and always release the connection."""
    logger.info("")
    logger.info("Testing Snowflake Connectivity...")
    logger.info("-" * 80)

    try:
        # Use SnowflakeSession for consistency with deploy command
        # This ensures all authentication parameters (including private_key_file_pwd) are handled identically
        session = SnowflakeSession(
            schemachange_version=SCHEMACHANGE_VERSION,
            application=SNOWFLAKE_APPLICATION_NAME,
            logger=logger,
            change_history_table=None,  # Not needed for verify
            **session_kwargs,
        )
        try:
            logger.info("")
            logger.info("[OK] Connection Successful!")
            logger.info("")
            logger.info("Connection Details:")
            logger.info(f" Account: {session.account}")
            logger.info(f" User: {session.user}")
            logger.info(f" Role: {session.role}")
            logger.info(f" Warehouse: {session.warehouse}")
            logger.info(f" Database: {session.database}")
            logger.info(f" Schema: {session.schema}")
            logger.info(f" Session ID: {session.con.session_id}")

            # Test a simple query
            logger.info("")
            logger.info("Testing Query Execution...")
            cursor = session.con.cursor()
            try:
                cursor.execute("SELECT CURRENT_VERSION()")
                snowflake_version = cursor.fetchone()[0]
            finally:
                # Release the cursor even when the query itself fails.
                cursor.close()
            logger.info(f" Snowflake Version: {snowflake_version}")

            logger.info("")
            logger.info("=" * 80)
            logger.info("[OK] Verification Complete - All checks passed!")
            logger.info("=" * 80)
        finally:
            # Close the connection on success and on failure alike, so a
            # failed query does not leave the session open.
            session.con.close()
    except snowflake.connector.errors.HttpError as e:
        _log_verify_failure(
            logger,
            "[ERROR] Connection Failed!",
            e,
            (
                " - Verify your Snowflake account identifier is correct",
                " - Check your network connectivity to Snowflake",
                " - Ensure the account name format is correct (e.g., 'myorg-account' or 'xy12345.us-east-1.aws')",
            ),
        )
        raise
    except snowflake.connector.errors.ProgrammingError as e:
        # ProgrammingError is a subclass of DatabaseError, so this handler
        # has to come first; listed after DatabaseError it could never run.
        _log_verify_failure(
            logger,
            "[ERROR] Authentication Failed!",
            e,
            (
                " - Verify your username and password/token are correct",
                " - Check your authentication method (password, PAT, OAuth, JWT, etc.)",
                " - Ensure your user account is not locked or expired",
                " - For MFA-enabled accounts, use Programmatic Access Tokens (PATs)",
            ),
        )
        raise
    except snowflake.connector.errors.DatabaseError as e:
        _log_verify_failure(
            logger,
            "[ERROR] Connection Failed!",
            e,
            (
                " - Verify your account name is correct",
                " - Check that required connection parameters are provided",
                " - Ensure you have network connectivity to Snowflake",
            ),
        )
        raise
    except Exception as e:
        _log_verify_failure(logger, "[ERROR] Verification Failed!", e)
        raise


def verify(config, logger: BoundLogger) -> None:
    """
    Verifies Snowflake connectivity and displays configuration parameters.

    Logs the schemachange configuration (and the deploy configuration when the
    config object defines ``change_history_table``), then the Snowflake
    connection parameters with secrets masked, and finally opens a session and
    runs ``SELECT CURRENT_VERSION()`` to prove the connection works.

    :param config: configuration object; must provide ``get_session_kwargs()``
        and the attributes logged above
    :param logger: logger that receives every report line
    :raises Exception: any error raised while connecting or querying is logged
        with troubleshooting hints and then re-raised unchanged
    """
    logger.info("=" * 80)
    logger.info("Schemachange Configuration Verification")
    logger.info("=" * 80)

    # Display schemachange-specific configuration
    _log_schemachange_config(config, logger)

    # Display Snowflake connection configuration
    session_kwargs = config.get_session_kwargs()
    _log_connection_config(config, session_kwargs, logger)

    # Test Snowflake connectivity
    _test_connectivity(session_kwargs, logger)


def main() -> None:
    """
    Command-line entry point: load the configuration and run the chosen subcommand.

    ``render`` and ``verify`` are dispatched to the functions of the same name;
    any other subcommand opens a Snowflake session and runs ``deploy``.

    Every failure is logged and converted into a process exit code:

    * 1 - configuration, file, permission or unexpected errors
    * 2 - SQL or CLI script execution errors
    * 3 - Snowflake HTTP, connection or authentication errors
    * 130 - interrupted by the user (Ctrl-C)
    """
    try:
        module_logger.info(f"schemachange version: {SCHEMACHANGE_VERSION}")

        config = get_merged_config(logger=module_logger)
        redact_config_secrets(config_secrets=config.secrets)

        structlog.configure(
            wrapper_class=structlog.make_filtering_bound_logger(config.log_level),
        )

        logger = structlog.getLogger()
        logger = logger.bind(schemachange_version=SCHEMACHANGE_VERSION)

        config.log_details()

        # Finally, execute the command
        if config.subcommand == "render":
            render(
                config=config,
                script_path=config.script_path,
                logger=logger,
            )
        elif config.subcommand == "verify":
            verify(
                config=config,
                logger=logger,
            )
        else:
            session = SnowflakeSession(
                schemachange_version=SCHEMACHANGE_VERSION,
                application=SNOWFLAKE_APPLICATION_NAME,
                logger=logger,
                change_history_table=config.change_history_table,
                **config.get_session_kwargs(),
            )
            deploy(config=config, session=session)

    except ScriptExecutionError as e:
        # Script execution failures - provide rich context
        module_logger.error(f"Script execution failed: {e.script_type} {e.script_name}")
        module_logger.error(f" Path: {e.script_path}")
        if e.sql_error_code or e.sql_state:
            error_code_info = f"SQL Error {e.sql_error_code}" if e.sql_error_code else ""
            sql_state_info = f"[{e.sql_state}]" if e.sql_state else ""
            module_logger.error(f" Code: {error_code_info} {sql_state_info}".strip())
        module_logger.error(f" {e.error_message}")
        module_logger.error("")
        module_logger.error(
            "Troubleshooting: Review SQL syntax, verify objects exist, check permissions, or run with -L DEBUG"
        )

        # DEBUG level: show the query and structured data (structlog filters automatically)
        if e.query:
            module_logger.debug("Failed SQL query", query=e.query)
        module_logger.debug("Script error details", **e.get_structured_error())

        sys.exit(2)  # Exit code 2 for script errors

    except CLIScriptExecutionError as e:
        # CLI script execution failures - provide context without duplicating the error
        module_logger.error(f"CLI script execution failed: {e.script_type} {e.script_name}")
        module_logger.error(f" Path: {e.script_path}")
        if e.step_index is not None:
            # step_index is shown incremented by one for display
            module_logger.error(f" Failed at step: {e.step_index + 1}")
        if e.cli_tool:
            module_logger.error(f" CLI tool: {e.cli_tool}")
        if e.exit_code is not None:
            module_logger.error(f" Exit code: {e.exit_code}")
        module_logger.error("")
        module_logger.error("Troubleshooting: Check CLI tool installation, verify command syntax, or run with -L DEBUG")

        # DEBUG level: show full structured data
        module_logger.debug("CLI script error details", **e.get_structured_error())

        sys.exit(2)  # Exit code 2 for script errors

    except ValueError as e:
        module_logger.error(f"Configuration error: {str(e)}")
        module_logger.error("Run 'schemachange verify' to validate your configuration")
        sys.exit(1)

    except FileNotFoundError as e:
        module_logger.error(f"File not found: {str(e)}")
        sys.exit(1)

    except PermissionError as e:
        module_logger.error(f"Permission denied: {str(e)}")
        sys.exit(1)

    except snowflake.connector.errors.HttpError as e:
        module_logger.error(f"Snowflake HTTP error: {str(e)}")
        module_logger.error("Check account identifier, network connectivity, and firewall settings")
        module_logger.error("Run 'schemachange verify' to test connection")
        sys.exit(3)  # Exit code 3 for connection errors

    except (snowflake.connector.errors.DatabaseError, snowflake.connector.errors.ProgrammingError) as e:
        module_logger.error(f"Snowflake connection/authentication error: {str(e)}")
        module_logger.error("Check credentials, account identifier, and role permissions")
        module_logger.error("Run 'schemachange verify' to test connection")
        sys.exit(3)  # Exit code 3 for connection errors

    except KeyboardInterrupt:
        module_logger.warning("Operation cancelled by user")
        sys.exit(130)  # Standard exit code for SIGINT

    except Exception as e:
        module_logger.error(f"Unexpected error [{type(e).__name__}]: {str(e)}")
        module_logger.debug("Full traceback", traceback=traceback.format_exc())
        module_logger.error("Run with -L DEBUG for full traceback")
        module_logger.error("Report issue: https://github.com/Snowflake-Labs/schemachange/issues")
        sys.exit(1)


if __name__ == "__main__":
    main()