Migración de la tubería ELT de datos de tráfico
Tesfaye escribe sobre proyectos que involucran arquitectura de aprendizaje profundo y tecnologías descentralizadas como blockchain y NFT.
Resumen del Proyecto
Anteriormente construimos un almacén de datos ELT utilizando datos de tráfico. A medida que el proyecto continúa utilizando la base de datos de PostgreSQL y Redash, también migraremos los datos de mi almacén de datos de PostgreSQL al almacén de datos de MySQL y automatizaremos toda la tarea. También cambiaré el visor del tablero de Redash a un subconjunto de Apache.
Propósito del proyecto
El objetivo de este proyecto es comprender e implementar cambios y automatización con respecto a ELT Pipeline y Data Warehouse. En el proyecto anterior, construimos un almacén de datos usando PostgreSQL, Airflow, DBT y Redash.
Esta vez nuestro objetivo es crear y administrar el proceso de automatización del almacén de datos mientras migramos datos a Mysql, Airflow, DBT y Apache superset. Este proyecto sería útil para los ingenieros de datos o cualquier persona que realice procesos de migración, cambios y automatización de datos.
- flujo de aire apache – Workflow Manager para programar, orquestar y monitorear flujos de trabajo. Airflow utiliza gráficos acíclicos dirigidos (DAG) para administrar la orquestación del flujo de trabajo.
- postgresql – Un sistema de gestión de base de datos relacional de objetos (ORDBMS) con énfasis en la extensibilidad y el cumplimiento de estándares. Esta es la base de datos desde la cual migraremos los datos.
- DBT (herramienta de creación de datos) – Permite la transformación de datos en almacenes simplemente escribiendo informes seleccionados. Gestiona la transformación de estas expresiones seleccionadas en tablas y vistas.
- Redacción – Una aplicación web de código abierto utilizada para limpiar bases de datos y visualizar los resultados. Este es el generador de paneles desde el que realizaremos la migración.
- Superconjunto Apache – Una plataforma de exploración y visualización de datos de código abierto. Es rápido, liviano, intuitivo y lleno de opciones que facilitan a los usuarios de todos los niveles explorar y visualizar sus datos, desde simples gráficos de líneas hasta gráficos geoespaciales altamente detallados.
Datos utilizados
Los datos que utilizaremos para este proyecto se pueden descargar de pNEUMA fecha. pNEUMA es un conjunto de datos abierto a gran escala de trayectorias naturales de medio millón de vehículos en el congestionado centro de Atenas, Grecia. El experimento único en su tipo utiliza un enjambre de drones para recopilar los datos. Cada archivo para uno (área, fecha, hora) tiene ~87 MB de datos. Estos datos se almacenaron en la base de datos PostgreSQL (ver mi último artículo).
Fases del proyecto
En este proyecto usaremos PostgreSQL como nuestra base de datos anterior y la migraremos a la base de datos MySQL. A continuación, activaremos una transformación usando DBT. Por último, pero no menos importante, crearemos un tablero.
- Preparar un almacén de datos PostgreSQL (que ya tenemos)
- Escribir Airflow Dags que migrarán datos de PostgreSQL a MySQL
- Escribe códigos DBT para activar la transformación.
- Cree tableros usando Apache Superset
Escribiendo Airflow Dags
En esta etapa, ya tenemos los datos, por lo que la parte principal del proyecto, migrar de una base de datos a otra base de datos, puede volverse difícil. Pero aquí mostraré cómo logré migrar mis datos de Postgres a MySQL.
Este código nos permitirá obtener todos los nombres de esquema con sus nombres de tabla desde un parámetro de nombre de base de datos.
Códigos de ayuda para la migración
def convert_type(type):
if type == "boolean":
return "bit(1)"
elif type == "integer":
return "int"
elif type == "double precision":
return "double"
elif type == "character varying":
return "LONGTEXT"
elif type == "text":
return "LONGTEXT"
else:
return type
def create_table_query(table,columns):
create_query = f"USE {table[0]}; CREATE TABLE IF NOT EXISTS {table[1]}("
for index , c in enumerate(columns):
data_type = convert_type(c[2])
nullable = "NULL" if c[1] == "YES" else "NOT NULL"
if index != 0:
create_query += ','
create_query += "`" + str(c[0]) + "`" + " " + str(data_type) + " " + str(nullable)
create_query += ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE utf8mb4_unicode_ci;"
return create_query
def create_db_query(name):
db_query = f"CREATE DATABASE IF NOT EXISTS {name}"
return db_query
def get_schema_and_table_name(database_name):
connection = db_connect.connect(host=host_name,user=db_user,password=db_password,database=database_name)
cursor = connection.cursor()
query = f'SELECT table_schema,table_name FROM information_schema.tables WHERE table_schema != "pg_catalog" and table_schema != "information_schema" ORDER BY table_schema'
cursor.execute(query)
schemas = cursor.fetchall()
return schemas
Este código nos permitirá crear nuestra base de datos y tabla de forma dinámica. Esta es una de las características importantes, ya que suelen ser los primeros pasos en la migración de datos. La función «get_schema_and_table_name» nos permitirá obtener todos los nombres de esquema con su nombre de tabla desde un parámetro de nombre de base de datos.
Desplácese hasta Continuar
Códigos del orquestador de migración de datos
def start_workflow(database_name):
schema = get_schema_and_table_name(database_name)
create_schemas_and_load_data(database_name , schema)
def create_schemas_and_load_data(database_name , schemas):
connection = db_connect.connect(host=host_name,user=db_user,password=db_password,database=database_name)
postgres_engine = create_engine(f'postgresql+psycopg2://{db_user}:{db_password}@{host_name}/{database_name}')
cursor = connection.cursor()
for s in schemas:
query = f"SELECT column_name , is_nullable , data_type FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA= '{str(s[0])}' and TABLE_NAME = '{str(s[1])}'"
cursor.execute(query)
columns = cursor.fetchall()
mysql_connection = f'mysql://admin:admin@127.0.0.1:3306/{database_name}'
engine = create_engine(mysql_connection)
conn = engine.connect()
db_query = create_db_query(s[0])
db = conn.execute(db_query)
singe_db_connection = f'mysql://admin:admin@127.0.0.1:3306/{s[0]}'
single_db_engine = create_engine(singe_db_connection)
single_db_conn = engine.connect()
create_query = create_table_query(s,columns)
conn.execute(create_query)
the_data = pd.read_sql(f'SELECT * FROM {s[0]}.{s[1]}',postgres_engine)
x = the_data.to_sql(s[1], con=single_db_engine, if_exists='replace', index=False)
get_data_query = f'select * from {s[0]}.{s[1]};'
Estos son probablemente el código más importante de nuestro proyecto, ya que son los que organizan y llaman a cada función. La función «crear esquemas_y_cargar_datos» maneja el proceso después de recuperar los nombres de esquema y tabla de PostgreSQL. El resumen de su proceso será el siguiente:
- Obtenga todos los nombres de columna, nulabilidad y tipos de datos de todas las tablas
- Crear una consulta para crear una base de datos MySQL
- Cree una consulta para crear una tabla MySQL (usando las funciones anteriores)
- Leer datos de una base de datos PostgreSQL usando pandas
- Escribir datos en una base de datos MySQL usando pandas
Airflow Dags para llamar a las funciones anteriores
with DAG(
dag_id='migrate_data',
default_args=default_args,
description='migrate data from postgres to mysql',
start_date=datetime(2022,7,6,2),
schedule_interval='@once'
)as dag:
task1 = PythonOperator(
task_id='create_schema_and_migrate_data',
python_callable=start_workflow,
op_kwargs={'database_name': 'Warehouse' },
)
task2 = PythonOperator(
task_id='create_dataset_table',
python_callable=migrate_privilages,
op_kwargs={'database_name': 'Warehouse' },
)
task3 = PythonOperator(
task_id='create_schema_and_migrate_data_2',
python_callable=start_workflow,
op_kwargs={'database_name': 'trial' },
)
task4 = PythonOperator(
task_id='create_dataset_table_2',
python_callable=migrate_privilages,
op_kwargs={'database_name': 'trial' },
)
task1 >> task2 >> task3 >> task4
Códigos de migración de privilegios
def migrate_privilages(database_name):
connection = db_connect.connect(host=host_name,user=db_user,password=db_password,database=database_name)
cursor = connection.cursor()
query = f"SELECT * FROM information_schema.table_privileges"
cursor.execute(query)
columns = cursor.fetchall()
for c in columns:
if c[1] == 'try_user':
check_user_existance = f'SELECT user,host FROM mysql.user where user = "{c[1]}"'
query = f'GRANT {c[5]} on {c[3]}.{c[4]} to {c[1]}@`localhost`;'
mysql_connection = f'mysql://{db_user}:{db_password}@{host_name}:3306/{c[3]}'
engine = create_engine(mysql_connection)
conn = engine.connect()
excuted = conn.execute(check_user_existance)
if len(excuted.fetchall()) == 0:
create_user_query = f'CREATE USER {c[1]}@`{host_name}` IDENTIFIED WITH mysql_native_password BY "password";'
excuted = conn.execute(create_user_query)
if c[5] != "TRUNCATE":
excuted = conn.execute(query)
En este punto, hemos migrado todos los datos de PostgreSQL, pero quedan por migrar los privilegios de usuario, ya que migrar los datos por sí solo no es suficiente. Entonces, en esta función, los pasos, en resumen, serían:
- Obtener todos los usuarios de PostgreSQL
- Compruebe si el usuario existe en la base de datos MySQL, si no, cree el usuario
- Obtenga todos los privilegios asociados con el usuario
TDC códigos para MYSQL
Los códigos DBT aquí no son muy diferentes de los códigos DBT de PostgreSQL. Aquí hay un código DBT para MySQL:
esquema.yml
version: 2
models:
- name: import_data
description: "traffic data"
columns:
- name: id
description: "unique id"
- name: track_id
description: "track id"
- name: type
description: "type"
- name: traveled_d
description: "traveled_d"
- name: avg_speed
description: "avg_speed"
- name: lat
description: "lat"
- name: lon
description: "lon"
- name: speed
description: "speed"
- name: lon_acc
description: "lon_acc"
- name: lat_acc
description: "lat_acc"
- name: time
description: "time"
tráfico_dbt_modelo.sql
{{ config(materialized='view') }}
with traffic_dbt_model as (
select * from public.import_data
)
select *
from traffic_dbt_model
Traffic_avg_speed_by_type_model.sql
select " type" , AVG(" avg_speed")
from {{ ref('traffic_dbt_model') }}
Group by " type"
Estos códigos DBT crearían múltiples vistas transformando los datos. DBT luego nos permitirá documentar nuestros modelos y servirlos localmente.
capturas de pantalla DBT
Construyendo un tablero usando Apache Superset
Apache Superset es realmente una herramienta poderosa que permite construir tableros con interfaz de usuario para visualización y también a partir de consultas SQL. Estos son algunos de los gráficos que pude crear.
Desafíos y conclusiones del proyecto
Este proyecto fue útil para comprender el proceso de migración de datos y los desafíos que podemos enfrentar en los proyectos de migración de datos del mundo real. También pudimos comprender la estructura de la base de datos PostgreSQL y MySQL y los modelos de privilegios. Algunas pautas incluyen:
- Mientras trabajaba en este proyecto, estuve observando e investigando para encontrar un módulo o aplicación que permitiera migrar datos de PostgreSQL a MySQL. Pero todo lo que pude encontrar fue MySQL a Postgres, lo que significa que más personas y empresas se están mudando a Postgres en lugar de MySQL. Una razón puede ser la escalabilidad.
- En PostgreSQL, uno puede almacenar múltiples esquemas y cada uno tiene la capacidad de almacenar múltiples tablas, mientras que en MySQL, el esquema es la base de datos, lo que significa que una base de datos no puede tener múltiples esquemas. Esto dificulta si hay dos bases de datos con el mismo esquema o dos esquemas con los mismos nombres de tabla.
- La estructura de privilegios de usuario en PostgreSQL tiene más datos que MySQL, que incluye información sobre quién ha otorgado qué acceso a un usuario determinado.
- Descubrí que Apache Superset es fácil de usar y tiene un proceso de instalación simple en comparación con Redash.
- PostgreSQL admite más tipos de datos que MySQL, pero MySQL tiene un subtipo de datos que no está disponible en PostgreSQL, lo que dificulta la conversión de tipos de datos entre bases de datos.
Planes futuros y conclusión.
Este proyecto me permitió profundizar en las arquitecturas y modelos de bases de datos en bases de datos como PostgreSQL y MySQL. También me permitió comparar herramientas de creación de tableros como Redash y Superset. Los trabajos futuros clave incluyen:
- Analice y habilite la migración de bases de datos grandes (en tamaño) utilizando los códigos de este proyecto.
- Agregar más funciones de prueba de migración de datos.
- Cree un panel de control e información más intuitivos.
Este contenido es preciso y verdadero al leal saber y entender del autor y no pretende reemplazar el asesoramiento formal e individualizado de un profesional calificado.