C MAGAZINE Linux programming Tips
第6回 SystemV IPC セマフォ養成合宿
■はじめに
今回は、Linux Programming Tips 第5回で紹介しきれなかったSystemV IPCの一 つであるセマフォについて説明します。
前回紹介したメッセージキュー(message queue)、共有メモリ(shared memory)は、 プロセス間でデータを受け渡す機構でしたが、 セマフォはこれらとはちょっと趣が違うものです。セマフォは、 共有リソースを持つ複数のプロセスが共有リソースのアクセス(クリティカルセクション)で競合が発生しないように排他的なアクセスを制御する機構です。 ちなみに、このクリティカルセクションへのアクセス制御機構は、1965年にE.W.Dijkstraによって定義されました。
セマフォ(正確にはバイナリセマフォ)は、線路(単線)を通る列車で例えることができます(図1)。 この単線に入る直前に信号機が設置されていて、その信号機は単線に列車がいない場合のみ単線へ入る許可を出します。 仮に、信号機が単線へ入る許可を出している時は、青が点灯し、逆に、単線へ入る許可を出していない 時は、赤が点灯するものとします。 列車は、青信号の場合にのみ単線を通過することができます。赤信号の場合は、青信号になるまで単線に入るのを待たないといけません。 もし赤信号を無視して単線に入った場合、反対から来た列車と衝突 したり、前にいる列車を追い越そうとして衝突したりするかもしれません。 この例をコンピュータ用語に照らし合わせてみると、単線は共有リソースもしくはクリティカルセクション、列車はプロセス、 信号はセマフォにあたります。さらに信号の色はセマフォ値にあたります。
上記の説明でクリティカルセクションの存在、セマフォの雰囲気はつかんでもらえたと思います。 もう少し補足(強引な説明)しておくと、信号機(セマフォ)は OSによって提供されますが、実は、信号機(セマフォ)自身が単線に列車がいるかどうかは監視していません。 列車(プロセス)が単線が使用中でないことを確認し、 単線に入った時に、信号機の色を青から赤に換えます。 逆に、単線を抜け出した 瞬間に信号機の色を赤から青に換えます。おそらく車掌が信号機を確認し、信号 機の前まで行って色を換えるのでしょう(笑)。 単線に入ろうとした時に赤信号の場合、信号機が青になるまで待って、その後、信号機を赤にして単線に入らないと行けません。待っている間は、車掌は眠りに入ってしまいます。 他の列車の 車掌によって信号機が青に換えられた場合に、どういうわけか信号機がかってに赤になり、車掌を起こします。無理矢理こじつけると、 車掌は、セマフォの操作を行うシステムコールにあたるでしょう。
■失敗は成功のもと
まずはリスト1で共有リソースを排他的にアクセスできないプログラム例を紹介します。このプログラムでは、2つのプロセス(親プロセス、 子プロセス)が何の排他機構を使わないまま共有リソースにアクセスします。 共有リソースとしては、 普通は共有メモリーなどが一般的ですが、プログラムの把握を容易にするために、この例では、 共有リソースを標準出力にします。もちろん標準出力は2つのプロセスによって共有されても問題ないので次のルールを設けます。
ルール: 各プロセスは、標準出力へのアクセスの開始時に記号'['、終了時に記号']'を標準出力に出力します。この2つの記号の間は、 各プロセスがクリティカルセクションに入っている(標準出力を占有している)ことになります。 このルールで排他的なアクセスが行われている場合、標準出力に出力された文字列中で'[', ']'のネストは存在しません。
ちなみに、この例では、各プロセスが3回標準出力を占有して、そのたびに数個 の文字を標準出力に出力します。出力タイミングはランダムに遅延されるようになっています。
では早速、リスト1をコンパイルして実行してみましょう。予定通り標準出力の出力結果に'[', ']'のネストが存在していて、排他的なアクセスができていないことを確認できるはずです。
$ cc -Wall -o collision collision.c
$ ./collision
[[CPCPC][CC][P][PP][PCC]P] <- 出力結果は毎回異なる
前述のルールに沿って排他的なアクセスが行われた場合、次のような結果を得たいと ころです。では2つのプロセスでこのような出力を得るにはどのような実装を行ったらよいでしょうか。 一つの解決策がセマフォによる排他アクセスの制御機構です。
$ ./collision
[CCC][PP][CC][PPP][PP][CC]
[リスト1: collision.c]
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
static void error(char *msg) { if(msg) perror(msg) ; exit(1); }
static void rand_sleep(void) { sleep(rand()%2); }
static void access_shared_resource(char *msg)
{
if(msg) printf("%s",msg),fflush(stdout);
}
static void do_dangerous(char* msg)
{
int i,j;
rand_sleep();
for(i=0; i<3 ; i++ ) {
/* クリティカルセクションの始まり */
access_shared_resource("[");
for(j=0; j<(rand()%10)+1; j++ ) {
rand_sleep();
access_shared_resource(msg);
}
access_shared_resource("]");
/* クリティカルセクションの終り */
rand_sleep();
}
}
int main(int argc, char *argv[])
{
int pid, status;
srand((unsigned int)getpid());
if ((pid = fork()) < 0) error("fork");
else if (pid == 0) {
do_dangerous("C");
exit(0);
} else {
do_dangerous("P");
}
wait(&status);
access_shared_resource("\n");
return 0;
}
|
■セマフォの導入
ではリスト1にセマフォを導入して、共有リソースを排他的にアクセスできるよ うにしてみましょう。
まずは共有リソースを持つプロセス間で共通のセマフォを作成します(正確にはセマフォ集合の識別子を取得する)。 セマフォの作成は、関数(システムコール ) semget()により行います。関数 semget()の書式は以下のようになります。
int semget ( key_t key, int nsems, int semflg )
以下の例では、第一引数 key に渡すキーは直接整数を指定していますが、関数 ftok()によって作成されたキーを使った方がよいでしょう。 第2引数 nsems には、セマフォの数を指定します、バイナリセマフォの場合は1になります。第3引数 semflgには、セマフォへのアクセス許可および作成モードを指定します。 一つのプログラムから関数 fork()によりプロセスを起動するような場合は、以下のようなセマフォの作成でよいのですが、 別々のプログラムからプロセスが起動する場合、関数semget()に与えるフラグは異なってきますので manページ semget(2)を参考にしてください。
ここで注意が必要なことは、関数 semget()では、各セマフォは初期化されませ ん。関数 semctl()でSETVALまたはSETALLする必要があります。
セマフォの使用が終った場合、必ずセマフォ集合を削除する必要があります。関 数 semctl()にIPC_RMIDを指定して行います。
リスト1 への拡張
...
#include <sys/ipc.h>
#include <sys/sem.h>
...
static int semid;
...
int main(int argc,char *argv[])
{
...
short initval = 1;
...
if( (semid = semget((key_t)1113,1,0666|IPC_CREAT))==-1 ) error("semget");
if( semctl(semid, 0, SETALL, &initval) == -1 ) error("semctl");
...
クリティカルセクションを持つ処理
...
if( semctl(semid,0,IPC_RMID) !=0 ) error( "semctl");
return 0;
}
|
とりあえず上記の拡張で、セマフォを使用する準備はできました。次にセマフォ を操作し、クリティカルセクションを保護する関数を用意します。 関数 semaphore()は、引数にPが指定されたとき、クリティカルセクションに入る操作をし、引数にVが指定されてとき、クリティカルセクションからでる操作をします。 関数 semopの詳しい動作は、後程説明します。
#define P (-1) /* P(Proberen) from the Dutch, means to test */
#define V (+1) /* V(Verhoren) from the Dutch means to increment */
static void semaphore(int operation)
{
struct sembuf semb;
semb.sem_num = 0;
semb.sem_op = operation;
semb.sem_flg = SEM_UNDO;
if( semop(semid,&semb,1) != 0 ) error("semop");
}
|
あとは察しの通り、この関数semaphore()をクリティカルセクションの前後で呼ぶようにします。 上記の拡張を全て施したリスト(文末に載せておきます)を、 sem.c などと名前を付けて、コンパイルして実行してみましょう。
$ cc -Wall -o sem sem.c
$ ./sem
[PPPP][CCCC][PPPPPP][PP][CCCCCC][CC] <- 出力結果は毎回異なる
|
各プロセスが前述のルールに則って排他的に標準出力をアクセスしていることを確認できます。
■関数(システムコール) semopの謎
SystemV IPCのセマフォを理解する場合、セマフォ集合を操作する関数 semop()の(謎めいた )挙動を理解することがキーポイントになります。
まず関数 semop()の宣言と 構造体 sembufの内容を以下に示します。
int semop ( int semid, struct sembuf *sops, unsigned nsops )
struct sembuf {
short sem_num;
short sem_op;
short sem_flg;
}
|
前述のセマフォの使用例では、構造体 sembuf を一つしか宣言していませんでしたが、関数 semop()には複数個指定することが可能です。 関数 semop() は、 semidで指定されたセマフォ集合のメンバー全てに対して、構造体 sembufで指定された操作を行います(図2)。
より具体的に説明すると、関数 semop()は、指定されたnsops個のセマフォ操作 を行います。 以下に、構造体のメンバーsem_op, sem_flgの設定値の違いによる セマフォ操作の挙動をCライクな簡易言語を使って説明します。
sem_op < 0 の場合(P操作)
この操作は、クリティカルセクションに入る前のセマフォ操作になります。 この操作を行う場合、プロセスは、セマフォ集合を変更する許可が必要になります。 この操作の要点は、現在のセマフォ値(semval)が sem_opの絶対値より大きい場合は、セマフォ値からsem_opの絶対値を引きプロセスはスリープせずに通過します。 逆に、セマフォ値(semval)が sem_opの絶対値未満の場合、セマフォ値 (semval)が sem_opの絶対値より大きくなるまでプロセスはスリープします。 このことは他のプロセスがクリティカルセクションに入っていることを意味し、自分がクリティカルセクションに入れるまで待っていることを意味します。
if( semval >= abs(sem_op) ) {
semval -= abs(sem_op);
if( sem_flg & SEM_UNDO ) add_undo(sem_op);
return SUCESS;
} else if( sem_flg & IPC_NOWAIT ) {
errno = EAGAIN;
return FAILURE;
} else {
semncnt++;
sleep until ( semval>=abs(sem_op) || isSemRemoved || catchSignal );
if( semval>=abs(sem_op) ) {
semcnt--;
semval -= abs(sem_op);
if( sem_flg & SEM_UNDO ) add_undo(sem_op);
return SUCESS;
} else if( isSemRemoved ) {
errno = EIDRM;
return FAILURE;
} else if( catchSignal ) {
semncnt--;
errno = EINTR;
return FAILURE;
}
}
|
[上記の操作の補足]
- semncnt: semnum番目の セマフォの semval の増加を待っているプロセス数
- isSemRemoved : セマフォの削除を知らせるフラグ
- catchSignal : シグナルを受け取ったことを知らせるフラグ
- sleep until COND : 条件CONDが満たされるまでプロセスをスリープする
- add_undo : 現在のプロセスがセマフォに加えた変更をOSによって追跡できるようにする。 セマフォを開放せずにプロセスが終了した場合、OSが時動的にセマフォを開放するための処理
sem_op > 0 の場合(V操作)
この操作は、クリティカルセクションを終えた後のセマフォ操作になり、プロセスはスリープせずに、 単純にsemvalにsem_opを加算し通過します。プロセスは、セマフォ集合を変更する許可が必要になります。
semval += sem_op;
if( sem_flg & SEM_UNDO ) add_undo(sem_op);
|
sem_op == 0の場合
この操作を行う場合、プロセスは、セマフォ集合の読みだし許可が必要になります。
if( semval==0 ) {
return SUCESS;
} else if( sem_flg & IPC_NOWAIT ) {
errno = EAGAIN;
return FAILURE;
} else {
semzcnt++;
sleep until ( semval==0 || isSemRemoved || catchSignal );
if( semval==0 ) {
semzcnt--;
return SUCCESS;
} else if( isSemRemoved ) {
errno = EIDRM;
return FAILURE;
} else if( catchSignal ) {
semncnt--;
errno = EINTR;
return FAILURE;
}
}
|
[上記の操作の補足]
- semzcnt : semnum番目のセマフォのsemvalの値が0になるのを待っているプロセス数
関数 semop()の謎が解明されたところで、前述のsem.cのsemvalの動きをタイミングチャート的に描いてみると図3のようになります。
■もう少し進んだセマフォの使い方
例えば、一つのプロセスが共有リソースにデータを書き込みを行い、他の数個のプロセスが共有リソースからデータを読み込むような状況を考えてみてください。 前述した単純なセマフォの使い方では、読み書き関係なく共有リソースにアクセス可能なプロセスは一つだけになります。ちょっと無駄だと思いませんか? 共有 リソースへデータの書き込みが行われていない場合は、複数のプロセスで読み込みを行っても良いとは思いませんか?
このようなケースは、セマフォを2つ使用することで解決できます。例えば、セマフォ番号0を書き込み用にし、セマフォ番号1を読みだし用にします。 書き込みが発生した場合、読み書き両方のセマフォでクリティカルセクションに入ったことを伝えるようにしないといけません。 共有リソースへの書き込みを行うプロセスのクリティカルセクションの前後でのセマフォ操作をまとめた関数semaphore_write() の実装例を以下に示します。
#define NREADERS (3)
#define P (-1)
#define V (+1)
static void semaphore_write(int operation)
{
struct sembuf P_write[2] = { {0, -1, SEM_UNDO},
{1,-NREADERS, SEM_UNDO} };
struct sembuf V_write[2] = { {0, +1, SEM_UNDO},
{1, +NREADERS, SEM_UNDO} };
if( operation == P ) {
if( semop(semid,P_write,2) != 0 ) error("semop");
} else {
if( semop(semid,V_write,2) != 0 ) error("semop");
}
}
|
関数 semaphore_read()およびセマフォ集合の識別子の取得などについては課題にしておきます。 文末に実装例を紹介しておきますので自分の実装と比較してみ てください。 プログラム名を read-write.cとしてコンパイルして実行してみて ください。 書き込みが行われているとき共有リソースは、書き込みを行っているプロセスのみアクセス可能で、読み出しが行われている時は、複数のプロセスが読み出しを行っていることを確認できます。
$ cc -Wall -o read-write read-write.c
$ ./read-write
[PPP][C0C0C0][PPP][C0C0C0][P][PPPPP][PP][C0][C1C1C1C1C1][C1C1][PPPPP][C1C1[C2C1C1C1]C2C2C2C2]
[PPPPP][C2C2C2C2C2C2][C2C2C2][PPPPPP][PPP] <- 出力結果は毎回異なる
|
■おまけ
-SystemV IPCセマフォを使用しているプログラムの例
セマフォはちょっとだけ複雑なものなので実際の使用例を参考するときっと理解も早まることでしょう。 以下に実際にセマフォを使っているプログラムと参照するソースコードをまとめておきましたので参考にしてください。
| 名前ソースコードのパス |
ダウンロード元 |
| cdrecord ./cdda2wav/semshm.c |
ftp://ftp.fokus.gmd.de/pub/unix/cdrecord/cdrecord-1.9.tar.gz |
| SDL ./src/thread/linux/SDL_syssem.c |
http://www.libsdl.org/release/SDL-1.1.6.tar.gz |
| apache ./src/main/http_main.c |
http://www.apache.org/dist/apache_1.3.14.tar.bz2 |
| samba ./source/locking/shmem_sysv.c |
ftp://ftp.samba.gr.jp/pub/samba-jp/samba-2.0.7-ja/samba-2.0.7-ja-2.1.tar.bz2 |
| postgresql ./src/backend/storage/ipc/ipc.c |
ftp://ftp.postgresql.org/pub/source/v7.0.2/postgresql-7.0.2.tar.gz |
| apcupsd ./apcipc.c |
ftp://ftp.oasi.gpa.it/pub/apcupsd/stable/tar/apcupsd-3.7.2.tar.gz |
| typhoon ./src/unix.c |
http://users.footprints.net/~kaz/typhoon-1.11.0.tar.gz |
セマフォの制限
一応、セマフォにも制限はあります。コマンド ipcs で確認することができます。
$ ipcs -l -s
------ セマフォの制限 --------
最大配列数 = 128
配列毎の最大セマフォ数 = 250
システム全体の最大セマフォ数 = 32000
semop 呼び出し毎の最大命令数 = 32
セマフォ最大値 = 32767
一般的な使い方では、これらの制限を越えることはありませんが、それぞれが何に対応しているかを説明します。
最大配列数は、セマフォ集合を格納しておくための配列の大きさです(図2のNに対応)。 独立した識別子を、関数 semget()を 128個取得可能なことを意味します。 配列毎の最大セマフォ数は、関数 semget()の第2引数 nsemsに与えることのできる最大値です。 システム全体の最大セマフォ数は、最大配列数 * 配列毎の最大セマフォ数です。 セマフォ最大値は、semvalの最大値にあたります。呼び出し毎の最大命令数 は、関数 semop()の 第21数 nsopsに与えることのできる最大値です( procファイルシステムで変更可能)。
プログラムリスト
[sem.c]
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <errno.h>
static int semid;
static void error(char *msg) { if(msg) perror(msg) ; exit(1); }
static void rand_sleep(void) { sleep(rand()%2); }
static void access_shared_resource(char *msg)
{
if(msg) printf("%s",msg),fflush(stdout);
}
#define P (-1)
#define V (+1)
static void semaphore(int operation)
{
struct sembuf semb;
semb.sem_num = 0;
semb.sem_op = operation;
semb.sem_flg = SEM_UNDO;
if( semop(semid,&semb,1) != 0 ) error("semop");
}
static void do_dangerous(char* msg)
{
int i,j;
rand_sleep();
for(i=0; i<3 ; i++ ) {
semaphore(P);
access_shared_resource("[");
for(j=0; j<(rand()%10)+1; j++ ) {
rand_sleep();
access_shared_resource(msg);
}
access_shared_resource("]");
semaphore(V);
rand_sleep();
}
}
int main(int argc, char *argv[])
{
int pid, status;
short initval = 1;
srand((unsigned int)getpid());
if( (semid = semget((key_t)0113,1,0666|IPC_CREAT))==-1 ) error("semget");
if( semctl(semid, 0, SETALL, &initval) == -1 ) error("semctl");
if ((pid = fork()) < 0) error("fork");
else if (pid == 0) {
do_dangerous("C");
exit(0);
} else {
do_dangerous("P");
}
wait(&status);
access_shared_resource("\n");
if( semctl(semid,0,IPC_RMID) !=0 ) error( "semctl");
return 0;
}
|
[read-write.c]
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <errno.h>
static int semid;
static void error(char *msg) { if(msg) perror(msg) ; exit(1); }
static void rand_sleep(void) { sleep(rand()%2); }
static void access_shared_resource(char *msg)
{
if(msg) printf("%s",msg),fflush(stdout);
}
#define NREADERS (3)
#define P (-1)
#define V (+1)
static void semaphore_read(int operation)
{
struct sembuf P_read = {1, -1, SEM_UNDO};
struct sembuf V_read = {1, +1, SEM_UNDO};
if( operation == P ) {
if( semop(semid,&P_read,1) != 0 ) error("semop");
} else {
if( semop(semid,&V_read,1) != 0 ) error("semop");
}
}
static void semaphore_write(int operation)
{
struct sembuf P_write[2] = { {0, -1, SEM_UNDO},
{1,-NREADERS, SEM_UNDO} };
struct sembuf V_write[2] = { {0, +1, SEM_UNDO},
{1, +NREADERS, SEM_UNDO} };
if( operation == P ) {
if( semop(semid,P_write,2) != 0 ) error("semop");
} else {
if( semop(semid,V_write,2) != 0 ) error("semop");
}
}
static void do_read(char* msg)
{
int i,j;
rand_sleep();
for(i=0; i<3 ; i++ ) {
semaphore_read(P);
access_shared_resource("[");
for(j=0; j<(rand()%10)+1; j++ ) {
rand_sleep();
access_shared_resource(msg);
}
access_shared_resource("]");
semaphore_read(V);
rand_sleep();
}
}
static void do_write(char* msg)
{
int i,j;
rand_sleep();
for(i=0; i<3 ; i++ ) {
semaphore_write(P);
access_shared_resource("[");
for(j=0; j<(rand()%10)+1; j++ ) {
rand_sleep();
access_shared_resource(msg);
}
access_shared_resource("]");
semaphore_write(V);
rand_sleep();
}
}
int main(int argc, char *argv[])
{
int status,i;
short initval[2] = {1, NREADERS };
char tmp[10];
srand((unsigned int)getpid());
if( (semid = semget((key_t)1234,2,0666|IPC_CREAT))==-1 ) error("semget");
if( semctl(semid, 0, SETALL, &initval) == -1 ) error("semctl");
for(i=0; i
if( access_shared_resource( \n ); wait(&status); break; do_write( P );
default: error( fork() ); -1: case exit(0); do_read(tmp); i);
C%d , 10, tmp, snprintf( 0: { fork() switch( i++>
|