act-act about projects rss

CouchDB Source Code Reading part4

couch_db_updater:refresh_validate_doc_funs/1

今回はcouch_db_updater:init/1から呼び出している関数の中でまだ見ていないrefresh_validate_doc_funs/1を読んで行きます。

couch_db_updater.erl

1
2
3
4
5
6
7
8
9
10
11
12
13
refresh_validate_doc_funs(Db0) ->
    Db = Db0#db{user_ctx = #user_ctx{roles=[<<"_admin">>]}},
    DesignDocs = couch_db:get_design_docs(Db),
    ProcessDocFuns = lists:flatmap(
        fun(DesignDocInfo) ->
            {ok, DesignDoc} = couch_db:open_doc_int(
                Db, DesignDocInfo, [ejson_body]),
            case couch_doc:get_validate_doc_fun(DesignDoc) of
            nil -> [];
            Fun -> [Fun]
            end
        end, DesignDocs),
    Db0#db{validate_doc_funs=ProcessDocFuns}.

1行目でadminロールの設定をしていますが意図は分からず。couch_db_updater:init/1から呼び出されているので、adminロールということで問題ないんですかね。2行目でDesignDocsを取得しています。このDesignDocsが良く分からないので、見ていきます。

couch_db.erl

1
2
3
4
5
6
7
8
9
10
11
12
get_design_docs(Db) ->
    FoldFun = skip_deleted(fun
        (#full_doc_info{deleted = true}, _Reds, Acc) ->
            {ok, Acc};
        (#full_doc_info{id= <<"_design/",_/binary>>}=FullDocInfo, _Reds, Acc) ->
            {ok, [FullDocInfo | Acc]};k
        (_, _Reds, Acc) ->
            {stop, Acc}
    end),
    KeyOpts = [{start_key, <<"_design/">>}, {end_key_gt, <<"_design0">>}],
    {ok, _, Docs} = couch_btree:fold(by_id_btree(Db), FoldFun, [], KeyOpts),
    Docs.

skip_deleted/1が呼び出されているので、そのまま追っていきます。

couch_db.erl

1
2
3
4
5
6
7
8
9
skip_deleted(FoldFun) ->
    fun
        (visit, KV, Reds, Acc) ->
            FoldFun(KV, Reds, Acc);
        (traverse, _LK, {Undeleted, _Del, _Size}, Acc) when Undeleted == 0 ->
            {skip, Acc};
        (traverse, _, _, Acc) ->
            {ok, Acc}
    end.

第一引数がvisitの時はFoldFunを実行する関数を返します。KV#full_doc_infoです。traverseは2つあり、一方はwhen Undeleted == 0とあるので、削除されてたら{skip, Acc}、そうでない場合は{ok, Acc}を返すことになります。削除されてたらskipなのでskip_deleted/1といったところでしょうか。Undeleted == 0は2重否定のようになっており、ちょっと分かりづらいかな。

get_design_docs/1の方に戻ると、#full_doc_info.id<<"_design/",_/binary>>の場合のみアキュムレータにFullDocInfoを入れる関数FoldFunを定義しています。この関数がvisitの時に呼び出されることになります。<<"_design/",_/binary>>の後ろの方の意味が分からなかったのですが、バイナリ型の値で先頭が”_design/”にマッチする、という意味でした。

couch_btree:fold/4

残りは、start_keyend_key_gtを定義してcouch_btree:fold/4を呼び出す部分。ここまでのところは、Btreeをトラバースする際のフィルタ条件の設定を行ってきたように見えます。

couch_btree.erl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
fold(#btree{root=nil}, _Fun, Acc, _Options) ->
    {ok, {[], []}, Acc};
fold(#btree{root=Root}=Bt, Fun, Acc, Options) ->
    Dir = couch_util:get_value(dir, Options, fwd),
    InRange = make_key_in_end_range_function(Bt, Dir, Options),
    Result =
    case couch_util:get_value(start_key, Options) of
    undefined ->
        stream_node(Bt, [], Bt#btree.root, InRange, Dir,
                convert_fun_arity(Fun), Acc);
    StartKey ->
        stream_node(Bt, [], Bt#btree.root, StartKey, InRange, Dir,
                convert_fun_arity(Fun), Acc)
    end,
    case Result of
    {ok, Acc2}->
        FullReduction = element(2, Root),
        {ok, {[], [FullReduction]}, Acc2};
    {stop, LastReduction, Acc2} ->
        {ok, LastReduction, Acc2}
    end.

couch_util:get_value(dir, Options, fwd)は第二引数の{key, value}のリストの中に第一引数で指定したkeyがあればそのvalueを、なければ第三引数を返します。今回のコンテキストではfwdDirに設定し、make_key_in_end_range_function/3を呼び出します。

couch_btree.erl

1
2
3
4
5
6
7
8
9
10
11
12
make_key_in_end_range_function(#btree{less=Less}, fwd, Options) ->
    case couch_util:get_value(end_key_gt, Options) of
    undefined ->
        case couch_util:get_value(end_key, Options) of
        undefined ->
            fun(_Key) -> true end;
        LastKey ->
            fun(Key) -> not Less(LastKey, Key) end
        end;
    EndKey ->
        fun(Key) -> Less(Key, EndKey) end
    end;

今回は呼び出しの流れから、第二引数にfwdが、第三引数のOptionsend_key_gtが指定されているのでそこだけ読むと、fun(Key) -> Less(Key, <<"_design0">>) endという関数が返ることになります。前回、couch_db_updater/init_db/6の中で設定されなかったlessの関数が、ここで使われています。

foldの方に戻ると、InRangefun(Key) -> Less(Key, <<"_design0">>) endで、StartKey<<"_design/">>になり、stream_node/8の呼び出しは以下のようになります。

stream_node(IdBtree, [], Bt#btree.root, <<"_design/">>, fun(Key) -> Less(Key, <<"_design0">>) end, fwd, Fun, []).

couch_btree:stream_node/8

stream_node/8の中を見ていきます。

couch_btree.erl

1
2
3
4
5
6
7
8
9
stream_node(Bt, Reds, Node, StartKey, InRange, Dir, Fun, Acc) ->
    Pointer = element(1, Node),
    {NodeType, NodeList} = get_node(Bt, Pointer),
    case NodeType of
    kp_node ->
        stream_kp_node(Bt, Reds, adjust_dir(Dir, NodeList), StartKey, InRange, Dir, Fun, Acc);
    kv_node ->
        stream_kv_node(Bt, Reds, adjust_dir(Dir, NodeList), StartKey, InRange, Dir, Fun, Acc)
    end.

1行目でNodeの1つ目の要素を取り出しています。このNodeは前記のように#btree.rootにあたるので、rootの第一要素を取得することになります。これまで#btree.rootがどういう構成になっているのか見ていなかったので探してみたところ、以下のような箇所がありました。

couch_btree.erl

1
2
3
4
5
6
7
size(#btree{root = nil}) ->
    0;
size(#btree{root = {_P, _Red}}) ->
    % pre 1.2 format
    nil;
size(#btree{root = {_P, _Red, Size}}) ->
    Size.

上記のコードだけでは分からないので想像ですが、#btree.rootの第一要素_Pは恐らくBtreeのファイルポインタなのではないかと思います。そう仮定して読み進めてみます。stream_node/8に戻ると、#btree.rootからポインタを取り出し、get_node/2を呼び出します。

couch_btree.erl

1
2
3
get_node(#btree{fd = Fd}, NodePos) ->
    {ok, {NodeType, NodeList}} = couch_file:pread_term(Fd, NodePos),
    {NodeType, NodeList}.

続けてcouch_file:pread_term/2を読んでみます。

couch_file.erl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
pread_term(Fd, Pos) ->
    {ok, Bin} = pread_binary(Fd, Pos),
    {ok, couch_compress:decompress(Bin)}.


%%----------------------------------------------------------------------
%% Purpose: Reads a binrary from a file that was written with append_binary
%% Args:    Pos, the offset into the file where the term is serialized.
%% Returns: {ok, Term}
%%  or {error, Reason}.
%%----------------------------------------------------------------------

pread_binary(Fd, Pos) ->
    {ok, L} = pread_iolist(Fd, Pos),
    {ok, iolist_to_binary(L)}.


pread_iolist(Fd, Pos) ->
    case gen_server:call(Fd, {pread_iolist, Pos}, infinity) of
    {ok, IoList, <<>>} ->
        {ok, IoList};
    {ok, IoList, Md5} ->
        case couch_util:md5(IoList) of
        Md5 ->
            {ok, IoList};
        _ ->
            exit({file_corruption, <<"file corruption">>})
        end;
    Error ->
        Error
    end.

pread_iolist/2のコメントに、Posは「termがシリアライズされているファイルのオフセット」とあるので、#btree.rootの第一要素はファイルポインタで良さそうです。pread_iolist/2の1行目のgen_server:call/3で以下が呼び出されます。

couch_file.erl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
handle_call({pread_iolist, Pos}, _From, File) ->
    {RawData, NextPos} = try
        % up to 8Kbs of read ahead
        read_raw_iolist_int(File, Pos, 2 * ?SIZE_BLOCK - (Pos rem ?SIZE_BLOCK))
    catch
    _:_ ->
        read_raw_iolist_int(File, Pos, 4)
    end,
    <<Prefix:1/integer, Len:31/integer, RestRawData/binary>> =
        iolist_to_binary(RawData),
    case Prefix of
    1 ->
        {Md5, IoList} = extract_md5(
            maybe_read_more_iolist(RestRawData, 16 + Len, NextPos, File)),
        {reply, {ok, IoList, Md5}, File};
    0 ->
        IoList = maybe_read_more_iolist(RestRawData, Len, NextPos, File),
        {reply, {ok, IoList, <<>>}, File}
    end;

?SIZE_BLOCKはcouch_file.erlで以下のように定義されています。

couch_file.erl

1
deefine(SIZE_BLOCK, 4096).

コメントにもあるように、read_raw_iolist_int/3でおおよそ8KBほど読み出します。そこから1ビット取り出してPrefixに、31ビット取り出してLenに、残りをRestRawDataに束縛します。Prefixが1の場合はextract_md5/1を呼び出しています。

couch_file.erl

1
2
3
4
-spec extract_md5(iolist()) -> {binary(), iolist()}.
extract_md5(FullIoList) ->
    {Md5List, IoList} = split_iolist(FullIoList, 16, []),
    {iolist_to_binary(Md5List), IoList}.

先頭16バイトがMD5の値になっているようです。maybe_read_more_iolist/4の第二引数が16 + Lenとなっていることから、前期のLenにはこのMD5の値の長さは含まれていないことになります。そしてPrefixが0の場合はMD5の値は格納されていないようです。MD5の値が設定されている場合は、pread_iolist/2でMD5の値のチェックを行っています。ファイルから読み込んだiolistをbinaryに変換し、pread_term/2まで戻ります。pread_term/2の中で、couch_compress:decompress/2にそのbinaryを渡します。

couch_compress.erl

1
2
3
4
5
decompress(<<?SNAPPY_PREFIX, Rest/binary>>) ->
    {ok, TermBin} = snappy:decompress(Rest),
    binary_to_term(TermBin);
decompress(<<?TERM_PREFIX, _/binary>> = Bin) ->
    binary_to_term(Bin).

binaryの先頭が?SNAPPY_PREFIXにマッチする場合はsnappy:decompress/1で戻してからbinary_to_term/1でtermに戻します。?TERM_PREFIXにマッチする場合はそのままbinary_to_term/1を呼び出してtermに戻します。

ここまででようやくget_node/2が終わり、{NodeTyp, NodeList}をファイルから読み出しました。get_node/2の呼び出しによって、IdBtree内のデータをファイルから一度に全部読み込んだ気がするのですが、大丈夫なのかな…。

stream_node/8に戻ります。BtreeはIdBtreeなので、NodeTypekv_nodeになるはずです。ですので、stream_kv_node/8を読んでいきます。その前にadjust_dir(Dir, NodeList)の部分を読んでみたいと思います。

couch_btree.erl

1
2
3
4
adjust_dir(fwd, List) ->
    List;
adjust_dir(rev, List) ->
    lists:reverse(List).

今回はfwdなのでNodeListはそのままstream_kv_node/8に引き渡されます。

couch_btree:stream_kv_node/8

stream_node/8のコードが大分上部の方にあるので、ここまでの流れでstream_kv_node/8がどういう値で呼び出されるか書いておきます。

stream_kv_node(IdBtree, [], NodeList, <<"_design/">>, fun(Key) -> Less(Key, <<"_design0">>) end, fwd, Fun, []).

この流れでstream_kv_node/8がどのような処理になるか見ていきます。

couch_btree.erl

1
2
3
4
5
6
7
8
9
10
11
stream_kv_node(Bt, Reds, KVs, StartKey, InRange, Dir, Fun, Acc) ->
    DropFun =
    case Dir of
    fwd ->
        fun({Key, _}) -> less(Bt, Key, StartKey) end;
    rev ->
        fun({Key, _}) -> less(Bt, StartKey, Key) end
    end,
    {LTKVs, GTEKVs} = lists:splitwith(DropFun, KVs),
    AssembleLTKVs = [assemble(Bt,K,V) || {K,V} <- LTKVs],
    stream_kv_node2(Bt, Reds, AssembleLTKVs, GTEKVs, InRange, Dir, Fun, Acc).

less/3が呼び出されてます。#btree.lessに設定される関数をまだ読んでないのでこの時点では何とも。

couch_btree.erl

1
2
less(#btree{less=Less}, A, B) ->
    Less(A, B).

ですよね。DropFunIdBtreelessに設定された関数を呼び出す関数です。次のlists:splitwith/2DropFunを指定していることから、NodeList[{Key, ...},...]ですかね。

次にsplitwith/2で恐らくStartKeyよりも前のデータであるLTKVsの各要素に対してassemble/3を呼び出してリストを作成しています。これは見るまでもなく、joinに設定していた関数が呼ばれることになるので、AssembleLTKVs#full_doc_infoのリストになります。それを指定してstream_kv_node2/8を呼び出しています。

couch_btree.erl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
stream_kv_node2(_Bt, _Reds, _PrevKVs, [], _InRange, _Dir, _Fun, Acc) ->
    {ok, Acc};
stream_kv_node2(Bt, Reds, PrevKVs, [{K,V} | RestKVs], InRange, Dir, Fun, Acc) ->
    case InRange(K) of
    false ->
        {stop, {PrevKVs, Reds}, Acc};
    true ->
        AssembledKV = assemble(Bt, K, V),
        case Fun(visit, AssembledKV, {PrevKVs, Reds}, Acc) of
        {ok, Acc2} ->
            stream_kv_node2(Bt, Reds, [AssembledKV | PrevKVs], RestKVs, InRange, Dir, Fun, Acc2);
        {stop, Acc2} ->
            {stop, {PrevKVs, Reds}, Acc2}
        end
    end.

ようやくInRangeの関数が呼び出されるところまで来ました。Key<<"_design0">>よりも前であればtrueとなります。[{K,V} | RestKVs]StartKey(«“_design/”»)以上のKeyを持つドキュメントのリストが指定されているので、

<<"_design/">>  ≦  Key  < <<"_design0">>

であればアキュムレータに追加されます。これがDesignDocになります。

Conclusion

DesignDocとは何かをポイントにコードを読んでいきました。DesignDocはKeyが«“_design/”» ≦ Key < «“_design0”>のドキュメントであることが分かりましたが、どんな役割なのかはまだ分かりません。

また、今回のコードリーディングの過程で、データベースファイルからドキュメントを読み出す部分を見ていきました。ドキュメントはErlangのtermがbinaryに変換されてディスクに書き込まれることが分かりました。