C. Langdon

 

Say What?

This post is more about being good to your neighbor than a How-To.  But, I will delve into some technical details that could help your cause.

I recently ran into an issue using JayDeBeApi and JPype, two Python libraries that together provide database connectivity over JDBC.  Resolving my problem proved to be time-consuming until I came across someone’s magnificently commented code - their breadcrumbs.

Background

Without delving too much into my initiative, let's settle on the fact that I need to replicate large, LARGE quantities of data between two disparate systems: an Extract, Transform and Load (ETL) operation.

The source system is IBM PureData System AKA Netezza and the destination system is a standard run-of-the-mill Linux system running PostgreSQL 10.  If you've never used Netezza, I'd recommend learning a little about it.  The amount of data it can churn through is quite impressive.  The system I'm working with is truly Big Data - trillions of records in a single table spanning gigabytes.

So, to recap - copy data from source to destination - that's it!

Constraints & Methodology

In a perfect world, I'd use a traditional ETL tool like Informatica or SAP Data Services.  But these tools are costly to both purchase and maintain.  And, frankly, they're just not an option for the initiative at hand.

My choice is to write a small application using Python leveraging JayDeBeApi and JPype.  The application is run from the destination Linux server.  The goals are:

  • Quick deliverable
  • Capable of moving large quantities of data between systems in a timely manner
  • Work within the constraints of Netezza (JDBC connectivity)
  • Follow company standards in technology stack (Open Source - Linux, Python, Java, PostgreSQL)

The Code

This is the abbreviated functional version that will hopefully help better illustrate the overall purpose.   There's a sync_table() method that acts as a wrapper for syncing a table.  It calls extract_table() and finally load_table().  Pretty simple stuff.

Now, as the code exists, you'd get a CLASS NOT FOUND ERROR when executing load_table().  And, that's the genesis for me writing this blog.  I spent over an hour trying to figure out why.  And, I never found an explicit answer in all my Googling.  I tried many, many things.

"""
Main.py
20180815
C. Langdon

A simple ETL like tool for copying data from two disparate systems using
JDBC connectivity between the two.

Tested and used with Python v2.7.5
"""
import os
import sys
import time
import argparse
import logging
import tempfile
import jaydebeapi
import psycopg2
from logging.handlers import RotatingFileHandler

# Database connection string stuff
NETEZZA_DRIVER_CLASS = "org.netezza.Driver"
NETEZZA_CONNECTION_STRING = "jdbc:netezza://localhost:5480/my_database"
NETEZZA_USER_PASS = ["user-1", "pass-1"]
PGS_SERVER_DRIVER_CLASS = "org.postgresql.Driver"
PGS_SERVER_CONNECTION_STRING = "jdbc:postgresql://localhost:5432/my_database"
PGS_USER_PASS = ["user-2", "pass-2"]
#JARS = ["/usr/local/nz/lib/nzjdbc.jar", "/usr/local/nz/lib/postgresql-42.2.4.jar"]

# Setup default logger used throughout
logger = logging.getLogger("Default")


def setup_logging():
    """
    Setup loggers for multiple endpoints, e.g. stdout, file, etc.
    """
    logger.setLevel(logging.INFO)
    formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")

    # Console logger
    console_logger = logging.StreamHandler()
    console_logger.setFormatter(formatter)

    # File logger
    file_logger = RotatingFileHandler("log.txt", maxBytes=1048576, backupCount=0)
    file_logger.setFormatter(formatter)

    # Add the loggers
    logger.addHandler(console_logger)
    logger.addHandler(file_logger)

    return


def sync_table(table):
    """
    A wrapper method for performing the ETL steps.
    """
    logger.info("Syncing table %s...", table.get("table_name"))
    extract_file = extract_table(table)
    load_table(table, extract_file)
    logger.info("Completed syncing table %s...", table.get("table_name"))
    return


def extract_table(table):
    """
    Extract data from source system using optimized Netezza function.
    """
    extract_file = tempfile.NamedTemporaryFile(mode="wb", prefix="{}{}".format(table.get("table_name"), "-"), suffix=".flat", delete=False)
    logger.info("Extracting table %s from source to %s...", table.get("table_name"), extract_file.name)
    ew_connection = jaydebeapi.connect(NETEZZA_DRIVER_CLASS, NETEZZA_CONNECTION_STRING, NETEZZA_USER_PASS, "/usr/local/nz/lib/nzjdbc.jar")
    ew_cursor = ew_connection.cursor()

    where_filter = ""
    if table.get("rolling_days") > 0:
        where_filter = "and eff_dt >= current_date - {}".format(table.get("rolling_days"))
        logger.debug("where_filter={}".format(where_filter))

    ew_cursor.execute(
        """
        create external table '{}'
        using (
            delimiter '\t'
            encoding 'internal'
            remotesource 'jdbc'
            escapechar '\'
            logdir '{}'
            nullvalue ''
        )
        as
        select *
        from {}
        where 1=1
        {}
        """.format(extract_file.name, "/tmp", table.get("table_name"), where_filter)
    )
    extract_file.close()
    return extract_file.name


def load_table(table, extract_file):
    """
    Load destination table using optimized PostgreSQL copy_from method.
    """
    logger.info("Loading table %s from %s...", table.get("table_name"), extract_file)
    dev_connection = jaydebeapi.connect(PGS_SERVER_DRIVER_CLASS, PGS_SERVER_CONNECTION_STRING, PGS_USER_PASS, "/usr/local/nz/lib/postgresql-42.2.4.jar")
    dev_cursor = dev_connection.cursor()

    with open(extract_file, "r") as external_table:
        if table.get("rolling_days") < 1:
            dev_cursor.execute(
                """
                truncate table {}
                """.format(table.get("table_name"))
            )
            logger.info("Truncate contents of table %s...", table.get("table_name"))
        else:
            where_filter = "and eff_dt >= current_date - {}".format(table.get("rolling_days"))
            logger.debug("where_filter={}".format(where_filter))
            dev_cursor.execute(
                """
                delete
                from {}
                where 1=1
                {}
                """.format(table.get("table_name"), where_filter)
            )
            logger.info("Delete contents of table %s...", table.get("table_name"))

        dev_cursor.copy_from(external_table, table.get("table_name"), null="")
        dev_connection.commit()
        external_table.close()

    os.remove(extract_file)
    return


def main():
    """
    Application entry point
    """
    try:
        setup_logging()
        logger.info("DB-Sync v0.0.0.2")
        logger.info("C. Langdon 20180815")
        logger.info("Working folder is %s", os.path.dirname(__file__))

        for table in [
            [{"table_name":"my_table_1", "rolling_days":0}],
            [{"table_name":"my_table_2", "rolling_days":45}],
            [{"table_name":"my_table_3", "rolling_days":15}],
            [{"table_name":"my_table_4", "rolling_days":0}]
        ]:
            sync_table(table[0])
    except Exception:
        logger.exception("The following unrecoverable error happened:")
    finally:
        logger.info("Done...Goodbye!")


if __name__ == "__main__":
    main()

The Problem

Remember, we're using Java as our means for interacting with our databases.  JayDeBeApi spawns a JVM when making a connection, and one of the things it needs at that point is the JAR file for the JDBC driver ("/usr/local/nz/lib/nzjdbc.jar").  The first connection is made in the extract_table() method; a second connection follows in the load_table() method using a different JAR file ("/usr/local/nz/lib/postgresql-42.2.4.jar").

#From extract_table()
ew_connection = jaydebeapi.connect(NETEZZA_DRIVER_CLASS, NETEZZA_CONNECTION_STRING, NETEZZA_USER_PASS, "/usr/local/nz/lib/nzjdbc.jar")

#From load_table()
dev_connection = jaydebeapi.connect(PGS_SERVER_DRIVER_CLASS, PGS_SERVER_CONNECTION_STRING, PGS_USER_PASS, "/usr/local/nz/lib/postgresql-42.2.4.jar")

It's wrong to assume that a new JVM is spawned for each connection.  Every subsequent connection shares the initial JVM.  And, in our example, the initial JVM only knows about the first JAR ("/usr/local/nz/lib/nzjdbc.jar").

The load_table() method fails with CLASS NOT FOUND ERROR because it knows nothing about postgresql-42.2.4.jar.
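If you want to confirm this for yourself, here is a rough diagnostic sketch.  It assumes JayDeBeApi hands the JAR list to the JVM through the java.class.path property at startup, which is how I understand it to work but may differ between versions.  The function names missing_jars() and running_jvm_classpath() are mine, not part of either library.

```python
import os


def missing_jars(classpath, required_jars, sep=os.pathsep):
    """Return the required JARs that do not appear in a classpath string."""
    present = set(os.path.normpath(p) for p in classpath.split(sep) if p)
    return [jar for jar in required_jars if os.path.normpath(jar) not in present]


def running_jvm_classpath():
    """Return java.class.path of the already-running JVM, or None.

    Assumption: JPype is installed and the JVM was started in this process
    (for example by an earlier jaydebeapi.connect() call).
    """
    try:
        import jpype
    except ImportError:
        return None
    if not jpype.isJVMStarted():
        return None
    system = jpype.JClass("java.lang.System")
    return str(system.getProperty("java.class.path"))


if __name__ == "__main__":
    wanted = ["/usr/local/nz/lib/nzjdbc.jar",
              "/usr/local/nz/lib/postgresql-42.2.4.jar"]
    classpath = running_jvm_classpath()
    if classpath is None:
        print("No JVM running in this process yet.")
    else:
        print("JARs the running JVM was not given: %s"
              % missing_jars(classpath, wanted))
```

Called right after the first connection is opened, something like this should report the PostgreSQL JAR as missing, which is exactly why the second connection can't find its driver class.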

The Fix

When creating the first connection, provide a list of known JAR files to be used.  Continue to use the same list for all connections created.  We'd change our code as follows, noting that we've removed the literal string and replaced it with a list called JARS.

JARS = ["/usr/local/nz/lib/nzjdbc.jar", "/usr/local/nz/lib/postgresql-42.2.4.jar"]

#From extract_table()
ew_connection = jaydebeapi.connect(NETEZZA_DRIVER_CLASS, NETEZZA_CONNECTION_STRING, NETEZZA_USER_PASS, JARS)

#From load_table()
dev_connection = jaydebeapi.connect(PGS_SERVER_DRIVER_CLASS, PGS_SERVER_CONNECTION_STRING, PGS_USER_PASS, JARS)
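One way to make the fix harder to forget is to route every connection through a single helper, so no call site can pass a lone JAR again.  This is only a sketch: open_connection() and its connect parameter are names I made up for illustration, and the JAR paths are the ones from this post.

```python
# Assumption: the same two driver paths used in this post; adjust for your system.
JARS = [
    "/usr/local/nz/lib/nzjdbc.jar",
    "/usr/local/nz/lib/postgresql-42.2.4.jar",
]


def open_connection(driver_class, url, credentials, jars=JARS, connect=None):
    """Open a JDBC connection, always handing over the full JAR list.

    `connect` defaults to jaydebeapi.connect; it is a parameter only so the
    helper can be exercised without a JVM or a database.
    """
    if connect is None:
        import jaydebeapi  # imported lazily; only needed for real connections
        connect = jaydebeapi.connect
    # Pass copies so a caller cannot accidentally alter the shared lists.
    return connect(driver_class, url, list(credentials), list(jars))
```

With that in place, extract_table() would call open_connection(NETEZZA_DRIVER_CLASS, NETEZZA_CONNECTION_STRING, NETEZZA_USER_PASS) and load_table() would do the same with the PostgreSQL constants.  Whichever one runs first starts the JVM with both JARs.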

Bread[crumbs]

Finally, all of this mumbo-jumbo to get to the meat of what I want you to take away.  Commenting code is important.  But, when you comment your code, make sure it has purpose.  Don't comment the obvious.  I stumbled on someone else's perfectly commented code that saved me hours of frustration.  You could say they left me and others some breadcrumbs.  The exact comment that solved my problem is...

# We ran into a issue wherein attempts to use multiple JDBC connections,
# that use different ".jar" files, caused a "class not found error" from
# the JVM. This happened because `jaydebeapi` had already started  a JVM
# with the ".jar" file from the first connection,  and doesn't start one
# up for subsequent connections and instead attempts  to reuse the first
# JVM, which can't be made aware of new ".jar" files once started.
#
# To solve for this we instead send along all  of the ".jar" files we're
# aware of, just in case we need to use them in a subsequent connection.

In your next coding exercise, leave the next person a piece of bread or at least some crumbs.  They'll be grateful.