Skip to content

SQL

SQL plugin provides Schema and Fields models and converters between SQL (in sqlalchemy terms) and Data Package notations

Installation

Extra dependency needs to be installed:

pip install dplib-py[sql]

Usage

Converting a SQL dataframe to the Data Package notation:

from dplib.plugins.sql.models import SqlSchema

schema = SqlSchema(table=table).to_dp()
print(schema.to_text(format='json'))

Converting from Data Package notation to SQL:

from dplib.models import Schema
from dplib.plugins.sql.models import SqlSchema

schema = SqlSchema.from_dp(Schema.from_path('data/schema.json'))
print(schema.table)

Reference

dplib.plugins.sql.models.SqlSchema

Bases: Model

SQL Schema model

Source code in dplib/plugins/sql/models/schema.py
class SqlSchema(Model, arbitrary_types_allowed=True):
    """SQL Schema model"""

    table: Table

    # Getters

    def get_field_names(self) -> List[str]:
        """Get field names"""
        names: List[str] = []
        for column in self.table.columns:
            names.append(column.name)
        return names

    def get_field_types(self) -> List[Any]:
        """Get field types"""
        types: List[Any] = []
        for column in self.table.columns:
            types.append(type(column.type))
        return types

    # Converters

    def to_dp(self, *, with_metadata: bool = False) -> Schema:
        """Convert to Table Schema

        Returns:
            Table Schema
        """
        schema = Schema()

        # Fields
        for column in self.table.columns:
            if with_metadata and column.name in settings.METADATA_IDENTIFIERS:
                continue
            field = SqlField(column=column).to_dp()
            schema.fields.append(field)

        # Primary key
        for constraint in self.table.constraints:
            if isinstance(constraint, sa.PrimaryKeyConstraint):
                for column in constraint.columns:
                    if with_metadata and column.name in settings.METADATA_IDENTIFIERS:
                        continue
                    schema.primaryKey.append(str(column.name))

        # Foreign keys
        for constraint in self.table.constraints:
            if isinstance(constraint, sa.ForeignKeyConstraint):
                resource = ""
                own_fields: List[str] = []
                foreign_fields: List[str] = []
                for element in constraint.elements:
                    own_fields.append(str(element.parent.name))
                    if element.column.table.name != self.table.name:
                        resource = str(element.column.table.name)
                    foreign_fields.append(str(element.column.name))
                ref = ForeignKeyReference(resource=resource, fields=foreign_fields)
                fk = ForeignKey(fields=own_fields, reference=ref)
                schema.foreignKeys.append(fk)

        return schema

    @classmethod
    def from_dp(
        cls,
        schema: Schema,
        *,
        table_name: str,
        dialect: str = settings.DEFAULT_DIALECT,
        with_metadata: bool = False,
    ) -> SqlSchema:
        """Create SQL Schema from Table Schema

        Parameters:
            schema: Table Schema
            table_name: SQL table name
            dialect: SQL dialect
            with_metadata: Include metadata columns

        Returns:
            SQL Schema
        """
        columns: List[Column[Any]] = []
        constraints: List[Constraint] = []

        # Fields
        if with_metadata:
            columns.append(
                sa.Column(
                    settings.ROW_NUMBER_IDENTIFIER,
                    sa.Integer,
                    primary_key=True,
                    autoincrement=False,
                )
            )
            columns.append(sa.Column(settings.ROW_VALID_IDENTIFIER, sa.Boolean))
        for field in schema.fields:
            sql_field = SqlField.from_dp(field, table_name=table_name, dialect=dialect)
            columns.append(sql_field.column)

        # Primary key
        if schema.primaryKey:
            Class = sa.UniqueConstraint if with_metadata else sa.PrimaryKeyConstraint
            if not with_metadata:
                constraint = Class(*schema.primaryKey)
                constraints.append(constraint)

        # Foreign keys
        for fk in schema.foreignKeys:
            prefix: Callable[[str], str] = lambda field: ".".join([foreign_table, field])
            foreign_table = fk.reference.resource or table_name
            foreign_fields = list(map(prefix, fk.reference.fields))
            constraint = sa.ForeignKeyConstraint(fk.fields, foreign_fields)
            constraints.append(constraint)

        table = sa.Table(table_name, sa.MetaData(), *(columns + constraints))
        return SqlSchema(table=table)

