Locations of visitors to this page
Showing posts with label plsql. Show all posts
Showing posts with label plsql. Show all posts

Monday, July 6, 2009

returning clause returning子句

returning clause
returning子句

使用RETURNING子句返回DML语句影响的记录的值
1.
drop table t purge;
create table t(a char, b int);
insert into t values ('a',1);
insert into t values ('b',3);
insert into t values ('c',4);
commit;

var v_a char
var v_b number
insert into t values ('d',2) returning a,b into :v_a,:v_b;
print v_a v_b
update t set b=9 where a='c' returning a,b into :v_a,:v_b;
print v_a v_b
delete t where a='b' returning a,b into :v_a,:v_b;
print v_a v_b

rollback;

SQL> var v_a char
SQL> var v_b number
SQL> insert into t values ('d',2) returning a,b into :v_a,:v_b;

1 row created.

SQL> print v_a v_b

V_A
--------------------------------
d


       V_B
----------
         2

SQL> update t set b=9 where a='c' returning a,b into :v_a,:v_b;

1 row updated.

SQL> print v_a v_b

V_A
--------------------------------
c


       V_B
----------
         9

SQL> delete t where a='b' returning a,b into :v_a,:v_b;

1 row deleted.

SQL> print v_a v_b

V_A
--------------------------------
b


       V_B
----------
         3

SQL>
对于INSERT语句, RETURNING返回的是插入后的值
对于UPDATE语句, RETURNING返回的是变更之后的值
对于DELETE语句, RETURNING返回的是删除前的值



2. RETURNING后面不仅可以是字段, 还可以是一个或多个表达式
var v_a varchar2(50)
var v_b number
var v_b2 number
insert into t values ('d',2) returning rowid,b into :v_a,:v_b;
print v_a v_b
update t set b=b+1 returning sum(b) into :v_b;
print v_b
delete t where a='b' returning rowid,sum(b) into :v_rid,:v_b;
delete t where a in ('b','c') returning min(b),sum(b) into :v_b,:v_b2;
print v_b v_b2
rollback;

SQL> var v_a varchar2(50)
SQL> var v_b number
SQL> var v_b2 number
SQL> insert into t values ('d',2) returning rowid,b into :v_a,:v_b;

1 row created.

SQL> print v_a v_b

V_A
--------------------------------------------------------------------------------------------------------------------------------
AAAFY8AAGAAAACPAAD


       V_B
----------
         2

SQL> update t set b=b+1 returning sum(b) into :v_b;

4 rows updated.

SQL> print v_b

       V_B
----------
        14

SQL> delete t where a='b' returning rowid,sum(b) into :v_rid,:v_b;
delete t where a='b' returning rowid,sum(b) into :v_rid,:v_b
                               *
ERROR at line 1:
ORA-00937: not a single-group group function


SQL> delete t where a in ('b','c') returning min(b),sum(b) into :v_b,:v_b2;

2 rows deleted.

SQL> print v_b v_b2

       V_B
----------
         4


      V_B2
----------
         9

SQL> rollback;

Rollback complete.

SQL>
表达式可以是rowid, 函数, 或是聚集函数(10g新特性)


3. RETURNING BULK COLLECT INTO返回多条记录
如果DML语句影响了多条记录, 而RETURNING子句后也不是聚集函数, 那么使用BULK COLLECT INTO一次返回多条记录
set serveroutpu on size unlimited
select * from t;
declare
  type t_t is table of t%rowtype index by binary_integer;
  v_t_a t_t;
begin
  update t set b=b+1 returning a,b bulk collect into v_t_a;
  for i in 1..v_t_a.count loop
    dbms_output.put_line(i||':a='||v_t_a(i).a||',b='||v_t_a(i).b);
  end loop;
  rollback;
end;
/
SQL> set serveroutpu on size unlimited
SQL> select * from t;

A          B
- ----------
a          1
b          3
c          4

SQL> declare
  2    type t_t is table of t%rowtype index by binary_integer;
  3    v_t_a t_t;
  4  begin
  5    update t set b=b+1 returning a,b bulk collect into v_t_a;
  6    for i in 1..v_t_a.count loop
  7      dbms_output.put_line(i||':a='||v_t_a(i).a||',b='||v_t_a(i).b);
  8    end loop;
  9    rollback;
 10  end;
 11  /
1:a=a,b=2
2:a=b,b=4
3:a=c,b=5

PL/SQL procedure successfully completed.

SQL>


4.使用中的限制

a.聚集函数和非聚集函数表达式不能一起用, 见前面的例子

b.聚集函数中不能用DISTINCT
select * from t;
var v_b number
update t set b=b+1 returning count(distinct b) into :v_b;
print v_b
rollback;
SQL> select * from t;

A          B
- ----------
a          2
b          4
c          5

SQL> var v_b number
SQL> update t set b=b+1 returning count(distinct b) into :v_b;
update t set b=b+1 returning count(distinct b) into :v_b
                             *
ERROR at line 1:
ORA-00934: group function is not allowed here


SQL> print v_b

       V_B
----------


SQL> rollback;

Rollback complete.

SQL>

c.聚集函数不能在INSERT语句里使用
var v_b number
insert into t values ('d',9) returning count(b) into :v_b;
print v_b
rollback;
SQL> var v_b number
SQL> insert into t values ('d',9) returning count(b) into :v_b;
insert into t values ('d',9) returning count(b) into :v_b
                                       *
ERROR at line 1:
ORA-00934: group function is not allowed here


SQL> print v_b

       V_B
----------


SQL> rollback;

Rollback complete.

SQL>

d.如果表达式包含主键字段或非空字段, 且存在BEFORE UPDATE触发器, UPDATE语句会失败
这是文档写的
"If the expr list contains a primary key column or other NOT NULL column, then the update statement fails if the table has a BEFORE UPDATE trigger defined on it."
但实际测试没有发现问题
alter table t modify (b not null);
alter table t modify (a primary key);
create or replace trigger tr_t
  before update on t for each row
begin
  :new.b := 2;
end;
/
var v_b number
var v_a number
update t set b=b+1 returning count(a),sum(b) into :v_b,:v_a;
print v_a v_b

e. INSERT INTO子查询不支持RETURNING
var v_b number
insert into t (select 'g',8 from t) returning sum(b) into v_b;
SQL> insert into t (select 'g',8 from t) returning sum(b) into v_b;
insert into t (select 'g',8 from t) returning sum(b) into v_b
                                    *
ERROR at line 1:
ORA-00933: SQL command not properly ended


SQL>

f.多表插入语句,MERGE语句,并行DML,远程对象,LONG类型,视图,INSTEAD OF触发器均不支持RETURNING
(都没测试过...)




外部链接:
DELETE
INSERT
UPDATE
RETURNING INTO Clause
Examples of Dynamic Bulk Binds


-fin-

Monday, June 22, 2009

Listing files in directory using PLSQL 用PLSQL列目录内容

Listing files in directory using PLSQL
使用PLSQL列目录内容

用3种方法列出目录中的文件列表


1. 使用DBMS_BACKUP_RESTORE程序包列出目录中的文件

Oracle 10g中, DBMS_BACKUP_RESTORE程序包提供了一个函数SEARCHFILE, 用这个函数可以列出目录中的文件
PROCEDURE SEARCHFILES
 Argument Name                  Type                    In/Out Default?
 ------------------------------ ----------------------- ------ --------
 PATTERN                        VARCHAR2                IN/OUT
 NS                             VARCHAR2                IN/OUT
 CCF                            BOOLEAN                 IN     DEFAULT
 OMF                            BOOLEAN                 IN     DEFAULT
 FTYPE                          VARCHAR2                IN     DEFAULT
后4个参数含义不详
第1个参数为要查询的目录名:
Unix下只能是目录名, 以斜杠结束, 也可以不带, 比如
'/var/log'

'/var/log/'
Windows下可以带文件名通配符,比如
'C:\WINDOWS\Help\*.hlp'

举例:
conn / as sysdba
set pages 50000 line 130
set serveroutput on size unlimited
var v_pat varchar2(1000);
var v_ns varchar2(1000);
exec :v_pat := '/var/log'
exec dbms_backup_restore.searchfiles(:v_pat, :v_ns)
select fname_krbmsft as name from x$krbmsft;
NAME
----------------------------------------------------------------------------------------------------------------------------------
/var/log/rpmpkgs.1
/var/log/rpmpkgs
/var/log/sa/sa16
/var/log/sa/sar17
/var/log/sa/sa15
/var/log/sa/sa21
/var/log/sa/sa20
/var/log/sa/sa18
/var/log/sa/sa19
/var/log/sa/sa22
/var/log/sa/sar14
/var/log/sa/sar15
/var/log/sa/sar18
/var/log/sa/sa17
/var/log/sa/sa14
/var/log/sa/sar20
/var/log/sa/sar13
/var/log/sa/sar21
/var/log/sa/sar16
/var/log/sa/sar19
/var/log/rpmpkgs.4
/var/log/wtmp
/var/log/prelink.log
/var/log/Pegasus/install.log
/var/log/Xorg.0.log
/var/log/rpmpkgs.3
/var/log/wtmp.1
/var/log/mcelog
/var/log/dmesg
/var/log/scrollkeeper.log
/var/log/Xorg.0.log.old
/var/log/gdm/:0.log.2
/var/log/gdm/:0.log
/var/log/gdm/:0.log.1
/var/log/rpmpkgs.2

35 rows selected.

调用dbms_backup_restore.searchfiles后, 在内存表x$krbmsft中生成文件列表
扫描是递归的, 包括了该目录和该目录下的子目录. 因此如果目录很深文件很多, 会占用大量内存
不显示隐藏文件(Unix下是以点开头的)


参考:
29 May 2007 - Finding Files
Is there a way to read the names of a set of files with a given extension from a directory as if from a SQL cursor? (同上)
封装为程序包 The XUTL_FINDFILES package
Oracle database directory listing with ls function by Harry Dragstra



2. JAVA编程获取目录文件列表

见ASKTOM上的例子, reading files in a directory -- how to get a list of available files.

create global temporary table DIR_LIST ( filename varchar2(255) ) on commit delete rows;

create or replace
  and compile java source named "DirList"
as
import java.io.*;
import java.sql.*;

public class DirList
{
  public static void getList(String directory)
                     throws SQLException
  {
      File path = new File( directory );
      String[] list = path.list();
      String element;

      for(int i = 0; i < list.length; i++)
      {
        element = list[i];
        #sql { INSERT INTO DIR_LIST (FILENAME)
               VALUES (:element) };
      }
  }
}
/

create or replace procedure get_dir_list( p_directory in varchar2 )
  as language java
name 'DirList.getList( java.lang.String )';
/

exec get_dir_list( '/var/log' );

select * from dir_list where rownum < 10;
FILENAME
----------------------------------------------------------------------------------------------------------------------------------
dbexprm_screen3.tmp
.ICE-unix
dbexprm_screen3.tmp.bad
.X0-lock
uscreens
dbexprm_screen3.tmp.good
.X11-unix
dbexprm_screen3.tmp.delete
dbexprm_screen3.tmp.run

9 rows selected.

能显示出隐藏文件


3. 调用外部操作系统命令获得文件列表

使用DBMS_SCHEDULER调用操作系统命令生成文件列表文件, 再用UTL_FILE读取改列表文件, 比较麻烦
set serveroutput on size unlimited

begin
  dbms_scheduler.create_job(
    job_name => 'os_ls',
    job_type => 'executable',
    job_action => '/bin/sh',
    number_of_arguments => 2,
    comments => 'OS ls'
  );
  dbms_scheduler.set_job_argument_value(
    job_name => 'os_ls',
    argument_position => 1,
    argument_value => '-c'
  );
  dbms_scheduler.set_job_argument_value(
    job_name => 'os_ls',
    argument_position => 2,
    argument_value => 'ls -l /var/log/*.log >/tmp/os_list.txt'
  );
end;
/
exec dbms_scheduler.run_job('os_ls');

create or replace directory os_ls_dir as '/tmp';
declare
  l_file utl_file.file_type;
  l_text     varchar2(2000);
  l_line     number(10) := 1;
begin
  l_file := utl_file.fopen(upper('os_ls_dir'), 'os_list.txt', 'r');
  begin
    loop
      utl_file.get_line(l_file, l_text);
      dbms_output.put_line(l_text);
      l_line := l_line + 1;
    end loop;
  exception
    when no_data_found then
      null;
  end;
  utl_file.fclose(l_file);
end;
/

exec dbms_scheduler.drop_job('os_ls');
drop directory os_ls_dir;
-rw-r--r--  1 root root  40296 Jun 12 07:40 /var/log/Xorg.0.log
-rw-------  1 root root  14424 Jun  4  2008 /var/log/anaconda.log
-rw-------  1 root root      0 Jun 21 04:02 /var/log/boot.log
-rw-r--r--  1 root root 402720 Jun 22 04:02 /var/log/prelink.log
-rw-r--r--  1 root root  63438 Jun  4  2008 /var/log/scrollkeeper.log

PL/SQL procedure successfully completed.



参考:list contents of directory



-fin-

Wednesday, June 10, 2009

Asynchronous Commit 异步提交

Asynchronous Commit
Oracle 10gR2+ 的异步提交

Oracle 10gR2开始, 增强了提交的功能, 实现异步/批量的提交


1. 异步提交

默认时提交的步骤:
1) 向系统全局区(SGA)中的重做缓冲区(redo log buffer)中写'事务结束(end of transaction)'记录
2) 通知(发送消息到)写日志进程(LGWR), 告诉它刷新重做缓冲区到磁盘
3) 等待磁盘刷新完成. 这就是常见的'日志文件同步'('log file sync')事件

10gR2 COMMIT增加了新选项:
WAIT: 等待相应的重做信息写到在线重做日志文件中后,提交命令才返回(缺省)
NOWAIT: 不等重做信息写到日志中, 提交命令就返回
IMMEDIATE: 写日志进程立刻写重做信息(缺省). 即强制执行一次磁盘 IO.
BATCH: 将重做信息缓冲起来. 写日志进程到时再写重做信息.

虽然提高了性能, 但是一旦系统宕机, 缓冲区中的已提交事务的重做信息将丢失. 如果遭遇磁盘IO错误, 也会丢失重做信息.



2. COMMIT命令

新的 COMMIT 命令增加了如下选项:
COMMIT WRITE IMMEDIATE|BATCH WAIT|NOWAIT;
WRITE Clause

