Snowflake: Fechas erroneas al insertar con Python

Algo que me paso en proyecto, es tener que insertar datos de MySQL directo a Snowflake a traves de python y que las fechas no recibieran el formato correcto. Cuando hacia el SELECT sobre la tabla en Snowflake, los campos de tipo DATE o DATETIME tenian siempre un valor como ‘541234123-12-01 00:00:00’.

Entiendo que el problema, esta principalmente al utilizar la funcion write_pandas, pero algo bastante molesto que me encontre, es que en Pandas, todos los campos de tipo fecha son ‘object’ entonces es dificil listarlos simplemente.

Una opcion, es recorrer todos los campos de tipo ‘object’ intentar convertirlos a fechas y en donde falla simplemente ignorarlo y seguir:

import pandas as pd


# Assuming df is your DataFrame
df = pd.DataFrame({
'ID': [1, 2, 3],
'Date1': ['2022-01-01', '2022-02-01', '2022-03-01'],
'Amount': [100, 150, 200],
'Date2': ['2022-01-15', '2022-02-15', '2022-03-15'],
'Category': ['A', 'B', 'A']
})

# Find columns with date-like values among object columns
date_like_columns = []
for col in df.select_dtypes(include='object').columns:
try:
pd.to_datetime(df[col], errors='raise')
date_like_columns.append(col)
except ValueError:
pass

# Print the list of date-like columns
print("Date-like columns:", date_like_columns)

El problema, es que para datasets grandes, quizas sea mas simple identificar a las columnas por su nombre. Si trabajan con una base de datos donde los campos de tipo DATE estan identificados en su nombre, o si tienen una lista de columnas que quieran convertir si o si, pueden hacer algo como lo siguiente:

    def write_pandas_df_to_table(df, table_name: str):

pd.set_option('display.max_columns', None)
df.drop(df.columns[df.columns.str.contains('unnamed', case=False)], axis=1, inplace=True)

for col in df.columns:
if (col.startswith('DATE') or
(col.endswith('DATE') and not col.endswith('UPDATE')) or
col.endswith('SYNC') or
col.endswith('_AT') or
col.endswith('DATETIME') or
col in DATE_COLUMNS_LIST):
df[col] = df[col].replace(['0000-00-00'], None)

if col not in NOT_DATE_COLUMN_LIST:
try:
df[col] = df[col].replace(['0000-00-00 00:00:00'], None)
df[col] = pd.to_datetime(df[col], format='%Y-%m-%d %H:%M:%S.%f', errors='coerce')
df[col] = df[col].dt.tz_localize('UTC')
except Exception as e:
print('Column with error: {}'.format(col))
raise e
return write_pandas(<conexion a snowflake>, df, table_name)

Este ultimo codigo tiene ademas algunos pasos extras para remover las fechas ‘0000-00-00’ que se pueden a veces ver por ejemplo en MySQL. Tambien le da el formato standard a las fechas de ‘YYYY-MM-DD’. Actualiza el dataframe, y luego corre el insert a Snowflake.

Snowflake: Como cargar datos desde AWS S3

Snowflake: Como cargar datos desde AWS S3

Actualmente estoy en un proyecto donde extraemos archivos de distintas fuentes de datos, los enviamos a s3 y luego cargamos los datos a Snowflake. Si pueden evitar usar codigo Python+Pandas, yo preferiria utilizar directamente el comando COPY (o Snowpipes) de Snowflake (que ya esta optimizado para hacer ese bulk insert), pero, segun entiendo, solo lo pueden hacer si Snowflake esta en la misma cuenta donde se encuentran los buckets. Como este no era el caso, ya que nuestros sistemas estan en distintas cuentas de AWS (manejadas todas con Tower Control), me veo obligado a manejar achivos en distintas cuentas de AWS con Snowflake configurado en una cuenta completamente distinta.

Para simplificar la carga, decidi crear un script desde el cual pueda cargar datos desde cualqueir sistema. Para empezar, extraigo los datos en formato CSV ¿Por que CSV? Es el formato recomendado por Snowflake. Les recomiendo revisar los siguientes links:

Haciendo un resumen de las recomendaciones, lo que tienen que tener en cuenta es lo siguiente:

  • Los archivos en lo posible deberian estar comprimidos (gzip o similar)
  • Deben ser mas grandes o iguales a 10 MB (siempre que sea posible)
  • No deberian superar los 250 o 300 MB (pueden ser un poco mas grandes, pero no nunca mayores a unos pocos GB).
  • El formato recomendado es CSV aunque puede procesar otros formatos bastante bien.

Primero hay que agregar los imports:

