Remaining code of couch_db:update_docs/4



 1update_docs(Db, Docs, Options, interactive_edit) ->
 3        DocBuckets3 = [[
 4                {doc_flush_atts(set_new_att_revpos(
 5                        check_dup_atts(Doc)), Db#db.updater_fd), Ref}
 6                || {Doc, Ref} <- B] || B <- DocBuckets2],
 7        {DocBuckets4, IdRevs} = new_revs(DocBuckets3, [], []),
 9        {ok, CommitResults} = write_and_commit(Db, DocBuckets4, NonRepDocs, Options2),
11        ResultsDict = dict:from_list(IdRevs ++ CommitResults ++ PreCommitFailures),
12        {ok, lists:map(
13            fun({#doc{}, Ref}) ->
14                {ok, Result} = dict:find(Ref, ResultsDict),
15                Result
16            end, Docs2)}
17    end.



% ([DocA, DocA, DocB, DocC], []) -> [[DocA, DocA], [DocB], [DocC]]



 1new_revs([], OutBuckets, IdRevsAcc) ->
 2    {lists:reverse(OutBuckets), IdRevsAcc};
 3new_revs([Bucket|RestBuckets], OutBuckets, IdRevsAcc) ->
 4    {NewBucket, IdRevsAcc3} = lists:mapfoldl(
 5        fun({#doc{revs={Start, RevIds}}=Doc, Ref}, IdRevsAcc2)->
 6        NewRevId = new_revid(Doc),
 7        {{Doc#doc{revs={Start+1, [NewRevId | RevIds]}}, Ref},
 8            [{Ref, {ok, {Start+1, NewRevId}}} | IdRevsAcc2]}
 9    end, IdRevsAcc, Bucket),
10    new_revs(RestBuckets, [NewBucket|OutBuckets], IdRevsAcc3).


Eshell V5.10.3  (abort with ^G)
1> lists:mapfoldl(fun(X, Sum) -> {X*2, X*2+Sum} end, 0, [1,2,3,4,5]).
{[2,4,6,8,10],30}

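各Bucketに対してlists:mapfoldl/3を適用します。couch_db:new_revid/1で新しいRevIdを生成し、#doc.revsのRevIdsの先頭に追加しています。Startに1を加えているのが良く分からない…。この値はTreeの階層を表すと思っていたのですが、違うのかな。→ 後で見るcouch_doc:to_path/1の`Start - length(RevIds) + 1`から考えると、StartはRevIdsの先頭、つまり最新リビジョンのツリー上の位置（深さ）を表しているようです。新しいRevIdはその子として先頭に追加され、1段深くなるので、Startに1を加えていると考えられます。mapの方には新しく生成した#docを、foldlの方には{Ref, {ok, {Start+1, NewRevId}}}を加えます。最終的に、呼び出し元に対して、新しいリビジョンを追加したドキュメント群をDocBuckets4、新しく生成したRevIdの情報をIdRevsとして返します。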




 1write_and_commit(#db{update_pid=UpdatePid}=Db, DocBuckets1,
 2        NonRepDocs, Options0) ->
 3    DocBuckets = prepare_doc_summaries(Db, DocBuckets1),
 4    Options = set_commit_option(Options0),
 5    MergeConflicts = lists:member(merge_conflicts, Options),
 6    FullCommit = lists:member(full_commit, Options),
 7    MRef = erlang:monitor(process, UpdatePid),
 8    try
 9        UpdatePid ! {update_docs, self(), DocBuckets, NonRepDocs, MergeConflicts, FullCommit},
10        case collect_results(UpdatePid, MRef, []) of
11        {ok, Results} -> {ok, Results};
12        retry ->
13            % This can happen if the db file we wrote to was swapped out by
14            % compaction. Retry by reopening the db and writing to the current file
15            {ok, Db2} = open_ref_counted(Db#db.main_pid, self()),
16            DocBuckets2 = [
17                [{doc_flush_atts(Doc, Db2#db.updater_fd), Ref} || {Doc, Ref} <- Bucket] ||
18                Bucket <- DocBuckets1
19            ],
20            % We only retry once
21            DocBuckets3 = prepare_doc_summaries(Db2, DocBuckets2),
22            close(Db2),
23            UpdatePid ! {update_docs, self(), DocBuckets3, NonRepDocs, MergeConflicts, FullCommit},
24            case collect_results(UpdatePid, MRef, []) of
25            {ok, Results} -> {ok, Results};
26            retry -> throw({update_error, compaction_retry})
27            end
28        end
29    after
30        erlang:demonitor(MRef, [flush])
31    end.



 1prepare_doc_summaries(Db, BucketList) ->
 2    [lists:map(
 3        fun({#doc{body = Body, atts = Atts} = Doc, Ref}) ->
 4            DiskAtts = [{N, T, P, AL, DL, R, M, E} ||
 5                #att{name = N, type = T, data = {_, P}, md5 = M, revpos = R,
 6                    att_len = AL, disk_len = DL, encoding = E} <- Atts],
 7            AttsFd = case Atts of
 8            [#att{data = {Fd, _}} | _] ->
 9                Fd;
10            [] ->
11                nil
12            end,
13            SummaryChunk = couch_db_updater:make_doc_summary(Db, {Body, DiskAtts}),
14            {Doc#doc{body = {summary, SummaryChunk, AttsFd}}, Ref}
15        end,
16        Bucket) || Bucket <- BucketList].



 1make_doc_summary(#db{compression = Comp}, {Body0, Atts0}) ->
 2    Body = case couch_compress:is_compressed(Body0, Comp) of
 3    true ->
 4        Body0;
 5    false ->
 6        % pre 1.2 database file format
 7        couch_compress:compress(Body0, Comp)
 8    end,
 9    Atts = case couch_compress:is_compressed(Atts0, Comp) of
10    true ->
11        Atts0;
12    false ->
13        couch_compress:compress(Atts0, Comp)
14    end,
15    SummaryBin = ?term_to_bin({Body, Atts}),
16    couch_file:assemble_file_chunk(SummaryBin, couch_util:md5(SummaryBin)).

引数で渡された{Body0, Atts0}を、まだ圧縮されていなければそれぞれcompressし、タプルにまとめて?term_to_binでバイナリに変換しているようです。続けてcouch_file:assemble_file_chunk/2を見てみます。


%% Prefix Bin (a binary or iolist) with a 4-byte header. The header holds a
%% 1-bit flag set to 0, meaning no MD5 is included in this variant, followed
%% by the 31-bit size of Bin. Returns an iolist.
%% NOTE: the bit syntax keeps only the low 31 bits of the size, so data of
%% 2^31 bytes or more cannot be represented correctly.
assemble_file_chunk(Bin) ->
    [<<0:1/integer, (iolist_size(Bin)):31/integer>>, Bin].
%% Like assemble_file_chunk/1, but the 1-bit flag is set to 1 and Md5 is
%% placed between the 4-byte header and the data. The 31-bit size field
%% still counts only Bin, not Md5. Returns an iolist.
assemble_file_chunk(Bin, Md5) ->
    [<<1:1/integer, (iolist_size(Bin)):31/integer>>, Md5, Bin].


couch_db:write_and_commit/4の冒頭に戻ると、couch_db:prepare_doc_summaries/2の呼び出しにより、DocBucketsに格納された各#docのbodyは{summary, SummaryChunk, AttsFd}になっています。前記の、データブロックに書き出す為のヘッダ（とMD5）を付けたデータ（iolist）が、SummaryChunkにバインドされています。



 1handle_info({update_docs, Client, GroupedDocs, NonRepDocs, MergeConflicts,
 2        FullCommit}, Db) ->
 3    GroupedDocs2 = [[{Client, D} || D <- DocGroup] || DocGroup <- GroupedDocs],
 4    if NonRepDocs == [] ->
 5        {GroupedDocs3, Clients, FullCommit2} = collect_updates(GroupedDocs2,
 6                [Client], MergeConflicts, FullCommit);
 7    true ->
 8        GroupedDocs3 = GroupedDocs2,
 9        FullCommit2 = FullCommit,
10        Clients = [Client]
11    end,
12    NonRepDocs2 = [{Client, NRDoc} || NRDoc <- NonRepDocs],
13    try update_docs_int(Db, GroupedDocs3, NonRepDocs2, MergeConflicts,
14                FullCommit2) of
15    {ok, Db2, UpdatedDDocIds} ->
16        ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}),
17        if Db2#db.update_seq /= Db#db.update_seq ->
18            couch_db_update_notifier:notify({updated,});
19        true -> ok
20        end,
21        [catch(ClientPid ! {done, self()}) || ClientPid <- Clients],
22        lists:foreach(fun(DDocId) ->
23            couch_db_update_notifier:notify({ddoc_updated, {, DDocId}})
24        end, UpdatedDDocIds),
25        {noreply, Db2}
26    catch
27        throw: retry ->
28            [catch(ClientPid ! {retry, self()}) || ClientPid <- Clients],
29            {noreply, Db}
30    end;





 1update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) ->
 2    #db{
 3        fulldocinfo_by_id_btree = DocInfoByIdBTree,
 4        docinfo_by_seq_btree = DocInfoBySeqBTree,
 5        update_seq = LastSeq,
 6        revs_limit = RevsLimit
 7        } = Db,
 8    Ids = [Id || [{_Client, {#doc{id=Id}, _Ref}}|_] <- DocsList],
 9    % lookup up the old documents, if they exist.
10    OldDocLookups = couch_btree:lookup(DocInfoByIdBTree, Ids),
11    OldDocInfos = lists:zipwith(
12        fun(_Id, {ok, FullDocInfo}) ->
13            FullDocInfo;
14        (Id, not_found) ->
15            #full_doc_info{id=Id}
16        end,
17        Ids, OldDocLookups),
18    % Merge the new docs into the revision trees.
19    {ok, NewFullDocInfos, RemoveSeqs, NewSeq} = merge_rev_trees(RevsLimit,
20            MergeConflicts, DocsList, OldDocInfos, [], [], LastSeq),
22    % All documents are now ready to write.
24    {ok, Db2}  = update_local_docs(Db, NonRepDocs),
26    % Write out the document summaries (the bodies are stored in the nodes of
27    % the trees, the attachments are already written to disk)
28    {ok, FlushedFullDocInfos} = flush_trees(Db2, NewFullDocInfos, []),
30    {IndexFullDocInfos, IndexDocInfos, UpdatedDDocIds} =
31            new_index_entries(FlushedFullDocInfos, [], [], []),
33    % and the indexes
34    {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree, IndexFullDocInfos, []),
35    {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree, IndexDocInfos, RemoveSeqs),
37    Db3 = Db2#db{
38        fulldocinfo_by_id_btree = DocInfoByIdBTree2,
39        docinfo_by_seq_btree = DocInfoBySeqBTree2,
40        update_seq = NewSeq},
42    % Check if we just updated any design documents, and update the validation
43    % funs if we did.
44    Db4 = case UpdatedDDocIds of
45    [] ->
46        Db3;
47    _ ->
48        refresh_validate_doc_funs(Db3)
49    end,
51    {ok, commit_data(Db4, not FullCommit), UpdatedDDocIds}.





 1merge_rev_trees(_Limit, _Merge, [], [], AccNewInfos, AccRemoveSeqs, AccSeq) ->
 2    {ok, lists:reverse(AccNewInfos), AccRemoveSeqs, AccSeq};
 3merge_rev_trees(Limit, MergeConflicts, [NewDocs|RestDocsList],
 4        [OldDocInfo|RestOldInfo], AccNewInfos, AccRemoveSeqs, AccSeq) ->
 5    #full_doc_info{id=Id,rev_tree=OldTree,deleted=OldDeleted0,update_seq=OldSeq}
 6            = OldDocInfo,
 7    {NewRevTree, _} = lists:foldl(
 8        fun({Client, {#doc{revs={Pos,[_Rev|PrevRevs]}}=NewDoc, Ref}}, {AccTree, OldDeleted}) ->
 9            if not MergeConflicts ->
10                case couch_key_tree:merge(AccTree, couch_doc:to_path(NewDoc),
11                    Limit) of
12                {_NewTree, conflicts} when (not OldDeleted) ->
13                    send_result(Client, Ref, conflict),
14                    {AccTree, OldDeleted};
15                {NewTree, conflicts} when PrevRevs /= [] ->
16                    % Check to be sure if prev revision was specified, it's
17                    % a leaf node in the tree
18                    Leafs = couch_key_tree:get_all_leafs(AccTree),
19                    IsPrevLeaf = lists:any(fun({_, {LeafPos, [LeafRevId|_]}}) ->
20                            {LeafPos, LeafRevId} == {Pos-1, hd(PrevRevs)}
21                        end, Leafs),
22                    if IsPrevLeaf ->
23                        {NewTree, OldDeleted};
24                    true ->
25                        send_result(Client, Ref, conflict),
26                        {AccTree, OldDeleted}
27                    end;
28                {NewTree, no_conflicts} when  AccTree == NewTree ->
29                    % the tree didn't change at all
30                    % meaning we are saving a rev that's already
31                    % been editted again.
32                    if (Pos == 1) and OldDeleted ->
33                        % this means we are recreating a brand new document
34                        % into a state that already existed before.
35                        % put the rev into a subsequent edit of the deletion
36                        #doc_info{revs=[#rev_info{rev={OldPos,OldRev}}|_]} =
37                                couch_doc:to_doc_info(OldDocInfo),
38                        NewRevId = couch_db:new_revid(
39                                NewDoc#doc{revs={OldPos, [OldRev]}}),
40                        NewDoc2 = NewDoc#doc{revs={OldPos + 1, [NewRevId, OldRev]}},
41                        {NewTree2, _} = couch_key_tree:merge(AccTree,
42                                couch_doc:to_path(NewDoc2), Limit),
43                        % we changed the rev id, this tells the caller we did
44                        send_result(Client, Ref, {ok, {OldPos + 1, NewRevId}}),
45                        {NewTree2, OldDeleted};
46                    true ->
47                        send_result(Client, Ref, conflict),
48                        {AccTree, OldDeleted}
49                    end;
50                {NewTree, _} ->
51                    {NewTree, NewDoc#doc.deleted}
52                end;
53            true ->
54                {NewTree, _} = couch_key_tree:merge(AccTree,
55                            couch_doc:to_path(NewDoc), Limit),
56                {NewTree, OldDeleted}
57            end
58        end,
59        {OldTree, OldDeleted0}, NewDocs),
60    if NewRevTree == OldTree ->
61        % nothing changed
62        merge_rev_trees(Limit, MergeConflicts, RestDocsList, RestOldInfo,
63            AccNewInfos, AccRemoveSeqs, AccSeq);
64    true ->
65        % we have updated the document, give it a new seq #
66        NewInfo = #full_doc_info{id=Id,update_seq=AccSeq+1,rev_tree=NewRevTree},
67        RemoveSeqs = case OldSeq of
68            0 -> AccRemoveSeqs;
69            _ -> [OldSeq | AccRemoveSeqs]
70        end,
71        merge_rev_trees(Limit, MergeConflicts, RestDocsList, RestOldInfo,
72            [NewInfo|AccNewInfos], RemoveSeqs, AccSeq+1)
73    end.



 1-spec to_path(#doc{}) -> path().
 2to_path(#doc{revs={Start, RevIds}}=Doc) ->
 3    [Branch] = to_branch(Doc, lists:reverse(RevIds)),
 4    {Start - length(RevIds) + 1, Branch}.
 6-spec to_branch(#doc{}, [RevId::binary()]) -> [branch()].
 7to_branch(Doc, [RevId]) ->
 8    [{RevId, Doc, []}];
 9to_branch(Doc, [RevId | Rest]) ->
10    [{RevId, ?REV_MISSING, to_branch(Doc, Rest)}].


% #doc.revs = {3, [RevId3, RevId2, RevId1]} の場合
{1, {RevId1, ?REV_MISSING, [{RevId2, ?REV_MISSING, [{RevId3, Doc, []}]}]}}


[{Pos, {Key, Value, SubTree}}, {Pos, {Key, Value, SubTree}}, ...]
% SubTree = [{Key, Value, SubTree}, ...]

上記より、couch_doc:to_path/1は、#full_doc_info.rev_treeとマージできるように、#docを{Pos, {Key, Value, SubTree}}の形式（パス）に変換しているようです。





 1flush_trees(_Db, [], AccFlushedTrees) ->
 2    {ok, lists:reverse(AccFlushedTrees)};
 3flush_trees(#db{updater_fd = Fd} = Db,
 4        [InfoUnflushed | RestUnflushed], AccFlushed) ->
 5    #full_doc_info{update_seq=UpdateSeq, rev_tree=Unflushed} = InfoUnflushed,
 6    {Flushed, LeafsSize} = couch_key_tree:mapfold(
 7        fun(_Rev, Value, Type, Acc) ->
 8            case Value of
 9            #doc{deleted = IsDeleted, body = {summary, Summary, AttsFd}} ->
10                % this node value is actually an unwritten document summary,
11                % write to disk.
12                % make sure the Fd in the written bins is the same Fd we are
13                % and convert bins, removing the FD.
14                % All bins should have been written to disk already.
15                case {AttsFd, Fd} of
16                {nil, _} ->
17                    ok;
18                {SameFd, SameFd} ->
19                    ok;
20                _ ->
21                    % Fd where the attachments were written to is not the same
22                    % as our Fd. This can happen when a database is being
23                    % switched out during a compaction.
24                    ?LOG_DEBUG("File where the attachments are written has"
25                            " changed. Possibly retrying.", []),
26                    throw(retry)
27                end,
28                {ok, NewSummaryPointer, SummarySize} =
29                    couch_file:append_raw_chunk(Fd, Summary),
30                TotalSize = lists:foldl(
31                    fun(#att{att_len = L}, A) -> A + L end,
32                    SummarySize, Value#doc.atts),
33                NewValue = {IsDeleted, NewSummaryPointer, UpdateSeq, TotalSize},
34                case Type of
35                leaf ->
36                    {NewValue, Acc + TotalSize};
37                branch ->
38                    {NewValue, Acc}
39                end;
40             {_, _, _, LeafSize} when Type =:= leaf, LeafSize =/= nil ->
41                {Value, Acc + LeafSize};
42             _ ->
43                {Value, Acc}
44            end
45        end, 0, Unflushed),
46    InfoFlushed = InfoUnflushed#full_doc_info{
47        rev_tree = Flushed,
48        leafs_size = LeafsSize
49    },
50    flush_trees(Db, RestUnflushed, [InfoFlushed | AccFlushed]).





 1new_index_entries([], AccById, AccBySeq, AccDDocIds) ->
 2    {AccById, AccBySeq, AccDDocIds};
 3new_index_entries([FullDocInfo|RestInfos], AccById, AccBySeq, AccDDocIds) ->
 4    #doc_info{revs=[#rev_info{deleted=Deleted}|_], id=Id} = DocInfo =
 5            couch_doc:to_doc_info(FullDocInfo),
 6    AccDDocIds2 = case Id of
 7    <<?DESIGN_DOC_PREFIX, _/binary>> ->
 8        [Id | AccDDocIds];
 9    _ ->
10        AccDDocIds
11    end,
12    new_index_entries(RestInfos,
13        [FullDocInfo#full_doc_info{deleted=Deleted}|AccById],
14        [DocInfo|AccBySeq],
15        AccDDocIds2).



couch_db_updater:update_docs_int/5に戻ります。couch_db_updater:new_index_entries/4の戻り値{IndexFullDocInfos, IndexDocInfos, UpdatedDDocIds}を使い、couch_btree:add_remove/3でDocInfoByIdBTreeとDocInfoBySeqBTreeを更新します（by_seqの方では、merge_rev_trees/7で集めた古いupdate_seq（RemoveSeqs）の削除も同時に行います）。


 1add_remove(Bt, InsertKeyValues, RemoveKeys) ->
 2    {ok, [], Bt2} = query_modify(Bt, [], InsertKeyValues, RemoveKeys),
 3    {ok, Bt2}.
 5query_modify(Bt, LookupKeys, InsertValues, RemoveKeys) ->
 6    #btree{root=Root} = Bt,
 7    InsertActions = lists:map(
 8        fun(KeyValue) ->
 9            {Key, Value} = extract(Bt, KeyValue),
10            {insert, Key, Value}
11        end, InsertValues),
12    RemoveActions = [{remove, Key, nil} || Key <- RemoveKeys],
13    FetchActions = [{fetch, Key, nil} || Key <- LookupKeys],
14    SortFun =
15        fun({OpA, A, _}, {OpB, B, _}) ->
16            case A == B of
17            % A and B are equal, sort by op.
18            true -> op_order(OpA) < op_order(OpB);
19            false ->
20                less(Bt, A, B)
21            end
22        end,
23    Actions = lists:sort(SortFun, lists:append([InsertActions, RemoveActions, FetchActions])),
24    {ok, KeyPointers, QueryResults} = modify_node(Bt, Root, Actions, []),
25    {ok, NewRoot} = complete_root(Bt, KeyPointers),
26    {ok, QueryResults, Bt#btree{root=NewRoot}}.



 1modify_node(Bt, RootPointerInfo, Actions, QueryOutput) ->
 2    case RootPointerInfo of
 3    nil ->
 4        NodeType = kv_node,
 5        NodeList = [];
 6    _Tuple ->
 7        Pointer = element(1, RootPointerInfo),
 8        {NodeType, NodeList} = get_node(Bt, Pointer)
 9    end,
10    NodeTuple = list_to_tuple(NodeList),
12    {ok, NewNodeList, QueryOutput2} =
13    case NodeType of
14    kp_node -> modify_kpnode(Bt, NodeTuple, 1, Actions, [], QueryOutput);
15    kv_node -> modify_kvnode(Bt, NodeTuple, 1, Actions, [], QueryOutput)
16    end,
17    case NewNodeList of
18    [] ->  % no nodes remain
19        {ok, [], QueryOutput2};
20    NodeList ->  % nothing changed
21        {LastKey, _LastValue} = element(tuple_size(NodeTuple), NodeTuple),
22        {ok, [{LastKey, RootPointerInfo}], QueryOutput2};
23    _Else2 ->
24        {ok, ResultList} = write_node(Bt, NodeType, NewNodeList),
25        {ok, ResultList, QueryOutput2}
26    end.




 1modify_kvnode(_Bt, NodeTuple, LowerBound, [], ResultNode, QueryOutput) ->
 2    {ok, lists:reverse(ResultNode, bounded_tuple_to_list(NodeTuple, LowerBound, tuple_size(NodeTuple), [])), QueryOutput};
 3modify_kvnode(Bt, NodeTuple, LowerBound, [{ActionType, ActionKey, ActionValue} | RestActions], ResultNode, QueryOutput) when LowerBound > tuple_size(NodeTuple) ->
 4    case ActionType of
 5    insert ->
 6        modify_kvnode(Bt, NodeTuple, LowerBound, RestActions, [{ActionKey, ActionValue} | ResultNode], QueryOutput);
 7    remove ->
 8        % just drop the action
 9        modify_kvnode(Bt, NodeTuple, LowerBound, RestActions, ResultNode, QueryOutput);
10    fetch ->
11        % the key/value must not exist in the tree
12        modify_kvnode(Bt, NodeTuple, LowerBound, RestActions, ResultNode, [{not_found, {ActionKey, nil}} | QueryOutput])
13    end;
14modify_kvnode(Bt, NodeTuple, LowerBound, [{ActionType, ActionKey, ActionValue} | RestActions], AccNode, QueryOutput) ->
15    N = find_first_gteq(Bt, NodeTuple, LowerBound, tuple_size(NodeTuple), ActionKey),
16    {Key, Value} = element(N, NodeTuple),
17    ResultNode =  bounded_tuple_to_revlist(NodeTuple, LowerBound, N - 1, AccNode),
18    case less(Bt, ActionKey, Key) of
19    true ->
20        case ActionType of
21        insert ->
22            % ActionKey is less than the Key, so insert
23            modify_kvnode(Bt, NodeTuple, N, RestActions, [{ActionKey, ActionValue} | ResultNode], QueryOutput);
24        remove ->
25            % ActionKey is less than the Key, just drop the action
26            modify_kvnode(Bt, NodeTuple, N, RestActions, ResultNode, QueryOutput);
27        fetch ->
28            % ActionKey is less than the Key, the key/value must not exist in the tree
29            modify_kvnode(Bt, NodeTuple, N, RestActions, ResultNode, [{not_found, {ActionKey, nil}} | QueryOutput])
30        end;
31    false ->
32        % ActionKey and Key are maybe equal.
33        case less(Bt, Key, ActionKey) of
34        false ->
35            case ActionType of
36            insert ->
37                modify_kvnode(Bt, NodeTuple, N+1, RestActions, [{ActionKey, ActionValue} | ResultNode], QueryOutput);
38            remove ->
39                modify_kvnode(Bt, NodeTuple, N+1, RestActions, ResultNode, QueryOutput);
40            fetch ->
41                % ActionKey is equal to the Key, insert into the QueryOuput, but re-process the node
42                % since an identical action key can follow it.
43                modify_kvnode(Bt, NodeTuple, N, RestActions, ResultNode, [{ok, assemble(Bt, Key, Value)} | QueryOutput])
44            end;
45        true ->
46            modify_kvnode(Bt, NodeTuple, N + 1, [{ActionType, ActionKey, ActionValue} | RestActions], [{Key, Value} | ResultNode], QueryOutput)
47        end
48    end.

2番目の、when LowerBound > tuple_size(NodeTuple)というガードが付いている節は、NodeTupleの要素をすべて走査し終えた後、つまり残りのActionKeyがこのノード内のどのKeyよりも大きい時に呼び出されるようです。この場合、insertはそのまま追加、removeは何もせず破棄、fetchはnot_foundになります。メインは3番目の節です。最初に呼び出しているcouch_btree:find_first_gteq/5を見てみます。


 1find_first_gteq(_Bt, _Tuple, Start, End, _Key) when Start == End ->
 2    End;
 3find_first_gteq(Bt, Tuple, Start, End, Key) ->
 4    Mid = Start + ((End - Start) div 2),
 5    {TupleKey, _} = element(Mid, Tuple),
 6    case less(Bt, TupleKey, Key) of
 7    true ->
 8        find_first_gteq(Bt, Tuple, Mid+1, End, Key);
 9    false ->
10        find_first_gteq(Bt, Tuple, Start, Mid, Key)
11    end.



%% Prepend elements Start..End of Tuple to Tail in reverse order, so that
%% element End ends up first. Returns Tail unchanged when Start > End.
bounded_tuple_to_revlist(_Tuple, Start, End, Tail) when Start > End ->
    Tail;
bounded_tuple_to_revlist(Tuple, Start, End, Tail) ->
    bounded_tuple_to_revlist(Tuple, Start+1, End, [element(Start, Tuple)|Tail]).



Actionsを処理し終えると、最終的にcouch_btree:modify_kvnode/6の1番目の節が呼び出されます。ここでは、couch_btree:bounded_tuple_to_list/4を呼び出してNodeTupleのうち走査しなかった残りの部分を取得し、逆順に溜めてきたResultNodeをreverseしながら連結して返します。つまり、この関数の中でkvnodeのリストに対してActionsに入っている操作を適用し、リストを再構成しています。ActionTypeがremoveの時にアキュムレータに追加していないのは、追加しないことで再構成後のリストにそのエントリが含まれなくなり、削除されたことになるからです。また、insertの時にActionKeyがNodeTupleのKeyと完全に一致する場合、NodeTuple側のエントリはアキュムレータに追加されず、{ActionKey, ActionValue}が追加されます。その為、既存のエントリは再構成後のリストに含まれず、結果として新しい値で上書きしたことになります。




 1modify_kpnode(Bt, {}, _LowerBound, Actions, [], QueryOutput) ->
 2    modify_node(Bt, nil, Actions, QueryOutput);
 3modify_kpnode(_Bt, NodeTuple, LowerBound, [], ResultNode, QueryOutput) ->
 4    {ok, lists:reverse(ResultNode, bounded_tuple_to_list(NodeTuple, LowerBound,
 5            tuple_size(NodeTuple), [])), QueryOutput};
 6modify_kpnode(Bt, NodeTuple, LowerBound,
 7        [{_, FirstActionKey, _}|_]=Actions, ResultNode, QueryOutput) ->
 8    Sz = tuple_size(NodeTuple),
 9    N = find_first_gteq(Bt, NodeTuple, LowerBound, Sz, FirstActionKey),
10    case N =:= Sz of
11    true  ->
12        % perform remaining actions on last node
13        {_, PointerInfo} = element(Sz, NodeTuple),
14        {ok, ChildKPs, QueryOutput2} =
15            modify_node(Bt, PointerInfo, Actions, QueryOutput),
16        NodeList = lists:reverse(ResultNode, bounded_tuple_to_list(NodeTuple, LowerBound,
17            Sz - 1, ChildKPs)),
18        {ok, NodeList, QueryOutput2};
19    false ->
20        {NodeKey, PointerInfo} = element(N, NodeTuple),
21        SplitFun = fun({_ActionType, ActionKey, _ActionValue}) ->
22                not less(Bt, NodeKey, ActionKey)
23            end,
24        {LessEqQueries, GreaterQueries} = lists:splitwith(SplitFun, Actions),
25        {ok, ChildKPs, QueryOutput2} =
26                modify_node(Bt, PointerInfo, LessEqQueries, QueryOutput),
27        ResultNode2 = lists:reverse(ChildKPs, bounded_tuple_to_revlist(NodeTuple,
28                LowerBound, N - 1, ResultNode)),
29        modify_kpnode(Bt, NodeTuple, N+1, GreaterQueries, ResultNode2, QueryOutput2)
30    end.






 1write_node(#btree{fd = Fd, compression = Comp} = Bt, NodeType, NodeList) ->
 2    % split up nodes into smaller sizes
 3    NodeListList = chunkify(NodeList),
 4    % now write out each chunk and return the KeyPointer pairs for those nodes
 5    ResultList = [
 6        begin
 7            {ok, Pointer, Size} = couch_file:append_term(
 8                Fd, {NodeType, ANodeList}, [{compression, Comp}]),
 9            {LastKey, _} = lists:last(ANodeList),
10            SubTreeSize = reduce_tree_size(NodeType, Size, ANodeList),
11            {LastKey, {Pointer, reduce_node(Bt, NodeType, ANodeList), SubTreeSize}}
12        end
13    ||
14        ANodeList <- NodeListList
15    ],
16    {ok, ResultList}.



 1chunkify(InList) ->
 2    case ?term_size(InList) of
 3    Size when Size > ?CHUNK_THRESHOLD ->
 4        NumberOfChunksLikely = ((Size div ?CHUNK_THRESHOLD) + 1),
 5        ChunkThreshold = Size div NumberOfChunksLikely,
 6        chunkify(InList, ChunkThreshold, [], 0, []);
 7    _Else ->
 8        [InList]
 9    end.
%% Greedily pack InList into chunks of about ChunkThreshold, with sizes
%% measured by ?term_size. The element that pushes a non-empty chunk over
%% the threshold is still added to that chunk, which is then closed. A chunk
%% may therefore exceed the threshold by one element. Chunks, and the
%% elements inside them, keep their input order.
chunkify([], _ChunkThreshold, [], 0, OutputChunks) ->
    lists:reverse(OutputChunks);
chunkify([], _ChunkThreshold, OutList, _OutListSize, OutputChunks) ->
    lists:reverse([lists:reverse(OutList) | OutputChunks]);
chunkify([InElement | RestInList], ChunkThreshold, OutList, OutListSize, OutputChunks) ->
    case ?term_size(InElement) of
    Size when (Size + OutListSize) > ChunkThreshold andalso OutList /= [] ->
        chunkify(RestInList, ChunkThreshold, [], 0, [lists:reverse([InElement | OutList]) | OutputChunks]);
    Size ->
        chunkify(RestInList, ChunkThreshold, [InElement | OutList], OutListSize + Size, OutputChunks)
    end.




 1reduce_tree_size(kv_node, NodeSize, _KvList) ->
 2    NodeSize;
 3reduce_tree_size(kp_node, NodeSize, []) ->
 4    NodeSize;
 5reduce_tree_size(kp_node, _NodeSize, [{_K, {_P, _Red}} | _]) ->
 6    % pre 1.2 format
 7    nil;
 8reduce_tree_size(kp_node, _NodeSize, [{_K, {_P, _Red, nil}} | _]) ->
 9    nil;
10reduce_tree_size(kp_node, NodeSize, [{_K, {_P, _Red, Sz}} | NodeList]) ->
11    reduce_tree_size(kp_node, NodeSize + Sz, NodeList).




%% Compute the reduction stored for a node being written.
%%
%% Returns [] when the btree has no reduce fun. For a kp_node, the reduce
%% fun is called with `rereduce` and the children's stored reductions
%% (element 2 of each child's pointer info). For a kv_node, it is called
%% with `reduce` and the node's pairs converted by assemble/3.
reduce_node(#btree{reduce=nil}, _NodeType, _NodeList) ->
    [];
reduce_node(#btree{reduce=R}, kp_node, NodeList) ->
    R(rereduce, [element(2, Node) || {_K, Node} <- NodeList]);
reduce_node(#btree{reduce=R}=Bt, kv_node, NodeList) ->
    R(reduce, [assemble(Bt, K, V) || {K, V} <- NodeList]).



{ok, [{LastKey, {Pointer, reduce_node(Bt, NodeType, ANodeList), SubTreeSize}}, ...]}




%% Collapse the key-pointers produced for the root into a single root
%% pointer.
%%   - No entries yields nil.
%%   - A single entry yields its pointer info.
%%   - More entries are written as kp_node(s) one level up and collapsed
%%     again, until one entry remains.
complete_root(_Bt, []) ->
    {ok, nil};
complete_root(_Bt, [{_Key, PointerInfo}])->
    {ok, PointerInfo};
complete_root(Bt, KPs) ->
    {ok, ResultKeyPointers} = write_node(Bt, kp_node, KPs),
    complete_root(Bt, ResultKeyPointers).


couch_btree:query_modify/4（add_remove/3から呼ばれます）の最後で、この関数の戻り値{ok, PointerInfo}のPointerInfoを新しいBt#btree.rootとしてセットし、add_remove/3は{ok, Bt2}を返します。ようやくcouch_db_updater:update_docs_int/5に戻り、更新されたそれぞれの#btreeを#dbの新しいBtreeとします。




 1commit_data(Db) ->
 2    commit_data(Db, false).
 4db_to_header(Db, Header) ->
 5    Header#db_header{
 6        update_seq = Db#db.update_seq,
 7        docinfo_by_seq_btree_state = couch_btree:get_state(Db#db.docinfo_by_seq_btree),
 8        fulldocinfo_by_id_btree_state = couch_btree:get_state(Db#db.fulldocinfo_by_id_btree),
 9        local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree),
10        security_ptr = Db#db.security_ptr,
11        revs_limit = Db#db.revs_limit}.
%% Commit the database header, or schedule a delayed commit.
%%
%% When the second argument is `true`:
%%   - if no delayed commit is pending, send self() a `delayed_commit`
%%     message after 1000 ms and remember the timer;
%%   - if one is already pending, do nothing.
%% Otherwise:
%%   - cancel any pending timer;
%%   - build the header with db_to_header/2;
%%   - if it differs from the current header, write it with
%%     couch_file:write_header/2, fsyncing before and/or after when
%%     `before_header` / `after_header` are in fsync_options.
%% Returns the updated #db{}.
commit_data(#db{waiting_delayed_commit=nil} = Db, true) ->
    Db#db{waiting_delayed_commit=erlang:send_after(1000,self(),delayed_commit)};
commit_data(Db, true) ->
    Db;
commit_data(Db, _) ->
    #db{
        updater_fd = Fd,
        filepath = Filepath,
        header = OldHeader,
        fsync_options = FsyncOptions,
        waiting_delayed_commit = Timer
    } = Db,
    if is_reference(Timer) -> erlang:cancel_timer(Timer); true -> ok end,
    case db_to_header(Db, OldHeader) of
    OldHeader ->
        % OldHeader is already bound: the header is unchanged, nothing to write.
        Db#db{waiting_delayed_commit=nil};
    Header ->
        case lists:member(before_header, FsyncOptions) of
        true -> ok = couch_file:sync(Filepath);
        _    -> ok
        end,

        ok = couch_file:write_header(Fd, Header),

        case lists:member(after_header, FsyncOptions) of
        true -> ok = couch_file:sync(Filepath);
        _    -> ok
        end,

        Db#db{waiting_delayed_commit=nil,
            header=Header,
            committed_update_seq=Db#db.update_seq}
    end.




