For Data Engineers, data comes in a vast range of shapes and sizes. Common categories are structured data (such as a database table), semi-structured data (such as JSON or XML), and unstructured data (such as text or images). Naturally, the requirements for the result can vary just as much. In this article, we’ll work through a case study to explore some typical obstacles Data Engineers face when integrating data to meet a specific requirement from a given input.
The input
In this particular case, the data arrives in BigQuery from a source system, albeit in a very particular fashion. Each event can be either a full event (with all the attributes) or a partial one that carries only one or more attributes, with the other attributes being NULL.
The desired outcome
From the input above, we’d like to obtain a structure that tells us, at any point in time, the state of all attributes (or their last known state at that moment), akin to a temporal fact table.
Analyzing our options
Typically when we’d like to access a previous value, we could use the LAG function with a window expression, as below:
SELECT input_data.country_id, input_data.store_id, input_data.event_time, input_data.attribute1,
LAG(input_data.attribute1) OVER(PARTITION BY country_id, store_id ORDER BY input_data.event_time) AS previous_value
FROM input_data
WHERE store_id = 'DE1004'
ORDER BY event_time DESC
Now, there are a couple of problems with the above approach.
First, the LAG function in BigQuery does not support the IGNORE NULLS modifier at the moment, so we end up getting the previous row’s value (even if it is NULL) when we’re interested in the most recent non-NULL value.
Second, we’d need to apply such a transformation to each attribute. We might get away with 10 attributes, but what about when we have several dozen of them?
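To make the first problem concrete, here is a minimal Python sketch (not BigQuery code) that contrasts what LAG returns with what we actually want. The sample values are made up, and None stands in for SQL NULL.

```python
# Hypothetical values of attribute1 for one store, ordered by event_time.
# None stands in for SQL NULL.
values = ["open", None, None, "closed", None]


def lag(seq):
    """Previous row's value, NULL or not (the behaviour of LAG)."""
    return [None] + seq[:-1]


def last_non_null_before(seq):
    """Most recent non-NULL value strictly before each row (what we want)."""
    result, last_seen = [], None
    for value in seq:
        result.append(last_seen)
        if value is not None:
            last_seen = value
    return result


print(lag(values))
print(last_non_null_before(values))
```

The two lists differ exactly on the rows that follow a NULL, which is where LAG alone leaves us without the last known state.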
Let’s (un)pivot
It would be great if we could transpose all these attributes from columns to rows. For this, we have the UNPIVOT operator. We’d first need to cast the attributes to a common data type (typically STRING), because all the values inside the new column must have the same data type.
Then we can UNPIVOT the value of each attribute, thus obtaining two columns: the attribute and its value.
WITH processed AS (
SELECT
event_time,
country_id,
store_id,
attribute1,
CAST(attribute2 AS STRING) AS attribute2,
FROM input_data
)
SELECT
event_time,
country_id,
store_id,
attribute,
value
FROM processed
UNPIVOT(value FOR attribute IN (attribute1, attribute2))
But there is a problem with this approach. As you can see above, we’re getting a dense table, so there are no NULL entries for our attributes. That is because UNPIVOT excludes NULL values by default. BigQuery documents an INCLUDE NULLS modifier for UNPIVOT that is worth checking first; here we take a different route, a way to unpivot the values while keeping the NULLs, which gives us the sparse table.
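The difference between the dense and the sparse result can be sketched in a few lines of Python. This only models the semantics; the `unpivot` helper, its `include_nulls` flag, and the sample rows are hypothetical and not part of BigQuery.

```python
def unpivot(rows, key_cols, attr_cols, include_nulls=False):
    """Turn attribute columns into (attribute, value) rows.

    With include_nulls=False, NULL (None) values are dropped, which gives
    the dense table; with include_nulls=True they are kept (sparse table).
    """
    out = []
    for row in rows:
        for attr in attr_cols:
            value = row[attr]
            if value is None and not include_nulls:
                continue
            keys = {k: row[k] for k in key_cols}
            out.append({**keys, "attribute": attr, "value": value})
    return out


# Made-up sample: the second event carries only attribute2.
rows = [
    {"event_time": "2023-01-01 10:00:00", "store_id": "DE1004",
     "attribute1": "open", "attribute2": "5"},
    {"event_time": "2023-01-01 11:00:00", "store_id": "DE1004",
     "attribute1": None, "attribute2": "7"},
]
keys, attrs = ["event_time", "store_id"], ["attribute1", "attribute2"]

dense = unpivot(rows, keys, attrs)
sparse = unpivot(rows, keys, attrs, include_nulls=True)
print(len(dense), len(sparse))
```

The sparse variant keeps one row per event and attribute, which is what the gap-filling step later relies on.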
And then I found an interesting approach in a Stack Overflow answer.
I’ve adapted it for our needs, namely to work around the fact that an attribute could be a timestamp, which contains a colon. So what does the below code do?
We start by converting the row into a JSON object
We’re splitting it into key-value pairs
Using regular expressions, we extract the part before the first colon — the key (attribute)
We also extract the part after the first colon — the value
Since JSON is going to provide a ‘null’ string, we’re converting that to an actual NULL
WITH processed AS (
SELECT
event_time,
country_id,
store_id,
attribute1,
CAST(attribute2 AS STRING) AS attribute2,
FROM input_data
)
SELECT
event_time,
country_id,
store_id,
REGEXP_EXTRACT(kvp, '([^:]+)') AS attribute,
NULLIF(REGEXP_EXTRACT(kvp, ':(.*)' ), 'null') AS value
FROM processed i,
UNNEST(SPLIT(REGEXP_REPLACE(TO_JSON_STRING(i), r'[{}"]', ''))) kvp
This yields the following results:
We now have our sparse table, including the NULL (represented above as missing) attribute values.
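For readers who want to trace the string handling step by step, the same idea can be mimicked in Python. This is a rough sketch under assumptions: `json.dumps` with compact separators stands in for TO_JSON_STRING, the regular expressions mirror the ones in the query, and the sample row is invented.

```python
import json
import re


def unpivot_via_json(row):
    """Split a row into (attribute, value) pairs via its JSON text."""
    # Compact separators: no spaces after ',' and ':' in the JSON text.
    text = json.dumps(row, separators=(",", ":"))
    # Drop braces and double quotes, leaving key:value,key:value,...
    text = re.sub(r'[{}"]', "", text)
    pairs = []
    for kvp in text.split(","):
        # Key: everything before the first colon.
        attribute = re.search(r"([^:]+)", kvp).group(1)
        # Value: everything after the first colon, later colons included.
        match = re.search(r":(.*)", kvp)
        value = match.group(1) if match else None
        # JSON renders NULL as the text null; turn it back into None.
        pairs.append((attribute, None if value == "null" else value))
    return pairs


row = {"event_time": "2023-01-01T10:00:00Z", "store_id": "DE1004",
       "attribute1": None, "attribute2": 5}
for pair in unpivot_via_json(row):
    print(pair)
```

Note the limits of this trick: a value that itself contains a comma, a brace, or a double quote would be split or altered, and a genuine string 'null' would be turned into NULL, so it fits best when attribute values are simple.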
Now, it’s time for us to fill in the gaps for each attribute value with the last non-NULL value. We’ve already mentioned that the LAG window function cannot skip NULLs in BigQuery, so we need another way to achieve this. Once again, it’s Stack Overflow to the rescue.
In this code, we’re:
defining a window over our non-changing columns (country_id, store_id) plus attribute, ordered by event_time descending and covering only the rows after the current one (that is, the earlier events)
leveraging the NTH_VALUE function with IGNORE NULLS to pick the first non-NULL value in that window
keeping the value if there is one, otherwise falling back to that last known non-NULL value
SELECT
event_time,
country_id,
store_id,
attribute,
value AS actual_value,
COALESCE(value, NTH_VALUE(value, 1 IGNORE NULLS) OVER previous_events) AS computed_value
FROM unpivoted
WINDOW previous_events AS (
PARTITION BY country_id, store_id, attribute ORDER BY event_time DESC ROWS BETWEEN 1 FOLLOWING AND UNBOUNDED FOLLOWING)
This yields the following result:
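The gap-filling logic of the query above can be sketched in Python as a sanity check of what we expect per partition. The `fill_last_known` helper and the sample rows are hypothetical; event times are ISO-formatted strings so that they sort chronologically.

```python
from itertools import groupby


def group_key(row):
    """Partition key: the non-changing columns plus the attribute."""
    return (row["country_id"], row["store_id"], row["attribute"])


def fill_last_known(rows):
    """Add computed_value: the value itself, or the last non-NULL before it."""
    ordered = sorted(rows, key=lambda r: (group_key(r), r["event_time"]))
    filled = []
    for _, group in groupby(ordered, key=group_key):
        last_seen = None  # reset for every partition
        for row in group:
            if row["value"] is not None:
                last_seen = row["value"]
            filled.append({**row, "computed_value": last_seen})
    return filled


def make(attribute, event_time, value):
    return {"country_id": "DE", "store_id": "DE1004",
            "attribute": attribute, "event_time": event_time, "value": value}


unpivoted = [
    make("attribute1", "2023-01-01 10:00:00", "open"),
    make("attribute1", "2023-01-01 11:00:00", None),
    make("attribute1", "2023-01-01 12:00:00", "closed"),
    make("attribute1", "2023-01-01 13:00:00", None),
    make("attribute2", "2023-01-01 10:00:00", None),
    make("attribute2", "2023-01-01 11:00:00", "7"),
]
for row in fill_last_known(unpivoted):
    print(row["attribute"], row["event_time"], row["computed_value"])
```

An attribute that has never been set (attribute2 at 10:00 in the sample) stays NULL, because there is no earlier value to carry forward.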
Pivoting the data
We can now PIVOT the data back to its original form. We’d need an aggregate function like ANY_VALUE as well as CAST to the attribute value’s original datatype. Given we’re using an aggregate function, we’d also need to use GROUP BY.
SELECT
event_time,
country_id,
store_id,
ANY_VALUE(attribute1) AS attribute1,
CAST(ANY_VALUE(attribute2) AS int64) AS attribute2,
CAST(ANY_VALUE(attribute3) AS int64) AS attribute3,
CAST(ANY_VALUE(attribute4) AS bool) AS attribute4,
ANY_VALUE(attribute5) AS attribute5,
CAST(ANY_VALUE(attribute6) AS int64) AS attribute6,
CAST(ANY_VALUE(attribute7) AS int64) AS attribute7,
CAST(ANY_VALUE(attribute8) AS bool) AS attribute8,
ANY_VALUE(attribute9) AS attribute9,
CAST(ANY_VALUE(attribute10) AS int64) AS attribute10
FROM computed_values
PIVOT(ANY_VALUE(computed_value) FOR attribute IN
('attribute1','attribute2', 'attribute3', 'attribute4', 'attribute5', 'attribute6', 'attribute7', 'attribute8', 'attribute9', 'attribute10'))
GROUP BY event_time, country_id, store_id
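As a rough illustration of what this step does, here is a Python sketch that collapses the long rows back into one record per event and restores the original types. The `pivot` helper, the `CASTS` mapping, and the sample rows are assumptions made for the example; in particular, booleans are assumed to arrive as the strings 'true' and 'false'.

```python
def pivot(rows, casts):
    """Collapse (attribute, computed_value) rows into one dict per event."""
    key_cols = ("event_time", "country_id", "store_id")
    wide = {}
    for row in rows:
        key = tuple(row[c] for c in key_cols)
        record = wide.setdefault(key, dict(zip(key_cols, key)))
        raw = row["computed_value"]
        cast = casts.get(row["attribute"], str)  # default: keep as string
        record[row["attribute"]] = None if raw is None else cast(raw)
    return list(wide.values())


# Hypothetical original types for the unpivoted string values.
CASTS = {"attribute2": int, "attribute4": lambda s: s == "true"}


def make(event_time, attribute, computed_value):
    return {"event_time": event_time, "country_id": "DE",
            "store_id": "DE1004", "attribute": attribute,
            "computed_value": computed_value}


computed_values = [
    make("2023-01-01 10:00:00", "attribute1", "open"),
    make("2023-01-01 10:00:00", "attribute2", "5"),
    make("2023-01-01 10:00:00", "attribute4", "true"),
    make("2023-01-01 11:00:00", "attribute1", "open"),
    make("2023-01-01 11:00:00", "attribute2", None),
    make("2023-01-01 11:00:00", "attribute4", "false"),
]
for record in pivot(computed_values, CASTS):
    print(record)
```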
Final touches
With data now pivoted, we can create the temporal structure. We’re going to:
Consider the event_time as valid_from
Use the LEAD window function to retrieve the next event
Subtract a second from the next event and consider that as valid_to
In case there is no future event (this being the current state), provide a placeholder with a date well into the future, i.e. ‘2999-12-31 23:59:59 UTC’
SELECT
country_id,
store_id,
event_time AS valid_from,
IFNULL(TIMESTAMP_SUB(LEAD(event_time, 1) OVER (PARTITION BY country_id, store_id ORDER BY event_time ASC), INTERVAL 1 SECOND), TIMESTAMP '2999-12-31 23:59:59 UTC') AS valid_to,
attribute1,
attribute2,
attribute3,
attribute4,
attribute5,
attribute6,
attribute7,
attribute8,
attribute9,
attribute10
FROM pivoted
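The interval logic of the query above can be sketched in Python as follows. It handles the events of a single (country_id, store_id) pair; the `add_validity` helper and the sample events are invented for the illustration.

```python
from datetime import datetime, timedelta, timezone

# Placeholder end date for the current state, as in the query above.
FAR_FUTURE = datetime(2999, 12, 31, 23, 59, 59, tzinfo=timezone.utc)


def add_validity(events):
    """Add valid_from / valid_to to the events of ONE store."""
    ordered = sorted(events, key=lambda e: e["event_time"])
    result = []
    # Pair every event with the next one (None for the last), like LEAD.
    for current, nxt in zip(ordered, ordered[1:] + [None]):
        if nxt is None:
            valid_to = FAR_FUTURE
        else:
            valid_to = nxt["event_time"] - timedelta(seconds=1)
        result.append({**current,
                       "valid_from": current["event_time"],
                       "valid_to": valid_to})
    return result


events = [
    {"event_time": datetime(2023, 1, 1, 12, 0, tzinfo=timezone.utc),
     "attribute1": "closed"},
    {"event_time": datetime(2023, 1, 1, 10, 0, tzinfo=timezone.utc),
     "attribute1": "open"},
]
for row in add_validity(events):
    print(row["valid_from"], row["valid_to"], row["attribute1"])
```

Subtracting a whole second presumably assumes that event times have no sub-second part; with finer-grained timestamps, a small gap would open between one interval's valid_to and the next valid_from.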
We can now query the history of changes to see the state of each store at any point in time, including the current state.
SELECT
country_id,
store_id,
valid_from,
valid_to,
attribute1,
attribute2,
attribute3,
attribute4,
attribute5,
attribute6,
attribute7,
attribute8,
attribute9,
attribute10
FROM final_table
WHERE CURRENT_TIMESTAMP() BETWEEN valid_from AND valid_to
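The same lookup works for any moment, not only the current one: swapping CURRENT_TIMESTAMP() for a fixed timestamp gives the state as of that moment. A minimal Python sketch of this as-of lookup, with a hypothetical `state_as_of` helper and made-up intervals, could look like this:

```python
from datetime import datetime, timezone


def utc(*args):
    """Shorthand for a timezone-aware UTC datetime."""
    return datetime(*args, tzinfo=timezone.utc)


# Made-up temporal rows for one store.
final_table = [
    {"store_id": "DE1004", "attribute1": "open",
     "valid_from": utc(2023, 1, 1, 10), "valid_to": utc(2023, 1, 1, 11, 59, 59)},
    {"store_id": "DE1004", "attribute1": "closed",
     "valid_from": utc(2023, 1, 1, 12), "valid_to": utc(2999, 12, 31, 23, 59, 59)},
]


def state_as_of(rows, store_id, at):
    """Return the row valid at the given moment (bounds inclusive, like BETWEEN)."""
    for row in rows:
        if row["store_id"] == store_id and row["valid_from"] <= at <= row["valid_to"]:
            return row
    return None  # no event had been received yet at that time


print(state_as_of(final_table, "DE1004", utc(2023, 1, 1, 10, 30)))
print(state_as_of(final_table, "DE1004", utc(2024, 6, 1)))
```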
Conclusion
In this practical exercise, we’ve looked at transforming variable-schema event data in BigQuery into a temporal structure, allowing us to query the state of all the attributes at any point in time.
Thanks for reading and stay tuned for other interesting posts about Data Engineering.