Class: Riak::MapReduce

Inherits:
Object
Includes:
Util::Escape, Util::Translation
Defined in:
riak-client/lib/riak/map_reduce.rb,
riak-client/lib/riak/search.rb,
riak-client/lib/riak/map_reduce/phase.rb,
riak-client/lib/riak/map_reduce/filter_builder.rb

Overview

Class for invoking map-reduce jobs using the HTTP interface.

Defined Under Namespace

Classes: FilterBuilder, Phase

Methods included from Util::Escape

#escape, #unescape

Methods included from Util::Translation

#i18n_scope, #t

Constructor Details

- (MapReduce) initialize(client) {|self| ... }

Creates a new map-reduce job.

Parameters:

  • client (Client)

    the Riak::Client interface

Yields:

  • (self)

    helpful for initializing the job



# File 'riak-client/lib/riak/map_reduce.rb', line 48

def initialize(client)
  @client, @inputs, @query = client, [], []
  yield self if block_given?
end
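
The optional block receives the new job, so a job can be configured in the same expression that creates it. The following is a minimal sketch of that pattern, not the library itself: `SketchJob` is a hypothetical stand-in that copies only the constructor shown above, and `:fake_client` is a placeholder value.

```ruby
# Stand-in for Riak::MapReduce that copies only the constructor shown above.
# SketchJob is a hypothetical name used for illustration.
class SketchJob
  attr_reader :client, :inputs, :query

  def initialize(client)
    @client, @inputs, @query = client, [], []
    yield self if block_given?   # hand the new job to the caller's block
  end
end

# Without a block the job starts with empty inputs and no phases.
plain = SketchJob.new(:fake_client)

# With a block, the job can be set up inline before it is assigned.
configured = SketchJob.new(:fake_client) do |job|
  job.inputs << ["users", "alice"]
end
```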

Instance Attribute Details

- (Array<[bucket,key]>, ...) inputs

The bucket/keys for input to the job, or the bucket (all keys), or a hash containing the bucket and key-filters.

Returns:

  • (Array<[bucket,key]>, String, Hash<:bucket,:filters>)

    The bucket/keys for input to the job, or the bucket (all keys), or a hash containing the bucket and key-filters.

# File 'riak-client/lib/riak/map_reduce.rb', line 37

def inputs
  @inputs
end

- (Array<Phase>) query

The map and reduce phases that will be executed

Returns:

  • (Array<Phase>)

    The map and reduce phases that will be executed

# File 'riak-client/lib/riak/map_reduce.rb', line 43

def query
  @query
end

Instance Method Details

- (MapReduce) add(bucket)
- (MapReduce) add(bucket, key)
- (MapReduce) add(object)
- (MapReduce) add(bucket, key, keydata)
- (MapReduce) add(bucket, filters)

Also known as: <<, include

Add or replace inputs for the job.

Overloads:

  • - (MapReduce) add(bucket)

    Run the job across all keys in the bucket. This will replace any other inputs previously added.

    Parameters:

    • bucket (String, Bucket)

      the bucket to run the job on

  • - (MapReduce) add(bucket, key)

    Add a bucket/key pair to the job.

    Parameters:

    • bucket (String, Bucket)

      the bucket of the object

    • key (String)

      the key of the object

  • - (MapReduce) add(object)

    Add an object to the job (by its bucket/key)

    Parameters:

    • object (RObject)

      the object to add to the inputs

  • - (MapReduce) add(bucket, key, keydata)

    Add a bucket/key pair to the job, along with extra data for that input.

    Parameters:

    • bucket (String, Bucket)

      the bucket of the object

    • key (String)

      the key of the object

    • keydata (String)

      extra data to pass along with the object to the job

  • - (MapReduce) add(bucket, filters)

    Run the job across all keys in the bucket, with the given key-filters. This will replace any other inputs previously added. (Requires Riak 0.14)

    Parameters:

    • bucket (String, Bucket)

      the bucket to filter keys from

    • filters (Array<Array>)

      a list of key-filters to apply to the key list

Returns:

  • (MapReduce)

    self



# File 'riak-client/lib/riak/map_reduce.rb', line 76

def add(*params)
  params = params.dup
  params = params.first if Array === params.first
  case params.size
  when 1
    p = params.first
    case p
    when Bucket
      @inputs = escape(p.name)
    when RObject
      @inputs << [escape(p.bucket.name), escape(p.key)]
    when String
      @inputs = escape(p)
    end
  when 2..3
    bucket = params.shift
    bucket = bucket.name if Bucket === bucket
    if Array === params.first
      @inputs = {:bucket => escape(bucket), :key_filters => params.first }
    else
      key = params.shift
      @inputs << params.unshift(escape(key)).unshift(escape(bucket))
    end
  end
  self
end
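
To make the overloads concrete, the sketch below reruns the same branching outside the library and shows the shape of the inputs after each kind of call. `SketchBucket`, `SketchObject`, and `SketchInputs` are hypothetical stand-ins, and `escape` is simplified to a plain string conversion, whereas the real method escapes names through Util::Escape.

```ruby
# Stand-ins so the dispatch logic from #add can run without riak-client.
# SketchBucket, SketchObject and SketchInputs are hypothetical names.
SketchBucket = Struct.new(:name)
SketchObject = Struct.new(:bucket, :key)

class SketchInputs
  attr_reader :inputs

  def initialize
    @inputs = []
  end

  # Simplification: the real method escapes names via Util::Escape.
  def escape(value)
    value.to_s
  end

  # Same branching as the source listing above.
  def add(*params)
    params = params.dup
    params = params.first if Array === params.first
    case params.size
    when 1
      p = params.first
      case p
      when SketchBucket then @inputs = escape(p.name)
      when SketchObject then @inputs << [escape(p.bucket.name), escape(p.key)]
      when String       then @inputs = escape(p)
      end
    when 2..3
      bucket = params.shift
      bucket = bucket.name if SketchBucket === bucket
      if Array === params.first
        @inputs = { :bucket => escape(bucket), :key_filters => params.first }
      else
        key = params.shift
        @inputs << params.unshift(escape(key)).unshift(escape(bucket))
      end
    end
    self
  end
end

# Bucket/key pairs accumulate; keydata rides along as a third element.
pairs = SketchInputs.new.add("users", "alice").add("users", "bob", "extra")

# A bare bucket replaces whatever was added before.
whole = SketchInputs.new.add("users", "alice").add("users")

# A bucket plus an array of filters also replaces the inputs, with a hash.
filtered = SketchInputs.new.add("users", [["starts_with", "a"]])
```

In this sketch the accumulating forms append to an array, while the bucket-wide and filtered forms overwrite the inputs entirely, which matches the "will replace any other inputs" notes in the overload descriptions.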

- (MapReduce) filter(bucket) { ... }

Adds a bucket and key-filters built by the given block. Equivalent to #add with a list of filters.

Parameters:

  • bucket (String)

    the bucket to apply key-filters to

Yields:

  • builder block - instance_eval’ed into a FilterBuilder

Returns:

  • (MapReduce)

    self



# File 'riak-client/lib/riak/map_reduce.rb', line 111

def filter(bucket, &block)
  add(bucket, FilterBuilder.new(&block).to_a)
end
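
The block passed to #filter is evaluated in the context of a builder object, so filter names can be written as bare method calls. The sketch below illustrates that instance_eval pattern only. `SketchFilterBuilder` and its two methods are hypothetical; the real FilterBuilder defines its own set of filters, and only its `to_a` call is confirmed by the listing above.

```ruby
# Hypothetical builder illustrating the instance_eval pattern; the real
# Riak::MapReduce::FilterBuilder defines its own set of filter methods.
class SketchFilterBuilder
  def initialize(&block)
    @filters = []
    instance_eval(&block) if block   # run the block with self = builder
  end

  # Two illustrative filters; each appends [name, argument] to the list.
  def starts_with(prefix)
    @filters << ["starts_with", prefix]
  end

  def ends_with(suffix)
    @filters << ["ends_with", suffix]
  end

  # The accumulated filters as an Array<Array>, the form #add expects.
  def to_a
    @filters
  end
end

builder = SketchFilterBuilder.new do
  starts_with "2011-"
  ends_with "-log"
end
filters = builder.to_a
```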

- (MapReduce) link(*params)

Add a link phase to the job. A link phase automatically follows the links attached to objects; it is a special case of a map phase.

Returns:

  • (MapReduce)

    self



# File 'riak-client/lib/riak/map_reduce.rb', line 157

def link(*params)
  options = params.extract_options!
  walk_spec_options = options.slice!(:type, :function, :language, :arg) unless params.first
  walk_spec = WalkSpec.normalize(params.shift || walk_spec_options).first
  @query << Phase.new({:type => :link, :function => walk_spec}.merge(options))
  self
end

- (MapReduce) map(function)
- (MapReduce) map(function?, options)

Add a map phase to the job.

Overloads:

  • - (MapReduce) map(function)

    Parameters:

    • function (String, Array)

      a Javascript function that represents the phase, or an Erlang [module,function] pair

  • - (MapReduce) map(function?, options)

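    Parameters:

    • function (String, Array)

      a Javascript function that represents the phase, or an Erlang [module,function] pair

    • options (Hash)

      extra options for the phase, merged into the arguments passed to Phase.new

Returns:

  • (MapReduce)

    self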



# File 'riak-client/lib/riak/map_reduce.rb', line 123

def map(*params)
  options = params.extract_options!
  @query << Phase.new({:type => :map, :function => params.shift}.merge(options))
  self
end

- (MapReduce) reduce(function)
- (MapReduce) reduce(function?, options)

Add a reduce phase to the job.

Overloads:

  • - (MapReduce) reduce(function)

    Parameters:

    • function (String, Array)

      a Javascript function that represents the phase, or an Erlang [module,function] pair

  • - (MapReduce) reduce(function?, options)

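    Parameters:

    • function (String, Array)

      a Javascript function that represents the phase, or an Erlang [module,function] pair

    • options (Hash)

      extra options for the phase, merged into the arguments passed to Phase.new

Returns:

  • (MapReduce)

    self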



# File 'riak-client/lib/riak/map_reduce.rb', line 137

def reduce(*params)
  options = params.extract_options!
  @query << Phase.new({:type => :reduce, :function => params.shift}.merge(options))
  self
end
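
Because #map and #reduce both return the job, phases can be chained in execution order. The sketch below shows how the arguments are split, under some assumptions: `SketchPhases` is a hypothetical stand-in, plain hashes take the place of Phase objects, a trailing Hash is popped by hand instead of calling `extract_options!` (an Array extension not shown in this listing), and the function values are example inputs only.

```ruby
# Hypothetical stand-in showing how #map and #reduce split their arguments.
class SketchPhases
  attr_reader :query

  def initialize
    @query = []
  end

  def map(*params)
    push_phase(:map, params)
  end

  def reduce(*params)
    push_phase(:reduce, params)
  end

  private

  # Plain-Ruby equivalent of extract_options!: pop a trailing Hash, if any,
  # then treat the first remaining argument as the phase function.
  def push_phase(type, params)
    options = params.last.is_a?(Hash) ? params.pop : {}
    @query << { :type => type, :function => params.shift }.merge(options)
    self   # returning self is what makes chaining possible
  end
end

job = SketchPhases.new
  .map("function(v){ return [1]; }")
  .reduce(["riak_kv_mapreduce", "reduce_sum"], :keep => true)
```

Phases are stored in the order they are added, so the map phase above would run before the reduce phase.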

- (Array<Array>) run
- (nil) run {|phase, data| ... }

Executes this map-reduce job.

Overloads:

  • - (Array<Array>) run

    Return the entire collection of results.

    Returns:

    • (Array<Array>)

      similar to link-walking, each element is an array of results from a phase where “keep” is true. If there is only one “keep” phase, only the results from that phase will be returned.

  • - (nil) run {|phase, data| ... }

    Stream the results through the given block without accumulating.

    Yields:

    • (phase, data)

      A block to stream results through

    Yield Parameters:

    • phase (Fixnum)

      the phase from which the results were generated

    • data (Array)

      a list of results from the phase

    Returns:

    • (nil)

      nothing



# File 'riak-client/lib/riak/map_reduce.rb', line 195

def run(&block)
  raise MapReduceError.new(t("empty_map_reduce_query")) if @query.empty?
  @client.backend.mapred(self, &block)
rescue FailedRequest => fr
  if fr.server_error? && fr.is_json?
    raise MapReduceError.new(fr.body)
  else
    raise fr
  end
end
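
The two overloads differ only in whether a block is forwarded to the backend. The sketch below imitates that split with hypothetical stand-ins: `SketchBackend` plays the role of `@client.backend` and returns canned results, `SketchError` stands in for MapReduceError, and the FailedRequest handling from the listing is left out.

```ruby
# Hypothetical stand-ins: SketchBackend plays the role of @client.backend,
# and SketchError stands in for MapReduceError.
class SketchError < StandardError; end

class SketchBackend
  # Canned results, as if two phases had produced output.
  RESULTS = [[0, ["a", "b"]], [1, [2]]]

  def mapred(_job)
    if block_given?
      RESULTS.each { |phase, data| yield phase, data }  # stream, keep nothing
      nil
    else
      RESULTS.map { |_phase, data| data }               # accumulate everything
    end
  end
end

class SketchRunner
  def initialize(backend, query)
    @backend, @query = backend, query
  end

  # Same guard and delegation as #run, minus the FailedRequest rescue.
  def run(&block)
    raise SketchError, "empty query" if @query.empty?
    @backend.mapred(self, &block)
  end
end

runner = SketchRunner.new(SketchBackend.new, [:some_phase])

# Without a block, the whole result set comes back at once.
collected = runner.run

# With a block, results arrive per phase and the call itself returns nil.
streamed = []
returned = runner.run { |phase, data| streamed << [phase, data] }
```

Streaming suits large result sets because nothing is accumulated by the call itself; the caller decides what to keep.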

- (MapReduce) search(bucket, query)

Use a search query to start a map/reduce job.

Parameters:

  • bucket (String, Bucket)

    the bucket/index to search

  • query (String)

    the query to run

Returns:

  • (MapReduce)

    self



# File 'riak-client/lib/riak/search.rb', line 156

def search(bucket, query)
  bucket = bucket.name if bucket.respond_to?(:name)
  @inputs = {:module => "riak_search", :function => "mapred_search", :arg => [bucket, query]}
  self
end
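
Unlike #add, #search replaces the inputs with a module/function/arg hash, and it accepts either a bucket name or any object that responds to `name`. A small sketch of that body in isolation, where `SketchIndex` and `sketch_search_inputs` are hypothetical names and the query string is an example value:

```ruby
# Hypothetical helper mirroring the body of #search shown above.
SketchIndex = Struct.new(:name)

def sketch_search_inputs(bucket, query)
  bucket = bucket.name if bucket.respond_to?(:name)  # accept objects or names
  { :module => "riak_search", :function => "mapred_search", :arg => [bucket, query] }
end

from_name   = sketch_search_inputs("posts", "title:riak")
from_object = sketch_search_inputs(SketchIndex.new("posts"), "title:riak")
```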

- (Object) timeout(value)

Also known as: timeout=

Sets the timeout for the map-reduce job.

Parameters:

  • value (Fixnum)

    the job timeout, in milliseconds



# File 'riak-client/lib/riak/map_reduce.rb', line 167

def timeout(value)
  @timeout = value
  return self
end

- (String) to_json(*a)

Convert the job to JSON for submission over the HTTP interface.

Returns:

  • (String)

    the JSON representation



# File 'riak-client/lib/riak/map_reduce.rb', line 175

def to_json(*a)
  hash = {"inputs" => inputs, "query" => query.map(&:as_json)}
  hash['timeout'] = @timeout.to_i if @timeout
  hash.to_json(*a)
end
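
The resulting document has an "inputs" key, a "query" key, and a "timeout" key only when a timeout was set. The sketch below rebuilds that hash with the standard `json` library. `sketch_job_json` is a hypothetical helper, and the phase hash is an assumed example of what `Phase#as_json` might produce, since that method is not shown here.

```ruby
require "json"

# Hypothetical helper that mirrors the hash built by #to_json above,
# with plain hashes in place of Phase objects.
def sketch_job_json(inputs, phases, timeout = nil)
  hash = { "inputs" => inputs, "query" => phases }
  hash["timeout"] = timeout.to_i if timeout   # omitted entirely when unset
  hash.to_json
end

# Assumed phase shape, for illustration only.
example_phase = {
  "map" => { "language" => "javascript", "source" => "function(v){ return [v.key]; }" }
}

body   = sketch_job_json([["users", "alice"]], [example_phase], 5000)
parsed = JSON.parse(body)

no_timeout = JSON.parse(sketch_job_json("users", []))
```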