Oracle to Kafka made easy.

Oracle to Kafka made easy.

Source: http://oebsnotes.blogspot.com

Sometimes we need to expose data from Oracle database to external systems/services. 

In our experience, it ends ups with kind of interim tables or even interim databases or REST service solutions, the downsides of such approaches are well known.

These days one of the best solution to communicate services is message broker, namely Apache Kafka. 

This time we would like to send data from Oracle database to Kafka, after some googling we come up with – GoldenGate, WebLogic, bunch of connectors and etc… Oh my… all of them proprietary, overcomplicated and heavy, but we just need to capture changes in one table via database trigger, then send them to Kafka.

But it is not (yet?) implemented in native pl/sql, creating REST service on the other hand requires additional “moving parts” and potentially slow.

As we know, communicate to Kafka from Java code is dead simple, so we decided to create our API in Java then load Java code into Oracle stored procedure.

Below you can find the full description of the solution along with source code.

As result we got an a fully functional pl/sql API allowing us easily send data from Oracle database to Apache Kafka without using heavy and complex proprietary software, easier and potentially way faster comparable to custom REST service.

In this article we will show how to publish messages from Oracle Database 12+ to Apache Kafka brokers.

Oracle Database to Apache Kafka integration can be achieved using many methods. In this article we explain how to do this using internal Oracle Aurora JVM. Target audience for this type of integration are sites with active usage of PL/SQL.

I. Installation

I.1 Oracle Database preparation

If you still using Oracle Database 12.1.x you need to configure it to use JDK7. To do that:

perl $ORACLE_HOME/javavm/install/update_javavm_binaries.pl 7

cd $ORACLE_HOME/rdbms/lib

make -f ins_rdbms.mk ioracle

After that for existing instance you need to perform JDK upgrade using $ORACLE_HOME/javavm/install/update_javavm_db.sql script:

$ORACLE_HOME/perl/bin/perl -I $ORACLE_HOME/rdbms/admin $ORACLE_HOME/rdbms/admin/catcon.pl -b jvm_component_upgrade.log $ORACLE_HOME/javavm/install/update_javavm_db.sql

or create new instance using DBCA utility or set of scripts if you are old school Oracle DBA.

You can check version of Oracle Aurora JVM using

select dbms_java.get_jdk_version from dual;

No additional steps are required for Oracle Database 12.2+ including 18c.

I.2 Schema owner setup

Additional Java security permissions required to run Kafka producer code in Oracle Database. We use name APPS (Oracle Applications/Oracle E-Business Suite from 1998):

create user apps identified by apps

default tablespace users

temporary tablespace temp

quota unlimited on users;

grant CONNECT,RESOURCE,JAVAUSERPRIV to APPS;

exec dbms_java.grant_permission( ‘APPS’, ‘SYS:oracle.aurora.security.JServerPermission’, ‘Verifier’, ”);

exec dbms_java.grant_permission( ‘APPS’, ‘SYS:java.lang.RuntimePermission’, ‘getClassLoader’, ”);

exec dbms_java.grant_permission( ‘APPS’, ‘SYS:javax.management.MBeanTrustPermission’, ‘register’, ”);

exec dbms_java.grant_permission( ‘APPS’, ‘SYS:javax.management.MBeanServerPermission’, ‘createMBeanServer’, ”);

