couch_db:update_doc/4
引き続きcouch_db:update_docs/4
を。
couch_db.erl:
1 if (AllOrNothing) and (PreCommitFailures /= []) ->
2 {aborted,
3 lists:foldl(fun({#doc{id=Id,revs=Revs}, Ref},Acc) ->
4 case lists:keyfind(Ref,1,PreCommitFailures) of
5 {Ref, Error} ->
6 case Revs of
7 {Pos, [RevId|_]} ->
8 [{{Id,{Pos, RevId}}, Error} | Acc];
9 {0, []} ->
10 [{{Id,{0, <<>>}}, Error} | Acc]
11 end;
12 false ->
13 Acc
14 end
15 end,[],Docs3)};
AllOrNothing
がtrue
でvalidationエラーが1つ以上存在する場合、ドキュメントの更新を行わず、呼び出し元に返ります。そのまま続けて見ていきます。
couch_db.erl:
1 true ->
2 Options2 = if AllOrNothing -> [merge_conflicts];
3 true -> [] end ++ Options,
4 DocBuckets3 = [[
5 {doc_flush_atts(set_new_att_revpos(
6 check_dup_atts(Doc)), Db#db.updater_fd), Ref}
7 || {Doc, Ref} <- B] || B <- DocBuckets2],
8 {DocBuckets4, IdRevs} = new_revs(DocBuckets3, [], []),
9
10 {ok, CommitResults} = write_and_commit(Db, DocBuckets4, NonRepDocs, Options2),
11
12 ResultsDict = dict:from_list(IdRevs ++ CommitResults ++ PreCommitFailures),
13 {ok, lists:map(
14 fun({#doc{}, Ref}) ->
15 {ok, Result} = dict:find(Ref, ResultsDict),
16 Result
17 end, Docs2)}
18 end.
ようやくcouch_db:update_docs/4
の終わりが見えました。
まず、AllOrNothing
がtrue
の場合のみ[merge_conflicts]
を加えています。ドキュメントがconflict
した際に自動的にマージされるということですかね…。
その次に、リスト内包表記でDocBucket2
をドキュメントに展開します。展開されたドキュメントに対して、check_dup_atts/1
を呼び出します。
couch_db.erl:
1check_dup_atts(#doc{atts=Atts}=Doc) ->
2 Atts2 = lists:sort(fun(#att{name=N1}, #att{name=N2}) -> N1 < N2 end, Atts),
3 check_dup_atts2(Atts2),
4 Doc.
5
6check_dup_atts2([#att{name=N}, #att{name=N} | _]) ->
7 throw({bad_request, <<"Duplicate attachments">>});
8check_dup_atts2([_ | Rest]) ->
9 check_dup_atts2(Rest);
10check_dup_atts2(_) ->
11 ok.
アタッチメントのname
が重複しているかチェックし、問題なければチェックしたドキュメントを返します。
その次にset_new_att_revpos/1
を呼び出します。
couch_db.erl:
1set_new_att_revpos(#doc{revs={RevPos,_Revs},atts=Atts}=Doc) ->
2 Doc#doc{atts= lists:map(fun(#att{data={_Fd,_Sp}}=Att) ->
3 % already commited to disk, do not set new rev
4 Att;
5 (Att) ->
6 Att#att{revpos=RevPos+1}
7 end, Atts)}.
コメントとコードの内容から、ドキュメントの#att.data
に値が設定されている場合はディスクに書き込み済みなのでAtt#att.revrepos
はそのまま、そうでない場合はAtt#att.revrepos
に1を加えています。
Db#db.updater_fd
その次にdoc_flush_atts/2
を呼び出すのですが、第二引数のDb#db.updater_fd
が何だか分からないのでチェックしてみます。この値を設定しているのは、couch_db_updater:init_db/6
の最後の方。
couch_db_updater.erl:
1#db{
2 update_pid=self(),
3 fd = ReaderFd,
4 updater_fd = Fd,
5 fd_ref_counter = RefCntr,
6...
このFd
という値はどのように渡ってくるか見てみました。
まずcouch_server:open_db/4
にて、DbName
を指定してget_full_filename/2
を呼び出し、ファイルパスを取得します。
couch_server.erl:
1open_db(DbName, Server, Options, From) ->
2 DbNameList = binary_to_list(DbName),
3 case check_dbname(Server, DbNameList) of
4 ok ->
5 Filepath = get_full_filename(Server, DbNameList),
6 case lists:member(sys_db, Options) of
7 true ->
8 {noreply, open_async(Server, From, DbName, Filepath, Options)};
9 false ->
10 case maybe_close_lru_db(Server) of
11 {ok, Server2} ->
12 {noreply, open_async(Server2, From, DbName, Filepath, Options)};
13 CloseError ->
14 {reply, CloseError, Server}
15 end
16 end;
17 Error ->
18 {reply, Error, Server}
19 end.
get_full_filename/2
のコードを見てみます。
couch_server.erl:
1get_full_filename(Server, DbName) ->
2 filename:join([Server#server.root_dir, "./" ++ DbName ++ ".couch"]).
データベースファイルは、DbName
.couchというファイル名になっていました。次にmaybe_close_lru_db/1
を見てみます。
couch_server.erl:
1maybe_close_lru_db(#server{dbs_open=NumOpen, max_dbs_open=MaxOpen}=Server)
2 when NumOpen < MaxOpen ->
3 {ok, Server};
4maybe_close_lru_db(#server{dbs_open=NumOpen}=Server) ->
5 % must free up the lru db.
6 case try_close_lru(now()) of
7 ok ->
8 {ok, Server#server{dbs_open=NumOpen - 1}};
9 Error -> Error
10 end.
#server.dbs_open
が#server.max_dbs_open
より小さければ何もせず返ります。そうでない場合は、使われていないDBをcloseしようとするのかな。面白そうですが、このあたりを見始めると進まないのでスルーします。
openしているDBの数に問題がないようであれば、open_async/5
を呼び出してDBをopenします。
couch_server.erl:
1open_async(Server, From, DbName, Filepath, Options) ->
2 Parent = self(),
3 Opener = spawn_link(fun() ->
4 Res = couch_db:start_link(DbName, Filepath, Options),
5 gen_server:call(
6 Parent, {open_result, DbName, Res, Options}, infinity
7 ),
8 unlink(Parent),
9 case Res of
10 {ok, DbReader} ->
11 unlink(DbReader);
12 _ ->
13 ok
14 end
15 end),
16 true = ets:insert(couch_dbs_by_name, {DbName, {opening, Opener, [From]}}),
17 true = ets:insert(couch_dbs_by_pid, {Opener, DbName}),
18 DbsOpen = case lists:member(sys_db, Options) of
19 true ->
20 true = ets:insert(couch_sys_dbs, {DbName, true}),
21 Server#server.dbs_open;
22 false ->
23 Server#server.dbs_open + 1
24 end,
25 Server#server{dbs_open = DbsOpen}.
couch_db:start_link/3
にFilepath
を渡しています。その後、open_result
にてopenした結果を受け、openしたDBの情報をETSに保存し、Server#server.dbs_open
に1を加えています。couch_db:start_link/3
を見てみます。
couch_db.erl:
1start_link(DbName, Filepath, Options) ->
2 case open_db_file(Filepath, Options) of
3 {ok, Fd} ->
4 StartResult = gen_server:start_link(couch_db, {DbName, Filepath, Fd, Options}, []),
5 unlink(Fd),
6 StartResult;
7 Else ->
8 Else
9 end.
couch_db:open_db_file/2
を呼び出してFd
を獲得しています。この関数の中でデータベースファイルのopenが行われているようです。
couch_db.erl:
1open_db_file(Filepath, Options) ->
2 case couch_file:open(Filepath, Options) of
3 {ok, Fd} ->
4 {ok, Fd};
5 {error, enoent} ->
6 % couldn't find file. is there a compact version? This can happen if
7 % crashed during the file switch.
8 case couch_file:open(Filepath ++ ".compact", [nologifmissing]) of
9 {ok, Fd} ->
10 ?LOG_INFO("Found ~s~s compaction file, using as primary storage.", [Filepath, ".compact"]),
11 ok = file:rename(Filepath ++ ".compact", Filepath),
12 ok = couch_file:sync(Fd),
13 {ok, Fd};
14 {error, enoent} ->
15 {not_found, no_db_file}
16 end;
17 Error ->
18 Error
19 end.
couch_file:open/2
を呼び出してファイルをopenしています。この関数は以前見ているので、ここでは割愛します。couch_db:start_link/3
に戻り、gen_server:start_link/3
の呼び出しによってcouch_db:init/1
が呼び出されます。この時、openしたデータベースファイルのFd
を渡しています。
couch_db.erl:
1init({DbName, Filepath, Fd, Options}) ->
2 {ok, UpdaterPid} = gen_server:start_link(couch_db_updater, {self(), DbName, Filepath, Fd, Options}, []),
3 {ok, #db{fd_ref_counter=RefCntr}=Db} = gen_server:call(UpdaterPid, get_db),
4 couch_ref_counter:add(RefCntr),
5 case lists:member(sys_db, Options) of
6 true ->
7 ok;
8 false ->
9 couch_stats_collector:track_process_count({couchdb, open_databases})
10 end,
11 process_flag(trap_exit, true),
12 {ok, Db}.
ここでもgen_server:start_link/3
を呼び出します。この呼び出しによってcouch_db_updater:init/1
が呼び出されます。この時もFd
が渡されています。ここからcouch_db_updater:init_db/6
が呼び出される流れも以前見ているので割愛します。
これまで見てきたように、Db#db.updater_fd
はopenしたデータベースファイルのFd
となります。
couch_db:doc_flush_atts/2
couch_db:update_docs/4
に戻り、couch_db:doc_flush_atts/2
の呼び出しから。
couch_db.erl:
1doc_flush_atts(Doc, Fd) ->
2 Doc#doc{atts=[flush_att(Fd, Att) || Att <- Doc#doc.atts]}.
そのままcouch_db:flush_att/2
を見ていきます。
couch_db.erl:
1flush_att(Fd, #att{data={Fd0, _}}=Att) when Fd0 == Fd ->
2 % already written to our file, nothing to write
3 Att;
4
5flush_att(Fd, #att{data={OtherFd,StreamPointer}, md5=InMd5,
6 disk_len=InDiskLen} = Att) ->
7 {NewStreamData, Len, _IdentityLen, Md5, IdentityMd5} =
8 couch_stream:copy_to_new_stream(OtherFd, StreamPointer, Fd),
9 check_md5(IdentityMd5, InMd5),
10 Att#att{data={Fd, NewStreamData}, md5=Md5, att_len=Len, disk_len=InDiskLen};
11
12flush_att(Fd, #att{data=Data}=Att) when is_binary(Data) ->
13 with_stream(Fd, Att, fun(OutputStream) ->
14 couch_stream:write(OutputStream, Data)
15 end);
16
17flush_att(Fd, #att{data=Fun,att_len=undefined}=Att) when is_function(Fun) ->
18 MaxChunkSize = list_to_integer(
19 couch_config:get("couchdb", "attachment_stream_buffer_size", "4096")),
20 with_stream(Fd, Att, fun(OutputStream) ->
21 % Fun(MaxChunkSize, WriterFun) must call WriterFun
22 % once for each chunk of the attachment,
23 Fun(MaxChunkSize,
24 % WriterFun({Length, Binary}, State)
25 % WriterFun({0, _Footers}, State)
26 % Called with Length == 0 on the last time.
27 % WriterFun returns NewState.
28 fun({0, Footers}, _) ->
29 F = mochiweb_headers:from_binary(Footers),
30 case mochiweb_headers:get_value("Content-MD5", F) of
31 undefined ->
32 ok;
33 Md5 ->
34 {md5, base64:decode(Md5)}
35 end;
36 ({_Length, Chunk}, _) ->
37 couch_stream:write(OutputStream, Chunk)
38 end, ok)
39 end);
40
41flush_att(Fd, #att{data=Fun,att_len=AttLen}=Att) when is_function(Fun) ->
42 with_stream(Fd, Att, fun(OutputStream) ->
43 write_streamed_attachment(OutputStream, Fun, AttLen)
44 end).
1つ目のパターンマッチ&ガードは、データベースファイルのFd
とアタッチメントのdata
で保持しているファイルディスクリプタが同じ値であれば、このアタッチメントは既にディスクに書き込み済みと見なしています。
2つ目のパターンマッチ&ガードは、#att.data
にFd
と異なるファイルディスクリプタが設定されていた場合、ということになります。この場合はcouch_stream:copy_to_new_stream/3
を呼び出しています。この関数を見てみます。
couch_stream.erl:
1copy_to_new_stream(Fd, PosList, DestFd) ->
2 {ok, Dest} = open(DestFd),
3 foldl(Fd, PosList,
4 fun(Bin, _) ->
5 ok = write(Dest, Bin)
6 end, ok),
7 close(Dest).
関数の名前にあるように、Fd
が示すファイルからPosList
の分だけオフセットを取り出してデータを読み込み、それをDestFd
が示すファイルに書き出しています。
3つ目のパターンマッチ&ガードは#att.data
がbinaryの時に呼び出され、細かくは見ていきませんが恐らくFd
が示すファイルにData
を書く。4つ目と5つ目のは#att.data
が関数の場合に呼び出され、やはりFd
が示すファイルにアタッチメントのデータを書き込むようです。
3つ目の#att.data
がbinaryの関数がシンプルなので、この関数の内容を追ってアタッチメントがファイルへの書き込まれるシーケンスを見ていきます。まずはcouch_stream:with_stream/3
から。
couch_stream.erl:
1with_stream(Fd, #att{md5=InMd5,type=Type,encoding=Enc}=Att, Fun) ->
2 BufferSize = list_to_integer(
3 couch_config:get("couchdb", "attachment_stream_buffer_size", "4096")),
4 {ok, OutputStream} = case (Enc =:= identity) andalso
5 compressible_att_type(Type) of
6 true ->
7 CompLevel = list_to_integer(
8 couch_config:get("attachments", "compression_level", "0")
9 ),
10 couch_stream:open(Fd, [{buffer_size, BufferSize},
11 {encoding, gzip}, {compression_level, CompLevel}]);
12 _ ->
13 couch_stream:open(Fd, [{buffer_size, BufferSize}])
14 end,
15 ReqMd5 = case Fun(OutputStream) of
16 {md5, FooterMd5} ->
17 case InMd5 of
18 md5_in_footer -> FooterMd5;
19 _ -> InMd5
20 end;
21 _ ->
22 InMd5
23 end,
24 {StreamInfo, Len, IdentityLen, Md5, IdentityMd5} =
25 couch_stream:close(OutputStream),
26...
全部読むとちょっと長いので、ファイルに書き込んでいるところだけを見ていきます。最初にconfigからBufferSize
を取得します。次にファイルをopenし、OpenStream
を取得しています。エンコードを指定しない方のcouch_stream:open/2
を見てみます。
couch_stream.erl:
1open(Fd, Options) ->
2 gen_server:start_link(couch_stream, {Fd, Options}, []).
そのままcouch_stream:init/1
を見ます。
couch_stream.erl:
1init({Fd, Options}) ->
2 {EncodingFun, EndEncodingFun} =
3 case couch_util:get_value(encoding, Options, identity) of
4 identity ->
5 identity_enc_dec_funs();
6 gzip ->
7 gzip_init(Options)
8 end,
9 {ok, #stream{
10 fd=Fd,
11 md5=couch_util:md5_init(),
12 identity_md5=couch_util:md5_init(),
13 encoding_fun=EncodingFun,
14 end_encoding_fun=EndEncodingFun,
15 max_buffer=couch_util:get_value(
16 buffer_size, Options, ?DEFAULT_BUFFER_SIZE)
17 }
18 }.
configからエンコーディングタイプを取得し、そのエンコーディング用の関数を取得して#stream
を構築して返しています。この時点ではopenしていないように見えます。これでcouch_stream:with_stream/3
は終わりです。次にcouch_stream:write/2
を見ていきます。
couch_stream.erl:
1write(_Pid, <<>>) ->
2 ok;
3write(Pid, Bin) ->
4 gen_server:call(Pid, {write, Bin}, infinity).
続けてcouch_stream:handle_call/3
のwrite
を指定しているパターンを見ます。
couch_stream.erl:
1handle_call({write, Bin}, _From, Stream) ->
2 BinSize = iolist_size(Bin),
3 #stream{
4 fd = Fd,
5 written_len = WrittenLen,
6 written_pointers = Written,
7 buffer_len = BufferLen,
8 buffer_list = Buffer,
9 max_buffer = Max,
10 md5 = Md5,
11 identity_md5 = IdenMd5,
12 identity_len = IdenLen,
13 encoding_fun = EncodingFun} = Stream,
14 if BinSize + BufferLen > Max ->
15 WriteBin = lists:reverse(Buffer, [Bin]),
16 IdenMd5_2 = couch_util:md5_update(IdenMd5, WriteBin),
17 case EncodingFun(WriteBin) of
18 [] ->
19 % case where the encoder did some internal buffering
20 % (zlib does it for example)
21 WrittenLen2 = WrittenLen,
22 Md5_2 = Md5,
23 Written2 = Written;
24 WriteBin2 ->
25 {ok, Pos, _} = couch_file:append_binary(Fd, WriteBin2),
26 WrittenLen2 = WrittenLen + iolist_size(WriteBin2),
27 Md5_2 = couch_util:md5_update(Md5, WriteBin2),
28 Written2 = [{Pos, iolist_size(WriteBin2)}|Written]
29 end,
30
31 {reply, ok, Stream#stream{
32 written_len=WrittenLen2,
33 written_pointers=Written2,
34 buffer_list=[],
35 buffer_len=0,
36 md5=Md5_2,
37 identity_md5=IdenMd5_2,
38 identity_len=IdenLen + BinSize}};
39 true ->
40 {reply, ok, Stream#stream{
41 buffer_list=[Bin|Buffer],
42 buffer_len=BufferLen + BinSize,
43 identity_len=IdenLen + BinSize}}
44 end;
couch_stream:init/1
で設定したmax_buffer
を超えない内は、書き込むデータ(Bin
)をbuffer_list
に追加するだけです。max_buffer
を超えた場合は、EncodingFun
でデータをエンコードした後、couch_file:append_binary(WriteBin2)
を呼び出します。
couch_file.erl:
1append_binary(Fd, Bin) ->
2 gen_server:call(Fd, {append_bin, assemble_file_chunk(Bin)}, infinity).
gen_server:call/3
を追う前に、その前に呼び出しているassemble_file_chunk/1
を見てみます。
couch_file.erl:
1assemble_file_chunk(Bin) ->
2 [<<0:1/integer, (iolist_size(Bin)):31/integer>>, Bin].
先頭が0
とデータのサイズを記したバイナリを構築し、データと共にリストに格納されて返されます。couch_file:handle_call/3
を見ていきます。
couch_file.erl:
1handle_call({append_bin, Bin}, _From, #file{fd = Fd, eof = Pos} = File) ->
2 Blocks = make_blocks(Pos rem ?SIZE_BLOCK, Bin),
3 Size = iolist_size(Blocks),
4 case file:write(Fd, Blocks) of
5 ok ->
6 {reply, {ok, Pos, Size}, File#file{eof = Pos + Size}};
7 Error ->
8 {reply, Error, File}
9 end;
make_blocks/2
を呼び出して複数のブロックを作成しているようです。この関数を見てみます。
couch_file.erl:
1make_blocks(_BlockOffset, []) ->
2 [];
3make_blocks(0, IoList) ->
4 [<<0>> | make_blocks(1, IoList)];
5make_blocks(BlockOffset, IoList) ->
6 case split_iolist(IoList, (?SIZE_BLOCK - BlockOffset), []) of
7 {Begin, End} ->
8 [Begin | make_blocks(0, End)];
9 _SplitRemaining ->
10 IoList
11 end.
何となくデータをブロックに分ける操作なのかな、と思うのですが、よく分からなかったので先にsplit_iolist/3
を見てみます。
couch_file.erl:
1%% @doc Returns a tuple where the first element contains the leading SplitAt
2%% bytes of the original iolist, and the 2nd element is the tail. If SplitAt
3%% is larger than byte_size(IoList), return the difference.
4-spec split_iolist(IoList::iolist(), SplitAt::non_neg_integer(), Acc::list()) ->
5 {iolist(), iolist()} | non_neg_integer().
6split_iolist(List, 0, BeginAcc) ->
7 {lists:reverse(BeginAcc), List};
8split_iolist([], SplitAt, _BeginAcc) ->
9 SplitAt;
10split_iolist([<<Bin/binary>> | Rest], SplitAt, BeginAcc) when SplitAt > byte_size(Bin) ->
11 split_iolist(Rest, SplitAt - byte_size(Bin), [Bin | BeginAcc]);
12split_iolist([<<Bin/binary>> | Rest], SplitAt, BeginAcc) ->
13 <<Begin:SplitAt/binary,End/binary>> = Bin,
14 split_iolist([End | Rest], 0, [Begin | BeginAcc]);
15split_iolist([Sublist| Rest], SplitAt, BeginAcc) when is_list(Sublist) ->
16 case split_iolist(Sublist, SplitAt, BeginAcc) of
17 {Begin, End} ->
18 {Begin, [End | Rest]};
19 SplitRemaining ->
20 split_iolist(Rest, SplitAt - (SplitAt - SplitRemaining), [Sublist | BeginAcc])
21 end;
22split_iolist([Byte | Rest], SplitAt, BeginAcc) when is_integer(Byte) ->
23 split_iolist(Rest, SplitAt - 1, [Byte | BeginAcc]).
SplitAt
にバインドされている値は、呼び出しシーケンスから、?SIZE_BLOCK - (Pos rem ?SIZE_BLOCK)
となります。これは最終ブロックの書き込まれていない領域のサイズを表します。
3つ目と4つ目の関数は、iolistの要素がバイナリの時に呼び出される関数です。3つ目のwhen SplitAt > byte_size(Bin)
は、ブロックの未書き込みの領域のサイズが書き込むデータのサイズよりも大きい場合、つまりブロックに書き込めるデータサイズである場合、ということになるので、BeginAcc
にデータを入れ、SplitAt
をデータサイズ分減算して次のデータを処理するようにします。すべてのデータがブロックに書き込める場合、最後に2番目のsplit_iolist/3
が呼び出され、残りのブロックサイズが呼び出し元に返ります。
4つ目はブロックに書き込めるデータサイズよりも書き込むデータのサイズが大きい場合、つまりブロックに収まりきらないケースになります。この場合は書き込むデータを、書き込めるサイズのデータ(Begin
)と、残りのデータ(End
)に分割します。End
はiolistの先頭に追加し、SplitAt
を0にしてsplit_iolist
を再度呼び出します。これにより1つ目のsplit_iolist/3
が呼び出され、書き込めたデータと書き込めなかったデータを呼び出し元に返します。
この結果を以ってmake_blocks/2
に戻ると、ブロックに書き込みきれた場合はIoList
が、書き込みきれない場合はブロックごとに
[[Bin, Bin2, ...], [Bin3, Bin4, ...], ...]
が返るようです。で、合ってるのかな。戻り値のリストのネスト具合が結果によって変わると扱いが面倒な気がしますが。
iolist
そこでこの戻り値の型であるErlangのiolistについて調べてみました。
A Ramble Through Erlang IO Lists
The key to IO lists is that you never flatten them. They get passed directly into low-level runtime functions (such as file:write_file), and the flattening happens without eating up any space in your Erlang process. Take advantage of that! Instead of appending values to lists, use nesting instead.
とあるので、リストがネストするような構造を取れるのがiolistのようです。couch_file:handle_call/3
の中で、make_blocks/2
でブロック毎に分割したiolistの戻り値をfile:write/2
で書き込むようになっているのですが、上記引用にあるように、先のネストしたリストも引き受けられるようになっているようです。
ここまででアタッチメントをファイルに書き込む部分が完了したことになるのですが、書き込んだデータがアタッチメントのデータであることを示す識別子やkeyの情報は含まれていないようでした。#att.data
にそこれらが含まれていれば問題ないのですが、ちょっと気になります。ヘッダっぽいものといえば、couch_file:assembly_file_chunk/1
くらいだったと思うのですが。
最後にcouch_stream:close/1
で書き込み完了となります。
couch_stream.erl:
1handle_call(close, _From, Stream) ->
2 #stream{
3 fd = Fd,
4 written_len = WrittenLen,
5 written_pointers = Written,
6 buffer_list = Buffer,
7 md5 = Md5,
8 identity_md5 = IdenMd5,
9 identity_len = IdenLen,
10 encoding_fun = EncodingFun,
11 end_encoding_fun = EndEncodingFun} = Stream,
12
13 WriteBin = lists:reverse(Buffer),
14 IdenMd5Final = couch_util:md5_final(couch_util:md5_update(IdenMd5, WriteBin)),
15 WriteBin2 = EncodingFun(WriteBin) ++ EndEncodingFun(),
16 Md5Final = couch_util:md5_final(couch_util:md5_update(Md5, WriteBin2)),
17 Result = case WriteBin2 of
18 [] ->
19 {lists:reverse(Written), WrittenLen, IdenLen, Md5Final, IdenMd5Final};
20 _ ->
21 {ok, Pos, _} = couch_file:append_binary(Fd, WriteBin2),
22 StreamInfo = lists:reverse(Written, [{Pos, iolist_size(WriteBin2)}]),
23 StreamLen = WrittenLen + iolist_size(WriteBin2),
24 {StreamInfo, StreamLen, IdenLen, Md5Final, IdenMd5Final}
25 end,
26 {stop, normal, Result, Stream}.
27```
Buffer
に残ったファイルを書き出して終了となります。
Conclusion
主にアタッチメントをファイルに書き込んでいく様子を見てきました。また、iolistについて改めて確認しました。ファイルに書き込む際、アタッチメントの識別子等が見当たらなかったので、アタッチメントをファイルから読み出すあたりのコードを読む機会があれば、その時に見てみたいと思います。