table: Table instance-attribute

from_dp(schema, *, table_name, dialect=settings.DEFAULT_DIALECT, with_metadata=False) classmethod

Create SQL Schema from Table Schema

Parameters:

Name Type Description Default
schema Schema

Table Schema

required
table_name str

SQL table name

required
dialect str

SQL dialect

DEFAULT_DIALECT
with_metadata bool

Include metadata columns

False

Returns:

Type Description
SqlSchema

SQL Schema

Source code in dplib/plugins/sql/models/schema.py
@classmethod
def from_dp(
    cls,
    schema: Schema,
    *,
    table_name: str,
    dialect: str = settings.DEFAULT_DIALECT,
    with_metadata: bool = False,
) -> SqlSchema:
    """Create SQL Schema from Table Schema

    Parameters:
        schema: Table Schema
        table_name: SQL table name
        dialect: SQL dialect
        with_metadata: Include metadata columns

    Returns:
        SQL Schema
    """
    columns: List[Column[Any]] = []
    constraints: List[Constraint] = []

    # Fields
    if with_metadata:
        columns.append(
            sa.Column(
                settings.ROW_NUMBER_IDENTIFIER,
                sa.Integer,
                primary_key=True,
                autoincrement=False,
            )
        )
        columns.append(sa.Column(settings.ROW_VALID_IDENTIFIER, sa.Boolean))
    for field in schema.fields:
        sql_field = SqlField.from_dp(field, table_name=table_name, dialect=dialect)
        columns.append(sql_field.column)

    # Primary key
    if schema.primaryKey:
        Class = sa.UniqueConstraint if with_metadata else sa.PrimaryKeyConstraint
        if not with_metadata:
            constraint = Class(*schema.primaryKey)
            constraints.append(constraint)

    # Foreign keys
    for fk in schema.foreignKeys:
        prefix: Callable[[str], str] = lambda field: ".".join([foreign_table, field])
        foreign_table = fk.reference.resource or table_name
        foreign_fields = list(map(prefix, fk.reference.fields))
        constraint = sa.ForeignKeyConstraint(fk.fields, foreign_fields)
        constraints.append(constraint)

    table = sa.Table(table_name, sa.MetaData(), *(columns + constraints))
    return SqlSchema(table=table)

get_field_names()

Get field names

Source code in dplib/plugins/sql/models/schema.py
def get_field_names(self) -> List[str]:
    """Get field names"""
    names: List[str] = []
    for column in self.table.columns:
        names.append(column.name)
    return names

get_field_types()

Get field types

Source code in dplib/plugins/sql/models/schema.py
def get_field_types(self) -> List[Any]:
    """Get field types"""
    types: List[Any] = []
    for column in self.table.columns:
        types.append(type(column.type))
    return types

to_dp(*, with_metadata=False)

Convert to Table Schema

Returns:

Type Description
Schema

Table Schema

Source code in dplib/plugins/sql/models/schema.py
def to_dp(self, *, with_metadata: bool = False) -> Schema:
    """Convert to Table Schema

    Returns:
        Table Schema
    """
    schema = Schema()

    # Fields
    for column in self.table.columns:
        if with_metadata and column.name in settings.METADATA_IDENTIFIERS:
            continue
        field = SqlField(column=column).to_dp()
        schema.fields.append(field)

    # Primary key
    for constraint in self.table.constraints:
        if isinstance(constraint, sa.PrimaryKeyConstraint):
            for column in constraint.columns:
                if with_metadata and column.name in settings.METADATA_IDENTIFIERS:
                    continue
                schema.primaryKey.append(str(column.name))

    # Foreign keys
    for constraint in self.table.constraints:
        if isinstance(constraint, sa.ForeignKeyConstraint):
            resource = ""
            own_fields: List[str] = []
            foreign_fields: List[str] = []
            for element in constraint.elements:
                own_fields.append(str(element.parent.name))
                if element.column.table.name != self.table.name:
                    resource = str(element.column.table.name)
                foreign_fields.append(str(element.column.name))
            ref = ForeignKeyReference(resource=resource, fields=foreign_fields)
            fk = ForeignKey(fields=own_fields, reference=ref)
            schema.foreignKeys.append(fk)

    return schema

