Parent

Class/Module Index [+]

Quicksearch

Fluent::RootAgent

Fluentd forms a tree structure to manage plugins:

                 RootAgent
                     |
        +------------+-------------+-------------+
        |            |             |             |
     <label>      <source>      <filter>      <match>
        |
   +----+----+
   |         |
<filter>   <match>

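Relation:

* RootAgent has many <label>, <source>, <filter> and <match>
* <label> has many <filter> and <match>

Next step: `fluentd/agent.rb`
Next step: `fluentd/label.rb`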

Constants

ERROR_LABEL

Attributes

inputs[R]
labels[R]

Public Class Methods

new(opts = {}) click to toggle source
# File lib/fluent/root_agent.rb, line 46
# Builds an empty root agent.
#
# opts - Hash of options. Keys read here:
#        :suppress_interval - when truthy, forwarded to #suppress_interval.
#        :without_source    - when truthy, stored in @without_source
#                             (consulted by #configure).
#        The same opts are also forwarded to the parent initializer.
def initialize(opts = {})
  super

  # Registries filled in later by #add_label / #add_source / #start.
  @labels = {}
  @inputs = []
  @started_inputs = []

  # Log suppression is off until #suppress_interval is called.
  @suppress_emit_error_log_interval = 0
  @next_emit_error_log_time = nil

  interval = opts[:suppress_interval]
  suppress_interval(interval) if interval

  skip_sources = opts[:without_source]
  @without_source = skip_sources if skip_sources
end

Public Instance Methods

add_label(name) click to toggle source
# File lib/fluent/root_agent.rb, line 155
# Creates a Label called +name+, attaches this agent as its root agent and
# registers it in @labels.
#
# name - the label name used as the key in @labels.
#
# Returns the newly created Label.
# Raises ConfigError when a label with the same name is already registered;
# without this check a repeated <label> section would silently replace the
# earlier one and its configuration would be lost.
def add_label(name)
  raise ConfigError, "Section <label #{name}> appears twice" if @labels.key?(name)

  label = Label.new(name)
  label.root_agent = self
  @labels[name] = label
end
add_source(type, conf) click to toggle source
# File lib/fluent/root_agent.rb, line 141
# Instantiates, wires up and configures one input plugin, then remembers it
# in @inputs so that #start can launch it.
#
# type - plugin type handed to Plugin.new_input.
# conf - configuration element handed to the plugin's #configure.
#
# Returns the configured input plugin.
def add_source(type, conf)
  log.info "adding source", type: type

  Plugin.new_input(type).tap do |plugin|
    # The plugin starts out emitting to this agent's own event router; the
    # router is assigned before #configure runs so that the plugin may
    # replace it during configuration (the original comment notes that
    # Input#configure does so when an `@label` parameter is given; see
    # 'fluentd/plugin/input.rb').
    plugin.router = @event_router
    plugin.configure(conf)
    @inputs << plugin
  end
end
configure(conf) click to toggle source
# File lib/fluent/root_agent.rb, line 62
# Configures the agent from +conf+ in three phases:
#   1. every <label> section is registered and then configured,
#   2. the parent class configures itself from the same +conf+ (via super),
#   3. every <source> section is turned into an input plugin, unless
#      @without_source is set.
#
# conf - configuration element whose #elements are inspected by name.
#
# Raises ConfigError when a <label> has an empty argument or a <source>
# has neither '@type' nor 'type'.
def configure(conf)
  error_section = nil
  sections_by_label = {}

  # All labels are created up front, before any plugin is configured, so
  # that inputs, filters and outputs do not hit 'label not found'.
  label_sections = conf.elements.select { |element| element.name == 'label' }
  label_sections.each do |section|
    label_name = section.arg
    raise ConfigError, "Missing symbol argument on <label> directive" if label_name.empty?

    if label_name == ERROR_LABEL
      # The error label gets dedicated wiring in #setup_error_label.
      error_section = section
    else
      add_label(label_name)
      sections_by_label[label_name] = section
    end
  end

  # Only now, with every ordinary label registered, are they configured.
  sections_by_label.each { |label_name, section| @labels[label_name].configure(section) }
  setup_error_label(error_section) if error_section

  super

  # Sources come last.
  if @without_source
    log.info "'--without-source' is applied. Ignore <source> sections"
  else
    source_sections = conf.elements.select { |element| element.name == 'source' }
    source_sections.each do |section|
      type = section['@type'] || section['type']
      raise ConfigError, "Missing 'type' parameter on <source> directive" unless type
      add_source(type, section)
    end
  end
end
emit_error_event(tag, time, record, error) click to toggle source
# File lib/fluent/root_agent.rb, line 169
# Reports a single failed event.
#
# When an error collector is installed (see #setup_error_label) the event is
# re-emitted to it and only a short notice is logged; otherwise the event,
# including its record, is dumped to the log.
#
# tag, time, record - the event that failed.
# error             - the exception describing the failure.
def emit_error_event(tag, time, record, error)
  details = { error_class: error.class, error: error.to_s, tag: tag, time: time }
  collector = @error_collector

  if collector
    # The record is left out of the log on purpose: the collector handles
    # it, and this warning is only a notification.
    log.warn "send an error event to @ERROR:", details
    collector.emit(tag, time, record)
  else
    log.warn "dump an error event:", details.merge(record: record)
  end
end
find_label(label_name) click to toggle source
# File lib/fluent/root_agent.rb, line 161
# Looks up a registered label by name.
#
# label_name - key into @labels.
#
# Returns the stored label.
# Raises ArgumentError when nothing truthy is stored under +label_name+.
def find_label(label_name)
  @labels[label_name] || raise(ArgumentError, "#{label_name} label not found")
end
handle_emits_error(tag, es, error) click to toggle source
# File lib/fluent/root_agent.rb, line 181
# Reports a failed event stream.
#
# With an error collector installed, the stream is re-emitted to it and the
# result of that call is returned. Without one, the failure is logged
# (subject to the suppression interval) and +error+ is re-raised.
#
# tag   - tag of the failed stream.
# es    - the event stream itself.
# error - the exception describing the failure.
def handle_emits_error(tag, es, error)
  details = { error_class: error.class, error: error.to_s, tag: tag }

  if @error_collector
    log.warn "send an error event stream to @ERROR:", details
    return @error_collector.emit_stream(tag, es)
  end

  current = Engine.now
  interval = @suppress_emit_error_log_interval
  # A zero interval means "never suppress"; it is checked first so that the
  # comparison is skipped while @next_emit_error_log_time may still be nil.
  if interval.zero? || current > @next_emit_error_log_time
    log.warn "emit transaction failed:", details
    log.warn_backtrace
    @next_emit_error_log_time = current + interval
  end

  raise error
end
setup_error_label(e) click to toggle source
# File lib/fluent/root_agent.rb, line 96
# Registers and configures the label named ERROR_LABEL and installs its
# event router as this agent's error collector.
#
# e - the configuration element of the error label section.
#
# Returns the event router stored in @error_collector.
def setup_error_label(e)
  label = add_label(ERROR_LABEL)
  label.configure(e)

  # After configuration the label's root agent is swapped for a
  # RootAgentProxyWithoutErrorCollector wrapping this agent.
  proxy = RootAgentProxyWithoutErrorCollector.new(self)
  label.root_agent = proxy

  @error_collector = label.event_router
end
shutdown() click to toggle source
# File lib/fluent/root_agent.rb, line 116
# Stops everything this agent started: inputs first, then labels, then
# whatever the parent class shuts down.
def shutdown
  # Inputs go first so that none of them emits into an output that has
  # already terminated. Each input is shut down on its own thread, and
  # all threads are joined before moving on.
  workers = @started_inputs.map do |input|
    Thread.new do
      begin
        input.shutdown
      rescue => err
        # A failing input is logged and does not stop the rest of shutdown.
        log.warn "unexpected error while shutting down input plugin", plugin: input.class, plugin_id: input.plugin_id, error_class: err.class, error: err
        log.warn_backtrace
      end
    end
  end
  workers.each(&:join)

  @labels.each_value(&:shutdown)

  super
end
start() click to toggle source
# File lib/fluent/root_agent.rb, line 103
# Starts the parent class first, then every label, then every input.
#
# Each input is appended to @started_inputs right after its own #start
# returns, so #shutdown only touches inputs that actually started.
def start
  super

  @labels.each_value(&:start)

  @inputs.each do |input|
    input.start
    @started_inputs << input
  end
end
suppress_interval(interval_time) click to toggle source
# File lib/fluent/root_agent.rb, line 136
# Enables suppression of repeated "emit transaction failed" logs.
#
# interval_time - value stored as the suppression interval; it is added to
#                 a timestamp in #handle_emits_error.
#
# Returns the current time as integer epoch seconds, which is also stored
# as the earliest time the next log may be written.
def suppress_interval(interval_time)
  current_epoch = Time.now.to_i
  @suppress_emit_error_log_interval = interval_time
  @next_emit_error_log_time = current_epoch
end

[Validate]

Generated with the Darkfish Rdoc Generator 2.