Gestione SCD con Delta Change Data Feed in Fabric

All'interno di Microsoft Fabric, le tabelle Delta trovano applicazione sia nel Data Warehouse che come tabelle gestite all'interno della Lakehouse. La loro versatilità si estende a diverse caratteristiche e funzionalità, rendendoli indispensabili in vari casi d'uso. Una di queste funzionalità è il delta change data feed

In questo articolo voglio mostrare come sfruttare il delta change data feed per facilitare la sincronizzazione continua dei dati tra diverse tabelle in lakehouse diversi all' interno della architettura a medaglione.

Che cosa sono i data feed?

I data feed consentono di tenere traccia delle modifiche a livello di riga tra le versioni di una tabella Delta. Se vengono abilitati su una tabella Delta vengono tracciate le modifiche con dei metadati che indicano se la riga specificata è stata inserita, eliminata o aggiornata.

Scenario di utilizzo

Lo scenario seguente illustra la procedura dettagliata per abilitare i data feed di modifica su una tabella delta in un livello silver. Successivamente simuleremo la modifica delle righe di questa tabella silver e vedremo come con una logica di tipo SCD 2 le modifiche saranno riportate nella tabella nel livello gold.

Per semplicità, lavoreremo solo con 2 Lakehouse (Silver e Gold) evitando di partire dal livello bronze per semplicità. Il codice seguente caricherà i nostri dati grezzi in una tabella Delta, "DimProducts", nella "SilverLayer" (la nostra Lakehouse predefinita).

Creiamo ora la tabella con il seguente script

DimProducts= [("EC-R098", 10000, "Silver"),  
              ("EC-T209", 1000, "Silver"),
              ("FW-1000", 7000, "Red"),
              ("FL-2301", 500, "Black") ]
columns = ["ProductAlternateKey","SafetyStockLevel","Color", ]
spark.createDataFrame(data=DimProducts, schema = columns).write.format("delta").mode("overwrite").saveAsTable("DimProducts")

Il risultato dello script si può vedere nell'immagine sotto

Ora popoliamo la tabdella nel layer gold utilizzando il seguente script di copia dati dal silver layer al gold layer

import pyspark.sql.functions as F
spark.read.format("delta").table("SilverLayer.DimProducts").withColumn("DeletedFlag", F.lit("N"))  \
     .write.format("delta").mode("overwrite").save("abfss://dcfbd0ab-da78-491a-8a60-1ace7dd434fb@onelake.dfs.fabric.microsoft.com/0420a87e-4462-4952-a987-5f572a96bf49/Tables/DimProducts")

Ora le tabelle sono state create ed essendo una la copia dell'altra sono anche sono sincronizzate. Il prossimo passo è abilitare il Change Data Feed sulla tabella DimProduct del livello Silver su cui poi faremo le modifiche dei dati. Per abilitare usiamo il seguente script

%%sql
ALTER TABLE SilverLayer.DimProducts SET TBLPROPERTIES (delta.enableChangeDataFeed = true)

Con Change Data Feed abilitato sulla nostra tabella, possiamo iniziare a tenere traccia delle nostre modifiche. Quindi, apporteremo 3 modifiche alla tabella "DimProduct" (1 aggiornamento, 1 eliminazione e 1 inserimento). L'obiettivo è identificare cosa è cambiato nella tabella del livello sylver ed essere in grado di replicare tali cambiamenti nella versione "DimProducts" del livello gold proprio come fosse una SCD di tipo 2 di un nostro ipotetico Data warehouse.

%%sql
-- Inserimento di un record
INSERT INTO SilverLayer.DimProducts  VALUES ('BK-R93R-44", 550, "Silver")

-- Aggiornamento di un record
UPDATE SilverLayer.DimProducts SET SafetyStockLevel = '1500' WHERE ProductAlternateKey = 'EC-T209'

-- delete a record
DELETE from SilverLayer.DimProducts WHERE ProductAlternateKey  = 'FL-2301'

prima delle modifiche

nuova situazione dopo le modifiche ai dati

Per visualizzare i Data Feed di modifica e per vedere esattamente cosa è successo possiamo usare la funzione DESCRIBE. Nell'immagine sotto dopo l'eseguzione dello script si possono vedere che sono presenti un buon numero di colonne di metadati associate alle modifiche, ma per semplicità ci concentreremo su "version", "operation" e "operationparameters" della colonna. Eseguiamo allora lo script per vedere questa funzionalità

%%sql 
describe history SilverLayer.DimProducts;

Queste informazioni sono utili, ma per ottenere un risultato più intuitivo e informativo che indichi chiaramente i valori modificati, le diverse versioni di tali modifiche o anche una versione o un commit specifico, possiamo utilizzare la funzione "table_changes" cui si passa la tabella e il valore che indica la prima versione del cambiamento che si vuole visualizzare. Di seguito lo script

%%sql
SELECT * FROM table_changes('SilverLayer.DimProducts', 1) order by _commit_timestamp DESC;

Nell'immagine sopra possiamo vedere i valori prima e dopo dei nostri 3 cambiamenti infatti la riga 1 e 2 rappresentano il prima e il dopo dell'aggiornamento del codice EC-T209 la riga 3 mostra la cancellazione della riga con codice FL-2301 e la riga 4 mostra l'inserimento del codice BK-R93R-44

Queste informazioni sono estremamente preziose, quindi per realizzare la SCD 2 sul livello gold. Creaimo una vista temporanea che prende in considerazione solo le ultime versioni di ogni codice

%%sql
CREATE OR REPLACE TEMPORARY VIEW DimProducts_latest_version as
SELECT * 
    FROM 
         (SELECT *, rank() over (partition by ProductAlternateKey  order by _commit_version desc) as rank
          FROM table_changes('SilverLayer.DimProducts', 1)
          WHERE _change_type !='update_preimage')
    WHERE rank=1

Sotto il risultato nell'immagine

Ora utilizzeremo un'istruzione SQL MERGE per allineare la tabella del livello gold con le modifiche della tablle del livello silver in modo tale che siano allineate. L'obiettivo di questa istruzione MERGE è :

  • Inserisci eventuali nuove righe nella tabella GoldLakehouse.

  • Aggiorna una riga esistente in base ai criteri di unione e alla corrispondenza dei valori.

  • Aggiorna la nostra colonna "DeletedFlag" per le righe che sono state eliminate.

Di seguito lo script per la realizzazione della Merge

%%sql
MERGE INTO GoldLayer.DimProducts t 
USING DimProducts_latest_version  s ON s.ProductAlternateKey  = t.ProductAlternateKey   
WHEN MATCHED AND s._change_type='update_postimage' THEN UPDATE SET SafetyStockLevel = s.SafetyStockLevel 
WHEN MATCHED AND s._change_type='delete' THEN UPDATE SET DeletedFlag = 'Y' 
WHEN NOT MATCHED THEN INSERT (ProductAlternateKey, SafetyStockLevel, Color,DeletedFlag) VALUES (s.ProductAlternateKey, s.SafetyStockLevel,s.Color, 'N')

Il risultato della merge lo si può vedere sotto