dplib.plugins.sql.models.SqlField

Bases: Model

SQL Field model

Source code in dplib/plugins/sql/models/field.py
class SqlField(Model, arbitrary_types_allowed=True):
    """SQL Field model"""

    column: Column[Any]

    # Converters

    def to_dp(self) -> models.IField:
        """Convert to Table Schema Field

        Returns:
            Table Schema Field
        """
        # Type
        Field = models.Field
        if isinstance(self.column.type, ARRAY_TYPES):
            Field = models.ArrayField
        elif isinstance(self.column.type, BOOLEAN_TYPES):
            Field = models.BooleanField
        elif isinstance(self.column.type, DATE_TYPES):
            Field = models.DateField
        elif isinstance(self.column.type, DATETIME_TYPES):
            Field = models.DatetimeField
        elif isinstance(self.column.type, INTEGER_TYPES):
            Field = models.IntegerField
        elif isinstance(self.column.type, NUMBER_TYPES):
            Field = models.NumberField
        elif isinstance(self.column.type, OBJECT_TYPES):
            Field = models.ObjectField
        elif isinstance(self.column.type, STRING_TYPES):
            Field = models.StringField
        elif isinstance(self.column.type, TIME_TYPES):
            Field = models.TimeField

        # Name
        field = Field(name=self.column.name)

        # Description
        if self.column.comment:
            field.description = self.column.comment

        # Constraints
        if not self.column.nullable:
            field.constraints.required = True
        if isinstance(self.column.type, sa.Enum):
            if self.column.enums:
                field.constraints.enum = self.column.enums
        if isinstance(field, models.StringField):
            if isinstance(self.column.type, (sa.CHAR, sa.VARCHAR)):
                if self.column.type.length:
                    field.constraints.maxLength = self.column.type.length
            if isinstance(self.column.type, sa.CHAR):
                if self.column.type.length:
                    field.constraints.minLength = self.column.type.length

        return field

    @classmethod
    def from_dp(
        cls,
        field: models.IField,
        *,
        dialect: str = settings.DEFAULT_DIALECT,
        table_name: Optional[str] = None,
    ) -> SqlField:
        """Create SQL Field from Table Schema Field

        Parameters:
            field: Table Schema Field
            dialect: SQL dialect
            table_name: SQL table name

        Returns:
            SQL Field
        """
        Check = sa.CheckConstraint
        checks: List[sa.CheckConstraint] = []
        comment = field.description
        dialect_obj = registry.load(dialect)()
        nullable = not field.constraints.required
        quoted_name = dialect_obj.identifier_preparer.quote(field.name)

        # Type
        column_type = sa.Text
        if field.type == "any":
            column_type = sa.Text
        elif field.type == "boolean":
            column_type = sa.Boolean
        elif field.type == "date":
            column_type = sa.Date
        elif field.type == "datetime":
            column_type = sa.DateTime
        elif field.type == "integer":
            column_type = sa.Integer
        elif field.type == "number":
            column_type = sa.Float
        elif field.type == "string":
            column_type = sa.Text
        elif field.type == "time":
            column_type = sa.Time
        elif field.type == "year":
            column_type = sa.Integer
        if dialect_obj.name == "postgresql":
            if field.type == "array":
                column_type = pg.JSONB
            elif field.type == "geojson":
                column_type = pg.JSONB
            elif field.type == "object":
                column_type = pg.JSONB
            elif field.type == "number":
                column_type = sa.Numeric

        # Unique contstraint
        unique = field.constraints.unique
        if dialect_obj.name == "mysql":
            # MySQL requires keys to have an explicit maximum length
            # https://stackoverflow.com/questions/1827063/mysql-error-key-specification-without-a-key-length
            unique = unique and column_type is not sa.Text

        # Length contraints
        if field.type == "string":
            min = field.constraints.minLength
            max = field.constraints.maxLength
            if min is not None and max is not None and min == max:
                column_type = sa.CHAR(max)
            if max is not None:
                if column_type is sa.Text:
                    column_type = sa.VARCHAR(length=max)
                if dialect_obj.name == "sqlite":
                    checks.append(Check("LENGTH(%s) <= %s" % (quoted_name, max)))
            if min is not None:
                if not isinstance(column_type, sa.CHAR) or dialect_obj.name == "sqlite":
                    checks.append(Check("LENGTH(%s) >= %s" % (quoted_name, min)))

        # Limit contstraints
        if isinstance(field, (models.IntegerField, models.NumberField)):
            min = field.constraints.minimum
            max = field.constraints.maximum
            if min is not None:
                checks.append(Check("%s >= %s" % (quoted_name, min)))
            if max is not None:
                checks.append(Check("%s <= %s" % (quoted_name, max)))

        # Pattern constraint
        if field.type == "string":
            val = field.constraints.pattern
            if val is not None:
                if dialect_obj.name == "postgresql":
                    checks.append(Check("%s ~ '%s'" % (quoted_name, val)))
                elif dialect_obj.name != "duckdb":
                    check = Check("%s REGEXP '%s'" % (quoted_name, val))
                    checks.append(check)

        # Enum constraint
        if field.type == "string":
            val = field.constraints.enum
            if val is not None:
                # NOTE: https://github.com/frictionlessdata/frictionless-py/issues/778
                if not table_name:
                    table_name = "".join(random.choice(letters) for _ in range(8))
                enum_name = "%s_%s_enum" % (table_name, field.name)
                quoted_enum_name = dialect_obj.identifier_preparer.quote(enum_name)
                column_type = sa.Enum(*val, name=quoted_enum_name)

        # TODO: shall it use "autoincrement=False"
        # https://github.com/Mause/duckdb_engine/issues/595#issuecomment-1495408566
        column_args = [field.name, column_type] + checks
        column_kwargs = {"nullable": nullable, "unique": unique, "comment": comment}
        column = sa.Column(*column_args, **column_kwargs)  # type: ignore
        return SqlField(column=column)

