# Usage
## Applying column-wise filtering on the data being ingested
By default, the existing source and resource functions, `sql_database` and `sql_table`, ingest all of the records from the source table. But by using `query_adapter_callback`, it is possible to pass a `WHERE` clause inside the underlying `SELECT` statement using the SQLAlchemy syntax. This enables filtering the data on specific columns before extraction.

The example below uses `query_adapter_callback` to filter on the column `customer_id` for the table `orders`:
```py
from dlt.sources.sql_database import sql_database

def query_adapter_callback(query, table):
    if table.name == "orders":
        # Only select rows where the column customer_id has value 1
        return query.where(table.c.customer_id == 1)
    # Use the original query for other tables
    return query

source = sql_database(
    query_adapter_callback=query_adapter_callback
).with_resources("orders")
```
## Transforming the data before load
You have direct access to the extracted data through the resource objects (`sql_table()` or `sql_database().with_resources()`), each of which represents a single SQL table. These objects are generators that yield individual rows of the table, which can be modified using custom Python functions. These functions can be applied to the resource using `add_map`.

The PyArrow backend does not yield individual rows; rather, it loads chunks of data as `ndarray`. In this case, the transformation function passed to `add_map` should be configured to expect an `ndarray` input.
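As an illustration only, here is a minimal sketch of such a chunk-wise transform. It assumes the chunks arrive as `pyarrow.Table` objects with a string column `rfam_id` (the `backend` setting and the table/column names are illustrative assumptions); adapt the function to whatever chunk type your backend configuration actually yields:

```py
import pyarrow as pa
import pyarrow.compute as pc
from dlt.sources.sql_database import sql_database

def uppercase_rfam_id(chunk: pa.Table) -> pa.Table:
    # Replace the "rfam_id" column with its upper-cased version, one chunk at a time
    idx = chunk.schema.get_field_index("rfam_id")
    return chunk.set_column(idx, "rfam_id", pc.utf8_upper(chunk["rfam_id"]))

# backend="pyarrow" and the table name are assumptions for this sketch
source = sql_database(backend="pyarrow").with_resources("family")
source.family.add_map(uppercase_rfam_id)
```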
Examples:

1. Pseudonymizing data to hide personally identifiable information (PII) before loading it to the destination. (See here for more information on pseudonymizing data with `dlt`.)

   ```py
   import dlt
   import hashlib
   from dlt.sources.sql_database import sql_database

   def pseudonymize_name(doc):
       '''
       Pseudonymization is a deterministic type of PII-obscuring.
       Its role is to allow identifying users by their hash,
       without revealing the underlying info.
       '''
       # add a constant salt to generate the hash
       salt = 'WI@N57%zZrmk#88c'
       salted_string = doc['rfam_acc'] + salt
       sh = hashlib.sha256()
       sh.update(salted_string.encode())
       hashed_string = sh.digest().hex()
       doc['rfam_acc'] = hashed_string
       return doc

   pipeline = dlt.pipeline(
       # Configure the pipeline
   )

   # using sql_database source to load family table and pseudonymize the column "rfam_acc"
   source = sql_database().with_resources("family")
   # modify this source instance's resource
   source.family.add_map(pseudonymize_name)
   # Run the pipeline. For a large db this may take a while
   info = pipeline.run(source, write_disposition="replace")
   print(info)
   ```

2. Excluding unnecessary columns before load:
   ```py
   import dlt
   from dlt.sources.sql_database import sql_database

   def remove_columns(doc):
       del doc["rfam_id"]
       return doc

   pipeline = dlt.pipeline(
       # Configure the pipeline
   )

   # using sql_database source to load family table and remove the column "rfam_id"
   source = sql_database().with_resources("family")
   # modify this source instance's resource
   source.family.add_map(remove_columns)
   # Run the pipeline. For a large db this may take a while
   info = pipeline.run(source, write_disposition="replace")
   print(info)
   ```
## Deploying the `sql_database` pipeline
You can deploy the `sql_database` pipeline with any of the `dlt` deployment methods, such as GitHub Actions, Airflow, or Dagster. See here for a full list of deployment methods.
### Running on Airflow
When running on Airflow:
- Use the `dlt` Airflow Helper to create tasks from the `sql_database` source. If you want to run table extraction in parallel, you can do so by setting `decompose = "parallel-isolated"` when doing the source->DAG conversion (see here for a code example, and the sketch after this list).
- Reflect tables at runtime with the `defer_table_reflect` argument.
- Set `allow_external_schedulers` to load data using Airflow intervals.
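As referenced above, here is a minimal sketch of how these pieces can fit together in an Airflow DAG using the `dlt` Airflow helper (`PipelineTasksGroup`). The pipeline name, dataset name, destination, schedule, and the `family` table are placeholder assumptions:

```py
import dlt
import pendulum
from airflow.decorators import dag
from dlt.helpers.airflow_helper import PipelineTasksGroup
from dlt.sources.sql_database import sql_database

@dag(
    schedule_interval="@daily",
    start_date=pendulum.datetime(2024, 1, 1),
    catchup=False,
    max_active_runs=1,
)
def load_sql_data():
    # Wrap the pipeline run in an Airflow task group via the dlt Airflow helper
    tasks = PipelineTasksGroup("sql_database_pipeline", use_data_folder=False, wipe_local_data=True)

    pipeline = dlt.pipeline(
        pipeline_name="sql_database_pipeline",  # placeholder configuration
        dataset_name="sql_data",
        destination="duckdb",
    )
    # defer_table_reflect=True reflects the tables at runtime, on the Airflow worker
    source = sql_database(defer_table_reflect=True).with_resources("family")

    # decompose="parallel-isolated" extracts each resource in its own isolated task
    tasks.add_run(pipeline, source, decompose="parallel-isolated", trigger_rule="all_done", retries=0)

load_sql_data()
```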