remainging code of couch_db:update_docs/4
前回まででcouch_db:doc_flush_atts/2
の一連の処理を読み終えました。久々にcouch_db:update_docs/4
に戻り、続きを見ていきます。
couch_db.erl:
1update_docs(Db, Docs, Options, interactive_edit) ->
2...
3 DocBuckets3 = [[
4 {doc_flush_atts(set_new_att_revpos(
5 check_dup_atts(Doc)), Db#db.updater_fd), Ref}
6 || {Doc, Ref} <- B] || B <- DocBuckets2],
7 {DocBuckets4, IdRevs} = new_revs(DocBuckets3, [], []),
8
9 {ok, CommitResults} = write_and_commit(Db, DocBuckets4, NonRepDocs, Options2),
10
11 ResultsDict = dict:from_list(IdRevs ++ CommitResults ++ PreCommitFailures),
12 {ok, lists:map(
13 fun({#doc{}, Ref}) ->
14 {ok, Result} = dict:find(Ref, ResultsDict),
15 Result
16 end, Docs2)}
17 end.
DockBucket3
はアタッチメント情報をディスクに書き込み、そのポインタを持ったDoc
によって構成されています。DocBucket3
の構成はcouch_db:goup_alike_docs/1
のコメントに記載されています。具体的には以下の矢印右の構造になります。
couch_db.erl:
1% ([DocA, DocA, DocB, DocC], []) -> [[DocA, DocA], [DocB], [DocC]]
これを第一引数に取っているcouch_db:new_revs/3
を見ていきます。
couch_db.erl:
1new_revs([], OutBuckets, IdRevsAcc) ->
2 {lists:reverse(OutBuckets), IdRevsAcc};
3new_revs([Bucket|RestBuckets], OutBuckets, IdRevsAcc) ->
4 {NewBucket, IdRevsAcc3} = lists:mapfoldl(
5 fun({#doc{revs={Start, RevIds}}=Doc, Ref}, IdRevsAcc2)->
6 NewRevId = new_revid(Doc),
7 {{Doc#doc{revs={Start+1, [NewRevId | RevIds]}}, Ref},
8 [{Ref, {ok, {Start+1, NewRevId}}} | IdRevsAcc2]}
9 end, IdRevsAcc, Bucket),
10 new_revs(RestBuckets, [NewBucket|OutBuckets], IdRevsAcc3).
lists:mapfoldl/3
はlists:map/2
とlists:foldl/3
を組み合わせた関数で、1回のリスト走査でmap
とfoldl
を実行します。
1Eshell V5.10.3 (abort with ^G)
21> lists:mapfoldl(fun(X, Sum) -> {X*2, X*2+Sum} end, 0, [1,2,3,4,5]).
3{[2,4,6,8,10],30}
Bucket
リストを指定してlists:mapfoldl/3
します。couch_db:new_revid/1
で新しいRevId
を生成し、#doc.revs
のRevsId
に追加しています。Start
に1を加えているのが良く分からない…。この値はTreの階層を表すと思っていたのですが、違うのかな。mapの方には新しく生成した#doc
を、foldlの方には{Ref, {ok, {Start+1, NewRevId}}}
を加えます。最終的に、呼び出し元に対して、新しいリビジョンを追加したドキュメント群をDockBuckets4
、新しく生成したRevId
の情報をIdRevs
として返します。
couch_db:write_and_commit/4
次にcouch_db:write_and_commit/4
を見てみます。
couch_db.erl:
1write_and_commit(#db{update_pid=UpdatePid}=Db, DocBuckets1,
2 NonRepDocs, Options0) ->
3 DocBuckets = prepare_doc_summaries(Db, DocBuckets1),
4 Options = set_commit_option(Options0),
5 MergeConflicts = lists:member(merge_conflicts, Options),
6 FullCommit = lists:member(full_commit, Options),
7 MRef = erlang:monitor(process, UpdatePid),
8 try
9 UpdatePid ! {update_docs, self(), DocBuckets, NonRepDocs, MergeConflicts, FullCommit},
10 case collect_results(UpdatePid, MRef, []) of
11 {ok, Results} -> {ok, Results};
12 retry ->
13 % This can happen if the db file we wrote to was swapped out by
14 % compaction. Retry by reopening the db and writing to the current file
15 {ok, Db2} = open_ref_counted(Db#db.main_pid, self()),
16 DocBuckets2 = [
17 [{doc_flush_atts(Doc, Db2#db.updater_fd), Ref} || {Doc, Ref} <- Bucket] ||
18 Bucket <- DocBuckets1
19 ],
20 % We only retry once
21 DocBuckets3 = prepare_doc_summaries(Db2, DocBuckets2),
22 close(Db2),
23 UpdatePid ! {update_docs, self(), DocBuckets3, NonRepDocs, MergeConflicts, FullCommit},
24 case collect_results(UpdatePid, MRef, []) of
25 {ok, Results} -> {ok, Results};
26 retry -> throw({update_error, compaction_retry})
27 end
28 end
29 after
30 erlang:demonitor(MRef, [flush])
31 end.
そのままcouch_db:prepare_doc_summaries/2
を見てみます。
couch_db.erl:
1prepare_doc_summaries(Db, BucketList) ->
2 [lists:map(
3 fun({#doc{body = Body, atts = Atts} = Doc, Ref}) ->
4 DiskAtts = [{N, T, P, AL, DL, R, M, E} ||
5 #att{name = N, type = T, data = {_, P}, md5 = M, revpos = R,
6 att_len = AL, disk_len = DL, encoding = E} <- Atts],
7 AttsFd = case Atts of
8 [#att{data = {Fd, _}} | _] ->
9 Fd;
10 [] ->
11 nil
12 end,
13 SummaryChunk = couch_db_updater:make_doc_summary(Db, {Body, DiskAtts}),
14 {Doc#doc{body = {summary, SummaryChunk, AttsFd}}, Ref}
15 end,
16 Bucket) || Bucket <- BucketList].
#doc.atts
をディスクに書き込む為に変換し、また#doc.atts
からFd
を取り出しています。ディスクに書き込む為に変換したDiskAtts
と#doc.body
を指定し、couch_db_updater:make_doc_summary/2
を呼び出しています。この関数を見てみます。
couch_db.erl:
1make_doc_summary(#db{compression = Comp}, {Body0, Atts0}) ->
2 Body = case couch_compress:is_compressed(Body0, Comp) of
3 true ->
4 Body0;
5 false ->
6 % pre 1.2 database file format
7 couch_compress:compress(Body0, Comp)
8 end,
9 Atts = case couch_compress:is_compressed(Atts0, Comp) of
10 true ->
11 Atts0;
12 false ->
13 couch_compress:compress(Atts0, Comp)
14 end,
15 SummaryBin = ?term_to_bin({Body, Atts}),
16 couch_file:assemble_file_chunk(SummaryBin, couch_util:md5(SummaryBin)).
引数で渡された{Body0, Atts0}
をそれぞれcompressしてバイナリに変換しているようです。続けてcouch_file:assemble_file_chunk/2
を見てみます。
couch_file.erl:
1assemble_file_chunk(Bin) ->
2 [<<0:1/integer, (iolist_size(Bin)):31/integer>>, Bin].
3
4assemble_file_chunk(Bin, Md5) ->
5 [<<1:1/integer, (iolist_size(Bin)):31/integer>>, Md5, Bin].
前回書いたように、データブロックに書き出す為にヘッダを付けてます。
couch_db:write_and_commit/4
の冒頭に戻り、couch_db:prepare_doc_summaries/2
の呼び出しによりDocBucket
に格納された#doc
のbody
は{summary, SummaryChunk, AttsFd}
になっています。前記のデータブロックにかい出す為のヘッダを付けたバイナリはSummaryChunk
にバインドされています。
MergeConflicts
とFullCommit
を設定し、下記のcouch_db_updater:handle_info/2
が呼び出されます。
couch_db_updater.erl:
1handle_info({update_docs, Client, GroupedDocs, NonRepDocs, MergeConflicts,
2 FullCommit}, Db) ->
3 GroupedDocs2 = [[{Client, D} || D <- DocGroup] || DocGroup <- GroupedDocs],
4 if NonRepDocs == [] ->
5 {GroupedDocs3, Clients, FullCommit2} = collect_updates(GroupedDocs2,
6 [Client], MergeConflicts, FullCommit);
7 true ->
8 GroupedDocs3 = GroupedDocs2,
9 FullCommit2 = FullCommit,
10 Clients = [Client]
11 end,
12 NonRepDocs2 = [{Client, NRDoc} || NRDoc <- NonRepDocs],
13 try update_docs_int(Db, GroupedDocs3, NonRepDocs2, MergeConflicts,
14 FullCommit2) of
15 {ok, Db2, UpdatedDDocIds} ->
16 ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}),
17 if Db2#db.update_seq /= Db#db.update_seq ->
18 couch_db_update_notifier:notify({updated, Db2#db.name});
19 true -> ok
20 end,
21 [catch(ClientPid ! {done, self()}) || ClientPid <- Clients],
22 lists:foreach(fun(DDocId) ->
23 couch_db_update_notifier:notify({ddoc_updated, {Db#db.name, DDocId}})
24 end, UpdatedDDocIds),
25 {noreply, Db2}
26 catch
27 throw: retry ->
28 [catch(ClientPid ! {retry, self()}) || ClientPid <- Clients],
29 {noreply, Db}
30 end;
GroupDocs
→GroupDcos2
、NonRepDocs
→NonRepDocs2
への変換後、couch_db_updater:update_docs_int/5
を呼び出していま。
couch_db_updater:update_docs_int/5
couch_db_updater:update_docs_int/5
を見てみます。
couch_db_updater.erl:
1update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) ->
2 #db{
3 fulldocinfo_by_id_btree = DocInfoByIdBTree,
4 docinfo_by_seq_btree = DocInfoBySeqBTree,
5 update_seq = LastSeq,
6 revs_limit = RevsLimit
7 } = Db,
8 Ids = [Id || [{_Client, {#doc{id=Id}, _Ref}}|_] <- DocsList],
9 % lookup up the old documents, if they exist.
10 OldDocLookups = couch_btree:lookup(DocInfoByIdBTree, Ids),
11 OldDocInfos = lists:zipwith(
12 fun(_Id, {ok, FullDocInfo}) ->
13 FullDocInfo;
14 (Id, not_found) ->
15 #full_doc_info{id=Id}
16 end,
17 Ids, OldDocLookups),
18 % Merge the new docs into the revision trees.
19 {ok, NewFullDocInfos, RemoveSeqs, NewSeq} = merge_rev_trees(RevsLimit,
20 MergeConflicts, DocsList, OldDocInfos, [], [], LastSeq),
21
22 % All documents are now ready to write.
23
24 {ok, Db2} = update_local_docs(Db, NonRepDocs),
25
26 % Write out the document summaries (the bodies are stored in the nodes of
27 % the trees, the attachments are already written to disk)
28 {ok, FlushedFullDocInfos} = flush_trees(Db2, NewFullDocInfos, []),
29
30 {IndexFullDocInfos, IndexDocInfos, UpdatedDDocIds} =
31 new_index_entries(FlushedFullDocInfos, [], [], []),
32
33 % and the indexes
34 {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree, IndexFullDocInfos, []),
35 {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree, IndexDocInfos, RemoveSeqs),
36
37 Db3 = Db2#db{
38 fulldocinfo_by_id_btree = DocInfoByIdBTree2,
39 docinfo_by_seq_btree = DocInfoBySeqBTree2,
40 update_seq = NewSeq},
41
42 % Check if we just updated any design documents, and update the validation
43 % funs if we did.
44 Db4 = case UpdatedDDocIds of
45 [] ->
46 Db3;
47 _ ->
48 refresh_validate_doc_funs(Db3)
49 end,
50
51 {ok, commit_data(Db4, not FullCommit), UpdatedDDocIds}.
大分長いので大まかに見ていきます。更新対象のDocList
からId
を取り出し、そのId
を指定して#db.fulldocinfo_by_id_btree
からFullDocInfo
を取り出します。該当するFullDocInfo
が無かった場合はId
だけ設定した空のFullDocInfo
を生成し、OldDocInfos
とします。更新対象のDocList
とOldDocInfos
を指定し、couch_db_updater:merge_rev_trees/7
を呼び出します。
couch_db_updater:merge_rev_trees/7
couch_db_updater:merge_rev_trees/7
を見て行きます。
couch_db_updater.erl:
1merge_rev_trees(_Limit, _Merge, [], [], AccNewInfos, AccRemoveSeqs, AccSeq) ->
2 {ok, lists:reverse(AccNewInfos), AccRemoveSeqs, AccSeq};
3merge_rev_trees(Limit, MergeConflicts, [NewDocs|RestDocsList],
4 [OldDocInfo|RestOldInfo], AccNewInfos, AccRemoveSeqs, AccSeq) ->
5 #full_doc_info{id=Id,rev_tree=OldTree,deleted=OldDeleted0,update_seq=OldSeq}
6 = OldDocInfo,
7 {NewRevTree, _} = lists:foldl(
8 fun({Client, {#doc{revs={Pos,[_Rev|PrevRevs]}}=NewDoc, Ref}}, {AccTree, OldDeleted}) ->
9 if not MergeConflicts ->
10 case couch_key_tree:merge(AccTree, couch_doc:to_path(NewDoc),
11 Limit) of
12 {_NewTree, conflicts} when (not OldDeleted) ->
13 send_result(Client, Ref, conflict),
14 {AccTree, OldDeleted};
15 {NewTree, conflicts} when PrevRevs /= [] ->
16 % Check to be sure if prev revision was specified, it's
17 % a leaf node in the tree
18 Leafs = couch_key_tree:get_all_leafs(AccTree),
19 IsPrevLeaf = lists:any(fun({_, {LeafPos, [LeafRevId|_]}}) ->
20 {LeafPos, LeafRevId} == {Pos-1, hd(PrevRevs)}
21 end, Leafs),
22 if IsPrevLeaf ->
23 {NewTree, OldDeleted};
24 true ->
25 send_result(Client, Ref, conflict),
26 {AccTree, OldDeleted}
27 end;
28 {NewTree, no_conflicts} when AccTree == NewTree ->
29 % the tree didn't change at all
30 % meaning we are saving a rev that's already
31 % been editted again.
32 if (Pos == 1) and OldDeleted ->
33 % this means we are recreating a brand new document
34 % into a state that already existed before.
35 % put the rev into a subsequent edit of the deletion
36 #doc_info{revs=[#rev_info{rev={OldPos,OldRev}}|_]} =
37 couch_doc:to_doc_info(OldDocInfo),
38 NewRevId = couch_db:new_revid(
39 NewDoc#doc{revs={OldPos, [OldRev]}}),
40 NewDoc2 = NewDoc#doc{revs={OldPos + 1, [NewRevId, OldRev]}},
41 {NewTree2, _} = couch_key_tree:merge(AccTree,
42 couch_doc:to_path(NewDoc2), Limit),
43 % we changed the rev id, this tells the caller we did
44 send_result(Client, Ref, {ok, {OldPos + 1, NewRevId}}),
45 {NewTree2, OldDeleted};
46 true ->
47 send_result(Client, Ref, conflict),
48 {AccTree, OldDeleted}
49 end;
50 {NewTree, _} ->
51 {NewTree, NewDoc#doc.deleted}
52 end;
53 true ->
54 {NewTree, _} = couch_key_tree:merge(AccTree,
55 couch_doc:to_path(NewDoc), Limit),
56 {NewTree, OldDeleted}
57 end
58 end,
59 {OldTree, OldDeleted0}, NewDocs),
60 if NewRevTree == OldTree ->
61 % nothing changed
62 merge_rev_trees(Limit, MergeConflicts, RestDocsList, RestOldInfo,
63 AccNewInfos, AccRemoveSeqs, AccSeq);
64 true ->
65 % we have updated the document, give it a new seq #
66 NewInfo = #full_doc_info{id=Id,update_seq=AccSeq+1,rev_tree=NewRevTree},
67 RemoveSeqs = case OldSeq of
68 0 -> AccRemoveSeqs;
69 _ -> [OldSeq | AccRemoveSeqs]
70 end,
71 merge_rev_trees(Limit, MergeConflicts, RestDocsList, RestOldInfo,
72 [NewInfo|AccNewInfos], RemoveSeqs, AccSeq+1)
73 end.
更新対象のドキュメントと、それに対する#full_doc_info
をマージする処理で、実際のマージはcouch_doc:merge/3
を呼び出すことによって実現しています。その関数を呼び出す前にcouch_doc:to_path/1
を呼び出しているので、先にこの関数を見てみます。
couch_doc.erl:
1-spec to_path(#doc{}) -> path().
2to_path(#doc{revs={Start, RevIds}}=Doc) ->
3 [Branch] = to_branch(Doc, lists:reverse(RevIds)),
4 {Start - length(RevIds) + 1, Branch}.
5
6-spec to_branch(#doc{}, [RevId::binary()]) -> [branch()].
7to_branch(Doc, [RevId]) ->
8 [{RevId, Doc, []}];
9to_branch(Doc, [RevId | Rest]) ->
10 [{RevId, ?REV_MISSING, to_branch(Doc, Rest)}].
上記関数を呼び出すと、#doc
は以下のように変換されます。
1{3, {RevId3, [], {RevId2, [], {RevId1, Doc, []}}}}}.
また、#full_doc_info.rev_tree
は以下のような構造になっています。
1[{Pos, {Key, Value, SubTree}}, {Pos, {Key, Value, SubTree}}, ...}]
上記より、couch_doc:to_path/1
は#full_doc_info.rev_tree
とマージできるように#doc
を{Key, Value, SubTree}
の形式に変換しているようです。
ここからcouch_doc:merge/3
→couch_doc:merge_one/4
→couch_doc:merge_at/3
→couch_doc:merge_simp2
と続き、rev_tree
のマージが行われますが、かなり長いので割愛します。更新の衝突がなければ、新しいリビジョンが増えるだけだと思います。
couch_db_updater:flush_trees/3
couch_db_updater:update_docs_int/5
に戻ります。次に実行されるのはcouch_db_updater:update_local_docs/2
ですが、この関数はrev_tree
は関係しないので、後まわしにして先にcouch_db_updater:flush_trees/3
を見てみます。
couch_db_updater.erl:
1flush_trees(_Db, [], AccFlushedTrees) ->
2 {ok, lists:reverse(AccFlushedTrees)};
3flush_trees(#db{updater_fd = Fd} = Db,
4 [InfoUnflushed | RestUnflushed], AccFlushed) ->
5 #full_doc_info{update_seq=UpdateSeq, rev_tree=Unflushed} = InfoUnflushed,
6 {Flushed, LeafsSize} = couch_key_tree:mapfold(
7 fun(_Rev, Value, Type, Acc) ->
8 case Value of
9 #doc{deleted = IsDeleted, body = {summary, Summary, AttsFd}} ->
10 % this node value is actually an unwritten document summary,
11 % write to disk.
12 % make sure the Fd in the written bins is the same Fd we are
13 % and convert bins, removing the FD.
14 % All bins should have been written to disk already.
15 case {AttsFd, Fd} of
16 {nil, _} ->
17 ok;
18 {SameFd, SameFd} ->
19 ok;
20 _ ->
21 % Fd where the attachments were written to is not the same
22 % as our Fd. This can happen when a database is being
23 % switched out during a compaction.
24 ?LOG_DEBUG("File where the attachments are written has"
25 " changed. Possibly retrying.", []),
26 throw(retry)
27 end,
28 {ok, NewSummaryPointer, SummarySize} =
29 couch_file:append_raw_chunk(Fd, Summary),
30 TotalSize = lists:foldl(
31 fun(#att{att_len = L}, A) -> A + L end,
32 SummarySize, Value#doc.atts),
33 NewValue = {IsDeleted, NewSummaryPointer, UpdateSeq, TotalSize},
34 case Type of
35 leaf ->
36 {NewValue, Acc + TotalSize};
37 branch ->
38 {NewValue, Acc}
39 end;
40 {_, _, _, LeafSize} when Type =:= leaf, LeafSize =/= nil ->
41 {Value, Acc + LeafSize};
42 _ ->
43 {Value, Acc}
44 end
45 end, 0, Unflushed),
46 InfoFlushed = InfoUnflushed#full_doc_info{
47 rev_tree = Flushed,
48 leafs_size = LeafsSize
49 },
50 flush_trees(Db, RestUnflushed, [InfoFlushed | AccFlushed]).
#full_doc_info.rev_tree
をcouch_key_tree/3
でまわしていきます。このループの中で#doc.body
のSummary
をファイルに書き出していき、そのファイルポインタを#doc
の代わりにValue
として設定します。つまりこの関数では#doc.body
をファイルに書き出していることになります。
couch_db_updater:new_index_entries/4
couch_db_updater:update_docs_int/5
に戻ります。次に実行されるのはcouch_db_updater:new_index_entries/4
なので、この関数を見てみます。
couch_db_updater.erl:
1new_index_entries([], AccById, AccBySeq, AccDDocIds) ->
2 {AccById, AccBySeq, AccDDocIds};
3new_index_entries([FullDocInfo|RestInfos], AccById, AccBySeq, AccDDocIds) ->
4 #doc_info{revs=[#rev_info{deleted=Deleted}|_], id=Id} = DocInfo =
5 couch_doc:to_doc_info(FullDocInfo),
6 AccDDocIds2 = case Id of
7 <<?DESIGN_DOC_PREFIX, _/binary>> ->
8 [Id | AccDDocIds];
9 _ ->
10 AccDDocIds
11 end,
12 new_index_entries(RestInfos,
13 [FullDocInfo#full_doc_info{deleted=Deleted}|AccById],
14 [DocInfo|AccBySeq],
15 AccDDocIds2).
これは#full_doc_info
をAccById
に、#doc_info
をAccBySeq
に、DesignDocのIdをAccDDocIds
に追加している関数のようです。#doc_info.rev_info
の先頭の要素のdeleted
を#full_doc_info.deleted
に設定しているので、削除時はこの契機で#full_doc_info.deleted
が設定される、ということになるのかな。
couch_btree:add_remove/3
couch_db_updater:update_docs_int/5
に戻ります。couch_db_updater:new_index_entries/4
の戻り値{IndexFullDocInfos, IndexDocInfos, UpdatedDDocIds}
を使い、DocInfoByIdBTree
、DocInfoBySeqBTree
を更新します。
couch_btree.erl:
1add_remove(Bt, InsertKeyValues, RemoveKeys) ->
2 {ok, [], Bt2} = query_modify(Bt, [], InsertKeyValues, RemoveKeys),
3 {ok, Bt2}.
4
5query_modify(Bt, LookupKeys, InsertValues, RemoveKeys) ->
6 #btree{root=Root} = Bt,
7 InsertActions = lists:map(
8 fun(KeyValue) ->
9 {Key, Value} = extract(Bt, KeyValue),
10 {insert, Key, Value}
11 end, InsertValues),
12 RemoveActions = [{remove, Key, nil} || Key <- RemoveKeys],
13 FetchActions = [{fetch, Key, nil} || Key <- LookupKeys],
14 SortFun =
15 fun({OpA, A, _}, {OpB, B, _}) ->
16 case A == B of
17 % A and B are equal, sort by op.
18 true -> op_order(OpA) < op_order(OpB);
19 false ->
20 less(Bt, A, B)
21 end
22 end,
23 Actions = lists:sort(SortFun, lists:append([InsertActions, RemoveActions, FetchActions])),
24 {ok, KeyPointers, QueryResults} = modify_node(Bt, Root, Actions, []),
25 {ok, NewRoot} = complete_root(Bt, KeyPointers),
26 {ok, QueryResults, Bt#btree{root=NewRoot}}.
couch_btree:extract/2
を呼び出しKey
とValue
に分けています。このあたりについては以前見たように、#full_doc_info
をディスクに書き込み可能な形式に変換しています。それぞれのActionを準備した後、couch_btree:modify_node/4
を呼び出します。
couch_btree.erl:
1modify_node(Bt, RootPointerInfo, Actions, QueryOutput) ->
2 case RootPointerInfo of
3 nil ->
4 NodeType = kv_node,
5 NodeList = [];
6 _Tuple ->
7 Pointer = element(1, RootPointerInfo),
8 {NodeType, NodeList} = get_node(Bt, Pointer)
9 end,
10 NodeTuple = list_to_tuple(NodeList),
11
12 {ok, NewNodeList, QueryOutput2} =
13 case NodeType of
14 kp_node -> modify_kpnode(Bt, NodeTuple, 1, Actions, [], QueryOutput);
15 kv_node -> modify_kvnode(Bt, NodeTuple, 1, Actions, [], QueryOutput)
16 end,
17 case NewNodeList of
18 [] -> % no nodes remain
19 {ok, [], QueryOutput2};
20 NodeList -> % nothing changed
21 {LastKey, _LastValue} = element(tuple_size(NodeTuple), NodeTuple),
22 {ok, [{LastKey, RootPointerInfo}], QueryOutput2};
23 _Else2 ->
24 {ok, ResultList} = write_node(Bt, NodeType, NewNodeList),
25 {ok, ResultList, QueryOutput2}
26 end.
更新対象のTree
からNodeType
を取り出し、それによってmodify_kpnode/6
かmodify_kvnode/6
のどちらかを呼び出します。couch_db_updater:update_docs_int/5
からの呼び出しを見ると結局どちらも呼び出されるのですが、modify_kvnode/6
の方を見ていきます。
couch_btree:modify_kvnode/6
couch_btree.erl:
1modify_kvnode(_Bt, NodeTuple, LowerBound, [], ResultNode, QueryOutput) ->
2 {ok, lists:reverse(ResultNode, bounded_tuple_to_list(NodeTuple, LowerBound, tuple_size(NodeTuple), [])), QueryOutput};
3modify_kvnode(Bt, NodeTuple, LowerBound, [{ActionType, ActionKey, ActionValue} | RestActions], ResultNode, QueryOutput) when LowerBound > tuple_size(NodeTuple) ->
4 case ActionType of
5 insert ->
6 modify_kvnode(Bt, NodeTuple, LowerBound, RestActions, [{ActionKey, ActionValue} | ResultNode], QueryOutput);
7 remove ->
8 % just drop the action
9 modify_kvnode(Bt, NodeTuple, LowerBound, RestActions, ResultNode, QueryOutput);
10 fetch ->
11 % the key/value must not exist in the tree
12 modify_kvnode(Bt, NodeTuple, LowerBound, RestActions, ResultNode, [{not_found, {ActionKey, nil}} | QueryOutput])
13 end;
14modify_kvnode(Bt, NodeTuple, LowerBound, [{ActionType, ActionKey, ActionValue} | RestActions], AccNode, QueryOutput) ->
15 N = find_first_gteq(Bt, NodeTuple, LowerBound, tuple_size(NodeTuple), ActionKey),
16 {Key, Value} = element(N, NodeTuple),
17 ResultNode = bounded_tuple_to_revlist(NodeTuple, LowerBound, N - 1, AccNode),
18 case less(Bt, ActionKey, Key) of
19 true ->
20 case ActionType of
21 insert ->
22 % ActionKey is less than the Key, so insert
23 modify_kvnode(Bt, NodeTuple, N, RestActions, [{ActionKey, ActionValue} | ResultNode], QueryOutput);
24 remove ->
25 % ActionKey is less than the Key, just drop the action
26 modify_kvnode(Bt, NodeTuple, N, RestActions, ResultNode, QueryOutput);
27 fetch ->
28 % ActionKey is less than the Key, the key/value must not exist in the tree
29 modify_kvnode(Bt, NodeTuple, N, RestActions, ResultNode, [{not_found, {ActionKey, nil}} | QueryOutput])
30 end;
31 false ->
32 % ActionKey and Key are maybe equal.
33 case less(Bt, Key, ActionKey) of
34 false ->
35 case ActionType of
36 insert ->
37 modify_kvnode(Bt, NodeTuple, N+1, RestActions, [{ActionKey, ActionValue} | ResultNode], QueryOutput);
38 remove ->
39 modify_kvnode(Bt, NodeTuple, N+1, RestActions, ResultNode, QueryOutput);
40 fetch ->
41 % ActionKey is equal to the Key, insert into the QueryOuput, but re-process the node
42 % since an identical action key can follow it.
43 modify_kvnode(Bt, NodeTuple, N, RestActions, ResultNode, [{ok, assemble(Bt, Key, Value)} | QueryOutput])
44 end;
45 true ->
46 modify_kvnode(Bt, NodeTuple, N + 1, [{ActionType, ActionKey, ActionValue} | RestActions], [{Key, Value} | ResultNode], QueryOutput)
47 end
48 end.
2番目のwhen LowerBound > tuple_size(NodeTuple)
というガードが付いている方は、現在のFullDocInfo
に当該Keyが含まれていない時に呼び出される関数のようです。メインは3番目の関数。最初に呼び出しているcouch_btree:find_first_gteq/5
を見てみます。
couch_btree.erl:
1find_first_gteq(_Bt, _Tuple, Start, End, _Key) when Start == End ->
2 End;
3find_first_gteq(Bt, Tuple, Start, End, Key) ->
4 Mid = Start + ((End - Start) div 2),
5 {TupleKey, _} = element(Mid, Tuple),
6 case less(Bt, TupleKey, Key) of
7 true ->
8 find_first_gteq(Bt, Tuple, Mid+1, End, Key);
9 false ->
10 find_first_gteq(Bt, Tuple, Start, Mid, Key)
11 end.
Key
でバイナリサーチしています。そのままcouch_btree:bounded_tuple_to_revlist/4
を見てみます。
couch_btree.erl:
1bounded_tuple_to_revlist(_Tuple, Start, End, Tail) when Start > End ->
2 Tail;
3bounded_tuple_to_revlist(Tuple, Start, End, Tail) ->
4 bounded_tuple_to_revlist(Tuple, Start+1, End, [element(Start, Tuple)|Tail]).
couch_btree:bounded_tuple_to_revlist/4
は、couch_btree:find_first_gteq/5
を実行して取得したN
に対して、LowerBound
からN-1
までのノードをアキュムレータに追加しています。
その後、ActionKey
をN
番目のKey
と比較します。その結果によって次の操作をN
かN+1
のどちらかから始めるかを決めています。そして、ActionType
がinsert
ならアキュムレータResultNode
に追加、delete
であれば追加せず、fetch
であればQueryOutput
の方に追加します。
最終的にcouch_btree:modify_kvnode/6
の番目の関数が呼び出されます。ここで、couch_btree:bounded_tuple_to_list/4
を呼び出してNodeTuple
のうち走査しなかった残りの部分を取得し、ResultNode
をreverseしたリストと連結して返します。つまり、この関数の中でkvnodeのリストに対してActions
に入っている操作を適用し、リストを再構成しています。ActionType
がdelete
の時にアキュムレータに追加していないのは、追加しないことによって再構成後のリストにエントリが含まれなくなり、消去されたことになります。また、insert
の時のActionKey
がNodeTuple
のKey
と完全に一致する場合、NodeTuple
側のエントリはアキュムレータに追加されず、{ActionKey, ActionValue}
が追加される為、既存のエントリは再構成後のエントリに含まれず、結果新しい値で上書きしたようになります。
couch_btree:modify_kpnode/6
次にcouch_btree:modify_kpnode/6
を見てみます。
couch_btree.erl:
1modify_kpnode(Bt, {}, _LowerBound, Actions, [], QueryOutput) ->
2 modify_node(Bt, nil, Actions, QueryOutput);
3modify_kpnode(_Bt, NodeTuple, LowerBound, [], ResultNode, QueryOutput) ->
4 {ok, lists:reverse(ResultNode, bounded_tuple_to_list(NodeTuple, LowerBound,
5 tuple_size(NodeTuple), [])), QueryOutput};
6modify_kpnode(Bt, NodeTuple, LowerBound,
7 [{_, FirstActionKey, _}|_]=Actions, ResultNode, QueryOutput) ->
8 Sz = tuple_size(NodeTuple),
9 N = find_first_gteq(Bt, NodeTuple, LowerBound, Sz, FirstActionKey),
10 case N =:= Sz of
11 true ->
12 % perform remaining actions on last node
13 {_, PointerInfo} = element(Sz, NodeTuple),
14 {ok, ChildKPs, QueryOutput2} =
15 modify_node(Bt, PointerInfo, Actions, QueryOutput),
16 NodeList = lists:reverse(ResultNode, bounded_tuple_to_list(NodeTuple, LowerBound,
17 Sz - 1, ChildKPs)),
18 {ok, NodeList, QueryOutput2};
19 false ->
20 {NodeKey, PointerInfo} = element(N, NodeTuple),
21 SplitFun = fun({_ActionType, ActionKey, _ActionValue}) ->
22 not less(Bt, NodeKey, ActionKey)
23 end,
24 {LessEqQueries, GreaterQueries} = lists:splitwith(SplitFun, Actions),
25 {ok, ChildKPs, QueryOutput2} =
26 modify_node(Bt, PointerInfo, LessEqQueries, QueryOutput),
27 ResultNode2 = lists:reverse(ChildKPs, bounded_tuple_to_revlist(NodeTuple,
28 LowerBound, N - 1, ResultNode)),
29 modify_kpnode(Bt, NodeTuple, N+1, GreaterQueries, ResultNode2, QueryOutput2)
30 end.
3番目の関数を見ていくと、まずNodeTuple
のサイズとcouch_btree:find_first_gteq/5
の結果を比較しています。値が一致する場合は、残りのActions
を指定してcouch_btree:modify_node/4
を呼び出し、その結果をアキュムレータに加えて返しています。値が一致しない場合は、couch_btree:find_first_gteq/5
の結果で返ってきたインデックスを使ってNodeTuple
からタプルを取り出し、そのKey
を境としてActions
を2つに分割します。Key
以下のActionKey
を持つActions
を指定してcouch_btree:modify_node/4
を呼び出し、その結果をアキュムレータに加えて、Key
より大きいActionKey
を持つActions
を指定してcouch_btree:modify_kpnode/6
が呼び出されます。
正直きちんと理解できていないのですが、この関数でcouch_btree:modify_node/4
を呼び出すと、恐らくそのその先でNodeType
がkv_node
に変わり、couch_btree:modify_kvnode/4
が呼び出され、前記のように更新後のリストが取れるようになるのだと思います。
couch_btree:write_node/3
couch_btree:modify_node/4
に戻り、couch_btree:write_node/3
を見てみます。
couch_btree.erl:
1write_node(#btree{fd = Fd, compression = Comp} = Bt, NodeType, NodeList) ->
2 % split up nodes into smaller sizes
3 NodeListList = chunkify(NodeList),
4 % now write out each chunk and return the KeyPointer pairs for those nodes
5 ResultList = [
6 begin
7 {ok, Pointer, Size} = couch_file:append_term(
8 Fd, {NodeType, ANodeList}, [{compression, Comp}]),
9 {LastKey, _} = lists:last(ANodeList),
10 SubTreeSize = reduce_tree_size(NodeType, Size, ANodeList),
11 {LastKey, {Pointer, reduce_node(Bt, NodeType, ANodeList), SubTreeSize}}
12 end
13 ||
14 ANodeList <- NodeListList
15 ],
16 {ok, ResultList}.
couch_btree:chunkify/1
でNodeList
を分割し、その後に分割した単位でデータベースファイルに追記(couch_file:append_term/3
)しています。couch_btree:chunkify/1
を見てみます。
couch_btree.erl:
1chunkify(InList) ->
2 case ?term_size(InList) of
3 Size when Size > ?CHUNK_THRESHOLD ->
4 NumberOfChunksLikely = ((Size div ?CHUNK_THRESHOLD) + 1),
5 ChunkThreshold = Size div NumberOfChunksLikely,
6 chunkify(InList, ChunkThreshold, [], 0, []);
7 _Else ->
8 [InList]
9 end.
10
11chunkify([], _ChunkThreshold, [], 0, OutputChunks) ->
12 lists:reverse(OutputChunks);
13chunkify([], _ChunkThreshold, OutList, _OutListSize, OutputChunks) ->
14 lists:reverse([lists:reverse(OutList) | OutputChunks]);
15chunkify([InElement | RestInList], ChunkThreshold, OutList, OutListSize, OutputChunks) ->
16 case ?term_size(InElement) of
17 Size when (Size + OutListSize) > ChunkThreshold andalso OutList /= [] ->
18 chunkify(RestInList, ChunkThreshold, [], 0, [lists:reverse([InElement | OutList]) | OutputChunks]);
19 Size ->
20 chunkify(RestInList, ChunkThreshold, [InElement | OutList], OutListSize + Size, OutputChunks)
21 end.
InList
のサイズを?CHUNK_THRESHOLD
(1279)と比較しています。いくつに分割するべきか(NumberOfChunksLikely
)を求め、InList
のサイズから分割後の各リストに含まれる要素数を求め、分割していきます。
データベースファイルに追加した後、分割されたリスト内のの最後のKey
を取得します。そしてreduce_tree_size/3
を呼び出し、SubTreeSize
を求めます。couch_btree:reduce_tree_size/3
を見てみます。
couch_btree.erl:
1reduce_tree_size(kv_node, NodeSize, _KvList) ->
2 NodeSize;
3reduce_tree_size(kp_node, NodeSize, []) ->
4 NodeSize;
5reduce_tree_size(kp_node, _NodeSize, [{_K, {_P, _Red}} | _]) ->
6 % pre 1.2 format
7 nil;
8reduce_tree_size(kp_node, _NodeSize, [{_K, {_P, _Red, nil}} | _]) ->
9 nil;
10reduce_tree_size(kp_node, NodeSize, [{_K, {_P, _Red, Sz}} | NodeList]) ->
11 reduce_tree_size(kp_node, NodeSize + Sz, NodeList).
NodeType
によって処理が分かれています。kv_node
の場合はNodeSize
が返るだけです。kp_node
の場合は、NodeList
の各要素に含まれるTreeSize
をNodeSize
に加えています。
続けてcouch_btree:reduce_node/3
を見てみます。
couch_btree.erl:
1reduce_node(#btree{reduce=nil}, _NodeType, _NodeList) ->
2 [];
3reduce_node(#btree{reduce=R}, kp_node, NodeList) ->
4 R(rereduce, [element(2, Node) || {_K, Node} <- NodeList]);
5reduce_node(#btree{reduce=R}=Bt, kv_node, NodeList) ->
6 R(reduce, [assemble(Bt, K, V) || {K, V} <- NodeList]).
これもNodeType
によって処理が分かれています。Bt#btree.reduceにバインドされている関数に関しては、kp_node
、kv_node
共に以前見ていますのでここでは割愛します。
couch_btree:write_node/3
を呼び出した結果、以下の値が返ります。
1[ok, [{LastKey, {Pointer, reduce_node(Bt, NodeType, ANodeList), SubTreeSize}}, ...]]
couch_btree:complete_root/1
couch_btree:modify_node/4
まで一通り見たので、couch_btree:add_remove/3
に戻り、couch_btree:complete_root/2
を見てみます。
couch_btree.erl:
1complete_root(_Bt, []) ->
2 {ok, nil};
3complete_root(_Bt, [{_Key, PointerInfo}])->
4 {ok, PointerInfo};
5complete_root(Bt, KPs) ->
6 {ok, ResultKeyPointers} = write_node(Bt, kp_node, KPs),
7 complete_root(Bt, ResultKeyPointers).
第二引数の要素数によって処理が違っています。この第二引数のリストの要素数は、couch_btree:write_node/3
中で呼び出したcouch_btree:chunkify/1
によって分割された数になります。つまり複数のリストに分割された場合は3番目の関数が、分割されなかった場合は2番目の関数が呼ばれます。
couch_btree:add_remove/3
の最後で、この関数の戻り値{ok, PointerInfo}
をBt#btree.root
にバインドします。ようやくcouch_db_updater:update_docs_int/5
に戻り、更新されたそれぞれの#btree
を新しいBtreeとします。
couch_db_updater:commit_data/3
couch_db_updater:update_docs_int/5
で最後に呼び出していいる、couch_db_updater:commit_data/3
を見てみます。
couch_db_updater.erl:
1commit_data(Db) ->
2 commit_data(Db, false).
3
4db_to_header(Db, Header) ->
5 Header#db_header{
6 update_seq = Db#db.update_seq,
7 docinfo_by_seq_btree_state = couch_btree:get_state(Db#db.docinfo_by_seq_btree),
8 fulldocinfo_by_id_btree_state = couch_btree:get_state(Db#db.fulldocinfo_by_id_btree),
9 local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree),
10 security_ptr = Db#db.security_ptr,
11 revs_limit = Db#db.revs_limit}.
12
13commit_data(#db{waiting_delayed_commit=nil} = Db, true) ->
14 Db#db{waiting_delayed_commit=erlang:send_after(1000,self(),delayed_commit)};
15commit_data(Db, true) ->
16 Db;
17commit_data(Db, _) ->
18 #db{
19 updater_fd = Fd,
20 filepath = Filepath,
21 header = OldHeader,
22 fsync_options = FsyncOptions,
23 waiting_delayed_commit = Timer
24 } = Db,
25 if is_reference(Timer) -> erlang:cancel_timer(Timer); true -> ok end,
26 case db_to_header(Db, OldHeader) of
27 OldHeader ->
28 Db#db{waiting_delayed_commit=nil};
29 Header ->
30 case lists:member(before_header, FsyncOptions) of
31 true -> ok = couch_file:sync(Filepath);
32 _ -> ok
33 end,
34
35 ok = couch_file:write_header(Fd, Header),
36
37 case lists:member(after_header, FsyncOptions) of
38 true -> ok = couch_file:sync(Filepath);
39 _ -> ok
40 end,
41
42 Db#db{waiting_delayed_commit=nil,
43 header=Header,
44 committed_update_seq=Db#db.update_seq}
45 end.
Db#db.updater_fd
のデータファイルに新しいヘッダを書き出し、完了となります。これによって、更新後のデータが見れるようになります。
Conclusion
データ更新処理の残りの部分を見てきました。CouchDBはデータ更新時に、データベースファイルにある既存のデータは書き換えず、追記していくだけになっている、という部分を読むことが、ようやくできました。
本当はこんな風に読んだ際のメモをダラダラと貼り付けることなく、ササっと読んで纏められれば良いのですが、Erlangや関数型言語そのものに慣れていないこともあって、今回はこういう形となりました。
まだ、viewやcompactin、replication等を見ていないので、kp_node
の意味などはよく理解できていません。このあたりはまた気になった時に見てみたいと思います。