column: Column[Any] instance-attribute

from_dp(field, *, dialect=settings.DEFAULT_DIALECT, table_name=None) classmethod

Create SQL Field from Table Schema Field

Parameters:

Name Type Description Default
field IField

Table Schema Field

required
dialect str

SQL dialect

DEFAULT_DIALECT
table_name Optional[str]

SQL table name

None

Returns:

Type Description
SqlField

SQL Field

Source code in dplib/plugins/sql/models/field.py
@classmethod
def from_dp(
    cls,
    field: models.IField,
    *,
    dialect: str = settings.DEFAULT_DIALECT,
    table_name: Optional[str] = None,
) -> SqlField:
    """Create SQL Field from Table Schema Field

    Parameters:
        field: Table Schema Field
        dialect: SQL dialect
        table_name: SQL table name

    Returns:
        SQL Field
    """
    Check = sa.CheckConstraint
    checks: List[sa.CheckConstraint] = []
    comment = field.description
    dialect_obj = registry.load(dialect)()
    nullable = not field.constraints.required
    quoted_name = dialect_obj.identifier_preparer.quote(field.name)

    # Type
    column_type = sa.Text
    if field.type == "any":
        column_type = sa.Text
    elif field.type == "boolean":
        column_type = sa.Boolean
    elif field.type == "date":
        column_type = sa.Date
    elif field.type == "datetime":
        column_type = sa.DateTime
    elif field.type == "integer":
        column_type = sa.Integer
    elif field.type == "number":
        column_type = sa.Float
    elif field.type == "string":
        column_type = sa.Text
    elif field.type == "time":
        column_type = sa.Time
    elif field.type == "year":
        column_type = sa.Integer
    if dialect_obj.name == "postgresql":
        if field.type == "array":
            column_type = pg.JSONB
        elif field.type == "geojson":
            column_type = pg.JSONB
        elif field.type == "object":
            column_type = pg.JSONB
        elif field.type == "number":
            column_type = sa.Numeric

    # Unique contstraint
    unique = field.constraints.unique
    if dialect_obj.name == "mysql":
        # MySQL requires keys to have an explicit maximum length
        # https://stackoverflow.com/questions/1827063/mysql-error-key-specification-without-a-key-length
        unique = unique and column_type is not sa.Text

    # Length contraints
    if field.type == "string":
        min = field.constraints.minLength
        max = field.constraints.maxLength
        if min is not None and max is not None and min == max:
            column_type = sa.CHAR(max)
        if max is not None:
            if column_type is sa.Text:
                column_type = sa.VARCHAR(length=max)
            if dialect_obj.name == "sqlite":
                checks.append(Check("LENGTH(%s) <= %s" % (quoted_name, max)))
        if min is not None:
            if not isinstance(column_type, sa.CHAR) or dialect_obj.name == "sqlite":
                checks.append(Check("LENGTH(%s) >= %s" % (quoted_name, min)))

    # Limit contstraints
    if isinstance(field, (models.IntegerField, models.NumberField)):
        min = field.constraints.minimum
        max = field.constraints.maximum
        if min is not None:
            checks.append(Check("%s >= %s" % (quoted_name, min)))
        if max is not None:
            checks.append(Check("%s <= %s" % (quoted_name, max)))

    # Pattern constraint
    if field.type == "string":
        val = field.constraints.pattern
        if val is not None:
            if dialect_obj.name == "postgresql":
                checks.append(Check("%s ~ '%s'" % (quoted_name, val)))
            elif dialect_obj.name != "duckdb":
                check = Check("%s REGEXP '%s'" % (quoted_name, val))
                checks.append(check)

    # Enum constraint
    if field.type == "string":
        val = field.constraints.enum
        if val is not None:
            # NOTE: https://github.com/frictionlessdata/frictionless-py/issues/778
            if not table_name:
                table_name = "".join(random.choice(letters) for _ in range(8))
            enum_name = "%s_%s_enum" % (table_name, field.name)
            quoted_enum_name = dialect_obj.identifier_preparer.quote(enum_name)
            column_type = sa.Enum(*val, name=quoted_enum_name)

    # TODO: shall it use "autoincrement=False"
    # https://github.com/Mause/duckdb_engine/issues/595#issuecomment-1495408566
    column_args = [field.name, column_type] + checks
    column_kwargs = {"nullable": nullable, "unique": unique, "comment": comment}
    column = sa.Column(*column_args, **column_kwargs)  # type: ignore
    return SqlField(column=column)

