Techcorner

Pasthighlights

Oracle Streams

Contributor: Danny Brown, ExeVision Senior Consultant

Please see script sample below.

Oracle Streams is a flexible feature for information sharing between Oracle 10g databases. It
enables the propagation and management of data, transactions and events in a data stream from one database to another.

The three basic elements of Oracle Streams, capture, propagation, and apply, can be configured in a number of different ways depending on the business requirements. Streams can replicate data from one database to one or more databases. It uses advanced queuing and redo logs to send data through database links from the source database to the advanced queue of the receiving databases. Data is then moved from the advanced queue into the replicated tables.

Streams is extremely fast and efficient. Please see example setup code below.

REM
REM On Local Machine DB1
REM

connect sys/xxxxx

create tablespace logmnr_tbs datafile '/oradata/db1/logmnr_tbs_01.dbf'
size 25m reuse
autoextend on maxsize unlimited
/

begin
	dbms_logmnr_d.set_tablespace('LOGMNR_TBS');
end;
/

create user strmadm identified by strmadm
default tablespace logmnr_tbs
temporary tablespace temp
quota unlimited on logmnr_tbs
/

grant connect, resource, select any dictionary, select_catalog_role, exp_full_database, 
imp_full_database, aq_administrator_role
to strmadm
/

grant execute on sys.dbms_aq to strmadm;
grant execute on sys.dbms_aqadm to strmadm;
grant execute on sys.dbms_streams_adm to strmadm;
grant execute on sys.dbms_capture_adm to strmadm;
grant execute on sys.dbms_apply_adm to strmadm;
grant execute on sys.dbms_rule_adm to strmadm;

begin
	dbms_aqadm.grant_system_privilege(
	privilege => 'ENQUEUE_ANY',
	grantee => 'STRMADM',
	admin_option => FALSE);

	dbms_aqadm.grant_system_privilege(
	privilege => 'DEQUEUE_ANY',
	grantee => 'STRMADM',
	admin_option => FALSE);

	dbms_aqadm.grant_system_privilege(
	privilege => 'MANAGE_ANY',
	grantee => 'STRMADM',
	admin_option => TRUE);

end;
/

begin
	dbms_rule_adm.grant_system_privilege(
	privilege => DBMS_RULE_ADM.CREATE_EVALUATION_CONTEXT_OBJ,
	grantee => 'STRMADM',
	grant_option => TRUE);

	dbms_rule_adm.grant_system_privilege(
	privilege => DBMS_RULE_ADM.CREATE_RULE_SET_OBJ,
	grantee => 'STRMADM',
	grant_option => TRUE);

	dbms_rule_adm.grant_system_privilege(
	privilege => DBMS_RULE_ADM.CREATE_RULE_OBJ,
	grantee => 'STRMADM',
	grant_option => TRUE);

	dbms_rule_adm.grant_system_privilege(
	privilege => DBMS_RULE_ADM.CREATE_ANY_RULE_SET,
	grantee => 'STRMADM',
	grant_option => TRUE);

	dbms_rule_adm.grant_system_privilege(
	privilege => DBMS_RULE_ADM.ALTER_ANY_RULE_SET,
	grantee => 'STRMADM',
	grant_option => TRUE);

	dbms_rule_adm.grant_system_privilege(
	privilege => DBMS_RULE_ADM.EXECUTE_ANY_RULE_SET,
	grantee => 'STRMADM',
	grant_option => TRUE);

	dbms_rule_adm.grant_system_privilege(
	privilege => DBMS_RULE_ADM.CREATE_ANY_RULE,
	grantee => 'STRMADM',
	grant_option => TRUE);

	dbms_rule_adm.grant_system_privilege(
	privilege => DBMS_RULE_ADM.CREATE_ANY_RULE,
	grantee => 'STRMADM',
	grant_option => TRUE);

	dbms_rule_adm.grant_system_privilege(
	privilege => DBMS_RULE_ADM.ALTER_ANY_RULE,
	grantee => 'STRMADM',
	grant_option => TRUE);

	dbms_rule_adm.grant_system_privilege(
	privilege => DBMS_RULE_ADM.EXECUTE_ANY_RULE,
	grantee => 'STRMADM',
	grant_option => TRUE);

	dbms_rule_adm.grant_object_privilege(
	privilege => DBMS_RULE_ADM.EXECUTE_ON_EVALUATION_CONTEXT,
	object_name => 'SYS.STREAMS$_EVALUATION_CONTEXT',
	grantee => 'STRMADM',
	grant_option => FALSE);
end;
/

create public database link db2
connect to strmadm identified by strmadm
using 'db2';

connect strmadm/strmadm

begin
	DBMS_STREAMS_ADM.SET_UP_QUEUE(
	queue_user => 'STRMADM');
end;
/

begin
	DBMS_STREAMS_ADM.ADD_TABLE_RULES(
	table_name => 'TESTUSER.EMPLOYEE',
	streams_type => 'CAPTURE',
	streams_name => 'STRMADM_CAPTURE',
	queue_name => 'STRMADM.STREAMS_QUEUE',
	include_dml => true,
	include_ddl => false,
	source_database => 'DB1');
end;
/

begin
	DBMS_STREAMS_ADM.ADD_TABLE_PROPAGATION_RULES(
	table_name => 'TESTUSER.EMPLOYEE',
	streams_name => 'STRMADM_PROPAGATE',
	source_queue_name => 'STRMADM.STREAMS_QUEUE',
	destination_queue_name => 'STRMADM.STREAMS_QUEUE@DB2',
	include_dml => true,
	include_ddl => false,
	source_database => 'DB1');
end;
/

connect strmadm/strmadm

declare
	v_started number;