import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas
import pandas as pd
import boto3

Para leer un archivo en S3 debemos hacer lo siguiente:

session = boto3.session.Session()
client = session.client('s3')
csv_object = client.get_object(Bucket=<s3_bucket>, Key=<s3_key>)

with pd.read_csv(csv_object.get("Body"), chunksize=<chunksize>) as reader:
for chunk in reader:
print(chunk.to_string())

De esta forma podemos leer un archivo cada un <chunksize> o cantidad de filas (recomendado para no quedarse sin memoria). En este caso, cada ‘chunk’ es un dataframe de pandsa. Luego, tenemos que crear la conexion a snowflake:

ctx = snowflake.connector.connect(user='<user>',
                                               password='<password>',
                                               account='<account>',
                                               warehouse='<warehouse>',
                                               database='<database>')
cursor = ctx.cursor()

NOTA: No dejen sus passwords directamente en el codigo y menos aun si estan usando Git. Pueden almacenar la informacion en algun archivo seguro o con algun servicio como AWS Secret Manager.

Una vez que la conexion a Snowflake esta abierta, yo corro una serie de consultas para definir el contexto y donde estoy ejecutando las queries:

cursor.execute('USE ROLE <my_role>')
cursor.execute('USE DATABASE <my_db>')
cursor.execute('USE WAREHOUSE <my_compute>')
cursor.execute('USE SCHEMA <my_schema>')

Una vez que el contexto esta listo, puedo escribir los chunks en snowflake:

        with pd.read_csv(csv_object.get("Body"), chunksize=chunksize) as reader:
            for chunk in reader:
                chunk.to_string()
                chunk.columns = chunk.columns.str.upper()
                try:
                    write_pandas(ctx, chunk, <table_name>)
                except Exception as e:
                    print("There was an error while loading certain fields")
                    print(e)

IMPORTANTE: Las columnas del dataframe tienen que estar en uppercase y el dataframe en si mismo tiene que tener el mismo formato que la tabla. Es posible que tengan problemas con las fechas (¿quien no tiene problemas con las fechas?). En ese caso, pueden recorrer el dataframe y corregir las columnas antes de cargarlo a Snowflake. Por ejemplo, podemos convertir a ‘datetime’ las columnas que contengan ‘_DATE’ o ‘DATE_’ en el nombre:

        for col in chunk.columns:
            if ('_DATE' in col or 'DATE_' in col):
                df[col] = pd.to_datetime(df[col], format='%Y-%m-%d %H:%M:%S.%f')
                df[col] = df[col].dt.tz_localize('UTC')

Cualquier duda o comentario pueden dejarlo debajo.

Postgre: Recrear un usuario y asignarle permisos de solo lectura a un esquema

Crear un usuario en Postgre, no es demasiado complicado. Se puede simplemente hacer CREATE USER y con eso bastaria para tener el usuario pero ¿que pasa si quiero volver a crearlo? Cuando intente hacer DROP USER obtendre un error en caso que ese usuario ya este asignado o tenga permisos a distintos objetos de la base de datos.

Para re-crear el usuario, tendré entonces que revocar todos los permisos previos que tenga el usuario, borrarlo y crearlo de nuevo y asignarle los permisos que yo quiera. En este caso, le voy a dar permisos de lectura a un «esquema» de la base de datos. En este caso especifico, en vez de dar los permisos directamente a un usuario, creo un ROL, le asigno los permisos al ROL y luego asigno al usuario al ROL. De esta forma, si se crea una nueva tabla en el esquema, el usuario va a poder leer sus datos sin volver a tener que darle permisos.

Para ello, cree el siguiente script:

-- ROL: Si tiene permisos, los borro
DO $$DECLARE count int;
BEGIN
SELECT count(*) INTO count FROM pg_roles WHERE rolname = 'esquema_readonly';
IF count > 0 THEN
    DROP OWNED BY esquema_readonly;
END IF;
END$$;

-- USUARIO: Si tiene permisos, los borro
DO $$DECLARE count int;
BEGIN
SELECT count(*) INTO count FROM pg_roles WHERE rolname = 'usuario';
IF count > 0 THEN
    DROP OWNED BY "usuario";
END IF;
END$$;


-- Borro el usuario y el rol
DROP USER IF EXISTS "usuario";
DROP ROLE IF EXISTS "reference_readonly";

-- Creo el usuario
CREATE USER "usuario" WITH PASSWORD 'somepassword';

-- Le doy permisos para poder conectarse a "laBaseDeDatos"
GRANT CONNECT ON DATABASE "laBaseDeDatos" TO usuario;

