Class: Ronin::Recon::WorkerPool Private
- Inherits:
-
Object
- Object
- Ronin::Recon::WorkerPool
- Defined in:
- lib/ronin/recon/worker_pool.rb
Overview
This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.
Contains the Async::Task
objects for a worker, that process messages
from the input queue and sends messages to the output queue.
Instance Attribute Summary collapse
-
#concurrency ⇒ Integer
readonly
private
The number of async worker tasks to spawn.
-
#id ⇒ String
readonly
private
The recon worker's ID.
-
#input_queue ⇒ Async::Queue
readonly
private
The input queue for the worker(s).
-
#logger ⇒ Console::Logger
readonly
private
The logger for debug messages.
-
#output_queue ⇒ Async::Queue
readonly
private
The output queue for the worker(s).
-
#worker ⇒ Worker
readonly
private
The worker object.
Instance Method Summary collapse
-
#enqueue_mesg(mesg) ⇒ Object
private
Routes a message to the worker.
-
#initialize(worker, concurrency: nil, output_queue:, params: nil, logger: Console.logger) ⇒ WorkerPool
constructor
private
Initializes the worker pool.
-
#run ⇒ Object
private
Runs the worker.
-
#start(task = Async::Task.current) ⇒ Object
private
Starts the worker pool.
-
#started! ⇒ Object
private
Marks the worker pool as running.
-
#stopped! ⇒ Object
private
Marks the worker pool as stopped.
Constructor Details
#initialize(worker, concurrency: nil, output_queue:, params: nil, logger: Console.logger) ⇒ WorkerPool
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Initializes the worker pool.
87 88 89 90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/ronin/recon/worker_pool.rb', line 87 def initialize(worker, concurrency: nil, output_queue: , params: nil, logger: Console.logger) @worker = worker @concurrency = concurrency || worker.class.concurrency @input_queue = Async::Queue.new @output_queue = output_queue @logger = logger @tasks = nil end |
Instance Attribute Details
#concurrency ⇒ Integer (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The number of async worker tasks to spawn.
50 51 52 |
# File 'lib/ronin/recon/worker_pool.rb', line 50 def concurrency @concurrency end |
#id ⇒ String (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The recon worker's ID.
45 46 47 |
# File 'lib/ronin/recon/worker_pool.rb', line 45 def id @id end |
#input_queue ⇒ Async::Queue (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The input queue for the worker(s).
60 61 62 |
# File 'lib/ronin/recon/worker_pool.rb', line 60 def input_queue @input_queue end |
#logger ⇒ Console::Logger (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The logger for debug messages.
70 71 72 |
# File 'lib/ronin/recon/worker_pool.rb', line 70 def logger @logger end |
#output_queue ⇒ Async::Queue (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The output queue for the worker(s).
65 66 67 |
# File 'lib/ronin/recon/worker_pool.rb', line 65 def output_queue @output_queue end |
#worker ⇒ Worker (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The worker object.
55 56 57 |
# File 'lib/ronin/recon/worker_pool.rb', line 55 def worker @worker end |
Instance Method Details
#enqueue_mesg(mesg) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Routes a message to the worker.
108 109 110 111 112 113 114 115 116 117 118 |
# File 'lib/ronin/recon/worker_pool.rb', line 108 def enqueue_mesg(mesg) case mesg when Message::SHUTDOWN # push the Stop message for each worker task @concurrency.times do @input_queue.enqueue(mesg) end else @input_queue.enqueue(mesg) end end |
#run ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Runs the worker.
123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 |
# File 'lib/ronin/recon/worker_pool.rb', line 123 def run # HACK: for some reason `until (mesg = ...) == Message::SHUTDOWn)` # causes `Message::SHUTDOWN` objects to slip by. Changing it to a # `loop do` fixes this for some reason. loop do if (mesg = @input_queue.dequeue) == Message::SHUTDOWN break end value = mesg.value enqueue(Message::JobStarted.new(@worker,value)) begin @worker.process(value) do |result| @logger.debug("Output value yielded: #{@worker} #{value.inspect} -> #{result.inspect}") new_value = Message::Value.new(result, worker: @worker, parent: value, depth: mesg.depth + 1) enqueue(new_value) end enqueue(Message::JobCompleted.new(@worker,value)) rescue StandardError => error enqueue(Message::JobFailed.new(@worker,value,error)) end end stopped! end |
#start(task = Async::Task.current) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Starts the worker pool.
162 163 164 165 166 167 168 169 170 171 |
# File 'lib/ronin/recon/worker_pool.rb', line 162 def start(task=Async::Task.current) # mark the worker as running started! @tasks = [] @concurrency.times do @tasks << task.async { run } end end |
#started! ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Marks the worker pool as running.
176 177 178 179 |
# File 'lib/ronin/recon/worker_pool.rb', line 176 def started! # send a message to the engine that the worker pool has started enqueue(Message::WorkerStarted.new(@worker)) end |
#stopped! ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Marks the worker pool as stopped.
184 185 186 187 |
# File 'lib/ronin/recon/worker_pool.rb', line 184 def stopped! # send a message to the engine that the worker pool has stopped enqueue(Message::WorkerStopped.new(@worker)) end |