Class/Module Index [+]

Quicksearch

Fluent::ForwardOutput

Constants

FORWARD_HEADER

MessagePack FixArray header: length = 3 if @extend_internal_protocol is set, length = 2 otherwise.

FORWARD_HEADER_EXT
NodeConfig

Attributes

extend_internal_protocol[RW]
nodes[R]

The Linux default for tcp_syn_retries is 5 (in many environments), so a connect attempt can take 3 + 6 + 12 + 24 + 48 + 96 = 189 seconds before it times out.

Public Class Methods

new() click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 30
# Sets up a fresh output instance with an empty node list.
#
# The libraries this plugin relies on are loaded here, at instantiation
# time, rather than when the file itself is loaded.
def initialize
  super

  # Standard-library dependencies, loaded in the same order as before.
  %w[base64 socket fileutils].each { |stdlib| require stdlib }
  require 'fluent/plugin/socket_util'

  # Filled by #configure with one Node per <server> element.
  @nodes = []
end

Public Instance Methods

configure(conf) click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 72
# Reads the plugin configuration and builds one Node per <server> element.
#
# A legacy top-level 'host' (and optional 'port') setting is converted into
# an equivalent <server> element first, with a deprecation warning, so the
# rest of the method only has to deal with <server> elements.
#
# conf - the configuration element for this plugin; it is read with
#        conf[key], extended with conf.add_element and walked through
#        conf.elements.
def configure(conf)
  super

  # backward compatibility: translate 'host'/'port' into a <server> element
  legacy_host = conf['host']
  if legacy_host
    log.warn "'host' option in forward output is obsoleted. Use '<server> host xxx </server>' instead."
    legacy_port = conf['port']
    legacy_port = legacy_port ? legacy_port.to_i : DEFAULT_LISTEN_PORT
    legacy_server = conf.add_element('server')
    legacy_server['host'] = legacy_host
    legacy_server['port'] = legacy_port.to_s
  end

  recover_sample_size = @recover_wait / @heartbeat_interval

  # Add options here if any options are added that use the extended protocol.
  @extend_internal_protocol = @require_ack_response ? true : false

  conf.elements.each do |server|
    next unless server.name == "server"

    host = server['host']

    port = server['port']
    port = port ? port.to_i : DEFAULT_LISTEN_PORT

    weight = server['weight']
    weight = weight ? weight.to_i : 60

    # Any value given for 'standby' turns the flag on.
    standby = !!server['standby']

    # Fall back to "host:port" when no explicit name was configured.
    name = server['name'] || "#{host}:#{port}"

    failure = FailureDetector.new(@heartbeat_interval, @hard_timeout, Time.now.to_i.to_f)

    node_conf = NodeConfig.new(
      name, host, port, weight, standby, failure,
      @phi_threshold, recover_sample_size, @expire_dns_cache, @phi_failure_detector
    )
    @nodes << Node.new(log, node_conf)
    log.info "adding forwarding server '#{name}'", :host=>host, :port=>port, :weight=>weight, :plugin_id=>plugin_id
  end
end
run() click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 150
# Runs the event loop held in @loop until it returns.
#
# Any StandardError escaping the loop is logged (message first, then the
# backtrace) instead of being propagated to the caller.
def run
  begin
    @loop.run
  rescue => err
    log.error "unexpected error", :error=>err.to_s
    log.error_backtrace
  end
end
shutdown() click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 142
# Stops the plugin: flags it as finished, detaches every watcher from the
# event loop, stops the loop, waits for the worker thread and closes the
# UDP heartbeat socket.
#
# Each resource is only touched when it exists, so calling this before
# #start has run (or after #start failed part-way) does not raise
# NoMethodError on nil.
def shutdown
  @finished = true

  if @loop
    @loop.watchers.each {|w| w.detach }
    @loop.stop
  end

  @thread.join if @thread
  @usock.close if @usock
end
start() click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 119
# Starts the plugin: prepares the node rotation state, creates the event
# loop, attaches the heartbeat watchers to it and launches the thread that
# executes #run.
def start
  super

  # Rotation state. The seed is stored before rebuild_weight_array is
  # called and the round-robin cursor is reset afterwards.
  @rand_seed = Random.new.seed
  rebuild_weight_array
  @rr = 0

  @loop = Coolio::Loop.new

  if @heartbeat_type == :udp
    # assuming all hosts use udp; the socket is created from the first node's host
    heartbeat_host = @nodes.first.host
    @usock = SocketUtil.create_udp_socket(heartbeat_host)
    @usock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)

    @hb = HeartbeatHandler.new(@usock, method(:on_heartbeat))
    @loop.attach(@hb)
  end

  # Timer that invokes on_timer; constructed with @heartbeat_interval.
  @timer = HeartbeatRequestTimer.new(@heartbeat_interval, method(:on_timer))
  @loop.attach(@timer)

  @thread = Thread.new { run }
end
write_objects(tag, chunk) click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 157
# Sends a chunk to the first node that accepts it.
#
# Starting after the current round-robin position (@rr), each entry of
# @weight_array is tried at most once. Nodes that report themselves as
# unavailable are skipped; a node whose send_data raises is skipped too,
# and its error is remembered.
#
# tag   - tag handed through to send_data.
# chunk - chunk handed through to send_data; nothing happens when it is empty.
#
# Returns nil once a node has accepted the chunk (or the chunk is empty).
# Raises the most recent send error if every attempted send failed, or a
# RuntimeError ("no nodes are available") if no send was attempted at all.
def write_objects(tag, chunk)
  return if chunk.empty?

  last_error = nil
  slots = @weight_array.length

  slots.times do
    # Advance the cursor first so consecutive calls rotate through the nodes.
    @rr = (@rr + 1) % slots
    candidate = @weight_array[@rr]
    next unless candidate.available?

    begin
      send_data(candidate, tag, chunk)
      return
    rescue => e
      # for load balancing during detecting crashed servers
      last_error = e  # use the latest error
    end
  end

  raise last_error if last_error
  raise "no nodes are available"  # TODO message
end

[Validate]

Generated with the Darkfish Rdoc Generator 2.