-- Creo el rol
CREATE ROLE "reference_readonly";

-- Le doy permisos al rol sobre el esquema "esquema"
GRANT USAGE ON SCHEMA "esquema" TO "esquema_readonly";

ALTER DEFAULT PRIVILEGES IN SCHEMA "esquema"
GRANT SELECT ON TABLES to "esquema_readonly";

-- Asigno el rol al usuario
GRANT "esquema_readonly" TO usuario;

De esta forma puedo borrar un usuario existe y manejar sus permisos a través de roles. Creen roles justamente para manejar la seguridad. Es mas fácil tener roles de distintos tipos y agregar o sacarle permisos a los roles que trabajar todo por usuario. Facilita la administración de la seguridad de la base de datos y hace la vida de los DBA mas fácil.

Saludos!

T-SQL: Al hacer JOIN de un VARCHAR y un VARBINARY no hay resultados

Algunos sistemas, traen o tienen como dato una columna de tipo «binaria» que no es otra cosa que un codigo hexadecimal como por ejemplo «0x000A123FB». Algunas personas deciden almacenar este dato como varchar en la base de datos. De una forma u de otra, en principio no tengo ningun motivo para estar a favor o encontra, pero el problema esta si quieren hacer join de este valor en una columna VARCHAR con una en VARBINARY.

Probemos con el siguiente código:

DROP TABLE IF EXISTS tablaBinaria

CREATE TABLE tablaBinaria (
	Id	INT IDENTITY(1,1) PRIMARY KEY,
	binario VARBINARY(4) NOT NULL)

INSERT INTO tablaBinaria (binario)
	VALUES (0x02AF), (0x3BE0)

DROP TABLE IF EXISTS tablaString 

CREATE TABLE tablaString(
	Id	INT IDENTITY(1,1) PRIMARY KEY,
	binario VARCHAR(8) NOT NULL)

INSERT INTO tablaString (binario)
	VALUES ('0x02AF'), ('0x3BE0')

Si corren un SELECT de la siguiente forma, tendran 0 resultados:

SELECT *
FROM tablaBinaria tb
INNER JOIN tablaString ts
	ON tb.binario = ts.binario

A pesar que SQL Server realiza un casteo automatico, no logra convertir el tipo de dato apropiadamente. Incluso, aunque fuercen el casteo, esto tampoco funciona:

SELECT *
FROM tablaBinaria tb
INNER JOIN tablaString ts
	ON cast(tb.binario as varchar(8)) = ts.binario

De la única forma que logramos obtener resultados en T-SQL, es haciendo CONVERT y agregando la opción «1» al castear el VARBINARY:

SELECT *
FROM tablaBinaria tb
INNER JOIN tablaString ts
	ON CONVERT(VARCHAR(8), tb.binario, 1) = ts.binario

De esta forma, finalmente podremos traer resultados.

Como un comentario final, tengan en cuenta que castear una columna entera para resolver un JOIN, no es una practica muy performante en bases de datos con mucha información. Si ustedes tienen VARBINARY, operen con este mismo tipo de dato en todas las tablas que tengan la misma columna y eviten castear columnas en los JOINs. Esto solo agrega tiempo de procesamiento a una operación que ya es altamente costosa desde la perspectiva del procesamiento.

Suerte!

Windows: generando el par de llaves (keys) publica y privada

Usualmente tenia que hacerlo siempre en Linux para no tener que instalar putty en windows. También sucede que las llaves que se crean son por lo general de distinto formato (PEM, etc) y no suelen servir para los mismos casos. Por suerte, tenemos el comando ssh-keygen en PowerShell que sirve para hacer lo mismo que en la linea de comando de Linux (UNIX).

Iniciando PowerShell desde cualquier PC con Windows, se puede ejecutar el comando de la siguiente forma:

En mi caso, no le puse ninguna «passphrase» a mis llaves. Si solo necesitas unas llaves para probar algo, en principio, esta bien. Tengan en cuenta que esto tiene implicaciones en la seguridad de un sistema.

Luego podemos ver como quedaron las mismas en la carpeta (en mi caso de mi usuario):

Con esto ya tienen un par de keys generadas.

Saludos!

Spark: Introducción al procesamiento distribuido

Vimos en algún post anterior que pasaba con Map-Reduce y como funciona. También hay un pequeño comentario al final diciendo que ya es «poco común encontrar personas que escriban tareas de Map Reduce a mano (en Java)» ya que existen servicios como Hive. Sin embargo, hay otros servicios, que también son distribuidos, pero no están basado en este algoritmo y que son utilizados para procesar datos en Big Data. El principal, es Apache Spark.

