当前位置: 首页 > news >正文

深圳网站建设中为个人订阅号支持微网站的建设吗

深圳网站建设中为,个人订阅号支持微网站的建设吗,网站建设工期时间表,网站建设釒首先金手指十五在进行数据分析时经常需要对多个不同的数据源进行关联操作#xff0c;因此在各类数据库的 SQL 语言中均包含了丰富的 join 语句#xff0c;以支持批计算中的多种关联操作。 DolphinDB 不仅通过 join 语法支持了对于全量历史数据的关联处理#xff0c;而且在要求低延时的实时…在进行数据分析时经常需要对多个不同的数据源进行关联操作因此在各类数据库的 SQL 语言中均包含了丰富的 join 语句以支持批计算中的多种关联操作。 DolphinDB 不仅通过 join 语法支持了对于全量历史数据的关联处理而且在要求低延时的实时计算场景中也提供了多种流数据连接引擎来支持对不断增长的数据流进行实时的关联处理。流数据连接引擎在内部实现上以增量计算为原则在大数据流量下依然能够保持毫秒级的时延性能。本教程将系统地介绍 DolphinDB 中的 5 种不同的流数据连接引擎从实时数据流关联、DolphinDB 连接引擎分类、用连接引擎实现金融应用场景等方面层层递进帮助用户深入理解 DolphinDB 的流计算连接引擎和快速上手多数据源实时关联处理。DolphinDB 2.00.8 及以上版本支持本篇所有代码。此外1.30.20 及以上版本支持除 3.2 小节外的所有代码细节会在该小节内详细说明。1. 流数据实时关联本章从 DolphinDB 中批计算的表关联语法 join 讲起介绍实时数据流关联面临的挑战。1.1 DolphinDB 批计算表关联在批计算场景中 DolphinDB SQL 语句中不仅提供了与传统关系型数据库类似的 equal join, full join, left join, prefix join, cross join 等表连接方式还提供了两种专门为时序数据设计的连接方式asof join 和 window join。以下是一个简单的 asof join 批计算的示例更详细应用介绍请参考应用教程使用 Asof Join, Window Join 快速估计个股交易成本。我们将通过它进一步分析实时连接中的挑战。// data t1 table(take(A, 4) as Sym, 10:10:03.000(10 2100 2890 6030) as Time, 12.5 12.5 12.6 12.6 as Price) t2 table(take(A, 4) as Sym, 10:10:00.000(0 3000 6000 9000) as Time, 12.6 12.5 12.7 12.6 as BidPrice)// asof join calculation select * from aj(t1, t2, Time)asof join 能够关联距离当前时刻最近的数据。指定连接列为 Time 字段后如上图所示t1 表的每行记录总是关联 t2 表中 Time 值小于它的 Time 值的那一行 t2 记录。关联后的结果如下1.2 实时数据流关联面临的挑战批计算的关联操作作为计算输入的 t1 表和 t2 表的数据是有界的关联操作作用在全量输入数据上计算结果一次性全量输出。现在考虑一下实时关联的场景首先在输入上t1, t2 的记录数会不断增长表现为数据无界且无法预知下一条记录何时到来比如股票市场中的交易数据、行情快照数据而对于关联结果我们会希望在每一条输入到来后尽快且尽可能正确地输出结果记录这时计算是不断增量进行的。那么对于流式的实时关联场景我们需要解决以下两个问题何时触发计算并输出以上面的 asof join 为例数据流 t1 中第一条记录Time 值为10:10:03:010到达系统时假设 t2 数据流中也有一条记录Time 为10:10:00.000此时实时计算模块是决定关联目前 t2 中最近的一条记录并输出还是等待某个触发条件再关联输出这是技术实现上要解决的问题。如何管理内存为了能够正确地关联到两个数据流实时计算模块需要缓存历史的数据流而输入是源源不断的则需要历史数据的清理机制。2. 流数据连接引擎DolphinDB 提供了 createAsofJoinEngine, createWindowJoinEngine, createEqualJoinEngine, createLeftSemiJoinEngine, createLookupJoinEngine 等 5 种不同的流计算连接引擎函数不同连接引擎的关联规则基本上与批计算中相应的 join 类似差异将在后续小节中详细说明。本章首先概述 DolphinDB 流计算引擎之后依次介绍各个引擎的原理和效果。流计算连接引擎是 DolphinDB 中对数据流进行实时关联的计算模块可以理解为一个设置了关联规则的计算黑盒输入为2条数据流输出为1条数据流引擎内部会自动维护计算状态。以下代码是 1.1 小节中的 asof join SQL 的流计算实现的脚本首先创建 2 个流数据表作为输入、1 个流数据表作为输出然后通过函数 createAsofJoinEngine 创建流计算引擎之后通过函数 subscribeTable 分别订阅 2 个流数据表并将数据实时注入流计算引擎的左、右表。之后当数据不断写入两个流数据表时输出结果表 output 中的记录数会相应地增加。流数据订阅功能更详细的介绍见 流数据订阅 。// create table share streamTable(1:0, SymTimePrice, [SYMBOL, TIME, DOUBLE]) as trade share streamTable(1:0, SymTimeBidPrice, [SYMBOL, TIME, DOUBLE]) as snapshot share table(1:0, TimeSymPricet2_TimeBidPrice, [TIME, SYMBOL, DOUBLE, TIME, DOUBLE]) as output// create engine ajEngine createAsofJoinEngine(nameasofJoin, leftTabletrade, rightTablesnapshot, outputTableoutput, metrics[Price, snapshot.Time, BidPrice], matchingColumnSym, timeColumnTime, useSystemTimefalse, delayedTime1000)// subscribe topic subscribeTable(tableNametrade, actionNamejoinLeft, offset0, handlergetLeftStream(ajEngine), msgAsTabletrue) subscribeTable(tableNamesnapshot, actionNamejoinRight, offset0, handlergetRightStream(ajEngine), msgAsTabletrue)以下代码构造输入数据并写入 2 个流数据表查看结果表 output 将看到引擎计算的结果。// generate data t1 table(take(A, 4) as Sym, 10:10:03.000(10 2100 2890 6030) as Time, 12.5 12.5 12.6 12.6 as Price) t2 table(take(A, 4) as Sym, 10:10:00.000(0 3000 6000 9000) as Time, 12.6 12.5 12.7 12.6 as BidPrice) // input data snapshot.append!(t2) trade.append!(t1)流计算连接引擎通过内置实现和简单的参数接口来解决上一章提到的实时数据流关联的问题。对于内存管理每个引擎都提供了 garbageSize 参数来清理不再需要的历史数据。对于触发计算的机制不同的引擎会稍有不同可以大致分为以下几类若关联计算依赖数据的时间顺序则处理的方式有以数据注入引擎时的系统时间为时序标准以数据中的时间列为时序标准这种情况下因为无法预知后续将到达的数据的时间戳则时序判断以最新的时间戳为依据认为时间戳早于它的全部数据都已经到齐同时辅以超时强制触发的规则关联计算不依赖数据的时间顺序则处理的方式有在数据注入引擎时立即计算输出等待到匹配数据后才计算输出同时辅以超时强制触发的规则关联规则和触发规则最终决定了引擎的计算结果下面我们详细介绍每一个连接引擎的原理和关联效果。2.1 Asof Join 引擎createAsofJoinEngineAsof Join 引擎的连接机制类似于 SQL 中的 asof join按连接列分组在每个分组内按时间邻近度关联左右表。引擎默认左右表是有序的在连接列分组内对于左表的每一条记录当引擎判断邻近的时刻到来后在右表缓存中选取在该条左表记录的时刻之前且最接近的一条记录不论是否找到引擎都将输出一条结果。Asof join 引擎在创建时通过参数 useSystemTime 指定以下两种规则中的一种用于判断临近时刻是否到来规则一以数据注入引擎时的系统时间为时序标准则每一条左表记录注入引擎时立刻关联并输出规则二以数据中的时间列为时序标准当右表数据的最新时刻大于左表数据的时刻时触发关联并输出。在规则二的基础上还可以通过参数 delayedTime 设置超时强制触发规则。下图展示在一个分组中以非系统时间触发输出的 Asof Join 引擎效果未设置超时强制触发触发输出的时刻由右表到来新数据决定。后文 3.1 小节将介绍一个 Asof Join 引擎的实际应用场景计算个股交易成本。2.2 Window Join 引擎createWindowJoinEngineWindow Join 引擎的连接机制类似于 SQL 中的 window join上一小节的 Asof Join 引擎可以看做是Window Join 引擎的一个特例。按连接列分组在每个分组内按时间邻近关联右表一个时间窗口内的数据这个窗口由左表的每一条记录的时刻和创建引擎时指定的窗口参数 window决定。引擎默认左右表是有序的在连接列分组内对于左表中的每一条记录当引擎判断窗口结束的时刻到来后会在右表缓存中选取由左表的时刻确定的窗口范围内的记录可能会找到 0 至多条记录引擎将输出一条结果这条结果由多条右表记录聚合为一条后与左表拼接而成。Window Join 引擎在创建时通过参数 useSystemTime 指定以下两种规则中的一种用于判断临近时刻是否到来规则一以数据注入引擎时的系统时间为时序标准则系统时间达到窗口下边界时立刻关联并输出规则二以数据中的时间列为时序标准当右表数据的最新时刻大于窗口下边界时触发关联并输出。在规则二的基础上还可以通过参数 maxDelayedTime 设置超时强制触发规则。下图展示在一个分组中以非系统时间触发输出的普通窗口参数 window-1:2 连接的效果由每一条左表记录基于其时间戳往前 1 个时间刻度、往后2个时间刻度划定窗口的上下边界输出由大于窗口下边界的第一条右表记录触发窗口计算不包含这条触发记录。下图展示在一个分组中以非系统时间触发输出的特殊窗口参数 window0:0 连接的效果窗口范围由相邻两条左表记录划定输出由等于或大于左表时间戳的第一条右表记录触发窗口计算不包含这条触发记录。后文 3.2 小节将介绍一个基于特殊窗口的窗口关联引擎的实际应用场景对行情快照融合逐笔成交数据。2.3 Equal Join 引擎createEqualJoinEngineEqual Join 引擎的连接机制类似于 SQL 中的 equal join按连接列和时间列等值关联左右表对于左表或右表中的每一条记录当它成功匹配上右表或左表中连接列一致的一条记录时引擎将输出一条结果。与SQL 中的 equal join 不同的是因为引擎内部并不缓存所有历史数据所以可能出现左表或右表中的某条记录到来后无法关联到已经从引擎缓存中清除的历史右表或左表记录进而不会输出结果。这是由Equal Join 引擎的设计初衷和内部实现决定的该引擎是为以连接列和时间列为键值的输入数据设计的比如每支股票在每分钟有一条记录。下图展示字段结构为连接列时间列指标的输入数据注入等值关联引擎的效果。后文 3.3 小节将介绍一个等值关联引擎的实际应用场景拼接不同数据源的实时分钟指标。建议按推荐场景使用Equal Join 引擎即对连接列和时间列唯一的数据使用本引擎。若非推荐场景为了理解输出效果可以参考如下设计原理Equal Join 引擎内部分别为左右表数据维护两个以连接列和时间列作为键值的键值表作为缓存并对每条记录标识是否关联过。下面以左表为例介绍右表同理。当一条左表记录注入引擎则到查找右表缓存 若能成功匹配则输出一条结果并在右表缓存中标识对应记录为已关联这时左表缓存中不会保存这条立刻关联输出的左表记录此原理会导致上图中后续的灰色数据(A,t1,4)无法匹配而不输出若未能匹配成功则将该条左表记录加入左表缓存并标识为未关联。需要注意对于缓存中的已关联、未关联的数据Equal Join 引擎都会进行过期清理清理原理可参考用户手册 createEqualJoinEngine。若遵循推荐场景使用此引擎但是引擎输出结果与 SQL equal join 结果仍不完全一致则是设置的清理规则导致的差异。2.4 Lookup Join 引擎createLookupJoinEngineLookup Join 引擎的连接机制类似于 SQL 中的 left join按连接列等值关联左右表左表中的每一条记录注入引擎时便立刻关联当前时刻的右表不论是否在右表中匹配到连接列一致的记录引擎都会立刻输出一条结果若未能匹配上则结果中右表相关的字段为空。与 SQL 中的 left join 不同的是引擎在内部缓存右表的记录时对于相同连接列的数据总是只保留最新一条因此对于左表的每一条记录至多只会匹配一条右表记录并输出一次。引擎的右表可以是数据流或者数据集。对于数据流引擎通过数据流不断地注入引擎来更新内部的右表缓存对于数据集引擎通过对数据集的定时查询来更新内部的右表缓存。下图展示字段结构为连接列指标的输入数据注入右表保留最新一条记录的 Lookup Join 引擎的效果左表数据总是在达到后立刻输出。后文 3.4 小节将介绍一个 Lookup Join 引擎的实际应用场景对实时行情关联历史日频指标。2.5 Left Semi Join 引擎createLeftSemiJoinEngineLeft Semi Join 引擎的连接机制类似于 SQL 中的 equal join 按连接列等值关联左右表对于左表中的每一条记录当它成功匹配上右表中连接列一致的一条记录时引擎将输出一条结果。未成功匹配的左表的记录将一直由引擎缓存等待与右表中更新的记录匹配。与SQL 中的 equal join 不同的是引擎在内部缓存右表的记录时对于相同连接列的数据总是只保留第一条或者最新一条因此对于左表的每一条记录至多只会匹配一条右表记录并输出一次。下图展示字段结构为连接列指标的输入数据注入右表保留最新一条记录的Left Semi Join 引擎的效果左表数据总是等到匹配成功才输出。后文3.5、3.6小节将分别介绍两个Left Semi Join 引擎的实际应用场景一是对逐笔成交数据补充原始委托信息二是关联股票和指数行情并计算相关性。3. 实时关联应用案例DolphinDB 中流计算连接引擎是结合各类实际业务场景而设计的本章将从 6 个实际应用案例出发介绍各个连接引擎适用的具体场景。为了便于解释关联效果下文案例中均以少量的模拟数据依次注入右表、左表来模拟数据流输入。流计算脚本开发和调试过程中推荐使用 getStreamingStat 函数监控流订阅的状态getStreamEngineStat 函数监控流数据引擎的状态。此外文末附录中提供了清理流数据环境的通用脚本用于一键清理所有的流数据表、取消所有的订阅、释放所有的流引擎。3.1 用 Asof Join 引擎计算个股交易成本因为逐笔成交数据和报价数据的发生时间不可能完全一致而不能使用常用的等值连接往往需要以成交时间为基准找到交易发生前的最近一次报价数据因此需要以邻近匹配的方式关联两个数据流。这个场景的特征是每条成交记录匹配一条时刻早于自己的报价记录输出与原始的每一条成交记录一一对应。以下脚本用 Asof Join 引擎来实现此场景// create table share streamTable(1:0, SymTradeTimeTradePrice, [SYMBOL, TIME, DOUBLE]) as trades share streamTable(1:0, SymTimeBid1PriceAsk1Price, [SYMBOL, TIME, DOUBLE, DOUBLE]) as snapshot share streamTable(1:0, TradeTimeSymTradePriceTradeCostSnapshotTime, [TIME, SYMBOL, DOUBLE, DOUBLE, TIME]) as output// create engine ajEngine createAsofJoinEngine(nameasofJoin, leftTabletrades, rightTablesnapshot, outputTableoutput, metrics[TradePrice, abs(TradePrice-(Bid1PriceAsk1Price)/2), snapshot.Time], matchingColumnSym, timeColumnTradeTimeTime, useSystemTimefalse, delayedTime1000)// subscribe topic subscribeTable(tableNametrades, actionNameappendLeftStream, handlergetLeftStream(ajEngine), msgAsTabletrue, offset-1, hash0) subscribeTable(tableNamesnapshot, actionNameappendRightStream, handlergetRightStream(ajEngine), msgAsTabletrue, offset-1, hash1)逐笔成交数据 trades 注入引擎的左表报价数据 snapshot 注入引擎的右表。引擎参数 useSystemTimefalse 表示通过数据中的时间列左表为 TradeTime 字段右表为 Time 字段来判断左右表中记录的时序关系。引擎参数 delayedTime 是对默认触发机制的补充以超时强制触发的方式保证左表及时匹配并输出。若未设置 delayTime 是默认触发机制对于任意一条左表记录它必须等到右表出现一条时间戳大于它的记录才输出。但考虑到实际的应用场景中某条右表记录可能迟迟未能到达或者始终不可能出现一条大于某些左表数据的右表记录同时期望左表中每条记录都能匹配并输出那么建议设置 dalayTime 在这种情况下将以左表出现更新的数据或者系统时间超时来强制触发计算。引擎参数 metrics 中 snapshot.Time 表示取右表 snapshot 中的 Time 字段因为左表 trades 中也具有 Time 字段若不加前缀、直接写 Time则默认取左表的 Time 字段。上例中创建引擎时未显式指定 garbageSize 则使用默认值garbageSize 不论大小均不改变计算结果只影响引擎的内存占用。构造数据写入作为原始输入的 2 个流数据表先写入右表再写入左表// generate data: trade t1 table(AABABB as Sym, 10:00:02.000(1..6)*700 as TradeTime, (3.4 3.5 7.7 3.5 7.5 7.6) as TradePrice) // generate data: snapshot t2 table(ABAB as Sym, 10:00:00.000(3 3 6 6)*1000 as Time, (3.5 7.6 3.5 7.6) as Bid1Price, (3.5 7.6 3.6 7.6) as Ask1Price) // input data snapshot.append!(t2) trades.append!(t1)输入数据与关联关系如下关联得到的结果表 output 如下左表中全部 7 条数据都有对应的输出。本例中在创建引擎时指定了 delayTime 参数因此对于分组 B 即使右表 snapshot 中没有比 10:00:06.200 更大的时间戳 右表 trades 中最后一条数据(B,10:00:06.200, 7.6) 仍然能够在注入引擎 2s 后强制输出。3.2 用 Window Join 引擎将行情快照与逐笔成交数据融合行情快照和逐笔成交数据包含着不同的信息很多高频因子的计算同时依赖行情快照和成交数据本例在行情快照数据的基础上融合前后两个快照之间的逐笔成交数据融合后的数据可以更方便地作为后续复杂因子的计算的输入。这个场景的特征是每条行情快照记录匹配一个时间窗口内的全部逐笔成交记录的聚合值这个时间窗口的上下界由两条行情快照数据的时刻决定输出与原始的每一条行情快照记录一一对应。对于一个窗口中的逐笔成交记录既需要计算交易量总和这样的聚合值也希望以一个字段保留窗口内的全部逐笔成交明细。以下脚本用 Window Join 引擎的特殊窗口来实现此场景。注意1.30 版本的 DolphinDB 不支持 array vector 数据形式以下脚本包含 array vector 功能因此仅支持 2.00 版本。// create table share streamTable(1:0, SymTradeTimeSideTradeQty, [SYMBOL, TIME, INT, LONG]) as trades share streamTable(1:0, SymTimeOpenHighLowClose, [SYMBOL, TIME, DOUBLE, DOUBLE, DOUBLE, DOUBLE]) as snapshot share streamTable(1:0, TimeSymOpenHighLowCloseBuyQtySellQtyTradeQtyListTradeTimeList, [TIME, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, LONG, LONG, LONG[], TIME[]]) as output// create engine wjMetrics [Open, High, Low, Close, sum(iif(Side1, TradeQty, 0)), sum(iif(Side2, TradeQty, 0)), TradeQty, TradeTime] fillArray [00:00:00.000, , 0, 0, 0, 0, 0, 0, [[]], [[]]] wjEngine createWindowJoinEngine(namewindowJoin, leftTablesnapshot, rightTabletrades, outputTableoutput, window0:0, metricswjMetrics, matchingColumnSym, timeColumnTimeTradeTime, useSystemTimefalse, nullFillfillArray)// subscribe topic subscribeTable(tableNamesnapshot, actionNameappendLeftStream, handlergetLeftStream(wjEngine), msgAsTabletrue, offset-1, hash0) subscribeTable(tableNametrades, actionNameappendRightStream, handlergetRightStream(wjEngine), msgAsTabletrue, offset-1, hash1)行情快照数据 snapshot 注入引擎的左表逐笔成交数据 trades 注入引擎的左表。引擎参数 useSystemTimefalse 表示通过数据中的时间列左表为 Time 字段右表为 TradeTime 字段来判断左右表中记录的时序关系。引擎参数 window0:0 表示右表 trades 的计算窗口将由左表 snapshot 当前和其上一条数据的时间戳划定。引擎参数 metrics 表示计算指标如 Open 表示取左表 snapshot 中 Open 字段sum(iif(Side1, TradeQty, 0)) 表示对右表 trades 在窗口内的数据做聚合计算。注意TradeQty 是右表 trades 中的字段且此处对 TradeQty 没有使用聚合函数则表示对右表 trades 在窗口内的全部 TradeQty 值保留明细对应的输出为一个数据类型为 array vector 的字段。引擎参数 nullFill 为可选参数表示如何填充输出表中的空值本例中结合实际场景对于表示价格的字段如 Open 等都指定将空值填充为0。注意nullFill 为元组必须和输出表列字段等长且类型一一对应。构造数据写入作为原始输入的 2 个流数据表先写入右表再写入左表// generate data: snapshot t1 table(ABABAB as Sym, 10:00:00.000(3 3 6 6 9 9)*1000 as Time, (NULL NULL 3.5 7.6 3.5 7.6) as Open, (3.5 7.6 3.6 7.6 3.6 7.6) as High, (3.5 7.6 3.5 7.6 3.4 7.5) as Low, (3.5 7.6 3.5 7.6 3.6 7.5) as Close) // generate data: trade t2 table(AABABBABAA as Sym, 10:00:02.000(1..10)*700 as TradeTime, (1 2 1 1 1 1 2 1 2 2) as Side, (1..10) * 10 as TradeQty) // input data trades.append!(t2) snapshot.append!(t1)输入数据与关联关系如下关联得到的结果表 output 如下其中最后两列为 array vector 类型数据记录了窗口中全部成交记录的 TradeQty 字段明细、TradeTime 字段明细。注意输出表比左表 snapshot 少一条数据即左表 sanpshot 中分组 B 内时间戳为 10:00:09.000 的数据没有输出这是因为右表 trades 中分组 B 内没有等于或大于 10:00:09.000 的数据来关闭窗口。在实际生产中当接入实时数据时若需要左表 snapshot 一旦达到引擎便立即输出则建议选择 useSystemTimetrue即用系统时间作为时间戳这时对于任意一条左表记录右表窗口是从前一条左表记录到达到本条记录到达之间进入引擎的全部右表数据。3.3 用 Equal Join 引擎拼接不同数据源的实时分钟指标在量化金融的实盘中往往会对原始的行情快照、逐笔成交等进行降采样形成分钟指标以作为输入提供给进一步的交易策略这时则需要将多个不同数据源计算出的指标关联到同一张表中。本例将对快照和成交数据分别做实时的 1 分钟聚合并将快照指标和成交指标关联后输出到同一张宽表中。这个场景的特征是每支股票的行情快照分钟指标在每一分钟只有一条记录逐笔成交分钟指标同样有这样的唯一性并且在某一分钟的输出上期望总是在两类指标都计算完成后再将关联输出。以下脚本用 Equal Join 引擎来实现此场景。// create table share streamTable(1:0, SymTradeTimeSideTradeQty, [SYMBOL, TIME, INT, LONG]) as trades share streamTable(1:0, UpdateTimeSymBuyTradeQtySellTradeQty, [TIME, SYMBOL, LONG, LONG]) as tradesMin share streamTable(1:0, SymTimeBid1PriceBid1Qty, [SYMBOL, TIME, DOUBLE, LONG]) as snapshot share streamTable(1:0, UpdateTimeSymAvgBid1Amt, [TIME, SYMBOL, DOUBLE]) as snapshotMin share streamTable(1:0, UpdateTimeSymAvgBid1AmtBuyTradeQtySellTradeQty, [TIME, SYMBOL, DOUBLE, LONG, LONG]) as output// create engine: eqJoinEngine createEqualJoinEngine(nameEqualJoin, leftTabletradesMin, rightTablesnapshotMin, outputTableoutput, metrics[AvgBid1Amt, BuyTradeQty, SellTradeQty], matchingColumnSym, timeColumnUpdateTime) // create engine: tsEngine1 createTimeSeriesEngine(nametradesAggr, windowSize60000, step60000, metrics[sum(iif(Side1, 0, TradeQty)), sum(iif(Side2, 0, TradeQty))], dummyTabletrades, outputTablegetLeftStream(eqJoinEngine), timeColumnTradeTime, keyColumnSym, useSystemTimefalse, fill(0, 0)) // create engine: tsEngine2 createTimeSeriesEngine(namesnapshotAggr, windowSize60000, step60000, metrics[avg(iif(Bid1Price!NULL, Bid1Price*Bid1Qty, 0))], dummyTablesnapshot, outputTablegetRightStream(eqJoinEngine), timeColumnTime, keyColumnSym, useSystemTimefalse, fill(0.0))// subscribe topic subscribeTable(tableNametrades, actionNameminAggr, handlertsEngine1, msgAsTabletrue, offset-1, hash1) subscribeTable(tableNamesnapshot, actionNameminAggr, handlertsEngine2, msgAsTabletrue, offset-1, hash2) 首先用两个独立的时序聚合引擎createTimeSeriesEngine对原始的快照和成交数据流按数据中的时间戳做实时聚合、输出每一分钟的指标之后通过引擎级联的方式将两个时序聚合引擎的输出分别作为左右表注入连接引擎。引擎级联更详细的介绍见 流数据教程4.1 流水线处理 。Equal Join 引擎对左、右表的处理是完全相同的即上例中在 createEqualJoinEngine 时交换左右表不会影响关联结果。构造数据写入作为原始输入的 2 个流数据表先写入右表再写入左表// generate data: snapshot t1 table(ABABAB as Sym, 10:00:52.000(3 3 6 6 9 9)*1000 as Time, (3.5 7.6 3.6 7.6 3.6 7.6) as Bid1Price, (1000 2000 500 1500 400 1800) as Bid1Qty) // generate data: trade t2 table(AABABBABBA as Sym, 10:00:54.000(1..10)*700 as TradeTime, (1 2 1 1 1 1 2 1 2 2) as Side, (1..10) * 10 as TradeQty) // input trades.append!(t2) snapshot.append!(t1)关联得到的结果表 output 如下3.4 用 Lookup Join 引擎将实时行情与历史日频指标关联在当日的实时计算中可能会需要依赖历史指标本例在行情快照的基础上通过股票代码关联该股票在昨日的日频指标。这个场景的特征是每条快照记录到达后要求立刻关联输出如果日频数据里没有对应的股票输出结果对应的字段为空输出与原始输入中的每一条行情快照记录一一对应。同时日频指标并非实时数据而是一个以较低频率更新的有主键的离线数据集。以下脚本用 Lookup Join 引擎来实现此场景。// create table share streamTable(1:0, SymTimeOpenHighLowClose, [SYMBOL, TIME, DOUBLE, DOUBLE, DOUBLE, DOUBLE]) as snapshot historicalData table(AB as Sym, (0.8 0.2) as PreWeight, (3.1 7.6) as PreClose) share table(1:0, SymTimeOpenHighLowClosePreWeightPreClose, [SYMBOL, TIME, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE]) as output// create engine lookupJoinEngine createLookupJoinEngine(namelookupJoin, leftTablesnapshot, rightTablehistoricalData, outputTableoutput, metrics[Time, Open, High, Low, Close, PreWeight, PreClose], matchingColumnSym, checkTimes10s)// subscribe topic subscribeTable(tableNamesnapshot, actionNameappendLeftStream, handlergetLeftStream(lookupJoinEngine), msgAsTabletrue, offset-1)订阅流数据表 snapshot 注入引擎的左表。引擎右表为普通内存表 historicalData 它不能且不需要进行订阅。引擎会在内部维护一个最新的右表在创建引擎时会查询右表 historicalData 并缓存在引擎内部。参数 checkTimes10s 表示之后的每 10s 引擎会再次查询右表 historicalData 并更新一次内部的缓存。构造数据写入作为引擎左表输入的流数据表 snapshot// generate data: snapshot t1 table(ABABAB as Sym, 10:00:00.000(3 3 6 6 9 9)*1000 as Time, (3.5 7.6 3.5 7.6 3.5 7.6) as Open, (3.5 7.6 3.6 7.6 3.6 7.6) as High, (3.5 7.6 3.5 7.6 3.4 7.5) as Low, (3.5 7.6 3.5 7.6 3.6 7.5) as Close) snapshot.append!(t1)输入数据与关联关系如下结果在左表数据到达引擎时立刻输出关联得到的结果表 output 如下3.5 用 Left Semi Join 引擎对逐笔成交数据补充原始委托信息逐笔成交数据中包含买卖双方的原始委托订单号本例通过股票代码和订单号去关联逐笔委托数据以达到在成交数据的基础上丰富其原始委托信息的目的。这个场景的特征是对于每条逐笔成交都应该找到对应的委托单输出与原始输入中的逐笔成交记录一一对应。在找到对应的委托单前该条逐笔成交记录暂时不输出。以下脚本用两个 Left Semi Join 引擎级联的方式对成交表 trades 中的卖方委托单、买方委托单依次进行了关联。多个引擎之间采用了引擎级联的方式处理引擎级联更详细的介绍见 流数据教程4.1 流水线处理 。// create table share streamTable(1:0, SymBuyNoSellNoTradePriceTradeQtyTradeTime, [SYMBOL, LONG, LONG, DOUBLE, LONG, TIME]) as trades share streamTable(1:0, SymOrderNoSideOrderQtyOrderPriceOrderTime, [SYMBOL, LONG, INT, LONG, DOUBLE, TIME]) as orders share streamTable(1:0, SymSellNoBuyNoTradePriceTradeQtyTradeTimeBuyOrderQtyBuyOrderPriceBuyOrderTime, [SYMBOL, LONG, LONG, DOUBLE, LONG, TIME, LONG, DOUBLE, TIME]) as outputTemp share streamTable(1:0, SymBuyNoSellNoTradePriceTradeQtyTradeTimeBuyOrderQtyBuyOrderPriceBuyOrderTimeSellOrderQtySellOrderPriceSellOrderTime, [SYMBOL, LONG, LONG, DOUBLE, LONG, TIME, LONG, DOUBLE, TIME, LONG, DOUBLE, TIME]) as output// create engine: left join buy order ljEngineBuycreateLeftSemiJoinEngine(nameleftJoinBuy, leftTableoutputTemp, rightTableorders, outputTableoutput, metrics[SellNo, TradePrice, TradeQty, TradeTime, BuyOrderQty, BuyOrderPrice, BuyOrderTime, OrderQty, OrderPrice, OrderTime], matchingColumn[SymBuyNo, SymOrderNo])// create engine: left join sell order ljEngineSellcreateLeftSemiJoinEngine(nameleftJoinSell, leftTabletrades, rightTableorders, outputTablegetLeftStream(ljEngineBuy), metrics[BuyNo, TradePrice, TradeQty, TradeTime, OrderQty, OrderPrice, OrderTime], matchingColumn[SymSellNo, SymOrderNo])// subscribe topic subscribeTable(tableNametrades, actionNameappendLeftStream, handlergetLeftStream(ljEngineSell), msgAsTabletrue, offset-1) subscribeTable(tableNameorders, actionNameappendRightStreamForSell, handlergetRightStream(ljEngineSell), msgAsTabletrue, offset-1) subscribeTable(tableNameorders, actionNameappendRightStreamForBuy, handlergetRightStream(ljEngineBuy), msgAsTabletrue, offset-1) 数据流向首先将 trades 和 orders 分为作为左、右表注入引擎 leftJoinSell此次以 trades 数据中的卖单号关联 oders 中的对应订单。之后将上述引擎的输出作为左表直接注入引擎 leftJoinBuy该引擎的右表仍然设置为 orders此次以 trades 数据中的买单号关联 oders 中的对应订单。内存管理上例中创建引擎时未显式指定 garbageSize 则使用默认值garbageSize 不论大小均不改变计算结果。注意和其他连接引擎不同该函数的 garbageSize 参数只用于清理左表的历史数据右表的历史数据不进行回收因此上述案例中两个引擎至少分别占用一个 orders 表大小的内存。构造数据写入作为原始输入的 2 个流数据表// generate data: trade t1 table(ABBA as Sym, [2, 5, 5, 6] as BuyNo, [4, 1, 3, 4] as SellNo, [7.6, 3.5, 3.5, 7.6]as TradePrice, [10, 100, 20, 50]as TradeQty, 10:00:00.000(400 500 500 600) as TradeTime) // generate data: order t2 table(BABABA as Sym, 1..6 as OrderNo, [2, 1, 2, 2, 1, 1] as Side, [100, 10, 20, 100, 350, 50] as OrderQty, [7.6, 3.5, 7.6, 3.5, 7.6, 3.5] as OrderPrice, 10:00:00.000(1..6)*100 as OrderTime) // input data orders.append!(t2) trades.append!(t1)输入数据与关联关系如下通过两个 Left Semi Join 引擎上图中 trades 数据流中的每一条记录将分别和 orders 数据流中的两条记录关联进而取得 orders 中的委托量、价、时间等字段关联得到的结果表 output 如下3.6 用 Left Semi Join 引擎关联股票与指数行情并计算相关性本例中我们实时计算股票和某个指数在过去一段时间内分钟收益率的相关性。输入使用已经降为分钟频率的股票数据和指数数据。这个场景的特征是两个数据流的时间戳频率一致全部股票都需要关联同一支指数输出与原始输入中的股票数据一一对应。 以下脚本用 Left Semi Join 引擎来实现此关联场景。// create table share streamTable(1:0, SymTimeClose, [SYMBOL, TIME, DOUBLE]) as stockKline share streamTable(1:0, SymTimeClose, [SYMBOL, TIME, DOUBLE]) as indexKline share streamTable(1:0, TimeSymCloseIndex1Close, [TIME, SYMBOL, DOUBLE, DOUBLE]) as stockKlineAddIndex1 share streamTable(1:0, SymTimeCloseIndex1CloseIndex1Corr, [SYMBOL, TIME, DOUBLE, DOUBLE, DOUBLE]) as output// create engine: calculate correlation rsEngine createReactiveStateEngine(namecalCorr, dummyTablestockKlineAddIndex1, outputTableoutput, metrics[Time, Close, Index1Close, mcorr(ratios(Close)-1, ratios(Index1Close)-1, 3)], keyColumnSym)// create engine: left join Index1 ljEngine1 createLeftSemiJoinEngine(nameleftJoinIndex1, leftTablestockKline, rightTableindexKline, outputTablegetStreamEngine(calCorr), metrics[Sym, Close, indexKline.Close], matchingColumnTime)// subscribe topic def appendIndex(engineName, indexName, msg){tmp select * from msg where Sym indexNamegetRightStream(getStreamEngine(engineName)).append!(tmp) } subscribeTable(tableNameindexKline, actionNameappendIndex1, handlerappendIndex{leftJoinIndex1, idx1}, msgAsTabletrue, offset-1, hash1) subscribeTable(tableNamestockKline, actionNameappendStock, handlergetLeftStream(ljEngine1), msgAsTabletrue, offset-1, hash0)数据流向首先股票数据 stockKline 注入连接引擎 leftJoinIndex1 的左表指数数据经过滤后注入该引擎的右表这一步将股票与指数的分钟指标关联。之后将上述连接引擎的输出直接注入响应式状态引擎createReactiveStateEngine利用响应式状态引擎内置的 mccor 和 ratio 函数计算股票与指数的相关性指标。多个引擎之间采用了引擎级联的方式处理引擎级联更详细的介绍见 流数据教程4.1 流水线处理 。响应式状态引擎教程见 金融高频因子的流批统一计算DolphinDB响应式状态引擎介绍 。订阅指数数据 indexKline 时指定 hanlder 为自定义函数 appendIndex 是指不断地收到 indexKline 数据后首先过滤出指数数据中指数名为 idx1 的数据然后再注入连接引擎的右表。构造数据写入作为原始输入的 2 个流数据表// generate data: stock Kline t1 table(ABABABABAB as Sym, 10:00:00.000(0 0 1 1 2 2 3 3 4 4)*60000 as Time, (4.1 7.6 3.8 7.6 4.3 7.5 3.5 7.6 4.2 7.6) as Close) // generate data: index Kline t2 table(idx1idx2idx1idx2idx1idx2idx1idx2idx1idx2 as Sym, 10:00:00.000(0 0 1 1 2 2 3 3 4 4)*60000 as Time, (2.1 5 2.2 5 1.9 5 1.7 5 1.7 5) as Close) // input data indexKline.append!(t2) stockKline.append!(t1)输入数据与关联关系如下关联得到的结果表 output 如下对于股票 A 、B 每分钟都会关联对应分钟的指数 idx1 。因为 mcorr 滑动窗口为3所以前两分钟结算结果为空。4. 流数据连接引擎的比较连接引擎连接列关联机制类似的 SQL join结果表行数应用场景AsofJoinEnginematchingColumn左表每到来一条记录匹配右表连接列一致且时间戳最近的一条记录。asof join小于或等于左表行数计算个股交易成本WindowJoinEnginematchingColumn左表每到来一条记录匹配右表中连接列一致且在由左表时间戳确定的窗口范围内的数据。window join小于或等于左表行数将行情快照和逐笔成交数据融合EqualJoinEnginematchingColumntimeColumn左右表每到来一条记录匹配右左表连接列一致的最新的一条记录。equal join等于左右表能完全等值匹配的行数在左右表中的连接列均唯一的前提下拼接不同数据源的实时分钟指标LookupJoinEnginematchingColumn左表每到来一条记录匹配右表连接列一致的最新的一条记录。left join等于左表行数将实时行情与历史日频指标关联LeftSemiJoinEnginematchingColumn对于左表的每一条记录匹配右表连接列一致的第一条或最后一条记录。equal join小于或等于左表行数对逐笔成交数据补充原始委托信息、关联股票和指数行情并计算相关性。5. 总结DolphinDB 提供了 5 个不同的流数据连接引擎引擎均内置实现了高效的关联计算、实时触发规则和内存管理机制开发人员通过简单的引擎参数配置便能够快速实现复杂的实时关联需求。本文重点介绍了各个连接引擎的原理、关联效果、实际应用案例并在文末简要总结各个引擎的特点旨在降低开发人员在实时数据流关联处理中的开发门槛。结合 DolphinDB 流数据框架中其他流计算引擎、流水线处理、并行计算等重要特性开发人员可以将自己的业务场景实时化通过提升速度掌握更及时的信息、挖掘更多的业务价值。附录流环境清理通用脚本clearStreamingEnv.dos
http://www.hkea.cn/news/14593217/

