act-act about projects rss

CouchDB Source Code Reading part7

couch_db:update_doc/4

前回はHTTP経由でドキュメントを更新する流れを見てきました。今回は更新まわりのより深いところを見ていこうと思います。前回の流れからcouch_db:update_doc/4を見ていきます。

couch_db.erl

1
2
3
4
5
6
7
8
9
10
11
12
13
update_doc(Db, Doc, Options, UpdateType) ->
    case update_docs(Db, [Doc], Options, UpdateType) of
    {ok, [{ok, NewRev}]} ->
        {ok, NewRev};
    {ok, [{{_Id, _Rev}, Error}]} ->
        throw(Error);
    {ok, [Error]} ->
        throw(Error);
    {ok, []} ->
        % replication success
        {Pos, [RevId | _]} = Doc#doc.revs,
        {ok, {Pos, RevId}}
    end.

単にリストにドキュメントを1つ入れてcouch_db:update_docs/4を呼び出します。そのまま見ていきます。この関数は第四引数のUpdateTypereplicated_changesinteractive_editのどちらかを指定するようになっており、実装も大きく分かれています。CouchDBのWikiを読む限り、replicated_changes(new_edit=false)は既存のrevに対して更新をかける特殊な処理らしいので、ここではinteractive_editの方を読んでいきます。ちょっとコード量が多いので分割してみていきます。