Spark comenzó como un proyecto de la Universidad de Berkeley hace ya varios años (2009 aproximadamente). Mas específicamente, escribieron un paper no sobre «Spark» sino mas bien sobre el core de Spark que son los Resilient Distributed Datasets (RDDs). Su objetivo era:

…to provide an abstraction that supports applications with working sets (i.e., applications that reuse an intermediate result in multiple parallel operations) while preserving the attractive properties of MapReduce and related models: automatic fault tolerance, locality-aware scheduling, and scalability. RDDs should be as easy to program against as data flow models, but capable of efficiently expressing computations with working sets.

Traduciendo, su intención era lograr que resultados parciales del procesamiento de los datos, pudiera ser reutilizado. Ergo, esto datasets se van almacenando en memoria y son reutilizados por el proceso sin tener que ir al Disco Rigido para almacenar resultados parciales como en Map Reduce.

Cuando se ejecuta una transformación que modifica un dataset, se crea uno nuevo. Se entiende por transformación, a métodos específicos de los RDD como GroupBy, map o filter. Por otro lado, los RDDs no se materializan en memoria hasta que se ejecuta una acción como por ejemplo collect, count o take. En este aspecto, podemos decir que Spark limita un poco nuestra capacidad de procesamiento a la memoria RAM que tengamos disponible, sin embargo, es extremadamente mas rápido que Map Reduce y ademas existen mecanismos o buenas practicas para evitar desbordar la memoria.

Spark fue principalmente pensado para realizar tareas de Analytics y es hoy el líder del procesamiento de datos en Big Data, tanto en proyectos full Hadoop on Premise, como en la nube de la mano de soluciones como Databricks.

La arquitectura de este sistema de procesamiento, es similar al esquema que utiliza HDFS en general. Se tiene un master, y se tienen nodos que hacen el trabajo pesado. Estos nodos deben, al menos, tener mucha memoria RAM disponible (o acorde para procesar el volumen de datos que se quiere procesar).

Sin embargo, no es todo color de rosa. Spark, es un poco mas sensible respecto a la tolerancia a fallos. Si un nodo de Spark cae durante el procesamiento, la tarea completa falla. Y este es justamente un punto donde Hive con Map-Reduce sigue siendo un poco mejor.

La siguiente vez, hablare un poco mas de la arquitectura y como se utiliza esta tecnología.

T-SQL: Auditar cambios en una tabla en SQL Server

Existen varias formas de crear auditorias en SQL Server. De hecho existe un feature propio de SQL Server para monitorear una base de datos en entera y definir políticas, pero hoy solo voy a mostrar una forma simple agregando dos columnas para indicar quien y cuando hicieron un cambio en una tabla.

Para esto, podemos agregar dos columnas en una tabla de SQL Server haciendo ALTER TABLE y luego podemos asignarles valores por default:

ALTER TABLE [tabla] ADD [audit_date] DATETIME DEFAULT GETDATE();
ALTER TABLE [tabla] ADD [audit_user] VARCHAR(255) DEFAULT SUSER_NAME();

Esta es una forma simple de modificar una tabla para empezar a auditarla. Esto no modifica registros previamente existentes (los cuales quedaran con dos columnas nuevas con valor en nulo) pero permite auditar una tabla de manera simple, rapida y con un costo relativamente bajo.

Si van a agregar columnas en una tabla extremadamente grande, les recomiendo que primero prueben agregar las columnas.

A partir de SQL Server 2016 existe la opcion de hacer versionado de registros uno a uno. Esto se puede lograr con lo descripto en el siguiente link: https://www.sqlshack.com/track-history-data-changes-using-sql-server-2016-system-versioned-temporal-tables/

PyCharm: log.config o error ‘formatter’

Este error sucede en PyCharm cuando intentamos correr un script de python donde estamos usando logging (puede pasar en otro IDE también). En el caso de PyCharm, cuando intentamos ejecutarlo debemos primero configurar como queremos que se ejecute yendo a:

Cuando creamos como queremos que se ejecute, debemos definir el «Working Directory». Este sera el lugar donde el IDE ira a buscar el archivo loging.config:

Para saber como se ve un archivo de config de python o donde buscar más información, pueden ver el siguiente link de stackoverflow: https://stackoverflow.com/questions/23161745/python-logging-file-config-keyerror-formatters

El error ‘formatters’ aparece porque intenta buscar la key ‘formatters’ dentro de un archivo. Si el archivo no existe, no siempre suele dar el error «no encuentro el archivo de config»; a veces solo tira el error ‘formatters’. Especialmente puede pasar si estan dentro de un try y catch y este último no muestra bien todo el mensaje de error.

