Debezium with Oracle 11g
Versions in use
- Kafka 2.3.0 / Confluent Platform 5.3.0
- Debezium 0.10
- Oracle 11g EE 11.2.0.4
tl;dr
It works, though you need some small adjustments to the DB setup described in the Debezium Tutorial Using Oracle. You need a minimum database version of 11.2.0.4, and you need an Oracle license for XStream in addition to your 11g database license. Watch out for Oracle data types that come through Debezium as a byte array: KSQL can’t read them, so you need some other way to deal with that – I’m going to blog on this in the future!
Starting out
We have an Oracle 11g (11.2.0.4) DB and I wanted to try out a CDC implementation rather than using the JDBC Kafka Connector. Oracle GoldenGate is an option, but Debezium appeared to be a good alternative that doesn’t require spinning up a GoldenGate server. You still need an Oracle license for XStream, however – something that the good folks at Debezium would like to change. This wasn’t an obstacle for me, though, as my current client has licenses for all Oracle products.
The Oracle 11g (11.2.0.4) EE DB
The Debezium tutorial is based around spinning up an Oracle DB using Vagrant. Following those steps would leave you running an Oracle 12.2 DB – no good for my scenario.
I also had a look at Robin Moffatt’s blog (always a great resource on Kafka) on Streaming data from Oracle into Kafka. He contrasts the JDBC connector with the Debezium Oracle connector. For this, you need to look after firing up your Oracle DB yourself – Robin provides the link to the Oracle GitHub account, where scripts are available to build a docker image (you still need to download the database binaries from the Oracle Technology Network). However, these scripts aren’t suitable for creating an Oracle 11g EE DB – only XE is supported at that version.
I had a third option – where I’m working at the moment, I had a suitable docker image already available for my use. It is already in use for automated tests and has a number of Flyway scripts which build the database for me. Given this was the closest thing I had to a production database, it seemed like the best place to start.
I’m afraid you’re still on your own here when it comes to getting yourself a suitable DB. Maybe it’s just lying around – or perhaps, like me, you’ve already got it dockerised. This is something you’ll need to bring to the party!
Preparing the Database
The Debezium Oracle Tutorial (“Manual Steps”) tells us that we have some work to do on the DB to get things running. Robin has automated this in a script. Unfortunately, these are all geared towards an example DB schema for Oracle 12. Unlike Oracle 12, 11g is not a “containerised” database: “All Oracle databases before Oracle Database 12c were non-CDBs”. I’m now going to take you through the customised steps I carried out.
While I’ll explain the steps one at a time, I have also created a script that applies all of these steps automatically.
Some things will be different for you throughout:
- My Oracle DB is running in docker – the container name is oracleDBContainer.
- On my docker container, the oradata directory lives under /home/oracle/app/oracle.
- My DB is called db1.
Also, note that I’m using the default users and passwords here. You wouldn’t do this in a production environment!
Manual Steps
Here I’m explaining the steps needed to prepare an Oracle 11g (11.2.0.4) EE database for use with Debezium. If you’re doing the same and in a rush, you may want to skip ahead to the next section instead (“Automated Script”).
Create the DB Recovery Area
Here we’re creating a directory for Oracle to use as a recovery area. The location will likely depend on where your database is installed. The following made sense for my installation, which is already running in a docker container:
docker exec -it oracleDBContainer /bin/sh -c "mkdir /home/oracle/app/oracle/oradata/recovery_area"
Set archive log mode and enable GoldenGate replication
docker exec -ti -e ORACLE_SID=db1 oracleDBContainer sqlplus sys/oracle as SYSDBA
alter system set db_recovery_file_dest_size = 5G;
alter system set db_recovery_file_dest = '/home/oracle/app/oracle/oradata/recovery_area' scope=spfile;
alter system set enable_goldengate_replication=true;
shutdown immediate
-- WAIT FOR DB TO SHUTDOWN. START IT UP AGAIN WITH:
startup mount
alter database archivelog;
alter database open;
-- Should show "Database log mode: Archive Mode":
archive log list
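If you want a further sanity check while still connected as SYSDBA, a couple of standard queries will confirm the settings took effect:
select log_mode from v$database;
show parameter db_recovery_file_dest
show parameter enable_goldengate_replication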
Create the XStream admin user (a dedicated user, per Oracle’s recommendation, for administering XStream)
CREATE TABLESPACE xstream_adm_tbs DATAFILE '/home/oracle/app/oracle/oradata/db1/xstream_adm_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
CREATE USER c##xstrmadmin IDENTIFIED BY xsa DEFAULT TABLESPACE xstream_adm_tbs QUOTA UNLIMITED ON xstream_adm_tbs;
GRANT CREATE SESSION TO c##xstrmadmin;
BEGIN
DBMS_XSTREAM_AUTH.GRANT_ADMIN_PRIVILEGE(
grantee => 'c##xstrmadmin',
privilege_type => 'CAPTURE',
grant_select_privileges => TRUE
);
END;
/
Create test user in the database (i.e. a regular user of the database)
CREATE USER debezium IDENTIFIED BY dbz;
GRANT CONNECT TO debezium;
GRANT CREATE SESSION TO debezium;
GRANT CREATE TABLE TO debezium;
GRANT CREATE SEQUENCE TO debezium;
ALTER USER debezium QUOTA 100M ON users;
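If you don’t already have tables for this user, a trivial one is enough to exercise the pipeline end to end (a hypothetical example – my own tables come from the Flyway scripts mentioned earlier):
CREATE TABLE debezium.customers (id NUMBER(9) PRIMARY KEY, name VARCHAR2(100));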
Create XStream user (used by the Debezium connector to connect to the XStream outbound server)
CREATE TABLESPACE xstream_tbs DATAFILE '/home/oracle/app/oracle/oradata/db1/xstream_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
CREATE USER c##xstrm IDENTIFIED BY xs DEFAULT TABLESPACE xstream_tbs QUOTA UNLIMITED ON xstream_tbs;
GRANT CREATE SESSION TO c##xstrm;
GRANT SELECT ON V_$DATABASE to c##xstrm;
GRANT FLASHBACK ANY TABLE to c##xstrm;
GRANT SELECT_CATALOG_ROLE to c##xstrm;
exit;
Create XStream Outbound server:
docker exec -ti oracleDBContainer sqlplus c##xstrmadmin/xsa@//localhost:1521/db1
DECLARE
tables DBMS_UTILITY.UNCL_ARRAY;
schemas DBMS_UTILITY.UNCL_ARRAY;
BEGIN
tables(1) := NULL;
schemas(1) := 'debezium';
DBMS_XSTREAM_ADM.CREATE_OUTBOUND(
server_name => 'dbzxout',
table_names => tables,
schema_names => schemas);
END;
/
exit;
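It’s worth a quick sanity check that the outbound server now exists – for example, reconnect as c##xstrmadmin and run the following (I believe this dictionary view is available in 11.2, but consult your data dictionary if not):
SELECT server_name, capture_name, source_database FROM all_xstream_outbound;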
Alter the XStream Outbound server to allow the xstrm user to connect to it
docker exec -ti -e ORACLE_SID=db1 oracleDBContainer sqlplus sys/oracle as SYSDBA
BEGIN
DBMS_XSTREAM_ADM.ALTER_OUTBOUND(
server_name => 'dbzxout',
connect_user => 'c##xstrm');
END;
/
exit;
Bespoke Table Setup
For every database table that you want to stream into Kafka, you need to run the following:
ALTER TABLE <SCHEMA.TABLENAME> ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
GRANT SELECT ON <SCHEMA.TABLENAME> TO c##xstrm;
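In my case that meant the services table owned by service_owner – the same table that appears in the connector’s table.whitelist later on:
ALTER TABLE service_owner.services ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
GRANT SELECT ON service_owner.services TO c##xstrm;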
Automated Script
The previous section walked you through what needs to be done. Of course, this can be automated. You can find the script I wrote for this purpose in my github account here. It simply copies four other SQL scripts onto the docker container and then executes them using sqlplus.
As with the previous section, there will be a few differences between my database and yours, so be sure to inspect the scripts and change the necessary settings. In particular, you’ll need to change alterForCdc.sql, which specifies the tables that you’re going to be streaming.
Also, the scripts make use of Docker, so be sure you’ve got that available, and have a think about how you’re going to look after the passwords for the users that are created.
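To give a flavour of what the wrapper does (a sketch only – the real thing is in the repo, and every file name here except alterForCdc.sql is illustrative):
docker cp alterForCdc.sql oracleDBContainer:/tmp/alterForCdc.sql
docker exec -ti -e ORACLE_SID=db1 oracleDBContainer sqlplus sys/oracle as SYSDBA @/tmp/alterForCdc.sql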
Running Debezium
With your DB now suitably configured, we can move on to configuring Debezium itself. The simplest way of running the stack is through docker images. I used docker-compose. For my POC, I decided to use the Debezium images that already have the connectors installed on top of Kafka Connect.
The first thing to be aware of is that (due to licensing requirements) you need to download the Oracle “Instant Client” yourself. The Debezium Oracle documentation also mentions this. Having downloaded it, it needs to be uncompressed (I created a directory for this called “oracle_instantclient” beside my docker-compose file) and copied onto the docker image for Kafka Connect. This is perhaps easiest explained by looking at the Dockerfiles.
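For reference, the preparation looked something like this for me (the zip name depends on the client version you download – the ojdbc8.jar in the Dockerfile below suggests a 12.x client):
unzip instantclient-basic-linux.x64-12.2.0.1.0.zip
mv instantclient_12_2 oracle_instantclient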
Starting at the top, here is my docker-compose file:
version: '3'
services:
  zookeeper:
    image: debezium/zookeeper:${DEBEZIUM_VERSION}
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888
  kafka:
    image: debezium/kafka:${DEBEZIUM_VERSION}
    ports:
      - 9092:9092
    links:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
  schema-registry:
    image: confluentinc/cp-schema-registry:5.3.0
    hostname: schema-registry
    depends_on:
      - zookeeper
      - kafka
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
  connect:
    depends_on:
      - schema-registry
    build:
      context: debezium-with-oracle-jdbc
      args:
        DEBEZIUM_VERSION: ${DEBEZIUM_VERSION}
    ports:
      - 8083:8083
      - 15005:5005
    links:
      - kafka
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
      - LD_LIBRARY_PATH=/instant_client
      - KAFKA_DEBUG=true
      - DEBUG_SUSPEND_FLAG=n
      - CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081
  ksql-server:
    image: confluentinc/cp-ksql-server:5.3.0
    depends_on:
      - kafka
      - schema-registry
    environment:
      KSQL_CUB_KAFKA_TIMEOUT: 120
      KSQL_BOOTSTRAP_SERVERS: kafka:9092
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      KSQL_KSQL_SERVICE_ID: confluent_rmoff_01
      KSQL_KSQL_STREAMS_PRODUCER_RETRIES: 2147483647
      KSQL_KSQL_EXTENSION_DIR: /etc/ksql-server/ext
      KSQL_CONFIG_DIR: /etc/ksql-server/ext
  ksql-cli:
    image: confluentinc/cp-ksql-cli:5.3.0
    depends_on:
      - ksql-server
    entrypoint: /bin/sh
    tty: true
networks:
  default:
    external:
      name: faith
If you study this, you’ll spot that “connect” isn’t run directly from a prebuilt image; instead it is built from another Dockerfile. This allows us to install the Oracle “Instant Client” that you downloaded and extracted above. Here’s the Dockerfile for the connect image:
ARG DEBEZIUM_VERSION
FROM debezium/connect:$DEBEZIUM_VERSION

ENV INSTANT_CLIENT_DIR=/instant_client/

USER root
RUN yum -y install libaio && yum clean all

USER kafka

# Deploy Oracle client and drivers
COPY oracle_instantclient/* $INSTANT_CLIENT_DIR
COPY oracle_instantclient/xstreams.jar /kafka/libs
COPY oracle_instantclient/ojdbc8.jar /kafka/libs
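One thing that’s easy to miss: both the compose file and this Dockerfile reference ${DEBEZIUM_VERSION}, so it needs to be set in your shell (or in a .env file beside docker-compose.yml) before you bring the stack up. Per the versions listed at the top of this post:
export DEBEZIUM_VERSION=0.10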
Running “docker-compose up” will now give us our stack. At this point, we’ve got a database ready for streaming plus all of the stack we need to do the streaming. We just need the glue – the Kafka Connect connector.
Note that I’m also using the Schema Registry as I want to write the output in Avro rather than JSON.
Create the Connector
This is happily back in line with the Debezium Connector for Oracle documentation and also the script used in Robin Moffatt’s blog I mentioned earlier. However, there are a couple of extra tweaks needed to make it work for Oracle 11:
"database.tablename.case.insensitive": "true"
"database.oracle.version": "11"
Here’s my complete connector:
curl -i -X POST -H "Accept:application/json" \
    -H "Content-Type:application/json" \
    http://localhost:8083/connectors/ \
    -d '{
  "name": "netstream-source-debezium-xstream",
  "config": {
    "connector.class": "io.debezium.connector.oracle.OracleConnector",
    "database.server.name": "netstream",
    "database.hostname": "oracleDBContainer",
    "database.port": "1521",
    "database.user": "c##xstrm",
    "database.password": "xs",
    "database.dbname": "db1",
    "database.out.server.name": "dbzxout",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory",
    "database.tablename.case.insensitive": "true",
    "database.oracle.version": "11",
    "include.schema.changes": "true",
    "table.whitelist": "db1.service_owner.services",
    "transforms": "InsertTopic,InsertSourceDetails",
    "transforms.InsertTopic.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.InsertTopic.topic.field": "messagetopic",
    "transforms.InsertSourceDetails.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.InsertSourceDetails.static.field": "messagesource",
    "transforms.InsertSourceDetails.static.value": "Debezium CDC from Oracle on netstream"
  }
}'
Note that the “database.history.kafka.bootstrap.servers” property points to the kafka docker container.
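Once submitted, the Kafka Connect REST API will confirm whether the connector and its task are RUNNING:
curl -s http://localhost:8083/connectors/netstream-source-debezium-xstream/status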
Finally?
We should be there now. Congratulations!
If you’ve got data in your database table, then you should find that a topic has been created and that it contains your data. Adding, updating or deleting data will cause new messages to be written to the topic.
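A quick way to eyeball the messages is the Avro console consumer that ships in the Schema Registry image. Debezium names topics serverName.SCHEMA.TABLE, so with my whitelist the topic should be something like netstream.SERVICE_OWNER.SERVICES:
docker-compose exec schema-registry kafka-avro-console-consumer \
    --bootstrap-server kafka:9092 \
    --property schema.registry.url=http://schema-registry:8081 \
    --topic netstream.SERVICE_OWNER.SERVICES \
    --from-beginning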
Don’t forget…
You might not quite be home and dry. While your data is sitting in a topic, you’ll likely want to be doing something with it!
I intended to read the data using KSQL. However, I found that some fields in the database had been defined as NUMBER without scale or precision. This meant that the connector wrote the value into the topic as a byte array, something which KSQL does not yet support. Stay tuned for a forthcoming blog detailing what I did to read this data!
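As a taster: with Debezium’s default decimal handling, such a column arrives as a VariableScaleDecimal struct – an int32 scale plus the unscaled value as bytes. A minimal sketch of rebuilding the number in a JVM consumer (the class and method names here are mine, not Debezium’s):

import java.math.BigDecimal;
import java.math.BigInteger;

public final class VariableScaleDecimalDecoder {

    // Debezium's VariableScaleDecimal carries two fields: "scale" (int32)
    // and "value" (the unscaled value as big-endian two's-complement bytes).
    public static BigDecimal decode(int scale, byte[] unscaledBytes) {
        return new BigDecimal(new BigInteger(unscaledBytes), scale);
    }
}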