搜索
查看: 3164|: 0

在 Oracle 数据库中实现 MapReduce

[复制链接]

167

主题

4

回帖

573

积分

管理员

积分
573
发表于 2014-10-23 11:09:04 | 显示全部楼层 |阅读模式
在程序员开发并行程序时,Map-Reduce模式正变得流行起来。这些map-reduce程序通常来并行处理大量数据。本文来演示如何在Oracle数据库上,通过使用Parallel Pipelined Table函数及并行操作,来实现Map-Reduce程序。(译者注:table()是oracle中一个函数,可以把定义为Pipelined的function的返回结果进行SQL查询)
原理:
Pipelined Table函数是在Oracle 9i引入的,作为能在数据流中嵌入过程逻辑代码方法。从逻辑上说,一个Table函数是可以出现在from子句中,该函数就像数据表一样的返回多行数据。Table函数同样也可以接收多行数据做为输入参数。大多数情况下,Pipelined Table函数可以嵌入到一个数据流中,它让数据“流”进SQL语句中,从而避免增加一个物理层(直译:具体化的中介)。再次说明,Pipelined Table函数是可以并行处理的。
为了并行Table函数,开发人员必须指定指定一个键对输入数据进行重定位。Table函数可以直接在PL/SQL, Java, and 中实现,你可以查到关于Table函数的更多信息、例子以及上面提到的那些功能,网址是:http://download.oracle.com/docs/cd/B10501_01/appdev.920/a96624/08_subs.htm#19677
在多个发行版中,Pipelined Table函数已经被用户使用,并成为Oracle可扩展基础功能的一个核心部分。无论是外部用户,还是Oracle的开发部门,Table函数成为一个有效的、简单的扩充数据库核心功能的方法。
类似Table函数的功能已经在Oracle内使用,并且是Oracle Spatial 和Oracle Warehouse Builder许多特色功能的实现方式。Oracle Spatial(空间数据处理系统)使用它涉及spatial joins 和许多 spatial data的数据挖掘的操作。Oracle Warehouse Builder让让用户使用Table 函数对数据流进行并行处理的逻辑,比如Match-Merge 算法和其它逐行计算的算法。
手把手的例子
所有的例子都在omr.sql文件中。
为了说明并行的使用方法以及用Pipelined Table函数在Oracle数据库内写一个Map-Reduce算法, 我们实现一个最经典的map-reduce例子--单词计数。单词计数是实现返回一组文档中所有不重复单词出现的个数的程序,也可以说是查询单词出现频率功能。
示例代码是用PL/SQL实现,但如前所说,Oracle允许你选择其它语言来实现这个过程逻辑。
1、配置环境
我们将在一组文档中查找,这些文档可以是数据库之外的文件中,也可以保存在Secure Files/CLOB的数据库内的列中。在我们这个存文档的表也相当于一个文件系统。
在本例中,我们将在数据库内创建一个表,用下面的声明:
  1. CREATE TABLE documents (a CLOB)      
  2.   LOB(a) STORE AS SECUREFILE(TABLESPACE sysaux);
复制代码
该表的每一行都对应一个文档,我们在用下面的语句,这个表中插入三个简单的文档:
  1. INSERT INTO documents VALUES ('abc def');      
  2. INSERT INTO documents VALUES ('def ghi');        
  3. INSERT INTO documents VALUES ('ghi jkl');        
  4. commit;