SQL Server: Como saber en que estado esta un RESTORE o BACKUP

Supongamos que no hicieron el RESTORE por el SSMS, sino que tienen un script en bash o powershell que lo hace pero no muestra el estado o el progreso ¿Como hago para saber en que estado se encuentra?

Con la siguiente consulta, es posible obtener una estimación de todos los procesos de BACKUP o RESTORE que están corriendo en la instancia de base de datos (siempre y cuando tengan permisos para verlo):

SELECT
session_id as SPID,
command,
a.text AS Query,
start_time,
percent_complete,
dateadd(second,estimated_completion_time/1000, getdate()) as estimated_completion_time
FROM sys.dm_exec_requests r
CROSS APPLY sys.dm_exec_sql_text(r.sql_handle) a
WHERE r.command in ('BACKUP DATABASE','RESTORE DATABASE')

Hadoop: Performance – CODECs

Hadoop: Performance – CODECs

Si bien Hadoop esta pensado para grandes volúmenes de datos y para trabajar sobre hardware «commodity», esto no significa que tener nodos con discos llenos sea barato. Por eso, usualmente se utilizan algoritmos que comprimen los datos al almacenarlos. Si lo que hacemos es traer archivos y los almacenamos con ORC o Parquet, deberíamos estar en buenas condiciones. Sin embargo, estos algoritmos de compresión solo trabajan con los datos en formato «columna» o tipo «tabla» (por ejemplo al cargar un CSV).

Por otro lado, no podemos, por ejemplo, almacenar una imagen o un PDF entero con ORC (o al menos no es conveniente hacerlo) por lo que surge una nueva duda ¿Como comprimo estos archivos para ahorrar espacio y como afecta la performance?

Primero, existe un concepto de suma importancia sobre los algoritmos de compresión el cual se conoce en ingles como «Splittable». Que un algoritmo tenga la capacidad de ser «splittable» no es menor. Que no lo sea, implica que no puedo leer un bloque del archivo sin descomprimir el archivo ENTERO. Ergo, solo puedo leer los bloques del archivo comprimido con una sola tarea de MAP perdiendo todas las ventajas de las lecturas distribuidas.

Si el algoritmo SI es «Splittable» entonces SI puedo leer solo una parte del archivo sin descomprimirlo entero con múltiples tareas MAP.

Hadoop admite al menos 4 algoritmos de compresión básicos:

  • GZIP: el adoptado por el proyecto GNU no es splittable y puede implicar ciertos problemas de performance.
  • BZIP2: es el que mejor ratio de compresión tiene, pero también es el mas lento. Este CODE solo lo usaríamos en información que vamos a consultar con MUY BAJA frecuencia.
  • SNAPPY: Este algoritmo no comprime mucho sin embargo es bastante veloz. Lamentablemente no es «Splittable»
  • LZO: es similar a SNAPPY en términos del nivel de compresión y velocidad pero si es «Splittable» (en ciertos casos) ya que tiene la capacidad de almacenar ademas un indice que referencia a cada bloque comprimido.

Para trabajar con archivos de texto plano, LZO puede ser una buena opción. La recomendación que les puedo dar igualmente, es que vayan probando cada uno y vean cual les da mejor resultado. Para los datos mas viejos que tienen que estar disponibles, pueden usar BZIP2 siempre y cuando el cliente que consulte los datos este dispuesto a aceptar la demora. Sin embargo, podría suceder que BZIP2 aun así de mejores resultados que GZIP por ejemplo.

Otra recomendación, es que si van a utilizar alguno de estos algoritmos, es que usen las librerías nativas de los mismos donde sea posible (en vez de la de Java) ya que usualmente dan mejores resultados.

Existen tambien otras alternativas:

  • Por ejemplo antes de escribir el archivo, podríamos dividirlo por código nosotros en archivos mas chicos y comprimirlos luego (si hacen esto, tengan en cuenta el tamaño de un bloque de Hadoop).
  • Almacenar los datos sin comprimir (o al menos los que se consulten con mayor frecuencia).
  • Hacer uso de las «Sequence File» de Hadoop.

Esta ultima opción quedara para otra ocasión.

El trabajo de mejorar performance es como en cualquier motor de bases de datos. El primer paso es obtener métricas, luego hacer cambios, y luego obtener esas métricas de nuevo y evaluar. Es un trabajo demandante y que requiere constancia y ser metódico. Sin embargo, un trabajo bien hecho suele tener un impacto directo en los clientes finales y en mi experiencia personal, suelen ser fuente de una gran satisfacción.

¡Suerte en la odisea!