A data transformation framework similar to dbt, with one key difference: every query is plain SQL, executable in your IDE as-is. No templating, no macros, no custom syntax -- which means you don't need a proprietary cloud IDE to work with your own SQL.
Data engineers define their tables in SQL and register them in a YAML catalog. matviews resolves dependencies across all resources -- tables, data sources, and tests alike -- exports the result as a directed graph, and hands it to Airflow for orchestration.
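Conceptually, the resolution step is a topological sort over the table graph. A minimal sketch of the idea — all names and the `build_run_order` helper are hypothetical, not matviews' actual API:

```python
# Hypothetical sketch of dependency resolution: a table graph is topologically
# sorted so every table runs after the tables it reads from.
from graphlib import TopologicalSorter


def build_run_order(tables: dict[str, set[str]]) -> list[str]:
    """`tables` maps a table name to the names of the tables it reads from."""
    return list(TopologicalSorter(tables).static_order())


# Dependencies as they would be derived from table references in the SQL sources:
deps = {
    "reports.revenue": {"base.orders", "base.customers"},
    "base.orders": {"data_lake.orders_raw"},
    "base.customers": set(),
    "data_lake.orders_raw": set(),
}
order = build_run_order(deps)
# Sources come before the tables that read from them.
assert order.index("base.orders") < order.index("reports.revenue")
```

The same ordered graph is what an orchestrator like Airflow consumes as a DAG.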
The framework is designed to enforce standards and catch problems early. Everything is validated and wired together: schemas are checked, dependencies are tracked, and nothing runs in production that matviews doesn't know about.
- SQL table definitions -- plain SQL, no templating, runs in any IDE or database client
- YAML catalog -- central registry of all tables and their metadata
- Dependency graph -- automatically derived from table references, exported for Airflow orchestration
- External table imports -- import table definitions from PostgreSQL or SQL Server into the catalog
- Schema validation -- extensive type-aware schema checks (understands type hierarchies, e.g. int > smallint), run as unit tests and against the sandbox database
- Data tests -- write test queries as plain SQL scripts to validate data quality and correctness
- DMS task definitions -- configure AWS Database Migration Service tasks alongside your tables
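As an illustration of the type-aware checks, a compatibility test might look roughly like this — the hierarchy shown is a hypothetical excerpt for illustration, not matviews' actual rule set:

```python
# Hypothetical sketch of a type-aware schema check. A declared column type
# accepts an actual type if the actual type widens into it (int > smallint).
WIDENS_TO = {
    "smallint": {"smallint", "int", "bigint"},
    "int": {"int", "bigint"},
    "bigint": {"bigint"},
}


def is_compatible(declared: str, actual: str) -> bool:
    """e.g. a declared int column covers an actual smallint, but not vice versa."""
    return declared in WIDENS_TO.get(actual, {actual})


assert is_compatible("int", "smallint")      # smallint fits into int
assert not is_compatible("smallint", "int")  # but not the other way around
```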
matviews was built at FRI:DAY, a German digital insurance company. It started as a proof of concept for implementing materialized views on AWS Athena, grew into the central framework the data team used for all their transformations, and was open-sourced when the company wound down. In production, it managed over 1500 tables -- around 400 materialized views and over 1000 external table definitions.
[[TOC]]
```sh
docker compose run app --help
```

There are some tests that need the real Athena engine to run and thus require AWS credentials.
```sh
# Set credentials. AWS_SECRET_ACCESS_KEY etc. are supported as well.
export AWS_PROFILE=bi-sandbox
docker compose run test
```

All those tests are marked with `athena` and can be excluded, e.g. for performance reasons:
```sh
# Credentials not required
docker compose run test -k 'not athena'
```

The `-k` option can be used for much more: it supports Python operators (`and`/`or`) and matches, for instance, test file names, test names, test parameter IDs and markers (`athena` is a marker).
```sh
# Runs all tests for base.my_table, including its columns.
docker compose run test -k 'base.my_table'

# Runs the test named test_something on base.my_table (or its columns,
# depending on the nature of the test).
docker compose run test -k 'test_something and base.my_table'

# Runs the test named test_something on base.my_table.my_column, if it is a
# test that targets columns.
docker compose run test -k 'test_something and base.my_table.my_column'

# Runs all tests for base.my_table, except for the ones that require Athena.
docker compose run test -k 'base.my_table and not athena'
```

As with the tests, some features of the server require real AWS credentials, some do not.
```sh
# Optional, but required e.g. for deployment, processing of tables, ...
# AWS_SECRET_ACCESS_KEY etc. are supported as well.
export AWS_PROFILE=bi-sandbox
docker compose up app
```

After the server has started up, it exposes a Swagger UI that documents all provided endpoints.
Requirements are managed via Poetry. Our Docker Compose configuration offers `poetry` as a service, for instance:

```sh
docker compose run poetry --help

# Update the requirements lock file.
docker compose run poetry lock
```

Please modify pyproject.toml manually when adding or removing dependencies and maintain lexicographical ordering -- `poetry install` does not do that for you.
While everything works fine on the plain command line, we support PyCharm as our IDE of choice. For IDE features to work, PyCharm must be told to use the Python interpreter in Docker Compose:
- Add an interpreter.
- Choose Docker Compose.
- As service, choose `pycharm`.
Supported features:
- Predefined run configurations. Those that read from the open file use the currently open file in the editor as their target. All features that require access to Athena assume that your AWS profile for the sandbox account is named `bi-sandbox`. Some of them may require administrator privileges.
- Running with profiler.
- Showing test coverage does not work yet, but we are working on it.
There are several table types available. Documentation is available by running the following command:

```sh
docker compose run app describe-table-types
```

If you are updating a table and want to know which table type you are dealing with, run `docker compose run app show-tables base.foobar` if your table name is `base.foobar`.
View definitions live as SELECT statements in matviews/table_source/.
They additionally need to be registered in our configuration; the helper command `import` automates that part.
In order to create or update a view called base.foobar:
- Develop the view SQL in matviews/table_source/base/foobar.sql.
- Run `docker compose run app import materialised-view base.foobar` for a materialised view, or `docker compose run app import pure-view base.foobar` for a pure view.
- Check the definition in matviews/tables.yaml.
- Run the tests: `docker compose run test`.
- When updating a materialised view, consider running `docker compose run app compare-view base.foobar`. It executes the view's code into a temporary table and compares its row count and common columns with the "live" table. It is great at detecting accidental 1:n join explosions, or at proving that a simple refactoring or adding a new column did not change the common output at all.
- Commit.
Like views, external tables are registered in our configuration.
The import command is able to insert or update external tables from databases, but requires a connection to the source database in order to figure out source tables' schemas.
Those tables are replicated every morning to our infrastructure using AWS' Database Migration Service (DMS); the replication configuration is deployed via `deploy-dms-table-mappings`. For the time being, we require this for data protection reasons, because everyone who can change the replication configuration can change which personal data gets copied from live systems to our realm.
While our application does not care how you connect to the source database, for this tutorial we assume you want to import bc_foo and bc_bar.
- Run the import:

```sh
APP_IMPORT_DMS_SQLSERVER_DATABASE_DSN="xxx" \
  docker compose run app import dms sqlserver my_db.bc_foo my_db.bc_bar
```

- In tables.yaml, check all new columns for data privacy issues and exclude them via the column property `exclude_reason: personal_data`.
- Commit.
Renaming tables: DMS import supports renaming tables on replication. Normally, for clarity, we should keep table names on the source and on our system in sync. However, if we are replicating not from the actual source table but from a wrapper view, importing from a different source table name can be achieved with the following syntax: `'my_db.bc_bar<-bc_bar_wrapper_view'`. The single quotes are important here because `<` is a reserved character in your shell.
This is virtually the same process as importing SQL Server tables, except for using a different database connection and the postgres keyword on import. For example, to import my_db.foo:
- Run the import:

```sh
APP_IMPORT_DMS_POSTGRES_DATABASE_DSN="xxx" \
  docker compose run app import dms postgres my_db.foo
```

- Like with SQL Server tables, single columns may be excluded via the column property `exclude_reason:` in tables.yaml.
- Commit.
Tables imported via DMS might add new columns that are not present in the source table, but computed via an expression:

```yaml
columns:
  - name: foo
    type: string
  - name: bar
    type: string
  - name: foo_and_bar
    type: string
    expression: $foo || ' ' || $bar
```

In this case, the column foo_and_bar is not copied from the original table (even if it existed there), but computed as a new column from the SQLite expression `$foo || ' ' || $bar`.
AWS provides extensive documentation on that feature. However, for our purposes it should suffice to say:
- Do not overuse this feature. The example above should actually be done in a `base` table transformation. The main use case for this feature is to mask personal data using the `hash_sha256()` function, so critical data does not enter our clean tables.
- Basically all standard SQLite features are available, even functions that are not listed in the link above, like `upper()`.
- Source column names must be prepended with `$`.
When hashing, remember that Guidewire's data might be dirty, and values that should be equal are not, e.g. "Main street " vs "main street" (trailing space, different character casing). Normalisation could look like:

```yaml
name: name_hashed
type: string
expression: lower(hash_sha256(lower(trim($name))))
```

Importing from other sources, e.g. ingested JSON files, requires you to manually add their definition to tables.yaml.
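For reference, the normalisation expression above can be reproduced in plain Python — a sketch; whether `hash_sha256()` returns upper- or lowercase hex depends on the engine, which is exactly why the expression wraps it in an outer `lower()`:

```python
# Sketch of the normalisation pipeline: trim, lowercase, hash, lowercase hex.
import hashlib


def name_hashed(name: str) -> str:
    normalised = name.strip().lower()  # "Main street " -> "main street"
    digest = hashlib.sha256(normalised.encode("utf-8")).hexdigest()
    return digest.lower()  # hexdigest() is already lowercase; kept for symmetry


# Dirty variants of the same value now hash identically.
assert name_hashed("Main street ") == name_hashed("main street")
```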
Static tables are defined as CSV files because, of the available options (binary like Parquet, JSON, plain text like CSV/TSV), it is the only one that is both human-readable and looks like a table. PyCharm and DataGrip support displaying those files as a table.
Implementation details:
- The header of the CSV file defines the schema, a bit like the `SELECT` statement of a view does.
- Static files can be automatically imported to, and must be registered in, matviews/tables.yaml, just like views.
- Comma in a string: `\,`. ⚠ Quoting columns instead does not work: `"friday, you are cool"`
- `NULL`: `\N`
- Arrays: the data type must be enclosed in brackets, e.g. `column_name: [string]`, and elements of the array must be separated by `|`, e.g. `foo|bar` parses to `ARRAY['foo', 'bar']`. If literal pipe characters should be part of a string in an array, they need to be escaped: `foo\|bar`. If a field is not defined as an array, the `|` character is not treated specially and does not need to be escaped.
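The escaping rules can be sketched as a small field-level parser — a hypothetical helper for illustration only; splitting lines on unescaped commas is left out:

```python
# Sketch of the field-level escaping rules: \, for commas, \N for NULL,
# | as array separator, \| for a literal pipe.
def parse_field(raw: str, is_array: bool = False):
    if raw == r"\N":
        return None
    if is_array:
        # Split on unescaped pipes, then unescape each element.
        parts, current, i = [], "", 0
        while i < len(raw):
            if raw[i] == "\\" and i + 1 < len(raw):
                current += raw[i + 1]
                i += 2
            elif raw[i] == "|":
                parts.append(current)
                current = ""
                i += 1
            else:
                current += raw[i]
                i += 1
        parts.append(current)
        return parts
    return raw.replace(r"\,", ",").replace(r"\|", "|")


assert parse_field(r"friday\, you are cool") == "friday, you are cool"
assert parse_field(r"\N") is None
assert parse_field("be|not be", is_array=True) == ["be", "not be"]
assert parse_field(r"foo\|bar", is_array=True) == ["foo|bar"]
```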
Example:

```
foo: string,bar: bigint,baz: timestamp,eggs: decimal(18\,2),ham: [string]
friday,300,2021-01-01 12:34:56,12.34,be|not be
friday\, you are cool,\N,\N,\N,a literal pipe \|
```

Also known as append-only, time series are tables that grow, but whose rows never get updated. Every time they are processed, only new rows are processed and old ones stay untouched, as opposed to materialised views, which get fully reprocessed every time.
Development works similarly to materialised views.
Time series may source other time series, which enables us to split complex queries into multiple, iterative time series and/or branch many time series off a single base.
Schema changes are only deployed after a merge request is merged. Hence, code that depends on a change in a table can not be put into the same merge request.
Example: Let's say you want to add a column col1 to the query for a materialised view / external table foo, and you want to use foo.col1 in the query for another materialised view mv.
If you commit both and make a merge request, the schema test for mv will fail because foo.col1 does in fact not exist until the merge.
In this case, you need to split your feature into several merge requests:
- Make all possible changes that do not depend on other changes.
- Repeat until all dependencies are solved. You will need as many merge requests as the longest chain of dependencies you want to update.
There are some reasons why we do not support this kind of change in a single merge request:
- If we wanted to support it while keeping our schema tests, we would have to somehow deploy the changes to tables before running the tests. No matter how we approach it -- be it by deploying tables under some name with a `_review` suffix, or by replacing table names in queries with subqueries that are those tables altogether -- there is no nice solution that does not have the potential to blow up.
- Furthermore, in the case of DMS-replicated tables and materialised views, simply adding the column to the schema does not automatically add it to the data, meaning that we could not run data tests in the same pipeline anyway (assuming we add that feature in the future).
- Last but not least, splitting a feature into smaller portions is a good idea in order to keep merge requests small, lower the risk of bad side effects and reduce branch lifetime.
Whenever using window functions, e.g. ROW_NUMBER(), add the column with the highest cardinality as the last sorter in order to avoid non-deterministic sorting.
Let's assume a heavily simplified example, roughly taken from Guidewire: a table with an ID, a foreign key, an effective date and a data column. As a simplification, we define a row as being effective if there is no other row with a later effective date.
| id | fk | effective_at | data |
|---|---|---|---|
| 1 | 1 | 2023-01-01 | a |
| 2 | 1 | 2023-01-02 | b |
| 3 | 2 | 2023-01-01 | c |
| 4 | 2 | 2023-02-01 | d |
| 5 | 2 | 2023-02-01 | e |
We want to find out data per foreign key (our "object"), so we would query:

```sql
SELECT
  fk,
  data
FROM (
  SELECT
    *,
    ROW_NUMBER() OVER (
      PARTITION BY
        fk
      ORDER BY
        effective_at DESC
    ) AS sorter_desc
  FROM
    table
)
WHERE sorter_desc = 1

-- Side note: The query could be rewritten using first/last_value(), but the
-- above version is the one you will find significantly more often in our code:
SELECT
  fk,
  FIRST_VALUE(data) OVER (
    PARTITION BY
      fk
    ORDER BY
      effective_at DESC
  ) AS data
FROM
  table
```

Our result looks like:
| fk | data |
|---|---|
| 1 | b |
| 2 | d |
… or does it?
| fk | data |
|---|---|
| 1 | b |
| 2 | e <<<< |
Since all rows with fk=1 have distinct values in effective_at, there is no issue.
For fk=2 however, there are two candidates with the greatest value in effective_at and as a result, the query's result is practically random!
The actual behaviour of the query depends on a few circumstances, but in case of Athena, the only practically relevant one is that we are using multiple executors per query that communicate via practically random network latencies and run on shared instances with practically random other workloads by other queries.
Why do we care?
- Random results make it hard to compare a table before and after refactoring if we want to make sure that refactoring did not change results. This becomes even more a problem when developing and debugging big report tables that potentially combine multiple of those hiccups.
- Random noise in results might lower confidence in data and raise questions by our stakeholders.
First, let's further assume that there is no other column that could be used to improve the sorting regarding business logic, or in other words: There is no way to clearly determine from the data whether d or e is the correct answer.
We see this situation fairly often in Guidewire, supposedly due to programming mistakes.
A solution to this problem is to add the column with the highest cardinality -- in almost all cases the id column -- as the last sorter to the query. This does not significantly impact query performance, because the buckets to sort are normally of length 1, and only of length 2 or more in the cases we want to fix. Neither does it make the results more "correct", but if we cannot deterministically tell which one is true anyway, it is better to have at least a stable result.
```sql
SELECT
  fk,
  data
FROM (
  SELECT
    *,
    ROW_NUMBER() OVER (
      PARTITION BY
        fk
      ORDER BY
        effective_at DESC,
        -- CHANGE IS HERE. Use DESC as a heuristic because newer rows should be
        -- "more correct", right? (Not really, but we tried.)
        id DESC
    ) AS sorter_desc
  FROM
    table
)
WHERE sorter_desc = 1
```

| fk | data |
|---|---|
| 1 | b |
| 2 | e |
Always.
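The effect of the extra sorter can be demonstrated outside of SQL as well — a sketch; the tuple sort key mirrors `ORDER BY effective_at DESC, id DESC`:

```python
# With ties on effective_at, "pick the first row" depends on input order;
# adding id as a final key makes the winner independent of that order.
rows = [
    {"id": 4, "fk": 2, "effective_at": "2023-02-01", "data": "d"},
    {"id": 5, "fk": 2, "effective_at": "2023-02-01", "data": "e"},
]


def pick(rows):
    # Equivalent of ORDER BY effective_at DESC, id DESC ... ROW_NUMBER() = 1
    return max(rows, key=lambda r: (r["effective_at"], r["id"]))["data"]


# The winner is 'e' regardless of the order the engine delivers rows in.
assert pick(rows) == "e"
assert pick(list(reversed(rows))) == "e"
```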
One way to heuristically detect such issues in a table's code is to call compare-view on the unchanged (master's) version of the table.
Since execution on Athena is fairly random, the comparison will fail after at most a few tries if there is non-deterministic sorting in the code:

```sh
docker compose run app compare-view reports.home_content_order
```

Struct types are represented as dictionaries of column name to type, arrays as a list with a single element that is the data type. Take the following Hive type as an example:
```
id string,
person struct<
  age: integer,
  names: array<string>,
  birth_day: date>
```

In tables.yaml:
```yaml
columns:
  - name: id
    type: string
  - name: person
    type:
      age: integer
      names:
        - string
      birth_day: date
```

Nested JSON is queried most comfortably with structs. The schema from the above section is able to query the following JSON object:
```json
{
  "id": "abcd-efgh-ijkl",
  "person": {
    "age": 12,
    "names": ["John Doe", "Al Capone"],
    "birth_day": "1990-01-02"
  }
}
```

The JSON parser can deal with:
- Fields (columns, struct keys) that are not present in the schema but present in the JSON file, by ignoring them.
- Fields present in the schema but not in the JSON file, by returning `NULL`.
- Fields with changed data types that can be cast to the target data type. The same rules as for normal columns apply (int can be cast to string, but not the other way around).
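The lenient behaviour on root level can be sketched in a few lines — illustrative only, not Athena's actual parser; casting is left out:

```python
# Extra fields are ignored, missing fields become NULL/None.
schema = {"id": "string", "person": "string"}


def read_record(record: dict, schema: dict) -> dict:
    # Keep only schema columns; absent ones default to None.
    return {col: record.get(col) for col in schema}


row = read_record({"id": "abcd", "unknown_field": 1}, schema)
assert row == {"id": "abcd", "person": None}
```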
It can not deal with differing value types that can not be cast, especially anything to struct or vice versa:
```json
{
  "person": {
    "age": 12,
    "cool": true,
    "name": "Bruce Wayne",
    "parents": null
  }
}
{
  "person": "Al Capone"
}
```

In this example, defining the column person as a struct inevitably fails when the parser reaches the second object, with an error message similar to `HIVE_BAD_DATA: Error parsing field value for field 4: Data is not JSONObject but java.lang.String with value Al Capone`.
The field number represents the zero-indexed position among the fields actually queried from the table. Since the query optimizer removes columns that the final result does not require from the schema, simply selecting `*` from the table will help to find the correct column.
In the next step, debug tables that read all columns as string can be used to find the conflicting rows and files:
```sql
SELECT
  "$path",
  person
FROM
  data_lake.__plain_YOUR_TABLE_NAME
WHERE
  -- JSON should start with an opening curly brace. "person = 'Al Capone'",
  -- from the error message, would also work.
  person NOT LIKE '{%'
```

Ideally, the issue should only happen in legacy tables where we did not validate the schema before storing.
Real-life example: Let's suppose Athena has thrown the following error message at you, which you would either get directly from your client if you fired the query manually, or from materialised-views-poc's Sentry project or its logs.
```
INVALID_FUNCTION_ARGUMENT: Invalid format: "2022-08-23 11:28:26.080000" is malformed at " 11:28:26.080000".
```
We know from Sentry which table we are talking about since it tells us which URL was called.
In this case, Athena does not tell us which column is affected, so the first order of business is to find the exact column or a list of candidate columns. Pointers could be the data type (which columns are/include timestamps?) or contextual information specific to the table we are looking at.
Looking closely, we suspect the error happens in the request column:
```sql
SELECT
  "$path",
  request
FROM
  -- Plain version of events.dynamic_pricing's source table.
  data_lake.__plain_dynamic_pricing_events
WHERE
  request LIKE '%2022-08-23 11:28:26.080000%'
  -- Limit the amount of data queried by narrowing down the date of the event
  -- to October 9th, 2022.
  AND _partition LIKE '2022/10/09/%'
```

There are three solutions, the first being the preferred one.
Often, only a few files are affected due to bugs in the production system.
In this case, the files can be downloaded, fixed locally and re-uploaded, with backups stored to s3://xxx-data-lake-{sandbox,production}/backups/.
For instance, in order to fix the above issue, you could use the following jq program. However, do not forget to ask your colleagues first, since they might have suitable solutions in their shell history.

```sh
jq -c '(select(.person | type == "string").person) = {"name": .person}' source.json > fixed.json
```

Casting to string is always possible, so we can define person as a string and use PrestoDB's JSON functions and operators for extracting data.
Compared to the preferred approach of using structs with reliable schemas, this method is tedious and prone to mistakes:
- The JSON parser does not return the original string from the file, but a re-serialised version that went through some funky changes:
  - All values are cast to string.
  - Even `null` becomes the string `"null"`...
  - ...except for booleans. They stay booleans.
  - The result of the above example would be `{"person": {"age": "12", "cool": true, "name": "Bruce Wayne", "parents": "null"}}`
- Parse with `CAST(NULLIF(JSON_EXTRACT(person, '$.age'), 'null') AS INTEGER)`, except for booleans. The cast can be omitted if the target type is string. If a valid string value is `"null"`: 🤷
- They claim `json_extract()` takes a JSONPath, which is true -- but they do not support `*`, which makes dealing with nested arrays harder.
If we are talking about Sandbox or there are other reasons that we are fine with deleting the whole file that contains the broken record, the file can be safely deleted.
⚠ Structures and arrays are advanced types - use with care.
Be aware that storing empty arrays will result in an error (only if the storage format is PARQUET, our default):

```
Parquet record is malformed: empty fields are illegal, the field should be ommited completely instead
```

To avoid this, store arrays conditionally based on their size/cardinality:

```sql
-- Returns NULL in case the array has no elements.
IF(CARDINALITY(ticket.tags) > 0, ticket.tags)
```

Do not use structs simply to reduce the number of columns in a table by grouping, for example, all attributes of a person into one struct.
Structural types make most sense in cases where, in a classic relational database world, we would create two or more tables with 1:n relationship to a primary table, but in Athena world, we want to denormalize those into one table for performance and convenience.
A good example are the results of market crawling:
Every crawled profile yields multiple tariff offers:
| profile_id | tariffs |
|---|---|
| abc | [{"name": "a", "price": 1}, {"name": "b", "price": 2}, ...] |
As types:
```yaml
profile_id: string
tariffs:
  - name: string
    price: integer
```

Structs are not stored with column names, but like bare-bones structs in C.
That means, if your struct looks like <a:string, b:string, c:int> and you remove b, existing files will try to read the second column as integer (<a:string, c:int>) instead of ignoring it.
Likewise, if you naively swap the names of the string columns, the columns of existing files will actually swap data.
- Be very careful when dealing with structs in tables that use historical files (time series, as well as partition visibility "all").
- In the unlucky case that you have to change the schema, only add columns at the end - never change or remove columns.
- In the future, if this case becomes more frequent, we will implement schema evolution tests.
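A small sketch of why removing a middle field is dangerous — illustrative, not Athena's actual decoder:

```python
# Files store struct values by position, so dropping a middle field from the
# schema shifts every later field.
stored = ("alice", "bob", 42)  # written as <a:string, b:string, c:int>

old_schema = ["a", "b", "c"]
new_schema = ["a", "c"]        # b removed -- do not do this!


def decode(values, schema):
    # Positional decoding: the n-th schema field reads the n-th stored value.
    return dict(zip(schema, values))


assert decode(stored, old_schema) == {"a": "alice", "b": "bob", "c": 42}
# With the shrunken schema, c now reads b's string value instead of 42:
assert decode(stored, new_schema) == {"a": "alice", "c": "bob"}
```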
Athena does not tell us the exact schema of structural types: the only information we get on import is whether a type is a struct (row) or an array. Hence, the importer is only able to add empty structs and arrays to tables.yaml, which you need to complete yourself.
Some reports require us to process personal data. Due to the sensitive nature of that data, special care needs to be taken when handling it.
This section describes implementation details in Materialised Views that are not covered by the general concept.
Sensitive databases are defined like regular databases, except that explicitly adding external tables is forbidden. Instead, those tables are derived from their same-named counterpart in their non-sensitive database sibling. (Materialised) views, however, are allowed, though the usual considerations about where to put tables apply.
Example:

```yaml
databases:
  - name: claim_sensitive # no DMS tables defined!
  - name: claim
    external_tables:
      - name: claim_table
        columns:
          - name: ID
            type: bigint
          - name: non_sensitive
            type: string
          - name: sensitive
            type: string
            exclude_reason: personal_data # forbidden in non-sensitive database
            replicate_to_sensitive: true
```

In this example, in addition to the explicitly defined table claim.claim_table with columns ID and non_sensitive, claim_sensitive.claim_table is automatically generated with columns ID and sensitive.
In order to avoid situations where the source of a column (sensitive or non-sensitive version) depends purely on where a developer found it first, it is forbidden to include columns in both versions of the table: `exclude_reason: personal_data` is mandatory.
For use cases where both sensitive and non-sensitive data of the same table is required, both tables should be joined on ID.
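The derivation of the sensitive sibling table can be sketched as follows — a hypothetical helper, not matviews' actual implementation:

```python
# Split a catalog entry's columns into the non-sensitive table and its
# auto-generated sensitive sibling; the join key ID appears in both.
def split_columns(columns):
    non_sensitive = [c for c in columns if "exclude_reason" not in c]
    sensitive = [
        c for c in columns
        if c.get("replicate_to_sensitive") or c["name"] == "ID"
    ]
    return non_sensitive, sensitive


columns = [
    {"name": "ID", "type": "bigint"},
    {"name": "non_sensitive", "type": "string"},
    {"name": "sensitive", "type": "string",
     "exclude_reason": "personal_data", "replicate_to_sensitive": True},
]
plain, sens = split_columns(columns)
assert [c["name"] for c in plain] == ["ID", "non_sensitive"]
assert [c["name"] for c in sens] == ["ID", "sensitive"]
```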
It is further forbidden to specify computed columns with exclude_reason but replicate_to_sensitive. While this would technically work without issues and result in a computed column in the sensitive table but not in the non-sensitive one, we cannot come up with a use case for it yet, because the only purpose of computed columns is to pseudonymise personal data in the first place, which does not make sense in sensitive tables.
Actual data is only replicated on production.
Other environments (sandbox) generate a unique string in the format `$column_name-$ID`, where `$ID` is the value of column ID in the particular row.
This means that data on sandbox is truly anonymized and more lax permissions can apply, like enabling everyone to create review apps.
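A sketch of the placeholder generation described above — illustrative only, not the actual implementation:

```python
# Non-production environments replace each sensitive value with a unique,
# clearly artificial placeholder per column and row.
def anonymised_value(column_name: str, row_id: int) -> str:
    return f"{column_name}-{row_id}"


row = {"ID": 17, "sensitive": anonymised_value("sensitive", 17)}
assert row["sensitive"] == "sensitive-17"
# Values stay unique per row, so joins on ID keep working,
# but no real personal data is involved.
assert anonymised_value("sensitive", 1) != anonymised_value("sensitive", 2)
```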
We decided against more complex methods like hashing or truncating values, or even giving the user freedom of choice, because:
- They carry a high potential for confusion and possible data leaks, e.g. hashing house numbers can easily be reverted by use of rainbow tables.
- The actual values or their pseudonymised variants (e.g. hashes) are only superior to anonymised values during development if identity information (e.g. `group by`, `distinct`) is required. However, the goal here is to provide support for sensitive data in the easiest but safest manner possible, not to enable analysis on raw data. If you find yourself in a situation where you would need pseudonymised values during development, you should probably reconsider your approach and, for example, group on the ID column or use hashed variants of the column from non-sensitive databases (see Importing Computed Columns).
Permission-wise:
- Only admins can access personal data.
- Materialised Views itself cannot access personal data. At the current stage, it only serves for configuring metadata and DMS replication of the sensitive databases.
Like every other database engine, PrestoDB has its pitfalls, peculiarities and hundreds or thousands of blog entries on the internet that try to explain them. Hence, we do not want to offer the 57th PrestoDB tutorial, but look at some topics that are common in our case.
Made for the modern world, PrestoDB is pretty powerful when it comes to handling arrays and arrays of objects that we often encounter in JSON input from external sources. Our classic approach is to turn those arrays into tables, do our job and then join them back to the original table, for instance for pivoting a list of objects. The amount of code required can be reduced tremendously by using array functions, which also makes the query easier to read -- if one is familiar with those functions.
Assume the following array, which we want to pivot into columns Liability, Partially Comprehensive and Fully Comprehensive with the prices as values:
```
[
  {
    "coverage": "Liability",
    "price": 1
  }, {
    "coverage": "Partially Comprehensive",
    "price": 2
  }, {
    "coverage": "Fully Comprehensive",
    "price": 3
  },
  // more...
]
```

In the relational world, we would unnest the array into a table, run column aggregate functions and join back to the original table.
Enter array functions:
- `transform` takes an array and applies a lambda function to every element, comparable to `apply` on Pandas DataFrames or `map` in Python.
- `filter` takes an array and removes elements for which a filter function does not return `true`, like Python's `filter` function.
- `reduce` is the swiss army knife for any aggregate on arrays. It takes a start value, a function of the current state and an array element that gets called on each element, and another function that transforms the final aggregated value into a result.
In the above case, we could simply write:

```sql
SELECT
  -- Note: there might be a filter(..., x -> x IS NOT NULL) required for broken
  -- data, since array_max returns NULL if *any* element is NULL.
  array_max(
    transform(
      filter(col, x -> x.coverage = 'Liability'),
      x -> x.price
    )
  ) AS liability_price,
  ...
FROM
  t
```

instead of:

```sql
WITH cte AS (
  SELECT
    id,
    -- ... and we are being nice here, assuming FILTER does exist
    max(col2.price) FILTER (WHERE col2.coverage = 'Liability') AS liability_price,
    ...
  FROM t
  LATERAL VIEW unnest(col) AS col2
)
SELECT
  *
FROM
  t
JOIN
  cte ON cte.id = t.id
```

array_sum, as opposed to array_max, is not implemented in our version of Athena/PrestoDB, but it can be emulated using reduce:
```sql
SELECT
  reduce(
    -- A column name in the real world.
    [1, 2, NULL, 3, NULL],
    -- The start value. NULL so we can return NULL if there are no elements or
    -- all are NULL.
    NULL,
    -- If x is not NULL, add it to the state.
    (s, x) -> IF(x IS NULL, s, COALESCE(s, 0) + x),
    -- No final transformation - we use the state as it is.
    s -> s
  )
-- Result: 6
```

PrestoDB provides GREATEST() and LEAST(), but as opposed to most other implementations, they return NULL on NULL input. Instead of resorting to IF() chains or COALESCE(nullable, DATE '1900-01-01') orgies that even return wrong data (1900-01-01 in case everything is NULL), we can leverage array functions again:
```sql
SELECT
  -- array_max() returns NULL on NULL input as well, but we can use filter() first.
  ARRAY_MAX(FILTER(ARRAY[nullable_column_1, nullable_column_2, ...], x -> x IS NOT NULL))
FROM
  t
```

This is not specific to Athena or PrestoDB.
CASE WHEN comes in two flavours: one takes arbitrary expressions as conditions -- similar to if-else chains in other programming languages -- while the other compares against a given variable, like switch statements in other programming languages:
```sql
CASE
  WHEN x = 1 THEN 'one'
  WHEN x = 2 THEN 'two'
  ELSE 'other'
END

-- Can be simplified:
CASE x
  WHEN 1 THEN 'one'
  WHEN 2 THEN 'two'
  ELSE 'other'
END
```

The latter version is preferred for simple statements due to improved readability and less chance of typos (e.g. `y` instead of `x`).
Historically, we had a lot of discussion in merge requests revolving around SQL code style. With SQLFluff there is a tool that analyses SQL code and raises errors on code style violations as well as common programming pitfalls (so-called linting) according to rule sets that we all agree on.
We expect SQLFluff to make our lives nicer and easier by reducing the amount of time that we spend in reformatting SQL code to reviewer's likings and the social friction that evolves from that. In an ideal world, SQLFluff will ultimately render SQL Standards pages superfluous and remove all complaints about SQL code style from merge requests.
Realistically, being a linter and not a formatter, it does not give us deterministic formatting of SQL files, meaning that there might still be discussions.
Please also note that SQLFluff is highly configurable and configuration is not set in stone.
If you disagree with a given rule, let's discuss it for instance in our standup.
Furthermore, being extensible and written in Python, it can be extended with our own rules, enabling us to write checks for the wildest expectations, like Please, all table aliases need to start with i_hate_my_life.
SQLFluff tests are running alongside other tests in pytest, which means that a full test suite run will also run all SQL checks.
Tests for table sources are tagged with their respective table path, so pytest -k 'base.some_table' will not only run all Python tests on that table, but also SQLFluff on the table's SQL code.
See also Running Locally.
All new files are automatically picked up.
Existing files need to be enabled for SQLFluff checks by removing their path from .sqlfluffignore, since enabling all files at once on introduction of SQLFluff would have led to literally hundreds of errors, making the introducing merge request a monster both in size and time.
For this migration period, you are kindly asked to enable SQLFluff checks on files you touch, so we eventually end up with all of them enabled without assigning the mind-numbing task of fixing hundreds of SQL files to a single person.
If you run into any problems with SQLFluff, it can be worth invoking its `lint` subcommand directly; this way, no warnings or other log messages are suppressed. To do so, run:
```shell
docker compose run --entrypoint sh test -c "sqlfluff lint /path/to/your/file/test_query.sql"
```

Performance of SQLFluff is bad: scanning all of our ~570 SQL files (at the time of introduction) takes tens of minutes. To mitigate this, successful test results are cached, similar to successful flake8 results, so a test does not run again until the file or SQLFluff's configuration changes. However, we might need to take further measures if test run times increase significantly. Please do not hesitate to bring that up, e.g. in our stand-ups.
- SQLFluff is not perfect and might encounter parser errors. If you cannot make it parse using the known mitigations below, please add the SQL file to `.sqlfluffignore`.
- `x IS [NOT] DISTINCT FROM y` is not understood without wrapping it in parentheses and will lead to parser errors. It might not be pretty, but please use `(x IS [NOT] DISTINCT FROM y)`.
Materialised Views offers an endpoint (`/schema/$database/$table`) for retrieving JSONSchema files that can be used to validate rows (records, "JSON objects") for their compliance with our defined schemas.
Its main purpose is to validate records in the SQS Ingester before they enter our data lake.
The goal: make sure that no record entering our data lake breaks its table, because Athena throws exceptions on non-compliant data.
The following design decisions support this goal:
- Existence of properties:
  - All properties (columns and struct fields) are optional and might even be missing, since Athena will in both cases parse `null`.
  - Extra properties on root (column) level are forbidden. This is not actually required by Athena, because it ignores extra properties; however, we use this as an alerting system in case the records we receive change and no one notifies us.
  - Extra properties in structs are allowed. This pays tribute to the fact that JSON data structures are often very dynamic and prone to change.
- Data types are validated to a degree that lets us be sure Athena does not encounter issues while casting, for instance:
  - Primitive types: a property with data type `integer` validates that the value is an integer or can be cast to one.
  - Complex types (array, struct) are tested recursively until primitive types are reached.
Out of scope is business validation of records, since this is very hard to achieve via static schema validation with feasible effort. For instance, the nullability of given fields often depends on the values of other fields, and the allowed values of a string property are often limited because we are actually looking at enumerations.
Ideally, engineering teams would use the same JSONSchema to validate their records before they send them to us, but history shows that this is not happening at all.
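To make the rules above concrete, here is a toy, stdlib-only validator. It is illustrative only — the real endpoint serves standard JSONSchema documents and validation presumably happens with a proper JSONSchema library — but it mirrors the rules: all columns are optional, unknown root-level columns are rejected, and integer columns accept anything castable.

```python
from typing import Any

def casts_to_int(value: Any) -> bool:
    # Athena-style leniency: "42" is fine, "forty-two" is not.
    if isinstance(value, bool):
        return False
    try:
        int(value)
        return True
    except (TypeError, ValueError):
        return False

def validate(record: dict, columns: dict[str, str]) -> list[str]:
    """Return a list of violations for a single record."""
    errors = []
    # Extra properties on root (column) level are forbidden.
    errors += [f"unexpected column: {k}" for k in record if k not in columns]
    for name, col_type in columns.items():
        value = record.get(name)  # a missing column parses as null, which is fine
        if value is None:
            continue
        if col_type == "integer" and not casts_to_int(value):
            errors.append(f"{name}: cannot cast {value!r} to integer")
    return errors
```

For example, `validate({"id": "42"}, {"id": "integer"})` passes, while an unexpected top-level key or an uncastable value yields violations.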
You can run data reconciliation SQL scripts using the reconcile-data command. For more details run:
```shell
docker compose run app reconcile-data --help
```

Since the data reconciliation SQL scripts contain parameters that are replaced at runtime before the query is executed against Athena, this command eases pre-commit checks and running the scripts locally. Most probably you would otherwise replace the parametrised placeholders in your SQL by hand while developing the reconciliation script.
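As a rough illustration of what such a parameter replacement does — the actual placeholder syntax used by matviews may differ — assume `$`-style placeholders in the script:

```python
from string import Template

# Hypothetical reconciliation script with named placeholders.
script = "SELECT count(*) FROM $database.$table WHERE dt = '$run_date'"

# At runtime the placeholders are filled in before the query hits Athena.
query = Template(script).substitute(
    database="base", table="orders", run_date="2021-08-01"
)
print(query)  # SELECT count(*) FROM base.orders WHERE dt = '2021-08-01'
```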
This usually means that the schema in `tables.yaml` (which is used to define the expected schema in the Glue metadata catalog) differs from the schema that the view code returns.
- Redeploy the view in order to be sure that expectation and reality are in sync.
- If that did not help, run `import pure-view foo.bar` in order to update the schema in `tables.yaml`.
- If that did not change anything, but the view outputs complex data types, check those manually (for example: the order of fields in a struct).
We are using a query parser to analyse queries, e.g. to extract all referenced tables, so we can build efficient and correct DAGs on Airflow. In ancient times, this parser was not great at understanding Athena, and Materialised Views offered features to work around that. However, we worked closely with the developer to fix bugs as soon as possible, and there do not seem to be any issues anymore.
If you encounter such a parser issue nonetheless, please bring it up in our team's chat or standup.
We want to be able to query, if possible, all our data in one engine (as in: We can JOIN everything in one place).
After some research and learnings from the past, we decided that we want to give ETL tasks on Athena a go since just firing a query on Athena is much simpler and often faster and more cost-effective than developing and running a Spark job.
One of our primary goals is to never, ever maintain the same piece of code twice. That is why we want to provide base tables (and possibly all other queries) as views on Athena and make them available to all downstream jobs in this way.
However, during research it became apparent that using views does not provide ideal performance (though with our current query-set size this is negligible) and can also lead to weird exceptions when queries become too complex, especially in self-joins. Since AWS Elastic Views are not an option either, we decided to implement matviews.
- Implement queries as `SELECT` statements, not `CREATE VIEW`. This enables query developers to simply change and fire queries without copy-pasting them elsewhere. Furthermore, automation and tests can grab the query definition and do whatever they need, for instance analyse the query structure or run it against real data and perform quality checks.
- Hide implementation internals from users and query developers: in the target databases, there must not be anything other than the final data that those people should use. They should not and do not care whether it is a view, a table, or a table produced by our materialised-view algorithm. That is why we use two databases to hide the actual view code and the staging tables.
- Explicit is better than implicit: Views are defined in a YAML configuration file.
- Schemas are defined explicitly.
This helps us in a lot of ways:
- Protect us against unwanted schema changes as well as surprising implicit type conversions.
- Enable us to define external tables.
- Enable us to validate schema evolution, as in: Only allow well-defined actions, like adding a column, but not removing columns.
- In those schemas, no arbitrary data types are allowed. This helps us enforce standards further, e.g. by not allowing arbitrary combinations of precision and scale in `Decimal` types. If a data type you require is not present, do not hesitate to discuss with the team whether it makes sense.
- The HTTP server does not implement authentication. It is not exposed to the world, but only callable from within the Kubernetes cluster. As long as we expose only metadata in the APIs, not data, it does not make sense to implement authentication.
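The schema-evolution rule from the list above can be sketched as follows — a hypothetical helper, not matviews' actual code: adding columns is a well-defined action, while removing or retyping existing ones is not.

```python
def check_evolution(old: dict[str, str], new: dict[str, str]) -> list[str]:
    """Return violations when evolving a table schema from `old` to `new`."""
    errors = []
    for name, old_type in old.items():
        if name not in new:
            errors.append(f"column removed: {name}")
        elif new[name] != old_type:
            errors.append(f"column retyped: {name} ({old_type} -> {new[name]})")
    # Columns present only in `new` are additions, which are allowed.
    return errors
```

A real implementation would also consult the type hierarchy (e.g. widening smallint to int) rather than requiring exact type equality.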
We are relying on classic registering of partitions in the Glue metadata store instead of partition projection, because the latter only works for Athena (as of August 2021), not for Spark/Glue or any other member of the Hive ecosystem.
However, if that changes at some point in the future, we might want to re-evaluate, because partition projection removes the need to manage partitions and is even a bit faster, presumably because no calls to the metadata store are required.
Note: For historical reasons, the "firehose ingested" table type uses partition projection. We will only change it if needed, e.g. because we want to query those tables in Spark.
We originally chose mo-sql-parsing when we started this project, since it was the only library we found that could parse the biggest portion of our queries. Its accuracy is fine, and in the meantime it has implemented all the Athena language features that we need.
In the meantime, more libraries emerged:
- sqlparse is a fairly old library that is often used for parsing, formatting and splitting SQL. However, its low level output is that of a tokenizer (list of tokens, including space, comma, etc.), not a parser (abstract syntax tree, AST).
- sqlfluff actually is a parser as well, but its focus lies more on linting, and so do its interfaces.
- sqloxide is fast, but does not support our SQL dialect.
- sqltree is practically dead.
Ultimately, sqlglot is a very powerful and fast library (roughly 10x faster than mo-sql-parsing) with a great interface to ASTs, and that is what we are using today.