会话间通信 DBMS_PIPE
DBMS_PIPE在Oracle7引入, 也是会话间通信的一种, 常用于与外部服务通信,调试PL/SQL程序,或安全审计
1.
DBMS_PIPE与DBMS_ALERT的比较:
1)DBMS_ALERT是单向的. DBMS_PIPE是双向的.
2)DBMS_ALERT是基于事务的, 提交后发出警报. DBMS_PIPE不是基于事务的.
3)DBMS_ALERT一条警报可以有多个接收者, 可以是广播模式的. DBMS_PIPE的一条消息只能被最后等待一个用户接收到.
4)DBMS_ALERT是由DBMS_PIPE和DBMS_LOCK实现的
5)DBMS_ALERT只能发送一条VARCHAR2型字符串, 最多1800个字符. DBMS_PIPE可以发送多条信息,多种数据类型
6)都只能在同一个实例内会话间通信
7)发送的消息不是持久的, 数据库实例关闭, 消息就被清除
8)都是异步的
另参考
Can I do mutlithreaded programming in PLSQL:
dbms_alert is used to send a signal to ALL interested parties. It is transactional (the
signal is not sent until you commit). It "loses" signals -- if I signal the same event 5
times and commit -- only one event might get broadcast. It is like a unix signal in this
way. Dbms_alert is asyncronous -- the sender never gets anything back from the reciever.
dbms_pipe is used to send a message to a SINGLE interested party (although >1 person can
be reading the pipe, only ONE person will get the message). The message is sent
immediately, regardless of your transactions state. It is like a unix pipe. Dbms_pipe
can be syncronous -- i can send a message and get a message back.
2.
参考Oracle文档DBMS_PIPE:
The DBMS_PIPE package lets two or more sessions in the same instance communicate. Oracle pipes are similar in concept to the pipes used in UNIX, but Oracle pipes are not implemented using the operating system pipe mechanisms.
DBMS_PIPE包让处于同一实例中的两个或多个会话进行通信. Oracle管道和UNIX中使用的管道的概念类似, 但不是用操作系统的管道机制实现的.
Pipe functionality has several potential applications:
管道功能有几种可能的应用:
External service interface: You can communicate with user-written services that are external to the RDBMS. This can be done effectively in a shared server process, so that several instances of the service are executing simultaneously. Additionally, the services are available asynchronously. The requestor of the service does not need to block a waiting reply. The requestor can check (with or without time out) at a later time. The service can be written in any of the 3GL languages that Oracle supports.
外部服务接口:能够与RDBMS之外的用户编写的服务进行通信.(略)
Independent transactions: The pipe can communicate to a separate session which can perform an operation in an independent transaction (such as logging an attempted security violation detected by a trigger).
独立的事务:管道能够与在一个独立事务(比如记录由触发器发现的违反安全的尝试)s中运行操作的单独的会话通信
Alerters (non-transactional): You can post another process without requiring the waiting process to poll. If an "after-row" or "after-statement" trigger were to alert an application, then the application would treat this alert as an indication that the data probably changed. The application would then read the data to get the current value. Because this is an "after" trigger, the application would want to do a "SELECT FOR UPDATE" to make sure it read the correct data.
警报(非事务性的):你能够通知另一个进程,无需那个等待的进程去轮询.(略)
Debugging: Triggers and stored procedures can send debugging information to a pipe. Another session can keep reading out of the pipe and display it on the screen or write it to a file.
调试:触发器和存储过程向管道发送调试信息. 另一个会话从管道读取并显示到屏幕或写入文件.
Concentrator: This is useful for multiplexing large numbers of users over a fewer number of network connections, or improving performance by concentrating several user-transactions into one DBMS transaction.
集中器:用于多路复用网络连接数较少的大量用户, 或者将多个用户事务集中到一个DBMS事务中用以改善性能.(这个比较有意思,但到底是什么意思呢?)
-----
Public Pipes
公有管道
You may create a public pipe either implicitly or explicitly. For implicit public pipes, the pipe is automatically created when it is referenced for the first time, and it disappears when it no longer contains data. Because the pipe descriptor is stored in the SGA, there is some space usage overhead until the empty pipe is aged out of the cache.
可以隐式或显式的创建公有管道.隐式公有管道在第一次被引用时自动创建,当不再存在数据时消失.(略)
You create an explicit public pipe by calling the CREATE_PIPE function with the private flag set to FALSE. You must deallocate explicitly-created pipes by calling the REMOVE_PIPE function.
调用CREATE_PIPE加上private标记等于FALSE创建显式公有管道.必须调用REMOVE_PIPE删除显式创建的管道.
The domain of a public pipe is the schema in which it was created, either explicitly or implicitly.
公有管道的域(是啥?)是被创建的用户, 不论显式的或隐式的
-----
Writing and Reading Pipes
读写管道
Each public pipe works asynchronously. Any number of schema users can write to a public pipe, as long as they have EXECUTE permission on the DBMS_PIPE package, and they know the name of the public pipe. However, once buffered information is read by one user, it is emptied from the buffer, and is not available for other readers of the same pipe.
公有管道是异步工作的. 可以有任意数量的用户写一个公有管道, 只要它们拥有DBMS_PIPE的执行权限, 和知道公有管道名. 但是, 缓冲的信息一旦被一个用户读取了, 它就从缓冲区中被清空, 对于其它用户就不可以使用了.
The sending session builds a message using one or more calls to the PACK_MESSAGE procedure. This procedure adds the message to the session's local message buffer. The information in this buffer is sent by calling the SEND_MESSAGE function, designating the pipe name to be used to send the message. When SEND_MESSAGE is called, all messages that have been stacked in the local buffer are sent.
发送会话调用PACK_MESSAGE一次或多次来创建一个信息. 这个过程将消息加到会话本地的信息缓冲区. 调用SEND_MESSAGE函数,指定发送消息的管道名,用以发送缓冲区中的信息. 当SEND_MESSAGE被调用, 堆放在本地缓冲区的所有信息都被发送出去.
A process that wants to receive a message calls the RECEIVE_MESSAGE function, designating the pipe name from which to receive the message. The process then calls the UNPACK_MESSAGE procedure to access each of the items in the message.
想要接受信息的进程调用RECEIVE_MESSAGE函数, 并指定接受信息的管道名. 然后调用UNPACK_MESSAGE过程去访问信息中的每条.
-----
Private Pipes
私有管道
You explicitly create a private pipe by calling the CREATE_PIPE function. Once created, the private pipe persists in shared memory until you explicitly deallocate it by calling the REMOVE_PIPE function. A private pipe is also deallocated when the database instance is shut down.
可以通过调用CREATE_PIPE函数显式的创建一个私有管道. 私有管道一旦被创建, 就持久保存在共享内存中, 直到显式的调用REMOVE_PIPE函数清除它. 当数据库实例关闭时私有管道也被清除.
You cannot create a private pipe if an implicit pipe exists in memory and has the same name as the private pipe you are trying to create. In this case, CREATE_PIPE returns an error.
如果已经有一个隐式的同名管道存在了, 就不能再创建同名的私有管道. 这种情况下, CREATE_PIPE返回一个错误.
Access to a private pipe is restricted to:
访问私有管道有如下限制:
Sessions running under the same userid as the creator of the pipe
在与管道创建者同样的用户的会话下运行.
Stored subprograms executing in the same userid privilege domain as the pipe creator
在与管道创建者同样用户权限域的存储过程下运行
Users connected as SYSDBA
SYSDBA用户
An attempt by any other user to send or receive messages on the pipe, or to remove the pipe, results in an immediate error. Any attempt by another user to create a pipe with the same name also causes an error.
As with public pipes, you must first build your message using calls to PACK_MESSAGE before calling SEND_MESSAGE. Similarly, you must call RECEIVE_MESSAGE to retrieve the message before accessing the items in the message by calling UNPACK_MESSAGE.
另外参考Definitions and Examples of how Oracle Pipes works:
Let's assume we have sessions A, B and C.
假设有会话A,B,C
Case 1
------
Let's say A and B send messages through the pipe and C is waiting on them.
If A sends a message first and then B sends another message, C will receive A's message first and then B's message.
会话A,B通过管道发送信息, 会话C等待信息
如果A首先发送了一条信息, 然后B发送了另一条, C会首先接收到A的信息, 然后是B的.
Case 2
------
Now, Let's put A and B to receive and C to send.
A will wait for message first and B will wait for a message after A did it.
When C sends a message, B is the one who receives it but A will remain waiting.
If C sends another message now A receives the message.
A和B接收, C发送
首先A等待信息, 然后B等待信息.
C发出一条信息, B接收到, A仍然等待.
C再发出一条信息, 这次A接收到
This shows that the queue for waiting a pipe behaves in a "stack" manner. Which ever waits first receives the message last and whichever waits last will receives it first.
这表明等待管道的队列是堆栈的方式. 最先等待的接收到最后的信息, 最后等待的接收到最先的信息.
3.
举例, 也参考前面的连接(Definitions and Examples of how Oracle Pipes works):
创建发送,接收存储过程,和测试表等
/** * 发送. */ create or replace procedure send_pipe as result number; my_pipe varchar2(30); user user_users%rowtype; begin select * into user from user_users; --生成一个管道名, 这个管道将用于传递用户信息等 my_pipe := dbms_pipe.UNIQUE_SESSION_NAME; --将my_pipe管道名发送到'waiter'管道, 'waiter'管道也是隐式创建的 dbms_pipe.pack_message(my_pipe); result := dbms_pipe.send_message('waiter'); --然后, 将用户信息发送到my_pipe管道中 -- this message will have 3 parts. A varchar2, a number and a date. dbms_pipe.pack_message(user.USERNAME); dbms_pipe.pack_message(user.user_id); dbms_pipe.pack_message(user.created); result := dbms_pipe.send_message(my_pipe); end; / /** * 接收. */ create or replace procedure read_pipe as result number; v varchar2(4000); d date; n number; w raw(32767); r rowid; his_pipe varchar2(30); type_not_handled exception; begin --从'waiter'管道接收信息, 得到对端的my_pipe管道名, 这里叫his_pipe. result := dbms_pipe.RECEIVE_MESSAGE('waiter'); dbms_pipe.unpack_message(his_pipe); dbms_output.put_line('Pipe :'||his_pipe); --然后, 从his_pipe管道获取并打印接收到的信息 result := dbms_pipe.RECEIVE_MESSAGE(his_pipe); result := dbms_pipe.next_item_type; while result <> 0 loop dbms_output.put_line('type =>'||to_char(result)); if result=9 then -- varchar2 dbms_pipe.unpack_message(v); dbms_output.put_line(v); elsif result=6 then -- number dbms_pipe.unpack_message(n); dbms_output.put_line(n); elsif result=12 then -- date dbms_pipe.unpack_message(d); dbms_output.put_line(d); elsif result=11 then -- rowid dbms_pipe.unpack_message_rowid(r); dbms_output.put_line(r); elsif result=23 then -- raw dbms_pipe.unpack_message_raw(w); dbms_output.put_line(w); else raise type_not_handled; end if; result := dbms_pipe.next_item_type; end loop; exception when type_not_handled then dbms_output.put_line('Type '||to_char(result)||' not handled'); dbms_pipe.purge(his_pipe); when others then dbms_output.put_line('error: '||to_char(result)); dbms_pipe.purge(his_pipe); end; / /** * 删除所有的管道. */ create or replace procedure remove_all_pipes is result number; begin for i in (select * from v$db_pipes) loop dbms_output.put('Pipe '||i.name); begin result := dbms_pipe.remove_pipe(i.name); dbms_output.put_line(' removed.'); exception when others then dbms_output.put_line(' not removed.'); end; end loop; end; / --建测试表, 其上创建触发器 drop table any_table; create table any_table(dummy varchar2(30)); create or replace trigger audit_any_table after insert on any_table for each row declare name varchar2(30); result number; usid varchar2(30); begin --同样, 先将自己的管道名发送非'waiter' usid := dbms_pipe.unique_session_name; dbms_pipe.pack_message(usid); result := dbms_pipe.send_message('waiter'); --然后, 将用户信息等发送到自己的管道, 由waiter端接收 select username into name from user_users; dbms_pipe.pack_message(name); dbms_pipe.pack_message(:new.dummy); result := dbms_pipe.send_message(usid); end; /
Case 1
------
Session: A
Explanation: The same user can read from it's own pipe.
Two pipes are created. One is 'waiter' the other has the name of the unique_session_name.
1) A: exec send_pipe
2) A: exec read_pipe
3) A: Select * from v$db_pipes;
同一个会话:
同一个用户能够读取它自己的管道
SQL> exec send_pipe PL/SQL procedure successfully completed. SQL> exec read_pipe Pipe :ORA$PIPE$0659B71A0001 type =>9 A type =>6 48 type =>12 2008-07-11 09:43:05 PL/SQL procedure successfully completed. SQL> Select * from v$db_pipes; OWNERID NAME TYPE PIPE_SIZE ---------- ------------------------------ ------- ---------- ORA$PIPE$0659B71A0001 PUBLIC 1686 WAITER PUBLIC 1671
Case 2
------
Sessions: A & B
Explanation: A sends info, B receives it. No new pipes.
1) A: exec send_pipe
2) B: exec read_pipe
3) A: Select * from v$db_pipes;
会话A发送信息
SQL> exec send_pipe PL/SQL procedure successfully completed.会话B接收
SQL> exec read_pipe Pipe :ORA$PIPE$0659B71A0001 type =>9 A type =>6 48 type =>12 2008-07-11 09:43:05 PL/SQL procedure successfully completed.查看管道信息
SQL> Select * from v$db_pipes; OWNERID NAME TYPE PIPE_SIZE ---------- ------------------------------ ------- ---------- ORA$PIPE$0659B71A0001 PUBLIC 1686 WAITER PUBLIC 1671
Case 3
------
Sessions: A & B
Explanation: A will hold until a message is received.
A new pipe is created.
1) A: exec read_pipe
2) B: exec send_pipe
3) A: Select * from v$db_pipes;
会话A先等待接收
SQL> exec read_pipe会话B再发送
SQL> exec send_pipe PL/SQL procedure successfully completed.会话B发送时,隐式创建了一个新的管道
Pipe :ORA$PIPE$06573F5C0001 type =>9 A type =>6 48 type =>12 2008-07-11 09:43:05 PL/SQL procedure successfully completed. SQL> Select * from v$db_pipes; OWNERID NAME TYPE PIPE_SIZE ---------- ------------------------------ ------- ---------- ORA$PIPE$0659B71A0001 PUBLIC 1686 WAITER PUBLIC 1671 ORA$PIPE$06573F5C0001 PUBLIC 1686
Case 4
------
Sessions: A & B
Notes: A must have the table any_table and the trigger audit_any_table in its schema.
Explanation: B will hold until a message from the trigger is recieved which is sent when A inserts a new value. Commit or rollback doesn't have any effect, the message was already been received.
No new pipes.
1) B: exec read_pipe
2) A: insert into any_table values('New Value');
3) A: try both commit and rollback.
会话B等待信息
SQL> exec read_pipe会话A插入数据,触发触发器发送信息
事务B立即收到信息, 不论(不等)会话A事务提交与否
Pipe :ORA$PIPE$0659B71A0001 type =>9 A type =>9 New Value PL/SQL procedure successfully completed. SQL>会话A回滚
SQL> rollback; Rollback complete.
Case 5
------
Sessions: A, B & C
Explanation: On step 3, B will receive the message. On Step 4 A receives it.
A new pipe is created.
1) A: exec read_pipe
2) B: exec read_pipe
3) C: exec send_pipe
4) C: exec send_pipe
会话A等待消息
SQL> exec read_pipe会话B等待消息
SQL> exec read_pipe会话C发送消息
SQL> exec send_pipe PL/SQL procedure successfully completed.会话B是后启的,先收到消息
Pipe :ORA$PIPE$066337EB0001 type =>9 A type =>6 48 type =>12 2008-07-11 09:43:05 PL/SQL procedure successfully completed.会话C再次发送消息
SQL> exec send_pipe PL/SQL procedure successfully completed.这次会话A收到消息
Pipe :ORA$PIPE$066337EB0001 type =>9 A type =>6 48 type =>12 2008-07-11 09:43:05 PL/SQL procedure successfully completed.
Case 6
------
Sessions: A, B & C
Explanation: On step 3, B will receive the message. On Step 5 B again receives it.
On step 6 A receives it.
1) A: exec read_pipe
2) B: exec read_pipe
3) C: exec send_pipe
4) B: exec read_pipe
5) C: exec send_pipe
6) C: exec send_pipe
会话A等待消息
SQL> exec read_pipe会话B等待消息
SQL> exec read_pipe会话C发送消息
SQL> exec send_pipe PL/SQL procedure successfully completed.会话B是后启的,先收到消息
Pipe :ORA$PIPE$066337EB0001 type =>9 A type =>6 48 type =>12 2008-07-11 09:43:05 PL/SQL procedure successfully completed.会话B再次等待消息
SQL> exec read_pipe会话C再次发送消息
SQL> exec send_pipe PL/SQL procedure successfully completed.会话B还是后启的,又收到了消息
Pipe :ORA$PIPE$066337EB0001 type =>9 A type =>6 48 type =>12 2008-07-11 09:43:05 PL/SQL procedure successfully completed.会话A仍然在等待消息, 按ctrl-c退出
BEGIN read_pipe; END; * ERROR at line 1: ORA-00604: error occurred at recursive SQL level 1 ORA-01013: user requested cancel of current operation ORA-06508: PL/SQL: could not find program unit being called: "SYS.DBMS_SYS_ERROR" ORA-06512: at "SYS.DBMS_PIPE", line 167 ORA-06512: at "SYS.DBMS_PIPE", line 192 ORA-06512: at "A.READ_PIPE", line 46 ORA-06556: the pipe is empty, cannot fulfill the unpack_message request ORA-06512: at line 1
Case 7
------
Sessions: A, B & C
Explanation: On Step 3, A receives C's message. On Step 4 A received B's Message.
1) C,B: exec dbms_output.put_line(dbms_pipe.unique_session_name)
2) C: exec send_pipe
3) B: exec send_pipe
4) A: exec read_pipe
5) A: exec read_pipe
会话C显示自己的管道名
SQL> exec dbms_output.put_line(dbms_pipe.unique_session_name) ORA$PIPE$066337EB0001 PL/SQL procedure successfully completed.会话B显示自己的管道名
SQL> exec dbms_output.put_line(dbms_pipe.unique_session_name) ORA$PIPE$06573F5C0001 PL/SQL procedure successfully completed.会话C发送信息
SQL> exec send_pipe PL/SQL procedure successfully completed.会话B发送信息
SQL> exec send_pipe PL/SQL procedure successfully completed.会话A接收信息, 收到C发送的信息(先发的信息)
SQL> exec read_pipe error: 3 Pipe :ORA$PIPE$066337EB0001 type =>9 A type =>6 48 type =>12 2008-07-11 09:43:05 PL/SQL procedure successfully completed.会话A再次接收信息, 收到了B发送的信息(后发的信息)
SQL> exec read_pipe Pipe :ORA$PIPE$06573F5C0001 type =>9 A type =>6 48 type =>12 2008-07-11 09:43:05 PL/SQL procedure successfully completed.
Case 8
------
Sessions: A,B,C & D
Explanation: On Step 4, B receives D' message. On step 5, A receives C's message.
A new pipe is created.
1) C,D: exec dbms_output.put_line(dbms_pipe.unique_session_name)
2) D: exec send_pipe
3) C: exec send_pipe
4) B: exec read_pipe
5) A: exec read_pipe
会话C显示自己的管道名
SQL> exec dbms_output.put_line(dbms_pipe.unique_session_name) ORA$PIPE$066337EB0001 PL/SQL procedure successfully completed.会话D显示自己的管道名
SQL> exec dbms_output.put_line(dbms_pipe.unique_session_name) ORA$PIPE$06612A790001 PL/SQL procedure successfully completed.会话D发送信息
SQL> exec send_pipe PL/SQL procedure successfully completed.会话C发送信息
SQL> exec send_pipe PL/SQL procedure successfully completed.会话B接收信息, 收到会话D发送的信息
SQL> exec read_pipe Pipe :ORA$PIPE$06612A790001 type =>9 A type =>6 48 type =>12 2008-07-11 09:43:05 PL/SQL procedure successfully completed.会话A接收信息, 收到会话C发送的信息
SQL> exec read_pipe Pipe :ORA$PIPE$066337EB0001 type =>9 A type =>6 48 type =>12 2008-07-11 09:43:05 PL/SQL procedure successfully completed.
Case 9
------
Session: Any
Explanation: Cleaning up.
1) exec remove_all_pipes
2) select * from v$db_pipes;
查看管道,删除管道
SQL> select * from v$db_pipes; OWNERID NAME TYPE PIPE_SIZE ---------- ------------------------------ ------- ---------- ORA$PIPE$0659B71A0001 PUBLIC 1686 WAITER PUBLIC 1671 ORA$PIPE$06573F5C0001 PUBLIC 1686 ORA$PIPE$06612A790001 PUBLIC 1686 ORA$PIPE$066337EB0001 PUBLIC 1686 SQL> exec remove_all_pipes Pipe ORA$PIPE$0659B71A0001 removed. Pipe WAITER removed. Pipe ORA$PIPE$06573F5C0001 removed. Pipe ORA$PIPE$06612A790001 removed. Pipe ORA$PIPE$066337EB0001 removed. PL/SQL procedure successfully completed. SQL> select * from v$db_pipes; no rows selected
4.
源码:
package dbms_pipe is -- DE-HEAD <- tell SED where to cut when generating fixed package ------------ -- OVERVIEW -- -- This package provides a DBMS "pipe" service which allows messages -- to be sent between sessions. -- -- The metaphor is similar to UNIX pipes: you can do -- dbms_pipe.send_message(<pipename>) -- dbms_pipe.receive_message(<pipename>) -- which will cause a message to be sent or received. You do -- dbms_pipe.pack_message(<varchar2>|<number>|<date>) -- to pack an item into a static buffer (which will then be sent with -- the "send_message" call), and -- dbms_pipe.unpack_message(<varchar2>|<number>|<date>) -- to get an item out of the static buffer (which is filled by the -- "receive_message" call). -- Pipes can be private to a user-id - which only allows session connected -- under the same user-id or stored procedure owned by the user-id to read -- write to the pipe. Pipes could be public - and all database users with -- execute privilege on dbms_pipe and knowledge of the pipe can read or -- write to the pipe. -- -- Pipes operate independently of transactions. They also operate -- asynchronously. There can be multiple readers and writers of the -- same pipe. -- -- Pipes only operate between sessions in the same instance. -- -- Pipes can be explicitly created using -- dbms_pipe.create_pipe(<pipename>) -- and removed using -- dbms_pipe.remove_pipe(<pipename>) -- A pipe created using the explicit create command should be removed -- using the remove function. A pipe can also be created implicitly. -- Pipes automatically come into existence the first time they are -- referenced. They effectively disappear when they contain no more -- data (some overhead remains in the SGA until it gets aged out). -- Pipes take up space in the SGA (see "maxpipesize" parameter to -- "send_message"). -------- -- USES -- -- The pipe functionality has several potential applications: -- -- o External service interface. You can provide the ability to -- communicate with (user-written) services that are external to the -- RDBMS. This can be done in a (effectively) multi-threaded manner -- so that several instances of the service can be executing -- simultaneously. Additionally, the services are available -- asynchronously - the requestor of the service need not block -- awaiting a reply. The requestor can check (with or without -- timeout) at a later time. The service can be written in any -- of the 3GL languages that ORACLE supports, not just C. See -- example below. -- o Independent transactions. The pipe can be used to communicate -- to a separate session which can perform an operation in an -- independent transaction (such as logging an attempted security -- violation detected by a trigger). -- o Alerters (non-transactional). You can post another process -- without requiring the waiting process to poll. If an "after-row" -- or "after-statement" trigger were to alert an application, then -- the application would treat this alert as an indication that -- the data probably changed. The application would then go read -- the data to get the current value. Since this is an "after" -- trigger, the application would want to do a "select for update" -- to make sure it read the correct data. -- o Debugging. Triggers and/or stored procedures can send debugging -- information to a pipe. Another session can keep reading out -- of the pipe and displaying it on the screen or writing it -- out to a file. -- o Concentrator. Useful for multiplexing large numbers of users -- over a fewer number of network connections, or improving -- performance by concentrating several user-transactions into -- one dbms-transaction. ------------ -- SECURITY -- -- Security can be achieved by use of 'grant execute' on the dbms_pipe -- package, by creating a pipe using the 'private' parameter in the create -- function and by writing cover packages that only expose particular -- features or pipenames to particular users or roles. ------------ -- EXAMPLES -- -- External service interface ------------------------------ -- -- Put the user-written 3GL code into an OCI or Precompiler program. -- The program connects to the database and executes PL/SQL code to read -- its request from the pipe, computes the result, and then executes -- PL/SQL code to send the result on a pipe back to the requestor. -- Below is an example of a stock service request. -- -- The recommended sequence for the arguments to pass on the pipe -- for all service requests is -- -- protocol_version varchar2 - '1', 10 bytes or less -- returnpipe varchar2 - 30 bytes or less -- service varchar2 - 30 bytes or less -- arg1 varchar2/number/date -- ... -- argn varchar2/number/date -- -- The recommended format for returning the result is -- -- success varchar2 - 'SUCCESS' if OK, -- otherwise error message -- arg1 varchar2/number/date -- ... -- argn varchar2/number/date -- -- -- The "stock price request server" would do, using OCI or PRO* (in -- pseudo-code): -- -- <loop forever> -- begin dbms_stock_server.get_request(:stocksymbol); end; -- <figure out price based on stocksymbol (probably from some radio -- signal), set error if can't find such a stock> -- begin dbms_stock_server.return_price(:error, :price); end; -- -- A client would do: -- -- begin :price := stock_request('YOURCOMPANY'); end; -- -- The stored procedure, dbms_stock_server, which is called by the -- "stock price request server" above is: -- -- create or replace package dbms_stock_server is -- procedure get_request(symbol out varchar2); -- procedure return_price(errormsg in varchar2, price in varchar2); -- end; -- -- create or replace package body dbms_stock_server is -- returnpipe varchar2(30); -- -- procedure returnerror(reason varchar2) is -- s integer; -- begin -- dbms_pipe.pack_message(reason); -- s := dbms_pipe.send_message(returnpipe); -- if s <> 0 then -- raise_application_error(-20000, 'Error:' || to_char(s) || -- ' sending on pipe'); -- end if; -- end; -- -- procedure get_request(symbol out varchar2) is -- protocol_version varchar2(10); -- s integer; -- service varchar2(30); -- begin -- s := dbms_pipe.receive_message('stock_service'); -- if s <> 0 then -- raise_application_error(-20000, 'Error:' || to_char(s) || -- 'reading pipe'); -- end if; -- dbms_pipe.unpack_message(protocol_version); -- if protocol_version <> '1' then -- raise_application_error(-20000, 'Bad protocol: ' || -- protocol_version); -- end if; -- dbms_pipe.unpack_message(returnpipe); -- dbms_pipe.unpack_message(service); -- if service != 'getprice' then -- returnerror('Service ' || service || ' not supported'); -- end if; -- dbms_pipe.unpack_message(symbol); -- end; -- -- procedure return_price(errormsg in varchar2, price in varchar2) is -- s integer; -- begin -- if errormsg is null then -- dbms_pipe.pack_message('SUCCESS'); -- dbms_pipe.pack_message(price); -- else -- dbms_pipe.pack_message(errormsg); -- end if; -- s := dbms_pipe.send_message(returnpipe); -- if s <> 0 then -- raise_application_error(-20000, 'Error:'||to_char(s)|| -- ' sending on pipe'); -- end if; -- end; -- end; -- -- -- The procedure called by the client is: -- -- create or replace function stock_request (symbol varchar2) -- return varchar2 is -- s integer; -- price varchar2(20); -- errormsg varchar2(512); -- begin -- dbms_pipe.pack_message('1'); -- protocol version -- dbms_pipe.pack_message(dbms_pipe.unique_session_name); -- return pipe -- dbms_pipe.pack_message('getprice'); -- dbms_pipe.pack_message(symbol); -- s := dbms_pipe.send_message('stock_service'); -- if s <> 0 then -- raise_application_error(-20000, 'Error:'||to_char(s)|| -- ' sending on pipe'); -- end if; -- s := dbms_pipe.receive_message(dbms_pipe.unique_session_name); -- if s <> 0 then -- raise_application_error(-20000, 'Error:'||to_char(s)|| -- ' receiving on pipe'); -- end if; -- dbms_pipe.unpack_message(errormsg); -- if errormsg <> 'SUCCESS' then -- raise_application_error(-20000, errormsg); -- end if; -- dbms_pipe.unpack_message(price); -- return price; -- end; -- -- You would typically only grant execute on 'dbms_stock_service' to -- the stock service application server, and would only grant execute -- on 'stock_request' to those users allowed to use the service. --------------------- -- SPECIAL CONSTANTS -- maxwait constant integer := 86400000; /* 1000 days */ -- The maximum time to wait attempting to send or receive a message ---------------------------- -- PROCEDURES AND FUNCTIONS -- procedure pack_message(item in varchar2 character set any_cs); pragma restrict_references(pack_message,WNDS,RNDS); procedure pack_message(item in number); pragma restrict_references(pack_message,WNDS,RNDS); procedure pack_message(item in date); pragma restrict_references(pack_message,WNDS,RNDS); procedure pack_message_raw(item in raw); pragma restrict_references(pack_message_raw,WNDS,RNDS); procedure pack_message_rowid(item in rowid); pragma restrict_references(pack_message_rowid,WNDS,RNDS); -- Pack an item into the message buffer -- Input parameters: -- item -- Item to pack into the local message buffer. -- Exceptions: -- ORA-06558 generated if message buffer overflows (currently 4096 -- bytes). Each item in the buffer takes one byte for the type, -- two bytes for the length, plus the actual data. There is also one -- byte needed to terminate the message. -- procedure unpack_message(item out varchar2 character set any_cs); pragma restrict_references(unpack_message,WNDS,RNDS); procedure unpack_message(item out number); pragma restrict_references(unpack_message,WNDS,RNDS); procedure unpack_message(item out date); pragma restrict_references(unpack_message,WNDS,RNDS); procedure unpack_message_raw(item out raw); pragma restrict_references(unpack_message_raw,WNDS,RNDS); procedure unpack_message_rowid(item out rowid); pragma restrict_references(unpack_message_rowid,WNDS,RNDS); -- Unpack an item from the local message buffer -- Output parameters: -- item -- The argument to receive the next unpacked item from the local -- message buffer. -- Exceptions: -- ORA-06556 or 06559 are generated if the buffer contains -- no more items, or if the item is not of the same type as that -- requested (see 'next_item_type' below). -- function next_item_type return integer; pragma restrict_references(next_item_type,WNDS,RNDS); -- Get the type of the next item in the local message buffer -- Return value: -- Type of next item in buffer: -- 0 no more items -- 9 varchar2 -- 6 number -- 11 rowid -- 12 date -- 23 raw -- function create_pipe(pipename in varchar2, maxpipesize in integer default 8192, private in boolean default TRUE) return integer; pragma restrict_references(create_pipe,WNDS,RNDS); -- Create an empty pipe with the given name. -- Input parameters: -- pipename -- Name of pipe to be created. WARNING: Do not use pipe names -- beginning with 'ORA$'. These are reserved for use by procedures -- provided by Oracle Corporation. Pipename should not be longer than -- 128 bytes, and is case_insensitive. At this time, the name cannot -- contain NLS characters. -- maxpipesize -- Maximum allowed size for the pipe. The total size of all the -- messages on the pipe cannot exceed this amount. The maxpipesize -- for a pipe becomes part of the pipe and persists for the lifetime -- of the pipe. Callers of send_message with larger values will -- cause the maxpipesize to be increased. Callers with a smaller -- value will just use the larger value. The specification of -- maxpipesize here allows us to avoid its use in future send_message -- calls. -- private -- Boolean indicating whether the pipe will be private - and for the -- use of the creating user-id, or public. A private pipe can be used -- directly through calls to this package by sessions connected to the -- database as the same user as the one that created the pipe. It can -- also be used via stored procedures owned by the user that created -- the pipe. The procedure may be executed by anyone with execute -- privilege on it. A public pipe can be accessed by anyone who has -- knowledge of it and execute privilege on dbms_pipe. -- Return values: -- 0 - Success. This is returned even if the pipe had been created in -- mode that permits its use by the user executing the create call. -- If a pipe already existed, it is not emptied. -- Exceptions: -- Null pipe name. -- Permission error. Pipe with the same name already exists and -- you are not allowed to use it. -- function remove_pipe(pipename in varchar2) return integer; pragma restrict_references(remove_pipe,WNDS,RNDS); -- Remove the named pipe. -- Input Parameters: -- pipename -- Name of pipe to remove. -- Return value: -- 0 - Success. Calling remove on a pipe that does not exist returns 0. -- Exceptions: -- Null pipe name. -- Permission error. Insufficient privilege to remove pipe. The -- pipe was created and is owned by someone else. -- function send_message(pipename in varchar2, timeout in integer default maxwait, maxpipesize in integer default 8192) return integer; pragma restrict_references(send_message,WNDS,RNDS); -- Send a message on the named pipe. The message is contained in the -- local message buffer which was filled with calls to 'pack_message'. -- A pipe could have been created explicitly using 'create_pipe', or -- it will be created implicitly. -- Input parameters: -- pipename -- Name of pipe to place the message on. The message is copied -- from the local buffer which can be filled by the "pack_message" -- routine. WARNING: Do not use pipe names beginning with 'ORA$'. -- These names are reserved for use by procedures provided by -- Oracle Corporation. Pipename should not be longer than 128 bytes, -- and is case_insensitive. At this time, the name cannot -- contain NLS characters. -- timeout -- Time to wait while attempting to place a message on a pipe, in -- seconds (see return codes below). -- maxpipesize -- Maximum allowed size for the pipe. The total size of all the -- messages on the pipe cannot exceed this amount. If this message -- would exceed this amount the call will block. The maxpipesize -- for a pipe becomes part of the pipe and persists for the lifetime -- of the pipe. Callers of send_message with larger values will -- cause the maxpipesize to be increased. Callers with a smaller -- value will just use the larger value. The specification of -- maxpipesize here allows us to avoid the use of a "open_pipe" call. -- Return value: -- 0 - Success -- 1 - Timed out (either because can't get lock on pipe or pipe stays -- too full) -- 3 - Interrupted -- Exceptions: -- Null pipe name. -- Permission error. Insufficient privilege to write to the pipe. -- The pipe is private and owned by someone else. function receive_message(pipename in varchar2, timeout in integer default maxwait) return integer; pragma restrict_references(receive_message,WNDS,RNDS); -- Receive a message from the named pipe. Copy the message into the -- local message buffer. Use 'unpack_message' to access the -- individual items in the message. The pipe can be created explicitly -- using the 'create_pipe' function or it will be created implicitly. -- Input parameters: -- pipename -- Name of pipe from which to retrieve a message. The message is -- copied into a local buffer which can be accessed by the -- "unpack_message" routine. WARNING: Do not use pipe names -- beginning with 'ORA$'. These names are reserved for use by -- procedures provided by Oracle Corporation. Pipename should not be -- longer than 128 bytes, and is case-insensitive. At this time, -- the name cannot contain NLS characters. -- timeout -- Time to wait for a message. A timeout of 0 allows you to read -- without blocking. -- Return value: -- 0 - Success -- 1 - Timed out -- 2 - Record in pipe too big for buffer (should not happen). -- 3 - Interrupted -- Exceptions: -- Null pipe name. -- Permission error. Insufficient privilege to remove the record -- from the pipe. The pipe is owned by someone else. procedure reset_buffer; pragma restrict_references(reset_buffer,WNDS,RNDS); -- Reset pack and unpack positioning indicators to 0. Generally this -- routine is not needed. -- procedure purge(pipename in varchar2); pragma restrict_references(purge,WNDS,RNDS); -- Empty out the named pipe. An empty pipe is a candidate for LRU -- removal from the SGA, therefore 'purge' can be used to free all -- memory associated with a pipe. -- Input Parameters: -- pipename -- Name of pipe from which to remove all messages. The local -- buffer may be overwritten with messages as they are discarded. -- Pipename should not be longer than 128 bytes, and is -- case-insensitive. -- Exceptions: -- Permission error if pipe belongs to another user. -- function unique_session_name return varchar2; pragma restrict_references(unique_session_name,WNDS,RNDS,WNPS); -- Get a name that is unique among all sessions currently connected -- to this database. Multiple calls to this routine from the same -- session will always return the same value. -- Return value: -- A unique name. The returned name can be up to 30 bytes. -- pragma TIMESTAMP('2000-06-09:14:30:00'); end; -- CUT_HERE <- tell sed where to chop off the rest
可以看到,调用的其实都是C的接口
PACKAGE BODY dbms_pipe IS PACKBUF CHAR(4096) := 'a'; UNPACKBUF CHAR(4096) := 'a'; PACKPOS BINARY_INTEGER := 0; UNPACKPOS BINARY_INTEGER := 2000000000; PROCEDURE SENDPIPE(PIPENAME IN VARCHAR2, POS IN BINARY_INTEGER, BUFFER IN OUT NOCOPY CHAR, MAXPIPESIZE IN BINARY_INTEGER, TIMEOUT IN BINARY_INTEGER, RETVAL OUT BINARY_INTEGER); PRAGMA INTERFACE (C, SENDPIPE); PROCEDURE RECEIVEPIPE(PIPENAME IN VARCHAR2, BUFFER IN OUT NOCOPY CHAR, TIMEOUT IN BINARY_INTEGER, RETVAL OUT BINARY_INTEGER); PRAGMA INTERFACE (C, RECEIVEPIPE); PROCEDURE COPYINTOBUF(A IN VARCHAR2 CHARACTER SET ANY_CS, POS IN OUT NOCOPY BINARY_INTEGER, BUF IN OUT NOCOPY CHAR); PRAGMA INTERFACE (C, COPYINTOBUF); PROCEDURE COPYINTOBUF(A IN NUMBER, POS IN OUT NOCOPY BINARY_INTEGER, BUF IN OUT NOCOPY CHAR); PRAGMA INTERFACE (C, COPYINTOBUF); PROCEDURE COPYINTOBUF(A IN DATE, POS IN OUT NOCOPY BINARY_INTEGER, BUF IN OUT NOCOPY CHAR); PRAGMA INTERFACE (C, COPYINTOBUF); PROCEDURE COPYFROMBUF(A OUT VARCHAR2 CHARACTER SET ANY_CS, POS IN OUT NOCOPY BINARY_INTEGER, BUF IN CHAR); PRAGMA INTERFACE (C, COPYFROMBUF); PROCEDURE COPYFROMBUF(A OUT NUMBER, POS IN OUT NOCOPY BINARY_INTEGER, BUF IN CHAR); PRAGMA INTERFACE (C, COPYFROMBUF); PROCEDURE COPYFROMBUF(A OUT DATE, POS IN OUT NOCOPY BINARY_INTEGER, BUF IN CHAR); PRAGMA INTERFACE (C, COPYFROMBUF); FUNCTION GETTYPEFROMBUF(POS IN BINARY_INTEGER, BUF IN CHAR) RETURN BINARY_INTEGER; PRAGMA INTERFACE (C, GETTYPEFROMBUF); PROCEDURE COPYINTOBUFBINARY(A IN RAW, POS IN OUT NOCOPY BINARY_INTEGER, BUF IN OUT NOCOPY CHAR); PRAGMA INTERFACE (C, COPYINTOBUFBINARY); PROCEDURE COPYINTOBUFROWID(A IN ROWID, POS IN OUT NOCOPY BINARY_INTEGER, BUF IN OUT NOCOPY CHAR); PRAGMA INTERFACE (C, COPYINTOBUFROWID); PROCEDURE COPYFROMBUFBINARY(A OUT RAW , POS IN OUT NOCOPY BINARY_INTEGER, BUF IN CHAR); PRAGMA INTERFACE (C, COPYFROMBUFBINARY); PROCEDURE COPYFROMBUFROWID(A OUT ROWID, POS IN OUT NOCOPY BINARY_INTEGER, BUF IN CHAR); PRAGMA INTERFACE (C, COPYFROMBUFROWID); PROCEDURE CREATEPIPE(PIPENAME IN VARCHAR2, MAXPIPESIZE IN BINARY_INTEGER, PRIVATE IN BOOLEAN, RETVAL OUT BINARY_INTEGER); PRAGMA INTERFACE (C, CREATEPIPE); PROCEDURE REMOVEPIPE(PIPENAME IN VARCHAR2, RETVAL OUT BINARY_INTEGER); PRAGMA INTERFACE (C, REMOVEPIPE); FUNCTION UNIQUE_SESSION_ID RETURN VARCHAR2; PRAGMA INTERFACE (C, UNIQUE_SESSION_ID); PROCEDURE PACK_MESSAGE(ITEM IN VARCHAR2 CHARACTER SET ANY_CS) IS BEGIN COPYINTOBUF(ITEM, PACKPOS, PACKBUF); END; PROCEDURE PACK_MESSAGE_RAW(ITEM IN RAW) IS BEGIN COPYINTOBUFBINARY(ITEM, PACKPOS, PACKBUF); END; PROCEDURE PACK_MESSAGE_ROWID(ITEM IN ROWID) IS BEGIN COPYINTOBUFROWID(ITEM, PACKPOS, PACKBUF); END; PROCEDURE PACK_MESSAGE(ITEM IN NUMBER) IS BEGIN COPYINTOBUF(ITEM, PACKPOS, PACKBUF); END; PROCEDURE PACK_MESSAGE(ITEM IN DATE) IS BEGIN COPYINTOBUF(ITEM, PACKPOS, PACKBUF); END; PROCEDURE UNPACK_MESSAGE(ITEM OUT VARCHAR2 CHARACTER SET ANY_CS) IS BEGIN COPYFROMBUF(ITEM, UNPACKPOS, UNPACKBUF); END; PROCEDURE UNPACK_MESSAGE_RAW(ITEM OUT RAW) IS BEGIN COPYFROMBUFBINARY(ITEM, UNPACKPOS, UNPACKBUF); END; PROCEDURE UNPACK_MESSAGE_ROWID(ITEM OUT ROWID) IS BEGIN COPYFROMBUFROWID(ITEM, UNPACKPOS, UNPACKBUF); END; PROCEDURE UNPACK_MESSAGE(ITEM OUT NUMBER) IS BEGIN COPYFROMBUF(ITEM, UNPACKPOS, UNPACKBUF); END; PROCEDURE UNPACK_MESSAGE(ITEM OUT DATE) IS BEGIN COPYFROMBUF(ITEM, UNPACKPOS, UNPACKBUF); END; FUNCTION NEXT_ITEM_TYPE RETURN INTEGER IS INTERNAL_TYPE BINARY_INTEGER; BEGIN INTERNAL_TYPE := GETTYPEFROMBUF(UNPACKPOS, UNPACKBUF); IF INTERNAL_TYPE = 1 THEN RETURN 9; ELSIF INTERNAL_TYPE = 2 THEN RETURN 6; ELSE RETURN INTERNAL_TYPE; END IF; END; FUNCTION CREATE_PIPE(PIPENAME IN VARCHAR2, MAXPIPESIZE IN INTEGER DEFAULT 8192, PRIVATE IN BOOLEAN DEFAULT TRUE) RETURN INTEGER IS RETVAL BINARY_INTEGER; MPS BINARY_INTEGER := MAXPIPESIZE; PVT BOOLEAN := PRIVATE; BEGIN IF PIPENAME IS NULL THEN DBMS_SYS_ERROR.RAISE_SYSTEM_ERROR(-23321, 'Pipename may not be null'); END IF; CREATEPIPE(PIPENAME, MPS, PVT, RETVAL); IF RETVAL = 4 THEN DBMS_SYS_ERROR.RAISE_SYSTEM_ERROR(-23322, 'Insufficient privilege to access pipe'); END IF; RETURN RETVAL; END; FUNCTION REMOVE_PIPE(PIPENAME IN VARCHAR2) RETURN INTEGER IS RETVAL BINARY_INTEGER; BEGIN IF PIPENAME IS NULL THEN DBMS_SYS_ERROR.RAISE_SYSTEM_ERROR(-23321, 'Pipename may not be null'); END IF; REMOVEPIPE(PIPENAME, RETVAL); IF RETVAL = 4 THEN DBMS_SYS_ERROR.RAISE_SYSTEM_ERROR(-23322, 'Insufficient privilege to access pipe'); END IF; RETURN RETVAL; END; FUNCTION SEND_MESSAGE(PIPENAME IN VARCHAR2, TIMEOUT IN INTEGER DEFAULT MAXWAIT, MAXPIPESIZE IN INTEGER DEFAULT 8192) RETURN INTEGER IS RETVAL BINARY_INTEGER; MPS BINARY_INTEGER := MAXPIPESIZE; TMO BINARY_INTEGER := TIMEOUT; BEGIN IF PIPENAME IS NULL THEN DBMS_SYS_ERROR.RAISE_SYSTEM_ERROR(-23321, 'Pipename may not be null'); END IF; SENDPIPE(PIPENAME, PACKPOS, PACKBUF, MPS, TMO, RETVAL); IF RETVAL = 0 THEN PACKPOS := 0; END IF; IF RETVAL = 4 THEN DBMS_SYS_ERROR.RAISE_SYSTEM_ERROR(-23322, 'Insufficient privilege to access pipe'); END IF; RETURN RETVAL; END; FUNCTION RECEIVE_MESSAGE(PIPENAME IN VARCHAR2, TIMEOUT IN INTEGER DEFAULT MAXWAIT) RETURN INTEGER IS RETVAL BINARY_INTEGER; TMO BINARY_INTEGER := TIMEOUT; BEGIN IF PIPENAME IS NULL THEN DBMS_SYS_ERROR.RAISE_SYSTEM_ERROR(-23321, 'Pipename may not be null'); END IF; RECEIVEPIPE(PIPENAME, UNPACKBUF, TMO, RETVAL); IF RETVAL = 0 THEN UNPACKPOS := 0; ELSE UNPACKPOS := 2000000000; END IF; IF RETVAL = 4 THEN DBMS_SYS_ERROR.RAISE_SYSTEM_ERROR(-23322, 'Insufficient privilege to access pipe'); END IF; RETURN RETVAL; END; PROCEDURE RESET_BUFFER IS BEGIN UNPACKPOS := 0; PACKPOS := 0; END; PROCEDURE PURGE(PIPENAME IN VARCHAR2) IS RETVAL BINARY_INTEGER; BEGIN LOOP RETVAL := RECEIVE_MESSAGE(PIPENAME, 0); IF RETVAL <> 0 THEN EXIT; END IF; END LOOP; END; FUNCTION UNIQUE_SESSION_NAME RETURN VARCHAR2 IS BEGIN RETURN ('ORA$PIPE$' || UNIQUE_SESSION_ID); END; END;
外部链接:
PACKAGE DBMS_PIPE Specification
When Do DBMS_PIPE Connections Get Closed
管道处于非活动状态或没有消息达到一段时间, 会被从共享池中自动清除, 这时再操作管道就会报错, 通常是ORA-20011
即使是显式创建的管道, 也可能会被自动清除
唯一解决办法是删除并重建
Dynamic SQL and System Commands Using DBMS_PIPE
使用管道调用SQL语句或外部命令
COMMONLY ASKED QUESTIONS ABOUT DBMS_PIPE PACKAGE
使用DBMS_PIPE比用轮询更有效, 但需测试
管道中的数据不是持久性的, 随着数据库实例关闭而丢失
Example of DBMS_PIPE() with Pro*C on Unix
Pro*C程序使用DBMS_PIPE管道
PLSQL: Example use of DBMS_PIPE for Debugging
使用DBMS_PIPE管道取代DBMS_OUTPUT来调试程序
How To Use the PIPE Option with RMAN
使用管道向RMAN发送命令和接收返回信息
What is the difference between PIPE and QUEUE for profile Concurrent:TM Transport Type ?
DBMS_PIPE: Communicating Between Sessions
-fin-
No comments:
Post a Comment