相关文章:

  • 成都高新区网站建设怎么给网站做404界面
  • 微信公众商城网站开发商务网站建设流程
  • 非营利组织网站建设会计分录网站程可以自己做吗
  • 网站备案背景那个视频网站好
  • 洛阳网站设计开发松山湖短视频seo排名
  • 做招聘网站需要哪些手续如何购买域名和服务器
  • 网站开发的基本知识青岛建设网站制作
  • Dw做网站怎么加logoseo如何提高网站排名
  • 国内做市场调查专业网站正版网站设计制作
  • 如何快速学成网站开发网页传奇怎么开
  • 手机版免费申请微网站网站建设书案例
  • 兼职做国外网站钻前潍坊网站建设wf3
  • 成都网站建设免费咨询游戏网站外链建设
  • html5 珠宝网站西宁网站设计公司价格
  • 企业网站的建立要做的准备网站域名怎么快速备案
  • wamp建设网站大致步骤海淀网站建设价格
  • 手机网站建站软件智能建站做网站好吗
  • 网站自动采集rss腾讯与中国联通
  • 肇庆市住房和城乡房屋建设局网站淘宝电商运营基础知识
  • 深圳网站建设ejaket工装设计案例网站
  • 企业建站的作用是什么深圳展厅设计装修
  • 网站企业管理培训课程php网站怎么缓存
  • 品网站建设公司排名做推广便宜的网站
  • 深圳的网站建设公司价格李笑来做的一个网站
  • 校体育网站建设的好处品牌网站策划书
  • 关于国际贸易的网站ps网页设计视频教程
  • 公司网站开发部署网站收录很高
  • 哪些网站织梦cmswordpress 文章去重
  • wordpress admin ajax最新黑帽seo教程
  • wap网站排名郴州新网0735