exec dbms_java.grant_permission( ‘APPS’, ‘SYS:javax.management.MBeanPermission’, ‘org.apache.kafka.common.metrics.JmxReporter$KafkaMbean#-*’, ‘registerMBean’);

exec dbms_java.grant_permission( ‘APPS’, ‘SYS:javax.management.MBeanPermission’, ‘org.apache.kafka.common.metrics.JmxReporter$KafkaMbean#-*’, ‘unregisterMBean’);

exec dbms_java.grant_permission( ‘APPS’, ‘SYS:javax.management.MBeanPermission’, ‘org.apache.kafka.common.utils.AppInfoParser$AppInfo#-*’, ‘registerMBean’);

I.3 Loading of Kafka classes

We use Apache Kafka distribution as source of required libraries.

I.3.A Oracle Database 12.1

Download Apache Kafka 1.1.1 from https://www.apache.org/dyn/closer.cgi?path=/kafka/1.1.1/kafka_2.11-1.1.1.tgz

We recommend this version for Oracle Database 12.1.x, untar downloaded archive and then issue in SQL*Plus as schema owner created on Step I.2

exec dbms_java.loadjava(‘-r -v -f -noverify /oracle/product/kafka_2.11-1.1.1/libs/lz4-java-1.4.1.jar’);

exec dbms_java.loadjava(‘-r -v -f -noverify /oracle/product/kafka_2.11-1.1.1/libs/log4j-1.2.17.jar’);

exec dbms_java.loadjava(‘-r -v -f -noverify /oracle/product/kafka_2.11-1.1.1/libs/slf4j-api-1.7.25.jar’);

exec dbms_java.loadjava(‘-r -v -f -noverify /oracle/product/kafka_2.11-1.1.1/libs/slf4j-log4j12-1.7.25.jar’);

exec dbms_java.loadjava(‘-r -v -f -noverify /oracle/product/kafka_2.11-1.1.1/libs/kafka-clients-1.1.1.jar’);

I.3.B Oracle Database 12.2+

Download Apache Kafka 2.1.0 from https://www.apache.org/dyn/closer.cgi?path=/kafka/2.1.0/kafka_2.12-2.1.0.tgz

(at moment of writing – latest stable Kafka version).

untar downloaded archive and then issue in SQL*Plus as schema owner created on Step I.2

exec dbms_java.loadjava(‘-r -v -f -noverify /oracle/product/kafka_2.12-2.1.0/libs/lz4-java-1.5.0.jar’);

exec dbms_java.loadjava(‘-r -v -f -noverify /oracle/product/kafka_2.12-2.1.0/libs/log4j-1.2.17.jar’);

exec dbms_java.loadjava(‘-r -v -f -noverify /oracle/product/kafka_2.12-2.1.0/libs/slf4j-api-1.7.25.jar’);

exec dbms_java.loadjava(‘-r -v -f -noverify /oracle/product/kafka_2.12-2.1.0/libs/slf4j-log4j12-1.7.25.jar’);

exec dbms_java.loadjava(‘-r -v -f -noverify /oracle/product/kafka_2.12-2.1.0/libs/jackson-core-2.9.7.jar’);

exec dbms_java.loadjava(‘-r -v -f -noverify /oracle/product/kafka_2.12-2.1.0/libs/jackson-annotations-2.9.7.jar’);

exec dbms_java.loadjava(‘-r -v -f -noverify /oracle/product/kafka_2.12-2.1.0/libs/jackson-databind-2.9.7.jar’);

exec dbms_java.loadjava(‘-r -v -f -noverify /oracle/product/kafka_2.12-2.1.0/libs/kafka-clients-2.1.0.jar’);

For any Oracle Database version check for invalid objects:

select count(*) from user_objects where status<>’VALID’;

If they exist recompile it using:

exec utl_recomp.recomp_serial;

or

exec utl_recomp.recomp_parallel(<NUMBER_OF_THREADS>);

I.4 Kafka wrapper installation

Download or clone https://github.com/averemee-si/aurora-kafka

Start SQL*Plus and run as schema owner created on Step I.2 downloaded scripts in following order:

@KafkaUtils.sql

@KafkaUtilsJava.pkg

@KafkaUtils.pkg

This will create object type A2_TUPLE which simulates key-value pair, table type A2_ARRAY_OF_TUPLES – array of A2_TUPLE, Java source named eu.solutions.a2.aurora.kafka.KafkaUtils  and PL/SQL package A2_KAFKA_UTILS.

Do not forget to check for invalid objects:

select count(*) from user_objects where status<>’VALID’;

If they exist recompile using:

exec utl_recomp.recomp_serial;

or

exec utl_recomp.recomp_parallel(<NUMBER_OF_THREADS>);

II. Usage example

Typical usage is in creation of producer with call to A2_KAFKA_UTILS.CREATE_PRODUCER then sending messages with A2_KAFKA_UTILS.SEND_JSON_MESSAGE or A2_KAFKA_UTILS.SEND_STRING_MESSAGE and then closing producer with A2_KAFKA_UTILS.CLOSE_PRODUCER.

declare

OP_RESULT varchar2(2000);

KAFKA_PROPS A2_ARRAY_OF_TUPLES := A2_ARRAY_OF_TUPLES();

MSG_2_SEND A2_ARRAY_OF_TUPLES := A2_ARRAY_OF_TUPLES();

begin

— Set mandatory Kafka producer properties

KAFKA_PROPS.extend(4);

KAFKA_PROPS(1) := A2_TUPLE(‘bootstrap.servers’, ‘kafka1.a2-solutions.eu:9092’);

KAFKA_PROPS(2) := A2_TUPLE(‘client.id’, ‘KafkaOracleAuroraProducer’);

KAFKA_PROPS(3) := A2_TUPLE(‘key.serializer’, ‘org.apache.kafka.common.serialization.StringSerializer’);

KAFKA_PROPS(4) := A2_TUPLE(‘value.serializer’, ‘org.apache.kafka.common.serialization.StringSerializer’);

— Create producer with name TestProducer

OP_RESULT := A2_KAFKA_UTILS.CREATE_PRODUCER(‘TestProducer’, KAFKA_PROPS);

if (‘SUCCESS’ = OP_RESULT) then

begin

MSG_2_SEND.extend(3);

MSG_2_SEND(1) := A2_TUPLE(‘INVOICE_ID’, ‘13474’);

MSG_2_SEND(2) := A2_TUPLE(‘INVOICE_NUM’, ‘AP-2019-13474’);

MSG_2_SEND(3) := A2_TUPLE(‘INVOICE_AMOUNT’, ‘2000’);

OP_RESULT := A2_KAFKA_UTILS.SEND_JSON_MESSAGE(‘TestProducer’,’TestTopic’,’Test-Message-Key’,MSG_2_SEND);

if (‘SUCCESS’ = OP_RESULT) then

OP_RESULT := A2_KAFKA_UTILS.CLOSE_PRODUCER(‘TestProducer’);

else

raise_application_error(-20000, OP_RESULT);

end if;

end;

else

raise_application_error(-20000, OP_RESULT);

end if;

end;

/

III. A2_KAFKA_UTILS functions

III.1 CREATE_PRODUCER

Creates Kafka producer

Parameters

PRODUCER_NAME – name of producer

PRODUCER_PROPS – array of tuples with Kafka producer properties

Returns

‘SUCCESS’ – for successful operation or errorstack in case of error

III.2 CLOSE_PRODUCER

Closes Kafka producer

Parameters

PRODUCER_NAME – name of producer

Returns

‘SUCCESS’ – for successful operation or errorstack in case of error

III.3 SEND_TEXT_MESSAGE

Sends text message to Kafka broker

Parameters

PRODUCER_NAME – name of producer

TOPIC – Kafka broker topic

MSG_KEY – Message key

MSG_VALUE – Message value

Returns

‘SUCCESS’ – for successful operation or errorstack in case of error

III.4 SEND_JSON_MESSAGE

JSONifies array of tiples and send this string to Kafka broker

Parameters

PRODUCER_NAME – name of producer

TOPIC – Kafka broker topic

MSG_KEY – Message key

ARRAY_OF_VALUES – Message value

Returns

‘SUCCESS’ – for successful operation or errorstack in case of error

III.5 ARRAY_2_JSON

Converts array of tuples (A2_ARRAY_OF_TUPLES) to JSON formatted string.

Parameters

ARRAY_OF_VALUES – in, A2_ARRAY_OF_TUPLES

Returns

JSON formatted string presentation of given array of tuples

Leave a Reply