couch_db:update_doc/4

前回はHTTP経由でドキュメントを更新する流れを見てきました。今回は更新まわりのより深いところを見ていこうと思います。前回の流れからcouch_db:update_doc/4を見ていきます。

couch_db.erl:

 1update_doc(Db, Doc, Options, UpdateType) ->
 2    case update_docs(Db, [Doc], Options, UpdateType) of
 3    {ok, [{ok, NewRev}]} ->
 4        {ok, NewRev};
 5    {ok, [{{_Id, _Rev}, Error}]} ->
 6        throw(Error);
 7    {ok, [Error]} ->
 8        throw(Error);
 9    {ok, []} ->
10        % replication success
11        {Pos, [RevId | _]} = Doc#doc.revs,
12        {ok, {Pos, RevId}}
13    end.

単にリストにドキュメントを1つ入れてcouch_db:update_docs/4を呼び出します。そのまま見ていきます。この関数は第四引数のUpdateTypereplicated_changesinteractive_editのどちらかを指定するようになっており、実装も大きく分かれています。CouchDBのWikiを読む限り、replicated_changes(new_edit=false)は既存のrevに対して更新をかける特殊な処理らしいので、ここではinteractive_editの方を読んでいきます。ちょっとコード量が多いので分割してみていきます。

couch_db.erl:

 1update_docs(Db, Docs, Options, interactive_edit) ->
 2    increment_stat(Db, {couchdb, database_writes}),
 3    AllOrNothing = lists:member(all_or_nothing, Options),
 4    % go ahead and generate the new revision ids for the documents.
 5    % separate out the NonRep documents from the rest of the documents
 6
 7    % associate reference with each doc in order to track duplicates
 8    Docs2 = lists:map(fun(Doc) -> {Doc, make_ref()} end,Docs),
 9    {Docs3, NonRepDocs} = lists:foldl(
10         fun({#doc{id=Id},_Ref}=Doc, {DocsAcc, NonRepDocsAcc}) ->
11            case Id of
12            <<?LOCAL_DOC_PREFIX, _/binary>> ->
13                {DocsAcc, [Doc | NonRepDocsAcc]};
14            Id->
15                {[Doc | DocsAcc], NonRepDocsAcc}
16            end
17        end, {[], []}, Docs2),
18
19    DocBuckets = before_docs_update(Db, group_alike_docs(Docs3)),
20    ...

まずはcouch_db:increment_stat/2を見てみます。

couch_db.erl:

1increment_stat(#db{options = Options}, Stat) ->
2    case lists:member(sys_db, Options) of
3    true ->
4        ok;
5    false ->
6        couch_stats_collector:increment(Stat)
7    end.

Optionssys_dbが含まれている場合はインクリメントしないようです。sys_dbが何を表しているか確認してみます。

sys_db

sys_dbをオプション等で指定している箇所をascope.elascope-find-this-symbolを使ってピックアップします。

 1Find this symbol: sys_db
 2
 3-------------------------------------------------------------------------------
 4*** /home/masayuki/work/erlang/couchdb/src/couchdb/couch_db_updater.erl:
 5open_reader_fd[490]            {ok, Fd} = case lists:member(sys_db, Options) of
 6open_reader_fd[492]            couch_file:open(Filepath, [read_only, sys_db]);
 7
 8*** /home/masayuki/work/erlang/couchdb/src/couchdb/couch_file.erl:
 9maybe_track_open_os_files[333] case lists:member(sys_db, FileOptions) of
10
11*** /home/masayuki/work/erlang/couchdb/src/couchdb/couch_auth_cache.erl:
12open_auth_db[367]              {ok, AuthDb} = ensure_users_db_exists(DbName, [sys_db]),
13
14*** /home/masayuki/work/erlang/couchdb/src/couchdb/couch_server.erl:
15maybe_add_sys_db_callbacks[99] sys_db | Options
16maybe_add_sys_db_callbacks[107] sys_db | Options
17open_async[303]                DbsOpen = case lists:member(sys_db, Options) of
18handle_call[345]               DbsOpen = case lists:member(sys_db, Options) of
19open_db[486]                   case lists:member(sys_db, Options) of
20
21*** /home/masayuki/work/erlang/couchdb/src/couchdb/couch_db.erl:
22is_system_db[105]              lists:member(sys_db, Options).
23init[1099]                     case lists:member(sys_db, Options) of
24increment_stat[1334]           case lists:member(sys_db, Options) of
25
26*** /home/masayuki/work/erlang/couchdb/src/couch_replicator/src/couch_replicator_manager.erl:
27changes_feed_loop[241]         DbOpenOptions = [{user_ctx, RepDb#db.user_ctx}, sys_db],
28ensure_rep_db_exists[585]      case couch_db:open_int(DbName, [sys_db, {user_ctx, UserCtx}, nologifmissing]) of
29ensure_rep_db_exists[589]      {ok, Db} = couch_db:create(DbName, [sys_db, {user_ctx, UserCtx}])
30-------------------------------------------------------------------------------
31
32Search complete.

この結果を見ると、以下のポイントでsys_dbが指定されていることが分かります。

このうち、couch_db_updater:open_reader_fdOptionssys_dbが指定されている場合にcouch_file:open/2sys_dbを渡すようになっている為、この関数がsys_dbでオープンすることを決めているわけではありません。couch_auth_cache:open_auth_dbは認証DBをオープンする関数なので、認証DBをオープンする際にsys_dbを指定する、ということになります。残りはcouch_replicator_managerモジュールなので、レプリケーション関連であると推測できます。CouchDBのwikiにはこのオプションに関する記載がないので、sys_dbは認証DBをオープンする際と、レプリケーションまわりでDBをオープンする際に指定されるものと考えられます。

couch_db:increment_stat/2に戻ると、現状は認証DBやレプリケーションでオープンした場合はインクリメントせず、それ以外の場合はcouch_stats_collector:increment/1を呼び出しています。

couch_stats_collector:increment/1

couch_stats_collector:increment/1のコードを見てみます。

couch_stats_collector.erl:

1increment(Key) ->
2    Key2 = make_key(Key),
3    case catch ets:update_counter(?HIT_TABLE, Key2, 1) of
4        {'EXIT', {badarg, _}} ->
5            catch ets:insert(?HIT_TABLE, {Key2, 1}),
6            ok;
7        _ ->
8            ok
9    end.

インクリメントしているカウンタはKey毎にETSで管理されていることが分かります。make_key/1を見て、このKeyがどのように生成されているかを確認します。

couch_stats_collector.erl:

1make_key({Module, Key}) when is_integer(Key) ->
2    {Module, list_to_atom(integer_to_list(Key))};
3make_key(Key) ->
4    Key.

Keyが数値の場合はatomに、そうでない場合はKeyのまま戻されています。今回のコンテキストだとこの関数に指定されるKeycouch_db:update_docs/4で指定している{couchdb, database_writes}なので、この値がそのまま戻されることになります。この値を見る限り、ETSで管理しているカウンタはデータベースの書き込み回数のようです。

all_or_nothing

couch_db:update_docs/4に戻ると、次にOptionsall_or_nothingが含まれているかどうか見ています。このall_or_nothingは、HTTP Bulk Document APIに以下のように記載があります。

Transactional Semantics with Bulk Updates

In short, there are none (by design). However, you can ask CouchDB to check that all the documents in your _bulk_docs request pass all your validation functions. If even one fails, none of the documents are written. You can select this mode by including “all_or_nothing”:true in your request. With this mode, if all documents pass validation, then all documents will be updated, even if that introduces a conflict for some or all of the documents.

複数のドキュメントを一度に更新する際、all_or_nothingtrueの場合は、ドキュメントが1つでもvalidationに引っかかるとすべて更新されないようになっているとのこと。

次に、各ドキュメントにmake_ref/0の値をバインドしています。この関数はユニークな値を返すようになっています。コメントにもあるように、ドキュメントの重複をチェックする為に入っている処理です。

そしてその次に、各ドキュメントを、レプリケーション対象かどうかで2つに分けているようです。それを分ける為にドキュメントのIDが?LOCAL_DOC_PREFIXで始まっていたらレプリケーション対象外となっています。この?LOCAL_DOC_PREFIXはcouch_db.hrlで定義されていて、実際の値は"_local/"となっています。

local_doc

この"_local/"のIDを持つドキュメントはどこで生成されるのか見てみます。?LOCAL_DOC_PREFIXで探してみましたが、イマイチそれらしい箇所を見つけられなかったので、find-grepで"_local"を探してみました。

 1-*- mode: grep; default-directory: "~/work/erlang/couchdb/src/couchdb/" -*-
 2Grep started at Sat Jul 12 22:04:39
 3
 4find . -type f -exec grep -nH -e _local/ {} +
 5./couch_db.hrl:13:-define(LOCAL_DOC_PREFIX, "_local/").
 6./couch_httpd_db.erl:455:db_req(#httpd{path_parts=[_DbName, <<"_local/">>]}, _Db) ->
 7./couch_httpd_db.erl:459:    db_doc_req(Req, Db, <<"_local/", Name/binary>>);
 8./couch_doc.erl:200:    <<"_local/", _/binary>> -> ok;
 9./couch_util.erl:431:encode_doc_id(<<"_local/", Rest/binary>>) ->
10./couch_util.erl:432:    "_local/" ++ url_encode(Rest);
11./couch_db.erl:467:validate_doc_update(_Db, #doc{id= <<"_local/",_/binary>>}, _GetDiskDocFun) ->
12
13Grep finished (matches found) at Sat Jul 12 22:04:39

couch_httpd_dbモジュールで使用されている箇所を見てみます。

couch_httpd_db.erl:

1db_req(#httpd{path_parts=[_DbName, <<"_local">>, Name]}=Req, Db) ->
2    db_doc_req(Req, Db, <<"_local/", Name/binary>>);

HTTP APIで指定されているようです。HTTP Document APIを見てみたところ、"_local/"について以下のように記されていました。

are not being replicated (local documents) and used for Replication checkpointing.

IDが"_local/"で始まるドキュメントは、レプリケーションされていないことを示しており、レプリケーションのチェックポイントまわりで使われているようです。レプリケーションに関してはそのうち見てみたいと思いますが、今は取り合えず先に進めたいと思います。

couch_db:before_docs_update/2

またcouch_db:update_docs/4に戻り、次のcouch_db:before_docs_update/2の前に、couch_db:goup_alike_docs/1を見てみます。

couch_db.erl:

 1% group_alike_docs groups the sorted documents into sublist buckets, by id.
 2% ([DocA, DocA, DocB, DocC], []) -> [[DocA, DocA], [DocB], [DocC]]
 3=>oup_alike_docs(Docs) ->
 4    Sorted = lists:sort(fun({#doc{id=A},_},{#doc{id=B},_})-> A < B end, Docs),
 5    group_alike_docs(Sorted, []).
 6
 7group_alike_docs([], Buckets) ->
 8    lists:reverse(lists:map(fun lists:reverse/1, Buckets));
 9group_alike_docs([Doc|Rest], []) ->
10    group_alike_docs(Rest, [[Doc]]);
11group_alike_docs([{Doc,Ref}|Rest], [Bucket|RestBuckets]) ->
12    [{#doc{id=BucketId},_Ref}|_] = Bucket,
13    case Doc#doc.id == BucketId of
14    true ->
15        % add to existing bucket
16        group_alike_docs(Rest, [[{Doc,Ref}|Bucket]|RestBuckets]);
17    false ->
18        % add to new bucket
19       group_alike_docs(Rest, [[{Doc,Ref}]|[Bucket|RestBuckets]])
20    end.
これはコードを読むよりもコメントの部分が分かりやすいので、そちらを参照してください。次にcouch_db:before_docs_update/2を見てみます。

couch_db.erl:

1before_docs_update(#db{before_doc_update = nil}, BucketList) ->
2    BucketList;
3before_docs_update(#db{before_doc_update = Fun} = Db, BucketList) ->
4    [lists:map(
5        fun({Doc, Ref}) ->
6            NewDoc = Fun(couch_doc:with_ejson_body(Doc), Db),
7            {NewDoc, Ref}
8        end,
9        Bucket) || Bucket <- BucketList].

before_doc_updateに関数がバインドされている場合は、それを呼び出すようになっています。ActiveRecordのbefore_saveのようなものですかね。

Conclusion

couch_db:update_docs/4の前半をザッと見て、データベースの書き込みカウンタの存在、sys_dball_or_nothingやlocal_docの意味、更新前のコールバック関数の呼び出しを確認しました。