act-act about projects rss

CouchDB Source Code Reading part2

couch_db_updater:init/1

前回はヘッダ部を読み込むコードを読んだので、今回はDBの読み込み準備?のところを読んでみます。couch_db_updater:init/1からinit_db/6を呼び出す手前のところから。

couch_db_updater.erl

1
2
3
4
5
6
init({MainPid, DbName, Filepath, Fd, Options}) ->    
(..sinp...)
    ReaderFd = open_reader_fd(Filepath, Options),
    Db = init_db(DbName, Filepath, Fd, ReaderFd, Header, Options),
    Db2 = refresh_validate_doc_funs(Db),
    {ok, Db2#db{main_pid = MainPid}}.

ヘッダ読み込み後、open_reader_fd/2を呼び出してReaderFdを取得しています。Filepathを引数にしていることから、Fdと同じデータファイルをオープンしているようです。

couch_db_updater:open_reader_fd/2

couch_db_updater.erl

1
2
3
4
5
6
7
8
9
open_reader_fd(Filepath, Options) ->
    {ok, Fd} = case lists:member(sys_db, Options) of
    true ->
        couch_file:open(Filepath, [read_only, sys_db]);
    false ->
        couch_file:open(Filepath, [read_only])
    end,
    unlink(Fd),
    Fd.

オープンする際、read_onlyに加えてsys_dbを指定することも可能になっています。最後にunlink(Fd)を呼び出してプロセスへのlinkを切っているのは何でだろう。

couch_file.erl

1
2
3
4
5
6
7
8
9
open(Filepath) ->
    open(Filepath, []).

open(Filepath, Options) ->
    case gen_server:start_link(couch_file,
            {Filepath, Options, self(), Ref = make_ref()}, []) of
    {ok, Fd} ->
        {ok, Fd};
(...snip...)

ファイルのオープンは、まずgen_server:start_link/3を呼び出して別のプロセスを作り、そのプロセスの中で行っています。gen_server:start_link/3の戻り値がPidではなくFdとなっているので、ファイル単位でプロセスを作るという意図のようです。

couch_file.erl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
init({Filepath, Options, ReturnPid, Ref}) ->
    process_flag(trap_exit, true),
    OpenOptions = file_open_options(Options),
    case lists:member(create, Options) of
    true ->
(...snip...)
    false ->
        % open in read mode first, so we don't create the file if it doesn't exist.
        case file:open(Filepath, [read, raw]) of
        {ok, Fd_Read} ->
            {ok, Fd} = file:open(Filepath, OpenOptions),
            ok = file:close(Fd_Read),
            maybe_track_open_os_files(Options),
            {ok, Eof} = file:position(Fd, eof),
            {ok, #file{fd=Fd, eof=Eof}};
        Error ->
            init_status_error(ReturnPid, Ref, Error)
        end
    end.

gen_server:start_link/3によって呼び出されるcouch_file:init/1。今回はcreateが指定されていない方だけ読みます。一旦[read, raw]でオープンし、オープンできたら改めてOpenOptionsを指定してオープンした後、[read, raw]でオープンした方はクローズしています。エラーハンドリングの為ですかね。OpenOptionsを指定してオープンした方はEOFに移動しています。

couch_db_updater:init_db

couch_db_updater.erl

1
2
3
init_db(DbName, Filepath, Fd, ReaderFd, Header0, Options) ->
(...snip...)
    Compression = couch_compress:get_compression_method(),

この関数は長いので読みたいところだけ。couch_compress:get_compression_method()でデータ圧縮の為の関数を取得しています。CouchDBの設定ファイルである/usr/local/etc/couchdb/default.iniの中にfile_compression = snappyとあるので、通常はsnappyで圧縮する関数を取得することになると思います。今回はデータファイルの中身も見たいので、snappyではなくnoneを指定して動かしています。

couch_db_updater.erl

1
2
3
4
5
6
7
8
9
10
11
12
    {ok, IdBtree} = couch_btree:open(Header#db_header.fulldocinfo_by_id_btree_state, Fd,
        [{split, fun(X) -> btree_by_id_split(X) end},
        {join, fun(X,Y) -> btree_by_id_join(X,Y) end},
        {reduce, fun(X,Y) -> btree_by_id_reduce(X,Y) end},
        {compression, Compression}]),
    {ok, SeqBtree} = couch_btree:open(Header#db_header.docinfo_by_seq_btree_state, Fd,
            [{split, fun(X) -> btree_by_seq_split(X) end},
            {join, fun(X,Y) -> btree_by_seq_join(X,Y) end},
            {reduce, fun(X,Y) -> btree_by_seq_reduce(X,Y) end},
            {compression, Compression}]),
    {ok, LocalDocsBtree} = couch_btree:open(Header#db_header.local_docs_btree_state, Fd,
        [{compression, Compression}]),

ツリーの読み込みの部分。Fdはデータファイルのハンドルです。splitjoinreducecompressionの各関数を指定しています。couch_btree:open/3は次回読んでみたいと思います。

couch_db_updater.erl

1
    {ok, RefCntr} = couch_ref_counter:start([Fd, ReaderFd]),

関数の名前からすると参照カウンタのようです。もうちょっと見てみます。couch_ref_counterモジュールに移動します。

erlang couch_ref_counter.erl start(ChildProcs) -> => gen_server:start(couch_ref_counter, {self(), ChildProcs}, []). 参照カウンタを管理するプロセスを起動する、というところでしょうか。ChildProcs[Fd, ReaderFd]なので、append可能なデータファイルのハンドルとread_onlyのデータファイルのハンドルが対象になります。

couch_ref_counter.erl

1
2
3
4
init({Pid, ChildProcs}) ->
    [link(ChildProc) || ChildProc <- ChildProcs],
    Referrers = dict:from_list([{Pid, {erlang:monitor(process, Pid), 1}}]),
    {ok, #srv{referrers=Referrers, child_procs=ChildProcs}}.

ChildProcsに対して、link/1でリンクを生成しています。couch_db_updater:open_reader_fd/2の最後にオープンしたFdを指定してunlink/1を呼び出していたのは、couch_ref_counterでリンクするという意図なのかな。

その後、erlang:monitor/2couch_ref_counterのプロセスをモニターしてdictに登録しています。一緒に登録している1は何だろう。リクエストを処理する関数を見てみます。

couch_ref_counter.erl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
handle_call({add, Pid},_From, #srv{referrers=Referrers}=Srv) ->
    Referrers2 =
    case dict:find(Pid, Referrers) of
    error ->
        dict:store(Pid, {erlang:monitor(process, Pid), 1}, Referrers);
    {ok, {MonRef, RefCnt}} ->
        dict:store(Pid, {MonRef, RefCnt + 1}, Referrers)
    end,
    {reply, ok, Srv#srv{referrers=Referrers2}};
handle_call(count, _From, Srv) ->
    {monitors, Monitors} =  process_info(self(), monitors),
    {reply, length(Monitors), Srv};
handle_call({drop, Pid}, _From, #srv{referrers=Referrers}=Srv) ->
    Referrers2 =
    case dict:find(Pid, Referrers) of
    {ok, {MonRef, 1}} ->
        erlang:demonitor(MonRef, [flush]),
        dict:erase(Pid, Referrers);
    {ok, {MonRef, Num}} ->
        dict:store(Pid, {MonRef, Num-1}, Referrers);
    error ->
        Referrers
    end,
    Srv2 = Srv#srv{referrers=Referrers2},
    case should_close() of
    true ->
        {stop,normal,ok,Srv2};
    false ->
        {reply, ok, Srv2}
    end.

handle_call/3の内容から、各Pidの参照カウンタを管理しているようです。{add, Pid}では、Piddictに登録済みであればカウンタを増やし、そうでなければPidのモニターを開始しています。countprocess_info/2self()を指定しているので、自プロセスでモニターしているPidの数を返しているということになります。先ほどの1は参照カウンタですね。このcouch_ref_counterの具体的な用途は、後ほど確認します。(覚えていれば)

参照カウンタの管理の開始を確認したので、残りはcouch_db_updater:init_db/6の最後のところ。

couch_db_updater.erl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
    #db{
        update_pid=self(),
        fd = ReaderFd,
        updater_fd = Fd,
        fd_ref_counter = RefCntr,
        header=Header,
        fulldocinfo_by_id_btree = IdBtree,
        docinfo_by_seq_btree = SeqBtree,
        local_docs_btree = LocalDocsBtree,
        committed_update_seq = Header#db_header.update_seq,
        update_seq = Header#db_header.update_seq,
        name = DbName,
        filepath = Filepath,
        security = Security,
        security_ptr = SecurityPtr,
        instance_start_time = StartTime,
        revs_limit = Header#db_header.revs_limit,
        fsync_options = FsyncOptions,
        options = Options,
        compression = Compression,
        before_doc_update = couch_util:get_value(before_doc_update, Options, nil),
        after_doc_read = couch_util:get_value(after_doc_read, Options, nil)
        }.

この関数内で取得したものをレコードdbに格納して返しています。

Conclusion

主にcouch_db_updater:init_db/6のあたりを読んでみました。印象的だったのは、couch_fileでオープンしたファイル毎にプロセスを生成し、プロセスの識別子としてファイルハンドルを使っているところ。どういった部分をプロセスとして切り出すか、というのはまだ全然分かってないので、一例として勉強になりました。