Enable ETL Using Snowflake Task and Stream — Super Easy

In this article I will try to explain how task and stream can be easily clubbed together to execute ETL pipeline.
Content:
1. What is Stream?
2. What is Task?
3. Demo
What is Stream?
You can consider stream as a spy which always monitors your source table and captures any changes made to the table. Like Insert, Update, Delete even Truncate. And guess what, you don’t need to implement any additional logic to capture changes either via updated timestamp or row version. Means Stream helps your CDC (Change Data Capture) very simple.
Few key points about Stream:
Stream is a first-class object which comes under Schema. We can create stream on Permanent/Transient/Temporary Tables.
Any stream table will have below 3 additional metadata columns act as CDC tracker.
METADATA$ACTION: captures DML operations like DELETE, INSERT
METADATA$ISUPDATE: if TRUE, then operation was part of an UPDATE statement. If FALSE, then part of Insert or Delete.
METADATA$ROWID: It’s a unique ID and can be used to track against specific row.
Note: metadata$action column contains either Insert or Delete. If we perform an Update on a specific row in our base table then the Stream will capture 2 rows —
1) METADATA$ACTION = INSERT and METADATA$ISUPDATE = TRUE. and 2) METADATA$ACTION = DELETE and METADATA$ISUPDATE = TRUE.
This is because the record is deleted first, and then new value is getting inserted during Update operation.
A sample Stream Table:

ID, NAME, CITY these columns are from Base table, rest 3 are Metadata columns (as mentioned above).
1st row: the entry is Deleted from base table. ACTION = DELETE and ISUPDATE = FALSE
2nd and 3rd row: the name of ID 111 is Updated in base table. Stream captures 2 rows for this operation. ACTION = INSERT and ISUPDATE = TRUE (updated record), and ACTION = DELETE and ISUPDATE = TRUE.
4th and 5th row: ACTION = INSERT and ISUPDATE = FALSE, which means Insert operation.
Stream doesn’t hold any data, rather it tracks the offset value. Once we consume stream data it will be empty, because at this point the offset of stream object is pointing to the latest version of the table. That means offset is moving forward as we consume stream data. This helps to get rid of duplicate values.
Refer to the below image, it shows what kind of data stream table holds when we perform DML operations on Base table.


NOTE: we can use multiple queries to consume the same Stream data but without updating the offset. For that we need to write our statements within a Transaction block (BEGIN — <Statements> — COMMIT). This will lock our stream data on that specific offset and if we run some DML operation on our Base table in parallel, stream will not capture them until it executes the COMMIT statement.
Stream has its own data retention period (default 14 days) which is no longer related to Time Travel of base table.
What is Task?
A Task represents a schedule unit of work. We can schedule a task at specific interval to execute certain SQL statements or Stored Procedures. It helps to automate and orchestrate within Snowflake.
Remember a Task can execute only one SQL statement or Stored Procedure. But we can wrap multiple SQL statements within a single stored proc and call.
Even we can build a task tree. There will be one parent/root task and can have multiple hands/sub-tasks. Only the root task will have a defined schedule and other sub tasks in the tree will have dependencies on its predecessor (CREATE A TASK child task …. AFTER predecessor task).

Note: While creating Task, we can either define the schedule time in minutes (you can see in below demo) or can use CRON.
As this article is mainly to provide a simple demo on ETL using stream and task, hence you can refer to Snowflake official site to understand more about Task and Stream.
DEMO
The requirement is very simple, there is a Source Employee table (EMP_BASE_TABLE). Once we get data in source table, will UpSert into our target table (EMP_FINAL_TABLE).
Look into the below pic, a Stream (Mr. Bond) which will monitor and capture all the changes (insert, update, delete, truncate) made on Source table. There is a Task (Mr. Manager) scheduled every 1 min interval to check whether stream has data. If yes then the task will perform (Mr. Worker) UPSERT (Update or Insert).
Once data is consumed the Stream offset will point to the latest version of the table and becomes empty. Stream will capture if any further changes on Base table and same process will repeat.