COMMIT WRITE NOWAIT: 不做前面提到的第3步
COMMIT WRITE BATCH,NOWAIT: 不做第2和3步, 重做记录保留在缓冲区内, 直到其他人提交导致刷新, 或后台事件导致缓冲区异步的刷新
后台事件如下:
缓冲区1/3满
缓冲区充满1M
每3秒

测试:
CREATE TABLE commit_test (
  id           NUMBER(10),
  description  VARCHAR2(50),
  CONSTRAINT commit_test_pk PRIMARY KEY (id)
);

CONN A/A
SET SERVEROUTPUT ON
DECLARE
  function get_waits(p_event in varchar2) return number
  is
 l_waits  NUMBER;
  begin
 select total_waits
      into l_waits
      from v$session_event
     where event = p_event
       and sid = (select sid from v$mystat where rownum=1);
 return l_waits;
  exception
      when no_data_found then return 0;
  end;
  PROCEDURE do_loop (p_type  IN  VARCHAR2) AS
    l_start  NUMBER;
    l_loops  NUMBER := 1000;
 l_lfs    NUMBER;
  BEGIN
    EXECUTE IMMEDIATE 'TRUNCATE TABLE commit_test';

 l_lfs := get_waits('log file sync');
    l_start := DBMS_UTILITY.get_time;
    FOR i IN 1 .. l_loops LOOP
      INSERT INTO commit_test (id, description)
      VALUES (i, 'Description for ' || i);
     
      CASE p_type
        WHEN ' ' THEN COMMIT;
        WHEN 'WRITE' THEN COMMIT WRITE;
        WHEN 'WRITE WAIT' THEN COMMIT WRITE WAIT;
        WHEN 'WRITE NOWAIT' THEN COMMIT WRITE NOWAIT;
        WHEN 'WRITE BATCH' THEN COMMIT WRITE BATCH;
        WHEN 'WRITE IMMEDIATE' THEN COMMIT WRITE IMMEDIATE;
        WHEN 'WRITE BATCH WAIT' THEN COMMIT WRITE BATCH WAIT;
        WHEN 'WRITE BATCH NOWAIT' THEN COMMIT WRITE BATCH NOWAIT;
        WHEN 'WRITE IMMEDIATE WAIT' THEN COMMIT WRITE IMMEDIATE WAIT;
        WHEN 'WRITE IMMEDIATE NOWAIT' THEN COMMIT WRITE IMMEDIATE NOWAIT;
      END CASE;
    END LOOP;
    DBMS_OUTPUT.put_line(RPAD('COMMIT ' || p_type, 30)
      || ': ' || (DBMS_UTILITY.get_time - l_start)
      || ': ' || (get_waits('log file sync') - l_lfs)
   );
  END;
BEGIN
  do_loop(' ');
  do_loop('WRITE');
  do_loop('WRITE WAIT');
  do_loop('WRITE NOWAIT');
  do_loop('WRITE BATCH');
  do_loop('WRITE IMMEDIATE');
  do_loop('WRITE BATCH WAIT');
  do_loop('WRITE BATCH NOWAIT');
  do_loop('WRITE IMMEDIATE WAIT');
  do_loop('WRITE IMMEDIATE NOWAIT');
END;
/
COMMIT                        : 19: 0
COMMIT WRITE                  : 151: 1000
COMMIT WRITE WAIT             : 150: 1000
COMMIT WRITE NOWAIT           : 20: 0
COMMIT WRITE BATCH            : 151: 1000
COMMIT WRITE IMMEDIATE        : 152: 1000
COMMIT WRITE BATCH WAIT       : 151: 1000
COMMIT WRITE BATCH NOWAIT     : 15: 0
COMMIT WRITE IMMEDIATE WAIT   : 153: 1000
COMMIT WRITE IMMEDIATE NOWAIT : 20: 0
可以看出
1) COMMIT什么参数都不带, 等于NOWAIT, 原因后面讲
2) 参数默认是IMMEDIATE, WAIT
3) BATCH和IMMEDIATE速度差不多(为啥?)
4) WAIT产生等待事件, NOWAIT不产生
5) BATCH+NOWAIT最快, IMMEDIATE+WAIT最慢


3. 系统初始化参数

新增的系统参数是 COMMIT_WRITE
语法: COMMIT_WRITE = '{IMMEDIATE | BATCH},{WAIT |NOWAIT}'
可以在系统级或会话级设置, ALTER SYSTEM, ALTER SESSION
比如:
SQL> show parameter commit

NAME                                 TYPE        VALUE
------------------------------------ ----------- ------------------------------
commit_point_strength                integer     1
commit_write                         string
max_commit_propagation_delay         integer     0
SQL> alter system set commit_write='batch,nowait';

System altered.

SQL> show parameter commit

NAME                                 TYPE        VALUE
------------------------------------ ----------- ------------------------------
commit_point_strength                integer     1
commit_write                         string      batch,nowait
max_commit_propagation_delay         integer     0
SQL>

11g取消了COMMIT_WRITE (为了兼容仍保留), 拆分为2个单独的参数 COMMIT_LOGGINGCOMMIT_WAIT, 分别对应 IMMEDIATE | BATCH 和 WAIT | NOWAIT