begin
	select decode(status, 'ENABLED',1,0) into v_started
	from dba_capture
	where capture_name = 'STRMADM_CAPTURE';

	if (v_started = 0) then
		DBMS_CAPTURE_ADM.START_CAPTURE(capture_name => 'STRMADM_CAPTURE');
	end if;
end;
/

REM
REM Check to see if it is enabled.
REM

select capture_name, queue_name, status from dba_capture;

On the Target databse, issue the following commands:

REM
REM on Remote Machine DB2
REM

connect sys/xxxxx

create tablespace logmnr_tbs datafile '/oradata/db2/logmnr_tbs_01.dbf'
size 25m reuse
autoextend on maxsize unlimited
/

create user strmadm identified by strmadm
default tablespace logmnr_tbs
temporary tablespace temp
quota unlimited on logmnr_tbs;

grant connect, resource, select any dictionary, select_catalog_role, exp_full_database, 
imp_full_database, aq_administrator_role
to strmadm;

grant execute on sys.dbms_aq to strmadm;
grant execute on sys.dbms_aqadm to strmadm;
grant execute on sys.dbms_streams_adm to strmadm;
grant execute on sys.dbms_capture_adm to strmadm;
grant execute on sys.dbms_apply_adm to strmadm;
grant execute on sys.dbms_rule_adm to strmadm;

begin
	dbms_aqadm.grant_system_privilege(
	privilege => 'ENQUEUE_ANY',
	grantee => 'STRMADM',
	admin_option => FALSE);

	dbms_aqadm.grant_system_privilege(
	privilege => 'DEQUEUE_ANY',
	grantee => 'STRMADM',
	admin_option => FALSE);

	dbms_aqadm.grant_system_privilege(
	privilege => 'MANAGE_ANY',
	grantee => 'STRMADM',
	admin_option => TRUE);

end;
/

begin
	dbms_rule_adm.grant_system_privilege(
	privilege => DBMS_RULE_ADM.CREATE_EVALUATION_CONTEXT_OBJ,
	grantee => 'STRMADM',
	grant_option => TRUE);

	dbms_rule_adm.grant_system_privilege(
	privilege => DBMS_RULE_ADM.CREATE_RULE_SET_OBJ,
	grantee => 'STRMADM',
	grant_option => TRUE);

	dbms_rule_adm.grant_system_privilege(
	privilege => DBMS_RULE_ADM.CREATE_RULE_OBJ,
	grantee => 'STRMADM',
	grant_option => TRUE);

	dbms_rule_adm.grant_system_privilege(
	privilege => DBMS_RULE_ADM.CREATE_ANY_RULE_SET,
	grantee => 'STRMADM',
	grant_option => TRUE);

	dbms_rule_adm.grant_system_privilege(
	privilege => DBMS_RULE_ADM.ALTER_ANY_RULE_SET,
	grantee => 'STRMADM',
	grant_option => TRUE);

	dbms_rule_adm.grant_system_privilege(
	privilege => DBMS_RULE_ADM.EXECUTE_ANY_RULE_SET,
	grantee => 'STRMADM',
	grant_option => TRUE);

	dbms_rule_adm.grant_system_privilege(
	privilege => DBMS_RULE_ADM.CREATE_ANY_RULE,
	grantee => 'STRMADM',
	grant_option => TRUE);

	dbms_rule_adm.grant_system_privilege(
	privilege => DBMS_RULE_ADM.CREATE_ANY_RULE,
	grantee => 'STRMADM',
	grant_option => TRUE);

	dbms_rule_adm.grant_system_privilege(
	privilege => DBMS_RULE_ADM.ALTER_ANY_RULE,
	grantee => 'STRMADM',
	grant_option => TRUE);

	dbms_rule_adm.grant_system_privilege(
	privilege => DBMS_RULE_ADM.EXECUTE_ANY_RULE,
	grantee => 'STRMADM',
	grant_option => TRUE);

	dbms_rule_adm.grant_object_privilege(
	privilege => DBMS_RULE_ADM.EXECUTE_ON_EVALUATION_CONTEXT,
	object_name => 'SYS.STREAMS$_EVALUATION_CONTEXT',
	grantee => 'STRMADM',
	grant_option => FALSE);
end;
/

create public database link db1
connect to strmadm identified by strmadm
using 'db1';

connect strmadm/strmadm

begin
	DBMS_STREAMS_ADM.SET_UP_QUEUE(
	queue_user => 'STRMADM');
end;
/

begin
	DBMS_STREAMS_ADM.ADD_TABLE_RULES(
	table_name => 'TESTUSER.EMPLOYEE',
	streams_type => 'APPLY',
	streams_name => 'STRMADM_LOCAL',
	queue_name => 'STRMADM.STREAMS_QUEUE',
	include_dml => true,
	include_ddl => false,
	source_database => 'DB1');
end;
/

connect sys/xxxxx

grant select, insert, update, delete on testuser.employee to strmadm;

connect strmadm/strmadm

declare
	v_started number;
begin
	select decode(status, 'ENABLED',1,0) into v_started
	from dba_apply
	where apply_name = 'STRMADM_LOCAL';

	if (v_started = 0) then
		DBMS_APPLY_ADM.START_APPLY(apply_name => 'STRMADM_LOCAL');
	end if;
end;
/

REM
REM Check to see if it is enabled
REM

select apply_name, queue_name, apply_user, status from dba_apply;

 

	

When data is entered and commited into DB1.TESTUSER.EMPLOYEE, it appears in the DB2.TESTUSER.EMPLOYEE table. When it is updated in DB1, it gets updated in DB2. And, when it gets deleted from DB1, it gets deleted from DB2.

Check metalink.oracle.com for initialization parameters needed to run streams.

Tech Corner is new for January, 2008. We will keep a running list of these helpful tips on this page.