Parent

Class/Module Index [+]

Quicksearch

Fluent::FileBuffer

Constants

PATH_MATCH

Dots are used as separators in many cases:

so we have to escape dots in keys...

Attributes

Public Class Methods

clear_buffer_paths() click to toggle source
# File lib/fluent/test/input_test.rb, line 19
# Empties the class-level registry of buffer paths (the same @@buffer_paths
# hash that #configure consults to reject a duplicate buffer_path).
# Returns the new, empty hash.
def self.clear_buffer_paths
  @@buffer_paths = Hash.new
end
new() click to toggle source
# File lib/fluent/plugin/buf_file.rb, line 78
# Builds the buffer instance: loads the 'uri' library, runs the parent
# initializer, then stores a fresh URI::Parser in @uri_parser.
# NOTE(review): @uri_parser is presumably used by the key encode/decode
# helpers, which are not visible here -- confirm.
def initialize
  # Required lazily, at construction time rather than at file load time.
  require 'uri'
  super

  @uri_parser = URI::Parser.new
end

Public Instance Methods

before_shutdown(out) click to toggle source
# File lib/fluent/plugin/buf_file.rb, line 186
# Flushes everything that is still buffered when @flush_at_shutdown is set.
#
# Inside a single synchronize section, every key currently in @map is handed
# to push, and then pop(out) is called repeatedly until it returns a falsy
# value. Does nothing when @flush_at_shutdown is falsy.
#
# out - passed through unchanged to every pop call.
def before_shutdown(out)
  return unless @flush_at_shutdown

  synchronize do
    @map.each_key { |chunk_key| push(chunk_key) }
    # Drain loop: keep popping for as long as pop reports success.
    nil while pop(out)
  end
end
chunk_identifier_in_path(path) click to toggle source
# File lib/fluent/plugin/buf_file.rb, line 167
# Returns the part of +path+ that sits between the configured buffer path
# prefix and suffix, found purely by length: the first
# @buffer_path_prefix.length characters and the last
# @buffer_path_suffix.length characters are cut off.
#
# path - chunk file path String, expected to start with @buffer_path_prefix
#        and end with @buffer_path_suffix (not verified here).
def chunk_identifier_in_path(path)
  first = @buffer_path_prefix.length
  # Negative index counted from the end; -1 keeps everything when the
  # suffix is empty.
  last = -(@buffer_path_suffix.length + 1)

  path[first..last]
end
configure(conf) click to toggle source
# File lib/fluent/plugin/buf_file.rb, line 93
# Applies the configuration, claims @buffer_path for this plugin and derives
# the prefix/suffix used to build chunk file names.
#
# conf - configuration object; its '@type' entry (or 'type' as a fallback)
#        is recorded as the owner of the buffer path.
#
# Raises ConfigError when @buffer_path is already registered in the
# class-level @@buffer_paths table.
def configure(conf)
  super

  plugin_type = conf['@type'] || conf['type']
  if @@buffer_paths.has_key?(@buffer_path)
    raise ConfigError, "Other '#{@@buffer_paths[@buffer_path]}' plugin already use same buffer_path: type = #{plugin_type}, buffer_path = #{@buffer_path}"
  end
  @@buffer_paths[@buffer_path] = plugin_type

  # Split around the first '*': the text before it is the prefix, the text
  # after it is the suffix. Without a '*', use "<path>." and ".log".
  head, star, tail = @buffer_path.partition('*')
  if star.empty?
    @buffer_path_prefix = @buffer_path + "."
    @buffer_path_suffix = ".log"
  else
    @buffer_path_prefix = head
    @buffer_path_suffix = tail
  end
