remainging code of couch_db:update_docs/4

前回まででcouch_db:doc_flush_atts/2の一連の処理を読み終えました。久々にcouch_db:update_docs/4に戻り、続きを見ていきます。

couch_db.erl:

 1update_docs(Db, Docs, Options, interactive_edit) ->
 2...
 3        DocBuckets3 = [[
 4                {doc_flush_atts(set_new_att_revpos(
 5                        check_dup_atts(Doc)), Db#db.updater_fd), Ref}
 6                || {Doc, Ref} <- B] || B <- DocBuckets2],
 7        {DocBuckets4, IdRevs} = new_revs(DocBuckets3, [], []),
 8
 9        {ok, CommitResults} = write_and_commit(Db, DocBuckets4, NonRepDocs, Options2),
10
11        ResultsDict = dict:from_list(IdRevs ++ CommitResults ++ PreCommitFailures),
12        {ok, lists:map(
13            fun({#doc{}, Ref}) ->
14                {ok, Result} = dict:find(Ref, ResultsDict),
15                Result
16            end, Docs2)}
17    end.

DockBucket3はアタッチメント情報をディスクに書き込み、そのポインタを持ったDocによって構成されています。DocBucket3の構成はcouch_db:goup_alike_docs/1のコメントに記載されています。具体的には以下の矢印右の構造になります。

couch_db.erl:

1% ([DocA, DocA, DocB, DocC], []) -> [[DocA, DocA], [DocB], [DocC]]

これを第一引数に取っているcouch_db:new_revs/3を見ていきます。

couch_db.erl:

 1new_revs([], OutBuckets, IdRevsAcc) ->
 2    {lists:reverse(OutBuckets), IdRevsAcc};
 3new_revs([Bucket|RestBuckets], OutBuckets, IdRevsAcc) ->
 4    {NewBucket, IdRevsAcc3} = lists:mapfoldl(
 5        fun({#doc{revs={Start, RevIds}}=Doc, Ref}, IdRevsAcc2)->
 6        NewRevId = new_revid(Doc),
 7        {{Doc#doc{revs={Start+1, [NewRevId | RevIds]}}, Ref},
 8            [{Ref, {ok, {Start+1, NewRevId}}} | IdRevsAcc2]}
 9    end, IdRevsAcc, Bucket),
10    new_revs(RestBuckets, [NewBucket|OutBuckets], IdRevsAcc3).

lists:mapfoldl/3lists:map/2lists:foldl/3を組み合わせた関数で、1回のリスト走査でmapfoldlを実行します。

1Eshell V5.10.3  (abort with ^G)
21> lists:mapfoldl(fun(X, Sum) -> {X*2, X*2+Sum} end, 0, [1,2,3,4,5]).
3{[2,4,6,8,10],30}

Bucketリストを指定してlists:mapfoldl/3します。couch_db:new_revid/1で新しいRevIdを生成し、#doc.revsRevsIdに追加しています。Startに1を加えているのが良く分からない…。この値はTreの階層を表すと思っていたのですが、違うのかな。mapの方には新しく生成した#docを、foldlの方には{Ref, {ok, {Start+1, NewRevId}}}を加えます。最終的に、呼び出し元に対して、新しいリビジョンを追加したドキュメント群をDockBuckets4、新しく生成したRevIdの情報をIdRevsとして返します。

couch_db:write_and_commit/4

次にcouch_db:write_and_commit/4を見てみます。

couch_db.erl:

 1write_and_commit(#db{update_pid=UpdatePid}=Db, DocBuckets1,
 2        NonRepDocs, Options0) ->
 3    DocBuckets = prepare_doc_summaries(Db, DocBuckets1),
 4    Options = set_commit_option(Options0),
 5    MergeConflicts = lists:member(merge_conflicts, Options),
 6    FullCommit = lists:member(full_commit, Options),
 7    MRef = erlang:monitor(process, UpdatePid),
 8    try
 9        UpdatePid ! {update_docs, self(), DocBuckets, NonRepDocs, MergeConflicts, FullCommit},
10        case collect_results(UpdatePid, MRef, []) of
11        {ok, Results} -> {ok, Results};
12        retry ->
13            % This can happen if the db file we wrote to was swapped out by
14            % compaction. Retry by reopening the db and writing to the current file
15            {ok, Db2} = open_ref_counted(Db#db.main_pid, self()),
16            DocBuckets2 = [
17                [{doc_flush_atts(Doc, Db2#db.updater_fd), Ref} || {Doc, Ref} <- Bucket] ||
18                Bucket <- DocBuckets1
19            ],
20            % We only retry once
21            DocBuckets3 = prepare_doc_summaries(Db2, DocBuckets2),
22            close(Db2),
23            UpdatePid ! {update_docs, self(), DocBuckets3, NonRepDocs, MergeConflicts, FullCommit},
24            case collect_results(UpdatePid, MRef, []) of
25            {ok, Results} -> {ok, Results};
26            retry -> throw({update_error, compaction_retry})
27            end
28        end
29    after
30        erlang:demonitor(MRef, [flush])
31    end.

そのままcouch_db:prepare_doc_summaries/2を見てみます。

couch_db.erl:

 1prepare_doc_summaries(Db, BucketList) ->
 2    [lists:map(
 3        fun({#doc{body = Body, atts = Atts} = Doc, Ref}) ->
 4            DiskAtts = [{N, T, P, AL, DL, R, M, E} ||
 5                #att{name = N, type = T, data = {_, P}, md5 = M, revpos = R,
 6                    att_len = AL, disk_len = DL, encoding = E} <- Atts],
 7            AttsFd = case Atts of
 8            [#att{data = {Fd, _}} | _] ->
 9                Fd;
10            [] ->
11                nil
12            end,
13            SummaryChunk = couch_db_updater:make_doc_summary(Db, {Body, DiskAtts}),
14            {Doc#doc{body = {summary, SummaryChunk, AttsFd}}, Ref}
15        end,
16        Bucket) || Bucket <- BucketList].

#doc.attsをディスクに書き込む為に変換し、また#doc.attsからFdを取り出しています。ディスクに書き込む為に変換したDiskAtts#doc.bodyを指定し、couch_db_updater:make_doc_summary/2を呼び出しています。この関数を見てみます。

couch_db.erl:

 1make_doc_summary(#db{compression = Comp}, {Body0, Atts0}) ->
 2    Body = case couch_compress:is_compressed(Body0, Comp) of
 3    true ->
 4        Body0;
 5    false ->
 6        % pre 1.2 database file format
 7        couch_compress:compress(Body0, Comp)
 8    end,
 9    Atts = case couch_compress:is_compressed(Atts0, Comp) of
10    true ->
11        Atts0;
12    false ->
13        couch_compress:compress(Atts0, Comp)
14    end,
15    SummaryBin = ?term_to_bin({Body, Atts}),
16    couch_file:assemble_file_chunk(SummaryBin, couch_util:md5(SummaryBin)).

引数で渡された{Body0, Atts0}をそれぞれcompressしてバイナリに変換しているようです。続けてcouch_file:assemble_file_chunk/2を見てみます。

couch_file.erl:

1assemble_file_chunk(Bin) ->
2    [<<0:1/integer, (iolist_size(Bin)):31/integer>>, Bin].
3
4assemble_file_chunk(Bin, Md5) ->
5    [<<1:1/integer, (iolist_size(Bin)):31/integer>>, Md5, Bin].

前回書いたように、データブロックに書き出す為にヘッダを付けてます。

couch_db:write_and_commit/4の冒頭に戻り、couch_db:prepare_doc_summaries/2の呼び出しによりDocBucketに格納された#docbody{summary, SummaryChunk, AttsFd}になっています。前記のデータブロックにかい出す為のヘッダを付けたバイナリはSummaryChunkにバインドされています。

MergeConflictsFullCommitを設定し、下記のcouch_db_updater:handle_info/2が呼び出されます。

couch_db_updater.erl:

 1handle_info({update_docs, Client, GroupedDocs, NonRepDocs, MergeConflicts,
 2        FullCommit}, Db) ->
 3    GroupedDocs2 = [[{Client, D} || D <- DocGroup] || DocGroup <- GroupedDocs],
 4    if NonRepDocs == [] ->
 5        {GroupedDocs3, Clients, FullCommit2} = collect_updates(GroupedDocs2,
 6                [Client], MergeConflicts, FullCommit);
 7    true ->
 8        GroupedDocs3 = GroupedDocs2,
 9        FullCommit2 = FullCommit,
10        Clients = [Client]
11    end,
12    NonRepDocs2 = [{Client, NRDoc} || NRDoc <- NonRepDocs],
13    try update_docs_int(Db, GroupedDocs3, NonRepDocs2, MergeConflicts,
14                FullCommit2) of
15    {ok, Db2, UpdatedDDocIds} ->
16        ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}),
17        if Db2#db.update_seq /= Db#db.update_seq ->
18            couch_db_update_notifier:notify({updated, Db2#db.name});
19        true -> ok
20        end,
21        [catch(ClientPid ! {done, self()}) || ClientPid <- Clients],
22        lists:foreach(fun(DDocId) ->
23            couch_db_update_notifier:notify({ddoc_updated, {Db#db.name, DDocId}})
24        end, UpdatedDDocIds),
25        {noreply, Db2}
26    catch
27        throw: retry ->
28            [catch(ClientPid ! {retry, self()}) || ClientPid <- Clients],
29            {noreply, Db}
30    end;

GroupDocsGroupDcos2NonRepDocsNonRepDocs2への変換後、couch_db_updater:update_docs_int/5を呼び出していま。

couch_db_updater:update_docs_int/5

couch_db_updater:update_docs_int/5を見てみます。

couch_db_updater.erl:

 1update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) ->
 2    #db{
 3        fulldocinfo_by_id_btree = DocInfoByIdBTree,
 4        docinfo_by_seq_btree = DocInfoBySeqBTree,
 5        update_seq = LastSeq,
 6        revs_limit = RevsLimit
 7        } = Db,
 8    Ids = [Id || [{_Client, {#doc{id=Id}, _Ref}}|_] <- DocsList],
 9    % lookup up the old documents, if they exist.
10    OldDocLookups = couch_btree:lookup(DocInfoByIdBTree, Ids),
11    OldDocInfos = lists:zipwith(
12        fun(_Id, {ok, FullDocInfo}) ->
13            FullDocInfo;
14        (Id, not_found) ->
15            #full_doc_info{id=Id}
16        end,
17        Ids, OldDocLookups),
18    % Merge the new docs into the revision trees.
19    {ok, NewFullDocInfos, RemoveSeqs, NewSeq} = merge_rev_trees(RevsLimit,
20            MergeConflicts, DocsList, OldDocInfos, [], [], LastSeq),
21
22    % All documents are now ready to write.
23
24    {ok, Db2}  = update_local_docs(Db, NonRepDocs),
25
26    % Write out the document summaries (the bodies are stored in the nodes of
27    % the trees, the attachments are already written to disk)
28    {ok, FlushedFullDocInfos} = flush_trees(Db2, NewFullDocInfos, []),
29
30    {IndexFullDocInfos, IndexDocInfos, UpdatedDDocIds} =
31            new_index_entries(FlushedFullDocInfos, [], [], []),
32
33    % and the indexes
34    {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree, IndexFullDocInfos, []),
35    {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree, IndexDocInfos, RemoveSeqs),
36
37    Db3 = Db2#db{
38        fulldocinfo_by_id_btree = DocInfoByIdBTree2,
39        docinfo_by_seq_btree = DocInfoBySeqBTree2,
40        update_seq = NewSeq},
41
42    % Check if we just updated any design documents, and update the validation
43    % funs if we did.
44    Db4 = case UpdatedDDocIds of
45    [] ->
46        Db3;
47    _ ->
48        refresh_validate_doc_funs(Db3)
49    end,
50
51    {ok, commit_data(Db4, not FullCommit), UpdatedDDocIds}.

大分長いので大まかに見ていきます。更新対象のDocListからIdを取り出し、そのIdを指定して#db.fulldocinfo_by_id_btreeからFullDocInfoを取り出します。該当するFullDocInfoが無かった場合はIdだけ設定した空のFullDocInfoを生成し、OldDocInfosとします。更新対象のDocListOldDocInfosを指定し、couch_db_updater:merge_rev_trees/7を呼び出します。

couch_db_updater:merge_rev_trees/7

couch_db_updater:merge_rev_trees/7を見て行きます。

couch_db_updater.erl:

 1merge_rev_trees(_Limit, _Merge, [], [], AccNewInfos, AccRemoveSeqs, AccSeq) ->
 2    {ok, lists:reverse(AccNewInfos), AccRemoveSeqs, AccSeq};
 3merge_rev_trees(Limit, MergeConflicts, [NewDocs|RestDocsList],
 4        [OldDocInfo|RestOldInfo], AccNewInfos, AccRemoveSeqs, AccSeq) ->
 5    #full_doc_info{id=Id,rev_tree=OldTree,deleted=OldDeleted0,update_seq=OldSeq}
 6            = OldDocInfo,
 7    {NewRevTree, _} = lists:foldl(
 8        fun({Client, {#doc{revs={Pos,[_Rev|PrevRevs]}}=NewDoc, Ref}}, {AccTree, OldDeleted}) ->
 9            if not MergeConflicts ->
10                case couch_key_tree:merge(AccTree, couch_doc:to_path(NewDoc),
11                    Limit) of
12                {_NewTree, conflicts} when (not OldDeleted) ->
13                    send_result(Client, Ref, conflict),
14                    {AccTree, OldDeleted};
15                {NewTree, conflicts} when PrevRevs /= [] ->
16                    % Check to be sure if prev revision was specified, it's
17                    % a leaf node in the tree
18                    Leafs = couch_key_tree:get_all_leafs(AccTree),
19                    IsPrevLeaf = lists:any(fun({_, {LeafPos, [LeafRevId|_]}}) ->
20                            {LeafPos, LeafRevId} == {Pos-1, hd(PrevRevs)}
21                        end, Leafs),
22                    if IsPrevLeaf ->
23                        {NewTree, OldDeleted};
24                    true ->
25                        send_result(Client, Ref, conflict),
26                        {AccTree, OldDeleted}
27                    end;
28                {NewTree, no_conflicts} when  AccTree == NewTree ->
29                    % the tree didn't change at all
30                    % meaning we are saving a rev that's already
31                    % been editted again.
32                    if (Pos == 1) and OldDeleted ->
33                        % this means we are recreating a brand new document
34                        % into a state that already existed before.
35                        % put the rev into a subsequent edit of the deletion
36                        #doc_info{revs=[#rev_info{rev={OldPos,OldRev}}|_]} =
37                                couch_doc:to_doc_info(OldDocInfo),
38                        NewRevId = couch_db:new_revid(
39                                NewDoc#doc{revs={OldPos, [OldRev]}}),
40                        NewDoc2 = NewDoc#doc{revs={OldPos + 1, [NewRevId, OldRev]}},
41                        {NewTree2, _} = couch_key_tree:merge(AccTree,
42                                couch_doc:to_path(NewDoc2), Limit),
43                        % we changed the rev id, this tells the caller we did
44                        send_result(Client, Ref, {ok, {OldPos + 1, NewRevId}}),
45                        {NewTree2, OldDeleted};
46                    true ->
47                        send_result(Client, Ref, conflict),
48                        {AccTree, OldDeleted}
49                    end;
50                {NewTree, _} ->
51                    {NewTree, NewDoc#doc.deleted}
52                end;
53            true ->
54                {NewTree, _} = couch_key_tree:merge(AccTree,
55                            couch_doc:to_path(NewDoc), Limit),
56                {NewTree, OldDeleted}
57            end
58        end,
59        {OldTree, OldDeleted0}, NewDocs),
60    if NewRevTree == OldTree ->
61        % nothing changed
62        merge_rev_trees(Limit, MergeConflicts, RestDocsList, RestOldInfo,
63            AccNewInfos, AccRemoveSeqs, AccSeq);
64    true ->
65        % we have updated the document, give it a new seq #
66        NewInfo = #full_doc_info{id=Id,update_seq=AccSeq+1,rev_tree=NewRevTree},
67        RemoveSeqs = case OldSeq of
68            0 -> AccRemoveSeqs;
69            _ -> [OldSeq | AccRemoveSeqs]
70        end,
71        merge_rev_trees(Limit, MergeConflicts, RestDocsList, RestOldInfo,
72            [NewInfo|AccNewInfos], RemoveSeqs, AccSeq+1)
73    end.

更新対象のドキュメントと、それに対する#full_doc_infoをマージする処理で、実際のマージはcouch_doc:merge/3を呼び出すことによって実現しています。その関数を呼び出す前にcouch_doc:to_path/1を呼び出しているので、先にこの関数を見てみます。

couch_doc.erl:

 1-spec to_path(#doc{}) -> path().
 2to_path(#doc{revs={Start, RevIds}}=Doc) ->
 3    [Branch] = to_branch(Doc, lists:reverse(RevIds)),
 4    {Start - length(RevIds) + 1, Branch}.
 5
 6-spec to_branch(#doc{}, [RevId::binary()]) -> [branch()].
 7to_branch(Doc, [RevId]) ->
 8    [{RevId, Doc, []}];
 9to_branch(Doc, [RevId | Rest]) ->
10    [{RevId, ?REV_MISSING, to_branch(Doc, Rest)}].

上記関数を呼び出すと、#docは以下のように変換されます。

1{3, {RevId3, [], {RevId2, [], {RevId1, Doc, []}}}}}.

また、#full_doc_info.rev_treeは以下のような構造になっています。

1[{Pos, {Key, Value, SubTree}}, {Pos, {Key, Value, SubTree}}, ...}]

上記より、couch_doc:to_path/1#full_doc_info.rev_treeとマージできるように#doc{Key, Value, SubTree}の形式に変換しているようです。

ここからcouch_doc:merge/3couch_doc:merge_one/4couch_doc:merge_at/3couch_doc:merge_simp2と続き、rev_treeのマージが行われますが、かなり長いので割愛します。更新の衝突がなければ、新しいリビジョンが増えるだけだと思います。

couch_db_updater:flush_trees/3

couch_db_updater:update_docs_int/5に戻ります。次に実行されるのはcouch_db_updater:update_local_docs/2ですが、この関数はrev_treeは関係しないので、後まわしにして先にcouch_db_updater:flush_trees/3を見てみます。

couch_db_updater.erl:

 1flush_trees(_Db, [], AccFlushedTrees) ->
 2    {ok, lists:reverse(AccFlushedTrees)};
 3flush_trees(#db{updater_fd = Fd} = Db,
 4        [InfoUnflushed | RestUnflushed], AccFlushed) ->
 5    #full_doc_info{update_seq=UpdateSeq, rev_tree=Unflushed} = InfoUnflushed,
 6    {Flushed, LeafsSize} = couch_key_tree:mapfold(
 7        fun(_Rev, Value, Type, Acc) ->
 8            case Value of
 9            #doc{deleted = IsDeleted, body = {summary, Summary, AttsFd}} ->
10                % this node value is actually an unwritten document summary,
11                % write to disk.
12                % make sure the Fd in the written bins is the same Fd we are
13                % and convert bins, removing the FD.
14                % All bins should have been written to disk already.
15                case {AttsFd, Fd} of
16                {nil, _} ->
17                    ok;
18                {SameFd, SameFd} ->
19                    ok;
20                _ ->
21                    % Fd where the attachments were written to is not the same
22                    % as our Fd. This can happen when a database is being
23                    % switched out during a compaction.
24                    ?LOG_DEBUG("File where the attachments are written has"
25                            " changed. Possibly retrying.", []),
26                    throw(retry)
27                end,
28                {ok, NewSummaryPointer, SummarySize} =
29                    couch_file:append_raw_chunk(Fd, Summary),
30                TotalSize = lists:foldl(
31                    fun(#att{att_len = L}, A) -> A + L end,
32                    SummarySize, Value#doc.atts),
33                NewValue = {IsDeleted, NewSummaryPointer, UpdateSeq, TotalSize},
34                case Type of
35                leaf ->
36                    {NewValue, Acc + TotalSize};
37                branch ->
38                    {NewValue, Acc}
39                end;
40             {_, _, _, LeafSize} when Type =:= leaf, LeafSize =/= nil ->
41                {Value, Acc + LeafSize};
42             _ ->
43                {Value, Acc}
44            end
45        end, 0, Unflushed),
46    InfoFlushed = InfoUnflushed#full_doc_info{
47        rev_tree = Flushed,
48        leafs_size = LeafsSize
49    },
50    flush_trees(Db, RestUnflushed, [InfoFlushed | AccFlushed]).

#full_doc_info.rev_treecouch_key_tree/3でまわしていきます。このループの中で#doc.bodySummaryをファイルに書き出していき、そのファイルポインタを#docの代わりにValueとして設定します。つまりこの関数では#doc.bodyをファイルに書き出していることになります。

couch_db_updater:new_index_entries/4

couch_db_updater:update_docs_int/5に戻ります。次に実行されるのはcouch_db_updater:new_index_entries/4なので、この関数を見てみます。

couch_db_updater.erl:

 1new_index_entries([], AccById, AccBySeq, AccDDocIds) ->
 2    {AccById, AccBySeq, AccDDocIds};
 3new_index_entries([FullDocInfo|RestInfos], AccById, AccBySeq, AccDDocIds) ->
 4    #doc_info{revs=[#rev_info{deleted=Deleted}|_], id=Id} = DocInfo =
 5            couch_doc:to_doc_info(FullDocInfo),
 6    AccDDocIds2 = case Id of
 7    <<?DESIGN_DOC_PREFIX, _/binary>> ->
 8        [Id | AccDDocIds];
 9    _ ->
10        AccDDocIds
11    end,
12    new_index_entries(RestInfos,
13        [FullDocInfo#full_doc_info{deleted=Deleted}|AccById],
14        [DocInfo|AccBySeq],
15        AccDDocIds2).

これは#full_doc_infoAccByIdに、#doc_infoAccBySeqに、DesignDocのIdをAccDDocIdsに追加している関数のようです。#doc_info.rev_infoの先頭の要素のdeleted#full_doc_info.deletedに設定しているので、削除時はこの契機で#full_doc_info.deletedが設定される、ということになるのかな。

couch_btree:add_remove/3

couch_db_updater:update_docs_int/5に戻ります。couch_db_updater:new_index_entries/4の戻り値{IndexFullDocInfos, IndexDocInfos, UpdatedDDocIds}を使い、DocInfoByIdBTreeDocInfoBySeqBTreeを更新します。

couch_btree.erl:

 1add_remove(Bt, InsertKeyValues, RemoveKeys) ->
 2    {ok, [], Bt2} = query_modify(Bt, [], InsertKeyValues, RemoveKeys),
 3    {ok, Bt2}.
 4
 5query_modify(Bt, LookupKeys, InsertValues, RemoveKeys) ->
 6    #btree{root=Root} = Bt,
 7    InsertActions = lists:map(
 8        fun(KeyValue) ->
 9            {Key, Value} = extract(Bt, KeyValue),
10            {insert, Key, Value}
11        end, InsertValues),
12    RemoveActions = [{remove, Key, nil} || Key <- RemoveKeys],
13    FetchActions = [{fetch, Key, nil} || Key <- LookupKeys],
14    SortFun =
15        fun({OpA, A, _}, {OpB, B, _}) ->
16            case A == B of
17            % A and B are equal, sort by op.
18            true -> op_order(OpA) < op_order(OpB);
19            false ->
20                less(Bt, A, B)
21            end
22        end,
23    Actions = lists:sort(SortFun, lists:append([InsertActions, RemoveActions, FetchActions])),
24    {ok, KeyPointers, QueryResults} = modify_node(Bt, Root, Actions, []),
25    {ok, NewRoot} = complete_root(Bt, KeyPointers),
26    {ok, QueryResults, Bt#btree{root=NewRoot}}.

couch_btree:extract/2を呼び出しKeyValueに分けています。このあたりについては以前見たように、#full_doc_infoをディスクに書き込み可能な形式に変換しています。それぞれのActionを準備した後、couch_btree:modify_node/4を呼び出します。

couch_btree.erl:

 1modify_node(Bt, RootPointerInfo, Actions, QueryOutput) ->
 2    case RootPointerInfo of
 3    nil ->
 4        NodeType = kv_node,
 5        NodeList = [];
 6    _Tuple ->
 7        Pointer = element(1, RootPointerInfo),
 8        {NodeType, NodeList} = get_node(Bt, Pointer)
 9    end,
10    NodeTuple = list_to_tuple(NodeList),
11
12    {ok, NewNodeList, QueryOutput2} =
13    case NodeType of
14    kp_node -> modify_kpnode(Bt, NodeTuple, 1, Actions, [], QueryOutput);
15    kv_node -> modify_kvnode(Bt, NodeTuple, 1, Actions, [], QueryOutput)
16    end,
17    case NewNodeList of
18    [] ->  % no nodes remain
19        {ok, [], QueryOutput2};
20    NodeList ->  % nothing changed
21        {LastKey, _LastValue} = element(tuple_size(NodeTuple), NodeTuple),
22        {ok, [{LastKey, RootPointerInfo}], QueryOutput2};
23    _Else2 ->
24        {ok, ResultList} = write_node(Bt, NodeType, NewNodeList),
25        {ok, ResultList, QueryOutput2}
26    end.

更新対象のTreeからNodeTypeを取り出し、それによってmodify_kpnode/6modify_kvnode/6のどちらかを呼び出します。couch_db_updater:update_docs_int/5からの呼び出しを見ると結局どちらも呼び出されるのですが、modify_kvnode/6の方を見ていきます。

couch_btree:modify_kvnode/6

couch_btree.erl:

 1modify_kvnode(_Bt, NodeTuple, LowerBound, [], ResultNode, QueryOutput) ->
 2    {ok, lists:reverse(ResultNode, bounded_tuple_to_list(NodeTuple, LowerBound, tuple_size(NodeTuple), [])), QueryOutput};
 3modify_kvnode(Bt, NodeTuple, LowerBound, [{ActionType, ActionKey, ActionValue} | RestActions], ResultNode, QueryOutput) when LowerBound > tuple_size(NodeTuple) ->
 4    case ActionType of
 5    insert ->
 6        modify_kvnode(Bt, NodeTuple, LowerBound, RestActions, [{ActionKey, ActionValue} | ResultNode], QueryOutput);
 7    remove ->
 8        % just drop the action
 9        modify_kvnode(Bt, NodeTuple, LowerBound, RestActions, ResultNode, QueryOutput);
10    fetch ->
11        % the key/value must not exist in the tree
12        modify_kvnode(Bt, NodeTuple, LowerBound, RestActions, ResultNode, [{not_found, {ActionKey, nil}} | QueryOutput])
13    end;
14modify_kvnode(Bt, NodeTuple, LowerBound, [{ActionType, ActionKey, ActionValue} | RestActions], AccNode, QueryOutput) ->
15    N = find_first_gteq(Bt, NodeTuple, LowerBound, tuple_size(NodeTuple), ActionKey),
16    {Key, Value} = element(N, NodeTuple),
17    ResultNode =  bounded_tuple_to_revlist(NodeTuple, LowerBound, N - 1, AccNode),
18    case less(Bt, ActionKey, Key) of
19    true ->
20        case ActionType of
21        insert ->
22            % ActionKey is less than the Key, so insert
23            modify_kvnode(Bt, NodeTuple, N, RestActions, [{ActionKey, ActionValue} | ResultNode], QueryOutput);
24        remove ->
25            % ActionKey is less than the Key, just drop the action
26            modify_kvnode(Bt, NodeTuple, N, RestActions, ResultNode, QueryOutput);
27        fetch ->
28            % ActionKey is less than the Key, the key/value must not exist in the tree
29            modify_kvnode(Bt, NodeTuple, N, RestActions, ResultNode, [{not_found, {ActionKey, nil}} | QueryOutput])
30        end;
31    false ->
32        % ActionKey and Key are maybe equal.
33        case less(Bt, Key, ActionKey) of
34        false ->
35            case ActionType of
36            insert ->
37                modify_kvnode(Bt, NodeTuple, N+1, RestActions, [{ActionKey, ActionValue} | ResultNode], QueryOutput);
38            remove ->
39                modify_kvnode(Bt, NodeTuple, N+1, RestActions, ResultNode, QueryOutput);
40            fetch ->
41                % ActionKey is equal to the Key, insert into the QueryOuput, but re-process the node
42                % since an identical action key can follow it.
43                modify_kvnode(Bt, NodeTuple, N, RestActions, ResultNode, [{ok, assemble(Bt, Key, Value)} | QueryOutput])
44            end;
45        true ->
46            modify_kvnode(Bt, NodeTuple, N + 1, [{ActionType, ActionKey, ActionValue} | RestActions], [{Key, Value} | ResultNode], QueryOutput)
47        end
48    end.

2番目のwhen LowerBound > tuple_size(NodeTuple)というガードが付いている方は、現在のFullDocInfoに当該Keyが含まれていない時に呼び出される関数のようです。メインは3番目の関数。最初に呼び出しているcouch_btree:find_first_gteq/5を見てみます。

couch_btree.erl:

 1find_first_gteq(_Bt, _Tuple, Start, End, _Key) when Start == End ->
 2    End;
 3find_first_gteq(Bt, Tuple, Start, End, Key) ->
 4    Mid = Start + ((End - Start) div 2),
 5    {TupleKey, _} = element(Mid, Tuple),
 6    case less(Bt, TupleKey, Key) of
 7    true ->
 8        find_first_gteq(Bt, Tuple, Mid+1, End, Key);
 9    false ->
10        find_first_gteq(Bt, Tuple, Start, Mid, Key)
11    end.

Keyでバイナリサーチしています。そのままcouch_btree:bounded_tuple_to_revlist/4を見てみます。

couch_btree.erl:

1bounded_tuple_to_revlist(_Tuple, Start, End, Tail) when Start > End ->
2    Tail;
3bounded_tuple_to_revlist(Tuple, Start, End, Tail) ->
4    bounded_tuple_to_revlist(Tuple, Start+1, End, [element(Start, Tuple)|Tail]).

couch_btree:bounded_tuple_to_revlist/4は、couch_btree:find_first_gteq/5を実行して取得したNに対して、LowerBoundからN-1までのノードをアキュムレータに追加しています。

その後、ActionKeyN番目のKeyと比較します。その結果によって次の操作をNN+1のどちらかから始めるかを決めています。そして、ActionTypeinsertならアキュムレータResultNodeに追加、deleteであれば追加せず、fetchであればQueryOutputの方に追加します。

最終的にcouch_btree:modify_kvnode/6の番目の関数が呼び出されます。ここで、couch_btree:bounded_tuple_to_list/4を呼び出してNodeTupleのうち走査しなかった残りの部分を取得し、ResultNodeをreverseしたリストと連結して返します。つまり、この関数の中でkvnodeのリストに対してActionsに入っている操作を適用し、リストを再構成しています。ActionTypedeleteの時にアキュムレータに追加していないのは、追加しないことによって再構成後のリストにエントリが含まれなくなり、消去されたことになります。また、insertの時のActionKeyNodeTupleKeyと完全に一致する場合、NodeTuple側のエントリはアキュムレータに追加されず、{ActionKey, ActionValue}が追加される為、既存のエントリは再構成後のエントリに含まれず、結果新しい値で上書きしたようになります。

couch_btree:modify_kpnode/6

次にcouch_btree:modify_kpnode/6を見てみます。

couch_btree.erl:

 1modify_kpnode(Bt, {}, _LowerBound, Actions, [], QueryOutput) ->
 2    modify_node(Bt, nil, Actions, QueryOutput);
 3modify_kpnode(_Bt, NodeTuple, LowerBound, [], ResultNode, QueryOutput) ->
 4    {ok, lists:reverse(ResultNode, bounded_tuple_to_list(NodeTuple, LowerBound,
 5            tuple_size(NodeTuple), [])), QueryOutput};
 6modify_kpnode(Bt, NodeTuple, LowerBound,
 7        [{_, FirstActionKey, _}|_]=Actions, ResultNode, QueryOutput) ->
 8    Sz = tuple_size(NodeTuple),
 9    N = find_first_gteq(Bt, NodeTuple, LowerBound, Sz, FirstActionKey),
10    case N =:= Sz of
11    true  ->
12        % perform remaining actions on last node
13        {_, PointerInfo} = element(Sz, NodeTuple),
14        {ok, ChildKPs, QueryOutput2} =
15            modify_node(Bt, PointerInfo, Actions, QueryOutput),
16        NodeList = lists:reverse(ResultNode, bounded_tuple_to_list(NodeTuple, LowerBound,
17            Sz - 1, ChildKPs)),
18        {ok, NodeList, QueryOutput2};
19    false ->
20        {NodeKey, PointerInfo} = element(N, NodeTuple),
21        SplitFun = fun({_ActionType, ActionKey, _ActionValue}) ->
22                not less(Bt, NodeKey, ActionKey)
23            end,
24        {LessEqQueries, GreaterQueries} = lists:splitwith(SplitFun, Actions),
25        {ok, ChildKPs, QueryOutput2} =
26                modify_node(Bt, PointerInfo, LessEqQueries, QueryOutput),
27        ResultNode2 = lists:reverse(ChildKPs, bounded_tuple_to_revlist(NodeTuple,
28                LowerBound, N - 1, ResultNode)),
29        modify_kpnode(Bt, NodeTuple, N+1, GreaterQueries, ResultNode2, QueryOutput2)
30    end.

3番目の関数を見ていくと、まずNodeTupleのサイズとcouch_btree:find_first_gteq/5の結果を比較しています。値が一致する場合は、残りのActionsを指定してcouch_btree:modify_node/4を呼び出し、その結果をアキュムレータに加えて返しています。値が一致しない場合は、couch_btree:find_first_gteq/5の結果で返ってきたインデックスを使ってNodeTupleからタプルを取り出し、そのKeyを境としてActionsを2つに分割します。Key以下のActionKeyを持つActionsを指定してcouch_btree:modify_node/4を呼び出し、その結果をアキュムレータに加えて、Keyより大きいActionKeyを持つActionsを指定してcouch_btree:modify_kpnode/6が呼び出されます。

正直きちんと理解できていないのですが、この関数でcouch_btree:modify_node/4を呼び出すと、恐らくそのその先でNodeTypekv_nodeに変わり、couch_btree:modify_kvnode/4が呼び出され、前記のように更新後のリストが取れるようになるのだと思います。

couch_btree:write_node/3

couch_btree:modify_node/4に戻り、couch_btree:write_node/3を見てみます。

couch_btree.erl:

 1write_node(#btree{fd = Fd, compression = Comp} = Bt, NodeType, NodeList) ->
 2    % split up nodes into smaller sizes
 3    NodeListList = chunkify(NodeList),
 4    % now write out each chunk and return the KeyPointer pairs for those nodes
 5    ResultList = [
 6        begin
 7            {ok, Pointer, Size} = couch_file:append_term(
 8                Fd, {NodeType, ANodeList}, [{compression, Comp}]),
 9            {LastKey, _} = lists:last(ANodeList),
10            SubTreeSize = reduce_tree_size(NodeType, Size, ANodeList),
11            {LastKey, {Pointer, reduce_node(Bt, NodeType, ANodeList), SubTreeSize}}
12        end
13    ||
14        ANodeList <- NodeListList
15    ],
16    {ok, ResultList}.

couch_btree:chunkify/1NodeListを分割し、その後に分割した単位でデータベースファイルに追記(couch_file:append_term/3)しています。couch_btree:chunkify/1を見てみます。

couch_btree.erl:

 1chunkify(InList) ->
 2    case ?term_size(InList) of
 3    Size when Size > ?CHUNK_THRESHOLD ->
 4        NumberOfChunksLikely = ((Size div ?CHUNK_THRESHOLD) + 1),
 5        ChunkThreshold = Size div NumberOfChunksLikely,
 6        chunkify(InList, ChunkThreshold, [], 0, []);
 7    _Else ->
 8        [InList]
 9    end.
10
11chunkify([], _ChunkThreshold, [], 0, OutputChunks) ->
12    lists:reverse(OutputChunks);
13chunkify([], _ChunkThreshold, OutList, _OutListSize, OutputChunks) ->
14    lists:reverse([lists:reverse(OutList) | OutputChunks]);
15chunkify([InElement | RestInList], ChunkThreshold, OutList, OutListSize, OutputChunks) ->
16    case ?term_size(InElement) of
17    Size when (Size + OutListSize) > ChunkThreshold andalso OutList /= [] ->
18        chunkify(RestInList, ChunkThreshold, [], 0, [lists:reverse([InElement | OutList]) | OutputChunks]);
19    Size ->
20        chunkify(RestInList, ChunkThreshold, [InElement | OutList], OutListSize + Size, OutputChunks)
21    end.

InListのサイズを?CHUNK_THRESHOLD(1279)と比較しています。いくつに分割するべきか(NumberOfChunksLikely)を求め、InListのサイズから分割後の各リストに含まれる要素数を求め、分割していきます。

データベースファイルに追加した後、分割されたリスト内のの最後のKeyを取得します。そしてreduce_tree_size/3を呼び出し、SubTreeSizeを求めます。couch_btree:reduce_tree_size/3を見てみます。

couch_btree.erl:

 1reduce_tree_size(kv_node, NodeSize, _KvList) ->
 2    NodeSize;
 3reduce_tree_size(kp_node, NodeSize, []) ->
 4    NodeSize;
 5reduce_tree_size(kp_node, _NodeSize, [{_K, {_P, _Red}} | _]) ->
 6    % pre 1.2 format
 7    nil;
 8reduce_tree_size(kp_node, _NodeSize, [{_K, {_P, _Red, nil}} | _]) ->
 9    nil;
10reduce_tree_size(kp_node, NodeSize, [{_K, {_P, _Red, Sz}} | NodeList]) ->
11    reduce_tree_size(kp_node, NodeSize + Sz, NodeList).

NodeTypeによって処理が分かれています。kv_nodeの場合はNodeSizeが返るだけです。kp_nodeの場合は、NodeListの各要素に含まれるTreeSizeNodeSizeに加えています。

続けてcouch_btree:reduce_node/3を見てみます。

couch_btree.erl:

1reduce_node(#btree{reduce=nil}, _NodeType, _NodeList) ->
2    [];
3reduce_node(#btree{reduce=R}, kp_node, NodeList) ->
4    R(rereduce, [element(2, Node) || {_K, Node} <- NodeList]);
5reduce_node(#btree{reduce=R}=Bt, kv_node, NodeList) ->
6    R(reduce, [assemble(Bt, K, V) || {K, V} <- NodeList]).

これもNodeTypeによって処理が分かれています。Bt#btree.reduceにバインドされている関数に関しては、kp_nodekv_node共に以前見ていますのでここでは割愛します。

couch_btree:write_node/3を呼び出した結果、以下の値が返ります。

1[ok, [{LastKey, {Pointer, reduce_node(Bt, NodeType, ANodeList), SubTreeSize}}, ...]]   

couch_btree:complete_root/1

couch_btree:modify_node/4まで一通り見たので、couch_btree:add_remove/3に戻り、couch_btree:complete_root/2を見てみます。

couch_btree.erl:

1complete_root(_Bt, []) ->
2    {ok, nil};
3complete_root(_Bt, [{_Key, PointerInfo}])->
4    {ok, PointerInfo};
5complete_root(Bt, KPs) ->
6    {ok, ResultKeyPointers} = write_node(Bt, kp_node, KPs),
7    complete_root(Bt, ResultKeyPointers).

第二引数の要素数によって処理が違っています。この第二引数のリストの要素数は、couch_btree:write_node/3中で呼び出したcouch_btree:chunkify/1によって分割された数になります。つまり複数のリストに分割された場合は3番目の関数が、分割されなかった場合は2番目の関数が呼ばれます。

couch_btree:add_remove/3の最後で、この関数の戻り値{ok, PointerInfo}Bt#btree.rootにバインドします。ようやくcouch_db_updater:update_docs_int/5に戻り、更新されたそれぞれの#btreeを新しいBtreeとします。

couch_db_updater:commit_data/3

couch_db_updater:update_docs_int/5で最後に呼び出していいる、couch_db_updater:commit_data/3を見てみます。

couch_db_updater.erl:

 1commit_data(Db) ->
 2    commit_data(Db, false).
 3
 4db_to_header(Db, Header) ->
 5    Header#db_header{
 6        update_seq = Db#db.update_seq,
 7        docinfo_by_seq_btree_state = couch_btree:get_state(Db#db.docinfo_by_seq_btree),
 8        fulldocinfo_by_id_btree_state = couch_btree:get_state(Db#db.fulldocinfo_by_id_btree),
 9        local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree),
10        security_ptr = Db#db.security_ptr,
11        revs_limit = Db#db.revs_limit}.
12
13commit_data(#db{waiting_delayed_commit=nil} = Db, true) ->
14    Db#db{waiting_delayed_commit=erlang:send_after(1000,self(),delayed_commit)};
15commit_data(Db, true) ->
16    Db;
17commit_data(Db, _) ->
18    #db{
19        updater_fd = Fd,
20        filepath = Filepath,
21        header = OldHeader,
22        fsync_options = FsyncOptions,
23        waiting_delayed_commit = Timer
24    } = Db,
25    if is_reference(Timer) -> erlang:cancel_timer(Timer); true -> ok end,
26    case db_to_header(Db, OldHeader) of
27    OldHeader ->
28        Db#db{waiting_delayed_commit=nil};
29    Header ->
30        case lists:member(before_header, FsyncOptions) of
31        true -> ok = couch_file:sync(Filepath);
32        _    -> ok
33        end,
34
35        ok = couch_file:write_header(Fd, Header),
36
37        case lists:member(after_header, FsyncOptions) of
38        true -> ok = couch_file:sync(Filepath);
39        _    -> ok
40        end,
41
42        Db#db{waiting_delayed_commit=nil,
43            header=Header,
44            committed_update_seq=Db#db.update_seq}
45    end.

Db#db.updater_fdのデータファイルに新しいヘッダを書き出し、完了となります。これによって、更新後のデータが見れるようになります。

Conclusion

データ更新処理の残りの部分を見てきました。CouchDBはデータ更新時に、データベースファイルにある既存のデータは書き換えず、追記していくだけになっている、という部分を読むことが、ようやくできました。

本当はこんな風に読んだ際のメモをダラダラと貼り付けることなく、ササっと読んで纏められれば良いのですが、Erlangや関数型言語そのものに慣れていないこともあって、今回はこういう形となりました。

まだ、viewやcompactin、replication等を見ていないので、kp_nodeの意味などはよく理解できていません。このあたりはまた気になった時に見てみたいと思います。