couch_db.erl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
update_docs(Db, Docs, Options, interactive_edit) ->
    increment_stat(Db, {couchdb, database_writes}),
    AllOrNothing = lists:member(all_or_nothing, Options),
    % go ahead and generate the new revision ids for the documents.
    % separate out the NonRep documents from the rest of the documents

    % associate reference with each doc in order to track duplicates
    Docs2 = lists:map(fun(Doc) -> {Doc, make_ref()} end,Docs),
    {Docs3, NonRepDocs} = lists:foldl(
         fun({#doc{id=Id},_Ref}=Doc, {DocsAcc, NonRepDocsAcc}) ->
            case Id of
            <<?LOCAL_DOC_PREFIX, _/binary>> ->
                {DocsAcc, [Doc | NonRepDocsAcc]};
            Id->
                {[Doc | DocsAcc], NonRepDocsAcc}
            end
        end, {[], []}, Docs2),

    DocBuckets = before_docs_update(Db, group_alike_docs(Docs3)),
    ...

まずはcouch_db:increment_stat/2を見てみます。

couch_db.erl

1
2
3
4
5
6
7
increment_stat(#db{options = Options}, Stat) ->
    case lists:member(sys_db, Options) of
    true ->
        ok;
    false ->
        couch_stats_collector:increment(Stat)
    end.

Optionssys_dbが含まれている場合はインクリメントしないようです。sys_dbが何を表しているか確認してみます。

sys_db

sys_dbをオプション等で指定している箇所をascope.elascope-find-this-symbolを使ってピックアップします。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
Find this symbol: sys_db

-------------------------------------------------------------------------------
*** /home/masayuki/work/erlang/couchdb/src/couchdb/couch_db_updater.erl:
open_reader_fd[490]            {ok, Fd} = case lists:member(sys_db, Options) of
open_reader_fd[492]            couch_file:open(Filepath, [read_only, sys_db]);

*** /home/masayuki/work/erlang/couchdb/src/couchdb/couch_file.erl:
maybe_track_open_os_files[333] case lists:member(sys_db, FileOptions) of

*** /home/masayuki/work/erlang/couchdb/src/couchdb/couch_auth_cache.erl:
open_auth_db[367]              {ok, AuthDb} = ensure_users_db_exists(DbName, [sys_db]),

*** /home/masayuki/work/erlang/couchdb/src/couchdb/couch_server.erl:
maybe_add_sys_db_callbacks[99] sys_db | Options
maybe_add_sys_db_callbacks[107] sys_db | Options
open_async[303]                DbsOpen = case lists:member(sys_db, Options) of
handle_call[345]               DbsOpen = case lists:member(sys_db, Options) of
open_db[486]                   case lists:member(sys_db, Options) of

*** /home/masayuki/work/erlang/couchdb/src/couchdb/couch_db.erl:
is_system_db[105]              lists:member(sys_db, Options).
init[1099]                     case lists:member(sys_db, Options) of
increment_stat[1334]           case lists:member(sys_db, Options) of

*** /home/masayuki/work/erlang/couchdb/src/couch_replicator/src/couch_replicator_manager.erl:
changes_feed_loop[241]         DbOpenOptions = [{user_ctx, RepDb#db.user_ctx}, sys_db],
ensure_rep_db_exists[585]      case couch_db:open_int(DbName, [sys_db, {user_ctx, UserCtx}, nologifmissing]) of
ensure_rep_db_exists[589]      {ok, Db} = couch_db:create(DbName, [sys_db, {user_ctx, UserCtx}])
-------------------------------------------------------------------------------

Search complete.

この結果を見ると、以下のポイントでsys_dbが指定されていることが分かります。

  • couch_db_updater:open_reader_fd
  • couch_auth_cache:open_auth_db
  • couch_replicator_manager:changes_feed_loop
  • couch_replicator_manager:ensure_rep_db_exists

このうち、couch_db_updater:open_reader_fdOptionssys_dbが指定されている場合にcouch_file:open/2sys_dbを渡すようになっている為、この関数がsys_dbでオープンすることを決めているわけではありません。couch_auth_cache:open_auth_dbは認証DBをオープンする関数なので、認証DBをオープンする際にsys_dbを指定する、ということになります。残りはcouch_replicator_managerモジュールなので、レプリケーション関連であると推測できます。CouchDBのwikiにはこのオプションに関する記載がないので、sys_dbは認証DBをオープンする際と、レプリケーションまわりでDBをオープンする際に指定されるものと考えられます。

couch_db:increment_stat/2に戻ると、現状は認証DBやレプリケーションでオープンした場合はインクリメントせず、それ以外の場合はcouch_stats_collector:increment/1を呼び出しています。

couch_stats_collector:increment/1

couch_stats_collector:increment/1のコードを見てみます。

couch_stats_collector.erl

1
2
3
4
5
6
7
8
9
increment(Key) ->
    Key2 = make_key(Key),
    case catch ets:update_counter(?HIT_TABLE, Key2, 1) of
        {'EXIT', {badarg, _}} ->
            catch ets:insert(?HIT_TABLE, {Key2, 1}),
            ok;
        _ ->
            ok
    end.

インクリメントしているカウンタはKey毎にETSで管理されていることが分かります。make_key/1を見て、このKeyがどのように生成されているかを確認します。

couch_stats_collector.erl

1
2
3
4
make_key({Module, Key}) when is_integer(Key) ->
    {Module, list_to_atom(integer_to_list(Key))};
make_key(Key) ->
    Key.

Keyが数値の場合はatomに、そうでない場合はKeyのまま戻されています。今回のコンテキストだとこの関数に指定されるKeycouch_db:update_docs/4で指定している{couchdb, database_writes}なので、この値がそのまま戻されることになります。この値を見る限り、ETSで管理しているカウンタはデータベースの書き込み回数のようです。

all_or_nothing

couch_db:update_docs/4に戻ると、次にOptionsall_or_nothingが含まれているかどうか見ています。このall_or_nothingは、HTTP Bulk Document APIに以下のように記載があります。

Transactional Semantics with Bulk Updates

In short, there are none (by design). However, you can ask CouchDB to check that all the documents in your _bulk_docs request pass all your validation functions. If even one fails, none of the documents are written. You can select this mode by including “all_or_nothing”:true in your request. With this mode, if all documents pass validation, then all documents will be updated, even if that introduces a conflict for some or all of the documents.

複数のドキュメントを一度に更新する際、all_or_nothingtrueの場合は、ドキュメントが1つでもvalidationに引っかかるとすべて更新されないようになっているとのこと。

次に、各ドキュメントにmake_ref/0の値をバインドしています。この関数はユニークな値を返すようになっています。コメントにもあるように、ドキュメントの重複をチェックする為に入っている処理です。

そしてその次に、各ドキュメントを、レプリケーション対象かどうかで2つに分けているようです。それを分ける為にドキュメントのIDが?LOCAL_DOC_PREFIXで始まっていたらレプリケーション対象外となっています。この?LOCAL_DOC_PREFIXはcouch_db.hrlで定義されていて、実際の値は"_local/"となっています。

local_doc

この"_local/"のIDを持つドキュメントはどこで生成されるのか見てみます。?LOCAL_DOC_PREFIXで探してみましたが、イマイチそれらしい箇所を見つけられなかったので、find-grepで"_local"を探してみました。

1
2
3
4
5
6
7
8
9
10
11
12
13
-*- mode: grep; default-directory: "~/work/erlang/couchdb/src/couchdb/" -*-
Grep started at Sat Jul 12 22:04:39

find . -type f -exec grep -nH -e _local/ {} +
./couch_db.hrl:13:-define(LOCAL_DOC_PREFIX, "_local/").
./couch_httpd_db.erl:455:db_req(#httpd{path_parts=[_DbName, <<"_local/">>]}, _Db) ->

./couch_httpd_db.erl:459:    db_doc_req(Req, Db, <<"_local/", Name/binary>>);
./couch_doc.erl:200:    <<"_local/", _/binary>> -> ok;
./couch_util.erl:431:encode_doc_id(<<"_local/", Rest/binary>>) ->
./couch_util.erl:432:    "_local/" ++ url_encode(Rest);
./couch_db.erl:467:validate_doc_update(_Db, #doc{id= <<"_local/",_/binary>>}, _GetDiskDocFun) ->


Grep finished (matches found) at Sat Jul 12 22:04:39

couch_httpd_dbモジュールで使用されている箇所を見てみます。

couch_httpd_db.erl

1
2
db_req(#httpd{path_parts=[_DbName, <<"_local">>, Name]}=Req, Db) ->
    db_doc_req(Req, Db, <<"_local/", Name/binary>>);

HTTP APIで指定されているようです。HTTP Document APIを見てみたところ、"_local/"について以下のように記されていました。

are not being replicated (local documents) and used for Replication checkpointing.

IDが"_local/"で始まるドキュメントは、レプリケーションされていないことを示しており、レプリケーションのチェックポイントまわりで使われているようです。レプリケーションに関してはそのうち見てみたいと思いますが、今は取り合えず先に進めたいと思います。

couch_db:before_docs_update/2

またcouch_db:update_docs/4に戻り、次のcouch_db:before_docs_update/2の前に、couch_db:goup_alike_docs/1を見てみます。

couch_db.erl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
% group_alike_docs groups the sorted documents into sublist buckets, by id.
% ([DocA, DocA, DocB, DocC], []) -> [[DocA, DocA], [DocB], [DocC]]
=>oup_alike_docs(Docs) ->
    Sorted = lists:sort(fun({#doc{id=A},_},{#doc{id=B},_})-> A < B end, Docs),
    group_alike_docs(Sorted, []).

group_alike_docs([], Buckets) ->
    lists:reverse(lists:map(fun lists:reverse/1, Buckets));
group_alike_docs([Doc|Rest], []) ->
    group_alike_docs(Rest, [[Doc]]);
group_alike_docs([{Doc,Ref}|Rest], [Bucket|RestBuckets]) ->
    [{#doc{id=BucketId},_Ref}|_] = Bucket,
    case Doc#doc.id == BucketId of
    true ->
        % add to existing bucket
        group_alike_docs(Rest, [[{Doc,Ref}|Bucket]|RestBuckets]);
    false ->
        % add to new bucket
       group_alike_docs(Rest, [[{Doc,Ref}]|[Bucket|RestBuckets]])
    end.

これはコードを読むよりもコメントの部分が分かりやすいので、そちらを参照してください。次にcouch_db:before_docs_update/2を見てみます。

couch_db.erl

1
2
3
4
5
6
7
8
9
beefore_docs_update(#db{before_doc_update = nil}, BucketList) ->
    BucketList;
before_docs_update(#db{before_doc_update = Fun} = Db, BucketList) ->
    [lists:map(
        fun({Doc, Ref}) ->
            NewDoc = Fun(couch_doc:with_ejson_body(Doc), Db),
            {NewDoc, Ref}
        end,
        Bucket) || Bucket <- BucketList].

before_doc_updateに関数がバインドされている場合は、それを呼び出すようになっています。ActiveRecordのbefore_saveのようなものですかね。

Conclusion

couch_db:update_docs/4の前半をザッと見て、データベースの書き込みカウンタの存在、sys_dball_or_nothingやlocal_docの意味、更新前のコールバック関数の呼び出しを確認しました。