to_dp()

Convert to Table Schema Field

Returns:

Type Description
IField

Table Schema Field

Source code in dplib/plugins/sql/models/field.py
def to_dp(self) -> models.IField:
    """Convert to Table Schema Field

    Returns:
        Table Schema Field
    """
    # Type
    Field = models.Field
    if isinstance(self.column.type, ARRAY_TYPES):
        Field = models.ArrayField
    elif isinstance(self.column.type, BOOLEAN_TYPES):
        Field = models.BooleanField
    elif isinstance(self.column.type, DATE_TYPES):
        Field = models.DateField
    elif isinstance(self.column.type, DATETIME_TYPES):
        Field = models.DatetimeField
    elif isinstance(self.column.type, INTEGER_TYPES):
        Field = models.IntegerField
    elif isinstance(self.column.type, NUMBER_TYPES):
        Field = models.NumberField
    elif isinstance(self.column.type, OBJECT_TYPES):
        Field = models.ObjectField
    elif isinstance(self.column.type, STRING_TYPES):
        Field = models.StringField
    elif isinstance(self.column.type, TIME_TYPES):
        Field = models.TimeField

    # Name
    field = Field(name=self.column.name)

    # Description
    if self.column.comment:
        field.description = self.column.comment

    # Constraints
    if not self.column.nullable:
        field.constraints.required = True
    if isinstance(self.column.type, sa.Enum):
        if self.column.enums:
            field.constraints.enum = self.column.enums
    if isinstance(field, models.StringField):
        if isinstance(self.column.type, (sa.CHAR, sa.VARCHAR)):
            if self.column.type.length:
                field.constraints.maxLength = self.column.type.length
        if isinstance(self.column.type, sa.CHAR):
            if self.column.type.length:
                field.constraints.minLength = self.column.type.length

    return field