Source code for csql._.renderer.parameters

from __future__ import annotations

import abc
import zoneinfo
from abc import ABC
from collections.abc import Collection
from collections.abc import Collection as CollectionABC
from datetime import date, datetime
from typing import (
	TYPE_CHECKING,
	NewType,
)

from ..models.dialect import ParamStyle, SQLDialect
from ..models.query import (
	AutoKey,
	ParameterList,
	ParameterPlaceholder,
	ScalarParameterValue,
)
from ..utils import assert_never

SQL = NewType("SQL", str)

if TYPE_CHECKING:
	import csql.render.param


class ParamList:
	_params: list[ScalarParameterValue]
	_param_names: list[str | None]

	def __init__(self) -> None:
		self._params = []
		self._param_names = []

	def add(
		self, param: ScalarParameterValue, name: AutoKey | str | None, fmt: str = ""
	) -> int:
		self._params.append(param)
		self._param_names.append(name if isinstance(name, str) else None)
		return len(self._params) - 1

	def render(self) -> tuple[ParameterList, tuple[str | None, ...]]:
		return tuple(self._params), tuple(self._param_names)


[docs] class ParameterRenderer(ABC): """ This is a base class to define how SQL parameters are rendered. A new ``ParameterRenderer`` is created each time a :class:`csql.Query` is built, and :meth:`_renderScalarSql` is called once for each parameter that needs to be placed into full :class:`csql.RenderedQuery`. These are called in the order the parameters appear in the rendered query. """ renderedParams: ParamList ":meta private:" @staticmethod def get(dialect: SQLDialect) -> type[ParameterRenderer]: if dialect.paramstyle is ParamStyle.numeric: return ColonNumeric elif dialect.paramstyle is ParamStyle.numeric_dollar: return DollarNumeric elif dialect.paramstyle is ParamStyle.qmark: return QMark elif dialect.paramstyle is ParamStyle.clickhouse: return Clickhouse else: assert_never(dialect.paramstyle) def __init__(self) -> None: self.renderedParams = ParamList()
[docs] @abc.abstractmethod def _renderScalarSql( self, index: int, key: AutoKey | str | None, fmt: str ) -> csql.render.param.SQL: """ This is called once for each parameter that needs to be rendered into a :class:`csql.RenderedQuery`. Implementations might be simple: for example, the builtin :class:`csql.render.param.QMark` renderer just defines .. code-block:: py def _renderScalarSql(self, index, key): return SQL('?') :param index: - the index of the current parameter in the rendered query. Numbered from 0. :param key: - the (possibly missing) name of the current parameter. :param fmt: - the format spec passed at f-string time, e.g. `f"select {p["param"]:spec}"`. """
def _renderScalar( self, paramKey: AutoKey | str | None, paramValue: ScalarParameterValue, fmt: str ) -> tuple[int, SQL]: index = self.renderedParams.add(paramValue, paramKey, fmt) return (index, self._renderScalarSql(index, paramKey, fmt)) def _renderCollection( self, paramKey: AutoKey | str, paramValues: Collection[ScalarParameterValue], ) -> tuple[list[int], SQL]: _params = [ self._renderScalar(None, paramValue, "") for paramValue in paramValues ] indices = [i for i, _sql in _params] sql = [sql for _i, sql in _params] return (indices, SQL(f"( {','.join(sql)} )")) def renderList(self) -> tuple[ParameterList, tuple[str | None, ...]]: return self.renderedParams.render() def render(self, param: ParameterPlaceholder) -> SQL: paramKey = param.key paramValue = param.value fmt = param.fmt if isinstance(paramValue, CollectionABC) and not isinstance(paramValue, str): _indices, sql = self._renderCollection(paramKey, paramValue) # pyright: ignore[reportUnknownArgumentType] else: index, sql = self._renderScalar(paramKey, paramValue, fmt) _indices = [index] return sql
[docs] class QMark(ParameterRenderer): """ A ``ParameterRenderer`` that renders param placeholders as '?'. """ def _renderScalarSql(self, index: int, key: str | AutoKey | None, fmt: str) -> SQL: return SQL("?")
class NumericParameterRenderer(ParameterRenderer, ABC): renderedKeys: dict[int, SQL] paramNumberFrom: int def __init__(self) -> None: super().__init__() self.renderedKeys = {} self.paramNumberFrom = 1 @abc.abstractmethod def _renderIndex(self, index: int) -> SQL: pass def _renderScalarSql(self, index: int, key: str | AutoKey | None, fmt: str) -> SQL: return self._renderIndex(index + self.paramNumberFrom) def render(self, param: ParameterPlaceholder) -> SQL: # should be able to hash(AutoKey) directly, but I was hitting a flakey test key = (param._key_context or 0) ^ hash( param.key if isinstance(param.key, str) else param.key.k ) if key in self.renderedKeys: preRendered = self.renderedKeys[key] return preRendered else: sql = super().render(param) self.renderedKeys = {**self.renderedKeys, key: sql} return sql
[docs] class ColonNumeric(NumericParameterRenderer): """ A ``ParameterRenderer`` that renders param placeholders like ':1'. """ def _renderIndex(self, index: int) -> SQL: return SQL(f":{index}")
[docs] class DollarNumeric(NumericParameterRenderer): """ A ``ParameterRenderer`` that renders param placeholders like '$1'. """ def _renderIndex(self, index: int) -> SQL: return SQL(f"${index}")
class Clickhouse(ParameterRenderer): def _renderScalarSql(self, index: int, key: AutoKey | str | None, fmt: str) -> SQL: if key is None: raise ValueError("clickhouse params don't work if key is None!") return SQL(f"{{{key.k if isinstance(key, AutoKey) else key}:{fmt}}}") def render(self, param: ParameterPlaceholder) -> SQL: if (fmt := param.fmt) == "": if isinstance(param.value, str): fmt = "String" elif isinstance(param.value, int): fmt = "Int64" elif isinstance(param.value, datetime): tz = param.value.tzinfo if tz is not None and isinstance(tz, zoneinfo.ZoneInfo): # 3 is default precision in clickhouse.. sometimes. fmt = f"DateTime64(3, '{tz.key}')" else: fmt = "DateTime64" elif isinstance(param.value, date): fmt = "Date" elif isinstance(param.value, float): fmt = "Float64" newParam = param._withFmt(fmt) return super().render(newParam)