Class: Riak::MapReduce
- Inherits: Object
  - Object
  - Riak::MapReduce
- Includes: Util::Escape, Util::Translation
- Defined in:
  - riak-client/lib/riak/map_reduce.rb
  - riak-client/lib/riak/search.rb
  - riak-client/lib/riak/map_reduce/phase.rb
  - riak-client/lib/riak/map_reduce/filter_builder.rb
Overview
Class for invoking map-reduce jobs using the HTTP interface.
Defined Under Namespace
Classes: FilterBuilder, Phase
Instance Attribute Summary
- (Array<[bucket,key]>, ...) inputs
  The bucket/keys for input to the job, or the bucket (all keys), or a hash containing the bucket and key-filters.
- (Array<Phase>) query
  The map and reduce phases that will be executed.
Instance Method Summary
- (MapReduce) add(*params) (also: #<<, #include)
  Add or replace inputs for the job.
- (MapReduce) filter(bucket) { ... }
  Adds a bucket and key-filters built by the given block.
- (MapReduce) initialize(client) {|self| ... } (constructor)
  Creates a new map-reduce job.
- (MapReduce) link(*params)
  Add a link phase to the job.
- (MapReduce) map(*params)
  Add a map phase to the job.
- (MapReduce) reduce(*params)
  Add a reduce phase to the job.
- (Object) run(&block)
  Executes this map-reduce job.
- (MapReduce) search(bucket, query)
  Use a search query to start a map/reduce job.
- (Object) timeout(value) (also: #timeout=)
  Sets the timeout for the map-reduce job.
- (String) to_json(*a)
  Convert the job to JSON for submission over the HTTP interface.
Methods included from Util::Escape
Methods included from Util::Translation
Constructor Details
- (MapReduce) initialize(client) {|self| ... }
Creates a new map-reduce job.
    # File 'riak-client/lib/riak/map_reduce.rb', line 48
    def initialize(client)
      @client, @inputs, @query = client, [], []
      yield self if block_given?
    end
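Because the constructor yields the new job to an optional block, a job can be configured inline. The following is a minimal sketch of that pattern using a stand-in class; `SketchJob` and the `:fake_client` symbol are illustrative names and are not part of riak-client.

```ruby
# Stand-in that mirrors only the constructor shown above; not riak-client itself.
class SketchJob
  attr_reader :client, :inputs, :query

  def initialize(client)
    @client, @inputs, @query = client, [], []
    yield self if block_given? # hand the new job to the caller's block
  end
end

# Without a block, the job starts with empty inputs and no phases.
plain = SketchJob.new(:fake_client)

# With a block, configuration happens before the constructor returns.
configured = SketchJob.new(:fake_client) do |job|
  job.inputs << ["artists", "Beatles"]
end
```

With the real class, the block would typically call methods such as `add`, `map`, and `reduce` on the yielded job.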
Instance Attribute Details
- (Array<[bucket,key]>, ...) inputs
The bucket/keys for input to the job, or the bucket (all keys), or a hash containing the bucket and key-filters.
    # File 'riak-client/lib/riak/map_reduce.rb', line 37
    def inputs
      @inputs
    end
Instance Method Details
- (MapReduce) add(bucket)
- (MapReduce) add(bucket, key)
- (MapReduce) add(object)
- (MapReduce) add(bucket, key, keydata)
- (MapReduce) add(bucket, filters)
Also known as: <<, include
Add or replace inputs for the job.
    # File 'riak-client/lib/riak/map_reduce.rb', line 76
    def add(*params)
      params = params.dup
      params = params.first if Array === params.first
      case params.size
      when 1
        p = params.first
        case p
        when Bucket
          @inputs = escape(p.name)
        when RObject
          @inputs << [escape(p.bucket.name), escape(p.key)]
        when String
          @inputs = escape(p)
        end
      when 2..3
        bucket = params.shift
        bucket = bucket.name if Bucket === bucket
        if Array === params.first
          @inputs = {:bucket => escape(bucket), :key_filters => params.first }
        else
          key = params.shift
          @inputs << params.unshift(escape(key)).unshift(escape(bucket))
        end
      end
      self
    end
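The overloads of `add` produce three different shapes of `inputs`: a bucket name (all keys), a list of bucket/key entries, or a hash with key filters. The sketch below reproduces the dispatch logic with stand-ins so the shapes can be inspected without a Riak node. `FakeBucket`, `FakeRObject`, and `InputSketch` are hypothetical, and escaping is deliberately reduced to a no-op, which is an assumption that differs from the real method.

```ruby
# Stand-ins for Riak::Bucket and Riak::RObject (hypothetical, for illustration).
FakeBucket  = Struct.new(:name)
FakeRObject = Struct.new(:bucket, :key)

class InputSketch
  attr_reader :inputs

  def initialize
    @inputs = []
  end

  # Assumption: escaping is skipped here; the real method escapes names.
  def escape(value)
    value
  end

  def add(*params)
    params = params.dup
    params = params.first if Array === params.first
    case params.size
    when 1
      p = params.first
      case p
      when FakeBucket  then @inputs = escape(p.name)          # whole bucket
      when FakeRObject then @inputs << [escape(p.bucket.name), escape(p.key)]
      when String      then @inputs = escape(p)               # whole bucket
      end
    when 2..3
      bucket = params.shift
      bucket = bucket.name if FakeBucket === bucket
      if Array === params.first
        # bucket plus key filters replaces any earlier inputs
        @inputs = { :bucket => escape(bucket), :key_filters => params.first }
      else
        key = params.shift
        # remaining params (optional keydata) follow bucket and key
        @inputs << params.unshift(escape(key)).unshift(escape(bucket))
      end
    end
    self
  end
end

pairs    = InputSketch.new.add("artists", "Beatles").add("artists", "Stones", "meta")
whole    = InputSketch.new.add(FakeBucket.new("artists"))
filtered = InputSketch.new.add("artists", [["starts_with", "B"]])
```

Note that the bucket-only and key-filter forms assign to `inputs` and so replace anything added earlier, while the bucket/key forms append.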
- (MapReduce) filter(bucket) { ... }
Adds a bucket and key-filters built by the given block. Equivalent to #add with a list of filters.
    # File 'riak-client/lib/riak/map_reduce.rb', line 111
    def filter(bucket, &block)
      add(bucket, FilterBuilder.new(&block).to_a)
    end
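`filter` relies on a builder that turns a block into a list of key filters via `to_a`. The internals of `FilterBuilder` are not shown on this page, so the following is only a sketch of how such a block-driven builder could work. `TinyFilterBuilder` is hypothetical, and it supports just three filter names chosen for illustration.

```ruby
# Hypothetical builder; riak-client's FilterBuilder may differ in detail.
class TinyFilterBuilder
  def initialize(&block)
    @filters = []
    instance_eval(&block) if block # run the block with the builder as self
  end

  # Each supported filter records its name and arguments as one entry.
  %w[to_lower starts_with ends_with].each do |name|
    define_method(name) do |*args|
      @filters << [name, *args]
      self
    end
  end

  def to_a
    @filters
  end
end

builder = TinyFilterBuilder.new do
  to_lower
  starts_with "beat"
end
filters = builder.to_a
```

The resulting array is the kind of value that would be passed as the second argument to `add(bucket, filters)`.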
- (MapReduce) link(walk_spec, options = {})
- (MapReduce) link(bucket, tag, keep, options = {})
- (MapReduce) link(options)
Add a link phase to the job. Link phases follow links attached to objects automatically (a special case of map).
    # File 'riak-client/lib/riak/map_reduce.rb', line 157
    def link(*params)
      options = params.extract_options!
      walk_spec_options = options.slice!(:type, :function, :language, :arg) unless params.first
      walk_spec = WalkSpec.normalize(params.shift || walk_spec_options).first
      @query << Phase.new({:type => :link, :function => walk_spec}.merge(options))
      self
    end
- (MapReduce) map(function)
- (MapReduce) map(function?, options)
Add a map phase to the job.
    # File 'riak-client/lib/riak/map_reduce.rb', line 123
    def map(*params)
      options = params.extract_options!
      @query << Phase.new({:type => :map, :function => params.shift}.merge(options))
      self
    end
- (MapReduce) reduce(function)
- (MapReduce) reduce(function?, options)
Add a reduce phase to the job.
    # File 'riak-client/lib/riak/map_reduce.rb', line 137
    def reduce(*params)
      options = params.extract_options!
      @query << Phase.new({:type => :reduce, :function => params.shift}.merge(options))
      self
    end
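Both `map` and `reduce` accept a function followed by an optional trailing options hash, and merge those options over the phase defaults. A minimal sketch of that argument-handling pattern follows. `split_options` and `sketch_phase` are hypothetical helpers, and the phase is represented as a plain hash rather than a `Phase` object.

```ruby
# Hypothetical helper: split a trailing Hash of options off an argument list.
def split_options(params)
  params = params.dup
  options = params.last.is_a?(Hash) ? params.pop : {}
  [params, options]
end

# Build a plain-hash phase description; defaults first, caller options override.
def sketch_phase(type, *params)
  args, options = split_options(params)
  { :type => type, :function => args.shift }.merge(options)
end

map_phase    = sketch_phase(:map, "function(v){ return [v.key]; }")
reduce_phase = sketch_phase(:reduce, "function(values){ return values.sort(); }", :keep => true)
```

Because the options are merged last, a caller-supplied key such as `:keep` or `:arg` is carried into the phase alongside the type and function.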
- (Array<Array>) run
- (nil) run {|phase, data| ... }
Executes this map-reduce job.
    # File 'riak-client/lib/riak/map_reduce.rb', line 195
    def run(&block)
      raise MapReduceError.new(t("empty_map_reduce_query")) if @query.empty?
      @client.backend.mapred(self, &block)
    rescue FailedRequest => fr
      if fr.server_error? && fr.is_json?
        raise MapReduceError.new(fr.body)
      else
        raise fr
      end
    end
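`run` refuses to execute a job with no phases, and it translates a failed request into a `MapReduceError` only when the failure is a server error with a JSON body. Any other failure is re-raised unchanged. The sketch below isolates that control flow. The `Sketch*` classes and their constructor arguments are assumptions made for illustration and do not reflect the real error classes.

```ruby
# Stand-in error classes; the constructor and predicates are assumptions.
class SketchFailedRequest < StandardError
  attr_reader :code, :body

  def initialize(code, body, content_type)
    @code, @body, @content_type = code, body, content_type
    super("request failed with #{code}")
  end

  def server_error?
    (500..599).cover?(@code)
  end

  def is_json?
    @content_type.to_s.include?("application/json")
  end
end

class SketchMapReduceError < StandardError; end

# The block stands in for the backend call made by the real method.
def run_sketch(query)
  raise SketchMapReduceError, "empty map-reduce query" if query.empty?
  yield
rescue SketchFailedRequest => fr
  if fr.server_error? && fr.is_json?
    raise SketchMapReduceError, fr.body # surface the server's JSON error
  else
    raise fr                            # leave other failures untouched
  end
end

translated = begin
  run_sketch([:phase]) { raise SketchFailedRequest.new(500, '{"error":"boom"}', "application/json") }
rescue SketchMapReduceError => e
  e.message
end

passed_through = begin
  run_sketch([:phase]) { raise SketchFailedRequest.new(404, "not found", "text/plain") }
rescue SketchFailedRequest => e
  e.code
end
```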
- (MapReduce) search(bucket, query)
Use a search query to start a map/reduce job.
    # File 'riak-client/lib/riak/search.rb', line 156
    def search(bucket, query)
      bucket = bucket.name if bucket.respond_to?(:name)
      @inputs = {:module => "riak_search", :function => "mapred_search", :arg => [bucket, query]}
      self
    end
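`search` replaces the job's inputs with a hash that names a module, a function, and an argument list, and it accepts either a bucket name or any object that responds to `name`. As a small sketch, the function below builds the same hash outside of a job. `search_inputs` and `NamedBucket` are hypothetical names, and the query string is an arbitrary example.

```ruby
# Hypothetical stand-in for an object that exposes a bucket name.
NamedBucket = Struct.new(:name)

# Builds the inputs hash in the same way as the method above.
def search_inputs(bucket, query)
  bucket = bucket.name if bucket.respond_to?(:name)
  { :module => "riak_search", :function => "mapred_search", :arg => [bucket, query] }
end

from_string = search_inputs("artists", "name:Beatles")
from_object = search_inputs(NamedBucket.new("artists"), "name:Beatles")
```

Both calls yield the same hash, since the object form is reduced to its name before the hash is built.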
- (Object) timeout(value)
Also known as: timeout=
Sets the timeout for the map-reduce job.
    # File 'riak-client/lib/riak/map_reduce.rb', line 167
    def timeout(value)
      @timeout = value
      return self
    end
- (String) to_json(*a)
Convert the job to JSON for submission over the HTTP interface.
    # File 'riak-client/lib/riak/map_reduce.rb', line 175
    def to_json(*a)
      hash = {"inputs" => inputs, "query" => query.map(&:as_json)}
      hash['timeout'] = @timeout.to_i if @timeout
      hash.to_json(*a)
    end
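The serialized job always carries `inputs` and `query`, and it carries `timeout` only when one has been set, coerced to an integer. The sketch below mirrors that logic with plain values instead of a job object. `job_json` is a hypothetical helper, and the phase hash is an assumed example shape, since `Phase#as_json` is not shown on this page.

```ruby
require "json"

# Mirrors the hash built by to_json above, but takes plain values instead of
# a job object. The phase hash passed in below is an assumed example shape.
def job_json(inputs, phases, timeout = nil)
  hash = { "inputs" => inputs, "query" => phases }
  hash["timeout"] = timeout.to_i if timeout # key is omitted when unset
  hash.to_json
end

phase = {
  "map" => {
    "language" => "javascript",
    "source"   => "function(v){ return [v.key]; }",
    "keep"     => true
  }
}

without_timeout = JSON.parse(job_json("artists", [phase]))
with_timeout    = JSON.parse(job_json("artists", [phase], "5000"))
```

Passing the timeout as a string shows the effect of the `to_i` coercion: the parsed document contains an integer.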