Source code for csql.contrib.persist.snowflake

"""
``contrib.persist.snowflake`` contains cache implementations specifically
for the Snowflake database. For this file to import properly, you'll need
to make sure `snowflake-connector-python` is installed.
"""

from __future__ import annotations

from typing import TYPE_CHECKING

from csql import Q, Query, RenderedQuery

from . import Cacher, Key

if TYPE_CHECKING:
	import snowflake.connector
import logging

logger = logging.getLogger(__name__)


[docs] class SnowflakeResultSetCacher(Cacher): """ Caches queries using the `RESULT_SCAN` functionality of snowflake. This is nice because it means you can kill your snowflake connection without losing temp tables, and the results are still cleaned up properly by Snowflake in 7 days. :type connection: `snowflake.connector.Connection <https://docs.snowflake.com/en/user-guide/python-connector-api.html#object-connection>`_ """ def __init__(self, connection: snowflake.connector.SnowflakeConnection): self._con = connection def _persist(self, rq: RenderedQuery, key: Key, tag: str | None) -> Query: sql, params, _param_names = rq logger.debug(f"Executing persist SQL:\n{sql}\nwith params: {params}") c = self._con.cursor() try: c.execute(sql, params) qid = c.sfqid finally: c.close() retrieve_sql = Q(f"""select * from table(result_scan('{qid}')""") return retrieve_sql