Debezium with Oracle 11g

Versions in use

  • Kafka 2.3.0 / Confluent Platform 5.3.0
  • Debezium 0.10
  • Oracle 11g EE 11.2.0.4

tl;dr

It works, though you need some small adjustments to the DB setup described in the Debezium Tutorial Using Oracle. You need a minimum version of 11.2.0.4, and you need an Oracle license for XStream in addition to your 11g database. Watch out for Oracle data types that come through Debezium as byte arrays: KSQL can’t read them, so you need some other way to deal with that (I’m going to blog on this in the future!).

Starting out

We have an Oracle 11g (11.2.0.4) DB, and I wanted to try out a CDC implementation rather than using the JDBC Kafka Connector. Oracle GoldenGate is an option; however, Debezium appeared to be a good alternative without spinning up a GoldenGate server. You still need an Oracle license for XStream, however – something that the good folks at Debezium would like to change. This wasn’t an obstacle for me, as my current client has licenses for all Oracle products.

The Oracle 11g (11.2.0.4) EE DB

The Debezium tutorial is based around spinning up an Oracle DB using Vagrant. Following the steps would result in you running an Oracle 12.2 DB – no good for my scenario.

I also had a look at Robin Moffatt’s blog (always a great resource on Kafka) on Streaming data from Oracle into Kafka, in which he contrasts the JDBC connector with the Debezium Oracle connector. For this, you need to fire up your Oracle DB yourself – Robin links to the Oracle GitHub account, where scripts are available to build a docker image (you still need to download the database binaries from the Oracle Technology Network). However, these scripts aren’t suitable for creating an Oracle 11g EE DB – only XE is supported at that version.

I had a third option: where I’m working at the moment, I had a suitable docker image already available for my use. It is already in use for automated tests and has a number of Flyway scripts which build the database for me. Given this was the closest I had to a production database, it seemed like the best place to start.

I’m afraid you’re still on your own here to work out how to get yourself a suitable DB. Maybe it’s just lying around, or perhaps, like me, you’ve already got it dockerised. This is something you’ll need to bring to the party!

Preparing the Database

The Debezium Oracle Tutorial (“Manual Steps”) tells us that we have some work to do on the DB to get things running. Robin has automated this in a script. Unfortunately, these are all geared to an example DB schema for Oracle 12. Unlike Oracle 12, 11g is not a “containerised” database: “All Oracle databases before Oracle Database 12c were non-CDBs”. I’m now going to take you through the customised steps I carried out.

While I’ll explain the steps one at a time, I have also created a script that applies all of these steps automatically.

Some things will be different for you throughout:

  • My Oracle DB is running in docker – the container name is oracleDBContainer.
  • On my docker container, the oradata directory lives under /home/oracle/app/oracle.
  • My DB is called db1.

Also, note that I’m using the default users and passwords here. You wouldn’t do this in a production environment!

Manual Steps

Here I’m explaining the steps needed to prepare an Oracle 11g (11.2.0.4) EE database for use with Debezium. If you’re doing the same and in a rush, you may want to consider the next section instead (“Automated Script”).

Create the DB Recovery Area

Here we’re creating a directory for Oracle to use as a recovery area. Its location will likely depend on where your database is installed. The following made sense for my installation, which is already running in a docker container:

docker exec -it oracleDBContainer /bin/sh -c "mkdir /home/oracle/app/oracle/oradata/recovery_area"

Set archive log mode and enable GoldenGate replication

docker exec -ti -e ORACLE_SID=db1 oracleDBContainer sqlplus sys/oracle as SYSDBA

alter system set db_recovery_file_dest_size = 5G;
alter system set db_recovery_file_dest = '/home/oracle/app/oracle/oradata/recovery_area' scope=spfile;
alter system set enable_goldengate_replication=true;
shutdown immediate

-- WAIT FOR DB TO SHUTDOWN. START IT UP AGAIN WITH:
startup mount
alter database archivelog;
alter database open;

-- Should show "Database log mode: Archive Mode":
archive log list

Create XStream admin user in the database (a dedicated user for administering XStream, per Oracle’s recommendation)

CREATE TABLESPACE xstream_adm_tbs DATAFILE '/home/oracle/app/oracle/oradata/db1/xstream_adm_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;

CREATE USER c##xstrmadmin IDENTIFIED BY xsa DEFAULT TABLESPACE xstream_adm_tbs QUOTA UNLIMITED ON xstream_adm_tbs;

GRANT CREATE SESSION TO c##xstrmadmin;

BEGIN
   DBMS_XSTREAM_AUTH.GRANT_ADMIN_PRIVILEGE(
      grantee                 => 'c##xstrmadmin',
      privilege_type          => 'CAPTURE',
      grant_select_privileges => TRUE
   );
END;
/

Ref: XStream Configuration
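
If you want a quick sanity check before moving on (optional, still connected as SYSDBA), the DBA_XSTREAM_ADMINISTRATOR view should now list the new user:

-- Should return a row for C##XSTRMADMIN:
SELECT username FROM dba_xstream_administrator;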

Create test user in the database (i.e. a regular user of the database)

CREATE USER debezium IDENTIFIED BY dbz;
GRANT CONNECT TO debezium;
GRANT CREATE SESSION TO debezium;
GRANT CREATE TABLE TO debezium;
GRANT CREATE SEQUENCE TO debezium;
ALTER USER debezium QUOTA 100M ON users;
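
If you don’t already have application tables to stream, you could give this user a trivial table to experiment with. A minimal sketch (the table name and columns are purely illustrative, not part of the Debezium setup):

-- Illustrative test table owned by the regular user:
CREATE TABLE debezium.customers (
  id NUMBER(9) NOT NULL PRIMARY KEY,
  first_name VARCHAR2(255) NOT NULL,
  last_name VARCHAR2(255) NOT NULL
);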

Create XStream user (used by the Debezium connector to connect to the XStream outbound server)

CREATE TABLESPACE xstream_tbs DATAFILE '/home/oracle/app/oracle/oradata/db1/xstream_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;

CREATE USER c##xstrm IDENTIFIED BY xs DEFAULT TABLESPACE xstream_tbs QUOTA UNLIMITED ON xstream_tbs;

GRANT CREATE SESSION TO c##xstrm;
GRANT SELECT ON V_$DATABASE to c##xstrm;
GRANT FLASHBACK ANY TABLE to c##xstrm;
GRANT SELECT_CATALOG_ROLE to c##xstrm;

exit;

Create XStream Outbound server:

docker exec -ti oracleDBContainer sqlplus c##xstrmadmin/xsa@//localhost:1521/db1

DECLARE
  tables  DBMS_UTILITY.UNCL_ARRAY;
  schemas DBMS_UTILITY.UNCL_ARRAY;
BEGIN
    tables(1)  := NULL;
    schemas(1) := 'debezium';
  DBMS_XSTREAM_ADM.CREATE_OUTBOUND(
    server_name     =>  'dbzxout',
    table_names     =>  tables,
    schema_names    =>  schemas);
END;
/

exit;

Alter the XStream Outbound server to allow the xstrm user to connect to it

docker exec -ti -e ORACLE_SID=db1 oracleDBContainer sqlplus sys/oracle as SYSDBA

BEGIN
  DBMS_XSTREAM_ADM.ALTER_OUTBOUND(
    server_name  => 'dbzxout',
    connect_user => 'c##xstrm');
END;
/

exit;
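
As an optional check (run as SYSDBA), you should be able to see the outbound server, and that the connect user took effect, via the DBA_XSTREAM_OUTBOUND view:

-- Expect DBZXOUT with connect_user C##XSTRM:
SELECT server_name, connect_user, capture_name FROM dba_xstream_outbound;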

Bespoke Table Setup

For every database table that you want to stream into Kafka, you need to run the following:

ALTER TABLE <SCHEMA.TABLENAME> ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
GRANT SELECT ON <SCHEMA.TABLENAME> TO c##xstrm;
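
For example, for the service_owner.services table that I whitelist in the connector configuration later on:

ALTER TABLE service_owner.services ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
GRANT SELECT ON service_owner.services TO c##xstrm;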

Automated Script

The previous section walked you through what needs to be done. Of course, this can be automated. You can find the script I wrote for this purpose in my GitHub account here. It simply copies four other SQL scripts onto the docker container and then executes them using sqlplus.

As with the previous section, there will be a few differences between my database and yours, so be sure to inspect the scripts and change the necessary settings. In particular, you’ll need to change alterForCdc.sql, which specifies the tables that you’re going to be streaming.

Also, the scripts make use of Docker, so be sure you’ve got that available, and have a think about how you’re going to look after the passwords for the users that are created.
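
In outline, the wrapper does something like the following; the script and file names here are illustrative, so check the repo for the real ones:

# Copy a setup script onto the container and run it with sqlplus (illustrative names):
docker cp setupXstream.sql oracleDBContainer:/tmp/setupXstream.sql
docker exec -e ORACLE_SID=db1 oracleDBContainer /bin/sh -c "sqlplus sys/oracle as sysdba @/tmp/setupXstream.sql"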

Running Debezium

With your DB now suitably configured, we can move on to configuring Debezium itself. The simplest way of running the stack is through docker images; I used docker-compose. For my POC, I decided to use the Debezium images that already had the connectors installed on top of Kafka Connect.

The first thing to be aware of is that (due to licensing requirements) you need to download the Oracle “Instant Client”. The Debezium Oracle documentation also mentions this. Having downloaded it, you need to uncompress it (I created a directory for this called “oracle_instantclient” beside my docker-compose file) and copy it onto the docker image for Kafka Connect. This is perhaps easier explained by looking at the Dockerfiles.
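
For reference, my layout looked roughly like this. The two jars are the ones the connect Dockerfile below copies into /kafka/libs; the shared library names vary by Instant Client version:

.
├── docker-compose.yml
├── debezium-with-oracle-jdbc/
│   └── Dockerfile
└── oracle_instantclient/
    ├── libclntsh.so (plus the other Instant Client libraries)
    ├── ojdbc8.jar
    └── xstreams.jar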

Starting at the top, here is my docker-compose file:

version: '3'
 
services:
 
  zookeeper:
    image: debezium/zookeeper:${DEBEZIUM_VERSION}
    ports:
     - 2181:2181
     - 2888:2888
     - 3888:3888
 
  kafka:
    image: debezium/kafka:${DEBEZIUM_VERSION}
    ports:
     - 9092:9092
    links:
     - zookeeper
    environment:
     - ZOOKEEPER_CONNECT=zookeeper:2181
 
  schema-registry:
    image: confluentinc/cp-schema-registry:5.3.0
    hostname: schema-registry
    depends_on:
      - zookeeper
      - kafka
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
 
  connect:
    depends_on:
      - schema-registry
    build:
      context: debezium-with-oracle-jdbc
      args:
        DEBEZIUM_VERSION: ${DEBEZIUM_VERSION}
    ports:
     - 8083:8083
     - 15005:5005
    links:
     - kafka
    environment:
     - BOOTSTRAP_SERVERS=kafka:9092
     - GROUP_ID=1
     - CONFIG_STORAGE_TOPIC=my_connect_configs
     - OFFSET_STORAGE_TOPIC=my_connect_offsets
     - STATUS_STORAGE_TOPIC=my_connect_statuses
     - LD_LIBRARY_PATH=/instant_client
     - KAFKA_DEBUG=true
     - DEBUG_SUSPEND_FLAG=n
     - CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081
 
  ksql-server:
    image: confluentinc/cp-ksql-server:5.3.0
    depends_on:
      - kafka
      - schema-registry
    environment:
      KSQL_CUB_KAFKA_TIMEOUT: 120
      KSQL_BOOTSTRAP_SERVERS: kafka:9092
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      KSQL_KSQL_SERVICE_ID: confluent_rmoff_01
      KSQL_KSQL_STREAMS_PRODUCER_RETRIES: 2147483647
      KSQL_KSQL_EXTENSION_DIR: /etc/ksql-server/ext
      KSQL_CONFIG_DIR: /etc/ksql-server/ext
 
  ksql-cli:
    image: confluentinc/cp-ksql-cli:5.3.0
    depends_on:
      - ksql-server
    entrypoint: /bin/sh
    tty: true
 
 
networks:
  default:
    external:
      name: faith
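
The compose file references a DEBEZIUM_VERSION variable, which you can supply via a .env file next to docker-compose.yml (matching the version listed at the top of this post):

DEBEZIUM_VERSION=0.10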

If you study this, you’ll spot that “connect” isn’t run directly from a prebuilt image; instead it is built from another Dockerfile. This allows us to install the Oracle “Instant Client” that you downloaded and extracted above. Here’s the Dockerfile for the connect image:

ARG DEBEZIUM_VERSION
FROM debezium/connect:$DEBEZIUM_VERSION
 
ENV INSTANT_CLIENT_DIR=/instant_client/
 
USER root
RUN yum -y install libaio && yum clean all
 
USER kafka
# Deploy Oracle client and drivers
 
COPY oracle_instantclient/* $INSTANT_CLIENT_DIR
COPY oracle_instantclient/xstreams.jar /kafka/libs
COPY oracle_instantclient/ojdbc8.jar /kafka/libs

Running “docker-compose up” will give us our stack. At this point, we’ve got a database ready for streaming, plus all the stack we need to do the streaming. We just need the glue: the Kafka Connect connector.

Note that I’m also using the Schema Registry as I want to write the output in Avro rather than JSON.

Create the Connector

This is happily back in line with the Debezium Connector for Oracle documentation, and also the script used in Robin Moffatt’s blog I mentioned earlier. However, there are a couple of extra tweaks needed to make it work for Oracle 11:

"database.tablename.case.insensitive": "true"
"database.oracle.version": "11"

Here’s my complete connector:

curl -i -X POST -H "Accept:application/json" \
    -H  "Content-Type:application/json" \
    -d '{
      "name": "netstream-source-debezium-xstream",
      "config": {
            "connector.class": "io.debezium.connector.oracle.OracleConnector",
            "database.server.name" : "netstream",
            "database.hostname" : "faithDB",
            "database.port" : "1521",
            "database.user" : "c##xstrm",
            "database.password" : "xs",
            "database.dbname" : "db1",
            "database.out.server.name" : "dbzxout",
            "database.history.kafka.bootstrap.servers" : "kafka:9092",
            "database.history.kafka.topic": "schema-changes.inventory",
            "database.tablename.case.insensitive": "true",
            "database.oracle.version": "11",
            "include.schema.changes": "true",
            "table.whitelist": "db1.service_owner.services",
            "transforms": "InsertTopic,InsertSourceDetails",
            "transforms.InsertTopic.type":"org.apache.kafka.connect.transforms.InsertField$Value",
            "transforms.InsertTopic.topic.field":"messagetopic",
            "transforms.InsertSourceDetails.type":"org.apache.kafka.connect.transforms.InsertField$Value",
            "transforms.InsertSourceDetails.static.field":"messagesource",
            "transforms.InsertSourceDetails.static.value":"Debezium CDC from Oracle on netstream"
       }
    }'

Note that the “database.history.kafka.bootstrap.servers” property points to the kafka docker container.
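
Once the connector has been created, you can check that it and its task are RUNNING via the Kafka Connect REST API (port 8083 is mapped in the compose file above):

curl -s localhost:8083/connectors/netstream-source-debezium-xstream/status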

Finally?

We should be there now. Congratulations!

If you’ve got data in your database table, then you should find that a topic has been created and that it contains your data. Adding, updating, or deleting data will cause new messages to be written to your topic.
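
To eyeball the messages, you can use the Avro console consumer that ships in the schema-registry container. The topic name is an assumption on my part: Debezium names topics <server.name>.<schema>.<table>, so with my config it should be something like netstream.SERVICE_OWNER.SERVICES:

docker-compose exec schema-registry kafka-avro-console-consumer \
    --bootstrap-server kafka:9092 \
    --property schema.registry.url=http://schema-registry:8081 \
    --topic netstream.SERVICE_OWNER.SERVICES \
    --from-beginning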

Don’t forget…

You might not quite be home and dry. While your data is sitting in a topic, you’ll likely want to be doing something with it!

I intended to read the data using KSQL; however, I found that some fields in the database had been defined as “NUMBER” without scale or precision. This meant that the connector wrote the values into the topic as byte arrays, something which KSQL does not yet support. Stay tuned for a forthcoming blog detailing what I did to read this data!
