Leave Us Some Bread[crumbs]
C. Langdon
Say What?
This post is more about being good to your neighbor than a how-to. But I will delve into some technical details that could help your cause.
I recently came across an issue using JayDeBeApi and JPype - JayDeBeApi is a Python library that provides database connectivity through JDBC, and JPype is the interop library that lets it talk to a JVM. Resolving my problem proved to be time consuming until I came across someone's magnificently commented code - their breadcrumbs.
Background
Without delving too much into my initiative, let's settle on the fact that I need to replicate large, LARGE quantities of data between two disparate systems - an Extract, Transform and Load (ETL) operation.
The source system is IBM PureData System (a.k.a. Netezza) and the destination system is a standard, run-of-the-mill Linux system running PostgreSQL 10. If you've never used Netezza, I'd recommend learning a little about it. The amount of data it can churn through is quite impressive. The system I'm working with is truly Big Data - trillions of records in a single table spanning gigabytes.
So, to recap - copy data from source to destination - that's it!
Constraints & Methodology
In a perfect world, I'd use a traditional ETL tool like Informatica or SAP Data Services. But these tools are costly to both purchase and maintain. And, frankly, they're just not an option for the initiative at hand.
My choice is to write a small Python application leveraging JayDeBeApi and JPype. The application runs from the destination Linux server. The goals are:
- Quick deliverable
- Capable of moving large quantities of data between systems (timely)
- Work within the constraints of Netezza (JDBC connectivity)
- Follow company standards in technology stack (Open Source - Linux, Python, Java, PostgreSQL)
The Code
This is the abbreviated, functional version that will hopefully illustrate the overall purpose. There's a sync_table() method that acts as a wrapper for syncing a table. It calls extract_table() and finally load_table(). Pretty simple stuff.
Now, as the code exists, you'd get a CLASS NOT FOUND ERROR when executing load_table(). And that's the genesis for me writing this blog. I spent over an hour trying to figure out why, and I never found an explicit answer in all my Googling. I tried many, many things.
"""
Main.py
20180815
C. Langdon
A simple ETL like tool for copying data from two disparate systems using
JDBC connectivity between the two.
Tested and used with Python v2.7.5
"""
import os
import sys
import time
import argparse
import logging
import tempfile
import jaydebeapi
import psycopg2
from logging.handlers import RotatingFileHandler
# Database connection string stuff
NETEZZA_DRIVER_CLASS = "org.netezza.Driver"
NETEZZA_CONNECTION_STRING = "jdbc:netezza://localhost:5480/my_database"
NETEZZA_USER_PASS = ["user-1", "pass-1"]
PGS_SERVER_DRIVER_CLASS = "org.postgresql.Driver"
PGS_SERVER_CONNECTION_STRING = "jdbc:postgresql://localhost:5432/my_database"
PGS_USER_PASS = ["user-2", "pass-2"]
# JARS = ["/usr/local/nz/lib/nzjdbc.jar", "/usr/local/nz/lib/postgresql-42.2.4.jar"]
# Setup default logger used throughout
logger = logging.getLogger("Default")
def setup_logging():
    """
    Setup loggers for multiple endpoints, e.g. stdout, file, etc.
    """
    logger.setLevel(logging.INFO)
    formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
    # Console logger
    console_logger = logging.StreamHandler()
    console_logger.setFormatter(formatter)
    # File logger
    file_logger = RotatingFileHandler("log.txt", maxBytes=1048576, backupCount=0)
    file_logger.setFormatter(formatter)
    # Add the loggers
    logger.addHandler(console_logger)
    logger.addHandler(file_logger)
    return
def sync_table(table):
    """
    A wrapper method for performing the ETL steps.
    """
    logger.info("Syncing table %s...", table.get("table_name"))
    extract_file = extract_table(table)
    load_table(table, extract_file)
    logger.info("Completed syncing table %s...", table.get("table_name"))
    return
def extract_table(table):
    """
    Extract data from source system using optimized Netezza function.
    """
    extract_file = tempfile.NamedTemporaryFile(mode="wb", prefix=table.get("table_name") + "-", suffix=".flat", delete=False)
    logger.info("Extracting table %s from source to %s...", table.get("table_name"), extract_file.name)
    ew_connection = jaydebeapi.connect(NETEZZA_DRIVER_CLASS, NETEZZA_CONNECTION_STRING, NETEZZA_USER_PASS, "/usr/local/nz/lib/nzjdbc.jar")
    ew_cursor = ew_connection.cursor()
    where_filter = ""
    if table.get("rolling_days") > 0:
        where_filter = "and eff_dt >= current_date - {}".format(table.get("rolling_days"))
        logger.debug("where_filter={}".format(where_filter))
    ew_cursor.execute(
        """
        create external table '{}'
        using
        (
            delimiter '\t'
            encoding 'internal'
            remotesource 'jdbc'
            escapechar '\'
            logdir '{}'
            nullvalue ''
        )
        as
        select *
        from {}
        where 1=1
        {}
        """.format(extract_file.name, "/tmp", table.get("table_name"), where_filter)
    )
    extract_file.close()
    return extract_file.name
def load_table(table, extract_file):
    """
    Load destination table using optimized PostgreSQL copy_from method.
    """
    logger.info("Loading table %s from %s...", table.get("table_name"), extract_file)
    dev_connection = jaydebeapi.connect(PGS_SERVER_DRIVER_CLASS, PGS_SERVER_CONNECTION_STRING, PGS_USER_PASS, "/usr/local/nz/lib/postgresql-42.2.4.jar")
    dev_cursor = dev_connection.cursor()
    with open(extract_file, "r") as external_table:
        if table.get("rolling_days") < 1:
            dev_cursor.execute(
                """
                truncate table {}
                """.format(table.get("table_name"))
            )
            logger.info("Truncate contents of table %s...", table.get("table_name"))
        else:
            where_filter = "and eff_dt >= current_date - {}".format(table.get("rolling_days"))
            logger.debug("where_filter={}".format(where_filter))
            dev_cursor.execute(
                """
                delete
                from {}
                where 1=1
                {}
                """.format(table.get("table_name"), where_filter)
            )
            logger.info("Delete contents of table %s...", table.get("table_name"))
        dev_cursor.copy_from(external_table, table.get("table_name"), null="")
        dev_connection.commit()
    os.remove(extract_file)
    return
def main():
    """
    Application entry point
    """
    try:
        setup_logging()
        logger.info("DB-Sync v0.0.0.2")
        logger.info("C. Langdon 20180815")
        logger.info("Working folder is %s", os.path.dirname(__file__))
        for table in [
            {"table_name": "my_table_1", "rolling_days": 0},
            {"table_name": "my_table_2", "rolling_days": 45},
            {"table_name": "my_table_3", "rolling_days": 15},
            {"table_name": "my_table_4", "rolling_days": 0}
        ]:
            sync_table(table)
    except Exception:
        logger.exception("The following unrecoverable error happened:")
    finally:
        logger.info("Done...Goodbye!")

if __name__ == "__main__":
    main()
The Problem
Remember, we're using Java as our means for interacting with our databases. JayDeBeApi spawns a JVM when making a connection, and one of the items JayDeBeApi needs when establishing that connection is the associated JAR file ("/usr/local/nz/lib/nzjdbc.jar"). This first happens in the extract_table() method, and subsequently another connection is made in the load_table() method using a different JAR file ("/usr/local/nz/lib/postgresql-42.2.4.jar").
# From extract_table()
ew_connection = jaydebeapi.connect(NETEZZA_DRIVER_CLASS, NETEZZA_CONNECTION_STRING, NETEZZA_USER_PASS, "/usr/local/nz/lib/nzjdbc.jar")
# From load_table()
dev_connection = jaydebeapi.connect(PGS_SERVER_DRIVER_CLASS, PGS_SERVER_CONNECTION_STRING, PGS_USER_PASS, "/usr/local/nz/lib/postgresql-42.2.4.jar")
Assuming a new JVM is spawned for each connection is the wrong assumption. Each subsequent connection will share the initial JVM. And, in our example, the initial JVM only has the first JAR ("/usr/local/nz/lib/nzjdbc.jar").
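If you want to see this behavior for yourself, here's a minimal sketch (it reuses the connection constants from the listing above, and the prints are just for illustration) that checks JPype's JVM state around the first connection:
import jpype
import jaydebeapi
# No JVM exists before the first connection is made.
print(jpype.isJVMStarted())   # False
# The first connect starts a JVM whose classpath contains only nzjdbc.jar.
ew_connection = jaydebeapi.connect(NETEZZA_DRIVER_CLASS, NETEZZA_CONNECTION_STRING, NETEZZA_USER_PASS, "/usr/local/nz/lib/nzjdbc.jar")
# Every later connect reuses this same JVM - and its classpath - as-is.
print(jpype.isJVMStarted())   # True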
The load_table() method fails with CLASS NOT FOUND ERROR because it knows nothing about postgresql-42.2.4.jar.
The Fix
When creating the first connection, provide a list of the known JAR files to be used, and continue to use that same list for all connections created. We'd change our code as follows, noting that we've removed the literal string and replaced it with a list called JARS.
JARS = ["/usr/local/nz/lib/nzjdbc.jar", "/usr/local/nz/lib/postgresql-42.2.4.jar"]
# From extract_table()
ew_connection = jaydebeapi.connect(NETEZZA_DRIVER_CLASS, NETEZZA_CONNECTION_STRING, NETEZZA_USER_PASS, JARS)
# From load_table()
dev_connection = jaydebeapi.connect(PGS_SERVER_DRIVER_CLASS, PGS_SERVER_CONNECTION_STRING, PGS_USER_PASS, JARS)
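Another belt-and-suspenders option (a sketch of my own, not something from the original breadcrumb) is to start the JVM yourself, with the full classpath, before the first connection is ever made. JayDeBeApi reuses an already-running JVM, so every driver class is then available to every connection:
import jpype
# A sketch assuming the same JARS list as above: start the JVM up front with
# every driver JAR on the classpath, so any later jaydebeapi.connect() call
# finds its driver class in the JVM it reuses.
if not jpype.isJVMStarted():
    jpype.startJVM(jpype.getDefaultJVMPath(), "-Djava.class.path={}".format(":".join(JARS)))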
Bread[crumbs]
Finally, all of this mumbo-jumbo to get to the meat of what I want you to take away: commenting code is important. But when you comment your code, make sure it has purpose. Don't comment the obvious. I stumbled on someone else's perfectly commented code that saved me hours of frustration. You could say they left me, and others, some breadcrumbs. The exact comment that solved my problem is...
# We ran into a issue wherein attempts to use multiple JDBC connections,
# that use different ".jar" files, caused a "class not found error" from
# the JVM. This happened because `jaydebeapi` had already started a JVM
# with the ".jar" file from the first connection, and doesn't start one
# up for subsequent connections and instead attempts to reuse the first
# JVM, which can't be made aware of new ".jar" files once started.
#
# To solve for this we instead send along all of the ".jar" files we're
# aware of, just in case we need to use them in a subsequent connection.
In your next coding exercise, leave the next person a piece of bread or at least some crumbs. They'll be grateful.