`
langzhe
  • 浏览: 278543 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

riak MapReduce 中的 {index, bucket(), Index::binary(), key()} 使用方法

    博客分类:
  • riak
 
阅读更多

 

----------这是客户端测试代码-------------------------

-module(test).

-export([test/0,

         get_state/0]).

 

get_state() ->

    Bucket = <<"ejabberd/pubsub_state">> ,

    {ok, Pid}= riakc_pb_socket:start("127.0.0.1", 8087),

    Inputs = {index, Bucket, <<"idx_int">>, 1}, %% that is ok

    %Query = [],

    Query = 

        [{map, {modfun, trend_riak, get_map_bucket_values}, none, true}], %%注意这里的trend_riak:get_map_bucet_values是自己在riak-server中自定义的

        %[{map, {modfun, trend_riak, get_map_bucket_values}, none, false},

        % {reduce, {modfun, trend_riak, get_reduce_bucket_values}, {1,2}, true}],

    riakc_pb_socket:mapred(Pid, Inputs, Query).

 

 

test() ->

    Bucket = <<"ejabberd/pubsub_node">> ,

    {ok, Pid}= riakc_pb_socket:start("127.0.0.1", 8087),

    %Inputs = {modfun, trend_riak, test, [<<"a">>, <<"b">>]},

    %Inputs =  {modfun, riak_search, mapred_search, [ <<"Bucket">>, <<"SearchQuery">>]}, %% that is ok

    %Inputs = {index, Bucket, <<"nodeidx_int">>,11}, %% that is ok

    Inputs = {index, Bucket, <<"host_bin">>,<<"pubsub.meda.com">>}, %% that is ok

    %Query = [],

    Query = 

        [{map, {modfun, trend_riak, test}, none, true}],

        %[{map, {modfun, trend_riak, get_map_bucket_values}, none, false},

        % {reduce, {modfun, trend_riak, get_reduce_bucket_values}, {1,2}, true}],

    riakc_pb_socket:mapred(Pid, Inputs, Query).

 

test1() ->

    Bucket = <<"ejabberd/pubsub_item">> ,                                                                                                                                                                           

    %Merge = fun(Gcounts, none) ->    lists:keysort(6,Gcounts) end,  

    %Count = fun(G, undefined, none) ->  [binary_to_term(riak_object:get_value(G))]  end,     

    %Query =  [{map, {qfun, Count}, none,false},  {reduce, {qfun, Merge}, none, true}],

    {ok, Pid}= riakc_pb_socket:start("127.0.0.1", 8087),

    %riakc_pb_socket:mapred(Pid, Bucket, [{map, {modfun, riak_kv_mapreduce, map_object_value}, none,true}]).

    %riakc_pb_socket:mapred(Pid, Bucket, [{map, {modfun, riak_kv_mapreduce, map_object_value}, none,true}]).

    %Query = [{map,   {modfun, riak_kv_mapreduce, map_object_value},

    %                   {struct,[{<<"sub">>,[<<"0">>]}]},false},

    %                 {reduce,{modfun, riak_kv_mapreduce, reduce_string_to_integer},none,false},

    %                 {reduce,{modfun, riak_kv_mapreduce, reduce_sum},              none,true}],

    %Query = 

    %    [{map, {modfun, riak_kv_mapreduce, map_object_value}, none,true},

    %     {reduce, {modfun, riak_kv_mapreduce, reduce_sum}, none, true}],

    Query = 

        [{map, {modfun, trend_riak, get_map_bucket_values}, none, false},

    %    [{map, {modfun, riak_kv_mapreduce, map_object_value}, none,true},

         {reduce, {modfun, trend_riak, get_reduce_bucket_values}, {1,2}, true}],

    riakc_pb_socket:mapred(Pid, Bucket, Query).


----------------------自定义服务器代码------------------------

1 -module(trend_riak).                                                                                                                                                                                            

  2 

  3 -export([get_map_bucket_values/3,

  4          get_reduce_bucket_values/2,

  5          test/3,

  6          test/2]).

  7 

  8 -record(pubsub_item,

  9 {

 10   itemid,

 11   creation          = {'unknown','unknown'},

 12   modification      = {'unknown','unknown'},

 13   payload           = [],

 14   published         = "",

 15   node_icon         = "",

 16   node_name         = "",

 17   node_creator      = "",

 18   node_creator_jid  = "",

 19   node_creator_icon = "",

 20   creator           = "",

 21   creator_jid       = "",

 22   creator_icon      = "",

 23   category          = ""

 24 }).

 25 

 26 get_map_bucket_values(Record, undefined, {Nodes, CategoryList})->

 27     PubsubItem = binary_to_term(riak_object:get_value(Record)),

 28     IsNode =lists:member(PubsubItem#pubsub_item.node_name, Nodes),

 29     IsCategory = lists:member(PubsubItem#pubsub_item.category, CategoryList),

 30     if  IsNode == true orelse

 31         IsCategory == true ->

 32             [PubsubItem];

 33         true ->

 34             []

 35     end;

 36 %% _s default is undefined _c  default is none 

 37 get_map_bucket_values(Record, undefined, _C)->

 38     [binary_to_term(riak_object:get_value(Record))].

 39 

 40 get_reduce_bucket_values(Records,{Start, Max})->

 41     %Records.

 42     %%lists:keysort(6,Records).

 43     %lists:sublist(Records, Start, Max).

 44     Skip = get_skip_num(Start, Max),

 45     if length(Records) > Skip  ->

 46         lists:reverse(lists:sublist(lists:keysort(#pubsub_item.published, Records), Skip, Max));

 47        true ->

 48             []

 49     end.

 50 test(A,B) ->

 51 ok.

 52 test(Record, B, C) -> %%B,C两个参数可以有客户端传入

 53     [binary_to_term(riak_object:get_value(Record))].

     %[A].

 55   %%  io:format(ssssssssss),

 56  %%   io:format("A=~p,B=~p,C=~p~n", [A,B,C]),

 57     %{ok, [<<"itrends/ejabberd/pubsub_node">>]}.

 58 %%    [{<<"my">>, <<"bu">>}].

 59     %{ok,[]}.

 60     %[].

 61 

 62 

 63 

 64 get_skip_num(1, _Max) ->

 65     1;

 66 get_skip_num(Start, Max) ->

 67     Start*Max.

这里需要注意的是需要在riak的

vi etc/vm.args中添加如下:

 40 ## add by langxw 

 41 -pa /home/jason/learn/riak              

主要目的是在riak中引入trend_riak.erl                                                                                                                                                     

~                                  

-------------------------------------


0
0
分享到:
评论

相关推荐

    riak-bucket-cleanup:清除键匹配给定正则表达式的 riak 条目

    riak-bucket-清理 清除键匹配给定正则表达式的 riak 条目 如何使用 npm install -g riak-bucket-cleanup run riak-bucket-cleanup --regex="^old_.*$" [bucketName] grab a cup of coffee all keys starting with...

    riak-session-manager.zip

    riak-session-manager 是使用 Riak 来存储 Tomcat session 信息的项目。 配置方法:   &lt;?xml version="1.0" encoding="UTF-8"?&gt; &lt;Manager className="com.jbrisbin.vpc.riak.session.RiakManager" ...

    riak-cli:Riak 命令行客户端

    带有 NodeJS 的 Riak 命令行工具。 有更好的解决方案,请参考 -&gt;使用带有curl的Riak http api查询... $ riak-cli -m post -b bucket-name -k riak-key -d data$ riak-cli -m post -b bucket-name -k riak-key -f ${YO

    liza-riak:liza的riak bucket实现(KV抽象)

    可配置的特定内容: 序列化(对内容类型使用多种方法完成) allow-siblings(默认为true) r值最后写赢主机/端口存储后端值区名称用法( require '[liza.store :as store])( require '[liza.store.riak :as riak])( ...

    riak:Riak是Basho Technologies的去中心化数据存储

    在Wiki中,您将找到设置和使用Riak的“快速入门”指导。 有关更多信息,请浏览以下文件: 自述文件:此文件 许可证:Riak的发布许可证 doc / admin.org:Riak管理指南 architecture.txt:有关Riak底层设计的详细...

    fakeriak:用于测试和没有 Riak 的机器的内存中 Ruby Riak 驱动程序

    特征支持以下 Riak 功能: 服务器信息基本数据对象查找2.0 之前的计数器Bucket/Bucket Type 道具列出键/桶使用 Javascript 映射/减少二级索引搜索索引/模式CRDT 以下 Riak 功能目前尚未实现: 使用 Solr 搜索查询...

    riak-erlang-http-client:使用HTTP接口的Riak Erlang客户端

    riak-erlang-http-client建置状态 riak-erlang-http-client是... 作为一个简单的示例,下面是如何使用此客户端在存储桶“ bar”中使用键“ foo”创建和检索值的方法。 首先,启动带有riak-erlang-http-client路径和所有

    riak-admin:Riak 的类似蒲团的 Web 界面

    riak-admin 到目前为止的功能 显示桶的内容 单击时显示文档内容 编辑文档内容(json.object) 删除单个文档或存储桶... http://localhost:8098/riak/riak-admin/index.html?bucket 其中bucket是居住在 Riak 的任何桶。

    riak_kv:Ripple KeyValue商店

    riak_kv概述Riak KV是使用 Erlang库分发的开源Erlang应用程序。 Riak KV提供了键/值数据存储,并具有MapReduce,轻量级数据关系和几种不同的客户端API。快速开始您必须具有或更高版本,以及GNU风格的构建系统才能...

    docker-riak:DevDB Riak - 用于开发的 docker 镜像

    开发数据库 Riak docker 容器中的 Riak v2.0.0 - 当您需要快速的 Riak 开发节点时。 快速开始 docker pull devdb/riak:... 如果您希望数据持久化,请使用以下命令启动 Riak: docker pull devdb/riak:latest do

    riak_ensemble_demo:设置riak_ensemble的小代码示例

    riak_ensemble_demo:read_object(Key)-&gt; {ok,Obj#obj} | {错误,错误} 返回{obj,{Epoch,Seq,Key,Value}}}的元组(实际上是riak_ensemble_demo中obj记录的一个实例)或一个错误。 请注意,未找到表示为#obj ...

    server_monitoring_riak:使用Riak作为后端的服务器监视

    使用Riak作为后端的服务器监视 该项目是我的学士学位工作的一部分: “ NoSQL数据库和应用程序的比较分析” 米兰比可卡大学 关联者:安德烈·毛里诺(Andrea Maurino) 联合主持人:Blerina Spahiu 讲解 先决条件 ...

    Riak 学习文档

    学习使用Riak

    riak-ruby-client, 用于 ruby的Riak客户端.zip

    riak-ruby-client, 用于 ruby的Riak客户端 ) 客户端( Riak客户机)riak-client 是一个富 ruby 客户端/工具箱,分布在,数据库中,包含典型操作的基本包装。在 http://basho.github.io/riak-ruby-client/ 可以使用详尽...

    riak-client:Perl 波纹客户端

    名称Riak::Client - Riak 的快速轻量级 Perl 客户端版本版本 1.95概要 use Riak::Client;# normal modemy $client = Riak::Client-&gt;new( host =&gt; '127.0.0.1', port =&gt; 8087, r =&gt; 2, w =&gt; 2, dw =&gt; 1, connection_...

    riak_core:Riak使用的分布式系统基础架构

    里亚克核心 Riak Core是分布式系统框架,是分发数据和扩展规模的基础。 更一般而言,可以将其视为构建分布式,可伸缩,容错应用程序的...问题,问题和错误围绕核心相关内容提出问题或开始对话的方法有很多是围绕Riak

    riak-php-client:RiakPHP客户端

    适用于PHP的Riak客户端 Riak PHP Client... 运行以下命令: $ composer require " basho/riak " : " 3.0.* " 或者,在require部分中将以下内容手动添加到您的composer.json中: "require" : { "basho/riak" : "3.0.*"}

    Laravel开发-laravel-riak

    Laravel开发-laravel-riak RIAK连接、缓存和会话的RIAK提供程序

    luwak:Riak的大对象存储接口(注意

    Riak密钥/值存储极其快速且可靠,但是由于对象大小受限制,因此无法在某些应用程序中使用它。 例如,很难或不可能存储高清晰度的静止图像或视频。 Luwak是Riak上的服务层,可为大量对象提供简单的,面向文件的抽象。...

    interaction-node-riak:如何在 node.js 中使用 riak(进行中)

    在 lib 文件夹中,您有不同的文件,其中包含如何在 Nodejs 中使用 riak 的交互示例。 每个文件都可以独立工作。 在 Rest Api 文件夹中,您有一个真实案例的示例。 不要忘记为 REST Api 安装包: npm install 在...

Global site tag (gtag.js) - Google Analytics