测试:
conn a/a
SET SERVEROUTPUT ON
DECLARE
  function get_waits(p_event in varchar2) return number
  is
 l_waits  NUMBER;
  begin
 select total_waits
      into l_waits
      from v$session_event
     where event = p_event
       and sid = (select sid from v$mystat where rownum=1);
 return l_waits;
  exception
      when no_data_found then return 0;
  end;
  PROCEDURE do_loop (p_type  IN  VARCHAR2) AS
    l_start  NUMBER;
    l_loops  NUMBER := 1000;
 l_lfs    NUMBER;
  BEGIN
    if p_type is not null then
    EXECUTE IMMEDIATE 'ALTER SESSION SET COMMIT_WRITE=''' || p_type || '''';
 end if;
    EXECUTE IMMEDIATE 'TRUNCATE TABLE commit_test';

 l_lfs := get_waits('log file sync');
    l_start := DBMS_UTILITY.get_time;
    FOR i IN 1 .. l_loops LOOP
      INSERT INTO commit_test (id, description)
      VALUES (i, 'Description for ' || i);
      COMMIT;
    END LOOP;
    DBMS_OUTPUT.put_line(RPAD('COMMIT_WRITE=' || p_type, 30)
      || ': ' || (DBMS_UTILITY.get_time - l_start)
      || ': ' || (get_waits('log file sync') - l_lfs)
   );
  END;
BEGIN
  do_loop(NULL);
  do_loop('WAIT');
  do_loop('NOWAIT');
  do_loop('BATCH');
  do_loop('IMMEDIATE');
  do_loop('BATCH,WAIT');
  do_loop('BATCH,NOWAIT');
  do_loop('IMMEDIATE,WAIT');
  do_loop('IMMEDIATE,NOWAIT');
END;
/
COMMIT_WRITE=                 : 20: 0
COMMIT_WRITE=WAIT             : 151: 1000
COMMIT_WRITE=NOWAIT           : 19: 0
COMMIT_WRITE=BATCH            : 14: 0
COMMIT_WRITE=IMMEDIATE        : 19: 0
COMMIT_WRITE=BATCH,WAIT       : 150: 1000
COMMIT_WRITE=BATCH,NOWAIT     : 15: 0
COMMIT_WRITE=IMMEDIATE,WAIT   : 153: 1000
COMMIT_WRITE=IMMEDIATE,NOWAIT : 20: 0
第因为在第3步设置了NOWAIT, 所以后面第4,5步也继承了这个配置


4. PLSQL中的优化

PL/SQL 会自动将其中的 COMMIT 优化成为"COMMIT WRITE NOWAIT", 只有最后一次 COMMIT 才是真正的"COMMIT"

conn a/a
set serveroutput on size unlimited
truncate table commit_test;
select total_waits
  from v$session_event
 where event = 'log file sync'
   and sid = (select sid from v$mystat where rownum=1);
declare
  l_loops number := 1000;
begin
  FOR i IN 1 .. l_loops LOOP
    INSERT INTO commit_test (id, description)
    VALUES (i, 'Description for ' || i);
    COMMIT;
  END LOOP;
end;
/
select total_waits
  from v$session_event
 where event = 'log file sync'
   and sid = (select sid from v$mystat where rownum=1);
TOTAL_WAITS
-----------
          1

SQL>   2    3    4    5    6    7    8    9   10
PL/SQL procedure successfully completed.

SQL>   2    3    4
TOTAL_WAITS
-----------
          2

只产生了1次等待事件

10gR2版本以前也发现有异步提交, 见The LGWR dilemma



外部链接:
Commit Enhancements in Oracle 10g Database Release 2
10gR2 New Feature: Asynchronous Commit
On setting commit_write
Quantifying Commit Time
Asynchronous Commit - New Feature in Oracle 10GR2 (10.2)
Expert Oracle Database 11g Administration By Sam R. Alapati


-fin-

Thursday, May 14, 2009

Inter-Session Communication DBMS_PIPE 会话间通信 DBMS_PIPE

Inter-Session Communication DBMS_PIPE
会话间通信 DBMS_PIPE


DBMS_PIPE在Oracle7引入, 也是会话间通信的一种, 常用于与外部服务通信,调试PL/SQL程序,或安全审计


1.
DBMS_PIPE与DBMS_ALERT的比较:
1)DBMS_ALERT是单向的. DBMS_PIPE是双向的.
2)DBMS_ALERT是基于事务的, 提交后发出警报. DBMS_PIPE不是基于事务的.
3)DBMS_ALERT一条警报可以有多个接收者, 可以是广播模式的. DBMS_PIPE的一条消息只能被最后等待一个用户接收到.
4)DBMS_ALERT是由DBMS_PIPE和DBMS_LOCK实现的
5)DBMS_ALERT只能发送一条VARCHAR2型字符串, 最多1800个字符. DBMS_PIPE可以发送多条信息,多种数据类型
6)都只能在同一个实例内会话间通信
7)发送的消息不是持久的, 数据库实例关闭, 消息就被清除
8)都是异步的

另参考
Can I do mutlithreaded programming in PLSQL:
dbms_alert is used to send a signal to ALL interested parties. It is transactional (the
signal is not sent until you commit). It "loses" signals -- if I signal the same event 5
times and commit -- only one event might get broadcast. It is like a unix signal in this
way. Dbms_alert is asyncronous -- the sender never gets anything back from the reciever.

dbms_pipe is used to send a message to a SINGLE interested party (although >1 person can
be reading the pipe, only ONE person will get the message). The message is sent
immediately, regardless of your transactions state. It is like a unix pipe. Dbms_pipe
can be syncronous -- i can send a message and get a message back.


2.
参考Oracle文档DBMS_PIPE:

The DBMS_PIPE package lets two or more sessions in the same instance communicate. Oracle pipes are similar in concept to the pipes used in UNIX, but Oracle pipes are not implemented using the operating system pipe mechanisms.
DBMS_PIPE包让处于同一实例中的两个或多个会话进行通信. Oracle管道和UNIX中使用的管道的概念类似, 但不是用操作系统的管道机制实现的.

Pipe functionality has several potential applications:
管道功能有几种可能的应用:

External service interface: You can communicate with user-written services that are external to the RDBMS. This can be done effectively in a shared server process, so that several instances of the service are executing simultaneously. Additionally, the services are available asynchronously. The requestor of the service does not need to block a waiting reply. The requestor can check (with or without time out) at a later time. The service can be written in any of the 3GL languages that Oracle supports.
外部服务接口:能够与RDBMS之外的用户编写的服务进行通信.(略)

Independent transactions: The pipe can communicate to a separate session which can perform an operation in an independent transaction (such as logging an attempted security violation detected by a trigger).
独立的事务:管道能够与在一个独立事务(比如记录由触发器发现的违反安全的尝试)s中运行操作的单独的会话通信

Alerters (non-transactional): You can post another process without requiring the waiting process to poll. If an "after-row" or "after-statement" trigger were to alert an application, then the application would treat this alert as an indication that the data probably changed. The application would then read the data to get the current value. Because this is an "after" trigger, the application would want to do a "SELECT FOR UPDATE" to make sure it read the correct data.
警报(非事务性的):你能够通知另一个进程,无需那个等待的进程去轮询.(略)

Debugging: Triggers and stored procedures can send debugging information to a pipe. Another session can keep reading out of the pipe and display it on the screen or write it to a file.
调试:触发器和存储过程向管道发送调试信息. 另一个会话从管道读取并显示到屏幕或写入文件.

Concentrator: This is useful for multiplexing large numbers of users over a fewer number of network connections, or improving performance by concentrating several user-transactions into one DBMS transaction.
集中器:用于多路复用网络连接数较少的大量用户, 或者将多个用户事务集中到一个DBMS事务中用以改善性能.(这个比较有意思,但到底是什么意思呢?)

-----
Public Pipes
公有管道

You may create a public pipe either implicitly or explicitly. For implicit public pipes, the pipe is automatically created when it is referenced for the first time, and it disappears when it no longer contains data. Because the pipe descriptor is stored in the SGA, there is some space usage overhead until the empty pipe is aged out of the cache.
可以隐式或显式的创建公有管道.隐式公有管道在第一次被引用时自动创建,当不再存在数据时消失.(略)

You create an explicit public pipe by calling the CREATE_PIPE function with the private flag set to FALSE. You must deallocate explicitly-created pipes by calling the REMOVE_PIPE function.
调用CREATE_PIPE加上private标记等于FALSE创建显式公有管道.必须调用REMOVE_PIPE删除显式创建的管道.

The domain of a public pipe is the schema in which it was created, either explicitly or implicitly.
公有管道的域(是啥?)是被创建的用户, 不论显式的或隐式的

-----
Writing and Reading Pipes
读写管道

Each public pipe works asynchronously. Any number of schema users can write to a public pipe, as long as they have EXECUTE permission on the DBMS_PIPE package, and they know the name of the public pipe. However, once buffered information is read by one user, it is emptied from the buffer, and is not available for other readers of the same pipe.
公有管道是异步工作的. 可以有任意数量的用户写一个公有管道, 只要它们拥有DBMS_PIPE的执行权限, 和知道公有管道名. 但是, 缓冲的信息一旦被一个用户读取了, 它就从缓冲区中被清空, 对于其它用户就不可以使用了.

The sending session builds a message using one or more calls to the PACK_MESSAGE procedure. This procedure adds the message to the session's local message buffer. The information in this buffer is sent by calling the SEND_MESSAGE function, designating the pipe name to be used to send the message. When SEND_MESSAGE is called, all messages that have been stacked in the local buffer are sent.
发送会话调用PACK_MESSAGE一次或多次来创建一个信息. 这个过程将消息加到会话本地的信息缓冲区. 调用SEND_MESSAGE函数,指定发送消息的管道名,用以发送缓冲区中的信息. 当SEND_MESSAGE被调用, 堆放在本地缓冲区的所有信息都被发送出去.

A process that wants to receive a message calls the RECEIVE_MESSAGE function, designating the pipe name from which to receive the message. The process then calls the UNPACK_MESSAGE procedure to access each of the items in the message.
想要接受信息的进程调用RECEIVE_MESSAGE函数, 并指定接受信息的管道名. 然后调用UNPACK_MESSAGE过程去访问信息中的每条.

-----
Private Pipes
私有管道

You explicitly create a private pipe by calling the CREATE_PIPE function. Once created, the private pipe persists in shared memory until you explicitly deallocate it by calling the REMOVE_PIPE function. A private pipe is also deallocated when the database instance is shut down.
可以通过调用CREATE_PIPE函数显式的创建一个私有管道. 私有管道一旦被创建, 就持久保存在共享内存中, 直到显式的调用REMOVE_PIPE函数清除它. 当数据库实例关闭时私有管道也被清除.

You cannot create a private pipe if an implicit pipe exists in memory and has the same name as the private pipe you are trying to create. In this case, CREATE_PIPE returns an error.
如果已经有一个隐式的同名管道存在了, 就不能再创建同名的私有管道. 这种情况下, CREATE_PIPE返回一个错误.

Access to a private pipe is restricted to:
访问私有管道有如下限制:
Sessions running under the same userid as the creator of the pipe
在与管道创建者同样的用户的会话下运行.
Stored subprograms executing in the same userid privilege domain as the pipe creator
在与管道创建者同样用户权限域的存储过程下运行
Users connected as SYSDBA
SYSDBA用户

An attempt by any other user to send or receive messages on the pipe, or to remove the pipe, results in an immediate error. Any attempt by another user to create a pipe with the same name also causes an error.

As with public pipes, you must first build your message using calls to PACK_MESSAGE before calling SEND_MESSAGE. Similarly, you must call RECEIVE_MESSAGE to retrieve the message before accessing the items in the message by calling UNPACK_MESSAGE.




另外参考Definitions and Examples of how Oracle Pipes works:

Let's assume we have sessions A, B and C.
假设有会话A,B,C

Case 1
------
Let's say A and B send messages through the pipe and C is waiting on them.
If A sends a message first and then B sends another message, C will receive A's message first and then B's message.
会话A,B通过管道发送信息, 会话C等待信息
如果A首先发送了一条信息, 然后B发送了另一条, C会首先接收到A的信息, 然后是B的.

Case 2
------
Now, Let's put A and B to receive and C to send.
A will wait for message first and B will wait for a message after A did it.
When C sends a message, B is the one who receives it but A will remain waiting.
If C sends another message now A receives the message.
A和B接收, C发送
首先A等待信息, 然后B等待信息.
C发出一条信息, B接收到, A仍然等待.
C再发出一条信息, 这次A接收到

This shows that the queue for waiting a pipe behaves in a "stack" manner. Which ever waits first receives the message last and whichever waits last will receives it first.
这表明等待管道的队列是堆栈的方式. 最先等待的接收到最后的信息, 最后等待的接收到最先的信息.


3.
举例, 也参考前面的连接(Definitions and Examples of how Oracle Pipes works):

创建发送,接收存储过程,和测试表等
/**
 * 发送.
 */
create or replace procedure send_pipe as
  result number;
  my_pipe varchar2(30);
  user user_users%rowtype;
begin
  select * into user from user_users;
  --生成一个管道名, 这个管道将用于传递用户信息等
  my_pipe := dbms_pipe.UNIQUE_SESSION_NAME;
  --将my_pipe管道名发送到'waiter'管道, 'waiter'管道也是隐式创建的
  dbms_pipe.pack_message(my_pipe);
  result := dbms_pipe.send_message('waiter');
  --然后, 将用户信息发送到my_pipe管道中
  -- this message will have 3 parts. A varchar2, a number and a date. 
  dbms_pipe.pack_message(user.USERNAME);
  dbms_pipe.pack_message(user.user_id);
  dbms_pipe.pack_message(user.created);
  result := dbms_pipe.send_message(my_pipe);
end;
/


/**
 * 接收.
 */
create or replace procedure read_pipe as
  result number;
  v varchar2(4000);
  d date;
  n number;
  w raw(32767);
  r rowid;
  his_pipe varchar2(30);
  type_not_handled exception;
begin
  --从'waiter'管道接收信息, 得到对端的my_pipe管道名, 这里叫his_pipe.
  result := dbms_pipe.RECEIVE_MESSAGE('waiter');
  dbms_pipe.unpack_message(his_pipe);
  dbms_output.put_line('Pipe :'||his_pipe);
  --然后, 从his_pipe管道获取并打印接收到的信息
  result := dbms_pipe.RECEIVE_MESSAGE(his_pipe);
  result := dbms_pipe.next_item_type;
  while result <> 0 loop
    dbms_output.put_line('type =>'||to_char(result));
    if result=9 then -- varchar2
      dbms_pipe.unpack_message(v);
      dbms_output.put_line(v);
    elsif result=6 then -- number
      dbms_pipe.unpack_message(n);
      dbms_output.put_line(n);
    elsif result=12 then -- date
      dbms_pipe.unpack_message(d);
      dbms_output.put_line(d);
    elsif result=11 then -- rowid
      dbms_pipe.unpack_message_rowid(r);
      dbms_output.put_line(r);
    elsif result=23 then -- raw
      dbms_pipe.unpack_message_raw(w);
      dbms_output.put_line(w);
    else 
      raise type_not_handled;
    end if;
    result := dbms_pipe.next_item_type;
  end loop;
exception
  when type_not_handled then
    dbms_output.put_line('Type '||to_char(result)||' not handled');
    dbms_pipe.purge(his_pipe);
  when others then
    dbms_output.put_line('error: '||to_char(result));
    dbms_pipe.purge(his_pipe);
end;
/


/**
 * 删除所有的管道.
 */
create or replace procedure remove_all_pipes is
  result number;
begin
  for i in (select * from v$db_pipes) loop
    dbms_output.put('Pipe '||i.name);
    begin
      result := dbms_pipe.remove_pipe(i.name);
      dbms_output.put_line(' removed.');
    exception
      when others then
        dbms_output.put_line(' not removed.');
    end;
  end loop;
end;
/


--建测试表, 其上创建触发器
drop table any_table;
create table any_table(dummy varchar2(30));

create or replace trigger audit_any_table
  after insert on any_table
  for each row
declare
  name varchar2(30);
  result number;
  usid varchar2(30);
begin
  --同样, 先将自己的管道名发送非'waiter'
  usid := dbms_pipe.unique_session_name;
  dbms_pipe.pack_message(usid);
  result := dbms_pipe.send_message('waiter');
  --然后, 将用户信息等发送到自己的管道, 由waiter端接收
  select username into name from user_users;
  dbms_pipe.pack_message(name);
  dbms_pipe.pack_message(:new.dummy);
  result := dbms_pipe.send_message(usid);
end;
/


Case 1
------
Session: A
Explanation: The same user can read from it's own pipe.
Two pipes are created. One is 'waiter' the other has the name of the unique_session_name.

1) A: exec send_pipe
2) A: exec read_pipe
3) A: Select * from v$db_pipes;

同一个会话:
同一个用户能够读取它自己的管道
SQL> exec send_pipe

PL/SQL procedure successfully completed.

SQL> exec read_pipe
Pipe :ORA$PIPE$0659B71A0001
type =>9
A
type =>6
48
type =>12
2008-07-11 09:43:05

PL/SQL procedure successfully completed.

SQL> Select * from v$db_pipes;

   OWNERID NAME                           TYPE     PIPE_SIZE
---------- ------------------------------ ------- ----------
           ORA$PIPE$0659B71A0001          PUBLIC        1686
           WAITER                         PUBLIC        1671


Case 2
------
Sessions: A & B
Explanation: A sends info, B receives it. No new pipes.

1) A: exec send_pipe
2) B: exec read_pipe
3) A: Select * from v$db_pipes;

会话A发送信息
SQL> exec send_pipe

PL/SQL procedure successfully completed.

会话B接收
SQL> exec read_pipe
Pipe :ORA$PIPE$0659B71A0001
type =>9
A
type =>6
48
type =>12
2008-07-11 09:43:05

PL/SQL procedure successfully completed.

查看管道信息
SQL> Select * from v$db_pipes;

   OWNERID NAME                           TYPE     PIPE_SIZE
---------- ------------------------------ ------- ----------
           ORA$PIPE$0659B71A0001          PUBLIC        1686
           WAITER                         PUBLIC        1671


Case 3
------
Sessions: A & B
Explanation: A will hold until a message is received.
A new pipe is created.

1) A: exec read_pipe
2) B: exec send_pipe
3) A: Select * from v$db_pipes;

会话A先等待接收
SQL> exec read_pipe
会话B再发送
SQL> exec send_pipe

PL/SQL procedure successfully completed.

会话B发送时,隐式创建了一个新的管道
Pipe :ORA$PIPE$06573F5C0001
type =>9
A
type =>6
48
type =>12
2008-07-11 09:43:05

PL/SQL procedure successfully completed.

SQL> Select * from v$db_pipes;

   OWNERID NAME                           TYPE     PIPE_SIZE
---------- ------------------------------ ------- ----------
           ORA$PIPE$0659B71A0001          PUBLIC        1686
           WAITER                         PUBLIC        1671
           ORA$PIPE$06573F5C0001          PUBLIC        1686


Case 4
------
Sessions: A & B
Notes: A must have the table any_table and the trigger audit_any_table in its schema.
Explanation: B will hold until a message from the trigger is recieved which is sent when A inserts a new value. Commit or rollback doesn't have any effect, the message was already been received.
No new pipes.

1) B: exec read_pipe
2) A: insert into any_table values('New Value');
3) A: try both commit and rollback.

会话B等待信息
SQL> exec read_pipe
会话A插入数据,触发触发器发送信息
事务B立即收到信息, 不论(不等)会话A事务提交与否
Pipe :ORA$PIPE$0659B71A0001
type =>9
A
type =>9
New Value

PL/SQL procedure successfully completed.

SQL>
会话A回滚
SQL> rollback;

Rollback complete.


Case 5
------
Sessions: A, B & C
Explanation: On step 3, B will receive the message. On Step 4 A receives it.
A new pipe is created.

1) A: exec read_pipe
2) B: exec read_pipe
3) C: exec send_pipe
4) C: exec send_pipe

会话A等待消息
SQL> exec read_pipe
会话B等待消息
SQL> exec read_pipe
会话C发送消息
SQL> exec send_pipe

PL/SQL procedure successfully completed.

会话B是后启的,先收到消息
Pipe :ORA$PIPE$066337EB0001
type =>9
A
type =>6
48
type =>12
2008-07-11 09:43:05

PL/SQL procedure successfully completed.

会话C再次发送消息
SQL> exec send_pipe

PL/SQL procedure successfully completed.

这次会话A收到消息
Pipe :ORA$PIPE$066337EB0001
type =>9
A
type =>6
48
type =>12
2008-07-11 09:43:05

PL/SQL procedure successfully completed.


Case 6
------
Sessions: A, B & C
Explanation: On step 3, B will receive the message. On Step 5 B again receives it.
On step 6 A receives it.

1) A: exec read_pipe
2) B: exec read_pipe
3) C: exec send_pipe
4) B: exec read_pipe
5) C: exec send_pipe
6) C: exec send_pipe

会话A等待消息
SQL> exec read_pipe
会话B等待消息
SQL> exec read_pipe
会话C发送消息
SQL> exec send_pipe

PL/SQL procedure successfully completed.

会话B是后启的,先收到消息
Pipe :ORA$PIPE$066337EB0001
type =>9
A
type =>6
48
type =>12
2008-07-11 09:43:05

PL/SQL procedure successfully completed.

会话B再次等待消息
SQL> exec read_pipe
会话C再次发送消息
SQL> exec send_pipe

PL/SQL procedure successfully completed.

会话B还是后启的,又收到了消息
Pipe :ORA$PIPE$066337EB0001
type =>9
A
type =>6
48
type =>12
2008-07-11 09:43:05

PL/SQL procedure successfully completed.

会话A仍然在等待消息, 按ctrl-c退出
BEGIN read_pipe; END;

*
ERROR at line 1:
ORA-00604: error occurred at recursive SQL level 1
ORA-01013: user requested cancel of current operation
ORA-06508: PL/SQL: could not find program unit being called: "SYS.DBMS_SYS_ERROR"
ORA-06512: at "SYS.DBMS_PIPE", line 167
ORA-06512: at "SYS.DBMS_PIPE", line 192
ORA-06512: at "A.READ_PIPE", line 46
ORA-06556: the pipe is empty, cannot fulfill the unpack_message request
ORA-06512: at line 1




Case 7
------
Sessions: A, B & C
Explanation: On Step 3, A receives C's message. On Step 4 A received B's Message.

1) C,B: exec dbms_output.put_line(dbms_pipe.unique_session_name)
2) C: exec send_pipe
3) B: exec send_pipe
4) A: exec read_pipe
5) A: exec read_pipe

会话C显示自己的管道名
SQL> exec dbms_output.put_line(dbms_pipe.unique_session_name)
ORA$PIPE$066337EB0001

PL/SQL procedure successfully completed.

会话B显示自己的管道名
SQL> exec dbms_output.put_line(dbms_pipe.unique_session_name)
ORA$PIPE$06573F5C0001

PL/SQL procedure successfully completed.

会话C发送信息
SQL> exec send_pipe

PL/SQL procedure successfully completed.

会话B发送信息
SQL> exec send_pipe

PL/SQL procedure successfully completed.

会话A接收信息, 收到C发送的信息(先发的信息)
SQL> exec read_pipe
error: 3
Pipe :ORA$PIPE$066337EB0001
type =>9
A
type =>6
48
type =>12
2008-07-11 09:43:05

PL/SQL procedure successfully completed.

会话A再次接收信息, 收到了B发送的信息(后发的信息)
SQL> exec read_pipe
Pipe :ORA$PIPE$06573F5C0001
type =>9
A
type =>6
48
type =>12
2008-07-11 09:43:05

PL/SQL procedure successfully completed.


Case 8
------
Sessions: A,B,C & D
Explanation: On Step 4, B receives D' message. On step 5, A receives C's message.
A new pipe is created.

1) C,D: exec dbms_output.put_line(dbms_pipe.unique_session_name)
2) D: exec send_pipe
3) C: exec send_pipe
4) B: exec read_pipe
5) A: exec read_pipe

会话C显示自己的管道名
SQL> exec dbms_output.put_line(dbms_pipe.unique_session_name)
ORA$PIPE$066337EB0001

PL/SQL procedure successfully completed.

会话D显示自己的管道名
SQL> exec dbms_output.put_line(dbms_pipe.unique_session_name)
ORA$PIPE$06612A790001

PL/SQL procedure successfully completed.

会话D发送信息
SQL> exec send_pipe

PL/SQL procedure successfully completed.

会话C发送信息
SQL> exec send_pipe

PL/SQL procedure successfully completed.

会话B接收信息, 收到会话D发送的信息
SQL> exec read_pipe
Pipe :ORA$PIPE$06612A790001
type =>9
A
type =>6
48
type =>12
2008-07-11 09:43:05

PL/SQL procedure successfully completed.

会话A接收信息, 收到会话C发送的信息
SQL> exec read_pipe
Pipe :ORA$PIPE$066337EB0001
type =>9
A
type =>6
48
type =>12
2008-07-11 09:43:05

PL/SQL procedure successfully completed.


Case 9
------
Session: Any
Explanation: Cleaning up.

1) exec remove_all_pipes
2) select * from v$db_pipes;

查看管道,删除管道
SQL> select * from v$db_pipes;

   OWNERID NAME                           TYPE     PIPE_SIZE
---------- ------------------------------ ------- ----------
           ORA$PIPE$0659B71A0001          PUBLIC        1686
           WAITER                         PUBLIC        1671
           ORA$PIPE$06573F5C0001          PUBLIC        1686
           ORA$PIPE$06612A790001          PUBLIC        1686
           ORA$PIPE$066337EB0001          PUBLIC        1686

SQL> exec remove_all_pipes
Pipe ORA$PIPE$0659B71A0001 removed.
Pipe WAITER removed.
Pipe ORA$PIPE$06573F5C0001 removed.
Pipe ORA$PIPE$06612A790001 removed.
Pipe ORA$PIPE$066337EB0001 removed.

PL/SQL procedure successfully completed.

SQL> select * from v$db_pipes;

no rows selected



4.
源码:
package dbms_pipe is

-- DE-HEAD     <- tell SED where to cut when generating fixed package

  ------------
  --  OVERVIEW
  --
  --  This package provides a DBMS "pipe" service which allows messages
  --  to be sent between sessions.
  --
  --  The metaphor is similar to UNIX pipes:  you can do
  --    dbms_pipe.send_message(<pipename>)
  --    dbms_pipe.receive_message(<pipename>)
  --  which will cause a message to be sent or received.  You do
  --    dbms_pipe.pack_message(<varchar2>|<number>|<date>)
  --  to pack an item into a static buffer (which will then be sent with
  --  the "send_message" call), and
  --    dbms_pipe.unpack_message(<varchar2>|<number>|<date>)
  --  to get an item out of the static buffer (which is filled by the
  --  "receive_message" call).
  --  Pipes can be private to a user-id - which only allows session connected
  --  under the same user-id or stored procedure owned by the user-id to read
  --  write to the pipe.  Pipes could be public - and all database users with
  --  execute privilege on dbms_pipe and knowledge of the pipe can read or
  --  write to the pipe.
  --
  --  Pipes operate independently of transactions.  They also operate
  --  asynchronously.  There can be multiple readers and writers of the
  --  same pipe.
  --
  --  Pipes only operate between sessions in the same instance.
  --
  --  Pipes can be explicitly created using
  --    dbms_pipe.create_pipe(<pipename>)
  --  and removed using
  --    dbms_pipe.remove_pipe(<pipename>)
  --  A pipe created using the explicit create command should be removed
  --  using the remove function.  A pipe can also be created implicitly.
  --  Pipes automatically come into existence the first time they are
  --  referenced.  They effectively disappear when they contain no more
  --  data (some overhead remains in the SGA until it gets aged out).
  --  Pipes take up space in the SGA (see "maxpipesize" parameter to
  --  "send_message").


  --------
  --  USES
  --
  --  The pipe functionality has several potential applications:
  --
  --    o External service interface.  You can provide the ability to
  --      communicate with (user-written) services that are external to the
  --      RDBMS.  This can be done in a (effectively) multi-threaded manner
  --      so that several instances of the service can be executing
  --      simultaneously. Additionally, the services are available
  --      asynchronously - the requestor of the service need not block
  --      awaiting a reply.  The requestor can check (with or without
  --      timeout) at a later time.  The service can be written in any
  --      of the 3GL languages that ORACLE supports, not just C.  See
  --      example below.
  --    o Independent transactions.  The pipe can be used to communicate
  --      to a separate session which can perform an operation in an
  --      independent transaction (such as logging an attempted security
  --      violation detected by a trigger).
  --    o Alerters (non-transactional).  You can post another process
  --      without requiring the waiting process to poll.  If an "after-row"
  --      or "after-statement" trigger were to alert an application, then
  --      the application would treat this alert as an indication that
  --      the data probably changed.  The application would then go read
  --      the data to get the current value.  Since this is an "after"
  --      trigger, the application would want to do a "select for update"
  --      to make sure it read the correct data.
  --    o Debugging.  Triggers and/or stored procedures can send debugging
  --      information to a pipe.  Another session can keep reading out
  --      of the pipe and displaying it on the screen or writing it
  --      out to a file.
  --    o Concentrator. Useful for multiplexing large numbers of users
  --      over a fewer number of network connections, or improving
  --      performance by concentrating several user-transactions into
  --      one dbms-transaction.


  ------------
  --  SECURITY
  --
  --  Security can be achieved by use of 'grant execute' on the dbms_pipe
  --  package, by creating a pipe using the 'private' parameter in the create
  --  function and by writing cover packages that only expose particular
  --  features or pipenames to particular users or roles.


  ------------
  --  EXAMPLES
  --
  --  External service interface
  ------------------------------
  --
  --  Put the user-written 3GL code into an OCI or Precompiler program.
  --  The program connects to the database and executes PL/SQL code to read
  --  its request from the pipe, computes the result, and then executes
  --  PL/SQL code to send the result on a pipe back to the requestor.
  --  Below is an example of a stock service request.
  --
  --  The recommended sequence for the arguments to pass on the pipe
  --  for all service requests is
  --
  --      protocol_version      varchar2        - '1', 10 bytes or less
  --      returnpipe            varchar2        - 30 bytes or less
  --      service               varchar2        - 30 bytes or less
  --      arg1                  varchar2/number/date
  --         ...
  --      argn                  varchar2/number/date
  --
  --  The recommended format for returning the result is
  --
  --      success               varchar2        - 'SUCCESS' if OK,
  --                                              otherwise error message
  --      arg1                  varchar2/number/date
  --         ...
  --      argn                  varchar2/number/date
  --
  --
  --  The "stock price request server" would do, using OCI or PRO* (in
  --  pseudo-code):
  --
  --    <loop forever>
  --      begin dbms_stock_server.get_request(:stocksymbol); end;
  --      <figure out price based on stocksymbol (probably from some radio
  --            signal), set error if can't find such a stock>
  --      begin dbms_stock_server.return_price(:error, :price); end;
  --
  --  A client would do:
  --
  --    begin :price := stock_request('YOURCOMPANY'); end;
  --
  --  The stored procedure, dbms_stock_server, which is called by the
  --  "stock price request server" above is:
  --
  --    create or replace package dbms_stock_server is
  --      procedure get_request(symbol out varchar2);
  --      procedure return_price(errormsg in varchar2, price in varchar2);
  --    end;
  --
  --    create  or replace package body dbms_stock_server is
  --      returnpipe    varchar2(30);
  --
  --      procedure returnerror(reason varchar2) is
  --        s integer;
  --      begin
  --        dbms_pipe.pack_message(reason);
  --        s := dbms_pipe.send_message(returnpipe);
  --        if s <> 0 then
  --          raise_application_error(-20000, 'Error:' || to_char(s) ||
  --            ' sending on pipe');
  --        end if;
  --      end;
  --
  --      procedure get_request(symbol out varchar2) is
  --        protocol_version varchar2(10);
  --        s                  integer;
  --        service            varchar2(30);
  --      begin
  --        s := dbms_pipe.receive_message('stock_service');
  --        if s <> 0 then
  --          raise_application_error(-20000, 'Error:' || to_char(s) ||
  --            'reading pipe');
  --        end if;
  --        dbms_pipe.unpack_message(protocol_version);
  --        if protocol_version <> '1' then
  --          raise_application_error(-20000, 'Bad protocol: ' ||
  --            protocol_version);
  --        end if;
  --        dbms_pipe.unpack_message(returnpipe);
  --        dbms_pipe.unpack_message(service);
  --        if service != 'getprice' then
  --          returnerror('Service ' || service || ' not supported');
  --        end if;
  --        dbms_pipe.unpack_message(symbol);
  --      end;
  --
  --      procedure return_price(errormsg in varchar2, price in varchar2) is
  --        s integer;
  --      begin
  --        if errormsg is null then
  --          dbms_pipe.pack_message('SUCCESS');
  --          dbms_pipe.pack_message(price);
  --        else
  --          dbms_pipe.pack_message(errormsg);
  --        end if;
  --        s := dbms_pipe.send_message(returnpipe);
  --        if s <> 0 then
  --          raise_application_error(-20000, 'Error:'||to_char(s)||
  --            ' sending on pipe');
  --        end if;
  --      end;
  --    end;
  --
  --
  --  The procedure called by the client is:
  --
  --    create or replace function stock_request (symbol varchar2)
  --        return varchar2 is
  --      s        integer;
  --      price    varchar2(20);
  --      errormsg varchar2(512);
  --    begin
  --      dbms_pipe.pack_message('1');  -- protocol version
  --      dbms_pipe.pack_message(dbms_pipe.unique_session_name); -- return pipe
  --      dbms_pipe.pack_message('getprice');
  --      dbms_pipe.pack_message(symbol);
  --      s := dbms_pipe.send_message('stock_service');
  --      if s <> 0 then
  --        raise_application_error(-20000, 'Error:'||to_char(s)||
  --          ' sending on pipe');
  --      end if;
  --      s := dbms_pipe.receive_message(dbms_pipe.unique_session_name);
  --      if s <> 0 then
  --        raise_application_error(-20000, 'Error:'||to_char(s)||
  --          ' receiving on pipe');
  --      end if;
  --      dbms_pipe.unpack_message(errormsg);
  --      if errormsg <> 'SUCCESS' then
  --        raise_application_error(-20000, errormsg);
  --      end if;
  --      dbms_pipe.unpack_message(price);
  --      return price;
  --    end;
  --
  --  You would typically only grant execute on 'dbms_stock_service' to
  --  the stock service application server, and would only grant execute
  --  on 'stock_request' to those users allowed to use the service.


  ---------------------
  --  SPECIAL CONSTANTS
  --
  maxwait   constant integer := 86400000; /* 1000 days */
  --  The maximum time to wait attempting to send or receive a message


  ----------------------------
  --  PROCEDURES AND FUNCTIONS
  --
  procedure pack_message(item in varchar2 character set any_cs);
  pragma restrict_references(pack_message,WNDS,RNDS);
  procedure pack_message(item in number);
  pragma restrict_references(pack_message,WNDS,RNDS);
  procedure pack_message(item in date);
  pragma restrict_references(pack_message,WNDS,RNDS);
  procedure pack_message_raw(item in raw);
  pragma restrict_references(pack_message_raw,WNDS,RNDS);
  procedure pack_message_rowid(item in rowid);
  pragma restrict_references(pack_message_rowid,WNDS,RNDS);
  --  Pack an item into the message buffer
  --  Input parameters:
  --    item
  --      Item to pack into the local message buffer.
  --  Exceptions:
  --    ORA-06558 generated if message buffer overflows (currently 4096
  --    bytes).  Each item in the buffer takes one byte for the type,
  --    two bytes for the length, plus the actual data.  There is also one
  --    byte needed to terminate the message.
  --
  procedure unpack_message(item out varchar2 character set any_cs);
  pragma restrict_references(unpack_message,WNDS,RNDS);
  procedure unpack_message(item out number);
  pragma restrict_references(unpack_message,WNDS,RNDS);
  procedure unpack_message(item out date);
  pragma restrict_references(unpack_message,WNDS,RNDS);
  procedure unpack_message_raw(item out raw);
  pragma restrict_references(unpack_message_raw,WNDS,RNDS);
  procedure unpack_message_rowid(item out rowid);
  pragma restrict_references(unpack_message_rowid,WNDS,RNDS);
  --  Unpack an item from the local message buffer
  --  Output parameters:
  --    item
  --      The argument to receive the next unpacked item from the local
  --      message buffer.
  --  Exceptions:
  --    ORA-06556 or 06559 are generated if the buffer contains
  --    no more items, or if the item is not of the same type as that
  --    requested (see 'next_item_type' below).
  --
  function next_item_type return integer;
  pragma restrict_references(next_item_type,WNDS,RNDS);
  --  Get the type of the next item in the local message buffer
  --  Return value:
  --    Type of next item in buffer:
  --        0    no more items
  --        9    varchar2
  --        6    number
  --       11    rowid
  --       12    date
  --       23    raw
  --
  function create_pipe(pipename in varchar2,
                  maxpipesize in integer default 8192,
                  private in boolean default TRUE)
    return integer;
  pragma restrict_references(create_pipe,WNDS,RNDS);
  --  Create an empty pipe with the given name.
  --  Input parameters:
  --    pipename
  --      Name of pipe to be created.  WARNING: Do not use pipe names
  --      beginning with 'ORA$'.  These are reserved for use by procedures
  --      provided by Oracle Corporation.  Pipename should not be longer than
  --      128 bytes, and is case_insensitive.  At this time, the name cannot
  --      contain NLS characters.
  --    maxpipesize
  --      Maximum allowed size for the pipe.  The total size of all the
  --      messages on the pipe cannot exceed this amount.  The maxpipesize
  --      for a pipe becomes part of the pipe and persists for the lifetime
  --      of the pipe.  Callers of send_message with larger values will
  --      cause the maxpipesize to be increased.  Callers with a smaller
  --      value will just use the larger value.  The specification of
  --      maxpipesize here allows us to avoid its use in future send_message
  --      calls.
  --    private
  --      Boolean indicating whether the pipe will be private - and for the
  --      use of the creating user-id, or public.  A private pipe can be used
  --      directly through calls to this package by sessions connected to the
  --      database as the same user as the one that created the pipe.  It can
  --      also be used via stored procedures owned by the user that created
  --      the pipe.  The procedure may be executed by anyone with execute
  --      privilege on it.  A public pipe can be accessed by anyone who has
  --      knowledge of it and execute privilege on dbms_pipe.
  --  Return values:
  --    0 - Success.  This is returned even if the pipe had been created in
  --        mode that permits its use by the user executing the create call.
  --        If a pipe already existed, it is not emptied.
  --  Exceptions:
  --    Null pipe name.
  --    Permission error.  Pipe with the same name already exists and
  --      you are not allowed to use it.
  --
  function remove_pipe(pipename in varchar2)
    return integer;
  pragma restrict_references(remove_pipe,WNDS,RNDS);
  --  Remove the named pipe.
  --  Input Parameters:
  --    pipename
  --      Name of pipe to remove.
  --  Return value:
  --    0 - Success. Calling remove on a pipe that does not exist returns 0.
  --  Exceptions:
  --    Null pipe name.
  --    Permission error.  Insufficient privilege to remove pipe.  The
  --      pipe was created and is owned by someone else.
  --
  function send_message(pipename in varchar2,
                        timeout in integer default maxwait,
                        maxpipesize in integer default 8192)
    return integer;
  pragma restrict_references(send_message,WNDS,RNDS);
  --  Send a message on the named pipe.  The message is contained in the
  --    local message buffer which was filled with calls to 'pack_message'.
  --    A pipe could have been created explicitly using 'create_pipe', or
  --    it will be created implicitly.
  --  Input parameters:
  --    pipename
  --      Name of pipe to place the message on.  The message is copied
  --      from the local buffer which can be filled by the "pack_message"
  --      routine.  WARNING:  Do not use pipe names beginning with 'ORA$'.
  --      These names are reserved for use by procedures provided by
  --      Oracle Corporation.  Pipename should not be longer than 128 bytes,
  --      and is case_insensitive.  At this time, the name cannot
  --      contain NLS characters.
  --    timeout
  --      Time to wait while attempting to place a message on a pipe, in
  --      seconds (see return codes below).
  --    maxpipesize
  --      Maximum allowed size for the pipe.  The total size of all the
  --      messages on the pipe cannot exceed this amount.  If this message
  --      would exceed this amount the call will block.  The maxpipesize
  --      for a pipe becomes part of the pipe and persists for the lifetime
  --      of the pipe.  Callers of send_message with larger values will
  --      cause the maxpipesize to be increased.  Callers with a smaller
  --      value will just use the larger value.  The specification of
  --      maxpipesize here allows us to avoid the use of a "open_pipe" call.
  --  Return value:
  --    0 - Success
  --    1 - Timed out (either because can't get lock on pipe or pipe stays
  --        too full)
  --    3 - Interrupted
  --  Exceptions:
  --    Null pipe name.
  --    Permission error.  Insufficient privilege to write to the pipe.
  --      The pipe is private and owned by someone else.
  function receive_message(pipename in varchar2,
                           timeout in integer default maxwait)
    return integer;
  pragma restrict_references(receive_message,WNDS,RNDS);
  --  Receive a message from the named pipe.  Copy the message into the
  --    local message buffer.  Use 'unpack_message' to access the
  --    individual items in the message.  The pipe can be created explicitly
  --    using the 'create_pipe' function or it will be created implicitly.
  --  Input parameters:
  --    pipename
  --      Name of pipe from which to retrieve a message.  The message is
  --      copied into a local buffer which can be accessed by the
  --      "unpack_message" routine.  WARNING:  Do not use pipe names
  --      beginning with 'ORA$'.  These names are reserved for use by
  --      procedures provided by Oracle Corporation. Pipename should not be
  --      longer than 128 bytes, and is case-insensitive.  At this time,
  --      the name cannot contain NLS characters.
  --    timeout
  --      Time to wait for a message.  A timeout of 0 allows you to read
  --      without blocking.
  --  Return value:
  --    0 - Success
  --    1 - Timed out
  --    2 - Record in pipe too big for buffer (should not happen).
  --    3 - Interrupted
  --  Exceptions:
  --    Null pipe name.
  --    Permission error.  Insufficient privilege to remove the record
  --      from the pipe.  The pipe is owned by someone else.
  procedure reset_buffer;
  pragma restrict_references(reset_buffer,WNDS,RNDS);
  --  Reset pack and unpack positioning indicators to 0.  Generally this
  --    routine is not needed.
  --
  procedure purge(pipename in varchar2);
  pragma restrict_references(purge,WNDS,RNDS);
  --  Empty out the named pipe.  An empty pipe is a candidate for LRU
  --    removal from the SGA, therefore 'purge' can be used to free all
  --    memory associated with a pipe.
  --  Input Parameters:
  --    pipename
  --      Name of pipe from which to remove all messages.  The local
  --      buffer may be overwritten with messages as they are discarded.
  --      Pipename should not be longer than 128 bytes, and is
  --      case-insensitive.
  --  Exceptions:
  --    Permission error if pipe belongs to another user.
  --
  function unique_session_name return varchar2;
  pragma restrict_references(unique_session_name,WNDS,RNDS,WNPS);
  --  Get a name that is unique among all sessions currently connected
  --    to this database.  Multiple calls to this routine from the same
  --    session will always return the same value.
  --  Return value:
  --    A unique name.  The returned name can be up to 30 bytes.
  --

  pragma TIMESTAMP('2000-06-09:14:30:00');

end;

-- CUT_HERE    <- tell sed where to chop off the rest


可以看到,调用的其实都是C的接口
PACKAGE BODY dbms_pipe IS
  PACKBUF   CHAR(4096) := 'a';
  UNPACKBUF CHAR(4096) := 'a';
  PACKPOS   BINARY_INTEGER := 0;
  UNPACKPOS BINARY_INTEGER := 2000000000;


PROCEDURE SENDPIPE(PIPENAME IN VARCHAR2, POS IN BINARY_INTEGER,
  BUFFER IN OUT NOCOPY CHAR, MAXPIPESIZE IN BINARY_INTEGER,
  TIMEOUT IN BINARY_INTEGER, RETVAL OUT BINARY_INTEGER);
PRAGMA INTERFACE (C, SENDPIPE);


PROCEDURE RECEIVEPIPE(PIPENAME IN VARCHAR2, BUFFER IN OUT NOCOPY CHAR,
  TIMEOUT IN BINARY_INTEGER, RETVAL OUT BINARY_INTEGER);
PRAGMA INTERFACE (C, RECEIVEPIPE);


PROCEDURE COPYINTOBUF(A IN VARCHAR2 CHARACTER SET ANY_CS,
  POS IN OUT NOCOPY BINARY_INTEGER, BUF IN OUT NOCOPY CHAR);
PRAGMA INTERFACE (C, COPYINTOBUF);


PROCEDURE COPYINTOBUF(A IN NUMBER, POS IN OUT NOCOPY BINARY_INTEGER,
  BUF IN OUT NOCOPY CHAR);
PRAGMA INTERFACE (C, COPYINTOBUF);


PROCEDURE COPYINTOBUF(A IN DATE, POS IN OUT NOCOPY BINARY_INTEGER,
  BUF IN OUT NOCOPY CHAR);
PRAGMA INTERFACE (C, COPYINTOBUF);


PROCEDURE COPYFROMBUF(A OUT VARCHAR2 CHARACTER SET ANY_CS,
  POS IN OUT NOCOPY BINARY_INTEGER, BUF IN CHAR);
PRAGMA INTERFACE (C, COPYFROMBUF);


PROCEDURE COPYFROMBUF(A OUT NUMBER, POS IN OUT NOCOPY BINARY_INTEGER,
  BUF IN CHAR);
PRAGMA INTERFACE (C, COPYFROMBUF);


PROCEDURE COPYFROMBUF(A OUT DATE, POS IN OUT NOCOPY BINARY_INTEGER,
  BUF IN CHAR);
PRAGMA INTERFACE (C, COPYFROMBUF);


FUNCTION GETTYPEFROMBUF(POS IN BINARY_INTEGER, BUF IN CHAR)
  RETURN BINARY_INTEGER;
PRAGMA INTERFACE (C, GETTYPEFROMBUF);


PROCEDURE COPYINTOBUFBINARY(A IN RAW, POS IN OUT NOCOPY BINARY_INTEGER,
  BUF IN OUT NOCOPY CHAR);
PRAGMA INTERFACE (C, COPYINTOBUFBINARY);


PROCEDURE COPYINTOBUFROWID(A IN ROWID, POS IN OUT NOCOPY BINARY_INTEGER,
  BUF IN OUT NOCOPY CHAR);
PRAGMA INTERFACE (C, COPYINTOBUFROWID);


PROCEDURE COPYFROMBUFBINARY(A OUT RAW , POS IN OUT NOCOPY BINARY_INTEGER,
  BUF IN CHAR);
PRAGMA INTERFACE (C, COPYFROMBUFBINARY);


PROCEDURE COPYFROMBUFROWID(A OUT ROWID, POS IN OUT NOCOPY BINARY_INTEGER,
  BUF IN CHAR);
PRAGMA INTERFACE (C, COPYFROMBUFROWID);


PROCEDURE CREATEPIPE(PIPENAME IN VARCHAR2, MAXPIPESIZE IN BINARY_INTEGER,
  PRIVATE IN BOOLEAN, RETVAL OUT BINARY_INTEGER);
PRAGMA INTERFACE (C, CREATEPIPE);


PROCEDURE REMOVEPIPE(PIPENAME IN VARCHAR2, RETVAL OUT BINARY_INTEGER);
PRAGMA INTERFACE (C, REMOVEPIPE);


FUNCTION UNIQUE_SESSION_ID RETURN VARCHAR2;
PRAGMA INTERFACE (C, UNIQUE_SESSION_ID);


PROCEDURE PACK_MESSAGE(ITEM IN VARCHAR2 CHARACTER SET ANY_CS) IS
BEGIN COPYINTOBUF(ITEM, PACKPOS, PACKBUF); END;


PROCEDURE PACK_MESSAGE_RAW(ITEM IN RAW) IS
BEGIN COPYINTOBUFBINARY(ITEM, PACKPOS, PACKBUF); END;


PROCEDURE PACK_MESSAGE_ROWID(ITEM IN ROWID) IS
BEGIN COPYINTOBUFROWID(ITEM, PACKPOS, PACKBUF); END;


PROCEDURE PACK_MESSAGE(ITEM IN NUMBER) IS
BEGIN COPYINTOBUF(ITEM, PACKPOS, PACKBUF); END;


PROCEDURE PACK_MESSAGE(ITEM IN DATE) IS
BEGIN COPYINTOBUF(ITEM, PACKPOS, PACKBUF); END;


PROCEDURE UNPACK_MESSAGE(ITEM OUT VARCHAR2 CHARACTER SET ANY_CS) IS
BEGIN COPYFROMBUF(ITEM, UNPACKPOS, UNPACKBUF); END;


PROCEDURE UNPACK_MESSAGE_RAW(ITEM OUT RAW) IS
BEGIN COPYFROMBUFBINARY(ITEM, UNPACKPOS, UNPACKBUF); END;


PROCEDURE UNPACK_MESSAGE_ROWID(ITEM OUT ROWID) IS
BEGIN COPYFROMBUFROWID(ITEM, UNPACKPOS, UNPACKBUF); END;


PROCEDURE UNPACK_MESSAGE(ITEM OUT NUMBER) IS
BEGIN COPYFROMBUF(ITEM, UNPACKPOS, UNPACKBUF); END;


PROCEDURE UNPACK_MESSAGE(ITEM OUT DATE) IS
BEGIN COPYFROMBUF(ITEM, UNPACKPOS, UNPACKBUF); END;


FUNCTION NEXT_ITEM_TYPE RETURN INTEGER IS
  INTERNAL_TYPE BINARY_INTEGER;
BEGIN
  INTERNAL_TYPE :=  GETTYPEFROMBUF(UNPACKPOS, UNPACKBUF);
  IF INTERNAL_TYPE = 1 THEN
    RETURN 9;
  ELSIF INTERNAL_TYPE = 2 THEN
    RETURN 6;
  ELSE
    RETURN INTERNAL_TYPE;
  END IF;
END;


FUNCTION CREATE_PIPE(PIPENAME IN VARCHAR2,
  MAXPIPESIZE IN INTEGER DEFAULT 8192,
  PRIVATE IN BOOLEAN DEFAULT TRUE)
RETURN INTEGER IS
  RETVAL BINARY_INTEGER;
  MPS    BINARY_INTEGER := MAXPIPESIZE;
  PVT    BOOLEAN := PRIVATE;
BEGIN
  IF PIPENAME IS NULL THEN
    DBMS_SYS_ERROR.RAISE_SYSTEM_ERROR(-23321, 'Pipename may not be null');
  END IF;
  CREATEPIPE(PIPENAME, MPS, PVT, RETVAL);
  IF RETVAL = 4 THEN
    DBMS_SYS_ERROR.RAISE_SYSTEM_ERROR(-23322,
      'Insufficient privilege to access pipe');
  END IF;
  RETURN RETVAL;
END;


FUNCTION REMOVE_PIPE(PIPENAME IN VARCHAR2)
RETURN INTEGER IS
  RETVAL BINARY_INTEGER;
BEGIN
  IF PIPENAME IS NULL THEN
    DBMS_SYS_ERROR.RAISE_SYSTEM_ERROR(-23321, 'Pipename may not be null');
  END IF;
  REMOVEPIPE(PIPENAME, RETVAL);
  IF RETVAL = 4 THEN
    DBMS_SYS_ERROR.RAISE_SYSTEM_ERROR(-23322,
      'Insufficient privilege to access pipe');
  END IF;
  RETURN RETVAL;
END;


FUNCTION SEND_MESSAGE(PIPENAME IN VARCHAR2,
  TIMEOUT IN INTEGER DEFAULT MAXWAIT,
  MAXPIPESIZE IN INTEGER DEFAULT 8192)
RETURN INTEGER IS
  RETVAL BINARY_INTEGER;
  MPS    BINARY_INTEGER := MAXPIPESIZE;
  TMO    BINARY_INTEGER := TIMEOUT;
BEGIN
  IF PIPENAME IS NULL THEN
    DBMS_SYS_ERROR.RAISE_SYSTEM_ERROR(-23321, 'Pipename may not be null');
  END IF;
  SENDPIPE(PIPENAME, PACKPOS, PACKBUF, MPS, TMO, RETVAL);
  IF RETVAL = 0 THEN
    PACKPOS := 0;
  END IF;
  IF RETVAL = 4 THEN
    DBMS_SYS_ERROR.RAISE_SYSTEM_ERROR(-23322,
      'Insufficient privilege to access pipe');
  END IF;
RETURN RETVAL;
END;


FUNCTION RECEIVE_MESSAGE(PIPENAME IN VARCHAR2,
  TIMEOUT IN INTEGER DEFAULT MAXWAIT)
RETURN INTEGER IS
  RETVAL BINARY_INTEGER;
  TMO    BINARY_INTEGER := TIMEOUT;
BEGIN
  IF PIPENAME IS NULL THEN
    DBMS_SYS_ERROR.RAISE_SYSTEM_ERROR(-23321, 'Pipename may not be null');
  END IF;
  RECEIVEPIPE(PIPENAME, UNPACKBUF, TMO, RETVAL);
  IF RETVAL = 0 THEN
    UNPACKPOS := 0;
  ELSE
    UNPACKPOS := 2000000000;
  END IF;
  IF RETVAL = 4 THEN
    DBMS_SYS_ERROR.RAISE_SYSTEM_ERROR(-23322,
      'Insufficient privilege to access pipe');
  END IF;
RETURN RETVAL;
END;


PROCEDURE RESET_BUFFER IS
BEGIN
  UNPACKPOS := 0;
  PACKPOS := 0;
END;


PROCEDURE PURGE(PIPENAME IN VARCHAR2) IS
  RETVAL BINARY_INTEGER;
BEGIN
  LOOP
    RETVAL := RECEIVE_MESSAGE(PIPENAME, 0);
    IF RETVAL <> 0 THEN
      EXIT;
    END IF;
  END LOOP;
END;


FUNCTION UNIQUE_SESSION_NAME RETURN VARCHAR2 IS
BEGIN
  RETURN ('ORA$PIPE$' || UNIQUE_SESSION_ID);
END;


END;



外部链接:




PACKAGE DBMS_PIPE Specification

When Do DBMS_PIPE Connections Get Closed
管道处于非活动状态或没有消息达到一段时间, 会被从共享池中自动清除, 这时再操作管道就会报错, 通常是ORA-20011
即使是显式创建的管道, 也可能会被自动清除
唯一解决办法是删除并重建

Dynamic SQL and System Commands Using DBMS_PIPE
使用管道调用SQL语句或外部命令

COMMONLY ASKED QUESTIONS ABOUT DBMS_PIPE PACKAGE
使用DBMS_PIPE比用轮询更有效, 但需测试
管道中的数据不是持久性的, 随着数据库实例关闭而丢失

Example of DBMS_PIPE() with Pro*C on Unix
Pro*C程序使用DBMS_PIPE管道

PLSQL: Example use of DBMS_PIPE for Debugging
使用DBMS_PIPE管道取代DBMS_OUTPUT来调试程序

How To Use the PIPE Option with RMAN
使用管道向RMAN发送命令和接收返回信息

What is the difference between PIPE and QUEUE for profile Concurrent:TM Transport Type ?

DBMS_PIPE: Communicating Between Sessions





-fin-

Friday, May 8, 2009

Inter-Session Communication DBMS_ALERT 会话间通信 DBMS_ALERT

Inter-Session Communication DBMS_ALERT
会话间通信 DBMS_ALERT

提问:如何让程序获知数据被改变?
回答:
1.轮询表, 查询count(*)等查看记录是否增加了
2.轮询审计表(audit table), 查询count(*)或时间戳字段, 得知表是否有更新
3.DBMS_ALERT
4.DBMS_AQ ...



dbms_alert首次出现在Oracle版本7中, 是数据库会话间通信的一种实现方式,
提供了一种向多用户广播数据库事件(database events)即警报(alerts)的机制
是依赖dbms_pipe和dbms_lock实现的
可用于, 有时想监控数据库表的变化, 应用程序不得不反复查询, 造成开销很大
dbms_alert的机制使得当发生变化时, 数据库可以主动的去通知应用程序




参考文档:13 DBMS_ALERT:

Alerts are transaction-based. This means that the waiting session is not alerted until the transaction signalling the alert commits. There can be any number of concurrent signalers of a given alert, and there can be any number of concurrent waiters on a given alert.
警报是基于事务的. 这意味着直到发起警报的会话提交了事务,等待警报的会话才收到警报. 一个指定的警报可以同时有任意多个发起者, 也可以有任意多个接受者.

A waiting application is blocked in the database and cannot do any other work.
等待中的应用程序被阻塞, 不能做其它操作

An application can register for multiple events and can then wait for any of them to occur using the WAITANY procedure.
一个应用程序可以注册多个事件, 然后使用WAITANY存储过程, 等待它们中任意一个(就是多对多的关系, 一个警报可以有多个接收者, 一个接收者可以接收多个警报)

An application can also supply an optional timeout parameter to the WAITONE or WAITANY procedures. A timeout of 0 returns immediately if there is no pending alert.
一个应用程序也可以对WAITONE或WAITANY存储过程指定一个超时参数. 如果没有待决的警报, 超时参数为0导致立即返回.

The signalling session can optionally pass a message that is received by the waiting session.
发信号的会话可以向等待会话传递一个消息.

Alerts can be signalled more often than the corresponding application wait calls. In such cases, the older alerts are discarded. The application always gets the latest alert (based on transaction commit times).
发出的警报可以多于应用程序的等待调用. 在这种情况下, 旧的警报被丢弃. 应用程序总是得到最新的警报(基于事务提交的时间).

If the application does not require transaction-based alerts, the DBMS_PIPE package may provide a useful alternative.
如果应用程序不需要基于事务的警报, 那么DBMS_PIPE包可供选择.

If the transaction is rolled back after the call to SIGNAL, no alert occurs.
如果在调用SIGNAL后事务回滚了, 没有警报发生

It is possible to receive an alert, read the data, and find that no data has changed. This is because the data changed after the prior alert, but before the data was read for that prior alert.
有可能收到警报,再去读数据,发现数据没有改变. 这是因为数据是在发出警报后,读数据前改变的(这怎么可能? 先发警报再改数据? 发警报可能比事务提交要快?)

Usually, Oracle is event-driven; this means that there are no polling loops. There are two cases where polling loops can occur:
通常, Oracle是事件驱动的; 这意味着没有轮询循环. 轮训循环有两种情况:

Shared mode. If your database is running in shared mode, a polling loop is required to check for alerts from another instance. The polling loop defaults to one second and can be set by the SET_DEFAULTS procedure.
共享模式. 如果你的数据库运行于共享模式, 就需要轮询循环检查另一个实例的警报. 默认轮询周期是1秒钟, 可以通过SET_DEFAULT存储过程设置.

WAITANY procedure. If you use the WAITANY procedure, and if a signalling session does a signal but does not commit within one second of the signal, a polling loop is required so that this uncommitted alert does not camouflage other alerts. The polling loop begins at a one second interval and exponentially backs off to 30-second intervals.
WAITANY过程. 如果使用了WAITANY过程, 并且如果发出了一个警报,在1秒内没有提交事务的话, 需要一个轮询以便这个未提交的警报不会阻挡住别的警报. 轮询间隔以1秒开始, 然后以指数增长直到30秒.


测试:


1. 授权
conn / as sysdba
grant execute on dbms_alert to a;


2.
新打开一个会话, 接收警报
conn a/a
set pages 50000 line 130
set serveroutput on size unlimited
注册一个警报
exec dbms_alert.register('alert_test');


3.
查看警报信息
SYS用户运行
SQL> select * from dbms_alert_info;

NAME                           SID                            C
------------------------------ ------------------------------ -
MESSAGE
--------------------------------------------------------------------------------
ALERT_TEST                     065C00C00001                   N



SQL> desc dbms_alert_info
 Name                                      Null?    Type
 ----------------------------------------- -------- ----------------------------
 NAME                                      NOT NULL VARCHAR2(30)
 SID                                       NOT NULL VARCHAR2(30)
 CHANGED                                            VARCHAR2(1)
 MESSAGE                                            VARCHAR2(1800)

SQL>
DBMS_ALERT_INFO
NAME : 警报名. 可以有同名的, 也就是说可以有多个会话注册和接收同一个警报
SID : 等于DBMS_SESSION.UNIQUE_SESSION_ID. 分为3个部分, 是 SID + SERIAL# + InstanceNumber
How Is SID In DBMS_ALERT_INFO Related To SID In V$Session
select name
       , to_number(substr(sid,1,4),'xxxx') sid
       , to_number(substr(sid,5,4),'xxxx') serial#
       , to_number(substr(sid,9,4),'xxxx') instance#
  from dbms_alert_info;
NAME                                  SID    SERIAL#  INSTANCE#
------------------------------ ---------- ---------- ----------
ALERT_TEST                           1628        192          1
CHANGED: N:没有发出警报或警报已被接收 Y:警报已发出还未被接收
MESSAGE: 警报消息


4.
注册完后, 等待接收警报
var v_messge varchar2(1000)
var v_status number
exec dbms_alert.waitone('alert_test', :v_messge, :v_status);
print :v_status :v_messge
调用waitone后没有返回, 一直在等待警报


5.
打开另一个会话, 发出警报
conn a/a
set pages 50000 line 130
set serveroutput on size unlimited
exec dbms_alert.signal('alert_test', 'hello world!');
commit;
事务提交后才发出警报


6.
等待接收警报的会话接收到了警报, 打印出信息
SQL> print :v_status :v_messge

  V_STATUS
----------
         0


V_MESSGE
----------------------------------------------------------------------------------------------------------------------------------
hello world!


查询dbms_alert_info信息
SQL> select * from dbms_alert_info;

NAME                           SID                            C
------------------------------ ------------------------------ -
MESSAGE
--------------------------------------------------------------------------------
ALERT_TEST                     065C00C00001                   N
hello world!


这里CHANGED还是N, 因为警报发出后立刻被收到了
如果发出的警报没有被收到, CHANGED是Y


7. 问题:注册时挂起
如果注册警报调用dbms_alert.register时就停住了,没有返回
说明之前有别的会话发出了警告,但没有提交事务, 这样注册过程就被阻塞住了
只要提交或回滚该事务即可解决

参考:
PL/SQL Session Hangs When Executing "Dbms_alert.Register(''Varchar'')" Statement
DBMS_ALERT.WAITONE HANGS WAITING ON DBMS_ALERT.SIGNAL


8. 多次警报
exec dbms_alert.signal('alert_test', 'hello one')
commit;
exec dbms_alert.signal('alert_test', 'hello two')
commit;

SQL> select * from dbms_alert_info;

NAME                           SID                            C
------------------------------ ------------------------------ -
MESSAGE
--------------------------------------------------------------------------------
ALERT_TEST                     065C00C00001                   Y
hello two


看到只有最新的一条警报生效

接收警报
SQL> exec dbms_alert.waitone('alert_test', :v_messge, :v_status);

PL/SQL procedure successfully completed.

SQL> print :v_status :v_messge

  V_STATUS
----------
         0


V_MESSGE
----------------------------------------------------------------------------------------------------------------------------------
hello two

也是最新一条

接收后CHANGED字段会变为N


9. 删除警报
删除之前定义的警报
必须在注册该警报的会话上运行
exec dbms_alert.remove('alert_test')

如果注册警报的会话退出了, 用其它会话删除不掉, dbms_alert_info表中还存在该记录
SQL> select * from dbms_alert_info;

NAME                           SID                            C
------------------------------ ------------------------------ -
MESSAGE
--------------------------------------------------------------------------------
ALERT_TEST                     065C00C00001                   N
hello 2


参考文档 How To Remove Alerts From DBMS_ALERTS_INFO Table 上说重新注册,再删除就能删掉
因为前一个会话已经退出了, 这时重新注册oracle会自动删除原记录并增加一条新的(或者说是覆盖), 所以可以删除


10. 删除管道
虽然删除了警报, 但是, dbms_alert创建的管道仍在系统中, 没有删除. 查看v$db_pipes:
SQL> col name for a40
SQL> select * from v$db_pipes;

   OWNERID NAME                                     TYPE     PIPE_SIZE
---------- ---------------------------------------- ------- ----------
           ORA$ALERT$065C00C20001                   PUBLIC        1687
           ORA$ALERT$066300730001                   PUBLIC        1687
           ORA$ALERT$066C021C0001                   PUBLIC        1687
           ORA$ALERT$066C021A0001                   PUBLIC        1687
           ORA$ALERT$065D00670001                   PUBLIC        1687

SQL>
这是因为DBMS_ALERT使用的是隐式管道, 使用DBMS_PIPE.PURGE清空, 并没有直接删除, 等待系统自动删除
可用dbms_pipe.remove_pipe将其删掉
declare
  v int;
begin
  for x in (select name from v$db_pipes where name like 'ORA$ALERT$%')
  loop
    v := dbms_pipe.remove_pipe(x.name);
  end loop;
end;
/


11. 源码
package dbms_alert is

  ------------
  --  OVERVIEW
  --
  --  This package provides support for the asynchronous (as opposed to
  --  polling) notification of database events.  By appropriate use of
  --  this package and database triggers, an application can cause itself
  --  to be notified whenever values of interest in the database are
  --  changed.
  --
  --  For example, suppose a graphics tool is displaying a graph of some
  --  data from a database table.  The graphics tool can, after reading and
  --  graphing the data, wait on a database alert ('dbms_alert.waitone')
  --  covering the data just read.  The tool will automatically wake up when
  --  the data is changed by any other user.  All that is required is that a
  --  trigger be placed on the database table which then performs a signal
  --  ('dbms_alert.signal') whenever the trigger is fired.
  --
  --  Alerts are transaction based.  This means that the waiting session
  --  does not get alerted until the transaction signalling the alert commits.
  --
  --  There can be any number of concurrent signallers of a given alert, and
  --  there can be any number of concurrent waiters on a given alert.
  --
  --  A waiting application will be blocked in the database and cannot do
  --  any other work.
  --
  --  Most of the calls in the package, except for 'signal', do commits.
  --

  -----------
  --  EXAMPLE
  --
  --  Suppose the application wishes to graph average salaries, say by
  --  department, for all employees.  So the application needs to know
  --  whenever 'emp' is changed.  The application would look like this:
  --
  --      dbms_alert.register('emp_table_alert');
  --    readagain:
  --      
  --      dbms_alert.waitone('emp_table_alert', :message, :status);
  --      if status = 0 then goto readagain; else 
  --
  --  The 'emp' table would have a trigger similar to the following:
  --
  --    create trigger emptrig after insert or update or delete on emp
  --    begin
  --      dbms_alert.signal('emp_table_alert', 'message_text');
  --    end;
  --
  --  When the application is no longer interested in the alert, it does
  --    dbms_alert.remove('emp_table_alert');
  --  This is important since it reduces the amount of work required by
  --  the alert signaller.
  --
  --  If a session exits (or dies) while there exist registered alerts,
  --  they will eventually be cleaned up by future users of this package.
  --
  --  The above example guarantees that the application will always see
  --  the latest data, although it may not see every intermediate value.


  --------------
  --  VARIATIONS
  --
  --  The application can register for multiple events and can then wait for
  --  any of them to occur using the 'waitany' call.
  --
  --  An application can also supply an optional 'timeout' parameter to the
  --  'waitone' or 'waitany' calls.  A 'timeout' of 0 returns immediately
  --  if there is no pending alert.
  --
  --  The signalling session can optionally pass a message which will be
  --  received by the waiting session.
  --
  --  Alerts may be signalled more often than the corresponding application
  --  'wait' calls.  In such cases the older alerts are discaded.  The
  --  application always gets the latest alert (based on transaction commit
  --  times).
  --
  --  If the application does not require transaction based alerts, then the
  --  'dbms_pipe' package may provide a useful alternative
  --
  --  If the transaction is rolled back after the call to 'dbms_alert.signal',
  --  no alert will occur.
  --
  --  It is possible to receive an alert, read the data, and find that no
  --  data has changed.  This is because the data changed after the *prior*
  --  alert, but before the data was read for that *prior* alert.


  --------------------------
  --  IMPLEMENTATION DETAILS
  --
  --  In most cases the implementation is event-driven, i.e., there are no
  --  polling loops.  There are two cases where polling loops can occur:
  --
  --    1) Parallel mode.  If your database is running parallel mode then
  --       a polling loop is required to check for alerts from another
  --       instance.  The polling loop defaults to one second and is settable
  --       by the 'set_defaults' call.
  --    2) Waitany call.  If you use the 'waitany' call, and a signalling
  --       session does a signal but does not commit within one second of the
  --       signal, then a polling loop is required so that this uncommitted
  --       alert does not camouflage other alerts.  The polling loop begins
  --       at a one second interval and exponentially backs off to 30 second
  --       intervals.
  --
  --  This package uses the dbms_lock package (for synchronization between
  --  signallers and waiters) and the dbms_pipe package (for asynchronous
  --  event dispatching).

  -------------------------------------------------------
  --  INTERACTION WITH MULTI-THREADED AND PARALLEL SERVER
  --
  --  When running with the parallel server AND multi-threaded server, a
  --  multi-threaded (dispatcher) "shared server" will be bound to a
  --  session (and therefore not shareable) during the time a session has
  --  any alerts "registered", OR from the time a session "signals" an
  --  alert until the time the session commits.  Therefore, applications
  --  which register for alerts should use "dedicated servers" rather than
  --  connecting through the dispatcher (to a "shared server") since
  --  registration typically lasts for a long time, and applications which
  --  cause "signals" should have relatively short transactions so as not
  --  to tie up "shared servers" for too long.

  ------------
  --  SECURITY
  --
  --  Security on this package may be controlled by granting execute on
  --  this package to just those users or roles that you trust.  You may
  --  wish to write a cover package on top of this one which restricts
  --  the alertnames used.  Execute privilege on this cover package can
  --  then be granted rather than on this package.


  -------------
  --  RESOURCES
  --
  --  This package uses one database pipe and two locks for each alert a
  --  session has registered.


  ---------------------
  --  SPECIAL CONSTANTS
  --
  maxwait constant integer :=  86400000; -- 1000 days
  --  The maximum time to wait for an alert (essentially forever).


  ----------------------------
  --  PROCEDURES AND FUNCTIONS
  --
  procedure set_defaults(sensitivity in number);
  --  Set various defaults for this package.
  --  Input parameters:
  --    sensitivity
  --      In case a polling loop is required (see "Implementation Details"
  --      above), this is the time to sleep between polls.  Deafult is 5 sec.
  --
  procedure register(name in varchar2);
  --  Register interest in an alert.  A session may register interest in
  --    an unlimited number of alerts.  Alerts should be de-registered when
  --    the session no longer has any interest (see 'remove').  This call
  --    always performs a 'commit'.
  --  Input parameters:
  --    name
  --      The name of the alert in which this session is interested.
  --      WARNING:  Alert names beginning with 'ORA$' are reserved for use for
  --      products provided by Oracle Corporation.  Name must be 30 bytes
  --      or less.  The name is case-insensitive.
  --
  procedure remove(name in varchar2);
  --  Remove alert from registration list.  Do this when the session is no
  --    longer interested in an alert.  Removing an alert is important
  --    since it will reduce the amount of work done by signalers of the alert.
  --    If a session dies without removing the alert, that alert will
  --    eventually (but not immediately) be cleaned up.  This call always
  --    performs a commit.
  --  Input parameters:
  --    name
  --      The name of the alert to be removed from registration list. The
  --      name is case-insensitive.
  --
  procedure removeall;
  --  Remove all alerts for this session from registration list.  Do this
  --    when the session is no longer interested in any alerts.  Removing
  --    alerts is important since it will reduce the amount of work done
  --    by signalers of the alert.  If a session dies without removing all
  --    of its alerts, the alerts will eventually (but not immediately)
  --    be cleaned up.  This call always performs a commit.
  --
  --    This procedure is called automatically upon first reference to this
  --    package during a session.  Therefore no alerts from prior sessions
  --    which may have terminated abnormally can affect this session.
  procedure waitany(name out varchar2,
                    message out varchar2,
                    status out integer,
                    timeout in number default maxwait);
  --  Wait for an alert to occur for any of the alerts for which this
  --    session is registered.  Although probably unusual, the same session
  --    that waits for the alert may also first signal the alert.  In this
  --    case remember to commit after the signal and prior to the wait.
  --    Otherwise a lock request exception (status 4) will occur.  This
  --    call always performs a commit.
  --  Input parameters:
  --    timeout
  --      The maximum time to wait for an alert.  If no alert occurs before
  --      timeout seconds, then this call will return with status of 1.
  --  Output parameters:
  --    name
  --      The name of the alert that occurred, in uppercase.
  --    message
  --      The message associated with the alert.  This is the message
  --      provided by the 'signal' call.  Note that if multiple signals
  --      on this alert occurred before the waitany call, then the message
  --      will correspond to the most recent signal call.  Messages from
  --      prior signal calls will be discarded.
  --    status
  --      0 - alert occurred
  --      1 - timeout occurred
  --  Errors raised:
  --    -20000, ORU-10024: there are no alerts registered.
  --       Cause: You must register an alert before waiting.
  --
  procedure waitone(name in varchar2,
                    message out varchar2,
                    status out integer,
                    timeout in number default maxwait);
  --  Wait for specified alert to occur. If the alert was signalled since
  --    the register or last waitone/waitany, then this call will return
  --    immediately.  The same session that waits for the alert may also
  --    first signal the alert.  In this case remember to commit after the
  --    signal and prior to the wait.  Otherwise a lock request exception
  --    (status 4) will occur.  This call always performs a commit.
  --  Input parameters:
  --    name
  --      The name of the alert to wait for. The name is case-insensitive.
  --    timeout
  --      The maximum time to wait for this alert.  If no alert occurs before
  --      timeout seconds, then this call will return with status of 1.
  --      If the named alert has not been registered then the this call
  --      will return after the timeout period expires.
  --  Output parameters:
  --    message
  --      The message associated with the alert.  This is the message
  --      provided by the 'signal' call.  Note that if multiple signals
  --      on this alert occurred before the waitone call, then the message
  --      will correspond to the most recent signal call.  Messages from
  --      prior signal calls will be discarded.  The message may be up to
  --      1800 bytes.
  --    status
  --      0 - alert occurred
  --      1 - timeout occurred
  --
  procedure signal(name in varchar2,
                   message in varchar2);
  --  Signal an alert.
  --  Input parameters:
  --    name
  --      Name of the alert to signal.  The effect of the signal call only
  --      occurs when the transaction in which it is made commits.  If the
  --      transaction rolls back, then the effect of the signal call is as
  --      if it had never occurred.  All sessions that have registered
  --      interest in this alert will be notified.  If the interested sessions
  --      are currently waiting, they will be awakened.  If the interested
  --      sessions are not currently waiting, then they will be notified the
  --      next time they do a wait call.  Multiple sessions may concurrently
  --      perform signals on the same alert.  However the first session
  --      will block concurrent sessions until the first session commits.
  --      Name must be 30 bytes or less. It is case-insensitive.  This call
  --      does not perform a commit.
  --    message
  --      Message to associate with this alert.  This will be passed to
  --      the waiting session.  The waiting session may be able to avoid
  --      reading the database after the alert occurs by using the
  --      information in this message.  The message must be 1800 bytes or less.

end;

PACKAGE BODY dbms_alert IS
  P_INT           NUMBER         := 5;
  THIS_SESSION_ID   VARCHAR2(30)   := DBMS_SESSION.UNIQUE_SESSION_ID;
  PARALLEL          BOOLEAN        := DBMS_UTILITY.IS_CLUSTER_DATABASE;
  SIGPIPE           VARCHAR2(30)   := 'ORA$ALERT$' || THIS_SESSION_ID;
  MSGSEQ            BINARY_INTEGER := 0;
  FIRSTREGISTER     BOOLEAN        := TRUE;
  INSTANTIATING_PKG BOOLEAN        := TRUE;


FUNCTION MINIMUM(V1 NUMBER, V2 NUMBER) RETURN NUMBER IS
BEGIN
  IF V1 < V2 THEN
    RETURN V1;
  ELSE
    RETURN V2;
  END IF;
END;


PROCEDURE SET_DEFAULTS(SENSITIVITY IN NUMBER) IS
BEGIN
  IF SENSITIVITY >= 0 THEN
    P_INT := SENSITIVITY;
  END IF;
END;


PROCEDURE REGISTER(NAME IN VARCHAR2) IS
  STATUS  INTEGER;
  LSTATUS INTEGER;
  LOCKID  INTEGER;
  CURSOR  C1 IS
    SELECT DISTINCT SUBSTR(KGLNAOBJ,11) SID FROM X$KGLOB
     WHERE KGLHDNSP = 7
     AND   KGLNAOBJ LIKE 'ORA$ALERT$%'
     AND   BITAND(KGLHDFLG,128)!=0
    UNION
    SELECT DISTINCT SID FROM DBMS_ALERT_INFO;
BEGIN
  IF INSTANTIATING_PKG THEN
    REMOVEALL;
    INSTANTIATING_PKG := FALSE;
  END IF;
  IF (FIRSTREGISTER) THEN
    IF DBMS_UTILITY.IS_CLUSTER_DATABASE THEN
      FOR REC IN C1 LOOP
        LOCKID := DBMS_UTILITY.GET_HASH_VALUE(REC.SID, 2000002048, 2048);
        LSTATUS := DBMS_LOCK.REQUEST(LOCKID, DBMS_LOCK.X_MODE,
                     TIMEOUT => 0, RELEASE_ON_COMMIT => TRUE);
        IF LSTATUS = 0 THEN
          DBMS_PIPE.PURGE('ORA$ALERT$' || REC.SID);
          DELETE DBMS_ALERT_INFO WHERE SID = REC.SID;
          COMMIT;
        ELSIF LSTATUS NOT IN (1,2,4) THEN
          RAISE_APPLICATION_ERROR(-20000,
            'ORU-10025: lock request error, status: ' || TO_CHAR(LSTATUS));
        END IF;
      END LOOP;
      LSTATUS := DBMS_LOCK.REQUEST(DBMS_UTILITY.GET_HASH_VALUE(THIS_SESSION_ID,
        2000002048,
        2048),
        DBMS_LOCK.S_MODE, TIMEOUT => 60);
      IF LSTATUS != 0  AND LSTATUS != 4 THEN
        RAISE_APPLICATION_ERROR(-20000,
          'ORU-10021: lock request error, status: ' || TO_CHAR(LSTATUS));
      END IF;
    ELSE
      FOR REC IN C1 LOOP
        IF NOT DBMS_SESSION.IS_SESSION_ALIVE(REC.SID) THEN
          DBMS_PIPE.PURGE('ORA$ALERT$' || REC.SID);
          DELETE DBMS_ALERT_INFO WHERE SID = REC.SID;
          COMMIT;
        END IF;
      END LOOP;
    END IF;
    FIRSTREGISTER := FALSE;
  END IF;
  STATUS := DBMS_LOCK.REQUEST(DBMS_UTILITY.GET_HASH_VALUE(UPPER(NAME),
    2000000000, 2048), DBMS_LOCK.X_MODE,
    DBMS_LOCK.MAXWAIT, RELEASE_ON_COMMIT => TRUE);
  IF STATUS != 0 THEN
    RAISE_APPLICATION_ERROR(-20000,
      'ORU-10002: lock request error, status: ' || TO_CHAR(STATUS));
  END IF;
  INSERT INTO DBMS_ALERT_INFO VALUES (UPPER(REGISTER.NAME), THIS_SESSION_ID,
    'N', NULL);
  COMMIT;
EXCEPTION
  WHEN DUP_VAL_ON_INDEX THEN COMMIT;
END;


PROCEDURE REMOVE(NAME IN VARCHAR2) IS
BEGIN
  IF INSTANTIATING_PKG THEN
    REMOVEALL;
    INSTANTIATING_PKG := FALSE;
  END IF;
  DELETE FROM DBMS_ALERT_INFO
   WHERE NAME  = UPPER(REMOVE.NAME)
     AND SID   = THIS_SESSION_ID;
  COMMIT;
END;


PROCEDURE PIPE_WAIT(MAXTIME NUMBER, CUMTIME IN OUT NUMBER) IS
  STATUS INTEGER;
  TMO    NUMBER := MAXTIME;
BEGIN
  IF PARALLEL THEN
    TMO := MINIMUM(TMO, P_INT);
  END IF;
  IF TMO = MAXWAIT THEN
    TMO := DBMS_PIPE.MAXWAIT;
  END IF;
  STATUS := DBMS_PIPE.RECEIVE_MESSAGE(SIGPIPE, TMO);
  IF STATUS = 1 THEN
    CUMTIME := CUMTIME + TMO;
    RETURN;
  END IF;
  IF STATUS <> 0 THEN
    RAISE_APPLICATION_ERROR(-20000, 'ORU-10015: error:' || TO_CHAR(STATUS)
      || ' waiting for pipe message.');
  END IF;
  RETURN;
END;


PROCEDURE OPTIMISTIC(
  NAME    OUT VARCHAR2,
  MESSAGE OUT VARCHAR2,
  STATUS  OUT INTEGER)
IS
  LOCKID  INTEGER;
  LSTATUS INTEGER;
  CURSOR  C1 IS
    SELECT NAME FROM DBMS_ALERT_INFO
     WHERE SID = THIS_SESSION_ID
     AND   CHANGED = 'Y';
BEGIN
  STATUS := 1;
  FOR REC IN C1 LOOP
    LOCKID := DBMS_UTILITY.GET_HASH_VALUE(REC.NAME, 2000000000, 2048);
    LSTATUS := DBMS_LOCK.REQUEST(LOCKID, DBMS_LOCK.SX_MODE, TIMEOUT => 0,
      RELEASE_ON_COMMIT => TRUE);
    IF LSTATUS <> 1 THEN
      IF LSTATUS <> 0 THEN
        RAISE_APPLICATION_ERROR(-20000, 'ORU-10019: error ' ||
          TO_CHAR(LSTATUS) || ' on lock request.');
      END IF;
      UPDATE DBMS_ALERT_INFO SET CHANGED = 'N'
       WHERE SID = THIS_SESSION_ID
       AND   NAME = REC.NAME;
      SELECT MESSAGE INTO MESSAGE FROM DBMS_ALERT_INFO
       WHERE SID = THIS_SESSION_ID
       AND   NAME = REC.NAME;
      COMMIT;
      DBMS_PIPE.PURGE(SIGPIPE);
      NAME := REC.NAME;
      STATUS := 0;
      RETURN;
    END IF;
  END LOOP;
  RETURN;
END;


PROCEDURE WAITANY(
  NAME    OUT VARCHAR2,
  MESSAGE OUT VARCHAR2,
  STATUS  OUT INTEGER,
  TIMEOUT IN  NUMBER    DEFAULT MAXWAIT)
IS
  WAITIME  NUMBER        := 0;
  CUMTIME  NUMBER        := 0;
  LOCKID   INTEGER;
  ST       INTEGER;
  LSTATUS  INTEGER;
  TIMEDOUT BOOLEAN;
  CHANGED  VARCHAR2(1);
  FOUNDONE BOOLEAN;
  CURSOR   C1 IS
    SELECT NAME FROM DBMS_ALERT_INFO
     WHERE SID = THIS_SESSION_ID;
BEGIN
  IF INSTANTIATING_PKG THEN
    REMOVEALL;
    INSTANTIATING_PKG := FALSE;
  END IF;
  OPTIMISTIC(NAME, MESSAGE, ST);
  IF ST = 0 THEN
    STATUS := ST;
    RETURN;
  END IF;
  WAITIME := 1;
  CUMTIME := 0;
  LOOP
    TIMEDOUT := FALSE;
    FOUNDONE := FALSE;
    FOR REC IN C1 LOOP
      FOUNDONE := TRUE;
      LOCKID := DBMS_UTILITY.GET_HASH_VALUE(REC.NAME, 2000000000, 2048);
      LSTATUS := DBMS_LOCK.REQUEST(LOCKID, DBMS_LOCK.SX_MODE, WAITIME,
        RELEASE_ON_COMMIT => TRUE);
      IF LSTATUS = 1 THEN
        OPTIMISTIC(NAME, MESSAGE, ST);
        IF ST = 0 THEN
          STATUS := 0;
          RETURN;
        END IF;
        CUMTIME := CUMTIME + WAITIME;
        IF CUMTIME >= TIMEOUT THEN
          STATUS := 1;
          RETURN;
        END IF;
        TIMEDOUT := TRUE;
        GOTO CONTINUE;
      ELSIF LSTATUS <> 0 THEN
        RAISE_APPLICATION_ERROR(-20000,
          'ORU-10020: error ' || TO_CHAR(LSTATUS) || ' on lock request.');
      ELSE
        SELECT CHANGED, MESSAGE INTO CHANGED, MESSAGE FROM DBMS_ALERT_INFO
         WHERE SID = THIS_SESSION_ID
         AND   NAME = REC.NAME;
        IF CHANGED = 'Y' THEN
          UPDATE DBMS_ALERT_INFO SET CHANGED = 'N'
           WHERE SID = THIS_SESSION_ID
           AND   NAME = REC.NAME;
          COMMIT;
          NAME := REC.NAME;
          STATUS := 0;
          DBMS_PIPE.PURGE(SIGPIPE);
          RETURN;
        END IF;
        LSTATUS := DBMS_LOCK.RELEASE(LOCKID);
      END IF;
      <<continue>>
      NULL;
    END LOOP;
    IF NOT FOUNDONE THEN
      RAISE_APPLICATION_ERROR(-20000,
        'ORU-10024: there are no alerts registered.');
    END IF;
    IF TIMEDOUT THEN
      WAITIME := MINIMUM(WAITIME*2, 32);
      WAITIME := MINIMUM(WAITIME, TIMEOUT-CUMTIME);
    ELSE
      PIPE_WAIT(TIMEOUT-CUMTIME, CUMTIME);
    END IF;
    IF CUMTIME >= TIMEOUT THEN
      STATUS := 1;
      RETURN;
    END IF;
  END LOOP;
END;


PROCEDURE WAITONE(
  NAME    IN  VARCHAR2,
  MESSAGE OUT VARCHAR2,
  STATUS  OUT INTEGER,
  TIMEOUT IN  NUMBER    DEFAULT MAXWAIT)
IS
  CUMTIME NUMBER  := 0;
  LOCKID  INTEGER := DBMS_UTILITY.GET_HASH_VALUE(UPPER(NAME),
    2000000000, 2048);
  LSTATUS INTEGER;
BEGIN
  IF INSTANTIATING_PKG THEN
    REMOVEALL;
    INSTANTIATING_PKG := FALSE;
  END IF;
  LOOP
    LSTATUS := DBMS_LOCK.REQUEST(LOCKID, DBMS_LOCK.SX_MODE, TIMEOUT-CUMTIME,
      RELEASE_ON_COMMIT => TRUE);
    IF LSTATUS = 1 THEN
      STATUS := 1;
      RETURN;
    END IF;
    IF LSTATUS = 4 THEN
      RAISE_APPLICATION_ERROR(-20000,
        'ORU-10037: attempting to wait on uncommitted signal from same session');
    END IF;
    IF LSTATUS <> 0 THEN
      RAISE_APPLICATION_ERROR(-20000,
        'ORU-10023: error ' || TO_CHAR(LSTATUS) || ' on lock request.');
    END IF;
    UPDATE DBMS_ALERT_INFO SET CHANGED = 'N'
     WHERE NAME    = UPPER(WAITONE.NAME)
       AND SID     = THIS_SESSION_ID
       AND CHANGED = 'Y';
    IF SQL%ROWCOUNT != 0 THEN
      SELECT MESSAGE INTO MESSAGE FROM DBMS_ALERT_INFO
       WHERE NAME    = UPPER(WAITONE.NAME)
         AND SID     = THIS_SESSION_ID;
      COMMIT;
      DBMS_PIPE.PURGE(SIGPIPE);
      STATUS := 0;
      RETURN;
    END IF;
    LSTATUS := DBMS_LOCK.RELEASE(LOCKID);
    PIPE_WAIT(TIMEOUT, CUMTIME);
    IF CUMTIME >= TIMEOUT THEN
      STATUS := 1;
      RETURN;
    END IF;
  END LOOP;
END;


PROCEDURE SIGNAL_PIPE(PIPENAME VARCHAR2) IS
  MSGID    VARCHAR2(40);
  TMPMSGID VARCHAR2(40);
  STATUS   INTEGER;
BEGIN
  MSGID := THIS_SESSION_ID || ':' || TO_CHAR(MSGSEQ);
  MSGSEQ := MSGSEQ + 1;
  DBMS_PIPE.PACK_MESSAGE(MSGID);
  STATUS := DBMS_PIPE.SEND_MESSAGE(PIPENAME);
  IF STATUS <> 0 THEN
    RAISE_APPLICATION_ERROR(-20000,
      'ORU-10016: error:' || TO_CHAR(STATUS) || ' sending on pipe ' ||
      PIPENAME);
  END IF;
  STATUS := DBMS_PIPE.RECEIVE_MESSAGE(PIPENAME, 0);
  IF STATUS = 1 THEN
    RETURN;
  END IF;
  IF STATUS <> 0 THEN
    RAISE_APPLICATION_ERROR(-20000,
      'ORU-10017: error:' || TO_CHAR(STATUS) || ' receiving on pipe ' ||
      PIPENAME);
  END IF;
  DBMS_PIPE.UNPACK_MESSAGE(TMPMSGID);
  IF TMPMSGID = MSGID THEN
    DBMS_PIPE.PACK_MESSAGE(MSGID);
    STATUS := DBMS_PIPE.SEND_MESSAGE(PIPENAME);
    IF STATUS <> 0 THEN
      RAISE_APPLICATION_ERROR(-20000,
        'ORU-10018: error:' || TO_CHAR(STATUS) || ' sending on pipe ' ||
        PIPENAME);
    END IF;
  END IF;
END;


PROCEDURE SIGNAL(NAME IN VARCHAR2, MESSAGE IN VARCHAR2) IS
  STATUS  INTEGER;
  CURSOR  C2(ALERTNAME VARCHAR2) IS
    SELECT SID FROM DBMS_ALERT_INFO
     WHERE NAME = UPPER(ALERTNAME);
BEGIN
  STATUS := DBMS_LOCK.REQUEST(DBMS_UTILITY.GET_HASH_VALUE(UPPER(NAME),
    2000000000, 2048), DBMS_LOCK.S_MODE,
    DBMS_LOCK.MAXWAIT, RELEASE_ON_COMMIT => TRUE);
  IF STATUS != 0  AND STATUS != 4 THEN
    RAISE_APPLICATION_ERROR(-20000,
      'ORU-10001: lock request error, status: ' || TO_CHAR(STATUS));
  END IF;
  UPDATE DBMS_ALERT_INFO SET CHANGED = 'Y', MESSAGE = SIGNAL.MESSAGE
   WHERE NAME = UPPER(SIGNAL.NAME);
  IF DBMS_UTILITY.IS_CLUSTER_DATABASE THEN
    FOR REC IN C2(NAME) LOOP
      STATUS := DBMS_LOCK.REQUEST(DBMS_UTILITY.GET_HASH_VALUE(REC.SID,
        2000002048,
        2048),
        DBMS_LOCK.SX_MODE, TIMEOUT => 0,
        RELEASE_ON_COMMIT => TRUE);
      IF STATUS = 0 THEN
        DBMS_PIPE.PURGE('ORA$ALERT$' || REC.SID);
        STATUS := DBMS_LOCK.RELEASE(DBMS_UTILITY.GET_HASH_VALUE(REC.SID,
          2000002048,
          2048));
      ELSE
        IF STATUS != 1 AND STATUS != 4 THEN
          RAISE_APPLICATION_ERROR(-20000,
            'ORU-10022: lock request error, status: ' || TO_CHAR(STATUS));
        END IF;
        SIGNAL_PIPE('ORA$ALERT$' || REC.SID);
      END IF;
    END LOOP;
  ELSE
    FOR REC IN C2(NAME) LOOP
      IF  NOT DBMS_SESSION.IS_SESSION_ALIVE(REC.SID) THEN
        DBMS_PIPE.PURGE('ORA$ALERT$' || REC.SID);
      ELSE
        SIGNAL_PIPE('ORA$ALERT$' || REC.SID);
      END IF;
    END LOOP;
  END IF;
END;


PROCEDURE REMOVEALL IS
BEGIN
  DELETE FROM DBMS_ALERT_INFO WHERE SID = THIS_SESSION_ID;
  DBMS_PIPE.PURGE(SIGPIPE);
  COMMIT;
END;


END;



12.并发性
从源码可以看出, DBMS_ALERT是基于数据库表(DBMS_ALERT_INFO)和DBMS_LOCK, DBMS_PIPE实现的的
由于对表DBMS_ALERT_INFO操作前加上了锁, 是按警告的名称来申请锁的, 事务提交后才释放, 所以对警告的操作是串行的

比如, 有一个通知数据修改的触发器, 如果多个会话同时修改了数据, 同时触发, 发出警报. 这其中只能有一个会话申请到锁, 其它会话被阻塞. 申请到锁的会话发出警报, 释放锁后, 其它会话中一个会话申请锁成功,
剩下的会话还是被阻塞, ..., 依此类推
可以用DBMS_JOB解决此问题, DBMS_JOB是一种常见的, 使操作并行化的小技巧

但是, 即便解决了串行化问题, 大量并发的情况下, 频繁操作DBMS_ALERT_INFO表, 也会带来性能上的问题





外部链接:
dbms_alert
DBMS_ALERT: Broadcasting Alerts to Users
Using DBMS_ALERT To Notify Sessions Of Database Events
PACKAGE DBMS_ALERT Specification

Telling a Forms Application that a change has been made on the database.
介绍了如何让程序获知数据被改变的一些方法


这讲的是EnterpriseDB, 不知道是个啥数据库, 好像是Postgres什么一类的, 跟Oracle很像
DBMS_PIPE & DBMS_ALERT In EnterpriseDB




-fin-
Website Analytics

Followers