|
2.3 多线程查询 在上述方法的基础上,我们还可以采用多线程并行方式来进一步提高性能。
所谓多线程并行,就是把数据分成 N 份,用 N 个线程查询。但如果只是随意地将数据分成 N 份,很可能无法真正地提高性能。因为将要查询的键值集是未知的,所以理论上也无法确保希望查找的数据能够均匀分布在每一份组表文件中。比较好的处理方式是先观察键值集的特征,从而尽可能地进行数据的均匀拆分。
比如说,继续使用上文中多字段键拼成单字段键的例子,将合并后的主键 nid 对 4 取模,余数相同的数据存在同一个组表中,最终由 4 个组表文件装载现有全部数据。这样的文件拆分方法,可以使被查询的数据分布的相对更加均匀一些。
如果键值数据有比较明显的业务特征,我们可以考虑按照实际业务场景使用日期、部门之类的字段来处理文件拆分。如:将属于部门 A 的 1000 条记录均分在 10 个文件中,每个文件就有 100 条记录。在利用多线程查询属于部门 A 的记录时,每个线程就会从各自对应的文件中取数相应的这 100 条记录了。
下面我们来看个实际的例子。
2.3.1 创建组表
| A
| 1
| =["type_a",……,"type_z","type_1",……,"type_9","type_0"]
| 2
| =A1.new(#:tid,~:type)
| 3
| =file("multi_source.txt")
| 4
| =A3.cursor@t()
| 5
| =A4.switch(type,A2:type)
| 6
| =A4.new(1000000000000+type.tid*long(1000000000)+id:nid,data)
| 7
| =N.(file("nid_"+string(~-1)+"_T.ctx").create(#nid,data))
| 8
| =N.(eval("channel(A4).select(nid%N=="+string(~-1)+").attach(A7("+string(~)+").append(~.cursor()))"))
| 9
| for A6,500000
| A1~A6:与多字段键的方法二一致。
A7:使用循环函数,创建名为“键值名 _ 键值取 N 的余数 _T.ctx”的组表文件,其结构同为 (#nid,data)。
A8:用循环函数将游标数据分别追加到 N 个原组表上。比如当 N=1 时,拼出的 eval 函数参数为:channel(A4).select(nid%4==0).attach(A7(1).append(~.cursor()))。意思是对游标 A4 创建管道,将管道中记录按键值 nid 取 4 的余数,将余数值等于 0 的记录过滤出来。attach 是对当前管道的附加运算,表示取和当前余数值对应的原组表,将当前管道中筛选过滤出的记录,以游标记录的方式追加到 A7(1),即第 1 个组表。
A9:循环游标 A6,每次获取 50 万条记录,直至 A6 游标中的数据取完。
执行后,产出 4(这时例子取 N=4)个独立的组表文件:
2.3.2 创建建索引
| A
| B
| 1
| fork directory@p("nid*T.ctx")
| =file(A1).create().index(nid_idx;nid;data)
| A1:列出满足 nid*T.ctx 的文件名(这里 * 为通配符),这里 @p 选项代表需要返回带有完整路径信息的文件名。使用 fork 执行多线程时,需要注意环境中的并行限制数是否设置合理。这里用了 4 个线程,设计器中对应的设置如下:
B2:每个线程为各个组表建立对应的索引文件,最终结果如下:
2.3.3 查询
| A
| B
| 1
| =file("keys.txt").import@i()
|
| 2
| =A1.group(~%N)
|
| 3
| fork N.(~-1),A2
| =A3(2)
| 4
|
| =file("nid_"/A3(1)/"_T.ctx").create().icursor(;B3.contain(nid),nid_idx)
| 5
|
| return B4
| 6
| =A3.conjx()
|
| 7
| =file("result_nid.txt").export@t(A6)
|
| A1:从 keys.txt 获取查询键值序列,因为只有一列结果,使用 @i 选项,将结果返回成序列:
A2:把 A1 的序列按 4 的余数进行等值分组:
A3、B3~B5:用 fork 函数,按等值分组后的键值对各个组表分别并行查询。这里的 fork 后面分别写了两个参数,第一个是循环函数 N.(~-1),第二个是 A2。在接下来的 B3、B4 中分别使用 A3(2) 和 A3(1) 来获取 fork 后面这两个对应顺序的参数,B4:对组表文件进行根据 B3 中的键值集进行数据筛选,B5:返回游标。由于 A3 中是多个线程返回的游标序列,所以 A6 中需要使用 conjx 对多个游标进行纵向连接。
A6~A7:将多个线程返回的游标进行纵向连接后,导出游标记录至文本文件,前几行内容如下。
2.4 数据追加 前面我们已经解决了针对大数据的批量随机键值查询问题,不过,我们不能假定数据永远不变。尤其是对于大数据来说,新数据的追加是必然要面对的。在将新数据追加到原有组表文件中时,我们需要讨论三种情况:有序键值追加、无序键值追加,以及数据量很大时的数据追加。
2.4.1 有序键值追加 单个文件时,如果键值有序,追加的代码如下:
| A
| 1
| =file("single.ctx")
| 2
| =A1.create()
| 3
| =file("single_add.txt")
| 4
| =A3.cursor@t()
| 5
| =A2.append(A4)
| A1:single.ctx 是已有的组表文件,结构为 (#id,data),其中 id 为自增键值。
A3~A5:新数据文件与已有文件结构相同,其 id 加入原组表后,对于整体数据也是有序的。这种情况可以直接追加到原组表,组表会自动更新索引。
如果按按多线程的方法拆分为多个文件,代码如下:
| A
| 1
| =file("single_add.txt")
| 2
| =A1.cursor@t()
| 3
| =directory@p("id*T.ctx").(file(~).create())
| 4
| =N.(eval("channel(A2).select(id%N=="+string(~-1)+").attach(A3("+string(~)+").append([~.cursor()]))"))
| 5
| for A2,500000
| A1、A2:用游标方式获取新增数据。
A3:满足通配符串:"id*T.ctx",现有 N 份组表文件的序列。
A4、A5:与前面方法中的代码一致。
2.4.2 无序键值追加 同样先来看一下单个文件的追加方法,以单字段键为例,代码如下:
| A
| 1
| =file("single.ctx")
| 2
| =A1.create().cursor()
| 3
| =file("single_add.txt")
| 4
| =A3.cursor@t()
| 5
| =file("single.ctx_temp").create(#id,data)
| 6
| =A5.append([A2,A4].mergex(id))
| A2:游标方式打开现有组表。
A4:游标方式获取新增数据。
A5:建个新的组表文件。
A6:在新的组表中存放现有组表数据和新增数据归并后的结果。这里要注意的是,用 cs.mergex(x) 进行归并操作,需要 cs 序列对 x 有序,也就是要求组表文件 A1 和新增数据文件 A3 中的数据对于 id 都分别有序。若不满足 cs 对 x 有序,程序虽然不会报错,但是归并出来的结果也是无序的。
当这段代码执行完后,还需要进行旧组表、旧索引的清理以及对新组表的建立索引等操作:
| A
| 1
| =movefile(file("single.ctx"))
| 2
| =movefile(file("single.ctx__id_idx"))
| 3
| =movefile(file("single.ctx_temp"),"single.ctx"))
| 4
| =file("single.ctx").create().index(id_idx;id;data)
| 前三行是文件操作,详见函数参考:movefile。A4 为组表建立索引,不再详述。
下面再看看多个文件的追加方法,以多字段键转单字段键后的数据结构 (nid,data) 为例,代码如下:
| A
| 1
| =["type_a",……,"type_z","type_1",……,"type_9","type_0"]
| 2
| =A1.new(#:tid,~:type)
| 3
| =file("multi_source_add.txt")
| 4
| =A3.cursor@t()
| 5
| =A4.switch(type,A2:type)
| 6
| =A4.new(1000000000000+type.tid*long(1000000000)+id:nid,data)
| 7
| =directory@p("nid*T.ctx").(file(~).create().cursor())
| 8
| =directory@p("nid*T.ctx").(file(~+"_temp").create(#nid,data))
| 9
| =N.(eval("channel(A4).select(nid%N=="+string(~-1)+").attach(A8("+string(~)+").append([~.cursor(),A7("+string(~)+")].mergex(nid)))"))
| 10
| for A4,500000
| A3:multi_source_add.txt 是新增数据来源。
A7:假设原组表已存在,列出原组表的文件名,依次获取组表游标,返回成序列。
A8:建立新的组表文件,用来存放旧组表数据和新增数据,在原有文件名后加上 "_temp",以示区别。
A9:对新增数据使用管道,将管道中的 N 份游标记录与对应的 N 个旧份组表中游标记录进行归并,追加到新 N 份组表中。上文已有详细的解释。
当这段代码执行完后,还需要进行旧组表、旧索引的清理以及对新组表的索引建立等操作,如下:
| A
| 1
| =directory@p("*T.ctx_temp")
| 2
| =A1.(left(~,len(~)-5))
| 3
| =A2.(movefile(file(~)))
| 4
| =A1.(left(~,len(~)-5)+"__nid_idx")
| 5
| =A2.(movefile(file(~)))
| 6
| =A1.(movefile(file(~),left(~,len(~)-5)))
| 7
| =A2.(file(~).create().index(nid_idx;nid;data))
| 代码中几乎全是循环函数与简单的文件操作。详见函数参考《文件》。最后一行建立索引,前文中也已多次解释。
2.4.3 数据量很大时的数据追加 随着新数据不断增加,每次新追加数据与全量历史数据归并的时间成本将会越来越高。这时需要把每份组表文件分为新、旧两份,新的一份是最近一段时间内积累的追加数据,旧的是之前的历史数据。每当有新数据需要追加时,还是按 2.4.2 的处理思路操作,但只对新的那份组表文件进行处理。当新份数据文件超过一定大小阈值(如 100G),再和旧数据合并。这样的做法不仅可以减少归并的时间成本,另一方面也可以降低对磁盘的损耗。
列举的数据结构还是以 (nid,data) 为例,这次我们从头开始完整地看一遍代码:
首先定义新、旧组表文件,命名规则如下:
新份组表:键值名 _ 键值取 N 的余数 _T.ctx;旧份组表:键值名 _ 键值取 N 的余数 _H.ctx。
1、 建立新、旧组表,本例中 N=4,代表建立 4 份组表:
| A
| 1
| =N.(file("nid_"+string(~-1)+"_H.ctx").create(#nid,data))
| 2
| =N.(file("nid_"+string(~-1)+"_T.ctx").create(#nid,data))
| N 取 4,生成的历史和临时组表文件如下:
2、 在新组表上追加新数据。
| A
| 1
| =["type_a",……,"type_z","type_1",……,"type_9","type_0"]
| 2
| =A1.new(#:tid,~:type)
| 3
| =file("multi_source_.txt")
| 4
| =A3.cursor@t()
| 5
| =A4.switch(type,A2:type)
| 6
| =A4.new(1000000000000+type.tid*long(1000000000)+id:nid,data)
| 7
| =directory@p("nid*T.ctx").(file(~).create().cursor())
| 8
| =directory@p("nid*T.ctx").(file(~+"_temp").create(#nid,data))
| 9
| =N.(eval("channel(A4).select(nid%N=="+string(~-1)+").attach(A8("+string(~)+").append([~.cursor(),A7("+string(~)+")].mergex(nid)))"))
| 10
| for A4,500000
| 3、 新组表合并后,清理原来的新组表和索引,然后重建新组表索引。
| A
| 1
| =directory@p("*T.ctx_temp")
| 2
| =A1.(left(~,len(~)-5))
| 3
| =A2.(movefile(file(~)))
| 4
| =A1.(left(~,len(~)-5)+"__nid_idx")
| 5
| =A2.(movefile(file(~)))
| 6
| =A1.(movefile(file(~),left(~,len(~)-5)))
| 7
| =A2.(file(~).create().index(nid_idx;nid;data))
| 4、 对新数据大小进行判断,如果超过参数 B(单位是字节数)则与旧份组表数据合并。
| A
| B
| C
| 1
| fork directory@p("nid*T.ctx")
| =file(A1)
|
| 2
|
| if B1.size()>B
| =left(A1,(len(A1)-5))+"H.ctx"
| 3
|
|
| =B1.create().cursor()
| 4
|
|
| =file(C2).create().cursor()
| 5
|
|
| =left(A1,(len(A1)-5))+"H.ctx_temp"
| 6
|
|
| =file(C5).create(#nid,data).append([C3,C4].mergex(nid))
| 5、 旧组表与新组表合并后,清理原来的旧组表和索引,然后重建旧组表索引。清理已合并的新组表,并重建空的新组表。
| A
| 1
| =directory@p("*H.ctx_temp")
| 2
| =A1.(left(~,len(~)-5))
| 3
| =A2.(movefile(file(~)))
| 4
| =A1.(left(~,len(~)-5)+"__nid_idx")
| 5
| =A4.(movefile(file(~)))
| 6
| =A1.(movefile(file(~),left(~,len(~)-5)))
| 7
| =A2.(file(~).create().index(nid_idx;nid;data))
| 8
| =A1.(left(~,len(~)-10)+"T.ctx")
| 9
| =A8.(movefile(file(~)))
| 10
| =A1.(left(~,len(~)-10)+"T.ctx__nid_idx")
| 11
| =A10.(movefile(file(~)))
| 12
| =A8.(file(~).create(#nid,data))
| 6、 对新、旧组表文件分别利用多线程进行查询
| A
| B
| 1
| =file("keys.txt").import@i()
|
| 2
| =A1.group(~%N)
|
| 3
| fork directory@p("*H.ctx"),directory@p("*T.ctx"),A2
| =A3(3)
| 4
|
| =file(A3(1)).create().icursor(;B3.contain(nid),nid_idx)
| 5
|
| =file(A3(2)).create().icursor(;B3.contain(nid),nid_idx)
| 6
|
| return B4|B5
| 7
| =A3.conj()
|
| 8
| =file("result.txt").export@t(A8)
|
| 这里需要注意 A7 中写法,因为 B6 中返回 B4|B5,所以导致 A3 的结果为多个游标序列的序列,因此在对 A3 进行纵向连接时,应该使用序列的 conj,而不是游标的 conjx。
至此,基于本文的 6 个集算器脚本文件,在第三方定时任务调度工具的合理调用下,可以实现单机情况下大数据量数据的追加,以及针对批量随机键值的查询工作。
|
|