Database Table to CSV in S3 Using Airflow

I’m very excited about these 6 lines of code. Using Airflow hooks & pandas DataFrames, I am able to export the contents of a database table to a CSV file in an AWS S3 bucket. And because StringIO builds the CSV in memory and the hook uploads it straight to S3, I never have to create a local file, copy it up & then delete it when I am done.

To be fair, the “<your_module>.get_engine” component does take this a little beyond 6 lines. I created a module containing a function that takes 2 arguments: the SQL flavor (mysql, mssql, postgres, etc.) & the name of a connection I have created in Airflow. Depending on the flavor, the function creates the matching hook from that connection and returns an engine that pandas can read from. It is a very simple function; if you want, you could do the same thing without a hook by building the engine directly with SQLAlchemy.
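
For illustration, a bare-bones version of that helper might look something like this; it is only a sketch, and the hook import paths shown are for the Airflow 2.x providers (older versions & the get_sqlalchemy_engine() call may differ):

# your_module.py: a bare-bones sketch of the get_engine helper described above
from airflow.providers.mysql.hooks.mysql import MySqlHook
from airflow.providers.postgres.hooks.postgres import PostgresHook

def get_engine(params):
    """Return a SQLAlchemy engine for the given SQL flavor & Airflow connection name."""
    flavor = params['flavor']
    conn_name = params['name']
    if flavor == 'mysql':
        hook = MySqlHook(mysql_conn_id=conn_name)
    elif flavor == 'postgres':
        hook = PostgresHook(postgres_conn_id=conn_name)
    else:
        raise ValueError('Unsupported SQL flavor: {}'.format(flavor))
    # DbApiHook subclasses can hand back a SQLAlchemy engine built from the Airflow connection
    return hook.get_sqlalchemy_engine()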

# imports (the S3Hook import path shown is for the Airflow 2.x Amazon provider; older versions differ)
import pandas as pd
from io import StringIO

from airflow.providers.amazon.aws.hooks.s3 import S3Hook

# connect to a database using a hook & read a table into a dataframe
db_connection = <your_module>.get_engine({'flavor': 'mysql', 'name': <connection_name>})
dataframe = pd.read_sql_table('reservation', db_connection)

# convert the dataframe into a csv file-like object
csv_buffer = StringIO()
dataframe.to_csv(csv_buffer, index=False)

# write the file-like object into s3 using an S3 hook
s3_connection = S3Hook(aws_conn_id='<name_of_one_of_your_Airflow_connections_to_AWS>')
s3_connection.load_string(
    string_data=csv_buffer.getvalue(),
    key='<folder_name>/<file_name>',
    bucket_name='<your_bucket_name>',
    replace=True,
)
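
Since this already runs inside Airflow, the snippet drops neatly into a PythonOperator task. A rough sketch of that wiring is below; the DAG id, schedule & task name are just placeholders, and the operator import path & schedule argument shift a bit between Airflow versions:

# a rough sketch of running the export as a scheduled Airflow task
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

def export_reservation_table():
    # the snippet above goes here: read the table, build the CSV buffer & load it to S3
    pass

with DAG(
    dag_id='reservation_table_to_s3',
    start_date=datetime(2021, 1, 1),
    schedule_interval='@daily',
    catchup=False,
) as dag:
    export_task = PythonOperator(
        task_id='export_reservation_table',
        python_callable=export_reservation_table,
    )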
