act-act about projects rss

CouchDB Source Code Reading part9

couch_db:update_doc/4

引き続きcouch_db:update_docs/4を。

couch_db.erl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
    if (AllOrNothing) and (PreCommitFailures /= []) ->
        {aborted,
         lists:foldl(fun({#doc{id=Id,revs=Revs}, Ref},Acc) ->
                         case lists:keyfind(Ref,1,PreCommitFailures) of
                         {Ref, Error} ->
                             case Revs of
                             {Pos, [RevId|_]} ->
                                 [{{Id,{Pos, RevId}}, Error} | Acc];
                             {0, []} ->
                                 [{{Id,{0, <<>>}}, Error} | Acc]
                             end;
                         false ->
                             Acc
                         end
                     end,[],Docs3)};

AllOrNothingtrueでvalidationエラーが1つ以上存在する場合、ドキュメントの更新を行わず、呼び出し元に返ります。そのまま続けて見ていきます。

couch_db.erl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
    true ->
        Options2 = if AllOrNothing -> [merge_conflicts];
                true -> [] end ++ Options,
        DocBuckets3 = [[
                {doc_flush_atts(set_new_att_revpos(
                        check_dup_atts(Doc)), Db#db.updater_fd), Ref}
                || {Doc, Ref} <- B] || B <- DocBuckets2],
        {DocBuckets4, IdRevs} = new_revs(DocBuckets3, [], []),

        {ok, CommitResults} = write_and_commit(Db, DocBuckets4, NonRepDocs, Options2),

        ResultsDict = dict:from_list(IdRevs ++ CommitResults ++ PreCommitFailures),
        {ok, lists:map(
            fun({#doc{}, Ref}) ->
                {ok, Result} = dict:find(Ref, ResultsDict),
                Result
            end, Docs2)}
    end.

ようやくcouch_db:update_docs/4の終わりが見えました。

まず、AllOrNothingtrueの場合のみ[merge_conflicts]を加えています。ドキュメントがconflictした際に自動的にマージされるということですかね…。 その次に、リスト内包表記でDocBucket2をドキュメントに展開します。展開されたドキュメントに対して、check_dup_atts/1を呼び出します。

couch_db.erl

1
2
3
4
5
6
7
8
9
10
11
check_dup_atts(#doc{atts=Atts}=Doc) ->
    Atts2 = lists:sort(fun(#att{name=N1}, #att{name=N2}) -> N1 < N2 end, Atts),
    check_dup_atts2(Atts2),
    Doc.

check_dup_atts2([#att{name=N}, #att{name=N} | _]) ->
    throw({bad_request, <<"Duplicate attachments">>});
check_dup_atts2([_ | Rest]) ->
    check_dup_atts2(Rest);
check_dup_atts2(_) ->
    ok.

アタッチメントのnameが重複しているかチェックし、問題なければチェックしたドキュメントを返します。

その次にset_new_att_revpos/1を呼び出します。

couch_db.erl

1
2
3
4
5
6
7
set_new_att_revpos(#doc{revs={RevPos,_Revs},atts=Atts}=Doc) ->
    Doc#doc{atts= lists:map(fun(#att{data={_Fd,_Sp}}=Att) ->
            % already commited to disk, do not set new rev
            Att;
        (Att) ->
            Att#att{revpos=RevPos+1}
        end, Atts)}.

コメントとコードの内容から、ドキュメントの#att.dataに値が設定されている場合はディスクに書き込み済みなのでAtt#att.revreposはそのまま、そうでない場合はAtt#att.revreposに1を加えています。

Db#db.updater_fd

その次にdoc_flush_atts/2を呼び出すのですが、第二引数のDb#db.updater_fdが何だか分からないのでチェックしてみます。この値を設定しているのは、couch_db_updater:init_db/6の最後の方。

couch_db_updater.erl

1
2
3
4
5
6
#db{
    update_pid=self(),
    fd = ReaderFd,
    updater_fd = Fd,
    fd_ref_counter = RefCntr,
...

このFdという値はどのように渡ってくるか見てみました。

まずcouch_server:open_db/4にて、DbNameを指定してget_full_filename/2を呼び出し、ファイルパスを取得します。

couch_server.erl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
open_db(DbName, Server, Options, From) ->
    DbNameList = binary_to_list(DbName),
    case check_dbname(Server, DbNameList) of
    ok ->
        Filepath = get_full_filename(Server, DbNameList),
        case lists:member(sys_db, Options) of
        true ->
            {noreply, open_async(Server, From, DbName, Filepath, Options)};
        false ->
            case maybe_close_lru_db(Server) of
            {ok, Server2} ->
                {noreply, open_async(Server2, From, DbName, Filepath, Options)};
            CloseError ->
                {reply, CloseError, Server}
            end
        end;
     Error ->
        {reply, Error, Server}
     end.

get_full_filename/2のコードを見てみます。

couch_server.erl

1
2
get_full_filename(Server, DbName) ->
    filename:join([Server#server.root_dir, "./" ++ DbName ++ ".couch"]).

データベースファイルは、DbName.couchというファイル名になっていました。次にmaybe_close_lru_db/1を見てみます。

couch_server.erl

1
2
3
4
5
6
7
8
9
10
maybe_close_lru_db(#server{dbs_open=NumOpen, max_dbs_open=MaxOpen}=Server)
        when NumOpen < MaxOpen ->
    {ok, Server};
maybe_close_lru_db(#server{dbs_open=NumOpen}=Server) ->
    % must free up the lru db.
    case try_close_lru(now()) of
    ok ->
        {ok, Server#server{dbs_open=NumOpen - 1}};
    Error -> Error
    end.

#server.dbs_open#server.max_dbs_openより小さければ何もせず返ります。そうでない場合は、使われていないDBをcloseしようとするのかな。面白そうですが、このあたりを見始めると進まないのでスルーします。

openしているDBの数に問題がないようであれば、open_async/5を呼び出してDBをopenします。

couch_server.erl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
open_async(Server, From, DbName, Filepath, Options) ->
    Parent = self(),
    Opener = spawn_link(fun() ->
            Res = couch_db:start_link(DbName, Filepath, Options),
            gen_server:call(
                Parent, {open_result, DbName, Res, Options}, infinity
            ),
            unlink(Parent),
            case Res of
            {ok, DbReader} ->
                unlink(DbReader);
            _ ->
                ok
            end
        end),
    true = ets:insert(couch_dbs_by_name, {DbName, {opening, Opener, [From]}}),
    true = ets:insert(couch_dbs_by_pid, {Opener, DbName}),
    DbsOpen = case lists:member(sys_db, Options) of
    true ->
        true = ets:insert(couch_sys_dbs, {DbName, true}),
        Server#server.dbs_open;
    false ->
        Server#server.dbs_open + 1
    end,
    Server#server{dbs_open = DbsOpen}.

couch_db:start_link/3Filepathを渡しています。その後、open_resultにてopenした結果を受け、openしたDBの情報をETSに保存し、Server#server.dbs_openに1を加えています。couch_db:start_link/3を見てみます。

couch_db.erl

1
2
3
4
5
6
7
8
9
start_link(DbName, Filepath, Options) ->
    case open_db_file(Filepath, Options) of
    {ok, Fd} ->
        StartResult = gen_server:start_link(couch_db, {DbName, Filepath, Fd, Options}, []),
        unlink(Fd),
        StartResult;
    Else ->
        Else
    end.

couch_db:open_db_file/2を呼び出してFdを獲得しています。この関数の中でデータベースファイルのopenが行われているようです。

couch_db.erl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
open_db_file(Filepath, Options) ->
    case couch_file:open(Filepath, Options) of
    {ok, Fd} ->
        {ok, Fd};
    {error, enoent} ->
        % couldn't find file. is there a compact version? This can happen if
        % crashed during the file switch.
        case couch_file:open(Filepath ++ ".compact", [nologifmissing]) of
        {ok, Fd} ->
            ?LOG_INFO("Found ~s~s compaction file, using as primary storage.", [Filepath, ".compact"]),
            ok = file:rename(Filepath ++ ".compact", Filepath),
            ok = couch_file:sync(Fd),
            {ok, Fd};
        {error, enoent} ->
            {not_found, no_db_file}
        end;
    Error ->
        Error
    end.

couch_file:open/2を呼び出してファイルをopenしています。この関数は以前見ているので、ここでは割愛します。couch_db:start_link/3に戻り、gen_server:start_link/3の呼び出しによってcouch_db:init/1が呼び出されます。この時、openしたデータベースファイルのFdを渡しています。

couch_db.erl

1
2
3
4
5
6
7
8
9
10
11
12
init({DbName, Filepath, Fd, Options}) ->
    {ok, UpdaterPid} = gen_server:start_link(couch_db_updater, {self(), DbName, Filepath, Fd, Options}, []),
    {ok, #db{fd_ref_counter=RefCntr}=Db} = gen_server:call(UpdaterPid, get_db),
    couch_ref_counter:add(RefCntr),
    case lists:member(sys_db, Options) of
    true ->
        ok;
    false ->
        couch_stats_collector:track_process_count({couchdb, open_databases})
    end,
    process_flag(trap_exit, true),
    {ok, Db}.

ここでもgen_server:start_link/3を呼び出します。この呼び出しによってcouch_db_updater:init/1が呼び出されます。この時もFdが渡されています。ここからcouch_db_updater:init_db/6が呼び出される流れも以前見ているので割愛します。

これまで見てきたように、Db#db.updater_fdはopenしたデータベースファイルのFdとなります。

couch_db:doc_flush_atts/2

couch_db:update_docs/4に戻り、couch_db:doc_flush_atts/2の呼び出しから。

couch_db.erl

1
2
doc_flush_atts(Doc, Fd) ->
    Doc#doc{atts=[flush_att(Fd, Att) || Att <- Doc#doc.atts]}.

そのままcouch_db:flush_att/2を見ていきます。

couch_db.erl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
flush_att(Fd, #att{data={Fd0, _}}=Att) when Fd0 == Fd ->
    % already written to our file, nothing to write
    Att;

flush_att(Fd, #att{data={OtherFd,StreamPointer}, md5=InMd5,
    disk_len=InDiskLen} = Att) ->
    {NewStreamData, Len, _IdentityLen, Md5, IdentityMd5} =
            couch_stream:copy_to_new_stream(OtherFd, StreamPointer, Fd),
    check_md5(IdentityMd5, InMd5),
    Att#att{data={Fd, NewStreamData}, md5=Md5, att_len=Len, disk_len=InDiskLen};

flush_att(Fd, #att{data=Data}=Att) when is_binary(Data) ->
    with_stream(Fd, Att, fun(OutputStream) ->
        couch_stream:write(OutputStream, Data)
    end);

flush_att(Fd, #att{data=Fun,att_len=undefined}=Att) when is_function(Fun) ->
    MaxChunkSize = list_to_integer(
        couch_config:get("couchdb", "attachment_stream_buffer_size", "4096")),
    with_stream(Fd, Att, fun(OutputStream) ->
        % Fun(MaxChunkSize, WriterFun) must call WriterFun
        % once for each chunk of the attachment,
        Fun(MaxChunkSize,
            % WriterFun({Length, Binary}, State)
            % WriterFun({0, _Footers}, State)
            % Called with Length == 0 on the last time.
            % WriterFun returns NewState.
            fun({0, Footers}, _) ->
                F = mochiweb_headers:from_binary(Footers),
                case mochiweb_headers:get_value("Content-MD5", F) of
                undefined ->
                    ok;
                Md5 ->
                    {md5, base64:decode(Md5)}
                end;
            ({_Length, Chunk}, _) ->
                couch_stream:write(OutputStream, Chunk)
            end, ok)
    end);

flush_att(Fd, #att{data=Fun,att_len=AttLen}=Att) when is_function(Fun) ->
    with_stream(Fd, Att, fun(OutputStream) ->
        write_streamed_attachment(OutputStream, Fun, AttLen)
    end).

1つ目のパターンマッチ&ガードは、データベースファイルのFdとアタッチメントのdataで保持しているファイルディスクリプタが同じ値であれば、このアタッチメントは既にディスクに書き込み済みと見なしています。

2つ目のパターンマッチ&ガードは、#att.dataFdと異なるファイルディスクリプタが設定されていた場合、ということになります。この場合はcouch_stream:copy_to_new_stream/3を呼び出しています。この関数を見てみます。

couch_stream.erl

1
2
3
4
5
6
7
copy_to_new_stream(Fd, PosList, DestFd) ->
    {ok, Dest} = open(DestFd),
    foldl(Fd, PosList,
        fun(Bin, _) ->
            ok = write(Dest, Bin)
        end, ok),
    close(Dest).

関数の名前にあるように、Fdが示すファイルからPosListの分だけオフセットを取り出してデータを読み込み、それをDestFdが示すファイルに書き出しています。

3つ目のパターンマッチ&ガードは#att.dataがbinaryの時に呼び出され、細かくは見ていきませんが恐らくFdが示すファイルにDataを書く。4つ目と5つ目のは#att.dataが関数の場合に呼び出され、やはりFdが示すファイルにアタッチメントのデータを書き込むようです。

3つ目の#att.dataがbinaryの関数がシンプルなので、この関数の内容を追ってアタッチメントがファイルへの書き込まれるシーケンスを見ていきます。まずはcouch_stream:with_stream/3から。

couch_stream.erl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
with_stream(Fd, #att{md5=InMd5,type=Type,encoding=Enc}=Att, Fun) ->
    BufferSize = list_to_integer(
        couch_config:get("couchdb", "attachment_stream_buffer_size", "4096")),
    {ok, OutputStream} = case (Enc =:= identity) andalso
        compressible_att_type(Type) of
    true ->
        CompLevel = list_to_integer(
            couch_config:get("attachments", "compression_level", "0")
        ),
        couch_stream:open(Fd, [{buffer_size, BufferSize},
            {encoding, gzip}, {compression_level, CompLevel}]);
    _ ->
        couch_stream:open(Fd, [{buffer_size, BufferSize}])
    end,
    ReqMd5 = case Fun(OutputStream) of
        {md5, FooterMd5} ->
            case InMd5 of
                md5_in_footer -> FooterMd5;
                _ -> InMd5
            end;
        _ ->
            InMd5
    end,
    {StreamInfo, Len, IdentityLen, Md5, IdentityMd5} =
        couch_stream:close(OutputStream),
...

全部読むとちょっと長いので、ファイルに書き込んでいるところだけを見ていきます。最初にconfigからBufferSizeを取得します。次にファイルをopenし、OpenStreamを取得しています。エンコードを指定しない方のcouch_stream:open/2を見てみます。

couch_stream.erl

1
2
open(Fd, Options) ->
    gen_server:start_link(couch_stream, {Fd, Options}, []).

そのままcouch_stream:init/1を見ます。

couch_stream.erl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
init({Fd, Options}) ->
    {EncodingFun, EndEncodingFun} =
    case couch_util:get_value(encoding, Options, identity) of
    identity ->
        identity_enc_dec_funs();
    gzip ->
        gzip_init(Options)
    end,
    {ok, #stream{
            fd=Fd,
            md5=couch_util:md5_init(),
            identity_md5=couch_util:md5_init(),
            encoding_fun=EncodingFun,
            end_encoding_fun=EndEncodingFun,
            max_buffer=couch_util:get_value(
                buffer_size, Options, ?DEFAULT_BUFFER_SIZE)
        }
    }.

configからエンコーディングタイプを取得し、そのエンコーディング用の関数を取得して#streamを構築して返しています。この時点ではopenしていないように見えます。これでcouch_stream:with_stream/3は終わりです。次にcouch_stream:write/2を見ていきます。

couch_stream.erl

1
2
3
4
write(_Pid, <<>>) ->
    ok;
write(Pid, Bin) ->
    gen_server:call(Pid, {write, Bin}, infinity).

続けてcouch_stream:handle_call/3writeを指定しているパターンを見ます。

couch_stream.erl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
handle_call({write, Bin}, _From, Stream) ->
    BinSize = iolist_size(Bin),
    #stream{
        fd = Fd,
        written_len = WrittenLen,
        written_pointers = Written,
        buffer_len = BufferLen,
        buffer_list = Buffer,
        max_buffer = Max,
        md5 = Md5,
        identity_md5 = IdenMd5,
        identity_len = IdenLen,
        encoding_fun = EncodingFun} = Stream,
    if BinSize + BufferLen > Max ->
        WriteBin = lists:reverse(Buffer, [Bin]),
        IdenMd5_2 = couch_util:md5_update(IdenMd5, WriteBin),
        case EncodingFun(WriteBin) of
        [] ->
            % case where the encoder did some internal buffering
            % (zlib does it for example)
            WrittenLen2 = WrittenLen,
            Md5_2 = Md5,
            Written2 = Written;
        WriteBin2 ->
            {ok, Pos, _} = couch_file:append_binary(Fd, WriteBin2),
            WrittenLen2 = WrittenLen + iolist_size(WriteBin2),
            Md5_2 = couch_util:md5_update(Md5, WriteBin2),
            Written2 = [{Pos, iolist_size(WriteBin2)}|Written]
        end,

        {reply, ok, Stream#stream{
                        written_len=WrittenLen2,
                        written_pointers=Written2,
                        buffer_list=[],
                        buffer_len=0,
                        md5=Md5_2,
                        identity_md5=IdenMd5_2,
                        identity_len=IdenLen + BinSize}};
    true ->
        {reply, ok, Stream#stream{
                        buffer_list=[Bin|Buffer],
                        buffer_len=BufferLen + BinSize,
                        identity_len=IdenLen + BinSize}}
    end;

couch_stream:init/1で設定したmax_bufferを超えない内は、書き込むデータ(Bin)をbuffer_listに追加するだけです。max_bufferを超えた場合は、EncodingFunでデータをエンコードした後、couch_file:append_binary(WriteBin2)を呼び出します。

couch_file.erl

1
2
append_binary(Fd, Bin) ->
    gen_server:call(Fd, {append_bin, assemble_file_chunk(Bin)}, infinity).

gen_server:call/3を追う前に、その前に呼び出しているassemble_file_chunk/1を見てみます。

couch_file.erl

1
2
assemble_file_chunk(Bin) ->
    [<<0:1/integer, (iolist_size(Bin)):31/integer>>, Bin].

先頭が0とデータのサイズを記したバイナリを構築し、データと共にリストに格納されて返されます。couch_file:handle_call/3を見ていきます。

couch_file.erl

1
2
3
4
5
6
7
8
9
handle_call({append_bin, Bin}, _From, #file{fd = Fd, eof = Pos} = File) ->
    Blocks = make_blocks(Pos rem ?SIZE_BLOCK, Bin),
    Size = iolist_size(Blocks),
    case file:write(Fd, Blocks) of
    ok ->
        {reply, {ok, Pos, Size}, File#file{eof = Pos + Size}};
    Error ->
        {reply, Error, File}
    end;

make_blocks/2を呼び出して複数のブロックを作成しているようです。この関数を見てみます。

couch_file.erl

1
2
3
4
5
6
7
8
9
10
11
make_blocks(_BlockOffset, []) ->
    [];
make_blocks(0, IoList) ->
    [<<0>> | make_blocks(1, IoList)];
make_blocks(BlockOffset, IoList) ->
    case split_iolist(IoList, (?SIZE_BLOCK - BlockOffset), []) of
    {Begin, End} ->
        [Begin | make_blocks(0, End)];
    _SplitRemaining ->
        IoList
    end.

何となくデータをブロックに分ける操作なのかな、と思うのですが、よく分からなかったので先にsplit_iolist/3を見てみます。

couch_file.erl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
%% @doc Returns a tuple where the first element contains the leading SplitAt
%% bytes of the original iolist, and the 2nd element is the tail. If SplitAt
%% is larger than byte_size(IoList), return the difference.
-spec split_iolist(IoList::iolist(), SplitAt::non_neg_integer(), Acc::list()) ->
    {iolist(), iolist()} | non_neg_integer().
split_iolist(List, 0, BeginAcc) ->
    {lists:reverse(BeginAcc), List};
split_iolist([], SplitAt, _BeginAcc) ->
    SplitAt;
split_iolist([<<Bin/binary>> | Rest], SplitAt, BeginAcc) when SplitAt > byte_size(Bin) ->
    split_iolist(Rest, SplitAt - byte_size(Bin), [Bin | BeginAcc]);
split_iolist([<<Bin/binary>> | Rest], SplitAt, BeginAcc) ->
    <<Begin:SplitAt/binary,End/binary>> = Bin,
    split_iolist([End | Rest], 0, [Begin | BeginAcc]);
split_iolist([Sublist| Rest], SplitAt, BeginAcc) when is_list(Sublist) ->
    case split_iolist(Sublist, SplitAt, BeginAcc) of
    {Begin, End} ->
        {Begin, [End | Rest]};
    SplitRemaining ->
        split_iolist(Rest, SplitAt - (SplitAt - SplitRemaining), [Sublist | BeginAcc])
    end;
split_iolist([Byte | Rest], SplitAt, BeginAcc) when is_integer(Byte) ->
    split_iolist(Rest, SplitAt - 1, [Byte | BeginAcc]).

SplitAtにバインドされている値は、呼び出しシーケンスから、?SIZE_BLOCK - (Pos rem ?SIZE_BLOCK)となります。これは最終ブロックの書き込まれていない領域のサイズを表します。

3つ目と4つ目の関数は、iolistの要素がバイナリの時に呼び出される関数です。3つ目のwhen SplitAt > byte_size(Bin)は、ブロックの未書き込みの領域のサイズが書き込むデータのサイズよりも大きい場合、つまりブロックに書き込めるデータサイズである場合、ということになるので、BeginAccにデータを入れ、SplitAtをデータサイズ分減算して次のデータを処理するようにします。すべてのデータがブロックに書き込める場合、最後に2番目のsplit_iolist/3が呼び出され、残りのブロックサイズが呼び出し元に返ります。

4つ目はブロックに書き込めるデータサイズよりも書き込むデータのサイズが大きい場合、つまりブロックに収まりきらないケースになります。この場合は書き込むデータを、書き込めるサイズのデータ(Begin)と、残りのデータ(End)に分割します。Endはiolistの先頭に追加し、SplitAtを0にしてsplit_iolistを再度呼び出します。これにより1つ目のsplit_iolist/3が呼び出され、書き込めたデータと書き込めなかったデータを呼び出し元に返します。

この結果を以ってmake_blocks/2に戻ると、ブロックに書き込みきれた場合はIoListが、書き込みきれない場合はブロックごとに[[Bin, Bin2, …], [Bin3, Bin4, …], …]が返るようです。で、合ってるのかな。戻り値のリストのネスト具合が結果によって変わると扱いが面倒な気がしますが。

iolist

そこでこの戻り値の型であるErlangのiolistについて調べてみました。

A Ramble Through Erlang IO Lists

The key to IO lists is that you never flatten them. They get passed directly into low-level runtime functions (such as file:write_file), and the flattening happens without eating up any space in your Erlang process. Take advantage of that! Instead of appending values to lists, use nesting instead.

とあるので、リストがネストするような構造を取れるのがiolistのようです。couch_file:handle_call/3の中で、make_blocks/2でブロック毎に分割したiolistの戻り値をfile:write/2で書き込むようになっているのですが、上記引用にあるように、先のネストしたリストも引き受けられるようになっているようです。

ここまででアタッチメントをファイルに書き込む部分が完了したことになるのですが、書き込んだデータがアタッチメントのデータであることを示す識別子やkeyの情報は含まれていないようでした。#att.dataにそこれらが含まれていれば問題ないのですが、ちょっと気になります。ヘッダっぽいものといえば、couch_file:assembly_file_chunk/1くらいだったと思うのですが。

最後にcouch_stream:close/1で書き込み完了となります。

couch_stream.erl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
handle_call(close, _From, Stream) ->
    #stream{
        fd = Fd,
        written_len = WrittenLen,
        written_pointers = Written,
        buffer_list = Buffer,
        md5 = Md5,
        identity_md5 = IdenMd5,
        identity_len = IdenLen,
        encoding_fun = EncodingFun,
        end_encoding_fun = EndEncodingFun} = Stream,

    WriteBin = lists:reverse(Buffer),
    IdenMd5Final = couch_util:md5_final(couch_util:md5_update(IdenMd5, WriteBin)),
    WriteBin2 = EncodingFun(WriteBin) ++ EndEncodingFun(),
    Md5Final = couch_util:md5_final(couch_util:md5_update(Md5, WriteBin2)),
    Result = case WriteBin2 of
    [] ->
        {lists:reverse(Written), WrittenLen, IdenLen, Md5Final, IdenMd5Final};
    _ ->
        {ok, Pos, _} = couch_file:append_binary(Fd, WriteBin2),
        StreamInfo = lists:reverse(Written, [{Pos, iolist_size(WriteBin2)}]),
        StreamLen = WrittenLen + iolist_size(WriteBin2),
        {StreamInfo, StreamLen, IdenLen, Md5Final, IdenMd5Final}
    end,
    {stop, normal, Result, Stream}.
```

Bufferに残ったファイルを書き出して終了となります。

Conclusion

主にアタッチメントをファイルに書き込んでいく様子を見てきました。また、iolistについて改めて確認しました。ファイルに書き込む際、アタッチメントの識別子等が見当たらなかったので、アタッチメントをファイルから読み出すあたりのコードを読む機会があれば、その時に見てみたいと思います。