couch_db:update_doc/4
前回はHTTP経由でドキュメントを更新する流れを見てきました。今回は更新まわりのより深いところを見ていこうと思います。前回の流れからcouch_db:update_doc/4
を見ていきます。
couch_db.erl:
1update_doc(Db, Doc, Options, UpdateType) ->
2 case update_docs(Db, [Doc], Options, UpdateType) of
3 {ok, [{ok, NewRev}]} ->
4 {ok, NewRev};
5 {ok, [{{_Id, _Rev}, Error}]} ->
6 throw(Error);
7 {ok, [Error]} ->
8 throw(Error);
9 {ok, []} ->
10 % replication success
11 {Pos, [RevId | _]} = Doc#doc.revs,
12 {ok, {Pos, RevId}}
13 end.
単にリストにドキュメントを1つ入れてcouch_db:update_docs/4
を呼び出します。そのまま見ていきます。この関数は第四引数のUpdateType
にreplicated_changes
とinteractive_edit
のどちらかを指定するようになっており、実装も大きく分かれています。CouchDBのWikiを読む限り、replicated_changes(new_edit=false)
は既存のrevに対して更新をかける特殊な処理らしいので、ここではinteractive_edit
の方を読んでいきます。ちょっとコード量が多いので分割してみていきます。
couch_db.erl:
1update_docs(Db, Docs, Options, interactive_edit) ->
2 increment_stat(Db, {couchdb, database_writes}),
3 AllOrNothing = lists:member(all_or_nothing, Options),
4 % go ahead and generate the new revision ids for the documents.
5 % separate out the NonRep documents from the rest of the documents
6
7 % associate reference with each doc in order to track duplicates
8 Docs2 = lists:map(fun(Doc) -> {Doc, make_ref()} end,Docs),
9 {Docs3, NonRepDocs} = lists:foldl(
10 fun({#doc{id=Id},_Ref}=Doc, {DocsAcc, NonRepDocsAcc}) ->
11 case Id of
12 <<?LOCAL_DOC_PREFIX, _/binary>> ->
13 {DocsAcc, [Doc | NonRepDocsAcc]};
14 Id->
15 {[Doc | DocsAcc], NonRepDocsAcc}
16 end
17 end, {[], []}, Docs2),
18
19 DocBuckets = before_docs_update(Db, group_alike_docs(Docs3)),
20 ...
まずはcouch_db:increment_stat/2
を見てみます。
couch_db.erl:
1increment_stat(#db{options = Options}, Stat) ->
2 case lists:member(sys_db, Options) of
3 true ->
4 ok;
5 false ->
6 couch_stats_collector:increment(Stat)
7 end.
Options
にsys_db
が含まれている場合はインクリメントしないようです。sys_db
が何を表しているか確認してみます。
sys_db
sys_db
をオプション等で指定している箇所をascope.elのascope-find-this-symbol
を使ってピックアップします。
1Find this symbol: sys_db
2
3-------------------------------------------------------------------------------
4*** /home/masayuki/work/erlang/couchdb/src/couchdb/couch_db_updater.erl:
5open_reader_fd[490] {ok, Fd} = case lists:member(sys_db, Options) of
6open_reader_fd[492] couch_file:open(Filepath, [read_only, sys_db]);
7
8*** /home/masayuki/work/erlang/couchdb/src/couchdb/couch_file.erl:
9maybe_track_open_os_files[333] case lists:member(sys_db, FileOptions) of
10
11*** /home/masayuki/work/erlang/couchdb/src/couchdb/couch_auth_cache.erl:
12open_auth_db[367] {ok, AuthDb} = ensure_users_db_exists(DbName, [sys_db]),
13
14*** /home/masayuki/work/erlang/couchdb/src/couchdb/couch_server.erl:
15maybe_add_sys_db_callbacks[99] sys_db | Options
16maybe_add_sys_db_callbacks[107] sys_db | Options
17open_async[303] DbsOpen = case lists:member(sys_db, Options) of
18handle_call[345] DbsOpen = case lists:member(sys_db, Options) of
19open_db[486] case lists:member(sys_db, Options) of
20
21*** /home/masayuki/work/erlang/couchdb/src/couchdb/couch_db.erl:
22is_system_db[105] lists:member(sys_db, Options).
23init[1099] case lists:member(sys_db, Options) of
24increment_stat[1334] case lists:member(sys_db, Options) of
25
26*** /home/masayuki/work/erlang/couchdb/src/couch_replicator/src/couch_replicator_manager.erl:
27changes_feed_loop[241] DbOpenOptions = [{user_ctx, RepDb#db.user_ctx}, sys_db],
28ensure_rep_db_exists[585] case couch_db:open_int(DbName, [sys_db, {user_ctx, UserCtx}, nologifmissing]) of
29ensure_rep_db_exists[589] {ok, Db} = couch_db:create(DbName, [sys_db, {user_ctx, UserCtx}])
30-------------------------------------------------------------------------------
31
32Search complete.
この結果を見ると、以下のポイントでsys_db
が指定されていることが分かります。
- couch_db_updater:open_reader_fd
- couch_auth_cache:open_auth_db
- couch_replicator_manager:changes_feed_loop
- couch_replicator_manager:ensure_rep_db_exists
このうち、couch_db_updater:open_reader_fd
はOptions
にsys_db
が指定されている場合にcouch_file:open/2
にsys_db
を渡すようになっている為、この関数がsys_db
でオープンすることを決めているわけではありません。couch_auth_cache:open_auth_db
は認証DBをオープンする関数なので、認証DBをオープンする際にsys_db
を指定する、ということになります。残りはcouch_replicator_manager
モジュールなので、レプリケーション関連であると推測できます。CouchDBのwikiにはこのオプションに関する記載がないので、sys_db
は認証DBをオープンする際と、レプリケーションまわりでDBをオープンする際に指定されるものと考えられます。
couch_db:increment_stat/2
に戻ると、現状は認証DBやレプリケーションでオープンした場合はインクリメントせず、それ以外の場合はcouch_stats_collector:increment/1
を呼び出しています。
couch_stats_collector:increment/1
couch_stats_collector:increment/1
のコードを見てみます。
couch_stats_collector.erl:
1increment(Key) ->
2 Key2 = make_key(Key),
3 case catch ets:update_counter(?HIT_TABLE, Key2, 1) of
4 {'EXIT', {badarg, _}} ->
5 catch ets:insert(?HIT_TABLE, {Key2, 1}),
6 ok;
7 _ ->
8 ok
9 end.
インクリメントしているカウンタはKey
毎にETSで管理されていることが分かります。make_key/1
を見て、このKey
がどのように生成されているかを確認します。
couch_stats_collector.erl:
1make_key({Module, Key}) when is_integer(Key) ->
2 {Module, list_to_atom(integer_to_list(Key))};
3make_key(Key) ->
4 Key.
Key
が数値の場合はatomに、そうでない場合はKey
のまま戻されています。今回のコンテキストだとこの関数に指定されるKey
はcouch_db:update_docs/4
で指定している{couchdb, database_writes}
なので、この値がそのまま戻されることになります。この値を見る限り、ETSで管理しているカウンタはデータベースの書き込み回数のようです。
all_or_nothing
couch_db:update_docs/4
に戻ると、次にOptions
にall_or_nothing
が含まれているかどうか見ています。このall_or_nothing
は、HTTP Bulk Document APIに以下のように記載があります。
Transactional Semantics with Bulk Updates
In short, there are none (by design). However, you can ask CouchDB to check that all the documents in your _bulk_docs request pass all your validation functions. If even one fails, none of the documents are written. You can select this mode by including “all_or_nothing”:true in your request. With this mode, if all documents pass validation, then all documents will be updated, even if that introduces a conflict for some or all of the documents.
複数のドキュメントを一度に更新する際、all_or_nothing
がtrue
の場合は、ドキュメントが1つでもvalidationに引っかかるとすべて更新されないようになっているとのこと。
次に、各ドキュメントにmake_ref/0
の値をバインドしています。この関数はユニークな値を返すようになっています。コメントにもあるように、ドキュメントの重複をチェックする為に入っている処理です。
そしてその次に、各ドキュメントを、レプリケーション対象かどうかで2つに分けているようです。それを分ける為にドキュメントのIDが?LOCAL_DOC_PREFIX
で始まっていたらレプリケーション対象外となっています。この?LOCAL_DOC_PREFIX
はcouch_db.hrlで定義されていて、実際の値は"_local/"
となっています。
local_doc
この"_local/"
のIDを持つドキュメントはどこで生成されるのか見てみます。?LOCAL_DOC_PREFIX
で探してみましたが、イマイチそれらしい箇所を見つけられなかったので、find-grepで"_local"
を探してみました。
1-*- mode: grep; default-directory: "~/work/erlang/couchdb/src/couchdb/" -*-
2Grep started at Sat Jul 12 22:04:39
3
4find . -type f -exec grep -nH -e _local/ {} +
5./couch_db.hrl:13:-define(LOCAL_DOC_PREFIX, "_local/").
6./couch_httpd_db.erl:455:db_req(#httpd{path_parts=[_DbName, <<"_local/">>]}, _Db) ->
7./couch_httpd_db.erl:459: db_doc_req(Req, Db, <<"_local/", Name/binary>>);
8./couch_doc.erl:200: <<"_local/", _/binary>> -> ok;
9./couch_util.erl:431:encode_doc_id(<<"_local/", Rest/binary>>) ->
10./couch_util.erl:432: "_local/" ++ url_encode(Rest);
11./couch_db.erl:467:validate_doc_update(_Db, #doc{id= <<"_local/",_/binary>>}, _GetDiskDocFun) ->
12
13Grep finished (matches found) at Sat Jul 12 22:04:39
couch_httpd_db
モジュールで使用されている箇所を見てみます。
couch_httpd_db.erl:
1db_req(#httpd{path_parts=[_DbName, <<"_local">>, Name]}=Req, Db) ->
2 db_doc_req(Req, Db, <<"_local/", Name/binary>>);
HTTP APIで指定されているようです。HTTP Document APIを見てみたところ、"_local/"
について以下のように記されていました。
are not being replicated (local documents) and used for Replication checkpointing.
IDが"_local/"
で始まるドキュメントは、レプリケーションされていないことを示しており、レプリケーションのチェックポイントまわりで使われているようです。レプリケーションに関してはそのうち見てみたいと思いますが、今は取り合えず先に進めたいと思います。
couch_db:before_docs_update/2
またcouch_db:update_docs/4
に戻り、次のcouch_db:before_docs_update/2
の前に、couch_db:goup_alike_docs/1
を見てみます。
couch_db.erl:
1% group_alike_docs groups the sorted documents into sublist buckets, by id.
2% ([DocA, DocA, DocB, DocC], []) -> [[DocA, DocA], [DocB], [DocC]]
3=>oup_alike_docs(Docs) ->
4 Sorted = lists:sort(fun({#doc{id=A},_},{#doc{id=B},_})-> A < B end, Docs),
5 group_alike_docs(Sorted, []).
6
7group_alike_docs([], Buckets) ->
8 lists:reverse(lists:map(fun lists:reverse/1, Buckets));
9group_alike_docs([Doc|Rest], []) ->
10 group_alike_docs(Rest, [[Doc]]);
11group_alike_docs([{Doc,Ref}|Rest], [Bucket|RestBuckets]) ->
12 [{#doc{id=BucketId},_Ref}|_] = Bucket,
13 case Doc#doc.id == BucketId of
14 true ->
15 % add to existing bucket
16 group_alike_docs(Rest, [[{Doc,Ref}|Bucket]|RestBuckets]);
17 false ->
18 % add to new bucket
19 group_alike_docs(Rest, [[{Doc,Ref}]|[Bucket|RestBuckets]])
20 end.
couch_db:before_docs_update/2
を見てみます。
couch_db.erl:
1before_docs_update(#db{before_doc_update = nil}, BucketList) ->
2 BucketList;
3before_docs_update(#db{before_doc_update = Fun} = Db, BucketList) ->
4 [lists:map(
5 fun({Doc, Ref}) ->
6 NewDoc = Fun(couch_doc:with_ejson_body(Doc), Db),
7 {NewDoc, Ref}
8 end,
9 Bucket) || Bucket <- BucketList].
before_doc_update
に関数がバインドされている場合は、それを呼び出すようになっています。ActiveRecordのbefore_saveのようなものですかね。
Conclusion
couch_db:update_docs/4
の前半をザッと見て、データベースの書き込みカウンタの存在、sys_db
やall_or_nothing
やlocal_docの意味、更新前のコールバック関数の呼び出しを確認しました。