couch_db_updater:init/1
前回はヘッダ部を読み込むコードを読んだので、今回はDBの読み込み準備?のところを読んでみます。couch_db_updater:init/1
からinit_db/6
を呼び出す手前のところから。
couch_db_updater.erl:
1init({MainPid, DbName, Filepath, Fd, Options}) ->
2(..sinp...)
3 ReaderFd = open_reader_fd(Filepath, Options),
4 Db = init_db(DbName, Filepath, Fd, ReaderFd, Header, Options),
5 Db2 = refresh_validate_doc_funs(Db),
6 {ok, Db2#db{main_pid = MainPid}}.
ヘッダ読み込み後、open_reader_fd/2
を呼び出してReaderFd
を取得しています。Filepath
を引数にしていることから、Fd
と同じデータファイルをオープンしているようです。
couch_db_updater:open_reader_fd/2
couch_db_updater.erl:
1open_reader_fd(Filepath, Options) ->
2 {ok, Fd} = case lists:member(sys_db, Options) of
3 true ->
4 couch_file:open(Filepath, [read_only, sys_db]);
5 false ->
6 couch_file:open(Filepath, [read_only])
7 end,
8 unlink(Fd),
9 Fd.
オープンする際、read_only
に加えてsys_db
を指定することも可能になっています。最後にunlink(Fd)
を呼び出してプロセスへのlinkを切っているのは何でだろう。
couch_file.erl:
1open(Filepath) ->
2 open(Filepath, []).
3
4open(Filepath, Options) ->
5 case gen_server:start_link(couch_file,
6 {Filepath, Options, self(), Ref = make_ref()}, []) of
7 {ok, Fd} ->
8 {ok, Fd};
9(...snip...)
ファイルのオープンは、まずgen_server:start_link/3
を呼び出して別のプロセスを作り、そのプロセスの中で行っています。gen_server:start_link/3
の戻り値がPid
ではなくFd
となっているので、ファイル単位でプロセスを作るという意図のようです。
couch_file.erl:
1init({Filepath, Options, ReturnPid, Ref}) ->
2 process_flag(trap_exit, true),
3 OpenOptions = file_open_options(Options),
4 case lists:member(create, Options) of
5 true ->
6(...snip...)
7 false ->
8 % open in read mode first, so we don't create the file if it doesn't exist.
9 case file:open(Filepath, [read, raw]) of
10 {ok, Fd_Read} ->
11 {ok, Fd} = file:open(Filepath, OpenOptions),
12 ok = file:close(Fd_Read),
13 maybe_track_open_os_files(Options),
14 {ok, Eof} = file:position(Fd, eof),
15 {ok, #file{fd=Fd, eof=Eof}};
16 Error ->
17 init_status_error(ReturnPid, Ref, Error)
18 end
19 end.
gen_server:start_link/3
によって呼び出されるcouch_file:init/1
。今回はcreate
が指定されていない方だけ読みます。一旦
[read, raw]
でオープンし、オープンできたら改めてOpenOptions
を指定してオープンした後、クローズしています。エラーハンドリングの為ですかね。OpenOptions
を指定してオープンした方はEOFに移動しています。
couch_db_updater:init_db
couch_db_updater.erl:
1init_db(DbName, Filepath, Fd, ReaderFd, Header0, Options) ->
2(...snip...)
3 Compression = couch_compress:get_compression_method(),
この関数は長いので読みたいところだけ。couch_compress:get_compression_method()
でデータ圧縮の為の関数を取得しています。CouchDBの設定ファイルである/usr/local/etc/couchdb/default.ini
の中にfile_compression = snappy
とあるので、通常はsnappyで圧縮する関数を取得することになると思います。今回はデータファイルの中身も見たいので、snappy
ではなくnone
を指定して動かしています。
couch_db_updater.erl:
1 {ok, IdBtree} = couch_btree:open(Header#db_header.fulldocinfo_by_id_btree_state, Fd,
2 [{split, fun(X) -> btree_by_id_split(X) end},
3 {join, fun(X,Y) -> btree_by_id_join(X,Y) end},
4 {reduce, fun(X,Y) -> btree_by_id_reduce(X,Y) end},
5 {compression, Compression}]),
6 {ok, SeqBtree} = couch_btree:open(Header#db_header.docinfo_by_seq_btree_state, Fd,
7 [{split, fun(X) -> btree_by_seq_split(X) end},
8 {join, fun(X,Y) -> btree_by_seq_join(X,Y) end},
9 {reduce, fun(X,Y) -> btree_by_seq_reduce(X,Y) end},
10 {compression, Compression}]),
11 {ok, LocalDocsBtree} = couch_btree:open(Header#db_header.local_docs_btree_state, Fd,
12 [{compression, Compression}]),
ツリーの読み込みの部分。Fd
はデータファイルのハンドルです。split
、join
、reduce
、compression
の各関数を指定しています。couch_btree:open/3
は次回読んでみたいと思います。
couch_db_updater.erl:
1 {ok, RefCntr} = couch_ref_counter:start([Fd, ReaderFd]),
関数の名前からすると参照カウンタのようです。もうちょっと見てみます。couch_ref_counter
モジュールに移動します。
couch_ref_counter.erl:
start(ChildProcs) ->
=> gen_server:start(couch_ref_counter, {self(), ChildProcs}, []).
参照カウンタを管理するプロセスを起動する、というところでしょうか。ChildProcs
は
[Fd, ReaderFd]
なので、append可能なデータファイルのハンドルとread_onlyのデータファイルのハンドルが対象になります。
couch_ref_counter.erl:
1init({Pid, ChildProcs}) ->
2 [link(ChildProc) || ChildProc <- ChildProcs],
3 Referrers = dict:from_list([{Pid, {erlang:monitor(process, Pid), 1}}]),
4 {ok, #srv{referrers=Referrers, child_procs=ChildProcs}}.
ChildProcs
に対して、link/1
でリンクを生成しています。couch_db_updater:open_reader_fd/2
の最後にオープンしたFdを指定してunlink/1
を呼び出していたのは、couch_ref_counter
でリンクするという意図なのかな。
その後、erlang:monitor/2
でcouch_ref_counter
のプロセスをモニターしてdict
に登録しています。一緒に登録している1
は何だろう。リクエストを処理する関数を見てみます。
couch_ref_counter.erl:
1handle_call({add, Pid},_From, #srv{referrers=Referrers}=Srv) ->
2 Referrers2 =
3 case dict:find(Pid, Referrers) of
4 error ->
5 dict:store(Pid, {erlang:monitor(process, Pid), 1}, Referrers);
6 {ok, {MonRef, RefCnt}} ->
7 dict:store(Pid, {MonRef, RefCnt + 1}, Referrers)
8 end,
9 {reply, ok, Srv#srv{referrers=Referrers2}};
10handle_call(count, _From, Srv) ->
11 {monitors, Monitors} = process_info(self(), monitors),
12 {reply, length(Monitors), Srv};
13handle_call({drop, Pid}, _From, #srv{referrers=Referrers}=Srv) ->
14 Referrers2 =
15 case dict:find(Pid, Referrers) of
16 {ok, {MonRef, 1}} ->
17 erlang:demonitor(MonRef, [flush]),
18 dict:erase(Pid, Referrers);
19 {ok, {MonRef, Num}} ->
20 dict:store(Pid, {MonRef, Num-1}, Referrers);
21 error ->
22 Referrers
23 end,
24 Srv2 = Srv#srv{referrers=Referrers2},
25 case should_close() of
26 true ->
27 {stop,normal,ok,Srv2};
28 false ->
29 {reply, ok, Srv2}
30 end.
handle_call/3
の内容から、各Pid
の参照カウンタを管理しているようです。{add, Pid}
では、Pid
がdict
に登録済みであればカウンタを増やし、そうでなければPid
のモニターを開始しています。count
のprocess_info/2
でself()
を指定しているので、自プロセスでモニターしているPid
の数を返しているということになります。先ほどの1
は参照カウンタですね。このcouch_ref_counter
の具体的な用途は、後ほど確認します。(覚えていれば)
参照カウンタの管理の開始を確認したので、残りはcouch_db_updater:init_db/6
の最後のところ。
couch_db_updater.erl:
1 #db{
2 update_pid=self(),
3 fd = ReaderFd,
4 updater_fd = Fd,
5 fd_ref_counter = RefCntr,
6 header=Header,
7 fulldocinfo_by_id_btree = IdBtree,
8 docinfo_by_seq_btree = SeqBtree,
9 local_docs_btree = LocalDocsBtree,
10 committed_update_seq = Header#db_header.update_seq,
11 update_seq = Header#db_header.update_seq,
12 name = DbName,
13 filepath = Filepath,
14 security = Security,
15 security_ptr = SecurityPtr,
16 instance_start_time = StartTime,
17 revs_limit = Header#db_header.revs_limit,
18 fsync_options = FsyncOptions,
19 options = Options,
20 compression = Compression,
21 before_doc_update = couch_util:get_value(before_doc_update, Options, nil),
22 after_doc_read = couch_util:get_value(after_doc_read, Options, nil)
23 }.
この関数内で取得したものをレコードdb
に格納して返しています。
Conclusion
主にcouch_db_updater:init_db/6
のあたりを読んでみました。印象的だったのは、couch_file
でオープンしたファイル毎にプロセスを生成し、プロセスの識別子としてファイルハンドルを使っているところ。どういった部分をプロセスとして切り出すか、というのはまだ全然分かってないので、一例として勉強になりました。