复制代码
map代码和reduce代码都将包含在一个包中,保持代码的整洁。为了展示这些步骤,我将把这些代码段从包中拿出来,在下面各小节展示。在实际的包中,还必须要定义几个types。所有代码均在Oracle Database 11g (11.1.0.6)测试通过。
2、创建Mapper and the Reducer
首先我们要创建一个普通的map函数来给文档做标记。记住,我们不是要展示这个map函数有多么好,而是要表达这在数据库工作的原理。这个map函数非常基本,其它地方也可能有更好的实现。
你可以使用数据库的聚合引擎及仅map函数来得到最终结果。一个请求和结果看起来是: SQL完成聚合操作,不需要reducer的函数。
当然,你也可以写自己的聚合的Table函数来计算单词的出现次数。如果你不用oracle的聚合引擎的话,你必须自己来写map-reduce的程序。这个聚合Table函数就相当于map-reduce中的reducer部分。
Table函数要求输入必须按单词分组,需要将数据排序(用oracle 执行引擎的sort)或单词分簇。我们展示一个简单的记数程序在本文中。
第3步 ,数据库中进行map-reduce
当你写完mapper and the reducer后,你就可以在数据库中进行map-reduce.执行一个包含Table函数的请求,就能对外部文档进行并行的按照map-reduce的代码执行。
总结
Oracle Table函数是经得起验证的技术,并在Oracle的内外广泛使用的扩展Oracle11g的技术。
Oracle Table函数是稳定并可扩展的方法,在Oracle数据库内实现Map-Reduce,并且能够利用Oracle并行执行框架的扩展性。在SQL中利用它,能让数据库开发人员用自己熟悉的环境和语言,为他们提供一个有效的、简单的机制去实现Map-Reduce方法。
你可以下载orm.sql,没有什么特殊的权限需求。
附:orm.sql代码
  1. CREATE TABLE documents (a CLOB)
  2.   LOB(a) STORE AS SECUREFILE(TABLESPACE sysaux);

  3. INSERT INTO documents VALUES ('abc def');
  4. INSERT INTO documents VALUES ('def ghi');
  5. INSERT INTO documents VALUES ('ghi jkl');
  6. commit;

  7. create or replace
  8. package oracle_map_reduce is

  9.   type word_t     is record (word varchar2(4000));
  10.   type words_t    is table of word_t;

  11.   type word_cur_t is ref cursor return word_t;
  12.   type wordcnt_t  is record (word varchar2(4000), count number);
  13.   type wordcnts_t is table of wordcnt_t;

  14.   function mapper(doc in sys_refcursor, sep in varchar2) return words_t
  15.     pipelined parallel_enable (partition doc by any);

  16.   function reducer(in_cur in word_cur_t) return wordcnts_t
  17.     pipelined parallel_enable (partition in_cur by hash(word))
  18.     cluster in_cur by (word);

  19. end;
  20. /

  21. create or replace
  22. package body oracle_map_reduce is

  23.   --
  24.   -- The mapper is a simple tokenizer that tokenizes the input documents
  25.   -- and emits individual words
  26.   --
  27.   function mapper(doc in sys_refcursor, sep in varchar2) return words_t
  28.     pipelined parallel_enable (partition doc by any)
  29.   is
  30.     document clob;
  31.     istart   number;
  32.     pos      number;
  33.     len      number;
  34.     word_rec word_t;
  35.   begin

  36.     -- for every document
  37.     loop

  38.       fetch doc into document;
  39.       exit when doc%notfound;

  40.       istart := 1;
  41.       len := length(document);

  42.       -- For every word within a document
  43.       while (istart <= len) loop
  44.         pos := instr(document, sep, istart);

  45.         if (pos = 0) then
  46.           word_rec.word := substr(document, istart);
  47.           pipe row (word_rec);
  48.           istart := len + 1;
  49.         else
  50.           word_rec.word := substr(document, istart, pos - istart);
  51.           pipe row (word_rec);
  52.           istart := pos + 1;
  53.         end if;

  54.       end loop; -- end loop for a single document

  55.     end loop; -- end loop for all documents

  56.     return;

  57.   end mapper;

  58.   --
  59.   -- The reducer emits words and the number of times they're seen
  60.   --
  61.   function reducer(in_cur in word_cur_t) return wordcnts_t
  62.     pipelined parallel_enable (partition in_cur by hash(word))
  63.     cluster in_cur by (word)
  64.   is
  65.     word_count wordcnt_t;
  66.     next       varchar2(4000);
  67.   begin

  68.     word_count.count := 0;

  69.     loop

  70.       fetch in_cur into next;
  71.       exit when in_cur%notfound;

  72.       if (word_count.word is null) then

  73.         word_count.word := next;
  74.         word_count.count := word_count.count + 1;

  75.       elsif (next <> word_count.word) then

  76.         pipe row (word_count);
  77.         word_count.word := next;
  78.         word_count.count := 1;

  79.       else

  80.         word_count.count := word_count.count + 1;

  81.       end if;

  82.     end loop;

  83.     if word_count.count <> 0 then
  84.       pipe row (word_count);
  85.     end if;

  86.     return;

  87.   end reducer;

  88. end;
  89. /


  90. -- Select statements

  91. select word, count(*)
  92. from (
  93.         select value(map_result).word word
  94.         from table(oracle_map_reduce.mapper(cursor(select a from documents), ' ')) map_result)
  95. group by (word);

  96. select *
  97. from table(oracle_map_reduce.reducer(
  98.               cursor(select value(map_result).word word
  99.                        from table(oracle_map_reduce.mapper(
  100.                         cursor(select a from documents), ' ')) map_result)));
复制代码





您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

大数据中国微信

QQ   

版权所有: Discuz! © 2001-2013 大数据.

GMT+8, 2025-1-7 03:59 , Processed in 0.061490 second(s), 24 queries .

快速回复 返回顶部 返回列表