Parent: BasicBuffer
Dots are used as separators in many places, so dots in keys have to be escaped...
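A minimal sketch of what such escaping could look like, assuming URI-style percent-encoding of every character outside a small safe set (the parser class, the safe-set regexp and the example key are illustrative, not taken from this file):

  require 'uri'

  # Hypothetical illustration: percent-encode anything outside a small safe
  # set before the key is embedded in a chunk file name, so characters that
  # also act as path separators cannot collide with characters in the key.
  parser = URI::RFC2396_Parser.new
  parser.escape('nginx/access log', /[^-_.a-zA-Z0-9]/n)  # => "nginx%2Faccess%20log"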
'symlink_path' is currently used only by out_file, which is why it is an attr_accessor rather than a config_param. See: github.com/fluent/fluentd/pull/181
  # File lib/fluent/plugin/buf_file.rb, line 186
  def before_shutdown(out)
    if @flush_at_shutdown
      synchronize do
        @map.each_key {|key| push(key) }
        while pop(out)
        end
      end
    end
  end
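In effect, when flush_at_shutdown is enabled, every keyed chunk still held in @map is pushed onto the queue and pop(out) is then called repeatedly until the queue is drained, so buffered data is written out before the process exits.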
  # File lib/fluent/plugin/buf_file.rb, line 167
  def chunk_identifier_in_path(path)
    pos_after_prefix = @buffer_path_prefix.length
    pos_before_suffix = @buffer_path_suffix.length + 1 # from tail of path
    path.slice(pos_after_prefix..-pos_before_suffix)
  end
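For example, with illustrative prefix, suffix and path values (not taken from this file), the slice works out as follows:

  # Illustrative values: strip the prefix and suffix to get the identifier part.
  prefix = "/var/log/fluent/myapp."
  suffix = ".log"
  path   = "/var/log/fluent/myapp.foo.b4f1e2d3c4b5a697.log"
  path.slice(prefix.length..-(suffix.length + 1))  # => "foo.b4f1e2d3c4b5a697"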
  # File lib/fluent/plugin/buf_file.rb, line 93
  def configure(conf)
    super

    if @@buffer_paths.has_key?(@buffer_path)
      raise ConfigError, "Other '#{@@buffer_paths[@buffer_path]}' plugin already use same buffer_path: type = #{conf['@type'] || conf['type']}, buffer_path = #{@buffer_path}"
    else
      @@buffer_paths[@buffer_path] = conf['@type'] || conf['type']
    end

    if pos = @buffer_path.index('*')
      @buffer_path_prefix = @buffer_path[0, pos]
      @buffer_path_suffix = @buffer_path[(pos + 1)..-1]
    else
      @buffer_path_prefix = @buffer_path + "."
      @buffer_path_suffix = ".log"
    end
  end
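As a worked example (paths are illustrative), a buffer_path containing '*' is split at the wildcard, while one without '*' gains a '.' separator as its prefix and a '.log' suffix:

  # buffer_path with a wildcard: split into prefix and suffix at '*'.
  buffer_path = "/var/log/fluent/myapp.*.buffer"
  pos = buffer_path.index('*')
  buffer_path[0, pos]          # => "/var/log/fluent/myapp."
  buffer_path[(pos + 1)..-1]   # => ".buffer"

  # buffer_path without a wildcard: defaults are used.
  "/var/log/fluent/myapp" + "."   # prefix => "/var/log/fluent/myapp."
                                  # suffix => ".log"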
  # File lib/fluent/plugin/buf_file.rb, line 174
  def enqueue(chunk)
    path = chunk.path
    identifier_part = chunk_identifier_in_path(path)

    m = PATH_MATCH.match(identifier_part)
    encoded_key = m ? m[1] : ""
    tsuffix = m[3]
    npath = "#{@buffer_path_prefix}#{encoded_key}.q#{tsuffix}#{@buffer_path_suffix}"

    chunk.mv(npath)
  end
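The net effect is a rename of the chunk file on disk from its writable 'b' form to its queued 'q' form; with illustrative names, myapp.foo.b4f1e2d3c4b5a697.log becomes myapp.foo.q4f1e2d3c4b5a697.log, preserving the encoded key and tsuffix.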
  # File lib/fluent/plugin/buf_file.rb, line 121
  def new_chunk(key)
    encoded_key = encode_key(key)
    path, tsuffix = make_path(encoded_key, "b")
    unique_id = tsuffix_to_unique_id(tsuffix)
    FileBufferChunk.new(key, path, unique_id, "a+", @symlink_path)
  end
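make_path and tsuffix_to_unique_id are not shown on this page; judging from the PATH_MATCH handling in enqueue and resume, a freshly created writable chunk path presumably has the shape sketched below (all values illustrative, not taken from this file):

  prefix  = "/var/log/fluent/myapp."   # illustrative @buffer_path_prefix
  suffix  = ".log"                     # illustrative @buffer_path_suffix
  key     = "foo"                      # encoded key
  tsuffix = "4f1e2d3c4b5a697"          # hexadecimal timestamp suffix (assumed)
  "#{prefix}#{key}.b#{tsuffix}#{suffix}"
  # => "/var/log/fluent/myapp.foo.b4f1e2d3c4b5a697.log"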
  # File lib/fluent/plugin/buf_file.rb, line 128
  def resume
    maps = []
    queues = []

    Dir.glob("#{@buffer_path_prefix}*#{@buffer_path_suffix}") {|path|
      identifier_part = chunk_identifier_in_path(path)

      if m = PATH_MATCH.match(identifier_part)
        key = decode_key(m[1])
        bq = m[2]
        tsuffix = m[3]
        timestamp = m[3].to_i(16)
        unique_id = tsuffix_to_unique_id(tsuffix)

        if bq == 'b'
          chunk = FileBufferChunk.new(key, path, unique_id, "a+")
          maps << [timestamp, chunk]
        elsif bq == 'q'
          chunk = FileBufferChunk.new(key, path, unique_id, "r")
          queues << [timestamp, chunk]
        end
      end
    }

    map = {}
    maps.sort_by {|(timestamp, chunk)| timestamp }.each {|(timestamp, chunk)| map[chunk.key] = chunk }
    queue = queues.sort_by {|(timestamp, chunk)| timestamp }.map {|(timestamp, chunk)| chunk }

    return queue, map
  end
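resume rebuilds the buffer state from the files already on disk: queued 'q' chunks are returned as the queue, ordered oldest-first by the hexadecimal timestamp encoded in their tsuffix, while writable 'b' chunks are returned as the key-to-chunk map (if several 'b' chunks share a key, the newest one wins, since the timestamp-sorted iteration overwrites earlier entries).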