couch_db_updater:init/1

前回はヘッダ部を読み込むコードを読んだので、今回はDBの読み込み準備?のところを読んでみます。couch_db_updater:init/1からinit_db/6を呼び出す手前のところから。

couch_db_updater.erl:

1init({MainPid, DbName, Filepath, Fd, Options}) ->    
2(..sinp...)
3    ReaderFd = open_reader_fd(Filepath, Options),
4    Db = init_db(DbName, Filepath, Fd, ReaderFd, Header, Options),
5    Db2 = refresh_validate_doc_funs(Db),
6    {ok, Db2#db{main_pid = MainPid}}.

ヘッダ読み込み後、open_reader_fd/2を呼び出してReaderFdを取得しています。Filepathを引数にしていることから、Fdと同じデータファイルをオープンしているようです。

couch_db_updater:open_reader_fd/2

couch_db_updater.erl:

1open_reader_fd(Filepath, Options) ->
2    {ok, Fd} = case lists:member(sys_db, Options) of
3    true ->
4        couch_file:open(Filepath, [read_only, sys_db]);
5    false ->
6        couch_file:open(Filepath, [read_only])
7    end,
8    unlink(Fd),
9    Fd.

オープンする際、read_onlyに加えてsys_dbを指定することも可能になっています。最後にunlink(Fd)を呼び出してプロセスへのlinkを切っているのは何でだろう。

couch_file.erl:

1open(Filepath) ->
2    open(Filepath, []).
3
4open(Filepath, Options) ->
5    case gen_server:start_link(couch_file,
6            {Filepath, Options, self(), Ref = make_ref()}, []) of
7    {ok, Fd} ->
8        {ok, Fd};
9(...snip...)

ファイルのオープンは、まずgen_server:start_link/3を呼び出して別のプロセスを作り、そのプロセスの中で行っています。gen_server:start_link/3の戻り値がPidではなくFdとなっているので、ファイル単位でプロセスを作るという意図のようです。

couch_file.erl:

 1init({Filepath, Options, ReturnPid, Ref}) ->
 2    process_flag(trap_exit, true),
 3    OpenOptions = file_open_options(Options),
 4    case lists:member(create, Options) of
 5    true ->
 6(...snip...)
 7    false ->
 8        % open in read mode first, so we don't create the file if it doesn't exist.
 9        case file:open(Filepath, [read, raw]) of
10        {ok, Fd_Read} ->
11            {ok, Fd} = file:open(Filepath, OpenOptions),
12            ok = file:close(Fd_Read),
13            maybe_track_open_os_files(Options),
14            {ok, Eof} = file:position(Fd, eof),
15            {ok, #file{fd=Fd, eof=Eof}};
16        Error ->
17            init_status_error(ReturnPid, Ref, Error)
18        end
19    end.

gen_server:start_link/3によって呼び出されるcouch_file:init/1。今回はcreateが指定されていない方だけ読みます。一旦

[read, raw]

でオープンし、オープンできたら改めてOpenOptionsを指定してオープンした後、クローズしています。エラーハンドリングの為ですかね。OpenOptionsを指定してオープンした方はEOFに移動しています。

couch_db_updater:init_db

couch_db_updater.erl:

1init_db(DbName, Filepath, Fd, ReaderFd, Header0, Options) ->
2(...snip...)
3    Compression = couch_compress:get_compression_method(),

この関数は長いので読みたいところだけ。couch_compress:get_compression_method()でデータ圧縮の為の関数を取得しています。CouchDBの設定ファイルである/usr/local/etc/couchdb/default.iniの中にfile_compression = snappyとあるので、通常はsnappyで圧縮する関数を取得することになると思います。今回はデータファイルの中身も見たいので、snappyではなくnoneを指定して動かしています。

couch_db_updater.erl:

 1    {ok, IdBtree} = couch_btree:open(Header#db_header.fulldocinfo_by_id_btree_state, Fd,
 2        [{split, fun(X) -> btree_by_id_split(X) end},
 3        {join, fun(X,Y) -> btree_by_id_join(X,Y) end},
 4        {reduce, fun(X,Y) -> btree_by_id_reduce(X,Y) end},
 5        {compression, Compression}]),
 6    {ok, SeqBtree} = couch_btree:open(Header#db_header.docinfo_by_seq_btree_state, Fd,
 7            [{split, fun(X) -> btree_by_seq_split(X) end},
 8            {join, fun(X,Y) -> btree_by_seq_join(X,Y) end},
 9            {reduce, fun(X,Y) -> btree_by_seq_reduce(X,Y) end},
10            {compression, Compression}]),
11    {ok, LocalDocsBtree} = couch_btree:open(Header#db_header.local_docs_btree_state, Fd,
12        [{compression, Compression}]),

ツリーの読み込みの部分。Fdはデータファイルのハンドルです。splitjoinreducecompressionの各関数を指定しています。couch_btree:open/3は次回読んでみたいと思います。

couch_db_updater.erl:

1    {ok, RefCntr} = couch_ref_counter:start([Fd, ReaderFd]),

関数の名前からすると参照カウンタのようです。もうちょっと見てみます。couch_ref_counterモジュールに移動します。

couch_ref_counter.erl:

start(ChildProcs) ->
=>  gen_server:start(couch_ref_counter, {self(), ChildProcs}, []).

参照カウンタを管理するプロセスを起動する、というところでしょうか。ChildProcs

[Fd, ReaderFd]

なので、append可能なデータファイルのハンドルとread_onlyのデータファイルのハンドルが対象になります。

couch_ref_counter.erl:

1init({Pid, ChildProcs}) ->
2    [link(ChildProc) || ChildProc <- ChildProcs],
3    Referrers = dict:from_list([{Pid, {erlang:monitor(process, Pid), 1}}]),
4    {ok, #srv{referrers=Referrers, child_procs=ChildProcs}}.

ChildProcsに対して、link/1でリンクを生成しています。couch_db_updater:open_reader_fd/2の最後にオープンしたFdを指定してunlink/1を呼び出していたのは、couch_ref_counterでリンクするという意図なのかな。

その後、erlang:monitor/2couch_ref_counterのプロセスをモニターしてdictに登録しています。一緒に登録している1は何だろう。リクエストを処理する関数を見てみます。

couch_ref_counter.erl:

 1handle_call({add, Pid},_From, #srv{referrers=Referrers}=Srv) ->
 2    Referrers2 =
 3    case dict:find(Pid, Referrers) of
 4    error ->
 5        dict:store(Pid, {erlang:monitor(process, Pid), 1}, Referrers);
 6    {ok, {MonRef, RefCnt}} ->
 7        dict:store(Pid, {MonRef, RefCnt + 1}, Referrers)
 8    end,
 9    {reply, ok, Srv#srv{referrers=Referrers2}};
10handle_call(count, _From, Srv) ->
11    {monitors, Monitors} =  process_info(self(), monitors),
12    {reply, length(Monitors), Srv};
13handle_call({drop, Pid}, _From, #srv{referrers=Referrers}=Srv) ->
14    Referrers2 =
15    case dict:find(Pid, Referrers) of
16    {ok, {MonRef, 1}} ->
17        erlang:demonitor(MonRef, [flush]),
18        dict:erase(Pid, Referrers);
19    {ok, {MonRef, Num}} ->
20        dict:store(Pid, {MonRef, Num-1}, Referrers);
21    error ->
22        Referrers
23    end,
24    Srv2 = Srv#srv{referrers=Referrers2},
25    case should_close() of
26    true ->
27        {stop,normal,ok,Srv2};
28    false ->
29        {reply, ok, Srv2}
30    end.

handle_call/3の内容から、各Pidの参照カウンタを管理しているようです。{add, Pid}では、Piddictに登録済みであればカウンタを増やし、そうでなければPidのモニターを開始しています。countprocess_info/2self()を指定しているので、自プロセスでモニターしているPidの数を返しているということになります。先ほどの1は参照カウンタですね。このcouch_ref_counterの具体的な用途は、後ほど確認します。(覚えていれば)

参照カウンタの管理の開始を確認したので、残りはcouch_db_updater:init_db/6の最後のところ。

couch_db_updater.erl:

 1    #db{
 2        update_pid=self(),
 3        fd = ReaderFd,
 4        updater_fd = Fd,
 5        fd_ref_counter = RefCntr,
 6        header=Header,
 7        fulldocinfo_by_id_btree = IdBtree,
 8        docinfo_by_seq_btree = SeqBtree,
 9        local_docs_btree = LocalDocsBtree,
10        committed_update_seq = Header#db_header.update_seq,
11        update_seq = Header#db_header.update_seq,
12        name = DbName,
13        filepath = Filepath,
14        security = Security,
15        security_ptr = SecurityPtr,
16        instance_start_time = StartTime,
17        revs_limit = Header#db_header.revs_limit,
18        fsync_options = FsyncOptions,
19        options = Options,
20        compression = Compression,
21        before_doc_update = couch_util:get_value(before_doc_update, Options, nil),
22        after_doc_read = couch_util:get_value(after_doc_read, Options, nil)
23        }.

この関数内で取得したものをレコードdbに格納して返しています。

Conclusion

主にcouch_db_updater:init_db/6のあたりを読んでみました。印象的だったのは、couch_fileでオープンしたファイル毎にプロセスを生成し、プロセスの識別子としてファイルハンドルを使っているところ。どういった部分をプロセスとして切り出すか、というのはまだ全然分かってないので、一例として勉強になりました。