end
enqueue(chunk) click to toggle source
# File lib/fluent/plugin/buf_file.rb, line 174
# Renames a chunk file so that its identifier carries the "q" marker.
#
# The identifier part of chunk.path is parsed with PATH_MATCH; capture 1
# (the encoded key) and capture 3 (the trailing suffix) are reused to build
# "<prefix><encoded_key>.q<tsuffix><suffix>", and the chunk is moved there.
#
# chunk - object responding to #path and #mv(new_path).
#
# Returns whatever chunk.mv returns.
# Raises RuntimeError when the path's identifier does not match PATH_MATCH.
# Previously this case failed with a NoMethodError on nil, because only the
# encoded key, not the suffix, was guarded against a failed match.
def enqueue(chunk)
  path = chunk.path
  identifier_part = chunk_identifier_in_path(path)

  m = PATH_MATCH.match(identifier_part)
  unless m
    # Refuse to rename: with neither key nor suffix known, every such chunk
    # would be moved to the same target path.
    raise "unexpected buffer chunk path, cannot enqueue: #{path}"
  end

  encoded_key = m[1]
  tsuffix = m[3]
  npath = "#{@buffer_path_prefix}#{encoded_key}.q#{tsuffix}#{@buffer_path_suffix}"

  chunk.mv(npath)
end
new_chunk(key) click to toggle source
# File lib/fluent/plugin/buf_file.rb, line 121
# Creates a new FileBufferChunk for +key+.
#
# The key is encoded, a path is generated for it with the "b" marker, and
# the suffix returned alongside that path is converted into the chunk's
# unique id. The chunk is opened in "a+" mode and is given @symlink_path.
#
# key - chunk key, stored on the chunk as-is.
def new_chunk(key)
  path, tsuffix = make_path(encode_key(key), "b")
  FileBufferChunk.new(key, path, tsuffix_to_unique_id(tsuffix), "a+", @symlink_path)
end
resume() click to toggle source
# File lib/fluent/plugin/buf_file.rb, line 128
# Rebuilds buffer state from the chunk files found on disk.
#
# Every file matching "<prefix>*<suffix>" whose identifier matches
# PATH_MATCH is turned into a FileBufferChunk: "b" files are opened in "a+"
# mode, "q" files in "r" mode. The hexadecimal suffix (capture 3) is used as
# the sort key. Files that do not match PATH_MATCH are ignored.
#
# Returns [queue, map]:
#   queue - Array of "q" chunks ordered by ascending timestamp.
#   map   - Hash of chunk.key => "b" chunk; when several "b" chunks share a
#           key, the one sorted last wins.
def resume
  buffered = []
  enqueued = []

  pattern = "#{@buffer_path_prefix}*#{@buffer_path_suffix}"
  Dir.glob(pattern).each do |path|
    m = PATH_MATCH.match(chunk_identifier_in_path(path))
    next unless m

    key = decode_key(m[1])
    tsuffix = m[3]
    timestamp = tsuffix.to_i(16)
    unique_id = tsuffix_to_unique_id(tsuffix)

    case m[2]
    when 'b'
      buffered << [timestamp, FileBufferChunk.new(key, path, unique_id, "a+")]
    when 'q'
      enqueued << [timestamp, FileBufferChunk.new(key, path, unique_id, "r")]
    end
  end

  # Later entries overwrite earlier ones for the same key.
  map = buffered.sort_by(&:first).each_with_object({}) do |(_timestamp, chunk), by_key|
    by_key[chunk.key] = chunk
  end

  queue = enqueued.sort_by(&:first).map(&:last)

  [queue, map]
end
start() click to toggle source
# File lib/fluent/plugin/buf_file.rb, line 112
# Makes sure the directory that will hold the chunk files exists (created
# with DEFAULT_DIR_PERMISSION as the mode), then runs the parent's start.
def start
  # A dummy file name is appended so that File.dirname yields the directory
  # even when the prefix itself ends with a path separator.
  buffer_dir = File.dirname(@buffer_path_prefix + "path")
  FileUtils.mkdir_p(buffer_dir, mode: DEFAULT_DIR_PERMISSION)
  super
end

[Validate]

Generated with the Darkfish Rdoc Generator 2.