
Franck Pachot

Event Sourcing: CQN is not a replacement for CDC

July 2, 2018 | Oracle

We are in an era where software architects want to stream the transactions out of the database and distribute them, as events, to multiple microservices. Don’t ask why, but that’s the trend: store inconsistent (or rather ‘eventually consistent’) copies of data in different physical components, rather than simply using logical views in the same database, where the data is ACIDely stored, processed and protected. Because it was decided that this segregation, in CQRS (Command Query Responsibility Segregation), will be physical, on different systems, the need for logical replication and change data capture is rising, under a new name: Event Sourcing.

When we want to replicate the changes without adding overhead to the database, the solution is Change Data Capture from the redo stream. The redo contains all the physical changes and, with dictionary information and a little supplemental logging, we can mine it to extract the logical changes. Currently there are commercial products (Oracle GoldenGate, Attunity, Dbvisit Replicate) and some open source ones based on LogMiner (StreamSets, Debezium). LogMiner is available on all Oracle Database editions without any option. In Enterprise Edition, a more efficient solution was possible with Streams, but now you have to pay for GoldenGate to use Streams. Unfortunately, sometimes you pay for software updates only to see features removed and sold in additional products.
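To give an idea of what mining the redo looks like, here is a minimal LogMiner sketch. It is only an illustration under assumptions: the log file path is a placeholder to adapt, the DEMO schema and table names are those of the test later in this post, and the required privileges and multitenant container details are left out.

```sql
-- Minimal supplemental logging, so that LogMiner can reconstruct the DML
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

BEGIN
  -- Register one redo log file to mine (placeholder path, adapt to your system)
  DBMS_LOGMNR.ADD_LOGFILE(
    logfilename => '/path/to/redo01.log',
    options     => DBMS_LOGMNR.NEW);
  -- Read the dictionary from the online catalog, return committed changes only
  DBMS_LOGMNR.START_LOGMNR(
    options => DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG
             + DBMS_LOGMNR.COMMITTED_DATA_ONLY);
END;
/

-- The logical changes, reconstructed as SQL statements
SELECT scn, operation, sql_redo
  FROM v$logmnr_contents
 WHERE seg_owner = 'DEMO' AND seg_name = 'DEMO';

-- Release the LogMiner session resources
EXEC DBMS_LOGMNR.END_LOGMNR;
```

The mining runs in a separate session that reads redo already written, which is why it does not slow down the transactions that generate the changes.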

Oracle has another feature that can help to replicate changes: Database Change Notification, now known as Continuous Query Notification (CQN) or Object Change Notification (OCN). This feature was implemented to refresh caches: you have a query that loads the cache and you want to be notified when some changes occur, so that you can update or refresh the cache. In theory, this can then be used to stream out the changes. However, CQN was not built for frequent changes but rather for nearly static, or slowly changing, data. But sometimes we have to test by ourselves, and here is my test using CQN with a lot of changes on the underlying table, just to show how it increases the load on the database and slows down the changes.

I create a DEMO table with one million rows:

17:21:56 SQL> whenever sqlerror exit failure;
17:21:56 SQL> create table DEMO (ID constraint DEMO_ID primary key) as select rownum from xmltable('1 to 1000000');
 
Table DEMO created.
 

And a table to hold notifications. As always when I need an example to start from, I take it from oracle-base:

17:21:58 SQL> -- from Tim Hall https://oracle-base.com/articles/10g/dbms_change_notification_10gR2
17:21:58 SQL> CREATE TABLE notifications (
  id NUMBER,
  message VARCHAR2(4000),
  notification_date DATE
);
 
Table NOTIFICATIONS created.
 
17:21:58 SQL> CREATE SEQUENCE notifications_seq;
 
Sequence NOTIFICATIONS_SEQ created.

The callback function:

17:21:58 SQL> CREATE OR REPLACE PROCEDURE callback (ntfnds IN SYS.chnf$_desc) IS
  l_regid NUMBER;
  l_table_name VARCHAR2(60);
  l_event_type NUMBER;
  l_numtables NUMBER;
  l_operation_type NUMBER;
  l_numrows NUMBER;
  l_row_id VARCHAR2(20);
  l_operation VARCHAR2(20);
  l_message VARCHAR2(4000) := NULL;
BEGIN
  l_regid := ntfnds.registration_id;
  l_numtables := ntfnds.numtables;
  l_event_type := ntfnds.event_type;
  IF l_event_type = DBMS_CHANGE_NOTIFICATION.EVENT_OBJCHANGE THEN
    FOR i IN 1 .. l_numtables LOOP
      l_table_name := ntfnds.table_desc_array(i).table_name;
      l_operation_type := ntfnds.table_desc_array(i).Opflags;
      IF (BITAND(l_operation_type, DBMS_CHANGE_NOTIFICATION.ALL_ROWS) = 0) THEN
        l_numrows := ntfnds.table_desc_array(i).numrows;
      ELSE
        l_numrows := 0; /* ROWID INFO NOT AVAILABLE */
      END IF;
      CASE
        WHEN BITAND(l_operation_type, DBMS_CHANGE_NOTIFICATION.INSERTOP) != 0 THEN
          l_operation := 'Records Inserted';
        WHEN BITAND(l_operation_type, DBMS_CHANGE_NOTIFICATION.UPDATEOP) != 0 THEN
          l_operation := 'Records Updated';
        WHEN BITAND(l_operation_type, DBMS_CHANGE_NOTIFICATION.DELETEOP) != 0 THEN
          l_operation := 'Records Deleted';
        WHEN BITAND(l_operation_type, DBMS_CHANGE_NOTIFICATION.ALTEROP) != 0 THEN
          l_operation := 'Table Altered';
        WHEN BITAND(l_operation_type, DBMS_CHANGE_NOTIFICATION.DROPOP) != 0 THEN
          l_operation := 'Table Dropped';
        WHEN BITAND(l_operation_type, DBMS_CHANGE_NOTIFICATION.UNKNOWNOP) != 0 THEN
          l_operation := 'Unknown Operation';
        ELSE
          l_operation := '?';
      END CASE;
      l_message := 'Table (' || l_table_name || ') - ' || l_operation || '. Rows=' || l_numrows;
      INSERT INTO notifications (id, message, notification_date)
      VALUES (notifications_seq.NEXTVAL, l_message, SYSDATE);
      COMMIT;
    END LOOP;
  END IF;
END;
/
 
Procedure CALLBACK compiled
 
17:21:58 SQL> -- thanks Tim

and the CQN registration:

17:21:58 SQL> -- register on DEMO;
17:21:58 SQL>
17:21:58 SQL> DECLARE
  reginfo CQ_NOTIFICATION$_REG_INFO;
  v_cursor SYS_REFCURSOR;
  regid NUMBER;
BEGIN
  reginfo := cq_notification$_reg_info ( 'callback', DBMS_CHANGE_NOTIFICATION.QOS_ROWIDS, 0, 0, 0);
  regid := sys.DBMS_CHANGE_NOTIFICATION.new_reg_start(reginfo);
  OPEN v_cursor FOR
    SELECT dbms_cq_notification.CQ_NOTIFICATION_QUERYID, demo.* from DEMO;
  CLOSE v_cursor;
  sys.DBMS_CHANGE_NOTIFICATION.reg_end;
END;
/
 
PL/SQL procedure successfully completed.
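
To check that the registration above is in place, and to clean it up once the test is finished, something like the following can be used. This is a sketch that assumes it is run as the user who registered, and it removes all of that user's registrations, not only the one on DEMO.

```sql
-- List the change notification registrations of the current user
SELECT regid, table_name, callback
  FROM user_change_notification_regs;

-- After the test: remove every registration owned by the current user
BEGIN
  FOR r IN (SELECT DISTINCT regid FROM user_change_notification_regs) LOOP
    DBMS_CHANGE_NOTIFICATION.DEREGISTER(r.regid);
  END LOOP;
END;
/
```

Leaving a registration in place means that every later change to DEMO keeps paying the notification overhead measured below.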

Now I delete the 1 million rows one by one, committing after each delete:


17:21:58 SQL> exec dbms_workload_repository.create_snapshot;
 
PL/SQL procedure successfully completed.
 
17:22:02 SQL>
17:22:02 SQL> -- 1000000 deletes
17:22:02 SQL>
17:22:02 SQL> exec for i in 1..1000000 loop delete from DEMO WHERE id=i; commit; end loop;
 
PL/SQL procedure successfully completed.
 
17:39:23 SQL>
17:39:23 SQL> exec dbms_workload_repository.create_snapshot;

Here are the notifications captured:

17:39:41 SQL> select count(*) from notifications;
COUNT(*)
--------
942741
 
17:39:54 SQL> select * from notifications fetch first 10 rows only;
 
ID MESSAGE NOTIFICATION_DATE
--- ------------------------------------------- -----------------
135 Table (DEMO.DEMO) - Records Deleted. Rows=1 09-MAY-18
138 Table (DEMO.DEMO) - Records Deleted. Rows=1 09-MAY-18
140 Table (DEMO.DEMO) - Records Deleted. Rows=1 09-MAY-18
142 Table (DEMO.DEMO) - Records Deleted. Rows=1 09-MAY-18
145 Table (DEMO.DEMO) - Records Deleted. Rows=1 09-MAY-18
147 Table (DEMO.DEMO) - Records Deleted. Rows=1 09-MAY-18
149 Table (DEMO.DEMO) - Records Deleted. Rows=1 09-MAY-18
152 Table (DEMO.DEMO) - Records Deleted. Rows=1 09-MAY-18
154 Table (DEMO.DEMO) - Records Deleted. Rows=1 09-MAY-18
156 Table (DEMO.DEMO) - Records Deleted. Rows=1 09-MAY-18

The DML took a long time (17 minutes, from 17:22:02 to 17:39:23) and SQL Monitoring shows that 64% of the time was spent waiting on ‘Wait for EMON to process ntfns’, which is the notification process:
[Screenshot: CaptureCQN]
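
Without SQL Monitoring, the same wait can be seen from the session statistics. Here is a minimal sketch, assuming it is run in the session that executed the deletes, right after the loop, and that this session can query V$SESSION_EVENT:

```sql
-- Top wait events of the current session since it connected
SELECT event,
       total_waits,
       ROUND(time_waited_micro / 1e6, 1) AS seconds_waited
  FROM v$session_event
 WHERE sid = SYS_CONTEXT('USERENV', 'SID')
 ORDER BY time_waited_micro DESC
 FETCH FIRST 5 ROWS ONLY;
```

These figures are cumulative for the session, so they are easiest to read when the session has done nothing else than the test.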

The execution of the delete itself (cdq5w65zk18r1 DELETE FROM DEMO WHERE ID=:B1) is only a small part of the database time. And we have additional load on the database:
[Screenshot: CaptureCQN01]

The following statements are the activity related to the Continuous Query Notification queuing of messages, which is what slows down the modifications during the delete (from 17:22 to 17:38):

59p1yadp2g6mb call DBMS_AQADM_SYS.REGISTER_DRIVER ( )
gzf71xphapf1b select /*+ INDEX(TAB AQ$_AQ_SRVNTFN_TABLE_1_I) */ tab.rowid, tab.msgid, tab.corrid, tab.priority, tab.delay, tab.expiration ,tab.retry_count, tab.exception_qschema, tab.exception_queue, tab.chain_no, tab.local_order_no, tab.enq_time, tab.time_manager_info, tab.state, tab.enq_tid, tab.step_no, tab.sender_name, tab.sender_address, tab.sender_protocol, tab.dequeue_msgid, tab.user_prop, tab.user_data from "SYS"."AQ_SRVNTFN_TABLE_1" tab where q_name = :1 and (state = :2 ) order by q_name, state, enq_time, step_no, chain_no, local_order_no for update skip locked
61cgh171qq5m6 delete /*+ CACHE_CB("AQ_SRVNTFN_TABLE_1") */ from "SYS"."AQ_SRVNTFN_TABLE_1" where rowid = :1
ccrv58ajb7pxg begin callback(ntfnds => :1); end;
cdq5w65zk18r1 DELETE FROM DEMO WHERE ID=:B1

And at the end (17:38), when the modifications are committed, my callback function is running to process the messages:
[Screenshot: CaptureCQN02]
The main query is the insert from the callback function:
8z4m5tw9uh02d INSERT INTO NOTIFICATIONS (ID, MESSAGE, NOTIFICATION_DATE) VALUES (NOTIFICATIONS_SEQ.NEXTVAL, :B1 , SYSDATE)
The callback function could send the changes to another system, rather than inserting them here, but then the availability of that system becomes a concern and, anyway, this will still have a high overhead in context switches and network roundtrips.

For 1 million rows deleted, here are the queries that have been executed about 1 million times:

                                               Elapsed
  Executions  Rows Processed  Rows per Exec   Time (s)  %CPU   %IO SQL Id
------------ --------------- -------------- ---------- ----- ----- -------------
   1,000,000       1,000,000            1.0      123.4  55.2   3.2 cdq5w65zk18r1
Module: java@VM188 (TNS V1-V3)
DELETE FROM DEMO WHERE ID=:B1

     999,753         999,753            1.0      261.5  88.6    .7 dw9yv631knnqd
insert into "SYS"."AQ_SRVNTFN_TABLE_1" (q_name, msgid, corrid, priority, state, delay, expiration, time_manager_info, local_order_no, chain_no, enq_time, step_no, enq_uid, enq_tid, retry_count, exception_qschema, exception_queue, recipient_key, dequeue_msgid, user_data, sender_name, sender_address, sender_protoc

     978,351         978,351            1.0      212.5  64.3     0 61cgh171qq5m6
Module: DBMS_SCHEDULER
delete /*+ CACHE_CB("AQ_SRVNTFN_TABLE_1") */ from "SYS"."AQ_SRVNTFN_TABLE_1" where rowid = :1

     978,248         942,657            1.0      971.6    20    .7 8z4m5tw9uh02d
Module: DBMS_SCHEDULER
INSERT INTO NOTIFICATIONS (ID, MESSAGE, NOTIFICATION_DATE) VALUES (NOTIFICATIONS_SEQ.NEXTVAL, :B1 , SYSDATE)

     978,167         942,559            1.0    1,178.7  33.1    .5 ccrv58ajb7pxg
Module: DBMS_SCHEDULER
begin callback(ntfnds => :1); end;

     977,984         977,809            1.0       73.9  96.5     0 brq600g3299zp
Module: DBMS_SCHEDULER
SELECT INSTANCE_NUMBER FROM SYS.V$INSTANCE

     933,845         978,350            1.0      446.9  51.4    .7 gzf71xphapf1b
Module: DBMS_SCHEDULER
select /*+ INDEX(TAB AQ$_AQ_SRVNTFN_TABLE_1_I) */ tab.rowid, tab.msgid, tab.corrid, tab.priority, tab.delay, tab.expiration ,tab.retry_count, tab.exception_qschema, tab.exception_queue, tab.chain_no, tab.local_order_no, tab.enq_time, tab.time_manager_info, tab.state, tab.enq_tid, tab.step_no, tab.sender_name
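
To put these elapsed times in proportion, here is a quick calculation on the figures above. The ratios are only indicative: the elapsed time of the callback includes the INSERT it executes, so the callback is counted alone here, and the three AQ statements (enqueue insert, dequeue select, delete) are summed separately.

```sql
-- Elapsed time relative to the 123.4 seconds of the DELETE itself
SELECT ROUND(1178.7 / 123.4, 1)                  AS callback_vs_delete, -- 9.6
       ROUND((261.5 + 212.5 + 446.9) / 123.4, 1) AS aq_vs_delete        -- 7.5
  FROM dual;
```
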

This is a huge overhead. And all this has generated 8 million redo entries.
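
The redo figure can be measured without an AWR report by reading the system statistics before and after the test and subtracting. This is a sketch that assumes the instance is otherwise idle, because V$SYSSTAT is instance-wide and counts the redo of all sessions, including the background jobs running the callback:

```sql
-- Run once before and once after the deletes, then subtract the values
SELECT name, value
  FROM v$sysstat
 WHERE name IN ('redo entries', 'redo size');
```

Session-level statistics would miss most of it here, since the queue processing and the callback run in other sessions than the one doing the deletes.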

In summary, just forget about CQN to stream changes. This feature is aimed at cache refresh for rarely changing data. What we call ‘event sourcing’ today has existed for a long time in the database, with the redo logs. When a user executes some DML, Oracle generates the redo records first, stores them, and applies them to update the current version of the table rows. And the redo log keeps the atomicity of transactions (the ‘A’ in ACID). So it is better to use the redo stream if the changes need to be propagated to other systems.

 
