How to write a Spark dataframe to Excel
Install xlsxwriter, convert the frame to pandas, and write it as Excel. Note that toPandas() pulls the entire dataframe onto the driver, so this only works for data that fits in the driver’s memory. Due to limitations in the local file API (i.e. not supporting random writes), you’ll have to write the file to temporary storage first, before moving it somewhere useful (like a storage mount point). When moving the temp file, it looks like dbutils.fs.cp isn’t able to find it, but shutil.copyfile can. 🤷🏻‍♂️
%pip install xlsxwriter
from shutil import copyfile
dfs = spark.read.parquet('dbfs:/mnt/in/myveryimportantdata.parquet')
df = dfs.toPandas()
df.to_excel('/local_disk0/tmp/data.xlsx', engine='xlsxwriter')
copyfile('/local_disk0/tmp/data.xlsx', '/dbfs/mnt/out/data.xlsx')
– via XlsxWriter GitHub and Databricks Docs
How to enable change data feed on a mount point (instead of on a table)
That’s assuming your mount point supports Delta tables, i.e. it points to ADLS Gen2 or something similar.
ALTER TABLE delta.`/mnt/your/path` SET TBLPROPERTIES (delta.enableChangeDataFeed=true)
☺️
You can double-check that it took (and see which table version it was enabled at) with:
SHOW TBLPROPERTIES delta.`/mnt/your/path`
DESCRIBE HISTORY delta.`/mnt/your/path`
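Once it’s on, you can read the change feed straight from the same path. A minimal sketch (the startingVersion below is an assumption; pick whatever version DESCRIBE HISTORY shows for the commit that enabled it, or later):
df_changes = (spark
    .read
    .format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 1)   # assumption: adjust to your table's history
    .load("/mnt/your/path"))
display(df_changes)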
– via blood, sweat, tears, and this link
Connect to a SQL Server instance
Connect to a SQL Server instance with Spark and read some rows (😊)
sql_user = ''
sql_pw = ''
sql_db_server = ''
sql_db = ''
my_connection_string = f"jdbc:sqlserver://{sql_db_server}.database.windows.net:1433;database={sql_db};user={sql_user}@{sql_db_server};password={sql_pw};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"
def run(query):
    df = (spark
          .read
          .format("jdbc")
          .option("url", my_connection_string)
          .option("query", query)
          .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
          .load())
    return df
df_results = run("select top 100 * from dbo.MyFancyTable")
display(df_results)
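If the table is big, the JDBC source can also split the read across the executors. A hedged sketch: partitionColumn, the bounds, and numPartitions below are made-up values, so substitute a numeric (ideally indexed) column of your own; also note that the query option can’t be combined with the partitioning options, which is why this one uses dbtable.
df_parallel = (spark
    .read
    .format("jdbc")
    .option("url", my_connection_string)
    .option("dbtable", "dbo.MyFancyTable")
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    .option("partitionColumn", "Id")   # assumption: a numeric column to split on
    .option("lowerBound", "1")
    .option("upperBound", "1000000")
    .option("numPartitions", "8")
    .load())
display(df_parallel)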
Connect to a SQL Server instance without Spark, and write a few rows (😖)
Just use pyodbc and cry. And in order to do that (using pyodbc, not crying, that one will come naturally), we need to install a few things, otherwise we’ll get this lovely error: [01000] [unixODBC][Driver Manager]Can't open lib 'ODBC Driver 17 for SQL Server' : file not found (0) (SQLDriverConnect).
%sh
curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add -
curl https://packages.microsoft.com/config/ubuntu/16.04/prod.list > /etc/apt/sources.list.d/mssql-release.list
sudo apt-get update
sudo ACCEPT_EULA=Y apt-get -q -y install msodbcsql17
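A quick sanity check that the driver actually landed (pyodbc ships with recent Databricks runtimes; if yours doesn’t have it, %pip install pyodbc first):
import pyodbc
print(pyodbc.drivers())   # 'ODBC Driver 17 for SQL Server' should show up in this list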
Make sure sqlalchemy is installed, and note that code written for SQLAlchemy < 2.0 is not fully compatible with >= 2.0. FUN!
%pip install "sqlalchemy<2.0"
Then, it’s just a matter of calling sqlalchemy.
from sqlalchemy import create_engine
sql_user = ''
sql_pw = ''
sql_db_server = ''
sql_db = ''
my_alchemy_connection_string = f"mssql+pyodbc://{sql_user}:{sql_pw}@{sql_db_server}.database.windows.net:1433/{sql_db}?driver=ODBC+Driver+17+for+SQL+Server"
def run(query):
    engine = create_engine(my_alchemy_connection_string)
    with engine.begin() as conn:
        result = conn.execute(query)
    return result.rowcount

affected_rows = run("INSERT INTO dbo.MyFancyTable (A, B, C) VALUES ('1', '2', 3)")
print(f'{affected_rows} row(s) affected.')
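If the values come from anywhere less trustworthy than your own notebook, it’s nicer to let SQLAlchemy bind them instead of formatting them into the string yourself. A small sketch of the same insert with bound parameters (run_params is just an illustrative name; text() works on both sides of the 2.0 divide):
from sqlalchemy import create_engine, text

def run_params(query, params):
    engine = create_engine(my_alchemy_connection_string)
    with engine.begin() as conn:
        result = conn.execute(text(query), params)
    return result.rowcount

affected_rows = run_params(
    "INSERT INTO dbo.MyFancyTable (A, B, C) VALUES (:a, :b, :c)",
    {"a": "1", "b": "2", "c": 3},
)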
Connect to a SQL Server instance without Spark, and write a few rows, in parallel (🤬)
Whoo, boy. In order to do this, you need to have the ODBC driver installed on each and every worker node, not just on the driver node, and one way to do that is via init scripts (which seem kinda messy, but honestly, I haven’t found another way to connect to SQL Server from the executors).
The script below packages the installation of the ODBC driver into a shell script, and then uses dbutils.fs.put to write it to the /databricks/scripts folder.
dbutils.fs.put("/databricks/scripts/pyodbc-install.sh","""
#!/bin/bash
curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add -
curl https://packages.microsoft.com/config/ubuntu/16.04/prod.list > /etc/apt/sources.list.d/mssql-release.list
sudo apt-get update
sudo ACCEPT_EULA=Y apt-get -q -y install msodbcsql17""", True)
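To double-check that the script landed where you expect:
display(dbutils.fs.ls("dbfs:/databricks/scripts/"))
print(dbutils.fs.head("dbfs:/databricks/scripts/pyodbc-install.sh"))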
You can then go to the compute’s Configuration -> Advanced options -> Init Scripts, and add dbfs:/databricks/scripts/pyodbc-install.sh to the list of scripts to run. Save and restart your cluster.
Now, you can connect to SQL Server and write data from inside mapPartitions.
def run(query, engine):
    with engine.begin() as conn:
        result = conn.execute(query)
    return result.rowcount

def process_partition(iterable):
    # one engine (and its connection pool) per partition, created on the executor itself
    engine = create_engine(my_alchemy_connection_string)
    for item in iterable:
        run(f"INSERT INTO dbo.MyFancyTable (A, B, C) VALUES ('1', '2', {item})", engine)
        yield item

df.rdd.mapPartitions(process_partition).count()
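A couple of notes on the shape of this: the engine is created once per partition, inside process_partition, rather than once per row, since connection setup is expensive and you can’t build the engine on the driver and ship it to the executors anyway. The trailing count() is only there to force the (lazy) mapPartitions to actually run.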
You wouldn’t believe how much time I’ve wasted on this. 🤦🏻‍♂️
– via Databricks Community
Restart Databricks Notebook kernel
dbutils.library.restartPython()
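Handy after %pip installing or upgrading a library that’s already been imported, or when you just want to clear the notebook’s Python state without restarting the whole cluster.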
– via Databricks Docs