i) Create a Base Table
create or replace table EMP_BASE_TABLE (
EMPLOYEE_ID int,
NAME varchar(255),
JOB_ID varchar(50),
DEPARTMENT_ID int
);
ii) Create Destination table which will consume stream data.
create or replace table EMP_FINAL_TABLE (
EMPLOYEE_ID int,
NAME varchar(255),
JOB_ID varchar(50),
DEPARTMENT_ID int
);
iii) create a Stream on base table (EMP_BASE_TABLE)
create or replace stream empstream on table EMP_BASE_TABLE;
NOTE: the above Stream will capture all changes like Insert, Update, Delete. But if we want to capture only Append/Insert then just add additional parameter append_only = True at the end of your create stream statement (insert_only = True for External table).
The Stream table has 3 additional metadata columns.

iv) Create Task which will check whether Stream has any data in every minute and if YES then will perform UPSERT on Final/destination table. Also, this task will deal with only Insert or Updated records.
create or replace task emp_task
warehouse = YOUR_WH_NAME
schedule = ‘1 minute’
when
system$stream_has_data(‘empstream’)
as
merge into EMP_FINAL_TABLE as tar
using empstream as src
on tar.EMPLOYEE_ID = src.EMPLOYEE_ID
when matched and src.METADATA$ACTION = ‘INSERT’ and src.METADATA$ISUPDATE = TRUE then
update set
tar.JOB_ID = src.JOB_ID,
tar.DEPARTMENT_ID = src.DEPARTMENT_ID,
tar.NAME = src.NAME
when not matched and src.METADATA$ACTION = ‘INSERT’ and src.METADATA$ISUPDATE = FALSE then
insert (EMPLOYEE_ID, NAME, JOB_ID, DEPARTMENT_ID)
values (src.EMPLOYEE_ID, src.NAME, src.JOB_ID, src.DEPARTMENT_ID);
At this point the task will be suspended and need to resume

alter task emp_task resume;

Observation 1 — Version 0 : so far all objects are created, Stream is spying on Source Table and every 1 min interval Task is checking Stream data. All objects are having 0 record.

v) Let’s Insert 5 new records into base table
insert into EMP_BASE_TABLE
VALUES
(100, ‘Donald’, ‘SH_CLERK’, 50),
(101, ‘Douglas’, ‘SH_CLERK’, 50),
(102, ‘Jennifer’, ‘AD_ASST’, 10),
(103, ‘Michael’, ‘MK_MAN’, 20),
(104, ‘Pat’, ‘MK_REP’, 20);
Stream captures those 5 records. ACTION = INSERT and ISUPDATE = FALSE means new records.

Observation 2 — Version 1: Source table has 5 records (assume V1) but Stream offset is pointing to the older version (i.e. V0), hence it captures the delta (V1-V0). So far we didn’t consume stream data.

Now after 1 min our task executes (scheduled every 1 min interval) and final table also has those 5 records. Here we consumed stream data.

As we know offset is moving forward once we consume stream data, the stream offset is pointing to the latest version of the table (V1-V1), hence no record in stream table.

Observation 3 — Version 1: Both Source and Final tables are having same records and Stream is empty.

vi) Let’s Update 2 records and Insert 1 new record into base table
update EMP_BASE_TABLE set Name = ‘DUMMY1’, JOB_ID = ‘DUMMY_JOB’ where EMPLOYEE_ID = 100;
update EMP_BASE_TABLE set Name = ‘DUMMY2’, DEPARTMENT_ID = 500 where EMPLOYEE_ID = 101;
insert into EMP_BASE_TABLE VALUES (1, ‘Susan’, ‘HR_REP’, 40);
Base Table has 6 records. The Stream table captures 1 new record (action = INSERT and IsUpdate = False) and for each update 2 entries — 1 Insert (IsUpdate = True) and 1 Delete (IsUpdate = True).

After 1 min, Final table will have all these changes.

And as mentioned above Stream offset will again point to the latest version and becomes empty, ready to captures upcoming changes.
Observation 4 — Version 2: Before consumption Stream captured all the changes (V2-V1). Post consumption both Source and Target tables are in sync and Stream becomes empty (V2-V2)

Hope you get some idea about Stream, Task and how to build your ETL pipeline. It’s simple, hassle-free and easy to implement.
Happy Learning !!