MessagePack FixArray length = 3 (if @extend_internal_protocol)
= 2 (else)
# File lib/fluent/plugin/out_forward.rb, line 72 def configure(conf) super # backward compatibility if host = conf['host'] log.warn "'host' option in forward output is obsoleted. Use '<server> host xxx </server>' instead." port = conf['port'] port = port ? port.to_i : DEFAULT_LISTEN_PORT e = conf.add_element('server') e['host'] = host e['port'] = port.to_s end recover_sample_size = @recover_wait / @heartbeat_interval # add options here if any options addes which uses extended protocol @extend_internal_protocol = if @require_ack_response true else false end conf.elements.each {|e| next if e.name != "server" host = e['host'] port = e['port'] port = port ? port.to_i : DEFAULT_LISTEN_PORT weight = e['weight'] weight = weight ? weight.to_i : 60 standby = !!e['standby'] name = e['name'] unless name name = "#{host}:#{port}" end failure = FailureDetector.new(@heartbeat_interval, @hard_timeout, Time.now.to_i.to_f) node_conf = NodeConfig.new(name, host, port, weight, standby, failure, @phi_threshold, recover_sample_size, @expire_dns_cache, @phi_failure_detector) @nodes << Node.new(log, node_conf) log.info "adding forwarding server '#{name}'", :host=>host, :port=>port, :weight=>weight, :plugin_id=>plugin_id } end
# File lib/fluent/plugin/out_forward.rb, line 150 def run @loop.run rescue log.error "unexpected error", :error=>$!.to_s log.error_backtrace end
# File lib/fluent/plugin/out_forward.rb, line 142 def shutdown @finished = true @loop.watchers.each {|w| w.detach } @loop.stop @thread.join @usock.close if @usock end
# File lib/fluent/plugin/out_forward.rb, line 119 def start super @rand_seed = Random.new.seed rebuild_weight_array @rr = 0 @loop = Coolio::Loop.new if @heartbeat_type == :udp # assuming all hosts use udp @usock = SocketUtil.create_udp_socket(@nodes.first.host) @usock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK) @hb = HeartbeatHandler.new(@usock, method(:on_heartbeat)) @loop.attach(@hb) end @timer = HeartbeatRequestTimer.new(@heartbeat_interval, method(:on_timer)) @loop.attach(@timer) @thread = Thread.new(&method(:run)) end
# File lib/fluent/plugin/out_forward.rb, line 157 def write_objects(tag, chunk) return if chunk.empty? error = nil wlen = @weight_array.length wlen.times do @rr = (@rr + 1) % wlen node = @weight_array[@rr] if node.available? begin send_data(node, tag, chunk) return rescue # for load balancing during detecting crashed servers error = $! # use the latest error end end end if error raise error else raise "no nodes are available" # TODO message end end
Generated with the Darkfish Rdoc Generator 2.