Class: Ronin::Recon::WorkerPool Private

Inherits:
Object
Defined in:
lib/ronin/recon/worker_pool.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

Contains the Async::Task objects for a worker, which process messages from the input queue and send messages to the output queue.

Constructor Details

#initialize(worker, concurrency: nil, output_queue:, params: nil, logger: Console.logger) ⇒ WorkerPool

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Initializes the worker pool.

Parameters:

  • worker (Worker)

    The initialized worker object.

  • concurrency (Integer, nil) (defaults to: nil)

    The number of async tasks to spawn.

  • output_queue (Async::Queue)

    The output queue to send discovered values to.

  • logger (Console::Logger) (defaults to: Console.logger)

    The console logger object.



# File 'lib/ronin/recon/worker_pool.rb', line 87

def initialize(worker, concurrency:  nil,
                       output_queue: ,
                       params: nil,
                       logger: Console.logger)
  @worker      = worker
  @concurrency = concurrency || worker.class.concurrency

  @input_queue  = Async::Queue.new
  @output_queue = output_queue

  @logger = logger

  @tasks  = nil
end
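
The fallback in `@concurrency = concurrency || worker.class.concurrency` can be illustrated in isolation. This is a minimal sketch, not the library's code: `ExampleWorker` and `effective_concurrency` are hypothetical names, and the class-level value of 4 is an arbitrary assumption.

```ruby
# Hypothetical stand-in for a worker class; only the class-level
# `concurrency` reader matters for this sketch.
class ExampleWorker
  def self.concurrency
    4
  end
end

# Mirrors the `concurrency || worker.class.concurrency` expression:
# an explicit value wins, otherwise the worker class's default is used.
def effective_concurrency(worker, concurrency: nil)
  concurrency || worker.class.concurrency
end

worker = ExampleWorker.new

default_count  = effective_concurrency(worker)
override_count = effective_concurrency(worker, concurrency: 2)

puts default_count
puts override_count
```

Passing `concurrency: nil` (the default) therefore defers to whatever the worker class declares.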

Instance Attribute Details

#concurrency ⇒ Integer (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The number of async worker tasks to spawn.

Returns:

  • (Integer)


# File 'lib/ronin/recon/worker_pool.rb', line 50

def concurrency
  @concurrency
end

#id ⇒ String (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The recon worker's ID.

Returns:

  • (String)


# File 'lib/ronin/recon/worker_pool.rb', line 45

def id
  @id
end

#input_queue ⇒ Async::Queue (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The input queue for the worker(s).

Returns:

  • (Async::Queue)


# File 'lib/ronin/recon/worker_pool.rb', line 60

def input_queue
  @input_queue
end

#logger ⇒ Console::Logger (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The logger for debug messages.

Returns:

  • (Console::Logger)


# File 'lib/ronin/recon/worker_pool.rb', line 70

def logger
  @logger
end

#output_queue ⇒ Async::Queue (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The output queue for the worker(s).

Returns:

  • (Async::Queue)


# File 'lib/ronin/recon/worker_pool.rb', line 65

def output_queue
  @output_queue
end

#worker ⇒ Worker (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The worker object.

Returns:

  • (Worker)


# File 'lib/ronin/recon/worker_pool.rb', line 55

def worker
  @worker
end

Instance Method Details

#enqueue_mesg(mesg) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Routes a message to the worker.

Parameters:

  • mesg (Message::SHUTDOWN, Message::Value)

    The message to route.


# File 'lib/ronin/recon/worker_pool.rb', line 108

def enqueue_mesg(mesg)
  case mesg
  when Message::SHUTDOWN
    # push one SHUTDOWN message for each worker task
    @concurrency.times do
      @input_queue.enqueue(mesg)
    end
  else
    @input_queue.enqueue(mesg)
  end
end
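
The routing rule above sends ordinary messages once but repeats the shutdown message once per task, since each task consumes exactly one shutdown message before exiting. A minimal sketch of that rule follows, under stated assumptions: `Thread::Queue` from the standard library stands in for `Async::Queue`, the symbol `:shutdown` stands in for `Message::SHUTDOWN`, and `fan_out` is a hypothetical helper name.

```ruby
# Assumptions: Thread::Queue replaces Async::Queue and :shutdown
# replaces Message::SHUTDOWN; `fan_out` is a hypothetical helper.
def fan_out(queue, mesg, concurrency)
  if mesg == :shutdown
    # one copy per consumer, so every consumer loop receives its own
    concurrency.times { queue.push(mesg) }
  else
    queue.push(mesg)
  end
end

queue = Thread::Queue.new

fan_out(queue, "example.com", 3)
fan_out(queue, :shutdown, 3)

# drain the queue to inspect what was routed
queued = []
queued << queue.pop until queue.empty?

p queued
```

Without the repetition, only one of the consumers would ever see the shutdown message and the others would wait on the queue indefinitely.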

#run ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Runs the worker.



# File 'lib/ronin/recon/worker_pool.rb', line 123

def run
  # HACK: for some reason `until (mesg = ...) == Message::SHUTDOWN`
  # lets `Message::SHUTDOWN` objects slip by. Rewriting it as a
  # `loop do` with an explicit `break` avoids the problem.
  loop do
    if (mesg = @input_queue.dequeue) == Message::SHUTDOWN
      break
    end

    value = mesg.value

    enqueue(Message::JobStarted.new(@worker,value))

    begin
      @worker.process(value) do |result|
        @logger.debug("Output value yielded: #{@worker} #{value.inspect} -> #{result.inspect}")

        new_value = Message::Value.new(result, worker: @worker,
                                               parent: value,
                                               depth:  mesg.depth + 1)

        enqueue(new_value)
      end

      enqueue(Message::JobCompleted.new(@worker,value))
    rescue StandardError => error
      enqueue(Message::JobFailed.new(@worker,value,error))
    end
  end

  stopped!
end
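
The message sequence produced by this loop can be traced with simplified stand-ins. The sketch below is an illustration only: `Thread::Queue` replaces `Async::Queue`, `:shutdown` replaces `Message::SHUTDOWN`, tagged arrays replace the `Message::*` classes, and `InputValue`, `UpcaseWorker`, and `run_loop` are hypothetical names.

```ruby
# Simplified placeholder for an input message carrying a value and depth.
InputValue = Struct.new(:value, :depth)

# Hypothetical worker: yields one derived value, or raises on bad input.
class UpcaseWorker
  def process(value)
    raise ArgumentError, "empty value" if value.empty?

    yield value.upcase
  end
end

def run_loop(worker, input, output)
  loop do
    mesg = input.pop
    break if mesg == :shutdown

    output << [:job_started, mesg.value]

    begin
      worker.process(mesg.value) do |result|
        # yielded values sit one level deeper than the input message
        output << [:value, result, mesg.depth + 1]
      end

      output << [:job_completed, mesg.value]
    rescue StandardError => error
      # a failing job is reported, and the loop keeps going
      output << [:job_failed, mesg.value, error.class]
    end
  end

  output << [:worker_stopped]
end

input  = Thread::Queue.new
output = []

input << InputValue.new("example.com", 0)
input << InputValue.new("", 0)
input << :shutdown

run_loop(UpcaseWorker.new, input, output)

output.each { |event| p event }
```

Each job is bracketed by a started message and either a completed or a failed message, and an exception in one job does not end the loop.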

#start(task = Async::Task.current) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Starts the worker pool.

Parameters:

  • task (Async::Task) (defaults to: Async::Task.current)

    The optional async task to register the worker under.



# File 'lib/ronin/recon/worker_pool.rb', line 162

def start(task=Async::Task.current)
  # mark the worker as running
  started!

  @tasks = []

  @concurrency.times do
    @tasks << task.async { run }
  end
end
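
The "spawn N consumers and keep their handles" shape of `start` can be sketched without the async gem. This is an analogy under stated assumptions, not the library's behavior: threads stand in for `Async::Task` objects, `Thread::Queue` for `Async::Queue`, `:shutdown` for `Message::SHUTDOWN`, and `start_consumers` is a hypothetical helper.

```ruby
# Assumption: threads and Thread::Queue stand in for Async tasks and
# Async::Queue; `start_consumers` is a hypothetical helper.
def start_consumers(count, input, output)
  Array.new(count) do
    Thread.new do
      loop do
        mesg = input.pop
        break if mesg == :shutdown

        output << mesg
      end
    end
  end
end

input  = Thread::Queue.new
output = Thread::Queue.new

tasks = start_consumers(3, input, output)

%w[a b c d].each { |value| input << value }
3.times { input << :shutdown } # one shutdown message per consumer

tasks.each(&:join)

processed = []
processed << output.pop until output.empty?

p processed.sort
```

Because the queue is first-in, first-out, every value pushed before the shutdown messages is consumed before any consumer exits.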

#started! ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Marks the worker pool as running.



# File 'lib/ronin/recon/worker_pool.rb', line 176

def started!
  # send a message to the engine that the worker pool has started
  enqueue(Message::WorkerStarted.new(@worker))
end

#stopped! ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Marks the worker pool as stopped.



# File 'lib/ronin/recon/worker_pool.rb', line 184

def stopped!
  # send a message to the engine that the worker pool has stopped
  enqueue(Message::WorkerStopped.new(@worker))
end